In [1]:
!pip install ffiec-data-collector


Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [None]:
"""
FFIEC Complete Data Downloader (Step 1)
=======================================
Purpose: Download ALL available FFIEC Call Report data for ALL banks
         and save to a single CSV file.

This script downloads CALL_SINGLE for each quarter, extracts ALL banks' data,
and combines everything into a single wide-format CSV.

Output: ffiec_all_banks.csv
        (One row per bank per quarter, thousands of columns from all schedules)

NOTE: This file will be LARGE (~500MB - 1GB) but only needs to be run ONCE.
      Use Step 2 to filter to specific banks for analysis.

Author: Wake Forest MSBA Practicum Team 4
Date: January 2026
"""

import zipfile
import time
import logging
from pathlib import Path
from io import StringIO

import pandas as pd
from tqdm import tqdm

from ffiec_data_collector import (
    FFIECDownloader,
    Product,
    FileFormat,
    WebpageChangeException,
)


# =============================================================================
# CONFIGURATION
# =============================================================================

# Number of quarters to download
N_QUARTERS = 100  # ~25 years

# Output paths
DOWNLOAD_DIR = Path("./ffiec_downloads")
OUTPUT_CSV = Path("./ffiec_all_banks.csv")

# Rate limiting (FFIEC guideline: ~6 requests per hour for large files)
REQUESTS_PER_HOUR = 6
MAX_RETRIES = 3
RETRY_BACKOFF_FACTOR = 2.0

# Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

# Create directories
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)


# =============================================================================
# RATE-LIMITED DOWNLOADER
# =============================================================================

class RateLimitedDownloader:
    """Wrapper around FFIECDownloader that respects rate limits."""
    
    def __init__(self, requests_per_hour=REQUESTS_PER_HOUR, download_dir=DOWNLOAD_DIR):
        self.downloader = FFIECDownloader(download_dir=download_dir)
        self.last_request_time = None
        self.min_interval = 3600.0 / max(1, requests_per_hour)

    def _sleep_if_needed(self):
        if self.last_request_time is None:
            return
        elapsed = (pd.Timestamp.now() - self.last_request_time).total_seconds()
        if elapsed < self.min_interval:
            sleep_time = self.min_interval - elapsed
            logging.debug(f"Sleeping {sleep_time:.1f}s to respect rate limits")
            time.sleep(sleep_time)

    def download_with_retries(self, product, period, file_format, max_retries=MAX_RETRIES):
        attempt = 0
        while attempt <= max_retries:
            attempt += 1
            try:
                self._sleep_if_needed()
                logging.debug(f"Attempt {attempt} downloading {period}")

                result = self.downloader.download(
                    product=product,
                    period=period,
                    format=file_format,
                )

                self.last_request_time = pd.Timestamp.now()

                if result.success:
                    return result

                if result.error_message and (
                    "404" in result.error_message or 
                    "not found" in result.error_message.lower()
                ):
                    logging.warning(f"{period}: File not available.")
                    return None

                logging.warning(f"{period}: Error: {result.error_message}")

            except WebpageChangeException:
                logging.error("FFIEC website structure changed; aborting.")
                raise
            except Exception as e:
                logging.warning(f"{period}: Exception: {e}")

            if attempt <= max_retries:
                backoff = RETRY_BACKOFF_FACTOR ** (attempt - 1)
                logging.info(f"Retrying in {backoff:.0f}s...")
                time.sleep(backoff)

        logging.error(f"All retries failed for {period}")
        return None


# =============================================================================
# DATA EXTRACTION
# =============================================================================

