In [2]:
import datetime
import time
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
import tushare as ts
import dateutil


In [4]:
'''
1.数据获取
'''
# 1.设置Tushare 初始化pro接口
def ts_pro():
    # 如要注册tushare官网，请点击:https://tushare.pro/register?reg-515947
    #注册账户，就有了你的token，在tushare个人主页中找
    ts.set_token('fb2c456a0621a9719d29a23a2980501acf84728d1d9d43fc598e0eb4')
    # 初始化pro接口
    pro = ts.pro_api()
    return pro

#2.获取上交所(Tushare)交易日历数据
def ts_trade_cal(exchange, start_date, end_date):
    pro =ts_pro()
    #获取各大交易所交易日历数据,默认提取的是上交所
    '''
        输入参数
        名称       类型    必选 描述
        exchange   str     N  SSE上交所,SZSE深交所,CFFEX 中金所,SHFE 上期所,CZCE 郑商所,DCE 大商所,INE 上能源
        start_date str     N  开始日期 (格式:YYYYMMDD 下同)
        end_date   str     N  结束日期
        is_open    str     N  是否交易 '0'休市'1'交易

        输出参数
        名称       类型  默认显示 描述
        exchange   str     Y  交易所 SSE上交所 SZSE深交所
        cal_date   str     Y  日历日期
        is_open    str     Y  是否交易 0休市 1交易
        pretrade_date str  Y  上一个交易日
    '''
    df = pro.trade_cal(exchange=exchange, start_date=start_date, end_date=end_date)
    # 按'cal_date'排序
    df.sort_values('cal_date', inplace=True)
    #转换为日期格式
    df['cal_date'] = pd.to_datetime(df['cal_date'])
    #设置'cal_date'为索引
    df.set_index('cal_date', inplace=True)
    #存储为csv文件
    df.to_csv('trade_cal.csv')

#3.获取股票日线行情(Tushare)
def ts_daily(security, start_date, end_date, fields=('open', 'close', 'high', 'low', 'vol')):
    pro = ts_pro()
    # A股日线行情(Tushare)
    df = pro.daily(ts_code = security, start_date = start_date, end_date = end_date)
    df.sort_values('trade_date',inplace = True)
    df['trade_date'] = pd.to_datetime(df['trade_date'])
    df.set_index('trade_date', inplace = True)
    # 截取start date:end date段数据
    df = df.loc[start_date:end_date, :]
    return df[list(fields)]

#4.下载交易日历数据
ts_trade_cal(exchange='SSE',start_date='20220101',end_date='20230310')
trade_cal = pd.read_csv('trade_cal.csv')


In [None]:
'''
2.Context类(上下文数据)
'''
# Context类(上下文数据)
class Context:
    # zone ='Asia/Beijing'#类属性
    def init (self, cash, start_date, end_date):
        # 资金
        self.cash=cash #实例属性
        #开始时间
        self.start_date = start_date
        #结束时间
        self.end_date = end_date
        # 持仓标的信息
        self.positions = {}
        # 基准
        self.benchmark = None
        #交易日期
        self.date_range = trade_cal[(trade_cal['is_open'] == 1) & (trade_cal['cal_date']>=start_date)&(trade_cal['cal_date']<=end_date)]['cal_date'].values
        #回测今天日期
        self.dt = dateutil.parser.parse(start_date)#将 start_date 字符串解析为一个 datetime 对象。
    # def print(self):
    #     print("cash:", self.cash)

# 全局变量用
class G:
    pass
# 全局变量
g = G()

# 实例化上下文数据context
# 初始资金cash=100080，开始时间2022-03-01，结束时间2023-03-10
context = Context(cash = 100000, start_date = '2022-03-01', end_date = '2023-03-10')

# 设置基准
# 如果像聚宽那样设置沪深300为基准
# 比较麻烦
# 代码量多
# 7.这里就设置单个股票为基准
def set_bench_mark(security):
    context.benchmark = security

# 获取历史数据基础函数
def attribute_daterange_history(security, start_date, end_date, fields=('open', 'close', 'high', 'low', 'vol')):
    # 2022-10-18 转换为 20221018
    time_array1 = time.strptime(start_date, '%Y-%m-%d')  # strptime()函数将字符串转换为时间元组
    start_date = time.strftime('%Y%m%d', time_array1)  # strftime()函数将时间元组转换为字符串
    time_array2 = time.strptime(end_date, '%Y-%m-%d')  # strptime()函数将字符串转换为时间元组
    end_date = time.strftime('%Y%m%d', time_array2)  # strftime()函数将时间元组转换为字符串
    df = ts_daily(security, start_date, end_date)  # 获取股票日线行情
    return df[list(fields)]  # 截取所需字段

