<a href="https://colab.research.google.com/github/bhaskar-sinha/Business-Analytics-Datasets/blob/main/val_trading.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import yfinance as yf
import ta
from datetime import datetime, timedelta
import pytz
import logging
import time
from collections import deque, defaultdict
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from statsmodels.tsa.statespace.kalman_filter import KalmanFilter
from keras.models import Sequential
from keras.layers import LSTM, Dense
import tensorflow as tf
import requests
from bs4 import BeautifulSoup
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# =====================
# Core Configuration
# =====================
CONFIG = {
    'SYMBOL': "^NSEI",  # Yahoo Finance symbol passed to yf.Ticker
    'LIVE_TRADING': False,
    'DATA_FEED': {
        # 'yfinance' polls yfinance each loop; any other value uses simulated ticks.
        'SOURCE': 'yfinance',
        'RESOLUTION': '1m',  # yfinance interval string; also drives the loop sleep
        # NOTE(review): yfinance serves only ~7 days of 1m bars per request;
        # MarketDataEngine clamps this value for '1m' — confirm for your version.
        'HISTORICAL_DAYS': 30
    },
    'RISK_PARAMS': {
        'MAX_POSITION_SIZE': 1000000,  # 10 lakh INR
        'DAILY_LOSS_LIMIT': 0.02,  # 2%
        'POSITION_VOLATILITY_TARGET': 0.15
    },
    'MODELS': {
        'LSTM': {
            'LOOKBACK_WINDOW': 60,  # rows per LSTM input window
            'EPOCHS': 50,
            'BATCH_SIZE': 32
        },
        'SENTIMENT': {
            'NEWS_SOURCES': ['reuters', 'economic_times'],
            'UPDATE_FREQUENCY': 300  # 5 minutes
        }
    }
}

# Shared module logger used by every class below. Handlers and level are set
# by logging.basicConfig in the __main__ entry point.
logger = logging.getLogger(__name__)

