# Sprint Predicción de Stock - Ignacio Bayón Jiménez-Ugarte

Imports

In [126]:
# Pip installs
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

# System imports
import os
import sys
from google.colab import drive
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

# Spark
import findspark
findspark.init()
findspark.find()
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, IntegerType, FloatType
from pyspark.sql.window import Window
from pyspark.ml.functions import array_to_vector

# Data management
from datetime import datetime, timedelta
import numpy as np
import math
import pandas as pd
import subprocess
import plotly.express as px
from sklearn.datasets import load_iris

[33m0% [Working][0m            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
45 packages can be upgraded. Run 'apt list --upgradable' to see them.
tar: spark-3.2.1-bin-hadoop3.2.tgz: Cannot open: No such file 

In [127]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import PCA
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler


In [128]:
appName = 'abd03_ford_sparksql'
spark = SparkSession.builder\
                    .appName(appName) \
                    .getOrCreate()
sc = spark.sparkContext

In [129]:
path = "datos_ford.csv"
df = pd.read_csv(path)
df = df.sort_values(by="_u1.date")

# Remove the initial "_u1." from the column names
df.columns = [col.split(".")[1] for col in df.columns]
# Reset the index
df = df.reset_index(drop=True)

df.head()

Unnamed: 0,date,ticker,cik,open,high,low,close,adj_close,volume
0,2018-01-02,F,37996,12.52,12.66,12.5,12.66,9.247076,20773300
1,2018-01-03,F,37996,12.68,12.8,12.67,12.76,9.320119,29765600
2,2018-01-04,F,37996,12.78,13.04,12.77,12.98,9.480811,37478200
3,2018-01-05,F,37996,13.06,13.22,13.04,13.2,9.641503,46121900
4,2018-01-08,F,37996,13.21,13.22,13.11,13.15,9.604981,33828300


In [130]:
dfs = spark.createDataFrame(df)
dfs.createOrReplaceTempView("stock_ford")

df_ford = spark.table("stock_ford")
df_ford.show()

+----------+------+-----+------------------+------------------+------------------+------------------+-----------------+---------+
|      date|ticker|  cik|              open|              high|               low|             close|        adj_close|   volume|
+----------+------+-----+------------------+------------------+------------------+------------------+-----------------+---------+
|2018-01-02|     F|37996|12.520000457763672| 12.65999984741211|              12.5| 12.65999984741211|9.247076034545898| 20773300|
|2018-01-03|     F|37996| 12.68000030517578|12.800000190734863|12.670000076293944|12.760000228881836| 9.32011890411377| 29765600|
|2018-01-04|     F|37996|12.779999732971191|13.039999961853027|12.770000457763672|12.979999542236328| 9.48081111907959| 37478200|
|2018-01-05|     F|37996|  13.0600004196167|13.220000267028809|13.039999961853027|13.199999809265137| 9.64150333404541| 46121900|
|2018-01-08|     F|37996|13.210000038146973|13.220000267028809|13.109999656677246|13.14999

In [131]:
window = Window.orderBy("date")
df_ford = df_ford.withColumn("next_adj_close", F.lag("adj_close", -1).over(window))
df_ford.show()

+----------+------+-----+------------------+------------------+------------------+------------------+-----------------+---------+-----------------+
|      date|ticker|  cik|              open|              high|               low|             close|        adj_close|   volume|   next_adj_close|
+----------+------+-----+------------------+------------------+------------------+------------------+-----------------+---------+-----------------+
|2018-01-02|     F|37996|12.520000457763672| 12.65999984741211|              12.5| 12.65999984741211|9.247076034545898| 20773300| 9.32011890411377|
|2018-01-03|     F|37996| 12.68000030517578|12.800000190734863|12.670000076293944|12.760000228881836| 9.32011890411377| 29765600| 9.48081111907959|
|2018-01-04|     F|37996|12.779999732971191|13.039999961853027|12.770000457763672|12.979999542236328| 9.48081111907959| 37478200| 9.64150333404541|
|2018-01-05|     F|37996|  13.0600004196167|13.220000267028809|13.039999961853027|13.199999809265137| 9.64150333

Data Preprocessing

In [132]:
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.regression import (
    RandomForestRegressor, RandomForestRegressionModel, GBTRegressor)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

pred_days = 8
num_days_for_features = 7

# Data Processing
# Add label column as the adj_close of the next_day
window = Window.orderBy("date")
df_ford = df_ford.withColumn("label", F.lag("adj_close", -1).over(window))
# Add features column and scale it
# feature_cols = ["open", "high", "low", "close"]
# assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_unscaled")

# Add lagged features for the previous 5 days
for i in range(1, num_days_for_features + 1):
    for col in ["close"]:
        lag_col = f"{col}_lag{i}"
        df_ford = df_ford.withColumn(lag_col, F.lag(col, i).over(window))

# Combine lagged features into a single vector column
feature_cols = [f"{col}_lag{i}" for i in range(1, num_days_for_features + 1) for col in ["close"]]
# feature_cols += ["open", "high", "low"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_unscaled")

scaler = StandardScaler(inputCol="features_unscaled", outputCol="features")

# Split into train/test data
nrows = df_ford.count()
train_data = df_ford.limit(nrows - pred_days)
test_data =  df_ford.subtract(train_data)

# Model
model = GBTRegressor()

# Pipeline
pipeline = Pipeline(stages=[assembler, scaler, model])

evaluator = RegressionEvaluator()

# Cross Validation
# # Param grid for hyperparameter tuning
# param_grid = ParamGridBuilder() \
#   .addGrid(rf.numTrees, [50, 100, 200]) \
#   .addGrid(rf.maxDepth, [5, 10, 15]) \
#   .build()
# cv = CrossValidator(
#     estimator=pipeline,
#     estimatorParamMaps=param_grid,
#     evaluator=evaluator,
#     numFolds=5,
#     seed=42
# )

In [133]:
# Train model
# cv_model = cv.fit(train_data)
pipeline_tr = pipeline.fit(train_data)

In [134]:
# Make predictions
# predictions = cv_model.transform(test_data)
predictions = pipeline_tr.transform(test_data)

In [135]:
# predictions = predictions.dropna(subset=["next_adj_close", "label"])

# Check for null values
# null_counts = [predictions.where(F.col(c).isNull()).count() for c in predictions.columns]

# # Print null counts
# print("Null counts in predictions DataFrame:")
# for i, c in enumerate(predictions.columns):
#     print(f"{c}: {null_counts[i]}")

In [136]:
# Evaluate model: r2, mae, rmse
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
print(f"R2: {r2}")
print(f"MAE: {mae}")
print(f"RMSE: {rmse}")

# # Best model
# best_model = cv_model.bestModel
# best_rf_model = best_model.stages[-1]

R2: 0.24393480223934072
MAE: 0.2946245101710345
RMSE: 0.3540676166000292


In [137]:
predictions.show()

+----------+------+-----+------------------+------------------+------------------+------------------+------------------+--------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+--------------------+------------------+
|      date|ticker|  cik|              open|              high|               low|             close|         adj_close|  volume|    next_adj_close|             label|        close_lag1|        close_lag2|        close_lag3|        close_lag4|        close_lag5|        close_lag6|        close_lag7|   features_unscaled|            features|        prediction|
+----------+------+-----+------------------+------------------+------------------+------------------+------------------+--------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------

In [138]:
# Convert PySpark DataFrame to pandas DataFrame
predictions_df = predictions.select("date", "adj_close", "prediction").toPandas()

fig = px.line(predictions_df, x="date", y=predictions_df.columns[1:3])
fig.show()