# Quantmoon Technologies - Intraday Backtester

Intraday backtester that reads tick data stored in `.zarr` files.

## Part 1 - Importing General Libraries

In [10]:
import os
import sys
import glob
import queue
import warnings

import numpy as np
import xarray as xr
import pandas as pd

from importlib import reload  
from datetime import datetime
from abc import ABCMeta, abstractmethod

from fracdiff import StationaryFracDiff
from math import floor
import time
import pprint

from dateutil import parser
import itertools

In [11]:
#not useful during package construction
import matplotlib.pyplot as plt

## Part 2 - Basic Extractor and Management Functions

2.1. Error and VWAP functions

In [12]:
class UnAcceptedValueError(Exception):
    """
    Project-specific exception that carries an arbitrary payload.

    The payload passed to the constructor is kept on ``.data`` so an
    ``except UnAcceptedValueError as err`` clause can read it back
    (``err.data``), e.g. to build a user-facing message.

    ``str()`` of the exception is the ``repr`` of that payload.
    """

    def __init__(self, data):
        # keep the payload verbatim; callers read it via `.data`
        self.data = data

    def __str__(self):
        return f"{self.data!r}"
    
# time bar benchmark
def compute_vwap(ds):
    """
    Volume Weighted Average Price (VWAP) of a group of ticks.

        VWAP = sum(price * volume) / sum(volume)

    Parameters
    ----------
    ds : object exposing a ``.value`` field (price) and a ``.vol``
         field (volume) that support element-wise multiplication.
         Examples: an xarray.Dataset group whose price variable is
         named 'value', or a pandas.DataFrame with columns
         'value' and 'vol'.

    Returns
    -------
    The volume-weighted price, as produced by ``np.sum`` on ``ds``'s
    fields (a scalar for pandas input; an xarray object for xarray input).

    Notes
    -----
    In this notebook the function is handed to ``.apply`` on resampled
    or grouped xarray objects. Their price variable was renamed from
    'cost' to 'value' beforehand. A group whose total volume is 0 makes
    the division by zero produce NaN/inf.
    """
    prices, volumes = ds.value, ds.vol
    dollar_volume = np.sum(prices * volumes)
    total_volume = np.sum(volumes)
    return dollar_volume / total_volume

2.2. Date/Time Handling, Null Values and Data Preprocessing

In [13]:
def zarr_format_to_netcdf_format(file,
                                 start_date,
                                 end_date):  # NOTE: marked "ELIMINABLE" (removable) by the original author
    """
    Rebuild a per-tick (date, n, ts) xarray.Dataset from a zarr store.

    For every date of the store's ``date`` coordinate that falls in
    ``slice(start_date, end_date)``, the ``lev_cost`` (price) and
    ``lev_vol`` (volume) values of that date are split into consecutive
    groups. The group lengths come from the ``size`` variable. Each group
    is tagged with the timestamp taken from the ``ubi`` variable at the
    same position. Ticks inside a group are numbered ``n = 1, 2, ...``.

    Parameters:
    -----------
        - file: path of the zarr store (directory + '<stock>.zarr').
                Example: "D:\\data_zarr\\AAPL.zarr"
        - start_date, end_date: bounds of the label-based
                ``data.date.sel(date=slice(start_date, end_date))``.

    Output:
    -------
    xarray.Dataset with data variables 'vol' and 'cost' over the
    dimensions (date, n, ts). The 'date' labels are the 'YYYY-MM-DD' part
    of the store's dates, as strings. Missing (date, n, ts) combinations
    are filled with 0. When several rows share a key, the first non-null
    value per column is kept (``groupby(...).first()``).

    NOTE(review): price/volume drop zeros *first* and then the last
    remaining element (``[x != 0][:-1]``). size/ubi drop the last element
    *first* and then the zeros. The last slot is presumably padding —
    confirm the two orders are intended to match.
    NOTE(review): price and volume are aligned by their pandas index in
    ``pd.concat(axis=1)``. A zero in one series but not the other
    therefore leaves NaN in that column for that row.
    NOTE(review): ``pd.concat`` raises ValueError when no date falls in
    the requested range (empty ``list_df_``).
    """
    
    # open the (lazily loaded) zarr store
    data = xr.open_zarr(file)
    
    # TODO (original author): add INITIALIZATION + STEP handling here
    
    # 'YYYY-MM-DD' strings of the dates inside the requested range
    range_dates = [
        str(np_date).partition('T')[0] for np_date 
        in data.date.sel(
            date = slice(start_date,end_date)
        ).values
    ]
        
    # one small DataFrame per (date, density group); concatenated at the end
    list_df_=[]
    for date in range_dates:
        
        # flatten prices ('lev_cost') of this date into a pandas Series
        
        precios_flatten = data.lev_cost.sel(date=date).to_pandas(
        ).transpose().rename_axis(None)
        
        # drop zero entries, then the last remaining element
        precios_flatten = precios_flatten[precios_flatten!=0][:-1]

        # flatten volumes ('lev_vol') of this date the same way
        volumen_flatten = data.lev_vol.sel(date=date).to_pandas(
        ).transpose().rename_axis(None)

        volumen_flatten = volumen_flatten[volumen_flatten!=0][:-1] 

        # side-by-side frame: columns 'vol' and 'cost', aligned on index
        price_volume = pd.concat([volumen_flatten.to_frame("vol"),
                                  precios_flatten.to_frame("cost")],
                                 axis=1)
        
        # group lengths (number of rows per timestamp); last slot and
        # zeros removed. `data.size` presumably refers to the variable
        # named 'size' in the store — verify.
        density_levels = list(
            filter(lambda num: num!=0,
                   np.array(data.size.sel(date=date).values[:-1], 
                            dtype='int')
                  )
        )
        
        # timestamp of each group (same trimming as density_levels)
        timestamp_occurrency = list(map(int, 
                 filter(lambda num: num!=0, 
                        np.array(data.ubi.sel(date=date).values[:-1]))))
        
        # positional start of the current group inside price_volume
        reference = 0
        
        # cut price_volume into consecutive groups of the given lengths
        for idx, grouped_length in enumerate(density_levels):
            df_ = price_volume[                      
                reference:reference+grouped_length
            ].assign( # add column 'ts' (group timestamp)
                ts=timestamp_occurrency[idx]
            ).assign( # add column 'date' (the 'YYYY-MM-DD' string)
                date=date
            )
            # 1-based position of each row inside its group
            df_['n'] = np.arange(df_.shape[0]) + 1
            
            list_df_.append(df_)
            # advance to the start of the next group
            reference+=grouped_length
            
    # stack all per-group frames
    temp_df_to_check = pd.concat(list_df_)
    
    # one row per (date, n, ts) key; first non-null value per column
    data_array_structure = temp_df_to_check.groupby(       
        ['date', 'n', 'ts']
    )[['vol','cost']].first()
    
    # MultiIndex -> dense (date, n, ts) Dataset; empty cells become 0
    data_array_structure=data_array_structure.to_xarray().fillna(0)
    
    return data_array_structure

In [14]:
def data_preprocessing_bt(file,
                          initialization_step,
                          step,
                          start_date, 
                          end_date):
    """
    Load tick data for a date range and turn its "ts" coordinate into datetimes.

    Steps
    -----
    1. Build the (date, n, ts) dataset with ``zarr_format_to_netcdf_format``.
    2. Rescale "ts" from milliseconds to seconds.
    3. For each date, keep the ts labels inside
       ``slice(initialization_step, initialization_step + step)``.
    4. Replace "ts" with ``datetime.fromtimestamp`` values and
       concatenate the days back along "date".

    NOTE(review): the slice bounds are compared against the *rescaled*
    (seconds) ts values — confirm the units callers pass.
    NOTE(review): ``datetime.fromtimestamp`` returns naive local time,
    so the clock times depend on the machine's timezone.
    """
    dataset = zarr_format_to_netcdf_format(file, start_date, end_date)

    # millisecond epoch -> second epoch
    dataset['ts'] = dataset.ts.values / (10**3)

    ts_window = slice(initialization_step, initialization_step + step)

    daily_parts = []
    for day in dataset.date.values:
        day_ds = dataset.sel(date=day).sel(ts=ts_window)
        day_ds['ts'] = [datetime.fromtimestamp(stamp)
                        for stamp in day_ds.ts.values]
        daily_parts.append(day_ds)

    return xr.concat(daily_parts, dim='date')

In [15]:
def _ewma(arr_in,window):
    """
    The following code compute the EWMA
    (Exponential Weighted Moving Average)
    
    Specifically, it computes an EWMA 
    for price variation -this means, volatility.
    
    The main formula is:

    EWMA(t) = |Y1 if t=1
              |alpha*Yt(1-alpha)· EWMA(t-1),t>1  
    
    Where:

    * 'alpha' is the degree of weighting decrease, 
              a constant smoothing factor <0;1>. 
              Higher alpha discounts older observations faster.
    
    * 'Yt' is the value of data at time period 't'
    * 'St' is the value of the EWMA at any time period 't'.
    
    Remember that: 
    
    alpha =2/(window+1)
    
    * 'window' as a window_length for the EWMA 
    
    Parameters
    ----------
    arr_in: array 1D of values (shape>1)
    
    window: int (>=1) 
    
    Returns
    ------
    ewma: array 1D (shape==arr_in) 
    """
    
    #number of data points
    n = arr_in.shape[0]
    #an empty numpy to fill 'ewma' results
    ewma = np.empty(n, dtype=np.float64)
    #computation of "alpha" degree
    alpha = 2 / float(window + 1)
    #weight value | could be a float
    w = 1
    #first ewma: the first data point
    #since it doesn't have previous values
    ewma_old = arr_in[0]
    #deine it in the first index
    #in the ewma variable for returns
    ewma[0] = ewma_old
    #iterate over each data point
    for i in range(1, n):
        #increase weight value
        #as long as the iterator increse
        #the 'i' values
        #based on the alpha degree
        w += (1-alpha)**i
        #compute a new "ewma_old"
        ewma_old = ewma_old*(1-alpha) + arr_in[i]
        #calculate the new ewma data point
        ewma[i] = ewma_old / w
    return ewma

