<a href="https://colab.research.google.com/github/charliezou/stock/blob/master/sto05.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!git clone https://github.com/charliezou/stock.git
%cd stock

Cloning into 'stock'...
remote: Enumerating objects: 41, done.[K
remote: Counting objects: 100% (41/41), done.[K
remote: Compressing objects: 100% (35/35), done.[K
remote: Total 41 (delta 15), reused 9 (delta 3), pack-reused 0 (from 0)[K
Receiving objects: 100% (41/41), 1.21 MiB | 4.35 MiB/s, done.
Resolving deltas: 100% (15/15), done.
/content/stock


In [136]:
!git config --global user.email "962278683@qq.com"
!git config --global user.name "charliezou"
#!git remote set-url origin git@github.com:charliezou/stock.git



In [137]:
!git push origin master

remote: Support for password authentication was removed on August 13, 2021.
remote: Please see https://docs.github.com/get-started/getting-started-with-git/about-remote-repositories#cloning-with-https-urls for information on currently recommended modes of authentication.
fatal: Authentication failed for 'https://github.com/charliezou/stock.git/'


In [138]:
!git add .
!git commit -m "add form colab"
!git push origin master

On branch master
Your branch is ahead of 'origin/master' by 1 commit.
  (use "git push" to publish your local commits)

nothing to commit, working tree clean
remote: Support for password authentication was removed on August 13, 2021.
remote: Please see https://docs.github.com/get-started/getting-started-with-git/about-remote-repositories#cloning-with-https-urls for information on currently recommended modes of authentication.
fatal: Authentication failed for 'https://github.com/charliezou/stock.git/'


In [99]:
import warnings
warnings.filterwarnings('ignore')

import yfinance as yf
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, r2_score
from keras.models import Model
from keras.layers import Input, LSTM, Dense, Attention, concatenate
from keras.callbacks import EarlyStopping

ZHIBIAO = ['Norm_Close','MACD_Hist_Z', 'Norm_Volume', 'KC', 'KC_Vol', 'RSI','Momentum_3','Momentum_7']

TARGET_HORIZON = 7

tickers = ['AAPL', 'AMZN', 'BRK-B', 'GOOGL', 'META', 'MSFT', 'NVDA', 'TSLA']

ticker = 'TSLA'

def download_data():
    """获取数据"""
    data = yf.download(ticker, start='2010-01-01', end='2023-12-31', interval='1D', auto_adjust=True)
    data = data.drop_duplicates()
    data = pd.DataFrame({
        'Close' : data['Close'][ticker],
        'High' : data['High'][ticker],
        'Low' : data['Low'][ticker],
        'Open' : data['Open'][ticker],
        'Volume' : data['Volume'][ticker],
    })
    return resample_weekly(data)


def resample_weekly(data):
    """将日线数据重采样为周线数据"""
    return data.resample('W-FRI').agg({
        'Open': 'first',
        'High': 'max',
        'Low': 'min',
        'Close': 'last',
        'Volume': 'sum'
    }).dropna()

def fetch_data():
    """获取纳斯达克指数数据"""
    data = pd.read_csv('dpixic.csv', index_col=0, parse_dates=True)
    data.index = pd.to_datetime(data.index)

    return resample_weekly(data)

def calculate_kc(df):
    """计算Keltner通道"""
    ema = df['Close'].ewm(span=20, adjust=False).mean()
    atr = (df['High'] - df['Low']).rolling(20).mean()
    df['KC_Middle'] = ema
    df['KC_Upper'] = ema + 1.5 * atr
    df['KC_Lower'] = ema - 1.5 * atr
    df['KC'] = (df['Close'] - df['KC_Middle'])/(df['KC_Upper'] - df['KC_Lower'])*2*100
    df['KC_Vol'] = (df['KC_Upper'] - df['KC_Lower'])/df['KC_Middle']*100
    return df

# MACD指标计算
def calculate_macd(data):
    data['EMA12'] = data['Close'].ewm(span=12, adjust=False).mean()
    data['EMA26'] = data['Close'].ewm(span=26, adjust=False).mean()
    data['DIF'] = data['EMA12'] - data['EMA26']
    data['DEA'] = data['DIF'].ewm(span=9, adjust=False).mean()
    data['MACD_Hist'] = data['DIF'] - data['DEA']
    data['MACD_Hist_Z'] = data['MACD_Hist']/data['Close'].shift(1)*100
    return data

def calculate_rsi(df):
    """计算RSI指标"""
    delta = df['Close'].diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)

    avg_gain = gain.rolling(14).mean()
    avg_loss = loss.rolling(14).mean()

    rs = avg_gain / avg_loss
    df['RSI'] = 100 - (100 / (1 + rs))
    return df


# ========== base-动量计算 ==========
def calculate_momentum(df):
    df['Momentum_1'] = df['Close'].pct_change(1)
    df['Momentum_3'] = df['Close'].pct_change(3)
    df['Momentum_7'] = df['Close'].pct_change(7)

    return df

def calculate_norm_volume(df):
    """计算标准化volume"""
    med = df['Volume'].rolling(window=26,min_periods=1).median()
    df['Norm_Volume'] = df['Volume']/ med - 1
    return df

def calculate_norm_close(df):
    """计算标准化Close"""
    med = df['Close'].rolling(window=26,min_periods=1).median()
    df['Norm_Close'] = df['Close']/ med - 1
    return df


def calculate_indicators(df):
    """计算技术指标"""
    df = calculate_kc(df)
    df = calculate_macd(df)
    df = calculate_rsi(df)
    df = calculate_momentum(df)

    # 标准化成交量
    df = calculate_norm_volume(df)
    df = calculate_norm_close(df)

    return df.dropna()

def calculate_target_v1(df):
    """计算目标值"""
    df['Targer'] = 1/(1+df['Close'].pct_change(-TARGET_HORIZON))-1
    return df

def calculate_target(df):
    """计算目标值"""
    max = df['Close'].rolling(window=TARGET_HORIZON,min_periods=1).max().shift(-TARGET_HORIZON)
    min = df['Close'].rolling(window=TARGET_HORIZON,min_periods=1).min().shift(-TARGET_HORIZON)

    df['Targer'] = (max + min) / df['Close'] / 2 -1
    return df

def create_dataset(data, lookback=26, horizon=TARGET_HORIZON):
    """创建时间序列数据集"""
    X, y, timestamps = [], [], []
    for i in range(len(data)-lookback-horizon+1):
        # 输入特征：过去lookback周的技术指标
        features = data[ZHIBIAO]
        seq = features.iloc[i:i+lookback].values

        # 输出标签：未来第3周的收益率
        target = data['Targer'].iloc[i+lookback-1]

        X.append(seq)
        y.append(target)
        timestamps.append(data.index[i+lookback-1])  # 记录每个样本的结束时间
    return np.array(X), np.array(y), np.array(timestamps)

def build_hybrid_model(input_shape):
    """构建混合神经网络模型"""
    # 时间序列特征分支
    inputs = Input(shape=input_shape)
    lstm_out = LSTM(64, return_sequences=True)(inputs)
    att_out = Attention()([lstm_out, lstm_out])
    ts_feature = Dense(64, activation='relu')(att_out[:, -1, :])

    # 技术指标统计特征
    dense_feature = Dense(16, activation='relu')(inputs[:, -1, :])

    # 特征融合
    merged = concatenate([ts_feature, dense_feature])
    output = Dense(1, activation='tanh')(Dense(32, activation='relu')(merged))

    model = Model(inputs=inputs, outputs=output)
    model.compile(optimizer='adam', loss='mse', metrics=['mae'])
    return model

def evaluate_model(model, X_test, y_test):
    """模型评估与量化验证"""
    predictions = model.predict(X_test).flatten()

    # 回归指标
    print(f"MAE: {mean_absolute_error(y_test, predictions):.4f}")
    print(f"R²: {r2_score(y_test, predictions):.4f}")

    # 方向准确率
    direction_acc = np.mean((np.sign(y_test) == np.sign(predictions)).astype(int))
    print(f"方向准确率: {direction_acc:.2%}")

    # 收益曲线模拟
    strategy_returns = np.sign(predictions) * y_test
    print(f"累计收益率: {np.prod(1 + strategy_returns) - 1:.2%}")

def run_model(model, X):
    """模型评估与量化验证"""
    predictions = model.predict(X).flatten()
    return predictions



In [104]:

# 数据准备
#data = fetch_data()  #dapan
data = download_data()
data = calculate_indicators(data)
data = calculate_target(data)

# 创建数据集
X, y, timestamps = create_dataset(data)

# 数据标准化
scaler = StandardScaler()
X = scaler.fit_transform(X.reshape(-1, X.shape[-1])).reshape(X.shape)

# 划分训练测试集
split = int(0.8 * len(X))
X_train, y_train, timestamps_train = X[:split], y[:split], timestamps[:split]
X_test, y_test, timestamps_test = X[split:], y[split:], timestamps[split:]


# 构建模型
size = len(ZHIBIAO)
model = build_hybrid_model(input_shape=(26, len(ZHIBIAO)))
early_stop = EarlyStopping(monitor='val_loss', patience=5,restore_best_weights=True)

# 模型训练
history = model.fit(X_train, y_train,
                    epochs=100,
                    batch_size=32,
                    validation_split=0.2,
                    callbacks=[early_stop],
                    verbose=1)

# 模型评估
evaluate_model(model, X_test, y_test)

# 未来预测
latest_data = scaler.transform(data[ZHIBIAO].iloc[-26:].values)
future_pred = model.predict(latest_data.reshape(1, 26, len(ZHIBIAO)))
print(f"\n未来第3周预测涨跌幅: {future_pred[0][0]:.2%}")



[*********************100%***********************]  1 of 1 completed


Epoch 1/100
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 52ms/step - loss: 0.0224 - mae: 0.1078 - val_loss: 0.0677 - val_mae: 0.2112
Epoch 2/100
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 40ms/step - loss: 0.0145 - mae: 0.0863 - val_loss: 0.0799 - val_mae: 0.2322
Epoch 3/100
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 37ms/step - loss: 0.0107 - mae: 0.0766 - val_loss: 0.0738 - val_mae: 0.2185
Epoch 4/100
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 43ms/step - loss: 0.0100 - mae: 0.0731 - val_loss: 0.0791 - val_mae: 0.2276
Epoch 5/100
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 41ms/step - loss: 0.0092 - mae: 0.0672 - val_loss: 0.0854 - val_mae: 0.2375
Epoch 6/100
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step - loss: 0.0109 - mae: 0.0735 - val_loss: 0.0888 - val_mae: 0.2439
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 57ms/step
MAE: 0.1319
R²: 

In [105]:
predictions_train = run_model(model, X_train)
predictions_test = run_model(model, X_test)

# 初始化全零信号矩阵
data.loc[timestamps_train, 'Pred_train'] = predictions_train
data.loc[timestamps_test, 'Pred_test'] = predictions_test

data[['Close']+ZHIBIAO+['Targer']+['Pred_train', 'Pred_test']].to_csv(f'formodel_{ticker}.csv')


[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step


In [106]:
model.save_weights(f'model_{ticker}.weights.h5')

In [107]:
model.load_weights(f'model_{ticker}.weights.h5')
# 模型评估
evaluate_model(model, X_test, y_test)

[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step 
MAE: 0.1319
R²: -0.1080
方向准确率: 62.60%
累计收益率: 2659.27%
