In [52]:
import os
import pandas as pd
import numpy as np
import abc

from typing import Callable
from numbers import Number

_path = os.path.expanduser('~/data/quantitative_trading/')


In [53]:
def assert_msg(condition, msg):
    if not condition:
        raise Exception(msg)

def read_file(filename):
    # 获得文件路径
    filepath = os.path.join(os.path.dirname('__file__'), filename)

    # 判定文件是否存在
    assert_msg(os.path.exists(filepath), 'the file not found')

    # 读取csv文件并返回
    return pd.read_csv(
        filepath,
        index_col = 0,
        parse_dates = True,
        infer_datetime_format = True
    )

BTCUSD = read_file(_path + 'BTCUSD_GEMINI.csv')
assert_msg(BTCUSD.__len__() > 0, 'read failed')
print(BTCUSD.head())


                     Symbol      Open      High       Low     Close     Volume
Date                                                                          
2019-07-08 00:00:00  BTCUSD  11475.07  11540.33  11469.53  11506.43  10.770731
2019-07-07 23:00:00  BTCUSD  11423.00  11482.72  11423.00  11475.07  32.996559
2019-07-07 22:00:00  BTCUSD  11526.25  11572.74  11333.59  11423.00  48.937730
2019-07-07 21:00:00  BTCUSD  11515.80  11562.65  11478.20  11526.25  25.323908
2019-07-07 20:00:00  BTCUSD  11547.98  11624.88  11423.94  11515.80  63.211972


In [54]:
class Backtest:
    """
    Backtest回测类, 用于读取历史行情数据、执行策略、模拟交易并估计收益

    初始化的时候调用Backtest.run来时回测

    instance, or `backtesting.backtesting.Backtest.optimize` to optimize it.
    """

    def __init__(self,
                data: pd.DataFrame,
                strategy_type: type(Strategy),
                broker_type: type(ExchangeAPI),
                cash: float = 10000,
                commission: float = .0):
        """ 
        构造回测对象。需要的参数包括：历史数据，策略对象，初始资金数量，手续费率等。 
        初始化过程包括检测输入类型，填充数据空值等。 
        参数： 
        :param data:                pd.DataFrame                pandas Dataframe格式的历史OHLCV数据 
        :param broker_type:         type(ExchangeAPI)           交易所API类型，负责执行买卖操作以及账户状态的维护 
        :param strategy_type:       type(Strategy)              策略类型 
        :param cash:                float                       初始资金数量 
        :param commission:          float                       每次交易手续费率。如2%的手续费此处为0.02 
        """

        assert_msg(issubclass(strategy_type, Strategy), 'strategy_type不是一个Strategy类型')
        assert_msg(issubclass(broker_type, ExchangeAPI), 'strategy_type不是一个Strategy类型')
        assert_msg(isinstance(commission, Number), 'commission不是浮点数值类型')

        data = data.copy(False)

        # 如果没有volumn列, 填充NaN
        if 'Volume' not in data:
            data['Volume'] = np.nan

        # 验证OHLC数据格式
        assert_msg(len(data.columns & {'Open', 'High', 'Low', 'Close', 'Volume'}) == 5,
                ("输入的`data`格式不正确, 至少需要包含这些咧: " "'Open', 'High', 'Low', 'Close'"))

        # 检查缺失值
        assert_msg(not data[['Open', 'High', 'Low', 'Close']].max().isnull().any(), 
                ('部分OHLC包含缺失值, 请去掉那些行或者通过差值填充'))

        # 利用数据, 初始化交易所对象和策略对象
        self._data = data # type: pd.DataFrame
        self._broker = broker_type(data, cash, commission)
        self._strategy = strategy_type(self._broker, self._data)
        self._results = None

    def run(self):
        """
        运行回测，迭代历史数据，执行模拟交易并返回回测结果。 
        Run the backtest. Returns `pd.Series` with results and statistics. 
        
        Keyword arguments are interpreted as strategy parameters.
        """
        strategy = self._strategy
        broker = self._broker

        # 策略初始化
        strategy.init()

        # 设定回测开始和结束为止
        start = 100
        end = len(self._data)

        # 回测住循环, 更新市场状态, 然后执行策略
        for i in range(start, end):
            # 注意要先把市场状态移动到第i时刻, 然后再执行策略
            broker.next(i)
            strategy.next(i)

        # 完成策略执行之后, 计算结果并返回
        self._results = self._compute_result(broker)
        return self._results

    def _compute_result(self, broker):
        s = pd.Series()
        s['初始市值'] = broker.initial_cash
        s['结束市值'] = broker.market_value
        s['收益'] = broker.market_value - broker.initial_cash
        return s

        
        

