In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, collect_list, avg
from pyspark.sql.window import Window
import yfinance as yf
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import pandas as pd
import os

In [2]:
os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk-11"
os.environ["SPARK_HOME"] = "C:\\spark\\spark-3.5.4-bin-hadoop3"
os.environ["PATH"] = f"{os.environ['JAVA_HOME']}\\bin;{os.environ['PATH']}"

In [3]:
# Initialize Spark session
spark = (
    SparkSession.builder.appName("Stock Market Prediction")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [4]:
def load_data(ticker, start, end):
    data = yf.download(ticker, start=start, end=end, auto_adjust=False)
    data.reset_index(inplace=True)
    data.columns = ["Date", "Open", "High", "Low", "Close", "Adj Close", "Volume"]
    return data

In [5]:
# Download stock data using yfinance
start = "2010-06-29"
end = "2024-12-31"
stock = "TSLA"
df = load_data(stock, start, end)

[*********************100%***********************]  1 of 1 completed


In [7]:
df

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume
0,2010-06-29,1.592667,1.592667,1.666667,1.169333,1.266667,281494500
1,2010-06-30,1.588667,1.588667,2.028000,1.553333,1.719333,257806500
2,2010-07-01,1.464000,1.464000,1.728000,1.351333,1.666667,123282000
3,2010-07-02,1.280000,1.280000,1.540000,1.247333,1.533333,77097000
4,2010-07-06,1.074000,1.074000,1.333333,1.055333,1.333333,103003500
...,...,...,...,...,...,...,...
3646,2024-12-23,430.600006,430.600006,434.510010,415.410004,431.000000,72698100
3647,2024-12-24,462.279999,462.279999,462.779999,435.140015,435.899994,59551800
3648,2024-12-26,454.130005,454.130005,465.329987,451.019989,465.160004,76366400
3649,2024-12-27,431.660004,431.660004,450.000000,426.500000,449.519989,82666800


In [63]:
# Convert to Spark DataFrame
spark_df = spark.createDataFrame(df)

In [64]:
spark_df.show()

+-------------------+------------------+------------------+------------------+------------------+------------------+---------+
|               Date|              Open|              High|               Low|             Close|         Adj Close|   Volume|
+-------------------+------------------+------------------+------------------+------------------+------------------+---------+
|2010-06-29 00:00:00|1.5926669836044312|1.5926669836044312|1.6666669845581055|1.1693329811096191|1.2666670083999634|281494500|
|2010-06-30 00:00:00|1.5886670351028442|1.5886670351028442|2.0280001163482666| 1.553333044052124|1.7193330526351929|257806500|
|2010-07-01 00:00:00|1.4639999866485596|1.4639999866485596|1.7280000448226929|1.3513330221176147|1.6666669845581055|123282000|
|2010-07-02 00:00:00|1.2799999713897705|1.2799999713897705|1.5399999618530273|  1.24733304977417|1.5333329439163208| 77097000|
|2010-07-06 00:00:00|1.0740000009536743|1.0740000009536743|1.3333330154418945|1.0553330183029175|1.333333015441

In [68]:
# Calculate moving averages
windowSpec = Window.orderBy("Date").rowsBetween(-49, 0)
spark_df = spark_df.withColumn("MA50", avg("Close").over(windowSpec))

In [65]:
windowSpec = Window.orderBy("Date").rowsBetween(-99, 0)
spark_df = spark_df.withColumn("MA100", avg("Close").over(windowSpec))

In [57]:
windowSpec = Window.orderBy("Date").rowsBetween(-199, 0)
spark_df = spark_df.withColumn("MA200", avg("Close").over(windowSpec))

In [70]:
# Split data
train_df, test_df = spark_df.randomSplit([0.8, 0.2], seed=42)

In [71]:
# Convert to pandas for LSTM processing
train_data = train_df.select("Close").toPandas().values
test_data = test_df.select("Close").toPandas().values

25/01/05 20:31:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/05 20:31:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/05 20:31:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/05 20:31:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/05 20:31:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/05 20:31:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/05 2

In [72]:
# Scale data
scaler = MinMaxScaler(feature_range=(0, 1))
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)


def create_sequences(data, seq_length=100):
    x, y = [], []
    for i in range(seq_length, len(data)):
        x.append(data[i - seq_length : i])
        y.append(data[i])
    return np.array(x), np.array(y)

In [73]:
# Prepare sequences
x_train, y_train = create_sequences(train_scaled)
x_test, y_test = create_sequences(test_scaled)

In [74]:
# Build LSTM model
model = Sequential(
    [
        LSTM(50, activation="relu", return_sequences=True, input_shape=(100, 1)),
        Dropout(0.2),
        LSTM(60, activation="relu", return_sequences=True),
        Dropout(0.3),
        LSTM(80, activation="relu", return_sequences=True),
        Dropout(0.4),
        LSTM(120, activation="relu"),
        Dropout(0.5),
        Dense(1),
    ]
)

  super().__init__(**kwargs)


In [75]:
model.compile(optimizer="adam", loss="mean_squared_error")

In [76]:
# Train model
model.fit(x_train, y_train, epochs=50, batch_size=32, verbose=1)

Epoch 1/50
[1m90/90[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 183ms/step - loss: 0.0367
Epoch 2/50
[1m90/90[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 131ms/step - loss: 0.0044
Epoch 3/50
[1m90/90[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 127ms/step - loss: 0.0052
Epoch 4/50
[1m90/90[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 124ms/step - loss: 0.0037
Epoch 5/50
[1m90/90[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 125ms/step - loss: 0.0041
Epoch 6/50
[1m90/90[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 136ms/step - loss: 0.0038
Epoch 7/50
[1m90/90[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 123ms/step - loss: 0.0032
Epoch 8/50
[1m90/90[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 162ms/step - loss: 0.0035
Epoch 9/50
[1m90/90[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 141ms/step - loss: 0.0025
Epoch 10/50
[1m90/90[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 156ms

<keras.src.callbacks.history.History at 0x78474f5e6190>

In [77]:
model.summary()

In [78]:
# Predictions
y_pred = model.predict(x_test)
y_pred = scaler.inverse_transform(y_pred)
y_test = scaler.inverse_transform(y_test.reshape(-1, 1))

[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 44ms/step


In [79]:
# Save model
model.save('Stock Predictions Model.keras')

In [80]:
# Clean up
spark.stop()