# Importing ARCOS Data with Dask

Last week, we used dask to play with a few datasets to get a feel for how dask works. In order to help us develop code that would run quickly, however, we worked with very small, safe datasets. 

Today, we will continue to work with dask, but this time using much larger datasets. This means that (a) doing things incorrectly may lead to your computer crashing (So save all your open files before you start!), and (b) many of the commands you are being asked run will take several minutes each. 

For familiarity, and so you can see what advantages dask can bring to your workflow, today we'll be working with the DEA ARCOS drug shipment database published by the Washington Post! However, to strike a balance between size and speed, we'll be working with a slightly thinned version that has only the last two years of data, instead of all six.

## Exercise 1

Download the thinned ARCOS data [from this link](https://www.dropbox.com/s/o7nc6yvrwog4ozi/arcos_2011_2012.tsv.zip?dl=0). It should be about 2GB zipped, 25 GB unzipped. 

## Exercise 2

Our goal today is going to be to find the pharmaceutical company that has shipped the most opioids (`MME_Conversion_Factor * CALC_BASE_WT_IN_GM`) in the US.

When working with large datasets, it is good practice to begin by prototyping your code with a subset of your data. So begin by using `pandas` to read in the first 100,000 lines of the ARCOS data and write pandas code to compute the shipments from each shipper (the group that reported the shipment). 

In [None]:
# Exercise 2: Calculate total shipments by reporter using pandas
# Read in 100,000 rows

import pandas as pd

pd.set_option("mode.copy_on_write", True)

file = "/Users/far/Downloads/arcos_2011_2012.tsv"

data_sample = pd.read_csv(file, sep="\t", nrows=100000)

# Calculate total shipments (opioid weight in morphine equivalents)
data_sample["total_shipment"] = (
    data_sample["MME_Conversion_Factor"] * data_sample["CALC_BASE_WT_IN_GM"]
)

# Group by reporter and sum shipments
shipments_by_reporter = (
    data_sample.groupby("REPORTER_NAME")["total_shipment"]
    .sum()
    .sort_values(ascending=False)
)

print("Top 10 reporters by total shipments (pandas, first 100k rows):")
print(shipments_by_reporter.head(10))

Top 10 reporters by total shipments (pandas, first 100k rows):
REPORTER_NAME
MCKESSON CORPORATION           299266.331225
CARDINAL HEALTH 110, LLC        54352.323711
AMERISOURCEBERGEN DRUG CORP     34561.394892
KINRAY INC                      28620.315246
LOUISIANA WHOLESALE DRUG CO     14787.765559
FRANK W KERR INC                 8730.016283
H D SMITH WHOLESALE DRUG CO      6399.324050
KAISER FOUNDATION HOSPITALS      3891.329580
BURLINGTON DRUG COMPANY          3889.490325
AMERICAN SALES COMPANY           3432.058005
Name: total_shipment, dtype: float64


  data_sample = pd.read_csv(file, sep="\t", nrows=100000)


## Exercise 3

Now let's turn to dask. Re-write your code for dask, and calculate the total shipments by reporting company. Remember: 

- Activate a conda environment with a clean dask installation.
- Start by spinning up a distributed cluster.
- Dask won't read compressed files, so you have to unzip your ARCOS data. 
- Start your cluster in a cell all by itself since you don't want to keep re-running the "start a cluster" code. 

If you need to review dask basic code, [check here](https://nickeubank.github.io/practicaldatascience_book/notebooks/PDS_not_yet_in_coursera/30_big_data/70_dask.html).

As you run your code, make sure to click on the Dashboard link below where you created your cluster:

![dask_dashboard](images/dask_cluster.png)

Among other things, the bar across the bottom should give you a sense of how long your task will take:

![dask_progress](images/dask_progress.png)

(For context, my computer (which has 10 cores) only took a couple seconds. My computer is fast, but most computers should be done within a couple minutes, tops).


In [5]:
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 5
Total threads: 10,Total memory: 16.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:65369,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:65382,Total threads: 2
Dashboard: http://127.0.0.1:65386/status,Memory: 3.20 GiB
Nanny: tcp://127.0.0.1:65372,
Local directory: /var/folders/s5/m8gf98wx5cz9783d91z25bjh0000gn/T/dask-scratch-space/worker-sz3ifow2,Local directory: /var/folders/s5/m8gf98wx5cz9783d91z25bjh0000gn/T/dask-scratch-space/worker-sz3ifow2

0,1
Comm: tcp://127.0.0.1:65384,Total threads: 2
Dashboard: http://127.0.0.1:65392/status,Memory: 3.20 GiB
Nanny: tcp://127.0.0.1:65374,
Local directory: /var/folders/s5/m8gf98wx5cz9783d91z25bjh0000gn/T/dask-scratch-space/worker-mdl635_0,Local directory: /var/folders/s5/m8gf98wx5cz9783d91z25bjh0000gn/T/dask-scratch-space/worker-mdl635_0

0,1
Comm: tcp://127.0.0.1:65385,Total threads: 2
Dashboard: http://127.0.0.1:65391/status,Memory: 3.20 GiB
Nanny: tcp://127.0.0.1:65376,
Local directory: /var/folders/s5/m8gf98wx5cz9783d91z25bjh0000gn/T/dask-scratch-space/worker-qxcjirb3,Local directory: /var/folders/s5/m8gf98wx5cz9783d91z25bjh0000gn/T/dask-scratch-space/worker-qxcjirb3

0,1
Comm: tcp://127.0.0.1:65383,Total threads: 2
Dashboard: http://127.0.0.1:65387/status,Memory: 3.20 GiB
Nanny: tcp://127.0.0.1:65378,
Local directory: /var/folders/s5/m8gf98wx5cz9783d91z25bjh0000gn/T/dask-scratch-space/worker-o5bfz635,Local directory: /var/folders/s5/m8gf98wx5cz9783d91z25bjh0000gn/T/dask-scratch-space/worker-o5bfz635

0,1
Comm: tcp://127.0.0.1:65390,Total threads: 2
Dashboard: http://127.0.0.1:65395/status,Memory: 3.20 GiB
Nanny: tcp://127.0.0.1:65380,
Local directory: /var/folders/s5/m8gf98wx5cz9783d91z25bjh0000gn/T/dask-scratch-space/worker-zdp__9di,Local directory: /var/folders/s5/m8gf98wx5cz9783d91z25bjh0000gn/T/dask-scratch-space/worker-zdp__9di


Task exception was never retrieved
future: <Task finished name='Task-252128' coro=<Client._gather.<locals>.wait() done, defined at /Users/far/miniforge3/lib/python3.12/site-packages/distributed/client.py:2388> exception=AllExit()>
Traceback (most recent call last):
  File "/Users/far/miniforge3/lib/python3.12/site-packages/distributed/client.py", line 2397, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-281940' coro=<Client._gather.<locals>.wait() done, defined at /Users/far/miniforge3/lib/python3.12/site-packages/distributed/client.py:2388> exception=AllExit()>
Traceback (most recent call last):
  File "/Users/far/miniforge3/lib/python3.12/site-packages/distributed/client.py", line 2397, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-286914' coro=<Client._gather.<locals>.wait() done, defined at /Users/far/miniforge3/lib/python3.12/si

In [6]:
# Check the columns and dtypes
sample_df = pd.read_csv(file, sep="\t", nrows=10000)
print("Columns and dtypes:")
print(sample_df.dtypes)

Columns and dtypes:
Unnamed: 0                 int64
REPORTER_DEA_NO           object
REPORTER_BUS_ACT          object
REPORTER_NAME             object
REPORTER_ADDL_CO_INFO     object
REPORTER_ADDRESS1         object
REPORTER_ADDRESS2         object
REPORTER_CITY             object
REPORTER_STATE            object
REPORTER_ZIP               int64
REPORTER_COUNTY           object
BUYER_DEA_NO              object
BUYER_BUS_ACT             object
BUYER_NAME                object
BUYER_ADDL_CO_INFO        object
BUYER_ADDRESS1            object
BUYER_ADDRESS2            object
BUYER_CITY                object
BUYER_STATE               object
BUYER_ZIP                  int64
BUYER_COUNTY              object
TRANSACTION_CODE          object
DRUG_CODE                  int64
NDC_NO                     int64
DRUG_NAME                 object
QUANTITY                 float64
UNIT                     float64
ACTION_INDICATOR         float64
ORDER_FORM_NO             object
CORRECTION_NO          

In [None]:
# Define explicit dtypes for all object columns to avoid type inference issues
dtype_dict = {
    "REPORTER_DEA_NO": "object",
    "REPORTER_BUS_ACT": "object",
    "REPORTER_NAME": "object",
    "REPORTER_ADDL_CO_INFO": "object",
    "REPORTER_ADDRESS1": "object",
    "REPORTER_ADDRESS2": "object",
    "REPORTER_CITY": "object",
    "REPORTER_STATE": "object",
    "REPORTER_COUNTY": "object",
    "BUYER_DEA_NO": "object",
    "BUYER_BUS_ACT": "object",
    "BUYER_NAME": "object",
    "BUYER_ADDL_CO_INFO": "object",
    "BUYER_ADDRESS1": "object",
    "BUYER_ADDRESS2": "object",
    "BUYER_CITY": "object",
    "BUYER_STATE": "object",
    "BUYER_COUNTY": "object",
    "TRANSACTION_CODE": "object",
    "NDC_NO": "object",
    "DRUG_NAME": "object",
    "UNIT": "object",
    "ACTION_INDICATOR": "object",
    "ORDER_FORM_NO": "object",
    "Product_Name": "object",
    "Ingredient_Name": "object",
    "Measure": "object",
    "Combined_Labeler_Name": "object",
    "Revised_Company_Name": "object",
    "Reporter_family": "object",
    "date": "object",
}

# Read the full dataset with dask
dask_df = dd.read_csv(file, sep="\t", dtype=dtype_dict)

# Calculate total shipments
dask_df["total_shipment"] = (
    dask_df["MME_Conversion_Factor"] * dask_df["CALC_BASE_WT_IN_GM"]
)

# Group by reporter and sum shipments
shipments_by_reporter_dask = dask_df.groupby("REPORTER_NAME")["total_shipment"].sum()

# Compute the result
result = shipments_by_reporter_dask.compute()

# Sort and display top 10
result_sorted = result.sort_values(ascending=False)
print("\nTop 10 reporters by total shipments (Dask, full dataset):")
print(result_sorted.head(10))

## Exercise 4

Now let's calculate, *for each state*, what company shipped the most pills?

Note you will quickly find that you can't sort in dask -- sorting in parallel is *really* tricky! So you'll have to work around that. Do what you need to do on the big dataset first, then compute it all so you get it as a regular pandas dataframe, then finish. 

In [None]:
# First, we need to group by state and reporter to get total shipments
# Then find the reporter with max shipments per state

# Calculate total shipments (reusing dask_df with total_shipment column from Exercise 3)
# Group by BUYER_STATE (destination) and REPORTER_NAME
state_reporter_shipments = dask_df.groupby(["BUYER_STATE", "REPORTER_NAME"])[
    "total_shipment"
].sum()

# Compute to get as pandas DataFrame
state_reporter_df = state_reporter_shipments.compute().reset_index()

# For each state, find the reporter with maximum total_shipment
top_shipper_per_state = state_reporter_df.loc[
    state_reporter_df.groupby("BUYER_STATE")["total_shipment"].idxmax()
]

# Sort by state for readability
top_shipper_per_state_sorted = top_shipper_per_state.sort_values("BUYER_STATE")

print("\nTop shipper (by total opioid shipments) for each state:")
print(top_shipper_per_state_sorted[["BUYER_STATE", "REPORTER_NAME", "total_shipment"]])


Top shipper (by total opioid shipments) for each state:
     BUYER_STATE                   REPORTER_NAME  total_shipment
2880          AK                 CARDINAL HEALTH    1.549137e+05
3140          AL            MCKESSON CORPORATION    2.143844e+06
1729          AR            MCKESSON CORPORATION    6.053744e+05
3213          AZ                     WALGREEN CO    2.312494e+06
1398          CA            MCKESSON CORPORATION    5.152963e+06
1003          CO                     WALGREEN CO    1.044110e+06
3398          CT                 CARDINAL HEALTH    1.016512e+06
3141          DC                 CARDINAL HEALTH    1.329334e+05
2734          DE                     WALGREEN CO    6.339375e+05
2075          FL                     WALGREEN CO    6.566340e+06
2471          GA            MCKESSON CORPORATION    1.643461e+06
1512          GU     AMERISOURCEBERGEN DRUG CORP    7.272706e+03
1243          HI            MCKESSON CORPORATION    3.371645e+05
3343          IA          AMERISO

Does this seem like a situation where a single company is responsible for the opioid epidemic?

In [None]:
# Analysis: Is a single company responsible for the opioid epidemic?
print("\nDistribution of top shippers by state:")
top_company_counts = top_shipper_per_state_sorted["REPORTER_NAME"].value_counts()
print(top_company_counts)

print("\n" + "=" * 70)
print("CONCLUSION:")
print("=" * 70)
print(f"Out of {len(top_shipper_per_state_sorted)} states/territories:")
print(
    f"  - McKesson Corporation is the top shipper in {top_company_counts.get('MCKESSON CORPORATION', 0)} states"
)
print(
    f"  - Cardinal Health is the top shipper in {top_company_counts.get('CARDINAL HEALTH', 0)} states"
)
print(
    f"  - Walgreen Co is the top shipper in {top_company_counts.get('WALGREEN CO', 0)} states"
)
print("\nNo, this does NOT seem like a single company is responsible.")
print("While McKesson dominates in many states, the opioid epidemic appears")
print("to be driven by multiple major pharmaceutical distributors (McKesson,")
print("Cardinal Health, Walgreens, and AmerisourceBergen) across different regions.")


Distribution of top shippers by state:
REPORTER_NAME
MCKESSON CORPORATION              23
CARDINAL HEALTH                   13
WALGREEN CO                        8
AMERISOURCEBERGEN DRUG CORP        4
MCKESSON DRUG COMPANY              2
AMERISOURCEBERGEN DRUG             1
MORRIS & DICKSON CO                1
CARDINAL HEALTH 110, LLC           1
DROGUERIA BETANCES, LLC            1
CARDINAL HEALTH P.R. 120, INC.     1
Name: count, dtype: int64[pyarrow]

CONCLUSION:
Out of 55 states/territories:
  - McKesson Corporation is the top shipper in 23 states
  - Cardinal Health is the top shipper in 13 states
  - Walgreen Co is the top shipper in 8 states

No, this does NOT seem like a single company is responsible.
While McKesson dominates in many states, the opioid epidemic appears
to be driven by multiple major pharmaceutical distributors (McKesson,
Cardinal Health, Walgreens, and AmerisourceBergen) across different regions.


## Exercise 5 

Now go ahead and try and re-do the chunking you did by hand for your project (with this 2 years of data) -- calculate, for each year, the total morphine equivalents sent to each county in the US.

In [None]:
import time

start_time = time.time()

# Define columns we need
cols_needed = [
    "year",
    "BUYER_STATE",
    "BUYER_COUNTY",
    "MME_Conversion_Factor",
    "CALC_BASE_WT_IN_GM",
]

# Initialize list to store results from each chunk
chunk_results = []
chunk_size = 1000000

# Read and process data in chunks
for i, chunk in enumerate(
    pd.read_csv(file, sep="\t", usecols=cols_needed, chunksize=chunk_size)
):
    # Calculate morphine equivalents
    chunk["morphine_equivalents"] = (
        chunk["MME_Conversion_Factor"] * chunk["CALC_BASE_WT_IN_GM"]
    )

    # Group by year, state, and county
    grouped = (
        chunk.groupby(["year", "BUYER_STATE", "BUYER_COUNTY"])["morphine_equivalents"]
        .sum()
        .reset_index()
    )

    # Store the result
    chunk_results.append(grouped)

    print(f"Processed chunk {i+1} ({len(chunk)} rows)")

# Combine all chunk results
combined = pd.concat(chunk_results, ignore_index=True)

# Final aggregation (in case same county appears in multiple chunks)
final_result = (
    combined.groupby(["year", "BUYER_STATE", "BUYER_COUNTY"])["morphine_equivalents"]
    .sum()
    .reset_index()
)

end_time = time.time()

print(f"\nManual chunking completed in {end_time - start_time:.2f} seconds")
print(f"Total counties: {len(final_result)}")
print("\nTop 10 counties by morphine equivalents:")
print(final_result.nlargest(10, "morphine_equivalents"))

Processed chunk 1 (1000000 rows)
Processed chunk 2 (1000000 rows)
Processed chunk 2 (1000000 rows)
Processed chunk 3 (1000000 rows)
Processed chunk 3 (1000000 rows)
Processed chunk 4 (1000000 rows)
Processed chunk 4 (1000000 rows)
Processed chunk 5 (1000000 rows)
Processed chunk 5 (1000000 rows)
Processed chunk 6 (1000000 rows)
Processed chunk 6 (1000000 rows)
Processed chunk 7 (1000000 rows)
Processed chunk 7 (1000000 rows)
Processed chunk 8 (1000000 rows)
Processed chunk 8 (1000000 rows)
Processed chunk 9 (1000000 rows)
Processed chunk 9 (1000000 rows)
Processed chunk 10 (1000000 rows)
Processed chunk 10 (1000000 rows)
Processed chunk 11 (1000000 rows)
Processed chunk 11 (1000000 rows)
Processed chunk 12 (1000000 rows)
Processed chunk 12 (1000000 rows)
Processed chunk 13 (1000000 rows)
Processed chunk 13 (1000000 rows)
Processed chunk 14 (1000000 rows)
Processed chunk 14 (1000000 rows)
Processed chunk 15 (1000000 rows)
Processed chunk 15 (1000000 rows)
Processed chunk 16 (1000000 row

In [None]:
# Display the final result
print("Final Result: Total morphine equivalents by year and county")
print(f"\nTotal rows: {len(final_result)}")
print("\nSample data:")
print(final_result.head(15))

## Exercise 6

Now, re-write your opioid project's initial opioid import using dask. Each person on your team should create a NEW branch to try this. The person who wrote the initial chunking code can help everyone else understand what they did originally and the data, but everyone should write their own code. 

**WARNING:** You will probably run into a lot of type errors (depending on how the ARCOS data has changed since last year). With real world messy data one of the biggest problems with dask is that it struggles if halfway through dataset it discovers that the column it *thought* was floats contains text. That's why, in the dask reading, [I specified the column type for so many columns](https://nickeubank.github.io/practicaldatascience_book/notebooks/PDS_not_yet_in_coursera/30_big_data/70_dask.html#what-can-dask-do-for-me) as `objects` explicitly. Then, because occasionally there data cleanliness issues, I had to do some converting data types by hand. 

In [28]:
import time

start_time = time.time()

dask_ex6 = dask_df[
    ["BUYER_STATE", "BUYER_COUNTY", "MME_Conversion_Factor", "CALC_BASE_WT_IN_GM"]
].copy()

dask_ex6["year"] = dd.to_datetime(dask_df["date"]).dt.year

dask_ex6["morphine_equivalents"] = (
    dask_ex6["MME_Conversion_Factor"] * dask_ex6["CALC_BASE_WT_IN_GM"]
)

dask_grouped = dask_ex6.groupby(["year", "BUYER_STATE", "BUYER_COUNTY"])[
    "morphine_equivalents"
].sum()

dask_result = dask_grouped.compute().reset_index()

dask_result_sorted = dask_result.sort_values(
    ["year", "morphine_equivalents"], ascending=[True, False]
)

end_time = time.time()

print(f"\nDask computation completed in {end_time - start_time:.2f} seconds")
print(f"Total counties: {len(dask_result_sorted)}")
print("\nTop 10 counties by morphine equivalents:")
print(dask_result_sorted.nlargest(10, "morphine_equivalents"))

NameError: name 'dask_df' is not defined