In [55]:
def SMA(values, n):
    """
    返回简单滑动平均
    """
    return pd.Series(values).rolling(n).mean()

def crossover(series1, series2) -> bool:
    """
    检查两个序列是否在结尾交叉
    :param series1: 序列1
    :param series2: 序列2
    :return:        如果交叉返回True, 反之False
    """
    return series1[-2] < series2[-2] and series1[-1] > series2[-1]


In [56]:
def next(self, tick):
    # 如果此时快线刚好越过慢线, 买入全部
    if crossover(self.sma1[:tick], self.sma2[:tick]):
        self.buy()

    # 如果是慢线刚好越过快线, 卖出全部
    elif crossover(self.sma2[:tick], self.sma1[:tick]):
        self.sell()

    # 否则, 这个时刻不执行任何操作
    else:
        pass


    

In [45]:


class Strategy(metaclass=abc.ABCMeta):

    def __init__(self, broker, data):

        self._indicators = []
        self._broker = broker # type: _Broker
        self._data = data # type: _Data
        self._tick = 0

    def I(self, func: Callable, *args) -> np.ndarray:
        """
        def init():
            self.sma = self.I(utils.SMA, self.data.Close, N)
        """
        

        value = func(*args)
        value = np.asarray(value)
        assert_msg(value.shape[-1] == len(self._data.Close), '指示器长度必须和data长度相同')

        self._indicators.append(value)
        return value

    @property
    def tick(self):
        return self._tick

    @abc.abstractmethod
    def init(self):

        pass

    @abc.abstractmethod
    def next(self, tick):
        """
        
        """
        pass

    def buy(self):
        self._broker.buy()

    def sell(self):
        self._broker.sell()

    @property
    def data(self):
        return self._data

        

In [49]:
class SmaCross(Strategy):

    fast = 10


    slow = 20

    def init(self):

        self.sma1 = self.I(SMA, self.data.Close, self.fast)
        self.sma2 = self.I(SMA, self.data.Close, self.slow)

    def next(self, tick):

        if crossover(self.sma1[:tick], self.sma2[:tick]):
            self.buy()

        elif crossover(self.sma2[:tick], self.sma1[:tick]):
            self.sell()

        #
        else:
            pass




In [50]:
class ExchangeAPI:
    def __init__(self, data, cash, commission):
        assert_msg(0 < cash, '初始现金数量大于0, 输入的现金数量: {}'.format(cash))
        assert_msg(0 <= commission <= 0.05, "合理的手续费率一般不会超过5%, 输入的费率: {}".format(commission))
        self._inital_cash = cash
        self._data = data
        self._commission = commission
        self._position = 0
        self._cash = cash
        self._i = 0

    @property
    def cash(self):

        return self._cash
    
    @property
    def position(self):

        return self._position

    @property
    def initial_cash(self):

        return self._inital_cash

    @property
    def market_value(self):

        return self._cash + self._position * self.current_price

    @property
    def current_price(self):

        return self._data.Close[self._i]

    def buy(self):

        self._position = float(self._cash / (self.current_price * (1 + self._commission)))
        self._cash = 0.0

    def sell(self):

        self._cash += float(self._position * self.current_price (1 - self._commission))
        self._position = 0.0

    def next(self, tick):
        self._i = tick

        

In [51]:
def main():
    BTCUSD = read_file(_path + 'BTCUSD_GEMINI.csv')
    ret = Backtest(BTCUSD, SmaCross, ExchangeAPI, 10000.0, 0.00).run()
    print(ret)

if __name__ == '__main__':
    main()

  assert_msg(len(data.columns & {'Open', 'High', 'Low', 'Close', 'Volume'}) == 5,


TypeError: 'numpy.float64' object is not callable