In [13]:
# 导入必要的库
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression, LassoCV
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import mean_squared_error
from statsmodels.api import OLS, add_constant
import matplotlib.pyplot as plt

# 读取数据并进行初步处理
MacroData = pd.read_csv("MacroData.csv")
MacroData.dropna(inplace=True)

# 规范化列名，移除特殊字符
MacroData.columns = MacroData.columns.str.replace('[^\w]', '', regex=True)

# 将除日期列外的所有列转换为数值类型
MacroData.iloc[:, 1:] = MacroData.iloc[:, 1:].apply(pd.to_numeric, errors='coerce')

# 解析日期列，根据实际的日期格式进行修改
print("原始日期格式示例：")
print(MacroData['日期'].head())

# 尝试解析日期
try:
    MacroData['日期'] = pd.to_datetime(MacroData['日期'], format='%b-%y')
except ValueError:
    try:
        MacroData['日期'] = pd.to_datetime(MacroData['日期'], format='%Y-%m')
    except ValueError:
        print("日期格式不符合预期，请检查日期格式并修改代码中的 'format' 参数。")
        raise

print("解析后的日期格式示例：")
print(MacroData['日期'].head())

# 按日期排序（如果需要）
MacroData.sort_values('日期', inplace=True)

# 划分训练集和测试集
split_point = int(0.985 * len(MacroData))
train_data = MacroData.iloc[:split_point]
test_data = MacroData.iloc[split_point:]

# 创建特征集和目标变量
exclude_columns = ["市半導體", "日期"]
features_excl_semiconductor = MacroData.drop(columns=exclude_columns)
features_excl_date = MacroData.drop(columns=["日期"])

# 检查训练数据样本
print(train_data.tail())

# 时间序列模型
train_dates = train_data['日期']
train_series = pd.Series(train_data['市半導體'].values, index=train_dates)

# 创建线性回归模型
X_train_ts = features_excl_semiconductor.iloc[:split_point]
X_train_ts = add_constant(X_train_ts)
y_train_ts = train_data['市半導體']

model_ts = OLS(y_train_ts, X_train_ts).fit()
print(model_ts.summary())

# 逐步回归 - 自定义前向选择算法
def forward_selection(X, y, criterion='bic'):
    initial_features = []
    remaining_features = list(X.columns)
    best_score = np.inf
    best_model = None

    while remaining_features:
        scores_with_candidates = []
        for candidate in remaining_features:
            features = initial_features + [candidate]
            X_subset = X[features]
            X_subset = add_constant(X_subset, has_constant='add')
            model = OLS(y, X_subset).fit()
            if criterion == 'bic':
                score = model.bic
            else:
                score = model.aic
            scores_with_candidates.append((score, candidate, model))

        scores_with_candidates.sort()
        best_new_score, best_candidate, best_candidate_model = scores_with_candidates[0]

        if best_new_score < best_score:
            best_score = best_new_score
            best_model = best_candidate_model
            initial_features.append(best_candidate)
            remaining_features.remove(best_candidate)
        else:
            break

    return best_model

X_train_fs = features_excl_date.iloc[:split_point]
y_train_fs = train_data['市半導體']

model_fs = forward_selection(X_train_fs, y_train_fs)
print(model_fs.summary())

# LASSO回归
X_lasso = features_excl_date
y_lasso = MacroData['市半導體']

# 标准化数据
scaler = StandardScaler()
X_lasso_scaled = scaler.fit_transform(X_lasso)

# 交叉验证选择最佳的 alpha 值
lasso_cv = LassoCV(cv=5, random_state=0)
lasso_cv.fit(X_lasso_scaled[:split_point], y_lasso[:split_point])

# 提取非零系数对应的特征
coef = lasso_cv.coef_
selected_features = X_lasso.columns[coef != 0]
print("LASSO选择的特征:", selected_features)

# 用 LASSO 选择的特征重新拟合模型
X_selected = X_lasso[selected_features]
X_train_lasso = X_selected.iloc[:split_point]
X_test_lasso = X_selected.iloc[split_point:]
y_train_lasso = y_lasso.iloc[:split_point]
y_test_lasso = y_lasso.iloc[split_point:]

model_lasso = LinearRegression()
model_lasso.fit(X_train_lasso, y_train_lasso)
pred_lasso = model_lasso.predict(X_test_lasso)

# 计算 LASSO 模型的 RMSE
rmse_lasso = calculate_rmse(y_test_lasso, pred_lasso)

# 主成分分析
pca = PCA()
X_pca = features_excl_date
X_pca_scaled = scaler.fit_transform(X_pca)
pca.fit(X_pca_scaled)

# 输出主成分的信息
print("主成分方差比例:", pca.explained_variance_ratio_)
eigenvalues = pca.explained_variance_
num_factors_kaiser = sum(eigenvalues > 1)
print("根据 Kaiser 准则选择的主成分数量:", num_factors_kaiser)

