Importing libraries and modules

In [None]:
import numpy as np

import pandas as pd
from pandas_datareader import data as web

import matplotlib.pyplot as plt
import matplotlib.dates as mdates
%matplotlib inline

import plotly.express as px
import plotly.graph_objects as go

from plotly.subplots import make_subplots

from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode()

import warnings
warnings.simplefilter("ignore")

from datetime import date, timedelta
import time

import yfinance as yf

import cufflinks as cf
cf.go_offline()

from scipy.signal import argrelextrema
from scipy.stats import linregress


Creating Opportunity Universe

In [None]:
oppor_univ = pd.read_csv("https://www1.nseindia.com/content/indices/ind_nifty50list.csv")
oppor_univ = oppor_univ[['Company Name','Symbol']]
oppor_univ_symbols = oppor_univ['Symbol'].tolist()
oppor_univ_symbols[::5]

Candle/Chart pattern detection

In [None]:
#Marubuzo

def Marubuzo(stock_data,shadow_length=None,max_candle_length=None,min_candle_length=None):
    shadow_length = (shadow_length if shadow_length is not None else 0.25)/100
    max_candle_length = (max_candle_length if max_candle_length is not None else 10)/100
    min_candle_length = (min_candle_length if min_candle_length is not None else 1)/100

    i = stock_data.shape[0]-1
    o = stock_data.loc[stock_data.index[i],'Open']
    l = stock_data.loc[stock_data.index[i],'Low']
    h = stock_data.loc[stock_data.index[i],'High']
    c = stock_data.loc[stock_data.index[i],'Close']

    if(c>o):
        #Bull Maru
        if(((h-c)/c < shadow_length) & ((o-l)/o < shadow_length) & (min_candle_length < (c-o)/o < max_candle_length)):
            return 1
        else:
            return 0
    elif(o>c):
        #Bear Maru
        if(((c-l)/c < shadow_length) & ((h-o)/o < shadow_length) & (min_candle_length < (o-c)/c < max_candle_length)):
            return -1
        else:
            return 0
    else:
        return 0

#Engulfing

def Engulfing(stock_data):

    i = stock_data.shape[0]-1
    o = stock_data.loc[stock_data.index[i],'Open']
    c = stock_data.loc[stock_data.index[i],'Close']
    o_1 = stock_data.loc[stock_data.index[i-1],'Open']
    c_1 = stock_data.loc[stock_data.index[i-1],'Close']

    if((o>c) & (c_1>o_1)):
        if((o>c_1) & (c<o_1)):
            return -1
        else:
            return 0
    elif((c>o) & (c_1<o_1)):
        if((c>o_1) & (o<c_1)):
            return 1
        else:
            return 0
    else:
        return 0
    
#Harami

def Harami(stock_data,harami_length=None):

    harami_length = (harami_length if harami_length is not None else 30)/100

    i = stock_data.shape[0]-1
    o = stock_data.loc[stock_data.index[i],'Open']
    c = stock_data.loc[stock_data.index[i],'Close']
    o_1 = stock_data.loc[stock_data.index[i-1],'Open']
    c_1 = stock_data.loc[stock_data.index[i-1],'Close']

    if((o>c) & (c_1>o_1)):
        if((o<c_1) & (c>o_1)):
            if((o-c)>(harami_length*(c_1-o_1))):
                return -1
            else:
                return 0
        else:
            return 0
    elif((c>o) & (c_1<o_1)):
        if((o>c_1) & (c<o_1)):
            if((c-o)>(harami_length*(o_1-c_1))):
                return 1
            else:
                return 0
        else:
            return 0
    else:
        return 0
    
#Paper Umbrella

