# ARIMA Model 

In [None]:
# import required libarries 
import pandas as pd
import numpy as np
import yfinance as yf 
import seaborn as sns
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA
from pandas.plotting import autocorrelation_plot
from statsmodels.tsa.stattools import adfuller, acf, pacf
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from sklearn.metrics import mean_absolute_error, mean_squared_error
from statsmodels.tsa.seasonal import seasonal_decompose

### Data Collection 

Download data first. Collect the data from yfinance API. The data should be 1-day period, 1-week period, 1-year period and 10-years period.

In [None]:
# one day - 1 min interval
gold_one_day = yf.download('GC=F', interval="1m")
# one month - 5 min interval
gold_one_month = yf.download('GC=F', interval="5m", period="1mo")
# one year - 1 day interval
gold_one_year = yf.download('GC=F', period="1y")
# ten years - 1 day interval 
gold_ten_year = yf.download('GC=F', period="10y")

# Data Preprocessing 

Check the data to make sure no missing values. 

In [None]:
# check for missing values
for index, i in {"gold_one_day": gold_one_day, "gold_one_month": gold_one_month, 
                 "gold_one_year": gold_one_year, "gold_one_year": gold_one_year, "gold_ten_year": gold_ten_year}.items():
    print(index)
    print("------------")
    print(i.isna().sum(), end="\n\n")

Great! There is no missing value. Save the data set for later use. Need to update everyday for the latest data. 

In [None]:
# save the data as csv file
gold_one_day.to_csv('data/gold_one_day.csv')
gold_one_month.to_csv('data/gold_one_month.csv')
gold_one_year.to_csv('data/gold_one_year.csv')
gold_ten_year.to_csv('data/gold_ten_year.csv')

### Load the CSV files into dataframes. 

In [None]:
# Load the data sets 
# load gold data for one day (1 min interval)
df_one_day = pd.read_csv('data/gold_one_day.csv')
# load gold data for one month (5 min interval)
df_one_month = pd.read_csv('data/gold_one_month.csv')
# load gold data for one year (1 day interval)
df_one_year = pd.read_csv('data/gold_one_year.csv')
# load gold data for one year (1 day interval)
df_ten_year = pd.read_csv('data/gold_ten_year.csv')

In [None]:
df_one_year.head(10)

In [None]:
df_one_year.info()

Drop unnecessary columns named "Adj Close" and "Volume". Datatime column needs to be datatime data type. Convert it right away. 

In [None]:
# drop columns 
df_one_year.drop(columns=['Adj Close', 'Volume'], inplace=True)

In [None]:
# convert Datetime column to datetime datatype 
df_one_year['Date']= pd.to_datetime(df_one_year['Date'])

In [None]:
df_one_year.set_index('Date', inplace=True)

In [None]:
# check the dataframe and data type again 
print(df_one_year.head(3))

print(df_one_year.info())

In [None]:
print(df_one_year.tail(10))

In [None]:
#  check the frequency 
df_one_year.index

In [None]:
def data_wrangle(path, droped_columns):
    """ A method that will clean the original dataset, 
        restructure the dataset and fill the missing values.
        
        input
        -----
        path: data path 
        dropped_columns: columns to be dropped"""
    
    # read the dataset through the path
    df=pd.read_csv(path)
    # change the "Date" column to datetime data type
    df['Date']=pd.to_datetime(df['Date'])
    # set the "Date" column to index
    df=df.set_index('Date')
    # assigned the desired frequecy to set up
    # 'D' stands for day
    desired_frequency = 'D'
    # set the frequency 
    df = df.asfreq(desired_frequency)
    # drop the unnecessary columns that are already specified 
    df = df.drop(columns=droped_columns)
    # fill the missing values 
    df=df.fillna(method='ffill')
    return df

In [None]:
df_one_year_1 = data_wrangle('data/gold_one_year.csv', ['Adj Close', 'Volume'])

