In [None]:
import numpy as np
import pandas as pd
from pyecharts import Kline, Overlap, EffectScatter, Line, Graph, Line, Grid, Bar
from pyecharts import online

In [None]:
# Leftover smoke-test cell: prints the literal 't'; its result is not stored or reused.
print('t')

In [None]:
from IPython.display import display, clear_output, IFrame

In [3]:
# Load the source frame from a msgpack file addressed relative to the notebook.
# NOTE(review): `pd.read_msgpack` was deprecated in pandas 0.25 and removed in 1.0,
# so this cell needs an older pandas or a re-exported dataset — confirm the pinned version.
df = pd.read_msgpack('../dataset/ts.msgpack')

In [4]:
# Promote the 'datetime' column to the index, mutating `df` in place.
# Not re-runnable: a second run on the same `df` fails because the column has
# already been moved into the index.
df.set_index('datetime', inplace=True)

In [26]:
def set_kline_color(kline_serie, up_color, down_color,
                    up_board_color, down_board_color):
    """Attach an ``itemStyle`` colour mapping to a candlestick series dict.

    The mapping is written under ``kline_serie['itemStyle']`` (replacing any
    existing entry) with the keys ``color``/``color0`` for the up/down fill
    colours and ``borderColor``/``borderColor0`` for the up/down border colours.

    Args:
        kline_serie: mutable mapping describing one series; modified in place.
        up_color: value stored under ``color``.
        down_color: value stored under ``color0``.
        up_board_color: value stored under ``borderColor``.
        down_board_color: value stored under ``borderColor0``.

    Returns:
        The same ``kline_serie`` object, for call chaining.
    """
    item_style = {}
    item_style['color'] = up_color
    item_style['color0'] = down_color
    item_style['borderColor'] = up_board_color
    item_style['borderColor0'] = down_board_color
    kline_serie['itemStyle'] = item_style
    return kline_serie

def plot_kline(df_ohlc, chart_name='Kline', symbol_name='', size=(800, 400),
               ochl_columns=['open', 'close', 'high', 'low'],
               zoom_percent_range=(90, 100), legend_pos='10%',
               datazoom_share_axis=[0, ], tooltip_bg_color='rgba(50,50,50,0.7)',
               up_color='rgba(255,0,0,0.9)', down_color='rgba(0,255,0,0.9)',
               up_board_color='rgba(255,0,0,0.9)',
               down_board_color='rgba(0,255,0,0.9)'):
    """Build a pyecharts ``Kline`` chart from an OHLC frame.

    Args:
        df_ohlc: frame whose stringified index supplies the x-axis labels and
            whose ``ochl_columns`` supply the per-candle values.
        chart_name: first positional argument handed to ``Kline``.
        symbol_name: series name handed to ``Kline.add``.
        size: accepted for signature compatibility; not used by this function.
        ochl_columns: columns selected, in this order, for every candle.
        zoom_percent_range: forwarded as ``datazoom_range``.
        legend_pos: forwarded as ``legend_pos``.
        datazoom_share_axis: forwarded as ``datazoom_xaxis_index``.
        tooltip_bg_color: tooltip ``backgroundColor``.
        up_color, down_color, up_board_color, down_board_color: forwarded to
            ``set_kline_color`` for the first series.

    Returns:
        The configured ``Kline`` instance.
    """
    kline = Kline(chart_name, is_animation=False)

    x_labels = df_ohlc.index.astype(str).tolist()
    candles = df_ohlc[ochl_columns].values.tolist()
    kline.add(
        symbol_name, x_labels, candles,
        datazoom_type='both',
        is_datazoom_show=True,
        datazoom_range=zoom_percent_range,
        legend_pos=legend_pos,
        datazoom_xaxis_index=datazoom_share_axis,
    )

    # Recolour the single series that was just added.
    series = kline._option['series']
    series[0] = set_kline_color(series[0], up_color, down_color,
                                up_board_color, down_board_color)

    # Replace the tooltip configuration wholesale with a cross-hair, axis-triggered one.
    kline._option['tooltip']._config = {
        'trigger': 'axis',
        'triggerOn': 'mousemove|click',
        'axisPointer': {'type': 'cross'},
        'textStyle': {'fontSize': 14},
        'backgroundColor': tooltip_bg_color,
        'borderColor': '#333',
        'borderWidth': 0,
    }
    return kline

