In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [2]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

23/11/25 12:38:54 WARN Utils: Your hostname, Gias-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.100 instead (on interface en0)
23/11/25 12:38:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/25 12:38:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/25 12:38:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
df = spark.read.csv("file:/Users/giasmith/DCFinal/Data/KOv3.csv", header=True, inferSchema=True)

In [4]:
df = df.drop("Date", "TaxEffectOfUnusualItems")
#df.take(1)

### Outcome Correlation
Reference: 
https://towardsdatascience.com/building-a-linear-regression-with-pyspark-and-mllib-d065c3ba246a

In [6]:
outcome = "Adj Close"
# compile all of the columns that are not the outcome
feature_cols = [col for col in df.columns if col != outcome]

### Linear Regression

In [7]:
# create a feature vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

In [8]:
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=123)

In [9]:
lr = LinearRegression(featuresCol="features", labelCol="Adj Close")

In [10]:
param_grid = ParamGridBuilder().build()

In [11]:
# Set up a cross-validator
cross_val = CrossValidator(estimator=lr,
                           estimatorParamMaps=param_grid,
                           evaluator=RegressionEvaluator(labelCol=outcome),
                           numFolds=5)

In [12]:
# Train the model
model = cross_val.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)
predictions.select("Adj Close", "PercentChangeClose", "prediction").show(10)

23/11/25 12:39:00 WARN Instrumentation: [b2679d6d] regParam is zero, which might cause numerical instability and overfitting.
23/11/25 12:39:00 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/25 12:39:00 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/11/25 12:39:00 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
23/11/25 12:39:00 WARN Instrumentation: [b2679d6d] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
23/11/25 12:39:02 WARN Instrumentation: [8f70f2dc] regParam is zero, which might cause numerical instability and overfitting.
23/11/25 12:39:02 WARN Instrumentation: [8f70f2dc] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
23/11/25 12:39:03 WARN Instrumentation: [f9f27397] regParam is zero, which might cause numerical instability and overfitting.
23/11/

+---------+------------------+------------------+
|Adj Close|PercentChangeClose|        prediction|
+---------+------------------+------------------+
| 2.048348|              1.07| 2.471610622974673|
|   2.6057|             1.196|2.7835756240851843|
| 4.564611|             1.167|4.1474764291904895|
| 4.833506|             0.973|  4.34332594188609|
| 5.241195|             1.007| 4.868896368889013|
| 6.481538|             1.046| 5.512423485318865|
|11.048383|             1.004|10.252246540744263|
|12.042693|             0.974|11.207298514738003|
|11.766837|             1.027|10.376999117581304|
|12.571919|             1.014|12.117556620539489|
+---------+------------------+------------------+
only showing top 10 rows



In [13]:
# evaluate the model
evaluator = RegressionEvaluator(labelCol="Adj Close")
#calculate and print the RMSE
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)

Root Mean Squared Error (RMSE): 0.8731171093234846


### SVM

Percent Change Close is calculated by (Last Price minus Close Price)/Close Price x 100
* If "PercentChangeClose" is greater than 1, it suggests that the closing price of the stock has increased.
* If "PercentChangeClose" is less than 1, it suggests that the closing price has decreased.
* If "PercentChangeClose" is equal to 1, it suggests that the closing price has remained unchanged.

In [42]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import when
from pyspark.ml.feature import StringIndexer

In [47]:
# create a binary classification system using PercentChangeClose
from pyspark.sql.functions import when

# Create a new column "classification" based on PercentChangeClose
svm_df = predictions.withColumn(
    "classification",
    when(predictions["PercentChangeClose"] > 1, 0)
    .when(predictions["PercentChangeClose"] < 1, 1)
    .otherwise(2)
)

# Show the updated DataFrame
svm_df.select("Adj Close", "PercentChangeClose", "prediction", "classification").show(10)


+---------+------------------+------------------+--------------+
|Adj Close|PercentChangeClose|        prediction|classification|
+---------+------------------+------------------+--------------+
| 2.048348|              1.07| 2.471610622974673|             0|
|   2.6057|             1.196|2.7835756240851843|             0|
| 4.564611|             1.167|4.1474764291904895|             0|
| 4.833506|             0.973|  4.34332594188609|             1|
| 5.241195|             1.007| 4.868896368889013|             0|
| 6.481538|             1.046| 5.512423485318865|             0|
|11.048383|             1.004|10.252246540744263|             0|
|12.042693|             0.974|11.207298514738003|             1|
|11.766837|             1.027|10.376999117581304|             0|
|12.571919|             1.014|12.117556620539489|             0|
+---------+------------------+------------------+--------------+
only showing top 10 rows



23/11/25 13:59:14 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:295)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)

In [49]:
label = "classification"

# Create a feature vector
feature_columns =  [col for col in svm_df.columns if col != "classification"]
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="svm_features")

# Split data
(train_data, test_data) = svm_df.randomSplit([0.8, 0.2], seed=123)

# Index the label column
indexer = StringIndexer(inputCol= predictions["classification"], outputCol="label")

# Create an SVM model
svm = LinearSVC(featuresCol="svm_features", labelCol="label", maxIter=100)

# Create a pipeline
pipeline = Pipeline(stages=[vector_assembler, indexer, svm])

# Fit the pipeline on the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions_svm = pipeline.transform(predictions)  # Assuming 'predictions' is your test dataset

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="label")
accuracy = evaluator.evaluate(predictions_svm)

# Show the accuracy
print("Accuracy:", accuracy)

AssertionError: Undefined error message parameter for error class: CANNOT_PARSE_DATATYPE. Parameters: {'error': '[Errno 61] Connection refused'}

### Computing a predicted Percent Change Close

In [None]:
from pyspark.sql import Window
from pyspark.sql import functions as F

# Define a window specification
windowSpec = Window.orderBy(predictions["prediction"])

# Calculate the predictedCC column using lag function
predictions = predictions.withColumn("new_price", F.col("prediction"))
predictions = predictions.withColumn("old_price", F.lag("prediction").over(windowSpec))
predictions = predictions.withColumn("predictedCC", (((F.col("new_price") - F.col("old_price")) / F.col("old_price")) * 100).alias("predictedCC"))

# Drop intermediate columns if needed
predictions = predictions.drop("new_price", "old_price")


In [28]:
predictions.select("Adj Close", "PercentChangeClose", "prediction", "predictedCC").show(10)

+---------+------------------+------------------+------------------+
|Adj Close|PercentChangeClose|        prediction|       predictedCC|
+---------+------------------+------------------+------------------+
| 2.048348|              1.07| 2.471610622974673|              NULL|
|   2.6057|             1.196|2.7835756240851843| 12.62193155388895|
| 4.564611|             1.167|4.1474764291904895|48.998158817889056|
| 4.833506|             0.973|  4.34332594188609|4.7221368472931085|
| 5.241195|             1.007| 4.868896368889013|12.100644391765215|
| 6.481538|             1.046| 5.512423485318865|13.217104404641344|
|11.048383|             1.004|10.252246540744263| 85.98437816051107|
|11.766837|             1.027|10.376999117581304|1.2168316118935703|
|11.000153|             0.874|10.432988238960608|0.5395502181786195|
|11.702862|             0.917|10.864535412213387| 4.136371702607923|
+---------+------------------+------------------+------------------+
only showing top 10 rows



23/11/25 13:02:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/25 13:02:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/25 13:02:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/25 13:02:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/25 13:02:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/25 13:02:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/25 1