In [21]:
import pandas as pd
import numpy as np
from nsepython import equity_history
from datetime import datetime, timedelta
import yfinance as yf

In [22]:
class DataLoader:
    """Loader for NSE equity history exposed under short, lowercase column names."""

    # Column labels selected from the frame returned by `equity_history`,
    # in the order they appear in the output.
    req_columns = [
        'CH_TIMESTAMP',
        'CH_SYMBOL',
        'CH_TRADE_HIGH_PRICE',
        'CH_TRADE_LOW_PRICE',
        'CH_OPENING_PRICE',
        'CH_CLOSING_PRICE',
        'CH_LAST_TRADED_PRICE',
        'CH_PREVIOUS_CLS_PRICE',
        'CH_TOT_TRADED_QTY',
        'CH_52WEEK_HIGH_PRICE',
        'CH_52WEEK_LOW_PRICE',
    ]
    # Output names, paired position-by-position with `req_columns`.
    new_column_names = [
        'date',
        'symbol',
        'high',
        'low',
        'open',
        'close',
        'ltp',
        'prev_close',
        'volume',
        'high_52w',
        'low_52w',
    ]

    @staticmethod
    def load_data(script_name, start_date, end_date, series="EQ"):
        """Fetch history for one symbol and return it with renamed columns.

        Parameters
        ----------
        script_name : symbol forwarded to `equity_history`.
        start_date, end_date : forwarded unchanged to `equity_history`.
        series : series code forwarded to `equity_history` (default "EQ").

        Returns
        -------
        DataFrame restricted to `req_columns`, relabelled as `new_column_names`.
        """
        history = equity_history(script_name, series, start_date, end_date)
        label_map = dict(zip(DataLoader.req_columns, DataLoader.new_column_names))
        return history[DataLoader.req_columns].rename(columns=label_map)
    
class YFinanceDataLoader:
    """Loader for OHLCV history fetched through `yfinance`."""

    # Columns kept from the frame returned by `Ticker.history`.
    req_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
    # Output columns; 'date' ends up as the index of the returned frame.
    new_column_names = ['date', 'symbol', 'open', 'high', 'low', 'close', 'prev_close', 'volume']

    @staticmethod
    def load_data(script_name, num_days, exchg="ns"):
        """Load the last `num_days` days of history for a single symbol.

        Parameters
        ----------
        script_name : symbol without exchange suffix; the ticker requested is
            ``f'{script_name}.{exchg}'``.
        num_days : number of days, passed to `history` as ``period='<num_days>d'``.
        exchg : exchange suffix appended to the symbol (default "ns").

        Returns
        -------
        DataFrame indexed by 'date' with columns symbol, open, high, low,
        close, prev_close, volume. `prev_close` is the previous row's close,
        so it is NaN on the first row.
        """
        ticker = yf.Ticker(f'{script_name}.{exchg}')
        df = ticker.history(period=f'{num_days}d')[YFinanceDataLoader.req_columns].reset_index()
        df['symbol'] = script_name.upper()
        df['prev_close'] = df['Close'].shift(1)
        # Lower-casing also turns the reset index column into 'date'.
        df.columns = [i.lower() for i in df.columns]
        return df[YFinanceDataLoader.new_column_names].set_index('date')

    @staticmethod
    def load_multi_data(scripts, num_days, exchg="ns"):
        """Load history for several symbols and stack the results.

        Each symbol is loaded with `load_data`, so `prev_close` is computed
        per symbol and never leaks across symbols. Rows are distinguished by
        the 'symbol' column.

        Parameters
        ----------
        scripts : iterable of symbols (a single string is treated as one symbol).
        num_days, exchg : forwarded to `load_data` for every symbol.

        Returns
        -------
        DataFrame with the same layout as `load_data`, one block of rows per
        symbol in input order; an empty frame with that layout when `scripts`
        is empty.
        """
        if isinstance(scripts, str):
            # Iterating a bare string would request one ticker per character.
            scripts = [scripts]
        frames = [YFinanceDataLoader.load_data(name, num_days, exchg) for name in scripts]
        if not frames:
            # pd.concat raises on an empty list; return the expected empty layout instead.
            return pd.DataFrame(columns=YFinanceDataLoader.new_column_names).set_index('date')
        return pd.concat(frames)

### Studies

