In [23]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from yahoo_fin import stock_info as si
import datetime as dt
import pandas_datareader as pdr
import matplotlib.pyplot as plt
from lime.lime_tabular import LimeTabularExplainer
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id
import pyspark.pandas as pd
import pyspark.sql.functions as f
from pyspark.sql.window import Window

In [2]:
ticker = 'TSLA'
start_date = dt.datetime(2010, 1, 1)
end_date = dt.datetime(2023, 1, 1)
fred_symbols = ['UNRATE', 'GDP', 'FEDFUNDS', 'CPIAUCNS', 'M2', 'DGS10', 'PCE', 'T10Y2Y', 'USROA', 'USROE', 'WTISPLC', 'HOUST', 'INDPRO', 'PAYEMS', 'BAMLH0A0HYM2', 'GS10', 'BASE', 'RIFSPPFAAD01NB', 'EXUSEU', 'NETEXP']

In [3]:
historical_data = si.get_data(ticker, start_date, end_date, interval='1d')
historical_data

Unnamed: 0,open,high,low,close,adjclose,volume,ticker
2010-06-29,1.266667,1.666667,1.169333,1.592667,1.592667,281494500,TSLA
2010-06-30,1.719333,2.028000,1.553333,1.588667,1.588667,257806500,TSLA
2010-07-01,1.666667,1.728000,1.351333,1.464000,1.464000,123282000,TSLA
2010-07-02,1.533333,1.540000,1.247333,1.280000,1.280000,77097000,TSLA
2010-07-06,1.333333,1.333333,1.055333,1.074000,1.074000,103003500,TSLA
...,...,...,...,...,...,...,...
2022-12-23,126.370003,128.619995,121.019997,123.150002,123.150002,166989700,TSLA
2022-12-27,117.500000,119.669998,108.760002,109.099998,109.099998,208643400,TSLA
2022-12-28,110.349998,116.269997,108.239998,112.709999,112.709999,221070500,TSLA
2022-12-29,120.389999,123.570000,117.500000,121.820000,121.820000,221923300,TSLA


In [4]:
fred_df = pdr.get_data_fred(fred_symbols, start_date, end_date)
fred_df

Unnamed: 0_level_0,UNRATE,GDP,FEDFUNDS,CPIAUCNS,M2,DGS10,PCE,T10Y2Y,USROA,USROE,WTISPLC,HOUST,INDPRO,PAYEMS,BAMLH0A0HYM2,GS10,BASE,RIFSPPFAAD01NB,EXUSEU,NETEXP
DATE,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1
2010-01-01,9.8,14764.611,0.11,216.687,,,10056.1,,0.49,4.8,78.22,614.0,89.1911,129798.0,,3.73,,,1.4266,-509.192
2010-01-04,,,,,8451.4,3.85,,2.76,,,,,,,6.34,,,0.13,,
2010-01-05,,,,,,3.77,,2.76,,,,,,,6.30,,,0.12,,
2010-01-06,,,,,,3.85,,2.84,,,,,,,6.17,,,0.10,,
2010-01-07,,,,,,3.85,,2.82,,,,,,,6.03,,,0.10,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2022-12-28,,,,,,3.88,,-0.43,,,,,,,4.77,,,4.30,,
2022-12-29,,,,,,3.83,,-0.51,,,,,,,4.81,,,4.30,,
2022-12-30,,,,,,3.88,,-0.53,,,,,,,4.79,,,4.29,,
2022-12-31,,,,,,,,,,,,,,,4.81,,,,,


In [5]:
fred_df.columns = fred_df.columns.str.lower()
fred_df

Unnamed: 0_level_0,unrate,gdp,fedfunds,cpiaucns,m2,dgs10,pce,t10y2y,usroa,usroe,wtisplc,houst,indpro,payems,bamlh0a0hym2,gs10,base,rifsppfaad01nb,exuseu,netexp
DATE,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1
2010-01-01,9.8,14764.611,0.11,216.687,,,10056.1,,0.49,4.8,78.22,614.0,89.1911,129798.0,,3.73,,,1.4266,-509.192
2010-01-04,,,,,8451.4,3.85,,2.76,,,,,,,6.34,,,0.13,,
2010-01-05,,,,,,3.77,,2.76,,,,,,,6.30,,,0.12,,
2010-01-06,,,,,,3.85,,2.84,,,,,,,6.17,,,0.10,,
2010-01-07,,,,,,3.85,,2.82,,,,,,,6.03,,,0.10,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2022-12-28,,,,,,3.88,,-0.43,,,,,,,4.77,,,4.30,,
2022-12-29,,,,,,3.83,,-0.51,,,,,,,4.81,,,4.30,,
2022-12-30,,,,,,3.88,,-0.53,,,,,,,4.79,,,4.29,,
2022-12-31,,,,,,,,,,,,,,,4.81,,,,,


In [6]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("StockPrediction") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/28 12:10:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
historical_data_spark = spark.createDataFrame(historical_data)
historical_data_spark.show()

[Stage 0:>                                                          (0 + 1) / 1]

