In [None]:
"""
Historical Price Data with ADV Filter using LSEG Data Packages

This script fetches historical price and volume data, then filters stocks
based on 3-month Average Daily Volume (ADV) >= 5M USD.

ADV formula: ADV_t = (1/60) * Σ(USD_Volume_{t-i}) for i=1 to 60
"""

import lseg.data as ld
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
from pandas.tseries.holiday import USFederalHolidayCalendar
from pandas.tseries.offsets import CustomBusinessDay

# NOTE(review): this silences every warning category for the rest of the kernel
# session, including deprecation warnings raised by pandas; consider narrowing
# the filter to the specific warnings that are known to be noise.
warnings.filterwarnings('ignore')

**TODO (not done yet):** filtering, currency conversion, and calendar conversion. Once complete, this can also help implement time-series signals and trading limits for single stocks.

In [None]:
# Open the LSEG Data session before the ld.get_data / ld.get_history calls below.
# NOTE(review): no arguments are passed, so presumably the library's default
# session configuration is used — confirm credentials/config are in place.
ld.open_session()

In [None]:
# Configuration
# Minimum average daily USD volume a stock needs in order to pass the ADV filter.
ADV_THRESHOLD_USD = 5_000_000  # 5M USD
# Number of daily observations in the ADV window.
LOOKBACK_DAYS = 60  # 3 months (trading days)

In [None]:
# Request the common name for every instrument behind the chain RIC "0#.RUA".
# NOTE(review): "0#.RUA" is presumably the Russell 3000 constituent chain — confirm.
russel = ld.get_data("0#.RUA",fields = 'TR.CommonName')
russel.head()

In [None]:
# Plain Python list of instrument codes (as strings) taken from the "Instrument" column.
rics = russel["Instrument"].astype(str).tolist()

In [None]:
# Date window for the history request: from LOOKBACK_DAYS business days ago up to today.
end_date = datetime.now().strftime('%Y-%m-%d')

# Business-day offset that skips weekends and US federal holidays.
# NOTE(review): federal holidays are not identical to exchange holidays, so the
# number of trading rows returned for this window may differ slightly from
# LOOKBACK_DAYS — confirm whether an exchange calendar is needed.
us_bd = CustomBusinessDay(calendar=USFederalHolidayCalendar())

end = pd.to_datetime(end_date)
# Use the configured lookback instead of a second hard-coded 60 so that the
# fetch window and the ADV window cannot drift apart.
start = end - LOOKBACK_DAYS * us_bd

start_date = start.strftime('%Y-%m-%d')

