# Scrape Congress trade data

In [1]:
import re
import time
import logging
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
from typing import List, Dict, Tuple
from dataclasses import dataclass
from collections import defaultdict

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

@dataclass
class Trade:
    """Represents a single trade with parsed and normalized data"""
    politician: str
    party: str
    chamber: str
    state: str
    asset: str
    ticker: str
    is_public: bool
    file_date: str
    trade_date: str
    days_delayed: int
    ownership: str
    trade_type: str
    volume_range: str
    volume_min: float
    volume_max: float
    volume_midpoint: float
    price: float

class CapitolTradesScraper:
    """
    A comprehensive scraper for CapitolTrades focusing on buy trades for the Congress Buys Strategy.
    """

    def __init__(self, delay_between_requests: float = 1.0):
        """
        Initialize the scraper.

        Args:
            delay_between_requests: Delay in seconds between requests to be respectful to the server
        """
        self.delay_between_requests = delay_between_requests
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': 'Mozilla/5.0'})

        self.volume_ranges = {
            '< 1K': (0, 1000),
            '1K-15K': (1000, 15000),
            '15K-50K': (15000, 50000),
            '50K-100K': (50000, 100000),
            '100K-250K': (100000, 250000),
            '250K-500K': (250000, 500000),
            '500K-1M': (500000, 1000000),
            '1M-5M': (1000000, 5000000),
            '5M-25M': (5000000, 25000000),
            '25M-50M': (25000000, 50000000)
        }

    def scrape_trade(self, page_number: int, buy_only: bool = True) -> List[List]:
        """
        Scrapes a single page of trade data from Capitol Trades.

        Args:
            page_number (int): The page number to scrape.
            buy_only (bool): If True, only scrape buy trades

        Returns:
            List[List]: A list of raw trade rows, where each row is a list of cell values.
        """
        # Construct URL with buy filter if requested
        if buy_only:
            base_url = "https://www.capitoltrades.com/trades?txType=buy&pageSize=96&page={}"
        else:
            base_url = "https://www.capitoltrades.com/trades?pageSize=96&page={}"
        url = base_url.format(page_number)
        response = self.session.get(url)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        trade_rows = soup.select("tbody > tr")
        raw_trades = []
        for row in trade_rows:
            # cells = [cell.text.strip() for cell in row.find_all("td")]
            cells = []
            for cell in row.find_all("td"):
                if cell.select_one(".issuer-name") and cell.select_one(".issuer-ticker"):
                    asset_name = cell.select_one(".issuer-name").text.strip()
                    ticker = cell.select_one(".issuer-ticker").text.strip()
                    cells.append(f"{asset_name}|{ticker}")
                else:
                    cells.append(cell.text.strip())
            if cells:
                raw_trades.append(cells)
            else:
                logger.warning(f"Empty row found on page {page_number}, skipping.")
        logger.info(f"Scraped page {page_number}: {len(raw_trades)} trades found")
        return raw_trades

    def parse_politician_info(self, politician_str: str) -> Tuple[str, str, str, str]:
        """
        Parse politician information from the first column.

        Args:
            politician_str: String like 'Suzan DelBeneDemocratHouseWA'

        Returns:
            Tuple of (name, party, chamber, state)
        """
        pattern = r'([A-Za-z\s\.]+?)(Republican|Democrat|Independent|Other)(House|Senate)([A-Z]{2})'
        match = re.search(pattern, politician_str)
        if match:
            name = match.group(1).strip()
            party = match.group(2)
            chamber = match.group(3)
            state = match.group(4)
            return name, party, chamber, state
        else:
            logger.warning(f"Could not parse politician info: {politician_str}")
            return politician_str, "Unknown", "Unknown", "Unknown"

    def _parse_asset_info(self, asset_str: str) -> Tuple[str, str]:
        """
        Parse asset name and ticker from the asset column.
        Does have more trouble with separating asset names from tickers because of the lack of clear delimiters.
        E.g. 'JPMORGAN US GOVERNMENT MONEY MARKET FUNDOGVXX:US' would become ('JPMORGAN US GOVERNMENT MONEY MARKET', 'FUNDOGVXX:US').

        Args:
            asset_str: String like 'General Motors Financial Co IncACF:US'

        Returns:
            Tuple of (asset_name, ticker)
        """
        ticker_patterns = [
            r'([A-Z0-9./]{1,10}:[A-Z]{2})$',
            r'N/A$'
        ]
        ticker = None
        asset_name = asset_str
        for pattern in ticker_patterns:
            match = re.search(pattern, asset_str)
            if match:
                ticker = match.group(0)
                asset_name = asset_str[:match.start()].strip()
        if "$" in asset_str:  # Crypto
            logger.warning(f"Skipping crypto asset: {asset_str}")
            ticker = 'N/A'
            return asset_name, ticker
        if not ticker:
            logger.warning(f"Could not parse ticker from asset: {asset_str}")
            ticker = 'N/A'
        return asset_name, ticker

    def parse_asset_info(self, asset_str: str) -> Tuple[str, str]:
        """
        Parse asset name and ticker from the asset column.

        Args:
            asset_str: String like 'General Motors Financial Co Inc|ACF:US'

        Returns:
            Tuple of (asset_name, ticker)
        """
        if '|' in asset_str:
            asset_name, ticker = asset_str.split('|', 1)
            if "$" in ticker:  # Crypto
                logger.warning(f"Skipping crypto asset: {asset_str}")
                ticker = 'N/A'
            return asset_name.strip(), ticker.strip()
        else:
            return self._parse_asset_info(asset_str)

    def is_public_stock(self, ticker: str) -> bool:
        """
        Determine if an asset is a public stock based on name and ticker.

        Args:
            ticker: Ticker symbol (can be None)

        Returns:
            Boolean indicating if it's likely a public stock
        """
        if not ticker or ticker == 'N/A':
            return False
        if ticker and ':' in ticker and len(ticker) >= 4:
            return True
        return False

    def parse_date_string(self, date_str: str) -> str:
        """
        Parse date string and return it in 'YYYY-MM-DD' format.

        Args:
            date_str: String like '13:05Yesterday' or '16 Jun2025'
        """
        if "Today" in date_str:
            return datetime.now().strftime('%Y-%m-%d')
        elif "Yesterday" in date_str:
            yesterday = datetime.now() - timedelta(days=1)
            return yesterday.strftime('%Y-%m-%d')
        else:
            month_fix = {'Sept': 'Sep'}
            for wrong, correct in month_fix.items():
                date_str = date_str.replace(wrong, correct)
            date = datetime.strptime(date_str, '%d %b%Y')
            return date.strftime('%Y-%m-%d')

    def parse_filing_delay(self, delay_str: str) -> int:
        """
        Parse filing delay string and return the number of days delayed.

        Args:
            delay_str: String like 'days35'
        """
        match = re.search(r'\d+', delay_str)
        if match:
            return int(match.group(0))
        logger.warning(f"Could not parse filing delay: {delay_str}")
        return None

    def parse_volume_range(self, volume_str: str) -> Tuple[float, float, float]:
        """
        Parse volume range string and return min, max, and midpoint values.

        Args:
            volume_str: String like '250K-500K' or '$1M-5M'

        Returns:
            Tuple of (min_value, max_value, midpoint_value)
        """
        volume_str = volume_str.replace('–', '-').strip()
        for range_key, (min_val, max_val) in self.volume_ranges.items():
            if range_key == volume_str:
                midpoint = (min_val + max_val) / 2
                return min_val, max_val, midpoint
        logger.warning(f"Could not parse volume range: {volume_str}")
        return None, None, None

    def parse_price(self, price_str: str) -> float:
        """
        Parse price string to float.

        Args:
            price_str: String like '$214.82' or 'N/A'

        Returns:
            Float price or None if not available
        """
        if not price_str or price_str.strip().upper() == 'N/A':
            return None
        try:
            return float(price_str.replace('$', '').replace(',', '').strip())
        except ValueError:
            logger.warning(f"Could not parse price: {price_str}")
            return None

    def parse_trade_row(self, row: List[str]) -> Trade:
        """
        Parse a raw trade row into a Trade object.

        Args:
            row: List of cell values from a table row

        Returns:
            Trade object or None if parsing fails
        """
        if len(row) < 10:
            logger.warning(f"Row has insufficient columns: {row}")
            raise ValueError("Row does not have enough columns to parse")
        # Parse politician info
        politician, party, chamber, state = self.parse_politician_info(row[0])
        # Parse asset info
        asset_name, ticker = self.parse_asset_info(row[1])
        is_public = self.is_public_stock(ticker)
        # Parse file date
        file_date = self.parse_date_string(row[2])
        # Parse trade date
        trade_date = self.parse_date_string(row[3])
        # Parse filing delay
        days_delayed = self.parse_filing_delay(row[4])
        # Parse volume
        volume_min, volume_max, volume_midpoint = self.parse_volume_range(row[7])
        # Parse price
        price = self.parse_price(row[8])
        # Extract other fields
        ownership = row[5]
        trade_type = row[6]
        return Trade(
            politician=politician,
            party=party,
            chamber=chamber,
            state=state,
            asset=asset_name,
            ticker=ticker,
            is_public=is_public,
            file_date=file_date,
            trade_date=trade_date,
            days_delayed=days_delayed,
            ownership=ownership,
            trade_type=trade_type,
            volume_range=row[7],
            volume_min=volume_min,
            volume_max=volume_max,
            volume_midpoint=volume_midpoint,
            price=price,
        )

    def scrape_multiple_pages(
        self,
        num_pages: int = 10,
        buy_only: bool = True,
        public_stocks_only: bool = True
    ) -> List[Trade]:
        """
        Scrape multiple pages of trade data.

        Args:
            num_pages: Number of pages to scrape
            buy_only: If True, only scrape buy trades
            public_stocks_only: If True, only include public stocks

        Returns:
            List of Trade objects
        """
        all_trades = []
        for page in range(1, num_pages + 1):
            raw_trades = self.scrape_trade(page, buy_only=buy_only)
            if not raw_trades or len(raw_trades) == 1:
                logger.info(f"No trades found anymore on page {page}, stopping early.")
                break
            for raw_trade in raw_trades:
                trade = self.parse_trade_row(raw_trade)
                if trade:
                    if public_stocks_only and not trade.is_public:
                        continue
                    if buy_only and trade.trade_type.lower() != 'buy':
                        continue
                    all_trades.append(trade)
            if page < num_pages:
                time.sleep(self.delay_between_requests)
        # Sort by volume (largest first)
        all_trades.sort(key=lambda x: x.volume_midpoint, reverse=True)
        logger.info(f"Scraped {len(all_trades)} qualifying trades from {num_pages} pages")
        return all_trades

    def calculate_politician_stats(self, trades: List[Trade]) -> Dict[str, Dict]:
        """
        Calculate aggregated statistics for each politician for normalization purposes.

        Args:
            trades: List of Trade objects

        Returns:
            Dictionary with politician stats
        """
        pass

    def get_congress_buys_data(
        self, 
        num_pages: int = 10,
        min_volume: float = 1000
    ) -> Dict:
        """
        Get processed data specifically for the Congress Buys Strategy.

        Args:
            num_pages: Number of pages to scrape
            min_volume: Minimum trade volume to include (midpoint)

        Returns:
            Dictionary containing trades and politician stats
        """
        logger.info(f"Starting Congress Buys Strategy data collection for {num_pages} pages...")
        # Scrape trades
        trades = self.scrape_multiple_pages(
            num_pages=num_pages,
            buy_only=True,
            public_stocks_only=True
        )
        # Filter by minimum volume
        filtered_trades = [t for t in trades if t.volume_min >= min_volume]
        # Group trades by ticker for strategy implementation
        trades_by_ticker = defaultdict(list)
        for trade in filtered_trades:
            if trade.ticker:
                trades_by_ticker[trade.ticker].append(trade)
        logger.info(f"Collected {len(filtered_trades)} qualifying trades across {len(trades_by_ticker)} tickers")
        return {
            'trades': filtered_trades,
            'trades_by_ticker': dict(trades_by_ticker),
            'summary': {
                'total_trades': len(filtered_trades),
                'unique_tickers': len(trades_by_ticker),
                'total_volume': sum(t.volume_midpoint for t in filtered_trades)
            }
        }


