In [1]:
import sparknlp
import numpy as np
# Import the required modules and classes
from sparknlp.base import DocumentAssembler, Pipeline, Finisher

from sparknlp.annotator import (
    SentenceDetector,
    Tokenizer,
    Lemmatizer,
    SentimentDetector
)
import pyspark.sql.functions as F
# Start Spark Session

from sparknlp.pretrained import PretrainedPipeline
import afinn
from pyspark.sql import types
import yahooquery as yq
import matplotlib.pyplot as plt
import pandas as pd
import datetime as dt
from sklearn.preprocessing import StandardScaler,MinMaxScaler
from pyspark.ml.feature import VectorAssembler
import seaborn as sns
from pyspark.streaming import dstream,StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import StandardScaler
from pyspark.sql.window import Window

In [2]:
# spark = SparkSession.builder.appName("sentiment").getOrCreate()


24/05/13 00:44:44 WARN Utils: Your hostname, alber-victus resolves to a loopback address: 127.0.1.1; using 192.168.1.25 instead (on interface wlp4s0)
24/05/13 00:44:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/alber/.ivy2/cache
The jars for the packages stored in: /home/alber/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-79ca612a-f9bb-41c2-bcb9-70a51d5c0e35;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;5.3.3 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-s3;1.12.500 in central
	found com.amazonaws#aws-java-sdk-kms;1.12.500 in central
	found com.amazonaws#aws-java-sdk-core;1.12.500 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found software.amazon.ion#ion-java;1.0.2 in central
	found joda-time#joda-time;2.8.1 in central
	found com.amazonaws#jmespath-java;1.12.500 in central


In [6]:
# ! wget -N https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/lemma-corpus-small/lemmas_small.txt -P /tmp
# ! wget -N https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/sentiment-corpus/default-sentiment-dict.txt -P /tmp

In [22]:


def get_scores(column='article',stock='Tesla'):


    data = spark.read.option("header","true").json("../data/json/"+stock)
    tickers = {"NVIDIA":"NVDA",
              "Bitcoin":"BTC-USD",
              "Apple":"AAPL",
              "Tesla":"TSLA"}
    
    # Step 1: Transforms raw texts to `document` annotation
    document_assembler = (
        DocumentAssembler() \
        .setInputCol(column) \
        .setOutputCol("document")
    )

    schema = StructType([
    StructField("article",StringType(),True),
    StructField("description",StringType(),True),
    StructField("published date",StringType(),True),
    StructField("publisher_url",StringType(),True),
    StructField("title",StringType(),True),
    StructField("url",StringType(),True)
    ])

    
    
    # Step 2: Sentence Detection
    sentence_detector = SentenceDetector().setInputCols(["document"]).setOutputCol("sentence")
    
    # Step 3: Tokenization
    tokenizer = Tokenizer().setInputCols(["sentence"]).setOutputCol("token")
    
    # Step 4: Lemmatization
    lemmatizer= Lemmatizer().setInputCols("token").setOutputCol("lemma") \
                            .setDictionary("lemmas_small.txt", key_delimiter="->", value_delimiter="\t")
    
    # Step 5: Sentiment Detection
    sentiment_detector= (
        SentimentDetector() \
        .setInputCols(["lemma", "sentence"]) \
        .setOutputCol("sentiment_score") \
        .setDictionary("default-sentiment-dict.txt", ",")
    )

    
    # Step 6: Finisher
    finisher= (
        Finisher() \
        .setInputCols(["sentiment_score"]).setOutputCols("sentiment")
    )
    
    # Define the pipeline
    pipeline = Pipeline(
        stages=[
            document_assembler,
            sentence_detector, 
            tokenizer, 
            lemmatizer, 
            sentiment_detector, 
            finisher
        ]
    )
    
    def compute_with_afinn(text):
    
        return afinn.Afinn().score(text)
    
    compute_sentiment_score_udf = F.udf(compute_with_afinn, types.FloatType())

    # data = data.withColumnRe

    # scores
    result = pipeline.fit(data).transform(data)
    
    result = result.withColumn("afinn_sentiment",compute_sentiment_score_udf(F.col('article')))
    
    result = result.withColumn("pnn_sentiment",
                             F.when(F.array_contains(F.col("sentiment"), "positive"), 1)
                              .when(F.array_contains(F.col("sentiment"), "negative"), -1)
                              .otherwise(0))
    result = result.withColumnRenamed("published date","published_date")
    result = result.withColumn("published_date", F.regexp_extract(F.col("published_date"), r"^.*,\s*(.+)\s\d+:\d+:\d+", 1))

    func = F.udf(lambda x: dt.datetime.strptime(x, '%d %b %Y'), DateType())
    
    # Apply UDF to the column
    result = result.withColumn('published_date', func(result['published_date']))

    result = result.select('published_date','afinn_sentiment','pnn_sentiment')


    result = result.groupBy("published_date").agg(F.mean("afinn_sentiment").alias("afinn_sentiment_mean"),\
                                                  F.mean("pnn_sentiment").alias("pnn_sentiment"))

    
    feature_columns = ["afinn_sentiment_mean"]

    # scale
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="afinn_sentiment_scaled")

    # Scale the features using MinMaxScaler
    scaler = StandardScaler(inputCol="afinn_sentiment_scaled", outputCol="afinn_sentiment")
    
    # Create a pipeline to execute the assembler and scaler
    pipeline = Pipeline(stages=[assembler, scaler])
    
    # Fit and transform the pipeline
    result = pipeline.fit(result).transform(result)


    firstelement = F.udf(lambda v:float(v[0]),FloatType())
    result = result.withColumn("afinn_sentiment", firstelement("afinn_sentiment"))
    
    result = result.withColumnRenamed("published_date","date")
    
    result = result.select('date','afinn_sentiment','pnn_sentiment')


    
    return result


