<a href="https://colab.research.google.com/github/Shrey576/Self-Updating-Market-Model-with-Bloomberg-Terminal/blob/main/QuantMarketModel_w__Yahoo_Finance__API.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Project: Institutional-Grade Algorithmic Trading System



In [None]:
# Directory structure
algo-trading-system/
├── data/
│   ├── providers/
│   │   ├── __init__.py
│   │   ├── base.py          # Abstract base class
│   │   ├── yahoo.py         # Yahoo Finance implementation
│   │   └── alpha_vantage.py # Alpha Vantage implementation
│   ├── storage/
│   │   ├── __init__.py
│   │   └── mysql_handler.py
├── strategies/
│   ├── __init__.py
│   ├── base_strategy.py     # Abstract strategy class
│   ├── momentum.py
│   ├── mean_reversion.py
│   └── pairs_trading.py
├── backtesting/
│   ├── __init__.py
│   ├── engine.py            # Core backtesting logic
│   ├── portfolio.py         # Position management
│   └── execution.py         # Order execution simulation
├── risk/
│   ├── __init__.py
│   └── risk_manager.py
├── analytics/
│   ├── __init__.py
│   └── performance.py       # Metrics calculation
├── dashboard/
│   ├── app.py              # Streamlit/Dash app
│   └── components/
├── tests/
├── config/
│   └── config.yaml
├── requirements.txt
├── README.md
└── docker-compose.yml

## 1.2 Core Data Abstraction Layer

In [None]:
# data/providers/base.py
from abc import ABC, abstractmethod
from datetime import datetime
import pandas as pd

class BaseDataProvider(ABC):
    """
    Contract that every market data source has to fulfil.

    The design targets listed for this interface are Yahoo Finance,
    Alpha Vantage, Bloomberg API and Refinitiv. A concrete provider must
    override all three abstract methods before it can be instantiated.
    """

    @abstractmethod
    def get_historical_data(self, ticker: str, start: datetime,
                           end: datetime, interval: str = '1d') -> pd.DataFrame:
        """
        Fetch the price history of a single ticker (Bloomberg BDH() analogue).

        Args:
            ticker: Symbol to query.
            start: First moment of the requested window.
            end: Last moment of the requested window.
            interval: Bar size, '1d' unless stated otherwise.

        Returns:
            DataFrame with [Open, High, Low, Close, Volume, Adj Close].
        """

    @abstractmethod
    def get_multiple_tickers(self, tickers: list, start: datetime,
                            end: datetime) -> dict:
        """
        Fetch the price history of several tickers in one batch.

        Args:
            tickers: Symbols to query.
            start: First moment of the requested window.
            end: Last moment of the requested window.

        Returns:
            dict of {ticker: DataFrame}.
        """

    @abstractmethod
    def get_latest_price(self, ticker: str) -> float:
        """
        Return the real-time price of a ticker (Bloomberg BDP() analogue).

        Args:
            ticker: Symbol to query.
        """

In [None]:
# data/providers/yahoo.py
import yfinance as yf
from .base import BaseDataProvider
import pandas as pd
from datetime import datetime

