In [1]:
# core_data_pipeline_extended.py
import math
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler

# -----------------------------
# Helpers: EMA and Wilder EMA
# -----------------------------
def ema(series: pd.Series, span: int) -> pd.Series:
    k = 2 / (span + 1)
    return series.ewm(alpha=k, adjust=False).mean()

def wilder_ema(series: pd.Series, period: int) -> pd.Series:
    return series.ewm(alpha=1/period, adjust=False).mean()

def sma(series: pd.Series, period: int) -> pd.Series:
    return series.rolling(period).mean()

# -----------------------------
# RSI (Wilder)
# -----------------------------
def rsi_wilder(close: pd.Series, period: int = 14) -> pd.Series:
    delta = close.diff()
    up = delta.clip(lower=0)
    down = -delta.clip(upper=0)
    avg_up = wilder_ema(up, period)
    avg_down = wilder_ema(down, period)
    rs = avg_up / (avg_down.replace(0, np.nan))
    rsi = 100 - (100 / (1 + rs))
    return rsi.fillna(0)

# -----------------------------
# MACD
# -----------------------------
def macd(close: pd.Series, fast=12, slow=26, signal=9):
    ema_fast = ema(close, fast)
    ema_slow = ema(close, slow)
    macd_line = ema_fast - ema_slow
    signal_line = ema(macd_line, signal)
    hist = macd_line - signal_line
    return macd_line, signal_line, hist

# -----------------------------
# Bollinger Bands
# -----------------------------
def bollinger_bands(close: pd.Series, period=20, mult=2.0):
    sma_ = sma(close, period)
    std = close.rolling(period).std()
    upper = sma_ + mult * std
    lower = sma_ - mult * std
    width = upper - lower
    return sma_, upper, lower, width

# -----------------------------
# ATR
# -----------------------------
def true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series:
    prev_close = close.shift(1)
    tr1 = high - low
    tr2 = (high - prev_close).abs()
    tr3 = (low - prev_close).abs()
    return pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)