In [None]:
df_one_year_1.head(10)

In [None]:
print(len(df_one_year))

In [None]:
print(len(df_one_year_1))

In [None]:
df_one_year_1.tail(10)

# Data Visualization

In [None]:
# Create a plot
fig, ax = plt.subplots(figsize=(16, 8))

# Plot the DataFrame
df_one_year_1['Close'].plot(ax=ax)

plt.title('Gold Prices Over Time (1-year 1-day interval)')
plt.xlabel('Date')
plt.ylabel('Price')

# Rotate x-axis labels 90 degrees
plt.xticks(rotation=90)

# Show plot
plt.show()

In [None]:
# Create a plot
fig, ax = plt.subplots(figsize=(16, 8))

# Plot the DataFrame
df_one_year_1['Open'].plot(ax=ax)

plt.title('Gold Prices Over Time (1-year 1-day interval)')
plt.xlabel('Date')
plt.ylabel('Price')

# Rotate x-axis labels 90 degrees
plt.xticks(rotation=90)

# Show plot
plt.show()

# Decomposition 

Applying ARIMA directly do not give the desired output. Need to decompose the timeseries into trend, sesonality and residuals.

In [None]:
ss_decomposition = seasonal_decompose(df_one_year_1['Close'], model='additive')
# trend
estimated_trend = ss_decomposition.trend
# seasonal
estimated_seasonal = ss_decomposition.seasonal
# residual
estimated_residual = ss_decomposition.resid

In [None]:
fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(12, 10))

df_one_year_1['Close'].plot(ax=ax1, title='Original Time Series for Gold Price')
ss_decomposition.trend.plot(ax=ax2, title='Trend')
ss_decomposition.seasonal.plot(ax=ax3, title='Seasonal')
ss_decomposition.resid.plot(ax=ax4, title='Residuals')

plt.tight_layout()
plt.show()

In [None]:
fig, axes = plt.subplots(4, 1, sharex=True, sharey=False)
fig.set_figheight(10)
fig.set_figwidth(15)

axes[0].plot(df_one_year_1['Close'], label='Original')
axes[0].legend(loc='upper left');

axes[1].plot(estimated_trend, label='Trend')
axes[1].legend(loc='upper left');

axes[2].plot(estimated_seasonal, label='Seasonality')
axes[2].legend(loc='upper left');

axes[3].plot(estimated_residual, label='Residuals')
axes[3].legend(loc='upper left');

In [None]:
import statsmodels.api as sm
#Histogram for the Residuals
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.hist(ss_decomposition.resid, bins=30, density=True, alpha=0.6, color='b')
plt.title('Residuals Histogram')
plt.xlabel('Residuals')
plt.ylabel('Relative Frequency')

#Q-Q plot of the residuals
plt.subplot(1, 2, 2)
sm.qqplot(ss_decomposition.resid, line='s', ax=plt.gca())
plt.title('Q-Q Plot of Residuals')

plt.show()

In [None]:
plt.figure(figsize=(12,6))
plot_acf(ss_decomposition.resid)
plt.title('ACF plot of residuals')
plt.xlabel('Lags [Days]')
plt.ylabel('Autocorrelation')
plt.show()

In [None]:
ss_decomposition.trend

In [None]:
trend=ss_decomposition.trend.dropna()
trend

In [None]:
plt.figure(figsize=(12,6))
plot_pacf(trend)
plt.title('PACF plot of Trend Data')
plt.xlabel('Lags [Days]')
plt.ylabel('Autocorrelation')

In [None]:
#Splitting the trend data into the training set and the test set
y_train=trend[:int(0.80*len(trend))]
print(int(0.8*len(trend)))
print(len(trend))
y_test=trend[int(0.80*len(trend)):]


#Here's the code for the forecast, using walk-forward validation

y_prediction = pd.Series() #Starts an empty series to store the predicted values

history = y_train.copy() #Training set starts with y_train, and gradually increases by 1 observation with each passing day.

