# Step 1: Ingest and Normalize Transcripts

This notebook:
1. Fetches earnings call transcripts from Financial Modeling Prep API
2. Cleans and normalizes the text
3. Separates presentation vs Q&A sections (if available)
4. Outputs standardized parquet file


In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
import json
import re
from urllib.request import urlopen
import certifi
import time
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock, Semaphore
import requests
from collections import deque

# Load config
BASE_DIR = Path('/Users/david/Desktop/MATH-GA 2707/Moving Target')
CONFIG_DIR = BASE_DIR / 'configs'
INTERMEDIATE_DIR = BASE_DIR / 'data' / 'intermediate'

with open(CONFIG_DIR / 'base.json', 'r') as f:
    config = json.load(f)

# Convert string paths back to Path objects
for key in config['data']:
    config['data'][key] = Path(config['data'][key])

API_KEY = config['api']['fmp_api_key']
BASE_URL = config['api']['base_url']
START_YEAR = config['dates']['start_year']
END_YEAR = config['dates']['end_year']

print(f"API Key: {API_KEY[:10]}...")
print(f"Date range: {START_YEAR}-{END_YEAR}")


API Key: wSb1mJ4mrG...
Date range: 2010-2024


In [2]:
# Load Russell 3000 tickers
with open(INTERMEDIATE_DIR / 'russell_3000_tickers.json', 'r') as f:
    tickers = json.load(f)

print(f"Loaded {len(tickers)} tickers from Russell 3000")


Loaded 2611 tickers from Russell 3000


In [4]:
def get_jsonparsed_data(url, max_retries=3):
    """Fetch and parse JSON from URL with retries"""
    for attempt in range(max_retries):
        try:
            # Use requests instead of urlopen for better performance
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(0.5 * (attempt + 1))  # Exponential backoff
                continue
            return None
    return None

def clean_text(text):
    """Clean transcript text"""
    if not text or pd.isna(text):
        return ""
    
    # Remove excessive whitespace
    text = re.sub(r'\s+', ' ', text)
    
    # Remove common boilerplate headers
    boilerplate_patterns = [
        r'Operator:.*?Please go ahead',
        r'This call is being recorded',
        r'Forward-looking statements.*?SEC today',
    ]
    for pattern in boilerplate_patterns:
        text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)
    
    # Normalize whitespace again
    text = text.strip()
    
    return text

def separate_sections(text):
    """Attempt to separate presentation from Q&A"""
    # Common Q&A markers
    qa_markers = [
        r'Question-and-Answer Session',
        r'Q&A Session',
        r'Questions and Answers',
        r'Operator:.*?questions',
    ]
    
    presentation = text
    qa = ""
    
    for marker in qa_markers:
        match = re.search(marker, text, re.IGNORECASE)
        if match:
            split_pos = match.start()
            presentation = text[:split_pos].strip()
            qa = text[split_pos:].strip()
            break
    
    return presentation, qa

print("Helper functions defined")


Helper functions defined


In [4]:
# Fetch transcripts for ALL tickers from start year to end year
# Using parallel processing to speed up (3000 calls/minute limit)

# Rate limiting: 3000 calls/minute = 50 calls/second
# Use 40 concurrent workers to stay safely under limit
MAX_WORKERS = 40
CALLS_PER_MINUTE = 3000
CALLS_PER_SECOND = CALLS_PER_MINUTE / 60  # ~50 calls/second

# Rate limiter: track API call timestamps
call_times = deque()
rate_lock = Lock()

def rate_limited_call():
    """Ensure we don't exceed 3000 calls per minute"""
    with rate_lock:
        now = time.time()
        # Remove calls older than 1 minute
        while call_times and call_times[0] < now - 60:
            call_times.popleft()
        
        # If we're at the limit, wait
        if len(call_times) >= CALLS_PER_MINUTE:
            sleep_time = 60 - (now - call_times[0]) + 0.1
            if sleep_time > 0:
                time.sleep(sleep_time)
                # Clean up again after waiting
                now = time.time()
                while call_times and call_times[0] < now - 60:
                    call_times.popleft()
        
        # Record this call
        call_times.append(time.time())

