In [1]:
## 使用資源導入

import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression  #預測型用
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
import matplotlib.ticker as mticker
import statsmodels.formula.api as sm  #解釋型用
from sklearn.model_selection import train_test_split #做切分
from sklearn import metrics #看precisioin/recall
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
from sklearn.model_selection import cross_val_score
from datetime import datetime, timedelta
import streamlit as st
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go

In [2]:
rgb_colors = [
    (0.12156862745098039, 0.4666666666666667, 0.7058823529411765),
    (0.6823529411764706, 0.7803921568627451, 0.9098039215686274),
    (1.0, 0.4980392156862745, 0.054901960784313725),
    (1.0, 0.7333333333333333, 0.47058823529411764),
    (0.17254901960784313, 0.6274509803921569, 0.17254901960784313),
    (0.596078431372549, 0.8745098039215686, 0.5411764705882353),
    (0.8392156862745098, 0.15294117647058825, 0.1568627450980392),
    (1.0, 0.596078431372549, 0.5882352941176471),
    (0.5803921568627451, 0.403921568627451, 0.7411764705882353),
    (0.7725490196078432, 0.6901960784313725, 0.8352941176470589),
    (0.5490196078431373, 0.33725490196078434, 0.29411764705882354),
    (0.7686274509803922, 0.611764705882353, 0.5803921568627451),
    (0.8901960784313725, 0.4666666666666667, 0.7607843137254902),
    (0.9686274509803922, 0.7137254901960784, 0.8235294117647058),
    (0.4980392156862745, 0.4980392156862745, 0.4980392156862745),
    (0.7803921568627451, 0.7803921568627451, 0.7803921568627451),
    (0.7372549019607844, 0.7411764705882353, 0.13333333333333333),
    (0.8588235294117647, 0.8588235294117647, 0.5529411764705883),
    (0.09019607843137255, 0.7450980392156863, 0.8117647058823529),
    (0.6196078431372549, 0.8549019607843137, 0.8980392156862745)
]

# 将 RGB 颜色转换为 HEX
def rgb_to_hex(rgb):
    return '#{:02x}{:02x}{:02x}'.format(int(rgb[0] * 255), int(rgb[1] * 255), int(rgb[2] * 255))

hex_colors = [rgb_to_hex(color) for color in rgb_colors]

### 新資料

資料處理

In [None]:
data = pd.read_excel("variant_clean_ts_20240805.xlsx")
data['date_collected'] = pd.to_datetime(data['date_collected'])

# 選2024的本土個案
sd= pd.to_datetime('2023-12-30')
import_data = data[(data['date_collected'] >= sd) & (data['imported'] == 0)]
filt_local = import_data.groupby(['var_grp', 'date_collected'])['imported'].count().reset_index().rename(columns={'imported':'local'})
total_local = filt_local.groupby('date_collected')['local'].sum().reset_index().rename(columns={'local':'total'})
local_data = pd.merge(filt_local, total_local, on='date_collected', how='left')
local_data['percentage'] = round((local_data['local'] / local_data['total'])*100,2)
local_data.head()


In [None]:
# 提取年份、月份、日-01
local_data['Year'] = local_data['date_collected'].dt.year
local_data['Month'] = local_data['date_collected'].dt.month
local_data['Day'] = local_data['date_collected'].dt.day

# 可以選擇將日期轉換為距離某個基準日期的天數
base_date = pd.to_datetime('2023-12-24')
local_data['Days_Since_Base'] = (local_data['date_collected'] - base_date).dt.days
local_data['new_case_relative'] = round((local_data['local'] / local_data['total']),4)
local_data

模型建立

In [None]:
X = pd.DataFrame()
X['Year'] = local_data['Year']
X['Month'] = local_data['Month']
X['Day'] = local_data['Day']
X['Days_Since_Base'] = local_data['Days_Since_Base']
X['new_case_relative'] = round((local_data['local'] / local_data['total']),2)


