## ARCOS Data Processing

Process ARCOS (Automation of Reports and Consolidated Orders System) dataset to create county-year aggregated opioid distribution data. This notebook handles large-scale transaction data, calculates morphine milligram equivalents (MME), and implements quality controls including outlier detection and data validation.

### Import Libraries

In [1]:
import pandas as pd
import numpy as np
import os
from pathlib import Path

pd.set_option("mode.copy_on_write", True)

### Define Validation Functions

Helper functions for data quality checks throughout the processing pipeline.

In [2]:
def validate_file_exists(filepath):
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Required file not found: {filepath}")
    return True


def validate_columns(df, required_cols, dataset_name="dataset"):
    missing = set(required_cols) - set(df.columns)
    if missing:
        raise ValueError(f"{dataset_name} missing required columns: {missing}")
    return True


def validate_year_range(df, year_col, min_year, max_year):
    invalid = df[(df[year_col] < min_year) | (df[year_col] > max_year)]
    if len(invalid) > 0:
        print(
            f"Warning: {len(invalid)} records with years outside {min_year}-{max_year}"
        )
        print(f"  Year range found: {df[year_col].min()} to {df[year_col].max()}")
    return len(invalid)


def validate_positive_values(df, col, allow_zero=False):
    threshold = 0 if allow_zero else 0
    invalid = df[df[col] <= threshold] if not allow_zero else df[df[col] < threshold]
    if len(invalid) > 0:
        print(f"Warning: {len(invalid)} records with non-positive {col}")
    return len(invalid)


def validate_merge_success(df_before, df_after, merge_type="merge"):
    before_count = len(df_before)
    after_count = len(df_after)
    if after_count < before_count:
        loss_pct = 100 * (before_count - after_count) / before_count
        print(
            f"Warning: {merge_type} resulted in {before_count - after_count} lost rows ({loss_pct:.1f}%)"
        )
    return after_count


def check_duplicates(df, subset_cols):
    dupes = df.duplicated(subset=subset_cols)
    dupe_count = dupes.sum()
    if dupe_count > 0:
        print(f"Warning: {dupe_count} duplicate records found on {subset_cols}")
    return dupe_count

### Initial Data Exploration

Load a small sample to verify file structure and identify available columns.

### Input Validation

Verify file exists and contains required columns before full processing.

In [4]:
# Define required columns and expected states
required_cols = [
    "BUYER_STATE",
    "BUYER_COUNTY",
    "TRANSACTION_DATE",
    "MME_Conversion_Factor",
    "CALC_BASE_WT_IN_GM",
]

selected_states = ["FL", "WA", "NC", "GA", "OR", "SC", "ID", "MT", "UT"]

# Validate sample has required columns
validate_columns(sample, required_cols, "ARCOS sample")

print(f"\nValidation passed. Required columns present.")
print(f"Processing {len(selected_states)} states: {', '.join(selected_states)}")


Validation passed. Required columns present.
Processing 9 states: FL, WA, NC, GA, OR, SC, ID, MT, UT


In [3]:
filepath = "../01_data/raw/arcos/arcos_all.zip"

# Verify file exists before attempting to load
validate_file_exists(filepath)

sample = pd.read_csv(filepath, compression="zip", nrows=100, delimiter="\t")

print(f"Sample data shape: {sample.shape}")
print(f"\nColumns: {list(sample.columns)}")
print(f"\nUnique drugs: {sample['DRUG_NAME'].unique()}")

Sample data shape: (100, 33)