In [None]:
def get_historical_data(rics, start_date, end_date, fields=None, batch_size=50):
    """
    Fetch historical price and volume data in batches (single-core)

    Each batch is requested with ld.get_history and reshaped so that its rows
    are indexed by (Date, RIC). A batch that raises is recorded and skipped so
    that one bad request does not discard the batches already retrieved.

    Parameters:
    -----------
    rics : list
        List of Reuters Instrument Codes (RICs)
    start_date : str
        Start date in 'YYYY-MM-DD' format
    end_date : str
        End date in 'YYYY-MM-DD' format
    fields : list, optional
        Fields to retrieve. Default includes price and volume fields.
    batch_size : int, optional
        Number of RICs to process per batch (default: 50)

    Returns:
    --------
    pd.DataFrame or None
        Historical data with multi-index (Date, RIC), or None when no batch
        returned any data or combining the batches failed.

    Raises:
    -------
    ValueError
        If batch_size is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    if fields is None:
        fields = [
            'TR.PriceClose',           # Closing price
            'TR.PriceOpen',            # Opening price
            'TR.PriceHigh',            # High price
            'TR.PriceLow',             # Low price
            'TR.Volume',               # Trading volume (shares)
            'TR.PriceClose.currency'   # Currency for price
        ]

    # Accept any iterable of RICs; slicing below needs a sequence.
    rics = list(rics)
    total_rics = len(rics)
    num_batches = (total_rics + batch_size - 1) // batch_size  # ceiling division

    print(f"\nFetching data for {total_rics} instruments...")
    print(f"Date range: {start_date} to {end_date}")
    print(f"Processing in {num_batches} batches of ~{batch_size} RICs each (single-core)\n")

    all_data = []
    failed_batches = []

    try:
        for batch_num in range(num_batches):
            # Get batch of RICs
            batch_start = batch_num * batch_size
            batch_end = min((batch_num + 1) * batch_size, total_rics)
            batch_rics = rics[batch_start:batch_end]

            try:
                # Fetch batch data
                df_batch = ld.get_history(
                    universe=batch_rics,
                    fields=fields,
                    start=start_date,
                    end=end_date,
                    interval='daily'
                )

                if not df_batch.empty:
                    all_data.append(_stack_batch_by_ric(df_batch, batch_rics, fields))
                    print("✓")
                else:
                    print("✗ (no data)")

                progress = (batch_end / total_rics) * 100
                print(f"Progress: {progress:5.1f}% | Batch {batch_num+1}/{num_batches}")

            except Exception as batch_error:
                # Best effort: remember the failed batch and carry on with the rest.
                failed_batches.append(batch_num + 1)
                print(f"\n⚠ Warning: Batch {batch_num+1} failed: {batch_error}")

        # Combine all batches
        if all_data:
            df_combined = pd.concat(all_data)
            df_combined.index.names = ["Date", "RIC"]

            print(f"\n✓ Retrieved {len(df_combined):,} data points")
            print(f"  Unique instruments: {df_combined.index.get_level_values('RIC').nunique()}")

            if failed_batches:
                print(f"  ⚠ Failed batches: {failed_batches}")

            return df_combined
        else:
            print(f"\n✗ No data retrieved")
            return None

    except Exception as e:
        print(f"\n✗ Error fetching historical data: {e}")
        return None


def _stack_batch_by_ric(df_batch, batch_rics, fields):
    """Reshape one ld.get_history result so its rows are indexed by (Date, RIC)."""
    if isinstance(df_batch.columns, pd.MultiIndex):
        # Two column levels: move level 0 (the instruments) into the row index.
        return df_batch.stack(level=0)

    if len(batch_rics) == 1:
        # Flat columns for a single requested RIC: the columns are the fields,
        # so stacking them directly would put field names where the RIC belongs.
        # Add the RIC as an outer column level first, then stack that level.
        return pd.concat({batch_rics[0]: df_batch}, axis=1).stack(level=0)

    # Flat columns for several RICs.
    # NOTE(review): presumably one column per RIC for a single requested field,
    # with the field label stored in columns.name — confirm against the library.
    field_name = df_batch.columns.name if df_batch.columns.name is not None else fields[0]
    return df_batch.stack().to_frame(name=field_name)

In [None]:
# Download the price/volume history for the whole universe with the default field list.
df = get_historical_data(rics, start_date, end_date, fields = None)

# get_historical_data returns None when nothing could be retrieved; stop here
# with a clear message instead of failing later on an attribute access.
if df is None:
    raise RuntimeError(
        "No historical data was retrieved; check the session, the RIC list and the date range."
    )

In [None]:
# Show the returned column labels; calculate_usd_volume requires
# 'Volume', 'Price Close' and 'Currency' to be among them.
df.columns

In [None]:
def calculate_usd_volume(df, fx_rates=None):
    """
    Calculate USD trading volume for each (Date, RIC)

    USD_Volume = Volume * Price Close * FX_Rate (if not USD)

    Parameters:
    -----------
    df : pd.DataFrame
        Must contain:
        ['Volume', 'Price Close', 'Currency']
    fx_rates : mapping, optional
        Currency code -> USD value of one unit of that currency (for example
        {"EUR": 1.08}). When omitted, non-USD rows keep the 1.0 placeholder
        rate and a warning is printed. When given, non-USD rows whose currency
        has no rate get a NaN 'USD_Volume' rather than a silently wrong value.

    Returns:
    --------
    pd.DataFrame
        Copy of the input with an added 'USD_Volume' column

    Raises:
    -------
    ValueError
        If any of the required columns is missing.
    """
    print("\nCalculating USD volume...")

    # Work on a copy so the caller's frame is left untouched.
    df = df.copy()

    # Sanity check
    required_cols = {"Volume", "Price Close", "Currency"}
    missing = required_cols - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Base calculation (assume USD)
    df["USD_Volume"] = df["Volume"] * df["Price Close"]

    currency = df["Currency"]

    # A missing currency is not the same thing as a foreign currency: keep the
    # two apart so the non-USD count below is not inflated by empty cells.
    no_currency = currency.isna()
    non_usd = (~no_currency) & currency.ne("USD").fillna(False).astype(bool)

    if no_currency.any():
        print(f"⚠ {no_currency.sum()} rows have no currency (treated as USD)")

    if non_usd.any():
        if fx_rates is None:
            # No rates supplied: values stay as Volume * Price Close.
            print(f"⚠ {non_usd.sum()} rows are non-USD (FX rate = 1.0 placeholder)")
        else:
            # Per-row rate: looked up for non-USD rows, 1.0 everywhere else.
            rate = pd.to_numeric(currency.map(fx_rates), errors="coerce").where(non_usd, 1.0)
            unmapped = sorted(currency[rate.isna()].unique())
            if unmapped:
                print(f"⚠ No FX rate for {unmapped}; USD_Volume set to NaN for those rows")
            df["USD_Volume"] = df["USD_Volume"] * rate

    return df


In [None]:
# Add the 'USD_Volume' column; calculate_usd_volume works on a copy, so df is unchanged.
hist_data = calculate_usd_volume(df)

In [None]:
# Preview the first rows, including the new 'USD_Volume' column.
hist_data.head()

In [None]:
def calculate_adv(df, lookback_days=60, min_coverage=0.8):
    """
    Calculate the average daily USD volume (ADV) of each stock

    For every RIC the rows are ordered by date, the last `lookback_days` rows
    are kept, and 'USD_Volume' is averaged over them (missing values are
    skipped by the mean). A RIC is reported only when it has enough non-missing
    observations in that window.

    Parameters:
    -----------
    df : pd.DataFrame
        Indexed by ('Date', 'RIC') and containing a 'USD_Volume' column
    lookback_days : int
        Number of most recent rows per RIC used for the average (default 60)
    min_coverage : float
        Minimum share of `lookback_days` that must have a non-missing
        'USD_Volume' for the RIC to be included (default 0.8)

    Returns:
    --------
    pd.DataFrame
        One row per qualifying RIC with columns
        ['RIC', 'ADV_USD', 'Days_Used', 'Latest_Date']. The columns are present
        even when no RIC qualifies.

    Raises:
    -------
    ValueError
        If lookback_days is smaller than 1.
    """
    if lookback_days < 1:
        raise ValueError("lookback_days must be at least 1")

    print(f"\nCalculating {lookback_days}-day ADV for each stock...")

    result_columns = ["RIC", "ADV_USD", "Days_Used", "Latest_Date"]
    adv_records = []

    for ric, g in df.groupby(level="RIC"):
        g = g.sort_index(level="Date")

        recent = g.tail(lookback_days)
        n_obs = recent["USD_Volume"].notna().sum()

        if n_obs >= lookback_days * min_coverage:
            adv_records.append({
                "RIC": ric,
                "ADV_USD": recent["USD_Volume"].mean(),
                "Days_Used": n_obs,
                "Latest_Date": recent.index.get_level_values("Date")[-1]
            })
        else:
            print(f"  Warning: Insufficient data for {ric} ({n_obs} days)")

    # Passing the column list keeps the schema stable for an empty result, so
    # downstream code can still select 'ADV_USD' when nothing qualified.
    return pd.DataFrame(adv_records, columns=result_columns)


In [None]:
# Use the configured lookback so the ADV window matches the fetch window.
adv_summary = calculate_adv(hist_data, lookback_days=LOOKBACK_DAYS)

In [None]:
# Preview the per-RIC ADV table.
adv_summary.head()

In [None]:
def filter_by_adv(adv_df, threshold_usd=5_000_000):
    """
    Filter stocks based on ADV threshold

    The input is split into two frames that together contain every input row:
    rows whose 'ADV_USD' is at least the threshold, and all remaining rows.
    Rows with a missing 'ADV_USD' cannot be shown to meet the threshold and are
    therefore placed in the excluded frame.

    Parameters:
    -----------
    adv_df : pd.DataFrame
        DataFrame with ADV calculations (must contain 'ADV_USD')
    threshold_usd : float
        Minimum ADV in USD (default 5M)

    Returns:
    --------
    tuple
        (filtered_df, excluded_df), both copies of the matching rows
    """
    print(f"\nFiltering stocks with ADV >= ${threshold_usd:,.0f}...")

    # Build the mask once; a missing ADV compares as "not passed" so that the
    # two outputs always partition the input.
    passes = (adv_df['ADV_USD'] >= threshold_usd).fillna(False).astype(bool)

    filtered = adv_df[passes].copy()
    excluded = adv_df[~passes].copy()

    print(f"✓ Passed filter: {len(filtered)} stocks")
    print(f"✗ Excluded: {len(excluded)} stocks")

    return filtered, excluded

In [None]:
# Take the threshold from the configuration cell rather than repeating the
# literal, so there is a single place to change it.
adv_threshold = ADV_THRESHOLD_USD
filtered_adv, excluded_adv = filter_by_adv(adv_summary, threshold_usd=adv_threshold)