# Thread-safe lists
transcripts_list = []
list_lock = Lock()
failed_requests = []
failed_lock = Lock()

# Generate all URL requests
def generate_requests():
    """Generate all API request URLs"""
    requests_list = []
    for ticker in tickers:
        for year in range(START_YEAR, END_YEAR + 1):
            for quarter in [1, 2, 3, 4]:
                url = f"{BASE_URL}/earning-call-transcript?symbol={ticker}&year={year}&quarter={quarter}&apikey={API_KEY}"
                requests_list.append({
                    'url': url,
                    'ticker': ticker,
                    'year': year,
                    'quarter': quarter
                })
    return requests_list

def fetch_single_transcript(request_info):
    """Fetch a single transcript and return processed data"""
    # Rate limit before making the call
    rate_limited_call()
    
    url = request_info['url']
    ticker = request_info['ticker']
    year = request_info['year']
    quarter = request_info['quarter']
    
    try:
        data = get_jsonparsed_data(url)
        
        if data and len(data) > 0:
            results = []
            for item in data:
                transcript_raw = item.get('content', '')
                if transcript_raw:
                    transcript_clean = clean_text(transcript_raw)
                    text_presentation, text_qa = separate_sections(transcript_clean)
                    
                    results.append({
                        'ticker': ticker,
                        'symbol': item.get('symbol', ticker),
                        'year': item.get('year', year),
                        'quarter': item.get('period', f'Q{quarter}').replace('Q', ''),
                        'date': item.get('date', ''),
                        'text_raw': transcript_raw,
                        'text_presentation': text_presentation,
                        'text_qa': text_qa,
                        'text_full': transcript_clean,
                        'source_vendor': 'FMP',
                        'call_id': f"{ticker}_{year}_Q{quarter}"
                    })
            return results
        return []
    except Exception as e:
        with failed_lock:
            failed_requests.append({
                'ticker': ticker,
                'year': year,
                'quarter': quarter,
                'error': str(e)
            })
        return []

# Generate all requests
all_requests = generate_requests()
total_requests = len(all_requests)

print(f"Fetching transcripts for ALL {len(tickers)} tickers...")
print(f"Date range: {START_YEAR} to {END_YEAR} ({END_YEAR - START_YEAR + 1} years)")
print(f"Total API requests: {total_requests:,}")
print(f"Using {MAX_WORKERS} parallel workers")
print(f"Estimated time: ~{total_requests / CALLS_PER_MINUTE:.1f} minutes")
print(f"Rate limit: {CALLS_PER_MINUTE} calls/minute\n")

# Process requests in parallel with rate limiting
transcripts_list = []
checkpoint_interval = 5000  # Save checkpoint every 5000 requests

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    # Submit all tasks
    future_to_request = {
        executor.submit(fetch_single_transcript, req): req 
        for req in all_requests
    }
    
    # Process completed tasks with progress bar
    completed = 0
    for future in tqdm(as_completed(future_to_request), total=total_requests, desc="Fetching transcripts"):
        try:
            results = future.result()
            if results:
                with list_lock:
                    transcripts_list.extend(results)
            
            completed += 1
            
            # Save checkpoint periodically
            if completed % checkpoint_interval == 0:
                with list_lock:
                    if transcripts_list:
                        df_temp = pd.DataFrame(transcripts_list)
                        checkpoint_file = INTERMEDIATE_DIR / f'transcripts_checkpoint_{completed}.parquet'
                        df_temp.to_parquet(checkpoint_file, index=False, engine='pyarrow')
                        print(f"\n  Checkpoint: {len(transcripts_list)} transcripts after {completed:,} requests")
        except Exception as e:
            req = future_to_request[future]
            with failed_lock:
                failed_requests.append({
                    'ticker': req['ticker'],
                    'year': req['year'],
                    'quarter': req['quarter'],
                    'error': str(e)
                })

