In [1]:
import urllib
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
import numpy as np
import talib
from talib import abstract
from ta.utils import dropna
import numpy as np
import concurrent.futures


# --- Connection settings ---------------------------------------------------
# Raw string: the SQL Server instance name contains a backslash, and "\S" is
# not a valid Python escape (DeprecationWarning today, SyntaxWarning on 3.12+).
# The resulting value is unchanged.
HOST = r"DESKTOP-JJT739B\SQLEXPRESS"
DB = "bse"
# ODBC driver name, already URL-encoded ("+" stands for a space).
DRIVER = "ODBC+Driver+17+for+SQL+Server"

# --- Job settings ----------------------------------------------------------
NO_OF_WORKERS = 2                    # threads used to process symbols in parallel
HISTORY_TABLE = "bse_history"        # source table holding raw price rows
OUTPUT_TABLENAME = "bse_indicators"  # destination table for computed indicators

# Logical field name -> actual column name in HISTORY_TABLE.
column_mappings = {
    "date": "Date",
    "symbol": "Symbol",
    "open": "Open",
    "low": "Low",
    "high": "High",
    "close": "Close",
    "volume": "Volume",
    "series": "Series"
}

# Template (filled with str.format in main) that lists the dates of one symbol
# present in the history table but with no row of the same date and symbol in
# the calculation table, newest first.
# Placeholders: DATE_COLUMN_NAME, SYMBOL_COLUMN_NAME, HISTORY_TABLE_NAME,
# CALC_TABLE_NAME, SYMBOL_NAME.
# NOTE: values are substituted verbatim into the SQL text (SYMBOL_NAME lands
# inside a quoted literal), so callers must only pass trusted/escaped values.
INCREMENTAL_CHECK_QUERY = "SELECT [{DATE_COLUMN_NAME}] FROM {HISTORY_TABLE_NAME} WHERE NOT EXISTS (SELECT 1 FROM {CALC_TABLE_NAME} WHERE\
                        {HISTORY_TABLE_NAME}.{DATE_COLUMN_NAME} = {CALC_TABLE_NAME}.{DATE_COLUMN_NAME} AND {HISTORY_TABLE_NAME}.{SYMBOL_COLUMN_NAME} = \
                        {CALC_TABLE_NAME}.{SYMBOL_COLUMN_NAME}) and {SYMBOL_COLUMN_NAME}='{SYMBOL_NAME}' order by [{DATE_COLUMN_NAME}] DESC"

# Template (filled with str.format in main) that fetches the NO_OF_ROWS most
# recent history rows of one symbol, newest first.
# Placeholders: NO_OF_ROWS, HISTORY_TABLE_NAME, SYMBOL_COLUMN_NAME, SYMBOL_NAME,
# DATE_COLUMN_NAME. The same verbatim-substitution caveat applies.
GET_PREVIOUS_DAYS_QUERY = "select top({NO_OF_ROWS}) * from {HISTORY_TABLE_NAME} where {SYMBOL_COLUMN_NAME}='{SYMBOL_NAME}' order by [{DATE_COLUMN_NAME}] desc"


