In [None]:
#Libraries
import talib as tb
import yfinance as yf
import pandas as pd
from dateutil.relativedelta import relativedelta
import numpy as np
from sklearn.linear_model import LinearRegression
from scipy.stats import t
from statistics import mean,variance
from tqdm.notebook import tqdm as tq

In [None]:
class Events:
    """
    Event study of candlestick patterns over a panel of stocks.

    For every occurrence of a candlestick pattern the class fits a normal-return
    model on an estimation window, measures abnormal returns over an event
    window centred on the pattern, and tests the cumulative abnormal return
    (CAR) with a one-tailed Student-t test in the direction the pattern predicts.
    """

    # Expected-return models understood by get_CAR().
    _MODELS = ('market', 'mean return')

    def __init__(self, df, l1, l2, model, patterns):
        """
        df: DataFrame with two-level columns (field, ticker) holding at least
            'Adj Close', 'Open', 'High', 'Low' and 'Close'. The LAST ticker
            under 'Adj Close' is used as the market/reference index.
        l1: length (in observations) of the estimation window.
        l2: observations on each side of the event day; the event window
            therefore holds L2 = 2*l2 + 1 observations.
        model: 'market' or 'mean return'.
        patterns: names of TA-Lib candlestick functions to study.
        """
        self.df = df
        self.l1 = l1
        self.l2 = l2
        self.model = model
        self.patterns = patterns

    @staticmethod
    def _day_label(stamp):
        """Return a 'YYYY-MM-DD' label for a timestamp-like index value."""
        # strftime is safe for tz-aware stamps, unlike slicing str(stamp).
        if hasattr(stamp, 'strftime'):
            return stamp.strftime('%Y-%m-%d')
        return str(stamp)

    @staticmethod
    def _count_significant(scars, directions, dof, alpha=0.01):
        """
        Count events whose SCAR is significant in the direction of the pattern.

        scars: iterable of standardized CARs, one per event.
        directions: iterable of pattern signals aligned with `scars`;
            positive = bullish (upper-tail test), negative = bearish
            (lower-tail test), zero = ignored.
        dof: degrees of freedom of the Student-t reference distribution.
        alpha: significance level of the one-tailed test.
        """
        significant = 0
        for scar, direction in zip(scars, directions):
            if direction > 0:
                p_value = t.sf(scar, dof)      # P(T >= scar)
            elif direction < 0:
                p_value = t.cdf(scar, dof)     # P(T <= scar)
            else:
                continue
            if p_value < alpha:
                significant += 1
        return significant

    def candle_events(self, stock, pattern):
        """
        Locate occurrences of `pattern` for `stock` and build their windows.

        Output: estimation_window, event_window, event_day & bullish_bearish.
        Each window is a (first_date, last_date) tuple, both ends inclusive.
        The estimation window holds exactly l1 observations and ends the day
        before the event window starts, so the two never overlap.
        bullish_bearish holds the raw pattern signal of each kept event.
        """
        # Keep only rows where every required field is present so the arrays
        # handed to TA-Lib and the date index always have the same length.
        fields = ('Adj Close', 'Open', 'High', 'Low', 'Close')
        prices = pd.concat({f: self.df[f][stock] for f in fields}, axis=1).dropna()
        ind = prices.index.tolist()

        estimation_window, event_window, event_day, bullish_bearish = [], [], [], []
        if len(ind) < 2:
            return estimation_window, event_window, event_day, bullish_bearish

        op, hi, lo, cl = (
            np.ascontiguousarray(prices[f].values, dtype=float)
            for f in ('Open', 'High', 'Low', 'Close')
        )
        signal = np.asarray(getattr(tb, pattern)(op, hi, lo, cl))
        event_pos = np.flatnonzero(signal != 0).tolist()

        # Sentinels: position 1 is the first day that has a return, and the
        # last position bounds the final event window.
        bounds = [1] + event_pos + [len(ind) - 1]
        span = self.l1 + self.l2

        for k in range(1, len(bounds) - 1):
            pos = bounds[k]
            # Require a clean run-up since the previous event and enough
            # room before the next one (or the end of the sample).
            if pos - bounds[k - 1] >= span and bounds[k + 1] - pos >= self.l2:
                estimation_window.append((ind[pos - span], ind[pos - self.l2 - 1]))
                event_window.append((ind[pos - self.l2], ind[pos + self.l2]))
                event_day.append(self._day_label(ind[pos]))
                bullish_bearish.append(int(signal[pos]))

        return estimation_window, event_window, event_day, bullish_bearish

    def get_CAR(self, stock, pattern):
        """
        Calculate abnormal returns around every event of `pattern` for `stock`.

        Output: CAR, V_CAR, SCAR (dicts keyed by event day) and bullish_bearish
        (list of pattern signals in the same order as the dict entries).
        V_CAR is the residual variance times the number of event-window
        returns, and SCAR = CAR / sqrt(V_CAR). Events whose estimation or
        event window has no returns, or whose residual variance is not a
        positive finite number, are skipped.

        Raises ValueError for an unknown model or when l1 <= 2.
        """
        if self.model not in self._MODELS:
            raise ValueError(
                "model must be one of %r, got %r" % (self._MODELS, self.model)
            )
        dof = self.l1 - 2
        if dof <= 0:
            raise ValueError("l1 must be greater than 2")

        adj_close = self.df['Adj Close']
        reference_index = adj_close.columns[-1]
        estimation_window, event_window, event_day, bullish_bearish = \
            self.candle_events(stock, pattern)

        CAR, V_CAR, SCAR, direction = {}, {}, {}, {}
        if not event_day:
            return CAR, V_CAR, SCAR, []

        pair = adj_close[[stock, reference_index]].abs()
        log_returns = np.log(pair / pair.shift(1)).dropna()
        stock_ret = log_returns[stock]
        market_ret = log_returns[reference_index]

        for k, day in enumerate(event_day):
            est_start, est_end = estimation_window[k]
            evt_start, evt_end = event_window[k]

            Ri_estimation = stock_ret.loc[est_start:est_end].values
            Ri_event = stock_ret.loc[evt_start:evt_end].values
            if Ri_estimation.size == 0 or Ri_event.size == 0:
                continue

            # Expected returns (Market or Mean Return model)
            if self.model == 'market':
                Rm_estimation = market_ret.loc[est_start:est_end].values.reshape(-1, 1)
                Rm_event = market_ret.loc[evt_start:evt_end].values.reshape(-1, 1)
                lm = LinearRegression()
                lm.fit(Rm_estimation, Ri_estimation)
                est_fitted = lm.predict(Rm_estimation)
                event_preds = lm.predict(Rm_event)
            else:
                est_fitted = event_preds = Ri_estimation.mean()

            AR_est = Ri_estimation - est_fitted
            AR_event = Ri_event - event_preds

            # Residual variance from the estimation window
            Vi = float(np.sum(AR_est ** 2)) / dof
            car = float(np.sum(AR_event))
            if not (np.isfinite(Vi) and Vi > 0 and np.isfinite(car)):
                continue

            var_car = AR_event.size * Vi
            key = str(day)
            CAR[key] = car
            V_CAR[key] = var_car
            SCAR[key] = car / np.sqrt(var_car)
            # Stored under the same key so directions stay aligned with SCAR.
            direction[key] = bullish_bearish[k]

        return CAR, V_CAR, SCAR, list(direction.values())

    def pattern_significance(self, pattern, alpha=0.01):
        """
        Evaluate the statistical significance of `pattern` for every stock in df.

        Each event is tested one-tailed in the direction of its own signal
        (upper tail for bullish, lower tail for bearish) against a Student-t
        distribution with l1-2 degrees of freedom.

        Output: dict {stock: number of significant events, or NaN when the
        stock has no usable event} and the total number of events evaluated.
        """
        # Every ticker except the last one, which is the reference index
        stocks = self.df['Adj Close'].columns[:-1]
        dof = self.l1 - 2

        one_tail_sign, N = {}, {}
        for s in tq(stocks):
            SCAR, bullish_bearish = self.get_CAR(s, pattern)[2:4]
            if SCAR:
                one_tail_sign[s] = self._count_significant(
                    SCAR.values(), bullish_bearish, dof, alpha
                )
            else:
                one_tail_sign[s] = np.nan
            # Number of observations for each stock
            N[s] = len(SCAR)

        return one_tail_sign, sum(N.values())

    def S_overall(self):
        """
        Run pattern_significance() for all the patterns in self.patterns.

        Output: DataFrame of one-tail significant-event counts (rows = stocks,
        columns = patterns) and a dict {pattern: number of events evaluated}.
        """
        one_tail_soverall, N_events = {}, {}
        for p in tq(self.patterns):
            one_tail_soverall[p], N_events[p] = self.pattern_significance(p)

        return pd.DataFrame(one_tail_soverall, index=self.df['Adj Close'].columns[:-1]), N_events

In [None]:
# Ticker universe: index of the first column of the constituents workbook.
stocks = pd.read_excel('S&P500 constituents.xlsx',index_col=[0]).index.tolist()

# '^GSPC' is appended last because Events treats the last 'Adj Close' column
# as the market/reference index.
# period and auto_adjust are passed explicitly instead of relying on library
# defaults: Events needs an 'Adj Close' column (only present when prices are
# not auto-adjusted) and enough history for a 200-day estimation window.
# NOTE(review): yfinance defaults for these arguments are believed to differ
# between releases; confirm against the installed version.
stocks_df=yf.download(stocks+['^GSPC'],period='max',auto_adjust=False)

In [None]:
# Run the event study over every TA-Lib candlestick pattern.
candle_patterns = tb.get_function_groups()['Pattern Recognition']

# l1: length of the estimation window; l2: half-width of the event window.
test_events = Events(
    df=stocks_df,
    l1=200,
    l2=3,
    model='market',
    patterns=candle_patterns,
)

# One-tail significance table (stocks x patterns) and events counted per pattern.
one_tail_sign, n_obs = test_events.S_overall()