print(f"\n{'='*60}")
print(f"Fetching complete!")
print(f"  Total transcripts fetched: {len(transcripts_list)}")
print(f"  Total requests processed: {total_requests:,}")
if failed_requests:
    failed_tickers = set([r['ticker'] for r in failed_requests])
    print(f"  Failed requests: {len(failed_requests)}")
    print(f"  Tickers with errors: {len(failed_tickers)}")
    if len(failed_tickers) <= 20:
        print(f"  Failed tickers: {sorted(failed_tickers)}")
    else:
        print(f"  Failed tickers (first 20): {sorted(list(failed_tickers))[:20]}")
print(f"{'='*60}")


Fetching transcripts for ALL 2611 tickers...
Date range: 2010 to 2024 (15 years)
Total API requests: 156,660
Using 40 parallel workers
Estimated time: ~52.2 minutes
Rate limit: 3000 calls/minute



Fetching transcripts:   3%|▎         | 5009/156660 [01:10<2:16:43, 18.49it/s] 


  Checkpoint: 3391 transcripts after 5,000 requests


Fetching transcripts:   7%|▋         | 10354/156660 [03:03<26:28, 92.10it/s]  


  Checkpoint: 6712 transcripts after 10,000 requests


Fetching transcripts:  10%|▉         | 14996/156660 [04:17<15:56, 148.09it/s]  


  Checkpoint: 9879 transcripts after 15,000 requests


Fetching transcripts:  13%|█▎        | 19985/156660 [06:20<20:36, 110.57it/s]  


  Checkpoint: 13124 transcripts after 20,000 requests


Fetching transcripts:  17%|█▋        | 25916/156660 [08:10<19:12, 113.45it/s]  


  Checkpoint: 16322 transcripts after 25,000 requests


Fetching transcripts:  19%|█▉        | 29994/156660 [09:25<17:20, 121.71it/s]  


  Checkpoint: 19438 transcripts after 30,000 requests


Fetching transcripts:  22%|██▏       | 35000/156660 [11:39<15:06:47,  2.24it/s]


  Checkpoint: 22963 transcripts after 35,000 requests


Fetching transcripts:  26%|██▌       | 40001/156660 [13:35<23:57:37,  1.35it/s]


  Checkpoint: 26403 transcripts after 40,000 requests


Fetching transcripts:  29%|██▊       | 45000/156660 [15:13<37:03:17,  1.19s/it]


  Checkpoint: 29638 transcripts after 45,000 requests


Fetching transcripts:  32%|███▏      | 50000/156660 [17:13<23:50:20,  1.24it/s]


  Checkpoint: 32997 transcripts after 50,000 requests


Fetching transcripts:  36%|███▋      | 56902/156660 [19:01<31:08, 53.38it/s]   


  Checkpoint: 36367 transcripts after 55,000 requests


Fetching transcripts:  38%|███▊      | 60001/156660 [21:00<37:32:12,  1.40s/it]


  Checkpoint: 39515 transcripts after 60,000 requests


Fetching transcripts:  41%|████▏     | 65000/156660 [23:10<23:42:28,  1.07it/s]


  Checkpoint: 43073 transcripts after 65,000 requests


Fetching transcripts:  45%|████▌     | 70602/156660 [25:42<2:00:02, 11.95it/s] 


  Checkpoint: 46569 transcripts after 70,000 requests


Fetching transcripts:  48%|████▊     | 75229/156660 [29:27<11:40:41,  1.94it/s] 


  Checkpoint: 49963 transcripts after 75,000 requests


Fetching transcripts:  52%|█████▏    | 81402/156660 [34:07<1:13:27, 17.07it/s] 


  Checkpoint: 53352 transcripts after 80,000 requests


Fetching transcripts:  54%|█████▍    | 85000/156660 [38:30<1:41:33, 11.76it/s]


  Checkpoint: 56654 transcripts after 85,000 requests


Fetching transcripts:  57%|█████▋    | 90000/156660 [46:24<4:53:29,  3.79it/s]


  Checkpoint: 60098 transcripts after 90,000 requests


Fetching transcripts:  61%|██████    | 95000/156660 [58:31<32:59:02,  1.93s/it]


  Checkpoint: 63509 transcripts after 95,000 requests


Fetching transcripts:  64%|██████▍   | 100000/156660 [1:05:20<7:57:56,  1.98it/s]


  Checkpoint: 66746 transcripts after 100,000 requests