In [16]:
def compute_Ts(bvs,
               E_T_init,
               abs_Ebv_init,
               num_bars = 3):
    """
    Locate imbalance-bar boundaries in a signed-flow series.

    A running imbalance theta is accumulated tick by tick. A bar is
    closed on the first tick where

        |theta| >= E_T * |E_bv|

    theta is then reset to 0, and both expectations are re-estimated
    with ``_ewma``:
      - E_T  : EWMA of all bar lengths so far (window = number of bars);
      - |E_bv|: |last EWMA value| of the signed flow before the
                boundary tick (window = E_T_init * num_bars).

    Background on the signed flow (b: tick sign, p: price, v: volume):

        dp[t]    = p[t] - p[t-1]
        b[t]     = b[t-1]          if dp[t] == 0
                   sign(dp[t])     otherwise
        bvs[t]   = b[t] * v[t]
        theta[t] = sum of bvs since the last boundary

    Parameters
    ----------
    bvs : 1-D signed flow (tick direction * volume). Anything
          ``np.asarray`` accepts: pandas Series, ndarray, list.
    E_T_init : initial expected bar length in ticks (warm-up).
    abs_Ebv_init : initial |E[b*v]|.
    num_bars : int, default 3. It scales the EWMA window for |E[b*v]|
               (``E_T_init * num_bars``). It does NOT force the number
               of bars produced; that depends on the data.

    Returns
    -------
    Ts : list of float64
        Length of each closed bar, measured as the index distance
        ``i - i_prev`` between consecutive boundaries. The first bar is
        measured from index 0.
    abs_thetas : np.ndarray, shape (n,)
        |theta| at each tick, recorded before any reset on that tick.
    thresholds : np.ndarray, shape (n,)
        Threshold in force at each tick. thresholds[0] stays 0 because
        the scan starts at index 1.
    i_s : list of int
        Indices of the ticks that closed a bar. The closing tick belongs
        to the bar it closes. E.g. i_s = [560, 720] on 1000 ticks gives
        bars [0..560], [561..720], and an unfinished tail [721..999].

    An empty ``bvs`` returns ``([], empty, empty, [])``.
    """
    bvs_val = np.asarray(bvs, dtype=np.float64)
    n = bvs_val.shape[0]
    Ts, i_s = [], []
    abs_thetas, thresholds = np.zeros(n), np.zeros(n)
    # no ticks -> no bars (and nothing to seed theta with)
    if n == 0:
        return Ts, abs_thetas, thresholds, i_s

    i_prev, E_T, abs_Ebv = 0, E_T_init, abs_Ebv_init
    # theta is seeded with the first tick's signed flow
    abs_thetas[0], cur_theta = np.abs(bvs_val[0]), bvs_val[0]

    for i in range(1, n):
        cur_theta += bvs_val[i]
        abs_theta = np.abs(cur_theta)
        abs_thetas[i] = abs_theta
        threshold = E_T * abs_Ebv
        thresholds[i] = threshold

        if abs_theta >= threshold:
            # close the bar at tick i and start accumulating afresh
            cur_theta = 0
            Ts.append(np.float64(i - i_prev))
            i_s.append(i)
            i_prev = i
            # expected bar length: last EWMA value over all bar lengths
            E_T = _ewma(
                np.array(Ts),
                window=np.int64(len(Ts))
            )[-1]
            # expected |signed flow| from ticks before the boundary
            abs_Ebv = np.abs(
                _ewma(
                    bvs_val[:i],
                    window=np.int64(E_T_init * num_bars)
                )[-1])
    return Ts, abs_thetas, thresholds, i_s

2.3. Basic Bar Generation by Type

In [17]:
def bar_imbalance_tick_construction(df_ticks,
                                    type_structure="vwap"):
    """
    Attach a price-summary column to tick data.

    Parameters
    ----------
    df_ticks :
        - for ``type_structure='vwap'``: a pandas GroupBy of ticks with
          'price' and 'volume' columns, presumably grouped by 'grpId'.
          Each group gets a 'vwap' column (sum(price*volume)/sum(volume)),
          and the 'grpId' index level added by ``apply`` is then dropped.
        - for ``type_structure='mean'``: a DataFrame with a 'price'
          column. A 'mean' column holding the mean of the whole 'price'
          column is added.
    type_structure : 'vwap' (default) or 'mean'.

    Returns
    -------
    A new DataFrame with the added column; ``df_ticks`` is not modified.
    For an unrecognised ``type_structure`` a message is printed and
    ``df_ticks`` is returned unchanged.

    NOTE(review): the two branches expect different input kinds
    (GroupBy vs DataFrame). The 'vwap' branch also relies on ``apply``
    prepending a 'grpId' index level, which depends on pandas'
    ``group_keys`` behaviour — confirm with the caller.
    """
    if type_structure == 'vwap':
        return df_ticks.apply(
            lambda grp: grp.assign(
                vwap=np.sum(grp.price * grp.volume) / np.sum(grp.volume))
        ).reset_index(['grpId'], drop=True)

    if type_structure == 'mean':
        # assign() returns a copy, so the caller's frame is left untouched
        # (the previous item assignment mutated the argument in place)
        return df_ticks.assign(mean=df_ticks['price'].mean())

    print("Not cognized 'type_structure' parameter")
    return df_ticks

