In [None]:
import numpy as np
import matplotlib.pyplot as plt

# 设置随机种子（可重复结果）
np.random.seed(42)

# 参数设置
days = 500
initial_price = 100
volatility = 0.02  # 日波动率
trend = 0.0005     # 小幅上涨趋势


# 生成对数收益率（正态分布）
log_returns = np.random.normal(trend, volatility, days)

# 累积对数收益率 → 得到价格序列
price_series = initial_price * np.exp(np.cumsum(log_returns))


# 添加一些“真实感”:偶尔大波动（模拟市场冲击）
shock_days = np.random.choice(days, size=10, replace=False)
log_returns[shock_days] += np.random.normal(0, 0.1, 10)  # 大幅波动
price_series = initial_price * np.exp(np.cumsum(log_returns))

print(f"生成了 {len(price_series)} 天的股票价格数据")

生成了 500 天的股票价格数据


In [None]:
plt.figure(figsize=(12, 6))
plt.plot(price_series, label="股票价格", color='blue')
plt.title("模拟股票价格序列")
plt.xlabel("天数")
plt.ylabel("价格")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

In [None]:
window_size = 20
kernel = np.ones(window_size) / window_size  # 平均核

# 使用 convolve 计算移动平均 MA
ma = np.convolve(price_series, kernel, mode='valid')

# 注意:convolve 返回长度为 len(data) - window_size + 1
# 所以我们要补上前面的缺失值
ma_padded = np.concatenate([np.full(window_size - 1, np.nan), ma])

print(f"移动平均后长度: {len(ma_padded)}")

移动平均后长度: 500


In [None]:
plt.figure(figsize=(12, 6))
plt.plot(price_series, label="原始价格", alpha=0.7)
plt.plot(ma_padded, label=f"{window_size}日移动平均", color='red', linewidth=2)
plt.title(f"使用 numpy.convolve 的 {window_size} 日移动平均")
plt.xlabel("天数")
plt.ylabel("价格")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import yfinance as yf  # 用于获取免费股票数据

# 获取股票数据
def get_stock_data(symbol, start_date, end_date):
    """获取股票历史数据"""
    stock = yf.download(symbol, start=start_date, end=end_date)
    return stock

# 示例:获取苹果股票数据
# data = get_stock_data(symbol='AAPL', start_date='2020-01-01', end_date='2023-12-31')
# print(data.head())

# 由于网络问题，我们先用模拟数据演示
dates = pd.date_range('2020-01-01', periods=1000, freq='D')
np.random.seed(42)
prices = 100 + np.cumsum(np.random.randn(1000) * 0.5)  # 随机游走生成价格
data = pd.DataFrame({'Close': prices}, index=dates)

In [4]:
data

Unnamed: 0,Close
2020-01-01,100.248357
2020-01-02,100.179225
2020-01-03,100.503069
2020-01-04,101.264584
2020-01-05,101.147507
...,...
2022-09-22,108.446061
2022-09-23,109.344905
2022-09-24,109.665326
2022-09-25,109.379737


In [None]:
def calculate_sma(prices, window):
    """计算简单移动平均线"""
    return prices.rolling(window=window).mean()
calculate_sma(data, 5)

def calculate_ema(prices, window):
    """计算指数移动平均线

    从第一天开始就有值:第一天EMA = 第一天价格
    使用递推公式:基于前一天的EMA和当天价格计算
    近期数据权重更高:越近的数据对结果影响越大

    """
    return prices.ewm(span=window, adjust=False).mean()

calculate_ema(data, 5)

In [None]:
def calculate_rsi(prices, window=14):
    """计算相对强弱指数(RSI)"""
    delta = prices.diff() # diff = 昨天 - 今天
    gain = (delta.where(delta > 0, 0)).rolling(window=window).mean() # 如果diff值小于零，即今天比昨天低，就用0替换负数
    loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean() # 相反，用0替换正数
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))
    return rsi
calculate_rsi(data, 14)


In [12]:
def calculate_bollinger_bands(prices, window=20, num_std=2):
    """计算布林带"""
    sma = calculate_sma(prices, window)
    std = prices.rolling(window=window).std()
    upper_band = sma + (std * num_std)
    lower_band = sma - (std * num_std)
    return upper_band, sma, lower_band

In [13]:
data['SMA_20'] = calculate_sma(data['Close'], 20)
data['SMA_50'] = calculate_sma(data['Close'], 50)
data['EMA_12'] = calculate_ema(data['Close'], 12)
data['RSI'] = calculate_rsi(data['Close'])
upper, middle, lower = calculate_bollinger_bands(data['Close'])

In [None]:
data['BB_Upper'] = upper
data['BB_Middle'] = middle
data['BB_Lower'] = lower

In [15]:
print("技术指标计算完成！")
print(data[['Close', 'SMA_20', 'SMA_50', 'RSI']].tail())

技术指标计算完成！
                 Close      SMA_20      SMA_50        RSI
2022-09-22  108.446061  109.917918  110.819676  28.228709
2022-09-23  109.344905  109.811078  110.810834  43.708918
2022-09-24  109.665326  109.780855  110.815320  47.278160
2022-09-25  109.379737  109.775950  110.814549  44.894758
2022-09-26  109.666028  109.766350  110.817071  52.723634


