Add estimated filing dates to OpenSecrets.org records

In [11]:
import pandas as pd
import os
import glob
import csv
from datetime import timedelta

# ==========================================
# CONFIGURATION
# ==========================================
DATA_ROOT = 'data/raw/campaign_finance_open_secrets'
CHUNK_SIZE = 100000

csv.field_size_limit(2147483647)

ELECTION_DATES = {
    2012: pd.Timestamp('2012-11-06'),
    2014: pd.Timestamp('2014-11-04'),
    2016: pd.Timestamp('2016-11-08'),
    2018: pd.Timestamp('2018-11-06'),
    2020: pd.Timestamp('2020-11-03'),
    2022: pd.Timestamp('2022-11-08')
}

def load_hs_filer_map(root):
    print("Building House/Senate Filer Map...")
    hs_cmte_ids = set()
    hs_cand_ids = set()

    cands_files = glob.glob(os.path.join(root, 'cands*.csv'))
    for f in cands_files:
        try:
            for chunk in pd.read_csv(f, usecols=['FECCandID', 'DistIDRunFor'], 
                                     dtype=str, on_bad_lines='skip', chunksize=50000):
                mask = chunk['DistIDRunFor'].notna() & (chunk['DistIDRunFor'] != 'US')
                hs_cand_ids.update(chunk.loc[mask, 'FECCandID'].unique())
        except Exception as e:
            pass

    cmtes_files = glob.glob(os.path.join(root, 'cmtes*.csv'))
    for f in cmtes_files:
        try:
            for chunk in pd.read_csv(f, usecols=['CmteID', 'FECCandID'], 
                                     dtype=str, on_bad_lines='skip', chunksize=50000):
                mask = chunk['FECCandID'].isin(hs_cand_ids)
                hs_cmte_ids.update(chunk.loc[mask, 'CmteID'].unique())
        except Exception as e:
            pass
            
    print(f"   Identified {len(hs_cand_ids)} H/S Candidates and {len(hs_cmte_ids)} Committees.")
    return hs_cmte_ids, hs_cand_ids

def get_col_name(df, target):
    """Finds a column name case-insensitively to prevent KeyErrors."""
    for col in df.columns:
        if col.lower() == target.lower():
            return col
    return target

def calculate_dates_in_memory(df, cycle, hs_cmte_ids, hs_cand_ids, table_type):
    df['estimated_filing_date'] = pd.NaT
    
    date_col = get_col_name(df, 'Date')
    df['temp_date'] = pd.to_datetime(df[date_col], errors='coerce')

    # 1. Determine Filer Type safely using case-insensitive column lookups
    if table_type == 'indivs' or table_type == 'expenditures':
        cmte_col = get_col_name(df, 'CmteID')
        is_quarterly = df[cmte_col].isin(hs_cmte_ids) if cmte_col in df.columns else pd.Series(False, index=df.index)
    elif table_type == 'pacs':
        cand_col = get_col_name(df, 'FECCandID')
        is_quarterly = df[cand_col].isin(hs_cand_ids) if cand_col in df.columns else pd.Series(False, index=df.index)
    elif table_type == 'pac_other':
        filer_col = get_col_name(df, 'FilerID')
        is_quarterly = df[filer_col].isin(hs_cmte_ids) if filer_col in df.columns else pd.Series(False, index=df.index)
    else:
        is_quarterly = pd.Series(False, index=df.index)

    # 2. Apply 48-Hour Rule
    election_date = ELECTION_DATES.get(int(cycle))
    if election_date:
        start_20_day = election_date - timedelta(days=20)
        mask_48h = (df['temp_date'] >= start_20_day) & (df['temp_date'] < election_date)
        df.loc[mask_48h, 'estimated_filing_date'] = df.loc[mask_48h, 'temp_date'] + timedelta(days=2)
        mask_regular = ~mask_48h
    else:
        mask_regular = pd.Series(True, index=df.index)

    # 3. Apply Quarterly Logic
    mask_q = mask_regular & is_quarterly & df['temp_date'].notna()
    if mask_q.any():
        dates_q = df.loc[mask_q, 'temp_date']
        months = dates_q.dt.month
        years = dates_q.dt.year
        
        target_year = years.copy()
        target_month = pd.Series(0, index=dates_q.index)
        target_day = pd.Series(0, index=dates_q.index)
        
        m1 = (months >= 1) & (months <= 3)
        target_month[m1], target_day[m1] = 4, 15
        
        m2 = (months >= 4) & (months <= 6)
        target_month[m2], target_day[m2] = 7, 15
        
        m3 = (months >= 7) & (months <= 9)
        target_month[m3], target_day[m3] = 10, 15
        
        m4 = (months >= 10) & (months <= 12)
        target_month[m4], target_day[m4] = 1, 31
        target_year[m4] += 1
        
        df.loc[mask_q, 'estimated_filing_date'] = pd.to_datetime({
            'year': target_year, 'month': target_month, 'day': target_day
        })

    # 4. Apply Monthly Logic
    mask_m = mask_regular & (~is_quarterly) & df['temp_date'].notna()
    if mask_m.any():
        dates_m = df.loc[mask_m, 'temp_date']
        months = dates_m.dt.month
        years = dates_m.dt.year
        
        target_year_m = years.copy()
        target_month_m = months + 1
        target_day_m = pd.Series(20, index=dates_m.index)
        
        is_dec = (months == 12)
        target_month_m[is_dec], target_day_m[is_dec] = 1, 31
        target_year_m[is_dec] += 1
        
        df.loc[mask_m, 'estimated_filing_date'] = pd.to_datetime({
            'year': target_year_m, 'month': target_month_m, 'day': target_day_m
        })

    df.drop(columns=['temp_date'], inplace=True)
    return df