def Umbrella(stock_data,max_candle_length=None,
             shooting_star_upper_shadow=None,shooting_star_lower_shadow=None,
             hammer_upper_shadow=None,hammer_lower_shadow=None):
    max_candle_length = (max_candle_length if max_candle_length is not None else 2)/100
    shooting_star_upper_shadow = (shooting_star_upper_shadow if shooting_star_upper_shadow is not None else 2)/100
    shooting_star_lower_shadow = (shooting_star_lower_shadow if shooting_star_lower_shadow is not None else 0.5)/100
    hammer_upper_shadow = (hammer_upper_shadow if hammer_upper_shadow is not None else 0.5)/100
    hammer_lower_shadow = (hammer_lower_shadow if hammer_lower_shadow is not None else 2)/100

    i = stock_data.shape[0]-1
    o = stock_data.loc[stock_data.index[i],'Open']
    l = stock_data.loc[stock_data.index[i],'Low']
    h = stock_data.loc[stock_data.index[i],'High']
    c = stock_data.loc[stock_data.index[i],'Close']

    if((c>o) & (0.004 < (c-o)/o < max_candle_length) & ((h-c)>=(shooting_star_upper_shadow)*(c-o)) & ((o-l)/l < shooting_star_lower_shadow)):
        return -1
    elif((o>c) & (0.004 < (o-c)/o < max_candle_length) & ((c-l)>=(hammer_lower_shadow)*(o-c)) & ((h-o)/o < hammer_upper_shadow)):
        return 1
    else:
        return 0
    
#Star

def Star(stock_data):
    i = stock_data.shape[0]-1
    o = stock_data.loc[stock_data.index[i],'Open']
    c = stock_data.loc[stock_data.index[i],'Close']
    o_1 = stock_data.loc[stock_data.index[i-1],'Open']
    c_1 = stock_data.loc[stock_data.index[i-1],'Close']
    o_2 = stock_data.loc[stock_data.index[i-2],'Open']
    c_2 = stock_data.loc[stock_data.index[i-2],'Close']


    if((c_2>o_2) &
       (o_1>c_2) &
       (o<c_1) &
       (c<o_2)):
        return -1    
    elif((c_2<o_2) &
       (o_1<c_2) &
       (o>c_1) &
       (c>o_2)):
        return 1
    else:
        return 0
    
#Doji

def Doji(stock_data,shadow_length=None,max_candle_length=None):
    shadow_length = (shadow_length if shadow_length is not None else 2)/100
    max_candle_length = (max_candle_length if max_candle_length is not None else 1)/100

    i = stock_data.shape[0]-1
    o = stock_data.loc[stock_data.index[i],'Open']
    l = stock_data.loc[stock_data.index[i],'Low']
    h = stock_data.loc[stock_data.index[i],'High']
    c = stock_data.loc[stock_data.index[i],'Close']

    if(c>o):
        if(((c-o)/c <= max_candle_length) & ((h-c)/c >= shadow_length) & ((o-l)/o >= shadow_length)):
            return 1
        else:
            return 0
    elif(o>c):
        if(((o-c)/o <= max_candle_length) & ((h-o)/o >= shadow_length) & ((c-l)/c >= shadow_length)):
            return 1
        else:
            return 0
    else:
        return 0

#Gap

def Gap(stock_data,gap_range=None):  
    i = stock_data.shape[0]-1
    o = stock_data.loc[stock_data.index[i],'Open']
    c = stock_data.loc[stock_data.index[i],'Close']
    o_1 = stock_data.loc[stock_data.index[i-1],'Open']
    c_1 = stock_data.loc[stock_data.index[i-1],'Close']

    gap_range = (gap_range if gap_range is not None else 0.5)/100

    if((o>(c_1*gap_range+(c_1)))):
        if((c>o)):
            stock_data.at[stock_data.index[i],'Gap_Up'] = 1
        else:
            return 0

    elif((o<(c_1-(c_1*gap_range)))):
        if((o>c)):
            stock_data.at[stock_data.index[i],'Gap_Down'] = 1
        else:
            return 0
    else:
        return 0
                