# 將病毒變異株轉換成數值格式
y = local_data['var_grp'].astype('category').cat.codes

# 分割數據為訓練集和測試集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

# 建立並訓練邏輯迴歸模型
model = LogisticRegression(multi_class='multinomial', solver='lbfgs')
model.fit(X_train, y_train)

# 預測測試集並輸出每個變異株的機率
y_pred_proba = model.predict_proba(X_test)

# 顯示結果
print(y_pred_proba)

In [None]:
#模型評估

# 預測測試集
y_pred = model.predict(X_test)

# 計算準確率
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.2f}")

# 混淆矩陣
conf_matrix = confusion_matrix(y_test, y_pred)
print("Confusion Matrix:")
print(conf_matrix)

# 分類報告
class_report = classification_report(y_test, y_pred)
print("Classification Report:")
print(class_report)

預測

In [None]:
# 預測未來四週 以兩週為單位

# 預測未來日期
future_dates = pd.date_range(start='2024-07-20', end='2024-08-16', freq='14D')
future_data = pd.DataFrame()
future_data['date_collected'] = future_dates
future_data['Year'] = future_data['date_collected'].dt.year
future_data['Month'] = future_data['date_collected'].dt.month
future_data['Day'] = future_data['date_collected'].dt.day
future_data['Days_Since_Base'] = (future_data['date_collected'] - base_date).dt.days
future_data['new_case_relative'] = 0  # Assuming no new cases data

# 進行預測
X_future = future_data[['Year', 'Month', 'Day', 'Days_Since_Base', 'new_case_relative']]
y_future_pred_proba = model.predict_proba(X_future)

# 把預測機率轉換成 dataframe
y_future_pred_df = pd.DataFrame(y_future_pred_proba, columns=local_data['var_grp'].astype('category').cat.categories)
y_future_pred_df['date'] = future_data['date_collected']

# 按日期分组並計算每個變異珠的平均預測機率
y_future_pred_df = y_future_pred_df.groupby('date').mean()
y_future_pred_df.index = y_future_pred_df.index.date  ##########

# 繪圖
y_future_pred_df.plot(kind='bar', stacked=True, figsize=(14, 8), colormap='tab20')