# DDL for the indicator output table. The statement is guarded by an
# IF NOT EXISTS check against sysobjects (xtype='U'), so it only creates the
# table when no user table named OUTPUT_TABLENAME exists yet.
# Columns: an identity key, the raw price fields, one column per indicator
# output named "<Group>_<Function>" or "<Group>_<Function>_<output>" (the same
# naming calculate_indicators builds), and a calculation_timestamp that
# defaults to the insert time.
# NOTE(review): in SQL Server the n in float(n) is the number of mantissa bits,
# not decimal places, so float(2) is stored as a 4-byte real (~7 significant
# digits) -- confirm that is enough precision for columns such as volume.
# NOTE(review): "%Deliverble" is presumably spelled to match the upstream
# data feed -- verify before renaming.
CREATE_OUTPUT_TABLE_QUERY = f"""
IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='{OUTPUT_TABLENAME}' and xtype='U')
CREATE TABLE {OUTPUT_TABLENAME} (
	[ID] [bigint] PRIMARY KEY IDENTITY(1,1),
	[Date] [date] NOT NULL,
	[Symbol] [varchar](250) NOT NULL,
	[Series] [varchar](20) NULL,
	[Prev Close] [float](2) NULL,
	[open] [float](2) NOT NULL,
	[high] [float](2) NOT NULL,
	[low] [float](2) NOT NULL,
	[Last] [float](2) NULL,
	[close] [float](2) NOT NULL,
	[VWAP] [float](2) NULL,
	[volume] [float](2) NOT NULL,
	[Turnover] [float] NULL,
	[Trades] [float](2) NULL,
	[Deliverable Volume] [float](2) NULL,
	[%Deliverble] [float](2) NULL,
	[Cycle_HT_DCPERIOD] [float](2) NULL,
	[Cycle_HT_DCPHASE] [float](2) NULL,
	[Cycle_HT_PHASOR_inphase] [float](2) NULL,
	[Cycle_HT_PHASOR_quadrature] [float](2) NULL,
	[Cycle_HT_SINE_sine] [float](2) NULL,
	[Cycle_HT_SINE_leadsine] [float](2) NULL,
	[Cycle_HT_TRENDMODE] [float](2) NULL,
	[Math_ADD] [float](2) NULL,
	[Math_DIV] [float](2) NULL,
	[Math_MAX] [float](2) NULL,
	[Math_MAXINDEX] [float](2) NULL,
	[Math_MIN] [float](2) NULL,
	[Math_MININDEX] [float](2) NULL,
	[Math_MINMAX_min] [float](2) NULL,
	[Math_MINMAX_max] [float](2) NULL,
	[Math_MINMAXINDEX_minidx] [float](2) NULL,
	[Math_MINMAXINDEX_maxidx] [float](2) NULL,
	[Math_MULT] [float](2) NULL,
	[Math_SUB] [float](2) NULL,
	[Math_SUM] [float](2) NULL,
	[Math_ATAN] [float](2) NULL,
	[Math_CEIL] [float](2) NULL,
	[Math_COS] [float](2) NULL,
	[Math_COSH] [float] NULL,
	[Math_EXP] [float] NULL,
	[Math_FLOOR] [float](2) NULL,
	[Math_LN] [float](2) NULL,
	[Math_LOG10] [float](2) NULL,
	[Math_SIN] [float](2) NULL,
	[Math_SINH] [float] NULL,
	[Math_SQRT] [float](2) NULL,
	[Math_TAN] [float](2) NULL,
	[Math_TANH] [float](2) NULL,
	[Momentum_ADX] [float](2) NULL,
	[Momentum_ADXR] [float](2) NULL,
	[Momentum_APO] [float](2) NULL,
	[Momentum_AROON_aroondown] [float](2) NULL,
	[Momentum_AROON_aroonup] [float](2) NULL,
	[Momentum_AROONOSC] [float](2) NULL,
	[Momentum_BOP] [float](2) NULL,
	[Momentum_CCI] [float](2) NULL,
	[Momentum_CMO] [float](2) NULL,
	[Momentum_DX] [float](2) NULL,
	[Momentum_MACD_macd] [float](2) NULL,
	[Momentum_MACD_macdsignal] [float](2) NULL,
	[Momentum_MACD_macdhist] [float](2) NULL,
	[Momentum_MACDEXT_macd] [float](2) NULL,
	[Momentum_MACDEXT_macdsignal] [float](2) NULL,
	[Momentum_MACDEXT_macdhist] [float](2) NULL,
	[Momentum_MACDFIX_macd] [float](2) NULL,
	[Momentum_MACDFIX_macdsignal] [float](2) NULL,
	[Momentum_MACDFIX_macdhist] [float](2) NULL,
	[Momentum_MFI] [float](2) NULL,
	[Momentum_MINUS_DI] [float](2) NULL,
	[Momentum_MINUS_DM] [float](2) NULL,
	[Momentum_MOM] [float](2) NULL,
	[Momentum_PLUS_DI] [float](2) NULL,
	[Momentum_PLUS_DM] [float](2) NULL,
	[Momentum_PPO] [float](2) NULL,
	[Momentum_ROC] [float](2) NULL,
	[Momentum_ROCP] [float](2) NULL,
	[Momentum_ROCR] [float](2) NULL,
	[Momentum_ROCR100] [float](2) NULL,
	[Momentum_RSI] [float](2) NULL,
	[Momentum_STOCH_slowk] [float](2) NULL,
	[Momentum_STOCH_slowd] [float](2) NULL,
	[Momentum_STOCHF_fastk] [float](2) NULL,
	[Momentum_STOCHF_fastd] [float](2) NULL,
	[Momentum_STOCHRSI_fastk] [float](2) NULL,
	[Momentum_STOCHRSI_fastd] [float](2) NULL,
	[Momentum_TRIX] [float](2) NULL,
	[Momentum_ULTOSC] [float](2) NULL,
	[Momentum_WILLR] [float](2) NULL,
	[Overlap_BBANDS_upperband] [float](2) NULL,
	[Overlap_BBANDS_middleband] [float](2) NULL,
	[Overlap_BBANDS_lowerband] [float](2) NULL,
	[Overlap_DEMA] [float](2) NULL,
	[Overlap_EMA] [float](2) NULL,
	[Overlap_HT_TRENDLINE] [float](2) NULL,
	[Overlap_KAMA] [float](2) NULL,
	[Overlap_MA] [float](2) NULL,
	[Overlap_MAMA_mama] [float](2) NULL,
	[Overlap_MAMA_fama] [float](2) NULL,
	[Overlap_MIDPOINT] [float](2) NULL,
	[Overlap_MIDPRICE] [float](2) NULL,
	[Overlap_SAR] [float](2) NULL,
	[Overlap_SAREXT] [float](2) NULL,
	[Overlap_SMA] [float](2) NULL,
	[Overlap_T3] [float](2) NULL,
	[Overlap_TEMA] [float](2) NULL,
	[Overlap_TRIMA] [float](2) NULL,
	[Overlap_WMA] [float](2) NULL,
	[Pattern_CDL2CROWS] [float](2) NULL,
	[Pattern_CDL3BLACKCROWS] [float](2) NULL,
	[Pattern_CDL3INSIDE] [float](2) NULL,
	[Pattern_CDL3LINESTRIKE] [float](2) NULL,
	[Pattern_CDL3OUTSIDE] [float](2) NULL,
	[Pattern_CDL3STARSINSOUTH] [float](2) NULL,
	[Pattern_CDL3WHITESOLDIERS] [float](2) NULL,
	[Pattern_CDLABANDONEDBABY] [float](2) NULL,
	[Pattern_CDLADVANCEBLOCK] [float](2) NULL,
	[Pattern_CDLBELTHOLD] [float](2) NULL,
	[Pattern_CDLBREAKAWAY] [float](2) NULL,
	[Pattern_CDLCLOSINGMARUBOZU] [float](2) NULL,
	[Pattern_CDLCONCEALBABYSWALL] [float](2) NULL,
	[Pattern_CDLCOUNTERATTACK] [float](2) NULL,
	[Pattern_CDLDARKCLOUDCOVER] [float](2) NULL,
	[Pattern_CDLDOJI] [float](2) NULL,
	[Pattern_CDLDOJISTAR] [float](2) NULL,
	[Pattern_CDLDRAGONFLYDOJI] [float](2) NULL,
	[Pattern_CDLENGULFING] [float](2) NULL,
	[Pattern_CDLEVENINGDOJISTAR] [float](2) NULL,
	[Pattern_CDLEVENINGSTAR] [float](2) NULL,
	[Pattern_CDLGAPSIDESIDEWHITE] [float](2) NULL,
	[Pattern_CDLGRAVESTONEDOJI] [float](2) NULL,
	[Pattern_CDLHAMMER] [float](2) NULL,
	[Pattern_CDLHANGINGMAN] [float](2) NULL,
	[Pattern_CDLHARAMI] [float](2) NULL,
	[Pattern_CDLHARAMICROSS] [float](2) NULL,
	[Pattern_CDLHIGHWAVE] [float](2) NULL,
	[Pattern_CDLHIKKAKE] [float](2) NULL,
	[Pattern_CDLHIKKAKEMOD] [float](2) NULL,
	[Pattern_CDLHOMINGPIGEON] [float](2) NULL,
	[Pattern_CDLIDENTICAL3CROWS] [float](2) NULL,
	[Pattern_CDLINNECK] [float](2) NULL,
	[Pattern_CDLINVERTEDHAMMER] [float](2) NULL,
	[Pattern_CDLKICKING] [float](2) NULL,
	[Pattern_CDLKICKINGBYLENGTH] [float](2) NULL,
	[Pattern_CDLLADDERBOTTOM] [float](2) NULL,
	[Pattern_CDLLONGLEGGEDDOJI] [float](2) NULL,
	[Pattern_CDLLONGLINE] [float](2) NULL,
	[Pattern_CDLMARUBOZU] [float](2) NULL,
	[Pattern_CDLMATCHINGLOW] [float](2) NULL,
	[Pattern_CDLMATHOLD] [float](2) NULL,
	[Pattern_CDLMORNINGDOJISTAR] [float](2) NULL,
	[Pattern_CDLMORNINGSTAR] [float](2) NULL,
	[Pattern_CDLONNECK] [float](2) NULL,
	[Pattern_CDLPIERCING] [float](2) NULL,
	[Pattern_CDLRICKSHAWMAN] [float](2) NULL,
	[Pattern_CDLRISEFALL3METHODS] [float](2) NULL,
	[Pattern_CDLSEPARATINGLINES] [float](2) NULL,
	[Pattern_CDLSHOOTINGSTAR] [float](2) NULL,
	[Pattern_CDLSHORTLINE] [float](2) NULL,
	[Pattern_CDLSPINNINGTOP] [float](2) NULL,
	[Pattern_CDLSTALLEDPATTERN] [float](2) NULL,
	[Pattern_CDLSTICKSANDWICH] [float](2) NULL,
	[Pattern_CDLTAKURI] [float](2) NULL,
	[Pattern_CDLTASUKIGAP] [float](2) NULL,
	[Pattern_CDLTHRUSTING] [float](2) NULL,
	[Pattern_CDLTRISTAR] [float](2) NULL,
	[Pattern_CDLUNIQUE3RIVER] [float](2) NULL,
	[Pattern_CDLUPSIDEGAP2CROWS] [float](2) NULL,
	[Pattern_CDLXSIDEGAP3METHODS] [float](2) NULL,
	[Price_AVGPRICE] [float](2) NULL,
	[Price_MEDPRICE] [float](2) NULL,
	[Price_TYPPRICE] [float](2) NULL,
	[Price_WCLPRICE] [float](2) NULL,
	[Statistic_BETA] [float](2) NULL,
	[Statistic_CORREL] [float](2) NULL,
	[Statistic_LINEARREG] [float](2) NULL,
	[Statistic_LINEARREG_ANGLE] [float](2) NULL,
	[Statistic_LINEARREG_INTERCEPT] [float](2) NULL,
	[Statistic_LINEARREG_SLOPE] [float](2) NULL,
	[Statistic_STDDEV] [float](2) NULL,
	[Statistic_TSF] [float](2) NULL,
	[Statistic_VAR] [float](2) NULL,
	[Volatility_ATR] [float](2) NULL,
	[Volatility_NATR] [float](2) NULL,
	[Volatility_TRANGE] [float](2) NULL,
	[Volume_AD] [float](2) NULL,
	[Volume_ADOSC] [float](2) NULL,
	[Volume_OBV] [float](2) NULL,
	[calculation_timestamp] [datetime] default current_timestamp
)
"""