+------------------+------------------+------------------+------------------+------------------+---------+------+
|              open|              high|               low|             close|          adjclose|   volume|ticker|
+------------------+------------------+------------------+------------------+------------------+---------+------+
|1.2666670083999634|1.6666669845581055|1.1693329811096191|1.5926669836044312|1.5926669836044312|281494500|  TSLA|
|1.7193330526351929|2.0280001163482666| 1.553333044052124|1.5886670351028442|1.5886670351028442|257806500|  TSLA|
|1.6666669845581055|1.7280000448226929|1.3513330221176147|1.4639999866485596|1.4639999866485596|123282000|  TSLA|
|1.5333329439163208|1.5399999618530273|  1.24733304977417|1.2799999713897705|1.2799999713897705| 77097000|  TSLA|
|1.3333330154418945|1.3333330154418945|1.0553330183029175|1.0740000009536743|1.0740000009536743|103003500|  TSLA|
|1.0933330059051514| 1.108667016029358|0.9986670017242432| 1.053333044052124| 1.05333304

                                                                                

In [8]:
fred_df_spark = spark.createDataFrame(fred_df)
fred_df_spark.show()

+------+---------+--------+--------+------+-----+-------+------+-----+-----+-------+-----+-------+--------+------------+----+--------+--------------+------+--------+
|unrate|      gdp|fedfunds|cpiaucns|    m2|dgs10|    pce|t10y2y|usroa|usroe|wtisplc|houst| indpro|  payems|bamlh0a0hym2|gs10|    base|rifsppfaad01nb|exuseu|  netexp|
+------+---------+--------+--------+------+-----+-------+------+-----+-----+-------+-----+-------+--------+------------+----+--------+--------------+------+--------+
|   9.8|14764.611|    0.11| 216.687|   NaN|  NaN|10056.1|   NaN| 0.49|  4.8|  78.22|614.0|89.1911|129798.0|         NaN|3.73|     NaN|           NaN|1.4266|-509.192|
|   NaN|      NaN|     NaN|     NaN|8451.4| 3.85|    NaN|  2.76|  NaN|  NaN|    NaN|  NaN|    NaN|     NaN|        6.34| NaN|     NaN|          0.13|   NaN|     NaN|
|   NaN|      NaN|     NaN|     NaN|   NaN| 3.77|    NaN|  2.76|  NaN|  NaN|    NaN|  NaN|    NaN|     NaN|         6.3| NaN|     NaN|          0.12|   NaN|     NaN|
|   

In [19]:
fred_df_pandaspark = pd.DataFrame(fred_df_spark)
fred_df_pandaspark

Unnamed: 0,unrate,gdp,fedfunds,cpiaucns,m2,dgs10,pce,t10y2y,usroa,usroe,wtisplc,houst,indpro,payems,bamlh0a0hym2,gs10,base,rifsppfaad01nb,exuseu,netexp
0,9.8,14764.611,0.11,216.687,,,10056.1,,0.49,4.8,78.22,614.0,89.1911,129798.0,,3.73,,,1.4266,-509.192
1,,,,,8451.4,3.85,,2.76,,,,,,,6.34,,,0.13,,
2,,,,,,3.77,,2.76,,,,,,,6.3,,,0.12,,
3,,,,,,3.85,,2.84,,,,,,,6.17,,,0.1,,
4,,,,,,3.85,,2.82,,,,,,,6.03,,,0.1,,
5,,,,,,3.83,,2.87,,,,,,,6.02,,,0.09,,
6,,,,,8444.3,3.85,,2.9,,,,,,,5.99,,,0.09,,
7,,,,,,3.74,,2.82,,,,,,,6.07,,,0.09,,
8,,,,,,3.8,,2.83,,,,,,,6.03,,1971.426,0.09,,
9,,,,,,3.76,,2.82,,,,,,,6.04,,,0.1,,


# Pre-Processing

In [None]:
# Drop unnecessary columns from historical_data
historical_data_spark = historical_data_spark.drop('ticker')

historical_data_spark = historical_data_spark.withColumn("ID", monotonically_increasing_id())
fred_df_spark = fred_df_spark.withColumn("ID", monotonically_increasing_id())

# Renaming columns if necessary
historical_data_spark = historical_data_spark.withColumnRenamed('index', 'DATE')

In [28]:
# Print di che oggetto è fred_df_spark
print(type(fred_df_spark))
print(type(fred_df_pandaspark))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.pandas.frame.DataFrame'>


In [9]:
fred_df_spark.columns

['unrate',
 'gdp',
 'fedfunds',
 'cpiaucns',
 'm2',
 'dgs10',
 'pce',
 't10y2y',
 'usroa',
 'usroe',
 'wtisplc',
 'houst',
 'indpro',
 'payems',
 'bamlh0a0hym2',
 'gs10',
 'base',
 'rifsppfaad01nb',
 'exuseu',
 'netexp']

In [24]:
w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())

In [25]:
spark.conf.set("spark.sql.shuffle.partitions", 11)

In [26]:
fred_df_pandaspark_part = fred_df_pandaspark.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")
)

fred_df_pandaspark_part.rdd.glom().count()

AttributeError: 'DataFrame' object has no attribute 'withColumn'