def process_file_safe(filepath, cycle, hs_cmte_ids, hs_cand_ids, table_type):
    temp_file = filepath + '.tmp'
    chunk_buffer = []
    row_count = 0
    
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f_in:
            reader = csv.reader(f_in)
            
            try:
                # Strip invisible characters and spaces from headers
                raw_header = next(reader)
                header = [str(h).strip('\ufeff').strip() for h in raw_header]
                expected_cols = len(header)
            except StopIteration:
                print(f"Skipping empty file: {filepath}")
                return

            with open(temp_file, 'w', encoding='utf-8', newline='') as f_out:
                for row in reader:
                    # FIX: Force rows to perfectly match the header length
                    if len(row) > expected_cols:
                        row = row[:expected_cols]
                    elif len(row) < expected_cols:
                        row.extend([''] * (expected_cols - len(row)))
                        
                    chunk_buffer.append(row)
                    
                    if len(chunk_buffer) >= CHUNK_SIZE:
                        df_chunk = pd.DataFrame(chunk_buffer, columns=header)
                        df_chunk = calculate_dates_in_memory(df_chunk, cycle, hs_cmte_ids, hs_cand_ids, table_type)
                        
                        write_header = (row_count == 0)
                        df_chunk.to_csv(f_out, index=False, header=write_header)
                        
                        row_count += len(df_chunk)
                        chunk_buffer = [] 
                        print(f"   Processed {row_count:,} rows...", end='\r')

                if chunk_buffer:
                    df_chunk = pd.DataFrame(chunk_buffer, columns=header)
                    df_chunk = calculate_dates_in_memory(df_chunk, cycle, hs_cmte_ids, hs_cand_ids, table_type)
                    write_header = (row_count == 0)
                    df_chunk.to_csv(f_out, index=False, header=write_header)
                    row_count += len(df_chunk)

        print(f"   Completed {os.path.basename(filepath)}: {row_count:,} rows.                 ")
        os.replace(temp_file, filepath)
        
    except Exception as e:
        print(f"\n❌ FAILED on {filepath}: {e}")
        if os.path.exists(temp_file):
            os.remove(temp_file)

def main():
    hs_cmte_ids, hs_cand_ids = load_hs_filer_map(DATA_ROOT)
    
    target_tables = ['indivs', 'pacs', 'pac_other', 'expenditures']
    
    for table in target_tables:
        pattern = os.path.join(DATA_ROOT, f'{table}*.csv')
        files = glob.glob(pattern)
        
        for f in files:
            print(f"Processing {os.path.basename(f)}...")
            try:
                cycle_short = os.path.basename(f).replace(table, '').replace('.csv', '')
                cycle = int('20' + cycle_short)
            except ValueError:
                continue

            process_file_safe(f, cycle, hs_cmte_ids, hs_cand_ids, table)