def triangle_RSI(stock_name,period=None,a=None,b=None):
    stock_data = yf.download(tickers = stock_name + '.NS' ,period = '10y', interval = '1d')
    period = period if period is not None else 14
    def RSI_df(df,period):
        temp_df = pd.DataFrame()
        temp_df = df
        temp_df['Price_Change'] = temp_df['Close'] - temp_df['Close'].shift(1)
        temp_df['Daily_gain'] = np.where(temp_df['Price_Change']>=0,temp_df['Price_Change'],0)
        temp_df['Daily_loss'] = np.where(temp_df['Price_Change']<0,abs(temp_df['Price_Change']),0)
        temp_df['Avg_gain'] = temp_df['Daily_gain'].rolling(period).sum()
        temp_df['Avg_loss'] = temp_df['Daily_loss'].rolling(period).sum()
        temp_df['RS'] = temp_df['Avg_gain']/temp_df['Avg_loss']
        temp_df['RSI'] = (temp_df['RS']/(1+temp_df['RS']))*100
        df['RSI_'+str(period)] = temp_df['RSI']
    RSI_df(stock_data,period)
    return stock_data

def triangle(temp_RSI_data, RSI_period=None, RSI_fluc=None, up_RSI=None, down_RSI=None):
    date = []
    period_RSI = []
    x = RSI_period if RSI_period is not None else 14
    y = RSI_fluc if RSI_fluc is not None else 2
    a = up_RSI if up_RSI is not None else 57
    b = down_RSI if down_RSI is not None else 43

    for i in range(0,x-1):
        for j in range(i,temp_RSI_data.shape[0],x):
            date.append(temp_RSI_data.index[j])
            period_RSI.append(temp_RSI_data.loc[temp_RSI_data.index[j],'RSI_'+str(x)])

    RSI_df = pd.DataFrame(period_RSI,index=date,columns=['RSI_'+str(x)])

    temp_RSI_data['Double_Top/Double_Bottom'] = 0
    temp_RSI_data['Triple_Top/Triple_Bottom'] = 0
    temp_RSI_data['Flag_Up/Flag_Down'] = 0

    for i in range(RSI_df.shape[0]):
        curr_val = RSI_df.loc[RSI_df.index[i],'RSI_'+str(x)]
        prev_val = RSI_df.loc[RSI_df.index[i-1],'RSI_'+str(x)]
        max_val = prev_val + y
        min_val = prev_val - y
        if((curr_val <= max_val) & (curr_val >= min_val)):
            if((curr_val>=prev_val) &
               (prev_val -y <= RSI_df.loc[RSI_df.index[i-2],'RSI_'+str(x)]) &
               (RSI_df.loc[RSI_df.index[i-2],'RSI_'+str(x)] <= prev_val+(2*y))):
                temp_RSI_data.at[RSI_df.index[i],'Triple_Top/Triple_Bottom'] = 1
            elif((curr_val<prev_val) &
                 (prev_val-(2*y) <= RSI_df.loc[RSI_df.index[i+2],'RSI_'+str(x)]) &
                 (RSI_df.loc[RSI_df.index[i-2],'RSI_'+str(x)]<= prev_val+(y))):
                temp_RSI_data.at[RSI_df.index[i],'Triple_Top/Triple_Bottom'] = 1
            elif( (b <= prev_val) &
                 (prev_val <= a)):
                temp_RSI_data.at[RSI_df.index[i],'Double_Top/Double_Bottom'] = 1
            elif(prev_val > a):
                temp_RSI_data.at[RSI_df.index[i],'Flag_Up/Flag_Down'] = 1
            elif(prev_val < b):
                temp_RSI_data.at[RSI_df.index[i],'Flag_Up/Flag_Down'] = -1
                
    return temp_RSI_data.tail(1)

def Double_Triangle(data):
    return data.loc[data.index[0],'Double_Top/Double_Bottom']
def Triple_Triangle(data):
    return data.loc[data.index[0],'Triple_Top/Triple_Bottom']
def Flag(data):
    return data.loc[data.index[0],'Flag_Up/Flag_Down']