In [23]:
def load_volume_sma(df):
    """Add `vol_sma_10d`, the 10-row rolling mean of `volume`.

    The frame is modified in place and also returned. The first nine rows of
    the new column are NaN because the window is not yet full.
    """
    ten_row_window = df['volume'].rolling(window=10)
    df['vol_sma_10d'] = ten_row_window.mean()
    return df

def load_emas(df):
    """Add exponentially weighted means of `close` for a fixed set of spans.

    One column `ema_<span>d` is created per span, in the order listed below
    (12 and 26 come last). The frame is modified in place and also returned.
    """
    close = df['close']
    for span in (5, 9, 21, 30, 50, 100, 12, 26):
        column = f'ema_{span}d'
        df[column] = close.ewm(span=span).mean()
    return df

def load_macd(df):
    """Add the MACD line and its 9-period signal line.

    Requires the columns `ema_12d` and `ema_26d` to be present already.

    Columns written (frame is modified in place and returned):
      - `macd`: `ema_12d - ema_26d`
      - `macd_9d_signal`: exponentially weighted mean of `macd` with span 9
    """
    df['macd'] = df['ema_12d'] - df['ema_26d']
    # The first positional argument of `ewm` is `com`, not `span`; `ewm(9)`
    # would smooth with alpha = 1/10 (a 19-span EMA). Name the span explicitly.
    df['macd_9d_signal'] = df['macd'].ewm(span=9).mean()
    return df
    
def load_bollinger_bands(df):
    """Add a 20-row moving average of `close` and bands two deviations away.

    Columns written (frame is modified in place and returned):
      - `sma_20d`: 20-row rolling mean of `close`
      - `bb_upper` / `bb_lower`: `sma_20d` plus / minus twice the 20-row
        rolling standard deviation of `close`
    """
    close = df['close']
    df['sma_20d'] = close.rolling(window=20).mean()
    band_offset = 2 * close.rolling(20).std()
    df['bb_upper'] = df['sma_20d'] + band_offset
    df['bb_lower'] = df['sma_20d'] - band_offset
    return df

def load_rsi(df):
    """Add a relative-strength index of `close` and its intermediate columns.

    Columns written (frame is modified in place and returned):
      - `gain_pts`: the rise in `close` from the previous row, else 0
      - `loss_pts`: the size of the fall in `close` from the previous row, else 0
      - `gain_avg` / `loss_avg`: exponentially weighted means of the above with
        alpha = 1/14, adjust=True, and at least 14 observations
      - `rsi`: 100 - 100 / (1 + gain_avg / loss_avg)

    The first row has no previous close, so its gain and loss are both 0.
    """
    change = df["close"].diff()
    df['gain_pts'] = np.where(change > 0, change, 0)
    df['loss_pts'] = np.where(change < 0, -change, 0)

    smoothing = {'alpha': 1/14, 'adjust': True, 'min_periods': 14}
    for points, average in (('gain_pts', 'gain_avg'), ('loss_pts', 'loss_avg')):
        df[average] = df[points].ewm(**smoothing).mean()

    relative_strength = df['gain_avg'] / df['loss_avg']
    df['rsi'] = 100 - (100 / (1 + relative_strength))
    return df

def load_ATR(df):
    """Add the true range and its exponentially smoothed average.

    Columns written (frame is modified in place and returned):
      - `true_range`: element-wise maximum of `high - low`,
        `|high - prev_close|` and `|low - prev_close|`
      - `ATR`: exponentially weighted mean of `true_range` with alpha = 1/14

    A NaN in `prev_close` makes that row's `true_range` NaN, because
    `np.maximum` propagates NaN.
    """
    high_to_low = (df['high'] - df['low']).to_numpy()
    high_to_prev = (df['high'] - df['prev_close']).abs().to_numpy()
    low_to_prev = (df['low'] - df['prev_close']).abs().to_numpy()

    widest = np.maximum(np.maximum(high_to_low, high_to_prev), low_to_prev)
    df['true_range'] = widest
    df['ATR'] = df['true_range'].ewm(alpha=1/14).mean()
    return df

