In [None]:
#数据导入
import pandas as pd
setattr(pd, "Int64Index", pd.Index)
setattr(pd, "Float64Index", pd.Index)
from statsmodels.tsa.arima.model import ARIMA
from arch import arch_model
from statsmodels.stats.diagnostic import acorr_ljungbox, het_arch
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

data = pd.read_excel('9*5.xlsx')
names=data['投资者分类'].unique()
grouped = data.groupby('投资者分类')
behavior_values = {}
for name, group in grouped:
    behavior = group['新行为类型']
    behavior_values[name] = behavior
#data = pd.read_csv('情绪.csv')
#names=data['behavior'].unique()
#grouped = data.groupby('behavior')
emotion_values = {}
for name, group in grouped:
    emotion = group['emotion_value']
    emotion_values[name] = emotion


In [None]:
#协方差
import numpy as np
from scipy.stats import norm
from sklearn.covariance import EmpiricalCovariance

# Function to compute the empirical CDF
def empirical_cdf(data):
    """Compute the empirical CDF for a data series."""
    return pd.Series(data).rank(method='average') / (len(data) + 1)

# Ensure the data length is consistent
n = len(next(iter(emotion_values.values())))
u_matrix = np.zeros((n, len(emotion_values)))
v_matrix = np.zeros((n, len(behavior_values)))

# Compute the empirical CDFs and convert to standard normal marginals
for i, name in enumerate(names):
    if not emotion_values[name].index.equals(behavior_values[name].index):
        raise ValueError(f"Data for {name} is not aligned by date.")
    u = empirical_cdf(emotion_values[name])
    v = empirical_cdf(behavior_values[name])
    u_matrix[:, i] = norm.ppf(u)
    v_matrix[:, i] = norm.ppf(v)

# Combine u_matrix and v_matrix into a single matrix for copula fitting
copula_data = np.hstack((u_matrix, v_matrix))
# Fit a Gaussian Copula
cov_estimator = EmpiricalCovariance().fit(copula_data)

copula_corr_matrix = cov_estimator.covariance_
# Extract the correlation matrix for the emotion and behavior parts separately
emotion_behavior_corr_matrix = copula_corr_matrix[:len(names), len(names):]

# Convert the results to a DataFrame for easier viewing
gaussian_copula_corr_df = pd.DataFrame(emotion_behavior_corr_matrix, index=names, columns=names)
gaussian_copula_corr_df.to_excel('新关联系数.xlsx')
# Display the correlation DataFrame
print(gaussian_copula_corr_df)

# Create a figure and axes
# fig, axes = plt.subplots(9, 9, figsize=(20, 20))
# fig.subplots_adjust(hspace=0.5, wspace=0.5)

# Plot each scatter plot
# for i, emotion_key in enumerate(names):
#     for j, behavior_key in enumerate(names):
#         ax = axes[i, j]
#         ax.scatter(emotion_values[emotion_key], behavior_values[behavior_key], alpha=0.5)
#         ax.set_title(f'{emotion_key} vs {behavior_key}')
#         ax.set_xlabel('Emotion Values')
#         ax.set_ylabel('Behavior Values')

# plt.savefig('scatter_plots.png', dpi=300)
# plt.show()


In [None]:
#Spearman
import pandas as pd
import numpy as np

# Function to compute the ranks for Spearman correlation
def rank_data(data):
    """Compute the ranks for a data series."""
    return pd.Series(data).rank(method='average')

# Ensure the data length is consistent
n = len(next(iter(emotion_values.values())))
u_matrix = np.zeros((n, len(emotion_values)))
v_matrix = np.zeros((n, len(behavior_values)))

# Compute the ranks for Spearman correlation
for i, name in enumerate(names):
    u_matrix[:, i] = rank_data(emotion_values[name])
    v_matrix[:, i] = rank_data(behavior_values[name])

# Combine u_matrix and v_matrix into a single matrix for correlation computation
copula_data = np.hstack((u_matrix, v_matrix))

# Compute the Spearman correlation matrix
spearman_corr_matrix = np.corrcoef(copula_data, rowvar=False)

