# ARCOS Prescription Data - Create County-Year Dataset with FIPS

This notebook processes the filtered ARCOS data to create a county-year aggregated dataset.

## Goals:
1. Load the filtered parquet file (`arcos_filtered_2006_2015.parquet`)
2. Clean county names
3. Aggregate to county-year level
4. Add FIPS codes for merging with population data
5. Save final processed file

## Input:
- `data/processed/arcos_filtered_2006_2015.parquet` (pre-filtered for 2006-2015 and selected states)

## Output:
- `data/processed/arcos_county_year_with_fips.parquet` (county-year aggregated with FIPS codes)

## Step 1: Import Libraries and Configure Settings

In [3]:
import polars as pl
import pandas as pd
import os

# Configure Polars to use fewer threads (27% of 22 available)
os.environ["POLARS_MAX_THREADS"] = "6"

print("Libraries imported successfully!")
print(f"Polars configured to use 6 threads (out of {os.cpu_count()} available)")

Libraries imported successfully!
Polars configured to use 6 threads (out of 22 available)


## Step 2: Load Filtered ARCOS Data

In [4]:
filtered_file = '../data/processed/arcos_filtered_2006_2015.parquet'

print(f"Loading filtered ARCOS data from: {filtered_file}")
print("=" * 60)

df_arcos = pl.read_parquet(filtered_file)

print(f"\n✓ Data loaded successfully!")
print(f"  Rows: {df_arcos.shape[0]:,}")
print(f"  Columns: {df_arcos.shape[1]}")
print(f"  Column names: {df_arcos.columns}")

print("\n" + "=" * 60)
print("First 5 rows:")
print(df_arcos.head(5))

Loading filtered ARCOS data from: ../data/processed/arcos_filtered_2006_2015.parquet

✓ Data loaded successfully!
  Rows: 218,477,461
  Columns: 11
  Column names: ['BUYER_BUS_ACT', 'BUYER_STATE', 'BUYER_COUNTY', 'DRUG_NAME', 'MME_Conversion_Factor', 'TRANSACTION_DATE', 'Reporter_family', 'CALC_BASE_WT_IN_GM', 'DOSAGE_UNIT', 'MME', 'year']

First 5 rows:
shape: (5, 11)
┌────────────┬────────────┬────────────┬────────────┬───┬────────────┬───────────┬──────────┬──────┐
│ BUYER_BUS_ ┆ BUYER_STAT ┆ BUYER_COUN ┆ DRUG_NAME  ┆ … ┆ CALC_BASE_ ┆ DOSAGE_UN ┆ MME      ┆ year │
│ ACT        ┆ E          ┆ TY         ┆ ---        ┆   ┆ WT_IN_GM   ┆ IT        ┆ ---      ┆ ---  │
│ ---        ┆ ---        ┆ ---        ┆ str        ┆   ┆ ---        ┆ ---       ┆ f64      ┆ i32  │
│ str        ┆ str        ┆ str        ┆            ┆   ┆ f64        ┆ f64       ┆          ┆      │
╞════════════╪════════════╪════════════╪════════════╪═══╪════════════╪═══════════╪══════════╪══════╡
│ ANALYTICAL ┆ CA     

## Step 3: Initial Data Quality Check

In [6]:
print("=" * 60)
print("STEP 8a: INITIAL DATA QUALITY CHECK")
print("=" * 60)

# 1. Check data shape
print(f"\n1. Data Shape:")
print(f"   Rows: {df_arcos.shape[0]:,}")
print(f"   Columns: {df_arcos.shape[1]}")

# 2. Check column names and types
print(f"\n2. Column Information:")
print(df_arcos.schema)

# 3. Check for null values
print(f"\n3. Null Values Check:")
null_counts = df_arcos.null_count()
print(null_counts)

# 4. Check year range
print(f"\n4. Year Range:")
year_stats = df_arcos.select([
    pl.col("year").min().alias("min_year"),
    pl.col("year").max().alias("max_year"),
    pl.col("year").n_unique().alias("unique_years")
])
print(year_stats)

# 5. Check states
print(f"\n5. States in Data:")
states = df_arcos.select("BUYER_STATE").unique().sort("BUYER_STATE")
print(f"   Unique states: {states.shape[0]}")
print(f"   States: {states.to_series().to_list()}")

# 6. Check for negative or zero values in key columns
print(f"\n6. Value Ranges Check:")
value_checks = df_arcos.select([
    (pl.col("MME") <= 0).sum().alias("MME_zero_or_neg"),
    (pl.col("DOSAGE_UNIT") <= 0).sum().alias("DOSAGE_UNIT_zero_or_neg"),
    pl.col("MME").min().alias("MME_min"),
    pl.col("MME").max().alias("MME_max"),
    pl.col("DOSAGE_UNIT").min().alias("DOSAGE_min"),
    pl.col("DOSAGE_UNIT").max().alias("DOSAGE_max")
])
print(value_checks)

print(f"\n{'=' * 60}")
print("✓ Initial quality check complete!")

STEP 8a: INITIAL DATA QUALITY CHECK

1. Data Shape:
   Rows: 218,477,461
   Columns: 11

2. Column Information:
Schema({'BUYER_BUS_ACT': String, 'BUYER_STATE': String, 'BUYER_COUNTY': String, 'DRUG_NAME': String, 'MME_Conversion_Factor': Float64, 'TRANSACTION_DATE': String, 'Reporter_family': String, 'CALC_BASE_WT_IN_GM': Float64, 'DOSAGE_UNIT': Float64, 'MME': Float64, 'year': Int32})

