# SISTEMA INDICADORES TECNICOS

* Los indicadores técnicos son la base del trading cuantitativo, que no tiene nada que ver con el análisis técnico
* Todos los indicadores tecnicos se pueden construir
* Son fuente de ideas
* Existen librerias especializadas
  * No es demasiado recomendable utilizar esas librerías
    * Dependencias
    * Cajas negras
  * Librerias:
    * pandas_ta
    * finta
    * talib

## Importaciones

In [None]:
import pandas as pd
import numpy as np

import warnings
warnings.filterwarnings("ignore")

## Carga de datos

In [None]:
data = pd.read_excel('dfAll.xlsx', index_col=0)
data

## Precio

### PctChg

In [None]:
periodo = 20

data[f'pctChg{periodo}'] = data['Close'].pct_change(periodo) * 100

data

In [None]:
data.drop(['pctChg20'], axis = 1, inplace = True)

data

In [None]:
def ocpPctChg(df, periodo=1, borraNan=False, col='Close'):
    """Append the ``periodo``-row percentage change of ``col`` to ``df``.

    The change is expressed in percent and stored in a column named
    ``pctChg{periodo}``. ``df`` is modified in place and also returned.

    Args:
        df: DataFrame that contains the ``col`` column.
        periodo: Number of rows between the two values being compared.
        borraNan: When true, drop every row holding a NaN in any column.
        col: Name of the source column.

    Returns:
        The same DataFrame object, with the new column added.
    """
    nombre = f'pctChg{periodo}'
    variacion = df[col].pct_change(periodo)
    df[nombre] = variacion.mul(100)

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
help(ocpPctChg)

In [None]:
data = ocpPctChg(data, periodo = 5)

data

### Roi

In [None]:
# Bar-to-bar return of the close (as a fraction, not percent).
data['roi'] = data['Close'].pct_change()
data.dropna(inplace=True)

# Force the first remaining return to 0 so the equity curve starts at 100.
# A single positional assignment on the frame is used instead of the chained
# ``data['roi'].iloc[0] = 0``, which writes to a temporary object and has no
# effect when pandas Copy-on-Write is active.
data.iloc[0, data.columns.get_loc('roi')] = 0

# Equity curve of the asset, base 100.
data['cvActivo'] = 100 * (1 + data['roi']).cumprod()

data

In [None]:
# Total return between the first and the last close, in percent.
# ``.iloc`` makes the lookup explicitly positional; ``Series[-1]`` on a
# non-integer index relies on a fallback that newer pandas versions removed.
roiH = (data.Close.iloc[-1] - data.Close.iloc[0]) / data.Close.iloc[0] * 100
roiH

### CAGR

In [None]:
# Sample length in years, using a 365-day year.
# NOTE(review): assumes the index holds datetimes (``.days`` is read from the difference).
years = ((data.index[-1] - data.index[0]).days) / 365
# Compound annual growth rate of the equity curve, in percent.
# ``.iloc`` keeps the first/last lookups explicitly positional.
cagr = ((data.cvActivo.iloc[-1] / data.cvActivo.iloc[0]) ** (1 / years) - 1) * 100
cagr

### DrawDown

In [None]:
def ocpDd(df, col):
    """Add a drawdown column to ``df`` and summarise it.

    The drawdown is the percentage distance of ``col`` from its running
    maximum, rounded to two decimals and stored in the ``dd`` column.

    Args:
        df: DataFrame that contains ``col``; modified in place.
        col: Column the drawdown is measured on.

    Returns:
        A tuple ``(df, ddmax, ddmed)`` where ``ddmax`` is the minimum of the
        ``dd`` column and ``ddmed`` its mean, both converted with ``int()``.
    """
    # Author's note: a drawdown on the Low is more precise; the Close works too.
    serie = df[col]
    pico = serie.cummax()

    df['dd'] = serie.div(pico).sub(1).mul(100).round(2)

    peor = df['dd'].min()
    medio = df['dd'].mean()

    return df, int(peor), int(medio)

In [None]:
data, ddmax, ddmed = ocpDd(data, 'cvActivo')

display(data)

print(ddmax, ddmed)

## Medias

In [None]:
data = pd.read_excel('dfAll.xlsx', index_col=0)
data

### Sma (simple moving average)

In [None]:
data['s20'] = data['Close'].rolling(20).mean()
data