# Extract the correlation matrix for the emotion and behavior parts separately
emotion_behavior_corr_matrix = spearman_corr_matrix[:len(names), len(names):]

# Convert the results to a DataFrame for easier viewing
spearman_corr_df = pd.DataFrame(emotion_behavior_corr_matrix, index=names, columns=names)
spearman_corr_df.to_excel('新关联系数_spearman.xlsx')

# Display the correlation DataFrame
print(spearman_corr_df)


In [None]:
#pearson
import pandas as pd
import numpy as np

# Ensure the data length is consistent
n = len(next(iter(emotion_values.values())))
u_matrix = np.zeros((n, len(emotion_values)))
v_matrix = np.zeros((n, len(behavior_values)))

# Convert dictionaries to matrices
for i, name in enumerate(names):
    u_matrix[:, i] = emotion_values[name]
    v_matrix[:, i] = behavior_values[name]

# Combine u_matrix and v_matrix into a single matrix for correlation computation
combined_data = np.hstack((u_matrix, v_matrix))

# Compute the Pearson correlation matrix
pearson_corr_matrix = np.corrcoef(combined_data, rowvar=False)

# Extract the correlation matrix for the emotion and behavior parts separately
emotion_behavior_corr_matrix = pearson_corr_matrix[:len(names), len(names):]

# Convert the results to a DataFrame for easier viewing
pearson_corr_df = pd.DataFrame(emotion_behavior_corr_matrix, index=names, columns=names)
pearson_corr_df.to_excel('新关联系数_pearson.xlsx')

# Display the correlation DataFrame
print(pearson_corr_df)


In [None]:
#造9*5数据（模拟数据）
import pandas as pd
import numpy as np

# 读取上传的文件
file_path = '新行为.xlsx'
data = pd.read_excel(file_path)

# 假设投资者分类在 '投资者分类' 列，每个交易日的数据在 '异动时间' 列
investor_column = '投资者分类'
date_column = '异动时间'
behavior_column = '新行为类型'

# 初始化新行为类型
behavior_categories = ['a', 'b', 'c', 'd', 'e']
np.random.seed(0)  # For reproducibility

# 获取唯一的投资者分类和交易日期
unique_investors = data[investor_column].unique()
unique_dates = sorted(data[date_column].unique())

# 初始化新行为类型列
new_behavior_types = []

# 为第一个交易日生成随机行为类型
initial_behavior_types = {investor: np.random.choice(behavior_categories) for investor in unique_investors}

# 为每个投资者初始化当前行为类型
current_behavior_types = initial_behavior_types.copy()

# 按日期顺序为每个投资者生成新行为类型
for date in unique_dates:
    for investor in unique_investors:
        # 决定是否改变行为类型
        if np.random.rand() < 0.05:
            # 改变为其他四种行为类型中的一种
            new_behavior = np.random.choice([b for b in behavior_categories if b != current_behavior_types[investor]])
            current_behavior_types[investor] = new_behavior
        # 记录当前行为类型
        new_behavior_types.append(current_behavior_types[investor])

# 将新行为类型加入数据中
data[behavior_column] = new_behavior_types

# 保存结果到新文件
output_file_path = '9*5.xlsx'
data.to_excel(output_file_path, index=False)

data.head()


In [None]:
import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score


# 初始化结果矩阵
result_matrix = np.zeros((len(names), len(behavior_categories)))

# 对行为数据进行标签编码
label_encoder = LabelEncoder()

# 模型训练并计算关系矩阵
for i, investor in enumerate(names):
    X = emotion_values[investor]  
    y = behavior_values[investor]  
    
    # 标签编码
    y_encoded = label_encoder.fit_transform(y)
    
    # 将X转换为numpy数组并reshape
    X_reshaped = np.array(X).reshape(-1, 1)
    
    # 拆分训练集和测试集
    X_train, X_test, y_train, y_test = train_test_split(X_reshaped, y_encoded, test_size=0.2, random_state=42)
    
    # 逻辑回归模型训练
    model = LogisticRegression(multi_class='multinomial', solver='lbfgs')
    model.fit(X_train, y_train)
    
    # 计算分类准确度
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Investor {investor} - Accuracy: {accuracy:.2f}")
    
    # 存储模型系数（关系矩阵）
    for j, category in enumerate(behavior_categories):
        coef_index = list(label_encoder.classes_).index(category)
        result_matrix[i, j] = model.coef_[coef_index]