class YahooFinanceProvider(BaseDataProvider):
    """
    Yahoo Finance implementation of data provider.
    Free, reliable, good data quality for US equities.

    All downloads pass ``auto_adjust=False`` so that the returned frames
    always carry an 'Adj Close' column; recent yfinance releases adjust
    prices in place by default and drop that column.
    """

    def __init__(self):
        self.name = "Yahoo Finance"

    @staticmethod
    def _flatten_single_ticker(data: pd.DataFrame) -> pd.DataFrame:
        """
        Collapse two-level columns of a single-ticker download to one level.

        Newer yfinance versions return MultiIndex columns even for one
        symbol; the level that holds a single unique value (the ticker)
        is dropped so ``data['Adj Close']`` yields a Series.
        """
        if not isinstance(data.columns, pd.MultiIndex):
            return data
        for level in reversed(range(data.columns.nlevels)):
            if data.columns.get_level_values(level).nunique() == 1:
                return data.droplevel(level, axis=1)
        return data

    def get_historical_data(self, ticker: str, start: datetime,
                           end: datetime, interval: str = '1d') -> pd.DataFrame:
        """
        Download historical OHLCV data.
        Intervals: 1d, 1h, 5m, etc.

        Returns:
            DataFrame of price bars, or an empty DataFrame when the
            download fails or yields no rows (the problem is printed).
        """
        try:
            data = yf.download(ticker, start=start, end=end,
                               interval=interval, progress=False,
                               auto_adjust=False)
        except Exception as e:
            # Best-effort boundary: yfinance surfaces network and parsing
            # problems through many exception types.
            print(f"Error fetching {ticker}: {e}")
            return pd.DataFrame()

        if data is None or data.empty:
            print(f"Error fetching {ticker}: No data returned for {ticker}")
            return pd.DataFrame()
        return self._flatten_single_ticker(data)

    def get_multiple_tickers(self, tickers: list, start: datetime,
                            end: datetime) -> dict:
        """
        Efficient batch download.

        Returns:
            dict of {ticker: DataFrame}. Every requested ticker is present
            as a key; tickers without data map to an empty DataFrame.
            Rows that are entirely NaN for a ticker are removed.
        """
        try:
            data = yf.download(tickers, start=start, end=end,
                               group_by='ticker', progress=False,
                               auto_adjust=False)
        except Exception as e:
            print(f"Error fetching {tickers}: {e}")
            return {ticker: pd.DataFrame() for ticker in tickers}

        if data is None:
            data = pd.DataFrame()

        grouped = isinstance(data.columns, pd.MultiIndex)
        available = set(data.columns.get_level_values(0)) if grouped else set()

        result = {}
        for ticker in tickers:
            if grouped and ticker in available:
                # The batch frame shares one index across all tickers, so a
                # symbol has all-NaN rows on dates it did not trade.
                frame = data[ticker].dropna(how='all')
            elif not grouped and len(tickers) == 1 and not data.empty:
                # Older yfinance returns flat columns for a single symbol.
                frame = data.dropna(how='all')
            else:
                print(f"Error fetching {ticker}: No data returned for {ticker}")
                frame = pd.DataFrame()
            result[ticker] = frame
        return result

    def get_latest_price(self, ticker: str) -> float:
        """
        Get most recent close price.

        Tries the 'currentPrice' and 'regularMarketPrice' quote fields and
        falls back to the last close of the recent daily history.

        Raises:
            ValueError: If no price can be determined for the ticker.
        """
        stock = yf.Ticker(ticker)
        info = stock.info or {}  # read the property once
        price = info.get('currentPrice')
        if price is None:
            price = info.get('regularMarketPrice')
        if price is None:
            history = stock.history(period='5d')
            if not history.empty:
                price = history['Close'].iloc[-1]
        if price is None or pd.isna(price):
            raise ValueError(f"No price available for {ticker}")
        return float(price)

## 1.3 MySQL Storage Layer



In [None]:
# data/storage/mysql_handler.py
import mysql.connector
import pandas as pd
from datetime import datetime

