Backend: Speed up cdc_restricted_local step #2342

Merged
46 commits
08045ee
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Jul 31, 2023
12fb315
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 1, 2023
74c9477
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 1, 2023
18c4e44
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 1, 2023
9850889
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 1, 2023
1d74516
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 4, 2023
6c0ac9c
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 4, 2023
7c1b2dc
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 4, 2023
744654e
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 4, 2023
eed29ae
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 7, 2023
e95e1e7
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 8, 2023
60dac18
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 8, 2023
4add785
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 8, 2023
c9c86bf
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 9, 2023
45a7116
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 10, 2023
85ea6a5
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 10, 2023
79712dd
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 10, 2023
5b4a48d
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 11, 2023
cc346a1
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 11, 2023
14947f1
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 14, 2023
c878c4d
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 14, 2023
023563d
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 15, 2023
5d876ae
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 16, 2023
4f7a527
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 17, 2023
52679ef
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 18, 2023
098ec3f
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 22, 2023
318745c
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 22, 2023
5f02bc6
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 23, 2023
0c13c70
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 23, 2023
80e9867
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 23, 2023
7094a70
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 24, 2023
a523d67
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 24, 2023
4cf154c
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 24, 2023
bdb9f74
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 25, 2023
f828711
Merge branch 'main' of https://github.com/SatcherInstitute/health-equ…
benhammondmusic Aug 25, 2023
b3230cb
try new function with timing
benhammondmusic Aug 28, 2023
a81acd5
some improvements on local maybe
benhammondmusic Aug 28, 2023
79f2e47
faster
benhammondmusic Aug 28, 2023
4f39bd6
hm
benhammondmusic Aug 29, 2023
1c6c647
improvements
benhammondmusic Aug 30, 2023
9ecc8f5
rv
benhammondmusic Aug 30, 2023
33bfcad
cleanup
benhammondmusic Aug 30, 2023
9425fe0
removes suppression logic
benhammondmusic Aug 30, 2023
875c221
Revert "removes suppression logic"
benhammondmusic Aug 30, 2023
a156bba
lint
benhammondmusic Aug 30, 2023
e8271b8
Merge branch 'main' into cdc-local-vectorize
benhammondmusic Aug 30, 2023
1 change: 1 addition & 0 deletions cspell.config.json
@@ -238,6 +238,7 @@
"Gnumeric",
"goode",
"goto",
"gpt",
"gracia",
"graffunder",
"greenblue",
81 changes: 45 additions & 36 deletions python/datasources/cdc_restricted_local.py
@@ -22,6 +22,7 @@
import pandas as pd # type: ignore
pd.options.mode.chained_assignment = None # default='warn'

+CHUNK_SIZE = 5_000_000

# Command line flags for the dir and file name prefix for the data.
parser = argparse.ArgumentParser()
@@ -37,14 +38,27 @@
STATE_COL = 'res_state'
COUNTY_FIPS_COL = 'county_fips_code'
COUNTY_COL = 'res_county'
-RACE_ETH_COL = 'race_ethnicity_combined'
SEX_COL = 'sex'
AGE_COL = 'age_group'
OUTCOME_COLS = ['hosp_yn', 'death_yn']
RACE_COL = 'race'
ETH_COL = 'ethnicity'
CASE_DATE_COL = 'cdc_case_earliest_dt'

+USE_COLS = [
+    STATE_COL,
+    COUNTY_FIPS_COL,
+    COUNTY_COL,
+    SEX_COL,
+    AGE_COL,
+    *OUTCOME_COLS,
+    RACE_COL,
+    ETH_COL,
+    CASE_DATE_COL,
+]
+
+# column no longer provided by CDC that we need to recreate
+RACE_ETH_COL = 'race_ethnicity_combined'

# Convenience list for when we group the data by county.
COUNTY_COLS = [COUNTY_FIPS_COL, COUNTY_COL, STATE_COL]
@@ -112,7 +126,7 @@
'race_and_age': ([RACE_COL, ETH_COL, AGE_COL], {**AGE_NAMES_MAPPING, **RACE_NAMES_MAPPING}),
}

-# States that we have decided to suppress different kinds of data for, due to
+# States that we previously decided to suppress different kinds of data for, due to
# very incomplete data. Note that states that have all data suppressed will
# have case, hospitalization, and death data suppressed.
# See https://github.com/SatcherInstitute/health-equity-tracker/issues/617.
@@ -126,18 +140,26 @@ def combine_race_eth(df):
We will keep this in place until we can figure out a plan on how to display
the race and ethnicity to our users in a disaggregated way."""

-    def get_combined_value(row):
-        if row[ETH_COL] == 'Hispanic/Latino':
-            return std_col.Race.HISP.value
+    # Create a mask for Hispanic/Latino
+    hispanic_mask = df[ETH_COL] == 'Hispanic/Latino'

-        elif row[RACE_COL] in {'NA', 'Missing', 'Unknown'} or row[ETH_COL] in {'NA', 'Missing', 'Unknown'}:
-            return std_col.Race.UNKNOWN.value
+    # Create masks for 'NA', 'Missing', 'Unknown'
+    race_missing_mask = df[RACE_COL].isin({'NA', 'Missing', 'Unknown'})
+    eth_missing_mask = df[ETH_COL].isin({'NA', 'Missing', 'Unknown'})

-        else:
-            return RACE_NAMES_MAPPING[row[RACE_COL]]
+    # Create a mask for other cases
+    other_mask = ~race_missing_mask & ~eth_missing_mask

+    # Create a new combined race/eth column Initialize with UNKNOWN
+    df[RACE_ETH_COL] = std_col.Race.UNKNOWN.value
+    # Overwrite specific race if given
+    df.loc[other_mask, RACE_ETH_COL] = df.loc[other_mask, RACE_COL].map(RACE_NAMES_MAPPING)
Collaborator


Nice!

+    # overwrite with Hispanic if given
+    df.loc[hispanic_mask, RACE_ETH_COL] = std_col.Race.HISP.value

-    df[RACE_ETH_COL] = df.apply(get_combined_value, axis=1)
+    # Drop unnecessary columns
+    df = df.drop(columns=[RACE_COL, ETH_COL])

return df
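
The change to `combine_race_eth` swaps a per-row `df.apply(..., axis=1)` for boolean masks and `.loc` assignments. The following is a minimal sketch of that idea on toy data, not the project's code: the constants `HISP` and `UNKNOWN` stand in for `std_col.Race.HISP.value` and `std_col.Race.UNKNOWN.value`, and the two-entry `RACE_NAMES_MAPPING` is a made-up subset, since the real mapping is not shown in this diff.

```python
import pandas as pd

# Hypothetical stand-ins for the module's constants and std_col.Race values.
RACE_COL = "race"
ETH_COL = "ethnicity"
RACE_ETH_COL = "race_ethnicity_combined"
HISP = "HISP"
UNKNOWN = "UNKNOWN"
MISSING_VALUES = ["NA", "Missing", "Unknown"]
# Illustrative subset only; the real RACE_NAMES_MAPPING is not part of this diff.
RACE_NAMES_MAPPING = {"White": "WHITE_NH", "Asian": "ASIAN_NH"}


def combine_row_wise(df):
    """Old style: one Python-level function call per row."""
    def get_combined_value(row):
        if row[ETH_COL] == "Hispanic/Latino":
            return HISP
        if row[RACE_COL] in MISSING_VALUES or row[ETH_COL] in MISSING_VALUES:
            return UNKNOWN
        return RACE_NAMES_MAPPING[row[RACE_COL]]

    out = df.copy()
    out[RACE_ETH_COL] = out.apply(get_combined_value, axis=1)
    return out.drop(columns=[RACE_COL, ETH_COL])


def combine_vectorized(df):
    """New style: whole-column masks, applied from lowest to highest priority."""
    out = df.copy()
    hispanic_mask = out[ETH_COL] == "Hispanic/Latino"
    known_mask = ~out[RACE_COL].isin(MISSING_VALUES) & ~out[ETH_COL].isin(MISSING_VALUES)

    out[RACE_ETH_COL] = UNKNOWN  # default for every row
    out.loc[known_mask, RACE_ETH_COL] = out.loc[known_mask, RACE_COL].map(RACE_NAMES_MAPPING)
    out.loc[hispanic_mask, RACE_ETH_COL] = HISP  # Hispanic wins, so it is written last
    return out.drop(columns=[RACE_COL, ETH_COL])


sample = pd.DataFrame({
    RACE_COL: ["White", "Asian", "Unknown", "White", "Missing"],
    ETH_COL: ["Non-Hispanic/Latino", "Hispanic/Latino", "Non-Hispanic/Latino",
              "NA", "Hispanic/Latino"],
})

row_wise_result = combine_row_wise(sample)[RACE_ETH_COL].tolist()
vectorized_result = combine_vectorized(sample)[RACE_ETH_COL].tolist()
```

One behavioral difference worth keeping in mind: `Series.map` with a dict yields `NaN` for a race value missing from the mapping, whereas the row-wise lookup would raise a `KeyError`.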


@@ -195,8 +217,8 @@ def accumulate_data(df, geo_cols, overall_df, demog_cols, names_mapping):
groupby_cols = groupby_cols + [CASE_DATE_COL]
total_groupby_cols = total_groupby_cols + [CASE_DATE_COL]

-    df = df.groupby(groupby_cols).sum().reset_index()
-    totals = df.groupby(total_groupby_cols).sum().reset_index()
+    df = df.groupby(groupby_cols).sum(numeric_only=True).reset_index()
+    totals = df.groupby(total_groupby_cols).sum(numeric_only=True).reset_index()

# Special case required due to later processing.
if demog_cols[0] == RACE_ETH_COL:
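
The diff does not state why `numeric_only=True` was added to the two `groupby(...).sum()` calls. One plausible reason (an assumption, not confirmed here) is that it makes the aggregation explicitly skip leftover string columns instead of relying on pandas' version-dependent default. A small sketch with made-up column values:

```python
import pandas as pd

# Toy frame: one grouping key, two string columns that are NOT grouped on,
# and one numeric count column. All values are invented for illustration.
df = pd.DataFrame({
    "res_state": ["GA", "GA", "AL"],
    "sex": ["Female", "Male", "Female"],
    "age_group": ["0 - 9 Years", "10 - 19 Years", "0 - 9 Years"],
    "cases": [3, 4, 5],
})

# numeric_only=True sums only numeric columns; the ungrouped string columns
# ('sex', 'age_group') are left out of the result.
by_state = df.groupby(["res_state"]).sum(numeric_only=True).reset_index()
```

Here `by_state` keeps only `res_state` and `cases`, with one row per state.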
@@ -222,22 +244,6 @@ def sanity_check_data(df):
df[std_col.COVID_DEATH_UNKNOWN])


-def standardize_data(df):
-    """Standardizes the data by cleaning string values and standardizing column
-    names.
-
-    df: Pandas dataframe to standardize.
-    """
-    # Clean string values in the dataframe.
-    df = df.applymap(
-        lambda x: x.replace('"', '').strip() if isinstance(x, str) else x)
-
-    # Standardize column names.
-    df = df.rename(columns=COL_NAME_MAPPING)
-
-    return df


def generate_national_dataset(state_df, groupby_cols):
"""Generates a national level dataset from the state_df.
Returns a national level dataframe
@@ -300,18 +306,21 @@ def process_data(dir, files):

# Note that we read CSVs with keep_default_na = False as we want to
# prevent pandas from interpreting "NA" in the data as NaN
-        chunked_frame = pd.read_csv(os.path.join(dir, f), dtype=str,
-                                    chunksize=100000, keep_default_na=False)
+        chunked_frame = pd.read_csv(
+            os.path.join(dir, f),
+            dtype=str,
+            chunksize=CHUNK_SIZE,
+            keep_default_na=False,
+            usecols=USE_COLS
+        )

for chunk in chunked_frame:

# We first do a bit of cleaning up of geo values and str values.
df = chunk.replace({COUNTY_FIPS_COL: COUNTY_FIPS_NAMES_MAPPING})
df = df.replace({COUNTY_COL: COUNTY_NAMES_MAPPING})
df = df.replace({STATE_COL: STATE_NAMES_MAPPING})

-            def _clean_str(x):
-                return x.replace('"', '').strip() if isinstance(x, str) else x
-            df = df.applymap(_clean_str)

# For county fips, we make sure they are strings of length 5 as per
# our standardization (ignoring empty values).
df[COUNTY_FIPS_COL] = df[COUNTY_FIPS_COL].map(
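
The `read_csv` change in this hunk combines three options: `chunksize` to stream the file in pieces, `usecols` to parse only the needed columns, and `keep_default_na=False` so the literal string "NA" is not turned into `NaN`. A minimal sketch of how they interact, using an in-memory CSV with invented rows and a tiny chunk size in place of the real files and `CHUNK_SIZE = 5_000_000`:

```python
import io

import pandas as pd

# Invented sample data; 'extra_col' stands for any column the step never uses.
csv_text = (
    "res_state,sex,age_group,extra_col\n"
    "GA,Female,NA,x\n"
    "GA,Male,0 - 9 Years,y\n"
    "AL,NA,10 - 19 Years,z\n"
)
USE_COLS = ["res_state", "sex", "age_group"]
CHUNK_SIZE = 2  # tiny on purpose so the sample splits into two chunks

chunked_frame = pd.read_csv(
    io.StringIO(csv_text),
    dtype=str,
    chunksize=CHUNK_SIZE,   # returns an iterator of DataFrames
    keep_default_na=False,  # keep "NA" as a plain string
    usecols=USE_COLS,       # skip parsing columns that are never read
)

row_counts = []
columns_seen = None
na_strings = 0
for chunk in chunked_frame:
    row_counts.append(len(chunk))
    columns_seen = list(chunk.columns)
    # Count cells that still hold the literal text "NA".
    na_strings += int((chunk == "NA").sum().sum())
```

Larger chunks mean fewer loop iterations at the cost of more memory per chunk, so the right value depends on the machine running the step.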
@@ -341,7 +350,7 @@ def _clean_str(x):
demog_names_mapping)

end = time.time()
-        print("Took", round(end - start, 2), "seconds to process file", f)
+        print("Took", round(end - start), "seconds to process file", f)

# Post-processing of the data.
for key in all_dfs.copy():
@@ -356,7 +365,7 @@ def _clean_str(x):
all_dfs[key] = all_dfs[key].astype(int).reset_index()

# Standardize the column names and race/age/sex values.
-        all_dfs[key] = standardize_data(all_dfs[key])
+        all_dfs[key] = all_dfs[key].rename(columns=COL_NAME_MAPPING)

# Set hospitalization and death data for states we want to suppress to
# an empty string, indicating missing data.