In [None]:
data.drop('s20', axis=1, inplace = True)

data

In [None]:
def ocpSma(df, periodo=20, borraNan=False, col='Close'):
    """Append the simple moving average of ``col`` as column ``s{periodo}``.

    Args:
        df: DataFrame that contains ``col``; modified in place.
        periodo: Rolling window length, in rows.
        borraNan: When true, drop every row holding a NaN in any column.
        col: Name of the source column.

    Returns:
        The same DataFrame object, with the new column added.
    """
    ventana = df[col].rolling(periodo)
    df[f's{periodo}'] = ventana.mean()

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpSma(data, 50)

data

### Ewm (exponential moving average)

In [None]:
def ocpExp(df, periodo=20, borraNan=False, col='Close'):
    """Append the exponential moving average of ``col`` as column ``e{periodo}``.

    The average is ``ewm(span=periodo).mean()`` with the remaining pandas
    defaults.

    Args:
        df: DataFrame that contains ``col``; modified in place.
        periodo: Span of the exponential window.
        borraNan: When true, drop every row holding a NaN in any column.
        col: Name of the source column.

    Returns:
        The same DataFrame object, with the new column added.
    """
    suavizado = df[col].ewm(span=periodo)
    df[f'e{periodo}'] = suavizado.mean()

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpExp(data, 50)

data

### Dema (media movil exponencial doble)

In [None]:
def ocpDema(df, periodo=200, borraNan=False, col='Close'):
    """Append the double exponential moving average of ``col`` as ``d{periodo}``.

    The value is ``2 * EMA - EMA(EMA)``, both averages using
    ``ewm(span=periodo)``.

    The intermediate EMA is kept in a local variable, so ``col`` is honoured
    (the previous version always used ``Close``) and an ``e{periodo}`` column
    already present in ``df`` is neither overwritten nor dropped.

    Args:
        df: DataFrame that contains ``col``; modified in place.
        periodo: Span of both exponential windows.
        borraNan: When true, drop every row holding a NaN in any column.
        col: Name of the source column.

    Returns:
        The same DataFrame object, with the new column added.
    """
    ema = df[col].ewm(span=periodo).mean()
    emaDeEma = ema.ewm(span=periodo, adjust=True).mean()

    df[f'd{periodo}'] = 2 * ema - emaDeEma

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpDema(data, 100)

data

### Wma (weighted moving average)

In [None]:
def ocpWma(df, periodo=20, borraNan=False, col='Close'):
    """Append the linearly weighted moving average of ``col`` as ``w{periodo}``.

    Weights grow linearly from the oldest to the newest row of the window and
    add up to one (for a window of 3 they are ``[1, 2, 3] / 6``).

    Args:
        df: DataFrame that contains ``col``; modified in place.
        periodo: Rolling window length, in rows.
        borraNan: When true, drop every row holding a NaN in any column.
        col: Name of the source column.

    Returns:
        The same DataFrame object, with the new column added.
    """
    escalera = np.arange(1, periodo + 1)
    pesos = escalera / escalera.sum()

    # raw=True hands each window over as a plain ndarray, which avoids
    # building a Series per window and matches how ocpHull applies its weights.
    df[f'w{periodo}'] = df[col].rolling(periodo).apply(
        lambda ventana: np.dot(ventana, pesos), raw=True
    )

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpWma(data, 20)

data

### Hull

In [None]:
def ocpHull(df, periodo=20, borraNan=False, col='Close'):
    """Append a Hull-style moving average of ``col`` as column ``h{periodo}``.

    Steps: a weighted moving average over ``periodo // 2`` rows and another
    over ``periodo`` rows are combined as ``2 * short - long``; the result is
    then smoothed with a rolling mean of ``int(sqrt(periodo))`` rows.

    The ``col`` argument is honoured (the previous version always read
    ``Close``) and the weight vectors are built once instead of per window.

    NOTE(review): the usual Hull definition smooths the last step with a
    weighted average; this code uses a simple rolling mean. Kept as is,
    confirm whether that is intentional.

    Args:
        df: DataFrame that contains ``col``; modified in place.
        periodo: Length of the long window.
        borraNan: When true, drop every row holding a NaN in any column.
        col: Name of the source column.

    Returns:
        The same DataFrame object, with the new column added.
    """
    mitad = periodo // 2
    pesosCortos = np.arange(1, mitad + 1)
    pesosLargos = np.arange(1, periodo + 1)
    sumaCortos = np.sum(pesosCortos)
    sumaLargos = np.sum(pesosLargos)

    serie = df[col]

    wmaCorta = serie.rolling(mitad).apply(
        lambda x: np.sum(x * pesosCortos) / sumaCortos, raw=True
    )
    wmaLarga = serie.rolling(periodo).apply(
        lambda x: np.sum(x * pesosLargos) / sumaLargos, raw=True
    )

    diferencia = 2 * wmaCorta - wmaLarga

    df[f'h{periodo}'] = diferencia.rolling(int(np.sqrt(periodo))).mean()

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpHull(data, 5)

