In [61]:
import os
from dotenv import load_dotenv
import sqlalchemy
import pymysql
import ta
import pandas as pd
import numpy as np
import yfinance as yf
pymysql.install_as_MySQLdb()
import smtplib
from pretty_html_table import build_table
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
import datetime as dt

import pandas_market_calendars as mcal
import plotly.express as px
import html


In [62]:
class DbConn:
    def __init__(self,name):
        self.name = name
        load_dotenv()
        self.endpoint = os.getenv("DB_ACCESS_KEY")
        self.username = os.getenv("USERNAME")
        self.password = os.getenv("USERPASS")
       
    def getDbConn(self):
        db_connection_str = "mysql+pymysql://"+self.username+ ":" +self.password +"@"+self.endpoint+"/"+ self.name
        print(db_connection_str)
        return sqlalchemy.create_engine(db_connection_str).connect()

    def getDb(self):
        return pymysql.connect(host=self.endpoint, user=self.username,passwd=self.password, database= self.name)

In [63]:
class GetTaReportParms:
    def __init__(self, name):
        self.name = name
        dbconn = DbConn(name)
        self.conn = dbconn.getDbConn()
        self.stkGrpLst = self.getStkGrpLst()
        self.chgDaylst = self.getChgDayLst()
        
    def getStkGrpLst(self):
        req = self.name+'.'+f'`StockGroup`'
        return pd.read_sql(f"SELECT Name FROM {req}",self.conn).Name.to_list()
    def getChgDayLst(self):
        req = self.name+'.'+f'`Property`'
        return pd.read_sql(f"SELECT CAST(value as SIGNED) value FROM {req} where Type = 2",self.conn).value.to_list()


In [64]:
class UpdStkGrpYfData:
    def __init__(self, name):
        self.name = name
        dbconn = DbConn(name)
        self.conn = dbconn.getDbConn()

    def gettables(self):
        query = f"""SELECT table_name FROM information_schema.tables
        WHERE table_schema = '{self.name}'"""
        df = pd.read_sql(query, self.conn)
        df['Schema'] = self.name
        return df

    def maxdate(self):
        req = self.name+'.'+f'`{self.gettables().TABLE_NAME[0]}`'
        return pd.read_sql(f"SELECT MAX(Date) FROM {req}",self.conn)

    def updateDB(self):
        maxdate=self.maxdate()['MAX(Date)'][0]
        print('DB MaxDate =', maxdate)
        for symbol in self.gettables().TABLE_NAME:
            data = yf.download(symbol, start=maxdate)
            data = data[data.index > maxdate]
            data = data.reset_index()
            data.to_sql(symbol, self.conn, if_exists='append')
        print(f'{self.name} successfully updated')

In [65]:
class StkSignals:
    def __init__(self, name):
        self.name = name
        dbconn = DbConn(name)
        self.conn = dbconn.getDbConn()
    
    def getTaResults(self,stockGrpName):
        # query = f"""SELECT A.* FROM `Result` A 
        # inner join (select max(date) maxDate  FROM `Result`) B 
        # on A.date = B.maxDate where A.StkGrp = '{stockGrpName}'"""

        query = f"""SELECT sg.name StkGrp, r.* FROM `StockGroup` sg
        inner join `GrpStkRel` gsr on sg.id = gsr.Gid
        inner join `Stock` stk on gsr.Sid = stk.id
        inner join `Result` r on stk.Symbol = r.Symbol and r.StkGrp = 'YfData'
        where sg.Name ='{stockGrpName}'"""

        dfTaResults = pd.read_sql(query, self.conn)
        # print(dfResults.StkGrp,dfResults.Symbol)
        # print(dfResults)
        return dfTaResults
    def createSingnals(self,dfTaResults):
        indicators = ['Decision MACD','Decision GC','Decision RSI/SMA']
        sigColumns=['Name','Symbol','Decision MACD','Decision GC','Decision RSI/SMA']
        dfSignals=pd.DataFrame(columns=sigColumns)
        for ind in dfTaResults.index:
            macd, gc, rsi, sig ='', '', '', ''
            if dfTaResults['Decision MACD'][ind] == True: 
                macd, sig = 'X', 'MACD'
            if dfTaResults['Decision GC'][ind] == True:
                gc, sig = 'X', 'GC'
            if dfTaResults['Decision RSI/SMA'][ind] == True:
                rsi, sig = 'X', 'RSI'
 
            dfSignals = dfSignals.append(
                {
                'Name': 'TA',
                'Symbol' : dfTaResults['Symbol'][ind],
                'Decision MACD' : macd,
                'Decision GC' : gc,
                'Decision RSI/SMA' : rsi,
                'Signal' : sig
                },ignore_index=True
            )
        return dfSignals.set_index('Name',drop = True)