class MySQLHandler:
    """
    Persistent storage for market data and backtest results.

    Opens a MySQL connection on construction and creates the schema if it
    does not exist. Release the connection with ``close()`` or use the
    handler as a context manager.
    """

    # DataFrame columns written to / read from the market_data table.
    _PRICE_COLUMNS = ['Open', 'High', 'Low', 'Close', 'Adj Close']

    def __init__(self, host='localhost', user='root', password='', database='trading_system'):
        self.connection = mysql.connector.connect(
            host=host, user=user, password=password, database=database
        )
        self.cursor = self.connection.cursor()
        self._create_tables()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """
        Close the cursor and the connection; safe to call more than once.
        """
        for resource_name in ('cursor', 'connection'):
            resource = getattr(self, resource_name, None)
            if resource is not None:
                try:
                    resource.close()
                finally:
                    setattr(self, resource_name, None)

    def _create_tables(self):
        """
        Create schema for market data and results.
        """
        # Market data table
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS market_data (
                id INT AUTO_INCREMENT PRIMARY KEY,
                ticker VARCHAR(10),
                date DATE,
                open DECIMAL(10,2),
                high DECIMAL(10,2),
                low DECIMAL(10,2),
                close DECIMAL(10,2),
                volume BIGINT,
                adj_close DECIMAL(10,2),
                UNIQUE KEY unique_ticker_date (ticker, date)
            )
        """)

        # Backtest results table
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS backtest_results (
                id INT AUTO_INCREMENT PRIMARY KEY,
                strategy_name VARCHAR(50),
                run_date DATETIME,
                start_date DATE,
                end_date DATE,
                total_return DECIMAL(10,4),
                sharpe_ratio DECIMAL(10,4),
                max_drawdown DECIMAL(10,4),
                num_trades INT,
                win_rate DECIMAL(5,4),
                config JSON
            )
        """)

        # Trade log table
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS trades (
                id INT AUTO_INCREMENT PRIMARY KEY,
                backtest_id INT,
                ticker VARCHAR(10),
                trade_date DATE,
                action VARCHAR(10),
                quantity INT,
                price DECIMAL(10,2),
                pnl DECIMAL(10,2),
                FOREIGN KEY (backtest_id) REFERENCES backtest_results(id)
            )
        """)

        self.connection.commit()

    @staticmethod
    def _to_db(value, cast):
        """
        Convert a cell to a plain Python value; missing values become NULL.
        """
        return None if pd.isna(value) else cast(value)

    @staticmethod
    def _as_date(value):
        """
        Accept either a datetime (incl. pandas Timestamp) or a plain date.
        """
        return value.date() if isinstance(value, datetime) else value

    def save_market_data(self, df: pd.DataFrame, ticker: str):
        """
        Store OHLCV data.

        Args:
            df: Frame indexed by date with columns Open, High, Low, Close,
                Volume and Adj Close. An empty frame is a no-op.
            ticker: Symbol the rows belong to.

        Existing (ticker, date) rows are overwritten. The whole batch is
        rolled back if any row fails.
        """
        if df is None or df.empty:
            return

        query = """
                INSERT INTO market_data (ticker, date, open, high, low, close, volume, adj_close)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                open=VALUES(open), high=VALUES(high), low=VALUES(low),
                close=VALUES(close), volume=VALUES(volume), adj_close=VALUES(adj_close)
            """
        to_db = self._to_db
        rows = [
            (
                ticker,
                pd.Timestamp(stamp).date(),
                to_db(row['Open'], float),
                to_db(row['High'], float),
                to_db(row['Low'], float),
                to_db(row['Close'], float),
                to_db(row['Volume'], int),
                to_db(row['Adj Close'], float),
            )
            for stamp, row in df.iterrows()
        ]
        try:
            # One batched statement instead of a round trip per row.
            self.cursor.executemany(query, rows)
        except Exception:
            self.connection.rollback()
            raise
        self.connection.commit()

    def get_market_data(self, ticker: str, start: datetime, end: datetime) -> pd.DataFrame:
        """
        Retrieve stored data (avoids API calls).

        Args:
            ticker: Symbol to load.
            start: First date of the window (inclusive); datetime or date.
            end: Last date of the window (inclusive); datetime or date.

        Returns:
            DataFrame indexed by a DatetimeIndex named 'Date' with float
            price columns, i.e. the same layout ``save_market_data`` accepts.
        """
        query = """
            SELECT date, open, high, low, close, volume, adj_close
            FROM market_data
            WHERE ticker=%s AND date BETWEEN %s AND %s
            ORDER BY date
        """
        self.cursor.execute(query, (ticker, self._as_date(start), self._as_date(end)))
        data = self.cursor.fetchall()

        df = pd.DataFrame(data, columns=['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close'])
        # DATE / DECIMAL columns arrive as date / Decimal objects; convert
        # them so numeric operations such as pct_change() work downstream.
        df['Date'] = pd.to_datetime(df['Date'])
        df = df.astype({column: float for column in self._PRICE_COLUMNS})
        df['Volume'] = pd.to_numeric(df['Volume'])
        df.set_index('Date', inplace=True)
        return df

## 2.1 Base Strategy Class

In [None]:
# strategies/base_strategy.py
from abc import ABC, abstractmethod
import pandas as pd

class BaseStrategy(ABC):
    """
    Common parent of every trading strategy.

    Subclasses supply the signal logic and the sizing rule; this class
    only stores the strategy name and a position book.
    """

    def __init__(self, name: str):
        # Position book, keyed by ticker: {ticker: quantity}
        self.positions = {}
        self.name = name

    @abstractmethod
    def generate_signals(self, data: pd.DataFrame) -> pd.Series:
        """
        Turn market data into trading signals.

        Args:
            data: Market data the strategy evaluates.

        Returns:
            Series with values {1: buy, -1: sell, 0: hold}.
        """

    @abstractmethod
    def calculate_position_size(self, ticker: str, price: float,
                               portfolio_value: float) -> int:
        """
        Decide how many shares to trade.

        Args:
            ticker: Symbol being traded.
            price: Price per share.
            portfolio_value: Current total value of the portfolio.

        Returns:
            Number of shares.
        """

## 2.2 Momentum Strategy

In [None]:
# strategies/momentum.py
from .base_strategy import BaseStrategy
import pandas as pd
import numpy as np

class MomentumStrategy(BaseStrategy):
    """
    Classic momentum: Buy winners, sell losers.

    Args:
        lookback_period: Number of bars over which the return is measured.
        top_n: Stored on the instance; not used by the signal logic here.
        position_fraction: Share of portfolio value allocated per
            position (defaults to 10%).
    """

    def __init__(self, lookback_period=20, top_n=10, position_fraction=0.1):
        super().__init__("Momentum")
        self.lookback_period = lookback_period
        self.top_n = top_n
        self.position_fraction = position_fraction

    def generate_signals(self, data: pd.DataFrame) -> pd.Series:
        """
        Calculate momentum and generate signals.

        Args:
            data: Frame with an 'Adj Close' column.

        Returns:
            Series aligned to ``data.index`` with 1 where the lookback
            return is above its 80th percentile, -1 where it is below its
            20th percentile and 0 elsewhere. Bars without a full lookback
            window have a NaN return and therefore stay at 0.
        """
        # Calculate returns over lookback period
        returns = data['Adj Close'].pct_change(self.lookback_period)

        # Generate signals
        signals = pd.Series(0, index=data.index)
        signals[returns > returns.quantile(0.8)] = 1   # Buy top 20%
        signals[returns < returns.quantile(0.2)] = -1  # Sell bottom 20%

        return signals

    def calculate_position_size(self, ticker: str, price: float,
                               portfolio_value: float) -> int:
        """
        Equal weight allocation.

        Returns:
            Whole number of shares worth ``position_fraction`` of the
            portfolio, or 0 when the price or portfolio value is missing,
            NaN or not positive (instead of raising on division or on
            ``int(nan)``).
        """
        if price is None or pd.isna(price) or price <= 0:
            return 0
        if portfolio_value is None or pd.isna(portfolio_value) or portfolio_value <= 0:
            return 0
        allocation = portfolio_value * self.position_fraction
        return int(allocation / price)

## 2.3 Mean Reversion Strategy

In [None]:
# strategies/mean_reversion.py
from .base_strategy import BaseStrategy
import pandas as pd

class MeanReversionStrategy(BaseStrategy):
    """
    Mean reversion using Bollinger Bands.

    Args:
        window: Rolling window length for the mean and standard deviation.
        num_std: Band width in standard deviations.
        position_fraction: Share of portfolio value allocated per
            position (defaults to 15%).
    """

    def __init__(self, window=20, num_std=2, position_fraction=0.15):
        super().__init__("Mean Reversion")
        self.window = window
        self.num_std = num_std
        self.position_fraction = position_fraction

    def generate_signals(self, data: pd.DataFrame) -> pd.Series:
        """
        Buy when price touches lower band, sell at upper band.

        Args:
            data: Frame with an 'Adj Close' column.

        Returns:
            Series aligned to ``data.index`` with 1 below the lower band,
            -1 above the upper band and 0 elsewhere. Bars without a full
            rolling window have NaN bands and therefore stay at 0.
        """
        prices = data['Adj Close']

        # Calculate Bollinger Bands
        rolling = prices.rolling(window=self.window)
        rolling_mean = rolling.mean()
        band_width = rolling.std() * self.num_std

        upper_band = rolling_mean + band_width
        lower_band = rolling_mean - band_width

        # Generate signals
        signals = pd.Series(0, index=data.index)
        signals[prices < lower_band] = 1   # Buy oversold
        signals[prices > upper_band] = -1  # Sell overbought

        return signals

    def calculate_position_size(self, ticker: str, price: float,
                               portfolio_value: float) -> int:
        """
        Fixed-fraction allocation.

        Returns:
            Whole number of shares worth ``position_fraction`` of the
            portfolio, or 0 when the price or portfolio value is missing,
            NaN or not positive (instead of raising on division or on
            ``int(nan)``).
        """
        if price is None or pd.isna(price) or price <= 0:
            return 0
        if portfolio_value is None or pd.isna(portfolio_value) or portfolio_value <= 0:
            return 0
        allocation = portfolio_value * self.position_fraction
        return int(allocation / price)

## 3.1 Core Backtest Engine

In [None]:
# backtesting/engine.py
import pandas as pd
import numpy as np
from datetime import datetime

class BacktestEngine:
    """
    Professional backtesting with realistic execution.

    Long-only, cash-constrained simulation: buys are skipped when cash is
    insufficient and sells are limited to the shares actually held.

    Args:
        initial_capital: Starting cash.
        commission: Commission as a fraction of trade value.
        slippage: Slippage as a fraction of trade value.
    """

    def __init__(self, initial_capital=100000, commission=0.001, slippage=0.0005):
        self.initial_capital = initial_capital
        self.commission = commission  # 10 bps
        self.slippage = slippage      # 5 bps

        self.portfolio_value = initial_capital
        self.cash = initial_capital
        self.positions = {}
        self.trades = []
        self.equity_curve = []
        # Most recent valid price per ticker, used to value holdings on
        # dates where a ticker has no quote.
        self._last_prices = {}

    def execute_trade(self, ticker: str, signal: int, price: float,
                     quantity: int, date: datetime):
        """
        Execute trade with transaction costs.

        Args:
            ticker: Symbol to trade.
            signal: 1 to buy, -1 to sell, 0 to do nothing.
            price: Execution price per share.
            quantity: Requested number of shares.
            date: Timestamp recorded in the trade log.

        Nothing happens (and nothing is logged) for a hold signal, a
        non-positive quantity, an invalid price, a buy that exceeds the
        available cash, or a sell without holdings. A sell larger than the
        held position is reduced to the held quantity.
        """
        if signal == 0 or quantity <= 0:
            return
        if price is None or pd.isna(price) or price <= 0:
            return

        if signal == 1:  # Buy
            action = 'BUY'
        elif signal == -1:  # Sell
            action = 'SELL'
            # Clamp to holdings so a position can always be closed even
            # when the sizing rule asks for more shares than are held.
            quantity = min(quantity, self.positions.get(ticker, 0))
            if quantity <= 0:
                return
        else:
            return

        # Calculate costs
        trade_value = quantity * price
        cost = trade_value * (self.commission + self.slippage)

        if action == 'BUY':
            total_cost = trade_value + cost
            if self.cash < total_cost:
                return
            self.cash -= total_cost
            self.positions[ticker] = self.positions.get(ticker, 0) + quantity
        else:
            self.cash += trade_value - cost
            self.positions[ticker] -= quantity

        self.trades.append({
            'date': date,
            'ticker': ticker,
            'action': action,
            'quantity': quantity,
            'price': price,
            'cost': cost
        })

    def update_portfolio_value(self, current_prices: dict):
        """
        Mark to market.

        Args:
            current_prices: {ticker: price} for the current bar. Tickers
                missing from it (or quoted as NaN) are valued at their
                last known price rather than at zero.
        """
        self._last_prices.update(
            (ticker, price)
            for ticker, price in current_prices.items()
            if pd.notna(price)
        )
        holdings_value = sum(
            qty * self._last_prices.get(ticker, 0)
            for ticker, qty in self.positions.items()
        )
        self.portfolio_value = self.cash + holdings_value
        self.equity_curve.append(self.portfolio_value)

    @staticmethod
    def _align_bound(bound, sample):
        """
        Make a date bound comparable with the index values of the data.
        """
        if not isinstance(sample, pd.Timestamp):
            return bound
        # Plain datetime.date objects cannot be ordered against Timestamps.
        bound = pd.Timestamp(bound)
        if sample.tzinfo is not None and bound.tzinfo is None:
            bound = bound.tz_localize(sample.tzinfo)
        elif sample.tzinfo is None and bound.tzinfo is not None:
            bound = bound.tz_localize(None)
        return bound

    def run(self, strategy, data_dict: dict, start_date, end_date):
        """
        Main backtest loop.

        Args:
            strategy: Object providing ``generate_signals(data)`` and
                ``calculate_position_size(ticker, price, portfolio_value)``.
            data_dict: {ticker: DataFrame} with an 'Adj Close' column.
            start_date: First date to simulate (inclusive); date, datetime
                or Timestamp.
            end_date: Last date to simulate (inclusive).

        Returns:
            The result dictionary produced by ``_generate_results``.
        """
        # Union of all dates; set().union() also copes with an empty dict.
        all_dates = sorted(set().union(*(df.index for df in data_dict.values())))
        if all_dates:
            start_date = self._align_bound(start_date, all_dates[0])
            end_date = self._align_bound(end_date, all_dates[0])
        dates_in_range = [d for d in all_dates if start_date <= d <= end_date]

        for date in dates_in_range:
            # Get current prices, ignoring tickers without a valid quote
            current_prices = {}
            for ticker, df in data_dict.items():
                if date not in df.index:
                    continue
                price = df.loc[date, 'Adj Close']
                if pd.notna(price):
                    current_prices[ticker] = price

            # Generate signals for each ticker that has a price today
            for ticker, price in current_prices.items():
                # Only data up to the current date is visible to the
                # strategy, which prevents look-ahead.
                historical_data = data_dict[ticker].loc[:date]

                signals = strategy.generate_signals(historical_data)
                current_signal = signals.iloc[-1]

                quantity = strategy.calculate_position_size(
                    ticker, price, self.portfolio_value
                )

                self.execute_trade(ticker, current_signal, price, quantity, date)

            # Update portfolio value
            self.update_portfolio_value(current_prices)

        return self._generate_results()

    @staticmethod
    def _realized_pnl(trades):
        """
        Replay the trade log and return the realized PnL of every sell.

        Uses average-cost accounting per ticker; transaction costs of buys
        are added to the cost basis and those of sells reduce the proceeds.
        """
        shares = {}
        basis = {}
        pnls = []
        for trade in trades:
            ticker = trade['ticker']
            quantity = trade['quantity']
            gross = quantity * trade['price']
            if trade['action'] == 'BUY':
                shares[ticker] = shares.get(ticker, 0) + quantity
                basis[ticker] = basis.get(ticker, 0.0) + gross + trade['cost']
            elif trade['action'] == 'SELL':
                held = shares.get(ticker, 0)
                if held <= 0:
                    continue
                released = basis[ticker] * quantity / held
                pnls.append(gross - trade['cost'] - released)
                shares[ticker] = held - quantity
                basis[ticker] -= released
        return pnls

    def _generate_results(self):
        """
        Calculate performance metrics.

        Returns:
            dict with total_return, sharpe_ratio (annualized with 252
            periods, 0 when volatility is undefined or zero), max_drawdown
            (measured against a high-water mark that starts at the initial
            capital), num_trades, win_rate (fraction of sells with positive
            realized PnL), final_value, equity_curve and trades.
        """
        equity_curve = pd.Series(self.equity_curve, dtype=float)
        returns = equity_curve.pct_change().dropna()

        total_return = (self.portfolio_value - self.initial_capital) / self.initial_capital

        # Sharpe Ratio (annualized)
        volatility = returns.std() if len(returns) > 1 else 0.0
        if volatility > 0:
            sharpe = np.sqrt(252) * returns.mean() / volatility
        else:
            sharpe = 0

        # Max Drawdown
        if equity_curve.empty:
            max_drawdown = 0.0
        else:
            # The peak is floored at the initial capital so a loss on the
            # very first bar counts as drawdown.
            high_water = equity_curve.cummax().clip(lower=self.initial_capital)
            max_drawdown = float((equity_curve / high_water - 1).min())

        # Trade statistics
        realized = self._realized_pnl(self.trades)
        win_rate = sum(1 for pnl in realized if pnl > 0) / len(realized) if realized else 0

        return {
            'total_return': total_return,
            'sharpe_ratio': sharpe,
            'max_drawdown': max_drawdown,
            'num_trades': len(self.trades),
            'win_rate': win_rate,
            'final_value': self.portfolio_value,
            'equity_curve': equity_curve,
            'trades': self.trades
        }

In [None]:
# analytics/performance.py
import pandas as pd
import numpy as np

class PerformanceAnalytics:
    """
    Institutional-grade performance metrics.
    """

    @staticmethod
    def _ratio(numerator, denominator):
        """
        Divide, returning NaN when the denominator is zero or not finite.
        """
        if denominator is None or not np.isfinite(denominator) or denominator == 0:
            return np.nan
        return numerator / denominator

    @staticmethod
    def calculate_metrics(returns: pd.Series, benchmark_returns: pd.Series = None,
                          risk_free_rate: float = 0.02, periods_per_year: int = 252):
        """
        Comprehensive performance analysis.

        Args:
            returns: Periodic simple returns of the strategy.
            benchmark_returns: Optional periodic returns of a benchmark;
                enables beta, alpha, information ratio and tracking error.
            risk_free_rate: Annual risk-free rate used for alpha.
            periods_per_year: Number of return periods in a year.

        Returns:
            dict of metric name to value. Ratios whose denominator is zero
            or undefined are NaN.

        Raises:
            ValueError: If ``returns`` is empty.
        """
        n_periods = len(returns)
        if n_periods == 0:
            raise ValueError("returns must contain at least one observation")

        ratio = PerformanceAnalytics._ratio
        annual_scale = np.sqrt(periods_per_year)
        metrics = {}

        # Basic returns
        metrics['total_return'] = (1 + returns).prod() - 1
        metrics['annualized_return'] = (
            (1 + metrics['total_return']) ** (periods_per_year / n_periods) - 1
        )

        # Risk metrics
        metrics['volatility'] = returns.std() * annual_scale
        metrics['sharpe_ratio'] = ratio(metrics['annualized_return'], metrics['volatility'])

        # Downside risk
        downside_returns = returns[returns < 0]
        metrics['sortino_ratio'] = ratio(
            metrics['annualized_return'], downside_returns.std() * annual_scale
        )

        # Drawdown
        cumulative = (1 + returns).cumprod()
        # Floor the peak at the starting value of 1.0 so that a loss in the
        # first period is measured as drawdown.
        running_max = cumulative.cummax().clip(lower=1.0)
        drawdown = (cumulative - running_max) / running_max
        metrics['max_drawdown'] = drawdown.min()
        metrics['avg_drawdown'] = drawdown[drawdown < 0].mean()

        # Calmar Ratio
        metrics['calmar_ratio'] = ratio(
            metrics['annualized_return'], abs(metrics['max_drawdown'])
        )

        # Benchmark comparison
        if benchmark_returns is not None and len(benchmark_returns) > 0:
            # Beta
            covariance = returns.cov(benchmark_returns)
            benchmark_variance = benchmark_returns.var()
            metrics['beta'] = ratio(covariance, benchmark_variance)

            # Alpha: every term is annualized so it is comparable with the
            # annual risk-free rate.
            benchmark_total = (1 + benchmark_returns).prod() - 1
            benchmark_annual = (
                (1 + benchmark_total) ** (periods_per_year / len(benchmark_returns)) - 1
            )
            expected_return = risk_free_rate + metrics['beta'] * (benchmark_annual - risk_free_rate)
            metrics['alpha'] = metrics['annualized_return'] - expected_return

            # Information Ratio
            active_returns = returns - benchmark_returns
            active_std = active_returns.std()
            metrics['information_ratio'] = ratio(active_returns.mean(), active_std) * annual_scale

            # Tracking Error
            metrics['tracking_error'] = active_std * annual_scale

        # Win/Loss stats
        winning_days = returns[returns > 0]
        losing_days = returns[returns < 0]
        metrics['win_rate'] = len(winning_days) / n_periods
        metrics['avg_win'] = winning_days.mean()
        metrics['avg_loss'] = losing_days.mean()
        metrics['profit_factor'] = abs(winning_days.sum() / losing_days.sum()) if len(losing_days) > 0 else np.inf

        return metrics

In [None]:
# dashboard/app.py
import sys
from datetime import datetime, timedelta

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st

sys.path.append('..')

from data.providers.yahoo import YahooFinanceProvider
from strategies.momentum import MomentumStrategy
from strategies.mean_reversion import MeanReversionStrategy
from backtesting.engine import BacktestEngine
from analytics.performance import PerformanceAnalytics

# Streamlit entry point: collects the backtest settings in the sidebar,
# runs the selected strategy through the backtest engine and renders the
# headline metrics, the equity curve and the trade log.
st.set_page_config(page_title="Algorithmic Trading System", layout="wide")

# Sidebar
st.sidebar.title("🎯 Trading System")
st.sidebar.markdown("---")

# Strategy selection
strategy_type = st.sidebar.selectbox(
    "Select Strategy",
    ["Momentum", "Mean Reversion", "Pairs Trading"]
)

# Date range
col1, col2 = st.sidebar.columns(2)
start_date = col1.date_input("Start Date", datetime.now() - timedelta(days=365))
end_date = col2.date_input("End Date", datetime.now())

# Tickers
tickers = st.sidebar.multiselect(
    "Select Tickers",
    ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "SPY"],
    default=["AAPL", "MSFT"]
)

# Run backtest button
run_backtest = st.sidebar.button("🚀 Run Backtest", type="primary")

# Main content
st.title("📊 Institutional-Grade Algorithmic Trading System")
st.markdown("### Production-ready backtesting with pluggable architecture")

if run_backtest and tickers:
    # Resolve the strategy first; "Pairs Trading" is offered in the
    # selector but has no implementation imported into this script.
    if strategy_type == "Momentum":
        strategy = MomentumStrategy()
    elif strategy_type == "Mean Reversion":
        strategy = MeanReversionStrategy()
    else:
        strategy = None

    if strategy is None:
        st.warning(f"The '{strategy_type}' strategy is not available yet. Please select another strategy.")
    elif start_date > end_date:
        st.error("Start Date must not be later than End Date.")
    else:
        with st.spinner("Running backtest..."):
            # Initialize
            provider = YahooFinanceProvider()
            engine = BacktestEngine(initial_capital=100000)

            # The date widgets return plain dates; the engine compares its
            # bounds with timestamp index values, so widen them to datetimes.
            start_dt = datetime.combine(start_date, datetime.min.time())
            end_dt = datetime.combine(end_date, datetime.min.time())

            # Download data
            data_dict = provider.get_multiple_tickers(tickers, start_dt, end_dt)

            if all(frame.empty for frame in data_dict.values()):
                st.error("No market data was returned for the selected tickers and date range.")
            else:
                # Run backtest
                results = engine.run(strategy, data_dict, start_dt, end_dt)

                # Display metrics
                col1, col2, col3, col4 = st.columns(4)
                col1.metric("Total Return", f"{results['total_return']:.2%}")
                col2.metric("Sharpe Ratio", f"{results['sharpe_ratio']:.2f}")
                col3.metric("Max Drawdown", f"{results['max_drawdown']:.2%}")
                col4.metric("Num Trades", results['num_trades'])

                # Equity curve
                st.subheader("📈 Equity Curve")
                fig = go.Figure()
                fig.add_trace(go.Scatter(
                    y=results['equity_curve'],
                    mode='lines',
                    name='Portfolio Value',
                    line=dict(color='#00D9FF', width=2)
                ))
                fig.update_layout(
                    title="Portfolio Performance Over Time",
                    xaxis_title="Days",
                    yaxis_title="Portfolio Value ($)",
                    template="plotly_dark",
                    height=400
                )
                st.plotly_chart(fig, use_container_width=True)

                # Trade log
                st.subheader("📋 Trade Log")
                trades_df = pd.DataFrame(results['trades'])
                st.dataframe(trades_df, use_container_width=True)

else:
    st.info("👈 Configure settings and click 'Run Backtest' to start")

    # Show architecture diagram
    st.subheader("🏗️ System Architecture")
    st.markdown("""