# =====================
# Market Data Engine
# =====================
class MarketDataEngine:
    """Maintain historical bars, a rolling tick buffer and derived market state.

    Attributes:
        historical_data: DataFrame returned by ``yf.Ticker.history`` (may be empty).
        live_data: deque of the 1000 most recent tick dicts, each with keys
            ``timestamp``, ``bid``, ``ask``, ``mid``, ``volume`` and ``spread``.
        volatility: annualized standard deviation of log returns.
        last_update: timestamp of the most recent tick, or None before the first.
        sentiment_score: mean VADER compound score from the last headline scrape.
        ist: Asia/Kolkata timezone used for every timestamp.
    """

    # NOTE(review): yfinance serves only about 7 days of 1-minute bars per
    # request and returns an empty frame (rather than raising) when asked for
    # more; confirm against the installed yfinance version.
    _MAX_1M_HISTORY_DAYS = 7
    _TRADING_DAYS_PER_YEAR = 252
    # The NSE cash session (09:15-15:30 IST) is 375 minutes long.
    _SESSION_MINUTES = 375
    # Intraday yfinance interval strings -> bar length in minutes.
    _INTRADAY_MINUTES = {
        '1m': 1, '2m': 2, '5m': 5, '15m': 15, '30m': 30,
        '60m': 60, '90m': 90, '1h': 60,
    }
    # Daily-or-coarser interval strings -> bars per year.
    _COARSE_PERIODS_PER_YEAR = {'1d': 252, '1wk': 52, '1mo': 12, '3mo': 4}

    def __init__(self):
        self.historical_data = None
        self.live_data = deque(maxlen=1000)
        self.volatility = 0
        self.last_update = None
        self.sentiment_score = 0
        self.ist = pytz.timezone('Asia/Kolkata')
        self._initialize()

    def _initialize(self):
        """Load historical data and start the sentiment refresh clock."""
        logger.info("Initializing market data engine")
        self._load_historical_data()
        self.last_sentiment_update = datetime.now(self.ist)

    def _load_historical_data(self):
        """Download historical bars for CONFIG['SYMBOL'] and compute volatility.

        For 1-minute resolution the lookback is clamped to what yfinance will
        serve in one request, so the download does not silently come back empty.

        Raises:
            Exception: any error from yfinance is logged and re-raised.
        """
        interval = CONFIG['DATA_FEED']['RESOLUTION']
        days = CONFIG['DATA_FEED']['HISTORICAL_DAYS']
        if interval == '1m' and days > self._MAX_1M_HISTORY_DAYS:
            logger.warning(
                "Clamping history from %d to %d days: 1m bars are limited per request",
                days, self._MAX_1M_HISTORY_DAYS,
            )
            days = self._MAX_1M_HISTORY_DAYS

        try:
            ticker = yf.Ticker(CONFIG['SYMBOL'])
            self.historical_data = ticker.history(
                period=f"{days}d",
                interval=interval
            )
            self._calculate_volatility()
            logger.info(f"Loaded {len(self.historical_data)} data points")
        except Exception as e:
            logger.error(f"Historical data load failed: {str(e)}")
            raise

        if self.historical_data.empty:
            logger.warning("No historical data returned for %s", CONFIG['SYMBOL'])

    def _calculate_volatility(self):
        """Set ``self.volatility`` from historical closes (0.0 when too few bars)."""
        self.volatility = self._annualized_volatility(
            self.historical_data.get('Close', []),
            self._periods_per_year(CONFIG['DATA_FEED']['RESOLUTION']),
        )
        logger.info(f"Historical volatility: {self.volatility:.4f}")

    @staticmethod
    def _annualized_volatility(prices, periods_per_year):
        """Return the annualized sample std-dev of log returns of *prices*.

        Non-finite returns (e.g. from NaN prices) are ignored. Returns 0.0 when
        fewer than two usable returns are available.
        """
        arr = np.asarray(prices, dtype=float)
        if arr.size < 3:
            return 0.0
        returns = np.diff(np.log(arr))
        returns = returns[np.isfinite(returns)]
        if returns.size < 2:
            return 0.0
        return float(np.std(returns, ddof=1) * np.sqrt(periods_per_year))

    @classmethod
    def _periods_per_year(cls, resolution):
        """Return the number of bars per year for a yfinance interval string.

        Intraday bars assume 252 trading days of 375 minutes each; unknown
        intervals fall back to daily (252).
        """
        minutes = cls._INTRADAY_MINUTES.get(resolution)
        if minutes is not None:
            return cls._TRADING_DAYS_PER_YEAR * cls._SESSION_MINUTES / minutes
        return cls._COARSE_PERIODS_PER_YEAR.get(resolution, cls._TRADING_DAYS_PER_YEAR)

    def update_live_data(self, bid, ask, volume):
        """Append a tick, refresh the volatility estimate and maybe refresh sentiment.

        Args:
            bid: best bid price.
            ask: best ask price.
            volume: traded volume reported with the tick.
        """
        timestamp = datetime.now(self.ist)
        mid_price = (bid + ask) / 2
        spread = ask - bid

        tick_data = {
            'timestamp': timestamp,
            'bid': bid,
            'ask': ask,
            'mid': mid_price,
            'volume': volume,
            'spread': spread
        }

        self.live_data.append(tick_data)
        self.last_update = timestamp

        # Volatility of tick-to-tick log returns (not of log price levels).
        # NOTE(review): assumes ticks arrive once per RESOLUTION bar.
        if len(self.live_data) > 10:
            mids = [tick['mid'] for tick in self.live_data]
            self.volatility = self._annualized_volatility(
                mids, self._periods_per_year(CONFIG['DATA_FEED']['RESOLUTION'])
            )

        # Update sentiment periodically
        elapsed = (timestamp - self.last_sentiment_update).total_seconds()
        if elapsed > CONFIG['MODELS']['SENTIMENT']['UPDATE_FREQUENCY']:
            self._update_sentiment()
            self.last_sentiment_update = timestamp

    def _update_sentiment(self):
        """Score headlines from every configured source and store their mean."""
        analyzer = SentimentIntensityAnalyzer()
        scores = []

        for source in CONFIG['MODELS']['SENTIMENT']['NEWS_SOURCES']:
            try:
                news_text = self._fetch_news(source)
                if news_text:
                    vs = analyzer.polarity_scores(news_text)
                    scores.append(vs['compound'])
            except Exception as e:
                logger.warning(f"Sentiment analysis failed for {source}: {str(e)}")

        # Keep the previous score when no source produced text.
        if scores:
            self.sentiment_score = np.mean(scores)
            logger.info(f"Updated sentiment score: {self.sentiment_score:.2f}")

    def _fetch_news(self, source):
        """Return up to five ``<h3>`` headlines from *source* joined by spaces.

        Returns "" for unknown sources and on any fetch/HTTP error (logged).
        """
        urls = {
            'reuters': "https://www.reuters.com/markets/asia",
            'economic_times': "https://economictimes.indiatimes.com/markets/stocks/news",
        }
        url = urls.get(source)
        if url is None:
            return ""

        try:
            response = requests.get(url, timeout=5)
            # Do not score the text of an error or block page.
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            headlines = [h.get_text(strip=True) for h in soup.find_all('h3')[:5]]
            return " ".join(headlines)
        except Exception as e:
            logger.warning(f"News fetch failed from {source}: {str(e)}")
            return ""

