<a href="https://colab.research.google.com/github/CooperJB710/DS2002-Data-Project-1/blob/main/Data_Project_ETL_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install mysql-connector-python

In [None]:
import sys
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import mysql.connector

In [None]:
# Endpoint that extract_cdc_vaccination_data() requests with `$limit` /
# `$offset` query parameters. It is an empty string in this file, so it has
# to be filled in before the extract step can fetch anything.
# NOTE(review): presumably the JSON endpoint of the CDC county-level
# vaccination dataset -- confirm the exact resource URL.
url = ""

In [None]:
def extract_cdc_vaccination_data():
    """Pull county-level CDC data with offset-based paging.

    Requests pages of ``chunk_size`` records from the module-level ``url``
    until the server returns an empty page, and collects every record.

    Returns:
        pandas.DataFrame: one row per JSON record received. If a request or
        JSON decode fails, an error is printed and the records fetched so far
        (possibly none) are returned, so callers should check for an empty
        frame.
    """
    chunk_size = 50000
    offset = 0
    all_rows = []
    while True:
        params = {
            '$limit': chunk_size,
            '$offset': offset,
            # Without an explicit sort the row order is not guaranteed to be
            # the same from one request to the next, so offset paging could
            # skip or duplicate rows. `:id` is the built-in row identifier.
            '$order': ':id',
        }
        try:
            r = requests.get(url, params=params, timeout=30)
            r.raise_for_status()
            data_chunk = r.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # decode error is not a RequestException subclass.
            print("ERROR: CDC fetch failed.", e)
            break
        if not isinstance(data_chunk, list):
            # An error document (a JSON object) would otherwise be extended
            # into all_rows as bare keys and the loop would never end.
            print("ERROR: CDC fetch failed.",
                  "Unexpected payload type:", type(data_chunk).__name__)
            break
        if not data_chunk:
            break
        all_rows.extend(data_chunk)
        offset += chunk_size
    return pd.DataFrame(all_rows)

In [None]:
def load_census_population_data(csv_path):
    """Load county-level census data from CSV.

    The file is read as UTF-8 first; if it is not valid UTF-8 it is re-read
    as Latin-1, which accepts any byte sequence (accented county names in
    legacy-encoded exports would otherwise abort the load).

    Args:
        csv_path: path of the CSV file to read.

    Returns:
        pandas.DataFrame: the parsed file contents.

    Exits the process with status 1 (after printing an error) when the file
    is missing, cannot be parsed, or any other read error occurs.
    """
    try:
        try:
            return pd.read_csv(csv_path)
        except UnicodeDecodeError:
            # Not UTF-8: fall back to a single-byte encoding instead of
            # reporting an "unknown" error.
            return pd.read_csv(csv_path, encoding="latin-1")
    except FileNotFoundError:
        print(f"ERROR: File not found -> {csv_path}")
        sys.exit(1)
    except pd.errors.ParserError as e:
        print("ERROR: CSV parse error.", e)
        sys.exit(1)
    except Exception as e:
        # Last-resort boundary: keep the script's "print and exit" contract.
        print("ERROR: Unknown read error.", e)
        sys.exit(1)

In [None]:
def _require_columns(df, required, label):
    """Raise KeyError naming every column in `required` that `df` lacks."""
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise KeyError(f"{label} data is missing required column(s): {missing}")


def _normalize_county_state(df):
    """Drop rows without a County/State key and canonicalize both columns."""
    # .copy() so the assignments below never write into the caller's frame.
    df = df.dropna(subset=['County', 'State']).copy()
    df['County'] = df['County'].astype(str).str.strip().str.lower()
    df['State'] = df['State'].astype(str).str.strip().str.upper()
    return df


