In [None]:
!pip uninstall numpy scipy -y

In [None]:
!pip install numpy==1.26.4 scipy==1.13.1

In [None]:
!pip install logzero
!pip install pyotp

In [None]:
!pip install smartapi-python --upgrade

In [None]:
!pip install pandas-ta

In [None]:
!pip install optuna

In [None]:
!pip install smartapi-python

In [None]:
import numpy as np
import pandas as pd
import lightgbm as lgb
import optuna
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error, f1_score, accuracy_score
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import logging
from SmartApi import SmartConnect
from pyotp import TOTP
import urllib.request
import json
import time

In [None]:
optuna.logging.set_verbosity(optuna.logging.WARNING)
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

DEFAULT_EXCHANGE = "NSE"

In [None]:
class ImprovedExitDataset:
    def __init__(self, df, verbose: bool = False):
        self.df = df.copy()
        self.verbose = verbose

    def process_data(self):
        self.add_technical_indicators()
        self.add_volatility_measures()
        self.add_volume_analysis()
        self.add_price_patterns()
        self.add_support_resistance()
        self.add_market_regime()
        self.add_momentum_indicators()

        critical_features = [
            'rsi', 'macd', 'macd_hist', 'bb_width', 'atr_pct',
            'volatility_10d', 'vol_ratio', 'volume_ratio',
            'price_vs_ma20', 'price_vs_ma50',
            'dist_to_resistance', 'dist_to_support',
            'trend_strength', 'market_condition', 'momentum_score'
        ]
        self.df.dropna(subset=critical_features, inplace=True)
        if self.verbose:
            print(f"processed rows: {len(self.df)}")

    def add_technical_indicators(self):
        # RSI
        delta = self.df['close'].diff()
        gain = delta.clip(lower=0).rolling(14).mean()
        loss = -delta.clip(upper=0).rolling(14).mean()
        rs = gain / loss.replace(0, np.nan)
        self.df['rsi'] = 100 - (100 / (1 + rs))
        self.df['rsi_overbought'] = (self.df['rsi'] > 70).astype(int)
        self.df['rsi_oversold'] = (self.df['rsi'] < 30).astype(int)

        # MACD
        ema_12 = self.df['close'].ewm(span=12, adjust=False).mean()
        ema_26 = self.df['close'].ewm(span=26, adjust=False).mean()
        self.df['macd'] = ema_12 - ema_26
        self.df['macd_signal'] = self.df['macd'].ewm(span=9, adjust=False).mean()
        self.df['macd_hist'] = self.df['macd'] - self.df['macd_signal']
        self.df['macd_bullish_cross'] = ((self.df['macd'] > self.df['macd_signal']) &
                                         (self.df['macd'].shift(1) <= self.df['macd_signal'].shift(1))).astype(int)
        self.df['macd_bearish_cross'] = ((self.df['macd'] < self.df['macd_signal']) &
                                         (self.df['macd'].shift(1) >= self.df['macd_signal'].shift(1))).astype(int)

        # Bollinger Bands
        bb_length, bb_std = 20, 2
        basis = self.df['close'].rolling(bb_length).mean()
        dev = bb_std * self.df['close'].rolling(bb_length).std()
        self.df['bb_upper'] = basis + dev
        self.df['bb_lower'] = basis - dev
        self.df['bb_width'] = (self.df['bb_upper'] - self.df['bb_lower']) / basis
        self.df['bb_position'] = (self.df['close'] - self.df['bb_lower']) / (self.df['bb_upper'] - self.df['bb_lower'])

        # ATR
        tr1 = self.df['high'] - self.df['low']
        tr2 = (self.df['high'] - self.df['close'].shift()).abs()
        tr3 = (self.df['low'] - self.df['close'].shift()).abs()
        tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
        self.df['atr'] = tr.rolling(14).mean()
        self.df['atr_pct'] = self.df['atr'] / self.df['close']

    def add_volatility_measures(self):
        self.df['returns'] = self.df['close'].pct_change()
        self.df['volatility_5d'] = self.df['returns'].rolling(5).std()
        self.df['volatility_10d'] = self.df['returns'].rolling(10).std()
        self.df['volatility_21d'] = self.df['returns'].rolling(21).std()
        self.df['vol_ratio'] = self.df['volatility_10d'] / self.df['volatility_21d'].replace(0, np.nan)
        self.df['vol_expanding'] = (self.df['vol_ratio'] > 1.2).astype(int)
        self.df['parkinson_vol'] = np.sqrt((1 / (4 * np.log(2))) *
                                           ((np.log(self.df['high'] / self.df['low'])) ** 2).rolling(20).mean())

    def add_volume_analysis(self):
        self.df['volume_ma20'] = self.df['volume'].rolling(20).mean()
        self.df['volume_ratio'] = self.df['volume'] / self.df['volume_ma20'].replace(0, np.nan)
        self.df['volume_spike'] = (self.df['volume_ratio'] > 2).astype(int)
        self.df['obv'] = (np.sign(self.df['returns']) * self.df['volume']).cumsum()
        self.df['obv_ma20'] = self.df['obv'].rolling(20).mean()
        self.df['obv_trend'] = np.where(self.df['obv'] > self.df['obv_ma20'], 1, -1)
        self.df['vpt'] = (self.df['returns'] * self.df['volume']).cumsum()
        self.df['vpt_ma20'] = self.df['vpt'].rolling(20).mean()

    def add_price_patterns(self):
        self.df['ma_20'] = self.df['close'].rolling(20).mean()
        self.df['ma_50'] = self.df['close'].rolling(50).mean()
        self.df['ma_200'] = self.df['close'].rolling(200).mean()
        self.df['price_vs_ma20'] = self.df['close'] / self.df['ma_20'] - 1
        self.df['price_vs_ma50'] = self.df['close'] / self.df['ma_50'] - 1
        self.df['price_vs_ma200'] = (self.df['close'] / self.df['ma_200'] - 1)
        self.df['price_vs_ma200'] = self.df['price_vs_ma200'].fillna(0)
        self.df['ma_golden_cross'] = ((self.df['ma_50'] > self.df['ma_200']) &
                                      (self.df['ma_50'].shift(1) <= self.df['ma_200'].shift(1))).astype(int)
        self.df['ma_death_cross'] = ((self.df['ma_50'] < self.df['ma_200']) &
                                     (self.df['ma_50'].shift(1) >= self.df['ma_200'].shift(1))).astype(int)
        self.df['daily_range'] = (self.df['high'] - self.df['low']) / self.df['close']
        self.df['range_vs_atr'] = self.df['daily_range'] / self.df['atr_pct'].replace(0, np.nan)

    def add_support_resistance(self):
        self.df['recent_high_10'] = self.df['high'].rolling(10).max()
        self.df['recent_low_10'] = self.df['low'].rolling(10).min()
        self.df['recent_high_20'] = self.df['high'].rolling(20).max()
        self.df['recent_low_20'] = self.df['low'].rolling(20).min()
        self.df['dist_to_resistance'] = (self.df['recent_high_20'] - self.df['close']) / self.df['close']
        self.df['dist_to_support'] = (self.df['close'] - self.df['recent_low_20']) / self.df['close']
        self.df['near_resistance'] = (self.df['dist_to_resistance'] < 0.02).astype(int)
        self.df['near_support'] = (self.df['dist_to_support'] < 0.02).astype(int)

    def add_market_regime(self):
        self.df['trend_strength'] = abs(self.df['close'].pct_change(20).rolling(5).mean())
        self.df['market_condition'] = 0
        self.df.loc[self.df['close'] > self.df['ma_200'] * 1.05, 'market_condition'] = 1
        self.df.loc[self.df['close'] < self.df['ma_200'] * 0.95, 'market_condition'] = -1
        try:
            high_low = self.df['high'] - self.df['low']
            high_close = (self.df['high'] - self.df['close'].shift()).abs()
            low_close = (self.df['low'] - self.df['close'].shift()).abs()
            tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
            atr_14 = tr.rolling(14).mean()
            up_move = self.df['high'] - self.df['high'].shift()
            down_move = self.df['low'].shift() - self.df['low']
            pos_dm = pd.Series(np.where((up_move > down_move) & (up_move > 0), up_move, 0), index=self.df.index)
            neg_dm = pd.Series(np.where((down_move > up_move) & (down_move > 0), down_move, 0), index=self.df.index)
            pos_dm_14 = pos_dm.rolling(14).mean()
            neg_dm_14 = neg_dm.rolling(14).mean()
            pos_di = 100 * pos_dm_14 / atr_14
            neg_di = 100 * neg_dm_14 / atr_14
            dx = 100 * (pos_di - neg_di).abs() / (pos_di + neg_di)
            self.df['adx'] = dx.rolling(14).mean()
            self.df['adx'] = self.df['adx'].fillna(0)
        except Exception:
            self.df['adx'] = 25.0

    def add_momentum_indicators(self):
        self.df['roc_10'] = self.df['close'].pct_change(10)
        self.df['roc_20'] = self.df['close'].pct_change(20)
        self.df['momentum_score'] = (
            (self.df['rsi'] - 50) / 50 * 0.3 +
            np.sign(self.df['macd_hist']) * 0.2 +
            (self.df['price_vs_ma20']) * 0.3 +
            (self.df['obv_trend']) * 0.2
        )
        low_14 = self.df['low'].rolling(14).min()
        high_14 = self.df['high'].rolling(14).max()
        self.df['stoch_k'] = 100 * (self.df['close'] - low_14) / (high_14 - low_14)
        self.df['stoch_d'] = self.df['stoch_k'].rolling(3).mean()

    def create_realistic_exit_dataset(self, position_type='long', max_holding_days=20):
        df = self.df.copy()
        if not isinstance(df.index, pd.DatetimeIndex):
            df.index = pd.to_datetime(df.index)

        exit_points = []
        if position_type == 'long':
            profit_target, stop_loss, trailing_stop_pct = 0.08, 0.04, 0.03
        else:
            profit_target, stop_loss, trailing_stop_pct = 0.08, 0.04, 0.03

        for i in range(len(df) - max_holding_days - 1):
            entry_idx = i
            entry_data = df.iloc[entry_idx].copy()
            entry_price = entry_data['close']

            future_window = df.iloc[i + 1:i + max_holding_days + 1]
            if future_window.empty:
                continue

            exit_idx = None
            exit_reason = None
            peak_price, trough_price = entry_price, entry_price

            for j, (date, row) in enumerate(future_window.iterrows()):
                current_price = row['close']
                holding_days = j + 1

                if position_type == 'long':
                    peak_price = max(peak_price, current_price)
                    returns = (current_price - entry_price) / entry_price
                    drawdown_from_peak = (peak_price - current_price) / peak_price if peak_price > current_price else 0
                    if returns >= profit_target:
                        exit_idx, exit_reason = date, 'profit_target'; break
                    elif returns <= -stop_loss:
                        exit_idx, exit_reason = date, 'stop_loss'; break
                    elif drawdown_from_peak >= trailing_stop_pct and returns > 0.01:
                        exit_idx, exit_reason = date, 'trailing_stop'; break
                    elif holding_days >= 5 and entry_data['momentum_score'] > 0.2 and row['momentum_score'] < -0.2:
                        exit_idx, exit_reason = date, 'momentum_reversal'; break
                    elif holding_days >= 3 and row['rsi'] > 75 and j > 0 and future_window.iloc[j-1]['rsi'] > row['rsi']:
                        exit_idx, exit_reason = date, 'rsi_reversal'; break
                else:
                    trough_price = min(trough_price, current_price)
                    returns = (entry_price - current_price) / entry_price
                    drawup_from_trough = (current_price - trough_price) / trough_price if current_price > trough_price else 0
                    if returns >= profit_target:
                        exit_idx, exit_reason = date, 'profit_target'; break
                    elif returns <= -stop_loss:
                        exit_idx, exit_reason = date, 'stop_loss'; break
                    elif drawup_from_trough >= trailing_stop_pct and returns > 0.01:
                        exit_idx, exit_reason = date, 'trailing_stop'; break
                    elif holding_days >= 5 and entry_data['momentum_score'] < -0.2 and row['momentum_score'] > 0.2:
                        exit_idx, exit_reason = date, 'momentum_reversal'; break
                    elif holding_days >= 3 and row['rsi'] < 25 and j > 0 and future_window.iloc[j-1]['rsi'] < row['rsi']:
                        exit_idx, exit_reason = date, 'rsi_reversal'; break

            if exit_idx is None:
                exit_idx, exit_reason, holding_days = future_window.index[-1], 'time_exit', max_holding_days

            exit_data = df.loc[exit_idx].copy()
            entry_date = df.index[entry_idx]

            if position_type == 'long':
                returns = (exit_data['close'] - entry_price) / entry_price
                max_gain = (max(peak_price, exit_data['close']) - entry_price) / entry_price
                try:
                    min_price_during_hold = df.iloc[i + 1:df.index.get_loc(exit_idx) + 1]['close'].min()
                    max_drawdown = (peak_price - min_price_during_hold) / peak_price if peak_price > 0 else 0
                except Exception:
                    max_drawdown = 0
            else:
                returns = (entry_price - exit_data['close']) / entry_price
                max_gain = (entry_price - min(trough_price, exit_data['close'])) / entry_price
                try:
                    max_price_during_hold = df.iloc[i + 1:df.index.get_loc(exit_idx) + 1]['close'].max()
                    max_drawdown = (max_price_during_hold - trough_price) / trough_price if trough_price > 0 else 0
                except Exception:
                    max_drawdown = 0

            feature_columns = [
                'rsi', 'rsi_overbought', 'rsi_oversold', 'macd', 'macd_hist', 'macd_bullish_cross', 'macd_bearish_cross',
                'bb_width', 'bb_position', 'atr_pct', 'volatility_10d', 'volatility_21d', 'vol_ratio', 'vol_expanding',
                'volume_ratio', 'volume_spike', 'obv_trend', 'price_vs_ma20', 'price_vs_ma50', 'price_vs_ma200',
                'ma_golden_cross', 'ma_death_cross', 'daily_range', 'range_vs_atr', 'dist_to_resistance', 'dist_to_support',
                'near_resistance', 'near_support', 'trend_strength', 'market_condition', 'adx', 'momentum_score', 'roc_10',
                'roc_20', 'stoch_k', 'stoch_d'
            ]

            feature_row = {col: entry_data[col] for col in feature_columns if col in entry_data.index}
            feature_row.update({
                'holding_days': int(holding_days),
                'returns': returns,
                'max_gain': max_gain,
                'max_drawdown': max_drawdown,
                'exit_reason': exit_reason,
                'position_type': position_type,
                'entry_date': entry_date,
                'exit_date': pd.to_datetime(exit_idx),
                'success': 1 if returns > 0 else 0
            })
            exit_points.append(feature_row)

        exit_df = pd.DataFrame(exit_points)
        if exit_df.empty and self.verbose:
            print("no exit points created")
        exit_df = exit_df.dropna()
        if self.verbose:
            print(f"final dataset samples: {len(exit_df)}")
        return exit_df

    def get_position_features(self, position_type):
        features = [
            'rsi', 'rsi_overbought', 'rsi_oversold', 'macd', 'macd_hist', 'macd_bullish_cross', 'macd_bearish_cross',
            'bb_width', 'bb_position', 'atr_pct', 'volatility_10d', 'volatility_21d', 'vol_ratio', 'vol_expanding',
            'volume_ratio', 'volume_spike', 'obv_trend', 'price_vs_ma20', 'price_vs_ma50', 'price_vs_ma200',
            'ma_golden_cross', 'ma_death_cross', 'daily_range', 'range_vs_atr', 'dist_to_resistance', 'dist_to_support',
            'near_resistance', 'near_support', 'trend_strength', 'market_condition', 'adx', 'momentum_score', 'roc_10',
            'roc_20', 'stoch_k', 'stoch_d'
        ]
        missing = [c for c in features if c not in self.df.columns]
        if missing:
            raise KeyError(f"Missing features: {missing}")
        return self.df[features]