scraper = CapitolTradesScraper()
strategy_data = scraper.get_congress_buys_data(200)
print("Summary:")
print(f"- Total trades: {strategy_data['summary']['total_trades']}")
print(f"- Unique tickers: {strategy_data['summary']['unique_tickers']}")
print(f"- Total volume: ${strategy_data['summary']['total_volume']:,.0f}")


2025-07-23 11:51:21,254 - INFO - Starting Congress Buys Strategy data collection for 200 pages...
2025-07-23 11:51:22,848 - INFO - Scraped page 1: 96 trades found
2025-07-23 11:51:25,204 - INFO - Scraped page 2: 96 trades found
2025-07-23 11:51:27,674 - INFO - Scraped page 3: 96 trades found
2025-07-23 11:51:29,987 - INFO - Scraped page 4: 96 trades found
2025-07-23 11:51:32,456 - INFO - Scraped page 5: 96 trades found
2025-07-23 11:51:34,929 - INFO - Scraped page 6: 96 trades found
2025-07-23 11:51:37,217 - INFO - Scraped page 7: 96 trades found
2025-07-23 11:51:39,508 - INFO - Scraped page 8: 96 trades found
2025-07-23 11:51:41,932 - INFO - Scraped page 9: 96 trades found
2025-07-23 11:51:44,483 - INFO - Scraped page 10: 96 trades found
2025-07-23 11:51:46,819 - INFO - Scraped page 11: 96 trades found
2025-07-23 11:51:49,264 - INFO - Scraped page 12: 96 trades found
2025-07-23 11:51:51,826 - INFO - Scraped page 13: 96 trades found
2025-07-23 11:51:54,440 - INFO - Scraped page 14: 96 

