# In [None]:
import boto3
import pandas as pd
import numpy as np
import os
import datetime
from collections import deque
from io import BytesIO

# ==========================================
# CONFIGURATION
# ==========================================
# S3 bucket that get_data_for_date() reads the daily parquet files from.
BUCKET = "live-market-data"
# Underlying name; used as a path component of the S3 object key.
SYMBOL = "NIFTY"

# Inclusive calendar range walked by main(); Saturdays/Sundays are skipped there.
START_DATE = datetime.date(2025, 9, 1)
END_DATE = datetime.date(2025, 11, 26)

# Contract cut-off dates consumed by get_trading_symbol(): dates up to and
# including EXPIRY_SEP map to the SEP contract, dates up to and including
# EXPIRY_OCT map to the OCT contract, and anything later maps to NOV.
# NOTE(review): EXPIRY_NOV is not referenced anywhere in the visible code.
EXPIRY_SEP = datetime.date(2025, 9, 25)
EXPIRY_OCT = datetime.date(2025, 10, 30)
EXPIRY_NOV = datetime.date(2025, 11, 27)

# Strategy Params
# Score that KineticBrain must exceed to report a signal (passed in by run_day()).
KINETIC_THRESHOLD = 37500
# Seconds after a signal during which run_day() tracks the largest price move.
MEASURE_DURATION = 900   # 15 Minutes to measure the "Bang"

# ==========================================
# 1. S3 & DATA UTILITIES
# ==========================================
def get_trading_symbol(current_date):
    """Return the futures contract name to use for ``current_date``.

    The cut-offs are checked in chronological order and are inclusive: the
    first cut-off that is on or after ``current_date`` selects the contract.
    Dates after the last listed cut-off fall through to the November contract.
    """
    cutoffs = (
        (EXPIRY_SEP, "NIFTY25SEPFUT"),
        (EXPIRY_OCT, "NIFTY25OCTFUT"),
    )
    for last_day, contract in cutoffs:
        if current_date <= last_day:
            return contract
    return "NIFTY25NOVFUT"

def get_data_for_date(date_obj):
    """Fetch one day's futures data from S3 and normalise its columns.

    The object key is built from the date, ``SYMBOL`` and the contract name
    returned by ``get_trading_symbol``. After loading, the frame is adjusted so
    that it carries ``DateTime``, ``LTP`` and ``Volume`` columns.

    Args:
        date_obj: Date whose file should be loaded (``.year``, ``.month`` and
            ``.day`` are used to build the key).

    Returns:
        A DataFrame sorted by ``DateTime`` with rows lacking ``DateTime`` or
        ``LTP`` removed, or ``None`` when the object does not exist or could
        not be fetched, parsed or normalised. A missing object is silent;
        every other failure prints a warning so that real problems
        (credentials, schema changes, bugs) are not mistaken for "no data".
    """
    s3 = boto3.client("s3")
    ts = get_trading_symbol(date_obj)
    key = (
        f"year={date_obj.year}/month={date_obj.month:02d}/day={date_obj.day:02d}"
        f"/Futures/{SYMBOL}/{ts}.parquet"
    )

    try:
        obj = s3.get_object(Bucket=BUCKET, Key=key)
    except s3.exceptions.NoSuchKey:
        # No file stored for this date (presumably a non-trading day).
        return None
    except Exception as exc:
        # Keep the best-effort contract (return None) but make it visible.
        print(f"[WARN] {date_obj}: could not fetch s3://{BUCKET}/{key}: {exc!r}")
        return None

    try:
        df = pd.read_parquet(BytesIO(obj["Body"].read()))

        if 'DateTime' not in df.columns:
            # Build the timestamp from separate Date/Time columns; unparsable
            # values become NaT and are dropped below.
            df['DateTime'] = pd.to_datetime(
                df['Date'].astype(str) + " " + df['Time'].astype(str),
                dayfirst=True, errors='coerce'
            )

        # Rename at most one source column to LTP, and only when LTP is not
        # already present; renaming several would create duplicate LTP columns.
        if 'LTP' not in df.columns:
            for candidate in ('LastTradedPrice', 'Close'):
                if candidate in df.columns:
                    df = df.rename(columns={candidate: 'LTP'})
                    break

        if 'Volume' not in df.columns:
            # NOTE(review): KineticBrain differences consecutive Volume values,
            # i.e. it expects a cumulative series. OpenInterest and LTQ are
            # stand-ins here - confirm they are acceptable proxies.
            if 'OpenInterest' in df.columns:
                df['Volume'] = df['OpenInterest']
            elif 'LTQ' in df.columns:
                df['Volume'] = df['LTQ']
            else:
                df['Volume'] = 0

        return (
            df.dropna(subset=['DateTime', 'LTP'])
            .sort_values('DateTime')
            .reset_index(drop=True)
        )
    except Exception as exc:
        print(f"[WARN] {date_obj}: could not load s3://{BUCKET}/{key}: {exc!r}")
        return None