# 標題和圖示
plt.title('Variants Distribution Over Time (Past and Future)')
plt.xlabel('Date')
plt.ylabel('Average Predicted Probability')
plt.legend(title='Variants', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plt.xticks(rotation=45, ha='right')

plt.show()

In [None]:
start_date = pd.Timestamp('2023-12-23')
# 將早於指定開始日期的數據過濾掉
data_2024 = data[(data['date_collected'] >= start_date) & (data['imported'] == 0)]
# 創建每兩週的區間
bins = pd.date_range(start=start_date, end=data_2024['date_collected'].max() + pd.DateOffset(days=7), freq='14D')
# 創建每兩週的區間標籤
labels = [f"{start.date()}" for start, end in zip(bins[:-1], bins[1:])]
data_2024['date'] = pd.cut(data_2024['date_collected'], bins=bins, labels=labels, right=False)

fl = data_2024.groupby(['var_grp', 'date'])['imported'].count().reset_index().rename(columns={'imported':'local'})
tl = fl.groupby('date')['local'].sum().reset_index().rename(columns={'local':'total'})
ld = pd.merge(fl, tl, on='date', how='left')
ld['percentage'] = round((ld['local'] / ld['total']),4)
df = pd.pivot(ld, index='date', columns='var_grp', values='percentage').reset_index().rename_axis(None, axis=1)

df.set_index('date', inplace=True)

# 合併過去和未來的數據
bined_df = pd.concat([df, y_future_pred_df])
#bined_df.index = bined_df.index.date
bined_df.plot(kind='bar', stacked=True, figsize=(14, 8), colormap='tab20')


plt.axvspan(14.5,17.5, color='grey', alpha=0.4, zorder=0)
plt.text(x=15.5, y=max(bined_df.max().max()*1.08, bined_df.max().max()), 
         s='Predicted', color='black', ha='center', va='center', fontsize=12, zorder=2)

# 新增標題
plt.title('Proportion of Variants Over Time(Observed and Predicted)')
plt.xlabel('Collection Date, two-week period')
plt.ylabel('Predicted Probability(%)')

# 把y軸換成百分比形式
formatter = mticker.FuncFormatter(lambda x, _: f'{x*100:.0f}%')
plt.gca().yaxis.set_major_formatter(formatter)

plt.legend(title='Variants', bbox_to_anchor=(1.02, 1), loc='upper left')
plt.tight_layout()
plt.xticks(rotation=45, ha='right')

plt.show()

In [320]:
## html

bined_df_reset = bined_df.reset_index().rename(columns={'index':'date'})

# DataFrame短換長
bined_df_long = bined_df_reset.melt(id_vars='date', var_name='var_grp', value_name='percentage')

# 繪製長條圖
fig = px.bar(bined_df_long, x='date', y='percentage', color='var_grp', 
             title='Variants Distribution Over Time (Past and Future)', 
             labels={'percentage': 'Average Predicted Probability', 'date': 'Date'},
             color_discrete_sequence=hex_colors)

# 
fig.add_vrect(x0='2024-07-24', x1='2024-08-06', 
              fillcolor='grey', opacity=0.4, 
              layer='below', line_width=0)

fig.add_vrect(x0='2024-07-03', x1='2024-07-16', 
              fillcolor='red', opacity=0.3, 
              layer='below', line_width=0)

# 座標
fig.add_annotation(x='2024-07-30', y=max(bined_df.max().max()*1.07, bined_df.max().max()),
                   text='Nowcast', showarrow=False, 
                   font=dict(size=18, color='black'))

fig.update_layout(
    xaxis=dict(tickangle=-45, tickvals=bined_df_long['date'].unique(), ticktext=bined_df_long['date'].unique()),
    yaxis_title='Average Predicted Probability', barmode='stack', legend_title='Variants', bargap=0.5,
    xaxis_title='Date', title='Variants Distribution Over Time (Past and Future)', yaxis=dict(tickformat='.1%'))


fig.write_html('Variants Distribution Over Time (Past and Future).html')


計算預測資料的信心區間

In [None]:
from scipy.stats import norm

# 選取2024-08-03的預測資料
pred_2024_08_03 = y_future_pred_df.iloc[-1]

# 計算95%信賴區間
def calculate_confidence_interval(proportion, n=1, confidence_level=0.95):
    """計算給定比例的信賴區間。"""
    z = norm.ppf((1 + confidence_level) / 2)
    std_error = np.sqrt(proportion * (1 - proportion) / n)
    ci_lower = proportion - z * std_error
    ci_upper = proportion + z * std_error
    return max(0, ci_lower), min(1, ci_upper)  # 確保CI範圍在0和1之間

# 假設樣本數 n 是100，可以根據實際數據調整
n = 100

# 計算所有變異株的信賴區間
ci_df = pred_2024_08_03.apply(lambda p: pd.Series(calculate_confidence_interval(p, n=n)))

# 重命名CI列
ci_df.columns = ['CI Lower', 'CI Upper']

# 合併預測比例和信賴區間
final_df = pd.concat([pred_2024_08_03, ci_df], axis=1)
final_df.columns = ['Proportion', 'CI Lower', 'CI Upper']

# 將比例和信賴區間轉換為百分比
final_df = final_df.apply(lambda x: round(x * 100, 2))

# 轉換成表格
final_df['Proportion'] = final_df.apply(lambda row: f"{row['Proportion']}%", axis=1)
final_df['95% PI'] = final_df.apply(lambda row: f"{row['CI Lower']}%-{row['CI Upper']}%", axis=1)
final_df = final_df.drop(columns=['CI Lower', 'CI Upper'])

final_df


### 舊資料(用來比較過去預測和現實資料的差異)

資料處理

In [None]:
data = pd.read_excel("variant_clean_ts.xlsx") #舊資料
data['date_collected'] = pd.to_datetime(data['date_collected'])

# 選2024的本土個案
sd= pd.to_datetime('2023-12-30')
import_data = data[(data['date_collected'] >= sd ) & (data['imported'] == 0)]
filt_local = import_data.groupby(['var_grp', 'date_collected'])['imported'].count().reset_index().rename(columns={'imported':'local'})
total_local = filt_local.groupby('date_collected')['local'].sum().reset_index().rename(columns={'local':'total'})
local_data = pd.merge(filt_local, total_local, on='date_collected', how='left')
local_data['percentage'] = round((local_data['local'] / local_data['total'])*100,2)
local_data


In [13]:
# 提取年份、月份、日-01
local_data['Year'] = local_data['date_collected'].dt.year
local_data['Month'] = local_data['date_collected'].dt.month
local_data['Day'] = local_data['date_collected'].dt.day

# 可以選擇將日期轉換為距離某個基準日期的天數
base_date = pd.to_datetime('2023-12-24')
local_data['Days_Since_Base'] = (local_data['date_collected'] - base_date).dt.days
local_data['new_case_relative'] = round((local_data['local'] / local_data['total']),2)

模型建立

In [None]:
# 將處理後的數據與其他數據合併
X = pd.DataFrame()
X['Year'] = local_data['Year']
X['Month'] = local_data['Month']
X['Day'] = local_data['Day']
X['Days_Since_Base'] = local_data['Days_Since_Base']
X['new_case_relative'] = round((local_data['local'] / local_data['total']),2)


# 將病毒變異株轉換成數值格式
y = local_data['var_grp'].astype('category').cat.codes

# 分割數據為訓練集和測試集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

# 建立並訓練邏輯迴歸模型
model = LogisticRegression(multi_class='multinomial', solver='lbfgs')
model.fit(X_train, y_train)

# 預測測試集並輸出每個變異株的機率
y_pred_proba = model.predict_proba(X_test)

# 顯示結果
print(y_pred_proba)

In [None]:
#模型評估

# 預測測試集
y_pred = model.predict(X_test)

# 準確率
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.2f}")