def add_efs(es, name, data, color, rotate):
    """Append one pin-shaped effect-scatter series to ``es`` and patch its options.

    The series is first registered with empty axes through ``es.add`` and then
    its raw option dict (the last entry of ``es._option['series']``) is edited
    directly.

    Args:
        es: chart object exposing ``add(...)`` and ``_option['series']``.
        name: series name.
        data: value stored as the series' ``data``.
        color: value stored as the series' ``color``.
        rotate: value stored as the series' ``symbolRotate``.

    Returns:
        The same ``es`` object.
    """
    es.add(name, [], [], symbol="pin", symbol_size=15, effect_scale=3, effect_period=3)
    newest = es._option['series'][-1]
    newest['data'] = data
    newest['symbolRotate'] = rotate
    # The emphasis label shows the 4th element of each data row.
    newest['label']['emphasis']._config.update({'formatter': 'pl: {@[3]}'})
    newest['color'] = color
    return es

def add_pair_efs(es, side, trades, entry_color, cover_coler):
    """Add the entry and cover marker series for one side ('long' or other).

    Each trade contributes one row per series: ``[x, y, value, trade[3]]``
    taken from ``trade[0]`` (entry) or ``trade[1]`` (cover).

    Args:
        es: chart passed through ``add_efs``.
        side: label used in the series names ``'<side>_entry'`` / ``'<side>_cover'``;
            the value ``'long'`` flips the marker rotation relative to any other value.
        trades: iterable of trade records indexable as described above.
        entry_color: colour of the entry series.
        cover_coler: colour of the cover series.

    Returns:
        The chart returned by the second ``add_efs`` call.
    """
    is_long = side == 'long'

    def _rows(leg):
        # leg 0 -> entry point, leg 1 -> cover point
        return [[trade[leg]['x'], trade[leg]['y'], trade[leg]['value'], trade[3]]
                for trade in trades]

    entry_rotate = 180 if is_long else 0
    cover_rotate = 0 if is_long else 180
    es = add_efs(es, '{}_entry'.format(side), _rows(0), entry_color, rotate=entry_rotate)
    es = add_efs(es, '{}_cover'.format(side), _rows(1), cover_coler, rotate=cover_rotate)
    return es

def trade_line(name='esline', trades=[], all_dates=[], win_color='rgba(0,255,255,0.8)', loss_color='rgba(255,0,255,0.8)'):
    """Draw one dashed line per trade from its entry point to its cover point.

    Args:
        name: first positional argument handed to ``Line``. This argument used
            to be ignored in favour of a hard-coded ``'esline'``; the default
            keeps the previous behaviour for callers that do not pass it.
        trades: sequence of records indexable as ``trade[0]`` / ``trade[1]``
            (entry / cover dicts with ``'x'`` and ``'y'``), ``trade[2]``
            (label shown in the series name) and ``trade[3]`` (win/loss value).
        all_dates: ordered x-axis labels; each trade's entry and cover ``'x'``
            must be present in it.
        win_color: line colour when ``trade[3] > 0``.
        loss_color: line colour otherwise (including a value of exactly 0).

    Returns:
        The ``Line`` chart, with its legend entries cleared.

    Raises:
        ValueError: from ``all_dates.index`` when a trade date is missing
            (when ``all_dates`` is a list).
    """
    esline = Line(name)
    for trade in trades:
        entry_date, cover_date = trade[0]['x'], trade[1]['x']
        entry_p, cover_p = trade[0]['y'], trade[1]['y']
        entry_index, cover_index = all_dates.index(entry_date), all_dates.index(cover_date)
        # Inclusive slice of the axis labels spanned by the trade, with prices
        # interpolated linearly between entry and cover.
        xs = all_dates[entry_index: cover_index+1]
        ys = np.linspace(entry_p, cover_p, num=len(xs)).tolist()
        win_loss_point = trade[3]
        color = win_color if win_loss_point > 0 else loss_color
        # Register an empty series, then fill its raw option dict directly.
        esline.add(f'{entry_date}->{cover_date}: {trade[2]}', [], [], is_symbol_show=False)
        serie = esline._option['series'][-1]
        serie['data'] = [[x, y, win_loss_point] for x, y in zip(xs, ys)]
        serie['lineStyle']['normal']._config.update({'color': color, 'width': 2, 'type': 'dashed'})
        serie['label']['emphasis']._config.update({'formatter': '{@[2]}', 'color': color})
        serie.update(clipOverflow=False)
        serie['zlevel'] = 1
    # One series per trade would flood the legend, so it is emptied.
    esline._option['legend'][0]['data'] = []
    return esline