3. Null Values Check:
shape: (1, 11)
┌─────────────┬─────────────┬─────────────┬───────────┬───┬─────────────┬─────────────┬─────┬──────┐
│ BUYER_BUS_A ┆ BUYER_STATE ┆ BUYER_COUNT ┆ DRUG_NAME ┆ … ┆ CALC_BASE_W ┆ DOSAGE_UNIT ┆ MME ┆ year │
│ CT          ┆ ---         ┆ Y           ┆ ---       ┆   ┆ T_IN_GM     ┆ ---         ┆ --- ┆ ---  │
│ ---         ┆ u32         ┆ ---         ┆ u32       ┆   ┆ ---         ┆ u32         ┆ u32 ┆ u32  │
│ u32         ┆             ┆ u32         ┆           ┆   ┆ u32         ┆             ┆     ┆      │
╞═════════════╪═════════════╪═════════════╪═══════════╪═══╪═════════

## Step 3b: Detailed MME Distribution Analysis

In [3]:
print("=" * 60)
print("MME VALUE DISTRIBUTION ANALYSIS")
print("=" * 60)

# Check percentiles and extreme values
percentiles = df_arcos.select([
    pl.col("MME").quantile(0.50).alias("p50_median"),
    pl.col("MME").quantile(0.75).alias("p75"),
    pl.col("MME").quantile(0.90).alias("p90"),
    pl.col("MME").quantile(0.95).alias("p95"),
    pl.col("MME").quantile(0.99).alias("p99"),
    pl.col("MME").quantile(0.999).alias("p99.9"),
    pl.col("MME").quantile(0.9999).alias("p99.99"),
    pl.col("MME").max().alias("maximum")
])

print("\n1. MME Percentile Distribution:")
print(percentiles)

# Count records in different MME ranges
print("\n2. Count of records in different MME ranges:")
mme_ranges = df_arcos.select([
    (pl.col("MME") < 10000).sum().alias("< 10K (typical individual Rx)"),
    ((pl.col("MME") >= 10000) & (pl.col("MME") < 100000)).sum().alias("10K-100K (small bulk)"),
    ((pl.col("MME") >= 100000) & (pl.col("MME") < 1000000)).sum().alias("100K-1M (medium bulk)"),
    ((pl.col("MME") >= 1000000) & (pl.col("MME") < 10000000)).sum().alias("1M-10M (large bulk)"),
    ((pl.col("MME") >= 10000000) & (pl.col("MME") < 100000000)).sum().alias("10M-100M (very large)"),
    (pl.col("MME") >= 100000000).sum().alias(">= 100M (extreme)"),
    pl.len().alias("total")
])
print(mme_ranges)

# Look at the extreme values
print("\n3. Top 20 MME transactions:")
top_mme = df_arcos.select([
    "BUYER_STATE", 
    "BUYER_COUNTY", 
    "year",
    "DRUG_NAME",
    "CALC_BASE_WT_IN_GM",
    "MME_Conversion_Factor",
    "MME"
]).sort("MME", descending=True).head(20)
print(top_mme)

# Check if extreme values follow the formula
print("\n4. Verifying top 10 extreme values:")
verification = df_arcos.sort("MME", descending=True).head(10).select([
    "MME",
    "CALC_BASE_WT_IN_GM",
    "MME_Conversion_Factor",
    (pl.col("CALC_BASE_WT_IN_GM") * 1000 * pl.col("MME_Conversion_Factor")).alias("MME_expected"),
    ((pl.col("MME") - (pl.col("CALC_BASE_WT_IN_GM") * 1000 * pl.col("MME_Conversion_Factor"))).abs() < 0.01).alias("Formula_Match")
])
print(verification)

print("\n" + "=" * 60)
print("INTERPRETATION:")
print("=" * 60)
print("Typical prescription: 100-10,000 MME (e.g., 30 pills × 5mg × 1.5 factor = 225 MME)")
print("Bulk pharmacy order: 10K-1M MME (supplying pharmacy for weeks/months)")
print("Wholesale distribution: 1M-100M MME (regional distributor)")
print("Extreme values: >100M MME (may be legitimate mega-bulk or data errors)")
print("\nIf CALC_BASE_WT_IN_GM shows values > 100,000 grams (100kg), those are likely data corruption.")

MME VALUE DISTRIBUTION ANALYSIS

1. MME Percentile Distribution:
shape: (1, 8)
┌────────────┬─────────┬─────────┬─────────┬──────────┬──────────┬──────────┬───────────┐
│ p50_median ┆ p75     ┆ p90     ┆ p95     ┆ p99      ┆ p99.9    ┆ p99.99   ┆ maximum   │
│ ---        ┆ ---     ┆ ---     ┆ ---     ┆ ---      ┆ ---      ┆ ---      ┆ ---       │
│ f64        ┆ f64     ┆ f64     ┆ f64     ┆ f64      ┆ f64      ┆ f64      ┆ f64       │
╞════════════╪═════════╪═════════╪═════════╪══════════╪══════════╪══════════╪═══════════╡
│ 2421.6     ┆ 6723.75 ┆ 16137.0 ┆ 32274.0 ┆ 270720.0 ┆ 4.4588e6 ┆ 5.1638e7 ┆ 1.7018e14 │
└────────────┴─────────┴─────────┴─────────┴──────────┴──────────┴──────────┴───────────┘

