In [1]:
import findspark

#VectorAssembler provides transformer to convert Dataframe columns into vectors
from pyspark.ml.feature import VectorAssembler
#StringIndexer provides transformer to convert string labels into numerical values
from pyspark.ml.feature import StringIndexer

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

#N.B. before proceeding you must change the path passed to the findspark.init function to the location where spark is installed on your machine
findspark.init('/Users/gracilepereira/Desktop/spark-3.5.0-bin-hadoop3')

In [2]:
#Create a spark session
spark = SparkSession.builder.appName("StockAnalysis").config("spark.executor.heartbeatInterval", "60s").config("spark.executor.memory", "12g").config("spark.driver.memory", "8g").config("spark.sql.execution.arrow.pyspark.enabled", "false").getOrCreate()

24/01/21 23:15:12 WARN Utils: Your hostname, Graciles-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.3.132 instead (on interface en0)
24/01/21 23:15:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/21 23:15:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/21 23:15:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/01/21 23:15:14 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
# Load the stock history data into a Spark DataFrame
stocks = spark.read.csv("fh_5yrs.csv", header=True, inferSchema=True)

                                                                                

In [4]:
#View your data and understand its structure
stocks.show()
stocks.printSchema()

+----------+------+------------------+------------------+------------------+------------------+------------------+------+
|      date|volume|              open|              high|               low|             close|          adjclose|symbol|
+----------+------+------------------+------------------+------------------+------------------+------------------+------+
|2020-07-02|257500|17.639999389648438|17.739999771118164|  17.6200008392334|17.709999084472656|17.709999084472656|  AAAU|
|2020-07-01|468100|17.729999542236328|17.729999542236328|17.540000915527347| 17.68000030517578| 17.68000030517578|  AAAU|
|2020-06-30|319100|17.649999618530273|17.799999237060547|17.610000610351562|17.780000686645508|17.780000686645508|  AAAU|
|2020-06-29|405500|17.670000076293945|17.690000534057614|  17.6299991607666| 17.68000030517578| 17.68000030517578|  AAAU|
|2020-06-26|335100|17.489999771118164|17.670000076293945|17.420000076293945|17.670000076293945|17.670000076293945|  AAAU|
|2020-06-25|246800|17.60

In [5]:
# checking the number of rows
stocks.count()

                                                                                

6852038

In [6]:
# checking the number of columns
len(stocks.columns)

8

In [7]:
# Missing values
from pyspark.sql.functions import col, count, when, isnan
from pyspark.sql.types import DoubleType, FloatType

# Check for missing values within the dataset
def count_missing_values(column, dtype):
    if isinstance(dtype, (DoubleType, FloatType)):
        return count(when(isnan(column) | col(column).isNull(), column))
    else:
        return count(when(col(column).isNull(), column))

missing_counts = [count_missing_values(c, stocks.schema[c].dataType).alias(c) for c in stocks.columns]
stocks.select(missing_counts).show()



+----+------+----+----+---+-----+--------+------+
|date|volume|open|high|low|close|adjclose|symbol|
+----+------+----+----+---+-----+--------+------+
|   0|     0|   0|   0|  0|    0|       0|     0|
+----+------+----+----+---+-----+--------+------+



                                                                                

In [8]:
# Basic descriptive statistics
stocks.describe().show(truncate=False)

24/01/21 23:16:06 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 11:>                                                         (0 + 1) / 1]

+-------+------------------+--------------------+------------------+------------------+--------------------+------------------+-------+
|summary|volume            |open                |high              |low               |close               |adjclose          |symbol |
+-------+------------------+--------------------+------------------+------------------+--------------------+------------------+-------+
|count  |6852038           |6852038             |6852038           |6852038           |6852038             |6852038           |6852038|
|mean   |1015413.7055740205|298.0861291125565   |305.87568055661404|291.0136200348426 |296.7830805422864   |293.23114568836394|NaN    |
|stddev |4833379.770318016 |115808.75604569825  |118216.0919370147 |113528.60373528661|115157.54666985657  |114344.26187778253|NaN    |
|min    |1                 |0.001000000047497451|0.0               |0.0               |0.001000000047497451|-3.770960807800293|AAAU   |
|max    |2156725200        |6.9155304E7         

                                                                                