# SQLAlchemy URL for a Windows-authenticated (trusted) pyodbc connection.
_connection_url = f'mssql+pyodbc://{HOST}/{DB}?trusted_connection=yes&driver={DRIVER}'

# `engine` is handed to pandas for reads/writes; `conn` is the single shared
# connection used by execute_sql.
engine = create_engine(_connection_url)
conn = engine.connect()

def execute_sql(query, commit=False):
    """Execute a raw SQL string on the shared module-level connection.

    Args:
        query: SQL text; it is wrapped with ``text()`` before execution.
        commit: when true, commit the connection after the statement ran.

    Returns:
        Whatever ``conn.execute`` returned for the statement.
    """
    statement = text(query)
    outcome = conn.execute(statement)
    if not commit:
        return outcome
    conn.commit()
    return outcome

# Make sure the destination table exists before anything tries to write to it.
execute_sql(CREATE_OUTPUT_TABLE_QUERY, commit=True)

# Column names of the destination table; main() uses this list to keep only
# the frame columns that the table actually has.
_output_columns_sql = f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{OUTPUT_TABLENAME}'"
VALID_COLUMNS = [row.COLUMN_NAME for row in execute_sql(_output_columns_sql).fetchall()]

def convert_fdataype(df):
    """Cast columns to float and round them to two decimals, in place.

    The date, symbol and series columns (as named in ``column_mappings``) and
    the Math_COSH / Math_EXP / Math_SINH columns are left untouched. A column
    whose conversion raises TypeError or ValueError is also left as it is.

    Args:
        df: frame to convert; it is modified in place.

    Returns:
        The same ``df`` object.
    """
    untouched = (
        column_mappings["date"],
        column_mappings["symbol"],
        column_mappings["series"],
        "Math_COSH",
        "Math_EXP",
        "Math_SINH",
    )
    candidates = [label for label in df.columns if label not in untouched]
    for label in candidates:
        try:
            df[label] = df[label].astype(float).round(2)
        except (TypeError, ValueError):
            # Not convertible to float: keep the column unchanged.
            continue
    return df