In [None]:
class ImprovedExitPredictor:
    def __init__(self, exit_df, position_type, verbose: bool = False):
        self.exit_df = exit_df.copy()
        self.position_type = position_type
        self.model = None
        self.features = None
        self.verbose = verbose

    def prepare_features(self):
        feature_columns = [
            'rsi', 'rsi_overbought', 'rsi_oversold', 'macd', 'macd_hist', 'macd_bullish_cross', 'macd_bearish_cross',
            'bb_width', 'bb_position', 'atr_pct', 'volatility_10d', 'volatility_21d', 'vol_ratio', 'vol_expanding',
            'volume_ratio', 'volume_spike', 'obv_trend', 'price_vs_ma20', 'price_vs_ma50', 'price_vs_ma200',
            'ma_golden_cross', 'ma_death_cross', 'daily_range', 'range_vs_atr', 'dist_to_resistance', 'dist_to_support',
            'near_resistance', 'near_support', 'trend_strength', 'market_condition', 'adx', 'momentum_score', 'roc_10',
            'roc_20', 'stoch_k', 'stoch_d'
        ]
        self.features = self.exit_df[feature_columns]
        self.holding_days_target = self.exit_df['holding_days']
        self.success_target = self.exit_df['success']
        self.returns_target = self.exit_df['returns']

    def train(self, n_trials=30):
        self.prepare_features()
        train_size = int(len(self.features) * 0.8)
        X_train = self.features.iloc[:train_size]
        X_test = self.features.iloc[train_size:]
        y_hold_train = self.holding_days_target.iloc[:train_size]
        y_hold_test = self.holding_days_target.iloc[train_size:]
        y_success_train = self.success_target.iloc[:train_size]
        y_success_test = self.success_target.iloc[train_size:]
        y_returns_train = self.returns_target.iloc[:train_size]
        y_returns_test = self.returns_target.iloc[train_size:]
        if self.verbose:
            print(f"Training {self.position_type} models... n={len(X_train)} success={y_success_train.mean():.1%}")
        hold_model = self.train_hold_model(X_train, y_hold_train, n_trials)
        success_model = self.train_success_model(X_train, y_success_train, n_trials)
        returns_model = self.train_returns_model(X_train, y_returns_train, n_trials)
        hold_rmse = np.sqrt(mean_squared_error(y_hold_test, hold_model.predict(X_test)))
        success_pred = (success_model.predict(X_test) > 0.5).astype(int)
        success_acc = accuracy_score(y_success_test, success_pred)
        success_f1 = f1_score(y_success_test, success_pred)
        returns_rmse = np.sqrt(mean_squared_error(y_returns_test, returns_model.predict(X_test)))
        self.model = {
            'hold_model': hold_model,
            'success_model': success_model,
            'returns_model': returns_model,
            'metrics': {
                'hold_rmse': hold_rmse,
                'success_accuracy': success_acc,
                'success_f1': success_f1,
                'returns_rmse': returns_rmse
            }
        }
        return self.model

    def train_hold_model(self, X, y, n_trials):
        def objective(trial):
            params = {
                'objective': 'regression', 'metric': 'rmse', 'verbosity': -1,
                'num_leaves': trial.suggest_int('num_leaves', 20, 100),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2),
                'feature_fraction': trial.suggest_float('feature_fraction', 0.6, 1.0),
                'bagging_fraction': trial.suggest_float('bagging_fraction', 0.6, 1.0),
                'bagging_freq': trial.suggest_int('bagging_freq', 1, 7),
                'min_child_samples': trial.suggest_int('min_child_samples', 10, 100),
            }
            tscv = TimeSeriesSplit(n_splits=5)
            scores = []
            for tr, va in tscv.split(X):
                dtrain = lgb.Dataset(X.iloc[tr], label=y.iloc[tr])
                dval = lgb.Dataset(X.iloc[va], label=y.iloc[va], reference=dtrain)
                model = lgb.train(params, dtrain, valid_sets=[dval], num_boost_round=200,
                                  callbacks=[lgb.early_stopping(30, verbose=False)])
                scores.append(np.sqrt(mean_squared_error(y.iloc[va], model.predict(X.iloc[va]))))
            return np.mean(scores)
        study = optuna.create_study(direction='minimize'); study.optimize(objective, n_trials=n_trials)
        best_params = study.best_params; best_params.update({'objective': 'regression', 'metric': 'rmse', 'verbosity': -1})
        return lgb.train(best_params, lgb.Dataset(X, label=y), num_boost_round=200)

    def train_success_model(self, X, y, n_trials):
        def objective(trial):
            params = {
                'objective': 'binary', 'metric': 'binary_logloss', 'verbosity': -1,
                'num_leaves': trial.suggest_int('num_leaves', 20, 100),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2),
                'feature_fraction': trial.suggest_float('feature_fraction', 0.6, 1.0),
                'bagging_fraction': trial.suggest_float('bagging_fraction', 0.6, 1.0),
                'bagging_freq': trial.suggest_int('bagging_freq', 1, 7),
                'min_child_samples': trial.suggest_int('min_child_samples', 10, 100),
            }
            tscv = TimeSeriesSplit(n_splits=5)
            scores = []
            for tr, va in tscv.split(X):
                dtrain = lgb.Dataset(X.iloc[tr], label=y.iloc[tr])
                dval = lgb.Dataset(X.iloc[va], label=y.iloc[va], reference=dtrain)
                model = lgb.train(params, dtrain, valid_sets=[dval], num_boost_round=200,
                                  callbacks=[lgb.early_stopping(30, verbose=False)])
                scores.append(f1_score(y.iloc[va], (model.predict(X.iloc[va]) > 0.5).astype(int)))
            return np.mean(scores)
        study = optuna.create_study(direction='maximize'); study.optimize(objective, n_trials=n_trials)
        best_params = study.best_params; best_params.update({'objective': 'binary', 'metric': 'binary_logloss', 'verbosity': -1})
        return lgb.train(best_params, lgb.Dataset(X, label=y), num_boost_round=200)

    def train_returns_model(self, X, y, n_trials):
        return self.train_hold_model(X, y, n_trials)

    def predict(self, features, confidence_threshold=0.6):
        if self.model is None:
            raise ValueError("Model not trained")
        hold_days = max(1, round(self.model['hold_model'].predict(features)[0]))
        success_prob = self.model['success_model'].predict(features)[0]
        predicted_returns = self.model['returns_model'].predict(features)[0]
        risk_score = self._calculate_risk_score(features.iloc[0], success_prob, predicted_returns)
        confidence = self._calculate_confidence(success_prob, predicted_returns, risk_score)
        if success_prob < 0.4:
            recommendation = "AVOID"
        elif success_prob < confidence_threshold:
            recommendation = "RISKY"
        elif confidence > 0.7:
            recommendation = "STRONG"
        else:
            recommendation = "MODERATE"
        return {
            'holding_days': hold_days,
            'success_probability': success_prob,
            'predicted_returns': predicted_returns,
            'risk_score': risk_score,
            'confidence': confidence,
            'recommendation': recommendation,
            'position_type': self.position_type,
            'ticker': None
        }

    def _calculate_risk_score(self, features, success_prob, predicted_returns):
        risk_factors = []
        risk_factors.append(min(features['volatility_10d'] * 10, 1.0) * 0.25)
        risk_factors.append((0.5 if (features['rsi'] > 75 or features['rsi'] < 25) else 0) * 0.15)
        risk_factors.append((1 - success_prob) * 0.30)
        risk_factors.append((0.7 if abs(predicted_returns) > 0.10 else 0) * 0.15)
        risk_factors.append((0.6 if features['market_condition'] == 0 else 0.3) * 0.15)
        return float(np.clip(sum(risk_factors), 0, 1))

    def _calculate_confidence(self, success_prob, predicted_returns, risk_score):
        base_confidence = success_prob
        risk_adjusted = base_confidence * (1 - risk_score * 0.5)
        if abs(predicted_returns) > 0.15:
            magnitude_penalty = 0.3
        elif abs(predicted_returns) > 0.08:
            magnitude_penalty = 0.15
        else:
            magnitude_penalty = 0
        final_confidence = risk_adjusted * (1 - magnitude_penalty)
        return float(np.clip(final_confidence, 0, 1))