In [9]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define a window specification
windowSpec = Window().orderBy("date")

# Create a new column 'label' based on the difference between current and previous day's close
stocks = stocks.withColumn("label", F.when(F.lag("close").over(windowSpec) < stocks["close"], 1).otherwise(0))

# Drop rows with null label (first row after creating the lag)
stocks = stocks.na.drop(subset=["label"])

# Show the resulting DataFrame
stocks.show()

24/01/21 23:16:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:16:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:16:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:17:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:17:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+----------+--------+------------------+------------------+------------------+------------------+------------------+------+-----+
|      date|  volume|              open|              high|               low|             close|          adjclose|symbol|label|
+----------+--------+------------------+------------------+------------------+------------------+------------------+------+-----+
|2015-01-02|    2000|             37.25|             37.25| 36.63999938964844| 36.63999938964844|  35.3997688293457|  AADR|    0|
|2015-01-02|10748600| 54.27999877929688|54.599998474121094| 53.06999969482422| 53.90999984741211| 51.07991790771485|   AAL|    1|
|2015-01-02|   11500|             308.0| 348.5899963378906|             308.0|327.17999267578125|327.17999267578125|  AAMC|    1|
|2015-01-02|   11400| 3.990000009536743|4.0300002098083505|3.9800000190734863|4.0300002098083505| 3.917722225189209|  AAME|    0|
|2015-01-02|  898900|30.809999465942386| 30.86000061035156|30.040000915527344|  30.6200008

In [10]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.sql.functions import year, month, dayofmonth

In [11]:
# Extract year, month, and day from the 'date' column
stocks = stocks.withColumn('year', year(stocks['date']))
stocks = stocks.withColumn('month', month(stocks['date']))
stocks = stocks.withColumn('day', dayofmonth(stocks['date']))

In [17]:
%%time

# Split the data set into training and test
[trainingData, testData] = stocks.randomSplit([0.6, 0.4])

# Create a VectorAssembler to transform feature columns into vectors
assembler = VectorAssembler(
    inputCols=["year", "month", "day", "volume", "open", "high", "low", "close", "adjclose"],
    outputCol="features"
)

# Create a StringIndexer to convert the 'symbol' column into numerical format
indexer = StringIndexer(inputCol="symbol", outputCol="indexedSymbol")

# Create an estimator based on XGBoost (GBTClassifier in this case)
lr = LogisticRegression(maxIter=30, regParam=0.01, labelCol="label")

# Create an ML pipeline in order of assembler -> indexer -> logistic regression
pipeline = Pipeline(stages=[assembler, indexer, lr])

CPU times: user 1.9 ms, sys: 5.9 ms, total: 7.8 ms
Wall time: 241 ms


In [18]:
%%time

# Train a model using the pipeline and the training data
model = pipeline.fit(trainingData)

# Test the classification model using the test data
prediction = model.transform(testData)
prediction.select('date', 'symbol', 'label', 'probability', 'prediction').show(truncate=False)

24/01/21 23:30:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:30:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:30:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:30:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:30:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:30:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 2

+----------+------+-----+----------------------------------------+----------+
|date      |symbol|label|probability                             |prediction|
+----------+------+-----+----------------------------------------+----------+
|2015-01-02|CHAU  |1    |[0.49927781323829923,0.5007221867617008]|1.0       |
|2015-01-02|DTP   |0    |[0.4992991064141682,0.5007008935858318] |1.0       |
|2015-01-02|AYLA  |1    |[0.49929827226465934,0.5007017277353407]|1.0       |
|2015-01-02|ZYXI  |0    |[0.49929993216795937,0.5007000678320406]|1.0       |
|2015-01-02|SMIT  |0    |[0.4992992246252693,0.5007007753747307] |1.0       |
|2015-01-02|SVBI  |0    |[0.4992988515487742,0.5007011484512258] |1.0       |
|2015-01-02|EVSI  |0    |[0.4992982152726881,0.5007017847273119] |1.0       |
|2015-01-02|LIND  |1    |[0.4992975079706507,0.5007024920293492] |1.0       |
|2015-01-02|MPB   |0    |[0.49929613499977665,0.5007038650002233]|1.0       |
|2015-01-02|GJP   |1    |[0.4992944320881943,0.5007055679118058]

                                                                                