In [None]:
len(y_test)

In [None]:
for i in range(1,1+len(y_test)):

    ARIMA_Model=ARIMA(history,order=(4,1,1)).fit() #Model is trained on history which increases with each loop

    next_prediction=ARIMA_Model.forecast()  #Gives the prediction for the next timestamp
    
    y_prediction=pd.concat([y_prediction, next_prediction]) #Puts all the predictions and timestamps into the series y_prediction
    
    history=trend[:len(y_train)+i] #Training set increases by one observation in preparation for the next loop

In [None]:
future_days = 10
# Creating a list to store future dates
last_date = trend.index[-1]
future_dates = pd.date_range(start=last_date, periods=future_days + 1)

In [None]:
# Extending the loop to include the future days
for i in range(1, 1 + len(y_test) + future_days):
    ARIMA_Model = ARIMA(history, order=(4, 1, 1)).fit()  # Model is trained on history which increases with each loop

    next_prediction = ARIMA_Model.forecast()  # Gives the prediction for the next timestamp
    
    # Setting the index for the next prediction
    if i <= len(y_test):
        next_date = trend.index[len(y_train) + i - 1]
    else:
        next_date = future_dates[i - len(y_test) - 1]
    
    next_prediction.index = [next_date]
    
    # Puts all the predictions and timestamps into the series y_prediction
    prediction = pd.concat([y_prediction, next_prediction])  
    
    if i <= len(y_test):
        # Continue updating the history with actual test data
        history = trend[:len(y_train) + i]  # Training set increases by one observation in preparation for the next loop
    else:
        # After test data, keep updating history with predictions
        history = pd.concat([history, next_prediction])

# Print the forecast for the future days
print(prediction[-future_days:])

In [None]:
y_prediction

In [None]:
plt.figure(figsize=(16,8))
plt.plot(y_prediction)
plt.plot(y_test)
plt.legend()
plt.show()

In [None]:
plt.figure(figsize=(16,8))
plt.plot(y_test, label="Test data")
plt.plot(prediction, label="Forecast")
plt.legend()
plt.show()

In [None]:
# Calculate and print evaluation metrics
mae = mean_absolute_error(y_test, y_prediction)
rmse = np.sqrt(mean_squared_error(y_test, y_prediction))

print('MAE:', mae)
print('RMSE:', rmse)

In [None]:
df_one_year_1.tail(10)

In [None]:
df_one_year_1['prediction'] = y_prediction

In [None]:
plt.figure(figsize=(10, 5))
plot_pacf(ss_decomposition.resid, lags=50)
plt.title('ACF Plot of Residuals')
plt.xlabel("Lags [Days]", fontsize=15) 
plt.ylabel("Autocorrelation", fontsize=15)

# Check Stationarity 

It is obvious that the data is non-stationarity according to the visualization. But, to make sure check with the statistics method.

In [None]:
# check with adfuller methods
result = adfuller(df_one_year_1['Close'])
print('ADF Statistic:', result[0])
print('p-value:', result[1])

Hypothesis testing 

If p-value < 0.05, reject the null hypothesis.
If not, fail to reject the null hypothesis.

In [None]:
#Ho: It is non stationary
#H1: It is stationary

def adfuller_test(df):
    """A method for testing hypothesis for data stationarity.

        input
        -----
        df: dataframe 

        output
        ------
        ADF: 
        p-value: the significant value
        Lags: the significant lags / spikes
        No.of observation: the numbers of lags that observe
    """
    # assign the column into Augmented Dickey-Fuller Test (ADF)
    result=adfuller(df)
    # creat a list of labels 
    labels = ['ADF Test Statistic','p-value','#Lags Used','Number of Observations Used']
    # zip the value and label together 
    for value,label in zip(result,labels):
        # print the label and value 
        print(label+' : '+str(value) )
    # if p-value is less than 0.05,
    if result[1] <= 0.05:
        # reject the null hypothesis 
        print("strong evidence against the null hypothesis(Ho), reject the null hypothesis. Data has no unit root and is stationary")
    # if not,
    else:
        # fail to reject null hypothesis
        print("weak evidence against null hypothesis, time series has a unit root, indicating it is non-stationary ")