data

## Volatilidad

In [None]:
data = pd.read_excel('dfAll.xlsx', index_col=0)
data

### Std (standard deviation = volatilidad)

In [None]:
def ocpStd(df, periodo=20, borraNan=False, col='Close'):
    """Append the rolling standard deviation of percentage changes as ``std{periodo}``.

    The percentage change of ``col`` over ``periodo`` rows (in percent) is
    computed first, then its rolling standard deviation over ``periodo`` rows.

    The intermediate series is kept in a local variable; the previous version
    stored it in a temporary ``pctChg`` column, which overwrote and then
    deleted any column of that name already present in ``df``.

    NOTE(review): the change is measured over ``periodo`` rows rather than
    row to row; kept as is, confirm that this is the intended volatility.

    Args:
        df: DataFrame that contains ``col``; modified in place.
        periodo: Lag of the percentage change and length of the rolling window.
        borraNan: When true, drop every row holding a NaN in any column.
        col: Name of the source column.

    Returns:
        The same DataFrame object, with the new column added.
    """
    variacion = df[col].pct_change(periodo) * 100
    df[f'std{periodo}'] = variacion.rolling(periodo).std()

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpStd(data)
data

### Bb (bollinger bands)

In [None]:
def ocpBb(df, periodo=20, desvios=2, borraNan=False, col='Close'):
    """Append Bollinger bands of ``col`` to ``df``.

    Three columns are written: ``bs{periodo}`` (rolling mean), ``bInf`` and
    ``bSup`` (mean minus / plus ``desvios`` rolling standard deviations).

    Args:
        df: DataFrame that contains ``col``; modified in place.
        periodo: Rolling window length, in rows.
        desvios: Number of standard deviations between the mean and each band.
        borraNan: When true, drop every row holding a NaN in any column.
        col: Name of the source column.

    Returns:
        The same DataFrame object, with the new columns added.
    """
    ventana = df[col].rolling(periodo)
    nombreMedia = f'bs{periodo}'

    df[nombreMedia] = ventana.mean()
    ancho = desvios * ventana.std()

    df['bInf'] = df[nombreMedia] - ancho
    df['bSup'] = df[nombreMedia] + ancho

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpBb(data)
data

### Atr (average true range)

In [None]:
def ocpAtr(df, periodo=14, borraNan=False):
    """Append the average true range in absolute and relative terms.

    The true range of a row is the largest of ``|High - Low|``,
    ``|High - previous Close|`` and ``|Low - previous Close|``. Two columns
    are written, both smoothed with ``ewm(span=periodo, adjust=False)``:

    * ``atr{periodo}``: smoothed true range.
    * ``atr%{periodo}``: smoothed true range divided by ``Close``.

    Intermediate series live in local variables; the previous version wrote
    them to ``atrHL``/``atrHC``/``atrLC``/``atrMax``/``atrMaxP`` and dropped
    them afterwards, destroying caller columns with those names.

    Args:
        df: DataFrame with ``High``, ``Low`` and ``Close``; modified in place.
        periodo: Span of the exponential smoothing.
        borraNan: When true, drop every row holding a NaN in any column.

    Returns:
        The same DataFrame object, with the new columns added.
    """
    cierrePrevio = df['Close'].shift()

    # Row-wise maximum; NaNs (first row has no previous close) are skipped.
    rangoVerdadero = pd.concat(
        [
            (df['High'] - df['Low']).abs(),
            (df['High'] - cierrePrevio).abs(),
            (df['Low'] - cierrePrevio).abs(),
        ],
        axis=1,
    ).max(axis=1)

    df[f'atr{periodo}'] = rangoVerdadero.ewm(span=periodo, adjust=False).mean()

    rangoRelativo = rangoVerdadero / df['Close']
    df[f'atr%{periodo}'] = rangoRelativo.ewm(span=periodo, adjust=False).mean()

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpAtr(data)
data