In [66]:
class ProcStock:
    def __init__(self, name):
        self.name = name
        dbconn = DbConn(name)
        self.conn = dbconn.getDbConn()

    def gettables(self):
        query = f"""SELECT table_name FROM information_schema.tables
        WHERE table_schema = '{self.name}'"""
        df = pd.read_sql(query, self.conn)
        df['Schema'] = self.name
        return df

    def getprices(self):
        prices = []
        for table, schema in zip(self.gettables().TABLE_NAME, self.gettables().Schema):
            req = schema+'.'+f'`{table}`'
            prices.append(pd.read_sql(f"SELECT * \
                FROM (SELECT Date, '{self.name}' as StkGrp, '{table}' as Symbol, Close FROM {req} ORDER BY Date desc Limit 201) SUB ORDER BY Date ASC",self.conn))
        return prices


In [67]:
class Recommendor:
    def __init__(self, name, prices):
        self.name = name
        self.prices = prices
        dbconn = DbConn(name)
        self.conn = dbconn.getDbConn()
        self.db = dbconn.getDb()
   
    def MACDdecision(self,df):
        df['MACD_diff'] = ta.trend.macd_diff(df.Close)
        df['Decision MACD'] = np.where((df.MACD_diff > 0) & (df.MACD_diff.shift(1) < 1), True, False)

    def Goldencrossdecision(self,df):
        df['SMA20'] = ta.trend.sma_indicator(df.Close, window=20)
        df['SMA50'] = ta.trend.sma_indicator(df.Close, window=50)
        df['GCSignal'] = np.where(df['SMA20'] > df['SMA50'], True, False)
        df['Decision GC'] = df.GCSignal.diff()

    def RSI_SMAdecision(self,df):
        df['RSI'] = ta.momentum.rsi(df.Close, window=10)
        df['SMA200'] = ta.trend.sma_indicator(df.Close, window=200)
        df['Decision RSI/SMA'] = np.where((df.Close > df.SMA200) & (df.RSI < 30), True, False)

    def applytechnicals(self):
        for frame in self.prices:
            self.MACDdecision(frame)
            self.Goldencrossdecision(frame)
            self.RSI_SMAdecision(frame)
 
    def recommend(self):
        indicators = ['Decision MACD','Decision GC','Decision RSI/SMA']
        sigColumns=['Name','Symbol','Decision MACD','Decision GC','Decision RSI/SMA']
        # dfSignals=pd.DataFrame(columns=sigColumns)
        self.applytechnicals()
        mycursor = self.db.cursor()
        
        for frame in self.prices:
            if frame.empty is False:
                macd, gc, rsi, sig ='', '', '', ''
                for indicator in indicators:
                    if frame[indicator].iloc[-1] == True: # only chk today's result in the last row
                        if 'Decision MACD' == indicator:
                            macd, sig = 'X', 'MACD'
                        if 'Decision GC' == indicator:
                            gc, sig = 'X', 'GC' 
                        if 'Decision RSI/SMA' == indicator:
                            rsi, sig = 'X', 'RSI'
                if sig != '':
                    dfDb = frame.tail(1)
                    dfDb=dfDb.set_index('Date')
                    # print(dfDb.head())
                    # delete the row if already exists
                    # print(type(dfDb),'@@@@@@',dfDb.index.values[0],'+++++++++++',dfDb.iloc[0][0],'-------',dfDb.iloc[0][1])
                    sql = f"DELETE FROM Result WHERE Date = '{dfDb.index.values[0]}' and StkGrp = '{dfDb.iloc[0][0]}' and Symbol = '{dfDb.iloc[0][1]}'"
                    # print('==========',sql)
                    mycursor.execute(sql)
                    self.db.commit()    
                    # add to DB
                    dfDb.to_sql('Result', self.conn, if_exists='append')