# 转换结果矩阵为DataFrame
result_df = pd.DataFrame(result_matrix, index=names, columns=behavior_categories)

# 输出结果
result_df.to_excel('逻辑回归关联.xlsx')
print(result_df)


In [None]:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

# 读取上传的文件
file_path = '新行为.xlsx'
data = pd.read_excel(file_path)

# 定义投资者类别和行为类型
investor_types = data['投资者分类'].unique()
behavior_categories = ['长期类', '短线类', '趋势类', '追涨类', '抄底类']

# 定义行为变化概率
change_prob = 0.1  # 10%的概率改变行为类型

# 生成随机初始权重
def generate_random_weights(size):
    weights = np.random.rand(size)
    return weights / weights.sum()

# 生成初始行为类型
def generate_initial_behavior(size, weights):
    return np.random.choice(behavior_categories, size=size, p=weights)

# 更新行为类型
def update_behavior(current_behaviors, weights):
    change_mask = np.random.rand(len(current_behaviors)) < change_prob
    new_behaviors = current_behaviors.copy()
    for i, change in enumerate(change_mask):
        if change:
            new_behaviors[i] = np.random.choice(behavior_categories, p=weights)
    return new_behaviors

# 模拟每天的行为数据
def simulate_daily_behavior(num_investors, initial_weights, num_days):
    current_behaviors = generate_initial_behavior(num_investors, initial_weights)
    daily_behavior = np.zeros((num_days, len(behavior_categories)), dtype=int)
    weights = initial_weights
    for day in range(num_days):
        weights = generate_random_weights(len(behavior_categories))  # 每天重新生成随机权重
        current_behaviors = update_behavior(current_behaviors, weights)
        unique, counts = np.unique(current_behaviors, return_counts=True)
        behavior_count = dict(zip(unique, counts))
        for behavior in behavior_categories:
            daily_behavior[day, behavior_categories.index(behavior)] = behavior_count.get(behavior, 0)
    return daily_behavior

# 初始化数据结构
dates = data['异动时间'].unique()
num_days = len(dates)

# 并行处理每种投资者的数据
def process_investor(investor_type, dates, num_days):
    if investor_type == '散户':
        base_num_investors = np.random.randint(100000, 1000000)  # 模拟中小散户数量
    elif investor_type == '大户':
        base_num_investors = np.random.randint(1000, 10000)  # 模拟大户数量
    elif investor_type == '超有钱户':
        base_num_investors = np.random.randint(100, 1000)  # 模拟超大户数量
    else:
        base_num_investors = np.random.randint(100, 1000)  # 模拟机构投资者数量
    
    # 随机生成初始权重
    initial_weights = generate_random_weights(len(behavior_categories))
    
    # 模拟每天的行为数据
    daily_behavior = simulate_daily_behavior(base_num_investors, initial_weights, num_days)
    
    rows = []
    for day, date in enumerate(dates):
        row = {
            '异动时间': date,
            '投资者分类': investor_type,
        }
        row.update({behavior_categories[i]: daily_behavior[day, i] for i in range(len(behavior_categories))})
        rows.append(row)
    return rows

# 使用并行处理来优化性能
results = Parallel(n_jobs=-1)(delayed(process_investor)(investor, dates, num_days) for investor in investor_types)
new_data = [row for result in results for row in result]

# 转换为DataFrame
new_behavior_df = pd.DataFrame(new_data)

# 将新数据合并到原始数据中
merged_data = pd.merge(data, new_behavior_df, on=['异动时间', '投资者分类'], how='left')

# 保存结果到Excel文件
output_file_path = '新行为_带新行为类型_改进版.xlsx'
merged_data.to_excel(output_file_path, index=False)

merged_data.head()


