In [None]:
# Mount Google Drive at /content/drive so files stored there can be read by path.
from google.colab import drive

drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.stattools import adfuller

In [None]:
# Folder on the mounted Drive that holds the dataset files (no trailing slash).
path_to_csv = "/content/drive/MyDrive/UIT - Semester 6/Data Analysis in Business/Dataset"

In [None]:
# Read the data: the first CSV column becomes the index and is parsed as dates
aapl_csv = f"{path_to_csv}/AAPL.csv"
df = pd.read_csv(aapl_csv, parse_dates=True, index_col=0)

FileNotFoundError: ignored

In [None]:
# Preview the first 10 rows of the loaded data
df.head(10)

In [None]:
# Annotated correlation heatmap; the mask is built from the upper triangle
# of the correlation matrix so only the lower triangle is drawn.
corr_matrix = df.corr()
upper_triangle = np.triu(corr_matrix)
plt.figure(figsize=(6, 6))
sns.heatmap(
    corr_matrix,
    mask=upper_triangle,
    xticklabels=True,
    yticklabels=True,
    cmap='coolwarm',
    annot=True,
)
plt.show()

In [None]:
# Keep only the four price columns and draw each one in its own panel
df = df[['Open', 'High', 'Low', 'Close']]
fig, ax = plt.subplots(nrows=4, ncols=1, sharex=True, figsize=(15, 7))
df.plot(subplots=True, ax=ax, legend=False)
y_label = ['Open prices', 'Highest prices', 'Lowest prices', 'Closed price']
for panel, label in zip(ax, y_label):
    panel.set_ylabel(label)
plt.tight_layout()
plt.show()

In [None]:
# Split the data 7:2:1 in row order (no shuffling)
from sklearn.model_selection import train_test_split

total_length = len(df)

# 70% for training, 20% for testing; validation takes the remaining rows (~10%)
train_length = int(total_length * 0.7)
test_length = int(total_length * 0.2)
validate_length = total_length - train_length - test_length

# Positional boundaries of the three consecutive segments
test_end = train_length + test_length
train_data = df.iloc[:train_length]
test_data = df.iloc[train_length:test_end]
validate_data = df.iloc[test_end:]

In [None]:
#Check stationarity with the Augmented Dickey-Fuller (ADF) test
def adf_test(series,title=''):
    """
    Print an Augmented Dickey-Fuller report for a time series.

    series: pandas Series to test; NaN values are dropped before testing.
    title:  optional label printed in the report header.

    Nothing is returned. The statistics, the critical values and a verdict
    based on a 0.05 p-value threshold are written to stdout.
    """
    print(f'Augmented Dickey-Fuller Test: {title}')
    # dropna() lets differenced data (which contains NaN) be passed in directly
    stat, p_value, used_lags, n_obs, critical_values = adfuller(
        series.dropna(), autolag='AIC'
    )[:5]
    report = pd.Series(
        [stat, p_value, used_lags, n_obs],
        index=['ADF test statistic', 'p-value', '# lags used', '# observations'],
    )
    for level, value in critical_values.items():
        report[f'critical value ({level})'] = value
    # to_string() leaves out the trailing "dtype: float64" line
    print(report.to_string())

    is_stationary = p_value <= 0.05
    if is_stationary:
        verdict = (
            "Strong evidence against the null hypothesis",
            "Reject the null hypothesis",
            "Data is stationary",
            "---------------------------------------",
        )
    else:
        verdict = (
            "Weak evidence against the null hypothesis",
            "Fail to reject the null hypothesis",
            "Data has a unit root and is non-stationary",
            "------------------------------------------",
        )
    for line in verdict:
        print(line)

In [None]:
#ADF test results for the original (undifferenced) training data
for column, title in (
    ('Open', 'Open prices'),
    ('High', 'Highest prices'),
    ('Low', 'Lowest prices'),
    ('Close', 'Closed prices'),
):
    adf_test(train_data[column], title)

In [None]:
# First-difference each split on its own; rows containing NaN (such as the
# first row produced by diff) are dropped.
train_diff_data, test_diff_data, validate_diff_data = (
    split.diff().dropna() for split in (train_data, test_data, validate_data)
)

