架构顺序
- 安装依赖库
- 数据获取
- 因子计算
- 选股策略
- 回测系统
- 模拟实盘交易系统
- 交易管理和资金管理

数据获取
- 我们利用 AKShare 来获取 A 股的基本面和市场数据。获取股票的历史行情数据和财务因子是基础步骤。

In [None]:
import akshare as ak
import pandas as pd

# 获取所有A股股票代码及名称
stock_df = ak.stock_info_a_code_name()

# 获取单只股票的历史数据
def get_stock_history(stock_code, start_date, end_date):
    stock_data = ak.stock_zh_a_hist(symbol=stock_code, period="daily", start_date=start_date, end_date=end_date, adjust="qfq")
    stock_data['date'] = pd.to_datetime(stock_data['日期'])
    stock_data.set_index('date', inplace=True)
    return stock_data[['开盘', '收盘', '最高', '最低', '成交量']]

# 获取股票的财务因子数据
def get_financial_data(stock_code):
    return ak.stock_financial_analysis_indicator(symbol=stock_code)

# 示例：获取贵州茅台的历史行情和财务数据
stock_code = '600519'
start_date = '2023-01-01'
end_date = '2023-12-31'
stock_data = get_stock_history(stock_code, start_date, end_date)
financial_data = get_financial_data(stock_code)

因子计算
- 通过提取关键的财务和市场因子（如市盈率 PE、市净率 PB、股息率等），形成多因子模型。

In [None]:
# 计算多因子
def calculate_factors(stock_code, start_date, end_date):
    stock_data = get_stock_history(stock_code, start_date, end_date)
    financial_data = get_financial_data(stock_code)
    
    # 提取 PE、PB、股息率等因子
    pe = financial_data[financial_data['指标名称'] == '市盈率'][['报告日期', '最新值']]
    pb = financial_data[financial_data['指标名称'] == '市净率'][['报告日期', '最新值']]
    dividend_yield = financial_data[financial_data['指标名称'] == '股息率(%)'][['报告日期', '最新值']]
    
    # 合并因子数据
    factors = pd.merge(pe, pb, on='报告日期', suffixes=('_pe', '_pb'))
    factors = pd.merge(factors, dividend_yield, on='报告日期')
    factors.columns = ['report_date', 'PE', 'PB', 'DividendYield']
    factors['report_date'] = pd.to_datetime(factors['report_date'])
    
    return factors

factors = calculate_factors(stock_code, start_date, end_date)

选股策略
- 根据计算的因子来选股，例如选取低 PE、低 PB 和高股息率的股票。

In [None]:
def stock_selection(factors):
    # 筛选低PE、低PB、高股息的股票
    selected_stocks = factors[(factors['PE'] < factors['PE'].quantile(0.3)) &
                              (factors['PB'] < factors['PB'].quantile(0.3)) &
                              (factors['DividendYield'] > factors['DividendYield'].quantile(0.7))]
    return selected_stocks

# 根据因子筛选股票
selected_stocks = stock_selection(factors)

回测系统
- 使用 backtrader 进行历史数据的回测。回测将验证策略在历史数据上的表现。

In [None]:
import backtrader as bt

# 自定义回测策略
class MultiFactorStrategy(bt.Strategy):
    def next(self):
        # 示例：简单买卖逻辑
        if not self.position:  # 没有持仓时
            if some_buy_condition:  # 可结合因子策略
                self.buy()
        else:
            if some_sell_condition:
                self.sell()

# 设置回测
cerebro = bt.Cerebro()

# 添加策略
cerebro.addstrategy(MultiFactorStrategy)

# 添加数据
for stock in selected_stocks:
    data = bt.feeds.PandasData(dataname=get_stock_history(stock, start_date, end_date))
    cerebro.adddata(data)

# 设置初始资金
cerebro.broker.setcash(1000000.0)

# 设置佣金
cerebro.broker.setcommission(commission=0.001)

# 运行回测
cerebro.run()

# 回测结果可视化
cerebro.plot()


模拟实盘交易系统
- 模拟实盘交易系统通过定时任务获取实时数据，并根据策略实时买卖股票。

In [None]:
import time
import schedule

# 获取实时行情数据
def get_realtime_data(stock_code):
    realtime_data = ak.stock_zh_a_spot_em()
    return realtime_data[realtime_data['代码'] == stock_code]

# 实盘交易策略
class RealTimeMultiFactorStrategy(bt.Strategy):
    def next(self):
        stock_code = '600519'
        current_price = get_realtime_data(stock_code)
        
        if not self.position:
            if self.buy_condition(current_price):
                self.buy()
        else:
            if self.sell_condition(current_price):
                self.sell()

    def buy_condition(self, price):
        # 示例买入条件
        return price['最新价'].values[0] < price['昨收'].values[0] * 0.95

    def sell_condition(self, price):
        return price['最新价'].values[0] > price['昨收'].values[0] * 1.05

# 每天模拟交易
schedule.every().day.at("09:30").do(run_backtest)

while True:
    schedule.run_pending()
    time.sleep(60)


资金管理和风险控制
- 加入止损、止盈、仓位管理等机制，确保策略能够更接近实际交易中的风控需求。

In [None]:
class RiskManagementStrategy(bt.Strategy):
    params = (('stop_loss', 0.05),  # 5% 止损
              ('take_profit', 0.10))  # 10% 止盈

    def next(self):
        stock_code = '600519'
        current_price = get_realtime_data(stock_code)
        
        if not self.position:
            if self.buy_condition(current_price):
                self.buy(size=self.broker.getvalue() * 0.05 / current_price['最新价'].values[0])
        else:
            current_profit = (self.data_close[0] - self.position.price) / self.position.price
            if current_profit <= -self.params.stop_loss:
                self.sell()
            elif current_profit >= self.params.take_profit:
                self.sell()