In [None]:
import pandas as pd
import numpy as np
from scipy.stats import norm
from sklearn.covariance import EmpiricalCovariance

# 读取生成的行为数据文件
file_path = '新行为_带新行为类型_改进版.xlsx'
data = pd.read_excel(file_path)

# 定义投资者类别和行为类型
investor_types = data['投资者分类'].unique()
behavior_categories = ['长期价值类', '短线交易类', '趋势类', '偏向追涨类', '偏向抄底类']

# 初始化 emotion_values 和 behavior_values
emotion_values = {}
behavior_values = {}

# 遍历每种投资者
for investor in investor_types:
    investor_data = data[data['投资者分类'] == investor]
    emotion_values[investor] = investor_data['emotion_value']  # 假设情绪值在 '情绪值' 列中
    
    behavior_values[investor] = {}
    for behavior in behavior_categories:
        behavior_values[investor][behavior] = investor_data[behavior]
        
def empirical_cdf(data):
    """Compute the empirical CDF for a data series."""
    return pd.Series(data).rank(method='average') / (len(data) + 1)
        
# 确保数据长度一致
def compute_correlation(emotion_data, behavior_data):
    """Compute the correlation matrix between emotion data and behavior data."""
    n = len(emotion_data)
    m = len(behavior_data)
    
    u_matrix = np.zeros((n, 1))
    v_matrix = np.zeros((n, m))
    
    # Compute the empirical CDFs and convert to standard normal marginals
    u = empirical_cdf(emotion_data)
    u_matrix[:, 0] = norm.ppf(u)
    
    for i, behavior in enumerate(behavior_categories):
        v = empirical_cdf(behavior_data[behavior])
        v_matrix[:, i] = norm.ppf(v)
    
    # Combine u_matrix and v_matrix into a single matrix for copula fitting
    copula_data = np.hstack((u_matrix, v_matrix))
    # Fit a Gaussian Copula
    cov_estimator = EmpiricalCovariance().fit(copula_data)
    copula_corr_matrix = cov_estimator.covariance_
    # Extract the correlation matrix for the emotion and behavior parts separately
    emotion_behavior_corr_matrix = copula_corr_matrix[0, 1:]
    return emotion_behavior_corr_matrix

# 初始化结果矩阵
result_matrix = np.zeros((len(investor_types), len(behavior_categories)))

# 计算每种投资者的情绪和行为之间的关系
for i, investor in enumerate(investor_types):
    emotion_data = emotion_values[investor]
    behavior_data = behavior_values[investor]
    result_matrix[i, :] = compute_correlation(emotion_data, behavior_data)

# 转换结果矩阵为 DataFrame
result_df = pd.DataFrame(result_matrix, index=investor_types, columns=behavior_categories)

# 输出结果
output_file_path = '投资者情绪与行为关系矩阵.xlsx'
result_df.to_excel(output_file_path)
print(result_df)


In [None]:
import numpy as np
abc=np.array([True,1,2])+np.array([3,4,False])
print(abc)

In [None]:
#草稿页
for name in names:
    print(name)
# Fit ARMA model
    p, q = 1,1
    arma_model = ARIMA(emotion_values[name], order=(p, 0, q)).fit()
    residuals = arma_model.resid

    # Fit GARCH model
    garch_model = arch_model(residuals, vol='GARCH', p=1, q=1).fit()
    conditional_volatility = garch_model.conditional_volatility
    standardized_residuals = residuals / conditional_volatility

    # Check for no autocorrelation and no conditional heteroscedasticity
    lb_test = acorr_ljungbox(standardized_residuals, lags=10, return_df=True)
    print("Ljung-Box test results for autocorrelation:\n", lb_test)

    arch_test = het_arch(standardized_residuals, nlags=10)
    print("ARCH test results for conditional heteroscedasticity:\n", arch_test)
    arch_test = het_arch(emotion_values[name])
    print('ARCH LM Test:')
    print('Statistic:', arch_test[0])
    print('P-value:', arch_test[1])
    print('F-statistic:', arch_test[2])
    print('F-test p-value:', arch_test[3])

    conditional_variance = conditional_volatility**2

    #Plot ACF and PACF of conditional variances
    plt.figure(figsize=(12, 6))

    plt.subplot(121)
    plot_acf(conditional_variance, lags=40, ax=plt.gca())
    plt.title('ACF of Conditional Variances')

    plt.subplot(122)
    plot_pacf(conditional_variance, lags=40, ax=plt.gca())
    plt.title('PACF of Conditional Variances')

    plt.tight_layout()
    plt.show()
    print(standardized_residuals.max())
    print(standardized_residuals.min())