if __name__ == "__main__":
    main()

   Completed expenditures20.csv: 131,457,097 rows.                 
Processing expenditures22.csv...
   Completed expenditures22.csv: 55,463,229 rows.                 


Add estimated filing dates to lobbying records

In [14]:
import pandas as pd
import os

def process_lobbying_reports():
  filepath = os.path.join('data', 'raw', 'lobbying_data_lobbyview', 'reports.csv')
  temp_file = filepath + '.tmp'
  chunk_size = 100000

  if not os.path.exists(filepath):
      print("File not found.")
      return

  # Map each filing period code to the mm-dd string of its final day
  period_end_dates = {
      'Q1': '03-31',
      'Q2': '06-30',
      'Q3': '09-30',
      'Q4': '12-31',
      'H1': '06-30',  # Older pre-2008 semi-annual filings
      'H2': '12-31'
  }

  first_chunk = True
  for chunk in pd.read_csv(filepath, dtype=str, chunksize=chunk_size):
      chunk['estimated_filing_date'] = pd.NaT

      if 'filing_year' in chunk.columns and 'filing_period_code' in chunk.columns:
          valid_rows = chunk['filing_year'].notna() & chunk['filing_period_code'].notna()
          
          period_codes = chunk.loc[valid_rows, 'filing_period_code'].str.strip().str.upper()
          mapped_dates = period_codes.map(period_end_dates)
          
          valid_mapped = valid_rows & mapped_dates.notna()
          
          # Combine year and mm-dd to build a parseable date string
          date_strings = chunk.loc[valid_mapped, 'filing_year'].astype(str).str.replace('.0', '', regex=False) + '-' + mapped_dates[valid_mapped]
          
          # Convert to datetime and add the 20 day filing window
          chunk.loc[valid_mapped, 'estimated_filing_date'] = pd.to_datetime(date_strings, errors='coerce') + pd.Timedelta(days=20)

      chunk.to_csv(temp_file, mode='w' if first_chunk else 'a', index=False, header=first_chunk)
      first_chunk = False

  os.replace(temp_file, filepath)
  print("reports.csv updated successfully.")
  
process_lobbying_reports()

reports.csv updated successfully.


Add estimated filing dates to 527 transactions

In [16]:
import pandas as pd
import os
import csv
import warnings
from tqdm.auto import tqdm

# Mute Pandas warnings
warnings.filterwarnings('ignore', category=UserWarning, module='pandas')

DATA_ROOT = os.path.join('data', 'raw', '527_data_open_secrets')
CHUNK_SIZE = 100000
csv.field_size_limit(2147483647)

def get_latest_filing_date(dates):
    """
    Vectorized calculation of the latest possible IRS Form 8872 filing date.
    """
    latest_dates = pd.Series(pd.NaT, index=dates.index)
    valid = dates.notna()
    
    if not valid.any():
        return latest_dates
        
    y = dates.dt.year
    m = dates.dt.month
    
    # 1. Monthly Schedule Deadlines
    monthly_y = y.copy()
    monthly_m = m + 1
    monthly_d = pd.Series(20, index=dates.index)
    
    is_dec = (m == 12)
    monthly_m[is_dec] = 1
    monthly_d[is_dec] = 31
    monthly_y[is_dec] += 1
    
    monthly_deadline = pd.to_datetime({
        'year': monthly_y[valid], 
        'month': monthly_m[valid], 
        'day': monthly_d[valid]
    })
    
    # 2. Non-Monthly Schedule Deadlines
    period_y = y.copy()
    period_m = pd.Series(0, index=dates.index)
    period_d = pd.Series(0, index=dates.index)
    
    is_even_year = (y % 2 == 0)
    is_odd_year = ~is_even_year
    
    # Odd Years: Semi-Annual
    odd_h1 = is_odd_year & (m <= 6)
    period_m[odd_h1], period_d[odd_h1] = 7, 31
    
    odd_h2 = is_odd_year & (m > 6)
    period_m[odd_h2], period_d[odd_h2] = 1, 31
    period_y[odd_h2] += 1
    
    # Even Years: Quarterly
    even_q1 = is_even_year & (m <= 3)
    period_m[even_q1], period_d[even_q1] = 4, 15
    
    even_q2 = is_even_year & (m >= 4) & (m <= 6)
    period_m[even_q2], period_d[even_q2] = 7, 15
    
    even_q3 = is_even_year & (m >= 7) & (m <= 9)
    period_m[even_q3], period_d[even_q3] = 10, 15
    
    even_q4 = is_even_year & (m >= 10)
    period_m[even_q4], period_d[even_q4] = 1, 31
    period_y[even_q4] += 1
    
    period_deadline = pd.to_datetime({
        'year': period_y[valid], 
        'month': period_m[valid], 
        'day': period_d[valid]
    })
    
    # 3. Take the maximum (latest) of the two legal deadlines
    latest_dates[valid] = pd.DataFrame({
        'monthly': monthly_deadline, 
        'period': period_deadline
    }).max(axis=1)
    
    return latest_dates