def load_ADX(df, period):
    """Add directional-movement columns and the ADX for the given `period`.

    Requires `high` and `low`, plus either `true_range` or `ATR`.

    Parameters
    ----------
    df : frame to annotate; modified in place and returned.
    period : smoothing length; every exponential mean here uses alpha = 1/period.

    Columns written
    ---------------
    plus_DM, minus_DM     : row-over-row rise in `high` / fall in `low`
    plus_DX, minus_DX     : the DM value when it is positive and the larger of
                            the two, else 0
    smooth_DXp, smooth_DXm: exponentially weighted means of plus_DX / minus_DX
    plus_DI, minus_DI     : smoothed movement * 100 / average true range
    DX                    : |plus_DI - minus_DI| * 100 / (plus_DI + minus_DI)
    ADX                   : exponentially weighted mean of DX
    """
    up_move = df['high'].diff(1)
    down_move = -df['low'].diff(1)
    df["plus_DM"] = up_move
    df["minus_DM"] = down_move

    # Only the dominant, positive movement counts; ties and NaN rows give 0.
    df['plus_DX'] = np.where((up_move > down_move) & (up_move > 0), up_move, 0)
    df['minus_DX'] = np.where((down_move > up_move) & (down_move > 0), down_move, 0)

    alpha = 1 / period
    df['smooth_DXp'] = df["plus_DX"].ewm(alpha=alpha).mean()
    df['smooth_DXm'] = df["minus_DX"].ewm(alpha=alpha).mean()

    # The directional movement is smoothed with `period`, so the true range
    # must use the same smoothing. The stored `ATR` column is built with a
    # fixed alpha of 1/14, which only matches when period == 14. Re-smooth
    # `true_range` when it is available; otherwise fall back to `ATR`.
    if 'true_range' in df.columns:
        average_true_range = df['true_range'].ewm(alpha=alpha).mean()
    else:
        average_true_range = df['ATR']

    df['plus_DI'] = df["smooth_DXp"] * 100 / average_true_range
    df['minus_DI'] = df["smooth_DXm"] * 100 / average_true_range

    di_spread = df['plus_DI'] - df['minus_DI']
    di_total = df['plus_DI'] + df['minus_DI']
    df['DX'] = (di_spread * 100 / di_total).abs()
    df['ADX'] = df['DX'].ewm(alpha=alpha).mean()

    return df

def load_aroon(df, period):
    """Add Aroon up/down columns computed with argmax/argmin over rolling windows.

    Columns written (frame is modified in place and returned):
      - `period`: the `period` argument repeated on every row
      - `days_since_period_high` / `days_since_period_low`: rows elapsed since
        the first highest `high` / lowest `low` inside the trailing window of
        `period` rows (the current row included)
      - `aroon_up` / `aroon_down`: `(period - days_since) * 100 / period`

    Because the window holds `period` rows, days-since is at most `period - 1`
    and the Aroon values lie between `100 / period` and 100.
    """
    def _rows_since_highest(window):
        return period - np.argmax(window) - 1

    def _rows_since_lowest(window):
        return period - np.argmin(window) - 1

    df["period"] = period
    df["days_since_period_high"] = df["high"].rolling(window=period).apply(_rows_since_highest)
    df["days_since_period_low"] = df["low"].rolling(window=period).apply(_rows_since_lowest)

    for direction, elapsed in (("up", "days_since_period_high"), ("down", "days_since_period_low")):
        df[f"aroon_{direction}"] = (period - df[elapsed]) * 100 / period

    return df

### Single candlestick patterns

In [24]:
def preprocess(df):
    """Derive per-candle geometry, colour, percent change and previous-row copies.

    Columns written (frame is modified in place and returned):
      - `candle_body_length`: |open - close|
      - `candle_length`: high - low
      - `candle_body_ratio`: body length / candle length
      - `candle_color`: 'green' when close > open, otherwise 'red'
      - `pct_change`: percent move of close against the incoming `prev_close`
      - `prev_<name>`: previous-row value of open, high, close, low,
        candle_color, candle_body_ratio and candle_body_length

    Note that `prev_close` is replaced by the shifted `close` at the end,
    after `pct_change` has been computed from the incoming `prev_close`.
    """
    body = (df['open'] - df['close']).abs()
    wick_to_wick = df['high'] - df['low']

    df['candle_body_length'] = body
    df['candle_length'] = wick_to_wick
    df['candle_body_ratio'] = body / wick_to_wick
    df['candle_color'] = np.where(df['close'] > df['open'], 'green', 'red')
    df['pct_change'] = (df['close'] - df['prev_close']) * 100 / df['prev_close']

    lagged = ('open', 'high', 'close', 'low', 'candle_color', 'candle_body_ratio', 'candle_body_length')
    for name in lagged:
        df['prev_' + name] = df[name].shift(1)
    return df