def plot_trades(trades, all_dates,
                entry_long_color='rgba(255, 0, 0, 0.75)', cover_long_color='rgba(0, 255, 50, 0.6)',
                entry_short_color='rgba(0, 255, 0, 0.75)', cover_short_color='rgba(255, 0, 50, 0.6)',
                win_color='rgba(0,255,255,0.8)', loss_color='rgba(255,0,255,0.8)'):
    """Build the marker and connector charts for a list of trades.

    Trades are split by ``trade[2]`` into ``'long'`` and ``'short'`` groups;
    each group gets an entry/cover marker pair on a shared ``EffectScatter``,
    and all trades get a connecting line via ``trade_line``.

    Args:
        trades: trade records (see ``add_pair_efs`` / ``trade_line`` for the layout).
        all_dates: ordered x-axis labels, forwarded to ``trade_line``.
        entry_long_color, cover_long_color: marker colours for long trades.
        entry_short_color, cover_short_color: marker colours for short trades.
        win_color, loss_color: connector colours, forwarded to ``trade_line``.

    Returns:
        Tuple ``(es, esline)``: the scatter chart and the line chart.
    """
    def _with_side(side):
        return [trade for trade in trades if trade[2] == side]

    long_trades = _with_side('long')
    short_trades = _with_side('short')

    es = EffectScatter()
    for side, group, entry_color, cover_color in (
            ('long', long_trades, entry_long_color, cover_long_color),
            ('short', short_trades, entry_short_color, cover_short_color)):
        es = add_pair_efs(es, side, group, entry_color, cover_color)

    esline = trade_line(trades=trades, all_dates=all_dates,
                        win_color=win_color, loss_color=loss_color)
    return es, esline

In [454]:
from copy import deepcopy
class Base:
    """Minimal mapping-like base: ``__slots__`` names act as keys.

    Exposing ``keys()`` and ``__getitem__`` lets instances be unpacked with
    ``{**obj}`` / ``dict(obj)``; ``repr`` lists every key with its value.
    """
    __slots__ = ()

    def keys(self):
        """Return the slot names that serve as mapping keys."""
        return self.__slots__

    def __getitem__(self, key):
        """Look up ``key`` as an attribute of the instance."""
        return getattr(self, key)

    def __gen_container(self):
        # 'name=value' pairs in key order, separated by commas.
        pairs = (f'{field}={value!r}' for field, value in dict(self).items())
        return ', '.join(pairs)

    def __repr__(self):
        return f'{self.__class__.__name__}({self.__gen_container()})'
    
class PlotKind(Base):
    """Description of one plot entry: chart kind, source columns, target grid, extra options.

    Args:
        kind: chart kind name (``'kline'``, ``'line'`` and ``'bar'`` are the
            values handled by ``trading_plot``).
        columns: frame columns to draw. Defaults to a fresh
            ``['open', 'close', 'high', 'low']`` list.
        grid: key of the grid the plot is drawn on.
        config: extra keyword options forwarded to the plotting helper.
            Defaults to a fresh empty dict.
    """
    __slots__ = ('kind', 'columns', 'grid', 'config')

    def __init__(self, kind='kline', columns=None, grid=0, config=None):
        self.kind = kind
        # None sentinels instead of mutable defaults: the list/dict is stored
        # on the instance, so a shared default would leak edits between instances.
        self.columns = ['open', 'close', 'high', 'low'] if columns is None else columns
        self.grid = grid
        self.config = {} if config is None else config

        
