In [1]:
import yfinance as yf
import pandas as pd
import pandas_ta as ta
import talib as ta2
import numpy as np
from datetime import datetime as dt
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from datetime import timedelta as delta
import numpy as np
import os

%matplotlib inline

In [2]:
def get_stock_info(stock, save_to_disk=False):
    """Download daily price history for an NSE ticker via yfinance.

    Parameters
    ----------
    stock : str
        Ticker symbol without the exchange suffix; ``.NS`` is appended
        before the request is made.
    save_to_disk : bool, default False
        When truthy, the downloaded frame is also written to
        ``./csv/{stock}.csv`` (the ``./csv`` directory is created if needed).

    Returns
    -------
    The frame returned by ``yf.download`` for 2000-01-01 up to tomorrow's date.
    """
    start_date = '2000-01-01'
    # End one day past "now" -- presumably so today's bar is included
    # (assumes yfinance treats `end` as exclusive; TODO confirm).
    end_date = (dt.now() + delta(1)).strftime('%Y-%m-%d')
    # The bar size is `interval`; `period` is a look-back window and must not be
    # combined with an explicit start/end range.
    df = yf.download(f"{stock}.NS", interval='1d', start=start_date, end=end_date, progress=False)
    if save_to_disk:
        path = './csv'
        # Only an already-existing directory is tolerated; other OS errors surface.
        os.makedirs(path, exist_ok=True)
        df.to_csv(f'{path}/{stock}.csv')

    return df

In [None]:
def read_stock_info_from_disk(stock):
    """Load ``./csv/{stock}.csv`` and return it indexed by its ``Date`` column."""
    csv_path = f'./csv/{stock}.csv'
    frame = pd.read_csv(csv_path)
    return frame.set_index('Date')

In [3]:
def add_signal_indicators(df):
    """Add moving-average, MACD and derived 0/1 signal columns to ``df``.

    The frame is modified in place and also returned. All indicators are
    computed from the ``Adj Close`` column:

    * ``SMA_10`` / ``SMA_30`` / ``SMA_50`` / ``SMA_200`` via ``ta.sma``
    * ``MACD`` / ``MACD_signal`` / ``MACD_hist`` via ``ta.macd(12, 26, 9)``
    * ``10_cross_30``      -- 1 where SMA_10 > SMA_30
    * ``MACD_Signal_MACD`` -- 1 where MACD > MACD_signal
    * ``MACD_lim``         -- 1 where MACD > 0
    * ``abv_50``           -- 1 where SMA_10 and SMA_30 are both above SMA_50
    * ``abv_200``          -- 1 where SMA_10, SMA_30 and SMA_50 are all above SMA_200

    Comparisons involving NaN evaluate to False, so such rows get flag 0.
    """
    adj_close = df['Adj Close']

    for window in (10, 30, 50, 200):
        df[f'SMA_{window}'] = ta.sma(adj_close, length=window)

    macd_frame = ta.macd(adj_close, fast=12, slow=26, signal=9)
    macd_columns = (
        ('MACD', 'MACD_12_26_9'),
        ('MACD_signal', 'MACDs_12_26_9'),
        ('MACD_hist', 'MACDh_12_26_9'),
    )
    for target, source in macd_columns:
        df[target] = macd_frame[source]

    def as_flag(condition):
        # Boolean mask -> 1/0 integer array
        return np.where(condition, 1, 0)

    sma_10, sma_30, sma_50, sma_200 = df['SMA_10'], df['SMA_30'], df['SMA_50'], df['SMA_200']

    df['10_cross_30'] = as_flag(sma_10 > sma_30)
    df['MACD_Signal_MACD'] = as_flag(df['MACD'] > df['MACD_signal'])
    df['MACD_lim'] = as_flag(df['MACD'] > 0)
    df['abv_50'] = as_flag((sma_30 > sma_50) & (sma_10 > sma_50))
    df['abv_200'] = as_flag((sma_30 > sma_200) & (sma_10 > sma_200) & (sma_50 > sma_200))

    return df