def calculate_indicators(df):
    """Run every TA-Lib function on ``df`` and append the results as columns.

    The price columns named in ``column_mappings`` are renamed to the
    lower-case ``open``/``high``/``low``/``close``/``volume`` before the
    indicators are evaluated. The rename is applied to a copy, so the caller's
    frame is not modified.

    Each indicator column is named ``<Group>_<Function>`` (first word of the
    function's ``info["group"]`` and ``info["name"]``); functions with several
    outputs get ``<Group>_<Function>_<output>`` per output column.

    ACOS, ASIN and MAVP are skipped. An indicator that raises is reported on
    stdout and skipped, so one failure no longer aborts the whole calculation
    or re-joins the previous indicator's output.

    Args:
        df: price history for one symbol.

    Returns:
        A new DataFrame with the renamed price columns plus one column per
        indicator output that could be computed.
    """
    prices = df.rename(columns={
        column_mappings["open"]: "open",
        column_mappings["high"]: "high",
        column_mappings["low"]: "low",
        column_mappings["volume"]: "volume",
        column_mappings["close"]: "close",
    })
    result = prices.copy()

    for indicator in talib.get_functions():
        if indicator in ("ACOS", "ASIN", "MAVP"):
            continue
        func = getattr(abstract, indicator)
        info = func.info
        name = f'{info["group"].split(" ")[0]}_{info["name"].split(" ")[0]}'
        try:
            output = func(prices)
        except Exception as exc:
            # Skip this indicator only; falling through would reuse a stale
            # (or undefined) `output` from an earlier iteration.
            print(f"Skipping indicator {indicator}: {exc!r}")
            continue

        if isinstance(output, pd.DataFrame):
            # Multi-output function: prefix every output column.
            output = output.add_prefix(f"{name}_")
        elif isinstance(output, pd.Series):
            # to_frame renames regardless of the Series' own name, whereas
            # DataFrame(series, columns=[name]) would select by name.
            output = output.to_frame(name)
        else:
            output = pd.DataFrame(output, columns=[name])
        result = result.join(output)
    return result