class PlotGrammer(Base):
    """Declarative layout: named plot entries plus the vertical share of each grid.

    Every keyword argument becomes an attribute and a mapping key (through the
    instance-level ``__slots__`` tuple), followed by the computed ``grids`` entry.

    Args:
        grids: mapping of grid key -> share of the figure height in percent.
            The shares must sum to 100. Defaults to a single grid ``{0: 100}``.
        **kwargs: named plot entries (``PlotKind`` instances in this notebook).
    """

    def __init__(self, grids=None, **kwargs):
        # None sentinel instead of a mutable default: the dict is kept as
        # ``self._grids`` and would otherwise be shared between instances.
        if grids is None:
            grids = {0: 100}
        self.__slots__ = tuple(kwargs) + ('grids',)
        self._grids = grids
        self.calculate()
        for key, value in kwargs.items():
            setattr(self, key, value)

    def calculate(self, fig_height=800):
        """Recompute ``self.grids`` as per-grid ``top``/``bottom`` offsets in percent.

        A margin equivalent to 60 units of ``fig_height`` is reserved at both
        ends; shares are rescaled into the remaining space and shifted by the
        margin plus 2. An offset is reported as 0 when nothing precedes
        (``top``) or follows (``bottom``) the grid.

        Args:
            fig_height: figure height used to convert the fixed margin to percent.

        Raises:
            AssertionError: if the configured shares do not sum to 100.
        """
        grids = deepcopy(self._grids)
        assert sum(grids.values()) == 100, 'Grids sum must 100%'
        rescale = (100 - (60/fig_height)*100*2)/100
        top_fit = (60/fig_height)*100
        consumed = 0  # cumulative share of the grids placed so far
        for grid, val in list(grids.items()):
            remaining = 100 - (consumed + val)
            grids[grid] = dict(
                top=consumed*rescale + top_fit + 2 if consumed else 0,
                bottom=remaining*rescale + top_fit + 2 if remaining else 0)
            consumed += val
        self.grids = grids

    def __gen_container(self):
        # One 'name=value' pair per line, indented for the multi-line repr.
        return ',\n    '.join(['{}={!r}'.format(k, v) for k, v in {**self}.items()])

    def __repr__(self):
        return '{}(\n    {}\n)'.format(self.__class__.__name__, self.__gen_container())

In [540]:
# Two-grid layout: candlesticks plus the bup/ma/bdw band lines on grid 0 (60% share),
# order_sell bars plus the osma/osdw/osup band lines on grid 1 (40% share).
plot_gram = PlotGrammer(twse=PlotKind(), 
                        bband=PlotKind('line', ['bup', 'ma', 'bdw'], 0), 
                        ordersell=PlotKind('bar', ['order_sell', ], 1, config={'colors': ['rgba(155,155,155, 0.7)',]}),
                        osband=PlotKind('line', ['osma', 'osdw', 'osup'], 1, config={'colors': ['rgba(0,255,255,0.7)','rgba(0,255,0,0.7)', 'rgba(255,0,0,0.7)']}),
                        #volume=PlotKind('bar', ['volume', ], 2),
                        #rewardf=PlotKind('line', ['reward_fluctuant', ], 3, config={'is_fill':True, 'area_opacity': 0.4}),
                        #grids={0: 60, 1: 15, 2: 15, 3: 10}
                        grids={0: 60, 1: 40}
                       )
                    
# Bare expression so the cell displays the layout's repr.
plot_gram

PlotGrammer(
    twse=PlotKind(kind='kline', columns=['open', 'close', 'high', 'low'], grid=0, config={}),
    bband=PlotKind(kind='line', columns=['bup', 'ma', 'bdw'], grid=0, config={}),
    ordersell=PlotKind(kind='bar', columns=['order_sell'], grid=1, config={'colors': ['rgba(155,155,155, 0.7)']}),
    osband=PlotKind(kind='line', columns=['osma', 'osdw', 'osup'], grid=1, config={'colors': ['rgba(0,255,255,0.7)', 'rgba(0,255,0,0.7)', 'rgba(255,0,0,0.7)']}),
    grids={0: {'top': 0, 'bottom': 43.5}, 1: {'top': 60.5, 'bottom': 0}}
)

In [None]:
# Three-grid variant (40/40/20): a second candlestick entry `txf` on grid 1, with the
# order_sell bars and bands moved to grid 2.
# This rebinds `plot_gram`, so when cells run top to bottom this is the layout later cells see.
# NOTE(review): `txf` uses PlotKind's default columns, i.e. the same open/close/high/low
# columns as `twse` — confirm whether separate columns were intended.
plot_gram = PlotGrammer(twse=PlotKind(), 
                        txf=PlotKind(grid=1),
                        bband=PlotKind('line', ['bup', 'ma', 'bdw'], 0), 
                        ordersell=PlotKind('bar', ['order_sell', ], 2, config={'colors': ['rgba(155,155,155, 0.7)',]}),
                        osband=PlotKind('line', ['osma', 'osdw', 'osup'], 2, config={'colors': ['rgba(0,255,255,0.7)','rgba(0,255,0,0.7)', 'rgba(255,0,0,0.7)']}),
                        #volume=PlotKind('bar', ['volume', ], 2),
                        #rewardf=PlotKind('line', ['reward_fluctuant', ], 3, config={'is_fill':True, 'area_opacity': 0.4}),
                        #grids={0: 60, 1: 15, 2: 15, 3: 10}
                        grids={0: 40, 1: 40, 2: 20}
                       )
                    
