In [29]:
import os
import sys

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tqdm
from scipy.stats import pearsonr, spearmanr, rankdata
from statsmodels.api import OLS
from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import grangercausalitytests, adfuller
from tqdm import tqdm as progress_bar

from olsjan10 import *

In [17]:
# Read the sheet names of the macro-data workbook; the `with` block closes the
# workbook handle as soon as the names have been read.
with pd.ExcelFile('./data/宏观数据.xlsx') as p:
    sheet_names = p.sheet_names

In [None]:
# Make a time series stationary
def cal_DiffOrder(series: pd.Series, alpha: float = 0.05, max_order=None) -> int:
    """Count the first differences needed before ``series`` passes the ADF test.

    The series is differenced repeatedly until the p-value of the augmented
    Dickey-Fuller unit-root test (``adfuller``) is no longer above ``alpha``.

    Parameters
    ----------
    series : pd.Series
        Series to test. Missing values are dropped before the first test.
    alpha : float, default 0.05
        Significance level; differencing continues while ``p_value > alpha``.
    max_order : int or None, default None
        Optional cap on the differencing order. ``None`` keeps the original
        unbounded behaviour.

    Returns
    -------
    int
        Number of differences applied (0 if the input already passes).

    Raises
    ------
    ValueError
        If ``max_order`` is given and the series still fails the test after
        ``max_order`` differences. Errors raised by ``adfuller`` itself (for
        example on a series that has become too short) propagate unchanged.
    """
    # adfuller is given the raw values, so remove gaps up front instead of
    # feeding NaNs into the test regression.
    series = series.dropna()

    # Unit-root test (ADF); element 1 of the result is the p-value
    p_value = adfuller(series)[1]
    d = 0
    while p_value > alpha:
        if max_order is not None and d >= max_order:
            raise ValueError(
                'series is still non-stationary after {} differences'.format(d)
            )
        d += 1
        # diff() puts a NaN in the first position; drop it before re-testing
        series = series.diff().iloc[1:]
        p_value = adfuller(series)[1]
    return d

In [None]:
def _min_granger_p_value(pair, lag, test):
    """Smallest p-value (rounded to 4 decimals) of ``test`` over lags 1..``lag``."""
    outcome = grangercausalitytests(pair, maxlag=lag, verbose=False)
    return np.min([round(outcome[k][0][test][1], 4) for k in range(1, lag + 1)])


# Inputs are the macro data and the industry data. The returned table has one
# row per industry and one column per macro variable, with the codes
#   0 = not significant in either direction
#   1 = the macro variable leads
#   2 = the macro variable lags
#   3 = both directions are significant
#   NaN = the pair could not be tested (a series could not be made stationary)
def granger_test_for_factor_selection(macro, price, lag=3, test='ssr_chi2test', alpha=0.05):
    """Classify every (industry, macro variable) pair with Granger causality tests.

    Parameters
    ----------
    macro : pd.DataFrame
        One column per macro variable.
    price : pd.DataFrame
        One column per industry.
    lag : int, default 3
        Maximum lag passed to ``grangercausalitytests``; the smallest p-value
        over lags 1..``lag`` is used.
    test : str, default 'ssr_chi2test'
        Key of the test statistic to read from the statsmodels result.
    alpha : float, default 0.05
        Significance threshold applied to the (rounded) minimum p-value.

    Returns
    -------
    pd.DataFrame
        Indexed by ``price.columns`` with columns ``macro.columns``; values are
        the codes described in the comment above this function.

    Notes
    -----
    ``stationaize_series`` is defined outside this cell. Here it is only
    relied on to return a falsy value on failure and otherwise a mapping with
    a ``'Series'`` entry.
    """
    test_matrix = np.zeros((price.shape[1], macro.shape[1]))

    # Stationarising an industry series does not depend on the macro column,
    # so do it once rather than once per macro variable.
    # NOTE(review): assumes stationaize_series is deterministic and side-effect free.
    stationary_prices = []
    for i in range(price.shape[1]):
        stationary_dict = stationaize_series(price.iloc[:, i].dropna())
        stationary_prices.append(stationary_dict['Series'] if stationary_dict else None)

    # Loop over the macro variables
    for col in range(macro.shape[1]):
        stationary_dict = stationaize_series(macro.iloc[:, col].dropna())
        if not stationary_dict:
            # Untestable column: mark it NaN like an untestable industry,
            # instead of leaving 0, which means "tested, no relationship".
            test_matrix[:, col] = np.nan
            continue
        macro_column = stationary_dict['Series']

        # Loop over all industries
        for i, price_column in enumerate(stationary_prices):
            if price_column is None:
                test_matrix[i, col] = np.nan
                continue
            # Align on the shared index; column 0 = industry, column 1 = macro
            pair = pd.merge(price_column, macro_column, left_index=True, right_index=True)
            pair = pair.dropna()

            # Granger test for the macro variable leading the industry series
            min_p_value_lead = _min_granger_p_value(pair, lag, test)
            # Granger test with the columns swapped: the macro variable lagging
            min_p_value_lag = _min_granger_p_value(pair.iloc[:, [1, 0]], lag, test)

            # Encode with plain ints: lead contributes 1, lag contributes 2.
            # (Indexing a Python list with numpy booleans is deprecated.)
            test_matrix[i, col] = int(min_p_value_lead <= alpha) + 2 * int(min_p_value_lag <= alpha)

    return pd.DataFrame(test_matrix, index=price.columns, columns=macro.columns)