In [None]:
pattern_signal_df = pd.DataFrame(columns=['Stock_Name','Marubuzo','Engulfing','Harami','Umbrella','Star','Doji','Gap','Double_Triangle','Triple_Triangle','Flag'])

import time
start = time.time()

def pattern_signal_test(stock_name):
    stock_data = yf.download(tickers = stock_name + '.NS' ,period = '1mo', interval = '5m')
    stock_data = stock_data[stock_data['Volume'] != 0]
    index = pattern_signal_df.shape[0]
    pattern_signal_df.at[index,'Stock_Name'] = stock_name
    pattern_signal_df.at[index,'Marubuzo'] = Marubuzo(stock_data)
    pattern_signal_df.at[index,'Engulfing'] = Engulfing(stock_data)
    pattern_signal_df.at[index,'Harami'] = Harami(stock_data)
    pattern_signal_df.at[index,'Umbrella'] = Umbrella(stock_data)
    pattern_signal_df.at[index,'Star'] = Star(stock_data)
    pattern_signal_df.at[index,'Doji'] = Doji(stock_data)
    pattern_signal_df.at[index,'Gap'] = Gap(stock_data)
    temp_RSI_data = triangle_RSI(stock_name)
    triangle_data = triangle(temp_RSI_data)
    pattern_signal_df.at[index,'Double_Triangle'] = Double_Triangle(triangle_data)
    pattern_signal_df.at[index,'Triple_Triangle'] = Triple_Triangle(triangle_data)
    pattern_signal_df.at[index,'Flag'] = Flag(triangle_data)

for symbol in oppor_univ_symbols:
    try:
        pattern_signal_test(symbol)
    except:
        continue
        
end = time.time()
print("Time Taken\t"+str(end-start))

In [None]:
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

In [None]:
pattern_signal_df

Chart pattern screener for more signals

In [None]:
def triangle_RSI(stock_name,period=None,a=None,b=None):
    stock_data = yf.download(tickers = stock_name + '.NS' ,period = '1d', interval = '1m')
    stock_data = stock_data[stock_data['Volume'] != 0]
    period = period if period is not None else 14
    def RSI_df(df,period):
        temp_df = pd.DataFrame()
        temp_df = df
        temp_df['Price_Change'] = temp_df['Close'] - temp_df['Close'].shift(1)
        temp_df['Daily_gain'] = np.where(temp_df['Price_Change']>=0,temp_df['Price_Change'],0)
        temp_df['Daily_loss'] = np.where(temp_df['Price_Change']<0,abs(temp_df['Price_Change']),0)
        temp_df['Avg_gain'] = temp_df['Daily_gain'].rolling(period).sum()
        temp_df['Avg_loss'] = temp_df['Daily_loss'].rolling(period).sum()
        temp_df['RS'] = temp_df['Avg_gain']/temp_df['Avg_loss']
        temp_df['RSI'] = (temp_df['RS']/(1+temp_df['RS']))*100
        df['RSI_'+str(period)] = temp_df['RSI']
    RSI_df(stock_data,period)
    return stock_data