In [25]:
def identify_marubozus(df):
    """Label candles whose body fills most of the range on a sizeable move.

    A row qualifies when `candle_body_ratio` > 0.85 and |`pct_change`| > 0.5.
    The `marubozu` column is "Bullish" for qualifying green candles, "Bearish"
    for qualifying red candles and None otherwise. The frame is modified in
    place and returned.
    """
    color = df['candle_color']
    dominant_body = (df['candle_body_ratio'] > 0.85) & (df['pct_change'].abs() > 0.5)

    bullish = (color == 'green') & dominant_body
    bearish = (color == 'red') & dominant_body

    bearish_or_none = np.where(bearish, "Bearish", None)
    df['marubozu'] = np.where(bullish, "Bullish", bearish_or_none)
    return df

def identify_dojis(df):
    """Flag small-bodied candles whose body sits near the middle of the range.

    `is_doji` is True when `candle_body_ratio` < 0.15, the candle is 'red' or
    'green', and the distance from `high` to the top of the body (open for red,
    close for green) is between 33% and 67% of `candle_length`, inclusive.
    The frame is modified in place and returned.
    """
    color = df['candle_color']
    is_green = color == 'green'
    is_red = color == 'red'

    # Top of the body: close on green candles, open otherwise.
    body_top = df['close'].where(is_green, df['open'])
    upper_share = (df['high'] - body_top) / df['candle_length']

    tiny_body = df['candle_body_ratio'] < 0.15
    flagged = tiny_body & (is_green | is_red) & upper_share.between(.33, .67)
    df['is_doji'] = flagged.to_numpy()
    return df

def identify_paper_umbrellas(df):
    """Mark candles with a small body, a long lower shadow and almost no upper shadow.

    `paper_umbrella_type` is 'Yes' when the candle is 'red' or 'green',
    `candle_body_ratio` < 0.38, the lower shadow is at least 60% of
    `candle_length` and the upper shadow is at most 12.5% of it; otherwise
    None. Shadows are measured from the body edges (open/close chosen by
    colour). The frame is modified in place and returned.
    """
    color = df['candle_color']
    is_green = color == 'green'
    is_red = color == 'red'

    # Green candles have open at the bottom of the body and close at the top;
    # every other row is measured the red way round.
    body_bottom = df['open'].where(is_green, df['close'])
    body_top = df['close'].where(is_green, df['open'])

    length = df['candle_length']
    lower_share = (body_bottom - df['low']) / length
    upper_share = (df['high'] - body_top) / length

    is_umbrella = (
        (df['candle_body_ratio'] < 0.38)
        & (is_green | is_red)
        & (lower_share >= 0.60)
        & (upper_share <= 0.125)
    )
    df['paper_umbrella_type'] = np.where(is_umbrella, 'Yes', None)
    return df

def identify_shooting_stars(df):
    """Flag small-bodied candles with a long upper shadow.

    `is_shooting_star` is True when the candle is 'red' or 'green',
    `candle_body_ratio` < 0.38 and the distance from `high` to the top of the
    body (close for green, open for red) is at least 60% of `candle_length`.
    The frame is modified in place and returned.
    """
    color = df['candle_color']
    is_green = color == 'green'
    is_red = color == 'red'

    body_top = df['close'].where(is_green, df['open'])
    upper_share = (df['high'] - body_top) / df['candle_length']

    small_body = df['candle_body_ratio'] < 0.38
    flagged = small_body & (is_green | is_red) & (upper_share >= 0.6)
    df['is_shooting_star'] = flagged.to_numpy()
    return df

### Multiple candlestick patterns