2. Count of records in different MME ranges:

1. MME Percentile Distribution:
shape: (1, 8)
┌────────────┬─────────┬─────────┬─────────┬──────────┬──────────┬──────────┬───────────┐
│ p50_median ┆ p75     ┆ p90     ┆ p95     ┆ p99      ┆ p99.9    ┆ p99.99   ┆ maximum   │
│ ---        ┆ ---

: 

## Step 4: Clean County Names

In [4]:
print("=" * 60)
print("STEP 8b: CLEAN COUNTY NAMES")
print("=" * 60)

# Show sample of original county names
print("\n1. Sample of ORIGINAL county names (before cleaning):")
sample_counties_before = df_arcos.select("BUYER_COUNTY").unique().sort("BUYER_COUNTY").head(20)
print(sample_counties_before)

# Check for various issues
print("\n2. County Name Issues:")
county_checks = df_arcos.select([
    (pl.col("BUYER_COUNTY").str.contains("(?i)county")).sum().alias("has_county_suffix"),
    (pl.col("BUYER_COUNTY").str.strip_chars() != pl.col("BUYER_COUNTY")).sum().alias("has_whitespace"),
    pl.col("BUYER_COUNTY").n_unique().alias("unique_counties_before")
])
print(county_checks)

# Clean county names
print("\n3. Cleaning county names...")
df_arcos = df_arcos.with_columns([
    pl.col("BUYER_COUNTY")
    .str.strip_chars()  # Remove leading/trailing whitespace
    .str.to_uppercase()  # Convert to uppercase for consistency
    .str.replace(r"(?i)\s+COUNTY\s*$", "")  # Remove "COUNTY" suffix (case-insensitive)
    .str.strip_chars()  # Remove any trailing whitespace after removal
    .alias("BUYER_COUNTY")
])

print("✓ Cleaning applied!")

# Show sample of cleaned county names
print("\n4. Sample of CLEANED county names (after cleaning):")
sample_counties_after = df_arcos.select("BUYER_COUNTY").unique().sort("BUYER_COUNTY").head(20)
print(sample_counties_after)

# Verify cleaning results
print("\n5. Verification:")
county_checks_after = df_arcos.select([
    (pl.col("BUYER_COUNTY").str.contains("(?i)county")).sum().alias("still_has_county_suffix"),
    pl.col("BUYER_COUNTY").n_unique().alias("unique_counties_after")
])
print(county_checks_after)

print(f"\n{'=' * 60}")
print("✓ County name cleaning complete!")

STEP 8b: CLEAN COUNTY NAMES

1. Sample of ORIGINAL county names (before cleaning):
shape: (20, 1)
┌──────────────┐
│ BUYER_COUNTY │
│ ---          │
│ str          │
╞══════════════╡
│ null         │
│ ABBEVILLE    │
│ ACCOMACK     │
│ ADAMS        │
│ AIKEN        │
│ …            │
│ ALLENDALE    │
│ ALPINE       │
│ AMADOR       │
│ AMELIA       │
│ AMHERST      │
└──────────────┘

2. County Name Issues:
shape: (1, 3)
┌───────────────────┬────────────────┬────────────────────────┐
│ has_county_suffix ┆ has_whitespace ┆ unique_counties_before │
│ ---               ┆ ---            ┆ ---                    │
│ u32               ┆ u32            ┆ u32                    │
╞═══════════════════╪════════════════╪════════════════════════╡
│ 0                 ┆ 0              ┆ 778                    │
└───────────────────┴────────────────┴────────────────────────┘

3. Cleaning county names...
✓ Cleaning applied!

4. Sample of CLEANED county names (after cleaning):
shape: (20, 1)
┌─────────

## Step 5: Handle Invalid/Missing Values

In [5]:
print("=" * 60)
print("STEP 8c: HANDLE INVALID/MISSING VALUES")
print("=" * 60)

print(f"\n1. Initial row count: {df_arcos.shape[0]:,}")

# Check for null/missing values in critical columns
print("\n2. Checking for null values in critical columns:")
null_check = df_arcos.select([
    pl.col("BUYER_STATE").is_null().sum().alias("state_nulls"),
    pl.col("BUYER_COUNTY").is_null().sum().alias("county_nulls"),
    pl.col("year").is_null().sum().alias("year_nulls"),
    pl.col("MME").is_null().sum().alias("MME_nulls"),
    pl.col("DOSAGE_UNIT").is_null().sum().alias("DOSAGE_nulls")
])
print(null_check)

# Check for zero or negative values
print("\n3. Checking for invalid values (zero/negative):")
invalid_check = df_arcos.select([
    (pl.col("MME") <= 0).sum().alias("MME_invalid"),
    (pl.col("DOSAGE_UNIT") <= 0).sum().alias("DOSAGE_invalid"),
    ((pl.col("MME") <= 0) | (pl.col("DOSAGE_UNIT") <= 0)).sum().alias("either_invalid")
])
print(invalid_check)

# Filter out invalid records
print("\n4. Removing rows with invalid values...")
rows_before = df_arcos.shape[0]

