# Stock Price Forecasting using ARIMA model

# Importing Libraries

In [53]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.stattools import kpss
from scipy.stats import normaltest


# Importing Data Sets

In [54]:
df1 = pd.read_csv("./data/SPX_Real.csv", index_col=False)
df1.columns = ['Timestamp', 'Close']
df1.set_index("Timestamp", inplace=True)
df1.index = pd.to_datetime(df1.index, format='%d-%m-%Y')
df2 = pd.read_csv("./data/AAPL_Real.csv")
df2.columns = ['Timestamp', 'Close']
df2.set_index("Timestamp", inplace=True)
df2.index = pd.to_datetime(df2.index, format='%d-%m-%Y')
df3 = pd.read_csv("./data/TWSE_Real.csv")
df3.columns = ['Timestamp', 'Close']
df3.set_index("Timestamp", inplace=True)
df3.index = pd.to_datetime(df3.index, format='%m/%d/%Y')

# Formatting Data Sets

## Setting Date as the index of the DataFrame.

In [3]:
df1.head

<bound method NDFrame.head of               Close
Timestamp          
1990-01-03   358.76
1990-01-04   355.67
1990-01-05   352.20
1990-01-08   353.79
1990-01-09   349.62
...             ...
2025-01-17  5996.66
2025-01-21  6049.24
2025-01-22  6086.37
2025-01-23  6118.71
2025-01-24  6101.24

[8831 rows x 1 columns]>

## Merging 3 datasets into 1 DataFrame for simplicity

In [None]:
# df = pd.DataFrame({
#     "SPX": df1.Close,
#     "AAPL": df2.Close,
#     "AAPL2": df3.Close
# }, index=df1.index) 
# symbols=['SPX','AAPL','AAPL2']

In [None]:
# df.head()

Unnamed: 0,SPX,AAPL,AAPL2
0,358.76,0.335,0.335
1,355.67,0.336,0.336
2,352.2,0.337,0.337
3,353.79,0.339,0.339
4,349.62,0.336,0.336


# Initial Plotting

## ADF test
Augmented Dickey Fuller(ADF) test is a common statistical test used to test whether a given Time series is stationary or not.

In [8]:
for i in symbols:
  print([i])
  result = adfuller(df[i], autolag='AIC')
  print('ADF Statistic: %f' % result[0])
  print('p-value: %f' % result[1])
  print('Critical Values:')
  for key, value in result[4].items():
      print('\t%s: %.3f' % (key, value))
  if result[0] < result[4]["5%"]:
      print ("Reject Null Hypothesis. So, Time Series is Stationary")
  else:
      print ("Failed to reject Null Hypothesis. So, Time Series is Not-Stationary")
  print("\n")

['SPX']
ADF Statistic: 3.429196
p-value: 1.000000
Critical Values:
	1%: -3.431
	5%: -2.862
	10%: -2.567
Failed to reject Null Hypothesis. So, Time Series is Not-Stationary


['AAPL']
ADF Statistic: 2.756153
p-value: 1.000000
Critical Values:
	1%: -3.431
	5%: -2.862
	10%: -2.567
Failed to reject Null Hypothesis. So, Time Series is Not-Stationary


['AAPL2']
ADF Statistic: 2.756153
p-value: 1.000000
Critical Values:
	1%: -3.431
	5%: -2.862
	10%: -2.567
Failed to reject Null Hypothesis. So, Time Series is Not-Stationary




## Normality Test

In [9]:
for i in symbols:
    print([i])
    stat, p = normaltest(df[i])
    print('Statistics=%.3f, p=%.3f' % (stat, p))
    print('Critical Values:')
    for key, value in result[4].items():
        print('\t%s: %.3f' % (key, value))
    alpha = 0.05
    if p > alpha:
        print('Data is normally distributed (fail to reject H0)')
    else:
        print('Data is not normally distributed(reject H0)')
    print('\n')