def transform_data(df_cdc, df_census):
    """Clean, transform, and merge CDC + Census data on county/state info.

    Args:
        df_cdc: frame with `recip_county`, `recip_state`,
            `series_complete_pop_pct` and `census2019` columns.
        df_census: frame with `county_col`, `state_col` and `pop_col` columns.
            NOTE(review): these three names look like template placeholders;
            confirm they match the header of the census CSV actually loaded.

    Returns:
        pandas.DataFrame: inner join of both inputs on (County, State), where
        County is lower-cased and State upper-cased, plus two derived columns:
        `PopDifference` (CDC_Pop_Est_2019 - LocalPop) and
        `VaccinationRateVsLocalPop` (PctFullyVaccinated * LocalPop / 100).

    Raises:
        KeyError: if either input lacks one of the columns listed above.

    Neither input frame is modified.
    """
    # rename() hands back new frames, so the caller's objects stay untouched.
    df_cdc = df_cdc.rename(columns={
        'recip_county': 'County', 'recip_state': 'State',
        'series_complete_pop_pct': 'PctFullyVaccinated',
        'census2019': 'CDC_Pop_Est_2019'
    })
    df_census = df_census.rename(columns={
        'county_col': 'County', 'state_col': 'State', 'pop_col': 'LocalPop'
    })

    # Fail up front with a message that names what is missing.
    _require_columns(
        df_cdc,
        ['County', 'State', 'PctFullyVaccinated', 'CDC_Pop_Est_2019'],
        "CDC",
    )
    _require_columns(df_census, ['County', 'State', 'LocalPop'], "Census")

    df_cdc = _normalize_county_state(df_cdc)
    df_census = _normalize_county_state(df_census)

    # JSON/CSV sources may deliver numbers as text; unparseable values -> NaN.
    for col in ('PctFullyVaccinated', 'CDC_Pop_Est_2019'):
        df_cdc[col] = pd.to_numeric(df_cdc[col], errors='coerce')
    df_census['LocalPop'] = pd.to_numeric(df_census['LocalPop'], errors='coerce')

    df_merged = pd.merge(df_cdc, df_census, on=['County', 'State'], how='inner')
    df_merged['PopDifference'] = (
        df_merged['CDC_Pop_Est_2019'] - df_merged['LocalPop']
    )
    df_merged['VaccinationRateVsLocalPop'] = (
        df_merged['PctFullyVaccinated'] * df_merged['LocalPop'] / 100.0
    )
    return df_merged

In [None]:
def analyze_data(df_merged):
    """Print a profile of the merged data and save a scatter plot.

    Prints the row/column counts, the first five rows, and describe() and
    corr() tables for the four numeric measures, then writes a scatter of
    LocalPop against PctFullyVaccinated to
    "vaccination_rate_vs_population.png" in the working directory.
    """
    n_rows, n_cols = df_merged.shape
    print("Records:", n_rows, "| Columns:", n_cols)
    print(df_merged.head())

    # Restrict the statistics to the numeric measures of interest.
    measures = df_merged[[
        'PctFullyVaccinated',
        'CDC_Pop_Est_2019',
        'LocalPop',
        'VaccinationRateVsLocalPop',
    ]]
    print("\nSummary:\n", measures.describe())
    print("\nCorrelation:\n", measures.corr())

    population = df_merged['LocalPop']
    pct_vaccinated = df_merged['PctFullyVaccinated']

    plt.figure(figsize=(7, 5))
    plt.scatter(population, pct_vaccinated, alpha=0.3)
    plt.xlabel("Local Population")
    plt.ylabel("Pct Fully Vaccinated")
    plt.title("Vaccination Rate vs Local Population")
    plt.tight_layout()
    plt.savefig("vaccination_rate_vs_population.png")
    print("\nPlot saved: vaccination_rate_vs_population.png")

