<a href="https://colab.research.google.com/github/emmetorior/CN7030-/blob/main/stock_classifier_reduced_vars_label_py.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>



```
# This is formatted as code
```

# AAPL Stock Price Regression Prediction
## Using PySpark MLLib with Feature Engineering

In [1]:
# Install required libraries
!pip install pyspark pandas numpy



In [2]:
# Import required libraries
from pyspark.sql import SparkSession
# Importing package
from pyspark.sql.functions import (
    expr, col, lag, lead, window, stddev, mean, first, last,
    when, isnan, count
)
from pyspark.ml.linalg import Vectors
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, lag, when
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import pandas as pd
import numpy as np

In [3]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("AAPL Stock Prediction") \
    .getOrCreate()

In [4]:
# Create sample AAPL stock data
def create_sample_stock_data():
    # Generate a year of simulated stock data
    dates = pd.date_range(start='2023-01-01', end='2023-12-31', freq='B')
    np.random.seed(42)  # For reproducibility

    data = {
        'Date': dates,
        'Volume': np.random.randint(1000000, 5000000, len(dates)),
        'High': 150 + np.cumsum(np.random.normal(0.1, 1, len(dates))),
        'Low': 140 + np.cumsum(np.random.normal(-0.1, 1, len(dates))),
        'Close': 145 + np.cumsum(np.random.normal(0, 1, len(dates)))
    }

    df = pd.DataFrame(data)
    df.to_csv('AAPL.csv', index=False)
    return df

create_sample_stock_data()

Unnamed: 0,Date,Volume,High,Low,Close
0,2023-01-02,3219110,149.616877,139.350742,144.593056
1,2023-01-03,3768307,149.147049,136.791548,143.140852
2,2023-01-04,3229084,147.155449,136.748961,143.954540
3,2023-01-05,4511566,148.519190,135.804854,142.453376
4,2023-01-06,3356330,148.603706,137.063670,142.184542
...,...,...,...,...,...
255,2023-12-25,1228576,172.654969,122.746283,162.037294
256,2023-12-26,4112754,173.309662,123.828257,161.968272
257,2023-12-27,4683561,174.142835,124.086429,161.780693
258,2023-12-28,4011243,173.138613,123.874456,161.905848


In [5]:
# Read Stock Data
df = spark.read.csv('AAPL.csv', header=True, inferSchema=True)

# Convert Date column to timestamp
df = df.withColumn("Date", col("Date").cast("timestamp"))

In [6]:
### new code here

# Calculate 9-day moving average
window_spec = Window.orderBy("Date").rowsBetween(-8, 0)
df = df.withColumn("9_day_avg", avg("Close").over(window_spec))

# Calculate percentage change for labeling
df = df.withColumn("next_day_close", lag("Close", -1).over(Window.orderBy("Date")))
df = df.withColumn("change", (col("next_day_close") - col("Close")) / col("Close") * 100)

# Define buckets for categorization
df = df.withColumn("label",
    when(col("change") > 2, "positively high")
    .when((col("change") > 0) & (col("change") <= 2), "positively medium")
    .when((col("change") > -2) & (col("change") <= 0), "positively low")
    .when((col("change") > -4) & (col("change") <= -2), "negatively low")
    .when((col("change") > -6) & (col("change") <= -4), "negatively medium")
    .otherwise("negatively high")
)

# Drop rows with null values
df = df.dropna()

# Assemble features
features = ["Volume", "9_day_avg"]
assembler = VectorAssembler(inputCols=features, outputCol="features")
data_prepared = assembler.transform(df)

# Split the data
train_data, test_data = data_prepared.randomSplit([0.7, 0.3], seed=42)




In [8]:
## Train the Decision Tree classifier
#dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=5)
#dt_model = dt.fit(train_data)

# Make predictions
#predictions = dt_model.transform(test_data)

# Evaluate the model
#evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
#accuracy = evaluator.evaluate(predictions)
#print(f"Accuracy: {accuracy}")


In [9]:
# Feature Engineering Steps

# 1. 9-day Moving Average
window_spec = Window.orderBy("Date").rowsBetween(-8, 0)
df = df.withColumn("9_Day_MA", mean("Close").over(window_spec))

# 2. Trading Range (High - Low)
df = df.withColumn("Trading_Range", col("High") - col("Low"))

# 3. Previous Day's Closing Price (using lag)
df = df.withColumn("Prev_Close", lag("Close").over(Window.orderBy("Date")))

# 4. QQQ Previous Close (simulated)
# Note: In real scenario, you'd load actual QQQ data
df = df.withColumn("QQQ_Prev_Close", col("Close") * 1.05)  # Simulated proxy

# 5. Standard Deviation from 25-day EMA
ema_window = Window.orderBy("Date").rowsBetween(-24, 0)
df = df.withColumn("25_Day_EMA", mean("Close").over(ema_window))

df = df.withColumn("EMA_Distance_StdDev",
  stddev(expr("abs(Close - `25_Day_EMA`)")).over(ema_window)
)

In [10]:
# Handle Null Values
# Simple strategy: drop rows with nulls
df = df.na.drop()

In [11]:
# Prepare Features and Label
feature_columns = [
    "9_Day_MA",
    "Trading_Range",
    "Prev_Close",
    "QQQ_Prev_Close",
    "EMA_Distance_StdDev"
]

# Target: Next day's closing price
#df = df.withColumn("Next_Close", lag(-1).over(Window.orderBy("Date")))
df = df.withColumn("Next_Close", lead("Close", 1).over(Window.orderBy("Date")))
# Assemble feature vector
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features"
)
df = assembler.transform(df)

In [12]:

# Split Data
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [13]:
# Train Linear Regression Model
lr = LinearRegression(
    featuresCol="features",
    labelCol="Next_Close",
    predictionCol="predicted_close"
)
model = lr.fit(train_data)

Py4JJavaError: An error occurred while calling o234.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 4) (ed987e813312 executor driver): java.lang.RuntimeException: Labels MUST NOT be Null or NaN
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1264)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1265)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2488)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1202)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1196)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$2(RDD.scala:1289)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1256)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1242)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1242)
	at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:107)
	at org.apache.spark.ml.regression.LinearRegression.trainWithNormal(LinearRegression.scala:456)
	at org.apache.spark.ml.regression.LinearRegression.$anonfun$train$1(LinearRegression.scala:354)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:329)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:186)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:114)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Labels MUST NOT be Null or NaN
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1264)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1265)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [None]:
# Make Predictions
predictions = model.transform(test_data)

# Evaluate Model
evaluator = RegressionEvaluator(
    labelCol="Next_Close",
    predictionCol="predicted_close",
    metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE): {rmse}")

# Display sample predictions
predictions.select("Date", "Close", "Next_Close", "predicted_close").show()