# 1. 使用AWS Batch进行并行回测任务

在这个实验中，我们会把上一个实验中的代码，做容器化改造，让代码可以接受参数的输入，这样我们就可以并行的回测多个币对在同一个交易策略中的表现，达到快速验证同一个交易策略在不同的币对中的盈利能力

### 定义环境变量

首先定义环境变量

In [13]:
import boto3

aws_account_id = boto3.client('sts').get_caller_identity().get('Account')
repository_name = 'batch-repo'
aws_region = 'us-east-1'

### 编写Dockerfile

Batch运行任务会基于容器来运行，我们需要修改上一个实验的代码，让这个代码可以接受参数输入，这样在并行执行的时候，把币对作为参数传入即可。

In [None]:
!mkdir batch

In [None]:
%%writefile batch/backtest.py
#!/usr/bin/env python
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
import datetime
import boto3
import json
import numpy as np
import pandas as pd
import os.path
import sys
import pytz
import pyarrow.parquet as pq

import backtrader as bt

class MyStrategy(bt.Strategy):
    ## 1、全局参数
    params=(('maperiod', 15),
            ('printlog', False),)

    ## 2、初始化
    def __init__(self):

        # 初始化交易指令、买卖价格和手续费
        self.order = None
        self.buyprice = None
        self.buycomm = None

        # 添加15日移动均线指标。Backtrader 集成了 talib，可以自动算出一些常见的技术指标
        self.sma = bt.indicators.SimpleMovingAverage(self.datas[0], period=self.params.maperiod)

    ## 3、策略核心逻辑
    def next(self):
        # 记录收盘价
        # self.log('收盘价：%.2f' % self.datas[0].close[0])
        if self.order: # 检查是否有指令等待执行
            return
        # 检查是否持仓   
        if not self.position: # 没有持仓
            # 执行买入条件判断：收盘价格上涨突破15日均线
            if self.datas[0].close > self.sma[0]:
                self.size = int(self.broker.cash / self.datas[0].close[0])
                self.log('买入委托：%.2f * %.0f' % (self.datas[0].close[0], self.size))
                #执行买入
                self.order = self.buy(size=self.size)
        else:
            # 执行卖出条件判断：收盘价格跌破15日均线
            if self.datas[0].close < self.sma[0]:
                self.log('卖出委托：%.2f * %.0f' % (self.datas[0].close[0], self.size))
                #执行卖出
                self.order = self.sell(size=self.size)

    ## 4、日志记录
    # 交易记录日志（可选，默认不输出结果）
    def log(self, txt, dt=None, doprint=False):
        if self.params.printlog or doprint:
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()},{txt}')

    # 记录交易执行情况（可选，默认不输出结果）
    def notify_order(self, order):
        # 如果 order 为 submitted/accepted，返回空
        if order.status in [order.Submitted, order.Accepted]:
            return
        # 如果 order 为 buy/sell executed，报告价格结果
        if order.status in [order.Completed]: 
            if order.isbuy():
                self.log(f'买入：\n价格：%.2f,\
                现金流：-%.2f,\
                手续费：%.2f' % (order.executed.price, order.executed.value, order.executed.comm))
                self.buyprice = order.executed.price
                self.buycomm = order.executed.comm
            else:
                self.log(f'卖出:\n价格：%.2f,\
                现金流：%.2f,\
                手续费：%.2f' % (order.executed.price, order.executed.price*self.size, order.executed.comm))
            self.bar_executed = len(self) 

        # 如果指令取消/交易失败, 报告结果
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log('交易失败')
        self.order = None

    # 记录交易收益情况（可省略，默认不输出结果）
    def notify_trade(self,trade):
        if not trade.isclosed:
            return
        self.log(f'策略收益：\n毛收益 {trade.pnl:.2f}, 净收益 {trade.pnlcomm:.2f}')

    # 回测结束后输出结果（可省略，默认输出结果）
    def stop(self):
        self.log('(MA均线： %2d日) 期末总资金 %.2f' %
                 (self.params.maperiod, self.broker.getvalue()), doprint=True)



def downloadFile(bucket_name, object_name,file_name):
    
    s3 = boto3.client('s3',region_name='us-east-1')
    s3.download_file(bucket_name, object_name, file_name)
        
        
def readData(file_name):
    df = pq.read_table(file_name).to_pandas()
    df.drop('symbol', 1, inplace=True)
    df.drop('volume_btc', 1, inplace=True)
    df.drop('unix_time',1, inplace=True)

    df.rename(columns={'t_date': 'tradedate'}, inplace=True)
    df.rename(columns={'t_high': 'high'}, inplace=True)
    df.rename(columns={'t_low': 'low'}, inplace=True)
    df.rename(columns={'t_open': 'open'}, inplace=True)
    df.rename(columns={'t_close': 'close'}, inplace=True)
    df.rename(columns={'volume_usd': 'volume'}, inplace=True)

    df['openinterest'] = 0 # 利率直接设为 0

    df.set_index('tradedate', inplace=True)
    df.sort_index(inplace=True)
    return df
    
        