In [None]:
#看一下acf和pacf来看怎么修改模型
#这是原始数据的自相关性图和偏自相关性图
import matplotlib.pyplot as plt
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

for name in names:
    print(name)
    # Plot ACF and PACF for raw emotion values
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))

    # ACF plot
    plot_acf(emotion_values[name], ax=axes[0])
    axes[0].set_title('ACF')

    # PACF plot
    plot_pacf(emotion_values[name], ax=axes[1])
    axes[1].set_title('PACF')
    plt.show()


In [None]:
#这是ADF平稳性测试 结果P值<0.05 甚至是0 说明非常平稳 不需要做差分计算
from statsmodels.tsa.stattools import adfuller
for name in names:
    # Perform ADF test to check stationarity
    result = adfuller(emotion_values[name])
    print('ADF Statistic:', result[0])
    print('p-value:', result[1])


In [None]:
for name in names:
    print(name)
    arch_test = het_arch(emotion_values[name])

    # Extracting the results
    lm_statistic = arch_test[0]
    lm_pvalue = arch_test[1]
    f_statistic = arch_test[2]
    f_pvalue = arch_test[3]

    print('LM Statistic:', lm_statistic)
    print('LM p-value:', lm_pvalue)
    print('F-Statistic:', f_statistic)
    print('F p-value:', f_pvalue)

根据上述acf和pacf 以及ADF的p值得知 我们的数据具有稳定且非自相关性 ARMA模型不会有太大的效果

In [None]:
#对原始数据的分析
# Perform seasonal decomposition
from statsmodels.tsa.seasonal import seasonal_decompose
for name in names:
    print(name)
    decomposition = seasonal_decompose(emotion_values[name], model='additive', period=12)
    # Plot the decomposition
    fig = decomposition.plot()
    fig.set_size_inches(15, 8)
    plt.show()

没有明显trend 没有明显重复规律 残差也比较随机均匀分布

In [None]:
def fit_garch_model(p, q):
    model = arch_model(residuals, vol='Garch', p=p, q=q)
    garch_result = model.fit(disp='off')
    
    # Print the summary
    print(f'GARCH({p},{q}) Model Summary:')
    print(garch_result.summary())
    
    # Plot the fitted volatility
    plt.figure(figsize=(10, 5))
    plt.plot(garch_result.conditional_volatility)
    plt.title(f'Fitted Volatility from GARCH({p},{q}) Model')
    plt.show()
    
    # Plot the residuals
    plt.figure(figsize=(10, 5))
    plt.plot(garch_result.resid)
    plt.title(f'Residuals from GARCH({p},{q}) Model')
    plt.show()
    
    # Ljung-Box test on the residuals
    lb_test = acorr_ljungbox(garch_result.resid, lags=[10], return_df=True)
    print(f'Ljung-Box Test for GARCH({p},{q}) Residuals:\n', lb_test)
    
    # ARCH LM test on the residuals
    arch_test = garch_result.arch_lm_test(lags=10)
    print(f'ARCH LM Test for GARCH({p},{q}) Residuals:\n', arch_test)
    
    arch_lm_results = het_arch(garch_result.resid, nlags=10)
    print(arch_lm_results)

fit_garch_model(1,1)

In [None]:
#看一下arma garch模型后的残差图
from statsmodels.stats.diagnostic import acorr_ljungbox, het_arch

# Plot the residuals to see if they resemble white noise
plt.figure(figsize=(12, 6))
plt.plot(garch_result.resid, label='Residuals')
plt.title('Residuals from ARMA-GARCH Model')
plt.xlabel('Time')
plt.ylabel('Residuals')
plt.legend()
plt.show()

