In [1]:
import pandas_ta as ta
import requests
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
plt.style.use('seaborn-v0_8-dark')
import warnings
warnings.filterwarnings("ignore")

In [2]:
%run backtest_functions.ipynb

In [3]:
def fetch_bitstamp_data(symbol, start, end, timeframe, limit=1000, timeout=30):
    """
    Download OHLC candles from the Bitstamp v2 API in consecutive request windows.

    params:
        symbol - market pair inserted in the URL path (e.g. 'btcusd')
        start - Unix timestamp (seconds) of the first window
        end - Unix timestamp (seconds); candles later than this are discarded
        timeframe - candle size in seconds, sent as the `step` query parameter
        limit - maximum number of candles requested per call (default 1000)
        timeout - seconds to wait for each HTTP response (default 30)
    return:
        DataFrame indexed by 'date' (datetime, ascending, unique) with float
        open/high/low/close/volume columns, or an empty DataFrame when nothing
        could be downloaded. A failed request prints the error and stops the
        download; whatever was fetched before the failure is still returned.
    """
    url = f"https://www.bitstamp.net/api/v2/ohlc/{symbol}/"
    window = timeframe * limit
    chunks = []

    while start < end:
        # Each request spans at most `limit` candles
        current_end = start + window
        params = {
            'start': int(start),
            'end': int(current_end),
            'step': timeframe,
            'limit': limit,
            'exclude_current_candle': False
        }
        try:
            # A timeout prevents a stalled connection from blocking the notebook forever
            response = requests.get(url, params=params, timeout=timeout)
            if response.status_code != 200:
                raise Exception(f"Failed to fetch data: {response.status_code}, {response.text}")
            chunk = pd.DataFrame(response.json()['data']['ohlc'])
            if not chunk.empty:
                chunks.append(chunk)
        except Exception as e:
            # Best effort: report the problem and keep what was already downloaded
            print(e)
            break
        start = current_end

    if not chunks:
        return pd.DataFrame()

    candles = pd.concat(chunks, ignore_index=True)
    # Convert the epoch values to numbers explicitly before handing them to to_datetime
    epoch = pd.to_numeric(candles['timestamp'])
    # Consecutive windows share their boundary and the last window may overshoot `end`:
    # keep one row per timestamp and nothing beyond the requested end
    keep = (epoch <= end) & ~epoch.duplicated(keep='last')
    candles = candles.loc[keep].copy()
    candles['timestamp'] = pd.to_datetime(epoch[keep], unit='s')
    candles = candles.set_index('timestamp').sort_index()
    candles.index.name = 'date'
    return candles.astype({
        'open': float,
        'high': float,
        'low': float,
        'close': float,
        'volume': float
    })

In [4]:
def find_timestamp_extremum(df_highest, df_lowest):
    """
    Locate, for every higher-timeframe candle, when its high and its low occurred.

    params:
        df_highest - higher timeframe OHLCV DataFrame; modified in place
        df_lowest - lower timeframe OHLCV DataFrame with 'high' and 'low' columns;
                    its index must be sorted ascending
    return:
        df_highest with three columns added or overwritten:
            Low_time  - lower-timeframe timestamp of the minimum 'low'
            High_time - lower-timeframe timestamp of the maximum 'high'
            First     - 1 when High_time is later than Low_time (the low came first),
                        2 when High_time is earlier than Low_time (the high came first),
                        0 when both share the same timestamp,
                        NaN when the times are unknown
        The last row has no following candle to bound its window, so its times
        stay NaT and its First stays NaN.
    """
    n_rows = len(df_highest)
    higher_index = df_highest.index
    lower_index = df_lowest.index
    high_times = [pd.NaT] * n_rows
    low_times = [pd.NaT] * n_rows

    for i in range(n_rows - 1):
        start = higher_index[i]
        end = higher_index[i + 1]
        # Half-open window [start, end): the next candle's first row is excluded by
        # position, so a missing row at `end` cannot cost this window a valid candle
        first_pos = lower_index.searchsorted(start, side='left')
        last_pos = lower_index.searchsorted(end, side='left')
        window = df_lowest.iloc[first_pos:last_pos]
        try:
            high_times[i] = window['high'].idxmax()
            low_times[i] = window['low'].idxmin()
        except (KeyError, ValueError) as e:
            # No usable lower-timeframe data: both extremes fall back to the candle open time
            print(f'Excepción has occurred: {e}')
            high_times[i] = start
            low_times[i] = start

    # Assigned by position as datetime columns (no float-NaN placeholder, and no
    # label-based writes that would hit every row sharing a duplicated timestamp)
    df_highest['Low_time'] = pd.to_datetime(low_times)
    df_highest['High_time'] = pd.to_datetime(high_times)

    high_col = df_highest['High_time']
    low_col = df_highest['Low_time']
    # Comparisons against NaT are False, so rows without times keep NaN
    df_highest['First'] = np.select(
        [high_col > low_col, high_col < low_col, high_col == low_col],
        [1.0, 2.0, 0.0],
        default=np.nan,
    )

    # Share of rows whose high and low carry the same timestamp (order undecidable)
    if n_rows:
        percentage_garbage_row = (df_highest['First'] == 0).sum() / n_rows * 100.
    else:
        percentage_garbage_row = 0.0
    print(f'WARNING: Garbage row: {percentage_garbage_row:.2f} %')
    return df_highest

