## FILTROS DIGITALES

De momento implemento 5 filtos, me dejo de reserva el chebyshev, el eliptico y el dema

In [2]:
from scipy.signal import butter, ellip, cheby2, freqz
import plotly.express as px 
import pandas as pd 
import numpy as np
import yfinance as yf
import plotly.express as px
import plotly.graph_objects as go
from math import pi, cos, sin, sqrt, acos
from scipy.optimize import newton
import warnings
warnings.filterwarnings("ignore")

In [3]:
# Siguiente potencia de dos 
def nextpow2(N):
    n = 1
    while n < N: n *= 2
    return n

In [4]:
# Señal
x = yf.download('QQQ', start = '2021-01-01', progress = False)['Close']
L = 2*nextpow2(len(x))

### 1. SMA

In [5]:
# Metodo de Newton para obtener el perido de la SMA a partir de la frecuencia de corte
def N_sma(wc):

    wc = wc*pi
    func = lambda N: np.sin(wc*N/2) - (N/np.sqrt(2))*(np.sin(wc/2))
    deriv = lambda N: (wc/2)*np.cos(wc*N/2) - (1/np.sqrt(2))*np.sin(wc/2)
    N_0 = pi/wc  
    return int(np.round(newton(func, N_0, deriv)))

# Metodo de Newton para obtener la frecuencia de corte a partir del periodo
def wc_sma(N, **kwargs):
    func = lambda w: sin(N*w/2) - N/sqrt(2) * sin(w/2)  
    deriv = lambda w: cos(N*w/2) * N/2 - N/sqrt(2) * cos(w/2) / 2  
    omega_0 = pi/N  
    return newton(func, omega_0, deriv, **kwargs)/pi

In [6]:
# Filtro media movil en funcion de la frecuencia de corte 
def sma(x, wc):
    N = N_sma(wc)
    y = pd.Series(x).rolling(N).mean()
    return y 

In [7]:
# Respuesta en frecuencia
N = 20; wc = wc_sma(N)

A = 1; B = (1/N)*np.ones(N,)
w, Hsma = freqz(B,A,L)
Hsma = 20*np.log10(abs(Hsma))

fig = px.line(x = w/pi, y = Hsma, title = 'Respuesta en Frecuencia SMA', range_y = [-40, 0.5])
fig.update_layout(xaxis_title = 'Frecuencia Normalizada', yaxis_title = 'Magnitud (dB)')

In [8]:
# Fitrado en el tiempo
y_sma = sma(x, wc)
df = pd.DataFrame({'Señal':x, 'SMA':y_sma})
print(f'Frecuencia de corte: {round(wc,4)}, Periodo: {round(N_sma(wc),3)}')
fig = px.line(df, title = 'Media Movil Simple')
fig.update_layout(xaxis_title = 'Fecha', yaxis_title = 'Precio')


Frecuencia de corte: 0.0443, Periodo: 20


### 2. EMA

In [9]:
# Metodo de Newton para obtener la frecuencia de corte de la EMA a partir de alpha
def wc_ema(alpha):
    return (acos((alpha**2 + 2*alpha - 2) / (2*alpha - 2)))/pi

# Metodo de Newton para obtener el parametro alpha de la EMA a partir de la frecuencia de corte
def alpha_ema(wc):
    wc = wc*pi
    B = 2*(1-cos(wc)); C = 2*(cos(wc)-1)
    func = lambda alpha: alpha**2 + B*alpha + C 
    deriv = lambda alpha: 2*alpha + B
    alpha_0 = 0.5
    return newton(func, alpha_0, deriv)

In [10]:
# Filtro media movil exponencial en funcion de la frecuencia de corte
def ema(x, wc):
    alpha = alpha_ema(wc)
    y = []

    for n in range(len(x)):
        if n == 0:
            y.append(x[0])
        else:
            y_n = alpha*x[n] + (1-alpha)*y[n-1]
            y.append(y_n)
    return y

