In [21]:
import os
import datetime as dt
from typing import List, Dict

import numpy as np
import pandas as pd
from tqdm import tqdm

from jugaad_data.nse import stock_df   # NEW
# from nsepy import get_history       # OLD â€“ can be removed

#### =========================
#### 1. DATA DOWNLOAD (NSEPY)
#### =========================

In [22]:
def download_ohlcv_nsepy(
    symbols: List[str],
    start_date: dt.date,
    end_date: dt.date
) -> pd.DataFrame:
    """
    Download daily OHLCV data from NSE using jugaad-data (stock_df)
    and merge into a single DataFrame.

    NOTE:
    - We keep the function name `download_ohlcv_nsepy` so the rest of the
      pipeline (build_ohlcv_ml_ready) does not need to change.
    - Under the hood, this no longer uses nsepy; it uses `stock_df` from
      the `jugaad_data.nse` module, which works with the current NSE site.
    """
    all_data = []

    for sym in tqdm(symbols, desc="Downloading OHLCV (jugaad-data)"):
        try:
            df = stock_df(
                symbol=sym,
                from_date=start_date,
                to_date=end_date,
                series="EQ",
            )
        except Exception as e:
            print(f"[ERROR] Failed to download {sym} {start_date} to {end_date}: {e}")
            continue

        if df is None or df.empty:
            print(f"[WARN] No data for {sym}, skipping.")
            continue

        # jugaad-data returns columns like: DATE, OPEN, HIGH, LOW, CLOSE, VOLUME, SYMBOL
        df = df.rename(
            columns={
                "DATE": "date",
                "OPEN": "open",
                "HIGH": "high",
                "LOW": "low",
                "CLOSE": "close",
                "VOLUME": "volume",
                "SYMBOL": "symbol",
            }
        )

        df = df[["date", "open", "high", "low", "close", "volume", "symbol"]]
        df.dropna(inplace=True)
        all_data.append(df)

    if not all_data:
        raise ValueError("No data downloaded for any symbol (jugaad-data).")

    ohlcv = pd.concat(all_data, ignore_index=True)
    ohlcv.sort_values(["symbol", "date"], inplace=True)
    ohlcv.reset_index(drop=True, inplace=True)

    return ohlcv

#### =========================
#### 2. BASIC CLEANING
#### =========================