def prepare_price(stock,start_date=None,end_date=None):

    price = yq.Ticker(stock).history(start=start_date,end=end_date).reset_index()
    # return price

    price_df = spark.createDataFrame(price)


    # only adj close and volume
    price_df = price_df.select("date","volume","adjclose")

    # scale
    
    def compute_percent_change(current_price, previous_price):
        if current_price is not None and previous_price is not None:
            return ((current_price - previous_price) / previous_price) * 100
        else:
            return None
    
    percent_change_udf = F.udf(compute_percent_change, FloatType())
    
    # Calculate percent change
    window_spec = Window.orderBy("date")
    
    price_df = price_df.withColumn("prev_price", F.lag("adjclose", 1).over(window_spec))
    price_df = price_df.withColumn("price_percent_change", percent_change_udf(F.col("adjclose"), F.col("prev_price")))
    
    
    price_df = price_df.withColumn("prev_volume", F.lag("volume", 1).over(window_spec))
    price_df = price_df.withColumn("volume_percent_change", percent_change_udf(F.col("volume"), F.col("prev_volume")))
    
    
    price_df = price_df.select('date','price_percent_change','volume_percent_change')   
    price_df = price_df.withColumn("next_day_price_percent_change_shifted", F.lead("price_percent_change", 1).over(window_spec))

    return price_df


def prepare_mix_data(scores,stock='NVDA'):

    min_date = scores.agg(F.min("date")).collect()[0][0].strftime('%Y-%m-%d')
    max_date = scores.agg(F.max("date")).collect()[0][0].strftime('%Y-%m-%d')

    price_df = prepare_price(stock,min_date,max_date)

    df = scores.join(price_df, on="date", how="right")
    df = df.dropna()

    return df

    


In [23]:
scores = get_scores()

df = prepare_mix_data(scores)


  has_live_indice = index_utc[-1] >= last_trade - pd.Timedelta(2, "S")


In [24]:
df.show(truncate=True)

24/05/13 00:58:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 00:58:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 00:58:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 00:58:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 00:58:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 00:58:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 0