df_arcos = df_arcos.filter(
    (pl.col("BUYER_STATE").is_not_null()) &
    (pl.col("BUYER_COUNTY").is_not_null()) &
    (pl.col("year").is_not_null()) &
    (pl.col("MME").is_not_null()) &
    (pl.col("DOSAGE_UNIT").is_not_null()) &
    (pl.col("MME") > 0) &
    (pl.col("DOSAGE_UNIT") > 0)
)

rows_after = df_arcos.shape[0]
rows_removed = rows_before - rows_after

print(f"   Rows before: {rows_before:,}")
print(f"   Rows after: {rows_after:,}")
print(f"   Rows removed: {rows_removed:,} ({(rows_removed/rows_before)*100:.2f}%)")

# Verify no invalid values remain
print("\n5. Verification - checking for remaining issues:")
verification = df_arcos.select([
    pl.col("BUYER_STATE").is_null().sum().alias("state_nulls"),
    pl.col("BUYER_COUNTY").is_null().sum().alias("county_nulls"),
    pl.col("year").is_null().sum().alias("year_nulls"),
    pl.col("MME").is_null().sum().alias("MME_nulls"),
    pl.col("DOSAGE_UNIT").is_null().sum().alias("DOSAGE_nulls"),
    (pl.col("MME") <= 0).sum().alias("MME_invalid"),
    (pl.col("DOSAGE_UNIT") <= 0).sum().alias("DOSAGE_invalid")
])
print(verification)

print(f"\n{'=' * 60}")
print("✓ Invalid values handled!")

STEP 8c: HANDLE INVALID/MISSING VALUES

1. Initial row count: 218,477,461

2. Checking for null values in critical columns:
shape: (1, 5)
┌─────────────┬──────────────┬────────────┬───────────┬──────────────┐
│ state_nulls ┆ county_nulls ┆ year_nulls ┆ MME_nulls ┆ DOSAGE_nulls │
│ ---         ┆ ---          ┆ ---        ┆ ---       ┆ ---          │
│ u32         ┆ u32          ┆ u32        ┆ u32       ┆ u32          │
╞═════════════╪══════════════╪════════════╪═══════════╪══════════════╡
│ 0           ┆ 6914         ┆ 0          ┆ 45        ┆ 27015222     │
└─────────────┴──────────────┴────────────┴───────────┴──────────────┘

3. Checking for invalid values (zero/negative):
shape: (1, 3)
┌─────────────┬────────────────┬────────────────┐
│ MME_invalid ┆ DOSAGE_invalid ┆ either_invalid │
│ ---         ┆ ---            ┆ ---            │
│ u32         ┆ u32            ┆ u32            │
╞═════════════╪════════════════╪════════════════╡
│ 50          ┆ 2484495        ┆ 2484542        │
└─

## Step 6: Verify MME Calculation Methodology

In [6]:
print("=" * 60)
print("MME CALCULATION VERIFICATION")
print("=" * 60)

# Sample 10 random records to verify MME calculation
sample = df_arcos.sample(n=10, seed=42)

print("\nVerifying MME calculation formula:")
print("Expected: MME = CALC_BASE_WT_IN_GM × 1000 × MME_Conversion_Factor")
print()

# Calculate expected MME
verification = sample.select([
    pl.col("DRUG_NAME"),
    pl.col("CALC_BASE_WT_IN_GM"),
    pl.col("MME_Conversion_Factor"),
    pl.col("DOSAGE_UNIT"),
    pl.col("MME").alias("MME_actual"),
    (pl.col("CALC_BASE_WT_IN_GM") * 1000 * pl.col("MME_Conversion_Factor")).alias("MME_calculated"),
    ((pl.col("MME") - (pl.col("CALC_BASE_WT_IN_GM") * 1000 * pl.col("MME_Conversion_Factor"))).abs() < 0.01).alias("Match")
])

print("Sample verification (10 random records):")
print(verification)

# Check if all match
all_match = verification.filter(pl.col("Match") == False).shape[0] == 0

if all_match:
    print("\n✓ All sample records match the formula!")
    print("  Formula confirmed: MME = CALC_BASE_WT_IN_GM (g) × 1000 × MME_Conversion_Factor")
else:
    print("\n⚠ WARNING: Some records don't match the expected formula!")
    print("  This may indicate a different calculation method or data issues.")

print("\n" + "=" * 60)
print("Key Points:")
print("  1. MME is pre-calculated in the ARCOS data")
print("  2. Formula: Total drug weight (g) × 1000 × conversion factor")
print("  3. This is DIFFERENT from: drug_strength × dosage_units × conversion_factor")
print("  4. CALC_BASE_WT_IN_GM represents the total active ingredient weight")
print("=" * 60)

MME CALCULATION VERIFICATION

Verifying MME calculation formula:
Expected: MME = CALC_BASE_WT_IN_GM × 1000 × MME_Conversion_Factor

Sample verification (10 random records):
shape: (10, 7)
┌───────────────┬───────────────┬───────────────┬─────────────┬────────────┬───────────────┬───────┐
│ DRUG_NAME     ┆ CALC_BASE_WT_ ┆ MME_Conversio ┆ DOSAGE_UNIT ┆ MME_actual ┆ MME_calculate ┆ Match │
│ ---           ┆ IN_GM         ┆ n_Factor      ┆ ---         ┆ ---        ┆ d             ┆ ---   │
│ str           ┆ ---           ┆ ---           ┆ f64         ┆ f64        ┆ ---           ┆ bool  │
│               ┆ f64           ┆ f64           ┆             ┆            ┆ f64           ┆       │
╞═══════════════╪═══════════════╪═══════════════╪═════════════╪════════════╪═══════════════╪═══════╡
│ OXYCODONE     ┆ 7.172         ┆ 1.5           ┆ 100.0       ┆ 10758.0    ┆ 10758.0       ┆ true  │
│ BUPRENORPHINE ┆ 0.24          ┆ 30.0          ┆ 30.0        ┆ 7200.0     ┆ 7200.0        ┆ true  │
│ OX