# Perform Ljung-Box test on the residuals
ljung_box_results = acorr_ljungbox(garch_result.resid, lags=[10], return_df=True)

# Check for heteroskedasticity using ARCH LM test on the residuals
arch_lm_results = het_arch(garch_result.resid, nlags=10)

ljung_box_results, arch_lm_results


残差自相关和数据波动性的自相关都减少了 但是始终存在聚集性的波动性 接下来尝试不同参数 (1,2) (2,1) (2,2)

In [None]:
fit_garch_model(1,2)
fit_garch_model(2,1)
fit_garch_model(2,2)

结果相似 尝试egarch和tgarch

In [None]:
# Fit EGARCH(1,1) model
egarch_model = arch_model(residuals, vol='EGARCH', p=1, q=1)
egarch_fit = egarch_model.fit()

# Print the results
print(egarch_fit.summary())

# Plot the fitted volatility
plt.figure(figsize=(10, 6))
plt.plot(egarch_fit.conditional_volatility, label='Fitted Volatility')
plt.title('Fitted Volatility from EGARCH(1,1) Model')
plt.legend()
plt.show()

# Perform Ljung-Box test on the residuals
from statsmodels.stats.diagnostic import acorr_ljungbox
ljung_box_egarch = acorr_ljungbox(egarch_fit.resid, lags=[10], return_df=True)
print("Ljung-Box Test for EGARCH Residuals:")
print(ljung_box_egarch)

# Perform ARCH LM test on the residuals
from statsmodels.stats.diagnostic import het_arch

arch_lm_egarch = het_arch(egarch_fit.resid)
print("ARCH LM Test for EGARCH Residuals:")
print(f"Statistic: {arch_lm_egarch[0]}")
print(f"P-value: {arch_lm_egarch[1]}")
print(f"F-statistic: {arch_lm_egarch[2]}")
print(f"F-test p-value: {arch_lm_egarch[3]}")


In [None]:
# Fit TGARCH(1,1) model
tgarch_model = arch_model(residuals, vol='GARCH', p=1, o=1, q=1)
tgarch_fit = tgarch_model.fit()

# Print the results
print(tgarch_fit.summary())

# Plot the fitted volatility
plt.figure(figsize=(10, 6))
plt.plot(tgarch_fit.conditional_volatility, label='Fitted Volatility')
plt.title('Fitted Volatility from TGARCH(1,1) Model')
plt.legend()
plt.show()

# Perform Ljung-Box test on the residuals
ljung_box_tgarch = acorr_ljungbox(tgarch_fit.resid, lags=[10], return_df=True)
print("Ljung-Box Test for TGARCH Residuals:")
print(ljung_box_tgarch)

# Perform ARCH LM test on the residuals
arch_lm_tgarch = het_arch(tgarch_fit.resid)
print("ARCH LM Test for TGARCH Residuals:")
print(f"Statistic: {arch_lm_tgarch[0]}")
print(f"P-value: {arch_lm_tgarch[1]}")
print(f"F-statistic: {arch_lm_tgarch[2]}")
print(f"F-test p-value: {arch_lm_tgarch[3]}")


尝试了garch不同参数以及egarch和tgarch都没有什么明显的变化 接下来尝试从arma入手 找到最好的arma参数后再代入garch

In [None]:
import itertools

# Define the p and q parameters to take any value between 0 and 3
p = q = range(0, 4)
# Generate all different combinations of p, q, and q triplets
pdq = list(itertools.product(p, q))

# Find the best ARMA model parameters based on AIC
best_aic = float('inf')
best_order = None
for param in pdq:
    try:
        temp_model = ARIMA(emotion_values, order=(param[0], 0, param[1])).fit()
        temp_aic = temp_model.aic
        if temp_aic < best_aic:
            best_aic = temp_aic
            best_order = param
    except:
        continue

print('Best ARMA order:', best_order)


In [None]:
# Fit the best ARMA model
best_arma_model = ARIMA(emotion_values, order=(best_order[0], 0, best_order[1])).fit()
print(best_arma_model.summary())