def extract_all_schedules_from_zip(zip_path):
    """
    Extract data for ALL banks from a CALL_SINGLE ZIP file.
    
    The ZIP contains multiple schedule .txt files. We read each one
    and merge them all together on IDRSSD.
    
    Returns: DataFrame with one row per bank, all schedule columns merged
    """
    try:
        schedule_dfs = []
        
        with zipfile.ZipFile(zip_path, "r") as zf:
            txt_files = [f for f in zf.namelist() if f.lower().endswith(".txt")]
            
            if not txt_files:
                logging.warning(f"{zip_path.name}: No .txt files found")
                return None
            
            for txt_file in txt_files:
                try:
                    with zf.open(txt_file) as fh:
                        content = fh.read().decode('utf-8', errors='replace')
                    
                    # Read with first row as header, skip second row (descriptions)
                    df = pd.read_csv(
                        StringIO(content),
                        sep='\t',
                        header=0,
                        skiprows=[1],  # Skip the description row
                        dtype=str,
                        low_memory=False
                    )
                    
                    df.columns = df.columns.str.strip()
                    
                    # Find RSSD column
                    rssd_col = next((c for c in df.columns if 'rssd' in c.lower()), None)
                    if rssd_col is None:
                        continue
                    
                    # Standardize to IDRSSD
                    if rssd_col != 'IDRSSD':
                        df = df.rename(columns={rssd_col: 'IDRSSD'})
                    
                    df['IDRSSD'] = df['IDRSSD'].astype(str).str.strip()
                    
                    # NO FILTERING - keep all banks!
                    
                    # Handle duplicate column names within this file
                    cols = df.columns.tolist()
                    seen = {}
                    new_cols = []
                    for col in cols:
                        if col in seen:
                            seen[col] += 1
                            new_cols.append(f"{col}_{seen[col]}")
                        else:
                            seen[col] = 0
                            new_cols.append(col)
                    df.columns = new_cols
                    
                    schedule_dfs.append(df)
                    
                except Exception as e:
                    logging.debug(f"Error reading {txt_file}: {e}")
                    continue
        
        if not schedule_dfs:
            return None
        
        # Merge all schedules on IDRSSD
        merged = schedule_dfs[0]
        for i, df in enumerate(schedule_dfs[1:], 2):
            # Handle overlapping columns
            overlap = set(merged.columns) & set(df.columns) - {'IDRSSD'}
            if overlap:
                df = df.rename(columns={col: f"{col}_dup{i}" for col in overlap})
            merged = merged.merge(df, on='IDRSSD', how='outer')
        
        # Keep only valid RSSD rows (numeric IDs)
        merged = merged[merged['IDRSSD'].str.match(r'^\d+$', na=False)]
        
        return merged
        
    except Exception as e:
        logging.exception(f"Failed processing {zip_path}: {e}")
        return None


# =============================================================================
# MAIN PIPELINE
# =============================================================================

def main():
    print("\n" + "=" * 70)
    print("FFIEC COMPLETE DATA DOWNLOADER (Step 1)")
    print("ALL BANKS - NO FILTERING")
    print("=" * 70 + "\n")
    
    # Display configuration
    print("CONFIGURATION:")
    print("-" * 40)
    print(f"Quarters to download: {N_QUARTERS}")
    print(f"Output: {OUTPUT_CSV}")
    print(f"NOTE: This will download ALL banks (~5,000 per quarter)")
    print()
    
    # Discover available quarters
    logging.info("Discovering available FFIEC quarters...")
    tmp = FFIECDownloader(download_dir=DOWNLOAD_DIR)
    periods = tmp.select_product(Product.CALL_SINGLE)
    
    available_quarters = sorted([p.date_str for p in periods], reverse=True)
    logging.info(f"FFIEC offers {len(available_quarters)} total quarters.")
    
    quarters_to_download = available_quarters[:N_QUARTERS]
    
    if not quarters_to_download:
        logging.error("No quarters available!")
        return None
    
    logging.info(f"Will download {len(quarters_to_download)} quarters")
    logging.info(f"Date range: {quarters_to_download[-1]} to {quarters_to_download[0]}")
    
    # Estimate time
    est_hours = len(quarters_to_download) * (3600 / REQUESTS_PER_HOUR) / 3600
    print(f"\nEstimated time: {est_hours:.1f} hours at {REQUESTS_PER_HOUR} req/hr")
    print("-" * 40 + "\n")
    
    # Initialize downloader
    rl = RateLimitedDownloader(
        requests_per_hour=REQUESTS_PER_HOUR,
        download_dir=DOWNLOAD_DIR
    )
    
    # Download and process each quarter
    all_quarters = []
    
    for quarter in tqdm(quarters_to_download, desc="Downloading", unit="quarter"):
        logging.info(f"Requesting quarter {quarter}...")
        
        result = rl.download_with_retries(
            product=Product.CALL_SINGLE,
            period=quarter,
            file_format=FileFormat.TSV,
        )
        
        if result is None or not getattr(result, "success", False):
            logging.info(f"Skipping {quarter} (download failed)")
            continue
        
        # Extract all schedules for ALL banks
        df_quarter = extract_all_schedules_from_zip(Path(result.file_path))
        
        if df_quarter is None or df_quarter.empty:
            logging.info(f"No data in {quarter}")
            continue
        
        # Add quarter identifier
        df_quarter['quarter'] = quarter
        all_quarters.append(df_quarter)
        
        logging.info(f"  -> {len(df_quarter)} banks, {len(df_quarter.columns)} columns")
    
    # Combine all quarters
    if all_quarters:
        print("\nCombining all quarters...")
        final_df = pd.concat(all_quarters, ignore_index=True)
        
        # Reorder columns: IDRSSD, quarter, then everything else alphabetically
        id_cols = ['IDRSSD', 'quarter']
        other_cols = sorted([c for c in final_df.columns if c not in id_cols])
        final_df = final_df[id_cols + other_cols]
        
        # Save
        print(f"Saving to {OUTPUT_CSV}...")
        final_df.to_csv(OUTPUT_CSV, index=False)
        
        print("\n" + "=" * 70)
        print("COMPLETE")
        print("=" * 70)
        print(f"Output: {OUTPUT_CSV}")
        print(f"Shape: {final_df.shape[0]:,} rows x {final_df.shape[1]:,} columns")
        print(f"Quarters: {final_df['quarter'].nunique()}")
        print(f"Unique banks: {final_df['IDRSSD'].nunique():,}")
        
        # Show quarters covered
        quarters_covered = sorted(final_df['quarter'].unique())
        print(f"\nQuarters: {quarters_covered[0]} to {quarters_covered[-1]}")
        
        # File size
        file_size_mb = OUTPUT_CSV.stat().st_size / (1024**2)
        print(f"File size: {file_size_mb:.1f} MB")
        print("=" * 70 + "\n")
        
        return final_df
    else:
        logging.error("No data collected!")
        return None