In [7]:
print("=" * 60)
print("STEP 8d: CLEANED DATA SUMMARY")
print("=" * 60)

# Overall statistics
print("\n1. Data Dimensions:")
print(f"   Total rows: {df_arcos.shape[0]:,}")
print(f"   Total columns: {df_arcos.shape[1]}")

# Year distribution
print("\n2. Year Distribution:")
year_dist = df_arcos.group_by("year").agg(
    pl.count().alias("record_count")
).sort("year")
print(year_dist)

# State distribution
print("\n3. State Distribution:")
state_dist = df_arcos.group_by("BUYER_STATE").agg(
    pl.count().alias("record_count")
).sort("BUYER_STATE")
print(state_dist)

# County counts by state
print("\n4. County Counts by State:")
county_by_state = df_arcos.group_by("BUYER_STATE").agg(
    pl.col("BUYER_COUNTY").n_unique().alias("unique_counties")
).sort("BUYER_STATE")
print(county_by_state)

# Summary statistics for key variables
print("\n5. MME and Dosage Summary Statistics:")
summary_stats = df_arcos.select([
    pl.col("MME").sum().alias("total_MME"),
    pl.col("MME").mean().alias("avg_MME"),
    pl.col("MME").median().alias("median_MME"),
    pl.col("DOSAGE_UNIT").sum().alias("total_dosage"),
    pl.col("DOSAGE_UNIT").mean().alias("avg_dosage"),
    pl.col("DOSAGE_UNIT").median().alias("median_dosage")
])
print(summary_stats)

# Sample of clean data
print("\n6. Sample of Cleaned Data (first 10 rows):")
print(df_arcos.select(["BUYER_STATE", "BUYER_COUNTY", "year", "MME", "DOSAGE_UNIT"]).head(10))

print(f"\n{'=' * 60}")
print("✓ Data is clean and ready for aggregation!")

STEP 8d: CLEANED DATA SUMMARY

1. Data Dimensions:
   Total rows: 188,971,428
   Total columns: 11

2. Year Distribution:


(Deprecated in version 0.20.5)
  pl.count().alias("record_count")


shape: (10, 2)
┌──────┬──────────────┐
│ year ┆ record_count │
│ ---  ┆ ---          │
│ i32  ┆ u32          │
╞══════╪══════════════╡
│ 2006 ┆ 15310663     │
│ 2007 ┆ 16500532     │
│ 2008 ┆ 17560995     │
│ 2009 ┆ 18390776     │
│ 2010 ┆ 19240405     │
│ 2011 ┆ 20536778     │
│ 2012 ┆ 20853779     │
│ 2013 ┆ 21365622     │
│ 2014 ┆ 20163325     │
│ 2015 ┆ 19048553     │
└──────┴──────────────┘

3. State Distribution:


(Deprecated in version 0.20.5)
  pl.count().alias("record_count")


shape: (14, 2)
┌─────────────┬──────────────┐
│ BUYER_STATE ┆ record_count │
│ ---         ┆ ---          │
│ str         ┆ u32          │
╞═════════════╪══════════════╡
│ AL          ┆ 10603329     │
│ CA          ┆ 38640861     │
│ CO          ┆ 7812733      │
│ FL          ┆ 30004939     │
│ GA          ┆ 16320861     │
│ …           ┆ …            │
│ OR          ┆ 6803563      │
│ SC          ┆ 7526482      │
│ TN          ┆ 16179050     │
│ VA          ┆ 10603489     │
│ WA          ┆ 10917356     │
└─────────────┴──────────────┘

4. County Counts by State:
shape: (14, 2)
┌─────────────┬─────────────────┐
│ BUYER_STATE ┆ unique_counties │
│ ---         ┆ ---             │
│ str         ┆ u32             │
╞═════════════╪═════════════════╡
│ AL          ┆ 67              │
│ CA          ┆ 58              │
│ CO          ┆ 64              │
│ FL          ┆ 67              │
│ GA          ┆ 155             │
│ …           ┆ …               │
│ OR          ┆ 36              │
│ SC   

## Step 7: Aggregate to County-Year Level

In [8]:
# Aggregate to county-year level
print("Aggregating to county-year level...")
print("=" * 60)

# Group by state, county, and year; sum MME and DOSAGE_UNIT using Polars
df_county_year = (
    df_arcos
    .group_by(["BUYER_STATE", "BUYER_COUNTY", "year"])
    .agg([
        pl.col("MME").sum().alias("opioid_shipments_mme"),
        pl.col("DOSAGE_UNIT").sum().alias("total_pills")
    ])
)

# Rename columns for clarity
df_county_year = df_county_year.rename({
    "BUYER_STATE": "state",
    "BUYER_COUNTY": "county_name"
})

print(f"✓ Aggregation complete!")
print(f"  Aggregated rows: {df_county_year.shape[0]:,}")
print(f"  Columns: {df_county_year.shape[1]}")

