In [1]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
from datetime import datetime

In [3]:
# File Path
DATA_FILE_PATH = "timeseries.csv"

In [5]:
# Define data types for efficient processing
def define_data_types():
    return {
        "name": "str", "level": "str", "city": "str", "county": "str", "state": "str", "country": "str",
        "population": "float", "lat": "float", "long": "float", "url": "str", "aggregate": "str", "tz": "str",
        "cases": "float", "deaths": "float", "recovered": "str", "active": "str", "tested": "str",
        "hospitalized": "str", "hospitalized_current": "str", "discharged": "str", "icu": "str", "icu_current": "str",
        "growthFactor": "str", "date": "str"
    }

In [13]:
# Load data as Dask DataFrame
def load_data(file_path):
    data_types = define_data_types()
    return dd.read_csv(file_path, dtype=data_types)

In [9]:
# Convert date column to datetime format
def format_date_column(df):
    df["date"] = dd.to_datetime(df["date"], format="%Y-%m-%d")
    return df

In [11]:
# Filter data for the United States
def filter_us_data(df):
    return df[df["country"] == "United States"]

In [15]:
# Filter data based on date range
def filter_date_range(df, start_date, end_date):
    return df[(df["date"] >= start_date) & (df["date"] <= end_date)]

In [17]:
# Compute per-capita mortality ranking
def compute_per_capita_mortality(df):
    total_deaths = df.groupby('state')["deaths"].sum().compute()
    avg_population = df.groupby('state')["population"].mean().compute()
    per_capita_mortality = total_deaths / avg_population
    return per_capita_mortality.sort_values(ascending=False)

In [19]:
# Extract year-month from date
def extract_year_month(date_col):
    return f"{date_col.year}_{date_col.month}"

In [21]:
# Compute Case Fatality Rate (CFR) by state and month
def compute_cfr(df):
    df["year_month"] = df["date"].apply(extract_year_month)
    total_cases = df.groupby(["state", "year_month"])["cases"].sum().compute()
    total_deaths = df.groupby(["state", "year_month"])["deaths"].sum().compute()
    cfr = (total_deaths / total_cases) * 100
    return cfr.reset_index()

In [55]:
# Create pivot table for CFR analysis
def create_cfr_pivot_table(cfr_df):
    pivot_table = cfr_df.pivot(index='state', columns='year_month')
    cols = cfr_df.pivot(index='state', columns='year_month').columns.tolist()
    cols = [i[1] for i in cols]
    pivot_table.columns = cols
    expected_columns = [f"2020_{i}" for i in range(1, 13)] + ["2021_1", "2021_2"]
    for col in expected_columns:
        if col not in pivot_table.columns:
            pivot_table[col] = np.nan
    return pivot_table[expected_columns]

In [25]:
# Rank states based on CFR fluctuations over time
def rank_states_by_cfr_changes(pivot_table):
    pivot_table["total_cfr"] = pivot_table.abs().sum(axis=1)
    pivot_table['rank'] = pivot_table['total_cfr'].rank(ascending=False, method='min')
    return pivot_table.sort_values(by='rank')

In [27]:
df = load_data(DATA_FILE_PATH)
df = format_date_column(df)
df_us = filter_us_data(df)

start_date = datetime.strptime("2020-01-01", "%Y-%m-%d")
end_date = datetime.strptime("2021-02-28", "%Y-%m-%d")
df_us_filtered = filter_date_range(df_us, start_date, end_date)

In [29]:
ranked_states = compute_per_capita_mortality(df_us_filtered)
print("Per Capita Mortality Ranking:\n", ranked_states)

Per Capita Mortality Ranking:
 state
New York                        6.395701
Michigan                        3.204753
Louisiana                       2.735288
Illinois                        2.043863
New Jersey                      2.031200
Georgia                         2.026085
Pennsylvania                    1.831572
Virginia                        1.359063
Mississippi                     1.356705
Indiana                         1.353423
Ohio                            1.058943
Iowa                            0.958771
Massachusetts                   0.904775
Colorado                        0.894296
Minnesota                       0.768853
Kentucky                        0.751512
Texas                           0.737643
Missouri                        0.711146
Connecticut                     0.677952
Maryland                        0.658347
Alabama                         0.599759
North Carolina                  0.549190
Florida                         0.545793
Wisconsin           

In [57]:
cfr_df = compute_cfr(df_us_filtered)

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('date', 'object'))



In [58]:
cfr_pivot_table = create_cfr_pivot_table(cfr_df)
cfr_pivot_table

Unnamed: 0_level_0,2020_1,2020_2,2020_3,2020_4,2020_5,2020_6,2020_7,2020_8,2020_9,2020_10,2020_11,2020_12,2021_1,2021_2
state,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
Alabama,,,0.532313,2.830899,3.88927,2.962907,2.381771,,,,,,,
Alaska,,,0.335008,2.314519,2.196905,1.303247,1.207417,,,,,,,
American Samoa,,,,,,,,,,,,,,
Arizona,0.0,0.0,0.0,1.486545,1.992175,0.211513,0.973523,,,,,,,
Arkansas,,,0.915656,1.91145,2.129628,1.515155,1.27436,,,,,,,
California,0.0,0.0,2.006735,3.479974,3.98335,3.178666,2.543001,,,,,,,
Colorado,,,0.93925,2.636616,5.372019,5.41922,5.063691,,,,,,,
Connecticut,,,1.814771,6.477626,9.016204,9.383106,9.500807,,,,,,,
Delaware,,,1.334107,2.734038,3.574849,4.164733,4.013148,,,,,,,
Florida,,,0.842669,2.905738,4.27725,3.404313,2.039752,,,,,,,


In [63]:
ranked_cfr_changes = rank_states_by_cfr_changes(cfr_pivot_table)
print("CFR Change Rankings:\n")
ranked_cfr_changes[["total_cfr", "rank"]]

CFR Change Rankings:



Unnamed: 0_level_0,total_cfr,rank
state,Unnamed: 1_level_1,Unnamed: 2_level_1
Northern Mariana Islands,78.230784,1.0
Michigan,77.25087,2.0
Connecticut,75.385029,3.0
New York,65.042837,4.0
Virginia,63.583214,5.0
Alabama,63.194319,6.0
Florida,62.939445,7.0
West Virginia,62.921609,8.0
Idaho,62.752338,9.0
Texas,62.456479,10.0


Parallelization & Distributed Computing Considerations
### Dask is used for handling large datasets efficiently:
Dask delays computation until .compute() is called, making operations more efficient for large datasets.
Grouping operations (groupby.sum(), groupby.mean()) benefit from parallel execution.
### Parallelizing per-capita mortality computation:
Computing per-capita mortality involves simple aggregations, which are easily parallelizable
### Parallelizing CFR computation:
CFR calculation involves groupby operations on "state" and "year_month", which can be distributed across multiple cores.
### Parallelizing CFR fluctuation ranking:
Ranking operations are relatively lightweight but can still benefit from distributed processing when dealing with millions of records.