In [None]:
class ExitStrategySystem:
    def __init__(self, api_key, user_id, password, totp_key, verbose: bool = False):
        self.api_key = api_key
        self.user_id = user_id
        self.password = password
        self.totp_key = totp_key
        self.client = SmartConnect(api_key=self.api_key)
        self.session_data = None
        self.instrument_list = None
        self.trained_models = {}
        self.verbose = verbose

    def login(self):
        totp = TOTP(self.totp_key).now()
        self.session_data = self.client.generateSession(self.user_id, self.password, totp)
        if self.verbose:
            print("login ok")

    def fetch_instruments(self, url="https://margincalculator.angelbroking.com/OpenAPI_File/files/OpenAPIScripMaster.json"):
        response = urllib.request.urlopen(url)
        self.instrument_list = json.loads(response.read())
        if self.verbose:
            print(f"instruments: {len(self.instrument_list)}")

    def token_lookup(self, ticker, exchange=DEFAULT_EXCHANGE):
        for instrument in self.instrument_list:
            if instrument["name"] == ticker and instrument["exch_seg"] == exchange and instrument["symbol"].split('-')[-1] == "EQ":
                return instrument["token"]
        return None

    def symbol_lookup(self, token, exchange=DEFAULT_EXCHANGE):
        for instrument in self.instrument_list:
            if instrument["token"] == token and instrument["exch_seg"] == exchange and instrument["symbol"].split('-')[-1] == "EQ":
                return instrument["name"]
        return None

    def get_candle_data(self, symbol, from_date, to_date, interval="ONE_DAY", exchange=DEFAULT_EXCHANGE):
        token = self.token_lookup(symbol, exchange)
        if not token:
            if self.verbose:
                print(f"token not found for {symbol}")
            return None
        params = {
            "exchange": exchange,
            "symboltoken": str(token),
            "interval": interval,
            "fromdate": from_date,
            "todate": to_date
        }
        data = self.client.getCandleData(params)
        if "data" in data and data["data"]:
            df = pd.DataFrame(data["data"], columns=["datetime", "open", "high", "low", "close", "volume"])
            df["datetime"] = pd.to_datetime(df["datetime"])
            df['date'] = df['datetime'].dt.date
            df.drop(columns='datetime', inplace=True)
            df.set_index('date', inplace=True)
            return df
        else:
            if self.verbose:
                print(f"no data for {symbol}")
            return None

    def predict_exit_signals(self, ticker, current_date, horizon_days=5, n_trials=20, lookback_years=5):
        try:
            if ticker not in self.trained_models:
                long_model, short_model = self.train_exit_models(
                    ticker, current_date - timedelta(days=1), lookback_years=lookback_years, n_trials=n_trials
                )
                if long_model is None or short_model is None:
                    return None, None
                self.trained_models[ticker] = {'long': long_model, 'short': short_model}
            long_model = self.trained_models[ticker]['long']
            short_model = self.trained_models[ticker]['short']
            long_features, short_features = self.get_prediction_features(ticker, current_date)
            if long_features is None or short_features is None:
                return None, None
            long_exit_signal = long_model.predict(long_features)
            short_exit_signal = short_model.predict(short_features)
            long_exit_signal['ticker'] = ticker
            short_exit_signal['ticker'] = ticker
            return long_exit_signal, short_exit_signal
        except Exception as e:
            logging.error(f"prediction error {ticker}: {e}")
            return None, None

    def train_exit_models(self, ticker, prediction_date, lookback_years=5, n_trials=20):
        long_exit_df, short_exit_df = self.prepare_exit_datasets(ticker, prediction_date, lookback_years)
        if long_exit_df is None or short_exit_df is None or long_exit_df.empty or short_exit_df.empty:
            return None, None
        long_exit_model = ImprovedExitPredictor(long_exit_df, position_type='long', verbose=False)
        long_exit_model.train(n_trials=n_trials)
        short_exit_model = ImprovedExitPredictor(short_exit_df, position_type='short', verbose=False)
        short_exit_model.train(n_trials=n_trials)
        return long_exit_model, short_exit_model

    def prepare_exit_datasets(self, ticker, end_date, lookback_years=5):
        start_date = (end_date - relativedelta(years=lookback_years)).strftime("%Y-%m-%d %H:%M")
        end_date_str = end_date.strftime("%Y-%m-%d %H:%M")
        df = self.get_candle_data(ticker, start_date, end_date_str)
        if df is None or df.empty:
            return None, None
        exit_analyzer = ImprovedExitDataset(df, verbose=False)
        exit_analyzer.process_data()
        if exit_analyzer.df.empty:
            return None, None
        long_exit_df = exit_analyzer.create_realistic_exit_dataset(position_type='long')
        short_exit_df = exit_analyzer.create_realistic_exit_dataset(position_type='short')
        if long_exit_df.empty or short_exit_df.empty:
            return None, None
        return long_exit_df, short_exit_df

    def get_prediction_features(self, ticker, current_date):
        start_date = (current_date - timedelta(days=400)).strftime("%Y-%m-%d %H:%M")
        end_date = current_date.strftime("%Y-%m-%d %H:%M")
        df = self.get_candle_data(ticker, start_date, end_date)
        if df is None or df.empty:
            return None, None
        exit_analyzer = ImprovedExitDataset(df, verbose=False)
        exit_analyzer.process_data()
        if exit_analyzer.df.empty:
            return None, None
        long_features = exit_analyzer.get_position_features('long')
        short_features = exit_analyzer.get_position_features('short')
        if long_features.empty or short_features.empty:
            return None, None
        return long_features.iloc[[-1]], short_features.iloc[[-1]]