# 获取历史数据
def attribute_history(security, count, fields=('open', 'close', 'high', 'low','vol')):
    end_date =(context.dt - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    start_date = trade_cal[(trade_cal['is_open']== 1) & (trade_cal['cal_date'] <= end_date)][-count:].iloc[0, :]['cal_date']
    # print(start_date, end_date)
    return attribute_daterange_history(security, start_date, end_date, fields)

#今天日线行情
def get_today_data(security):
    today = context.dt.strftime('%Y-%m-%d')# 获取今天日期
    try:
        pro = ts_pro()
        time_array = time.strptime(today, '%Y-%m-%d')# strptime()函数将字符串转换为时间元组
        today = time.strftime('%Y%m%d',time_array)# strftime()函数将时间元组转换为字符串
        data = pro.daily(ts_code=security, start_date=today, end_date=today).loc[0,:]# 获取股票日线行情
    #停牌返回空数据
    except KeyError:
        data = pd.Series()
    return data

In [None]:
'''
3.交易相关订单函数
'''

# 买卖订单基础函数
def _order(today_data, security, amount):
    # 股票价格
    p = today_data['open']
    # 停牌
    if len(today_data) == 0:
        print('今日停牌')
        return
    # 现金不足
    if context.cash - amount * p < 0:
        amount = int(context.cash / p / 100) * 100
        print('现金不足，已调整为%d' % amount)
    # 100的倍数
    if amount % 100 != 0:
        if amount != -context.positions.get(security, 0):
            amount = int(amount / 100) * 100
            print('不是100的倍数,已调整为%d' % amount)
    #卖出数量超过持仓数量
    if context.positions.get(security, 0) < -amount:
        amount = -context.positions.get(security,0)
        print("卖出数量不能超过持仓数量,已调整为%d" % amount)
    #将买卖股票数量存入持仓标的信息
    context.positions[security] = context.positions.get(security, 0) + amount
    # 剩余资金
    context.cash -= amount * p
    # 如果一只股票持仓为0，则删除上下文数据持仓标的信息中该股信息
    if context.positions[security] == 0:
        del context.positions[security]

# 按股数下单
def order(security, amount):
    today_data = get_today_data(security)
    _order(today_data, security, amount)

# 目标股数下单
def order_target(security, amount):
    if amount < 0:
        print("数量不能为负,已调整为0")
        amount = 0
    today_data = get_today_data(security)
    hold_amount = context.positions.get(security, 0)
    delta_amount = amount - hold_amount
    _order(today_data, security, delta_amount)

# 按价值下单
def order_value(security, value):
    today_data = get_today_data(security)
    amount = int(value / today_data['open'])
    _order(security, amount)

# 目标价值下单
def order_target_value(security, value):
    if value < 0:
        print("价值不能为负,已调整为0")
        value = 0
    today_data = get_today_data(security)
    hold_value = context.positions.get(security, 0) * today_data['open']
    delta_value = value - hold_value
    order_value(security, delta_value)


In [None]:
'''
4.策略回测层(主函数)
'''

# 框架主体函数
def run():
    # 创建收益数据表
    plt_df = pd.DataFrame(index=pd.to_datetime(context.date_range), columns=['value'])
    # 初始资金
    init_value = context.cash
    # 初始化函数
    initialize(context)
    last_price = {}
    # 模拟每个bar运行
    for dt in context.date_range:
        # 将context对象中的日期context.dt更新为当前迭代的日期dt。这是在模拟回测过程中更新当前日期的操作
        context.dt = dateutil.parser.parse(dt)
        # 调用handle_data()函数，传递context对象作为参数，执行具体的交易逻辑处理
        handle_data(context)
        # 将当前资金的数值保存到value变量中
        value = context.cash
        # 遍历每支股票计算股票价值
        for stock in context.positions:
            today_data = get_today_data(stock)
            # 停牌
            if len(today_data) == 0:
                # 停牌前一个交易日价格
                p = last_price[stock]
            else:
                p = today_data['open']
                #存储为停牌前一个交易日价格
                last_price[stock] = p
            # 计算当前持仓股票的总价值
            value += p * context.positions[stock]
        # 将当前日期dt对应的总资产价值value，保存到收益数据表plt_df中
        plt_df.loc[dt, 'value'] = value
    #计算投资组合价值相对于初始值的收益率，并将结果存储在名为ratio 的新列中
    plt_df['ratio'] = (plt_df['value'] - init_value) / init_value

    # 获取基准指数(benchmark)在回测期间的历史数据
    bm_df = attribute_daterange_history(context.benchmark, context.start_date, context.end_date)
    # 获取基准指数在回测开始时的开盘价，并将其存储在bm_init变量中
    bm_init = bm_df['open'][0]
    # 计算基准(Benchmark)收益率
    plt_df['benchmark_ratio'] = (bm_df['open'] - bm_init) / bm_init

    # 可视化
    #设置字体 显示汉字
    plt.rcParams["font.sans-serif"]="SimHei"
    #用来正常显示负号
    plt.rcParams['axes.unicode_minus']=False
    #设置画布的尺寸为18*10
    plt.figure(figsize = (18,10))
    plt.title("python SMA量化框架")
    #绘制收益率曲线
    plt.plot(plt_df['ratio'], label = "ratio")
    # 绘制基准收益率曲线
    plt.plot(plt_df['benchmark_ratio'], label = "benchmark_ratio")
    #设置x轴标签
    plt.xlabel("日期")
    #设置y轴标签
    plt.ylabel("收益率")
    #x坐标斜率
    plt.xticks(rotation=46)
    # 添加图注
    plt.legend()
    # 显示
    plt.show()

    #初始化函数，设定基准等等
    #initialize(context)函数接受一个context参数，这个参数就是一个Context类的实例。
    #在函数内部，可以使用这个context对象来访问其属性和方法，以实现特定的功能。
    def initialize(context):
        # 设定002624作为基准
        set_bench_mark('000001.SZ')
        # 设定10日均线全局参数
        g.ma1 = 10
        # 设定60日均线全局参数
        g.ma2 = 60
        # 设定要操作的股票002572
        g.security = '002572.SZ'

    #该函数每个bar(单位时间)会调用一次
    def handle_data(context):
        # 获取历史数据
        hist = attribute_history(g.security, g.ma2)
        # 10日均线
        ma10 = hist['close'][-g.ma1:].mean()
        # 60日均线
        ma60 = hist['close'].mean()
        # 如果10日均线大于60日均线，并且没有持仓，则全仓买入
        if ma10 > ma60 and g.security not in context.positions:
            order_value(g.security, context.cash)
        # 如果10日均线小于60日均线，并且持仓，则清仓
        elif ma10 < ma60 and g.security in context.positions:
            order_target(g.security, 0)


run()