In [11]:
# Respuesta en frecuencia
alpha = alpha_ema(wc)
B = np.array(alpha); A = np.array((1, alpha - 1))
w, Hema = freqz(B,A,L)        
Hema = 20*np.log10(abs(Hema))

fig = px.line(x = w/pi, y = Hema, title = 'Respuesta en Frecuencia EMA')
fig.update_layout(xaxis_title = 'Frecuencia Normalizada', yaxis_title = 'Magnitud (dB)')

In [12]:
# Fitrado en el tiempo
y_ema = ema(x, wc)
df = pd.DataFrame({'Señal':x, 'EMA':y_ema})
print(f'Frecuencia de corte: {round(wc,3)}, Alpha: {round(alpha_ema(wc),3)}')

fig = px.line(df, title = 'Media Movil Exponencial')
fig.update_layout(xaxis_title = 'Fecha', yaxis_title = 'Precio')

Frecuencia de corte: 0.044, Alpha: 0.13


### 3. Butterworth

In [13]:
# Filtro butterworth en funcion de la frecuencia de corte
def butterworth(x,wc):
    N = 1
    B,A = butter(N,wc)
    y = []

    for n in range(len(x)):
        if n == 0:
            y.append(x[0])
        else:
            y_n = B[0]*(x[n] + x[n-1]) - A[1]*y[n-1]
            y.append(y_n)
    return y

In [35]:
# Respuesta en frecuencia 
N = 1
B, A = butter(N, wc)    
w, Hbut = freqz(B,A,L)         
Hbut = 20*np.log10(abs(Hbut))  

fig = px.line(x = w/pi, y = Hbut, title = 'Respuesta en Frecuencia Butterworth')
fig.update_layout(xaxis_title = 'Frecuencia Normalizada', yaxis_title = 'Magnitud (dB)')

In [15]:
# Fitrado en el tiempo
y_but = butterworth(x, wc)
# y_but = lfilter(B, A, x)
df = pd.DataFrame({'Señal':x, 'Butterworth':y_but})
print(f'Frecuencia de corte: {round(wc,3)}, Orden: {N}')

fig = px.line(df, title = 'Filtro Butterworth')
fig.update_layout(xaxis_title = 'Fecha', yaxis_title = 'Precio')

Frecuencia de corte: 0.044, Orden: 2


### 4. Super Smoother

In [16]:
# Filtro supersmoother en funcion de la frecuencia de corte
def smooth(x,wc):

    a = np.exp(-np.sqrt(2)*np.pi*wc/2)
    b = 2*a*np.cos(np.sqrt(2)*np.pi*wc/2)

    c2 = b
    c3 = -a*a
    c1 = 1-c2-c3

    y = []

    for n in range(len(x)):
        if n == 0:
            y.append(x[0])
        else:
            y_n = (c1/2)*(x[n] + x[n-1]) + c2*y[n-1] + c3*y[n-2]
            y.append(y_n)
    return y

In [17]:
# Respuesta en frecuencia 
a = np.exp(-np.sqrt(2)*wc*np.pi/2)
b = 2*a*np.cos(np.sqrt(2)*wc*np.pi/2)

c2 = b
c3 = -a*a
c1 = 1-c2-c3

B = np.array([0.5*c1,0.5*c1])
A = np.array([1,-c2,-c3])

w, Hsm = freqz(B,A,L)
Hsm = 20*np.log10(np.abs(Hsm))

fig = px.line(x = w/pi, y = Hsm, title = 'Respuesta en Frecuencia SuperSmoother')
fig.update_layout(xaxis_title = 'Frecuencia Normalizada', yaxis_title = 'Magnitud (dB)')

In [18]:
# Fitrado en el tiempo
y_smooth = smooth(x, wc)
df = pd.DataFrame({'Señal':x, 'Butterworth':y_smooth})
print(f'Frecuencia de corte: {round(wc,3)}')

fig = px.line(df, title = 'Filtro SuperSmoother')
fig.update_layout(xaxis_title = 'Fecha', yaxis_title = 'Precio')

Frecuencia de corte: 0.044


### 5. Doble Media Movil