## Rangueando precio

In [None]:
data = pd.read_excel('dfAll.xlsx', index_col=0)
data

### Max y Min

In [None]:
def ocpMaxMin(df, periodo, colMax='Close', colMin='Close', borraNan=False):
    """Append the rolling maximum and minimum as ``rgMax`` and ``rgMin``.

    Args:
        df: DataFrame that contains ``colMax`` and ``colMin``; modified in place.
        periodo: Rolling window length, in rows.
        colMax: Column the rolling maximum is taken from.
        colMin: Column the rolling minimum is taken from.
        borraNan: When true, drop every row holding a NaN in any column.

    Returns:
        The same DataFrame object, with the new columns added.
    """
    # The columns are deliberately not called "max"/"min": those names are
    # DataFrame methods, so attribute access (df.max, df.min) would not
    # return the columns.
    techo = df[colMax].rolling(periodo).max()
    suelo = df[colMin].rolling(periodo).min()

    df['rgMax'] = techo
    df['rgMin'] = suelo

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpMaxMin(data, 20, 'High', 'Low')

data

### Normalizacion

In [None]:
data['rgNormal'] = (data.Close - data.rgMin) / (data.rgMax - data.rgMin)

data

### Estandarizacion (Z-Score)

In [None]:
def ocpZscore(df, periodo=20, col='Close', borraNan=False):
    """Append the rolling z-score of ``col`` as column ``zscore``.

    Each value is the distance of ``col`` from its rolling mean, measured in
    rolling standard deviations, both over ``periodo`` rows.

    Args:
        df: DataFrame that contains ``col``; modified in place.
        periodo: Rolling window length, in rows.
        col: Name of the source column.
        borraNan: When true, drop every row holding a NaN in any column.

    Returns:
        The same DataFrame object, with the new column added.
    """
    serie = df[col]
    media = serie.rolling(periodo).mean()
    desvio = serie.rolling(periodo).std()

    df['zscore'] = (serie - media) / desvio

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpZscore(data)

data

# Clasicos

In [None]:
data = pd.read_excel('dfAll.xlsx', index_col=0)
data

## Roc (rate of change)

In [None]:
def ocpRoc(df, periodo=5, borraNan=False, col='Close'):
    """Append the rate of change of ``col`` as column ``roc{periodo}``.

    The value is the percentage change of ``col`` over ``periodo`` rows,
    expressed in percent.

    Args:
        df: DataFrame that contains ``col``; modified in place.
        periodo: Number of rows between the two values being compared.
        borraNan: When true, drop every row holding a NaN in any column.
        col: Name of the source column.

    Returns:
        The same DataFrame object, with the new column added.
    """
    nombre = f'roc{periodo}'
    cambio = df[col].pct_change(periodo)
    df[nombre] = cambio.mul(100)

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpRoc(data)
data

## Rsi (relative strength index)

In [None]:
def ocpRsi(df, periodo=14, borraNan=False, col='Close'):
    """Append a relative strength index of ``col`` as column ``rsi{periodo}``.

    Row-to-row differences are split into gains and losses (each 0 when not
    applicable, including the first row); both are smoothed with
    ``ewm(span=periodo)`` and combined as ``100 - 100 / (1 + gain / loss)``.

    Intermediate series live in local variables; the previous version wrote
    them to ``dif``/``win``/``loss``/``emaWin``/``emaLoss``/``rs`` and dropped
    them afterwards, destroying caller columns with those names.

    Args:
        df: DataFrame that contains ``col``; modified in place.
        periodo: Span of the exponential smoothing.
        borraNan: When true, drop every row holding a NaN in any column.
        col: Name of the source column.

    Returns:
        The same DataFrame object, with the new column added.
    """
    dif = df[col].diff()

    # where() keeps the value only where the condition holds; NaN compares
    # false, so the first row contributes 0 to both series.
    ganancia = dif.where(dif > 0, 0.0)
    perdida = (-dif).where(dif < 0, 0.0)

    mediaGanancia = ganancia.ewm(span=periodo).mean()
    mediaPerdida = perdida.ewm(span=periodo).mean()
    rs = mediaGanancia / mediaPerdida

    df[f'rsi{periodo}'] = 100 - (100 / (1 + rs))

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpRsi(data, 5)
data