# =====================
# Advanced Alpha Models
# =====================
class AlphaModels:
    """Build, train and query the ensemble of alpha models.

    Inputs come from ``data_engine`` (a MarketDataEngine). The scikit-learn
    classifiers and the LSTM are trained once, at construction, on the
    engine's historical bars. The target is whether the next bar closes higher.
    Any model whose training failed is left out of ``generate_signals``
    instead of raising ``NotFittedError``.
    """

    # Classifiers queried through predict_proba.
    _SKLEARN_MODELS = ('random_forest', 'gradient_boosting', 'svm')
    # Price-denominated columns share one scaler so they stay comparable.
    _PRICE_COLUMNS = ['price', 'sma_10', 'ema_10', 'bb_upper']
    # Indicator columns passed through without scaling.
    _RAW_COLUMNS = ['rsi', 'macd']
    # Width of one feature row: price columns + volume + raw columns.
    _N_FEATURES = len(_PRICE_COLUMNS) + 1 + len(_RAW_COLUMNS)

    def __init__(self, data_engine):
        self.data = data_engine
        self.models = {}
        self.scalers = {}
        # Names of models whose training succeeded; only these emit signals.
        self._fitted = set()
        # True once the scalers were fit on history; live features then reuse them.
        self._scalers_fitted = False
        self._initialize()

    def _initialize(self):
        """Construct every alpha model and train what can be trained."""
        logger.info("Initializing alpha models")

        # Technical model
        self.models['technical'] = TechnicalModel()

        # Machine learning models
        self.models['random_forest'] = RandomForestClassifier(
            n_estimators=100,
            max_depth=5,
            random_state=42
        )
        self.models['gradient_boosting'] = GradientBoostingClassifier(
            n_estimators=50,
            learning_rate=0.1,
            max_depth=3,
            random_state=42
        )
        self.models['svm'] = SVC(
            kernel='rbf',
            probability=True,
            random_state=42
        )

        # LSTM Model
        self.models['lstm'] = self._build_lstm_model()

        # Local-level Kalman filter variances (price units squared) used by
        # _kalman_level. NOTE(review): untuned placeholder values.
        self.models['kalman'] = {'process_var': 1.0, 'measurement_var': 4.0}

        # Sentiment model
        self.models['sentiment'] = SentimentModel()

        # Initialize scalers
        self.scalers['price'] = StandardScaler()
        self.scalers['volume'] = StandardScaler()

        self._fit_from_history()

    def _build_lstm_model(self):
        """Build and compile a two-layer LSTM binary classifier.

        The input shape is (LOOKBACK_WINDOW, _N_FEATURES), matching the rows
        produced by _prepare_features.
        """
        model = Sequential([
            LSTM(50, return_sequences=True,
                 input_shape=(CONFIG['MODELS']['LSTM']['LOOKBACK_WINDOW'],
                              self._N_FEATURES)),
            LSTM(50),
            Dense(1, activation='sigmoid')
        ])

        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy']
        )

        return model

    def _fit_from_history(self):
        """Fit the scalers, classifiers and LSTM on historical bars (best effort).

        Failures are logged and leave the affected models untrained.
        """
        hist = self.data.historical_data
        if hist is None or hist.empty:
            logger.warning("No historical data - ML models stay untrained")
            return

        try:
            frame = self._feature_frame(hist['Close'].to_numpy(), hist['Volume'].to_numpy())
            # Label each bar with "next bar closes higher"; the last bar has no label.
            labels = (frame['price'].shift(-1) > frame['price']).astype(int).to_numpy()[:-1]
            X = self._scale_features(frame, fit=True)[:-1]
            self._scalers_fitted = True
        except Exception:
            logger.warning("Feature preparation on history failed", exc_info=True)
            return

        if len(np.unique(labels)) < 2:
            logger.warning("Historical labels contain a single class - ML models stay untrained")
            return

        for name in self._SKLEARN_MODELS:
            try:
                self.models[name].fit(X, labels)
                self._fitted.add(name)
            except Exception:
                logger.warning("Training %s failed", name, exc_info=True)

        lstm_cfg = CONFIG['MODELS']['LSTM']
        lookback = lstm_cfg['LOOKBACK_WINDOW']
        if len(X) >= lookback:
            try:
                self.models['lstm'].fit(
                    self._sliding_windows(X, lookback),
                    labels[lookback - 1:],  # label of each window's last row
                    epochs=lstm_cfg['EPOCHS'],
                    batch_size=lstm_cfg['BATCH_SIZE'],
                    verbose=0,
                )
                self._fitted.add('lstm')
            except Exception:
                logger.warning("Training lstm failed", exc_info=True)

    def generate_signals(self):
        """Return a dict of per-model signals for the latest tick.

        Classifier and LSTM entries are P(next bar up) as Python floats and are
        present only for trained models once enough feature rows exist.
        ``'kalman'`` is a filtered price level, not a probability.
        """
        signals = {}
        ticks = list(self.data.live_data)

        # Technical signals
        signals['technical'] = self.models['technical'].generate(ticks)

        # Prepare ML features
        X = self._prepare_features()

        if len(X) >= CONFIG['MODELS']['LSTM']['LOOKBACK_WINDOW']:
            # Score only the most recent row; scoring all of X would yield an array.
            latest = X[-1:]
            for name in self._SKLEARN_MODELS:
                if name in self._fitted:
                    signals[name] = float(self.models[name].predict_proba(latest)[0, 1])

            if 'lstm' in self._fitted:
                lstm_input = self._prepare_lstm_input(X)
                signals['lstm'] = float(
                    self.models['lstm'].predict(lstm_input, verbose=0)[0][0]
                )

        # Statistical signals
        level = self._kalman_level([t['mid'] for t in ticks], **self.models['kalman'])
        if level is not None:
            signals['kalman'] = level

        # Sentiment signals
        signals['sentiment'] = self.models['sentiment'].generate(
            self.data.sentiment_score
        )

        return signals

    def _prepare_features(self):
        """Return the scaled feature matrix for the live ticks.

        The result has shape (rows, _N_FEATURES). It is empty until every
        indicator has warmed up.
        """
        ticks = self.data.live_data
        frame = self._feature_frame(
            [t['mid'] for t in ticks],
            [t['volume'] for t in ticks],
        )
        if frame.empty:
            # Avoid fitting/transforming scalers on zero samples (ValueError).
            return np.empty((0, self._N_FEATURES))
        # Reuse history-fitted scalers when available; otherwise fit on the window.
        return self._scale_features(frame, fit=not self._scalers_fitted)

    @staticmethod
    def _feature_frame(prices, volumes):
        """Build price/volume plus indicator columns, dropping warm-up rows with NaNs."""
        df = pd.DataFrame({
            'price': np.asarray(prices, dtype=float),
            'volume': np.asarray(volumes, dtype=float),
        })
        df['sma_10'] = ta.trend.sma_indicator(df['price'], window=10)
        df['ema_10'] = ta.trend.ema_indicator(df['price'], window=10)
        df['rsi'] = ta.momentum.rsi(df['price'], window=14)
        df['macd'] = ta.trend.macd(df['price'])
        df['bb_upper'] = ta.volatility.bollinger_hband(df['price'])
        return df.dropna()

    def _scale_features(self, frame, fit=False):
        """Scale price and volume columns and append the raw indicator columns.

        Args:
            frame: output of _feature_frame (non-empty).
            fit: refit the scalers on *frame* before transforming.
        """
        price = frame[self._PRICE_COLUMNS].to_numpy()
        volume = frame[['volume']].to_numpy()
        if fit:
            price_scaled = self.scalers['price'].fit_transform(price)
            volume_scaled = self.scalers['volume'].fit_transform(volume)
        else:
            price_scaled = self.scalers['price'].transform(price)
            volume_scaled = self.scalers['volume'].transform(volume)
        return np.concatenate(
            [price_scaled, volume_scaled, frame[self._RAW_COLUMNS].to_numpy()],
            axis=1,
        )

    def _prepare_lstm_input(self, X, lookback=None):
        """Return the most recent *lookback* rows of X as one batch.

        The result has shape (1, lookback, n_features), or is None when X has
        fewer than *lookback* rows. *lookback* defaults to
        CONFIG['MODELS']['LSTM']['LOOKBACK_WINDOW'].
        """
        if lookback is None:
            lookback = CONFIG['MODELS']['LSTM']['LOOKBACK_WINDOW']
        X = np.asarray(X)
        if len(X) < lookback:
            return None
        return X[-lookback:][np.newaxis, ...]

    @staticmethod
    def _sliding_windows(X, lookback):
        """Return every contiguous window of X.

        The result has shape (len(X) - lookback + 1, lookback, n_features).
        It requires len(X) >= lookback.
        """
        X = np.asarray(X)
        return np.stack([X[i:i + lookback] for i in range(len(X) - lookback + 1)])

    @staticmethod
    def _kalman_level(observations, process_var=1.0, measurement_var=4.0):
        """Return the final estimate of a 1-D random-walk (local-level) Kalman filter.

        Returns None for an empty observation sequence.
        """
        obs = np.asarray(observations, dtype=float)
        if obs.size == 0:
            return None
        level = obs[0]
        variance = measurement_var
        for z in obs[1:]:
            variance += process_var                        # predict step
            gain = variance / (variance + measurement_var)
            level += gain * (z - level)                    # update step
            variance *= 1.0 - gain
        return float(level)

