# filters

> This module implements various filters to check for specific occurrences in the data

In [None]:
# | default_exp filters

In [None]:
# | hide
from nbdev.showdoc import *

In [None]:
# | export
import pandas as pd
from datetime import datetime
import nbdev

In [None]:
# | export
base_path = nbdev.config.get_config().lib_path

In [None]:
# | export
processed_data_dir = base_path / "../Data/Bhavcopy/Processed/"
nifty500_csv = base_path / "../Data/Misc/ind_nifty500list.csv"

# Helper functions for filtering stocks with a given strategy.

In [None]:
# | export
def get_nifty500():
    # Get Nifty500 list
    return pd.read_csv(nifty500_csv).Symbol.to_list()

In [None]:
# | export
def get_symbol_data(symbol):
    file_path = base_path / processed_data_dir / f"{symbol}.parquet"

    if not file_path.exists():
        return None
    df = pd.read_parquet(file_path)
    df['DATE'].apply(lambda x: x.strftime('%Y-%d-%m'))
    return df

In [None]:
# | export

# Convert daily data to monthly data
def get_monthly_data(df):
    return df.resample('M', on='DATE').agg({'OPEN': 'first', 'HIGH': 'max', 'LOW': 'min', 'CLOSE': 'last','SYMBOL': 'first'})

# Convert daily data to weekly data
def get_weekly_data(df):
    return df.resample('W', on='DATE').agg({'OPEN': 'first', 'HIGH': 'max', 'LOW': 'min', 'CLOSE': 'last','SYMBOL': 'first'})

In [None]:
get_symbol_data("3MINDIA")

Unnamed: 0,SYMBOL,SERIES,OPEN,HIGH,LOW,CLOSE,TOTTRDQTY,TOTTRDVAL,TOTALTRADES,DATE,...,BBP_20_2,DCL_66_22,DCU_66_22,ST_12_3,ST_11_2,ST_10_1,CDL_COLOR,CDL_SIZE,TOPWICK_SIZE,BOTWICK_SIZE
0,3MINDIA,EQ,11396.95,11455.00,11200.00,11321.95,400,4.546008e+06,159,2016-01-01,...,,,,1,1,1,red,75.00,58.05,121.95
1,3MINDIA,EQ,11330.00,11330.00,10578.50,10935.80,1002,1.094160e+07,495,2016-01-04,...,,,,1,1,1,red,394.20,0.00,357.30
2,3MINDIA,EQ,11053.40,11055.00,10725.00,10784.35,541,5.855578e+06,277,2016-01-05,...,,,,1,1,1,red,269.05,1.60,59.35
3,3MINDIA,EQ,10751.00,10947.00,10520.20,10644.60,872,9.333687e+06,288,2016-01-06,...,,,,1,1,1,red,106.40,196.00,124.40
4,3MINDIA,EQ,10650.95,10810.00,10505.00,10673.75,381,4.080118e+06,193,2016-01-07,...,,,,1,1,1,green,22.80,136.25,145.95
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1804,3MINDIA,EQ,22400.00,22765.50,22125.25,22412.95,6232,1.400317e+08,2125,2023-04-24,...,0.274932,21500.0,23409.95,-1,-1,-1,green,12.95,352.55,274.75
1805,3MINDIA,EQ,22382.95,22800.00,22160.75,22296.20,23477,5.260838e+08,2208,2023-04-25,...,0.197034,21500.0,23409.95,-1,-1,-1,red,86.75,417.05,135.45
1806,3MINDIA,EQ,22296.20,22550.00,22259.35,22425.70,3083,6.909513e+07,1674,2023-04-26,...,0.214720,21500.0,23409.95,-1,-1,-1,green,129.50,124.30,36.85
1807,3MINDIA,EQ,22449.95,22650.00,22327.10,22611.75,2202,4.960846e+07,901,2023-04-27,...,0.353215,21500.0,23409.95,-1,-1,-1,green,161.80,38.25,122.85


In [None]:
# | export
def filter_stocks(symbols=None, strategy=None, days_lookback=2):
    if not symbols:
        symbols = get_nifty500()

# Draft

In [None]:

def check_single_candle_span(df, col_list=["SMA_20_C", "SMA_200_C"], days_lookback=2):
    days_lookback = min(days_lookback + 1, df.shape[0])
    for d in range(1, days_lookback):
        conditions = [
            df.LOW.iloc[-d] <= df[col].iloc[-d] <= df.HIGH.iloc[-d]
            for col in col_list
        ]
        if all(conditions):
            print(
                f"{df.SYMBOL.iloc[0]} -> Single candle span on {df.DATE.iloc[-d]} : {d} days ago with {df.iloc[-d].CDL_COLOR} candle"
            )
            return d
    return 0

20 SMA HALT STRATEGY

In [None]:

def check_directional_crossover(df, col1, col2, days_lookback=7):
    days_lookback = min(days_lookback + 1, df.shape[0])
    for d in range(1, days_lookback):
        # print(f"Checking for {df.DATE.iloc[-d]}: {df[col1].iloc[-d]}, {df[col2].iloc[-d]}")
        if (
            df[col1].iloc[-d] > df[col2].iloc[-d]
            and df[col1].iloc[-d - 1] < df[col2].iloc[-d - 1]
        ):
            # print(f"{col1} crossed over {col2} on {df.DATE.iloc[-d]} : {d} days ago")
            return d
    return 0

In [None]:

def check_close_under_falling_20(df, d, days_lookback=10):
    df = df.iloc[-d - days_lookback : -d]
    # print(df.DATE.iloc[0], df.DATE.iloc[-1])
    # df.plot(x="DATE", y=["CLOSE", "SMA_20_C"], figsize=(15, 5))
    # df.plot(x="DATE", y=["CLOSE", "SMA_20_C", "SMA_20_H"], figsize=(15, 5))
    # print(df.SMA_20_C.is_monotonic_decreasing, df.SMA_20_H.is_monotonic_decreasing)
    return all(
        [
            any(
                [
                    df.SMA_20_C.is_monotonic_decreasing,
                    df.SMA_20_H.is_monotonic_decreasing,
                    df.SMA_44_C.is_monotonic_decreasing,
                ]
            ),
            any(
                [
                    all(df.CLOSE < df.SMA_20_H),
                    # all(df.CLOSE < df.SMA_20_C)
                ]
            ),
            # all(df.SMA_200_C < df.SMA_20_C)
        ]
    )


# check_close_under_20(df, 15, 18)

In [None]:

def check_close_above_20(df, d, days_lookfwd=3):
    df = df.iloc[-d + 1 :].head(days_lookfwd)
    # print(df)
    # print(df.DATE.iloc[0], df.DATE.iloc[-1])
    # df.plot(x="DATE", y=["CLOSE", "SMA_20_C"], figsize=(15, 5))
    return all(df.CLOSE > df.SMA_20_C)  # df.SMA_20_C.is_monotonic_increasing and

In [None]:

def check_20_200_breakout(df, days_lookback=5):
    found = False
    col1 = "CLOSE"
    col2 = "SMA_20_C"
    col3 = "SMA_200_C"
    col4 = "SMA_20_H"
    # df = df[:-45]

    # Check for double crossover on SMA20 and SMA200 within a 3 day period
    dc20 = check_directional_crossover(df, col1, col2, days_lookback=days_lookback)
    if all(
        [
            dc20 > 0,
            check_close_under_falling_20(df, dc20, days_lookback=30),
            check_close_above_20(df, dc20),
        ]
    ):
        found = True
        dc200 = check_directional_crossover(df, col1, col3, days_lookback=days_lookback)
        # print(dc20, dc200)

        if all([dc200 > 0, abs(dc20 - dc200) < 3]):
            print(
                f"{df.SYMBOL.iloc[0]} -> Double crossover on {df.DATE.iloc[-dc20]} : {dc20} days ago, with {df.iloc[-dc20].CDL_COLOR} candle"
            )
        else:
            print(
                f"{df.SYMBOL.iloc[0]} -> SMA20 crossover on {df.DATE.iloc[-dc20]} : {dc20} days ago, with {df.iloc[-dc20].CDL_COLOR} candle"
            )
            if dc200:
                print(
                    f"{df.SYMBOL.iloc[0]} -> SMA 200 crossover: {df.DATE.iloc[-dc200]} : {dc200} days ago, with {df.iloc[-dc200].CDL_COLOR} candle"
                )
    return found