def atr_wilder(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series:
    tr = true_range(high, low, close)
    return wilder_ema(tr, period)

# -----------------------------
# Pivot Points
# -----------------------------
def daily_pivot_levels(df: pd.DataFrame) -> pd.DataFrame:
    piv = pd.DataFrame(index=df.index)
    H, L, C = df['High'].shift(1), df['Low'].shift(1), df['Close'].shift(1)
    P = (H + L + C) / 3.0
    R1 = 2*P - L
    S1 = 2*P - H
    R2 = P + (H - L)
    S2 = P - (H - L)
    piv['P'] = P
    piv['R1'] = R1
    piv['S1'] = S1
    piv['R2'] = R2
    piv['S2'] = S2
    return piv

# -----------------------------
# Fractal pivots
# -----------------------------
def fractal_pivots(df: pd.DataFrame, left=2, right=2):
    highs = df['High'].values
    lows = df['Low'].values
    n = len(df)
    pivot_high = np.full(n, False, dtype=bool)
    pivot_low = np.full(n, False, dtype=bool)
    for i in range(left, n-right):
        if highs[i] == max(highs[i-left:i+right+1]):
            pivot_high[i] = True
        if lows[i] == min(lows[i-left:i+right+1]):
            pivot_low[i] = True
    piv = pd.DataFrame({'pivot_high': pivot_high, 'pivot_low': pivot_low}, index=df.index)
    piv['ph_price'] = np.where(piv['pivot_high'], df['High'], np.nan)
    piv['pl_price'] = np.where(piv['pivot_low'], df['Low'], np.nan)
    return piv

# -----------------------------
# Clustered S/R
# -----------------------------
def cluster_levels(prices: pd.Series, tolerance_frac=0.002, min_touches=3):
    pts = prices.dropna().sort_values().values
    if len(pts) == 0: return []
    clusters = [[pts[0]]]
    for p in pts[1:]:
        if abs(p - clusters[-1][-1]) <= tolerance_frac * clusters[-1][-1]:
            clusters[-1].append(p)
        else:
            clusters.append([p])
    levels = []
    for cl in clusters:
        if len(cl) >= min_touches:
            levels.append({'level': float(np.mean(cl)), 'touches': int(len(cl))})
    return levels

def support_resistance_from_fractals(df: pd.DataFrame, piv: pd.DataFrame, atr_col='ATR', atr_mult=0.0, min_touches=3):
    highs = piv['ph_price']
    lows = piv['pl_price']
    res_levels = cluster_levels(highs, tolerance_frac=0.002, min_touches=min_touches)
    sup_levels = cluster_levels(lows, tolerance_frac=0.002, min_touches=min_touches)
    return {'resistance': res_levels, 'support': sup_levels}

# -----------------------------
# Fibonacci
# -----------------------------
def fib_levels_from_swings(swing_high: float, swing_low: float):
    ratios = [0.236, 0.382, 0.5, 0.618, 0.786]
    if swing_high >= swing_low:
        diff = swing_high - swing_low
        levels = {f'{int(r*100)}%': swing_high - r*diff for r in ratios}
        levels['0%'] = swing_high
        levels['100%'] = swing_low
    else:
        diff = swing_low - swing_high
        levels = {f'{int(r*100)}%': swing_low + r*diff for r in ratios}
        levels['0%'] = swing_low
        levels['100%'] = swing_high
    return levels

def last_swing_from_fractals(df: pd.DataFrame, piv: pd.DataFrame, lookback=200):
    sub = df.tail(lookback)
    sub_piv = piv.loc[sub.index]
    last_idx = sub.index[-1]
    recent_ph = sub_piv[sub_piv['pivot_high']].index.max()
    recent_pl = sub_piv[sub_piv['pivot_low']].index.max()
    if pd.isna(recent_ph) and pd.isna(recent_pl): return None, None
    if pd.isna(recent_pl) or (recent_ph and recent_ph > recent_pl):
        prev_pl = sub_piv[sub_piv.index < recent_ph][sub_piv['pivot_low']].index.max()
        if pd.isna(prev_pl): return None, None
        return float(df.loc[recent_ph, 'High']), float(df.loc[prev_pl, 'Low'])
    else:
        prev_ph = sub_piv[sub_piv.index < recent_pl][sub_piv['pivot_high']].index.max()
        if pd.isna(prev_ph): return None, None
        return float(df.loc[prev_ph, 'High']), float(df.loc[recent_pl, 'Low'])

# -----------------------------
# New Indicators
# -----------------------------

# Stochastic Oscillator (%K, %D)
def stochastic_oscillator(df: pd.DataFrame, k_period=14, d_period=3):
    low_min = df['Low'].rolling(k_period).min()
    high_max = df['High'].rolling(k_period).max()
    k = 100 * (df['Close'] - low_min) / (high_max - low_min)
    d = k.rolling(d_period).mean()
    return k, d

# OBV
def obv(df: pd.DataFrame):
    obv_series = np.zeros(len(df))
    for i in range(1, len(df)):
        if df['Close'].iloc[i] > df['Close'].iloc[i-1]:
            obv_series[i] = obv_series[i-1] + df['Volume'].iloc[i]
        elif df['Close'].iloc[i] < df['Close'].iloc[i-1]:
            obv_series[i] = obv_series[i-1] - df['Volume'].iloc[i]
        else:
            obv_series[i] = obv_series[i-1]
    return pd.Series(obv_series, index=df.index)

# Money Flow Index
def mfi(df: pd.DataFrame, period=14):
    typical_price = (df['High'] + df['Low'] + df['Close']) / 3
    money_flow = typical_price * df['Volume']
    pos_flow = []
    neg_flow = []
    for i in range(1, len(df)):
        if typical_price[i] > typical_price[i-1]:
            pos_flow.append(money_flow[i])
            neg_flow.append(0)
        else:
            pos_flow.append(0)
            neg_flow.append(money_flow[i])
    pos_flow = pd.Series([0]+pos_flow, index=df.index)
    neg_flow = pd.Series([0]+neg_flow, index=df.index)
    mf_ratio = pos_flow.rolling(period).sum() / neg_flow.rolling(period).sum().replace(0, np.nan)
    mfi_series = 100 - (100 / (1 + mf_ratio))
    return mfi_series.fillna(50)

# ADX
def adx(df: pd.DataFrame, period=14):
    high, low, close = df['High'], df['Low'], df['Close']
    plus_dm = high.diff()
    minus_dm = low.diff() * -1
    plus_dm = plus_dm.where((plus_dm > 0) & (plus_dm > minus_dm), 0.0)
    minus_dm = minus_dm.where((minus_dm > 0) & (minus_dm > plus_dm), 0.0)
    tr = true_range(high, low, close)
    atr_ = tr.rolling(period).mean()
    plus_di = 100 * (plus_dm.rolling(period).sum() / atr_)
    minus_di = 100 * (minus_dm.rolling(period).sum() / atr_)
    dx = 100 * ((plus_di - minus_di).abs() / (plus_di + minus_di).replace(0, np.nan))
    adx_series = dx.rolling(period).mean()
    return adx_series.fillna(0)

# Commodity Channel Index (CCI)
def cci(df: pd.DataFrame, period=20):
    tp = (df['High'] + df['Low'] + df['Close']) / 3
    cci_series = (tp - tp.rolling(period).mean()) / (0.015 * tp.rolling(period).std())
    return cci_series.fillna(0)

# Chaikin Money Flow (CMF)
def cmf(df: pd.DataFrame, period=20):
    mf_mult = ((df['Close'] - df['Low']) - (df['High'] - df['Close'])) / (df['High'] - df['Low']).replace(0, np.nan)
    mf_volume = mf_mult * df['Volume']
    cmf_series = mf_volume.rolling(period).sum() / df['Volume'].rolling(period).sum().replace(0, np.nan)
    return cmf_series.fillna(0)

# -----------------------------
# Fetch OHLCV
# -----------------------------
def fetch_ohlcv(symbol: str, period="5y", interval="1d") -> pd.DataFrame:
    t = yf.Ticker(symbol)
    df = t.history(period=period, interval=interval, auto_adjust=False)
    df = df.rename(columns=str.title)
    df = df[['Open','High','Low','Close','Volume']].dropna()
    return df

# -----------------------------
# Build Features (all horizons)
# -----------------------------
def build_features(df: pd.DataFrame) -> (pd.DataFrame, dict, dict):
    out = df.copy()
    
    # Core indicators
    out['RSI14'] = rsi_wilder(out['Close'], 14)
    macd_line, signal_line, hist = macd(out['Close'])
    out['MACD'] = macd_line
    out['MACD_SIGNAL'] = signal_line
    out['MACD_HIST'] = hist
    sma20, bb_u20, bb_l20, bb_width20 = bollinger_bands(out['Close'], 20)
    out['BB_MID_20'] = sma20
    out['BB_UPPER_20_2'] = bb_u20
    out['BB_LOWER_20_2'] = bb_l20
    out['BB_WIDTH_20'] = bb_width20
    out['ATR14'] = atr_wilder(out['High'], out['Low'], out['Close'])
    piv = daily_pivot_levels(out)
    out = out.join(piv)
    pivots = fractal_pivots(out)
    out = out.join(pivots)
    sr = support_resistance_from_fractals(out, pivots)
    sh, sl = last_swing_from_fractals(out, pivots)
    fib = fib_levels_from_swings(sh, sl) if (sh is not None and sl is not None) else {}
    
    # ----------------- Short-Term Features -----------------
    out['EMA5'] = ema(out['Close'], 5)
    out['EMA10'] = ema(out['Close'], 10)
    stoch_k, stoch_d = stochastic_oscillator(out)
    out['STOCH_K'] = stoch_k
    out['STOCH_D'] = stoch_d
    out['OBV'] = obv(out)
    
    # ----------------- Medium-Term Features -----------------
    out['SMA50'] = sma(out['Close'], 50)
    out['MFI14'] = mfi(out)
    out['ADX14'] = adx(out)
    out['RSI28'] = rsi_wilder(out['Close'], 28)  # slow RSI
    
    # ----------------- Long-Term Features -----------------
    out['SMA100'] = sma(out['Close'], 100)
    out['SMA200'] = sma(out['Close'], 200)
    out['EMA100'] = ema(out['Close'], 100)
    out['CCI50'] = cci(out, 50)
    out['CMF50'] = cmf(out, 50)
    _, bb_u100, bb_l100, bb_width100 = bollinger_bands(out['Close'], 100)
    out['BB_UPPER_100'] = bb_u100
    out['BB_LOWER_100'] = bb_l100
    out['BB_WIDTH_100'] = bb_width100
    
    return out, sr, fib

# -----------------------------
# Main Execution
# -----------------------------
import os

if __name__ == "__main__":
    symbols = ["HDFCBANK.NS", "TCS.NS", "INFY.NS"]

    data_dir = "../Data"
    os.makedirs(data_dir, exist_ok=True)

    for s in symbols:
        df = fetch_ohlcv(s, period="5y", interval="1d")
        feats, sr, fib = build_features(df)
        feats.to_csv(os.path.join(data_dir, f"{s}_features.csv"), index=True)
        pd.DataFrame(sr.get('support', [])).to_csv(os.path.join(data_dir, f"{s}_support_levels.csv"), index=False)
        pd.DataFrame(sr.get('resistance', [])).to_csv(os.path.join(data_dir, f"{s}_resistance_levels.csv"), index=False)
        pd.Series(fib).to_csv(os.path.join(data_dir, f"{s}_fibonacci_levels.csv"), index=True)
        print(f"Saved data for {s} in {data_dir}/")


  prev_pl = sub_piv[sub_piv.index < recent_ph][sub_piv['pivot_low']].index.max()
  if typical_price[i] > typical_price[i-1]:
  pos_flow.append(money_flow[i])
  neg_flow.append(money_flow[i])


Saved data for HDFCBANK.NS in ../Data/


  prev_ph = sub_piv[sub_piv.index < recent_pl][sub_piv['pivot_high']].index.max()
  if typical_price[i] > typical_price[i-1]:
  pos_flow.append(money_flow[i])
  neg_flow.append(money_flow[i])


Saved data for TCS.NS in ../Data/
Saved data for INFY.NS in ../Data/


  prev_ph = sub_piv[sub_piv.index < recent_pl][sub_piv['pivot_high']].index.max()
  if typical_price[i] > typical_price[i-1]:
  pos_flow.append(money_flow[i])
  neg_flow.append(money_flow[i])
