In [0]:
# Install yfinance into the notebook's Python environment; it is used below to download the price data.
%pip install yfinance

In [0]:
# Restart the Python process after the install above so the next cell can import the newly installed package.
%restart_python

In [0]:
import yfinance as yf

# Download Apple's daily price data from 2019-01-01 up to (not including) 2024-12-31
data = yf.download("AAPL", start="2019-01-01", end="2024-12-31")

# Recent yfinance releases return two-level (field, ticker) columns even for a
# single ticker. Stringifying those tuples would produce headers such as
# "('Close',_'AAPL')", so keep only the field level ("Open", "Close", ...).
if data.columns.nlevels > 1:
    data.columns = data.columns.get_level_values(0)

# Turn the Date index into a regular column (non-mutating, so the cell can be re-run safely)
data = data.reset_index()
data.columns = [str(c).replace(' ', '_') for c in data.columns]  # Clean column names

# Save to workspace
data.to_csv("/Workspace/Users/deepakpvinodsharma@gmail.com/THE_LEARNING_BRICK/Databricks-begining-to-end/data hub/AAPL.csv", index=False)

In [0]:
# Now we import the libraries
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow
import mlflow.spark

In [0]:
# Read the saved CSV into a Spark DataFrame.
# header=True uses the first row as column names (otherwise they are _c0, _c1, ...),
# and inferSchema=True parses the price/volume columns as numbers instead of strings,
# which the later select() and VectorAssembler steps rely on.
# NOTE(review): some Databricks runtimes need a "file:" prefix to read workspace files with Spark - confirm.
df = spark.read.csv(
    "/Workspace/Users/deepakpvinodsharma@gmail.com/THE_LEARNING_BRICK/Databricks-begining-to-end/data hub/AAPL.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Inspect the data: print the schema, then preview the price and volume columns.
df.printSchema()
preview_columns = ["Date", "Open", "High", "Low", "Close", "Volume"]
df.select(*preview_columns).show()

In [0]:
# Build the regression target: for each row, the Close value of the following row
# when ordered by Date (lag with a negative offset looks ahead).
window = Window.orderBy("Date")
next_close = lag(col("Close"), -1).over(window)
# Rows containing any null (including the last row, which has no following close) are dropped.
df = df.withColumn("Next_Close", next_close).na.drop()

In [0]:
# Combine the input columns into the single "features" vector column used by the regressor.
feature_columns = ["Open", "High", "Low", "Volume"]
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)

In [0]:
# Linear regression that predicts "Next_Close" from the assembled "features" vector.
lr = LinearRegression(
    featuresCol="features",
    labelCol="Next_Close",
)

In [0]:
# Chain feature assembly and the regressor into one pipeline so they are fitted and applied together.
pipeline_stages = [assembler, lr]
pipeline = Pipeline(stages=pipeline_stages)

In [0]:
# Split the data randomly into 80% training and 20% test rows; the seed is passed for repeatability.
split_weights = [0.8, 0.2]
train, test = df.randomSplit(split_weights, seed=42)

In [0]:
# Fit and evaluate the pipeline inside an MLflow run so the metric and the
# fitted model are logged to the same run.
with mlflow.start_run() as run:
    model = pipeline.fit(train)
    prediction = model.transform(test)

    # The label column must match the target column created earlier ("Next_Close");
    # "NextClose" does not exist in the DataFrame and makes evaluate() fail.
    evaluator = RegressionEvaluator(
        labelCol="Next_Close", predictionCol="prediction", metricName="rmse"
    )
    rmse = evaluator.evaluate(prediction)
    mlflow.log_metric("rmse", rmse)
    mlflow.spark.log_model(model, "Stock_lr_model")
    print(f"RMSE: {rmse}")

In [0]:
# Compare the actual next-day close with the model's prediction.
# The target column is named "Next_Close" (with an underscore), not "NextClose".
prediction.select("Open", "Close", "Next_Close", "prediction").show()