## Macd (moving average convergence / divergence)

In [None]:
def ocpMacd(df, fast=12, slow=26, suavizado=9, borraNan=False, col='Close'):
    """Append MACD, its signal line and two trend labels to ``df``.

    Columns written:

    * ``macd{fast}{slow}{suavizado}``: fast EMA minus slow EMA of ``col``.
    * ``signal``: EMA of the MACD line with span ``suavizado``.
    * ``mTrend``: ``'A'`` where MACD is above the signal, otherwise ``'B'``.
    * ``mHisTrend``: label of the histogram (MACD minus signal):
      ``'A'`` positive and above the previous value, ``'L'`` positive and not
      above it, ``'B'`` negative and not above it, ``'I'`` in every other case.

    Intermediate series live in local variables; the previous version wrote
    them to ``emaFast``/``emaSlow``/``histoM`` and dropped them afterwards,
    destroying caller columns with those names.

    Args:
        df: DataFrame that contains ``col``; modified in place.
        fast: Span of the fast EMA.
        slow: Span of the slow EMA.
        suavizado: Span of the signal EMA.
        borraNan: When true, drop every row holding a NaN in any column.
        col: Name of the source column.

    Returns:
        The same DataFrame object, with the new columns added.
    """
    nombreMacd = f'macd{fast}{slow}{suavizado}'
    precio = df[col]

    macd = precio.ewm(span=fast).mean() - precio.ewm(span=slow).mean()
    senal = macd.ewm(span=suavizado).mean()
    histo = macd - senal
    histoPrevio = histo.shift()

    df[nombreMacd] = macd
    df['signal'] = senal
    df['mTrend'] = np.where(macd > senal, 'A', 'B')

    # Conditions are evaluated in order; the first one that holds wins.
    positivo = (histo > 0).to_numpy()
    negativo = (histo < 0).to_numpy()
    sube = (histo > histoPrevio).to_numpy()
    noSube = (histo <= histoPrevio).to_numpy()
    df['mHisTrend'] = np.select(
        [positivo & sube, positivo & noSube, negativo & noSube],
        ['A', 'L', 'B'],
        default='I',
    )

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpMacd(data)
data

## Coppock curve

In [None]:
def ocpCop(df, periodo=10, tipo='ponderada', borraNan=False, col='Close'):
    """Append a Coppock curve of ``col`` and two trend labels to ``df``.

    Two rates of change of ``col`` (in percent) are added together and then
    smoothed over ``periodo`` rows. The lags are derived from ``periodo``:
    ``int(periodo / 10 * 12)`` for the slow one and ``int(periodo / 10 * 6)``
    for the fast one.

    Author's parameter notes: the traditional setting is 14 / 11 / 10 on a
    monthly timeframe; with this function ``periodo=10`` gives 12 / 6 / 10
    (daily), ``periodo=50`` gives 60 / 30 / 50 (weekly seen on daily bars) and
    ``periodo=210`` gives 252 / 126 / 210 (monthly seen on daily bars).

    Columns written:

    * ``cop``: the smoothed sum.
    * ``cTrend``: ``'A'`` where ``cop`` is above its previous value, else ``'B'``.
    * ``cHisTrend``: ``'A'`` when ``cop >= 0`` and not falling, ``'L'`` when
      ``cop > 0`` and falling, ``'B'`` when ``cop <= 0`` and not rising,
      ``'I'`` in every other case.

    Fixes over the previous version: the sum of rates of change is no longer
    written to the global ``data`` frame (the function now works on any
    ``df``), an unknown ``tipo`` is rejected up front, and intermediate series
    live in local variables instead of temporary ``ROC1``/``ROC2``/``suma``
    columns.

    Args:
        df: DataFrame that contains ``col``; modified in place.
        periodo: Smoothing length; also scales both rate-of-change lags.
        tipo: ``'ponderada'`` (linearly weighted rolling average) or
            ``'exponencial'`` (``ewm(span=periodo, min_periods=periodo)``).
        borraNan: When true, drop every row holding a NaN in any column.
        col: Name of the source column.

    Returns:
        The same DataFrame object, with the new columns added.

    Raises:
        ValueError: If ``tipo`` is not one of the two supported values.
    """
    if tipo not in ('ponderada', 'exponencial'):
        raise ValueError(
            f"tipo must be 'ponderada' or 'exponencial', got {tipo!r}"
        )

    cop = int(periodo)
    slow = int(periodo / 10 * 12)
    fast = int(periodo / 10 * 6)

    precio = df[col]
    rocRapido = (precio / precio.shift(fast) - 1) * 100
    rocLento = (precio / precio.shift(slow) - 1) * 100
    suma = rocRapido + rocLento

    if tipo == 'ponderada':
        # Linearly weighted average of the sum; newest row weighs the most.
        escalera = np.arange(1, cop + 1)
        pesos = escalera / escalera.sum()
        curva = suma.rolling(cop).apply(lambda x: np.dot(x, pesos), raw=True)
    else:
        curva = suma.ewm(span=cop, min_periods=cop).mean()

    df['cop'] = curva

    cambio = curva.diff()
    df['cTrend'] = np.where(curva > curva.shift(), 'A', 'B')

    # Conditions are evaluated in order; the first one that holds wins.
    df['cHisTrend'] = np.select(
        [
            ((curva >= 0) & (cambio >= 0)).to_numpy(),
            ((curva > 0) & (cambio < 0)).to_numpy(),
            ((curva <= 0) & (cambio <= 0)).to_numpy(),
        ],
        ['A', 'L', 'B'],
        default='I',
    )

    if borraNan:
        df.dropna(inplace=True)

    return df