In [19]:
evaluator = MulticlassClassificationEvaluator(
    labelCol = 'label', predictionCol = 'prediction', metricName='accuracy'
)

accuracy = evaluator.evaluate(prediction)

print("Accuracy = {:.2%}".format(accuracy))
print("Classification Error = %g" % (1.0 - accuracy))

24/01/21 23:33:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:33:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:33:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:33:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:33:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 119:>                                                        (0 + 1) / 1]

Accuracy = 50.60%
Classification Error = 0.494032


                                                                                

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import functions as F

# Ensure the "label" column is of type DoubleType
prediction = prediction.withColumn("label", F.col("label").cast("double"))

# Extract true labels and predicted labels as RDD
predictionAndLabels = prediction.select("prediction", "label").rdd

# Instantiate MulticlassMetrics
metrics = MulticlassMetrics(predictionAndLabels)

# Get the confusion matrix
confusion_matrix = metrics.confusionMatrix().toArray()
print("Confusion Matrix:")
print(confusion_matrix)

24/01/21 23:33:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:33:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:33:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:34:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/21 23:34:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 125:>                                                        (0 + 1) / 1]

Confusion Matrix:
[[  33496. 1325462.]
 [  27862. 1352523.]]


                                                                                

In [22]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Define the parameter grid to search through
paramGrid = (ParamGridBuilder()
             .addGrid(lr.maxIter, [10, 20, 30])
             .addGrid(lr.regParam, [0.01, 0.1, 1.0])
             .build())

# Define the evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Create a CrossValidator with 5 folds
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Perform hyperparameter tuning
cvModel = cv.fit(trainingData)

# Get the best model from the cross-validation
best_model = cvModel.bestModel

# Test the best model on the test data
best_prediction = best_model.transform(testData)
best_prediction.select('date', 'symbol', 'label', 'probability', 'prediction').show(truncate=False)

# Evaluate the best model
best_accuracy = evaluator.evaluate(best_prediction)
print("Best Model Accuracy = {:.2%}".format(best_accuracy))


24/01/22 02:37:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 02:37:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 02:37:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 02:37:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 02:37:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 02:37:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 0

24/01/22 02:51:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 02:51:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 02:52:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 02:52:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 02:53:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 02:53:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 0

24/01/22 03:05:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:05:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:06:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:06:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:07:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:07:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 0

24/01/22 03:19:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:19:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:20:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:20:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:20:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:21:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 0

+----------+------+-----+----------------------------------------+----------+
|date      |symbol|label|probability                             |prediction|
+----------+------+-----+----------------------------------------+----------+
|2015-01-02|CHAU  |1    |[0.4992780293300256,0.5007219706699744] |1.0       |
|2015-01-02|DTP   |0    |[0.4992993222953445,0.5007006777046554] |1.0       |
|2015-01-02|AYLA  |1    |[0.49929848813721817,0.5007015118627818]|1.0       |
|2015-01-02|ZYXI  |0    |[0.49930014803773687,0.5006998519622632]|1.0       |
|2015-01-02|SMIT  |0    |[0.49929944050109737,0.5007005594989027]|1.0       |
|2015-01-02|SVBI  |0    |[0.499299067431593,0.500700932568407]   |1.0       |
|2015-01-02|EVSI  |0    |[0.4992984311572832,0.5007015688427168] |1.0       |
|2015-01-02|LIND  |1    |[0.49929772386138904,0.500702276138611] |1.0       |
|2015-01-02|MPB   |0    |[0.49929635094164565,0.5007036490583543]|1.0       |
|2015-01-02|GJP   |1    |[0.4992946480056489,0.5007053519943512]

24/01/22 03:21:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:21:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:21:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:22:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/22 03:22:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 2891:>                                                       (0 + 1) / 1]

Best Model Accuracy = 50.60%


                                                                                