Summary:
- Total trades: 14000
- Unique tickers: 1431
- Total volume: $472,987,500


In [2]:
# Median filing delay across trades
trade_delays = [t.days_delayed for t in strategy_data['trades']]
median_delay = sorted(trade_delays)[len(trade_delays) // 2] if trade_delays else 0
print(f"Median filing delay: {median_delay:.1f} days")


Median filing delay: 26.0 days


In [3]:
# Percentage of trades with a filing delay smaller than 25 days
import numpy as np
delay_threshold = 25
percentage_within_week = sum(1 for t in strategy_data['trades'] if t.days_delayed < delay_threshold) / len(strategy_data['trades']) * 100 if strategy_data['trades'] else 0
print(f"Percentage of trades with filing delay < 25 days: {percentage_within_week:.1f}%")
print(f"Total trades with filing delay < 25 days: {sum(1 for t in strategy_data['trades'] if t.days_delayed < delay_threshold)}")
print(f"Average volume of trades with filing delay < 25 days: ${np.mean([t.volume_midpoint for t in strategy_data['trades'] if t.days_delayed < delay_threshold]):,.0f}")
print(f"Maximum volume of trades with filing delay < 25 days: ${max([t.volume_midpoint for t in strategy_data['trades'] if t.days_delayed < delay_threshold]):,.0f}")


Percentage of trades with filing delay < 25 days: 45.9%
Total trades with filing delay < 25 days: 6430
Average volume of trades with filing delay < 25 days: $33,609
Maximum volume of trades with filing delay < 25 days: $15,000,000


# Match capitoltrader tickers to yfinance tickers

In [4]:
import os
import groq
from groq import Groq
from api_keys import GROQ_API_key

MAX_RETRIES = 5

def llama_3_1_70B(
    prompt: str,
    model: str = "llama-3.3-70b-versatile",
    temperature: float = 0.6,
    top_p: float = 0.9,
    groq_api_key: str = GROQ_API_key,
    retry_count: int = 0,
) -> str:
    """
    Queries the LLAMA 3.1 70B model with the specified prompt.

    IMPORTANT: The GROQ API key allows free and unlimited access to the LLAMA 3.1 70B model!

    Args:
        prompt (str): The prompt to send to the model.
        model (str): The model to use.
        temperature (float): The temperature parameter.
        top_p (float): The top_p parameter.
        groq_api_key (str): The GROQ API key.
        retry_count (int): The number of retries.

    Returns:
        str: The response text.

    """
    os.environ["GROQ_API_KEY"] = groq_api_key
    client = Groq()

    try:
        response = client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model=model,
            temperature=temperature,
            top_p=top_p,
        )
    except groq.RateLimitError:
        time.sleep(180)
        response = client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model=model,
            temperature=temperature,
            top_p=top_p,
        )
    except groq.InternalServerError:
        if retry_count >= MAX_RETRIES:
            msg = "Too many retries."
            raise groq.RateLimitError(msg)  # noqa: B904
        retry_count += 1
        llama_3_1_70B(prompt, model, temperature, top_p, groq_api_key, retry_count)
    except Exception as e:
        msg = f"An unknown error occurred: {e!s}"
        raise Exception(msg) from e  # noqa: TRY002

    return response.choices[0].message.content


In [5]:
import io
import pandas as pd
import yfinance as yf
logging.getLogger("yfinance").setLevel(logging.INFO)

log_stream = io.StringIO()
stream_handler = logging.StreamHandler(log_stream)
yf_logger = logging.getLogger("yfinance")
yf_logger.addHandler(stream_handler)

def get_price_data(trade_list: List[Trade], start_date: str, end_date: str) -> Dict[str, pd.DataFrame]:
    """Download price data for given tickers"""
    price_data = {}
    skipped_assets = set()
    for trade in trade_list:
        ticker = trade.ticker
        asset = trade.asset
        if ticker in price_data or asset in skipped_assets:
            logger.info(f"Ticker {ticker} of asset {asset} already processed, skipping.")
            continue
        logger.info(f"Processing ticker and asset: {ticker}, {asset}")
        processed_ticker = ticker.split(":")[0].replace("/","-")
        try:
            stock = yf.Ticker(processed_ticker)
            data = stock.history(start=start_date, end=end_date)
            if not data.empty:
                price_data[ticker] = data
            else:
                logs = log_stream.getvalue()
                log_lines = logs.strip().split('\n')
                last_log = log_lines[-1]
                if "no price data found" in last_log:
                    logger.info(f"No price data found for {processed_ticker} in the given date range, skipping.")
                    continue
                raise ValueError(f"Failed to download data for {processed_ticker}")
        except Exception:
            llama_ticker = llama_3_1_70B(
                f"""
                What is the Yahoo finance ticker of {asset}?
                Answer with only the ticker, no other text.
                """
            )
            stock = yf.Ticker(llama_ticker)
            data = stock.history(start=start_date, end=end_date)
            if not data.empty:
                price_data[ticker] = data
            else:
                logs = log_stream.getvalue()
                log_lines = logs.strip().split('\n')
                last_log = log_lines[-1]
                if "no price data found" in last_log:
                    logger.info(f"No price data found for {processed_ticker} in the given date range, skipping.")
                    continue
                logger.warning(f"Ticker for asset {asset} not found. Skipping.")
                skipped_assets.add(asset)
                continue
    logger.info(f"Number of Skipped assets: {len(skipped_assets)}.")
    logger.info(f"Number of succesfully processed tickers: {len(price_data)}.")
    return price_data



In [6]:
results_dict = get_price_data(strategy_data["trades"], "2023-01-01", "2025-05-01")


2025-07-23 11:58:34,155 - INFO - Processing ticker and asset: MSFT:US, Microsoft Corp
2025-07-23 11:58:34,537 - INFO - Processing ticker and asset: AVGO:US, Broadcom Inc
2025-07-23 11:58:34,659 - INFO - Processing ticker and asset: OGVXX:US, JPMORGAN US GOVERNMENT MONEY MARKET FUND
2025-07-23 11:58:34,733 - ERROR - $OGVXX: possibly delisted; no price data found  (1d 2023-01-01 -> 2025-05-01)
2025-07-23 11:58:34,735 - INFO - No price data found for OGVXX in the given date range, skipping.
2025-07-23 11:58:34,736 - INFO - Processing ticker and asset: NVDA:US, NVIDIA Corporation
2025-07-23 11:58:34,837 - INFO - Processing ticker and asset: BNPQY:US, BNP Paribas
2025-07-23 11:58:34,941 - INFO - Ticker MSFT:US of asset Microsoft Corp already processed, skipping.
2025-07-23 11:58:34,942 - INFO - Ticker MSFT:US of asset Microsoft Corp already processed, skipping.
2025-07-23 11:58:34,942 - INFO - Ticker MSFT:US of asset Microsoft Corp already processed, skipping.
2025-07-23 11:58:34,942 - INFO

