# In [None]:
# One campaign, one API key, one pre-set table.
# Prerequisite: download the service-account JSON key and upload it to a Google Drive folder; the BigQuery client below is built from that key file's Drive path.
import requests
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd

# Keys and BQ table info
# NOTE: api.data.gov's shared DEMO_KEY is heavily rate-limited; using your own key here is highly recommended.
FEC_API_KEY = "DEMO_KEY"
COMMITTEE_ID = "CXXXXXXXX"  # As written, code only works with a campaign id specifically for FEC, NOT an individual's name and NOT for any other sites.
BQ_PROJECT = "project-name"
BQ_DATASET = "dataset-name"
BQ_TABLE = "table-name"
PER_PAGE = 100  # API limit
# Location of the service-account JSON key once Google Drive is mounted at /content/drive (below).
SERVICE_ACCOUNT_KEY_PATH = "/content/drive/MyDrive/folder/downloadedfromcreatingserviceacctkey.json"
# ___________________________

from google.colab import drive
drive.mount('/content/drive')

# BigQuery client, authenticated with the service-account key configured above.
credentials = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_KEY_PATH)
bq_client = bigquery.Client(credentials=credentials, project=BQ_PROJECT)

def fetch_fec_data(committee_id, api_key, per_page=100, max_pages=None,
                   two_year_transaction_period=2024, timeout=30):
    """Fetch all individual Schedule A receipts for one committee and cycle.

    OpenFEC documents ``/schedules/schedule_a/`` as using keyset ("seek")
    pagination. Each response carries a ``pagination.last_indexes`` object,
    and its values must be sent back as query parameters to get the next page.
    A ``page=N`` parameter does not advance through this endpoint, so this
    function follows the ``last_indexes`` cursor instead.

    Args:
        committee_id: FEC committee ID to query.
        api_key: api.data.gov key used for OpenFEC.
        per_page: Records requested per call (the API caps this at 100).
        max_pages: Optional cap on the number of pages fetched. ``None`` (or 0)
            means no cap.
        two_year_transaction_period: Election cycle to query. Defaults to 2024,
            the value previously hard-coded here.
        timeout: Seconds to wait for each HTTP response before giving up.

    Returns:
        List of raw result dicts exactly as returned by the API.

    Raises:
        Exception: If the API answers with a non-200 status code.
    """
    url = "https://api.open.fec.gov/v1/schedules/schedule_a/"
    params = {
        "api_key": api_key,
        "committee_id": committee_id,
        "per_page": per_page,
        "two_year_transaction_period": two_year_transaction_period,  # one cycle
        "is_individual": True,
    }
    all_results = []
    page = 1  # only used for progress messages and the max_pages cap

    # One session reuses the HTTP connection across all page requests.
    with requests.Session() as session:
        while True:
            resp = session.get(url, params=params, timeout=timeout)
            if resp.status_code != 200:
                raise Exception(f"FEC API Error {resp.status_code}: {resp.text}")

            data = resp.json()
            results = data.get("results", [])
            if not results:
                break

            all_results.extend(results)

            print(f"✅ Retrieved page {page} with {len(results)} records")

            # Seek cursor for the next page. Its absence means there is nothing more to fetch.
            last_indexes = (data.get("pagination") or {}).get("last_indexes")
            if not last_indexes:
                break

            page += 1
            if max_pages and page > max_pages:
                break

            # If the cursor did not move, the next request would return this same page forever.
            if all(params.get(key) == value for key, value in last_indexes.items()):
                break
            params.update(last_indexes)

    return all_results


def normalize_records(records):
    """Flatten raw FEC Schedule A results into the column set loaded into BigQuery.

    Every output dict has the same keys, in the same order. A field that is
    absent from a source record becomes ``None``. The API field
    ``contribution_receipt_amount`` is exposed as ``contribution_amount``.
    """
    # (output column, source field) pairs, listed in output-column order.
    column_sources = (
        ("contributor_name", "contributor_name"),
        ("contributor_state", "contributor_state"),
        ("contributor_employer", "contributor_employer"),
        ("contributor_occupation", "contributor_occupation"),
        ("contribution_receipt_date", "contribution_receipt_date"),
        ("contribution_amount", "contribution_receipt_amount"),
        ("committee_id", "committee_id"),
        ("report_year", "report_year"),
        ("transaction_id", "transaction_id"),
    )
    return [
        {column: record.get(source) for column, source in column_sources}
        for record in records
    ]


def load_to_bigquery(records, project, dataset, table, client=None):
    """Append a list of flat record dicts to a BigQuery table.

    Args:
        records: Sequence of flat dicts, such as the output of
            ``normalize_records``. Each key becomes a DataFrame column.
        project: GCP project of the destination table.
        dataset: BigQuery dataset of the destination table.
        table: Destination table name. The full ID is ``project.dataset.table``.
        client: ``bigquery.Client`` used for the load. Defaults to the
            module-level ``bq_client`` so existing calls behave as before. Pass
            one explicitly to load with different credentials or project.

    Returns:
        None. When there are no records, a message is printed and the function
        returns without contacting BigQuery.

    Note:
        Rows are appended (``WRITE_APPEND``), so loading the same data twice
        duplicates it in the table.
    """
    df = pd.DataFrame(records)

    if df.empty:
        print("DataFrame is empty. No data to load into BigQuery.")
        return

    # Resolved after the empty check, so an empty load never needs a client.
    if client is None:
        client = bq_client

    table_id = f"{project}.{dataset}.{table}"

    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",
        autodetect=True
    )

    job = client.load_table_from_dataframe(
        df,
        table_id,
        job_config=job_config
    )
    job.result()  # wait for the load job to finish (job errors surface as exceptions here)

    print(f"🚀 Loaded {len(df)} rows into {table_id}")


# ===== RUN PIPELINE =====
# Step 1: pull raw receipts from OpenFEC. Step 2: flatten them. Step 3: append them to BigQuery.
fec_data = fetch_fec_data(
    committee_id=COMMITTEE_ID,
    api_key=FEC_API_KEY,
    per_page=PER_PAGE,
)
normalized = normalize_records(records=fec_data)
load_to_bigquery(
    records=normalized,
    project=BQ_PROJECT,
    dataset=BQ_DATASET,
    table=BQ_TABLE,
)