In [18]:
def get_bars_cdf_bt(file,
             bartype,
             initialization_step,
             step,
             start_date, 
             end_date,
             freq=15,
             ticks_warm_up='auto',
             window_fracdiff=1,
             actv_dfwap=True):
    """
    Information Source: https://bit.ly/3fqFpAY
    Cap. 2 - Financial Data Structures (Bars)
    
    Compute the selected data-bar type.
        
    Parameters:
    -----------
    - file: string with the direction location of NetCDFs:
            
    - bartype: str that could de:
             - 'tick': for tick bars
             - 'volume': for volume bars
             - 'dollar': for dollar volume bars
             - 'imbalance': for imbalance bars
             - 'fracdiff': for fractional derivative
             
    - initialization_step: lenght of timestamp you'll take as input
                           in order to group and take your data.
                           Example: 600000 = 10 first market minutes.
                           This value will be updated iteritivally
                           based of the 'step' or heart-beat' value.
                           
    - step: lenght of timestamp you'll consider as 'heart-beats'
             This value represents how often you will update
             the original 'initialization_step'.
             Example: if step ('heart-beat') = 300000, this means
             you will update the information each 5 minutes. 
             Thus, your first "initialization_step" will be 'X',
             then after the first iteration, this will be X + 300000.
             
    - freq (predefined): int that reflects the time-series grouping
            for the bars. It'll affect the number of bars,
            as long as the VWAP computed.
            
            Remember that this is necessary because,
            as humans, we need a time series representation
            of the resulted formed bars.
            
    - ticks_warm_up (auto/ predefined): int>1000 or 'str' that reflects
                                        max numbers of ticks
                                        that we will considerate 
                                        to compute during
                                        the tick bar separation
                                        check more in "compute_Ts()".
                                        Util for imbalance bars.
                                        'Auto' (recommended).
                                  
    - window_fracdiff (predefined): 0< int <1 that reflects
                                    the order of derivation
                                    to calculate the fractional diff
                                    of a set of numbers; in this case,
                                    of a numpy.ndarray of prices.
                                    Util for fracdiff bars.
                                  
    - actv_dfwap (predefined): bool that if it's true, it returns
                               some other outputs that differs
                               depends on the selected bar type.
                               
    Returns:
    --------
    Selected new computed bars based on its type.
    
    IMPORTANT
    ---------
    i) The reason why we use a Volume Weighted Average Price (vwap)
        insted of the original tick time series as data-input
        is due to the factor that the original series is highly variable
        and extremely microscope (based on the miliseconds scale).
        Thus it does not allow to appreciate the prices series correctly.
        Try "data.prices.plot()" in your original dataset to see why.
    
    ii) In 'result_dataframe' construction, we use a 'split' process
        over 'time_prices' information from a xarray.DataArray
        because a string 'T' separates the DATE(T)HOUR-MINUTE-SECOND 
        in a np.array of time. So, as long as it belongs to this numpy,
        we have to split this two parts and set DATE.datetime() format
        as index and HOUR-MINUTE-SECOND like our variable in a column.
        
    iii)The conditional 'actv_dfwap' works differently depends
        on the selected bar. Here is a list of hwo it works based on this:
        
        - 'time' & 'fracdiff': if actv_dfwap=True, 
                               returns VWAP based on typebar.
                               Otherwise, returns mean.price. 
                               based on typebar.
                               It can be stored in both sizes.
                      
        - 'tick', 'volume'
           & 'dollar':   if actv_dfwap=True, 
                         it returns VWAP based on type bar.
                         Otherwise, it returns original xarray.Dataset. 
                         Thus, it can be stored only if actv_dfwap=True. 
    """

    _bartypes = ["time","tick","volume","dollar","imbalance",'fracdiff']
    bartype=bartype.lower()
    
    # checking general inputs
    try:
        if bartype not in _bartypes:
            raise UnAcceptedValueError("\
            Only 'tick','volume','dollar','imbalance' as 'bartype'")
        elif type(file) != str:
            raise UnAcceptedValueError("\
            Only 'str' route direction as 'file' input")
        elif type(freq)!=int or freq<=0:
            raise UnAcceptedValueError("Wrong 'freq' input.\
            Only 'freq'>0 and 'int' type")
        elif type(actv_dfwap)!=bool:
            raise UnAcceptedValueError("Only 'bool' type for 'df_vwap'")
        elif (type(window_fracdiff)!=int or 
              window_fracdiff<0 or 
              window_fracdiff>1):
            raise UnAcceptedValueError("\
            Only positive 'int' type for 'window_fracdiff' arg.")
        else:
            pass
    except UnAcceptedValueError as mistake:
        return "Input Error Format: {}".format(mistake.data)
    
    # STEP O: Extracting data
    data=data_preprocessing_bt(file,
                               initialization_step,
                               step,
                               start_date,
                               end_date)
     
    ##################################################################
    # STEP 1: General Data XArray Management#
    
    # 1.1. NaN cleaning
    new_data=data.where(data!=0.).mean(
        dim="date",
        skipna=True
    )
    
    # 1.2. Flatting cleanned 
    new_data_value_flatt  = new_data.cost.stack(
        ts_new = ("ts","n")
    )     
    new_data_vol_flatt  = new_data.vol.stack(
        ts_new = ("ts","n")
    )
    
    new_data_value_drop = new_data_value_flatt.dropna(
        dim = "ts_new"
    )
    new_data_vol_drop = new_data_vol_flatt.dropna(
        dim = "ts_new"
    )
    
    # 1.3. grouping ticks by some 'freq'
    resampling='{}Min'.format(freq)
    netcdf_mean = data.where(data!=0.).mean(
        dim="n",
        skipna=True).mean(dim = "date")
    
    # 1.4. renaming the variable 'cost' by 'value'(price)
    netcdf_mean=netcdf_mean.rename({'cost':'value'})
    
    # 1.5. resample data by some 'freq'
    group_data = netcdf_mean.resample(ts=resampling)
    
    # 1.6. get the resample dots
    num_time_bars = len(group_data) 
    
    ###############################################################
    # STEP 2: General Bars Computation
    
    # Computation 2.1. - Time Bars | VWAP or Mean bar
    if bartype=='time':
        if actv_dfwap==False:
            # 1.1. create mean price for each sample of some 'freq'
            data_pricetime=group_data.mean()
        else:
            # 1.2. create vwap value for each sample of some 'freq'
            data_pricetime=group_data.apply(compute_vwap)
        
        # 2. construction of final DataFrame
        result_dataframe=pd.DataFrame(
            {'time':[str(np_date).partition('T')[2] for np_date 
                     in data_pricetime.ts.values],
             'price_fd':data_pricetime.values},
            index=[str(np_date).partition('T')[0] for np_date 
                   in data_pricetime.ts.values]
        ).iloc[::,::-1]
        result_dataframe.index.name = "date"
        return result_dataframe        
    
    # Computation 2.2. - Tick Bars
    elif bartype=='tick':
        
            # 1. & 2. Modifying the original time series data
            #   and get total of ticks and define num ticks per bar
        total_ticks = new_data_value_drop.shape[-1]
        num_ticks_per_bar = total_ticks/num_time_bars 
        num_ticks_per_bar = round(num_ticks_per_bar, -2)
        
            # 3. Setting a new axis given by the num ticks per bar
        n_coo_ = []
        for i in range(int(new_data_value_drop.shape[0]/
                           num_ticks_per_bar)):
            n_coo_ += [i]*int(num_ticks_per_bar) 
            
            # 4. Setting the new coordinates in order as a list
        n_coo_=n_coo_+[i+1]*(new_data_value_drop.shape[0]-len(n_coo_))  
        
            # 5. Align previous original information 
            #    using new grouped index 'n_coo_'
            #    structure: xarray.DataArray
        
            # 5.1. data price value using 'n_coo_' index
        new_data_value_util = new_data_value_drop.assign_coords(
            ts_new = np.array(n_coo_)
        )
            # 5.2. data volume value using 'n_coo_' index
        new_data_vol_util = new_data_vol_drop.assign_coords(
            ts_new = np.array(n_coo_)
        )
            # 5.3. data time value using 'n_coo_' index
        new_data_time_util = new_data_vol_drop.ts.assign_coords(
            ts_new=np.array(n_coo_)
        )
            # 6. MAIN xarray.Dataset - Putting all together: 
            #    general base XarrayDataset tick/values
            #    It includes previous information (5.1 - 5.3)
        new_data_util = xr.Dataset(
            {"value": new_data_value_util,
             "vol": new_data_vol_util,
             "time": new_data_time_util}
        )
        
            # Conditional OUTPUT
        if actv_dfwap==False:
            # returns the original xarray.Dataset 
            # not for storage; just for print
            # output format: xarray.Dataset
            return new_data_util
        else:
            # returns the vwap computed from new/vol/bar
            # output format: pd.DataFrame
            
            # 'Groupdata' using new/vol/bar coord type 'ts_new=n_coo_' 
            group_data=new_data_util.groupby("ts_new")

            # Results
            #---------------------------------------------------------#
            # RESULT 7.1: 'price' VWAP from new/dol/bar(dataArray)
            price_vwap_tick_bar = group_data.apply(compute_vwap)

            # RESULT 7.2: 'time' price VWAP from new/dol/bar(dataArray)
            dtime_vwap_tick_bar = group_data.first("time").time
            
            result_dataframe=pd.DataFrame(
                {'time':[str(np_date).partition('T')[2] for np_date 
                          in dtime_vwap_tick_bar.values],
                 'price':price_vwap_tick_bar.values},
                index=[str(np_date).partition('T')[0] for np_date 
                          in dtime_vwap_tick_bar.values]
            ).iloc[::,::-1]
            
            result_dataframe.index.name = "date"
            
            return result_dataframe
    
    # Computation 2.3. - Volume Bars
    elif bartype=='volume':
        
            # 1. Compute the volume cumsum among ticks
            #    in order to find the threshold limit of vol/bar        
        data_cm_vol = new_data_vol_drop.values.cumsum()
    
            # 2. Get result of volume cumsum value among ticks
            #    and define the threshold of vol/bar
        total_vol = data_cm_vol[-1] 
        vol_per_bar = total_vol / num_time_bars
        
            # 3. round to the nearest hundred the volume per bar
            #    in order to get a uniform series among different assets        
        vol_per_bar = round(vol_per_bar, -2)

            # 4. calculating new index using volume/bar threshold
        n_coo_ = data_cm_vol//vol_per_bar
        
            # 5. Align previous original information 
            #    using new grouped index 'n_coo_'
            #    structure: xarray.DataArray
        
            # 5.1. data volume cumulative sum value using 'n_coo_' index
        new_data_cmvol_util=new_data_vol_drop.cumsum().assign_coords(
            ts_new=n_coo_
        )       
            # 5.2. data price value using 'n_coo_' index
        new_data_value_util=new_data_value_drop.assign_coords(
            ts_new= n_coo_
        )
            # 5.3. data volume using 'n_coo_' index
        new_data_vol_util=new_data_vol_drop.assign_coords(
            ts_new=n_coo_
        )
            # 5.4. data time value using 'n_coo_' index
        new_data_time_util=new_data_vol_drop.ts.assign_coords(
            ts_new=n_coo_
        )
            # 6. MAIN xarray.Dataset - Putting all together: 
            #    general base XarrayDataset cum/vol/values
            #    It includes previous information (5.1 - 5.4)
        new_data_util = xr.Dataset(
            {"cmvol":new_data_cmvol_util,
             "value":new_data_value_util,
             "vol":new_data_vol_util,
             "time":new_data_time_util}
        )
        
            # 7. Conditional OUTPUT
        if actv_dfwap==False:
            # returns the original xarray.Dataset 
            # not for storage; just for print
            # output format: xarray.Dataset
            return new_data_util
        else:
            # returns the vwap computed from new/vol/bar
            # output format: pd.DataFrame
            
            # COMP1:groupdata using new/vol/bar coord type 'ts_new=n_coo_' 
            group_data=new_data_util.groupby("ts_new")

            # Results
            #---------------------------------------------------------#
            # RESULT 7.1: 'price' VWAP from new/vol/bar(dataArray)
            price_vwap_vol_bar = group_data.apply(compute_vwap)

            # RESULT 7.2: 'time' price VWAP from new/vol/bar(dataArray)
            dtime_vwap_vol_bar = group_data.first("time").time
            
            result_dataframe=pd.DataFrame(
                {'time':[str(np_date).partition('T')[2] for np_date 
                          in dtime_vwap_vol_bar.values],
                 'price':price_vwap_vol_bar.values},
                index=[str(np_date).partition('T')[0] for np_date 
                          in dtime_vwap_vol_bar.values]
            ).iloc[::,::-1]
            result_dataframe.index.name = "date"
            return result_dataframe
        
    #Computation 2.4. - Dollar Bars
    elif bartype=='dollar':
            # Price volatility is the rationale behind dollar bars
            # we do the same process as the "Volume Bars"
            # however, we will use "dollar volume" values
            
            # 1. Computing dollar value using directly xarray.DataArray
        new_data_dollar_value =(new_data_value_drop*
                                new_data_vol_drop)
            
            # 2. Compute the dollar volume value cumsum among ticks
            #    in order to find the threshold limit of dollar per bar
        data_cm_dol = new_data_dollar_value.values.cumsum()   
        total_dol = data_cm_dol[-1] #final cumsum dollar data  
        dol_per_bar = total_dol / num_time_bars

            # 3. round to the nearest hundred the num_dollar_volume per bar
            #    in order to get a uniform series among different assets
        dol_per_bar = round(dol_per_bar, -2)
        
            # 4. calculating new index using dollar amount per bar
        n_coo_ = data_cm_dol//dol_per_bar

            # 5. Align previous original information 
            #    using new grouped index 'n_coo_'
            #    structure: xarray.DataArray
        
            # 5.1. data dollar cumulative sum value using 'n_coo_' index
        new_data_dol_util = new_data_dollar_value.cumsum().assign_coords(
            ts_new= n_coo_
        )
            # 5.2. data price value using 'n_coo_' index
        new_data_value_util = new_data_value_drop.assign_coords(
            ts_new= n_coo_
        )
            # 5.3. data vol value using 'n_coo_' index
        new_data_vol_util = new_data_vol_drop.assign_coords(
            ts_new= n_coo_
        )
            # 5.4. data time value using 'n_coo_' index
        new_data_time_util = new_data_dollar_value.ts.assign_coords(
            ts_new=n_coo_
        )
        
            # 6. MAIN xarray.Dataset - Putting all together: 
            #    general base XarrayDataset cum/dol/values
            #    It includes previous information (5.1 - 5.4)
        new_data_util = xr.Dataset(
            {"cmdol":new_data_dol_util,
             "value":new_data_value_util,
             "vol":new_data_vol_util,
             "time":new_data_time_util}
        )
        
            # 7. Conditional OUTPUT:
        if actv_dfwap==False:
            # returns the original xarray.Dataset 
            # not for storage; just for print
            # output format: xarray.Dataset
            return new_data_util
        else:
            # returns the vwap computed from new/dol/bar
            # output format: pd.DataFrame
            
            # COMP1:groupdata using new/dol/bar coord type 'ts_new=n_coo_' 
            group_data=new_data_util.groupby("ts_new")

            # Results
            #---------------------------------------------------------#
            # RESULT 7.1: 'price' VWAP from new/dol/bar(dataArray)
            price_vwap_dol_bar = group_data.apply(compute_vwap)
            
            # RESULT 7.2: 'time' price VWAP from new/dol/bar(dataArray)
            dtime_vwap_dol_bar = group_data.first("time").time
            
            # Construct the final dataframe
            result_dataframe=pd.DataFrame(
                {'time':[str(np_date).partition('T')[2] for np_date 
                          in dtime_vwap_dol_bar.values],
                 'price': price_vwap_dol_bar.values},
                index= [str(np_date).partition('T')[0] for np_date 
                          in dtime_vwap_dol_bar.values]
            ).iloc[::,::-1]
            result_dataframe.index.name = "date"
            return result_dataframe
    
    # Computation 2.5. - Fractional Derivative Bars    
    elif bartype=='fracdiff':
        # 1. construct the VWAP or series.mean
        if actv_dfwap == False:
            data_pricetime=group_data.mean()
        else:
            data_pricetime=group_data.apply(compute_vwap)
        # 2. get numpy price values from 'VWAP' xarray.DataArray
        data = data_pricetime.values

        # 3. calculating the stationary version of price series
        #    use a windows given as a parameter or default=1
        fracdiff_ = StationaryFracDiff(window=window_fracdiff)
        
        try:
            fracdiff_results = fracdiff_.fit_transform(data)
        except ValueError:
            print("Lack of samples to estimate the fracdiff for {}".format(
            (file.split("//",1)[1])
            )
                 )
            raise
        
        # 4. Drop the 'nan' values. Number of nan's=length of window
        #    First 'n' datapoints should be 'nan' if window = n 
        fracdiff_results=fracdiff_results[~np.isnan(fracdiff_results)]
        
        # 5. Setting the time numpy values series on a variable 
        time_info = data_pricetime.ts.values[1:]
        
        # 6. constructing the result dataframe with time & price-values
        result_dataframe=pd.DataFrame(
            {'time':[str(np_date).partition('T')[2] for np_date 
                     in time_info],
             'price_fd':fracdiff_results},
            index=[str(np_date).partition('T')[0] for np_date 
                   in time_info]
        ).iloc[::,::-1]
        result_dataframe.index.name = "date"
        return result_dataframe
    
    elif bartype=='imbalance':
        # 1. construct dataframe of general information
        data_timeidx = pd.DataFrame(
            {'price':new_data_value_drop.values,
             'volume':new_data_vol_drop.values},
            index=new_data_value_drop.ts)
        
        # 2. estimation of tick direction
        data_timeidx['tickDirection']=data_timeidx.price.diff(
        ).fillna(
            1.0
        ).apply(
            lambda x: 
            1 if x>0 else(
                0 if x==0.0 else -1
            )
        ).replace(
            to_replace=0,
            method='ffill'
        )
        
        # 3. compute the signs into the main dataset
        data_signed_flow = data_timeidx.assign(
            bv = data_timeidx.tickDirection * data_timeidx.volume
        )
        
        # 4. get the mean absolute value of this datasigns
        abs_Ebv_init = np.abs(data_signed_flow['bv'].mean())
        
        # 5. assign predefine max. number ticks to warm up
        if type(ticks_warm_up)==str:
            if ticks_warm_up.lower() == 'auto':
                E_T_init = int(round(data_timeidx.shape[0]*.25,-1))
            else:
                print("Not recognized 'ticks_warm_up' arg. specification")
        elif type(ticks_warm_up)==int or type(ticks_warm_up) ==float:
            if ticks_warm_up > 0:
                E_T_init = int(ticks_warm_up)
            else:
                print("Invalid 'ticks_warm_up' value: only greater than 0")
        else:
            print("Wrong 'ticks_warm_up' argument value. Redefine.")
        
        # 6. getting the ingredients of imbalance bars
        #    see "compute_TS?" function to know more
        Ts, abs_thetas, thresholds, i_s = compute_Ts(
            data_signed_flow.bv,
            E_T_init,
            abs_Ebv_init
        )
        
        # 7. redefine the number of datapoints and some data
        n = data_signed_flow.shape[0]
            #redefine the index iterators
        i_iter = iter(i_s + [n])
            #redefine the current index 
            #this will be useful to iterate over it
        i_cur = i_iter.__next__()
            #get the imbalance var values 
            #empty numpy with zeros
            #we will fill this during the loop
        grpId = np.zeros(n)
        
        # 8. Loop iteration for each datapoint among all the datapoints
        for i in range(1, n):
            #if index <= than current index
            #update the current imbalance bar variable-values
            #with it's the previous value, mostly zero (the same)
            if i <= i_cur:
                grpId[i] = grpId[i-1]
            #otherwise,    
            #update the current imbalance bar variable-values
            #with it's current value, mostly one (a new one)
            else:
                grpId[i] = grpId[i-1] + 1
                #regenerate the iteration
                #in the next bar
                i_cur = i_iter.__next__()
            #this distinction between:
            #'grpId[i-1]' and 'grpId[i-1]+1'
            #it's useful to differentiate values
            #from one bar to another bar
            #thus, values in first bar got zero, 
            #in the second bar one something like 1, 
            #and in the third one get, say, 2, 
            #and so on until the number of bars end.
        
        # 9. define the final data dollarVol imbalance var 
        #    based on the previous described distinction
        #    among the values for different bars
        data_dollar_imb_grp = data_signed_flow.assign(
            grpId = grpId
        )
        
        # 10. fit the final dollarVol imbalance var
        #     to a time-series representation that able to percieve
        #     to humans the time series; this mean, a VWAP - Price
        
        if actv_dfwap == True:
            general_dataframe = bar_imbalance_tick_construction(
                data_dollar_imb_grp.groupby('grpId'),
                type_structure="vwap").vwap
        else:
            general_dataframe = bar_imbalance_tick_construction(
                data_dollar_imb_grp,
                type_structure="mean").mean
            
        # 11. create the last dataframe
        groups = general_dataframe.groupby(general_dataframe)        
        result_dataframe=pd.DataFrame(
                {'time':[str(groups.groups[key][-1]).partition('T')[2] 
                         for key in groups.groups.keys()],
                 'price':[key for key in groups.groups.keys()]},
                index = [str(groups.groups[key][-1]).partition('T')[0]
                        for key in groups.groups.keys()]
            ).iloc[::,::-1]
            
        result_dataframe.index.name = "date"
        return result_dataframe