+----------+---------------+------------------+--------------------+---------------------+-------------------------------------+
|      date|afinn_sentiment|     pnn_sentiment|price_percent_change|volume_percent_change|next_day_price_percent_change_shifted|
+----------+---------------+------------------+--------------------+---------------------+-------------------------------------+
|2017-01-04|     0.83295304|0.9354838709677419|           2.3330994|           -20.158243|                           -2.5385501|
|2017-01-05|      1.7214363|0.8666666666666667|          -2.5385501|           -17.921982|                            1.3367301|
|2017-01-06|       1.813135|               1.0|           1.3367301|            -16.40157|                            4.0543356|
|2017-01-09|      1.3718747|               1.0|           4.0543356|            11.349738|                           -0.7550339|
|2017-01-10|      2.9906306|0.7777777777777778|          -0.7550339|           -3.8557246|       

24/05/13 00:58:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 00:58:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


### Sklearn

In [None]:
min_max = MinMaxScaler((-1,1))
std_scaler = StandardScaler()


In [None]:
model_df = df.copy()
model_df['date'] = model_df['published date'].apply(lambda x: dt.datetime.strptime(x, "%d %b %Y").strftime("%Y-%m-%d"))

model_df = model_df[['date','sentiment','afinn_sentiment']]
# date_obj = 

# Convert to the desired format
# formatted_date = 
# pd.to_datetime(model_df['published date'],format="%Y-%m-%d")

min_max_results = min_max.fit_transform(model_df['afinn_sentiment'].values.reshape(-1,1))
std_results = std_scaler.fit_transform(model_df['afinn_sentiment'].values.reshape(-1,1))

model_df['min_max_afinn'] = min_max_results
model_df['std_afinn'] = std_results
model_df['div_to_max'] = model_df['afinn_sentiment']/model_df['afinn_sentiment'].max()
model_df = model_df.groupby("date").sum().reset_index()

In [None]:
model_df

In [None]:
model_df['date'] = pd.to_datetime(model_df['date'])

In [None]:
model_df.date.max(),model_df.date.min()


