Name: Greeshma Poli

Course number: HDS 5230 - High Performance Computing

Assignment number: Week 05 - Dask Programming Assignment

In [11]:
# Import necessary libraries:
import dask.dataframe as dd
import pandas as pd
import numpy as np

In [12]:
# defining data types for each column to ensure consistency and optimize memory usage
dtypes = {'name':str,'level':str,'city':object,'county':str,'state':str,'country':str,'population':np.float64,'lat':float,'long':float,'url':str,'aggregate':object,'tz':str,'cases':np.float64,'deaths':np.float64,'recovered':np.float64,'active':np.float64,'tested':np.float64,'hospitalized':np.float64,'hospitalized_current':np.float64,'discharged':np.float64,
    'icu':np.float64,'icu_current':np.float64,'growthFactor':np.float64,'date':object}

In [13]:
# Load dataset using Dask
df = dd.read_csv("timeseries.csv",dtype=dtypes)


We use Dask to load the dataset because COVID-19 data can be large, covering many countries and dates. Pandas reads the entire file into memory at once. dd.read_csv is lazy instead: it splits the file into partitions and only reads them when a result is requested, processing partitions in parallel across CPU cores or, with a distributed cluster, across machines. This keeps memory use bounded and can speed up processing on large files.

In [14]:
df.head()

Unnamed: 0,locationID,slug,name,level,city,county,state,country,lat,long,...,deaths,recovered,active,tested,hospitalized,hospitalized_current,discharged,icu,icu_current,date
0,iso1:ad,ad,Andorra,country,,,,Andorra,42.55,1.58,...,0.0,0.0,,,,,,,,2020-01-22
1,iso1:ad,ad,Andorra,country,,,,Andorra,42.55,1.58,...,0.0,0.0,,,,,,,,2020-01-23
2,iso1:ad,ad,Andorra,country,,,,Andorra,42.55,1.58,...,0.0,0.0,,,,,,,,2020-01-24
3,iso1:ad,ad,Andorra,country,,,,Andorra,42.55,1.58,...,0.0,0.0,,,,,,,,2020-01-25
4,iso1:ad,ad,Andorra,country,,,,Andorra,42.55,1.58,...,0.0,0.0,,,,,,,,2020-01-26


In [15]:
# Filter dataset to include only rows where the country is 'United States'
df_us = df[df['country'] == 'United States']

In [16]:
# print results
df_us.head()


Unnamed: 0,locationID,slug,name,level,city,county,state,country,lat,long,...,deaths,recovered,active,tested,hospitalized,hospitalized_current,discharged,icu,icu_current,date
239811,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-22
239812,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-23
239813,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-24
239814,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-25
239815,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-26


Extracting the United States records is a basic operation, but on millions of rows it can be slow in Pandas. Dask applies the filter to each partition independently, so partitions can be processed in parallel. In a distributed setup, different partitions can also be filtered on different machines.
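
The idea of filtering each partition independently can be sketched in plain Pandas. This is only an emulation on hypothetical toy data (the frame, its values, and the two-row block size are assumptions for illustration), not how Dask is implemented internally:

```python
import pandas as pd

# Hypothetical toy data standing in for timeseries.csv
toy = pd.DataFrame({
    "country": ["Andorra", "United States", "Canada",
                "United States", "United States", "Andorra"],
    "deaths": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
})

# Emulate partitions by cutting the frame into blocks of two rows
partitions = [toy.iloc[i:i + 2] for i in range(0, len(toy), 2)]

# Apply the same filter to every block on its own, then stitch the results
filtered_parts = [part[part["country"] == "United States"] for part in partitions]
us_from_parts = pd.concat(filtered_parts)

# Filtering the whole frame at once gives the same rows
us_direct = toy[toy["country"] == "United States"]
same = us_from_parts.equals(us_direct)
print(same)
```

Because no block needs information from any other block, the per-block filters could run at the same time, which is the property Dask relies on.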

In [17]:
# Convert date column to datetime format
df_us['date'] = dd.to_datetime(df_us['date'])

Dates are crucial for COVID-19 trend analysis, but converting them from text to a proper datetime format can be slow. Dask converts dates in parallel across partitions, so instead of processing millions of rows sequentially (as Pandas does), the work is split across multiple CPU cores, speeding up the operation.

In [18]:
# Filter for the time period (2020-Jan-01 to 2021-Feb-28)
df_us = df_us[(df_us['date'] >= '2020-01-01') & (df_us['date'] <= '2021-02-28')]

Filtering for Time Range (2020-Jan-01 to 2021-Feb-28): Like the country filter, a date-range filter is applied to each partition independently, so Dask can parallelize it easily. On small datasets the difference may be negligible, but the same code scales if the dataset grows.

In [19]:
# Group by state and compute total deaths and average population
state_wise_mortality = df_us.groupby('state').agg({'deaths': 'sum', 'population': 'mean'}).compute()


Aggregating total COVID-19 deaths and average population per state touches every remaining row, so it is the most expensive step so far. Dask computes partial results on each partition and combines them, and the work can be spread across multiple machines if needed. Calling .compute() triggers the execution and returns the result as an ordinary Pandas DataFrame.
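
A rough sketch of how such a grouped aggregation can be split into per-partition partial results and a final combine step is shown below. It uses plain Pandas on hypothetical toy data (the state labels, values, and column names deaths_sum, pop_sum, pop_count are assumptions), and is meant to illustrate the general two-stage pattern rather than Dask's actual internals:

```python
import pandas as pd

# Hypothetical toy data, split into two "partitions"
toy = pd.DataFrame({
    "state": ["A", "A", "B", "A", "B", "B"],
    "deaths": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
    "population": [10.0, 20.0, 30.0, 60.0, 30.0, 90.0],
})
partitions = [toy.iloc[:3], toy.iloc[3:]]

# Stage 1: each partition produces partial sums and counts per state
partials = [
    part.groupby("state").agg(
        deaths_sum=("deaths", "sum"),
        pop_sum=("population", "sum"),
        pop_count=("population", "count"),
    )
    for part in partitions
]

# Stage 2: add up the partial results; a mean needs sum and count,
# because averaging per-partition means would weight them incorrectly
combined = pd.concat(partials).groupby(level=0).sum()
result = pd.DataFrame({
    "deaths": combined["deaths_sum"],
    "population": combined["pop_sum"] / combined["pop_count"],
})

# Single-pass reference computed on the whole frame
expected = toy.groupby("state").agg({"deaths": "sum", "population": "mean"})
print(result)
```

The sum combines directly, while the mean has to be carried as a (sum, count) pair until the final step.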

In [20]:
# print results
print(state_wise_mortality)

                             deaths    population
state                                            
Alaska                       7965.0  4.876967e+04
Alabama                    438471.0  1.442113e+05
Arkansas                   158952.0  9.616907e+04
American Samoa                  0.0  5.568900e+04
Arizona                    939211.0  9.098396e+05
California                2783684.0  1.339739e+06
Colorado                   546938.0  1.771919e+05
Connecticut               1383249.0  7.922860e+05
Washington, D.C.           175246.0  7.057490e+05
Delaware                   120278.0  4.868820e+05
Florida                   2161963.0  6.316981e+05
Georgia                   1230519.0  1.550214e+05
Guam                         1890.0  1.677720e+05
Hawaii                      14242.0  4.719573e+05
Iowa                       264347.0  7.384984e+04
Idaho                       66368.0  7.942511e+04
Illinois                  2238987.0  2.879959e+05
Indiana                    283501.0  1.447789e+05


In [21]:
# Compute per-capita mortality:
# total COVID-19 deaths per state divided by the state's average population, as a measure of mortality risk.
state_wise_mortality['per_capita_mortality'] = state_wise_mortality['deaths'] / state_wise_mortality['population']


Per-capita mortality (deaths/population) is a straightforward calculation. Because state_wise_mortality was already materialized with .compute(), it is a small Pandas DataFrame with one row per state, so this division runs in Pandas rather than across Dask partitions. The expensive, parallelized part was the aggregation in the previous step.

In [22]:
# Rank states by per-capita mortality
state_wise_mortality_ranked = state_wise_mortality.sort_values(by='per_capita_mortality', ascending=False)


Sorting is one of the more expensive operations on large data, and on a Dask DataFrame it requires moving rows between partitions. Here, however, the table being sorted is the already computed Pandas DataFrame with one row per state, so sort_values runs in Pandas and is effectively instant.
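
As a small worked check, the ratio and the ranking can be reproduced in Pandas from three rows of the printed aggregation output. The deaths and population figures are copied from the tables in this notebook (population is rounded as printed); the variable names check and ranked are introduced only for this illustration:

```python
import pandas as pd

# Values copied from the printed state_wise_mortality output
check = pd.DataFrame(
    {
        "deaths": [264347.0, 9781636.0, 2107456.0],
        "population": [7.384984e+04, 6.175734e+05, 2.377823e+05],
    },
    index=["Iowa", "New York", "Michigan"],
)

# Same two steps as in the notebook, on an ordinary Pandas DataFrame
check["per_capita_mortality"] = check["deaths"] / check["population"]
ranked = check.sort_values(by="per_capita_mortality", ascending=False)
print(ranked)
```

The resulting order (New York, Michigan, Iowa) and the ratios agree with the ranked table printed by the notebook, up to the rounding of the printed population values.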

In [23]:
# print results
print(state_wise_mortality_ranked)

                             deaths    population  per_capita_mortality
state                                                                  
New York                  9781636.0  6.175734e+05             15.838824
Michigan                  2107456.0  2.377823e+05              8.862964
Louisiana                 1217102.0  1.430398e+05              8.508834
Georgia                   1230519.0  1.550214e+05              7.937739
Illinois                  2238987.0  2.879959e+05              7.774370
Mississippi                504736.0  7.171443e+04              7.038137
New Jersey                4609656.0  8.074718e+05              5.708752
Pennsylvania              2145790.0  3.765291e+05              5.698869
Ohio                      1029731.0  2.626764e+05              3.920150
Minnesota                  470852.0  1.281735e+05              3.673553
Iowa                       264347.0  7.384984e+04              3.579520
Florida                   2161963.0  6.316981e+05              3

In [25]:
# Compute CFR per month
df_us['month'] = df_us['date'].dt.to_period('M')
cfr_per_month = df_us.groupby(['state', 'month']).agg({'deaths': 'sum', 'cases': 'sum'}).compute()
cfr_per_month['CFR'] = cfr_per_month['deaths'] / cfr_per_month['cases']

In [26]:
# print results
print(cfr_per_month)

                      deaths      cases       CFR
state       month                                
Alaska      2020-01      0.0        0.0       NaN
            2020-02      0.0        0.0       NaN
            2020-03     21.0     1566.0  0.013410
            2020-04    404.0    16558.0  0.024399
            2020-05    610.0    24501.0  0.024897
...                      ...        ...       ...
Puerto Rico 2020-06   4380.0   255489.0  0.017144
            2020-07   5485.0   519733.0  0.010553
            2020-08  10262.0  1173434.0  0.008745
            2020-09  16703.0  1720918.0  0.009706
            2020-10   3400.0   363492.0  0.009354

[430 rows x 3 columns]


COVID-19 fatality rates fluctuate over time, so we group the data by state and month. Grouping is computationally expensive, so Dask distributes it across partitions and, if needed, across multiple machines, which helps on large datasets. The CFR division itself runs on the computed Pandas result; months with zero cases give 0/0 and therefore appear as NaN.
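
The monthly CFR calculation, including the NaN that results when a month has no cases, can be illustrated on a tiny Pandas frame. The state label "X" and all numbers here are hypothetical and chosen only to keep the arithmetic easy to follow:

```python
import pandas as pd

# Hypothetical daily rows for one state
toy = pd.DataFrame({
    "state": ["X", "X", "X", "X"],
    "date": pd.to_datetime(["2020-01-30", "2020-01-31", "2020-03-01", "2020-03-02"]),
    "deaths": [0.0, 0.0, 1.0, 2.0],
    "cases": [0.0, 0.0, 40.0, 60.0],
})

# Same steps as in the notebook: month label, grouped sums, then the ratio
toy["month"] = toy["date"].dt.to_period("M")
monthly = toy.groupby(["state", "month"]).agg({"deaths": "sum", "cases": "sum"})
monthly["CFR"] = monthly["deaths"] / monthly["cases"]

# January has 0 deaths / 0 cases, which Pandas reports as NaN;
# March has 3 deaths / 100 cases
cfr_values = monthly["CFR"].tolist()
print(monthly)
```

This matches the pattern in the printed output, where early months with no recorded cases show NaN instead of a rate.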

In [27]:
# Pivot table for CFR (50 states X 14 months)
cfr_matrix = cfr_per_month.pivot_table(values='CFR', index='state', columns='month')

In [28]:
# print results
print(cfr_matrix)

month                     2020-01   2020-02   2020-03   2020-04   2020-05  \
state                                                                       
Alabama                       NaN       NaN  0.006703  0.032222  0.038915   
Alaska                        NaN       NaN  0.013410  0.024399  0.024897   
Arizona                       0.0  0.000000  0.014445  0.037281  0.047639   
Arkansas                      NaN       NaN  0.009310  0.019205  0.021140   
California                    0.0  0.056836  0.021733  0.035652  0.040133   
Colorado                      NaN       NaN  0.017962  0.044440  0.050536   
Connecticut                   NaN       NaN  0.019335  0.064518  0.090128   
Delaware                      NaN       NaN  0.023356  0.032791  0.043008   
Florida                       NaN       NaN  0.014030  0.029117  0.043550   
Georgia                       NaN       NaN  0.030493  0.039662  0.044299   
Guam                          NaN       NaN  0.021318  0.036316  0.031658   

Pivoting reshapes the long state-by-month table into a matrix with one row per state and one column per month, which makes CFR trends easier to read. Because cfr_per_month is already a computed Pandas DataFrame of a few hundred rows, this pivot runs in Pandas and memory is not a concern. On a dataset too large for memory, the reshaping would need to stay in Dask instead.
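
A minimal sketch of the long-to-wide reshaping is given below. The data is hypothetical, and months are written as plain strings rather than the period values used in the notebook, purely to keep the example simple:

```python
import pandas as pd

# Hypothetical long-form CFR table; state "Y" has no row for 2020-03
long_form = pd.DataFrame({
    "state": ["X", "X", "Y"],
    "month": ["2020-03", "2020-04", "2020-04"],
    "CFR": [0.01, 0.02, 0.05],
})

# One row per state, one column per month
wide = long_form.pivot_table(values="CFR", index="state", columns="month")
print(wide)
```

State and month combinations that are absent from the long table appear as NaN in the wide table, which is also why some cells of the CFR matrix above are NaN.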

In [29]:
# Compute month-to-month CFR change
mtm_cfr = cfr_matrix.diff(axis=1)

To analyze how CFR changed over time, we compute the difference between consecutive months along each row. Each state's row is independent of the others, so the operation would parallelize well, but cfr_matrix is a small Pandas DataFrame at this point and the diff runs in Pandas. The first month of each row, and any month that follows a NaN, has no defined difference and is NaN.

In [30]:
# Aggregate month-to-month changes per state
cfr_change_ranking = mtm_cfr.sum(axis=1).sort_values(ascending=False)


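We sum the month-to-month CFR changes for each state and rank the states by the result. Note that the code sums signed changes, not absolute values, so increases and decreases cancel out. When a state has no gaps between its first and last reported month, the total is simply its net CFR change over the period rather than a measure of how much it fluctuated. As in the previous steps, this runs on a Pandas DataFrame, not across Dask partitions.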
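
The difference between summing signed changes and summing absolute changes can be seen on a small example. The matrix below is hypothetical; net_change mirrors the notebook's calculation, while total_fluctuation is an alternative shown only for comparison and is not part of the original analysis:

```python
import numpy as np
import pandas as pd

# Hypothetical CFR matrix: rows are states, columns are months
cfr_toy = pd.DataFrame(
    {
        "2020-02": [np.nan, 0.01],
        "2020-03": [0.02, 0.03],
        "2020-04": [0.05, 0.02],
        "2020-05": [0.03, 0.04],
    },
    index=["X", "Y"],
)

# Month-to-month change along each row
changes = cfr_toy.diff(axis=1)

# Signed sum (as in the notebook): ups and downs cancel,
# leaving last value minus first reported value
net_change = changes.sum(axis=1)

# Absolute sum: measures how much the CFR moved in total
total_fluctuation = changes.abs().sum(axis=1)

print(net_change)
print(total_fluctuation)
```

Both toy states move by the same total amount, yet their signed sums differ, so the two measures can rank states differently.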

In [31]:
# Output results
print("Per-capita mortality ranking:")
print(state_wise_mortality_ranked)

Per-capita mortality ranking:
                             deaths    population  per_capita_mortality
state                                                                  
New York                  9781636.0  6.175734e+05             15.838824
Michigan                  2107456.0  2.377823e+05              8.862964
Louisiana                 1217102.0  1.430398e+05              8.508834
Georgia                   1230519.0  1.550214e+05              7.937739
Illinois                  2238987.0  2.879959e+05              7.774370
Mississippi                504736.0  7.171443e+04              7.038137
New Jersey                4609656.0  8.074718e+05              5.708752
Pennsylvania              2145790.0  3.765291e+05              5.698869
Ohio                      1029731.0  2.626764e+05              3.920150
Minnesota                  470852.0  1.281735e+05              3.673553
Iowa                       264347.0  7.384984e+04              3.579520
Florida                   2161963.

In [32]:
print("\nCFR change ranking:")
print(cfr_change_ranking)



CFR change ranking:
state
Massachusetts               0.073796
Connecticut                 0.061393
New York                    0.054129
New Jersey                  0.052762
New Hampshire               0.044008
Pennsylvania                0.037870
Michigan                    0.034683
Illinois                    0.029210
Northern Mariana Islands    0.027248
Washington, D.C.            0.025891
Arizona                     0.025865
Indiana                     0.025784
New Mexico                  0.022614
Maine                       0.019541
California                  0.019374
Maryland                    0.017037
Oregon                      0.016496
Mississippi                 0.015210
North Carolina              0.013105
Minnesota                   0.013090
Ohio                        0.013050
Delaware                    0.010743
Nebraska                    0.010535
Colorado                    0.010482
Alabama                     0.009471
Hawaii                      0.008231
Arkansas   

Overall, I feel that because the COVID-19 dataset is large and growing, processing it with Pandas alone would be slow and memory-intensive. Dask enables parallel processing, which splits work across CPU cores, and distributed processing, which spreads computations across multiple machines if necessary. In this notebook the benefit applies to the loading, filtering, date conversion, and grouped aggregations on the full dataset; once a result has been reduced with .compute(), the remaining steps (per-capita ratio, sorting, pivoting, differencing) run in Pandas on small tables.

For small datasets, Pandas might be sufficient, but as data size increases, Dask’s ability to process in parallel and distribute workloads makes it a powerful tool for COVID-19 analysis.