In [1]:
!pip install yfinance pandas

Collecting yfinance
  Downloading yfinance-0.2.37-py2.py3-none-any.whl.metadata (11 kB)
Collecting multitasking>=0.0.7 (from yfinance)
  Downloading multitasking-0.0.11-py3-none-any.whl.metadata (5.5 kB)
Collecting lxml>=4.9.1 (from yfinance)
  Downloading lxml-5.2.1-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (3.4 kB)
Collecting appdirs>=1.4.4 (from yfinance)
  Downloading appdirs-1.4.4-py2.py3-none-any.whl.metadata (9.0 kB)
Collecting frozendict>=2.3.4 (from yfinance)
  Downloading frozendict-2.4.1.tar.gz (315 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m315.2/315.2 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hCollecting peewee>=3.16.2 (from yfinance)
  Downloading peewee-3.17.1.tar.gz (3.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 M

In [3]:
from pyspark.sql import SparkSession

# Initialize a SparkSession
spark = SparkSession.builder \
    .appName("Stock Price Prediction - Level 1") \
    .getOrCreate()


In [4]:
import yfinance as yf

# Fetch historical stock data
stock_data = yf.download("AAPL", start="2020-01-01", end="2022-01-01")

# Convert the data to a Spark DataFrame
sdf = spark.createDataFrame(stock_data.reset_index())


[*********************100%%**********************]  1 of 1 completed


In [5]:
from pyspark.sql.functions import col

# Ensure date is in the correct format and fill missing values if necessary
sdf = sdf.withColumn("Date", col("Date").cast("date"))
# Example of filling missing values for 'Volume' with zero
sdf = sdf.na.fill({"Volume": 0})


In [6]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, avg, col

# Define a window spec for ordering by date
windowSpec = Window.orderBy("Date")

# Lagged features: Previous day's closing price
sdf = sdf.withColumn("Prev_Close", lag("Close").over(windowSpec))

# Moving averages: 7-day and 30-day moving averages of the closing price
sdf = sdf.withColumn("MA_7", avg("Close").over(windowSpec.rowsBetween(-6, 0)))
sdf = sdf.withColumn("MA_30", avg("Close").over(windowSpec.rowsBetween(-29, 0)))

# Drop rows with any null values that might have been created during feature engineering
sdf = sdf.na.drop()


In [7]:
sdf.show()


+----------+-----------------+-----------------+-----------------+-----------------+-----------------+---------+-----------------+-----------------+-----------------+
|      Date|             Open|             High|              Low|            Close|        Adj Close|   Volume|       Prev_Close|             MA_7|            MA_30|
+----------+-----------------+-----------------+-----------------+-----------------+-----------------+---------+-----------------+-----------------+-----------------+
|2020-01-03| 74.2874984741211| 75.1449966430664|           74.125|74.35749816894531| 72.3491439819336|146322800| 75.0875015258789|74.72249984741211|74.72249984741211|
|2020-01-06|73.44750213623047|74.98999786376953|          73.1875|74.94999694824219|72.92562866210938|118387200|74.35749816894531|74.79833221435547|74.79833221435547|
|2020-01-07|74.95999908447266| 75.2249984741211|74.37000274658203|74.59750366210938|72.58265686035156|108872000|74.94999694824219|74.74812507629395|74.74812507629395

In [9]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# Assuming your Spark DataFrame is named 'sdf' and has the following columns:
# 'Open', 'High', 'Low', 'Close', 'Volume', 'Prev_Close', 'MA_7', 'MA_30'
# where 'Close' is the target variable.

# Define the columns that will be used as features
featureCols = ['Open', 'High', 'Low', 'Prev_Close', 'MA_7', 'MA_30', 'Volume']

# VectorAssembler to combine feature columns into a single vector column
vecAssembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# Prepare the data
sdf = vecAssembler.transform(sdf)
sdf = sdf.withColumnRenamed("Close", "label")  # Rename 'Close' to 'label' as required by Spark ML

# Split the data into training and test sets
trainData, testData = sdf.randomSplit([0.7, 0.3], seed=42)


In [10]:
# Initialize the models
lr = LinearRegression(featuresCol="features", labelCol="label")
gbt = GBTRegressor(featuresCol="features", labelCol="label")

# Train the models
lrModel = lr.fit(trainData)
gbtModel = gbt.fit(trainData)


In [11]:
# Make predictions
lrPredictions = lrModel.transform(testData)
gbtPredictions = gbtModel.transform(testData)

# Evaluate the models
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

lrRMSE = evaluator.evaluate(lrPredictions)
gbtRMSE = evaluator.evaluate(gbtPredictions)

print("Linear Regression RMSE on test data = %g" % lrRMSE)
print("GBT Regression RMSE on test data = %g" % gbtRMSE)


Linear Regression RMSE on test data = 0.995082
GBT Regression RMSE on test data = 2.09486


In [20]:
df=sdf.toPandas()
df.head()

Unnamed: 0,Date,Open,High,Low,label,Adj Close,Volume,Prev_Close,MA_7,MA_30,features
0,2020-01-03,74.287498,75.144997,74.125,74.357498,72.349144,146322800,75.087502,74.7225,74.7225,"[74.2874984741211, 75.1449966430664, 74.125, 7..."
1,2020-01-06,73.447502,74.989998,73.1875,74.949997,72.925629,118387200,74.357498,74.798332,74.798332,"[73.44750213623047, 74.98999786376953, 73.1875..."
2,2020-01-07,74.959999,75.224998,74.370003,74.597504,72.582657,108872000,74.949997,74.748125,74.748125,"[74.95999908447266, 75.2249984741211, 74.37000..."
3,2020-01-08,74.290001,76.110001,74.290001,75.797501,73.750252,132079200,74.597504,74.958,74.958,"[74.29000091552734, 76.11000061035156, 74.2900..."
4,2020-01-09,76.809998,77.607498,76.550003,77.407501,75.316757,170108400,75.797501,75.36625,75.36625,"[76.80999755859375, 77.60749816894531, 76.5500..."


In [21]:
# Note: This is a conceptual guideline for benchmarking. Actual implementation will vary based on your setup.
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression as SklearnLR
from sklearn.ensemble import GradientBoostingRegressor as SklearnGBR
from sklearn.metrics import mean_squared_error
import numpy as np

# Assume 'df' is a pandas DataFrame equivalent of the Spark DataFrame 'sdf' used earlier

X = df[featureCols]
y = df['label']

# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Train the models
lr_sk = SklearnLR().fit(X_train, y_train)
gbt_sk = SklearnGBR().fit(X_train, y_train)

# Make predictions
lr_predictions_sk = lr_sk.predict(X_test)
gbt_predictions_sk = gbt_sk.predict(X_test)

# Calculate RMSE
lr_rmse_sk = np.sqrt(mean_squared_error(y_test, lr_predictions_sk))
gbt_rmse_sk = np.sqrt(mean_squared_error(y_test, gbt_predictions_sk))

print("Scikit-learn Linear Regression RMSE on test data = %g" % lr_rmse_sk)
print("Scikit-learn GBT Regression RMSE on test data = %g" % gbt_rmse_sk)


Scikit-learn Linear Regression RMSE on test data = 0.848373
Scikit-learn GBT Regression RMSE on test data = 1.31216


In [26]:
from pyspark.streaming import StreamingContext

# Create a StreamingContext with a 1-second batch interval
ssc = StreamingContext(spark.sparkContext,1)


In [27]:
from pyspark import RDD
import random

# Simulate stock data stream
def simulate_stock_data():
    # Simulating stock data; in practice, this would come from your real-time data source
    return [(random.uniform(100, 200), random.uniform(100, 200), random.uniform(100, 200), random.uniform(100, 200), random.randint(1000, 10000)) for _ in range(100)]

# Create a DStream that simulates incoming stock data
# Each RDD in the DStream represents data received in one batch interval
stock_data_dstream = ssc.queueStream([ssc.sparkContext.parallelize(simulate_stock_data()) for _ in range(5)])


In [28]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

def process_time_batch(rdd: RDD):
    if not rdd.isEmpty():
        # Convert RDD to DataFrame
        df = rdd.map(lambda x: Row(Open=x[0], High=x[1], Low=x[2], Close=x[3], Volume=x[4])).toDF()
        
        # Apply the same feature engineering as used for training the model
        # This should include creating features like lagged variables, moving averages, etc.
        # Here, for simplicity, let's assume the model was trained directly on these features without additional engineering
        feature_df = vecAssembler.transform(df)
        
        # Predict using the model
        predictions = lrModel.transform(feature_df)
        
        # Show predictions
        predictions.select("prediction").show()

# Apply processing to each time batch
stock_data_dstream.foreachRDD(process_time_batch)


In [29]:
ssc.start()
ssc.awaitTerminationOrTimeout(10)  # Run the streaming computation for 10 seconds
ssc.stop(stopSparkContext=False)


Py4JJavaError: An error occurred while calling o377.awaitTerminationOrTimeout.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/streaming/util.py", line 71, in call
    r = self.func(t, *rdds)
        ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/streaming/dstream.py", line 236, in func
    return old_func(rdd)  # type: ignore[call-arg, arg-type]
           ^^^^^^^^^^^^^
  File "/tmp/ipykernel_3668/3059495156.py", line 12, in process_time_batch
    feature_df = vecAssembler.transform(df)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/ml/base.py", line 262, in transform
    return self._transform(dataset)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 398, in _transform
    return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sparkSession)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.IllegalArgumentException: Prev_Close does not exist. Available: Open, High, Low, Close, Volume

	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonDStream$.$anonfun$callForeachRDD$1(PythonDStream.scala:179)
	at org.apache.spark.streaming.api.python.PythonDStream$.$anonfun$callForeachRDD$1$adapted(PythonDStream.scala:179)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