In [68]:
class CreateEmails:
    smtp_server ='smtp.gmail.com'
    port = 587
    def __init__(self, name, dfSignals, pdf):
        self.name = name
        self.dfSignals =dfSignals
        self.pdf = pdf
        self.sender = os.getenv("SENDER_EMAIL")
        self.receivers = os.getenv("RECEIVER_EMAILS")
        self.password = os.getenv("PASSWORD")
        self.dbconn = DbConn('ta')
        self.conn = self.dbconn.getDbConn()
    
    def getTaEmails(self,name):
        

        query = f"""SELECT name, TaEmails FROM `User` u
        inner join `UserGrpRel` rel on u.Id = rel.Uid
        inner join `StockGroup` grp on rel.Gid = grp.Id
        where inactive = 0 and grp.Name ='{name}'"""
        dfEmails = pd.read_sql(query, self.conn)
        return dfEmails        
        
    def sendEmails(self):    
        taEmails = self.getTaEmails(self.name)
        print('\n',taEmails['TaEmails'][0])
        print(type(taEmails['TaEmails'][0])) 
        if not taEmails.empty:        
            message = MIMEMultipart()
            message['Subject'] = f'{self.name} buying signals report'
            message['From'] = self.sender
 
            now = dt.datetime.now().strftime("%m/%d/%Y %H:%M:%S")
            header = f'<h2>{self.name} buying signals report created at {now} UTC</h2>'
            url = 'https://www.tradingview.com/chart/?symbol=' # link to tradeview web site 
            self.dfSignals['Symbol']= '<a href=\"' + url + self.dfSignals['Symbol']+'\">' + self.dfSignals['Symbol'] + '</a>'
            body = html.unescape(build_table(self.dfSignals, "green_light",text_align ="center"))
            # print(body)
            footer = f'<h2>Good Luck!!!</h2>'
            img = MIMEImage(self.pdf, "pdf" )
            img.add_header('Content-Disposition', 'attachment', filename=self.name+".pdf")
            body_content = header + body + footer
            message.attach(MIMEText(body_content, "html"))
            message.attach(img)
            msg_body = message.as_string()
            server =smtplib.SMTP(self.smtp_server, self.port)
            server.starttls()
            server.login(self.sender,self.password)
            # print('----\n',taEmails)
            for ind in taEmails.index:
                # print('to emails\n',taEmails['TaEmails'][ind])
                message['To'] = taEmails['TaEmails'][ind]
                server.sendmail(self.sender,taEmails['TaEmails'][ind].split(","),msg_body)
            #
        server.quit()

In [69]:
class DbQuoteData:
    def __init__(self, name, tickers_lst):
        self.name = name
        self.tickers_lst = tickers_lst
        self.conn = self.getDbConn(name)

    def getDbConn(self,name):
        dbconn = DbConn(name)
        return dbconn.getDbConn()

    def gettables(self):
        query = f"""SELECT table_name FROM information_schema.tables
        WHERE table_schema = '{self.name}'"""
        df = pd.read_sql(query, self.conn)
        df['Schema'] = self.name
        return df

        

    # Step1: Calcualte the start and end dates based on the input days from the valid NYSC calendar
    def get_start_end_dates(self,day):
    # get the last valid NYSE bus dates for the last 40 calendar dates using market calendar
        nyse = mcal.get_calendar('NYSE')
        schedule_nyse = nyse.schedule(
            (dt.datetime.today()-dt.timedelta(40)).strftime("%Y-%m-%d"),
            dt.datetime.today().strftime("%Y-%m-%d"))
    #check today's market closed or not
        if dt.datetime.now(dt.timezone.utc).hour >= schedule_nyse.market_close[-1].hour:
            market_closed_indicator = 0 # now is after 4PM ET- market closed-- we have today's data
        else:
            market_closed_indicator = 1  # else yesterday's data      
        end = schedule_nyse.market_close[-1-market_closed_indicator].strftime("%Y-%m-%d")
        start = schedule_nyse.market_close[-day - market_closed_indicator].strftime("%Y-%m-%d")
        return start, end
    def getDbCloseQuote(self, symbol, date):
        req = self.name+'.'+f'`{symbol}`'
        sql = f"SELECT `Adj Close` FROM {req} where Date = '{date}'"
        result = self.conn.execute(sql)
        adjCloseQ = 0.0
        for row in result:
            adjCloseQ = row['Adj Close']
        return adjCloseQ
    def calcChgs(self,symbol,start,end):
        startQ= self.getDbCloseQuote(symbol,start)
        endQ= self.getDbCloseQuote(symbol,end)
        if (endQ == 0.0 or startQ == 0.0) :
            return 0.0
        else:
            return (endQ-startQ)/startQ*100
    # Step2: Get the quotes data from Database based on the input parm 'days'
    def calc_percent_chgs(self,day):
        start, end = self.get_start_end_dates(day) #call step1
        # print(f'get quotes start---{dt.datetime.now().strftime("%d/%m/%Y %H:%M:%S")}')
        print(f'Start Date={start}  End Date={end}')
        chgs=[]
        for symbol in self.tickers_lst:
            chg = self.calcChgs(symbol,start, end)
            chgs.append(chg)
        return chgs