# Bare expression so the cell displays the layout's repr.
plot_gram

In [536]:
def plot_line(df, name, columns=[], colors=[], **kwargs):
    """Build a ``Line`` chart with one series per requested column.

    Args:
        df: frame whose stringified index supplies the x-axis labels.
        name: first positional argument handed to ``Line``.
        columns: columns of ``df`` to draw, one series each.
        colors: optional colours applied by position to the series
            (``colors[i]`` goes to series ``i``).
        **kwargs: extra options forwarded to every ``Line.add`` call.

    Returns:
        The configured ``Line`` chart.
    """
    line = Line(name)
    for column in columns:
        x_labels = df.index.astype(str).tolist()
        y_values = df[column].tolist()
        line.add(column, x_labels, y_values,
                 is_symbol_show=False, is_label_emphasis=False, **kwargs)
    if colors:
        # Colours are written straight into the raw series options.
        for position, color in enumerate(colors):
            line._option['series'][position].update({'itemStyle': {'color': color}})
    return line

def plot_bar(df, name, columns=[], colors=[], pos='10%', **kwargs):
    """Build a ``Bar`` chart with one series per requested column.

    Args:
        df: frame whose stringified index supplies the x-axis labels.
        name: first positional argument handed to ``Bar``.
        columns: columns of ``df`` to draw, one series each.
        colors: optional colours applied by position to the series
            (``colors[i]`` goes to series ``i``).
        pos: used both as the chart's ``title_top`` and each series' ``legend_pos``.
        **kwargs: extra options forwarded to every ``Bar.add`` call.

    Returns:
        The configured ``Bar`` chart.
    """
    bar = Bar(name, title_top=pos)
    for column in columns:
        x_labels = df.index.astype(str).tolist()
        y_values = df[column].tolist()
        bar.add(column, x_labels, y_values,
                is_label_emphasis=False, legend_pos=pos, **kwargs)
    if colors:
        # Colours are written straight into the raw series options.
        for position, color in enumerate(colors):
            bar._option['series'][position].update({'itemStyle': {'color': color}})
    return bar

In [537]:
def trading_plot(df, plot_grammer, figsize=(1200, 800), trades=[]):
    """Render a ``PlotGrammer`` layout (and optional trades) into one ``Grid`` chart.

    One ``Overlap`` is created per grid; every ``PlotKind`` entry of the
    layout is drawn by the matching helper and added to the overlap of its
    grid. Trade markers and connectors, when given, go onto grid 0.

    Args:
        df: frame holding every column referenced by the layout.
        plot_grammer: layout description; its ``calculate`` is re-run with the
            figure height, which updates its ``grids`` attribute.
        figsize: ``(width, height)`` of the resulting grid chart.
        trades: optional trade records forwarded to ``plot_trades``.

    Returns:
        The assembled ``Grid`` chart.

    Raises:
        NotImplementedError: if a ``PlotKind`` has a kind other than
            ``'kline'``, ``'line'`` or ``'bar'``. The exception used to be
            constructed but never raised, so an unknown kind silently re-added
            the previous plot or failed on an unbound local.
    """
    grid = Grid(width=figsize[0], height=figsize[1])
    # Overlaps are addressed by grid key below, so grid keys are expected to be 0..n-1.
    charts = [Overlap() for g in plot_grammer['grids']]
    plot_grammer.calculate(figsize[1])
    all_dates = df.index.astype(str).tolist()
    for name, plotk in {**plot_grammer}.items():
        # The 'grids' entry (a plain dict) is part of the mapping too; skip non-plot values.
        if not isinstance(plotk, PlotKind):
            continue
        kind = plotk['kind']
        if kind == 'kline':
            plot = plot_kline(df, chart_name=name, symbol_name=name,
                              ochl_columns=plotk['columns'],
                              datazoom_share_axis=list(range(len(charts))),
                              **plotk['config'])
        elif kind == 'line':
            plot = plot_line(df, name=name, columns=plotk['columns'], **plotk['config'])
        elif kind == 'bar':
            plot = plot_bar(df, name=name, columns=plotk['columns'],
                            pos="{}%".format(plot_grammer['grids'][plotk['grid']]['top']),
                            **plotk['config'])
        else:
            raise NotImplementedError('Kind: {} not implement.'.format(kind))
        charts[plotk['grid']].add(plot)
    if trades:
        es, esline = plot_trades(trades, all_dates)
        charts[0].add(es)
        charts[0].add(esline)
    for g, vol in plot_grammer['grids'].items():
        # A computed offset of 0 falls back to the bare number 60
        # (presumably pixels, matching the margin used in calculate — confirm).
        grid.add(charts[g],
                 grid_top="{}%".format(vol['top']) if vol['top'] else 60,
                 grid_bottom="{}%".format(vol['bottom']) if vol['bottom'] else 60)
    # Link the axis pointer across every x axis so the cross-hair moves in all grids together.
    grid._option.update(axisPointer={'link': {'xAxisIndex': 'all'}})
    return grid