Fetching transcripts:  67%|██████▋   | 105000/156660 [1:19:30<19:52:06,  1.38s/it]


  Checkpoint: 70029 transcripts after 105,000 requests


Fetching transcripts:  70%|███████   | 110001/156660 [1:30:49<24:07:22,  1.86s/it]


  Checkpoint: 73348 transcripts after 110,000 requests


Fetching transcripts:  77%|███████▋  | 119999/156660 [1:57:14<35:49, 17.06it/s]   


KeyboardInterrupt: 

## Recovery: Load Latest Checkpoint and Resume

If the process was interrupted, run this cell to recover your data and resume from where it left off.


In [6]:
# RECOVERY: Load latest checkpoint and identify what's already been processed
import glob

# Find all checkpoint files
checkpoint_files = sorted(glob.glob(str(INTERMEDIATE_DIR / 'transcripts_checkpoint_*.parquet')))

if checkpoint_files:
    # Load the latest checkpoint
    latest_checkpoint = checkpoint_files[-1]
    print(f"Found latest checkpoint: {latest_checkpoint}")
    
    # Extract checkpoint number
    checkpoint_num = int(latest_checkpoint.split('_')[-1].replace('.parquet', ''))
    print(f"Checkpoint number: {checkpoint_num:,} requests")
    
    # Load the checkpoint data
    df_checkpoint = pd.read_parquet(latest_checkpoint)
    print(f"\nLoaded checkpoint with {len(df_checkpoint):,} transcripts")
    print(f"Columns in checkpoint: {list(df_checkpoint.columns)}")
    print(f"Unique tickers in checkpoint: {df_checkpoint['ticker'].nunique()}")
    
    # Handle call_date column - create it from 'date' if needed
    if 'call_date' not in df_checkpoint.columns:
        if 'date' in df_checkpoint.columns:
            print("  Creating 'call_date' from 'date' column...")
            df_checkpoint['call_date'] = pd.to_datetime(df_checkpoint['date'], errors='coerce')
        else:
            print("  Warning: No 'date' or 'call_date' column found. Creating placeholder dates.")
            df_checkpoint['call_date'] = pd.NaT
    
    # Show date range if available
    if 'call_date' in df_checkpoint.columns and df_checkpoint['call_date'].notna().any():
        print(f"Date range: {df_checkpoint['call_date'].min()} to {df_checkpoint['call_date'].max()}")
    else:
        print("Date range: Not available")
    
    # Create a set of already processed (ticker, year, quarter) combinations
    # Handle year and quarter columns - they might be named differently
    if 'year' not in df_checkpoint.columns:
        if 'fiscal_year' in df_checkpoint.columns:
            df_checkpoint['year'] = df_checkpoint['fiscal_year']
        elif 'fyearq' in df_checkpoint.columns:
            df_checkpoint['year'] = df_checkpoint['fyearq']
        else:
            print("  Warning: No year column found. Cannot track processed combinations.")
            df_checkpoint['year'] = None
    
    if 'quarter' not in df_checkpoint.columns:
        if 'fiscal_quarter' in df_checkpoint.columns:
            df_checkpoint['quarter'] = df_checkpoint['fiscal_quarter']
        elif 'fqtr' in df_checkpoint.columns:
            df_checkpoint['quarter'] = df_checkpoint['fqtr']
        else:
            print("  Warning: No quarter column found. Cannot track processed combinations.")
            df_checkpoint['quarter'] = None
    
    # Create processed_combos if we have year and quarter
    if df_checkpoint['year'].notna().any() and df_checkpoint['quarter'].notna().any():
        df_checkpoint['year'] = df_checkpoint['year'].astype(int)
        df_checkpoint['quarter'] = df_checkpoint['quarter'].astype(int)
        processed_combos = set(
            zip(df_checkpoint['ticker'], df_checkpoint['year'], df_checkpoint['quarter'])
        )
        print(f"\nAlready processed combinations: {len(processed_combos):,}")
    else:
        processed_combos = set()
        print(f"\nWarning: Could not determine processed combinations. Will reprocess all.")
    
    # Store for use in next cell
    transcripts_list = df_checkpoint.to_dict('records')
    print(f"\n✓ Recovery complete! Ready to resume processing.")
    print(f"  Current transcripts: {len(transcripts_list):,}")
    print(f"  Processed combinations: {len(processed_combos):,}")
    
