In [2]:
import pandas as pd
import numpy as np
from numpy import log
from matplotlib import pyplot as plt 
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_pacf, plot_acf
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from statsmodels.stats.diagnostic import acorr_ljungbox
from scipy.stats import normaltest

import warnings
warnings.filterwarnings("ignore")

In [3]:
import os
import sys

# 確保項目根目錄在模組路徑中
project_root = os.path.abspath("COVID-19-TIME-SERIES-PREDICT")  # 替換為你的根目錄路徑
if project_root not in sys.path:
    sys.path.append(project_root)

# 導入模組
from utils.common_tools import *

In [10]:
data = read_covid_data()
data.head()

['AFG' 'OWID_AFR' 'ALB' 'DZA' 'AND' 'AGO' 'AIA' 'ATG' 'ARG' 'ARM' 'ABW'
 'OWID_ASI' 'AUS' 'AUT' 'AZE' 'BHS' 'BHR' 'BGD' 'BRB' 'BLR' 'BEL' 'BLZ'
 'BEN' 'BMU' 'BTN' 'BOL' 'BES' 'BIH' 'BWA' 'BRA' 'VGB' 'BRN' 'BGR' 'BFA'
 'BDI' 'KHM' 'CMR' 'CAN' 'CPV' 'CYM' 'CAF' 'TCD' 'CHL' 'CHN' 'COL' 'COM'
 'COG' 'COK' 'CRI' 'CIV' 'HRV' 'CUB' 'CUW' 'CYP' 'CZE' 'COD' 'DNK' 'DJI'
 'DMA' 'DOM' 'ECU' 'EGY' 'SLV' 'GNQ' 'ERI' 'EST' 'SWZ' 'ETH' 'OWID_EUR'
 'OWID_EUN' 'FRO' 'FLK' 'FJI' 'FIN' 'FRA' 'PYF' 'GAB' 'GMB' 'GEO' 'DEU'
 'GHA' 'GIB' 'GRC' 'GRL' 'GRD' 'GTM' 'GGY' 'GIN' 'GNB' 'GUY' 'HTI'
 'OWID_HIC' 'HND' 'HKG' 'HUN' 'ISL' 'IND' 'IDN' 'OWID_INT' 'IRN' 'IRQ'
 'IRL' 'IMN' 'ISR' 'ITA' 'JAM' 'JPN' 'JEY' 'JOR' 'KAZ' 'KEN' 'KIR'
 'OWID_KOS' 'KWT' 'KGZ' 'LAO' 'LVA' 'LBN' 'LSO' 'LBR' 'LBY' 'LIE' 'LTU'
 'OWID_LIC' 'OWID_LMC' 'LUX' 'MAC' 'MDG' 'MWI' 'MYS' 'MDV' 'MLI' 'MLT'
 'MHL' 'MRT' 'MUS' 'MEX' 'FSM' 'MDA' 'MCO' 'MNG' 'MNE' 'MSR' 'MAR' 'MOZ'
 'MMR' 'NAM' 'NRU' 'NPL' 'NLD' 'NCL' 'NZL' 'NIC' 'NER' 'NGA' 'NIU'
 'OWI

Unnamed: 0,iso_code,continent,location,date,total_cases,new_cases,new_cases_smoothed,total_deaths,new_deaths,new_deaths_smoothed,...,female_smokers,male_smokers,handwashing_facilities,hospital_beds_per_thousand,life_expectancy,human_development_index,excess_mortality_cumulative_absolute,excess_mortality_cumulative,excess_mortality,excess_mortality_cumulative_per_million
0,AFG,Asia,Afghanistan,2020-02-24,5.0,5.0,,,,,...,,,37.746,0.5,64.83,0.511,,,,
1,AFG,Asia,Afghanistan,2020-02-25,5.0,0.0,,,,,...,,,37.746,0.5,64.83,0.511,,,,
2,AFG,Asia,Afghanistan,2020-02-26,5.0,0.0,,,,,...,,,37.746,0.5,64.83,0.511,,,,
3,AFG,Asia,Afghanistan,2020-02-27,5.0,0.0,,,,,...,,,37.746,0.5,64.83,0.511,,,,
4,AFG,Asia,Afghanistan,2020-02-28,5.0,0.0,,,,,...,,,37.746,0.5,64.83,0.511,,,,


In [11]:
data = extract_target_dataa(data)
data.head()

Available columns: ['iso_code', 'continent', 'location', 'date', 'total_cases', 'new_cases', 'new_cases_smoothed', 'total_deaths', 'new_deaths', 'new_deaths_smoothed', 'total_cases_per_million', 'new_cases_per_million', 'new_cases_smoothed_per_million', 'total_deaths_per_million', 'new_deaths_per_million', 'new_deaths_smoothed_per_million', 'reproduction_rate', 'icu_patients', 'icu_patients_per_million', 'hosp_patients', 'hosp_patients_per_million', 'weekly_icu_admissions', 'weekly_icu_admissions_per_million', 'weekly_hosp_admissions', 'weekly_hosp_admissions_per_million', 'new_tests', 'total_tests', 'total_tests_per_thousand', 'new_tests_per_thousand', 'new_tests_smoothed', 'new_tests_smoothed_per_thousand', 'positive_rate', 'tests_per_case', 'tests_units', 'total_vaccinations', 'people_vaccinated', 'people_fully_vaccinated', 'total_boosters', 'new_vaccinations', 'new_vaccinations_smoothed', 'total_vaccinations_per_hundred', 'people_vaccinated_per_hundred', 'people_fully_vaccinated_pe

Unnamed: 0,date,new_cases
0,2020-02-24,5.0
1,2020-02-25,0.0
2,2020-02-26,0.0
3,2020-02-27,0.0
4,2020-02-28,0.0


In [12]:
from sklearn.preprocessing import StandardScaler

def standardize_data(data: pd.DataFrame, column: str):
    """
    使用 z-score 進行標準化處理
    """
    scaler = StandardScaler()
    data["standardized_" + column] = scaler.fit_transform(data[column].values.reshape(-1, 1))
    return data, scaler

def log_transform(data: pd.DataFrame, column: str):
    data["log_" + column] = log(data[column] + 0.01)
    return data

def adf_test(data: pd.DataFrame, column: str):
    data.dropna(inplace=True)
    result = adfuller(data[column])
    print('ADF Statistic: %f' % result[0])
    print('p-value: %f' % result[1])
    print('Critical Values:')
    for key, value in result[4].items():
        print('\t%s: %.3f' % (key, value))

    if result[1] < 0.05:
        print("數據是穩態的")
    else:
        print("數據是非穩態的，需要進行差分")

    # 3. 繪製 ACF 和 PACF 檢查自相關性
    # plt.figure(figsize=(8, 4))
    # plot_acf(data["new_cases"], lags=30, ax=plt.subplot(121))
    # plot_pacf(data["new_cases"], lags=30, ax=plt.subplot(122))
    # plt.tight_layout()
    # plt.show()
def calculate_mape(true_values, predictions):
    """
    計算 Mean Absolute Percentage Error (MAPE)
    """
    true_values = np.array(true_values)
    predictions = np.array(predictions)
    return np.mean(np.abs((true_values - predictions) / true_values)) * 100

def calculate_ppe(true_values, predictions):
    """
    計算 Peak Prediction Error (PPE)
    包括高峰值大小和高峰時間點的誤差
    """
    peak_true_value = np.max(true_values)
    peak_true_time = np.argmax(true_values)

    peak_pred_value = np.max(predictions)
    peak_pred_time = np.argmax(predictions)

    # 高峰值誤差
    value_error = np.abs(peak_true_value - peak_pred_value)

    # 高峰時間誤差
    time_error = np.abs(peak_true_time - peak_pred_time)

    return value_error, time_error

def grid_search_ARIMA(data: pd.DataFrame, column: str):
    best_model = None
    best_order = None
    best_aic = float("inf")

    for p in range(3):
        for d in range(3):
            for q in range(3):
                try:
                    model = ARIMA(data[column], order=(p, d, q))
                    model_fit = model.fit()

                    if model_fit.aic < best_aic:
                        best_model = model_fit
                        best_order = (p, d, q)
                        best_aic = model_fit.aic
                except Exception as e:
                    # 捕捉錯誤，但繼續進行其他組合的測試
                    print(f"Error for order ({p}, {d}, {q}): {e}")
                    continue

    if best_model is not None:
        print("Best Model Summary:")
        print(best_model.summary())
    else:
        print("No suitable model found.")

    return best_order

def rolling_forecast(data: pd.DataFrame, column: str, best_order: tuple):
    # 劃分數據集
    train_size = int(len(data) * 0.8 // 1)  # 使用前 80% 的數據作為訓練集
    train = data.iloc[:train_size]     # 訓練集
    test = data.iloc[train_size:]      # 測試集

    print(f"Train Size: {len(train)}, Test Size: {len(test)}")

    # 初始化滾動預測
    history = [x for x in train[column]]  # 使用訓練集的初始數據
    predictions = []  # 儲存每次的預測結果

    # 滾動預測
    for t in range(len(test)):
        # 構建並擬合 ARIMA 模型
        model = ARIMA(history, order=best_order)
        model_fit = model.fit()

        # 預測下一個時間點
        yhat = model_fit.forecast(steps=1)[0]
        predictions.append(yhat)

        # 將真實值加入訓練集
        history.append(test[column].iloc[t])

    # 計算各種指標
    mae = mean_absolute_error(test[column], predictions)
    r2 = r2_score(test[column], predictions)
    rmse = np.sqrt(mean_squared_error(test[column], predictions))

    print(f"Mean of Real Data: {np.mean(test[column])}")
    print(f"Median of Real Data: {np.median(test[column])}")
    print(f"Mean Absolute Error: {mae}") # 平均絕對誤差
    print(f"Percentage of MAE: {mae / np.mean(test[column])}") # 平均絕對誤差占比
    print(f"Rolling Forecast RMSE: {rmse}") # RMSE

def check_residuals(data: pd.DataFrame, column: str, best_order: tuple):
    # 檢查殘差的自相關性
    model = ARIMA(data[column], order=best_order)
    model_fit = model.fit()
    residuals = model_fit.resid
    lb_test = acorr_ljungbox(residuals, lags=[10], return_df=True)
    print(lb_test)

    # 檢查殘差的正態分佈
    stat, p = normaltest(residuals)
    print(f"Normality Test: Statistic={stat}, p={p}")
    if p > 0.05:
        print("殘差接近正態分佈")
    else:
        print("殘差偏離正態分佈")

def rolling_window_forecast(data: pd.DataFrame, column: str, best_order: tuple, input_weeks: int = 4, forecast_weeks: int = 1, scaler=None):
    # 計算窗口大小
    input_days = input_weeks * 7
    forecast_days = forecast_weeks * 7

    # 初始化預測結果和真實值
    predictions = []
    true_values = []

    # 滾動窗口預測
    for start in range(len(data) - input_days - forecast_days + 1):
        # 定義訓練窗口和測試窗口
        train_window = data[column].iloc[start:start + input_days]
        test_window = data[column].iloc[start + input_days:start + input_days + forecast_days]

        # 構建並擬合 ARIMA 模型
        model = ARIMA(train_window, order=best_order)
        model_fit = model.fit()

        # 預測未來數據
        forecast = model_fit.forecast(steps=forecast_days)
        predictions.extend(forecast)
        true_values.extend(test_window)

    # 將標準化數據還原到原始尺度
    if scaler:
        predictions = scaler.inverse_transform(np.array(predictions).reshape(-1, 1)).flatten()
        true_values = scaler.inverse_transform(np.array(true_values).reshape(-1, 1)).flatten()

    # 計算性能指標
    rmse = np.sqrt(mean_squared_error(true_values, predictions))
    mae = mean_absolute_error(true_values, predictions)
    mape = calculate_mape(true_values, predictions)
    ppe_value_error, ppe_time_error = calculate_ppe(true_values, predictions)

    # 輸出結果
    print(f"MAE: {mae}")
    print(f"MAE / Mean: {mae / np.mean(true_values)}")
    print(f"RMSE: {rmse}")
    print(f"MAPE: {mape:.2f}%")
    print(f"Peak Prediction Error (Value): {ppe_value_error}")
    print(f"Peak Prediction Error (Time): {ppe_time_error} days")


def pipeline(data: pd.DataFrame, column: str, forecast_steps: int):
    # 1. 標準化數據
    data, scaler = standardize_data(data, column)

    # 2. ADF 單根檢定
    adf_test(data, "standardized_" + column)

    # 3. 使用網格搜索尋找最佳參數
    print("----------------Grid Search ARIMA------------------")
    best_order = grid_search_ARIMA(data, "standardized_" + column)

    # 4. 滾動窗口多步預測
    print("----------------Multi-step Forecast------------------")
    rolling_window_forecast(data, "standardized_" + column, best_order, input_weeks=forecast_steps, scaler=scaler)

In [6]:
regions = ["MYS", "MEX", "THA", "TWN", "CHN","JPN", "CAN", "VNM", "SGP", "ARG"]

In [13]:
for region in regions:
    print(f"--------------------------------------Region: {region}--------------------------------------")
    data = pd.read_csv(r"data\owid-covid-data.csv")
    data = data[data["iso_code"] == region]
    data = data[["date", "new_cases"]]
    pipeline(data, "new_cases", 1)

--------------------------------------Region: MYS--------------------------------------
ADF Statistic: -0.131844
p-value: 0.946173
Critical Values:
	1%: -3.439
	5%: -2.865
	10%: -2.569
數據是非穩態的，需要進行差分
----------------Grid Search ARIMA------------------
Best Model Summary:
                                 SARIMAX Results                                  
Dep. Variable:     standardized_new_cases   No. Observations:                  771
Model:                     ARIMA(2, 1, 2)   Log Likelihood                 517.618
Date:                    Sun, 15 Dec 2024   AIC                          -1025.236
Time:                            00:43:15   BIC                          -1002.004
Sample:                                 0   HQIC                         -1016.295
                                    - 771                                         
Covariance Type:                      opg                                         
                 coef    std err          z      P>|z|      [0.0