In [85]:
price = yq.Ticker("NVDA").history(start='2017-01-02')
price

  has_live_indice = index_utc[-1] >= last_trade - pd.Timedelta(2, "S")
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df["dividends"].fillna(0, inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df["splits"].fillna(0, inplace=True)


Unnamed: 0_level_0,Unnamed: 1_level_0,open,high,low,close,volume,adjclose,dividends,splits
symbol,date,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
NVDA,2017-01-03,26.100000,26.592501,24.844999,25.502501,150199600,25.134892,0.0,0.0
NVDA,2017-01-04,25.850000,26.375000,25.382500,26.097500,119922000,25.721317,0.0,0.0
NVDA,2017-01-05,26.132500,26.455000,25.262501,25.434999,98429600,25.068356,0.0,0.0
NVDA,2017-01-06,25.712500,26.062500,25.299999,25.775000,82285600,25.403461,0.0,0.0
NVDA,2017-01-09,25.875000,27.000000,25.875000,26.820000,91624800,26.433397,0.0,0.0
NVDA,...,...,...,...,...,...,...,...,...
NVDA,2024-05-06,893.900024,922.200012,890.549988,921.400024,37620300,921.400024,0.0,0.0
NVDA,2024-05-07,910.979980,917.809998,823.250000,905.539978,43734200,905.539978,0.0,0.0
NVDA,2024-05-08,894.830017,911.940002,894.200012,904.119995,32572100,904.119995,0.0,0.0
NVDA,2024-05-09,905.289978,910.719971,882.309998,887.469971,37801300,887.469971,0.0,0.0


In [None]:
price_df = price.reset_index()[['date','adjclose']]
price_df['p_return'] = price_df['adjclose'].pct_change(1)
price_df = price_df[['date','p_return']]

In [None]:
price_df['date'] = pd.to_datetime(price_df['date'])

In [None]:
final_df = model_df.merge(price_df,on='date',how='right')

In [None]:
final_df.dropna(inplace=True)

In [None]:
final_df

In [None]:
plt.figure(figsize=(15,7))
plt.plot(final_df['date'],final_df['p_return'])
plt.plot(final_df['date'],final_df['std_afinn'])

In [None]:
corr = final_df.corr()

In [None]:
plt.figure(figsize=(15,7))
sns.heatmap(corr,annot=True)

In [None]:
reg_df = final_df.copy()
reg_df = final_df[['sentiment','std_afinn','p_return']]
reg_df['next_day_return'] = reg_df['p_return'].shift(-1)
reg_df['label'] = np.where(reg_df['next_day_return']<=0,0,1)
reg_df.dropna(inplace=True)

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR,SVC

In [None]:
model = LinearRegression()
X,y = reg_df.iloc[:,:3].values,reg_df.iloc[:,3]

n = len(X)
train_size = int(0.8*n)

X_train,X_test = X[:train_size],X[train_size:]
y_train,y_test = y[:train_size],y[train_size:]

In [None]:
model = model.fit(X_train,y_train)

In [None]:
model.score(X_train,y_train)

In [None]:
model.score(X_test,y_test)

In [None]:
svm = SVR()

In [None]:
svm = svm.fit(X_train,y_train)

In [None]:
svm.score(X_train,y_train)

In [None]:
svm.score(X_test,y_test)

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier

In [None]:
X,y = reg_df.iloc[:,:3].values,reg_df.iloc[:,4]

n = len(X)
train_size = int(0.8*n)

X_train,X_test = X[:train_size],X[train_size:]
y_train,y_test = y[:train_size],y[train_size:]

In [None]:
log_reg = LogisticRegression()
log_reg = log_reg.fit(X_train,y_train)

In [None]:
log_reg.score(X_train,y_train),log_reg.score(X_test,y_test)

In [None]:
svc = SVC()
svc = svc.fit(X_train,y_train)

In [None]:
svc.score(X_train,y_train),svc.score(X_test,y_test)

In [None]:
forest = RandomForestClassifier(n_estimators=66,max_depth=5)
forest = forest.fit(X_train,y_train)
forest.score(X_train,y_train),forest.score(X_test,y_test)



In [None]:
knn = KNeighborsClassifier(n_neighbors=7)
knn = knn.fit(X_train,y_train)
knn.score(X_train,y_train),knn.score(X_test,y_test)

In [None]:
reg_df

In [None]:
reg_df.value_counts("label")

In [None]:
xgb = XGBClassifier(max_depth=5)
xgb = xgb.fit(X_train,y_train.values.reshape(-1,1))
xgb.score(X_train,y_train),xgb.score(X_test,y_test)

In [None]:
reg_df

### LSTM

In [25]:
from elephas import ml,ml_model,mllib,enums,parameter,spark_model,utils,worker
from elephas.spark_model import SparkModel

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import LSTM
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from tensorflow.keras.preprocessing.sequence import TimeseriesGenerator
from pyspark.ml.feature import VectorAssembler

2024-05-13 00:59:30.067690: I tensorflow/core/util/port.cc:111] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-05-13 00:59:30.091505: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-13 00:59:30.203080: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-13 00:59:30.203146: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-13 00:59:30.203924: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to regi

In [26]:
n = df.count()
df.columns

24/05/13 00:59:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 00:59:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 00:59:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 00:59:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 00:59:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 00:59:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 0

['date',
 'afinn_sentiment',
 'pnn_sentiment',
 'price_percent_change',
 'volume_percent_change',
 'next_day_price_percent_change_shifted']

In [33]:
train_size = int(n*.8)

train_data = df.limit(train_size)
test_data = df.subtract(train_data)

X_train = train_data.select('afinn_sentiment', 'pnn_sentiment', 'price_percent_change','volume_percent_change')
y_train = train_data.select("next_day_price_percent_change_shifted")

X_train = np.array(X_train.rdd.map(lambda x: [x.afinn_sentiment,x.pnn_sentiment,x.price_percent_change,x.volume_percent_change]).collect())
y_train = np.array(y_train.rdd.map(lambda x: [x.next_day_price_percent_change_shifted]).collect())