In [26]:
def identify_engulfing(df):
    """Label candles whose body contains the previous candle's body.

    The `prev_*` columns for open, high, close, low, candle_color,
    candle_body_ratio and candle_body_length are (re)written from the
    previous row first.

    A row qualifies when its colour differs from the previous candle's, the
    previous `candle_body_ratio` is at least 0.15, the previous open and close
    both lie inside the current open/close range (inclusive), and the previous
    body is less than 95% of the current body. `engulfing` is
    "Bullish Engulfing" for qualifying green candles, "Bearish Engulfing" for
    other qualifying candles, and None otherwise. The frame is modified in
    place and returned.
    """
    lagged = ('open', 'high', 'close', 'low', 'candle_color', 'candle_body_ratio', 'candle_body_length')
    for name in lagged:
        df[f'prev_{name}'] = df[name].shift(1)

    body_edges = df[['open', 'close']]
    body_floor = body_edges.min(axis=1)
    body_ceiling = body_edges.max(axis=1)

    color_flipped = df['prev_candle_color'] != df['candle_color']
    prev_has_body = df['prev_candle_body_ratio'] >= 0.15
    prev_open_inside = df['prev_open'].between(body_floor, body_ceiling)
    prev_close_inside = df['prev_close'].between(body_floor, body_ceiling)
    prev_body_smaller = (df['prev_candle_body_length'] / df['candle_body_length']) < 0.95

    engulfs = color_flipped & prev_has_body & prev_open_inside & prev_close_inside & prev_body_smaller
    direction = np.where(df["candle_color"] == 'green', "Bullish Engulfing", "Bearish Engulfing")
    df['engulfing'] = np.where(engulfs, direction, None)

    return df

def identify_haramis(df):
    """Label candles whose body sits inside the previous candle's body.

    Requires the `prev_*` columns (open, close, candle_color,
    candle_body_ratio, candle_body_length) to be present already.

    A row qualifies when its colour differs from the previous candle's, the
    previous `candle_body_ratio` is at least 0.5, the current body is less
    than half of the previous body, and the current open and close both lie
    inside the previous open/close range (inclusive). `harami` is
    "Bullish Harami" for qualifying green candles, "Bearish Harami" for other
    qualifying candles, and None otherwise. The frame is modified in place and
    returned.
    """
    prev_edges = df[['prev_open', 'prev_close']]
    prev_floor = prev_edges.min(axis=1)
    prev_ceiling = prev_edges.max(axis=1)

    color_flipped = df['prev_candle_color'] != df['candle_color']
    prev_body_large = df['prev_candle_body_ratio'] >= 0.5
    body_shrunk = (df['candle_body_length'] / df['prev_candle_body_length']) < 0.5
    open_inside = df['open'].between(prev_floor, prev_ceiling)
    close_inside = df['close'].between(prev_floor, prev_ceiling)

    contained = color_flipped & prev_body_large & body_shrunk & open_inside & close_inside
    direction = np.where(df['candle_color'] == 'green', "Bullish Harami", "Bearish Harami")
    df['harami'] = np.where(contained, direction, None)
    return df

def identify_piercing_or_dark_clouds(df):
    """Label candles that close inside the previous body past its midpoint.

    Requires the `prev_*` columns (open, close, candle_color,
    candle_body_ratio, candle_body_length) to be present already.

    A row qualifies when its close lies inside the previous open/close range
    (inclusive), the previous `candle_body_ratio` is at least 0.5, the current
    body is at least half of the previous body, and the colour differs from
    the previous candle's. For qualifying rows, `partial_engulfing` is
    "Piercing Pattern" when the candle is green and closes at or above the
    midpoint of the previous body, and "Dark Cloud Cover" when it is red and
    closes at or below that midpoint. All other rows get None. The frame is
    modified in place and returned.
    """
    prev_edges = df[["prev_open", "prev_close"]]
    prev_floor = prev_edges.min(axis=1)
    prev_ceiling = prev_edges.max(axis=1)
    prev_midpoint = (df['prev_open'] + df['prev_close']) / 2

    qualifies = (
        df["close"].between(prev_floor, prev_ceiling)
        & (df["prev_candle_body_ratio"] >= 0.5)
        & (df['candle_body_length'] >= df['prev_candle_body_length'] * 0.5)
        & (df["candle_color"] != df["prev_candle_color"])
    )

    piercing = qualifies & (df["candle_color"] == "green") & (df['close'] >= prev_midpoint)
    dark_cloud = qualifies & (df["candle_color"] == "red") & (df['close'] <= prev_midpoint)

    dark_cloud_or_none = np.where(dark_cloud, "Dark Cloud Cover", None)
    df["partial_engulfing"] = np.where(piercing, "Piercing Pattern", dark_cloud_or_none)
    return df