In [None]:
data = ocpCop(data)
data

## Stochastic

In [None]:
def ocpStochastic(df, N=14, M=3, borraNan=False):
    """Append the stochastic oscillator as columns ``K`` and ``D``.

    ``K`` locates ``Close`` inside the range between the rolling minimum of
    ``Low`` and the rolling maximum of ``High`` over ``N`` rows, scaled to
    0-100. ``D`` is the rolling mean of ``K`` over ``M`` rows.

    The rolling extremes live in local variables; the previous version wrote
    them to ``lowN``/``highN`` and dropped them afterwards, destroying caller
    columns with those names.

    Args:
        df: DataFrame with ``High``, ``Low`` and ``Close``; modified in place.
        N: Window of the rolling extremes.
        M: Window of the ``D`` average.
        borraNan: When true, drop every row holding a NaN in any column.

    Returns:
        The same DataFrame object, with the new columns added.
    """
    minimo = df['Low'].rolling(N).min()
    maximo = df['High'].rolling(N).max()

    df['K'] = 100 * (df['Close'] - minimo) / (maximo - minimo)
    df['D'] = df['K'].rolling(M).mean()

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpStochastic(data)
data

## Adx

In [None]:
def ocpAdx(df, periodo=14, borraNan=False):
    """Append directional indicators and an ADX-style average to ``df``.

    Columns written: ``plusDi``, ``minusDi`` and ``adx``.

    Computation, as implemented here:

    1. True range = largest of ``|High - Low|``, ``|High - previous Close|``
       and ``|Low - previous Close|``; its rolling mean over ``periodo`` rows
       is used as the ATR.
    2. Upward movement = ``High.diff()`` with negative values set to 0;
       downward movement = ``Low.diff()`` with positive values set to 0.
    3. ``plusDi`` / ``minusDi`` = 100 times the ``ewm(alpha=1/periodo)`` mean
       of each movement divided by the ATR (absolute value for ``minusDi``).
    4. ``dx`` = ``|plusDi - minusDi| / |plusDi + minusDi| * 100``, blended with
       its previous value and smoothed again with ``ewm(alpha=1/periodo)``.

    Fixes over the previous version: ``dx`` is computed from ``df`` rather
    than from the global ``data`` frame; the zeroing of the directional
    movements no longer uses chained assignment (``df.plusDm[mask] = 0``),
    which has no effect under pandas Copy-on-Write; intermediate series live
    in local variables instead of temporary columns.

    Args:
        df: DataFrame with ``High``, ``Low`` and ``Close``; modified in place.
        periodo: Window of the ATR and smoothing length of the averages.
        borraNan: When true, drop every row holding a NaN in any column.

    Returns:
        The same DataFrame object, with the new columns added.
    """
    alto = df['High']
    bajo = df['Low']
    cierrePrevio = df['Close'].shift()

    # True range and its simple rolling average.
    rangoVerdadero = pd.concat(
        [
            (alto - bajo).abs(),
            (alto - cierrePrevio).abs(),
            (bajo - cierrePrevio).abs(),
        ],
        axis=1,
    ).max(axis=1)
    atr = rangoVerdadero.rolling(periodo).mean()

    # Directional movements; mask() leaves NaN rows untouched.
    subida = alto.diff()
    subida = subida.mask(subida < 0, 0)
    bajada = bajo.diff()
    bajada = bajada.mask(bajada > 0, 0)

    alfa = 1 / periodo
    df['plusDi'] = 100 * (subida.ewm(alpha=alfa).mean() / atr)
    df['minusDi'] = abs(100 * (bajada.ewm(alpha=alfa).mean() / atr))

    dx = (abs(df['plusDi'] - df['minusDi']) / abs(df['plusDi'] + df['minusDi'])) * 100
    adxInicial = ((dx.shift(1) * (periodo - 1)) + dx) / periodo

    df['adx'] = adxInicial.ewm(alpha=alfa).mean()

    if borraNan:
        df.dropna(inplace=True)

    return df