# 提取前 6 个主成分
n_components = 6
X_pca_reduced = pca.transform(X_pca_scaled)[:, :n_components]
pc_columns = [f'PC{i+1}' for i in range(n_components)]
pc_data = pd.DataFrame(X_pca_reduced, columns=pc_columns)
pc_data['市半導體'] = MacroData['市半導體']

# 用主成分建立线性回归模型
X_train_pca = pc_data.iloc[:split_point].drop(columns=['市半導體'])
X_test_pca = pc_data.iloc[split_point:].drop(columns=['市半導體'])
y_train_pca = pc_data.iloc[:split_point]['市半導體']
y_test_pca = pc_data.iloc[split_point:]['市半導體']

model_pca = LinearRegression()
model_pca.fit(X_train_pca, y_train_pca)
pred_pca = model_pca.predict(X_test_pca)
rmse_pca = calculate_rmse(y_test_pca, pred_pca)

# 计算 RMSE 的函数
def calculate_rmse(y_true, y_pred):
    return np.sqrt(mean_squared_error(y_true, y_pred))

# 时间序列模型预测
# 获取模型使用的特征名称
feature_names_ts = model_ts.model.exog_names

# 从测试数据中选择相同的特征
X_test_ts = features_excl_semiconductor.iloc[split_point:]
X_test_ts = add_constant(X_test_ts, has_constant='add')

# 确保测试数据包含所有特征
missing_features_ts = set(feature_names_ts) - set(X_test_ts.columns)
for feature in missing_features_ts:
    X_test_ts[feature] = 0  # 或者使用适当的填充值

# 确保列顺序一致
X_test_ts = X_test_ts[feature_names_ts]

# 进行预测
pred_ts = model_ts.predict(X_test_ts)
rmse_ts = calculate_rmse(test_data['市半導體'], pred_ts)

# 逐步回归模型预测
# 获取模型使用的特征名称
feature_names_fs = model_fs.model.exog_names

# 从测试数据中选择相同的特征
X_test_fs = features_excl_date.iloc[split_point:]
X_test_fs = add_constant(X_test_fs, has_constant='add')

# 确保测试数据包含所有特征
missing_features_fs = set(feature_names_fs) - set(X_test_fs.columns)
for feature in missing_features_fs:
    X_test_fs[feature] = 0  # 或者使用适当的填充值

# 确保列顺序一致
X_test_fs = X_test_fs[feature_names_fs]

# 进行预测
pred_fs = model_fs.predict(X_test_fs)
rmse_fs = calculate_rmse(y_lasso.iloc[split_point:], pred_fs)

# 输出各模型的 RMSE
rmse_results = {
    'BIC': rmse_fs,
    'LASSO': rmse_lasso,
    'TS': rmse_ts,
    'PC': rmse_pca
}
print("各模型的 RMSE：")
print(rmse_results)

# 打印预测结果（可选）
print("逐步回归预测结果:")
print(pred_fs)
print("时间序列预测结果:")
print(pred_ts)


原始日期格式示例：
0    Jan-11
1    Feb-11
2    Mar-11
3    Apr-11
4    May-11
Name: 日期, dtype: object
解析后的日期格式示例：
0   2011-01-01
1   2011-02-01
2   2011-03-01
3   2011-04-01
4   2011-05-01
Name: 日期, dtype: datetime64[ns]
            日期    市半導體  製造業營業氣候測驗點         股價指數  外銷訂單動向指數  工業及服務業受僱員工淨進入率  \
151 2023-08-01  366.65       92.63  16648.14273     46.45            0.03   
152 2023-09-01  366.14       94.09  16569.90350     44.74           -0.09   
153 2023-10-01  369.82       93.77  16422.40450     44.80            0.10   
154 2023-11-01  391.72       93.24  16977.86409     44.66            0.06   
155 2023-12-01  405.70       97.06  17569.97952     43.69           -0.03   

          M1B  實質半導體設備進口值  建築物開工樓地板面積  工業生產指數  ...     中國EPU指數     美國EPU指數  \
151  26527742       44081    2552.502   91.05  ...  227.150059   76.844224   
152  26457439       38240    1626.627   89.47  ...  249.484819  117.698777   
153  26306314       54316    2228.247   92.42  ...  160.979522  133.594075   
154  2631409

  MacroData.columns = MacroData.columns.str.replace('[^\w]', '', regex=True)


In [15]:
# 逐步回归 - 自定义前向选择算法
def forward_selection(X, y, criterion='bic'):
    initial_features = []
    remaining_features = list(X.columns)
    best_score = np.inf
    best_model = None

    while remaining_features:
        scores_with_candidates = []
        for candidate in remaining_features:
            features = initial_features + [candidate]
            X_subset = X[features]
            X_subset = add_constant(X_subset)
            model = OLS(y, X_subset).fit()
            if criterion == 'bic':
                score = model.bic
            else:
                score = model.aic
            scores_with_candidates.append((score, candidate, model))

        scores_with_candidates.sort()
        best_new_score, best_candidate, best_candidate_model = scores_with_candidates[0]

        if best_new_score < best_score:
            best_score = best_new_score
            best_model = best_candidate_model
            initial_features.append(best_candidate)
            remaining_features.remove(best_candidate)
        else:
            break

    return best_model

