In [8]:
!pip install pyspark
!pip install arch
!pip install statsmodels



In [9]:
# PySpark & Pandas Setup
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

# Modeling & Metrics
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from arch import arch_model
from statsmodels.tsa.api import VAR
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Obtain the Spark session for this notebook
spark = SparkSession.builder.appName("Spark_TimeSeries_Models").getOrCreate()

# Read the CSV (header row, inferred column types), keep only the columns
# the models use, and order the rows by date on the Spark side
data_path = "/content/part-00000-88d747c0-e1c3-4314-bcd6-a3bf54a570b7-c000.csv"  # change to your file path
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
    .select("Date", "Close", "Open", "Volume")
    .orderBy("Date")
)

# Collect to pandas, parse the dates, and index/sort the frame by date
pdf = (
    df.toPandas()
    .assign(Date=lambda frame: pd.to_datetime(frame["Date"]))
    .set_index("Date")
    .sort_index()
)



In [22]:
# Hold out the most recent rows as the test set; everything before them is
# training data. A single constant keeps the univariate and VAR splits in sync.
TEST_SIZE = 30

# Fail early with a clear message instead of handing the models an empty
# training set further down the notebook.
if len(pdf) <= TEST_SIZE:
    raise ValueError(
        f"Need more than {TEST_SIZE} rows to build a train/test split, got {len(pdf)}"
    )

# Univariate closing-price series
train_close = pdf["Close"].iloc[:-TEST_SIZE]
test_close = pdf["Close"].iloc[-TEST_SIZE:]

# Multivariate frame for VAR
var_columns = ["Close", "Open", "Volume"]
train_var = pdf[var_columns].iloc[:-TEST_SIZE]
test_var = pdf[var_columns].iloc[-TEST_SIZE:]


In [16]:
# --- ARIMA(4, 0, 1): fit on the training closes, forecast 30 steps ahead ---
arima_model = ARIMA(train_close, order=(4, 0, 1))
arima_result = arima_model.fit()
arima_forecast = arima_result.forecast(steps=30)

# Score the forecast against the held-out closes
mae = mean_absolute_error(y_true=test_close, y_pred=arima_forecast)
r2 = r2_score(y_true=test_close, y_pred=arima_forecast)
mse = mean_squared_error(y_true=test_close, y_pred=arima_forecast)
rmse = np.sqrt(mse)

print(f"[ARIMA] RMSE: {rmse}\n MSE: {mse}\n MAE: {mae}\n R²: {r2}\n")

# Write held-out dates, actual closes and forecasts side by side
arima_columns = {
    'Date': test_close.index,
    'Actual': test_close.values,
    'Forecast': arima_forecast,
}
results_df = pd.DataFrame(arima_columns)
results_df.to_csv('arima_forecast.csv', index=False)





  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)


[ARIMA] RMSE: 6.401788253161106
 MSE: 40.98289283831152
 MAE: 5.557801422751388
 R²: -1.0837619597122141



  return get_prediction_index(
  return get_prediction_index(


In [19]:
# --- Exponential smoothing with an additive trend and no seasonal component ---
exp_model = ExponentialSmoothing(train_close, trend='add', seasonal=None)
exp_result = exp_model.fit()
exp_forecast = exp_result.forecast(30)

# Score the forecast against the held-out closes
mae = mean_absolute_error(y_true=test_close, y_pred=exp_forecast)
r2 = r2_score(y_true=test_close, y_pred=exp_forecast)
mse = mean_squared_error(y_true=test_close, y_pred=exp_forecast)
rmse = np.sqrt(mse)

print(f"[Exponential Smoothing] RMSE: {rmse}\n MSE: {mse}\n MAE: {mae}\n R²: {r2}")

# Write held-out dates, actual closes and forecasts side by side
exp_columns = {
    'Date': test_close.index,
    'Actual': test_close.values,
    'Forecast': exp_forecast,
}
results_df = pd.DataFrame(exp_columns)
results_df.to_csv('exponential_smoothing_forecast.csv', index=False)




[Exponential Smoothing] RMSE: 6.905262471050182
 MSE: 47.68264979409407
 MAE: 6.23476110092094
 R²: -1.4244089398767348


  self._init_dates(dates, freq)
  return get_prediction_index(
  return get_prediction_index(


In [None]:
# --- GARCH(1, 1) on percentage returns of the training closes ---
# Returns are multiplied by 100, so the forecast variances below are in
# squared-percent units and the volatilities in percent.
returns = train_close.pct_change().dropna() * 100
garch_model = arch_model(returns, vol='Garch', p=1, q=1)
garch_result = garch_model.fit(disp="off")
garch_forecast = garch_result.forecast(horizon=30)

# Take the last row of the variance forecasts and transpose it so that each
# forecast horizon becomes one row of the output.
vol_df = garch_forecast.variance.iloc[-1:].T
vol_df.columns = ["forecasted_variance"]
# Volatility is the square root of variance. Keeping both columns makes each
# label match its contents (the variance must not be exported as "volatility").
vol_df["forecasted_volatility"] = np.sqrt(vol_df["forecasted_variance"])

vol_df.to_csv("/content/garch_forecast.csv")

print("[GARCH] Forecasted variance for next 30 days saved (no R² applicable).")



[GARCH] Forecasted variance for next 30 days saved (no R² applicable).


In [28]:
# --- VAR on (Close, Open, Volume) ---
# Relies on pd, np, VAR and the sklearn metrics imported in the setup cell.

# Fit on the training frame; the lag order (at most 5) is selected by AIC
var_model = VAR(train_var)
var_result = var_model.fit(maxlags=5, ic='aic')

# Forecast one step per held-out row, seeded with the last k_ar training rows
horizon = len(test_var)
forecast = var_result.forecast(train_var.values[-var_result.k_ar:], steps=horizon)
forecast_df = pd.DataFrame(forecast, columns=list(train_var.columns))

# Compare the forecast closes with the held-out closes
true = test_var["Close"]
pred = forecast_df["Close"]

mse = mean_squared_error(true, pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(true, pred)

# Output metrics
print(f"[VAR] RMSE: {rmse}\n MSE: {mse}\n MAE: {mae}")

# Label every forecast row with the date of the held-out observation it is
# compared against. Synthesising calendar days from an offset would not line
# up with the dates the 'Actual' values belong to.
forecast_df['Date'] = test_var.index
forecast_df['Actual'] = true.values  # Actual values for Close

# Column order: Date, Actual, forecast (the forecast keeps the name 'Close')
forecast_df = forecast_df[['Date', 'Actual', 'Close']]

# Save the forecasted values for Power BI
forecast_df.to_csv("/content/var_forecast_last_month.csv", index=False)

print("[VAR] Last month forecasted values saved.")




  self._init_dates(dates, freq)


[VAR] RMSE: 6.318502981360787
 MSE: 39.92347992546515
 MAE: 5.504881993687263
[VAR] Last month forecasted values saved.