In [None]:
adfuller_test(df_one_year_1['Close'])

# Differencing

Since the original is non-stationarity, we need to difference the time series data to determine degree of integration "d" to make the data stationarity. 

In [None]:
# first order differencing 
df_one_year_1d = df_one_year_1['Close'].diff()
# test the hypothesis
adfuller_test(df_one_year_1d.dropna())

The data becomes sationary. To make sure, find the second order differencing of the data too. 

In [None]:
# first order differencing 
df_one_year_2d = df_one_year_1['Close'].diff().diff()
# test the hypothesis
adfuller_test(df_one_year_2d.dropna())

Since the data become stationary at degree of integration one, we can choose one as our "d" value.

In [None]:
# d = 1

After determining "d", we need to consider "p" and "q". We need to visualize ACFs and PACFs to determine those values.

In [None]:
# acf 
num_lags = 20
acf_values = acf(df_one_year_1.Close.diff().dropna())
plt.bar(range(num_lags), acf_values[:num_lags])

In [None]:
# pacf 
num_lags = 20
pacf_values = pacf(df_one_year_1.Close.diff().dropna())
plt.bar(range(num_lags), pacf_values[:num_lags])

In [None]:
# plot first order differencing
fig = plt.figure(figsize=(16, 4))
ax1 = fig.add_subplot(131)  # Corrected typo here
ax1.set_title('1st Order Differencing')
ax1.plot(df_one_year_1.Close.diff());

ax2 = fig.add_subplot(132)
plot_acf(df_one_year_1.Close.diff(), ax=ax2, lags=20);

ax3 = fig.add_subplot(133)
plot_pacf(df_one_year_1.Close.diff(), ax=ax3, lags=20);

plt.show()

According to the ACFs and PACFs, there is one lag at PACFs. Therefore, p = 1. No signigicant lag in ACFs. The value of "q" is zero.

# ARIMA Model Implementation

In [None]:
len(df_one_year_1.Close)

In [None]:
# split train test data 
train_data = int(len(df_one_year_1.Close) * 0.8)
test_data = int(len(df_one_year_1.Close)-train_data)
print("No. of train data: ",train_data)
print("No. of test data:", test_data)
train_data_df = df_one_year_1[0:train_data]
test_data_df = df_one_year_1[train_data:]



In [None]:
train_data_df.head(3)

In [None]:
train_data_df.tail()

In [None]:
test_data_df.head(5)

In [None]:
test_data_df.tail()

In [None]:
from pmdarima import auto_arima

stepwise_model = auto_arima(train_data_df['Close'], start_p=1, start_q=1,
                            max_p=3, max_q=3, seasonal=False,
                            d=1, trace=True, error_action='ignore',
                            suppress_warnings=True, stepwise=True)

print(stepwise_model.summary())


According to autoarima, the best parameters are p = 2, d = 1 and q = 2.

In [None]:
# p = 2, d = 1, q = 2

model = ARIMA(train_data_df.Close, order=(2, 1, 2))
model_fit = model.fit()
print(model_fit.summary())

In [None]:
# In-sample predictions
in_sample_pred = model_fit.predict(start=train_data_df.index[0], end=train_data_df.index[-1])

# Plot in-sample predictions vs actual values
plt.figure(figsize=(12, 6))
plt.plot(train_data_df.index, train_data_df['Close'], label='Actual')
plt.plot(train_data_df.index, in_sample_pred, label='In-Sample Prediction', color='red')
plt.title('In-Sample Prediction vs Actual')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.show()


In [None]:
test_data_df.index[0]