In [23]:
def clean_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """
    Basic cleaning:
      - Ensure correct dtypes
      - Drop duplicates
      - Remove non-sensical rows
    """
    df = df.copy()

    df["date"] = pd.to_datetime(df["date"])
    for col in ["open", "high", "low", "close", "volume"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    df.dropna(subset=["open", "high", "low", "close"], inplace=True)
    df.drop_duplicates(subset=["symbol", "date"], inplace=True)

    # filter out obviously bad rows (e.g. zero or negative prices)
    df = df[(df["open"] > 0) & (df["high"] > 0) & (df["low"] > 0) & (df["close"] > 0)]
    df = df[df["high"] >= df["low"]]

    df.sort_values(["symbol", "date"], inplace=True)
    df.reset_index(drop=True, inplace=True)
    return df

#### =========================
#### 3. TECHNICAL INDICATORS
#### =========================

In [24]:
def ema(series: pd.Series, span: int) -> pd.Series:
    return series.ewm(span=span, adjust=False).mean()


def rsi(series: pd.Series, period: int = 14) -> pd.Series:
    delta = series.diff()
    gain = np.where(delta > 0, delta, 0.0)
    loss = np.where(delta < 0, -delta, 0.0)

    gain = pd.Series(gain, index=series.index)
    loss = pd.Series(loss, index=series.index)

    avg_gain = gain.rolling(window=period, min_periods=period).mean()
    avg_loss = loss.rolling(window=period, min_periods=period).mean()

    # Use Wilderâ€™s smoothing after first period
    avg_gain = avg_gain.where(avg_gain.index < series.index[period],
                              (avg_gain.shift(1) * (period - 1) + gain) / period)
    avg_loss = avg_loss.where(avg_loss.index < series.index[period],
                              (avg_loss.shift(1) * (period - 1) + loss) / period)

    rs = avg_gain / (avg_loss + 1e-9)
    rsi = 100 - (100 / (1 + rs))
    return rsi


def add_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add common technical indicators per symbol:
      - Log returns
      - Rolling volatility (20d)
      - EMA 20 / 50 / 200
      - MACD (12,26,9)
      - Bollinger Bands (20, 2)
      - ATR (14)
    """
    df = df.copy()

    def _per_symbol(g: pd.DataFrame) -> pd.DataFrame:
        g = g.sort_values("date").copy()

        # returns
        g["ret_1d"] = g["close"].pct_change()
        g["log_ret_1d"] = np.log(g["close"] / g["close"].shift(1))

        # vol
        g["vol_20d"] = g["log_ret_1d"].rolling(20, min_periods=10).std()

        # EMA
        g["ema_20"] = ema(g["close"], 20)
        g["ema_50"] = ema(g["close"], 50)
        g["ema_200"] = ema(g["close"], 200)

        # MACD
        ema_fast = ema(g["close"], 12)
        ema_slow = ema(g["close"], 26)
        g["macd"] = ema_fast - ema_slow
        g["macd_signal"] = ema(g["macd"], 9)
        g["macd_hist"] = g["macd"] - g["macd_signal"]

        # Bollinger
        ma20 = g["close"].rolling(20, min_periods=10).mean()
        sd20 = g["close"].rolling(20, min_periods=10).std()
        g["bb_mid_20"] = ma20
        g["bb_up_20"] = ma20 + 2 * sd20
        g["bb_low_20"] = ma20 - 2 * sd20

        # ATR (14)
        high_low = g["high"] - g["low"]
        high_close_prev = (g["high"] - g["close"].shift(1)).abs()
        low_close_prev = (g["low"] - g["close"].shift(1)).abs()
        tr = pd.concat([high_low, high_close_prev, low_close_prev], axis=1).max(axis=1)
        g["atr_14"] = tr.rolling(14, min_periods=5).mean()

        return g

    df = df.groupby("symbol", group_keys=False).apply(_per_symbol)
    return df

#### =========================
#### 4. CANDLESTICK PATTERNS
#### =========================

In [25]:
def add_candlestick_patterns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Adds some basic candlestick pattern flags:
      - bullish_engulfing
      - bearish_engulfing
      - hammer
      - shooting_star
      - doji
    Values are 1 if pattern present, else 0.
    """
    df = df.copy()

    def _per_symbol(g: pd.DataFrame) -> pd.DataFrame:
        g = g.sort_values("date").copy()

        body = g["close"] - g["open"]
        body_abs = body.abs()
        range_ = g["high"] - g["low"]
        upper_shadow = g["high"] - g[["open", "close"]].max(axis=1)
        lower_shadow = g[["open", "close"]].min(axis=1) - g["low"]

        # Engulfing
        prev_body = body.shift(1)
        prev_open = g["open"].shift(1)
        prev_close = g["close"].shift(1)

        bullish_engulf = (
            (prev_body < 0)
            & (body > 0)
            & (g["open"] < prev_close)
            & (g["close"] > prev_open)
        )

        bearish_engulf = (
            (prev_body > 0)
            & (body < 0)
            & (g["open"] > prev_close)
            & (g["close"] < prev_open)
        )

        # Hammer: small body near top, long lower shadow
        hammer = (
            (lower_shadow >= 2 * body_abs)
            & (upper_shadow <= body_abs)
            & (body_abs / range_ < 0.4)
        )

        # Shooting star: small body near bottom, long upper shadow
        shooting_star = (
            (upper_shadow >= 2 * body_abs)
            & (lower_shadow <= body_abs)
            & (body_abs / range_ < 0.4)
        )

        # Doji: very small body
        doji = body_abs <= (0.1 * range_)

        g["pat_bull_engulf"] = bullish_engulf.astype(int)
        g["pat_bear_engulf"] = bearish_engulf.astype(int)
        g["pat_hammer"] = hammer.astype(int)
        g["pat_shooting_star"] = shooting_star.astype(int)
        g["pat_doji"] = doji.astype(int)

        return g

    df = df.groupby("symbol", group_keys=False).apply(_per_symbol)
    return df

#### =========================
#### 5. PRICE STRUCTURE (SWINGS + HH/HL/LH/LL)
#### =========================

In [26]:
def add_price_structure(
    df: pd.DataFrame,
    left: int = 2,
    right: int = 2
) -> pd.DataFrame:
    """
    Identify swing highs/lows and classify swings into HH/HL/LH/LL.
    Simple approach:
      - A swing high is a high that is greater than neighbors in +/- (left/right) days
      - A swing low is a low that is less than neighbors in +/- (left/right) days
    """

    df = df.copy()

    def _find_swings(g: pd.DataFrame) -> pd.DataFrame:
        g = g.sort_values("date").copy()
        n = len(g)
        swing_high = np.zeros(n, dtype=int)
        swing_low = np.zeros(n, dtype=int)

        highs = g["high"].values
        lows = g["low"].values

        for i in range(left, n - right):
            window_high = highs[i - left : i + right + 1]
            window_low = lows[i - left : i + right + 1]

            if highs[i] == window_high.max() and (highs[i] > window_high[[0, -1]]).all():
                swing_high[i] = 1

            if lows[i] == window_low.min() and (lows[i] < window_low[[0, -1]]).all():
                swing_low[i] = 1

        g["swing_high"] = swing_high
        g["swing_low"] = swing_low

        # Now label HH/HL/LH/LL only on swing points
        structure = []
        last_swing_high = None
        last_swing_low = None

        for i, row in g.iterrows():
            label = "NONE"
            if row["swing_high"] == 1:
                if last_swing_high is None:
                    label = "SH"
                else:
                    if row["high"] > last_swing_high:
                        label = "HH"
                    else:
                        label = "LH"
                last_swing_high = row["high"]

            elif row["swing_low"] == 1:
                if last_swing_low is None:
                    label = "SL"
                else:
                    if row["low"] > last_swing_low:
                        label = "HL"
                    else:
                        label = "LL"
                last_swing_low = row["low"]

            structure.append(label)

        g["structure_label"] = structure
        return g

    df = df.groupby("symbol", group_keys=False).apply(_find_swings)
    return df


#### =========================
#### 6. TARGETS (FUTURE RETURNS)
#### =========================

In [27]:
def add_targets(
    df: pd.DataFrame,
    horizons: List[int] = [1, 5, 21]
) -> pd.DataFrame:
    """
    Add future returns and classification labels.
    For each horizon h:
      - future_ret_h = close(t+h) / close(t) - 1
      - target_up_h = 1 if future_ret_h > 0 else 0
    """
    df = df.copy()

    def _per_symbol(g: pd.DataFrame) -> pd.DataFrame:
        g = g.sort_values("date").copy()
        for h in horizons:
            fut_close = g["close"].shift(-h)
            fut_ret = fut_close / g["close"] - 1.0
            col_ret = f"future_ret_{h}d"
            col_target = f"target_up_{h}d"

            g[col_ret] = fut_ret
            g[col_target] = (fut_ret > 0).astype(int)
        return g

    df = df.groupby("symbol", group_keys=False).apply(_per_symbol)
    return df


#### =========================
#### 7. MASTER PIPELINE
#### =========================

In [28]:
def build_ohlcv_ml_ready(
    symbols: List[str],
    start_date: dt.date,
    end_date: dt.date,
    output_parquet: str
) -> pd.DataFrame:
    print("Step 1: Downloading data...")
    ohlcv = download_ohlcv_nsepy(symbols, start_date, end_date)

    print("Step 2: Cleaning data...")
    ohlcv = clean_ohlcv(ohlcv)

    print("Step 3: Adding technical indicators...")
    ohlcv = add_technical_indicators(ohlcv)

    print("Step 4: Adding candlestick patterns...")
    ohlcv = add_candlestick_patterns(ohlcv)

    print("Step 5: Adding price structure (swings + HH/HL/LH/LL)...")
    ohlcv = add_price_structure(ohlcv)

    print("Step 6: Adding targets (future returns)...")
    ohlcv = add_targets(ohlcv, horizons=[1, 5, 21])

    print(f"Step 7: Saving to Parquet at: {output_parquet}")
    ohlcv.to_parquet(output_parquet, index=False)

    print("Done.")
    return ohlcv


#### =========================
#### 8. SCRIPT ENTRY POINT
#### =========================

In [None]:
# ðŸ”§ TODO: customise this list for your dissertation universe
SYMBOLS = [
    "RELIANCE",
    # "TCS",
    # "HDFCBANK",
    # "INFY",
    # "ICICIBANK",
    # add more symbols as needed
]

START = dt.date(2024, 12, 30)
END = dt.date(2024, 12, 31)

OUTPUT_PATH = "data/sample_ohlcv_ml_ready.parquet"

# Make sure 'data' folder exists

os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True)