In [None]:
data = ocpAdx(data, 14)
data

## Parabolic Sar

In [None]:
def ocpParabolicSar(df, psarCol='psar01', afStart=0.02, afIncrement=0.02, afMax=0.2, highCol='High', lowCol='Low'):
    """Append a Parabolic SAR column to ``df``.

    The series starts in an uptrend with the SAR at the first low, the
    extreme point at the first high and the acceleration factor at
    ``afStart``. For every following row the candidate SAR is
    ``previous SAR + af * (extreme point - previous SAR)``:

    * Uptrend: if the low touches the candidate, the trend flips, the SAR
      jumps to the extreme point and the factor resets. Otherwise a new high
      raises the extreme point and the factor (capped at ``afMax``), and the
      SAR is limited to the lows of the two previous rows.
    * Downtrend: the mirror image, using highs.

    The recurrence is the same as in the previous version, but it now runs on
    positional NumPy arrays instead of per-row ``df.loc`` / ``get_loc``
    lookups. That removes the label-lookup cost per row, works when the index
    has duplicate labels, returns an empty column for an empty frame instead
    of raising, and no longer creates temporary ``_trendTemp`` / ``_afTemp`` /
    ``_epTemp`` columns.

    Args:
        df: DataFrame with the high and low columns; modified in place.
        psarCol: Name of the column to create.
        afStart: Initial acceleration factor.
        afIncrement: Step added to the factor on each new extreme.
        afMax: Upper bound of the acceleration factor.
        highCol: Name of the column with the highs.
        lowCol: Name of the column with the lows.

    Returns:
        The same DataFrame object, with the ``psarCol`` column added.
    """
    altos = df[highCol].to_numpy(dtype=float)
    bajos = df[lowCol].to_numpy(dtype=float)
    total = len(df)
    sar = np.zeros(total)

    if total:
        # Initial state: uptrend anchored on the first bar.
        sar[0] = bajos[0]
        tendencia = 1
        af = afStart
        extremo = altos[0]

    for k in range(1, total):
        # The candidate uses the state left by the previous row.
        candidato = sar[k - 1] + af * (extremo - sar[k - 1])
        alto = altos[k]
        bajo = bajos[k]
        # Second bar back; on the second row it falls back to the first row.
        j = k - 2 if k > 1 else k - 1

        if tendencia == 1:
            if bajo <= candidato:
                # Reversal to downtrend.
                tendencia = -1
                sar[k] = extremo
                af = afStart
                extremo = bajo
            else:
                if alto > extremo:
                    extremo = alto
                    af = min(af + afIncrement, afMax)
                # The SAR may not rise above the two previous lows.
                sar[k] = min(candidato, bajos[k - 1], bajos[j])
        else:
            if alto >= candidato:
                # Reversal to uptrend.
                tendencia = 1
                sar[k] = extremo
                af = afStart
                extremo = alto
            else:
                if bajo < extremo:
                    extremo = bajo
                    af = min(af + afIncrement, afMax)
                # The SAR may not fall below the two previous highs.
                sar[k] = max(candidato, altos[k - 1], altos[j])

    df[psarCol] = sar

    return df



