# Importing ARCOS Data with Dask

><span style="color: #008080">*Bárbara Flores*</span>

In [1]:
import pandas as pd
import warnings
import os
from dask.distributed import Client
import dask.dataframe as dd

warnings.simplefilter(action="ignore", category=FutureWarning)
pd.set_option("mode.copy_on_write", True)

Last week, we used dask to play with a few datasets to get a feel for how dask works. In order to help us develop code that would run quickly, however, we worked with very small, safe datasets. 

Today, we will continue to work with dask, but this time using much larger datasets. This means that (a) doing things incorrectly may crash your computer (so save all your open files before you start!), and (b) many of the commands you are asked to run will take several minutes each. 

For familiarity, and so you can see what advantages dask can bring to your workflow, today we'll be working with the DEA ARCOS drug shipment database published by the Washington Post! However, to strike a balance between size and speed, we'll be working with a slightly thinned version that has only the last two years of data, instead of all six.

## Exercise 1

Download the thinned ARCOS data [from this link](https://www.dropbox.com/s/o7nc6yvrwog4ozi/arcos_2011_2012.tsv.zip?dl=0). It should be about 2 GB zipped and 25 GB unzipped. 

## Exercise 2

Our goal today is going to be to find the pharmaceutical company that has shipped the most opioids (`MME_Conversion_Factor * CALC_BASE_WT_IN_GM`) in the US.

When working with large datasets, it is good practice to begin by prototyping your code with a subset of your data. So begin by using `pandas` to read in the first 100,000 lines of the ARCOS data and write pandas code to compute the shipments from each shipper (the group that reported the shipment). 

In [2]:
path = "../../arcos_2011_2012.tsv"
arcos_2011_2012_subset = pd.read_csv(path, sep="\t", nrows=100000)
arcos_2011_2012_subset.head()



Unnamed: 0.1,Unnamed: 0,REPORTER_DEA_NO,REPORTER_BUS_ACT,REPORTER_NAME,REPORTER_ADDL_CO_INFO,REPORTER_ADDRESS1,REPORTER_ADDRESS2,REPORTER_CITY,REPORTER_STATE,REPORTER_ZIP,...,Product_Name,Ingredient_Name,Measure,MME_Conversion_Factor,Combined_Labeler_Name,Revised_Company_Name,Reporter_family,dos_str,date,year
0,0,PA0006836,DISTRIBUTOR,ACE SURGICAL SUPPLY CO INC,,1034 PEARL STREET,,BROCKTON,MA,2301,...,HYDROCODONE BIT/ACETA 10MG/500MG USP,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,SpecGx LLC,Mallinckrodt,ACE Surgical Supply Co Inc,10.0,2012-12-26,2012
1,9,PA0021179,DISTRIBUTOR,APOTHECA INC,,1622 N 16TH ST,,PHOENIX,AZ,85006,...,HYDROCODONE BITARTRATE & ACETA 5MG/,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,Apotheca Inc.,Apotheca Inc.,Apotheca Inc,5.0,2012-12-05,2012
2,10,PA0021179,DISTRIBUTOR,APOTHECA INC,,1622 N 16TH ST,,PHOENIX,AZ,85006,...,HYDROCODONE BITARTRATE & ACETA 5MG/,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,Apotheca Inc.,Apotheca Inc.,Apotheca Inc,5.0,2012-07-24,2012
3,16,PA0021179,DISTRIBUTOR,APOTHECA INC,,1622 N 16TH ST,,PHOENIX,AZ,85006,...,HYDROCODONEBITARTRATE & ACETA 7.5MG,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,Apotheca Inc.,Apotheca Inc.,Apotheca Inc,7.5,2012-02-04,2012
4,17,PA0021179,DISTRIBUTOR,APOTHECA INC,,1622 N 16TH ST,,PHOENIX,AZ,85006,...,HYDROCODONE BITARTRATE & ACETA 5MG/,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,Apotheca Inc.,Apotheca Inc.,Apotheca Inc,5.0,2011-11-07,2011


><span style="color: #008080">*Next, we group by REPORTER_DEA_NO and REPORTER_NAME. A single reporter can file under several DEA registration numbers: for example, 'MCKESSON CORPORATION' appears under PM0000771, PF0000012, and PM0003094, among others.*</span>
>
><span style="color: #008080">*The following table shows the total shipments (MME_Conversion_Factor × CALC_BASE_WT_IN_GM) for each reporter in our 100,000-row sample of the 2011-2012 data.*</span>

In [3]:
arcos_2011_2012_subset["Total_Shipments"] = (
    arcos_2011_2012_subset["MME_Conversion_Factor"]
    * arcos_2011_2012_subset["CALC_BASE_WT_IN_GM"]
)

In [4]:
arcos_2011_2012_subset.groupby(["REPORTER_DEA_NO", "REPORTER_NAME"])[
    "Total_Shipments"
].sum().sort_values(ascending=False).reset_index(name="Total_Shipments")

Unnamed: 0,REPORTER_DEA_NO,REPORTER_NAME,Total_Shipments
0,PM0000771,MCKESSON CORPORATION,91928.451192
1,PF0000012,MCKESSON CORPORATION,64118.325379
2,PC0003044,"CARDINAL HEALTH 110, LLC",54352.323711
3,PD0029567,MCKESSON CORPORATION,47680.418755
4,PL0032627,AMERISOURCEBERGEN DRUG CORP,34561.394892
5,PM0003094,MCKESSON CORPORATION,34332.441204
6,PM0001951,MCKESSON CORPORATION,30383.924484
7,PK0070297,KINRAY INC,28620.315246
8,PM0018425,MCKESSON CORPORATION,23845.207505
9,PL0184933,LOUISIANA WHOLESALE DRUG CO,14787.765559
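
Since the same company name shows up under several DEA numbers in the table above, a per-company total needs one more aggregation step. Here is a minimal sketch of that step on a made-up toy frame (the names and numbers below are illustrative, not ARCOS values), assuming `REPORTER_NAME` is a good enough key for "company":

```python
import pandas as pd

# Toy stand-in for the per-registration totals above (values are made up).
per_registration = pd.DataFrame(
    {
        "REPORTER_DEA_NO": ["A1", "A2", "B1", "C1"],
        "REPORTER_NAME": ["ALPHA CORP", "ALPHA CORP", "BETA LLC", "GAMMA INC"],
        "Total_Shipments": [10.0, 5.0, 12.0, 3.0],
    }
)

# Collapse registrations that share a name into one row per company.
per_company = (
    per_registration.groupby("REPORTER_NAME")["Total_Shipments"]
    .sum()
    .sort_values(ascending=False)
    .reset_index()
)
print(per_company)
```

The data also carries a `Reporter_family` column, which may be a cleaner grouping key than the raw name if spellings vary.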


## Exercise 3

Now let's turn to dask. Re-write your code for dask, and calculate the total shipments by reporting company. Remember: 

- Activate a conda environment with a clean dask installation.
- Start by spinning up a distributed cluster.
- Dask won't read compressed files, so you have to unzip your ARCOS data. 
- Start your cluster in a cell all by itself since you don't want to keep re-running the "start a cluster" code. 

If you need to review dask basic code, [check here](https://nickeubank.github.io/practicaldatascience_book/notebooks/PDS_not_yet_in_coursera/30_big_data/70_dask.html).

As you run your code, make sure to click on the Dashboard link below where you created your cluster:

![dask_dashboard](images/dask_cluster.png)

Among other things, the bar across the bottom should give you a sense of how long your task will take:

![dask_progress](images/dask_progress.png)

(For context, my computer, which has 10 cores, took only a couple of seconds. My computer is fast, but most computers should finish within a couple of minutes, tops.)


In [5]:
# client = Client()
# client

In [6]:
arcos_2011_2012_dask = dd.read_csv(
    path,
    sep="\t",
    dtype={
        "Unnamed: 0": "str",
        "REPORTER_DEA_NO": "str",
        "REPORTER_BUS_ACT": "str",
        "REPORTER_NAME": "str",
        "REPORTER_ADDL_CO_INFO": "str",
        "REPORTER_ADDRESS1": "str",
        "REPORTER_ADDRESS2": "str",
        "REPORTER_CITY": "str",
        "REPORTER_STATE": "str",
        "REPORTER_ZIP": "str",
        "REPORTER_COUNTY": "str",
        "BUYER_DEA_NO": "str",
        "BUYER_BUS_ACT": "str",
        "BUYER_NAME": "str",
        "BUYER_ADDL_CO_INFO": "str",
        "BUYER_ADDRESS1": "str",
        "BUYER_ADDRESS2": "str",
        "BUYER_CITY": "str",
        "BUYER_STATE": "str",
        "BUYER_ZIP": "str",
        "BUYER_COUNTY": "str",
        "TRANSACTION_CODE": "str",
        "DRUG_CODE": "str",
        "NDC_NO": "str",
        "DRUG_NAME": "str",
        "QUANTITY": "str",
        "UNIT": "str",
        "ACTION_INDICATOR": "str",
        "ORDER_FORM_NO": "str",
        "CORRECTION_NO": "str",
        "STRENGTH": "str",
        "TRANSACTION_DATE": "str",
        "CALC_BASE_WT_IN_GM": "float",
        "DOSAGE_UNIT": "str",
        "TRANSACTION_ID": "str",
        "Product_Name": "str",
        "Ingredient_Name": "str",
        "Measure": "str",
        "MME_Conversion_Factor": "float64",
        "Combined_Labeler_Name": "str",
        "Revised_Company_Name": "str",
        "Reporter_family": "str",
        "dos_str": "str",
        "date": "str",
        "year": "str",
    },
)
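
Typing out every column by hand works, but the dictionary can also be built from the file header. The sketch below is one possible approach, not the method used in this notebook: it reads only the header with pandas, defaults every column to `str`, and overrides the columns we want as numbers. The in-memory TSV and the `float_columns` set are illustrative assumptions.

```python
import io
import pandas as pd

# Tiny in-memory stand-in for the TSV file (columns and values are illustrative).
sample_tsv = io.StringIO(
    "REPORTER_DEA_NO\tBUYER_ZIP\tCALC_BASE_WT_IN_GM\tMME_Conversion_Factor\n"
    "PA0006836\t02301\t0.6054\t1.0\n"
)

# nrows=0 parses only the header row, so this stays cheap even for a huge file.
columns = pd.read_csv(sample_tsv, sep="\t", nrows=0).columns

# Hypothetical choice: the only columns we need as numbers.
float_columns = {"CALC_BASE_WT_IN_GM", "MME_Conversion_Factor"}

# Default every column to str, then override the numeric ones.
dtypes = {col: ("float64" if col in float_columns else "str") for col in columns}
print(dtypes)
```

With a real file, you would pass the path instead of `sample_tsv` and then hand the result to the reader as `dtype=dtypes`.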

In [7]:
arcos_2011_2012_dask.head()

Unnamed: 0.1,Unnamed: 0,REPORTER_DEA_NO,REPORTER_BUS_ACT,REPORTER_NAME,REPORTER_ADDL_CO_INFO,REPORTER_ADDRESS1,REPORTER_ADDRESS2,REPORTER_CITY,REPORTER_STATE,REPORTER_ZIP,...,Product_Name,Ingredient_Name,Measure,MME_Conversion_Factor,Combined_Labeler_Name,Revised_Company_Name,Reporter_family,dos_str,date,year
0,0,PA0006836,DISTRIBUTOR,ACE SURGICAL SUPPLY CO INC,,1034 PEARL STREET,,BROCKTON,MA,2301,...,HYDROCODONE BIT/ACETA 10MG/500MG USP,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,SpecGx LLC,Mallinckrodt,ACE Surgical Supply Co Inc,10.0,2012-12-26,2012
1,9,PA0021179,DISTRIBUTOR,APOTHECA INC,,1622 N 16TH ST,,PHOENIX,AZ,85006,...,HYDROCODONE BITARTRATE & ACETA 5MG/,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,Apotheca Inc.,Apotheca Inc.,Apotheca Inc,5.0,2012-12-05,2012
2,10,PA0021179,DISTRIBUTOR,APOTHECA INC,,1622 N 16TH ST,,PHOENIX,AZ,85006,...,HYDROCODONE BITARTRATE & ACETA 5MG/,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,Apotheca Inc.,Apotheca Inc.,Apotheca Inc,5.0,2012-07-24,2012
3,16,PA0021179,DISTRIBUTOR,APOTHECA INC,,1622 N 16TH ST,,PHOENIX,AZ,85006,...,HYDROCODONEBITARTRATE & ACETA 7.5MG,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,Apotheca Inc.,Apotheca Inc.,Apotheca Inc,7.5,2012-02-04,2012
4,17,PA0021179,DISTRIBUTOR,APOTHECA INC,,1622 N 16TH ST,,PHOENIX,AZ,85006,...,HYDROCODONE BITARTRATE & ACETA 5MG/,HYDROCODONE BITARTRATE HEMIPENTAHYDRATE,TAB,1.0,Apotheca Inc.,Apotheca Inc.,Apotheca Inc,5.0,2011-11-07,2011


In [8]:
arcos_2011_2012_dask = arcos_2011_2012_dask[
    [
        "REPORTER_DEA_NO",
        "REPORTER_NAME",
        "BUYER_STATE",
        "BUYER_COUNTY",
        "MME_Conversion_Factor",
        "CALC_BASE_WT_IN_GM",
        "year",
    ]
]

><span style="color: #008080">*Now, we perform the same operation as before, but on our Dask DataFrame, to get the total shipments per reporter.*</span>

In [9]:
arcos_2011_2012_dask["Total_Shipments"] = (
    arcos_2011_2012_dask["MME_Conversion_Factor"]
    * arcos_2011_2012_dask["CALC_BASE_WT_IN_GM"]
)

arcos_2011_2012_dask.groupby(["REPORTER_DEA_NO", "REPORTER_NAME"])[
    "Total_Shipments"
].sum().compute().reset_index(name="Total_Shipments").sort_values(
    by="Total_Shipments", ascending=False
)

Unnamed: 0,REPORTER_DEA_NO,REPORTER_NAME,Total_Shipments
236,RW0294493,WALGREEN CO,1.991478e+06
120,RW0277752,WALGREEN CO,1.247551e+06
105,RW0204026,WALGREEN CO,1.211052e+06
22,PM0000771,MCKESSON CORPORATION,8.440511e+05
187,RO0153609,CARDINAL HEALTH,7.499573e+05
...,...,...,...
401,RV0315956,"VETESSA PHARMACEUTICAL, INC",3.027000e-01
391,RI0236681,"IVESCO, LLC",3.027000e-01
405,RK0202123,KING PHARMACEUTICALS,3.027000e-01
385,RV0357675,"VET PHARM, INC.",3.027000e-01


## Exercise 4

Now let's calculate, *for each state*, which company shipped the most pills.

Note you will quickly find that you can't sort in dask -- sorting in parallel is *really* tricky! So you'll have to work around that. Do what you need to do on the big dataset first, then compute it all so you get it as a regular pandas dataframe, then finish. 
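
Once the aggregate is a small pandas dataframe, there are several ways to "finish". The sketch below shows one of them on made-up numbers: sort descending, then keep the first row per state. The `state_totals` frame is a hypothetical stand-in for whatever your dask computation returns.

```python
import pandas as pd

# Stand-in for an already-computed (small) pandas aggregate; values are made up.
state_totals = pd.DataFrame(
    {
        "BUYER_STATE": ["FL", "FL", "OH", "OH"],
        "REPORTER_NAME": ["ALPHA", "BETA", "ALPHA", "BETA"],
        "Total_Shipments": [8.0, 3.0, 1.0, 4.0],
    }
)

# Sort descending, then keep the first (largest) row for each state.
top_per_state = (
    state_totals.sort_values("Total_Shipments", ascending=False)
    .drop_duplicates("BUYER_STATE")
    .sort_values("BUYER_STATE")
    .reset_index(drop=True)
)
print(top_per_state)
```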

In [10]:
grouped_table = (
    arcos_2011_2012_dask.groupby(["REPORTER_DEA_NO", "REPORTER_NAME", "BUYER_STATE"])[
        "Total_Shipments"
    ]
    .sum()
    .compute()
    .reset_index(name="Total_Shipments")
    .sort_values(by="Total_Shipments", ascending=False)
)

grouped_table.reset_index()

Unnamed: 0,index,REPORTER_DEA_NO,REPORTER_NAME,BUYER_STATE,Total_Shipments
0,967,RW0277752,WALGREEN CO,FL,885224.019566
1,121,PM0000771,MCKESSON CORPORATION,FL,843970.588746
2,2271,RC0182080,CARDINAL HEALTH,FL,531054.227742
3,59,PF0000012,MCKESSON CORPORATION,CA,432201.148970
4,1517,RO0153609,CARDINAL HEALTH,OH,411709.926079
...,...,...,...,...,...
3966,3579,RH0302567,HAWTHORN PHARMACEUTICALS INC,OR,0.060540
3967,209,RC0231148,"ALTURA PHARMACEUTICALS, INC",TN,0.060540
3968,3472,RH0302567,HAWTHORN PHARMACEUTICALS INC,HI,0.060540
3969,3292,RH0302567,HAWTHORN PHARMACEUTICALS INC,NV,0.045405


><span style="color: #008080">*To do this, we first ran the aggregation in Dask and then finished the remaining sorting and "group by" steps in pandas.*</span>
>
><span style="color: #008080">*Reviewing the results, we can see that several companies are involved, but the main one appears to be MCKESSON CORPORATION.*</span>


In [11]:
max_indices = grouped_table.groupby("BUYER_STATE")["Total_Shipments"].idxmax()
grouped_table.loc[max_indices][
    ["BUYER_STATE", "REPORTER_NAME", "REPORTER_DEA_NO", "Total_Shipments"]
]

Unnamed: 0,BUYER_STATE,REPORTER_NAME,REPORTER_DEA_NO,Total_Shipments
852,AK,CARDINAL HEALTH,RW0191813,19447.768327
1430,AL,MCKESSON CORPORATION,RM0336950,313080.933988
2697,AR,AMERISOURCEBERGEN DRUG CORPORATION,RA0316958,83172.51142
166,AZ,MCKESSON CORPORATION,PM0021131,343507.894122
59,CA,MCKESSON CORPORATION,PF0000012,432201.14897
153,CO,MCKESSON CORPORATION,PM0018425,181587.285809
265,CT,CARDINAL HEALTH,RD0108200,162078.687446
960,DC,CARDINAL HEALTH,RW0269654,22902.037314
1950,DE,WALGREEN CO,RW0294493,82099.678748
967,FL,WALGREEN CO,RW0277752,885224.019566


## Exercise 5 

Now go ahead and try and re-do the chunking you did by hand for your project (with this 2 years of data) -- calculate, for each year, the total morphine equivalents sent to each county in the US. 
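
For comparison, here is roughly what "chunking by hand" looks like in plain pandas. This is a sketch on a five-row in-memory TSV with made-up values, not the project code: each chunk is aggregated separately, and because a county can appear in more than one chunk, the partial sums are summed again at the end.

```python
import io
import pandas as pd

# In-memory stand-in for the TSV file (made-up rows).
tsv = io.StringIO(
    "year\tBUYER_STATE\tBUYER_COUNTY\tMME_Conversion_Factor\tCALC_BASE_WT_IN_GM\n"
    "2011\tNC\tDURHAM\t1.0\t2.0\n"
    "2011\tNC\tDURHAM\t1.5\t2.0\n"
    "2012\tNC\tDURHAM\t1.0\t1.0\n"
    "2011\tNC\tWAKE\t1.0\t4.0\n"
    "2012\tNC\tDURHAM\t1.5\t4.0\n"
)
keys = ["year", "BUYER_STATE", "BUYER_COUNTY"]

partials = []
# chunksize makes read_csv yield DataFrames of at most 2 rows each.
for chunk in pd.read_csv(tsv, sep="\t", chunksize=2):
    chunk["Total_Shipments"] = (
        chunk["MME_Conversion_Factor"] * chunk["CALC_BASE_WT_IN_GM"]
    )
    partials.append(chunk.groupby(keys)["Total_Shipments"].sum())

# A group can span chunks, so the partial sums must be summed again.
county_totals = pd.concat(partials).groupby(level=keys).sum().reset_index()
print(county_totals)
```

Dask's groupby follows the same two-stage idea (aggregate per partition, then combine), which is why the dask version is so much shorter.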

In [12]:
arcos_2011_2012_dask.groupby(["year", "BUYER_STATE", "BUYER_COUNTY"])[
    "Total_Shipments"
].sum().compute().reset_index(name="Total_Shipments").sort_values(
    by="Total_Shipments", ascending=False
)

Unnamed: 0,year,BUYER_STATE,BUYER_COUNTY,Total_Shipments
1367,2012,AZ,MARICOPA,337023.243956
71,2011,AZ,MARICOPA,314707.706697
96,2011,CA,LOS ANGELES,283866.848898
1392,2012,CA,LOS ANGELES,278747.192600
224,2011,FL,HILLSBOROUGH,233352.040536
...,...,...,...,...
6140,2011,SD,DEWEY,0.302700
6148,2012,MT,CARTER,0.302700
6145,2011,TX,THROCKMORTON,0.181620
6146,2012,TX,SCHLEICHER,0.121080


## Exercise 6

Now, re-write your opioid project's initial opioid import using dask. Each person on your team should create a NEW branch to try this. The person who wrote the initial chunking code can help everyone else understand what they did originally and the data, but everyone should write their own code. 

**WARNING:** You will probably run into a lot of type errors (depending on how the ARCOS data has changed since last year). With real-world messy data, one of the biggest problems with dask is that it struggles if, halfway through the dataset, it discovers that a column it *thought* was floats contains text. That's why, in the dask reading, [I specified the column type for so many columns](https://nickeubank.github.io/practicaldatascience_book/notebooks/PDS_not_yet_in_coursera/30_big_data/70_dask.html#what-can-dask-do-for-me) as `object` explicitly. Then, because there are occasionally data cleanliness issues, I had to convert some data types by hand. 
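
As an illustration of "converting data types by hand", the sketch below shows one common pandas pattern: read the column as strings, then coerce it to numbers and look at what failed. The values are made up (the `QUANTITY` column name comes from the data, the stray `"unknown"` entry does not), and whether to drop or repair the failing rows depends on your data.

```python
import pandas as pd

# Column read as strings because one row holds text (made-up values).
raw = pd.DataFrame({"QUANTITY": ["10", "2.5", "unknown", "7"]})

# errors="coerce" turns unparseable entries into NaN instead of raising.
raw["QUANTITY_num"] = pd.to_numeric(raw["QUANTITY"], errors="coerce")

# Inspect what failed before deciding whether to drop or repair those rows.
bad_rows = raw[raw["QUANTITY_num"].isna()]
print(bad_rows)
print(raw["QUANTITY_num"].sum())
```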

><span style="color: #008080">*Let's first try using pandas.*</span>

In [13]:
file_path = "../../arcos_all_washpost.tsv"
arcos_all_washpost = pd.read_csv(file_path, sep="\t", nrows=100)
arcos_all_washpost.head()

Unnamed: 0,REPORTER_DEA_NO,REPORTER_BUS_ACT,REPORTER_NAME,REPORTER_ADDL_CO_INFO,REPORTER_ADDRESS1,REPORTER_ADDRESS2,REPORTER_CITY,REPORTER_STATE,REPORTER_ZIP,REPORTER_COUNTY,...,DRUG_NAME,Measure,MME_Conversion_Factor,Dosage_Strength,TRANSACTION_DATE,Combined_Labeler_Name,Reporter_family,CALC_BASE_WT_IN_GM,DOSAGE_UNIT,MME
0,RM0220688,DISTRIBUTOR,MCKESSON CORPORATION,,DBA MCKESSON DRUG CO.,3000 KENSKILL AVE,WASHINGTON CT HOUSE,OH,43160,FAYETTE,...,OXYCODONE,TAB,1.5,10.0,2011-02-08,"Par Pharmaceutical, Inc.",McKesson Corporation,0.8965,100.0,1344.75
1,RM0220688,DISTRIBUTOR,MCKESSON CORPORATION,,DBA MCKESSON DRUG CO.,3000 KENSKILL AVE,WASHINGTON CT HOUSE,OH,43160,FAYETTE,...,HYDROCODONE,TAB,1.0,7.5,2011-03-07,SpecGx LLC,McKesson Corporation,0.45405,100.0,454.05
2,RM0220688,DISTRIBUTOR,MCKESSON CORPORATION,,DBA MCKESSON DRUG CO.,3000 KENSKILL AVE,WASHINGTON CT HOUSE,OH,43160,FAYETTE,...,OXYCODONE,TAB,1.5,10.0,2011-03-10,"Par Pharmaceutical, Inc.",McKesson Corporation,3.586,400.0,5379.0
3,RM0220688,DISTRIBUTOR,MCKESSON CORPORATION,,DBA MCKESSON DRUG CO.,3000 KENSKILL AVE,WASHINGTON CT HOUSE,OH,43160,FAYETTE,...,HYDROCODONE,TAB,1.0,5.0,2011-04-05,SpecGx LLC,McKesson Corporation,1.5135,500.0,1513.5
4,RM0220688,DISTRIBUTOR,MCKESSON CORPORATION,,DBA MCKESSON DRUG CO.,3000 KENSKILL AVE,WASHINGTON CT HOUSE,OH,43160,FAYETTE,...,HYDROCODONE,TAB,1.0,5.0,2011-04-06,SpecGx LLC,McKesson Corporation,0.3027,100.0,302.7


><span style="color: #008080">*Now let's try with Dask*</span>

In [14]:
arcos_all_washpost_dask = dd.read_csv(
    file_path,
    sep="\t",
    dtype={
        "REPORTER_DEA_NO": "str",
        "REPORTER_BUS_ACT": "str",
        "REPORTER_NAME": "str",
        "REPORTER_ADDL_CO_INFO": "str",
        "REPORTER_ADDRESS1": "str",
        "REPORTER_ADDRESS2": "str",
        "REPORTER_CITY": "str",
        "REPORTER_STATE": "str",
        "REPORTER_ZIP": "str",
        "REPORTER_COUNTY": "str",
        "BUYER_DEA_NO": "str",
        "BUYER_BUS_ACT": "str",
        "BUYER_NAME": "str",
        "BUYER_ADDL_CO_INFO": "str",
        "BUYER_ADDRESS1": "str",
        "BUYER_ADDRESS2": "str",
        "BUYER_CITY": "str",
        "BUYER_STATE": "str",
        "BUYER_ZIP": "str",
        "BUYER_COUNTY": "str",
        "TRANSACTION_CODE": "str",
        "DRUG_CODE": "str",
        "NDC_NO": "str",
        "DRUG_NAME": "str",
        "Measure": "str",
        "MME_Conversion_Factor": "float64",
        "Dosage_Strength": "str",
        "TRANSACTION_DATE": "str",
        "Combined_Labeler_Name": "str",
        "Reporter_family": "str",
        "CALC_BASE_WT_IN_GM": "float64",
        "DOSAGE_UNIT": "str",
        "MME": "float64",
    },
)

In [15]:
arcos_all_washpost_dask = arcos_all_washpost_dask[
    [
        "REPORTER_DEA_NO",
        "REPORTER_NAME",
        "BUYER_STATE",
        "BUYER_COUNTY",
        "MME_Conversion_Factor",
        "CALC_BASE_WT_IN_GM",
        "TRANSACTION_DATE",
        "MME",
    ]
]

In [16]:
arcos_all_washpost_dask["Total_Shipments"] = (
    arcos_all_washpost_dask["MME_Conversion_Factor"]
    * arcos_all_washpost_dask["CALC_BASE_WT_IN_GM"]
)

arcos_all_washpost_dask["year_month"] = arcos_all_washpost_dask["TRANSACTION_DATE"].str[
    0:7
]

In [17]:
groupped_arcos_all_washpost_dask = (
    # Select both value columns with a list; a bare tuple is not a supported selector.
    arcos_all_washpost_dask.groupby(["year_month", "BUYER_STATE", "BUYER_COUNTY"])[
        ["Total_Shipments", "MME"]
    ]
    .sum()
    .compute()
)

In [18]:
groupped_arcos_all_washpost_dask = groupped_arcos_all_washpost_dask.reset_index()
groupped_arcos_all_washpost_dask.head()

Unnamed: 0,year_month,BUYER_STATE,BUYER_COUNTY,Total_Shipments,MME
0,2006-01,IN,ADAMS,190.651252,190651.3
1,2006-01,IN,ALLEN,2515.866036,2515866.0
2,2006-01,IN,BARTHOLOMEW,1441.961333,1441961.0
3,2006-01,IN,BOONE,487.815865,487815.9
4,2006-01,IN,CASS,247.48227,247482.3


In [None]:
groupped_arcos_all_washpost_dask.to_parquet(
    "../20_intermediate_files/arcos_all_washpost_collapsed.parquet"
)

><span style="color: #008080">*Finally, the branch with this code is at:*</span>
>
><span style="color: #008080">*https://github.com/MIDS-at-Duke/IDS720_PracticalDataScience_JBR/blob/PDS_assigment_barbara/10_code/20_load_data_dask.py*</span>