In [None]:
# Run the Granger causality tests and write one result sheet per macro sheet
max_lag = 3
n_macro_sheets = 20  # macro_data(j) is called for j = 0 .. n_macro_sheets - 1

# Get the sheet names of the macro workbook (the handle is closed right after)
financial_names = ['EPS TTM', 'EPS BASIC', 'PE TTM']
with pd.ExcelFile(r'data/宏观数据.xlsx') as workbook:
    sheet_names = workbook.sheet_names

for i in range(1, 2):
    price = get_financial_data(i)
    # The context manager saves and closes the workbook on exit.
    # ExcelWriter.save() and to_excel(encoding=...) were removed in pandas 2.0.
    with pd.ExcelWriter('result/grangertest_{}.xlsx'.format(financial_names[i - 1])) as writer:
        for j in progress_bar(range(n_macro_sheets)):
            macro = macro_data(j)
            table = granger_test_for_factor_selection(macro, price, lag=max_lag)
            table.to_excel(writer, sheet_name='{}'.format(sheet_names[j]))

In [None]:
# Determine lead/lag relationships with a VAR model
def VAR_lead_lag_relationship(macro, price, granger, lag=3):
    """Report, per (industry, macro variable) pair, which VAR lags are significant.

    Parameters
    ----------
    macro : pd.DataFrame
        One column per macro variable.
    price : pd.DataFrame
        One column per industry.
    granger : pd.DataFrame or None
        Granger result table addressed positionally as ``granger.iloc[i, col]``
        (industry row, macro column). Pairs whose entry equals 0 are skipped.
        If ``granger`` is None every pair is skipped and the result stays empty.
    lag : int, default 3
        Number of lags fitted by the VAR and inspected afterwards.

    Returns
    -------
    pd.DataFrame
        Object-dtype frame indexed by ``price.columns`` with columns
        ``macro.columns``. A cell holds a comma-separated list of signed lags
        ordered by ascending p-value: ``'k'`` when lag k of the macro variable
        is significant in the industry equation, ``'-k'`` when lag k of the
        industry series is significant in the macro equation. Cells stay
        empty (NaN) when the pair was skipped or nothing was significant.

    Notes
    -----
    ``stationaize_series`` is defined outside this cell; it is only relied on
    to return a falsy value on failure and otherwise a mapping with a
    ``'Series'`` entry.
    """
    result = pd.DataFrame(index=price.columns, columns=macro.columns, dtype='object')

    # Loop over the macro variables
    for col in range(macro.shape[1]):
        # Difference the macro series until it is stationary
        stationary_dict = stationaize_series(macro.iloc[:, col].dropna())
        if not stationary_dict:
            continue
        macro_column = stationary_dict['Series']

        # Loop over all industries
        for i in range(price.shape[1]):
            # Only pairs that passed the Granger test are modelled
            if granger is None or granger.iloc[i, col] == 0:
                continue

            # Difference the industry series until it is stationary
            stationary_dict = stationaize_series(price.iloc[:, i].dropna())
            if not stationary_dict:
                # Could not be made stationary: skip instead of failing on
                # stationary_dict['Series'] below.
                continue
            price_column = stationary_dict['Series']

            # Column 0 = industry, column 1 = macro variable
            var_data = pd.merge(price_column, macro_column, left_index=True, right_index=True)
            var_data = var_data.dropna()
            pvalues = VAR(var_data).fit(lag).pvalues

            # Row layout relied on by the indexing (statsmodels default, with
            # a constant): row 0 = const, row 2k-1 = lag k of column 0,
            # row 2k = lag k of column 1.
            lead_lag = {}
            for k in range(1, lag + 1):
                lead_p = pvalues.iloc[2 * k, 0]      # macro lag k in the industry equation
                lag_p = pvalues.iloc[2 * k - 1, 1]   # industry lag k in the macro equation
                if lead_p <= 0.05:
                    lead_lag[str(k)] = lead_p
                if lag_p <= 0.05:
                    # Store the p-value that was actually tested for this entry
                    lead_lag[str(-k)] = lag_p

            # Most significant lag first
            ordered = sorted(lead_lag, key=lead_lag.get)
            if ordered:
                result.iloc[i, col] = ','.join(ordered)
    return result