def triangle(temp_RSI_data, RSI_period=None, RSI_fluc=None, up_RSI=None, down_RSI=None):
    date = []
    period_RSI = []
    x = RSI_period if RSI_period is not None else 14
    y = RSI_fluc if RSI_fluc is not None else 2
    a = up_RSI if up_RSI is not None else 57
    b = down_RSI if down_RSI is not None else 43

    for i in range(0,x-1):
        for j in range(i,temp_RSI_data.shape[0],x):
            date.append(temp_RSI_data.index[j])
            period_RSI.append(temp_RSI_data.loc[temp_RSI_data.index[j],'RSI_'+str(x)])

    RSI_df = pd.DataFrame(period_RSI,index=date,columns=['RSI_'+str(x)])

    temp_RSI_data['Double_Top/Double_Bottom'] = 0
    temp_RSI_data['Triple_Top/Triple_Bottom'] = 0
    temp_RSI_data['Flag_Up/Flag_Down'] = 0

    for i in range(RSI_df.shape[0]-2):
        curr_val = RSI_df.loc[RSI_df.index[i],'RSI_'+str(x)]
        prev_val = RSI_df.loc[RSI_df.index[i-1],'RSI_'+str(x)]
        max_val = prev_val + y
        min_val = prev_val - y
        if((curr_val <= max_val) & (curr_val >= min_val)):
            if((curr_val>=prev_val) &
               (prev_val -y <= RSI_df.loc[RSI_df.index[i-2],'RSI_'+str(x)]) &
               (RSI_df.loc[RSI_df.index[i-2],'RSI_'+str(x)] <= prev_val+(2*y))):
                temp_RSI_data.at[RSI_df.index[i],'Triple_Top/Triple_Bottom'] = 1
            elif((curr_val<prev_val) &
                 (prev_val-(2*y) <= RSI_df.loc[RSI_df.index[i+2],'RSI_'+str(x)]) &
                 (RSI_df.loc[RSI_df.index[i+2],'RSI_'+str(x)]<= prev_val+(y))):
                temp_RSI_data.at[RSI_df.index[i],'Triple_Top/Triple_Bottom'] = 1
            elif( (b <= prev_val) &
                 (prev_val <= a)):
                temp_RSI_data.at[RSI_df.index[i],'Double_Top/Double_Bottom'] = 1
            elif(prev_val > a):
                temp_RSI_data.at[RSI_df.index[i],'Flag_Up/Flag_Down'] = 1
            elif(prev_val < b):
                temp_RSI_data.at[RSI_df.index[i],'Flag_Up/Flag_Down'] = -1
                
    return temp_RSI_data


for symbol in oppor_univ_symbols: 
    for i in range(12,31):
        temp_RSI_data = triangle_RSI(stock_name,period = i)
        triangle_data = triangle(temp_RSI_data,RSI_period = i)
        print(i)
        display(temp_RSI_data[temp_RSI_data['Double_Top/Double_Bottom'] == 1].tail(10))

WUPHF upon signal detection

In [None]:
import os
import smtplib,ssl

In [None]:
#desired signal
filt = ((pattern_signal_df['Double_Triangle'] == 1))

if(pattern_signal_df[filt].shape[0] > 0):    
    # SMTP session
    s = smtplib.SMTP('smtp.gmail.com', 587)

    # security
    s.starttls()

    s.login(os.environ.get('Quants_wuphf_username'),'agpcryxajmdsqgwt') 
            #os.environ.get('Quants_wuphf_password')

    message = "A chart pattern signal(Double_Triangle) encountered."
    sender_email_id = os.environ.get('Quants_wuphf_username')
    receiver_email_id = input("Enter your email address to receive alerts\n")

    s.sendmail(sender_email_id, receiver_email_id, message)
    s.quit()

Indicator confirmation

In [None]:
#volume

def volume(stock_data,mode=None,period=None):
    period = period if period is not None else 10
    mode = mode if mode is not None else 'S'
    
    if(mode == 'S'):
        Volume_MA_data = stock_data['Volume'].rolling(window=period).mean()
    elif(mode == 'E'):
        Volume_MA_data = stock_data['Volume'].ewm(span=period).mean()
    
    if((stock_data.loc[stock_data.tail(1).index,'Volume'].item() >= Volume_MA_data.tail(1).item())):
        return 1
    elif((stock_data.loc[stock_data.tail(1).index,'Volume'].item() < Volume_MA_data.tail(1).item())):
        return 0

#SMA

def SMA(stock_data,period_1=None,period_2=None):
    period_1 = period_1 if period_1 is not None else 50
    period_2 = period_2 if period_2 is not None else 200
    
    small_SMA = stock_data['Close'].rolling(window=period_1).mean()
    long_SMA = stock_data['Close'].rolling(window=period_2).mean()
    
    if((small_SMA.tail(1).item() >= long_SMA.tail(1).item())):
        return 1
    elif((small_SMA.tail(1).item() <= long_SMA.tail(1).item())):
        return 0
    