print("\n" + "=" * 60)
print("Sample of aggregated data:")
print(df_county_year.head(20))

Aggregating to county-year level...
✓ Aggregation complete!
  Aggregated rows: 10,254
  Columns: 5

Sample of aggregated data:
shape: (20, 5)
┌───────┬─────────────┬──────┬──────────────────────┬─────────────┐
│ state ┆ county_name ┆ year ┆ opioid_shipments_mme ┆ total_pills │
│ ---   ┆ ---         ┆ ---  ┆ ---                  ┆ ---         │
│ str   ┆ str         ┆ i32  ┆ f64                  ┆ f64         │
╞═══════╪═════════════╪══════╪══════════════════════╪═════════════╡
│ GA    ┆ FULTON      ┆ 2007 ┆ 3.4870e9             ┆ 4.8480e8    │
│ AL    ┆ COVINGTON   ┆ 2008 ┆ 3.7174e7             ┆ 3.3649e6    │
│ NC    ┆ NEW HANOVER ┆ 2010 ┆ 2.7637e8             ┆ 1.3574e7    │
│ GA    ┆ HABERSHAM   ┆ 2009 ┆ 4.2864e7             ┆ 2.3811e6    │
│ WA    ┆ JEFFERSON   ┆ 2010 ┆ 2.9136e7             ┆ 1.5222e6    │
│ …     ┆ …           ┆ …    ┆ …                    ┆ …           │
│ GA    ┆ PICKENS     ┆ 2015 ┆ 3.6436e7             ┆ 1.8861e6    │
│ WA    ┆ WHITMAN     ┆ 2015 ┆ 2.3990e7   

## Step 8: Add FIPS Codes for Merging with Population Data

In [16]:
print("Adding FIPS codes to prescription data...")
print("=" * 60)

# Load FIPS reference file
fips_file = '../reference/fips.txt'
print(f"\n1. Loading FIPS reference from: {fips_file}")

# State FIPS code mapping
state_fips_map = {
    '01': 'AL', '02': 'AK', '04': 'AZ', '05': 'AR', '06': 'CA',
    '08': 'CO', '09': 'CT', '10': 'DE', '11': 'DC', '12': 'FL',
    '13': 'GA', '15': 'HI', '16': 'ID', '17': 'IL', '18': 'IN',
    '19': 'IA', '20': 'KS', '21': 'KY', '22': 'LA', '23': 'ME',
    '24': 'MD', '25': 'MA', '26': 'MI', '27': 'MN', '28': 'MS',
    '29': 'MO', '30': 'MT', '31': 'NE', '32': 'NV', '33': 'NH',
    '34': 'NJ', '35': 'NM', '36': 'NY', '37': 'NC', '38': 'ND',
    '39': 'OH', '40': 'OK', '41': 'OR', '42': 'PA', '44': 'RI',
    '45': 'SC', '46': 'SD', '47': 'TN', '48': 'TX', '49': 'UT',
    '50': 'VT', '51': 'VA', '53': 'WA', '54': 'WV', '55': 'WI',
    '56': 'WY'
}

fips_data = []
with open(fips_file, 'r') as f:
    for line in f:
        # Look for lines with 5-digit FIPS codes (format: "    01001        Autauga County")
        stripped = line.strip()
        if len(stripped) >= 6 and stripped[:5].isdigit():
            fips_code = stripped[:5]
            place_name = stripped[5:].strip()
            
            # Skip state-level codes (ending in 000) and header rows
            if fips_code.endswith('000') or not place_name:
                continue
            
            # Extract state abbreviation from FIPS code
            state_code = fips_code[:2]
            state_abbrev = state_fips_map.get(state_code)
            
            if state_abbrev:
                # Clean county name: remove "County", "Parish", "Borough", etc.
                county_name = place_name.upper()
                for suffix in [' COUNTY', ' PARISH', ' BOROUGH', ' CENSUS AREA', 
                              ' CITY AND BOROUGH', ' MUNICIPALITY', ' CITY']:
                    if county_name.endswith(suffix):
                        county_name = county_name[:-len(suffix)].strip()
                        break
                
                fips_data.append({
                    'fips': fips_code,
                    'county_name': county_name,
                    'state': state_abbrev
                })

df_fips = pl.DataFrame(fips_data)