In [7]:
results_dict["MSFT:US"]


Unnamed: 0_level_0,Open,High,Low,Close,Volume,Dividends,Stock Splits
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2023-01-03 00:00:00-05:00,238.239260,240.856088,232.672365,234.808960,25740000,0.0,0.0
2023-01-04 00:00:00-05:00,227.654309,228.232556,221.460175,224.537643,50623400,0.0,0.0
2023-01-05 00:00:00-05:00,222.675477,223.018513,217.343808,217.882858,39585600,0.0,0.0
2023-01-06 00:00:00-05:00,218.559125,221.264157,214.981818,220.450684,43613600,0.0,0.0
2023-01-09 00:00:00-05:00,221.940421,226.635040,221.901224,222.597076,27369800,0.0,0.0
...,...,...,...,...,...,...,...
2025-04-24 00:00:00-04:00,375.011552,387.738188,374.502477,386.590271,22232300,0.0,0.0
2025-04-25 00:00:00-04:00,386.290839,391.441388,383.895243,391.131958,18973200,0.0,0.0
2025-04-28 00:00:00-04:00,391.241729,392.020298,385.931501,390.443207,16579400,0.0,0.0
2025-04-29 00:00:00-04:00,390.582932,394.375987,389.664635,393.317932,14974000,0.0,0.0


In [8]:
# Clean up tickers which yfinance does not recognize
updated_trade_list = [trade for trade in strategy_data['trades'] if trade.ticker in results_dict]
updated_trade_dict = {ticker: trade_list for ticker, trade_list in strategy_data['trades_by_ticker'].items() if ticker in results_dict}
total_trades = len(updated_trade_list)
unique_tickers = len(updated_trade_dict)
updated_volume = sum(trade.volume_midpoint for trade in updated_trade_list)

strategy_data = {
    'trades': updated_trade_list,
    'trades_by_ticker': updated_trade_dict,
    'summary': {
        'total_trades': total_trades,
        'unique_tickers': unique_tickers,
        'total_volume': updated_volume
    }
}

print("Summary:")
print(f"- Total trades: {strategy_data['summary']['total_trades']}")
print(f"- Unique tickers: {strategy_data['summary']['unique_tickers']}")
print(f"- Total volume: ${strategy_data['summary']['total_volume']:,.0f}")



Summary:
- Total trades: 13542
- Unique tickers: 1359
- Total volume: $455,949,000


# Simulate trading strategy using backtesting via backtrader

In [63]:
import pandas as pd
import backtrader as bt
import matplotlib.pyplot as plt


@dataclass
class TradingConfig:
    """Configuration for the trading strategy"""
    initial_capital: float = 1000  # Has to be passed to the Backtesting engine
    monthly_contribution: float = 500
    trade_fee: float = 2.5
    annual_holding_fee_pct: float = 0.15
    max_delay_days: int = 25
    allow_fractional_shares: bool = False  # Unused for now (is always False anyway for now)
    max_monthly_positions: int = 10
    buy_margin: float = 10