# 混淆矩陣
conf_matrix = confusion_matrix(y_test, y_pred)
print("Confusion Matrix:")
print(conf_matrix)

# 分類報告
class_report = classification_report(y_test, y_pred)
print("Classification Report:")
print(class_report)

預測

In [None]:
# 以舊資料預測未來四週 以兩週為單位

# 預測未來日期
future_dates = pd.date_range(start='2024-07-06', end='2024-08-02', freq='14D')
future_data = pd.DataFrame()
future_data['date_collected'] = future_dates
future_data['Year'] = future_data['date_collected'].dt.year
future_data['Month'] = future_data['date_collected'].dt.month
future_data['Day'] = future_data['date_collected'].dt.day
future_data['Days_Since_Base'] = (future_data['date_collected'] - base_date).dt.days
future_data['new_case_relative'] = 0  # Assuming no new cases data

# 進行預測
X_future = future_data[['Year', 'Month', 'Day', 'Days_Since_Base', 'new_case_relative']]
y_future_pred_proba = model.predict_proba(X_future)

# 把預測機率轉換成 DataFrame
y_future_pred_df = pd.DataFrame(y_future_pred_proba, columns=local_data['var_grp'].astype('category').cat.categories)
y_future_pred_df['date'] = future_data['date_collected']

# 按日期分組並計算每個變異珠的平均預測機率
y_future_pred_df = y_future_pred_df.groupby('date').mean()
y_future_pred_df.index = y_future_pred_df.index.date  ##########

# 繪圖
y_future_pred_df.plot(kind='bar', stacked=True, figsize=(14, 8), colormap='tab20')