X_test = test_data.select('afinn_sentiment', 'pnn_sentiment', 'price_percent_change','volume_percent_change')
y_test = test_data.select("next_day_price_percent_change_shifted")

X_test = np.array(X_test.rdd.map(lambda x: [x.afinn_sentiment,x.pnn_sentiment,x.price_percent_change,x.volume_percent_change]).collect())
y_test = np.array(y_test.rdd.map(lambda x: [x.next_day_price_percent_change_shifted]).collect())



# outdata.show(10, truncate=False)

24/05/13 01:04:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 01:04:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 01:04:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 01:04:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 01:04:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 01:04:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 0

In [34]:
num_features = 3
input_shape = 30

model = Sequential()

model = Sequential()
model.add(LSTM(150, activation='tanh', return_sequences=True, input_shape=(input_shape, num_features)))
model.add(LSTM(64, activation='relu'))
model.add(Dense(64))
model.add(Dense(1))

2024-05-13 01:04:35.095180: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:894] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-05-13 01:04:35.161350: W tensorflow/core/common_runtime/gpu/gpu_device.cc:2211] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...


In [35]:
model.compile(optimizer='adam', loss='mean_squared_error')
# sgd = SGD(lr=0.1)
# model.compile(sgd, loss='categorical_crossentropy', ['acc'])



In [36]:
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 30, 150)           92400     
                                                                 
 lstm_1 (LSTM)               (None, 64)                55040     
                                                                 
 dense (Dense)               (None, 64)                4160      
                                                                 
 dense_1 (Dense)             (None, 1)                 65        
                                                                 
Total params: 151665 (592.44 KB)
Trainable params: 151665 (592.44 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [38]:
# pip install gunicorn

In [39]:
from pyspark.sql import Row
from elephas.utils.rdd_utils import to_simple_rdd

In [40]:
epochs=1
batch_size=2

sc = spark.sparkContext
simple_rdd = to_simple_rdd(sc, X_train,y_train)
spark_model = SparkModel(model, frequency='epoch', mode='asynchronous',batch_size=batch_size)
spark_model.fit(simple_rdd, epochs=epochs, batch_size=batch_size, verbose=0, validation_split=0.1)


>>> Fit model
 * Serving Flask app 'elephas.parameter.server'
 * Debug mode: off


 * Running on http://127.0.1.1:4000
INFO:werkzeug:[33mPress CTRL+C to quit[0m


>>> Initialize workers
>>> Distribute load


Traceback (most recent call last):
  File "/home/alber/.local/lib/python3.11/site-packages/pyspark/serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alber/.local/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/alber/.local/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
TypeError: cannot pickle 'weakref.ReferenceType' object


PicklingError: Could not serialize object: TypeError: cannot pickle 'weakref.ReferenceType' object

In [None]:
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.utils import to_categorical

from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd

from pyspark import SparkContext, SparkConf

# Define basic parameters
batch_size = 64
nb_classes = 10
epochs = 1

# Create Spark context
# conf = SparkConf().setAppName('Mnist_Spark_MLP').setMaster('local[8]')
# sc = SparkContext(conf=conf)

# Load data
(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train = x_train.reshape(60000, 784)
x_test = x_test.reshape(10000, 784)
x_train = x_train.astype("float32")
x_test = x_test.astype("float32")
x_train /= 255
x_test /= 255
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices
y_train = to_categorical(y_train, nb_classes)
y_test = to_categorical(y_test, nb_classes)

model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))

sgd = SGD(lr=0.1)
model.compile(sgd, 'categorical_crossentropy', ['acc'])

# Build RDD from numpy features and labels
rdd = to_simple_rdd(sc, x_train, y_train)

# Initialize SparkModel from tensorflow.keras model and Spark context
spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')

# Train Spark model
spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, verbose=0, validation_split=0.1)
# Evaluate Spark model by evaluating the underlying model
score = spark_model.evaluate(x_test, y_test, verbose=2)
# print('Test accuracy:', score[1])

In [None]:
type(rdd)

In [None]:
type(simple_rdd)