def main(sym):
    """Compute and store the indicators that are still missing for one symbol.

    Steps:
      1. Ask the database which history dates of ``sym`` have no row in the
         output table yet; return early when there are none.
      2. Load the most recent ``90 + <missing count>`` history rows (the extra
         90 rows give the indicators a warm-up window) and sort them by date.
      3. Clean the rows with ``dropna`` (falling back to the raw rows if
         nothing survives), compute the indicators, round them and replace
         +/-inf with NaN.
      4. Append the rows whose date is one of the missing dates to the output
         table, restricted to the columns listed in ``VALID_COLUMNS``.

    Rows are selected by date rather than with ``tail(n)``: when the cleaning
    step removed rows, or the missing dates were not the newest ``n`` ones,
    ``tail(n)`` picked already-stored rows and inserted duplicates.

    Args:
        sym: symbol value as stored in the history table.

    Returns:
        None. Progress and errors are reported on stdout.
    """
    date_col = column_mappings["date"]
    symbol_col = column_mappings["symbol"]
    # The templates place the symbol inside a quoted SQL literal; doubling
    # apostrophes keeps a name such as O'NEIL from breaking the statement.
    sql_sym = str(sym).replace("'", "''")

    INC_CHECK_QUERY = INCREMENTAL_CHECK_QUERY.format(HISTORY_TABLE_NAME=HISTORY_TABLE, CALC_TABLE_NAME=OUTPUT_TABLENAME,
                                                     SYMBOL_NAME=sql_sym, DATE_COLUMN_NAME=date_col,
                                                     SYMBOL_COLUMN_NAME=symbol_col)
    inc_df = pd.read_sql_query(INC_CHECK_QUERY, engine)
    if inc_df.empty:
        print(f"No new data to calculate for {sym}..")
        return

    TEMP_DF_QUERY = GET_PREVIOUS_DAYS_QUERY.format(NO_OF_ROWS=90+inc_df.shape[0], SYMBOL_NAME=sql_sym, HISTORY_TABLE_NAME=HISTORY_TABLE,
                                                   SYMBOL_COLUMN_NAME=symbol_col, DATE_COLUMN_NAME=date_col)
    temp_df = pd.read_sql_query(TEMP_DF_QUERY, engine)
    temp_df = temp_df.sort_values(by=date_col)
    print(f"Calculating indicators for {sym}. Total rows to calculate {inc_df.shape[0]}")
    temp_df_na = dropna(temp_df)
    if len(temp_df_na) < 1:
        # Cleaning removed everything; fall back to the unfiltered rows.
        temp_df_na = temp_df

    df_ = calculate_indicators(temp_df_na)
    df_ = convert_fdataype(df_)
    df_ = df_.replace([np.inf, -np.inf], np.nan)
    try:
        cols = [i for i in df_.columns if i in VALID_COLUMNS]
        # Keep only the rows for dates that are not in the output table yet.
        is_new = df_[date_col].isin(inc_df[date_col])
        df_.loc[is_new, cols].to_sql(OUTPUT_TABLENAME, engine, if_exists="append", index=False, chunksize=30)
        print(f"Updated calculations for {sym} in database")
    except Exception as exc:
        # Best effort per symbol: report the cause and let other symbols go on.
        print("Error on calculation {}: {!r}".format(sym, exc))