In [None]:
class StopLossCalculator:
    def __init__(
        self,
        exit_system,
        atr_period: int = 14,
        lookback_days: int = 60,
        atr_mult_strong: float = 1.8,
        atr_mult_moderate: float = 1.2,
        min_sl_pct: float = 0.01,
        max_sl_pct: float = 0.06,
        tick_size: float = 0.05
    ):
        self.exit_system = exit_system
        self.atr_period = atr_period
        self.lookback_days = lookback_days
        self.atr_mult_strong = atr_mult_strong
        self.atr_mult_moderate = atr_mult_moderate
        self.min_sl_pct = min_sl_pct
        self.max_sl_pct = max_sl_pct
        self.tick_size = tick_size

    def _compute_atr(self, df: pd.DataFrame) -> float:
        high_low = df['high'] - df['low']
        high_close = (df['high'] - df['close'].shift()).abs()
        low_close = (df['low'] - df['close'].shift()).abs()
        tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
        atr = tr.rolling(self.atr_period).mean()
        return float(atr.iloc[-1])

    def _round_price(self, price: float) -> float:
        if self.tick_size <= 0:
            return float(price)
        return float(round(price / self.tick_size) * self.tick_size)

    def _get_price_and_atr(self, ticker: str, prediction_date: datetime):
        from_date = (prediction_date - timedelta(days=self.lookback_days)).strftime("%Y-%m-%d %H:%M")
        to_date = prediction_date.strftime("%Y-%m-%d %H:%M")

        df = self.exit_system.get_candle_data(
            symbol=ticker,
            from_date=from_date,
            to_date=to_date,
            interval="ONE_DAY"
        )
        if df is None or df.empty:
            return None, None

        df = df.sort_index()
        if len(df) < self.atr_period + 2:
            return None, None

        last_close = float(df['close'].iloc[-1])
        atr_value = self._compute_atr(df)
        if not np.isfinite(atr_value) or atr_value <= 0:
            return None, None

        return last_close, atr_value

    def _compute_sl_for_side(self, side: str, rec_label: str, price: float, atr_value: float):
        if rec_label == 'STRONG':
            atr_mult = self.atr_mult_strong
        else:
            atr_mult = self.atr_mult_moderate

        sl_distance = atr_mult * atr_value

        if side == 'long':
            sl_price = price - sl_distance
            sl_pct = sl_distance / price
        else:
            sl_price = price + sl_distance
            sl_pct = sl_distance / price

        sl_pct = float(np.clip(sl_pct, self.min_sl_pct, self.max_sl_pct))

        if side == 'long':
            sl_price = price * (1 - sl_pct)
        else:
            sl_price = price * (1 + sl_pct)

        sl_price = self._round_price(sl_price)
        return sl_pct, sl_price

    def add_stop_losses(self, predictions_df: pd.DataFrame, prediction_date: datetime) -> pd.DataFrame:
        df = predictions_df.copy()
        df['long_sl_pct'] = np.nan
        df['long_sl_price'] = np.nan
        df['short_sl_pct'] = np.nan
        df['short_sl_price'] = np.nan

        atr_cache = {}

        for idx, row in df.iterrows():
            ticker = row['ticker']

            if ticker not in atr_cache:
                try:
                    last_close, atr_value = self._get_price_and_atr(ticker, prediction_date)
                except Exception as e:
                    logging.error(f"SL calc: failed to get ATR for {ticker}: {e}")
                    atr_cache[ticker] = (None, None)
                    continue
                atr_cache[ticker] = (last_close, atr_value)
            else:
                last_close, atr_value = atr_cache[ticker]

            if last_close is None or atr_value is None:
                continue

            long_rec = row['long_recommendation']
            if long_rec in ('STRONG', 'MODERATE'):
                sl_pct, sl_price = self._compute_sl_for_side(
                    side='long',
                    rec_label=long_rec,
                    price=last_close,
                    atr_value=atr_value
                )
                df.at[idx, 'long_sl_pct'] = sl_pct
                df.at[idx, 'long_sl_price'] = sl_price

            short_rec = row['short_recommendation']
            if short_rec in ('STRONG', 'MODERATE'):
                sl_pct, sl_price = self._compute_sl_for_side(
                    side='short',
                    rec_label=short_rec,
                    price=last_close,
                    atr_value=atr_value
                )
                df.at[idx, 'short_sl_pct'] = sl_pct
                df.at[idx, 'short_sl_price'] = sl_price

        return df