class CongressionalTradingStrategy(bt.Strategy):
    """
    Implements a buy and hold strategy that copies congressional buy trades with realistic constraints
    """

    params = (
        ('strategy_data', None),
        ('start_date', None),
        ('end_date', None),
        ('verbose', False)
    )

    def __init__(self):
        """Initialize the strategy with trades data"""
        self.congress_trade_list = self.params.strategy_data['trades']
        self.config = TradingConfig()
        # Unpack yfinance data added to the strategy via the `adddata` method into a dictionary
        self.data_feeds = {}
        for data_feed in self.datas:
            ticker = data_feed._name
            self.data_feeds[ticker] = data_feed
        # Construct a DataFrame with all congress trades in scope
        self.congress_trades_df = self._prepare_trades_data()
        # Construct dictionary of monthly buy signals
        start_date = self.params.start_date
        end_date = self.params.end_date
        self.monthly_signals = self._get_monthly_signals(start_date, end_date)
        # Initialize portfolio variables
        self.year_tracker = start_date.year
        self.buy_margin = self.config.buy_margin
        self.last_rebalance_date = None
        self.verbose = self.params.verbose
        self.asset_history = defaultdict(list)  # stores (date, value) tuples for each asset
        self.portfolio_history = []  # stores (date, total_portfolio_value) tuples
        self.holdings = defaultdict(int)  # track number of shares held for each ticker

    def _prepare_trades_data(self) -> pd.DataFrame:
        """Convert list of trades to pandas DataFrame and prepare for analysis"""
        trade_dicts = []
        for trade in self.congress_trade_list:
            if trade.trade_type.lower() == 'buy':
                trade_dict = {
                    'politician': trade.politician,
                    'party': trade.party,
                    'chamber': trade.chamber,
                    'state': trade.state,
                    'asset': trade.asset,
                    'ticker': trade.ticker,
                    'is_public': trade.is_public,
                    'file_date': pd.to_datetime(trade.file_date),
                    'trade_date': pd.to_datetime(trade.trade_date),
                    'days_delayed': trade.days_delayed,
                    'ownership': trade.ownership,
                    'trade_type': trade.trade_type,
                    'volume_range': trade.volume_range,
                    'volume_min': trade.volume_min,
                    'volume_max': trade.volume_max,
                    'volume_midpoint': trade.volume_midpoint,
                    'price': trade.price
                }
                trade_dicts.append(trade_dict)
        df = pd.DataFrame(trade_dicts)
        df = df[df['days_delayed'] <= self.config.max_delay_days]
        df = df.sort_values('trade_date')
        return df

    def _calculate_weights(self, trades_subset: pd.DataFrame) -> dict[str, float]:
        """Calculate position weights based on trade volumes"""
        if trades_subset.empty:
            raise ValueError("No trades available for weight calculation")
        ticker_volumes = trades_subset.groupby('ticker')['volume_midpoint'].sum().nlargest(self.config.max_monthly_positions)
        total_volume = ticker_volumes.sum()
        weights = (ticker_volumes / total_volume).to_dict()
        return weights

    def _get_monthly_signals(self, start_date: datetime, end_date: datetime) -> dict[datetime, dict[str, float]]:
        """Generate monthly rebalancing signals"""
        signals = {}
        current_date = start_date
        while current_date <= end_date:
            lookback_start = current_date - timedelta(days=self.config.max_delay_days)
            mask = (self.congress_trades_df['trade_date'] >= lookback_start) & \
                   (self.congress_trades_df['trade_date'] <= current_date)
            recent_trades = self.congress_trades_df[mask]
            weights = self._calculate_weights(recent_trades)
            if not weights:
                raise ValueError(f"No weights found for rebalancing on {current_date}")
            trade_date = pd.to_datetime(next_weekday(current_date.strftime('%Y-%m-%d'))).date()
            signals[trade_date] = weights
            if current_date.month == 12:
                current_date = current_date.replace(year=current_date.year + 1, month=1)
            else:
                current_date = current_date.replace(month=current_date.month + 1)
        return signals

    def next(self):
        """Called for each bar in the backtest"""
        current_date = self.datas[0].datetime.date(0)
        current_year = current_date.year
        # Handle annual holding fees
        if current_year > self.year_tracker:
            self._subtract_yearly_holding_fees()
            self.year_tracker = current_year
        # Check if we need to rebalance
        if current_date in self.monthly_signals and current_date != self.last_rebalance_date:
            if self.verbose:
                logger.info(f"Rebalancing portfolio on {current_date}...")
                logger.info(f"Current cash: ${self.broker.get_cash():.2f}.")
            target_weights = self.monthly_signals[current_date]
            self._rebalance_portfolio(target_weights)
            self.last_rebalance_date = current_date
            if self.verbose:
                logger.info("Adding monthly cash contribution...")
            self.broker.add_cash(self.config.monthly_contribution)
        # Update portfolio history
        self._track_asset_values(current_date)

    def _track_asset_values(self, current_date):
        """Track the current market value of all holdings"""
        total_portfolio_value = 0
        for ticker in self.data_feeds.keys():
            if ticker in self.holdings and self.holdings[ticker] > 0:
                current_price = self.data_feeds[ticker].close[0]
                current_value = self.holdings[ticker] * current_price
                self.asset_history[ticker].append((current_date, current_price))
                total_portfolio_value += current_value
        self.portfolio_history.append((current_date, total_portfolio_value))

    def _rebalance_portfolio(self, target_weights: dict[str, float]):
        """Rebalance portfolio according to target weights"""
        buy_count = 0
        available_cash = self.broker.get_cash()
        # First pass: try to buy according to target weights
        for ticker, weight in target_weights.items():
            if ticker not in self.data_feeds:
                raise ValueError(f"Ticker {ticker} not found in data feeds")
            data_feed = self.data_feeds[ticker]
            current_price = data_feed.close[0]
            allocatable_cash = available_cash * weight
            if self.verbose:
                logger.info(f"Trying to buy {ticker} with ${allocatable_cash:.2f} (weight: {weight:.3f})...")
            if current_price + self.config.trade_fee > allocatable_cash + self.buy_margin:
                if self.verbose:
                    logger.info(f"Can't buy {ticker} with allocated cash. Checking if we can buy with all available cash...")
                if current_price + self.config.trade_fee > available_cash:
                    if self.verbose:
                        logger.info(f"Can't buy {ticker} with all available cash. Skipping...")
                else:
                    if self.verbose:
                        logger.info(f"Placing a buy order for 1 share of {ticker} at ${current_price:.2f}...")
                    # Create order which will be executed the next day
                    self.buy(data=data_feed, size=1)
                    self.holdings[ticker] += 1
                    buy_count += 1
                    available_cash -= (current_price + self.config.trade_fee)
                    if self.verbose:
                        logger.info(f"Estimated ${available_cash:.2f} cash remaining...")
            else:
                shares_to_buy = int((allocatable_cash - self.config.trade_fee - self.buy_margin) // current_price)
                if self.verbose:
                    logger.info(f"Placing a buy order for {shares_to_buy} shares of {ticker} at ${current_price:.2f}...")
                # Create order which will be executed the next day
                self.buy(data=data_feed, size=shares_to_buy)
                self.holdings[ticker] += shares_to_buy
                buy_count += 1
                available_cash -= (shares_to_buy * current_price + self.config.trade_fee)
                if self.verbose:
                    logger.info(f"Estimated ${available_cash:.2f} cash remaining...")
        # Second pass: use any remaining cash
        if available_cash > min([self.data_feeds[ticker].close[0] + self.config.trade_fee + self.buy_margin for ticker in target_weights.keys()]):
            for ticker in target_weights.keys():
                data_feed = self.data_feeds[ticker]
                current_price = data_feed.close[0]
                shares_to_buy = int((available_cash - self.config.trade_fee - self.buy_margin) // current_price)
                if shares_to_buy > 0:
                    if self.verbose:
                        logger.info(f"Placing a buy order for {shares_to_buy} shares of {ticker} at ${current_price:.2f} with remaining cash...")
                    # Create order which will be executed the next day
                    self.buy(data=data_feed, size=shares_to_buy)
                    self.holdings[ticker] += shares_to_buy
                    buy_count += 1
                    available_cash -= (shares_to_buy * current_price + self.config.trade_fee)
                    if self.verbose:
                        logger.info(f"Estimated ${available_cash:.2f} cash remaining...")
        logger.info(f"Total trades executed: {buy_count}")

    def _subtract_yearly_holding_fees(self):
        """Subtract annual holding fees from cash"""
        total_portfolio_value = self.broker.getvalue() - self.broker.get_cash()
        if total_portfolio_value <= 0:
            raise RuntimeError("Portfolio value is zero or negative, cannot calculate holding fees")
        total_fee = total_portfolio_value * (self.config.annual_holding_fee_pct / 100)
        if self.broker.get_cash() < total_fee:
            logger.warning("Not enough cash to pay full annual holding fees")
            raise RuntimeError("Not enough cash to pay annual holding fees")
        if total_fee > 0:
            logger.info(f"Subtracting annual holding fees: ${total_fee:.2f}")
            self.broker.add_cash(-total_fee)

    def plot_asset_evolution(self, save_path=None, figsize=(15, 10)):
        """
        Plot the evolution of asset values over time
        """
        if not self.asset_history:
            raise ValueError("No asset history to plot. Make sure the backtest has been run.")

        # Create subplots
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=figsize)
        fig.subplots_adjust(hspace=1)

        # Plot 1: Individual asset values
        for ticker, history in self.asset_history.items():
            if history:
                dates, values = zip(*history)
                if max(values) > 0:
                    ax1.plot(dates, values, label=ticker, linewidth=2)

        ax1.set_title('Evolution of Individual Asset Values', fontsize=14, fontweight='bold')
        ax1.set_xlabel('Date')
        ax1.set_ylabel('Asset Value ($)')
        ax1.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        ax1.grid(True, alpha=0.3)
        ax1.tick_params(axis='x', rotation=45)

        # Plot 2: Total portfolio value
        if self.portfolio_history:
            dates, values = zip(*self.portfolio_history)
            ax2.plot(dates, values, color='black', linewidth=3, label='Total Portfolio')
            ax2.fill_between(dates, values, alpha=0.3, color='black')

        ax2.set_title('Total Portfolio Value Evolution', fontsize=14, fontweight='bold')
        ax2.set_xlabel('Date')
        ax2.set_ylabel('Portfolio Value ($)')
        ax2.grid(True, alpha=0.3)
        ax2.tick_params(axis='x', rotation=45)

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Plot saved to {save_path}")

        plt.show()

    def get_asset_summary(self):
        """
        Return a summary of asset performance
        """
        summary = {}
        for ticker, history in self.asset_history.items():
            if history and max([v for _, v in history]) > 0:
                dates, values = zip(*history)
                initial_value = next((v for v in values if v > 0), 0)
                final_value = values[-1]
                max_value = max(values)
                summary[ticker] = {
                    'initial_value': initial_value,
                    'final_value': final_value,
                    'max_value': max_value,
                    'total_return': final_value - initial_value if initial_value > 0 else 0,
                    'return_pct': ((final_value - initial_value) / initial_value * 100) if initial_value > 0 else 0
                }
        return summary


class CommissionScheme(bt.CommInfoBase):
    """Custom commission scheme with fixed fees per trade"""

    params = (
        ('commission', 0.0),  # No percentage commission
        ('fixed_fee', 2.5),   # Fixed fee per trade
    )

    def _getcommission(self, size, price, pseudoexec):
        return self.p.fixed_fee


def next_weekday(date_str):
    date = datetime.strptime(date_str, "%Y-%m-%d").date()
    while date.weekday() > 4:
        date += timedelta(days=1)
    return date

def previous_weekday(date_str):
    date = datetime.strptime(date_str, "%Y-%m-%d").date()
    while date.weekday() > 4:
        date -= timedelta(days=1)
    return date


def run_backtest(strategy_data: dict = strategy_data, yf_ticker_dict: dict = results_dict, start_date: str = '2023-01-01', end_date: str = '2025-05-01'):
    """
    Run the backtest with multiple tickers

    Args:
        strategy_data: Dictionary containing congressional trading data
        yf_ticker_dict: Dictionary of tickers and yf dataframes to include in the backtest
        start_date: Start date for backtest
        end_date: End date for backtest
    """
    # Define yfinance start and end dates
    start_date = pd.to_datetime(next_weekday(start_date) + timedelta(days=1))
    end_date = pd.to_datetime(previous_weekday(end_date) - timedelta(days=1))

    # Filter out tickers which do not have yfinance data for the full date range
    yf_ticker_dict_copy = yf_ticker_dict.copy()
    for ticker, data_df in yf_ticker_dict.items():
        if data_df.index.min().date() != start_date.date() or data_df.index.max().date() != end_date.date():
            logger.warning(f"Data for {ticker} does not match the specified date range. Skipping this ticker.")
            del yf_ticker_dict_copy[ticker]
            updated_trade_list = [trade for trade in strategy_data['trades'] if trade.ticker in yf_ticker_dict_copy]
            updated_trade_dict = {ticker: trade_list for ticker, trade_list in strategy_data['trades_by_ticker'].items() if ticker in yf_ticker_dict_copy}
            total_trades = len(updated_trade_list)
            unique_tickers = len(updated_trade_dict)
            updated_volume = sum(trade.volume_midpoint for trade in updated_trade_list)
            strategy_data = {
                'trades': updated_trade_list,
                'trades_by_ticker': updated_trade_dict,
                'summary': {
                    'total_trades': total_trades,
                    'unique_tickers': unique_tickers,
                    'total_volume': updated_volume
                }
            }

    # Initialize Cerebro
    cerebro = bt.Cerebro(stdstats=True)
    cerebro.addstrategy(
        CongressionalTradingStrategy,
        strategy_data=strategy_data,
        start_date=start_date,
        end_date=end_date,
        verbose=True
    )

    # Add data feeds for each ticker
    for ticker, data_df in yf_ticker_dict_copy.items():
        data_feed = bt.feeds.PandasData(
            dataname=data_df,
            name=ticker,
            fromdate=pd.to_datetime(start_date.strftime("%Y-%m-%d")),
            todate=pd.to_datetime(end_date.strftime("%Y-%m-%d"))
        )
        cerebro.adddata(data_feed)
        logger.info(f"Added data feed for {ticker}")

    # Set initial capital
    config = TradingConfig()
    cerebro.broker.set_cash(config.initial_capital)

    # Set commission scheme
    cerebro.broker.addcommissioninfo(CommissionScheme())

    # Add analyzers
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')

    logger.info(f"Starting backtest with initial capital: ${config.initial_capital}")

    # Run backtest
    results = cerebro.run()
    strategy_instance = results[0]
    final_value = cerebro.broker.getvalue()

    # Get analyzer results
    returns_analyzer = strategy_instance.analyzers.returns.get_analysis()
    drawdown_analyzer = strategy_instance.analyzers.drawdown.get_analysis()
    sharpe_analyzer = strategy_instance.analyzers.sharpe.get_analysis()

    print("\n=== Backtest Results ===")
    print(f"Initial Capital: ${config.initial_capital:.2f}")
    print(f"Final Portfolio Value: ${final_value:.2f}")
    if 'rtot' in returns_analyzer:
        print(f"Total Return: {returns_analyzer['rtot'] * 100:.2f}%")
    if 'rnorm' in returns_analyzer:
        print(f"Normalized Return: {returns_analyzer['rnorm'] * 100:.2f}%")
    if 'max' in drawdown_analyzer:
        print(f"Maximum Drawdown: {drawdown_analyzer['max']['drawdown']:.2f}%")
    if 'len' in drawdown_analyzer:
        print(f"Longest Drawdown Period: {drawdown_analyzer['len']} days")
    if 'sharperatio' in sharpe_analyzer:
        print(f"Sharpe Ratio: {sharpe_analyzer['sharperatio']}")

    return results, cerebro



In [64]:
results, cerebro = run_backtest()




2025-07-23 15:09:49,266 - INFO - Added data feed for MSFT:US
2025-07-23 15:09:49,267 - INFO - Added data feed for AVGO:US
2025-07-23 15:09:49,267 - INFO - Added data feed for NVDA:US
2025-07-23 15:09:49,268 - INFO - Added data feed for BNPQY:US
2025-07-23 15:09:49,268 - INFO - Added data feed for RFRAX:US
2025-07-23 15:09:49,269 - INFO - Added data feed for PANW:US
2025-07-23 15:09:49,271 - INFO - Added data feed for VWLUX:US
2025-07-23 15:09:49,271 - INFO - Added data feed for GOOGL:US
2025-07-23 15:09:49,272 - INFO - Added data feed for DFCEX:US
2025-07-23 15:09:49,272 - INFO - Added data feed for FPAFY:US
2025-07-23 15:09:49,272 - INFO - Added data feed for VST:US
2025-07-23 15:09:49,273 - INFO - Added data feed for JPM:US
2025-07-23 15:09:49,273 - INFO - Added data feed for DFAS:US
2025-07-23 15:09:49,274 - INFO - Added data feed for DFAI:US
2025-07-23 15:09:49,274 - INFO - Added data feed for AAPL:US
2025-07-23 15:09:49,275 - INFO - Added data feed for ITOT:US
2025-07-23 15:09:49,


=== Backtest Results ===
Initial Capital: $1000.00
Final Portfolio Value: $16432.50
Total Return: 279.93%
Normalized Return: 167.84%
Maximum Drawdown: 13.45%
Longest Drawdown Period: 142 days
Sharpe Ratio: 0.9170452344648837


In [65]:
results[0].plot_asset_evolution(save_path='asset_evolution.png')


Plot saved to asset_evolution.png


<IPython.core.display.Javascript object>

In [66]:
def plot_asset_returns(asset_summary: dict, save_path: str = None):
    """Plot asset return percentages"""
    tickers = list(asset_summary.keys())
    returns = [asset_summary[ticker]['return_pct'] for ticker in tickers]
    colors = ['green' if r >= 0 else 'red' for r in returns]
    plt.figure(figsize=(10, 6))
    plt.bar(tickers, returns, color=colors)
    plt.axhline(0, color='black', linewidth=0.8)
    plt.title('Asset Return Percentages')
    plt.ylabel('Return (%)')
    plt.xticks(rotation=90, fontsize=5)
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        print(f"Plot saved to {save_path}")
    plt.show()

plot_asset_returns(results[0].get_asset_summary(), save_path='asset_returns.png')


Plot saved to asset_returns.png


<IPython.core.display.Javascript object>

In [67]:
class SPYTradingStrategy(bt.Strategy):
    """
    Implements a buy and hold strategy that consistently performs SPY buy trades
    """

    params = (
        ('start_date', None),
        ('end_date', None),
        ('verbose', False)
    )

    def __init__(self):
        """Initialize the strategy with trades data"""
        self.config = TradingConfig()
        # Unpack yfinance data added to the strategy via the `adddata` method
        if len(self.datas) != 1:
            raise ValueError("SPYTradingStrategy expects exactly one data feed (SPY)")
        self.spy_data = self.datas[0]
        # Construct monthly buy signals
        start_date = self.params.start_date
        end_date = self.params.end_date
        self.monthly_buy_dates = self._generate_monthly_buy_dates(start_date, end_date)
        # Initialize portfolio variables
        self.year_tracker = start_date.year
        self.buy_margin = self.config.buy_margin
        self.last_rebalance_date = None
        self.verbose = self.params.verbose
        self.asset_history = defaultdict(list)  # stores (date, value) tuples for each asset
        self.portfolio_history = []  # stores (date, total_portfolio_value) tuples
        self.holdings = defaultdict(int)  # track number of shares held for each ticker

    def _generate_monthly_buy_dates(self, start_date: datetime, end_date: datetime) -> list[datetime]:
        """Generate monthly buy dates"""
        signals = []
        current_date = start_date
        while current_date <= end_date:
            trade_date = pd.to_datetime(next_weekday(current_date.strftime('%Y-%m-%d'))).date()
            signals.append(trade_date)
            if current_date.month == 12:
                current_date = current_date.replace(year=current_date.year + 1, month=1)
            else:
                current_date = current_date.replace(month=current_date.month + 1)
        return signals

    def next(self):
        """Called for each bar in the backtest"""
        current_date = self.datas[0].datetime.date(0)
        current_year = current_date.year
        # Handle annual holding fees
        if current_year > self.year_tracker:
            self._subtract_yearly_holding_fees()
            self.year_tracker = current_year
        # Check if we need to rebalance
        if current_date in self.monthly_buy_dates and current_date != self.last_rebalance_date:
            if self.verbose:
                logger.info(f"Monthly SPY purchase on {current_date}...")
                logger.info(f"Current cash: ${self.broker.get_cash():.2f}.")
            self._rebalance_portfolio()
            self.last_rebalance_date = current_date
            if self.verbose:
                logger.info("Adding monthly cash contribution...")
            self.broker.add_cash(self.config.monthly_contribution)
        # Update portfolio history
        self._track_asset_values(current_date)

    def _track_asset_values(self, current_date):
        """Track the current market value of all holdings"""
        total_portfolio_value = 0
        if "SPY" in self.holdings and self.holdings["SPY"] > 0:
            current_price = self.spy_data.close[0]
            current_value = self.holdings["SPY"] * current_price
            self.asset_history["SPY"].append((current_date, current_price))
            total_portfolio_value += current_value
        self.portfolio_history.append((current_date, total_portfolio_value))

    def _rebalance_portfolio(self):
        """Perform monthly SPY purchase"""
        available_cash = self.broker.get_cash()
        current_price = self.spy_data.close[0]
        if self.verbose:
            logger.info(f"Trying to buy SPY at ${current_price:.2f}...")
        max_shares = int((available_cash - self.config.trade_fee - self.buy_margin) // current_price)
        if max_shares > 0:
            if self.verbose:
                logger.info(f"Placing a buy order for {max_shares} shares of SPY at ${current_price:.2f}...")
            # Create order which will be executed the next day
            self.buy(data=self.spy_data, size=max_shares)
            self.holdings["SPY"] += max_shares
            available_cash -= (max_shares * current_price + self.config.trade_fee)
            if self.verbose:
                logger.info(f"Estimated ${available_cash:.2f} cash remaining...")
        else:
            if self.verbose:
                logger.info(f"Not enough cash to buy SPY at ${current_price:.2f}. Skipping this month...")

    def _subtract_yearly_holding_fees(self):
        """Subtract annual holding fees from cash"""
        total_portfolio_value = self.broker.getvalue() - self.broker.get_cash()
        if total_portfolio_value <= 0:
            raise RuntimeError("Portfolio value is zero or negative, cannot calculate holding fees")
        total_fee = total_portfolio_value * (self.config.annual_holding_fee_pct / 100)
        if self.broker.get_cash() < total_fee:
            logger.warning("Not enough cash to pay full annual holding fees")
            raise RuntimeError("Not enough cash to pay annual holding fees")
        if total_fee > 0:
            logger.info(f"Subtracting annual holding fees: ${total_fee:.2f}")
            self.broker.add_cash(-total_fee)

    def plot_asset_evolution(self, save_path=None, figsize=(15, 10)):
        """
        Plot the evolution of asset values over time
        """
        if not self.asset_history:
            raise ValueError("No asset history to plot. Make sure the backtest has been run.")

        # Create subplots
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=figsize)
        fig.subplots_adjust(hspace=1)

        # Plot 1: Individual asset values
        for ticker, history in self.asset_history.items():
            if history:
                dates, values = zip(*history)
                if max(values) > 0:
                    ax1.plot(dates, values, label=ticker, linewidth=2)

        ax1.set_title('Evolution of Individual Asset Values', fontsize=14, fontweight='bold')
        ax1.set_xlabel('Date')
        ax1.set_ylabel('Asset Value ($)')
        ax1.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        ax1.grid(True, alpha=0.3)
        ax1.tick_params(axis='x', rotation=45)

        # Plot 2: Total portfolio value
        if self.portfolio_history:
            dates, values = zip(*self.portfolio_history)
            ax2.plot(dates, values, color='black', linewidth=3, label='Total Portfolio')
            ax2.fill_between(dates, values, alpha=0.3, color='black')

        ax2.set_title('Total Portfolio Value Evolution', fontsize=14, fontweight='bold')
        ax2.set_xlabel('Date')
        ax2.set_ylabel('Portfolio Value ($)')
        ax2.grid(True, alpha=0.3)
        ax2.tick_params(axis='x', rotation=45)

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Plot saved to {save_path}")

        plt.show()

    def get_asset_summary(self):
        """
        Return a summary of asset performance
        """
        summary = {}
        for ticker, history in self.asset_history.items():
            if history and max([v for _, v in history]) > 0:
                dates, values = zip(*history)
                initial_value = next((v for v in values if v > 0), 0)
                final_value = values[-1]
                max_value = max(values)
                summary[ticker] = {
                    'initial_value': initial_value,
                    'final_value': final_value,
                    'max_value': max_value,
                    'total_return': final_value - initial_value if initial_value > 0 else 0,
                    'return_pct': ((final_value - initial_value) / initial_value * 100) if initial_value > 0 else 0
                }
        return summary


def run_benchmark_backtest(start_date: str = '2023-01-01', end_date: str = '2025-05-01'):
    """
    Run the backtest for the SPY trading strategy

    Args:
        start_date: Start date for backtest
        end_date: End date for backtest
    """
    # Define yfinance start and end dates
    start_date = pd.to_datetime(next_weekday(start_date) + timedelta(days=1))
    end_date = pd.to_datetime(previous_weekday(end_date) - timedelta(days=1))

    # Initialize Cerebro
    cerebro = bt.Cerebro(stdstats=True)
    cerebro.addstrategy(
        SPYTradingStrategy,
        start_date=start_date,
        end_date=end_date,
        verbose=True
    )

    # Add data feed for SPY
    stock = yf.Ticker("SPY")
    data = stock.history(start=start_date.strftime("%Y-%m-%d"), end=end_date.strftime("%Y-%m-%d"))
    data_feed = bt.feeds.PandasData(
        dataname=data,
        name="SPY",
        fromdate=pd.to_datetime(start_date.strftime("%Y-%m-%d")),
        todate=pd.to_datetime(end_date.strftime("%Y-%m-%d"))
    )
    cerebro.adddata(data_feed)
    logger.info("Added data feed for SPY")

    # Set initial capital
    config = TradingConfig()
    cerebro.broker.set_cash(config.initial_capital)

    # Set commission scheme
    cerebro.broker.addcommissioninfo(CommissionScheme())

    # Add analyzers
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')

    logger.info(f"Starting backtest with initial capital: ${config.initial_capital}")

    # Run backtest
    results = cerebro.run()
    strategy_instance = results[0]
    final_value = cerebro.broker.getvalue()

    # Get analyzer results
    returns_analyzer = strategy_instance.analyzers.returns.get_analysis()
    drawdown_analyzer = strategy_instance.analyzers.drawdown.get_analysis()
    sharpe_analyzer = strategy_instance.analyzers.sharpe.get_analysis()

    print("\n=== Backtest Results ===")
    print(f"Initial Capital: ${config.initial_capital:.2f}")
    print(f"Final Portfolio Value: ${final_value:.2f}")
    if 'rtot' in returns_analyzer:
        print(f"Total Return: {returns_analyzer['rtot'] * 100:.2f}%")
    if 'rnorm' in returns_analyzer:
        print(f"Normalized Return: {returns_analyzer['rnorm'] * 100:.2f}%")
    if 'max' in drawdown_analyzer:
        print(f"Maximum Drawdown: {drawdown_analyzer['max']['drawdown']:.2f}%")
    if 'len' in drawdown_analyzer:
        print(f"Longest Drawdown Period: {drawdown_analyzer['len']} days")
    if 'sharperatio' in sharpe_analyzer:
        print(f"Sharpe Ratio: {sharpe_analyzer['sharperatio']}")

    return results, cerebro


In [68]:
benchmark_results, benchmark_cerebro = run_benchmark_backtest()


2025-07-23 15:16:15,696 - INFO - Added data feed for SPY
2025-07-23 15:16:15,696 - INFO - Starting backtest with initial capital: $1000
2025-07-23 15:16:15,726 - INFO - Monthly SPY purchase on 2023-01-03...
2025-07-23 15:16:15,727 - INFO - Current cash: $1000.00.
2025-07-23 15:16:15,727 - INFO - Trying to buy SPY at $368.17...
2025-07-23 15:16:15,727 - INFO - Placing a buy order for 2 shares of SPY at $368.17...
2025-07-23 15:16:15,727 - INFO - Estimated $261.16 cash remaining...
2025-07-23 15:16:15,728 - INFO - Adding monthly cash contribution...
2025-07-23 15:16:15,729 - INFO - Monthly SPY purchase on 2023-02-03...
2025-07-23 15:16:15,729 - INFO - Current cash: $756.60.
2025-07-23 15:16:15,729 - INFO - Trying to buy SPY at $398.65...
2025-07-23 15:16:15,729 - INFO - Placing a buy order for 1 shares of SPY at $398.65...
2025-07-23 15:16:15,729 - INFO - Estimated $355.45 cash remaining...
2025-07-23 15:16:15,729 - INFO - Adding monthly cash contribution...
2025-07-23 15:16:15,730 - INF


=== Backtest Results ===
Initial Capital: $1000.00
Final Portfolio Value: $16672.83
Total Return: 281.38%
Normalized Return: 238.16%
Maximum Drawdown: 12.78%
Longest Drawdown Period: 48 days
Sharpe Ratio: 0.9166677978164882


In [69]:
benchmark_results[0].plot_asset_evolution(save_path='benchmark_asset_evolution.png')


Plot saved to benchmark_asset_evolution.png


<IPython.core.display.Javascript object>