## Part 3 - Event Classes 

- Class Event
- Class MarketEvent
- Class SignalEvent
- Class OrderEvent
- Class FillEvent

The core idea behind a backtester is an iterative event loop. Each iteration runs from the first (empty) Event, which is the consequence of the initial "heart-beat", through to the FillEvent; the process then restarts on the next heart-beat.

In [19]:
class Event:
    """
    Root of the event hierarchy used by the backtester.

    Concrete events (market, signal, order and fill) derive from this
    class and set a ``type`` string that consumers inspect, e.g.
    ``event.type == 'MARKET'``.
    """

class MarketEvent(Event):
    """
    Event signalling that fresh bars have been pushed by the data
    handler for the tracked symbols.
    """

    def __init__(self):
        """Create a market event; it carries no payload besides its type tag."""
        self.type = 'MARKET'
        
class SignalEvent(Event):
    """
    Trading signal emitted by a Strategy object and consumed by a
    Portfolio object, which decides whether to act on it.
    """

    def __init__(self, symbol, datetime, signal_type, strength):
        """
        Build the signal.

        Parameters:
        -----------
        - symbol: ticker the signal refers to, e.g. 'GOOG'.
        - datetime: moment at which the signal was produced.
        - signal_type: direction string such as 'LONG' or 'SHORT'.
        - strength: scaling "suggestion" for downstream sizing; useful for
                    pairs strategies and portfolio construction.
        """
        self.type = 'SIGNAL'
        self.symbol, self.datetime = symbol, datetime
        self.signal_type, self.strength = signal_type, strength
        
class OrderEvent(Event):
    """
    Request sent to the execution system: trade ``quantity`` units of
    ``symbol`` in ``direction`` using a market or limit order.
    """

    def __init__(self, symbol, order_type, quantity, direction):
        """
        Build the order.

        Parameters:
        ------------
        - symbol: instrument to trade.
        - order_type: 'MKT' (market) or 'LMT' (limit).
        - quantity: non-negative integer number of units.
        - direction: 'BUY' (long) or 'SELL' (short).
        """
        self.type = 'ORDER'
        self.symbol, self.order_type = symbol, order_type
        self.quantity, self.direction = quantity, direction

    def print_order(self):
        """
        Print a one-line summary of the order to stdout.
        """
        summary = "Order: Symbol={0},Type={1},Quantity={2},Direction={3}"
        print(summary.format(self.symbol, self.order_type,
                             self.quantity, self.direction))
            
class FillEvent(Event):
    """
    A filled order as reported back by the brokerage: how much of an
    instrument was actually traded, at what cost, and the commission
    charged for it.
    """

    def __init__(self, timeindex, symbol, exchange, quantity,
                 direction, fill_cost, commission=None):
        """
        Record the fill.

        When ``commission`` is omitted it is estimated from the trade size
        with an Interactive Brokers style fee schedule
        (see ``calculate_ib_commission``).

        Parameters:
        -----------
        - timeindex: bar resolution at which the order was filled.
        - symbol: instrument that was filled.
        - exchange: venue where the order was filled.
        - quantity: filled quantity.
        - direction: 'BUY' or 'SELL'.
        - fill_cost: holdings value of the fill, in dollars.
        - commission: optional commission reported by the broker.
        """
        self.type = 'FILL'
        self.timeindex = timeindex
        self.symbol = symbol
        self.exchange = exchange
        self.quantity = quantity
        self.direction = direction
        self.fill_cost = fill_cost

        # Estimate the fee only when the broker did not report one.
        self.commission = (self.calculate_ib_commission()
                           if commission is None else commission)

    def calculate_ib_commission(self):
        """
        Estimate the trade fee in USD from an Interactive Brokers API
        fee structure (exchange/ECN fees excluded).

        Up to 500 units the per-unit rate is 0.013, above that 0.008,
        with a 1.3 USD minimum in both cases.

        Based on "US API Directed Orders":
        www.interactivebrokers.com/en/index.php?f=commission&p=stocks2
        """
        per_unit = 0.013 if self.quantity <= 500 else 0.008
        return max(1.3, per_unit * self.quantity)

## Part 4 - Data Handler 

In [20]:
def Trading_Calendar(path="trading_calendar_NYSE.csv"):
    """
    Load the NYSE trading calendar from a semicolon-delimited CSV file.

    The file is read without a header. Every non-empty cell must hold a
    timestamp string such as ``1990-01-02T00:00:00``; only the part
    before ``'T'`` is parsed (format ``%Y-%m-%d``). Cells are read in file
    order (row by row, left to right), so both a single-row layout
    (``d1;d2;d3``) and a one-date-per-line layout are supported.

    Parameters
    ----------
    - path: location of the calendar CSV. Defaults to
            "trading_calendar_NYSE.csv" in the working directory.

    Returns
    -------
    - list of ``datetime.date`` objects, in file order.

    Notes
    -----
    According to the original author, the last update of the shipped file
    covers 02-01-1990 to 28-04-2021.
    """
    raw_calendar = pd.read_csv(path, delimiter=";", header=None)

    # Flatten every cell (row-major). A per-column apply would only read
    # the first row of each column, silently dropping dates in a
    # one-date-per-line file. Blank cells (e.g. produced by a trailing
    # ';') come back as NaN and are skipped.
    cells = pd.Series(raw_calendar.values.ravel()).dropna()

    return [datetime.strptime(str(cell).split("T")[0], '%Y-%m-%d').date()
            for cell in cells]