# Extract residuals and check if they resemble white noise
arma_resid = best_arma_model.resid

plt.figure(figsize=(10, 6))
plt.subplot(211)
plot_acf(arma_resid, lags=40, ax=plt.gca())
plt.subplot(212)
plot_pacf(arma_resid, lags=40, ax=plt.gca())
plt.show()


In [None]:
model = arch_model(arma_resid, vol='Garch', p=1, q=1)
garch_result = model.fit(disp='off')

# Print the summary
print(f'GARCH(1,1) Model Summary:')
print(garch_result.summary())

# Plot the fitted volatility
plt.figure(figsize=(10, 5))
plt.plot(garch_result.conditional_volatility)
plt.title(f'Fitted Volatility from GARCH(1,1) Model')
plt.show()

# Plot the residuals
plt.figure(figsize=(10, 5))
plt.plot(garch_result.resid)
plt.title(f'Residuals from GARCH(1,1) Model')
plt.show()

# Ljung-Box test on the residuals
lb_test = acorr_ljungbox(garch_result.resid, lags=[10], return_df=True)
print(f'Ljung-Box Test for GARCH(1,1) Residuals:\n', lb_test)

# ARCH LM test on the residuals
arch_test = garch_result.arch_lm_test(lags=10)
print(f'ARCH LM Test for GARCH(1,1) Residuals:\n', arch_test)

arch_lm_results = het_arch(garch_result.resid, nlags=10)
print(arch_lm_results)

找到更好的arma参数并且将结果代入到（1,1）的garch模型中 得到了略微更好的结果 但是离理想结果还是很远

In [None]:
import numpy as np
from copulas.multivariate import GaussianMultivariate
from sklearn.preprocessing import StandardScaler
from scipy.stats import norm, rankdata

data = pd.read_excel('新行为.xlsx')
behavior_data = data["行为系数"]

# Combine the datasets
data = np.column_stack((standardized_residuals.values, behavior_data.values))

# Step 2: Standardize the data
scaler = StandardScaler()
data_standardized = scaler.fit_transform(data)

# Step 3: Compute the empirical CDF
ranks = np.apply_along_axis(rankdata, 0, data_standardized)
uniform_data = ranks / (len(data_standardized) + 1)

# Step 4: Apply the inverse CDF of the standard normal distribution
gaussian_data = norm.ppf(uniform_data)

# Step 5: Fit a Gaussian copula
copula = GaussianCopula()
copula.fit(gaussian_data)

# Step 6: Extract the correlation matrix
correlation_matrix = copula.corr

print("Correlation matrix estimated from Gaussian copula:")
print(correlation_matrix)


In [None]:
data = pd.read_excel('calculated_distances.xlsx')
# Group the data by '投资者分类'
grouped = data.groupby('投资者分类')

# Create an empty column for the new behavior values
data['行为'] = None

# Calculate the new behavior value for each investor category
for name, group in grouped:
    mean_volume = group['成交量'].mean()
    std_volume = group['成交量'].std()
    data.loc[group.index, '行为'] = (group['成交量'] - mean_volume) / std_volume

data.to_excel('新行为.xlsx')

In [None]:
import pandas as pd
import numpy as np

data = pd.read_excel('合并后的行为数据.xlsx')

# 投资者分类
individual_investors = ['超有钱户', '大户', '散户']
institutional_investors = ['基金用户', '专户', '券商', '社保基金', '深股通', '私募']

# 计算总占比
def calculate_total_ratios():
    total_ratio = np.random.uniform(0.900000, 1.000000)
    individual_total = np.random.uniform(0.200000, 0.500000)
    institutional_total = total_ratio - individual_total
    return round(individual_total, 6), round(institutional_total, 6)

