


Yepuri Esha, HDS 5230 - High Performance Computing, Week 05 - Dask Programming Assignment

In [51]:
# Loading Necessary Libraries

import dask.dataframe as dd
import pandas as pd
import numpy as np
from datetime import datetime

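When dealing with large datasets, simply loading them can be time-consuming and require a lot of memory. Dask helps by dividing the dataset into smaller chunks (partitions) that can be loaded and processed in parallel across processor cores, or even across machines. This speeds up loading and helps avoid memory issues, particularly when the data is too large to fit in memory all at once. Dask is also lazy: dd.read_csv reads only a small sample of the file up front to infer column types, and the real work happens when a result is requested, for example with .head() or .compute().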
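To make chunked loading concrete, here is a small sketch in plain pandas rather than Dask: pd.read_csv with chunksize yields the file as a sequence of independent DataFrames, which is roughly what a Dask partition is. The CSV text and its values are made up. Unlike dd.read_csv, which splits the file by byte ranges (its blocksize parameter) and can read partitions in parallel, this loop handles the chunks one after another.

```python
import io

import pandas as pd

# Made-up CSV text standing in for timeseries.csv (10 data rows).
csv_text = "state,date,deaths\n" + "\n".join(
    f"State{i % 3},2020-03-{(i % 28) + 1:02d},{i}" for i in range(10)
)

chunk_sizes = []
total_deaths = 0

# Read 4 rows at a time; each chunk is an ordinary DataFrame,
# similar in spirit to one Dask partition.
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=4):
    chunk_sizes.append(len(chunk))
    # Reduce each chunk to a small partial result and accumulate it.
    # When reading from a file, only the current chunk must be in memory.
    total_deaths += int(chunk["deaths"].sum())

print(chunk_sizes)   # [4, 4, 2]
print(total_deaths)  # 45
```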

In [52]:

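# Defining column data types
# Declare dtypes explicitly: Dask infers dtypes from a sample at the start of the
# file, so columns that are empty there (e.g. county, state) could otherwise be
# inferred as float64 and then fail to parse in later partitions.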
dtype_mapping = {
    "cases": "float64",
    "county": "object",
    "deaths": "float64",
    "recovered": "float64",
    "state": "object",
    "country": "object",  # Ensure "country" is read as a string
}

# Load the dataset using Dask
df = dd.read_csv("/content/timeseries.csv", dtype=dtype_mapping, low_memory=False)

# Check the first few rows
df.head()


Unnamed: 0,locationID,slug,name,level,city,county,state,country,lat,long,...,deaths,recovered,active,tested,hospitalized,hospitalized_current,discharged,icu,icu_current,date
0,iso1:ad,ad,Andorra,country,,,,Andorra,42.55,1.58,...,0.0,0.0,,,,,,,,2020-01-22
1,iso1:ad,ad,Andorra,country,,,,Andorra,42.55,1.58,...,0.0,0.0,,,,,,,,2020-01-23
2,iso1:ad,ad,Andorra,country,,,,Andorra,42.55,1.58,...,0.0,0.0,,,,,,,,2020-01-24
3,iso1:ad,ad,Andorra,country,,,,Andorra,42.55,1.58,...,0.0,0.0,,,,,,,,2020-01-25
4,iso1:ad,ad,Andorra,country,,,,Andorra,42.55,1.58,...,0.0,0.0,,,,,,,,2020-01-26


In [53]:
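# Filtering for US rows
# Keep every row whose country is the United States (strip() guards against
# stray whitespace). This keeps rows at every geographic level (the first rows
# shown below are country-level); to keep only state-level rows, one could also
# filter on the level column, e.g. df["level"] == "state".

df_states = df[df["country"].str.strip() == "United States"]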
df_states.head()




Unnamed: 0,locationID,slug,name,level,city,county,state,country,lat,long,...,deaths,recovered,active,tested,hospitalized,hospitalized_current,discharged,icu,icu_current,date
239811,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-22
239812,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-23
239813,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-24
239814,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-25
239815,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-26


Dask can also speed up filtering (for example, selecting the rows for the United States) because the condition is evaluated on each partition independently, so several partitions can be filtered at the same time. With a small dataset the difference may be negligible, but with large datasets this parallel approach really pays off.
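A minimal sketch of partition-wise filtering, assuming a list of small pandas DataFrames stands in for Dask partitions (the data and the filter_us helper are hypothetical). Each partition is filtered on its own in a thread pool and the pieces are concatenated, which yields the same rows as filtering everything at once.

```python
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

# Hypothetical partitions: small DataFrames standing in for Dask partitions.
partitions = [
    pd.DataFrame({"country": ["United States", "Andorra", " United States "],
                  "deaths": [1, 2, 3]}),
    pd.DataFrame({"country": ["Canada", "United States"], "deaths": [4, 5]}),
]


def filter_us(part: pd.DataFrame) -> pd.DataFrame:
    # Same row-wise condition as the notebook; it needs no data from other partitions.
    return part[part["country"].str.strip() == "United States"]


# Filter every partition concurrently, then stitch the pieces back together.
with ThreadPoolExecutor(max_workers=2) as pool:
    filtered_parts = list(pool.map(filter_us, partitions))

df_us = pd.concat(filtered_parts, ignore_index=True)

# Filtering the concatenated data in one go gives the same result.
same_as_whole = df_us.equals(
    filter_us(pd.concat(partitions, ignore_index=True)).reset_index(drop=True)
)

print(df_us["deaths"].tolist())  # [1, 3, 5]
print(same_as_whole)             # True
```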

In [54]:
# Checking the number of rows
print(df_states.shape[0].compute())  # shape[0] is lazy; .compute() returns the row count


844599


In [55]:
# Convert 'date' column to datetime
df_states["date"] = dd.to_datetime(df_states["date"], errors="coerce")

# Filter for the date range
df_states = df_states[(df_states["date"] >= "2020-01-01") & (df_states["date"] <= "2021-02-28")]

# Show a few rows to confirm
df_states.head()


Unnamed: 0,locationID,slug,name,level,city,county,state,country,lat,long,...,deaths,recovered,active,tested,hospitalized,hospitalized_current,discharged,icu,icu_current,date
239811,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-22
239812,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-23
239813,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-24
239814,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-25
239815,iso1:us,us,United States,country,,,,United States,45.67,-112.87,...,,,,,,,,,,2020-01-26


A parallelized and distributed approach with Dask is reasonable for every operation in this workflow. First, when loading the large CSV file, parallelization helps because Dask reads the file in chunks and can therefore handle datasets that would not fit in memory at once. Filtering for US rows and selecting the date range are also well suited to parallel processing: both require scanning many rows, and each row can be tested without looking at any other row. Instead of checking rows one after another, Dask applies the filter to each partition separately and spreads those partitions across several processors.
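The point that these filters need no coordination between partitions can be checked on toy data. The sketch below (hypothetical data and helper name, pandas only) applies the same date conversion and range filter used in the notebook to the whole table and to two slices of it, and confirms the results match. errors="coerce" turns the unparseable string into NaT, which fails both comparisons and is dropped.

```python
import pandas as pd

# Made-up rows; "not a date" shows what errors="coerce" does.
df = pd.DataFrame({
    "date": ["2020-01-22", "not a date", "2019-12-31", "2021-02-28", "2021-03-01"],
    "deaths": [0, 1, 2, 3, 4],
})


def convert_and_filter(part: pd.DataFrame) -> pd.DataFrame:
    part = part.copy()
    # Unparseable strings become NaT instead of raising an error.
    part["date"] = pd.to_datetime(part["date"], errors="coerce")
    # NaT compares as False, so those rows are removed by the range filter.
    mask = (part["date"] >= "2020-01-01") & (part["date"] <= "2021-02-28")
    return part[mask]


# Filter the whole table at once.
whole = convert_and_filter(df)

# Split into two "partitions", filter each independently, then combine.
pieces = [convert_and_filter(df.iloc[:2]), convert_and_filter(df.iloc[2:])]
combined = pd.concat(pieces)

print(whole["deaths"].tolist())  # [0, 3]
print(combined.equals(whole))    # True
```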


In [48]:

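# Calculating per-capita mortality

# Aggregate per state in Dask, then bring the small result into pandas with .compute().
# Caution: 'population' is averaged over every row of a state in df_states, which
# may mix state-, county-, and city-level rows; it equals the state's population
# only if df_states is restricted to state-level rows.
state_metrics = df_states.groupby('state').agg({
    'deaths': 'max',        # deaths are cumulative, so max is the largest reported total
    'population': 'mean'
}).compute()

# Deaths per 100,000 residents
state_metrics['per_capita_mortality'] = (
    state_metrics['deaths'] / state_metrics['population'] * 100000
)

# Highest mortality first
ranked_states = state_metrics.sort_values(
    'per_capita_mortality',
    ascending=False
)

ranked_states = ranked_states.round(2)

print(ranked_states[['deaths', 'population', 'per_capita_mortality']])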


                               deaths  population  per_capita_mortality
state                                                                  
Texas                         16437.0   227418.67               7227.64
New York                      33214.0   617573.37               5378.15
Georgia                        7192.0   155021.36               4639.36
Mississippi                    2980.0    71714.43               4155.37
Louisiana                      5396.0   143039.82               3772.38
Illinois                       8805.0   287995.93               3057.33
Michigan                       7137.0   237782.31               3001.48
Virginia                       3291.0   127395.81               2583.29
Florida                       14444.0   631698.15               2286.54
Pennsylvania                   8207.0   376529.09               2179.65
Missouri                       2188.0   105616.18               2071.65
New Jersey                    16009.0   807471.82               

When computing per-capita mortality with an aggregation such as groupby("state").agg(...), parallelization is highly beneficial because the aggregation splits naturally into smaller tasks. Dask handles it partition-wise: each partition computes partial results for the states it contains, and only these small partial results are merged at the end (for example, the overall maximum of deaths is the maximum of the per-partition maxima). This makes parallelization efficient for this type of operation. In a distributed setting with a multi-node cluster, each node processes a subset of the rows, and the partial results from all nodes are then combined. This approach is particularly useful for larger datasets covering longer time periods, because the workload is spread across multiple nodes.

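The partition-then-merge pattern can be sketched in plain pandas, assuming two small hypothetical partitions in which the same state appears more than once. Each partition produces partial results; a maximum combines directly, while a mean has to be carried as a sum and a count. This mirrors the general idea behind Dask's grouped reductions, not its exact internals.

```python
import pandas as pd

# Two hypothetical partitions; "Texas" appears in both.
partitions = [
    pd.DataFrame({"state": ["Texas", "Ohio", "Texas"],
                  "deaths": [10, 5, 12], "population": [100.0, 50.0, 300.0]}),
    pd.DataFrame({"state": ["Texas", "Ohio"],
                  "deaths": [15, 7], "population": [200.0, 70.0]}),
]

# Step 1 (per partition): partial results that can be combined later.
# Means of means would be wrong, so keep the sum and count instead.
partials = [
    part.groupby("state").agg(
        deaths_max=("deaths", "max"),
        pop_sum=("population", "sum"),
        pop_count=("population", "count"),
    )
    for part in partitions
]

# Step 2 (combine): max of maxes, and total sum / total count for the mean.
stacked = pd.concat(partials)
combined = stacked.groupby(level="state").agg(
    deaths_max=("deaths_max", "max"),
    pop_sum=("pop_sum", "sum"),
    pop_count=("pop_count", "sum"),
)
combined["population_mean"] = combined["pop_sum"] / combined["pop_count"]

# Reference: a single in-memory groupby over all rows.
direct = pd.concat(partitions).groupby("state").agg(
    deaths=("deaths", "max"), population=("population", "mean")
)

print(combined[["deaths_max", "population_mean"]])
```

For Texas, the partitions report maxima of 12 and 15 and population sums of 400 (2 rows) and 200 (1 row), so the combined result is 15 deaths and a mean population of 600 / 3 = 200, the same as the direct groupby.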
In [49]:
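# Computing monthly case fatality rate (CFR)

# Label each row with its month; cases and deaths are cumulative, so the
# monthly maximum approximates the running total at the end of that month.
df_states['month_year'] = df_states['date'].dt.strftime('%Y-%m')
monthly_metrics = df_states.groupby(['state', 'month_year']).agg({
    'cases': 'max',
    'deaths': 'max'
}).compute()

# Computing monthly new cases and deaths
# diff() needs rows in chronological order within each state, so sort first
# ('YYYY-MM' strings sort chronologically).
monthly_metrics = monthly_metrics.reset_index().sort_values(['state', 'month_year'])

# The first month of each state has no previous month, so keep its cumulative value.
monthly_metrics['new_cases'] = monthly_metrics.groupby('state')['cases'].diff().fillna(monthly_metrics['cases'])
monthly_metrics['new_deaths'] = monthly_metrics.groupby('state')['deaths'].diff().fillna(monthly_metrics['deaths'])

# Calculating case fatality rate (CFR) as a percentage;
# months with zero new cases give NaN instead of inf.
monthly_metrics['cfr'] = (
    monthly_metrics['new_deaths'] / monthly_metrics['new_cases'].replace(0, np.nan) * 100
).round(2)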


# Pivot table for CFR
cfr_matrix = monthly_metrics.pivot(
    index='state',
    columns='month_year',
    values='cfr'
)

print(cfr_matrix)


month_year                    2020-01  2020-02  2020-03  2020-04  2020-05  \
state                                                                       
Alabama                           NaN      NaN     1.33     4.24     3.33   
Alaska                            NaN      NaN     2.52     2.54     1.27   
American Samoa                    NaN      NaN      NaN      NaN      NaN   
Arizona                           0.0      NaN     1.86     4.65     4.77   
Arkansas                          NaN      NaN     1.53     1.94     1.80   
California                        0.0     8.00     2.11     4.43     3.47   
Colorado                          NaN      NaN     1.94     5.89     3.66   
Connecticut                       NaN      NaN     2.21     8.90    11.63   
Delaware                          NaN      NaN     3.76     4.48     5.42   
Florida                           NaN      NaN     1.21     4.43     5.54   
Georgia                           NaN      NaN     2.83     4.54     4.43   

This code involves operations that can be parallelized effectively because the calculations for each group (state and month) are independent of one another. Here, the grouped maximum of cases and deaths runs in Dask, and the much smaller monthly table returned by .compute() is then processed in pandas. As the dataset grows, distributed computation becomes increasingly important for handling larger volumes of data, using multiple machines to improve performance and prevent memory bottlenecks. Dask is well suited to this because the same code can run in parallel on a single machine or be distributed across a cluster, so it scales from small to large datasets.
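As a worked example of the CFR formula, CFR (%) = new deaths / new cases * 100, the sketch below runs the same month-over-month steps on made-up cumulative totals for one state. The replace(0, np.nan) guard, which makes a month with zero new cases produce NaN rather than inf, is a defensive choice in this sketch.

```python
import numpy as np
import pandas as pd

# Made-up cumulative totals (monthly maxima) for one state.
monthly = pd.DataFrame({
    "state": ["Ohio", "Ohio", "Ohio"],
    "month_year": ["2020-03", "2020-04", "2020-05"],
    "cases": [100.0, 300.0, 300.0],
    "deaths": [2.0, 12.0, 13.0],
})

# Chronological order within each state before differencing.
monthly = monthly.sort_values(["state", "month_year"])

# Month-over-month increase; the first month keeps its cumulative value.
monthly["new_cases"] = monthly.groupby("state")["cases"].diff().fillna(monthly["cases"])
monthly["new_deaths"] = monthly.groupby("state")["deaths"].diff().fillna(monthly["deaths"])

# CFR in percent; zero new cases becomes NaN in the denominator.
monthly["cfr"] = (
    monthly["new_deaths"] / monthly["new_cases"].replace(0, np.nan) * 100
).round(2)

print(monthly[["month_year", "new_cases", "new_deaths", "cfr"]])
```

With these numbers, March gives 2 / 100 * 100 = 2.0%, April gives 10 / 200 * 100 = 5.0%, and May has no new cases, so its CFR is NaN.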

Finally, ranking states by how much their CFR fluctuates over time is another highly parallelizable task. The ranking sums the absolute month-to-month changes in CFR for each state, and each state's total depends only on that state's own row of the CFR matrix. The rows can therefore be split into partitions, each partition computes its totals locally, and the results are combined to produce the final ranking. In a distributed setup this scales well, because each node processes only a subset of the states and no single machine carries the whole load.
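A small sketch of the ranking step on a hypothetical two-state CFR matrix: diff(axis=1) gives the change from each month to the next within a state's row, and the absolute changes are summed. Because each state's total depends only on its own row, splitting the rows into blocks and concatenating the per-block totals (via the hypothetical helper total_abs_change) gives the same answer, which is what allows the work to be divided by state.

```python
import pandas as pd

# Made-up CFR matrix (%): rows are states, columns are months in order.
cfr_matrix = pd.DataFrame(
    {"2020-03": [2.0, 1.0], "2020-04": [5.0, 1.5], "2020-05": [3.0, 2.0]},
    index=pd.Index(["Ohio", "Utah"], name="state"),
)


def total_abs_change(block: pd.DataFrame) -> pd.Series:
    # Change from each month to the next within a row; the first month is NaN
    # and is skipped by sum().
    return block.diff(axis=1).abs().sum(axis=1)


# All states at once, ranked from most to least fluctuation.
total_absolute_change = total_abs_change(cfr_matrix)
ranking = total_absolute_change.sort_values(ascending=False)

# Same totals when the rows (states) are processed in separate blocks.
split_result = pd.concat(
    [total_abs_change(cfr_matrix.iloc[:1]), total_abs_change(cfr_matrix.iloc[1:])]
)

print(ranking)
```

Here Ohio's changes are +3.0 and -2.0 (total 5.0) and Utah's are +0.5 and +0.5 (total 1.0), so Ohio ranks first.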

By using Dask's parallel and distributed capabilities for each of these operations, the computations remain efficient and scalable, even with large-scale COVID-19 data. If the dataset were small enough to fit comfortably in memory, pandas alone would be sufficient, but in this case Dask is the better choice for performance and scalability.

In [50]:

# Aggregating CFR changes
# Month-to-month change in CFR for each state (pivot columns are months in order)
cfr_changes = cfr_matrix.diff(axis=1)

# Summary statistics of the changes, one row per state
state_metrics = pd.DataFrame(index=cfr_matrix.index)
state_metrics['total_absolute_change'] = cfr_changes.abs().sum(axis=1)  # overall fluctuation
state_metrics['avg_monthly_change'] = cfr_changes.mean(axis=1)          # net drift per month
state_metrics['volatility'] = cfr_changes.std(axis=1)
state_metrics['positive_changes'] = (cfr_changes > 0).sum(axis=1)
state_metrics['negative_changes'] = (cfr_changes < 0).sum(axis=1)
state_metrics['no_changes'] = (cfr_changes == 0).sum(axis=1)

# Ranking states based on CFR change (largest total fluctuation first)
ranked_states = state_metrics.sort_values('total_absolute_change', ascending=False)
print(ranked_states.head(10))
print(ranked_states.tail(10))

                              total_absolute_change  avg_monthly_change  \
state                                                                     
Massachusetts                                182.84            0.000000   
United States Virgin Islands                 128.75           -1.851667   
New Jersey                                    74.33           -7.275714   
New York                                      73.79           -0.132857   
Washington                                    65.34            0.128889   
Northern Mariana Islands                      33.34            0.000000   
Rhode Island                                  23.59           -0.052857   
Michigan                                      22.90           -0.251429   
Connecticut                                   22.40           -0.234286   
California                                    21.94            0.193333   

                              volatility  positive_changes  negative_changes  \
state              

In summary, a parallelized and distributed approach is beneficial at every step of this workflow. Dask allows large datasets to be processed efficiently, avoiding memory overload and speeding up operations that would otherwise be slow in a traditional pandas-based approach.