def extract_date_avaible_market(start_,
                                end_,
                                trd_cal_=None):
    """
    Return the trading days of the calendar that fall inside a date window.

    Parameters:
    -----------
        - start_: start date string "YYYY-MM-DD" (inclusive).
        - end_: end date string "YYYY-MM-DD" (inclusive).
        - trd_cal_: list of ``datetime.date`` trading days, as returned by
                    ``Trading_Calendar()``. When omitted, the calendar is
                    loaded at call time (not when this function is
                    defined), so defining the function does not require
                    the calendar file to exist.
    Outputs:
    -------
        - list of "YYYY-MM-DD" strings for every calendar day ``d`` with
          ``start_ <= d <= end_``, in calendar order.
        - ["No Markets Days"] when no trading day falls in the window
          (including when ``start_`` is after ``end_``).
    """
    if trd_cal_ is None:
        trd_cal_ = Trading_Calendar()

    start_day = datetime.strptime(start_, '%Y-%m-%d').date()
    end_day = datetime.strptime(end_, '%Y-%m-%d').date()

    # Keep only the calendar days inside the requested window. Snapping
    # each bound to the *nearest* trading day (the previous approach)
    # could pull in days outside the window, e.g. a holiday/weekend start
    # snapping back to the preceding session.
    market_days = [day.strftime('%Y-%m-%d') for day in trd_cal_
                   if start_day <= day <= end_day]

    return market_days if market_days else ["No Markets Days"]

In [21]:
class DataHandler(metaclass=ABCMeta):
    """
    DataHandler is an abstract base class providing an interface for
    all subsequent (inherited) data handlers (both live and historic).

    The goal of a (derived) DataHandler object is to output a generated
    set of bars for each symbol requested.

    This will replicate how a live strategy would function as current
    market data would be sent "down the pipe". Thus a historic and live
    system will be treated equally by the rest of the backtesting suite.

    The metaclass is declared with Python 3 syntax; the Python 2 style
    ``__metaclass__`` class attribute has no effect in Python 3, so
    subclasses missing an abstract method could previously be
    instantiated.
    """

    @abstractmethod
    def get_latest_bars(self, symbol, N=1):
        """
        Returns the last N bars from the latest_symbol list,
        or fewer if less bars are available.
        """
        raise NotImplementedError("Should implement get_latest_bars()")

    @abstractmethod
    def update_bars(self):
        """
        Pushes the latest bar to the latest symbol structure
        for all symbols in the symbol list.
        """
        raise NotImplementedError("Should implement update_bars()")
        

class HistoricNCDataHandler(DataHandler):
    """
    Backtest data handler that rebuilds bars from per-symbol stores on
    disk at every heartbeat and exposes the "latest" bar through the same
    interface a live trading feed would use.

    Despite the NC (NetCDF) name, ``open_convert_data_files`` opens
    ``<nc_dir>/<symbol>.zarr`` for every symbol.
    """

    def __init__(self, events,
                 nc_dir, symbol_list,
                 bartype, step,
                 init, last,
                 start_date, end_date,
                 need="backtest"):
        """
        Initialise the handler and, in backtest mode, replay every
        heartbeat of every trading day between ``start_date`` and
        ``end_date`` (each heartbeat rebuilds the bars and pushes one bar
        per symbol, queuing a MarketEvent).

        Parameters:
        -----------
        -  events: the event queue; ``update_bars`` puts a MarketEvent on it.
        -  nc_dir: directory holding one ``<symbol>.zarr`` store per symbol.
        -  symbol_list: list of symbol strings.
        -  need: "backtest" replays the data now; "live" is not implemented
                 yet; any other value prints a message and calls sys.exit().
        -  bartype: bar type forwarded to ``get_bars_cdf_bt``.
        -  step: heartbeat length, used as the ``range`` step over POSIX
                 timestamps in seconds (``datetime.timestamp`` returns
                 seconds).
        -  init: time-of-day string appended as "<date> <init>" to each
                 trading date to mark the first heartbeat; it must be
                 readable by ``dateutil.parser.parse`` (e.g. "09:30:00" —
                 TODO confirm the expected format).
        -  last: time-of-day string marking the (exclusive) end of each
                 day's heartbeats, appended the same way as ``init``.
        -  start_date, end_date: "YYYY-MM-DD" strings bounding the
                 trading days to replay.
        """
        self.events = events
        self.nc_dir = nc_dir
        self.symbol_list = symbol_list

        self.symbol_data = {}
        # Bars pushed so far for each symbol, as [symbol, bar_row] pairs.
        self.latest_symbol_data = {}
        for symbol in self.symbol_list:
            self.latest_symbol_data[symbol] = []

        self.start_date = start_date
        self.end_date = end_date

        self.continue_backtest = True

        self.need = need
        self.step = step
        self.bartype = bartype

        # One ("<date> <init>", "<date> <last>") window per trading day.
        day_windows = [
            (day + " " + init, day + " " + last)
            for day in extract_date_avaible_market(self.start_date,
                                                   self.end_date)
        ]

        for window_open, window_close in day_windows:
            # NOTE(review): parser.parse returns naive datetimes, so
            # timestamp() interprets them in the machine's local timezone.
            _initialization = datetime.timestamp(parser.parse(window_open))
            _finalization = datetime.timestamp(parser.parse(window_close))

            # Heartbeats are POSIX seconds spaced 'step' apart; the close
            # of the window itself is excluded by range().
            heartbeats = range(int(_initialization),
                               int(_finalization), step)

            if self.need == "backtest":
                for heartbeat in heartbeats:
                    self.open_convert_data_files(nc_dir,
                                                 bartype,
                                                 heartbeat,
                                                 self.step,
                                                 self.start_date,
                                                 self.end_date)
                    self.update_bars()
            elif self.need == "live":
                # Live trading is still in development.
                pass
            else:
                print("Something wrong during 'need' argue.")
                sys.exit()

    def open_convert_data_files(self, nc_dir,
                                bartype, i, step,
                                start_date, end_date):
        """
        Build the bars for heartbeat ``i`` (a POSIX timestamp in seconds)
        for every symbol and replace ``self.symbol_data[s]`` with a row
        iterator (``DataFrame.iterrows``) over them.

        After this call each yielded row has the bar timestamp at
        position 0 (column 'date') followed by the bar value column(s).

        NOTE(review): the ``nc_dir`` argument is not used; the store path
        is built from ``self.nc_dir``.
        """
        comb_index = None

        for s in self.symbol_list:
            store_path = self.nc_dir + "/" + s + ".zarr"
            bars = get_bars_cdf_bt(store_path, bartype, i, step,
                                   start_date, end_date,
                                   freq=5)
            # The bar frame is indexed by a date string and carries a
            # 'time' column; merge both into a single 'date' timestamp.
            bars['date'] = pd.to_datetime(bars.index).strftime("%m-%d-%Y ")
            bars['date'] = pd.to_datetime(bars.date.add(bars.time))
            bars = bars.set_index('date').drop(labels='time', axis=1)
            self.symbol_data[s] = bars

            # Union of every symbol's bar timestamps for this heartbeat.
            if comb_index is None:
                comb_index = bars.index
            else:
                # Index.union returns a new Index; its result used to be
                # discarded, so only the first symbol's index was kept.
                comb_index = comb_index.union(bars.index)

        # Kept for inspection; not yet used to align symbols (see below).
        self.comb_index = comb_index

        for s in self.symbol_list:
            # Move the 'date' index back into a column so each row carries
            # its own timestamp next to the bar value(s).
            # NOTE(review): the previous code also called
            # ``reindex(index=comb_index)`` here but discarded the result,
            # so symbols were never aligned. Applying it after reset_index
            # would turn every row into NaN, so aligning on
            # ``self.comb_index`` is left as a TODO.
            self.symbol_data[s] = self.symbol_data[s].reset_index().iterrows()

    @staticmethod
    def _start_web_socket():
        """
        Placeholder for live trading; currently does nothing.

        Declared static so it can be called on an instance as well as on
        the class (it previously lacked ``self``, so instance calls raised
        TypeError).
        """
        return

    def get_latest_bars(self, symbol, N=1):
        """
        Return the last N [symbol, bar_row] entries pushed for ``symbol``,
        or fewer if less are available.

        NOTE(review): for an unknown symbol this prints a message and
        returns the string "Process Stopped" instead of raising.
        """
        try:
            bars_list = self.latest_symbol_data[symbol]
        except KeyError:
            print ("\
                Symbol not available in the historical data set")
            return ("Process Stopped")
        else:
            return bars_list[-N:]

    def get_latest_bar_datetime(self, symbol):
        """
        Return the timestamp of the last bar pushed for ``symbol``.

        The timestamp is position 0 of the bar row (the 'date' column
        produced by ``open_convert_data_files``). Previously this returned
        ``bars_list[-1][0]``, which is the symbol string, not the datetime.

        Raises KeyError for an unknown symbol and IndexError when no bar
        has been pushed yet.
        """
        try:
            bars_list = self.latest_symbol_data[symbol]
        except KeyError:
            print("\
                Symbol not available in the historical data set.")
            raise
        else:
            return bars_list[-1][1].iloc[0]

    def get_latest_bar_value(self, symbol):
        """
        Return the value stored at position 1 of the last bar row for
        ``symbol`` — the first column after 'date' (presumably the bar
        price; TODO confirm for every bar type).

        Raises KeyError for an unknown symbol and IndexError when no bar
        has been pushed yet.
        """
        try:
            bars_list = self.latest_symbol_data[symbol]
        except KeyError:
            print("\
                    Symbol not available in the historical data set.")
            raise
        else:
            return bars_list[-1][1].values[1]

    def get_latest_bars_values(self, symbol, N=1):
        """
        Return a numpy array with the bar rows of the last N entries for
        ``symbol``, or fewer if less are available.

        NOTE(review): ``get_latest_bars`` handles KeyError itself, so the
        except branch below is not reached for unknown symbols.
        """
        try:
            bars_list = self.get_latest_bars(symbol, N)
        except KeyError:
            print("\
                Symbol not available in the historical data set.")
            raise
        else:
            return np.array([bar[1] for bar in bars_list])

    def update_bars(self):
        """
        Push the next bar row of every symbol onto ``latest_symbol_data``
        and put one MarketEvent on the event queue.

        When a symbol's row iterator is exhausted, ``continue_backtest``
        is set to False; the MarketEvent is still queued.
        """
        for s in self.symbol_list:
            try:
                # iterrows() yields (index, row); keep the row.
                bar = [s, next(self.symbol_data[s])[1]]
            except StopIteration:
                self.continue_backtest = False
            else:
                self.latest_symbol_data[s].append(bar)
        # Kick off the Market -> Signal -> Order -> Fill cycle.
        self.events.put(MarketEvent())

## Part 5 - Strategy Computation