def concurrent_request(function, iterable_list, no_of_workers=10, parser_func= None):
    """Call ``function`` once per item of ``iterable_list`` on a thread pool.

    Every task's result is collected so that an exception raised inside a
    worker is reported on stdout instead of disappearing silently. A failing
    item does not stop the remaining items.

    Args:
        function: callable taking a single item.
        iterable_list: iterable of items to process.
        no_of_workers: maximum number of worker threads.
        parser_func: accepted for backward compatibility; currently unused.

    Returns:
        True once all submitted work has finished, or has been cancelled after
        a KeyboardInterrupt.

    Raises:
        TypeError: if ``iterable_list`` is not iterable. (TypeError is a
            subclass of Exception, so existing ``except Exception`` handlers
            still catch it.)
    """
    if not hasattr(iterable_list, "__iter__"):
        raise TypeError("Please pass iterable object")

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=no_of_workers)
    pending = {}
    try:
        for item in iterable_list:
            pending[executor.submit(function, item)] = item
        for future in concurrent.futures.as_completed(pending):
            try:
                future.result()
            except Exception as exc:
                print(f"Task for {pending[future]!r} failed: {exc!r}")
    except KeyboardInterrupt:
        print("Received KeyboardInterrupt. Stopping processes...")
        # Tasks that have not started yet can still be cancelled; tasks that
        # are already running are left to finish.
        for future in pending:
            future.cancel()
    finally:
        executor.shutdown(wait=True)
    return True

## Run the cell below to calculate indicators

In [None]:
# Distinct symbols present in the history table. The value is read by position
# (row[0]) rather than through a hard-coded `.Symbol` attribute, so this keeps
# working when column_mappings["symbol"] names a different column.
symbol_lst = [row[0] for row in execute_sql(f'select {column_mappings["symbol"]} from {HISTORY_TABLE} Group by \
                                                {column_mappings["symbol"]}')]

# Process every symbol on a small thread pool.
concurrent_request(main, symbol_lst, NO_OF_WORKERS)

No new data to calculate for MRF LTD.    ..
Calculating indicators for SOLID STONE . Total rows to calculate 604
Calculating indicators for SOTL        . Total rows to calculate 850
Updated calculations for SOLID STONE  in database
Updated calculations for SOTL         in database
Calculating indicators for RODIUM      . Total rows to calculate 808
Calculating indicators for 1025MHFL24  . Total rows to calculate 280
Updated calculations for RODIUM       in database
Updated calculations for 1025MHFL24   in database
Calculating indicators for NUTECH GLOBA. Total rows to calculate 340
Calculating indicators for PANACHE     . Total rows to calculate 497
Updated calculations for NUTECH GLOBA in database
Updated calculations for PANACHE      in database
Calculating indicators for HIKLASS     . Total rows to calculate 13
Calculating indicators for IHF24SEP21A . Total rows to calculate 11
Updated calculations for HIKLASS      in database
Updated calculations for IHF24SEP21A  in database
Calcul

In [14]:
# Manual check for a single symbol. main() only prints progress and never
# returns a value, so its result is not bound to a (misleading) `df` name.
main(symbol_lst[0])

Calculating indicators for MRF LTD.    . Total rows to calculate 860
Updated calculations for MRF LTD.     in database