['SPX']
Statistics=1911.944, p=0.000
Critical Values:
	1%: -3.431
	5%: -2.862
	10%: -2.567
Data is not normally distributed(reject H0)


['AAPL']
Statistics=3494.842, p=0.000
Critical Values:
	1%: -3.431
	5%: -2.862
	10%: -2.567
Data is not normally distributed(reject H0)


['AAPL2']
Statistics=3494.842, p=0.000
Critical Values:
	1%: -3.431
	5%: -2.862
	10%: -2.567
Data is not normally distributed(reject H0)




#log Transformation
The log transformation can be used to make highly skewed distributions less skewed.

#Using the log transformed data to do ADF and Normality test


We can see that now the data for all three stocks is stationary. But, even if IUS4 is normally distributed, the other two are not. Have to look into why this is happening.

In [27]:
from pmdarima import auto_arima
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error, r2_score, mean_absolute_error


In [None]:
def arima_auto(arr, unscaled, name, prediction_size = 1): 
    train_data=pd.DataFrame()
    test_data=pd.DataFrame() 

    unscaled_train=pd.DataFrame()
    unscaled_test=pd.DataFrame() 
    split_index = int(len(arr)*0.9)
    train_data, test_data = arr[:split_index], arr[split_index:]
    unscaled_train, unscaled_test = unscaled[:split_index], unscaled[split_index:]
    model = auto_arima(train_data, trace=True, stepwise=False, error_action='warn', max_p=10, max_d=5, max_q=10, max_D=2, max_P=5, max_Q=5, max_order=5)
    model.fit(train_data) 
   
    predictions = []
    real = []

    preds_test = []
    real_test = []

    for i in range(1, len(test_data) - prediction_size, prediction_size):
        # Predict the next `prediction_size` steps
        forecast_val = model.predict(n_periods=prediction_size)
        
        # Save predictions and actuals
        predictions.extend(forecast_val)
        real.extend(test_data.iloc[i:i+prediction_size].values)
        
        # Update the model with the latest `prediction_size` observed points
        # AFTER they have been predicted (now part of the past)
        model = model.update(test_data.iloc[i:i+prediction_size])
        
        preds_test.append((1+predictions[-1]) * unscaled_test.iloc[i+prediction_size-2])
        real_test.append(unscaled_test.iloc[i+prediction_size-1])

        print((1+predictions[-1]) * unscaled_test.iloc[i+prediction_size-2])
        print(unscaled_test.iloc[i+prediction_size-1])

    mape = mean_absolute_percentage_error(real_test, preds_test)
    mse = mean_squared_error(real_test, preds_test)
    r2 = r2_score(real_test, preds_test)
    mae = mean_absolute_error(real_test, preds_test)
    print(f"{name} MAPE: {mape:.7f}% for forecast length: {prediction_size}")
    print(f"{name} MSE: {mse:.5f} for forecast length: {prediction_size}")
    print(f"{name} R2: {r2:.5f} for forecast length: {prediction_size}")
    print(f"{name} MAE: {mae:.5f} for forecast length: {prediction_size}")
    
    # Plotting
#     plt.figure(figsize=(12,8))
#     plt.plot(dates[::5], real[::5], label='Value')
#     plt.plot(dates[::5], predictions[::5], label='Prediction')
#     plt.title(f"Return Prediction for {name}")
#     plt.xlabel('Date')
#     plt.ylabel('Return ($)')
#     handles, labels = plt.gca().get_legend_handles_labels()
#     by_label = dict(zip(labels, handles))
#     plt.legend(by_label.values(), by_label.keys())
# #    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0)
#     plt.show()
#     plt.close()
    