else:
    print("No checkpoint files found. Starting fresh.")
    transcripts_list = []
    processed_combos = set()


Found latest checkpoint: /Users/david/Desktop/MATH-GA 2707/Moving Target/data/intermediate/transcripts_checkpoint_95000.parquet
Checkpoint number: 95,000 requests


: 

In [None]:
# Convert to DataFrame
if transcripts_list:
    df_transcripts = pd.DataFrame(transcripts_list)
    
    # Parse dates
    df_transcripts['call_date'] = pd.to_datetime(df_transcripts['date'], errors='coerce')
    df_transcripts['fiscal_year'] = df_transcripts['year']
    df_transcripts['fiscal_quarter'] = df_transcripts['quarter'].astype(int)
    
    # Add text length metrics
    df_transcripts['len_presentation'] = df_transcripts['text_presentation'].str.len()
    df_transcripts['len_qa'] = df_transcripts['text_qa'].str.len()
    df_transcripts['len_full'] = df_transcripts['text_full'].str.len()
    
    # Sort by ticker and date
    df_transcripts = df_transcripts.sort_values(['ticker', 'call_date']).reset_index(drop=True)
    
    print(f"Created DataFrame with {len(df_transcripts)} transcripts")
    print(f"\nColumns: {list(df_transcripts.columns)}")
    print(f"\nDate range: {df_transcripts['call_date'].min()} to {df_transcripts['call_date'].max()}")
    print(f"\nSample:")
    print(df_transcripts[['ticker', 'call_date', 'fiscal_year', 'fiscal_quarter', 'len_full']].head(10))
else:
    print("No transcripts fetched. Check API key and network connection.")
    df_transcripts = pd.DataFrame()


Created DataFrame with 63509 transcripts

Columns: ['ticker', 'symbol', 'year', 'quarter', 'date', 'text_raw', 'text_presentation', 'text_qa', 'text_full', 'source_vendor', 'call_id', 'call_date', 'fiscal_year', 'fiscal_quarter', 'len_presentation', 'len_qa', 'len_full']

Date range: 2009-05-21 00:00:00 to 2025-04-15 00:00:00

Sample:
  ticker  call_date  fiscal_year  fiscal_quarter  len_full
0      A 2010-02-12         2010               1     30481
1      A 2010-05-18         2010               2     44386
2      A 2011-02-14         2011               1     56947
3      A 2011-05-13         2011               2     51672
4      A 2011-08-15         2011               3     55869
5      A 2011-11-15         2011               4     56508
6      A 2012-02-15         2012               1     68830
7      A 2012-05-14         2012               2     44642
8      A 2012-08-15         2012               3     62928
9      A 2012-11-19         2012               4     58603


In [None]:
# Save to parquet
if not df_transcripts.empty:
    output_file = config['data']['transcripts_clean']
    df_transcripts.to_parquet(output_file, index=False, engine='pyarrow')
    print(f"\nSaved {len(df_transcripts)} transcripts to: {output_file}")
    
    # Summary statistics
    print("\nSummary Statistics:")
    print(f"  Unique tickers: {df_transcripts['ticker'].nunique()}")
    print(f"  Average transcript length: {df_transcripts['len_full'].mean():.0f} characters")
    print(f"  Transcripts with Q&A section: {(df_transcripts['len_qa'] > 0).sum()}")
    print(f"  Date range: {df_transcripts['call_date'].min()} to {df_transcripts['call_date'].max()}")
else:
    print("No data to save")



Saved 63509 transcripts to: /Users/david/Desktop/MATH-GA 2707/Moving Target/data/intermediate/transcripts_clean.parquet

Summary Statistics:
  Unique tickers: 1475
  Average transcript length: 41823 characters
  Transcripts with Q&A section: 58057
  Date range: 2009-05-21 00:00:00 to 2025-04-15 00:00:00
