# SLU - HDS 5230 

**High Performance Computing**

**Week 6**

**Derek Dixon**

Install the dask module...

In [1]:
conda install dask

Collecting package metadata (current_repodata.json): ...working... done
Note: you may need to restart the kernel to use updated packages.
Solving environment: ...working... done

## Package Plan ##

  environment location: C:\Users\Derek\anaconda3

  added / updated specs:
    - dask


The following packages will be downloaded:


    package                    |            build
    ---------------------------|-----------------
    conda-4.11.0               |   py39haa95532_0        14.4 MB
    ------------------------------------------------------------
                                           Total:        14.4 MB

The following packages will be UPDATED:

  conda                               4.10.3-py39haa95532_0 --> 4.11.0-py39haa95532_0




Import other required modules.

In [20]:
import os
import pandas as pd
import numpy as np
from dask.distributed import Client
from multiprocessing import cpu_count
import dask.dataframe as ddf

Get the CPU count and start a Dask distributed client.

In [2]:
ncores = cpu_count()
client = Client()

ncores

24

Read in the .csv file. Column data types are declared ahead of time in a dictionary. The data is loaded into a pandas DataFrame first, then split across a Dask DataFrame with one partition per core.

In [47]:
filename = os.path.join('timeseries.csv')

dtype = {'locationID': object,
       'slug': object,
       'name': object,
       'level': object,
       'city': object,
       'county': object,
       'state': object,
       'country': object,
       'lat': float,
       'long': float,
       'population': float,
       'aggregate': float,
       'tz': object,
       'cases': float,
       'deaths': float,
       'recovered':float,
       'active': float,
       'tested': float, 
       'hospitalized': float,                            
       'hospitalized_current': float,
       'discharged': float,
       'icu': float,
       'icu_current': float}

data_pd = pd.read_csv(filename, parse_dates={'Date': [23]}, dtype=dtype)

data = ddf.from_pandas(data_pd, npartitions = ncores)
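As a small illustration of the dtype dictionary and date parsing, the sketch below reads a made-up CSV from memory. The column names and values are hypothetical and are not taken from `timeseries.csv`. It also names the date column directly instead of giving its position, which is a substitution for the positional form used above.

```python
import io

import pandas as pd

# Hypothetical CSV text standing in for the real file.
csv_text = (
    "state,cases,deaths,date\n"
    "Missouri,10,,2020-03-01\n"
    "Illinois,25,1,2020-03-01\n"
)

# Counts are declared as float so that blank cells can be stored as NaN.
toy_dtype = {"state": object, "cases": float, "deaths": float}

toy = pd.read_csv(io.StringIO(csv_text), dtype=toy_dtype, parse_dates=["date"])

print(toy.dtypes)
print(toy)
```

Declaring the count columns as float is what lets the blank `deaths` cell load without error, since an integer column cannot hold NaN.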

## CFR Per State During January 1, 2020 through February 28, 2021

Six values in the state column are not actually U.S. states. I filter these out using a list, then filter the data as specified in the problem statement and group by state.

In [48]:
not_actually_states = ['American Samoa', 'Washington, D.C.', 'Guam', 'Northern Mariana Islands','Puerto Rico', 'United States Virgin Islands']

In [50]:
grouped = data[(data['Date'] >= '2020-01-01')
               & (data['Date'] <= '2021-02-28')
               & (data['country'] == 'United States')
               & (~data.state.isin(not_actually_states))].groupby('state')
grouped

<dask.dataframe.groupby.DataFrameGroupBy at 0x21e95d06eb0>

Now we can compute the mortality rates. We take the max of the deaths column because that column is cumulative, so the max for a particular state is its most up-to-date value. The ratio computed here is that value divided by the state's mean population.

In [51]:
mort_data = (grouped.deaths.max()/grouped.population.mean()).compute()
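To make the arithmetic concrete, here is a minimal pandas sketch on made-up cumulative counts for two hypothetical states, `A` and `B`. It computes the same deaths-over-population ratio as the cell above. For comparison only, it also computes a deaths-over-cases ratio, which is an assumption of this sketch and not something the cell above does.

```python
import pandas as pd

# Hypothetical cumulative counts: three reporting days per state.
toy = pd.DataFrame({
    "state": ["A", "A", "A", "B", "B", "B"],
    "cases": [10.0, 40.0, 100.0, 5.0, 20.0, 50.0],
    "deaths": [0.0, 1.0, 2.0, 0.0, 0.0, 1.0],
    "population": [1000.0] * 3 + [500.0] * 3,
})

g = toy.groupby("state")

# Cumulative columns peak at their latest value, so max() picks it out.
latest_deaths = g["deaths"].max()

# Same ratio as the cell above: deaths divided by mean population.
per_capita = latest_deaths / g["population"].mean()

# Comparison only: deaths divided by latest cumulative cases.
per_case = latest_deaths / g["cases"].max()

print(per_capita)
print(per_case)
```

The two ratios have different denominators, so they can differ by an order of magnitude on the same data.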

In [52]:
mort_data.sort_values()

state
Hawaii            0.000331
Vermont           0.000697
Maine             0.000814
Wyoming           0.001099
Alaska            0.001189
Delaware          0.001251
New Hampshire     0.001793
Utah              0.002255
Oregon            0.002509
Rhode Island      0.003175
Nevada            0.004742
Montana           0.005120
Washington        0.005657
West Virginia     0.005687
Connecticut       0.005701
Idaho             0.006069
Arizona           0.006273
New Mexico        0.007248
Maryland          0.008190
North Dakota      0.008529
Wisconsin         0.008657
Massachusetts     0.009002
South Dakota      0.009391
Oklahoma          0.010507
Colorado          0.011677
California        0.012070
Nebraska          0.012221
Kansas            0.012698
Indiana           0.013420
Arkansas          0.015046
South Carolina    0.015774
Minnesota         0.016665
Kentucky          0.017469
North Carolina    0.017512
Alabama           0.017745
Tennessee         0.018000
Ohio              0.01

These numbers appear reasonable given our prior knowledge of COVID statistics and case fatality rates.

Next we compute the case fatality rates per state per month by expanding out this logic. Here we expect a 50x14 matrix (14 months as columns).

## CFR Per State Per Month

In [41]:
import numexpr # https://docs.dask.org/en/latest/_modules/dask/dataframe/core.html#DataFrame.queryb
numexpr.set_num_threads(1)

8

Because it will become useful later, I will make a version of the dataset where the date column is the index.

In [53]:
useful_df = data[(data['Date'] >= '2020-01-01')
                 & (data['Date'] <= '2021-02-28')
                 & (data['country'] == 'United States')
                 & (~data.state.isin(not_actually_states))].set_index('Date')

Now we begin defining helper functions that are intended to build off one another, representing more and more layers of aggregation. 

First we aggregate the sum of cases and the max of deaths per date.

In [54]:
def agg_stats_per_date(dt):
    """
    Return the sum of cases and max of deaths per state for a given date
    """
    # Select one calendar day through the Date index; .loc replaces the
    # deprecated frame[date_string] row lookup.
    day = dt.strftime('%Y-%m-%d')
    resdf = useful_df.loc[day][['state', 'cases', 'deaths']].compute()
    results = resdf.groupby('state').agg({'cases': 'sum',
                                          'deaths': 'max'})

    # Tag every row with the month number of this date.
    results['month'] = [dt.month] * len(results)

    return results
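The select-one-day-then-aggregate step can be tried on a small in-memory pandas frame. This is a sketch under stated assumptions: the rows, the state labels `A` and `B`, and the name `toy` are made up, and plain pandas stands in for the Dask frame.

```python
import pandas as pd

# Hypothetical rows indexed by date, standing in for useful_df.
toy = pd.DataFrame(
    {
        "state": ["A", "B", "A"],
        "cases": [1.0, 2.0, 3.0],
        "deaths": [0.0, 0.0, 1.0],
    },
    index=pd.to_datetime(["2020-03-01", "2020-03-01", "2020-03-02"]),
)

# A string slice keeps every row that falls on the requested day.
one_day = toy.loc["2020-03-01":"2020-03-01", ["state", "cases", "deaths"]]

# Sum of cases and max of deaths per state for that day.
per_state = one_day.groupby("state").agg({"cases": "sum", "deaths": "max"})
print(per_state)
```

A slice is used here instead of a single label so the result is always a DataFrame, even when only one row matches.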

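Then we define two series that hold the first and last day of each month we want to cover. The seed dates are month-ends from December 2019 through September 2020, so the resulting spans run from January through September 2020.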

In [55]:
# Month-end dates from 2019-12-31 through 2020-09-30.
seed_dates = pd.Series(pd.date_range('2019-12-01', periods=10, freq='M'))

# Each month starts the day after the previous month-end.
st_dates = (seed_dates + pd.Timedelta(days=1))[:-1]
end_dates = seed_dates[1:]
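For comparison, the same month boundaries can be built without the one-day shift. This is an alternative sketch, not the approach used above: it generates month starts with the `'MS'` alias and rolls each one forward with `pd.offsets.MonthEnd(0)`. The names `starts`, `ends`, and `spans` are hypothetical.

```python
import pandas as pd

# First day of each month, January through September 2020.
starts = pd.date_range("2020-01-01", periods=9, freq="MS")

# MonthEnd(0) rolls a date forward to the end of its own month.
ends = starts + pd.offsets.MonthEnd(0)

# Pair each start with its end for inspection.
spans = list(zip(starts, ends))
for start, end in spans:
    print(start.date(), "to", end.date())
```

Recent pandas releases deprecate the `'M'` alias for month-end date ranges in favor of `'ME'`, so a start-based construction like this one avoids depending on either spelling.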

Another helper function. This one aggregates further, rolling the daily results up to the monthly level.

In [56]:
def cum_results_monthly(ind):
    # end_dates keeps the labels 1..9 from seed_dates, so label ind + 1 is the
    # end of the month that starts at st_dates[ind].
    dt_range = pd.date_range(st_dates[ind], end_dates[ind + 1])

    # One per-state summary per day in the month.
    l_rts = [agg_stats_per_date(dt) for dt in dt_range]

    comb_results = pd.concat(l_rts)
    cum_rt = comb_results.groupby(['state', 'month']).agg({'cases': 'sum', 'deaths': 'max'})

    return cum_rt
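Because a sum of daily sums is still a sum and a max of daily maxes is still a max, the two-stage roll-up should agree with a single group-by on state and month. The sketch below checks that on made-up rows in plain pandas. The data and the names `toy`, `two_stage`, and `one_stage` are hypothetical, and this is a sanity check, not a replacement for the Dask pipeline.

```python
import pandas as pd

# Hypothetical daily rows; state A has two rows on 2020-01-30.
toy = pd.DataFrame({
    "Date": pd.to_datetime(
        ["2020-01-30", "2020-01-30", "2020-01-31", "2020-02-01", "2020-02-01"]
    ),
    "state": ["A", "A", "A", "A", "B"],
    "cases": [1.0, 2.0, 4.0, 8.0, 3.0],
    "deaths": [0.0, 1.0, 1.0, 2.0, 0.0],
})
toy["month"] = toy["Date"].dt.month

how = {"cases": "sum", "deaths": "max"}

# Stage 1: aggregate per state and day. Stage 2: roll days up to months.
daily = toy.groupby(["state", "month", "Date"]).agg(how)
two_stage = daily.groupby(level=["state", "month"]).agg(how)

# Single pass: aggregate straight to state and month.
one_stage = toy.groupby(["state", "month"]).agg(how)

print(two_stage)
print(one_stage)
```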


Now we can simply pass each monthly index to the function and store the results in a list using a list comprehension. We can then convert that list back into a pandas dataframe.

In [57]:
l_c_results = [cum_results_monthly(i) for i in range(0,9)]

full_results = pd.concat(l_c_results)

full_results



| state | month | cases | deaths |
|---|---|---|---|
| Alabama | 1.0 | 0.0 | 0.0 |
| Alaska | 1.0 | 0.0 | 0.0 |
| Arizona | 1.0 | 12.0 | 0.0 |
| Arkansas | 1.0 | 0.0 | 0.0 |
| California | 1.0 | 35.0 | 0.0 |
| ... | ... | ... | ... |
| Virginia | 9.0 | 8139620.0 | 3208.0 |
| Washington | 9.0 | 4834796.0 | 2126.0 |
| West Virginia | 9.0 | 788988.0 | 350.0 |
| Wisconsin | 9.0 | 5756751.0 | 1327.0 |


Here we calculate the CFR as a new column.

In [58]:
full_results['CFR'] = full_results['deaths']/full_results['cases']
full_results['CFR'] = full_results['CFR'].replace(np.nan, 0)

full_results

| state | month | cases | deaths | CFR |
|---|---|---|---|---|
| Alabama | 1.0 | 0.0 | 0.0 | 0.000000 |
| Alaska | 1.0 | 0.0 | 0.0 | 0.000000 |
| Arizona | 1.0 | 12.0 | 0.0 | 0.000000 |
| Arkansas | 1.0 | 0.0 | 0.0 | 0.000000 |
| California | 1.0 | 35.0 | 0.0 | 0.000000 |
| ... | ... | ... | ... | ... |
| Virginia | 9.0 | 8139620.0 | 3208.0 | 0.000394 |
| Washington | 9.0 | 4834796.0 | 2126.0 | 0.000440 |
| West Virginia | 9.0 | 788988.0 | 350.0 | 0.000444 |
| Wisconsin | 9.0 | 5756751.0 | 1327.0 | 0.000231 |
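The NaN replacement matters because states with zero cases and zero deaths produce 0/0. A minimal sketch on made-up rows shows what the division yields before and after the fill. The frame `toy` and its values are hypothetical, and `fillna(0)` is used here as an equivalent spelling of replacing NaN with 0.

```python
import numpy as np
import pandas as pd

# Hypothetical rows: 0/0, 0/12, 5/200, and a nonzero count over zero cases.
toy = pd.DataFrame({
    "cases": [0.0, 12.0, 200.0, 0.0],
    "deaths": [0.0, 0.0, 5.0, 1.0],
})

raw = toy["deaths"] / toy["cases"]   # 0/0 gives NaN, 1/0 gives inf
toy["CFR"] = raw.fillna(0)           # only NaN is replaced

print(toy)
```

Only the 0/0 row is changed by the fill. A row with deaths but zero cases would stay infinite, so it may be worth checking for such rows with `np.isinf` before using the column.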


**Closing Thoughts**

Here I think it makes a lot of sense to use Dask and distribute the computation across the available cores. Aggregating the results took 7+ minutes, by far the most expensive computation of the assignment, and that is with all 24 of my machine's cores in use. I can imagine it taking much longer if it were done the old-fashioned way.

## CFR Changes Over Time

We continue to build off the final answer from above. We will start with a helper function to accumulate the CFRs.

In [71]:
def cum_CFRs(st, ulimit = 9, llimit = 1):
    """
    Returns aggregated change in the CFR lagged differences for a given state, st.
    Lag: between current and previous time period, starting at period 2.
    """  
    return sum([full_results.loc[(st, i)].CFR - full_results.loc[(st, i-1)].CFR for i in range(llimit+1, ulimit+1)])
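One property of this helper is worth seeing in numbers: a sum of consecutive differences telescopes, so the result equals the last period's CFR minus the first period's. The sketch below checks this on a made-up list of monthly values. The numbers and names are hypothetical and are not drawn from `full_results`.

```python
import math

# Hypothetical monthly CFR values for one state, periods 1 through 5.
cfr = [0.0, 0.010, 0.025, 0.020, 0.015]

# Sum of lagged differences, as in the helper above.
lagged_sum = sum(cfr[i] - cfr[i - 1] for i in range(1, len(cfr)))

# The intermediate terms cancel, leaving last minus first.
net_change = cfr[-1] - cfr[0]

print(lagged_sum, net_change)
print(math.isclose(lagged_sum, net_change, abs_tol=1e-12))
```

This means the aggregated change reflects only the endpoints and says nothing about how the CFR moved in between.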

I need to pass the states in from a list, so I build that list here. The unique values of the state column include NaN, so I have added a line of code to drop those entries.

In [78]:
l_states = useful_df.state.unique().compute().tolist()

l_states = [x for x in l_states if not pd.isnull(x)]

print(len(l_states))

50
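Missing values show up in a list of unique values whenever some rows have no state recorded. As a small pandas-only sketch with made-up values, dropping missing entries before taking the unique values gives the same clean list in one step. The Series `states` is hypothetical and stands in for the state column.

```python
import pandas as pd

# Hypothetical state column with two missing entries.
states = pd.Series(["Missouri", None, "Illinois", "Missouri", float("nan")])

# Drop missing values first, then take the unique values in order of appearance.
clean = states.dropna().unique().tolist()

print(clean)
```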


We compute the results by mapping the values of l_states (our state list) to the helper function, which we will store in a list. We then have two lists which we can pass to a pandas dataframe as our final result.

In [79]:
l_CFR_cum_deltas = [cum_CFRs(s) for s in l_states]

out_df = pd.DataFrame({'state': l_states, 
                       'agg_CFR_change': l_CFR_cum_deltas})

In [80]:
out_df.sort_values('agg_CFR_change')

| | state | agg_CFR_change |
|---|---|---|
| 38 | Utah | 0.0001 |
| 49 | Alaska | 0.000141 |
| 48 | Wyoming | 0.000179 |
| 44 | Nebraska | 0.000202 |
| 24 | Hawaii | 0.000208 |
| 33 | South Dakota | 0.000211 |
| 22 | Idaho | 0.000215 |
| 7 | Kansas | 0.00022 |
| 15 | Wisconsin | 0.000231 |
| 32 | Tennessee | 0.000236 |


**Closing Thoughts**

Here we're dealing with the post-processed result from the prior question, which is much smaller. In this case, Dask and distributed computation are not necessary.