In [4]:
def calculate_returns(df):
    """Append forward 5- and 10-row percentage returns and their sign flags.

    Adds these columns to ``df`` in place:

    * ``5D_returns`` / ``10D_returns`` -- percentage change of ``Adj Close``
      between the current row and the row 5 / 10 positions later.
    * ``5D_positive`` / ``10D_positive`` -- 1 where the matching return is > 0,
      else 0.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain an ``Adj Close`` column.

    Returns
    -------
    pandas.DataFrame
        ``df.dropna()`` -- a new frame without rows holding any missing value
        (this removes the trailing rows that have no forward price).
    """
    adj_close = df['Adj Close']

    # Both numerator and denominator use the adjusted series; dividing the adjusted
    # price change by the unadjusted `Close` skews the percentage when they differ.
    for horizon in (5, 10):
        df[f'{horizon}D_returns'] = (adj_close.shift(-horizon) - adj_close) / adj_close * 100

    for horizon in (5, 10):
        df[f'{horizon}D_positive'] = np.where(df[f'{horizon}D_returns'] > 0, 1, 0)

    return df.dropna()

In [5]:
def get_eda_and_deepdive(df):
    """Build two summary tables of forward returns per signal combination.

    Rows containing any missing value are ignored. For ``5D_returns`` and
    ``10D_returns`` the statistics count, mean, median, min and max are computed:

    * ``eda``      -- grouped by the five signal flags.
    * ``deepdive`` -- grouped by the five signal flags plus ``5D_positive``
      and ``10D_positive``.

    Returns
    -------
    tuple
        ``(eda, deepdive)``
    """
    signal_cols = ['10_cross_30', 'MACD_Signal_MACD', 'MACD_lim', 'abv_50', 'abv_200']
    outcome_cols = ['5D_positive', '10D_positive']
    return_cols = ['5D_returns', '10D_returns']
    stats = ['count', 'mean', 'median', 'min', 'max']

    def summarise(group_keys):
        # Same aggregation for both tables; only the grouping keys differ
        return df.dropna().groupby(group_keys)[return_cols].agg(stats)

    eda = summarise(signal_cols)
    deepdive = summarise(signal_cols + outcome_cols)

    return eda, deepdive

In [6]:
def calculate_returns_for_signals(df):
    """Summarise forward returns and outcome frequency per signal combination.

    Rows of ``df`` with any missing value are ignored. The remaining rows are
    grouped by the five signal flags (``10_cross_30``, ``MACD_Signal_MACD``,
    ``MACD_lim``, ``abv_50``, ``abv_200``) together with ``5D_positive``.

    For each group the result holds:

    * ``returns``        -- average of the 5-day and 10-day figures, where each
      figure is itself the average of the group's mean and median return.
    * ``proba``          -- rows in the group divided by rows sharing the same
      five signal flags (regardless of ``5D_positive``), rounded to 2 decimals.
    * ``actual_returns`` -- ``returns * proba`` rounded to 2 decimals.

    Returns
    -------
    pandas.DataFrame
        Columns: the five signal flags, ``5D_positive``, ``returns``,
        ``proba``, ``actual_returns``.
    """
    # Mean/median of the 5-day return per (signals, 5D_positive) group.
    # The 'count' entry in the rename map has no effect: 'count' is not aggregated here.
    ref_5d = df.dropna()\
        .groupby(['10_cross_30', 'MACD_Signal_MACD', 'MACD_lim', 'abv_50', 'abv_200', '5D_positive'])[['5D_returns']]\
        .agg(['mean','median']).rename(columns={'count':'num','mean':'5d_mean', 'median': '5d_median'})
    
    # Mean/median of the 10-day return for the same groups (still keyed on 5D_positive).
    ref_10d = df.dropna()\
        .groupby(['10_cross_30', 'MACD_Signal_MACD', 'MACD_lim', 'abv_50', 'abv_200', '5D_positive'])[['10D_returns']]\
        .agg(['mean','median']).rename(columns={'mean':'10d_mean', 'median': '10d_median'})
    
    # Row count per signal combination (5D_positive not part of the key).
    ref_count = df.dropna()\
        .groupby(['10_cross_30', 'MACD_Signal_MACD', 'MACD_lim', 'abv_50', 'abv_200'])[['10_cross_30']]\
        .agg(['count']).rename(columns={'count':'total_count'})
    
    # Row count per (signal combination, 5D_positive) group.
    ref_num = df.dropna()\
        .groupby(['10_cross_30', 'MACD_Signal_MACD', 'MACD_lim', 'abv_50', 'abv_200', '5D_positive'])[['10_cross_30']]\
        .agg(['count']).rename(columns={'count':'num'})
    
    # Index-on-index merges. ref_count has one index level fewer than the left side.
    # NOTE(review): presumably pandas aligns on the shared level names here -- confirm.
    ref = ref_5d.merge(ref_10d, how='left', left_index=True, right_index=True)\
        .merge(ref_count, how='left', left_index=True, right_index=True)
    
    # Blend mean and median per horizon, then blend the two horizons.
    ref['5d_returns'] = (ref['5D_returns', '5d_mean']+ref['5D_returns', '5d_median'])/2
    ref['10d_returns'] = (ref['10D_returns', '10d_mean']+ref['10D_returns', '10d_median'])/2
    ref['returns'] = (ref['5d_returns']+ref['10d_returns'])/2
    
    ref = ref.merge(ref_num, how='left', left_index=True, right_index=True)
    
    # Share of the signal combination's rows that fall in this 5D_positive group.
    ref['proba'] = round(ref['10_cross_30', 'num']/ref['10_cross_30', 'total_count'], 2)
    
    ref = ref.reset_index()
    
    # Flatten the two-level column labels down to the first level only.
    # NOTE(review): this appears to leave several columns labelled '10_cross_30'
    # (the key plus the two count columns) -- confirm.
    ref.columns = ref.columns.droplevel(1)
    
    ref = ref[['10_cross_30', 'MACD_Signal_MACD', 'MACD_lim', 'abv_50', 'abv_200',
           '5D_positive', 'returns', 'proba']]
    
    ref['actual_returns'] = round(ref['returns']*ref['proba'], 2)
    
    # Positional relabel to 11 names: the 2nd and 3rd columns are tagged '_' and dropped below.
    # This relies on the selection above yielding exactly 11 columns in this order.
    ref.columns = ['10_cross_30', '_', '_', 'MACD_Signal_MACD',
           'MACD_lim', 'abv_50', 'abv_200', '5D_positive', 'returns', 'proba',
           'actual_returns']
    
    ref = ref.drop('_', axis=1)
    
    return ref

