<a href="https://colab.research.google.com/github/Bike001/Real-time-stock-prediciton-/blob/master/Stock_prediction_on_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=a30c5a8b5d5686eec65e84f0ac7c1c88b8bb4a0455773764877ce6d1975227bd
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("StockPricePrediction").getOrCreate()


In [3]:
from google.colab import files
uploaded = files.upload()


Saving AAPL.csv to AAPL.csv


In [11]:
df = spark.read.csv("AAPL.csv", header=True, inferSchema=True).select("Date", "Open")
df.show(15)


+----------+------------------+
|      Date|              Open|
+----------+------------------+
|1980-12-12|0.5133928656578064|
|1980-12-15|0.4888392984867096|
|1980-12-16|          0.453125|
|1980-12-17|0.4620535671710968|
|1980-12-18|0.4754464328289032|
|1980-12-19|0.5044642686843872|
|1980-12-22|0.5290178656578064|
|1980-12-23|0.5513392686843872|
|1980-12-24|0.5803571343421936|
|1980-12-26|0.6339285969734192|
|1980-12-29|0.6428571343421936|
|1980-12-30|0.6294642686843872|
|1980-12-31|0.6116071343421936|
|1981-01-02|0.6160714030265808|
|1981-01-05|0.6049107313156128|
+----------+------------------+
only showing top 15 rows



In [14]:
# Create a window specification which orders by the date
windowSpec = Window.orderBy("Date")

# Add lagged features for the past 20 days
for i in range(1, 21):
    df = df.withColumn(f"lag_{i}", F.lag("Open", i).over(windowSpec))

# Add column for the next day's open price
df = df.withColumn("label", F.lead("Open", 1).over(windowSpec))

# Drop rows with any null values (these will be the first 20 and last 1 rows)
df = df.dropna()


df.show(10)


+----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|      Date|              Open|             lag_1|             lag_2|             lag_3|             lag_4|             lag_5|             lag_6|             lag_7|             lag_8|             lag_9|            lag_10|            lag_11|            lag_12|            lag_13|            lag_14|            lag_15|            lag_16|            lag_17|            lag_18|            lag_19|            lag_20|             label|
+----------+------------------+------------------+------------------+------------------+------------------+------------------+------------

In [15]:
# Select features and label
feature_cols = [f"lag_{i}" for i in range(1, 21)] + ["label"]
df = df.select(*feature_cols)

# Show the first few rows to verify
df.show(5)



+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|             lag_1|             lag_2|             lag_3|             lag_4|             lag_5|             lag_6|             lag_7|             lag_8|             lag_9|            lag_10|            lag_11|            lag_12|            lag_13|            lag_14|            lag_15|            lag_16|            lag_17|            lag_18|            lag_19|            lag_20|             label|
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------

In [25]:
# Select features and label
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# Assemble the features into a single feature vector
assembler = VectorAssembler(inputCols=feature_cols[:-1], outputCol="features")

# Optionally convert the label to a float type if it's not already
df = df.withColumn("label", col("label").cast("float"))

# Split the data into training and test sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)



In [26]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression

# Assemble the features into a single feature vector
assembler = VectorAssembler(inputCols=feature_cols[:-1], outputCol="features")

# Scale the features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

# Define a pipeline that includes both assembling and scaling
pipeline = Pipeline(stages=[assembler, scaler])

# Fit the pipeline to the training data
pipelineModel = pipeline.fit(train_data)

# Transform both training and testing datasets
train_data = pipelineModel.transform(train_data)
test_data = pipelineModel.transform(test_data)



In [27]:
# Show the first few rows of the transformed training data
train_data.select("features", "scaledFeatures", "label").show(5)

+--------------------+--------------------+----------+
|            features|      scaledFeatures|     label|
+--------------------+--------------------+----------+
|[0.19866071641445...|[-0.5550840333597...|0.20758928|
|[0.203125,0.19866...|[-0.5550076428654...|0.22098215|
|[0.20758928358554...|[-0.5549312523711...|0.22321428|
|[0.20758928358554...|[-0.5549312523711...|0.19866072|
|[0.21651785075664...|[-0.5547784713824...|0.20535715|
+--------------------+--------------------+----------+
only showing top 5 rows



In [29]:
from pyspark.ml.regression import LinearRegression

# Initialize the linear regression model
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="label")

# Train the model on the training data
lrModel = lr.fit(train_data)

# Print the coefficients and intercept of the model
print("Coefficients:", lrModel.coefficients)
print("Intercept:", lrModel.intercept)


Coefficients: [50.99190062016939,2.8872432932386243,0.8421789917923502,6.10821049616707,6.02483725550703,-5.125983040980264,-3.9009550491176688,-0.4620940215576214,3.616750819424749,3.7478466609098198,-9.458256665743654,7.60337966465072,1.6509025057823568,-5.8874240984172115,5.028579667413049,2.0204531856366077,1.25711402600923,-4.040964138171333,-3.4387974775715455,-0.9533390025541219]
Intercept: 32.695935027557226


In [32]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define the evaluator for use with the cross-validator
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

In [33]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Define parameter grid for tuning
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01, 0.001]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Setup CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations to find the best model
cvModel = cv.fit(train_data)

# Use the best model to make predictions on the test data
bestModel = cvModel.bestModel
predictions = bestModel.transform(test_data)
bestRmse = evaluator.evaluate(predictions)
print(f"Best RMSE: {bestRmse}")


Best RMSE: 1.6927210181328785


In [34]:
from pyspark.sql import Row

# Example new data point (the last 20 days' "Open" prices)
new_data = [
    Row(lag_1=100.5, lag_2=101.2, lag_3=102.3, lag_4=100.0,
        lag_5=99.5, lag_6=98.3, lag_7=97.6, lag_8=98.4,
        lag_9=97.8, lag_10=96.9, lag_11=98.2, lag_12=99.1,
        lag_13=100.1, lag_14=100.7, lag_15=101.8, lag_16=102.6,
        lag_17=103.5, lag_18=104.3, lag_19=105.2, lag_20=95.7)
]

# Create DataFrame
sample_data = spark.createDataFrame(new_data)

# Transform and predict
sample_data_transformed = pipelineModel.transform(sample_data)
sample_predictions = bestModel.transform(sample_data_transformed)

# Display predictions
sample_predictions.select("prediction").show()


+------------------+
|        prediction|
+------------------+
|100.73291543108093|
+------------------+