In [541]:
# NOTE: this cell reads `trades` and, through `plot_gram`, the 'ma'/'bup'/'bdw'/'osma'/'osdw'/'osup'
# columns of `df`; both are created in cells that appear further down the notebook, so a
# top-to-bottom run fails here unless those cells are executed first.
g = trading_plot(df, plot_gram, figsize=(1200, 800), trades=trades)
# Presumably writes the chart to render.html (the file the next cell embeds) — confirm the default path.
g.render()

In [542]:
# Embed render.html in the cell output as a 1500x850 frame.
IFrame('render.html', 1500, 850)

In [477]:
# Indicator columns, added to `df` in place.
# Price bands: 20-row rolling mean and standard deviation of 'close', bands at mean ± 2 std.
df['ma'] = df['close'].rolling(20, min_periods=0).mean()
df['std'] = df['close'].rolling(20, min_periods=0).std()
df['bup'] = df['ma'] + 2 * df['std']
df['bdw'] = df['ma'] - 2 * df['std']
# order_sell bands: 20-row rolling mean, with lower/upper bands at 80% / 120% of it.
df['osma'] = df['order_sell'].rolling(20, min_periods=0).mean()
df['osdw'] = df['osma']*0.8
df['osup'] = df['osma']*1.2

In [510]:
# Rows flagged entry_cover == 1 are entries, rows flagged -1 are covers.
entry_points = df[df['entry_cover']==1]
cover_points = df[df['entry_cover']==-1]
# Each trade is [entry point, cover point, side, reward]:
#   - the i-th entry row is paired with the i-th cover row (zip stops at the shorter list);
#   - reset_index() turns the 'datetime' index back into a column so every record carries it;
#   - side is 'long' when the entry's position_variation is positive, otherwise 'short'.
# NOTE(review): trade_line looks the 'x' strings up in `all_dates` (index.astype(str)); that only
# matches if the index stringifies as '%Y-%m-%d' — confirm the index has no time component.
trades = [[
    {
        'x': entry_point['datetime'].strftime('%Y-%m-%d'),
        'y': entry_point['close'],
        'value': entry_point['position_variation']
    },
    {
        'x': cover_point['datetime'].strftime('%Y-%m-%d'),
        'y': cover_point['close'],
        'value': cover_point['position_variation']
    },
    'long' if entry_point['position_variation'] > 0 else 'short',
    round(cover_point['reward'], 2),
] for entry_point, cover_point in zip(
    entry_points.reset_index().to_dict('records'),
    cover_points.reset_index().to_dict('records'))]
# x-axis labels in frame order, as strings.
all_dates = df.index.astype(str).tolist()

In [None]:
# Inspect the trade records (displays the whole list).
trades

In [27]:
# Stand-alone variant without the grid layout: one candlestick chart plus the trade overlays.
kchart = plot_kline(df, symbol_name='twse')
es, esline = plot_trades(trades, all_dates)

In [28]:
# Stack the candlesticks, trade markers and trade connectors in a single 1200x800 overlap.
op = Overlap(width=1200, height=800)
op.add(kchart)
op.add(es)
op.add(esline)
# Presumably writes to render.html, replacing the file produced by the grid chart — confirm.
op.render()

In [29]:
# Embed render.html in the cell output as a 1500x850 frame.
IFrame('render.html', 1500, 850)