# =====================
# Trading Engine Core
# =====================
class TradingEngine:
    """Paper-trading loop: poll ticks, generate alpha signals, size and record orders."""

    # Per-model weights for the combined signal. Models absent from the
    # signal dict are skipped and the remaining weights renormalized.
    _SIGNAL_WEIGHTS = {
        'technical': 0.3,
        'random_forest': 0.2,
        'gradient_boosting': 0.2,
        'lstm': 0.15,
        'sentiment': 0.15,
    }
    _BUY_THRESHOLD = 0.6
    _SELL_THRESHOLD = 0.4
    # NSE cash-market session in IST as (hour, minute).
    _SESSION_OPEN = (9, 15)
    _SESSION_CLOSE = (15, 30)

    def __init__(self):
        self.data_engine = MarketDataEngine()
        self.alpha_models = AlphaModels(self.data_engine)
        self.position = 0      # signed units held
        self.cash = 0.0        # cumulative cash flow from fills (buys negative)
        self.pnl = 0
        self.daily_pnl = []
        self.order_history = []

    def run(self):
        """Main trading loop; returns on KeyboardInterrupt."""
        logger.info("Starting trading engine")

        try:
            while True:
                if not self._market_open():
                    self._sleep_until_open()
                    continue

                # Simulate market data update
                if CONFIG['DATA_FEED']['SOURCE'] == 'yfinance':
                    tick_data = self._fetch_yfinance_tick()
                else:
                    tick_data = self._simulate_tick()

                self.data_engine.update_live_data(
                    tick_data['bid'],
                    tick_data['ask'],
                    tick_data['volume']
                )

                # Generate alpha signals
                signals = self.alpha_models.generate_signals()

                # Execute trading logic
                self._execute_strategy(signals)

                # Sleep based on data frequency
                time.sleep(60 if CONFIG['DATA_FEED']['RESOLUTION'] == '1m' else 1)

        except KeyboardInterrupt:
            logger.info("Shutting down trading engine")

    @classmethod
    def _combine_signals(cls, signals):
        """Return the weighted mean of available signals on a [0, 1] scale.

        The technical score in [-1, 1] is mapped to [0, 1] first. Non-finite
        values and unweighted keys are ignored. Returns 0.5 (neutral) when no
        weighted signal is present, so missing models do not read as SELL.
        """
        total = 0.0
        weight_sum = 0.0
        for model, weight in cls._SIGNAL_WEIGHTS.items():
            if model not in signals:
                continue
            value = float(signals[model])
            if not np.isfinite(value):
                continue
            if model == 'technical':
                value = (value + 1) / 2
            total += value * weight
            weight_sum += weight
        return total / weight_sum if weight_sum else 0.5

    def _execute_strategy(self, signals):
        """Turn model signals into at most one order, respecting the position cap."""
        combined_signal = self._combine_signals(signals)

        if combined_signal > self._BUY_THRESHOLD:  # Strong buy signal
            side = 'BUY'
        elif combined_signal < self._SELL_THRESHOLD:  # Strong sell signal
            side = 'SELL'
        else:
            logger.info("Neutral signal - holding position")
            return

        # Determine position sizing based on volatility
        size = self._cap_to_position_limit(side, self._calculate_position_size(combined_signal))
        if size <= 0:
            logger.info("Position limit reached - skipping %s", side)
            return
        self._send_order(side, size)

    def _calculate_position_size(self, signal_strength):
        """Return an order size in units (at least 1).

        Conviction is the distance of the combined signal from neutral 0.5,
        scaled to [0, 1], so strong sells size up the same way strong buys do.
        The INR notional is the lesser of the conviction and volatility-target
        fractions of MAX_POSITION_SIZE, converted to units at the last mid
        price. Returns 1 when there are no ticks yet.
        """
        max_notional = CONFIG['RISK_PARAMS']['MAX_POSITION_SIZE']
        vol_target = CONFIG['RISK_PARAMS']['POSITION_VOLATILITY_TARGET']

        if not self.data_engine.live_data:
            return 1

        conviction = min(1.0, abs(signal_strength - 0.5) * 2)
        notional = min(
            max_notional * conviction,
            max_notional * (vol_target / (self.data_engine.volatility + 0.01))
        )
        price = self.data_engine.live_data[-1]['mid']
        return max(1, int(notional / price))  # At least 1 unit

    def _cap_to_position_limit(self, side, size):
        """Shrink *size* so |position| x mid never exceeds MAX_POSITION_SIZE (INR).

        Returns 0 when there is no headroom left on that side.
        """
        price = self.data_engine.live_data[-1]['mid']
        max_units = int(CONFIG['RISK_PARAMS']['MAX_POSITION_SIZE'] // price)
        if side == 'BUY':
            headroom = max_units - self.position
        else:
            headroom = max_units + self.position
        return max(0, min(size, headroom))

    def _send_order(self, side, size):
        """Simulate a fill at the touch and update position, cash and MTM PnL."""
        last_tick = self.data_engine.live_data[-1]
        price = last_tick['ask'] if side == 'BUY' else last_tick['bid']
        signed_size = size if side == 'BUY' else -size

        # Record order
        self.order_history.append({
            'timestamp': datetime.now(self.data_engine.ist),
            'side': side,
            'size': size,
            'price': price
        })

        # Buys spend cash and sells receive it; PnL = cash + marked position value.
        self.position += signed_size
        self.cash -= signed_size * price
        self.pnl = self.cash + self.position * last_tick['mid']
        self.daily_pnl.append(self.pnl)

        logger.info(
            f"Executed {side} order: {size} @ {price:.2f} | "
            f"Position: {self.position} | PnL: {self.pnl:.2f}"
        )

    @classmethod
    def _is_session_open(cls, now):
        """Return True if *now* is within the weekday session window (inclusive)."""
        if now.weekday() >= 5:  # Weekend
            return False
        open_h, open_m = cls._SESSION_OPEN
        close_h, close_m = cls._SESSION_CLOSE
        market_open = now.replace(hour=open_h, minute=open_m, second=0, microsecond=0)
        market_close = now.replace(hour=close_h, minute=close_m, second=0, microsecond=0)
        return market_open <= now <= market_close

    def _market_open(self):
        """Check whether the market is currently open (IST, weekdays only).

        NOTE(review): exchange holidays are not modelled.
        """
        return self._is_session_open(datetime.now(self.data_engine.ist))

    @classmethod
    def _next_session_open(cls, now):
        """Return the next session-open datetime strictly after *now*, skipping weekends."""
        hour, minute = cls._SESSION_OPEN
        candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if candidate <= now:
            candidate += timedelta(days=1)
        while candidate.weekday() >= 5:
            candidate += timedelta(days=1)
        return candidate

    def _sleep_until_open(self):
        """Sleep until the next session open (today if before 09:15 on a weekday)."""
        now = datetime.now(self.data_engine.ist)
        next_open = self._next_session_open(now)
        sleep_secs = (next_open - now).total_seconds()
        logger.info(f"Market closed. Sleeping for {sleep_secs/3600:.2f} hours")
        time.sleep(sleep_secs)

    def _simulate_tick(self):
        """Generate a simulated tick as a random-walk step from the last known price."""
        hist = self.data_engine.historical_data
        if self.data_engine.live_data:
            last_price = self.data_engine.live_data[-1]['mid']
        elif hist is not None and not hist.empty:
            last_price = hist['Close'].iloc[-1]
        else:
            last_price = 25000

        # Random walk with drift
        new_price = last_price + np.random.normal(0.02, 0.5)

        return {
            'bid': new_price - 0.5,
            'ask': new_price + 0.5,
            # numpy's upper bound is exclusive: 100..500 inclusive.
            'volume': int(np.random.randint(100, 501))
        }

    def _fetch_yfinance_tick(self):
        """Build a tick from the latest 1-minute yfinance bar (synthetic +/-0.5 spread)."""
        ticker = yf.Ticker(CONFIG['SYMBOL'])
        hist = ticker.history(period="1d", interval="1m")

        if hist.empty:
            logger.warning("No intraday bars from yfinance; using a simulated tick")
            return self._simulate_tick()

        last_row = hist.iloc[-1]
        return {
            'bid': last_row['Close'] - 0.5,
            'ask': last_row['Close'] + 0.5,
            'volume': last_row['Volume']
        }

# =====================
# Support Classes
# =====================
class TechnicalModel:
    """Rule-based technical analysis model scoring the latest tick in [-1, 1]."""

    # Fewer ticks than this returns a neutral 0 without evaluating indicators.
    MIN_TICKS = 20

    def generate(self, market_data):
        """Score the latest tick from MACD, RSI and Bollinger-band rules.

        Args:
            market_data: sequence of tick dicts with a ``'mid'`` price.

        Returns:
            0 when fewer than MIN_TICKS ticks; otherwise the sum of rule
            contributions, clipped to [-1, 1]. Rules whose indicator is still
            NaN (not enough history) contribute nothing.
        """
        if len(market_data) < self.MIN_TICKS:
            return 0

        # ta indicators operate on pandas Series, not raw numpy arrays.
        closes = pd.Series([tick['mid'] for tick in market_data], dtype=float)

        # Use positional .iloc: series[-1] is a label lookup on a RangeIndex.
        rsi = ta.momentum.rsi(closes, window=14).iloc[-1]
        macd = ta.trend.macd(closes).iloc[-1]
        macd_signal = ta.trend.macd_signal(closes).iloc[-1]
        lower_band = ta.volatility.BollingerBands(closes).bollinger_lband().iloc[-1]

        # Combine signals
        signal = 0
        if macd > macd_signal:
            signal += 0.3
        if rsi < 30:
            signal += 0.2
        elif rsi > 70:
            signal -= 0.2
        if closes.iloc[-1] < lower_band:
            signal += 0.5

        return min(max(signal, -1), 1)  # Clip to [-1, 1] range

class SentimentModel:
    """Convert a sentiment score into a probability-style trading signal."""

    def generate(self, score):
        """Rescale *score* linearly from [-1, 1] onto [0, 1] (-1 -> 0, 0 -> 0.5, 1 -> 1)."""
        shifted = score + 1
        return shifted / 2

# =====================
# Main Execution
# =====================
if __name__ == "__main__":
    # INFO-level root logging so order and position messages are shown,
    # then run the loop until it is interrupted (Ctrl+C).
    logging.basicConfig(level=logging.INFO)
    engine = TradingEngine()  # kept as a global so state can be inspected afterwards
    engine.run()