# 
plt.title('Variants Distribution Over Time (Past and Future)')
plt.xlabel('Date')
plt.ylabel('Average Predicted Probability')
plt.legend(title='Variants', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plt.xticks(rotation=45, ha='right')

plt.show()

In [None]:
y_future_pred_df

預測vs觀察

In [None]:
data = pd.read_excel("variant_clean_ts_20240805.xlsx") #新資料
data['date_collected'] = pd.to_datetime(data['date_collected'])

# 選2024的本土個案
sd= pd.to_datetime('2023-12-30')
import_data = data[(data['date_collected'] >= sd) & (data['imported'] == 0)]
filt_local = import_data.groupby(['var_grp', 'date_collected'])['imported'].count().reset_index().rename(columns={'imported':'local'})
total_local = filt_local.groupby('date_collected')['local'].sum().reset_index().rename(columns={'local':'total'})
local_data = pd.merge(filt_local, total_local, on='date_collected', how='left')
local_data['percentage'] = round((local_data['local'] / local_data['total'])*100,2)

# 可以選擇將日期轉換為距離某個基準日期的天數
base_date = pd.to_datetime('2023-12-24')
local_data['Days_Since_Base'] = (local_data['date_collected'] - base_date).dt.days
local_data['new_case_relative'] = round((local_data['local'] / local_data['total']),4)

start_date = pd.Timestamp('2023-12-23')
# 將早於指定開始日期的數據過濾掉
data_2024 = data[(data['date_collected'] >= start_date) & (data['imported'] == 0)]
# 創建區間
bins = pd.date_range(start=start_date, end=data_2024['date_collected'].max() + pd.DateOffset(days=7), freq='14D')
# 創建區間標籤
labels = [f"{start.date()}" for start, end in zip(bins[:-1], bins[1:])]
data_2024['date'] = pd.cut(data_2024['date_collected'], bins=bins, labels=labels, right=False)

fl = data_2024.groupby(['var_grp', 'date'])['imported'].count().reset_index().rename(columns={'imported':'local'})
tl = fl.groupby('date')['local'].sum().reset_index().rename(columns={'local':'total'})
ld = pd.merge(fl, tl, on='date', how='left')
ld['percentage'] = round((ld['local'] / ld['total']),4)
ld['date'] = pd.to_datetime(ld['date'])
ld1 = ld[(ld['date'] >= pd.to_datetime('2024-07-06')) & (ld['date'] <= pd.to_datetime('2024-07-19'))]
df = pd.pivot(ld1, index='date', columns='var_grp', values='percentage').reset_index().rename_axis(None, axis=1)

df.set_index('date', inplace=True)
df.index = df.index.date
predicted_data = y_future_pred_df.drop(y_future_pred_df.index[-1])
# 合併過去和未來的數據
bined_df = pd.concat([predicted_data, df])
#bined_df = bined_df.sort_index()

bined_df.plot(kind='bar', stacked=True, figsize=(14, 8), colormap='tab20')

plt.axvspan(-0.5,0.5, color='grey', alpha=0.4, zorder=0)
plt.text(x=0, y=max(bined_df.max().max()*2.59, bined_df.max().max()), 
         s='Predict', color='black', ha='center', va='center', fontsize=16, zorder=2)
plt.text(x=1, y=max(bined_df.max().max()*2.59, bined_df.max().max()), 
         s='Observe', color='black', ha='center', va='center', fontsize=16, zorder=2)


plt.title('Variants Distribution Over Time (Past and Future)')
plt.xlabel('Date')
plt.ylabel('Average Predicted Probability')

# y軸換成百分比
formatter = mticker.FuncFormatter(lambda x, _: f'{x*100:.0f}%')
plt.gca().yaxis.set_major_formatter(formatter)

plt.legend(title='Variants', bbox_to_anchor=(1, 1), loc='upper left')
plt.tight_layout()
plt.xticks(rotation=0, ha='right')

plt.show()