In [70]:
class TksChgsByDays:
    #days_lst = (2,5,10) # 1D/1W/2W
    def __init__(self, name, tks, days):
        self.name = name
        self.tks = tks
        self.days =days    
    def get_chgs(self):
        tksChgsByDays = []
        dbQuoteData =DbQuoteData(self.name, self.tks)
        for day in self.days:
            tkerChgs = dbQuoteData.calc_percent_chgs(day)
            tksChgsByDays.append(tkerChgs) 
        return tksChgsByDays

In [71]:
class PlotChgs:
    def __init__(self, name, chgs):
        self.name = name
        self.chgs = chgs
    def doPlot(self):
        fig = px.bar(self.chgs,width=1000, height=400)
        fig.update_layout(barmode = 'group', bargap = 0.2, bargroupgap = 0.0)
        fig.update_layout(
            title=self.name + " Percentage Changes",
            title_x=0.5,
            xaxis_tickangle=-45,
            xaxis_showticklabels= True,
            xaxis_type = 'category',
            xaxis_title="Symbols",
            yaxis_title="Percentage",
            legend_title="Days",
            font=dict(
                family="Courier New, monospace",
                size=18,
                color="RebeccaPurple"
            )
        )
        fig.show()
        pdf = fig.to_image(format="pdf")  
        return pdf

In [72]:
class ApplyTechAnalysis:
    def __init__(self, name):
        self.name = name

    # get stock data
    def getStkprice(self):
        stkData = ProcStock(self.name)
        self.prices = stkData.getprices()
    # create stock recommend results
    def applyTa(self):
        self.getStkprice()
        stkRecommendor = Recommendor('TA',self.prices)
        stkRecommendor.recommend()    

In [73]:
class StockGrpBuyingReport:
    def __init__(self,name, days):
        self.name = name
        self.days = days
    def createBuyingRpt(self):
     # create stock buying signals
        stkSignals = StkSignals('TA')
        dfTaResults = stkSignals.getTaResults(self.name)
        dfStkGrpSignals = stkSignals.createSingnals(dfTaResults)
        print('aaaaaaaaaa\n',dfStkGrpSignals.head(10)) 
    # create changes
        if not dfStkGrpSignals.empty:
            stkGrpSymbolLst =dfStkGrpSignals.Symbol.to_list()
            stkGrpChgsByDays = TksChgsByDays('YfData', stkGrpSymbolLst, self.days)
            stkGrpTksChgsByDays = stkGrpChgsByDays.get_chgs()
            # print(dfStkGrpSignals) 
            df = pd.DataFrame(stkGrpTksChgsByDays).transpose()
            df['Symbol']= ('(' + dfStkGrpSignals.Signal +') ' + dfStkGrpSignals.Symbol).to_list() # remove signal col
            df=df.set_index('Symbol',drop = True)
            daysNames =[]
            for day in self.days:
                daysNames.append(str(day)+'days Chgs')
            df.columns = daysNames
            # print(df)
        
    # create plot
            stkGrpchgsPlot=PlotChgs(self.name, df)
            svg = stkGrpchgsPlot.doPlot()
        
    # create email
            stkGrpEmails = CreateEmails(self.name,dfStkGrpSignals.drop(columns=['Signal']),svg) #remove signal col
            stkGrpEmails.sendEmails()