# Add manual mappings for common naming variations
manual_mappings = [
    # Florida
    {'fips': '12086', 'county_name': 'MIAMI-DADE', 'state': 'FL'},
    {'fips': '12109', 'county_name': 'SAINT JOHNS', 'state': 'FL'},
    {'fips': '12103', 'county_name': 'SAINT LUCIE', 'state': 'FL'},
    # Minnesota
    {'fips': '27137', 'county_name': 'SAINT LOUIS', 'state': 'MN'},
    # Alabama
    {'fips': '01049', 'county_name': 'DE KALB', 'state': 'AL'},
    {'fips': '01115', 'county_name': 'SAINT CLAIR', 'state': 'AL'},
    # Virginia Independent Cities
    {'fips': '51770', 'county_name': 'ROANOKE CITY', 'state': 'VA'},
    {'fips': '51600', 'county_name': 'FAIRFAX CITY', 'state': 'VA'},
    {'fips': '51760', 'county_name': 'RICHMOND CITY', 'state': 'VA'},
    {'fips': '51550', 'county_name': 'CHESAPEAKE CITY', 'state': 'VA'},
    {'fips': '51660', 'county_name': 'HARRISONBURG CITY', 'state': 'VA'},
    {'fips': '51840', 'county_name': 'WINCHESTER CITY', 'state': 'VA'},
    {'fips': '51595', 'county_name': 'EMPORIA CITY', 'state': 'VA'},
    {'fips': '51683', 'county_name': 'MANASSAS CITY', 'state': 'VA'},
    {'fips': '51710', 'county_name': 'NORFOLK CITY', 'state': 'VA'},
    {'fips': '51800', 'county_name': 'SUFFOLK CITY', 'state': 'VA'},
    {'fips': '51540', 'county_name': 'CHARLOTTESVILLE CITY', 'state': 'VA'},
    {'fips': '51640', 'county_name': 'GALAX CITY', 'state': 'VA'},
    {'fips': '51610', 'county_name': 'FALLS CHURCH CITY', 'state': 'VA'},
    {'fips': '51720', 'county_name': 'NORTON CITY', 'state': 'VA'},
    {'fips': '51735', 'county_name': 'POQUOSON CITY', 'state': 'VA'},
    {'fips': '51740', 'county_name': 'PORTSMOUTH CITY', 'state': 'VA'},
    {'fips': '51670', 'county_name': 'HOPEWELL CITY', 'state': 'VA'},
    {'fips': '51520', 'county_name': 'BRISTOL CITY', 'state': 'VA'},
    {'fips': '51570', 'county_name': 'COLONIAL HEIGHTS CITY', 'state': 'VA'},
    {'fips': '51620', 'county_name': 'FRANKLIN CITY', 'state': 'VA'},
    {'fips': '51690', 'county_name': 'MARTINSVILLE CITY', 'state': 'VA'},
    {'fips': '51730', 'county_name': 'PETERSBURG CITY', 'state': 'VA'},
    {'fips': '51775', 'county_name': 'SALEM CITY', 'state': 'VA'},
    {'fips': '51820', 'county_name': 'WAYNESBORO CITY', 'state': 'VA'},
    {'fips': '51830', 'county_name': 'WILLIAMSBURG CITY', 'state': 'VA'},
    {'fips': '51700', 'county_name': 'NEWPORT NEWS CITY', 'state': 'VA'},
    {'fips': '51678', 'county_name': 'LEXINGTON CITY', 'state': 'VA'},
    {'fips': '51580', 'county_name': 'COVINGTON CITY', 'state': 'VA'},
    {'fips': '51630', 'county_name': 'FREDERICKSBURG CITY', 'state': 'VA'},
    {'fips': '51650', 'county_name': 'HAMPTON CITY', 'state': 'VA'},
    {'fips': '51590', 'county_name': 'DANVILLE CITY', 'state': 'VA'},
    {'fips': '51810', 'county_name': 'VIRGINIA BEACH CITY', 'state': 'VA'},
    {'fips': '51790', 'county_name': 'STAUNTON CITY', 'state': 'VA'},
    {'fips': '51685', 'county_name': 'MANASSAS PARK CITY', 'state': 'VA'},
    {'fips': '51680', 'county_name': 'LYNCHBURG CITY', 'state': 'VA'},
    {'fips': '51530', 'county_name': 'BUENA VISTA CITY', 'state': 'VA'},
    {'fips': '51510', 'county_name': 'ALEXANDRIA CITY', 'state': 'VA'},
    # Other states
    {'fips': '32510', 'county_name': 'CARSON CITY', 'state': 'NV'},
    {'fips': '12027', 'county_name': 'DE SOTO', 'state': 'FL'},
    {'fips': '08014', 'county_name': 'BROOMFIELD', 'state': 'CO'},
]

df_manual = pl.DataFrame(manual_mappings)
df_fips = pl.concat([df_fips, df_manual])

print(f"   ✓ Loaded {len(df_fips)} FIPS codes (including manual mappings)")
print(f"   Sample FIPS data:")
print(df_fips.head(10))

# Merge FIPS codes with prescription data
print(f"\n2. Merging FIPS codes with prescription data...")
print(f"   Prescription data: {df_county_year.shape[0]} rows")

df_county_year_with_fips = df_county_year.join(
    df_fips,
    on=['state', 'county_name'],
    how='left'
)

# Check merge results
matched = df_county_year_with_fips.filter(pl.col('fips').is_not_null()).shape[0]
unmatched = df_county_year_with_fips.filter(pl.col('fips').is_null()).shape[0]
match_rate = (matched / df_county_year_with_fips.shape[0]) * 100

print(f"\n3. Merge Results:")
print(f"   Total rows: {df_county_year_with_fips.shape[0]:,}")
print(f"   Matched: {matched:,} ({match_rate:.1f}%)")
print(f"   Unmatched: {unmatched:,}")

if unmatched > 0:
    print(f"\n4. Sample of unmatched counties:")
    unmatched_counties = df_county_year_with_fips.filter(
        pl.col('fips').is_null()
    ).select(['state', 'county_name']).unique()
    print(unmatched_counties.head(20))
    
print("\n" + "=" * 60)
print("✓ FIPS codes added!")

Adding FIPS codes to prescription data...

1. Loading FIPS reference from: ../reference/fips.txt
   ✓ Loaded 3190 FIPS codes (including manual mappings)
   Sample FIPS data:
shape: (10, 3)
┌───────┬─────────────┬───────┐
│ fips  ┆ county_name ┆ state │
│ ---   ┆ ---         ┆ ---   │
│ str   ┆ str         ┆ str   │
╞═══════╪═════════════╪═══════╡
│ 01001 ┆ AUTAUGA     ┆ AL    │
│ 01003 ┆ BALDWIN     ┆ AL    │
│ 01005 ┆ BARBOUR     ┆ AL    │
│ 01007 ┆ BIBB        ┆ AL    │
│ 01009 ┆ BLOUNT      ┆ AL    │
│ 01011 ┆ BULLOCK     ┆ AL    │
│ 01013 ┆ BUTLER      ┆ AL    │
│ 01015 ┆ CALHOUN     ┆ AL    │
│ 01017 ┆ CHAMBERS    ┆ AL    │
│ 01019 ┆ CHEROKEE    ┆ AL    │
└───────┴─────────────┴───────┘

2. Merging FIPS codes with prescription data...
   Prescription data: 10254 rows

3. Merge Results:
   Total rows: 10,304
   Matched: 10,304 (100.0%)
   Unmatched: 0

✓ FIPS codes added!


## Step 9: Validate and Save Final Dataset

In [20]:
print("=" * 60)
print("FINAL VALIDATION AND SAVE")
print("=" * 60)

# Reorder columns to put FIPS first
df_final = df_county_year_with_fips.select([
    'fips', 'state', 'county_name', 'year', 'opioid_shipments_mme', 'total_pills'
])

# Check unique combinations before deduplication
unique_before = df_final.select(['fips', 'year']).unique().shape[0]
print(f"\nBefore deduplication:")
print(f"   Total rows: {df_final.shape[0]:,}")
print(f"   Unique FIPS-Year combinations: {unique_before:,}")

# If there are duplicates, aggregate them
if df_final.shape[0] > unique_before:
    duplicates_count = df_final.shape[0] - unique_before
    print(f"   Found {duplicates_count} duplicate FIPS-Year entries. Aggregating...")
    
    # Aggregate by FIPS and Year, summing the values
    df_final = df_final.group_by(['fips', 'year']).agg([
        pl.col('state').first(),
        pl.col('county_name').first(),
        pl.col('opioid_shipments_mme').sum(),
        pl.col('total_pills').sum()
    ]).sort(['state', 'county_name', 'year'])
    
    print(f"   After aggregation: {df_final.shape[0]:,} rows")

# Reorder columns
df_final = df_final.select([
    'fips', 'state', 'county_name', 'year', 'opioid_shipments_mme', 'total_pills'
])

# Final validation
unique_final = df_final.select(['fips', 'year']).unique().shape[0]
print(f"\n1. Data Quality Checks:")
print(f"   Total rows: {df_final.shape[0]:,}")
print(f"   Unique FIPS-Year: {unique_final:,}")
print(f"   Years: {df_final['year'].min()} - {df_final['year'].max()}")
print(f"   States: {df_final['state'].n_unique()}")
print(f"   Counties: {df_final['county_name'].n_unique()}")
print(f"   Missing FIPS: {df_final.filter(pl.col('fips').is_null()).shape[0]}")
print(f"   Actual Duplicates: {df_final.shape[0] - unique_final}")

# Save to parquet
output_file = '../data/processed/arcos_county_year_with_fips.parquet'
print(f"\n2. Saving to: {output_file}")

df_final.write_parquet(output_file, compression='snappy')

file_size = os.path.getsize(output_file)
print(f"   ✓ File saved successfully!")
print(f"   Size: {file_size:,} bytes ({file_size / 1024:.2f} KB)")

print("\n" + "=" * 60)
print("SUMMARY:")
print("=" * 60)
print(f"  Time period: {df_final['year'].min()} - {df_final['year'].max()}")
print(f"  States: {df_final['state'].n_unique()}")
print(f"  Counties: {df_final['county_name'].n_unique()}")
print(f"  Total observations: {df_final.shape[0]:,}")
print(f"\n  Columns: {df_final.columns}")

print("\n" + "=" * 60)
print("Sample of final data:")
print(df_final.head(10))

print("\n✓ Preprocessing complete! Ready to merge with population data.")

FINAL VALIDATION AND SAVE

Before deduplication:
   Total rows: 10,304
   Unique FIPS-Year combinations: 10,254
   Found 50 duplicate FIPS-Year entries. Aggregating...
   After aggregation: 10,254 rows

1. Data Quality Checks:
   Total rows: 10,254
   Unique FIPS-Year: 10,254
   Years: 2006 - 2015
   States: 14
   Counties: 774
   Missing FIPS: 0
   Actual Duplicates: 0

2. Saving to: ../data/processed/arcos_county_year_with_fips.parquet
   ✓ File saved successfully!
   Size: 174,453 bytes (170.36 KB)

SUMMARY:
  Time period: 2006 - 2015
  States: 14
  Counties: 774
  Total observations: 10,254

  Columns: ['fips', 'state', 'county_name', 'year', 'opioid_shipments_mme', 'total_pills']

Sample of final data:
shape: (10, 6)
┌───────┬───────┬─────────────┬──────┬──────────────────────┬─────────────┐
│ fips  ┆ state ┆ county_name ┆ year ┆ opioid_shipments_mme ┆ total_pills │
│ ---   ┆ ---   ┆ ---         ┆ ---  ┆ ---                  ┆ ---         │
│ str   ┆ str   ┆ str         ┆ i32  ┆ f