In [None]:
def store_data_in_mysql(final_df):
    """Create table & insert rows in MySQL.

    Drops and recreates `cdc_vaccination_counties`, then inserts one row per
    row of `final_df`, taking the County, State, PctFullyVaccinated,
    CDC_Pop_Est_2019, LocalPop, PopDifference and VaccinationRateVsLocalPop
    columns. A column absent from `final_df` is stored as NULL, as is any
    missing value (NaN/None/NA).

    Rows are sent in batches and committed once at the end, so a failed
    insert leaves the freshly created table empty.

    Exits the process with status 1 (after printing an error) if connecting,
    creating the table, or inserting fails.
    """
    source_cols = [
        "County", "State", "PctFullyVaccinated", "CDC_Pop_Est_2019",
        "LocalPop", "PopDifference", "VaccinationRateVsLocalPop",
    ]
    # Bounds the size of each multi-row INSERT the connector builds.
    batch_size = 10000

    # NOTE(review): connection settings, including the password, are
    # hard-coded here; consider loading them from the environment or config.
    try:
        conn = mysql.connector.connect(
            host="host1",
            user="user1",
            password="coolpassword",
            database="ETL_pipeline_database"
        )
        cur = conn.cursor()
    except mysql.connector.Error as err:
        print("ERROR: MySQL connect failed.", err)
        sys.exit(1)

    create_tbl = """
        CREATE TABLE IF NOT EXISTS cdc_vaccination_counties (
            id INT AUTO_INCREMENT PRIMARY KEY,
            county VARCHAR(100),
            state VARCHAR(10),
            pct_fully_vaccinated DOUBLE,
            cdc_pop_est_2019 DOUBLE,
            localpop DOUBLE,
            popdifference DOUBLE,
            vaccinationratevslocalpop DOUBLE
        );
    """
    insert_q = """
        INSERT INTO cdc_vaccination_counties
        (county, state, pct_fully_vaccinated, cdc_pop_est_2019, localpop,
         popdifference, vaccinationratevslocalpop)
        VALUES (%s, %s, %s, %s, %s, %s, %s);
    """

    # The finally clause closes the connection on every path, including the
    # SystemExit raised by the error handlers below.
    try:
        try:
            cur.execute("DROP TABLE IF EXISTS cdc_vaccination_counties;")
            cur.execute(create_tbl)
        except mysql.connector.Error as err:
            print("ERROR: Table creation failed.", err)
            sys.exit(1)

        # reindex() supplies NaN for any absent column; NaN/NA must become
        # None so the driver sends SQL NULL, not a literal `nan`.
        rows = [
            tuple(None if pd.isna(value) else value for value in record)
            for record in final_df.reindex(columns=source_cols).itertuples(
                index=False, name=None
            )
        ]

        inserted = 0
        try:
            for start in range(0, len(rows), batch_size):
                cur.executemany(insert_q, rows[start:start + batch_size])
                inserted += cur.rowcount
            conn.commit()
        except mysql.connector.Error as err:
            # Nothing was committed; closing the connection discards the
            # partial insert.
            print("ERROR: Insert failed.", err)
            sys.exit(1)
        print("Inserted:", inserted, "rows.")
        cur.close()
    finally:
        conn.close()

In [None]:
def main(csv_path="/content/co-est2023-alldata.csv"):
    """Run the ETL pipeline: extract, load CSV, transform, analyze, store.

    Args:
        csv_path: location of the county-level census CSV. Defaults to the
            path this script originally hard-coded.

    Exits the process with status 1 if the CDC extract returns no rows,
    since every later step needs that data.
    """
    print("Extracting CDC data with offset-based paging...")
    df_cdc = extract_cdc_vaccination_data()
    if df_cdc.empty:
        # The extract step reports fetch errors and still returns a frame;
        # stop here rather than fail later on missing columns.
        print("ERROR: No CDC data extracted; aborting.")
        sys.exit(1)
    print("Loading local census CSV...")
    df_census = load_census_population_data(csv_path)
    print("Transform & merge...")
    df_merged = transform_data(df_cdc, df_census)
    print("Analyze data...")
    analyze_data(df_merged)
    print("Store in MySQL...")
    store_data_in_mysql(df_merged)
    print("ETL complete.")

if __name__ == "__main__":
    main()