In [None]:
class SimpleSLBacktester:
    def __init__(
        self,
        exit_system,
        max_holding_days=20,
        atr_period=14,
        atr_mult=1.5
    ):
        self.exit_system = exit_system
        self.max_holding_days = max_holding_days
        self.atr_period = atr_period
        self.atr_mult = atr_mult

    def _compute_atr_series(self, df: pd.DataFrame) -> pd.Series:
        high_low = df['high'] - df['low']
        high_close = (df['high'] - df['close'].shift()).abs()
        low_close = (df['low'] - df['close'].shift()).abs()
        tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
        atr = tr.rolling(self.atr_period).mean()
        return atr

    def backtest_single_ticker(self, ticker: str, start_date: datetime, end_date: datetime, position_type: str = 'long'):
        from_str = start_date.strftime("%Y-%m-%d %H:%M")
        to_str = end_date.strftime("%Y-%m-%d %H:%M")

        df = self.exit_system.get_candle_data(
            symbol=ticker,
            from_date=from_str,
            to_date=to_str,
            interval="ONE_DAY"
        )

        if df is None or df.empty:
            print(f"No data for backtest: {ticker}")
            return None

        df = df.copy()
        df.index = pd.to_datetime(df.index)
        df = df.sort_index()

        atr_series = self._compute_atr_series(df)

        dataset_builder = ImprovedExitDataset(df, verbose=False)
        dataset_builder.process_data()
        exit_df = dataset_builder.create_realistic_exit_dataset(
            position_type=position_type,
            max_holding_days=self.max_holding_days
        )

        if exit_df.empty:
            print(f"No trades generated for backtest: {ticker}")
            return None

        atr_at_entry = []
        entry_prices = []
        for _, trade in exit_df.iterrows():
            entry_date = trade['entry_date']
            entry_date = pd.to_datetime(entry_date)
            if entry_date in df.index:
                atr_at_entry.append(float(atr_series.loc[entry_date]))
                entry_prices.append(float(df.loc[entry_date, 'close']))
            else:
                atr_at_entry.append(np.nan)
                entry_prices.append(np.nan)

        exit_df['atr_entry'] = atr_at_entry
        exit_df['entry_price'] = entry_prices

        exit_df = exit_df.dropna(subset=['atr_entry', 'entry_price'])
        if exit_df.empty:
            print(f"No trades with valid ATR/entry price for backtest: {ticker}")
            return None

        baseline_returns = []
        sl_returns = []

        for _, trade in exit_df.iterrows():
            entry_date = pd.to_datetime(trade['entry_date'])
            exit_date = pd.to_datetime(trade['exit_date'])
            entry_price = trade['entry_price']

            if exit_date not in df.index or entry_date not in df.index:
                continue

            natural_exit_price = float(df.loc[exit_date, 'close'])

            if position_type == 'long':
                r_base = (natural_exit_price - entry_price) / entry_price
            else:
                r_base = (entry_price - natural_exit_price) / entry_price

            baseline_returns.append(r_base)

            sl_distance = self.atr_mult * float(trade['atr_entry'])

            if position_type == 'long':
                sl_price = entry_price - sl_distance
            else:
                sl_price = entry_price + sl_distance

            trade_window = df.loc[entry_date:exit_date].iloc[1:]
            hit_sl = False
            sl_exit_price = None

            for date_i, row_i in trade_window.iterrows():
                low_i = row_i['low']
                high_i = row_i['high']

                if position_type == 'long':
                    if low_i <= sl_price:
                        hit_sl = True
                        sl_exit_price = sl_price
                        break
                else:
                    if high_i >= sl_price:
                        hit_sl = True
                        sl_exit_price = sl_price
                        break

            if hit_sl:
                if position_type == 'long':
                    r_sl = (sl_exit_price - entry_price) / entry_price
                else:
                    r_sl = (entry_price - sl_exit_price) / entry_price
            else:
                r_sl = r_base

            sl_returns.append(r_sl)

        if not baseline_returns:
            print(f"No testable trades for {ticker}")
            return None

        baseline_returns = np.array(baseline_returns)
        sl_returns = np.array(sl_returns)

        def compute_stats(returns: np.ndarray):
            equity = (1 + returns).cumprod()
            peak = np.maximum.accumulate(equity)
            dd = (equity - peak) / peak
            max_dd = dd.min() if len(dd) > 0 else 0.0

            return {
                'num_trades': int(len(returns)),
                'avg_return': float(returns.mean()),
                'win_rate': float((returns > 0).mean()),
                'max_drawdown': float(max_dd)
            }

        baseline_stats = compute_stats(baseline_returns)
        sl_stats = compute_stats(sl_returns)

        results = {
            'ticker': ticker,
            'position_type': position_type,
            'baseline': baseline_stats,
            'with_sl': sl_stats
        }

        return results