# 根据涨幅计算买入和卖出比例
def calculate_proportions(change_rate):
    base_ratio = 0.500000
    adjustment = max(min(change_rate * 100, 0.400000), -0.400000)
    buy_ratio = base_ratio + adjustment
    sell_ratio = base_ratio - adjustment
    
    total_ratio = buy_ratio + sell_ratio
    if total_ratio > 1.0000:
        excess = total_ratio - 1.0000
        buy_ratio -= excess / 2
        sell_ratio -= excess / 2
    elif total_ratio < 0.9000:
        deficit = 0.9000 - total_ratio
        buy_ratio += deficit / 2
        sell_ratio += deficit / 2
        
    #buy_ratio = round(max(min(buy_ratio, 0.8000), 0.2000), 6)
    #sell_ratio = round(max(min(sell_ratio, 0.8000), 0.2000), 6)
    
    return buy_ratio, sell_ratio

#计算当天每一个投资者的买入和卖出占比
def distribution(individual_total, institutional_total, buy_ratio, sell_ratio):
    # Randomly split individual_total into three parts
    individual_parts = np.random.dirichlet(np.ones(3), size=1)[0] * individual_total
    
    # Randomly split institutional_total into six parts
    institutional_parts = np.random.dirichlet(np.ones(6), size=1)[0] * institutional_total
    
    # Combine parts into a single list
    all_parts = list(individual_parts) + list(institutional_parts)
    
    # Initialize result dictionary
    distribution_dict = {}
    
    # Calculate buy and sell proportions for each part
    for idx, part in enumerate(all_parts):
        buy_key = chr(97 + idx) + '_buy'  # 'a_buy', 'b_buy', ...
        sell_key = chr(97 + idx) + '_sell'  # 'a_sell', 'b_sell', ...
        distribution_dict[buy_key] = round(part * buy_ratio, 6)
        distribution_dict[sell_key] = round(part * sell_ratio, 6)
    
    return distribution_dict

# 分配每类投资者的成交量
def allocate_volumes(data):
    for date in data['异动时间'].unique():
        individual_total, institutional_total = calculate_total_ratios()
        buy_ratio, sell_ratio = calculate_proportions(data.loc[data['异动时间'] == date, 'change_rate'].values[0])
        dist = distribution(individual_total, institutional_total, buy_ratio, sell_ratio)

        for i, investor_type in enumerate(individual_investors):
            data.loc[(data['异动时间'] == date) & (data['投资者分类'] == investor_type), '主动买入占比'] = dist[chr(97 + i) + '_buy']
            data.loc[(data['异动时间'] == date) & (data['投资者分类'] == investor_type), '主动卖出占比'] = dist[chr(97 + i) + '_sell']
        
        # Assign values to institutional investors
        for i, investor_type in enumerate(institutional_investors):
            data.loc[(data['异动时间'] == date) & (data['投资者分类'] == investor_type), '主动买入占比'] = dist[chr(100 + i) + '_buy']
            data.loc[(data['异动时间'] == date) & (data['投资者分类'] == investor_type), '主动卖出占比'] = dist[chr(100 + i) + '_sell']
    
    # 分配成交量
    for i, row in data.iterrows():
        date = row['异动时间']
        volume = row['volume']
        data.at[i, '成交量'] = round(volume * (row['主动买入占比'] + row['主动卖出占比']))
    
    return data

# 分配成交量和买卖占比
new_behavior_data = allocate_volumes(data)

# 输出新数据
new_behavior_data.to_excel('新行为数据.xlsx', index=False)


In [None]:
import pandas as pd

# 读取行为数据
behavior_data = pd.read_csv('行为.csv')
# 读取市场数据
market_data = pd.read_excel('行情.xlsx')

# 确保异动时间列的日期格式一致
behavior_data['异动时间'] = pd.to_datetime(behavior_data['异动时间'])
market_data['date'] = pd.to_datetime(market_data['date'])

# 保留市场数据中的交易日
filtered_behavior_data = behavior_data[behavior_data['异动时间'].isin(market_data['date'])]

# 合并市场数据
merged_data = pd.merge(filtered_behavior_data, market_data, left_on='异动时间', right_on='date')

# 移除合并后不需要的列（如 'date' 列）
merged_data = merged_data.drop(columns=['date'])

# 输出合并后的数据
merged_data.to_excel('合并后的行为数据.xlsx', index=False)

merged_data.head()