In [None]:
def check_3_green_on_sma20(df):
    crossover = check_directional_crossover(df, "CLOSE", "SMA_20_C", days_lookback=15)
    if all([
        crossover > 2,
        df.iloc[-crossover].CDL_COLOR == "green",
        df.iloc[-crossover+1].CDL_COLOR == "green",
        df.iloc[-crossover+2].CDL_COLOR == "green",
        df.iloc[-crossover+1].CLOSE > df.iloc[-crossover].CLOSE,
        df.iloc[-crossover+2].CLOSE > df.iloc[-crossover+1].CLOSE,
        df.iloc[-crossover+2].CLOSE < df.iloc[-crossover+2].BBU_20_2,
        # df.iloc[-crossover].BBB_20_2 > 8,
        df.iloc[-crossover+1].SMA_20_C > df.iloc[-crossover-1].SMA_20_C
        # crossover > 10,
    ]):
        max_close = max(df[-crossover+1:].CLOSE)
        perc_movement = 100* (max_close - df.iloc[-crossover+2].CLOSE) / df.iloc[-crossover+2].CLOSE
        print(f"{df.SYMBOL.iloc[0]} -> SMA20 crossover on {df.DATE.iloc[-crossover]} : BBB = {df.BBB_20_2.iloc[-crossover]:0.2f} : Moved {perc_movement:.2f}%, upto {max_close}")
    # else:
    #     print(crossover)

# df[-crossover-5:-crossover + 5][["DATE", "CLOSE", "SMA_20_C"]]
    

In [None]:
# for symbol in df500:
#     df = get_symbol_data(symbol)
#     if df is not None:
#         check_3_green_on_sma20(df)

In [None]:
# df = get_symbol_data("PNB")
# window = 500
# df.tail(window).plot(
#     x="DATE",
#     y=[
#         # "BBU_20_2",
#         # "BBM_20_2",
#         # "BBL_20_2",
#         "BBB_20_2",
#         # "BBP_20_2",
#         "CLOSE"
#     ],
#     figsize=(18, 15),
# )
# df.tail(window).plot(
#     x="DATE",
#     y=[
#         "CLOSE"
#     ],
#     figsize=(18, 5),
# )

SMA 20 & SMA 200 Strategies

In [None]:
# Find stocks with SMA20 and SMA44 single candle span
# for symbol in df500:
#     df = get_symbol_data(symbol)
#     if df is not None:
#         check_single_candle_span(
#             df, col_list=["SMA_20_C", "SMA_44_C", "SMA_200_C"], days_lookback=2
#         )

In [None]:
# Find stocks with 20ma crossover preceded by time under a falling SMA20
# for symbol in df500:
#     df = get_symbol_data(symbol)
#     if df is not None:
#         a = check_20_200_breakout(df)
#         if a:
#             b = check_single_candle_span(
#                 df, col_list=["SMA_20_C", "SMA_44_C", "SMA_200_C"], days_lookback=5
#             )
#             print("-" * 100)

ALLTIME HIGH STRATEGY

In [None]:

def check_historical_minmax(df, min_steps_back=30, mode=None):
    if mode is None or mode == "all":
        mode = ["min", "max"]

    if "DATE" not in df.columns:
        df["DATE"] = df.index

    max_steps = len(df)
    if max_steps > min_steps_back:
        if "max" in mode or mode == "max":
            # Search for historical high
            last_is_max = True
            steps_back = 1
            while last_is_max:
                steps_back += 1
                if steps_back > max_steps:
                    print(
                        f"{df.SYMBOL.iloc[-1]}: ALL TIME HIGH on {df.DATE.iloc[-1].date()} closed at {df.CLOSE.iloc[-1]}"
                    )
                    return
                last_is_max = df.CLOSE.iloc[-1] >= df.HIGH.iloc[-steps_back]

            if steps_back >= min_steps_back:
                print(
                    f"{df.SYMBOL.iloc[-1]} CLOSE -> {df.CLOSE.iloc[-1]} : New high of {steps_back} sessions, since HIGH of {df.HIGH.iloc[-steps_back]} on {df.DATE.iloc[-steps_back].date()}"
                )

        if "min" in mode or mode == "min":
            # Search for historical low
            last_is_min = True
            steps_back = 1
            while last_is_min:
                steps_back += 1
                if steps_back > max_steps:
                    print(
                        f"{df.SYMBOL.iloc[-1]}: ALL TIME LOW on {df.DATE.iloc[-1].date()} closed at {df.CLOSE.iloc[-1]}"
                    )
                    return
                last_is_min = df.CLOSE.iloc[-1] <= df.LOW.iloc[-steps_back]

            if steps_back >= min_steps_back:
                print(
                    f"{df.SYMBOL.iloc[-1]} CLOSE -> {df.CLOSE.iloc[-1]} : New low of {steps_back} sessions, since LOW of {df.LOW.iloc[-steps_back]} on {df.DATE.iloc[-steps_back].date()}"
                )