In [21]:
# Fill missing values in fred_df_pandaspark
fred_df_pandaspark = fred_df_pandaspark.ffill()
fred_df_pandaspark

23/09/28 12:22:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/28 12:22:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/28 12:22:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/28 12:22:02 WARN AttachDistributedSequenceExec: clean up cached RDD(91) in AttachDistributedSequenceExec(432)
23/09/28 12:22:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/28 12:22:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Unnamed: 0,unrate,gdp,fedfunds,cpiaucns,m2,dgs10,pce,t10y2y,usroa,usroe,wtisplc,houst,indpro,payems,bamlh0a0hym2,gs10,base,rifsppfaad01nb,exuseu,netexp
0,9.8,14764.611,0.11,216.687,0.0,0.0,10056.1,0.0,0.49,4.8,78.22,614.0,89.1911,129798.0,0.0,3.73,0.0,0.0,1.4266,-509.192
1,0.0,0.0,0.0,0.0,8451.4,3.85,0.0,2.76,0.0,0.0,0.0,0.0,0.0,0.0,6.34,0.0,0.0,0.13,0.0,0.0
2,0.0,0.0,0.0,0.0,0.0,3.77,0.0,2.76,0.0,0.0,0.0,0.0,0.0,0.0,6.3,0.0,0.0,0.12,0.0,0.0
3,0.0,0.0,0.0,0.0,0.0,3.85,0.0,2.84,0.0,0.0,0.0,0.0,0.0,0.0,6.17,0.0,0.0,0.1,0.0,0.0
4,0.0,0.0,0.0,0.0,0.0,3.85,0.0,2.82,0.0,0.0,0.0,0.0,0.0,0.0,6.03,0.0,0.0,0.1,0.0,0.0
5,0.0,0.0,0.0,0.0,0.0,3.83,0.0,2.87,0.0,0.0,0.0,0.0,0.0,0.0,6.02,0.0,0.0,0.09,0.0,0.0
6,0.0,0.0,0.0,0.0,8444.3,3.85,0.0,2.9,0.0,0.0,0.0,0.0,0.0,0.0,5.99,0.0,0.0,0.09,0.0,0.0
7,0.0,0.0,0.0,0.0,0.0,3.74,0.0,2.82,0.0,0.0,0.0,0.0,0.0,0.0,6.07,0.0,0.0,0.09,0.0,0.0
8,0.0,0.0,0.0,0.0,0.0,3.8,0.0,2.83,0.0,0.0,0.0,0.0,0.0,0.0,6.03,0.0,1971.426,0.09,0.0,0.0
9,0.0,0.0,0.0,0.0,0.0,3.76,0.0,2.82,0.0,0.0,0.0,0.0,0.0,0.0,6.04,0.0,0.0,0.1,0.0,0.0


In [27]:
fred_df_spark = fred_df_pandaspark.to_spark()
fred_df_spark.show()

23/09/28 12:27:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/28 12:27:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/28 12:27:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/28 12:27:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/28 12:27:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------+---------+--------+--------+------+-----+-------+------+-----+-----+-------+-----+-------+--------+------------+----+--------+--------------+------+--------+
|unrate|      gdp|fedfunds|cpiaucns|    m2|dgs10|    pce|t10y2y|usroa|usroe|wtisplc|houst| indpro|  payems|bamlh0a0hym2|gs10|    base|rifsppfaad01nb|exuseu|  netexp|
+------+---------+--------+--------+------+-----+-------+------+-----+-----+-------+-----+-------+--------+------------+----+--------+--------------+------+--------+
|   9.8|14764.611|    0.11| 216.687|   0.0|  0.0|10056.1|   0.0| 0.49|  4.8|  78.22|614.0|89.1911|129798.0|         0.0|3.73|     0.0|           0.0|1.4266|-509.192|
|   0.0|      0.0|     0.0|     0.0|8451.4| 3.85|    0.0|  2.76|  0.0|  0.0|    0.0|  0.0|    0.0|     0.0|        6.34| 0.0|     0.0|          0.13|   0.0|     0.0|
|   0.0|      0.0|     0.0|     0.0|   0.0| 3.77|    0.0|  2.76|  0.0|  0.0|    0.0|  0.0|    0.0|     0.0|         6.3| 0.0|     0.0|          0.12|   0.0|     0.0|
|   

# MLLib

In [None]:
# Prepare data for MLlib
vector_assembler = VectorAssembler(inputCols=X.columns, outputCol="features")
X = vector_assembler.transform(dataset_spark).select("features")
y = dataset_spark.select("close")

In [None]:
# Splitting data
(trainingData, testData) = dataset_spark.randomSplit([0.8, 0.2])

In [None]:
# Linear Regression in Spark
lr = LinearRegression(featuresCol="features", labelCol="close")
lr_model = lr.fit(trainingData)

In [None]:
# Model Evaluation in Spark
lr_predictions = lr_model.transform(testData)
evaluator = RegressionEvaluator(labelCol="close", predictionCol="prediction", metricName="rmse")
lr_rmse = evaluator.evaluate(lr_predictions)
print(f"RMSE: {lr_rmse}")