if __name__ == '__main__':
    
    # 创建 Cerebro 对象
    cerebro = bt.Cerebro()

    # 读取输入参数，读取s3数据源数据，然后转化为dataframe
    bucket_name = sys.argv[1]
    object_name = sys.argv[2]
    file_name = 'source.parquet'
    
    downloadFile(bucket_name, object_name,file_name)
    
    df = readData(file_name)
    
    # 创建 Data Feed
    df.index = pd.to_datetime(df.index)
    start = df.index[0]
    end = df.index[-1]
    print(start, '-', end)
    data = bt.feeds.PandasData(dataname=df, fromdate=start, todate=end)
    # 将 Data Feed 添加至 Cerebro
    cerebro.adddata(data)

        # 添加策略 Cerebro
    cerebro.addstrategy(MyStrategy, maperiod=15, printlog=True)
    
    # 设置初始资金
    cerebro.broker.setcash(100000.0)
    # 设置手续费为万二
    cerebro.broker.setcommission(commission=0.0002) 

    # 在开始时 print 初始账户价值
    print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue())

    # 运行回测流程
    cerebro.run()

    # 在结束时 print 最终账户价值
    print('Final Portfolio Value: %.2f' % cerebro.broker.getvalue())
    print('Return: %.4f' % (float(cerebro.broker.getvalue())/1e5 - 1))

    sys.exit(0)

安装backtrader相关模块

In [None]:
!pip install --upgrade pip
!pip install backtrader
!pip install matplotlib==3.2.0
!pip show backtrader

本地执行代码

In [None]:
!python batch/backtest.py salen-datalab ods/union/hour/part-00000-856800ad-3133-4334-ad10-537de18b6879-c000.snappy.parquet

接下来，创建一个容器镜像仓库，用于推送镜像

In [59]:
ecr = boto3.client('ecr', region_name=aws_region)

In [None]:
ecr.create_repository(repositoryName=repository_name)

编写Dockfile

In [None]:
%%writefile batch/Dockerfile
FROM python:3.8

RUN pip --no-cache-dir install \
    backtrader\
    boto3 \
    pandas
RUN pip install matplotlib==3.2.0
RUN pip install pyarrow

ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE

COPY backtest.py /
RUN chmod -R 777 backtest.py


In [None]:
!docker build batch -t {repository_name}

将容器推送到远程的ECR镜像仓库

In [63]:
!docker tag {repository_name} {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/{repository_name}

In [None]:
!docker images

In [None]:
!aws ecr get-login-password | docker login --username AWS --password-stdin {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com

In [None]:
!docker push {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/{repository_name}

推送成功后，我们就可以在ECR中看到推送的容器镜像，下一步就是配置AWS Batch的并行任务了

## 请参考实验文档，我们通过网页创建AWS Batch的相关环境后，并且通过网页提交不同的计算任务

AWS Batch环境的准备主要分为以下几个步骤，我们依次创建，计算环境，任务队列，任务定义，最终提交多个并行任务，等待结果

## 使用代码，提交并行任务

在之前的实验环节中，我们通过控制台的方式，创建了batch的任务，接下来，我们会通过代码来提交并行任务，注意，我们需要从在console上找到我们的jobqueue以及job definition名字，用来填写进入以下程序中。实际环境中，我们可以通过更多的程序代码来完全实现代码创建以及提交

In [52]:
import boto3

batch_client = boto3.client('batch')

def submit_job(job_name, queue_name, job_definition):
    response = batch_client.submit_job(
        jobName=job_name,
        jobQueue= queue_name,
        jobDefinition=job_definition
    )


# 在AWS Batch中定义好的任务queue
quene_name = 'backtestqueue'
# 所有在AWS Batch中定义好的job definition名
job_definition_list=['btc-usdt-job-def','eth-usdt-jobdef']

# 循环提交所有的任务
for definition in job_definition_list:
    submit_job(definition,quene_name,definition)


不断执行以下代码获取正在运行的job状态

In [None]:
response = batch_client.list_jobs(
    jobQueue=quene_name,
    jobStatus='RUNNING',
    maxResults=100
)
for item in response['jobSummaryList']:
    print(item)

以上就是通过代码提交并行任务，我们甚至可以通过修改参数，传入更多的参数，计算更多的交易对。在实际的环境中，我们甚至可以把行情数据下载到S3，通过自有环境来做更高频的回测测试