In [22]:
## General Strategy Interface Class
class Strategy(metaclass=ABCMeta):
    """
    Strategy is an abstract base class providing an interface for
    all subsequent (inherited) strategy handling objects.

    The goal of a (derived) Strategy object is to generate Signal
    objects for particular symbols based on the inputs of Bars
    (OLHCVI) generated by a DataHandler object.

    This is designed to work both with historic and live data as
    the Strategy object is agnostic to the data source,
    since it obtains the bar tuples from a queue object.

    The metaclass is declared with Python 3 syntax; the Python 2 style
    ``__metaclass__`` class attribute has no effect in Python 3, so a
    subclass without ``calculate_signals`` could previously be
    instantiated.
    """

    @abstractmethod
    def calculate_signals(self):
        """
        Provides the mechanisms to calculate the list of signals.
        """
        raise NotImplementedError("Should implement calculate_signals()")

In [23]:
# Personalized Strategy Computation
#-----------------------------------------------------#
# More options for producing a general trading signal should be coded here.
# It must work for any (or at least most) algorithmic computation we build.

# First, the class "BuyAndHold" should allow defining a global strategy
# for different investment approaches, regardless of the time frame.
# This covers not only a long-only (buy) approach but long-short ones too.
# This, of course, implies a "selling" part that does not exist yet,
# among many other possibilities for a general interface.

# The "calculate initial bought" method will probably stay the same.
# The only change will probably be adding a "sell" statement,
# representing the "sell" side of any "buy" transaction.
# Like "bought", it should be initialised as null or False.

# The "calculate signals" method is the one that will change most often,
# because it contains the algorithmic logic of our strategy.
# It has to be redefined every time we compute a new strategy.
# This part will contain the ML or GIM (General Investment Model) to use.
# Moreover, it should keep the "signal" boolean TRUE/FALSE per asset,
# and this boolean should be updated on every single iteration.

class BuyAndHoldStrategy(Strategy):
    """
    Benchmark strategy: sends a single LONG signal per symbol on the
    first market event that has a bar for it and never exits.

    Mostly useful to exercise the Strategy interface and as a baseline
    to compare other strategies against.
    """

    def __init__(self, bars, events):
        """
        Set up the strategy.

        Parameters:
        -----------
        - bars: the DataHandler providing bars (its ``symbol_list`` is
                reused here).
        - events: the event queue that receives SignalEvents.
        """
        self.bars = bars
        self.symbol_list = self.bars.symbol_list
        self.events = events

        # Per-symbol flag that flips to True once its LONG signal is sent.
        self.bought = self._calculate_initial_bought()

    def _calculate_initial_bought(self):
        """
        Return a dict mapping every tracked symbol to False.
        """
        return dict.fromkeys(self.symbol_list, False)

    def calculate_signals(self, event):
        """
        Emit one LONG SignalEvent per symbol the first time a MARKET event
        arrives with a bar for it; afterwards emit nothing.

        Parameters:
        ----------
        - event: the incoming event; anything but a MARKET event is ignored.
        """
        strength = 1.0
        if event.type != 'MARKET':
            return

        for symbol in self.symbol_list:
            latest = self.bars.get_latest_bars(symbol, N=1)
            if latest is None or latest == []:
                continue
            if self.bought[symbol] == False:
                # SignalEvent inputs:
                # 1. symbol   -> latest[0][0]
                # 2. datetime -> latest[0][1]
                #    NOTE(review): this is the whole bar row handed out by
                #    the data handler, not just its timestamp.
                # 3. type     -> 'LONG'
                # 4. strength -> predefined scale value
                self.events.put(SignalEvent(latest[0][0],
                                            latest[0][1],
                                            'LONG',
                                            strength))
                self.bought[symbol] = True

In [24]:
def create_sharpe_ratio(returns, periods=252):
    """
    Compute the annualised Sharpe ratio of a return series against a
    zero benchmark (no risk-free rate is subtracted).

    Parameters:
    -----------
    - returns: pandas Series (or array-like) of per-period returns.
    - periods: number of such periods per year, used to annualise the
      ratio: daily 252, hourly 252*6.5, minutely 252*6.5*60, etc.

    Output:
    -------
    - Annualised Sharpe ratio: sqrt(periods) * mean / std, where the
      standard deviation is numpy's default population one (ddof=0).
    """
    annualisation = np.sqrt(periods)
    mean_return = np.mean(returns)
    volatility = np.std(returns)
    return annualisation * mean_return / volatility

def create_drawdowns(equity_curve):
    """
    Calculate the peak-to-trough drawdown series of a strategy.

    The input is a series of *period returns* (this is what
    ``NaivePortfolio.output_summary_stats`` passes in). The returns are
    compounded into a wealth index that starts at 1.0 (the initial
    capital), the running peak of that index is tracked (never below the
    starting value 1.0), and the drawdown at each bar is the relative
    distance of the wealth index from that peak.

    Parameters:
    -----------
    - equity_curve: pandas Series of period returns (e.g. the
      ``pct_change()`` of the portfolio total). NaN values, such as the
      leading NaN produced by ``pct_change()``, are dropped.

    Output:
    -------
    - drawdown (pd.Series): values <= 0 expressed as fractions
      (-0.05 means 5% below the running peak), on the index of the
      non-NaN input values, named "drawdown".
    """
    returns = equity_curve.dropna()

    # 1) compound the period returns into a wealth index starting at 1.0
    wealth = (1.0 + returns).cumprod()

    # 2) running peak of the wealth index; the initial capital (1.0) is
    #    the first peak, so an immediate loss already counts as drawdown
    running_max = wealth.cummax().clip(lower=1.0)

    # 3) relative distance from the running peak
    drawdown = wealth / running_max - 1.0

    return drawdown.rename("drawdown")

## Part 6 - Portfolio Classes

In [25]:
class Portfolio(metaclass=ABCMeta):
    """
    The Portfolio class handles the positions and market
    value of all instruments at a resolution of a "bar",
    i.e. secondly, minutely, 5-min, 30-min, 60 min or EOD.

    Subclasses must implement ``update_signal`` and ``update_fill``.
    ABCMeta is passed as the ``metaclass`` keyword (Python 3 syntax), so
    instantiating a subclass that misses either method raises
    ``TypeError``. A ``__metaclass__`` class attribute is ignored by
    Python 3 and would leave the abstract methods unenforced.
    """

    @abstractmethod
    def update_signal(self, event):
        """
        Acts on a SignalEvent to generate new orders
        based on the portfolio logic.

        Raises NotImplementedError if this base version is called
        (e.g. via ``super()``).
        """
        raise NotImplementedError("Should implement update_signal()")

    @abstractmethod
    def update_fill(self, event):
        """
        Updates the portfolio current positions and holdings
        from a FillEvent.

        Raises NotImplementedError if this base version is called
        (e.g. via ``super()``).
        """
        raise NotImplementedError("Should implement update_fill()")