In [7]:
def prioritise_signals(ref, min_returns=0.5, min_probability=0.65):
    """Rank signal combinations by summed ``actual_returns`` and filter weak ones.

    Parameters
    ----------
    ref : pandas.DataFrame
        Table with the five signal flag columns, ``5D_positive``, ``proba``
        and ``actual_returns``.
    min_returns : float, default 0.5
        Keep only combinations whose summed ``actual_returns`` exceeds this.
    min_probability : float, default 0.65
        Keep only combinations whose ``proba`` on the ``5D_positive == 1`` row
        exceeds this.

    Returns
    -------
    pandas.DataFrame
        One row per surviving combination with the signal flags,
        ``actual_returns`` and ``proba``, sorted by ``actual_returns`` descending.
    """
    signal_cols = ['10_cross_30', 'MACD_Signal_MACD', 'MACD_lim', 'abv_50', 'abv_200']

    # Only combinations where at least one of the two trigger flags is set
    has_trigger = (ref['10_cross_30'] == 1) | (ref['MACD_Signal_MACD'] == 1)
    triggered = ref[has_trigger].drop_duplicates()

    # Sum actual_returns over the rows of each signal combination
    summed = triggered.groupby(signal_cols).sum()['actual_returns'].reset_index()
    summed = summed.sort_values('actual_returns', ascending=False)

    # proba taken from the positive-outcome rows only
    positive_rows = ref[ref['5D_positive'] == 1]
    win_rates = positive_rows[signal_cols + ['proba']]

    ranked = summed.merge(win_rates, on=signal_cols, how='left')

    passes = (ranked['proba'] > min_probability) & (ranked['actual_returns'] > min_returns)
    shortlisted = ranked[passes]

    return shortlisted.sort_values(by=['actual_returns'], ascending=False)
    

In [8]:
def analyse(stock, save_to_disk=False):
    """Run the full pipeline for one ticker: download, indicators, returns, ranking.

    Parameters
    ----------
    stock : str
        Ticker symbol passed to ``get_stock_info``.
    save_to_disk : bool, default False
        Forwarded to ``get_stock_info``.

    Returns
    -------
    tuple
        ``(output, eda, deepdive)`` where ``output`` is the result of
        ``prioritise_signals`` with an added ``stock`` column, and ``eda`` /
        ``deepdive`` come from ``get_eda_and_deepdive``.
    """
    prices = get_stock_info(stock, save_to_disk)
    with_signals = add_signal_indicators(prices)
    with_returns = calculate_returns(with_signals)

    eda, deepdive = get_eda_and_deepdive(with_returns)

    signal_table = calculate_returns_for_signals(with_returns)
    shortlisted = prioritise_signals(signal_table)
    shortlisted['stock'] = stock

    return shortlisted, eda, deepdive