```
    Data Layer (Pluggable)
    ├── Yahoo Finance (Current)
    ├── Alpha Vantage (Supported)
    └── Bloomberg API (Designed For)

    Strategy Framework
    ├── Momentum
    ├── Mean Reversion
    └── Pairs Trading

    Backtesting Engine
    ├── Realistic Execution
    ├── Transaction Costs
    └── Risk Management

    Storage & Analytics
    ├── MySQL Database
    ├── Performance Metrics
    └── Real-time Dashboard
```
    """)

## 6.1 Configuration Management

In [None]:
# config/config.yaml
# Central settings for the trading system.
# NOTE(review): no code in this notebook is shown loading this file; the
# values below mirror the constructor defaults of the corresponding classes.

# Market data source and caching.
data:
  provider: "yahoo"  # yahoo, alpha_vantage, bloomberg
  # NOTE(review): no consumer of the cache settings is visible here —
  # confirm where they are read.
  cache_enabled: true
  cache_duration_days: 1

# Same values as the BacktestEngine constructor defaults.
backtesting:
  initial_capital: 100000
  commission: 0.001  # 10 bps
  slippage: 0.0005   # 5 bps

# Same values as the MySQLHandler constructor defaults.
# NOTE(review): root user with an empty password — presumably meant for
# local development only; confirm before deploying.
database:
  host: "localhost"
  user: "root"
  password: ""
  database: "trading_system"

# Per-strategy parameters; keys match the strategy constructor arguments.
strategies:
  momentum:
    lookback_period: 20
    top_n: 10
  mean_reversion:
    window: 20
    num_std: 2

## 6.2 Logging

In [None]:
# utils/logger.py
import logging
from datetime import datetime

def setup_logger(name):
    """
    Return a logger that writes INFO+ records to ``logs/<name>_<YYYYMMDD>.log``.

    The ``logs`` directory is created when missing, and calling this
    function repeatedly for the same name on the same day reuses the
    existing file handler instead of attaching a duplicate (which would
    write every record multiple times).

    Args:
        name: Logger name; also used as the log file prefix.

    Returns:
        The configured ``logging.Logger``.
    """
    import os  # local import: this module does not import os at file level

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    log_path = f'logs/{name}_{datetime.now().strftime("%Y%m%d")}.log'
    os.makedirs(os.path.dirname(log_path), exist_ok=True)

    # FileHandler stores the absolute path of its target in baseFilename.
    target = os.path.abspath(log_path)
    already_attached = any(
        isinstance(existing, logging.FileHandler) and existing.baseFilename == target
        for existing in logger.handlers
    )
    if not already_attached:
        handler = logging.FileHandler(log_path)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger

## 6.3 Unit Tests

In [None]:
# tests/test_strategies.py
import unittest
import pandas as pd
import numpy as np
from strategies.momentum import MomentumStrategy

class TestMomentumStrategy(unittest.TestCase):
    """
    Unit tests for the signal generation and sizing of MomentumStrategy.
    """

    def setUp(self):
        # Create sample data: a random walk around 100. The generator is
        # seeded so that every run sees the same price path.
        dates = pd.date_range('2020-01-01', periods=100)
        prices = np.random.RandomState(42).randn(100).cumsum() + 100
        self.data = pd.DataFrame({
            'Adj Close': prices
        }, index=dates)

        self.strategy = MomentumStrategy()

    def test_generate_signals(self):
        signals = self.strategy.generate_signals(self.data)

        # Check signal values are valid
        self.assertTrue(signals.isin([-1, 0, 1]).all())

        # One signal per input row
        self.assertEqual(len(signals), len(self.data))

        # Check no signals in warmup period. Every value is compared with 0
        # because a plain sum would also be 0 when +1 and -1 cancel out.
        self.assertTrue((signals.iloc[:20] == 0).all())

    def test_position_sizing(self):
        size = self.strategy.calculate_position_size('AAPL', 150, 100000)

        # Check reasonable position size
        self.assertGreater(size, 0)
        self.assertLess(size * 150, 15000)  # Max 15% position

if __name__ == '__main__':
    unittest.main()