Columns: ['REPORTER_DEA_NO', 'REPORTER_BUS_ACT', 'REPORTER_NAME', 'REPORTER_ADDL_CO_INFO', 'REPORTER_ADDRESS1', 'REPORTER_ADDRESS2', 'REPORTER_CITY', 'REPORTER_STATE', 'REPORTER_ZIP', 'REPORTER_COUNTY', 'BUYER_DEA_NO', 'BUYER_BUS_ACT', 'BUYER_NAME', 'BUYER_ADDL_CO_INFO', 'BUYER_ADDRESS1', 'BUYER_ADDRESS2', 'BUYER_CITY', 'BUYER_STATE', 'BUYER_ZIP', 'BUYER_COUNTY', 'TRANSACTION_CODE', 'DRUG_CODE', 'NDC_NO', 'DRUG_NAME', 'Measure', 'MME_Conversion_Factor', 'Dosage_Strength', 'TRANSACTION_DATE', 'Combined_Labeler_Name', 'Reporter_family', 'CALC_BASE_WT_IN_GM', 'DOSAGE_UNIT', 'MME']

Unique drugs: ['CODEINE' 'OXYCODONE' 'HYDROCODONE' 'METHADONE' 'FENTANYL'
 'BUPRENORPHINE' 'MORPHINE' 'OXYMORPHONE']


### Process Full Dataset

Process complete ARCOS data in chunks to manage memory. Filter to target states, calculate MME values, and aggregate to county-year level.

In [20]:
# Keep track of processing stats
chunks = []
total_records_processed = 0
total_records_kept = 0

# Column subset for memory efficiency
selected_cols = [
    "BUYER_STATE",
    "BUYER_COUNTY",
    "BUYER_BUS_ACT",
    "BUYER_NAME",
    "TRANSACTION_DATE",
    "MME_Conversion_Factor",
    "CALC_BASE_WT_IN_GM",
]

In [21]:
for i, chunk in enumerate(
    pd.read_csv(
        filepath,
        chunksize=800_000,
        compression="zip",
        delimiter="\t",
        usecols=selected_cols,
        low_memory=True,
    )
):
    total_records_processed += len(chunk)

    state_selection = chunk[chunk["BUYER_STATE"].isin(selected_states)]
    total_records_kept += len(state_selection)

    state_selection["TOTAL_MME"] = (
        state_selection["MME_Conversion_Factor"]
        * state_selection["CALC_BASE_WT_IN_GM"]
        * 1000
    )

    state_selection["YEAR"] = pd.to_datetime(
        state_selection["TRANSACTION_DATE"]
    ).dt.year

    # Aggregate by county-year, keeping buyer activity info
    grouped = state_selection.groupby(
        ["BUYER_STATE", "BUYER_COUNTY", "YEAR"], as_index=False
    ).agg(
        {
            "TOTAL_MME": "sum",
            "BUYER_BUS_ACT": lambda x: ",".join(sorted(x.unique())),
            "BUYER_NAME": "nunique",
        }
    )
    grouped.rename(columns={"BUYER_NAME": "unique_buyers"}, inplace=True)

    chunks.append(grouped)
    print(
        f"Processed chunk {i+1}: {len(chunk):,} records, kept {len(state_selection):,} from target states"
    )

df = pd.concat(chunks, ignore_index=True)

# Final aggregation across all chunks
df = df.groupby(["BUYER_STATE", "BUYER_COUNTY", "YEAR"], as_index=False).agg(
    {
        "TOTAL_MME": "sum",
        "BUYER_BUS_ACT": lambda x: ",".join(sorted(set(",".join(x).split(",")))),
        "unique_buyers": "sum",
    }
)

print(f"\nProcessing complete:")
print(f"  Total records scanned: {total_records_processed:,}")
print(
    f"  Records from target states: {total_records_kept:,} ({100*total_records_kept/total_records_processed:.1f}%)"
)
print(f"  Final aggregated rows: {len(df):,}")

df.sample(10)

Processed chunk 1: 800,000 records, kept 0 from target states
Processed chunk 2: 800,000 records, kept 0 from target states
Processed chunk 2: 800,000 records, kept 0 from target states
Processed chunk 3: 800,000 records, kept 0 from target states
Processed chunk 3: 800,000 records, kept 0 from target states
Processed chunk 4: 800,000 records, kept 0 from target states
Processed chunk 4: 800,000 records, kept 0 from target states
Processed chunk 5: 800,000 records, kept 0 from target states
Processed chunk 5: 800,000 records, kept 0 from target states
Processed chunk 6: 800,000 records, kept 0 from target states
Processed chunk 6: 800,000 records, kept 0 from target states
Processed chunk 7: 800,000 records, kept 0 from target states
Processed chunk 7: 800,000 records, kept 0 from target states
Processed chunk 8: 800,000 records, kept 0 from target states
Processed chunk 8: 800,000 records, kept 0 from target states
Processed chunk 9: 800,000 records, kept 0 from target states
Processe