build_ohlcv_ml_ready(SYMBOLS, START, END, OUTPUT_PATH)



Step 1: Downloading data...


Downloading OHLCV (jugaad-data): 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 1/1 [00:10<00:00, 10.15s/it]
  df = df.groupby("symbol", group_keys=False).apply(_per_symbol)
  df = df.groupby("symbol", group_keys=False).apply(_per_symbol)
  df = df.groupby("symbol", group_keys=False).apply(_find_swings)
  df = df.groupby("symbol", group_keys=False).apply(_per_symbol)


Step 2: Cleaning data...
Step 3: Adding technical indicators...
Step 4: Adding candlestick patterns...
Step 5: Adding price structure (swings + HH/HL/LH/LL)...
Step 6: Adding targets (future returns)...
Step 7: Saving to Parquet at: data/ohlcv_ml_ready.parquet
Done.


Unnamed: 0,date,open,high,low,close,volume,symbol,ret_1d,log_ret_1d,vol_20d,...,pat_doji,swing_high,swing_low,structure_label,future_ret_1d,target_up_1d,future_ret_5d,target_up_5d,future_ret_21d,target_up_21d
0,2024-12-30,1216.4,1223.2,1208.1,1210.7,8818766,RELIANCE,,,,...,0,0,0,NONE,0.003923,1,,0,,0
1,2024-12-31,1208.0,1219.1,1206.15,1215.45,6405475,RELIANCE,0.003923,0.003916,,...,0,0,0,NONE,,0,,0,,0