In [19]:
# Metodo de Newton para obtener el perido de la DMA a partir de la frecuencia de corte
def N_dma(wc):
    wc = wc*pi
    func = lambda N: sin(wc*N/2) - (N/(2**(1/4)))*(sin(wc/2))
    deriv = lambda N: (wc/2)*cos(wc*N/2) - (1/(2**(1/4)))*sin(wc/2)
    N_0 = pi/wc  
    return int(np.round(newton(func, N_0, deriv)))

# Metodo de Newton para obtener la frecuencia de corte a partir del periodo
def wc_dma(N):
    func = lambda wc: sin(N*wc/2) - (N/(2**(1/4))) * sin(wc/2)  
    deriv = lambda wc: cos(N*wc/2) * N/2 - (N/(2**(1/4))) * cos(wc/2) / 2  
    omega_0 = pi/N  
    return newton(func, omega_0, deriv)/pi

In [20]:
def dma(x, wc):
    N = N_dma(wc)
    h = (1/N)*np.ones(N,)
    B = np.convolve(h, h, mode='full')

    y = []
    for n in range(len(x)):
        if n < len(B):
            y.append(np.NAN)
        else:
            y_n = np.dot(B,x[n-len(B):n])
            y.append(y_n)

    return y

In [21]:
# Respuesta en frecuencia DMA
N = N_dma(wc)
h = (1/N)*np.ones(N,)
B = np.convolve(h, h, mode='full'); A = 1
w, Hdma = freqz(B,A,L)         
Hdma = 20*np.log10(abs(Hdma))  

fig = px.line(x = w/pi, y = Hdma, title = 'Respuesta en Frecuencia DMA')
fig.update_layout(xaxis_title = 'Frecuencia Normalizada', yaxis_title = 'Magnitud (dB)')

In [22]:
# Fitrado en el tiempo
y_dma = dma(x, wc)
df = pd.DataFrame({'Señal':x, 'DMA':y_dma})
print(f'Frecuencia de corte: {round(wc,3)}, Periodo: {N}')

fig = px.line(df, title = 'Filtro Doble Media Móvil')
fig.update_layout(xaxis_title = 'Fecha', yaxis_title = 'Precio')

Frecuencia de corte: 0.044, Periodo: 14


### 6. Chebyshev



In [23]:
# Filtro Chebyshev Tipo I en función de la frecuencia de corte
def chebyshev(x, wc):
    
    N = 1; rp = 3
    B, A = cheby2(N, rp, wc)
    y = []

    for n in range(len(x)):
        if n == 0:
            y.append(x[0])
        else:
            y_n = B[0]*x[n] + B[1]*x[n-1] - A[1]*y[n-1]
            y.append(y_n)
    return y

In [24]:
# Respuesta en frecuencia
N = 1; rs = 3
B, A = cheby2(N, rs, wc)
w, Hche = freqz(B, A, L)        
Hche = 20*np.log10(abs(Hche))

df = pd.DataFrame({'Butterworth': Hbut, 'Cheby': Hche})
fig = px.line(df, x = w/pi, y = [df['Butterworth'], df['Cheby']], title = 'Respuesta en Frecuencia Chebyshev', )
fig.update_layout(xaxis_title = 'Frecuencia Normalizada', yaxis_title = 'Magnitud (dB)')
fig.add_trace(go.Scatter(x=[wc], y=[-3], mode='markers', name = 'Cutoff Frequency'))

In [25]:
# Fitrado en el tiempo
y_che = chebyshev(x, wc)
df = pd.DataFrame({'Señal':x, 'Chebyshev2':y_che})
print(f'Frecuencia de corte: {round(wc,3)}, Ripple: {rs}, Orden: {N}')

fig = px.line(df, title = 'Filtro Chebyshev')
fig.update_layout(xaxis_title = 'Fecha', yaxis_title = 'Precio')

Frecuencia de corte: 0.044, Ripple: 3, Orden: 1


### 7. Eliptico