In [None]:
#get the predictions and residuals
model_fit.fittedvalues

In [None]:
# residual plotting
residuals = model_fit.resid
plt.figure(figsize=(12, 6))
plt.plot(residuals)
plt.title('Residuals')
plt.show()

plot_acf(residuals)
plot_pacf(residuals)
plt.show()

In [None]:
residuals = model_fit.resid
fig, ax = plt.subplots(1, 2, figsize=(16, 4))
sns.histplot(residuals, kde=True, ax=ax[0])
plot_acf(residuals, lags=20, ax=ax[1])
plt.show()


In [None]:
train_data_df.head(5)

In [None]:
train_decomposition = seasonal_decompose(train_data_df['Close'], model='additive')
# trend
train_trend = train_decomposition.trend
# seasonal
train_seasonal = train_decomposition.seasonal
# residual
train_residual = train_decomposition.resid

fig, axes = plt.subplots(4, 1, sharex=True, sharey=False)
fig.set_figheight(10)
fig.set_figwidth(15)

axes[0].plot(train_data_df['Close'], label='Original')
axes[0].legend(loc='upper left');

axes[1].plot(train_trend, label='Trend')
axes[1].legend(loc='upper left');

axes[2].plot(train_seasonal, label='Seasonality')
axes[2].legend(loc='upper left');

axes[3].plot(train_residual, label='Residuals')
axes[3].legend(loc='upper left');

In [None]:
test_decomposition = seasonal_decompose(test_data_df['Close'], model='additive')
# trend
test_trend = test_decomposition.trend
# seasonal
test_seasonal = test_decomposition.seasonal
# residual
test_residual = test_decomposition.resid

fig, axes = plt.subplots(4, 1, sharex=True, sharey=False)
fig.set_figheight(10)
fig.set_figwidth(15)

axes[0].plot(test_data_df['Close'], label='Original')
axes[0].legend(loc='upper left');

axes[1].plot(test_trend, label='Trend')
axes[1].legend(loc='upper left');

axes[2].plot(test_seasonal, label='Seasonality')
axes[2].legend(loc='upper left');

axes[3].plot(test_residual, label='Residuals')
axes[3].legend(loc='upper left');

In [None]:
test_seasonal

In [None]:
# Out-of-sample forecast
forecast_steps = len(test_data_df)
out_sample_forecast = model_fit.forecast(steps=forecast_steps)

# Plot forecast vs actual values
plt.figure(figsize=(12, 6))
plt.plot(train_data_df.index, train_data_df['Close'], label='Train')
plt.plot(test_data_df.index, test_data_df['Close'], label='Test')
plt.plot(test_data_df.index, out_sample_forecast, label='Forecast', color='red')
plt.title('Out-of-Sample Forecast vs Actual')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.show()

In [None]:
# Plot forecast vs actual values
plt.figure(figsize=(12, 6))
plt.plot(train_data_df.index, train_data_df['Close'], label='Train')
plt.plot(test_data_df.index, test_data_df['Close'], label='Test')
plt.plot(df_one_year_1.index, df_one_year_1.prediction, label='Forecast', color='red')
plt.title('Out-of-Sample Forecast vs Actual')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.show()

In [None]:
# Calculate and print evaluation metrics
mae = mean_absolute_error(test_data_df['Close'], out_sample_forecast)
rmse = np.sqrt(mean_squared_error(test_data_df['Close'], out_sample_forecast))

print('MAE:', mae)
print('RMSE:', rmse)

In [None]:
test_data_df["Prediction"] = model_fit.forecast(len(test_data_df))

test_data_df

In [None]:
residuals = test_data_df['Close'] - out_sample_forecast
plt.figure(figsize=(16,8))
plt.plot(residuals)
plt.show()

In [None]:
fig = plt.figure(figsize=(32, 8))
model.fit().plot_diagnostics()
plt.show()

In [None]:
output = model_fit.forecast()
print(output)