In [5]:
def _tp_sl_exit(row, entry_price, side, tp, sl):
    """Check one open position against one bar; return (closed, gross_return, exit_time)."""
    if side == 1:
        # Long: profit is reached at the bar high, loss at the bar low
        favourable = (row['high'] - entry_price) / entry_price
        adverse = (row['low'] - entry_price) / entry_price
        tp_time, sl_time = row['High_time'], row['Low_time']
        tp_first_code = 2  # First == 2: the high happened before the low
    else:
        # Short: profit is reached at the bar low, loss at the bar high
        favourable = -(row['low'] - entry_price) / entry_price
        adverse = -(row['high'] - entry_price) / entry_price
        tp_time, sl_time = row['Low_time'], row['High_time']
        tp_first_code = 1  # First == 1: the low happened before the high
    hit_tp = favourable > tp
    hit_sl = adverse < sl
    if hit_tp and hit_sl:
        # Both levels were touched inside the bar: 'First' decides which came first
        if row['First'] == tp_first_code:
            return True, tp, tp_time
        if row['First'] == 3 - tp_first_code:
            return True, sl, sl_time
        # Same timestamp (0) or unknown order: the trade is closed but not counted
        return True, None, None
    if hit_tp:
        return True, tp, tp_time
    if hit_sl:
        return True, sl, sl_time
    return False, None, None


def run_tp_sl(df, leverage=1, tp=0.015, sl=-0.015, cost=0.00, signal_col='Signal'):
    '''
    Simulate fixed take-profit / stop-loss trades bar by bar.

    A long opens at the bar's 'open' when the signal is 1 and no long is open;
    a short opens when the signal is -1 and no short is open. One long and one
    short may be open at the same time. An open position is checked on every
    bar, including the entry bar.

    params (mandatory):
        df - DataFrame with open, high, low, High_time, Low_time, First and the
             signal column; modified in place
    params (optional):
        leverage=1, tp=0.015, sl=-0.015, cost=0.00, signal_col='Signal'
    return:
        df - Incoming DataFrame with two new columns ['duration', 'returns'];
             'returns' is (tp or sl - cost) * leverage on the bar where a trade
             closes and 0 elsewhere, 'duration' is the time from entry to the
             High_time/Low_time at which the level was hit (zero elsewhere)
    raises:
        KeyError when a required column is missing
    '''
    required = ['open', 'high', 'low', 'High_time', 'Low_time', 'First', signal_col]
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise KeyError(f'run_tp_sl: missing required columns {missing}')

    n_rows = len(df)
    # Results are gathered by position, so 'returns' always exists (even with zero
    # trades) and 'duration' is a homogeneous timedelta column
    returns = [0.0] * n_rows
    durations = [pd.Timedelta(0)] * n_rows
    long_entry = None    # (entry price, entry date) while a long is open
    short_entry = None   # (entry price, entry date) while a short is open

    for i, (date, row) in enumerate(df.iterrows()):
        signal = row[signal_col]

        # OPEN / MANAGE LONG
        if long_entry is None and signal == 1:
            long_entry = (row['open'], date)
        if long_entry is not None:
            closed, gross, exit_time = _tp_sl_exit(row, long_entry[0], 1, tp, sl)
            if closed:
                if gross is not None:
                    returns[i] = (gross - cost) * leverage
                    durations[i] = exit_time - long_entry[1]
                long_entry = None

        # OPEN / MANAGE SHORT
        # If a long and a short close on the same bar, the short's values
        # overwrite the long's for that bar.
        if short_entry is None and signal == -1:
            short_entry = (row['open'], date)
        if short_entry is not None:
            closed, gross, exit_time = _tp_sl_exit(row, short_entry[0], -1, tp, sl)
            if closed:
                if gross is not None:
                    returns[i] = (gross - cost) * leverage
                    durations[i] = exit_time - short_entry[1]
                short_entry = None

    df['duration'] = pd.to_timedelta(durations)
    df['returns'] = returns
    return df