In [None]:
symbol = "BAJAJFINSV"
file_path = base_path / processed_data_dir / f"{symbol}.csv"
if not file_path.exists():
    print(f"File does not exist: {file_path}")
else:
    print(f"File exists: {file_path}")
    df = pd.read_csv(file_path, parse_dates=["DATE"])
    # Convert daily data to monthly data
    df_monthly = df.resample("M", on="DATE").agg(
        {
            "OPEN": "first",
            "HIGH": "max",
            "LOW": "min",
            "CLOSE": "last",
            "SYMBOL": "first",
        }
    )

    check_historical_minmax(df, min_steps_back=4, mode=None)
    # check_historical_minmax(df_monthly, min_steps_back=4, mode=None)

File does not exist: c:\MyData\TechWork\stocksurfer\stocksurfer\..\Data\Bhavcopy\Processed\BAJAJFINSV.csv


In [None]:
# for symbol in df500:
#     file_path = base_path / processed_data_dir / f"{symbol}.csv"
#     if file_path.exists():
#         df = pd.read_csv(file_path, parse_dates=["DATE"])
#         # Convert daily data to monthly data
#         df_monthly = df.resample("M", on="DATE").agg(
#             {
#                 "OPEN": "first",
#                 "HIGH": "max",
#                 "LOW": "min",
#                 "CLOSE": "last",
#                 "SYMBOL": "first",
#             }
#         )

#         # check_historical_minmax(df, min_steps_back=30, mode="max")
#         check_historical_minmax(df_monthly, min_steps_back=36, mode="all")

In [None]:
# Get list of all csv files in raw_data_dir
# csv_files = [f for f in raw_data_dir.iterdir() if f.suffix == '.csv']

# for file in csv_files:
#     file_path = base_path / processed_data_dir / f"{symbol}.csv"
#     if os.path.exists(file_path):
#         df = pd.read_csv(file_path, parse_dates=["DATE"])

#         # Convert daily data to monthly data
#         df_monthly = df.resample('M', on='DATE').agg({'OPEN': 'first', 'HIGH': 'max', 'LOW': 'min', 'CLOSE': 'last','SYMBOL': 'first'})

#         # check_historical_minmax(df, min_steps_back=300, mode="max")
#         check_historical_minmax(df_monthly, min_steps_back=60, mode="max")

Movement in a band

In [None]:
def check_movement_within_band(df, band_perc=5, steps_lookback=20):
    steps_lookback = min(steps_lookback + 1, df.shape[0])
    max_steps = len(df)

    band_max = df.CLOSE.iloc[-1] * (1 + band_perc / 100)
    band_min = df.CLOSE.iloc[-1] * (1 - band_perc / 100)

    in_band = True
    steps_back = 1
    while in_band:
        steps_back += 1
        if steps_back > max_steps:
            print(
                f"{df.SYMBOL.iloc[-1]}: Within band of {band_perc}% for {steps_back-1} sessions for band: {band_min:0.2f} to {band_max:0.2f}"
            )
            return
        in_band = (df.CLOSE.iloc[-steps_back] <= band_max) and (
            df.CLOSE.iloc[-steps_back] >= band_min
        )

    if steps_back >= steps_lookback:
        print(
            f"{df.SYMBOL.iloc[-1]} CLOSE -> {df.CLOSE.iloc[-1]} : Within band of {band_perc}% for {steps_back-1} sessions for band: {band_min:0.2f} to {band_max:0.2f}"
        )
        return

