# Course: ICS 574
# Project: Alchemists Project - EDA
- Date: May 1st 2024
- Project Members
    - AHMED DHAFER ALQARNI, ID: 201453160
    - WALEED ABDULLAH ALFAIFI, ID: 201640920
    - ALGHAMDI, BANDAR, ID: 202206560

In [2]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import seaborn as sns
import numpy as np


In [3]:
# Create a Spark session
spark = SparkSession.builder.appName("Decision Tree Regressor Example 3").getOrCreate()




24/03/31 06:35:56 WARN Utils: Your hostname, Bandars-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.100.154 instead (on interface en0)
24/03/31 06:35:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/31 06:35:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load the datasets as pandas DataFrames
stocks_df = pd.read_csv("./datasets/Tadawul_stcks.csv")
brent_df = pd.read_csv("./datasets/BrentOilPrices.csv")
gold_df = pd.read_csv("./datasets/Gold_Daily.csv")

# Preprocess the stock data: keep only the Energy and Materials sectors.
# .copy() creates an independent DataFrame, so the edits below do not trigger SettingWithCopyWarning.
stocks_specific_df = stocks_df[stocks_df['sectoer'].isin(['Energy', 'Materials'])].copy()
# 'sectoer' is the column name as spelled in the source CSV
stocks_specific_df = stocks_specific_df.rename(columns={'date': 'Date', 'close': 'Stock_Price', 'sectoer': 'Sector'})
stocks_specific_df['Date'] = pd.to_datetime(stocks_specific_df['Date'])
brent_df['Date'] = pd.to_datetime(brent_df['Date'])
gold_df['Date'] = pd.to_datetime(gold_df['Date'])
brent_df = brent_df.rename(columns={'Price': 'Brent_Price'})
gold_df = gold_df.rename(columns={'Price': 'Gold_Price'})
# Keep only the columns needed downstream
stocks_specific_df = stocks_specific_df[['Date', 'name', 'Stock_Price', 'Sector', 'symbol']]
brent_df = brent_df[['Date', 'Brent_Price']]
gold_df = gold_df[['Date', 'Gold_Price']]
# Forward-fill missing values (fillna(method='ffill') is deprecated in recent pandas versions)
stocks_specific_df = stocks_specific_df.ffill()
# Inner joins keep only dates present in all three datasets
stocks_brent_df = pd.merge(stocks_specific_df, brent_df, on='Date', how='inner')
stocks_brent_gold_df = pd.merge(stocks_brent_df, gold_df, on='Date', how='inner')




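The preprocessing forward-fills gaps across the whole stock table and then inner-joins on `Date`. A minimal pandas sketch on hypothetical toy data (the dates, symbols, and prices below are made up for illustration) shows two details of that step: a plain `ffill()` can copy one company's price into another company's gap, whereas sorting and grouping by `symbol` keeps each fill within one company, and an inner merge drops dates missing from either table.

```python
import pandas as pd

# Hypothetical toy data (not from the project's CSVs): two symbols, each with one gap
stocks = pd.DataFrame({
    "Date": pd.to_datetime(["2020-03-01", "2020-03-02", "2020-03-01", "2020-03-02"]),
    "symbol": [2030, 2030, 2222, 2222],
    "Stock_Price": [34.0, None, None, 33.0],
})
brent = pd.DataFrame({"Date": pd.to_datetime(["2020-03-02"]), "Brent_Price": [51.29]})

# A plain ffill() over the whole column fills symbol 2222's first row with symbol 2030's price
naive = stocks["Stock_Price"].ffill()

# Sorting by symbol and date, then filling within each symbol, avoids that cross-company leak
stocks = stocks.sort_values(["symbol", "Date"])
stocks["Stock_Price"] = stocks.groupby("symbol")["Stock_Price"].ffill()

# The inner join keeps only dates present in both tables (here, 2020-03-02)
merged = stocks.merge(brent, on="Date", how="inner")
print(merged)
```

With the per-symbol fill, the first row of symbol 2222 stays missing because that company has no earlier price to carry forward.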

In [5]:
# Create a Spark DataFrame
stocks_brent_gold_df_spark = spark.createDataFrame(stocks_brent_gold_df)

In [6]:
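# Create a VectorAssembler that combines the input columns into a single "features" vector.
# NOTE: "Stock_Price" is also the label (labelCol in the regressor below), so including it here leaks
# the target into the features; the metrics in this notebook were produced with this configuration.
# A leakage-free model would use inputCols=["Brent_Price", "Gold_Price"].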
assembler = VectorAssembler(inputCols=["Stock_Price", "Brent_Price", "Gold_Price"], outputCol="features")
stocks_brent_gold_df_spark = assembler.transform(stocks_brent_gold_df_spark)
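For intuition, a plain-Python sketch of what `VectorAssembler` produces for one row: the listed input columns, in order, concatenated into one vector. The helpers `assemble` and `check_no_leakage` are hypothetical, not part of PySpark. With the columns used in this cell, the label `Stock_Price` becomes the first feature value, which is why a simple guard against target leakage is useful.

```python
def assemble(row, input_cols):
    """Concatenate row[c] for each c in input_cols, in order, into one feature vector."""
    return [float(row[c]) for c in input_cols]


def check_no_leakage(input_cols, label_col):
    """Raise if the label column is also used as a feature (hypothetical guard)."""
    if label_col in input_cols:
        raise ValueError(f"label column {label_col!r} is also a feature")


# Values taken from the first row of the predictions table in this notebook
row = {"Stock_Price": 34.9, "Brent_Price": 51.29, "Gold_Price": 1690.5}

leaky_cols = ["Stock_Price", "Brent_Price", "Gold_Price"]  # the columns used in this cell
clean_cols = ["Brent_Price", "Gold_Price"]                  # label excluded

print(assemble(row, leaky_cols))  # [34.9, 51.29, 1690.5]
print(assemble(row, clean_cols))  # [51.29, 1690.5]
check_no_leakage(clean_cols, "Stock_Price")  # passes silently
```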

In [7]:
# Define the DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol="features", labelCol="Stock_Price")
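`DecisionTreeRegressor` grows the tree by choosing, at each node, the split that most reduces variance (`impurity='variance'`). A minimal single-feature sketch of that criterion in plain Python, with hypothetical helper names. Unlike Spark, which only tries candidate thresholds derived from at most `maxBins` bins per feature, this sketch tries every midpoint between distinct sorted values.

```python
def variance(ys):
    """Population variance of a list of numbers."""
    mean = sum(ys) / len(ys)
    return sum((y - mean) ** 2 for y in ys) / len(ys)


def best_split(xs, ys):
    """Return (threshold, weighted_variance) for the best split 'x <= threshold'.

    threshold is None if no split lowers the variance of the parent node.
    """
    pairs = sorted(zip(xs, ys))
    n = len(pairs)
    best_threshold, best_score = None, variance(ys)
    for i in range(1, n):
        if pairs[i - 1][0] == pairs[i][0]:
            continue  # no threshold separates equal feature values
        left = [y for _, y in pairs[:i]]
        right = [y for _, y in pairs[i:]]
        # Variance of each child, weighted by the share of rows it receives
        score = (len(left) * variance(left) + len(right) * variance(right)) / n
        if score < best_score:
            best_threshold = (pairs[i - 1][0] + pairs[i][0]) / 2
            best_score = score
    return best_threshold, best_score


# Toy data: the target jumps from 5 to 20 between x = 3 and x = 10
print(best_split([1, 2, 3, 10, 11, 12], [5, 5, 5, 20, 20, 20]))  # (6.5, 0.0)
```

A deeper tree (larger `maxDepth`) simply repeats this search inside each child node.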

In [8]:
# define the evaluator
# note the metricName parameter is rmse (Root Mean Squared Error), which is RegressionEvaluator's default metric.
# Other supported metrics include mse (Mean Squared Error), r2 (R squared), and mae (Mean Absolute Error).
# see the documentation for more details: https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="Stock_Price", predictionCol="prediction", metricName="rmse")
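The RMSE and MAE metrics mentioned in the comments can be written out directly. A plain-Python sketch of the two definitions (the helper names are hypothetical, not Spark functions):

```python
import math


def rmse(y_true, y_pred):
    """Root of the mean squared difference between predictions and actual values."""
    n = len(y_true)
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / n)


def mae(y_true, y_pred):
    """Mean absolute difference between predictions and actual values."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


y_true = [1.0, 2.0, 3.0]
y_pred = [1.0, 2.0, 5.0]      # one prediction is off by 2
print(rmse(y_true, y_pred))  # sqrt(4 / 3), about 1.155
print(mae(y_true, y_pred))   # 2 / 3, about 0.667
```

Because errors are squared before averaging, a single large miss raises RMSE more than MAE, and both are in the units of the label (here, the stock price).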

In [9]:
# Define the grid of hyperparameters
# We will use a ParamGridBuilder to construct a grid of parameters to search over.
# With 9 values for dt.maxDepth and 5 values for dt.maxBins, this grid has 9 x 5 = 45 parameter settings for CrossValidator to choose from.
# see the documentation for more details: https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [2, 3, 4, 5, 6, 7, 8, 9, 10]) \
    .addGrid(dt.maxBins, [10, 20, 40, 80, 100]) \
    .build()
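The grid is the Cartesian product of the listed values, so its size is the product of the list lengths. A plain-Python sketch with `itertools.product`; we assume, from our reading of PySpark's `ParamGridBuilder.build()`, that Spark enumerates combinations in the same order (the last grid added, `maxBins`, varying fastest), which is also the order of `cvModel.avgMetrics`.

```python
from itertools import product

max_depths = [2, 3, 4, 5, 6, 7, 8, 9, 10]
max_bins = [10, 20, 40, 80, 100]

# Cartesian product: every maxDepth paired with every maxBins, last list varying fastest
grid = [{"maxDepth": d, "maxBins": b} for d, b in product(max_depths, max_bins)]

print(len(grid))  # 45
print(grid[0])    # {'maxDepth': 2, 'maxBins': 10}
print(grid[5])    # {'maxDepth': 3, 'maxBins': 10}
```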

In [10]:
# Define the CrossValidator
# We will use a CrossValidator to select the best model.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# see the documentation for more details: https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
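Each entry of `cvModel.avgMetrics` is the evaluator's metric averaged over the validation folds for one parameter setting. A plain-Python sketch of that k-fold procedure; the function names are hypothetical, and while Spark assigns rows to folds by random sampling (so its fold sizes are only approximately equal), this sketch deals shuffled indices round-robin into equal folds.

```python
import random


def k_fold_splits(n_rows, k, seed=0):
    """Yield (train_indices, validation_indices) for k folds over n_rows rows."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    folds = [indices[i::k] for i in range(k)]  # k disjoint, near-equal folds
    for i in range(k):
        train = [j for f, fold in enumerate(folds) if f != i for j in fold]
        yield train, folds[i]


def cross_validated_metric(n_rows, k, evaluate):
    """Average evaluate(train, validation) over the k folds, like one entry of avgMetrics."""
    scores = [evaluate(tr, va) for tr, va in k_fold_splits(n_rows, k)]
    return sum(scores) / k


folds_demo = list(k_fold_splits(10, 5))
print([len(va) for _, va in folds_demo])  # [2, 2, 2, 2, 2]
```

With 45 settings and `numFolds=5`, the search trains 45 x 5 = 225 trees; CrossValidator then refits the best setting on the full dataset to produce `cvModel.bestModel`.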

In [11]:
# fit the cv model with the assembled data
cvModel = cv.fit(stocks_brent_gold_df_spark)

                                                                                

In [12]:
# get the average cross-validated RMSE across all parameter settings
# cvModel.avgMetrics holds one cross-validated RMSE per parameter setting (45 values, in grid order),
# so np.mean(...) below averages over all 45 settings; it is NOT the RMSE of the best model.
# The best setting's cross-validated RMSE is min(cvModel.avgMetrics).
# see the documentation for more details: https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator

# Notes on RMSE (the root of the average of the squared differences between predicted and actual values):
# - lower is better; a value of 0 means perfect predictions, and it is never negative
# - it is in the same units as the target variable (here Stock_Price), so it is scale-dependent and not a percentage
# - it is only comparable between models evaluated on the same dataset and target variable
#   (e.g., different features, transformations, or hyperparameters), not across datasets or targets
# - whether a value such as ~9 is "good" depends on the typical magnitude of Stock_Price
avg_rmse = np.mean(cvModel.avgMetrics)
print("Average RMSE: ", avg_rmse)

Average RMSE:  9.841781224229344


In [13]:
# print all the parameters of the best model
print(cvModel.bestModel.extractParamMap())


{Param(parent='DecisionTreeRegressor_434d5017f14b', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='DecisionTreeRegressor_434d5017f14b', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='DecisionTreeRegressor_434d5017f14b', name='featuresCol', doc='features column name.'): 'features', Param(parent='DecisionTreeRegressor_434d5017f14b', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: variance'): 'variance', Para

In [14]:
# print the cross-validated RMSE of each parameter setting (45 values, in grid order)
for rmse in cvModel.avgMetrics:
    print(rmse)
    

15.915918017272668
14.281627995004516
13.344991087045804
13.230093311265122
13.152253749942513
14.571579011179526
11.60517856311905
9.953710149096313
8.736322743530852
8.45405101145592
14.204317004683372
11.046878927104736
8.774626088088315
6.916011299131246
6.573127580083129
14.109634446054141
10.858033054980577
8.46427368210331
6.342382854255309
5.9738789918995545
14.057219052059262
10.819929795729022
8.383647789105341
6.155372826761932
5.852099521952239
13.985779346677958
10.795701039963602
8.368194133246872
6.1516046333679
5.781998302890603
13.959294812630285
10.750884171568286
8.36139494754078
6.225511646818347
5.7968964043610764
13.958351184427233
10.747112771052723
8.396109221578362
6.321505019330617
5.942038590825033
13.955867639310995
10.73575510870954
8.40692276306232
6.432773036711319
6.029301763342873
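Because `avgMetrics` follows the grid order, the best setting can be read off the printed values: take the index of the smallest RMSE and decode it back into `(maxDepth, maxBins)`. The sketch below copies the 45 values printed above and assumes the order `maxDepth` outer, `maxBins` inner (our reading of `ParamGridBuilder`). It also reproduces the earlier "Average RMSE" as the mean over all settings, which is noticeably higher than the best setting's RMSE.

```python
# avgMetrics as printed above; one row per maxDepth, one column per maxBins (assumed order)
avg_metrics = [
    15.915918017272668, 14.281627995004516, 13.344991087045804, 13.230093311265122, 13.152253749942513,
    14.571579011179526, 11.60517856311905, 9.953710149096313, 8.736322743530852, 8.45405101145592,
    14.204317004683372, 11.046878927104736, 8.774626088088315, 6.916011299131246, 6.573127580083129,
    14.109634446054141, 10.858033054980577, 8.46427368210331, 6.342382854255309, 5.9738789918995545,
    14.057219052059262, 10.819929795729022, 8.383647789105341, 6.155372826761932, 5.852099521952239,
    13.985779346677958, 10.795701039963602, 8.368194133246872, 6.1516046333679, 5.781998302890603,
    13.959294812630285, 10.750884171568286, 8.36139494754078, 6.225511646818347, 5.7968964043610764,
    13.958351184427233, 10.747112771052723, 8.396109221578362, 6.321505019330617, 5.942038590825033,
    13.955867639310995, 10.73575510870954, 8.40692276306232, 6.432773036711319, 6.029301763342873,
]
max_depths = [2, 3, 4, 5, 6, 7, 8, 9, 10]
max_bins = [10, 20, 40, 80, 100]

# Index of the lowest RMSE, decoded into (row, column) of the grid
best_idx = min(range(len(avg_metrics)), key=avg_metrics.__getitem__)
depth_i, bins_i = divmod(best_idx, len(max_bins))
print(max_depths[depth_i], max_bins[bins_i], avg_metrics[best_idx])  # 7 100 5.781998302890603

# Mean over all 45 settings: matches the "Average RMSE" printed earlier (about 9.84)
print(sum(avg_metrics) / len(avg_metrics))
```

Within each row, RMSE falls as `maxBins` grows, and across rows the gains level off after a depth of about 6 or 7.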


In [None]:
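# define multi-class evaluator
# Not applicable here: this is a regression task and the DataFrame has no "indexedLabel" column,
# so this (never executed) classification evaluator is kept only as a commented-out reference.
# evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")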

In [12]:
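# Train-test split (80% train, 20% test)
# No seed is set, so the split, and therefore the metrics below, will vary between runs;
# pass e.g. randomSplit([0.8, 0.2], seed=42) for a reproducible split.
train, test = stocks_brent_gold_df_spark.randomSplit([0.8, 0.2])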
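A random row-level split mixes dates, so rows from the same trading day (and from later days) can land in both sets. If the goal is to measure performance on unseen future dates, one alternative is a chronological split. A minimal plain-Python sketch, assuming each row carries a `Date`; the helper name and the toy rows are hypothetical. In Spark, the same idea would be a `filter` on the `Date` column against a cutoff date.

```python
from datetime import date


def split_by_date(rows, train_fraction=0.8):
    """Put the earliest dates in train and the latest in test, never splitting one date."""
    dates = sorted({r["Date"] for r in rows})
    cutoff_idx = min(int(len(dates) * train_fraction), len(dates) - 1)
    cutoff = dates[cutoff_idx]  # first date that goes to the test set
    train_rows = [r for r in rows if r["Date"] < cutoff]
    test_rows = [r for r in rows if r["Date"] >= cutoff]
    return train_rows, test_rows


# Hypothetical rows: two symbols on each of five trading days
rows = [{"Date": date(2020, 3, d), "symbol": s} for d in range(1, 6) for s in (2030, 2222)]
train_rows, test_rows = split_by_date(rows)
print(len(train_rows), len(test_rows))  # 8 2
```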



In [14]:
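# Train a single decision tree on the training split.
# Note: this uses dt's default hyperparameters (maxDepth=5, maxBins=32), not the best setting found by cvModel,
# and cvModel above was fit on the full dataset, including the rows now in the test split.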
model = dt.fit(train)


In [15]:
# Make predictions
predictions = model.transform(test)

In [16]:
# Evaluate the model
evaluator = RegressionEvaluator(labelCol="Stock_Price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)


Root Mean Squared Error (RMSE): 8.042969898488115


In [17]:
# evaluate the model using R2 (coefficient of determination)
evaluator = RegressionEvaluator(labelCol="Stock_Price", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R2:", r2)

# An R2 value of about 0.946 indicates that roughly 94.6% of the variance in Stock_Price on the test split is explained by the model's features.
# Note that the features are Stock_Price, Brent_Price, and Gold_Price; because Stock_Price is also the label, this score is inflated by target leakage and does not show how well Brent and gold prices alone explain stock prices.

# Generally, an R2 value close to 1 indicates that the model explains a large proportion of the variance in the data. However, the value should be interpreted in the context of the specific problem and domain knowledge.

R2: 0.9460308155664667
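R2 compares the model's squared error with that of always predicting the mean label: R2 = 1 - SS_res / SS_tot. A plain-Python sketch of that definition (the helper name is hypothetical):

```python
def r2_score(y_true, y_pred):
    """1 - (residual sum of squares) / (total sum of squares around the mean)."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot


y_true = [1.0, 2.0, 3.0, 4.0]
print(r2_score(y_true, y_true))     # 1.0: perfect predictions
print(r2_score(y_true, [2.5] * 4))  # 0.0: no better than predicting the mean
```

A model that does worse than the mean predictor gets a negative R2.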


In [18]:
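# Save the model; write().overwrite() replaces the directory if it already exists instead of raising an error
model.write().overwrite().save("decision_tree_regressor_model 2")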

In [21]:
# apply the model to the full dataset (training and test rows); this predicts each row's same-day Stock_Price, not the next day's price
predictions = model.transform(stocks_brent_gold_df_spark)
predictions.show(5)



+-------------------+--------------------+-----------+------+------+-----------+----------+--------------------+------------------+
|               Date|                name|Stock_Price|Sector|symbol|Brent_Price|Gold_Price|            features|        prediction|
+-------------------+--------------------+-----------+------+------+-----------+----------+--------------------+------------------+
|2020-03-05 00:00:00|Saudi Arabia Refi...|       34.9|Energy|  2030|      51.29|    1690.5| [34.9,51.29,1690.5]| 34.65715564486664|
|2020-03-05 00:00:00|Saudi Arabian Oil...|       33.0|Energy|  2222|      51.29|    1690.5| [33.0,51.29,1690.5]| 34.65715564486664|
|2020-03-05 00:00:00|Rabigh Refining a...|       14.8|Energy|  2380|      51.29|    1690.5| [14.8,51.29,1690.5]|15.163399172310012|
|2020-03-05 00:00:00|National Shipping...|      31.15|Energy|  4030|      51.29|    1690.5|[31.15,51.29,1690.5]| 30.75689315068493|
|2020-03-05 00:00:00|Aldrees Petroleum...|       60.1|Energy|  4200|      51

In [22]:
# export the predictions, alongside the actual Stock_Price, to CSV for comparison
predictions.toPandas().to_csv("predictions.csv")
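The exported CSV can then be compared row by row. As a plain-Python illustration using the four complete rows of the table shown earlier (names truncated as printed), the error is simply predicted minus actual. The first two companies land in the same leaf of the tree and therefore receive the identical prediction.

```python
# (name, actual Stock_Price, prediction) for the four complete rows of the table above
rows = [
    ("Saudi Arabia Refi...", 34.9, 34.65715564486664),
    ("Saudi Arabian Oil...", 33.0, 34.65715564486664),
    ("Rabigh Refining a...", 14.8, 15.163399172310012),
    ("National Shipping...", 31.15, 30.75689315068493),
]

errors = []
for name, actual, predicted in rows:
    error = predicted - actual  # positive means the model over-predicted
    errors.append(error)
    print(f"{name:22} actual={actual:6.2f} predicted={predicted:6.2f} error={error:+.2f}")
```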


In [23]:
# Stop the Spark session
spark.stop()