#EMA

def EMA(stock_data,period_1=None,period_2=None,call=None):
    period_1 = period_1 if period_1 is not None else 50
    period_2 = period_2 if period_2 is not None else 200
    call = call if call is not None else 'S'
    
    stock_data['small_EMA'] = stock_data['Close'].ewm(span=period_1).mean()
    stock_data['long_EMA'] = stock_data['Close'].ewm(span=period_2).mean()
    
    if(call == 'D'):
        return stock_data
    elif(call == 'P'):
        #plotting data
        pass
    elif(call == 'S'):
        if((stock_data['small_EMA'].tail(1).item() >= stock_data['long_EMA'].tail(1).item())):
            return 1
        elif((stock_data['small_EMA'].tail(1).item() <= stock_data['long_EMA'].tail(1).item())):
            return 0

#RSI

def RSI(stock_data,period=None):
    period = period if period is not None else 14

    stock_data['Price_Change'] = stock_data['Close'] - stock_data['Close'].shift(1)
    stock_data['Daily_gain'] = np.where(stock_data['Price_Change']>=0,stock_data['Price_Change'],0)
    stock_data['Daily_loss'] = np.where(stock_data['Price_Change']<0,abs(stock_data['Price_Change']),0)
    stock_data['Avg_gain'] = stock_data['Daily_gain'].rolling(period).sum()
    stock_data['Avg_loss'] = stock_data['Daily_loss'].rolling(period).sum()
    stock_data['RS'] = stock_data['Avg_gain']/stock_data['Avg_loss']
    stock_data['RSI'] = (stock_data['RS']/(1+stock_data['RS']))*100
    
    return stock_data['RSI'].tail(1).item()
    
#MACD

def MACD(stock_data,period_1=None,period_2=None,signal_period=None):
    period_1 = period_1 if period_1 is not None else 7
    period_2 = period_2 if period_2 is not None else 21
    signal_period = signal_period if signal_period is not None else 9
    
    stock_data = EMA(stock_data,call='D')
    stock_data['MACD_data'] = stock_data['small_EMA'] - stock_data['long_EMA']
    stock_data['MACD_Signal'] = stock_data['MACD_data'].ewm(span=signal_period).mean()
    
    if((stock_data['MACD_data'].tail(1).item() >= stock_data['MACD_Signal'].tail(1).item())):
        return 1
    elif((stock_data['MACD_data'].tail(1).item() < stock_data['MACD_Signal'].tail(1).item())):
        return -1
    else:
        return 0
    
#ADX 

#Trend based

#ML signal

In [None]:
indicator_signal_df = pd.DataFrame(columns=['Stock_Name','Volume','SMA','EMA','RSI','MACD'])

def indicator_signal_test(stock_name):
    stock_data = yf.download(tickers = stock_name + '.NS' ,period = '10y', interval = '1d')
    index = indicator_signal_df.shape[0]
    indicator_signal_df.at[index,'Stock_Name'] = stock_name
    indicator_signal_df.at[index,'Volume'] = volume(stock_data,mode='S')
    indicator_signal_df.at[index,'SMA'] = SMA(stock_data)
    indicator_signal_df.at[index,'EMA'] = EMA(stock_data)
    indicator_signal_df.at[index,'RSI'] = RSI(stock_data)
    #indicator_signal_df.at[index,'CCI'] = CCI(stock_data)
    indicator_signal_df.at[index,'MACD'] = MACD(stock_data)
    #indicator_signal_df.at[index,'ADX'] = ADX(stock_data)
    #indicator_signal_df.at[index,'Main_Trend'] = Trend(stock_data)
    #indicator_signal_df.at[index,'Aroon'] = Aroon(stock_data)
    #ML signal

for symbol in oppor_univ_symbols:
        indicator_signal_test(symbol)

In [None]:
indicator_signal_df