Unnamed: 0,BUYER_STATE,BUYER_COUNTY,YEAR,TOTAL_MME,BUYER_BUS_ACT,unique_buyers
1382,GA,COFFEE,2017,91544320.0,"CHAIN PHARMACY,HOSPITAL/CLINIC,PRACTITIONER,RE...",155
5369,NC,PERQUIMANS,2019,7376052.0,"MLP-AMBULANCE SERVICE,RETAIL PHARMACY",22
4225,MT,ROSEBUD,2014,5836508.0,"HOSP/CLINIC FED,HOSPITAL/CLINIC,PRACTITIONER,R...",22
7479,WA,ISLAND,2007,47238210.0,"CHAIN PHARMACY,HOSP/CLINIC- MIL,HOSPITAL/CLINI...",415
6126,OR,POLK,2006,30306390.0,"CHAIN PHARMACY,HOSPITAL/CLINIC,PRACTITIONER,RE...",187
2721,GA,TELFAIR,2019,28565400.0,"CHAIN PHARMACY,PRACTITIONER,RETAIL PHARMACY",32
5449,NC,ROBESON,2015,217894600.0,"CHAIN PHARMACY,HOSPITAL/CLINIC,MAINTENANCE,PRA...",394
4776,NC,DAVIE,2007,35909740.0,"CHAIN PHARMACY,HOSPITAL/CLINIC,MLP-NURSE PRACT...",327
231,FL,FLAGLER,2013,118705300.0,"CHAIN PHARMACY,HOSPITAL/CLINIC,PRACTITIONER,PR...",805
4524,NC,BUNCOMBE,2010,579786500.0,"CHAIN PHARMACY,DISTRIBUTOR,HOSP/CLINIC-VA,HOSP...",2128


In [22]:
print("Running validation checks on aggregated ARCOS data...\n")

# Check for duplicates after aggregation
dupe_count = check_duplicates(df, ["BUYER_STATE", "BUYER_COUNTY", "YEAR"])
if dupe_count > 0:
    print(f"ERROR: Found {dupe_count} duplicates after aggregation")
    print("Removing duplicates...")
    df = df.drop_duplicates(
        subset=["BUYER_STATE", "BUYER_COUNTY", "YEAR"], keep="first"
    )

# Validate year range (ARCOS data is 2006-2015)
invalid_years = validate_year_range(df, "YEAR", 2006, 2015)

# Check for negative or zero MME values
invalid_mme = validate_positive_values(df, "TOTAL_MME", allow_zero=False)
if invalid_mme > 0:
    print(f"Removing {invalid_mme} records with invalid MME values")
    df = df[df["TOTAL_MME"] > 0].copy()

# Check state coverage
states_found = df["BUYER_STATE"].unique()
states_missing = set(selected_states) - set(states_found)
if states_missing:
    print(f"Warning: Missing data for states: {states_missing}")
else:
    print(f"All {len(selected_states)} target states have data")

# Distribution check
print(f"\nData distribution by state:")
print(df.groupby("BUYER_STATE").size().sort_values(ascending=False))

print(f"\nMME statistics:")
print(f"  Total MME: {df['TOTAL_MME'].sum():,.0f} mg")
print(f"  Mean per county-year: {df['TOTAL_MME'].mean():,.0f} mg")
print(f"  Median per county-year: {df['TOTAL_MME'].median():,.0f} mg")

print("\nValidation complete.")

Running validation checks on aggregated ARCOS data...

  Year range found: 2006 to 2019
All 9 target states have data