if __name__ == "__main__":
    df = main()

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd
2026-01-20 16:25:47,019 [INFO] Discovering available FFIEC quarters...
2026-01-20 16:25:47,212 [INFO] FFIEC offers 99 total quarters.
2026-01-20 16:25:47,212 [INFO] Will download 99 quarters
2026-01-20 16:25:47,212 [INFO] Date range: 03/31/2001 to 12/31/2024



FFIEC COMPLETE DATA DOWNLOADER (Step 1)
ALL BANKS - NO FILTERING

CONFIGURATION:
----------------------------------------
Quarters to download: 100
Output: ffiec_all_banks.csv
NOTE: This will download ALL banks (~5,000 per quarter)


Estimated time: 16.5 hours at 6 req/hr
----------------------------------------



Downloading:   0%|          | 0/99 [00:00<?, ?quarter/s]2026-01-20 16:25:47,256 [INFO] Requesting quarter 12/31/2024...
2026-01-20 16:26:03,334 [INFO]   -> 4543 banks, 4004 columns
Downloading:   1%|          | 1/99 [00:16<26:15, 16.08s/quarter]2026-01-20 16:26:03,336 [INFO] Requesting quarter 12/31/2023...
2026-01-20 16:36:04,912 [INFO]   -> 4642 banks, 4095 columns
Downloading:   2%|▏         | 2/99 [10:17<9:42:47, 360.49s/quarter]2026-01-20 16:36:04,914 [INFO] Requesting quarter 12/31/2022...
2026-01-20 16:46:06,032 [INFO]   -> 4756 banks, 4165 columns
Downloading:   3%|▎         | 3/99 [20:18<12:32:35, 470.37s/quarter]2026-01-20 16:46:06,034 [INFO] Requesting quarter 12/31/2021...
2026-01-20 16:56:07,218 [INFO]   -> 4887 banks, 4166 columns
Downloading:   4%|▍         | 4/99 [30:19<13:46:31, 522.01s/quarter]2026-01-20 16:56:07,220 [INFO] Requesting quarter 12/31/2020...
2026-01-20 17:06:08,427 [INFO]   -> 5050 banks, 4151 columns
Downloading:   5%|▌         | 5/99 [40:21<14:22:33, 