In [6]:
# Download the historical candles: hourly and 1-minute
start_date = pd.Timestamp('2020-01-01', tz='UTC').timestamp()
end_date = pd.Timestamp.now(tz='UTC').timestamp()
# The hourly frame is bound to the name the following cell reads (df_highest_timeframe)
df_highest_timeframe = fetch_bitstamp_data('btcusd', start=start_date, end=end_date, timeframe=3600)
df_lowest_timeframe = fetch_bitstamp_data('btcusd', start=start_date, end=end_date, timeframe=60)
# Check for duplicated timestamps: show them (an expression inside an `if` is not
# rendered by the notebook unless displayed explicitly) and keep the first of each
for timeframe_label, candles in (('1h', df_highest_timeframe), ('1m', df_lowest_timeframe)):
    repeated = candles.index.duplicated(keep=False)
    if repeated.any():
        print(f'{timeframe_label}: {int(repeated.sum())} rows share a timestamp')
        display(candles[repeated])
df_highest_timeframe = df_highest_timeframe[~df_highest_timeframe.index.duplicated(keep='first')]
df_lowest_timeframe = df_lowest_timeframe[~df_lowest_timeframe.index.duplicated(keep='first')]
# `df` stays bound to the hourly frame, as before
df = df_highest_timeframe

In [75]:
# Rebinding `df` is enough; a preceding `del` only adds a NameError when `df` is not bound yet.
# The explicit copy makes `df` independent of df_highest_timeframe, so columns can be added
# to it later without writing into a selection of another frame.
df = df_highest_timeframe[['open', 'high', 'low', 'close', 'volume']].copy()

## Sistema Alpha

In [76]:
# Feature enginering, agregando indicadores
ichimoku = df.ta.ichimoku(high='high', low='low', close='close', tenkan=72, kijun=16, senkou=31)
stochastic = df.ta.stoch(high='high', low='low', close='close', k=78, d=78, mamode='ema', offset=1)
vortex = df.ta.vortex(high='high', low='low', close='close', length=24)
df['ichimoku_a'] = ichimoku[0]['ISA_72']
df['ichimoku_b'] = ichimoku[0]['ISB_16']
df['stoch_k'] = stochastic['STOCHk_78_78_3']
df['stoch_d'] = stochastic['STOCHd_78_78_3']
df['vortex_pos'] = vortex['VTXP_24']
df['vortex_neg'] = vortex['VTXM_24']
# Eliminar filas con NaN
df = df.dropna(axis=0)

In [132]:
# Bullish reading of each indicator
vortex_up = df['vortex_pos'].gt(df['vortex_neg'])
stoch_up = df['stoch_k'].gt(df['stoch_d'])
close_above_spans = df['close'].gt(df['ichimoku_a']) & df['close'].gt(df['ichimoku_b'])
# Bearish reading of each indicator
vortex_down = df['vortex_pos'].lt(df['vortex_neg'])
stoch_down = df['stoch_k'].lt(df['stoch_d'])
close_below_spans = df['close'].lt(df['ichimoku_a']) & df['close'].lt(df['ichimoku_b'])
# A buy (sell) needs all three indicators to agree
buy_signal = vortex_up & stoch_up & close_above_spans
sell_signal = vortex_down & stoch_down & close_below_spans
# 1 for buy, -1 for sell (sell takes precedence), NaN for bars without a signal
raw_signal = pd.Series(
    np.where(sell_signal, -1, np.where(buy_signal, 1, 0)),
    index=df.index,
).replace(0, np.nan)
# Zero out a signal that repeats the value of the bar directly before it; a bar without
# a signal in between restarts the run, so the next occurrence is kept
starts_new_run = raw_signal.diff().ne(0)
df = df.assign(
    buy_signal=buy_signal,
    sell_signal=sell_signal,
    signal=raw_signal.where(starts_new_run).fillna(0).astype('int64'),
)