Data distribution by state:
BUYER_STATE
GA    2127
NC    1382
FL     936
MT     731
SC     644
ID     586
WA     546
OR     496
UT     380
dtype: int64

MME statistics:
  Total MME: 3,641,135,146,453,877 mg
  Mean per county-year: 465,142,456,113 mg
  Median per county-year: 31,445,927 mg

Validation complete.


In [23]:
import os

# Export to clean data folder with proper naming
output_path = "../01_data/clean/arcos_by_county_year.csv"

# Ensure directory exists
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Save as CSV for compatibility with downstream notebooks
df.to_csv(output_path, index=False)

print(f"Dataset exported to {output_path}")
print(f"Rows exported: {len(df):,}")
print(f"Columns: {list(df.columns)}")

Dataset exported to ../01_data/clean/arcos_by_county_year.csv
Rows exported: 7,828
Columns: ['BUYER_STATE', 'BUYER_COUNTY', 'YEAR', 'TOTAL_MME', 'BUYER_BUS_ACT', 'unique_buyers']


### Export Cleaned Data

Save the aggregated ARCOS dataset to the clean data folder.

In [24]:
print("Running final validation checks...\n")

# Check for missing values in key columns
print("Missing value check:")
for col in df.columns:
    missing_count = df[col].isna().sum()
    if missing_count > 0:
        print(f"  {col}: {missing_count} missing ({100*missing_count/len(df):.1f}%)")
    else:
        print(f"  {col}: 0 missing")

# Verify no duplicates remain
final_dupes = check_duplicates(df, ["BUYER_STATE", "BUYER_COUNTY", "YEAR"])
assert final_dupes == 0, "Duplicates found in final dataset"

# Check data ranges
print(f"\nValue ranges:")
print(f"  Years: {df['YEAR'].min()} to {df['YEAR'].max()}")
print(
    f"  Total MME range: {df['TOTAL_MME'].min():,.0f} to {df['TOTAL_MME'].max():,.0f}"
)
print(f"  Counties per state: {df.groupby('BUYER_STATE').size().describe()}")

# Temporal completeness check
print(f"\nTemporal coverage by state:")
for state in sorted(df["BUYER_STATE"].unique()):
    state_data = df[df["BUYER_STATE"] == state]
    years = sorted(state_data["YEAR"].unique())
    print(f"  {state}: {len(years)} years ({min(years)}-{max(years)})")

# Summary statistics
print(f"\nFinal dataset summary:")
print(f"  Total rows: {len(df):,}")
print(f"  States: {df['BUYER_STATE'].nunique()}")
print(f"  Counties: {df['BUYER_COUNTY'].nunique()}")
print(f"  Years: {df['YEAR'].nunique()}")
print(f"  Total MME: {df['TOTAL_MME'].sum():,.0f} mg")

print("\nValidation passed. Ready for export.")

Running final validation checks...

Missing value check:
  BUYER_STATE: 0 missing
  BUYER_COUNTY: 0 missing
  YEAR: 0 missing
  TOTAL_MME: 0 missing
  BUYER_BUS_ACT: 0 missing
  unique_buyers: 0 missing

Value ranges:
  Years: 2006 to 2019
  Total MME range: 0 to 740,037,027,208,646
  Counties per state: count       9.000000
mean      869.777778
std       557.063008
min       380.000000
25%       546.000000
50%       644.000000
75%       936.000000
max      2127.000000
dtype: float64

Temporal coverage by state:
  FL: 14 years (2006-2019)
  GA: 14 years (2006-2019)
  ID: 14 years (2006-2019)
  MT: 14 years (2006-2019)
  NC: 14 years (2006-2019)
  OR: 14 years (2006-2019)
  SC: 14 years (2006-2019)
  UT: 14 years (2006-2019)
  WA: 14 years (2006-2019)

Final dataset summary:
  Total rows: 7,828
  States: 9
  Counties: 467
  Years: 14
  Total MME: 3,641,135,146,453,877 mg

Validation passed. Ready for export.