import pandas_ta as ta

def ocpParabolicSarTa(df, psarCol='psar05', afStart=0.02, afIncrement=0.02, afMax=0.2, highCol='High', lowCol='Low'):
    """Append a Parabolic SAR column computed by ``pandas_ta``.

    ``ta.psar`` is called with the high and low columns; its long and short
    output columns are merged into a single series (the long value where it
    is present, the short value elsewhere). A ``psTrend`` column is written
    as well, ``'bull'`` where the long value is present and ``'bear'``
    elsewhere.

    Args:
        df: DataFrame with the high and low columns; modified in place.
        psarCol: Name of the SAR column to create (default ``'psar05'``).
        afStart: Initial acceleration factor, passed as ``af0``.
        afIncrement: Acceleration step, passed as ``af``.
        afMax: Maximum acceleration factor, passed as ``max_af``.
        highCol: Name of the column with the highs.
        lowCol: Name of the column with the lows.

    Returns:
        The same DataFrame object, with ``psarCol`` and ``psTrend`` added.
    """
    resultado = ta.psar(
        high=df[highCol],
        low=df[lowCol],
        af0=afStart,
        af=afIncrement,
        max_af=afMax,
    )

    # The result columns are looked up by name, built from the start and
    # maximum acceleration factors.
    ramaLarga = resultado[f'PSARl_{afStart}_{afMax}']
    ramaCorta = resultado[f'PSARs_{afStart}_{afMax}']

    esAlcista = ramaLarga.notna()

    df[psarCol] = np.where(esAlcista, ramaLarga, ramaCorta)
    df['psTrend'] = np.where(esAlcista, 'bull', 'bear')

    return df


In [None]:
data = ocpParabolicSar(data)
data = ocpParabolicSarTa(data, 'psar02')
data

In [None]:
dg = data.tail(100)

dg.Close.plot()
dg.psar01.plot()

# Funcion indicadores

In [None]:
data = pd.read_excel('dfAll.xlsx', index_col=0)
data

In [None]:
def ocpIndicadoresTecnicos(df):
    """Run the notebook's standard set of indicators on ``df``.

    The indicator functions are applied one after another, each receiving the
    frame returned by the previous one: SMA(20), EMA(50), WMA(150), percentage
    change, standard deviation, Bollinger bands, ATR, ROC, RSI and MACD, all
    with their default arguments unless stated.

    Args:
        df: DataFrame passed to every indicator function.

    Returns:
        The DataFrame returned by the last step, after rows with NaN have
        been dropped.
    """
    pasos = (
        (ocpSma, {'periodo': 20}),
        (ocpExp, {'periodo': 50}),
        (ocpWma, {'periodo': 150}),
        (ocpPctChg, {}),
        (ocpStd, {}),
        (ocpBb, {}),
        (ocpAtr, {}),
        (ocpRoc, {}),
        (ocpRsi, {}),
        # NaN rows are dropped only here, once every column exists.
        (ocpMacd, {'borraNan': True}),
    )

    for indicador, opciones in pasos:
        df = indicador(df, **opciones)

    return df

In [None]:
data = ocpIndicadoresTecnicos(data)
data.round(2)

# Librerias especializadas

## pandas_ta

In [None]:
#!pip install pandas_ta

In [None]:
import pandas_ta as ta

In [None]:
help(ta)

In [None]:
data.ta.indicators()

In [None]:
listadoIndicadores = data.ta.indicators(as_list=True)
listadoIndicadores

In [None]:
help(ta.bbands)

In [None]:
data.ta.bbands().tail()

In [None]:
data.ta.sma(length = 20).tail()

In [None]:
data.ta.ema(length = 20).tail()

In [None]:
data.ta.macd().tail()

In [None]:
sma10 = ta.sma(data['Close'], length=10)
sma10

In [None]:
data = data.ta.cdl_pattern(name='all')
data

## finta

In [None]:
#!pip install finta

In [None]:
from finta import TA

In [None]:
help(TA)

In [None]:
data = pd.read_excel('dfAll.xlsx', index_col=0)
data

In [None]:
# Error por que busca open y es Open

bbands = TA.BBANDS(data, 30)
bbands

In [None]:
sma = TA.SMA(data, 30)
sma

In [None]:
help(TA.SMA)

## talib

In [None]:
#!pip install ta-Lib