In [None]:
# symbol = "VAIBHAVGBL"
# file_path = base_path / processed_data_dir / f"{symbol}.csv"
# if not os.path.exists(file_path):
#     print(f"File does not exist: {file_path}")
# else:
#     print(f"File exists: {file_path}")
#     df = pd.read_csv(file_path, parse_dates=["DATE"])
#     df = df.query("DATE < '2022-08-27'")
#     check_movement_within_band(df, band_perc=5, steps_lookback=20)

# Convert daily data to monthly data
# df_monthly = df.resample('M', on='DATE').agg({'OPEN': 'first', 'HIGH': 'max', 'LOW': 'min', 'CLOSE': 'last','SYMBOL': 'first'})

In [None]:
# for symbol in df500:
#     file_path = base_path / processed_data_dir / f"{symbol}.csv"
#     if os.path.exists(file_path):
#         df = pd.read_csv(file_path, parse_dates=["DATE"])

#         check_movement_within_band(df, band_perc=3, steps_lookback=20)

In [None]:
# def check_3_green_on_SMA(df, sma_period=20):

#     if df.shape[0] < sma_period:
#         return

#     if all([
#         df.OPEN.iloc[-3] < df.SMA_20_C.iloc[-3] < df.CLOSE.iloc[-3],
#         df.OPEN.iloc[-2] < df.CLOSE.iloc[-2],
#         df.OPEN.iloc[-1] < df.CLOSE.iloc[-1],
#         df.CLOSE.iloc[-1] > df.CLOSE.iloc[-2] > df.CLOSE.iloc[-3],
#     ]): # 3 green candles on SMA

#         print(f"{df.SYMBOL.iloc[-1]}: 3 green candles on SMA 20 on {df.DATE.iloc[-1]}")

In [None]:
# Get list of all csv files in raw_data_dir
# csv_files = [f for f in os.listdir(processed_data_dir) if f.endswith('.csv')]

# for file in csv_files:
#     file_path = base_path / processed_data_dir / f"{symbol}.csv"
#     if os.path.exists(file_path):
#         df = pd.read_csv(file_path, parse_dates=["DATE"])

#         # Convert daily data to monthly data
#         # df_monthly = df.resample('M', on='DATE').agg({'OPEN': 'first', 'HIGH': 'max', 'LOW': 'min', 'CLOSE': 'last','SYMBOL': 'first'})

#         # Convert daily data to weekly data
#         df_weekly = df.resample('W', on='DATE').agg({'OPEN': 'first', 'HIGH': 'max', 'LOW': 'min', 'CLOSE': 'last','SYMBOL': 'first'})
#         df_weekly['SMA_20_C'] = df_weekly.CLOSE.rolling(20).mean()
#         if "DATE" not in df_weekly.columns:
#             df_weekly["DATE"] = df_weekly.index

#         check_3_green_on_SMA(df_weekly, sma_period=20)

In [None]:
# Download csv from URL
# url = r"https://www.nseindia.com/api/equity-stockIndices?csv=true&index=SECURITIES%20IN%20F%26O"
# df = pd.read_csv(url)
# df


In [None]:
# import requests
# # Download csv from URL using requests
# url = r"https://www.nseindia.com/api/equity-stockIndices?csv=true&index=SECURITIES%20IN%20F%26O"
# requests.get(url).content


In [None]:
# https://www.nseindia.com/market-data/live-equity-market

# https://www.nseindia.com/api/equity-stockIndices?csv=true&index=NIFTY%20SMALLCAP%2050


In [None]:
# init_cap = 200000
# win_rate = 0.6
# one_trade_perc = 0.02

# for i in range(200):
#     if random.random() < win_rate:
#         init_cap = init_cap * (1 + one_trade_perc)
#     else:
#         init_cap = init_cap * (1 - one_trade_perc)

# print(f'{init_cap:,}')

In [None]:
# import random
# random.random()

In [None]:
# | hide
nbdev.nbdev_export()