In [26]:
# Filtro Eliptico en función de la frecuencia de corte
def ellipt(x, wc, rp, rs):
    
    B, A = ellip(N, rp, rs, wc)
    y = []

    for n in range(len(x)):
        if n == 0:
            y.append(x[0])
        else:
            y_n = B[0]*x[n] + B[1]*x[n-1] - A[1]*y[n-1]
            y.append(y_n)
    return y

In [27]:
# Respuesta en frecuencia
rp = 3;  rs = 100; N = 1
B, A = ellip(N, rp, rs, wc)
w, Hel = freqz(B, A, L)        
Hel = 20*np.log10(abs(Hel))

fig = px.line(x = w/pi, y = Hel, title = 'Respuesta en Frecuencia Elíptico')
fig.update_layout(xaxis_title = 'Frecuencia Normalizada', yaxis_title = 'Magnitud (dB)')
fig.add_trace(go.Scatter(x=[wc], y=[-3], mode='markers', name = 'Cutoff Frequency'))

In [28]:
# Fitrado en el tiempo
y_el = ellipt(x, wc, rp, rs)
#y_el = lfilter(B, A, x)
df = pd.DataFrame({'Señal':x, 'Eliptico':y_el})
print(f'Frecuencia de corte: {round(wc,3)}, Rp: {rp}, Rs: {rs}')

fig = px.line(df, title = 'Filtro Eliptico')
fig.update_layout(xaxis_title = 'Fecha', yaxis_title = 'Precio')

Frecuencia de corte: 0.044, Rp: 3, Rs: 100


### 8. Doble Media Exponencial

In [29]:
# Respuesta en frecuencia

alpha = alpha_ema(wc)
B = np.array(alpha); A = np.array((1, alpha - 1))
w, Hdema = freqz(B, A, L)        
Hdema = 40*np.log10(abs(Hdema))

fig = px.line(x = w/pi, y = Hdema, title = 'Respuesta en Frecuencia DEMA')
fig.update_layout(xaxis_title = 'Frecuencia Normalizada', yaxis_title = 'Magnitud (dB)')

In [30]:
# Fitrado en el tiempo
y_dema = ema(ema(x, wc), wc)
df = pd.DataFrame({'Señal':x, 'DEMA':y_dema})
print(f'Frecuencia de corte: {round(wc,3)}, Alpha: {alpha}')

fig = px.line(df, title = 'Filtro Doble Media Móvil Exponencial')
fig.update_layout(xaxis_title = 'Fecha', yaxis_title = 'Precio')

Frecuencia de corte: 0.044, Alpha: 0.12984277746790043


# COMPARACIÓN FILTROS

In [31]:
# Comparación filtrado temporal
df = pd.DataFrame({'Señal':x, 'SMA':y_sma, 'EMA':y_ema, 'Butterworth':y_but, 
                   'SuperSmoother':y_smooth, 'DMA': y_dma})

fig = px.line(df, title = 'Comparación de los Filtros')
fig.update_layout(xaxis_title = 'Fecha', yaxis_title = 'Precio')

In [36]:
# Comparación respuestas en frecuencia 
df = pd.DataFrame({'SMA':Hsma, 'EMA':Hema, 'Butterworth':Hbut, 
                   'SuperSmoother':Hsm, 'DMA': Hdma, 'Frecuencia':w/pi})

fig = px.line(df, x = df['Frecuencia'], y = [df['SMA'], df['EMA'], df['Butterworth'], df['SuperSmoother'], 
                      df['DMA']], title = 'Respuestas en Frecuencia', range_y=[-40,0.5])


fig.update_layout(xaxis_title = 'Frecuencia Normalizada', yaxis_title = 'Magnitud (dB)')
fig.add_trace(go.Scatter(x=[wc], y=[-3], mode='markers', name = 'Cutoff Frequency'))

In [47]:
df_strat = pd.DataFrame({'Señal': x, 'SMA1': sma(x, 0.015), 'SMA2': sma(x, 0.005)}).dropna()
px.line(df_strat, title = 'Estrategia de Cruce de Medias')