In [None]:
# ADF stationarity results for the differenced training data
for column, title in (
    ('Open', 'Open prices'),
    ('High', 'Highest prices'),
    ('Low', 'Lowest prices'),
    ('Close', 'Closed prices'),
):
    adf_test(train_diff_data[column], title)

In [None]:
# Fit a VAR model for every lag order 1..39 and collect its information criteria
model = VAR(train_diff_data)
p = np.arange(1, 40)
fits = [model.fit(lag) for lag in p]
aic = [fit.aic for fit in fits]
bic = [fit.bic for fit in fits]
fpe = [fit.fpe for fit in fits]
hqic = [fit.hqic for fit in fits]

# One row per lag order, one column per criterion
lags_metrics_df = pd.DataFrame(
    {'AIC': aic, 'BIC': bic, 'HQIC': hqic, 'FPE': fpe},
    index=p,
)

# One panel per criterion, plotted against the lag order
fig, ax = plt.subplots(nrows=1, ncols=4, figsize=(15, 3), sharex=True)
lags_metrics_df.plot(subplots=True, ax=ax, marker='o')
plt.tight_layout()

In [None]:
#Find a suitable lag order p: print AIC and BIC for orders 20..39
model = VAR(train_diff_data)
for order in range(20, 40):
    results = model.fit(order)
    # The empty last item reproduces the blank separator line between orders
    print(
        f'VAR Order {order}',
        f'AIC {results.aic}',
        f'BIC {results.bic}',
        '',
        sep='\n',
    )

In [None]:
#Fit the model with the chosen lag order (p = 29) and display its summary
model_fitted = model.fit(maxlags=29)
model_fitted.summary()

In [None]:
#Forecast the differenced data
lag_order = model_fitted.k_ar  # Lag order of the model
def forecase_diff_data(diff_data, org_data, lag_order, steps=30, fitted_model=None):
    """Forecast differenced values for the business days that follow ``org_data``.

    Parameters
    ----------
    diff_data : pandas.DataFrame
        Differenced observations; its last ``lag_order`` rows seed the forecast.
    org_data : pandas.DataFrame
        Undifferenced data matching ``diff_data``. Its last index entry anchors
        the forecast dates and its column names label the forecast columns.
    lag_order : int
        Number of trailing rows of ``diff_data`` passed to the model.
    steps : int, default 30
        Number of business days to forecast.
    fitted_model : optional
        Object exposing ``forecast(y=..., steps=...)``. Defaults to the
        notebook-level ``model_fitted``.

    Returns
    -------
    pandas.DataFrame
        ``steps`` rows indexed by business days and one ``<column>_forecast``
        column per column of ``org_data``.

    Raises
    ------
    ValueError
        If ``diff_data`` has fewer than ``lag_order`` rows.
    """
    if fitted_model is None:
        fitted_model = model_fitted
    if len(diff_data) < lag_order:
        raise ValueError(
            f"diff_data has {len(diff_data)} rows but lag_order={lag_order} rows are required"
        )

    forecast_input = diff_data.values[-lag_order:]  # last lag_order observations
    fc = fitted_model.forecast(y=forecast_input, steps=steps)

    # Start on the first business day AFTER the last observation; starting on
    # the last observed date would label every forecast one step too early.
    first_day = org_data.index[-1] + pd.offsets.BDay(1)
    index = pd.date_range(first_day, periods=steps, freq='B')

    # Label columns from the data actually passed in, not the global train_data.
    columns = [f"{col}_forecast" for col in org_data.columns]
    return pd.DataFrame(fc, index=index, columns=columns)

In [None]:
#Forecast on the test set and the next 30 days: each forecast is seeded
#with the tail of the split that precedes it.
fc_test_diff = forecase_diff_data(
    diff_data=train_diff_data, org_data=train_data, lag_order=lag_order
)
fc_validate_diff = forecase_diff_data(
    diff_data=test_diff_data, org_data=test_data, lag_order=lag_order
)
fc_30next_diff = forecase_diff_data(
    diff_data=validate_diff_data, org_data=validate_data, lag_order=lag_order
)