# ==========================================
# 2. KINETIC BRAIN (MAGNITUDE ONLY)
# ==========================================
class KineticBrain:
    """Rolling-window detector that scores traded volume against price travel.

    Ticks are kept in a fixed-length buffer. Once the buffer is full, every new
    tick produces a score::

        score = traded_volume / (abs(last_price - first_price) + PRICE_EPSILON)

    where ``traded_volume`` is the sum of the positive increments of the
    cumulative volume inside the window. A signal is reported when the score
    exceeds ``threshold``.

    Args:
        threshold: Score that must be exceeded for a signal.
        window: Number of ticks in the rolling buffer. Must be at least 2,
            because a volume increment needs two ticks.

    Raises:
        ValueError: If ``window`` is smaller than 2.
    """

    # Added to the displacement so a window with no net price change does not
    # divide by zero.
    PRICE_EPSILON = 0.05

    def __init__(self, threshold=37500, window=50):
        if window < 2:
            raise ValueError(f"window must be >= 2, got {window}")
        self.threshold = threshold
        self.window = window
        self.tick_buffer = deque(maxlen=window)
        self.last_score = 0

    def process_tick(self, ltp, cumulative_volume):
        """Add a tick and report whether the window's score exceeds the threshold.

        Returns ``False`` until the buffer holds ``window`` ticks. Once it is
        full, ``last_score`` is updated on every call.
        """
        self.tick_buffer.append((ltp, cumulative_volume))

        # Readiness is derived from the deque itself so the buffer size is
        # defined in exactly one place.
        if len(self.tick_buffer) < self.tick_buffer.maxlen:
            return False  # Not ready

        self.last_score = self._calculate_score()
        return bool(self.last_score > self.threshold)

    def _calculate_score(self):
        """Return volume traded in the buffer divided by net price displacement."""
        data = np.array(self.tick_buffer)
        prices = data[:, 0]
        vols = data[:, 1]

        # Only positive increments count as traded volume; drops in the
        # cumulative series are ignored.
        vol_diff = np.diff(vols)
        total_vol = np.sum(vol_diff[vol_diff > 0])

        displacement = abs(prices[-1] - prices[0])
        return total_vol / (displacement + self.PRICE_EPSILON)

# ==========================================
# 3. MAGNITUDE TEST ENGINE
# ==========================================
def run_day(df):
    """Replay one day of ticks and measure the move that follows each signal.

    Rows are read in order via ``DateTime``, ``LTP`` and ``Volume``. While idle,
    each tick is fed to a fresh ``KineticBrain``. When it fires, that tick's
    time and price become the reference, and the largest absolute price
    deviation from the reference is tracked on subsequent ticks. The first
    tick at least ``MEASURE_DURATION`` seconds after the reference closes the
    event; the detector's buffer is then cleared before detection resumes.
    Ticks seen while an event is open are not fed to the detector, and an
    event still open when the data ends is not recorded.

    Returns:
        A list with one maximum absolute deviation per completed event.
    """
    detector = KineticBrain(threshold=KINETIC_THRESHOLD)
    magnitudes = []

    # Tracking state for the currently open event (if any).
    tracking = False
    anchor_time = None
    anchor_price = 0.0
    peak = 0.0

    for tick in df.itertuples():
        now, price, volume = tick.DateTime, tick.LTP, tick.Volume

        if not tracking:
            # Idle: look for a new signal on this tick.
            if detector.process_tick(price, volume):
                tracking = True
                anchor_time = now
                anchor_price = price
                peak = 0.0
            continue

        # An event is open: update its extreme, then check whether it is over.
        seconds_in = (now - anchor_time).total_seconds()
        peak = max(peak, abs(price - anchor_price))

        if seconds_in >= MEASURE_DURATION:
            magnitudes.append(peak)
            tracking = False
            # Start the next detection from an empty window.
            detector.tick_buffer.clear()

    return magnitudes

# ==========================================
# 4. MAIN LOOP
# ==========================================
def main():
    """Run the magnitude test over the configured date range and print a summary.

    Every weekday from ``START_DATE`` to ``END_DATE`` (inclusive) is loaded with
    ``get_data_for_date`` and replayed through ``run_day``; days without data
    are skipped. The collected moves are summarised as counts at fixed point
    thresholds plus mean, median and maximum.
    """
    print(f"Running KINETIC MAGNITUDE TEST from {START_DATE} to {END_DATE}")
    print(f"Threshold: {KINETIC_THRESHOLD} | Measure Window: {MEASURE_DURATION}s")
    print("-" * 65)

    collected = []

    day_span = (END_DATE - START_DATE).days
    for offset in range(day_span + 1):
        day = START_DATE + datetime.timedelta(days=offset)

        # weekday() is 5 for Saturday and 6 for Sunday.
        if day.weekday() >= 5:
            continue

        frame = get_data_for_date(day)
        if frame is None or frame.empty:
            continue

        collected.extend(run_day(frame))

    # --- FINAL STATS ---
    total = len(collected)
    if not total:
        print("No signals found.")
        return

    moves = np.array(collected)
    banner = "=" * 40

    print("\n" + banner)
    print(f"MAGNITUDE DISTRIBUTION (N={total})")
    print(banner)

    for t in (10, 20, 30, 40, 50, 75, 100):
        count = np.sum(moves >= t)
        pct = (count / total) * 100
        print(f"Moves >= {t:<3} pts: {count:<5} ({pct:.1f}%)")

    print("-" * 40)
    print(f"Average Move: {np.mean(moves):.2f} pts")
    print(f"Median Move:  {np.median(moves):.2f} pts")
    print(f"Max Move:     {np.max(moves):.2f} pts")

# Run the test only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()