In [2]:
# Jupyter Notebook: Load BEA Personal Income to economic_indicators Table

import os
from dotenv import load_dotenv
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL
import requests

# Load environment variables from a local .env file (if present) into os.environ
load_dotenv()

# Database connection settings. DB_NAME, DB_USER and DB_PASSWORD are required
# (a missing one raises KeyError here); host and port fall back to local defaults.
DB_NAME = os.environ["DB_NAME"]
DB_USER = os.environ["DB_USER"]
DB_PASSWORD = os.environ["DB_PASSWORD"]
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "5432")

# BEA API Key (required)
BEA_API_KEY = os.environ["BEA_API_KEY"]

# Create engine. URL.create escapes special characters in the credentials.
connection_url = URL.create(
    "postgresql+psycopg2",
    username=DB_USER,
    password=DB_PASSWORD,
    host=DB_HOST,
    # Environment values are strings; URL.create expects an integer port, and
    # converting here surfaces a malformed DB_PORT before any connection attempt.
    port=int(DB_PORT),
    database=DB_NAME,
)
# echo=True logs every SQL statement the engine emits to the notebook output.
engine = create_engine(connection_url, echo=True)

# ----------------------------------------
# Query BEA API for Personal Income
# ----------------------------------------

def fetch_bea_income(year):
    """Fetch one year of county-level income data from the BEA Regional API.

    Requests table CAINC1, LineCode 3, for all counties and keeps only rows
    whose GeoFips is a 5-digit numeric code.

    Args:
        year: Year to request; it is sent to the API as ``str(year)``.

    Returns:
        A DataFrame with columns ``county_fips`` (5-character string),
        ``year`` (int) and ``personal_income`` (float). Rows whose value is
        not numeric (e.g. a suppression/"not available" placeholder) are
        dropped. If the response has no usable ``Data`` payload, an empty
        DataFrame with the same three columns is returned.

    Raises:
        requests.exceptions.RequestException: on connection failure or if the
            server does not answer within the timeout.
        ValueError: if the response body is not valid JSON.
    """
    columns = ["county_fips", "year", "personal_income"]
    url = "https://apps.bea.gov/api/data"
    params = {
        "UserID": BEA_API_KEY,
        "method": "GetData",
        "datasetname": "Regional",
        "TableName": "CAINC1",
        # NOTE(review): labelled "Personal income" by the original author, but
        # CAINC1 is believed to use LineCode 1 for personal income and 3 for
        # per-capita personal income -- confirm against the BEA parameter list.
        "LineCode": "3",  # Personal income
        "GeoFIPS": "COUNTY",
        "Year": str(year),
        "ResultFormat": "json",
    }

    # Without a timeout a stalled connection would hang the notebook forever.
    resp = requests.get(url, params=params, timeout=60)
    payload = resp.json()  # parse once; reused for the diagnostic print below
    try:
        results = payload["BEAAPI"]["Results"]["Data"]
    except (KeyError, TypeError):
        # KeyError: an expected key is absent (e.g. an error payload).
        # TypeError: an intermediate level is a list rather than a dict.
        print("❌ Could not find expected 'Data' field.")
        print("Response:", payload)
        return pd.DataFrame(columns=columns)

    records = [
        r for r in results
        if r["GeoFips"].isdigit() and len(r["GeoFips"]) == 5
    ]
    if not records:
        # Column selection on a frame built from [] would raise KeyError.
        return pd.DataFrame(columns=columns)

    df = pd.DataFrame(records, columns=["GeoFips", "TimePeriod", "DataValue"])
    df.columns = columns
    df["county_fips"] = df["county_fips"].astype(str).str.zfill(5)
    df["year"] = df["year"].astype(int)
    # Values arrive as strings with thousands separators; anything that is
    # still non-numeric after removing commas becomes NaN instead of raising.
    df["personal_income"] = pd.to_numeric(
        df["personal_income"].astype(str).str.replace(",", "", regex=False),
        errors="coerce",
    )
    # Drop rows without a usable value so callers never store NaN.
    df = df.dropna(subset=["personal_income"]).reset_index(drop=True)

    return df

# Fetch data from 2015 to 2020
all_years = [fetch_bea_income(y) for y in range(2015, 2021)]

# Combine only the years that returned data. An empty frame means the request
# for that year produced nothing usable; pd.concat raises on an empty list, so
# fall back to an empty frame with the expected columns.
non_empty_years = [frame for frame in all_years if not frame.empty]
if non_empty_years:
    bea_df = pd.concat(non_empty_years, ignore_index=True)
else:
    bea_df = pd.DataFrame(columns=["county_fips", "year", "personal_income"])

# Check for matches before updating
with engine.begin() as conn:
    result = conn.execute(text("SELECT county_fips, year FROM economic_indicators"))
    existing_keys = {(row["county_fips"], row["year"]) for row in result.mappings()}

# Filter BEA data to matching keys. The mask is built as a boolean Series
# aligned on the frame's index so that it also works when the frame is empty.
key_matches = pd.Series(
    [key in existing_keys for key in zip(bea_df["county_fips"], bea_df["year"])],
    index=bea_df.index,
    dtype=bool,
)
bea_df = bea_df[key_matches]

# Build one parameter dict per row, converting to plain Python types so the
# database driver never receives NumPy scalars.
update_params = [
    {
        "personal_income": float(income),
        "county_fips": str(fips),
        "year": int(year),
    }
    for fips, year, income in zip(
        bea_df["county_fips"], bea_df["year"], bea_df["personal_income"]
    )
]

# Update personal_income in economic_indicators
update_stmt = text("""
    UPDATE economic_indicators
    SET personal_income = :personal_income
    WHERE county_fips = :county_fips AND year = :year
""")

if update_params:
    # Passing a list of dicts runs the statement once per dict in a single
    # executemany call, all inside one transaction.
    with engine.begin() as conn:
        conn.execute(update_stmt, update_params)
    print(f"Updated {len(update_params)} (county_fips, year) rows.")
    print("✅ BEA personal income data successfully loaded into economic_indicators table.")
else:
    # Executing with an empty parameter list would run the statement with no
    # bound values, so skip the update and say so instead of claiming success.
    print("⚠️ No BEA rows matched existing economic_indicators keys; nothing was updated.")


2025-04-19 11:21:08,203 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-04-19 11:21:08,203 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-04-19 11:21:08,204 INFO sqlalchemy.engine.Engine select current_schema()
2025-04-19 11:21:08,204 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-04-19 11:21:08,205 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-04-19 11:21:08,205 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-04-19 11:21:08,206 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-04-19 11:21:08,206 INFO sqlalchemy.engine.Engine SELECT county_fips, year FROM economic_indicators
2025-04-19 11:21:08,206 INFO sqlalchemy.engine.Engine [generated in 0.00027s] {}
2025-04-19 11:21:08,210 INFO sqlalchemy.engine.Engine COMMIT
2025-04-19 11:21:08,297 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-04-19 11:21:08,298 INFO sqlalchemy.engine.Engine 
                UPDATE economic_indicators
                SET personal_income = %(personal_income)s
        