In [None]:
def get_curr_price(stock):
    """Return the most recent ``Adj Close`` for an NSE ticker, rounded to 2 decimals.

    Daily bars are requested for a window from 7 days ago up to tomorrow's
    date, and the last value in the ``Adj Close`` column is used.

    Parameters
    ----------
    stock : str
        Ticker symbol without the exchange suffix; ``.NS`` is appended.

    Raises
    ------
    IndexError
        If the download contains no rows for the requested window.
    """
    now = dt.now()
    # The bar size is `interval`; `period` is a look-back window and must not be
    # combined with an explicit start/end range.
    recent = yf.download(f"{stock}.NS", interval='1d',
                         start=(now - delta(7)).strftime('%Y-%m-%d'),
                         end=(now + delta(1)).strftime('%Y-%m-%d'), progress=False)
    adj_close = recent['Adj Close']
    if len(adj_close) == 0:
        # Same exception type as indexing an empty array, but with a usable message
        raise IndexError(f"no price data returned for {stock}.NS in the last 7 days")
    return round(adj_close.values[-1], 2)

In [1]:
def _close_trade(trade, row):
    """Fill in the exit fields of ``trade`` from the price and date in ``row``."""
    # Rounded to 2 decimals, like buy_price, on every exit path
    trade['sell_price'] = round(row['Adj Close'], 2)
    trade['sell_date'] = str(pd.to_datetime(row['Date']).date())
    trade['returns'] = round((trade['sell_price'] - trade['buy_price']) / trade['buy_price'] * 100, 3)


def backtest_signals(row):
    """Advance the backtest by one row, opening or closing at most one trade.

    State lives in module-level names that must exist before the first call:

    * ``TRADES`` -- list of trade dicts; the last entry is the open trade.
    * ``trade_in_progress`` -- bool, reassigned by this function.
    * ``signals`` -- DataFrame of accepted signal combinations. All columns
      except the last two are used as merge keys; ``signal_grade`` and
      ``expected_return`` are read from the matching row.
    * ``holding_period`` -- number of row positions after which a trade is closed.
    * ``check_trend_change`` -- when truthy, enables the early stop-loss exit.

    Parameters
    ----------
    row : mapping
        Must provide ``'index'``, ``'Date'``, ``'Adj Close'``, ``'stock'`` and
        the merge-key columns of ``signals``.

    Returns
    -------
    None
        Results are recorded in ``TRADES``.
    """
    global trade_in_progress

    if trade_in_progress:
        # The dict is mutated in place, so TRADES[-1] sees the updates directly
        trade = TRADES[-1]

        if row['index'] == trade['sell_index']:
            # Scheduled exit: the holding period has elapsed
            _close_trade(trade, row)
            trade_in_progress = False

        elif row['index'] < trade['sell_index'] and check_trend_change:
            # Stop-loss: exit early once the loss reaches the trade's expected return.
            # NOTE(review): the previous comment described a take-profit check, but the
            # condition only fires on losses -- confirm which was intended.
            # An earlier variant (closing when the row no longer matched `signals`)
            # was disabled in the original and is not reproduced here.
            curr_returns = 100 * (row['Adj Close'] - trade['buy_price']) / trade['buy_price']
            if curr_returns <= -trade['expected_return']:
                _close_trade(trade, row)
                trade_in_progress = False

        return

    # No open trade: does this row match any accepted signal combination?
    matches = pd.DataFrame([row]).merge(signals, on=list(signals.columns[:-2]))

    if matches.shape[0] > 0:
        trade_in_progress = True
        TRADES.append({
            'grade': matches['signal_grade'].values[0],
            'buy_date': str(pd.to_datetime(row['Date']).date()),
            'buy_index': row['index'],
            'sell_date': '',
            'sell_index': row['index'] + holding_period,
            'buy_price': round(row['Adj Close'], 2),
            'sell_price': '',
            'returns': 0,
            'stock': row['stock'],
            'expected_return': matches['expected_return'].values[0]
        })