class NaivePortfolio(Portfolio):
    """
    The NaivePortfolio object is designed to send orders to
    a brokerage object with a constant quantity size blindly,
    i.e. without any risk management or position sizing. It is
    used to test simpler strategies such as BuyAndHoldStrategy.

    Bookkeeping attributes:
    - current_positions: dict symbol -> signed quantity currently held.
    - all_positions: list of position snapshots (the initial one plus
      one per ``update_timeindex`` call), each with a 'datetime' key.
    - current_holdings: dict with the per-symbol value booked by fills,
      plus 'cash', cumulative 'commission' and 'total'.
    - all_holdings: list of holdings snapshots (the initial one plus one
      per ``update_timeindex`` call) with per-symbol market value,
      'datetime', 'cash', 'commission' and 'total'.
    """

    def __init__(self,
                 bars,
                 events,
                 start_date,
                 initial_capital=100000.0):
        """
        Initialises the portfolio with bars and an event queue.
        Also includes a starting datetime index and initial capital
        (USD unless otherwise stated).

        Parameters:
        -----------
        - bars: The DataHandler object with current market data.
        - events: The Event Queue object.
        - start_date: The start date (bar) of the portfolio.
        - initial_capital: The starting capital in USD.
        """
        self.bars = bars
        self.events = events
        self.symbol_list = self.bars.symbol_list
        self.start_date = start_date
        self.initial_capital = initial_capital

        self.all_positions = self.construct_all_positions()
        self.current_positions = {symbol: 0.0 for symbol in self.symbol_list}

        self.all_holdings = self.construct_all_holdings()
        self.current_holdings = self.construct_current_holdings()

    def construct_all_positions(self):
        """
        Constructs the positions list using the start_date
        to determine when the time index will begin.

        Returns a one-element list: zero quantity for every symbol,
        stamped with ``start_date``.
        """
        d = {symbol: 0.0 for symbol in self.symbol_list}
        d['datetime'] = self.start_date
        return [d]

    def construct_all_holdings(self):
        """
        Constructs the holdings list using the start_date
        to determine when the time index will begin.

        Returns a one-element list: zero value for every symbol, all
        capital in cash, no commission, stamped with ``start_date``.
        """
        d = {symbol: 0.0 for symbol in self.symbol_list}
        d['datetime'] = self.start_date
        d['cash'] = self.initial_capital
        d['commission'] = 0.0
        d['total'] = self.initial_capital
        return [d]

    def construct_current_holdings(self):
        """
        Constructs the dictionary which will hold the instantaneous
        value of the portfolio across all symbols (same layout as a
        holdings snapshot, without the 'datetime' key).
        """
        d = {symbol: 0.0 for symbol in self.symbol_list}
        d['cash'] = self.initial_capital
        d['commission'] = 0.0
        d['total'] = self.initial_capital
        return d

    def update_timeindex(self, event):
        """
        Adds a new record to the positions and holdings lists for the
        current market data bar. This reflects the PREVIOUS bar, i.e.
        all current market data at this stage is known.

        Called on a MarketEvent; the event object itself is not read,
        the latest bar of every symbol is fetched from ``self.bars``.
        """
        bars = {sym: self.bars.get_latest_bars(sym, N=1)
                for sym in self.symbol_list}
        # Both snapshots are stamped from the first symbol's latest bar.
        first_bar = bars[self.symbol_list[0]][0][1]

        # Positions snapshot: a copy of the current quantities
        dp = {symbol: 0.0 for symbol in self.symbol_list}
        dp['datetime'] = first_bar
        for s in self.symbol_list:
            dp[s] = self.current_positions[s]
        self.all_positions.append(dp)

        # Holdings snapshot: cash plus mark-to-market value per symbol
        dh = {symbol: 0.0 for symbol in self.symbol_list}
        # NOTE(review): positions store the bar field itself while
        # holdings read its `.date` attribute; confirm both are intended.
        dh['datetime'] = first_bar.date
        dh['cash'] = self.current_holdings['cash']
        dh['commission'] = self.current_holdings['commission']
        dh['total'] = self.current_holdings['cash']

        for s in self.symbol_list:
            # Approximation to the real value; assumes the second field of
            # the latest bar record is the price — TODO confirm against
            # the DataHandler.
            market_value = (
                self.current_positions[s] * bars[s][0][1].values[1]
            )
            dh[s] = market_value
            dh['total'] += market_value

        self.all_holdings.append(dh)

    @staticmethod
    def _fill_direction(fill):
        """Map a fill's direction to +1 ('BUY'), -1 ('SELL') or 0 (other)."""
        if fill.direction == 'BUY':
            return 1
        if fill.direction == 'SELL':
            return -1
        return 0

    def update_positions_from_fill(self, fill):
        """
        Takes a FillEvent object and updates the position matrix
        to reflect the new position.

        Parameters:
        -----------
        - fill: the FillEvent object to update the positions with.
        """
        fill_dir = self._fill_direction(fill)
        self.current_positions[fill.symbol] += fill_dir * fill.quantity

    def update_holdings_from_fill(self, fill):
        """
        Takes a FillEvent object and updates the holdings matrix
        to reflect the holdings value.

        Cash is reduced by the traded value plus commission while the
        symbol's holding is increased by the traded value, so the
        portfolio total only changes by the commission paid.

        Parameters:
        -----------
        - fill: the FillEvent object to update the holdings with.
        """
        fill_dir = self._fill_direction(fill)

        # The fill price is approximated with the latest bar; assumes the
        # second field of that bar record is the price — TODO confirm.
        fill_cost = self.bars.get_latest_bars(fill.symbol)[0][1].values[1]
        cost = fill_dir * fill_cost * fill.quantity

        self.current_holdings[fill.symbol] += cost
        self.current_holdings['commission'] += fill.commission
        self.current_holdings['cash'] -= (cost + fill.commission)
        # Moving cash into the position leaves equity unchanged; only the
        # commission is an actual loss.
        self.current_holdings['total'] -= fill.commission

    def update_fill(self, event):
        """
        Updates the portfolio current positions and holdings
        from a FillEvent (events of other types are ignored).
        """
        if event.type == 'FILL':
            self.update_positions_from_fill(event)
            self.update_holdings_from_fill(event)

    def generate_naive_order(self, signal):
        """
        Simply transacts an OrderEvent object as a constant quantity
        sizing of the signal object, without risk management or
        position sizing considerations.

        LONG/SHORT only open a position when flat; EXIT closes the whole
        current position. Any other combination returns None.

        Parameters:
        -----------
        - signal: The SignalEvent signal information.
        """
        symbol = signal.symbol
        direction = signal.signal_type
        strength = signal.strength

        # market quantity - should be dynamic
        mkt_quantity = floor(100 * strength)
        cur_quantity = self.current_positions[symbol]
        order_type = 'MKT'

        order = None
        if direction == 'LONG' and cur_quantity == 0:
            order = OrderEvent(symbol, order_type, mkt_quantity, 'BUY')
        elif direction == 'SHORT' and cur_quantity == 0:
            order = OrderEvent(symbol, order_type, mkt_quantity, 'SELL')
        elif direction == 'EXIT' and cur_quantity > 0:
            order = OrderEvent(symbol, order_type, abs(cur_quantity), 'SELL')
        elif direction == 'EXIT' and cur_quantity < 0:
            order = OrderEvent(symbol, order_type, abs(cur_quantity), 'BUY')
        return order

    def update_signal(self, event):
        """
        Acts on a SignalEvent to generate new orders
        based on the portfolio logic.
        """
        if event.type == 'SIGNAL':
            order_event = self.generate_naive_order(event)
            # generate_naive_order returns None when the signal needs no
            # trade; only real orders are put on the queue.
            if order_event is not None:
                self.events.put(order_event)

    def create_equity_curve_dataframe(self):
        """
        Creates a pandas DataFrame from the all_holdings list of
        dictionaries, indexed by 'datetime', adding per-bar 'returns'
        and the cumulative 'equity_curve'. Stored in
        ``self.equity_curve``.
        """
        curve = pd.DataFrame(self.all_holdings).set_index('datetime')
        curve['returns'] = curve['total'].pct_change()
        curve['equity_curve'] = (1.0 + curve['returns']).cumprod()
        self.equity_curve = curve

    def output_summary_stats(self):
        """
        Build the per-bar performance table and the Sharpe ratio.

        Requires ``create_equity_curve_dataframe()`` to have run first.

        Output:
        -------
        tuple: (stats, sharpe_ratio)
        - stats (pd.DataFrame): 'equity_curve' (cumulative return,
          1.0 = initial capital), 'returns' (per-bar return) and the
          drawdown series from ``create_drawdowns``, without the
          initial record (whose return is NaN).
        - sharpe_ratio: from ``create_sharpe_ratio`` with its default
          ``periods``.
        """
        total_return = self.equity_curve["equity_curve"]
        returns = self.equity_curve["returns"]

        # NOTE(review): the default periods=252 assumes daily bars; the
        # annualisation is off for intraday bar types.
        sharpe_ratio = create_sharpe_ratio(returns)
        drawdown = create_drawdowns(returns)

        stats = pd.concat([total_return.iloc[1:],
                           returns.iloc[1:],
                           drawdown],
                          axis=1)
        return stats, sharpe_ratio

## Part 7 - Execution Classes

In [26]:
class ExecutionHandler(metaclass=ABCMeta):
    """
    The ExecutionHandler abstract class handles the interaction
    between a set of order objects generated by a Portfolio and
    the ultimate set of Fill objects that actually occur in the
    market.

    The handlers can be used to subclass simulated brokerages
    or live brokerages, with identical interfaces. This allows
    strategies to be backtested in a very similar manner to the
    live trading engine.

    ABCMeta is passed as the ``metaclass`` keyword (Python 3 syntax), so
    a subclass that does not override ``execute_order`` cannot be
    instantiated (``TypeError``). A ``__metaclass__`` class attribute is
    ignored by Python 3 and would not enforce this.
    """

    @abstractmethod
    def execute_order(self, event):
        """
        Takes an Order event and executes it, producing
        a Fill event that gets placed onto the Events queue.

        Parameters:
        -----------
        - event: contains an Event object with order information.

        Raises:
        -------
        - NotImplementedError: if this base version is called
          (e.g. via ``super()``).
        """
        raise NotImplementedError("Should implement execute_order()")

class SimulatedExecutionHandler(ExecutionHandler):
    """
    Naive in-process broker simulation.

    Every OrderEvent is turned straight into a FillEvent: no latency,
    no slippage, and the whole quantity is always filled. Useful as a
    first pass before plugging in a more realistic execution handler.
    """

    def __init__(self, events):
        """
        Parameters:
        -----------
        - events: the Queue on which the generated FillEvents are placed.
        """
        self.events = events

    def execute_order(self, event):
        """
        Convert an ORDER event into a FillEvent and put it on the queue;
        events of any other type are ignored.

        The fill is stamped with ``datetime.utcnow()``, uses 'ARCA'
        purely as an exchange label and passes None as the last FillEvent
        argument (presumably the fill cost — TODO confirm against
        FillEvent).

        Parameters:
        -----------
        - event: contains an Event object with order information.
        """
        if event.type != 'ORDER':
            return

        fill = FillEvent(
            datetime.utcnow(),
            event.symbol,
            'ARCA',
            event.quantity,
            event.direction,
            None,
        )
        self.events.put(fill)

****

***

In [27]:
import bokeh
from bokeh.plotting import figure, output_file, show
from bokeh.models import LinearAxis, Range1d, Band, HoverTool
from bokeh.models import Line, Span
from bokeh.layouts import gridplot

import bokeh.plotting.figure as bk_figure
from bokeh.io import curdoc, show
from bokeh.layouts import row, widgetbox
from bokeh.models import ColumnDataSource
from bokeh.models.widgets import Slider, TextInput
from bokeh.io import output_notebook 
import numpy as np

from bokeh.application import Application
from bokeh.application.handlers import FunctionHandler

In [28]:
def _style_performance_plot(plot):
    """
    Apply the font and grid styling shared by the three panels drawn in
    ``plot_final_diagram``.
    """
    plot.title.text_font = 'arial'
    plot.title.text_font_size = '10pt'
    plot.axis.axis_label_text_font = 'arial'
    plot.axis.axis_label_text_font_style = None
    plot.axis.major_label_text_font_size = '9.5pt'

    plot.xgrid.grid_line_color = '#E4E4E4'
    plot.xgrid.grid_line_alpha = 0.7
    plot.ygrid.grid_line_color = '#E4E4E4'
    plot.ygrid.grid_line_alpha = 0.7