def get_line_count(filepath):
    with open(filepath, 'rb') as f:
        return sum(1 for _ in f)

def has_been_processed(filepath):
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            reader = csv.reader(f)
            header = next(reader)
            return any('estimated_filing_date' in str(h).lower() for h in header)
    except Exception:
        return False

def process_527_file(filename):
    filepath = os.path.join(DATA_ROOT, filename)
    if not os.path.exists(filepath):
        print(f"File not found: {filepath}")
        return

    if has_been_processed(filepath):
        print(f"   ⏩ Skipping {filename} (already processed).")
        return

    temp_file = filepath + '.tmp'
    row_count = 0
    total_rows = max(0, get_line_count(filepath) - 1)
    
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f_in:
            reader = csv.reader(f_in)
            try:
                raw_header = next(reader)
                header = [str(h).strip('\ufeff').strip() for h in raw_header]
                expected_cols = len(header)
            except StopIteration:
                return

            with open(temp_file, 'w', encoding='utf-8', newline='') as f_out:
                chunk_buffer = []
                with tqdm(total=total_rows, desc=filename, unit="rows") as pbar:
                    for row in reader:
                        if len(row) > expected_cols:
                            row = row[:expected_cols]
                        elif len(row) < expected_cols:
                            row.extend([''] * (expected_cols - len(row)))
                            
                        chunk_buffer.append(row)
                        
                        if len(chunk_buffer) >= CHUNK_SIZE:
                            df_chunk = pd.DataFrame(chunk_buffer, columns=header)
                            
                            date_col = next((c for c in df_chunk.columns if c.lower() == 'date'), None)
                            if date_col:
                                df_chunk['temp_date'] = pd.to_datetime(df_chunk[date_col], errors='coerce')
                                df_chunk['estimated_filing_date'] = get_latest_filing_date(df_chunk['temp_date'])
                                df_chunk.drop(columns=['temp_date'], inplace=True)
                            else:
                                df_chunk['estimated_filing_date'] = pd.NaT
                                
                            write_header = (row_count == 0)
                            df_chunk.to_csv(f_out, index=False, header=write_header)
                            
                            row_count += len(df_chunk)
                            pbar.update(len(df_chunk))
                            chunk_buffer = []

                    if chunk_buffer:
                        df_chunk = pd.DataFrame(chunk_buffer, columns=header)
                        date_col = next((c for c in df_chunk.columns if c.lower() == 'date'), None)
                        if date_col:
                            df_chunk['temp_date'] = pd.to_datetime(df_chunk[date_col], errors='coerce')
                            df_chunk['estimated_filing_date'] = get_latest_filing_date(df_chunk['temp_date'])
                            df_chunk.drop(columns=['temp_date'], inplace=True)
                        else:
                            df_chunk['estimated_filing_date'] = pd.NaT
                            
                        write_header = (row_count == 0)
                        df_chunk.to_csv(f_out, index=False, header=write_header)
                        pbar.update(len(chunk_buffer))

        os.replace(temp_file, filepath)
        print(f"✅ {filename} updated successfully.")
        
    except Exception as e:
        print(f"\n❌ FAILED on {filepath}: {e}")
        if os.path.exists(temp_file):
            os.remove(temp_file)

if __name__ == "__main__":
    for f in ['rcpts527.csv']:
        process_527_file(f)

rcpts527.csv:   0%|          | 0/4715027 [00:00<?, ?rows/s]

✅ rcpts527.csv updated successfully.