#     plt.figure(figsize=(12,8))
#     plt.plot(dates[::5], unscaled_real[::5], label='Test')
#     plt.plot(dates[::5], unscaled_predictions[::5], label='Prediction')
#     handles, labels = plt.gca().get_legend_handles_labels()
#     by_label = dict(zip(labels, handles))
#     plt.legend(by_label.values(), by_label.keys())
#     plt.title(f"Stock Price Prediction for {name}")
#     plt.xlabel('Date')
#     plt.ylabel('Stock Price ($)')
#     plt.show()
#     plt.close()


    # predictions = []
    # unscaled_predictions = []
    # real_values = []
    # unscaled_real_values = []
    # dates = []

    # ---------------------------------------------------------
    # 1) Fit the ARIMA model on your training set
    # ---------------------------------------------------------
    # model = auto_arima(
    #     train_data, trace=True, 
    #     error_action='warn', max_order=5
    # )
    # model.fit(train_data)

    # # ---------------------------------------------------------
    # # 2) Keep track of the last known *unscaled* price
    # #    (the final training-set price, for example)
    # # ---------------------------------------------------------
    # last_unscaled_price = unscaled_train.iloc[-1]

    # # ---------------------------------------------------------
    # # 3) Walk-forward over the test data in chunks of size `prediction_size`
    # # ---------------------------------------------------------
    # for i in range(0, len(test_data), prediction_size):
    #     # --- A) Forecast out-of-sample (prediction_size steps ahead) ---
    #     forecast_vals = model.predict(n_periods=prediction_size)
        
    #     # --- B) Store predictions and real values ---
    #     for j, pred in enumerate(forecast_vals):
    #         # If i + j exceeds test_data length, break
    #         if i + j >= len(test_data):
    #             break
            
    #         # 1) Save the predicted *scaled* value
    #         predictions.append(pred)
            
    #         # 2) "Unscale" using the last known real price (NOT a future price!)
    #         unscaled_pred = (1.0 + pred) * last_unscaled_price
    #         unscaled_predictions.append(unscaled_pred)
            
    #         # 3) Get the real test data
    #         real_val = test_data.iloc[i + j]
    #         unscaled_val = unscaled_test.iloc[i + j]
            
    #         real_values.append(real_val)
    #         unscaled_real_values.append(unscaled_val)
    #         dates.append(unscaled_test.index[i + j])
            
    #         # 4) IMPORTANT: Now that "time i+j" has *actually occurred*, 
    #         #               you can update last_unscaled_price
    #         last_unscaled_price = unscaled_val
        
    #     # --- C) Only AFTER forecasting, update the ARIMA model with the 
    #     #         newly observed chunk of test data. This is the crucial step
    #     #         to prevent leaking future info into your model.
    #     # ---
    #     #  (Note: if i+prediction_size > len(test_data), it will just update
    #     #   with however many points remain)
    #     new_data_chunk = test_data.iloc[i : i + prediction_size]
    #     model = model.update(new_data_chunk)
    #     if i % 50 == 0:
    #         print(i)

    # # ---------------------------------------------------------
    # # 4) Compute your errors on out-of-sample predictions
    # # ---------------------------------------------------------
    # mape = mean_absolute_percentage_error(unscaled_real_values, unscaled_predictions)
    # mse  = mean_squared_error(unscaled_real_values, unscaled_predictions)
    # r2   = r2_score(unscaled_real_values, unscaled_predictions)
    # mae  = mean_absolute_error(unscaled_real_values, unscaled_predictions)
    
    return (mape, mse, r2, mae)

Best fit ARIMA (p,d,q) parameters:
* EXX5 : (1,1,1)
* IQQE : (0,1,0)

#Below is the same process but using interday change instead of log transform

In [47]:
dfs = {'SPX': df1, 'AAPL': df2, "TWSE" : df3}

In [None]:
res = []
for name in dfs:
    for pred_size in [10]:
        df = dfs[name]
        df_adj = df.iloc[1::]
        df_interday = df.pct_change()  # This computes (Price_t - Price_(t-1)) / Price_(t-1)
        df_interday = df_interday.dropna()
        mape, mse, r2, mae = arima_auto(df_interday, df_adj, name, pred_size)
        res.append({
            "DataFrame": name,
            "Prediction_Size": pred_size,
            "MAE": mae,
            "MSE": mse,
            "MAPE": mape,
            "r2": r2
        })

        print(res)