In [None]:
# Fit the VAR lead/lag analysis for every macro sheet, using the Granger
# results written earlier as a filter
max_lag = 3
n_macro_sheets = 20  # macro_data(j) is called for j = 0 .. n_macro_sheets - 1

# Get the sheet names of the macro workbook (the handle is closed right after)
financial_names = ['EPS TTM', 'EPS BASIC', 'PE TTM']
with pd.ExcelFile(r'data/宏观数据.xlsx') as workbook:
    sheet_names = workbook.sheet_names

for i in range(1, 2):
    financial_name = financial_names[i - 1]
    # Read the Granger workbook that belongs to the same financial metric
    # (previously hard-coded to 'EPS TTM' regardless of i).
    granger_path = 'result/grangertest_{}.xlsx'.format(financial_name)
    price = get_financial_data(i)
    # The context manager saves and closes the workbook on exit.
    # ExcelWriter.save() and to_excel(encoding=...) were removed in pandas 2.0.
    with pd.ExcelWriter('result/lead_lag_{}.xlsx'.format(financial_name)) as writer:
        for j in progress_bar(range(n_macro_sheets)):
            macro = macro_data(j)
            granger_table = pd.read_excel(granger_path, index_col=0, sheet_name=j)
            table = VAR_lead_lag_relationship(macro, price, granger_table, lag=max_lag)
            table.to_excel(writer, sheet_name='{}'.format(sheet_names[j]))

In [30]:
import pandas as pd

In [31]:
# Toy regression inputs: the integers 1..7 as the regressor and seven
# uniform random draws from [0, 1) as the response.
xxx = list(range(1, 8))
yyy = np.random.rand(len(xxx))

In [42]:
# Build the design matrix from a copy, so inserting the intercept column
# never touches xxx itself.
xc = pd.DataFrame(xxx.copy())
xc.insert(loc=0, column='intercept', value=np.ones(len(xxx)))

# Column 0 is the response, the remaining columns are the regressors
alldata = pd.concat([pd.DataFrame(yyy), xc], axis=1)
alldata = alldata.dropna(axis=0)

# Use the OLS name imported from statsmodels.api at the top of the notebook,
# rather than an `sm` alias that no visible import defines.
model = OLS(alldata.iloc[:, 0], alldata.iloc[:, 1:])
# Fit explicitly; `result` was previously never assigned in this cell, so the
# cell only worked with leftover kernel state.
result = model.fit()
result.resid ** 2

0    0.159645
1    0.230574
2    0.000097
3    0.006970
4    0.131135
5    0.259686
6    0.091046
dtype: float64

In [62]:
x = 2  # scratch value; compared against 3 in the following cell

In [64]:
# Echo x only when it is below 3
if x < 3:
    print(x)

2