In [None]:
def generate_ma_crossover_signals(data, short_window=20, long_window=50):
    """生成移动平均线交叉信号"""
    # 计算移动平均线
    short_ma = calculate_sma(data['Close'], short_window)
    long_ma = calculate_sma(data['Close'], long_window)

    # 生成信号
    signals = pd.DataFrame(index=data.index)
    signals['signal'] = 0
    signals['short_ma'] = short_ma
    signals['long_ma'] = long_ma

    # 当短期均线上穿长期均线时买入(1)，下穿时卖出(0)
    signals['signal'][short_window:] = np.where(
        short_ma[short_window:] > long_ma[short_window:], 1, 0
    )

    # 计算实际的交易信号（买入/卖出点）
    signals['positions'] = signals['signal'].diff()

    return signals


def generate_rsi_signals(data, overbought=70, oversold=30):
    """生成RSI超买超卖信号"""
    signals = pd.DataFrame(index=data.index)
    signals['rsi'] = data['RSI']
    signals['signal'] = 0

    # RSI低于超卖线买入，高于超买线卖出
    signals['signal'] = np.where(signals['rsi'] < oversold, 1,
                                np.where(signals['rsi'] > overbought, -1, 0))
    signals['positions'] = signals['signal'].diff()

    return signals

# 生成信号
ma_signals = generate_ma_crossover_signals(data)
rsi_signals = generate_rsi_signals(data)
ma_signals.to_csv("./data/ma_signals.csv", header=True, index=False)
rsi_signals.to_csv("./data/rsi_signals.csv", header=True, index=False)

In [None]:
def backtest_strategy(data, signals, initial_capital=10000.0):
    """手动回测策略"""
    # 创建持仓和资金记录
    positions = pd.DataFrame(index=data.index).fillna(0.0)
    portfolio = pd.DataFrame(index=data.index).fillna(0.0)

    # 假设每次交易100股
    positions['stock'] = 100 * signals['signal']

    # 计算持仓价值
    portfolio['positions_value'] = positions['stock'] * data['Close']

    # 计算现金变化（考虑交易成本）
    portfolio['cash'] = initial_capital - (positions['stock'].diff() * data['Close']).cumsum()

    # 总资产 = 持仓价值 + 现金
    portfolio['total'] = portfolio['positions_value'] + portfolio['cash']

    # 计算收益率
    portfolio['returns'] = portfolio['total'].pct_change() # 今天比昨天多了多少%

    # 基于 日回报率，计算累计回报率
    # (1 + portfolio['returns']) --> 转换为增长因子
    # (1 + portfolio['returns']).cumprod()  --> 累计增长因子
    # 复合增长效应: 今天的增长基于昨天的结果-
    portfolio['cumulative_returns'] = (1 + portfolio['returns']).cumprod()

    return portfolio

# 回测移动平均线策略
portfolio = backtest_strategy(data, ma_signals)
portfolio.to_csv("./data/portfolio.csv", header=True, index=False)

print("回测结果:")
print(f"初始资金: $10,000")
print(f"最终资金: ${portfolio['total'][-1]:.2f}")
print(f"总收益率: {(portfolio['total'][-1]/10000 - 1)*100:.2f}%")

回测结果:
初始资金: $10,000
最终资金: $10547.59
总收益率: 5.48%


  print(f"最终资金: ${portfolio['total'][-1]:.2f}")
  print(f"总收益率: {(portfolio['total'][-1]/10000 - 1)*100:.2f}%")


In [28]:
a = pd.Series([1,2,3,0.5,0.1])
a.cumprod()

0    1.0
1    2.0
2    6.0
3    3.0
4    0.3
dtype: float64

In [None]:
# 生成正态分布地随机数
daily_mean = 0.0005    # 每日平均收益率
daily_std = 0.02       # 每日波动率（标准差）
trading_days = 252     # 一年的交易日数

# 年化收益率 = 每日收益率 × 交易日数
annual_return = daily_mean * trading_days
print(f"\n年化收益率计算:")
print(f"年化收益率 = {daily_mean} × {trading_days} = {annual_return:.4f} = {annual_return*100:.2f}%")


# 年化波动率 = 每日波动率 × √交易日数
annual_volatility = daily_std * np.sqrt(trading_days)
print(f"\n年化波动率计算:")
print(f"年化波动率 = {daily_std} × √{trading_days} = {annual_volatility:.4f} = {annual_volatility*100:.2f}%")

# 验证
daily_returns = np.random.normal(daily_mean, daily_std, 252)
actual_daily_mean = daily_returns.mean()
actual_daily_std = daily_returns.std()

actual_annual_return = actual_daily_mean * trading_days
actual_annual_vol = actual_daily_std * np.sqrt(trading_days)

print(f"\n实际模拟结果验证:")
print(f"实际每日平均收益: {actual_daily_mean:.6f} (理论: {daily_mean})")
print(f"实际每日波动率: {actual_daily_std:.4f} (理论: {daily_std})")
print(f"实际年化收益: {actual_annual_return*100:.2f}%")
print(f"实际年化波动率: {actual_annual_vol*100:.2f}%")


年化收益率计算:
年化收益率 = 0.0005 × 252 = 0.1260 = 12.60%

年化波动率计算:
年化波动率 = 0.02 × √252 = 0.3175 = 31.75%

实际模拟结果验证:
实际每日平均收益: 0.002325 (理论: 0.0005)
实际每日波动率: 0.0198 (理论: 0.02)
实际年化收益: 58.58%
实际年化波动率: 31.45%