results_df = pd.DataFrame(res)
results_df.to_csv("arima_errors.csv", index=False)


 ARIMA(0,0,0)(0,0,0)[1] intercept   : AIC=-48509.365, Time=0.29 sec
 ARIMA(0,0,1)(0,0,0)[1] intercept   : AIC=-48572.181, Time=0.54 sec
 ARIMA(0,0,2)(0,0,0)[1] intercept   : AIC=-48570.743, Time=0.83 sec
 ARIMA(0,0,3)(0,0,0)[1] intercept   : AIC=-48568.897, Time=1.59 sec
 ARIMA(0,0,4)(0,0,0)[1] intercept   : AIC=-48573.710, Time=1.22 sec
 ARIMA(0,0,5)(0,0,0)[1] intercept   : AIC=-48576.461, Time=1.34 sec
 ARIMA(1,0,0)(0,0,0)[1] intercept   : AIC=-48570.667, Time=0.37 sec
 ARIMA(1,0,1)(0,0,0)[1] intercept   : AIC=-48570.818, Time=0.67 sec
 ARIMA(1,0,2)(0,0,0)[1] intercept   : AIC=-48574.126, Time=0.62 sec
 ARIMA(1,0,3)(0,0,0)[1] intercept   : AIC=-48567.835, Time=1.14 sec
 ARIMA(1,0,4)(0,0,0)[1] intercept   : AIC=-48570.771, Time=1.44 sec
 ARIMA(2,0,0)(0,0,0)[1] intercept   : AIC=-48570.568, Time=0.33 sec
 ARIMA(2,0,1)(0,0,0)[1] intercept   : AIC=-48568.751, Time=0.80 sec
 ARIMA(2,0,2)(0,0,0)[1] intercept   : AIC=-48574.455, Time=1.11 sec
 ARIMA(2,0,3)(0,0,0)[1] intercept   : AIC=-48570

  return get_prediction_index(
  return get_prediction_index(


Close    4361.090228
Name: 2021-09-29 00:00:00, dtype: float64
Close    4307.54
Name: 2021-09-30 00:00:00, dtype: float64


TypeError: cannot unpack non-iterable NoneType object

In [None]:
df3 = pd.read_csv("./data/TWSE_Real.csv")
df3.columns = ['Timestamp', 'Close']
df3.set_index("Timestamp", inplace=True)
df3.index = pd.to_datetime(df3.index, format='%m/%d/%Y')

Unnamed: 0_level_0,Close
Timestamp,Unnamed: 1_level_1
1990-01-04,9853.15
1990-01-05,9862.42
1990-01-06,9927.06
1990-01-08,9964.72
1990-01-09,9805.40
...,...
2025-01-16,23025.10
2025-01-17,23148.08
2025-01-20,23266.82
2025-01-21,23300.01


In [39]:
df3_dict = {"TWSE" : df3}
for name in df3_dict:
    df = df3_dict[name]
    df_adj = df.iloc[1::]
    df_interday = df.diff()
    df_interday = df_interday.dropna()
    arima_auto(df_interday, df_adj, name)

 ARIMA(0,0,0)(0,0,0)[1] intercept   : AIC=87189.385, Time=0.06 sec
 ARIMA(0,0,1)(0,0,0)[1] intercept   : AIC=87161.018, Time=0.16 sec
 ARIMA(0,0,2)(0,0,0)[1] intercept   : AIC=87162.879, Time=0.28 sec
 ARIMA(0,0,3)(0,0,0)[1] intercept   : AIC=87147.859, Time=0.35 sec
 ARIMA(0,0,4)(0,0,0)[1] intercept   : AIC=87141.683, Time=0.48 sec


KeyboardInterrupt: 