def plot_final_diagram(stats_dataframe, initial_capital):
    """
    Plot an interactive three-panel summary of the backtest performance.

    Panels (sharing the x axis):
    1. cumulative returns (``equity_curve``) with a dashed reference line
       at 1.0 (the initial capital); hovering shows
       ``equity_curve * initial_capital``;
    2. per-heartbeat returns in %, blue bars for gains, red for losses;
    3. drawdowns in %, as a line plus a filled patch.

    Parameters:
    -----------
    - stats_dataframe: pd.DataFrame with 'equity_curve', 'returns' and
      'drawdown' columns; its index is used as the (datetime) x axis.
    - initial_capital: float/int with predefined initial capital value.

    Output:
    -------
    - Interactive plot of performance, shown in the notebook.

    Note:
    ----
    'Bokeh' visualization package was used.
    Please, check: https://docs.bokeh.org/en/latest/index.html
    """
    # define main variables from stats_dataframe
    x = stats_dataframe.index.values

    y1 = stats_dataframe.equity_curve
    y2 = stats_dataframe.returns * 100
    y3 = stats_dataframe.drawdown * 100

    portfolio_value = y1 * initial_capital

    # render the results inside the notebook
    output_notebook()

    # shared data source for the line glyphs and their hover tools
    source = ColumnDataSource(
        data=dict(
            x=x,
            y1=y1,
            y2=y2,
            y3=y3,
            money=portfolio_value)
    )

    # NOTE(review): plot_width/plot_height and legend= are the pre-Bokeh-3
    # spellings (width/height and legend_label in newer releases); kept for
    # the Bokeh version this notebook targets.

    # plot 1: portfolio equity curve
    plot1 = figure(plot_height=300,
                   plot_width=800,
                   y_axis_label='Cumulative Returns',
                   title='Portfolio Curve',
                   x_axis_type='datetime',
                   background_fill_color=None)

    equity_line = plot1.line('x', 'y1',
                             source=source,
                             legend='backtest',
                             line_color='green',
                             line_width=2,
                             line_alpha=1)

    # the 'Cash' tooltip shows equity_curve * initial_capital
    plot1.add_tools(HoverTool(renderers=[equity_line],
                              tooltips=[('Cash', '$@money{0.000 a}')],
                              mode='vline'))

    plot1.legend.location = "bottom_left"

    # dashed horizontal reference at 1.0, i.e. the initial capital
    break_even = Span(location=1,
                      dimension='width', line_color='black',
                      line_dash='dashed', line_width=1)
    plot1.add_layout(break_even)

    _style_performance_plot(plot1)

    # plot 2: return of each heartbeat, in %
    plot2 = figure(plot_height=300,
                   plot_width=800,
                   x_range=plot1.x_range,
                   y_axis_label='Returns (%)',
                   title='Heartbeat Returns (%)',
                   x_axis_type='datetime',
                   background_fill_color=None)

    # invisible line, used only as the hover target
    returns_line = plot2.line('x', 'y2',
                              source=source,
                              line_width=0,
                              line_alpha=0)
    plot2.add_tools(HoverTool(renderers=[returns_line],
                              tooltips=[('Returns', '@y2{0.000 a}%')],
                              mode='vline'))

    # gains in blue, losses in red
    plot2.vbar(x=stats_dataframe[stats_dataframe.returns > 0].index.values,
               top=y2[y2 > 0], width=4.25,
               line_dash='solid', line_alpha=0.9,
               line_width=4.25, color='#29399F')
    plot2.vbar(x=stats_dataframe[stats_dataframe.returns < 0].index.values,
               top=y2[y2 < 0], width=4.25,
               line_dash='solid', line_alpha=0.9,
               line_width=4.25, color='#AC2E3B')

    _style_performance_plot(plot2)

    # plot 3: drawdown curve, in %
    plot3 = figure(plot_height=300,
                   plot_width=800,
                   x_range=plot1.x_range,
                   x_axis_label='Datetime',
                   y_axis_label='Drawdowns (%)',
                   title='Heartbeat Drawdowns (%)',
                   x_axis_type='datetime',
                   background_fill_color=None)

    drawdown_line = plot3.line('x', 'y3', source=source,
                               line_color='#990000',
                               line_width=3,
                               line_alpha=0.1)

    plot3.patch('x', 'y3', source=source, color='#D24457')

    _style_performance_plot(plot3)

    # Bind the hover to the line glyph, as in the other panels; without
    # `renderers` the tool would also attach to the patch glyph.
    plot3.add_tools(HoverTool(renderers=[drawdown_line],
                              tooltips=[('Drawdown', '@y3{0.000 a}%')],
                              mode='vline'))

    # stack the three panels with a single shared toolbar
    general_chart = gridplot([[plot1],
                              [plot2],
                              [plot3]],
                             toolbar_location='right', title="ra")

    show(general_chart)

In [29]:
class Backtest(object):
    """
    Encapsulates the settings and components for carrying out
    an event-driven backtest.

    The constructor instantiates the data handler, strategy, portfolio
    and execution handler from the classes it is given;
    ``simulate_trading()`` then runs the event loop and reports the
    performance.
    """
    def __init__(self,
                 data_dir,
                 symbol_list,
                 bartype,
                 step_heartbeat,
                 initialization_step,
                 finalization_step,
                 requirement,
                 start_date,
                 end_date,
                 initial_capital,
                 data_handler,  # e.g. HistoricNCDataHandler
                 execution_handler,  # e.g. SimulatedExecutionHandler
                 portfolio,  # e.g. NaivePortfolio
                 strategy  # e.g. BuyAndHoldStrategy
                 ):
        """
        Initialises the backtest and instantiates its components.

        Parameters:
        -----------
        - data_dir (str): root path of the data directory.
        - symbol_list (list of str): symbols to backtest.
        - bartype (str): bar type forwarded to the data handler
          (e.g. "volume").
        - step_heartbeat (int/float): bar step / heartbeat forwarded to
          the data handler (e.g. 100000).
        - initialization_step (str): session start time forwarded to the
          data handler (e.g. '09:30:00').
        - finalization_step (str): session end time forwarded to the data
          handler (e.g. '16:00:00').
        - requirement (str): mode flag forwarded to the data handler
          (e.g. "backtest").
        - start_date: first date of the backtest (e.g. "2019-01-08"); also
          the datetime of the portfolio's initial record.
        - end_date: last date of the backtest.
        - initial_capital (float): starting capital for the portfolio.
        - data_handler (class): market data feed, called as
          ``data_handler(events, data_dir, symbol_list, bartype,
          step_heartbeat, initialization_step, finalization_step,
          start_date, end_date, requirement)``.
        - execution_handler (class): turns orders into fills, called as
          ``execution_handler(events)``.
        - portfolio (class): tracks positions and holdings, called as
          ``portfolio(data_handler, events, start_date, initial_capital)``.
        - strategy (class): generates signals from market data, called as
          ``strategy(data_handler, events)``.

        Usage:
        ------
        Call ``simulate_trading()`` to run the backtest
        (``_run_backtest``) and print/plot the results
        (``_output_performance``).
        """
        self.events = queue.Queue()

        self.data_dir = data_dir
        self.symbol_list = symbol_list
        self.bart_type = bartype
        self.heartbeat = step_heartbeat
        self.init = initialization_step
        self.last = finalization_step
        self.need = requirement

        self.start_date = start_date
        self.end_date = end_date

        self.initial_capital = initial_capital

        self.data_handler_cls = data_handler
        self.execution_handler_cls = execution_handler
        self.portfolio_cls = portfolio
        self.strategy_cls = strategy

        # event counters reported by _output_performance
        self.signals = 0
        self.orders = 0
        self.fills = 0
        self.num_strats = 1

        self._generate_trading_instances()

    def _generate_trading_instances(self):
        """
        Generates the trading instance objects from
        their class types.
        """
        # NOTE: this silences every warning process-wide from here on,
        # not only while the instances are being built.
        warnings.filterwarnings("ignore")

        print("1. Creating DataHandler")

        self.data_handler = self.data_handler_cls(
            self.events,
            self.data_dir,
            self.symbol_list,
            self.bart_type,
            self.heartbeat,
            self.init,
            self.last,
            self.start_date,
            self.end_date,
            self.need
        )

        print("2. Computing Strategy")

        self.strategy = self.strategy_cls(
            self.data_handler,
            self.events
        )

        print("3. Constructing Portfolio")

        self.portfolio = self.portfolio_cls(
            self.data_handler,
            self.events,
            self.start_date,
            self.initial_capital
        )

        print("4. Executing Handler Process")

        self.execution_handler = self.execution_handler_cls(
            self.events
        )

    def _run_backtest(self):
        """
        Executes the backtest through the event loop.

        While ``data_handler.continue_backtest`` is truthy, the data
        handler pushes the next bar(s) and the event queue is then
        drained (see ``_process_pending_events``). No sleeping happens
        between iterations.
        """
        while self.data_handler.continue_backtest:
            self.data_handler.update_bars()
            self._process_pending_events()

    def _process_pending_events(self):
        """
        Drain the event queue without blocking, routing each event:
        - MarketEvent: strategy.calculate_signals, then
          portfolio.update_timeindex (beginning part, data handler).
        - SignalEvent: portfolio.update_signal (first half, strategy).
        - OrderEvent: execution_handler.execute_order (second half,
          portfolio).
        - FillEvent: portfolio.update_fill (final part, execution).
        None events are skipped.
        """
        while True:
            try:
                event = self.events.get(False)
            except queue.Empty:
                break

            if event is None:
                continue

            if isinstance(event, MarketEvent):
                self.strategy.calculate_signals(event)
                self.portfolio.update_timeindex(event)

            elif isinstance(event, SignalEvent):
                self.signals += 1
                self.portfolio.update_signal(event)

            elif isinstance(event, OrderEvent):
                self.orders += 1
                self.execution_handler.execute_order(event)

            elif isinstance(event, FillEvent):
                self.fills += 1
                self.portfolio.update_fill(event)

    def _output_performance(self):
        """
        Outputs the strategy performance from the backtest: prints the
        Sharpe ratio and event counters, then plots the summary.
        """
        self.portfolio.create_equity_curve_dataframe()

        print("\nCreating summary stats...")
        stats, sharpe_ratio = self.portfolio.output_summary_stats()

        print("Creating equity curve...\n")

        print("FINAL REPORT")
        print("-"*45)

        print("Annualized Sharpe Ratio:{}\n".format(round(sharpe_ratio, 4)))
        print("Signals: %s" % self.signals)
        print("Orders: %s" % self.orders)
        print("Fills: %s" % self.fills)

        plot_final_diagram(stats, self.initial_capital)

    def simulate_trading(self):
        """
        Simulates the backtest and outputs portfolio performance.
        """
        self._run_backtest()
        self._output_performance()

## Example

In [30]:
# Root of the .zarr tick data. Set the QM_DATA_DIR environment variable
# to point elsewhere instead of editing this hard-coded default.
nc_dir = os.environ.get('QM_DATA_DIR', 'D:\\data_zarr\\')
symbol_list = ['A']
need = "backtest"
bartype = "volume"

# Trading session boundaries (HH:MM:SS)
init = '09:30:00'
last = '16:00:00'

# Passed to Backtest as `step_heartbeat` (presumably the volume per bar
# when bartype == "volume" — confirm against the data handler)
step = 100000

# Not passed to Backtest, which creates its own internal queue.
event = queue.Queue()

start_date = "2019-01-08"
end_date = "2019-01-08"
initial_capital = 10000

In [31]:
# Wire the configuration above to the concrete component classes; the
# constructor prints the four set-up steps.
backtest_req = Backtest(
    data_dir=nc_dir,
    symbol_list=symbol_list,
    bartype=bartype,
    step_heartbeat=step,
    initialization_step=init,
    finalization_step=last,
    requirement=need,
    start_date=start_date,
    end_date=end_date,
    initial_capital=initial_capital,
    data_handler=HistoricNCDataHandler,
    execution_handler=SimulatedExecutionHandler,
    portfolio=NaivePortfolio,
    strategy=BuyAndHoldStrategy,
)

1. Creating DataHandler
2. Computing Strategy
3. Constructing Portfolio
4. Executing Handler Process


**TODO** — continue from here.

Pending:
- Replace the zarr files with `data_extract_backup` (`ts` stored as a timestamp, not as a subtracted int).
- Change how datetimes are computed: replace the `ts_to_datetime` function with a `datetime.fromtimestamp()` lambda for every `ts` value.
- Change the `Init` and `Step` input parameters to hour strings (ask Heli about the "heartbeat").


In [32]:
# Run the event loop, then print the final report and render the plots.
backtest_req.simulate_trading()


Creating summary stats...
Creating equity curve...

FINAL REPORT
---------------------------------------------
Annualized Sharpe Ratio:-0.0031

Signals: 1
Orders: 1
Fills: 1