X_train_fs = features_excl_date.iloc[:split_point]
y_train_fs = train_data['市半導體']

model_fs = forward_selection(X_train_fs, y_train_fs)
print(model_fs.summary())


                            OLS Regression Results                            
Dep. Variable:                   市半導體   R-squared:                       1.000
Model:                            OLS   Adj. R-squared:                  1.000
Method:                 Least Squares   F-statistic:                 1.466e+34
Date:                Thu, 19 Sep 2024   Prob (F-statistic):               0.00
Time:                        20:01:12   Log-Likelihood:                 4835.4
No. Observations:                 156   AIC:                            -9665.
Df Residuals:                     153   BIC:                            -9656.
Df Model:                           2                                         
Covariance Type:            nonrobust                                         
                 coef    std err          t      P>|t|      [0.025      0.975]
------------------------------------------------------------------------------
const      -7.816e-14   1.01e-14     -7.742      0.0

In [17]:
# LASSO回归
X_lasso = features_excl_date
y_lasso = MacroData['市半導體']

# 标准化数据
scaler = StandardScaler()
X_lasso_scaled = scaler.fit_transform(X_lasso)

# 交叉验证选择最佳的alpha值
lasso_cv = LassoCV(cv=5, random_state=0)
lasso_cv.fit(X_lasso_scaled[:split_point], y_lasso[:split_point])

# 提取非零系数对应的特征
coef = lasso_cv.coef_
selected_features = X_lasso.columns[coef != 0]
print("LASSO选择的特征:", selected_features)

# 用LASSO选择的特征重新拟合模型
X_selected = X_lasso[selected_features]
X_train_lasso = X_selected.iloc[:split_point]
X_test_lasso = X_selected.iloc[split_point:]
y_train_lasso = y_lasso.iloc[:split_point]
y_test_lasso = y_lasso.iloc[split_point:]

model_lasso = LinearRegression()
model_lasso.fit(X_train_lasso, y_train_lasso)
pred_lasso = model_lasso.predict(X_test_lasso)

LASSO选择的特征: Index(['市半導體'], dtype='object')


In [None]:
# 主成分分析
pca = PCA()
X_pca = features_excl_date
X_pca_scaled = scaler.fit_transform(X_pca)
pca.fit(X_pca_scaled)

# 输出主成分的信息
print("主成分方差比例:", pca.explained_variance_ratio_)
eigenvalues = pca.explained_variance_
num_factors_kaiser = sum(eigenvalues > 1)
print("根据Kaiser准则选择的主成分数量:", num_factors_kaiser)

# 提取前6个主成分
n_components = 6
X_pca_reduced = pca.transform(X_pca_scaled)[:, :n_components]
pc_columns = [f'PC{i+1}' for i in range(n_components)]
pc_data = pd.DataFrame(X_pca_reduced, columns=pc_columns)
pc_data['市半導體'] = MacroData['市半導體']

# 用主成分建立线性回归模型
X_train_pca = pc_data.iloc[:split_point].drop(columns=['市半導體'])
X_test_pca = pc_data.iloc[split_point:].drop(columns=['市半導體'])
y_train_pca = pc_data.iloc[:split_point]['市半導體']
y_test_pca = pc_data.iloc[split_point:]['市半導體']

model_pca = LinearRegression()
model_pca.fit(X_train_pca, y_train_pca)
pred_pca = model_pca.predict(X_test_pca)

In [None]:
# 计算RMSE
def calculate_rmse(y_true, y_pred):
    return np.sqrt(mean_squared_error(y_true, y_pred))

# 时间序列模型预测
X_test_ts = features_excl_semiconductor.iloc[split_point:]
X_test_ts = add_constant(X_test_ts)
pred_ts = model_ts.predict(X_test_ts)
rmse_ts = calculate_rmse(test_data['市半導體'], pred_ts)

# 逐步回归模型预测
X_test_fs = add_constant(features_excl_date.iloc[split_point:][model_fs.model.exog_names[1:]])
pred_fs = model_fs.predict(X_test_fs)
rmse_fs = calculate_rmse(y_lasso.iloc[split_point:], pred_fs)

# LASSO模型的RMSE
rmse_lasso = calculate_rmse(y_test_lasso, pred_lasso)

# PCA模型的RMSE
rmse_pca = calculate_rmse(y_test_pca, pred_pca)

# 输出各模型的RMSE
rmse_results = {
    'BIC': rmse_fs,
    'LASSO': rmse_lasso,
    'TS': rmse_ts,
    'PC': rmse_pca
}
print("各模型的RMSE：")
print(rmse_results)

# 打印预测结果（可选）
print("逐步回归预测结果:")
print(pred_fs)
print("时间序列预测结果:")
print(pred_ts)