In [None]:
#Convert the forecasted differences back to the original scale
def invert_transformation(train_data, fc_df, second_diff=False):
    """Revert differencing so forecasts are expressed on the original scale.

    Parameters
    ----------
    train_data : pandas.DataFrame
        Undifferenced observations that come immediately before the forecast
        horizon. ``fc_df`` must hold a ``<col>_forecast`` column for each of
        its columns.
    fc_df : pandas.DataFrame
        Forecasted differences. It is not modified.
    second_diff : bool, default False
        Set to True when the forecasts are second-order differences.

    Returns
    -------
    pandas.DataFrame
        Copy of ``fc_df`` with its index named "Date" and every
        ``<col>_forecast`` column replaced by level forecasts.

    Raises
    ------
    ValueError
        If ``second_diff`` is True and ``train_data`` has fewer than two rows.
    """
    if second_diff and len(train_data) < 2:
        raise ValueError("second_diff=True needs at least two rows in train_data")

    fc = fc_df.copy()
    fc.index.name = "Date"
    for col in train_data.columns:
        name = col + '_forecast'
        last_value = train_data[col].iloc[-1]
        increments = fc[name]
        if second_diff:
            # Integrate once to recover first differences, anchored on the
            # last observed first difference.
            last_diff = last_value - train_data[col].iloc[-2]
            increments = last_diff + increments.cumsum()
        # Integrate first differences to levels, anchored on the last level.
        fc[name] = last_value + increments.cumsum()
    return fc

In [None]:
# Bring each differenced forecast back to price levels, using the split it was
# forecast from as the anchor.
fc_test_org, fc_validate_org, fc_30next_org = (
    invert_transformation(history, forecast_diff, second_diff=False)
    for history, forecast_diff in (
        (train_data, fc_test_diff),
        (test_data, fc_validate_diff),
        (validate_data, fc_30next_diff),
    )
)

In [None]:
#Evaluate accuracy on the validation and test sets: RMSE of the Close
#forecasts against the first 30 actual Close values of each split.
test_errors = fc_test_org['Close_forecast'].values - test_data['Close'].head(30).values
validate_errors = fc_validate_org['Close_forecast'].values - validate_data['Close'].head(30).values
test_rmse = np.sqrt(np.mean(test_errors ** 2))
validate_rmse = np.sqrt(np.mean(validate_errors ** 2))
print('Testing RMSE:', test_rmse)
print('Validate RMSE:', validate_rmse)

In [None]:
# MAPE (in percent) of the Close forecasts against the first 30 actual values
test_actual = test_data['Close'].head(30).values
validate_actual = validate_data['Close'].head(30).values
test_forecast = fc_test_org['Close_forecast'].values
validate_forecast = fc_validate_org['Close_forecast'].values

test_mape = np.mean(np.abs((test_actual - test_forecast) / test_actual)) * 100
validate_mape = np.mean(np.abs((validate_actual - validate_forecast) / validate_actual)) * 100
print('Test MAPE:', test_mape)
print('Validate MAPE:', validate_mape)

In [None]:
# Mean absolute error of the Close forecasts against the first 30 actual values.
# Computed with NumPy because `mean_absolute_error` is not imported anywhere in
# the visible notebook, so calling it raises NameError on a fresh kernel.
# `.values` compares by position; the actual and forecast Series carry different
# date indexes, so a label-aligned subtraction would not pair them up.
test_mae = np.mean(np.abs(test_data['Close'].head(30).values - fc_test_org['Close_forecast'].values))
validate_mae = np.mean(np.abs(validate_data['Close'].head(30).values - fc_validate_org['Close_forecast'].values))
print('Test MAE:', test_mae)
print('Validate MAE:', validate_mae)

In [None]:
#Visualise the results: actual Close prices per split plus the three forecasts
result_fig, result_ax = plt.subplots(figsize=(15, 7))
curves = (
    (df, 'Close', 'Original Data'),
    (train_data, 'Close', 'Train Data'),
    (test_data, 'Close', 'Test Data'),
    (validate_data, 'Close', 'Validation Data'),
    (fc_test_org, 'Close_forecast', 'Predictions'),
    (fc_validate_org, 'Close_forecast', 'Validate Predictions'),
    (fc_30next_org, 'Close_forecast', 'Next30Day'),
)
for frame, column, label in curves:
    result_ax.plot(frame.index, frame[column], label=label)
result_ax.set_xlabel('Date')
result_ax.set_ylabel('Close Price')
result_ax.set_title('Train, Test, and Validation Data')
result_ax.legend()
plt.show()