In [74]:
class ArkxEtfLstBuyingRpt:
    def __init__(self,etfLst, days):
        self.etfLst = etfLst
        self.days = days
    def createArkxEtfBuyingRpt(self):
        for etf in self.etfLst:
            print('Zzzzzzzzzzzzz  ',etf,'  Zzzzzzzzzzzzz')
            etfStockGrpBuyingReport = StockGrpBuyingReport(etf, self.days)
            etfStockGrpBuyingReport.createBuyingRpt()

In [75]:
# Daily process
# 1. Get process parms from DB
procParms = GetTaReportParms('TA')
# 1. update DB with yfdata
# procParms.stkGrpLst = ['SP100'] #TESTING

# updStkGrpYfData = UpdStkGrpYfData('YfData')
# updStkGrpYfData.updateDB()

# 2. Apply Technical Analysis to all the stocks in the TA DB

# applyTechAnalysis = ApplyTechAnalysis('YfData')
# applyTechAnalysis.applyTa()

# 3. Create TA reports based on the parms in the TA db
arkxEtfLstBuyingRpt = ArkxEtfLstBuyingRpt(procParms.stkGrpLst,procParms.chgDaylst)
# arkxEtfLstBuyingRpt = ArkxEtfLstBuyingRpt(['ARKK','ARKF','ARKW','ARKQ','ARKX','ARKG','CMY1','DJIA','SP100'],[2,5,10])
# arkxEtfLstBuyingRpt = ArkxEtfLstBuyingRpt(['SP100'],[2,5,10])
arkxEtfLstBuyingRpt.createArkxEtfBuyingRpt()

mysql+pymysql://root:12344321@localhost/TA
Zzzzzzzzzzzzz   ARKK   Zzzzzzzzzzzzz
mysql+pymysql://root:12344321@localhost/TA
aaaaaaaaaa
      Symbol Decision MACD Decision GC Decision RSI/SMA Signal
Name                                                         
TA     CRSP             X                                MACD
TA     FATE             X                                MACD
TA     HOOD             X                                MACD
TA     IOVA             X                                MACD
TA    NTDOY             X                                MACD
TA     PLTR             X                                MACD
TA     PRLB             X                                MACD
TA     SKLZ             X                                MACD
TA       SQ             X                                MACD
TA     TRMB                                          X    RSI
mysql+pymysql://root:12344321@localhost/YfData
Start Date=2021-09-28  End Date=2021-09-29
Start Date=2021-09-23  End Date

mysql+pymysql://root:12344321@localhost/ta


OperationalError: (pymysql.err.OperationalError) (1054, "Unknown column 'TaEmails' in 'field list'")
[SQL: SELECT name, TaEmails FROM `User` u
        inner join `UserGrpRel` rel on u.Id = rel.Uid
        inner join `StockGroup` grp on rel.Gid = grp.Id
        where inactive = 0 and grp.Name ='ARKK']
(Background on this error at: http://sqlalche.me/e/14/e3q8)

In [None]:
# etfStockGrpBuyingReport = StockGrpBuyingReport('ARKK',[2,5,10])
# etfStockGrpBuyingReport.createBuyingRpt()

In [None]:
# djia = Recommender('djia')
# djia.updateDB()
# dfdjiaSignals =djia.recommend()
# djiaEmails = CreateEmails('DJIA',dfdjiaSignals)
# djiaEmails.sendEmails()

In [None]:
# arkk = Recommender('arkk')
# # arkk.updateDB()
# dfarkkSignals =arkk.recommend()
# print(dfarkkSignals)
# tkersLst = dfarkkSignals.Symbol.to_list()
# print(tkersLst)
# DbQuoteData =DbQuoteData('arkk', tkersLst,2)
# chgs = DbQuoteData.calc_percent_chgs()
# # arkkEmails = CreateEmails('ARKK',dfarkkSignals)
# # arkkEmails.sendEmails()