In [None]:
def display_compact(ticker, long_exit, short_exit, prediction_date):
    print(
        f"{ticker:11s} | {prediction_date:%Y-%m-%d} | "
        f"L:{long_exit['recommendation']:<8s} "
        f"(H{long_exit['holding_days']:>2}, R{long_exit['predicted_returns']*100:>6.2f}%, "
        f"P{long_exit['success_probability']*100:>5.1f}%) | "
        f"S:{short_exit['recommendation']:<8s} "
        f"(H{short_exit['holding_days']:>2}, R{short_exit['predicted_returns']*100:>6.2f}%, "
        f"P{short_exit['success_probability']*100:>5.1f}%)"
    )

if __name__ == "__main__":
    ticker_list = [
        'ABB', 'ADANIENT', 'ADANIPORTS', 'ADANIPOWER', 'AMBUJACEM', 'APOLLOHOSP',
        'ASIANPAINT', 'AXISBANK', 'BAJAJ-AUTO', 'BAJFINANCE', 'BAJAJFINSV',
        'BAJAJHLDNG', 'BAJAJHFL', 'BANKBARODA', 'BEL', 'BPCL', 'BHARTIARTL',
        'BOSCHLTD', 'BRITANNIA', 'CGPOWER', 'CANBK', 'CHOLAFIN', 'CIPLA',
        'COALINDIA', 'DLF', 'DABUR', 'DIVISLAB', 'DRREDDY', 'EICHERMOT',
        'ETERNAL', 'GAIL', 'GODREJCP', 'GRASIM', 'HCLTECH', 'HDFCBANK',
        'HDFCLIFE', 'HAVELLS', 'HEROMOTOCO', 'HINDALCO', 'HAL', 'HINDUNILVR',
        'HYUNDAI', 'ICICIBANK', 'ICICIGI', 'ICICIPRULI', 'ITC', 'INDHOTEL',
        'IOC', 'IRFC', 'INDUSINDBK', 'NAUKRI', 'INFY', 'INDIGO', 'JSWENERGY',
        'JSWSTEEL', 'JINDALSTEL', 'KOTAKBANK', 'LTIM', 'LT', 'LICI', 'M&M',
        'MARUTI', 'NTPC', 'NESTLEIND', 'ONGC', 'PIDILITIND', 'PFC', 'POWERGRID',
        'PNB', 'RECLTD', 'RELIANCE', 'SBILIFE', 'MOTHERSON', 'SHREECEM',
        'SHRIRAMFIN', 'SIEMENS', 'SBIN', 'SUNPHARMA', 'TVSMOTOR', 'TCS',
        'TATACONSUM', 'TATAMOTORS', 'TATAPOWER', 'TATASTEEL', 'TECHM', 'TITAN',
        'TORNTPHARM', 'TRENT', 'ULTRACEMCO', 'UNITDSPR', 'VBL', 'VEDL', 'WIPRO',
        'ZYDUSLIFE'
    ]

    api_key = ""
    user_id = ""
    password = ""
    totp_secret = ""

    prediction_date = datetime(2025, 7, 15)
    n_trials = 20
    lookback_years = 5
    horizon_days = 5

    Exit = None
    max_retries = 3
    backoff_seconds = 5

    for attempt in range(max_retries):
        try:
            Exit = ExitStrategySystem(api_key, user_id, password, totp_secret, verbose=False)
            Exit.login()
            Exit.fetch_instruments()
            break
        except Exception as e:
            logging.error(f"login failed attempt {attempt+1}: {e}")
            if attempt < max_retries - 1:
                time.sleep(backoff_seconds)
            else:
                raise SystemExit(1)

    print(f"STARTING BATCH PREDICTION FOR {len(ticker_list)} STOCKS | Prediction Date: {prediction_date:%Y-%m-%d}\n")

    all_predictions = []
    start_time = time.time()

    for idx, ticker in enumerate(ticker_list, 1):
        try:
            long_exit, short_exit = Exit.predict_exit_signals(
                ticker=ticker,
                current_date=prediction_date,
                horizon_days=horizon_days,
                n_trials=n_trials,
                lookback_years=lookback_years
            )
            if long_exit is None or short_exit is None:
                continue
            display_compact(ticker, long_exit, short_exit, prediction_date)
            all_predictions.append({
                'ticker': ticker,
                'prediction_date': prediction_date.strftime('%Y-%m-%d'),
                'long_holding_days': long_exit['holding_days'],
                'long_returns': long_exit['predicted_returns'],
                'long_success_prob': long_exit['success_probability'],
                'long_risk_score': long_exit['risk_score'],
                'long_confidence': long_exit['confidence'],
                'long_recommendation': long_exit['recommendation'],
                'short_holding_days': short_exit['holding_days'],
                'short_returns': short_exit['predicted_returns'],
                'short_success_prob': short_exit['success_probability'],
                'short_risk_score': short_exit['risk_score'],
                'short_confidence': short_exit['confidence'],
                'short_recommendation': short_exit['recommendation'],
            })
        except Exception as e:
            logging.error(f"prediction failed for {ticker}: {e}")
            continue

    elapsed_time = time.time() - start_time

    if not all_predictions:
        print("\nNo predictions were successful. Check logs.")
        raise SystemExit(1)

    predictions_df = pd.DataFrame(all_predictions)

    print("\n=== SUMMARY ===")
    cols = ['ticker','long_recommendation','short_recommendation',
            'long_holding_days','long_returns','long_success_prob',
            'short_holding_days','short_returns','short_success_prob']
    print(predictions_df[cols].to_string(index=False))

    output_file = f'exit_predictions_{prediction_date:%Y%m%d}.csv'
    predictions_df.to_csv(output_file, index=False)
    print(f"\nSaved: {output_file}")

    # STOP LOSS CALCULATION FOR STRONG / MODERATE SIGNALS
    sl_calc = StopLossCalculator(
        exit_system=Exit,
        atr_period=14,
        lookback_days=60,
        atr_mult_strong=1.8,
        atr_mult_moderate=1.2,
        min_sl_pct=0.01,
        max_sl_pct=0.06,
        tick_size=0.05
    )

    predictions_with_sl = sl_calc.add_stop_losses(predictions_df, prediction_date)

    mask = (~predictions_with_sl['long_sl_price'].isna()) | (~predictions_with_sl['short_sl_price'].isna())
    sl_view = predictions_with_sl[mask][[
        'ticker',
        'long_recommendation', 'long_holding_days', 'long_returns', 'long_success_prob',
        'long_sl_pct', 'long_sl_price',
        'short_recommendation', 'short_holding_days', 'short_returns', 'short_success_prob',
        'short_sl_pct', 'short_sl_price'
    ]]

    print("\n=== STOP LOSS RECOMMENDATIONS (ONLY STRONG / MODERATE) ===")
    if not sl_view.empty:
        print(sl_view.to_string(index=False))
    else:
        print("No STRONG/MODERATE signals with valid SL computed.")

    output_file_sl = f'exit_predictions_with_sl_{prediction_date:%Y%m%d}.csv'
    predictions_with_sl.to_csv(output_file_sl, index=False)
    print(f"\nSaved with stop-loss levels: {output_file_sl}")


    # SIMPLE BACKTEST
    RUN_BACKTEST = True

    if RUN_BACKTEST:
        backtester = SimpleSLBacktester(
            exit_system=Exit,
            max_holding_days=20,
            atr_period=14,
            atr_mult=1.5
        )

        bt_result = backtester.backtest_single_ticker(
            ticker='ABB',
            start_date=datetime(2020, 7, 15),
            end_date=prediction_date,
            position_type='long'
        )

        if bt_result is not None:
            print("\n=== SIMPLE SL BACKTEST RESULT (ABB, LONG) ===")
            print("Baseline:", bt_result['baseline'])
            print("With SL :", bt_result['with_sl'])
        else:
            print("\nBacktest did not produce results for ABB.")
