
Add multiprocessing to the Quidel indicator (#1881)
rzats committed Aug 10, 2023
1 parent ca20157 · commit aa0eb70
Showing 3 changed files with 112 additions and 49 deletions.
1 change: 1 addition & 0 deletions quidel_covidtest/.pylintrc
@@ -9,6 +9,7 @@ disable=logging-format-interpolation,
         no-self-use,
         # Allow pytest classes to have one test.
         too-few-public-methods
+enable=useless-suppression

 [BASIC]

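A note on the new option: with useless-suppression enabled, pylint reports any # pylint: disable=... comment that no longer suppresses a real warning (message I0021), which is why the two stale suppressions in geo_maps.py below can now be deleted safely. A minimal illustration (hypothetical file, not part of the commit):

    # With enable=useless-suppression in .pylintrc, pylint flags the
    # disable below as useless: dict subscripting is valid here, so the
    # comment suppresses nothing and I0021 is reported.
    data = {"fips": "42003", "state_id": "pa"}
    value = data["fips"]  # pylint: disable=unsubscriptable-object
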
4 changes: 2 additions & 2 deletions quidel_covidtest/delphi_quidel_covidtest/geo_maps.py
@@ -75,11 +75,11 @@ def add_parent_state(data, geo_res, geo_key):
     """
     fips_to_state = GMPR.get_crosswalk(from_code="fips", to_code="state")
     if geo_res == "county":
-        mix_map = fips_to_state[["fips", "state_id"]] # pylint: disable=unsubscriptable-object
+        mix_map = fips_to_state[["fips", "state_id"]]
     else:
         fips_to_geo_res = GMPR.get_crosswalk(from_code="fips", to_code=geo_res)
         mix_map = fips_to_geo_res[["fips", geo_res]].merge(
-            fips_to_state[["fips", "state_id"]], # pylint: disable=unsubscriptable-object
+            fips_to_state[["fips", "state_id"]],
             on="fips",
             how="inner")
     mix_map = GMPR.add_population_column(mix_map, "fips").groupby(
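
For context on the function being touched: add_parent_state joins a fips-to-geography crosswalk with the fips-to-state crosswalk so each row carries its parent state before aggregation. A rough standalone sketch of the same pandas join, with toy frames standing in for GMPR.get_crosswalk output (values are illustrative only):

    import pandas as pd

    # Toy stand-ins for GMPR.get_crosswalk(from_code="fips", to_code=...).
    fips_to_msa = pd.DataFrame({"fips": ["42003", "42005"],
                                "msa": ["38300", "38300"]})
    fips_to_state = pd.DataFrame({"fips": ["42003", "42005"],
                                  "state_id": ["pa", "pa"]})

    # Mirrors the merge in add_parent_state: attach the parent state to
    # each fips so the coarser geography can later be grouped by state.
    mix_map = fips_to_msa[["fips", "msa"]].merge(
        fips_to_state[["fips", "state_id"]],
        on="fips",
        how="inner")
    print(mix_map)
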
156 changes: 109 additions & 47 deletions quidel_covidtest/delphi_quidel_covidtest/run.py
@@ -6,6 +6,7 @@
 """
 import atexit
 from datetime import datetime
+from multiprocessing import Manager, Pool, cpu_count, current_process
 import time
 from typing import Dict, Any

@@ -55,6 +56,49 @@ def get_smooth_info(sensors, _SMOOTHERS):
         smoothers[sensor] = smoothers.pop(RAW_TEST_PER_DEVICE)
     return smoothers

+def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device,
+                                          first_date, last_date, suffix,  # generate args
+                                          geo_res, sensor_name, export_dir,
+                                          export_start_date, export_end_date,  # export args
+                                          lock, log_filename, log_exceptions):  # logger args
+    """Generate sensors, create an export CSV, then return the export dates."""
+    # logger cannot be passed to child processes, so it has to be recreated
+    with lock:
+        logger = get_structured_logger(__name__, log_filename, log_exceptions)
+        logger.info("Generating signal and exporting to CSV",
+                    geo_res=geo_res,
+                    sensor=sensor_name,
+                    pid=current_process().pid)
+    res_df = generate_sensor_for_nonparent_geo(geo_groups, res_key, smooth, device,
+                                               first_date, last_date, suffix)
+    dates = create_export_csv(res_df, geo_res=geo_res,
+                              sensor=sensor_name, export_dir=export_dir,
+                              start_date=export_start_date,
+                              end_date=export_end_date)
+    return dates
+
+def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
+                                       first_date, last_date, suffix,  # generate args
+                                       geo_res, sensor_name, export_dir,
+                                       export_start_date, export_end_date,  # export args
+                                       lock, log_filename, log_exceptions):  # logger args
+    """Generate sensors, create an export CSV, then return the export dates."""
+    # logger cannot be passed to child processes, so it has to be recreated
+    with lock:
+        logger = get_structured_logger(__name__, log_filename, log_exceptions)
+        logger.info("Generating signal and exporting to CSV",
+                    geo_res=geo_res,
+                    sensor=sensor_name,
+                    pid=current_process().pid)
+    res_df = generate_sensor_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
+                                            first_date, last_date, suffix)
+    dates = create_export_csv(res_df, geo_res=geo_res,
+                              sensor=sensor_name, export_dir=export_dir,
+                              start_date=export_start_date,
+                              end_date=export_end_date,
+                              remove_null_samples=True)  # for parent geo, remove null sample size
+    return dates
+
 def run_module(params: Dict[str, Any]):
     """Run the quidel_covidtest indicator.
@@ -123,53 +167,71 @@ def run_module(params: Dict[str, Any]):
                          wip_signal=params["indicator"]["wip_signal"],
                          prefix="wip_")
     smoothers = get_smooth_info(sensors, SMOOTHERS)
-    for geo_res in NONPARENT_GEO_RESOLUTIONS:
-        geo_data, res_key = geo_map(geo_res, data)
-        geo_groups = geo_data.groupby(res_key)
-        for agegroup in AGE_GROUPS:
-            for sensor in sensors:
-                if agegroup == "total":
-                    sensor_name = sensor
-                else:
-                    sensor_name = "_".join([sensor, agegroup])
-                logger.info("Generating signal and exporting to CSV",
-                            geo_res=geo_res,
-                            sensor=sensor_name)
-                state_df = generate_sensor_for_nonparent_geo(
-                    geo_groups, res_key, smooth=smoothers[sensor][1],
-                    device=smoothers[sensor][0], first_date=first_date,
-                    last_date=last_date, suffix=agegroup)
-                dates = create_export_csv(
-                    state_df,
-                    geo_res=geo_res,
-                    sensor=sensor_name,
-                    export_dir=export_dir,
-                    start_date=export_start_date,
-                    end_date=export_end_date)
-                if len(dates) > 0:
-                    stats.append((max(dates), len(dates)))
-    assert geo_res == "state"  # Make sure geo_groups is for state level
-    # County/HRR/MSA level
-    for geo_res in PARENT_GEO_RESOLUTIONS:
-        geo_data, res_key = geo_map(geo_res, data)
-        for agegroup in AGE_GROUPS:
-            for sensor in sensors:
-                if agegroup == "total":
-                    sensor_name = sensor
-                else:
-                    sensor_name = "_".join([sensor, agegroup])
-                logger.info("Generating signal and exporting to CSV",
-                            geo_res=geo_res,
-                            sensor=sensor_name)
-                res_df = generate_sensor_for_parent_geo(
-                    geo_groups, geo_data, res_key, smooth=smoothers[sensor][1],
-                    device=smoothers[sensor][0], first_date=first_date,
-                    last_date=last_date, suffix=agegroup)
-                dates = create_export_csv(res_df, geo_res=geo_res,
-                                          sensor=sensor_name, export_dir=export_dir,
-                                          start_date=export_start_date,
-                                          end_date=export_end_date,
-                                          remove_null_samples=True)
+    n_cpu = min(8, cpu_count())  # for parallelization
+    with Manager() as manager:
+        # lock to coordinate logger use across the worker processes
+        # pylint disable needed due to a Pylint bug, resolved by version bump (#1886)
+        lock = manager.Lock()  # pylint: disable=no-member
+        logger.info("Parallelizing sensor generation", n_cpu=n_cpu)
+        with Pool(n_cpu) as pool:
+            pool_results = []
+            for geo_res in NONPARENT_GEO_RESOLUTIONS:
+                geo_data, res_key = geo_map(geo_res, data)
+                geo_groups = geo_data.groupby(res_key)
+                for agegroup in AGE_GROUPS:
+                    for sensor in sensors:
+                        if agegroup == "total":
+                            sensor_name = sensor
+                        else:
+                            sensor_name = "_".join([sensor, agegroup])
+                        pool_results.append(
+                            pool.apply_async(
+                                generate_and_export_for_nonparent_geo,
+                                args=(
+                                    # generate_sensor_for_nonparent_geo args
+                                    geo_groups, res_key,
+                                    smoothers[sensor][1], smoothers[sensor][0],
+                                    first_date, last_date, agegroup,
+                                    # create_export_csv args
+                                    geo_res, sensor_name, export_dir,
+                                    export_start_date, export_end_date,
+                                    # logger args
+                                    lock,
+                                    params["common"].get("log_filename"),
+                                    params["common"].get("log_exceptions", True)
+                                )
+                            )
+                        )
+            assert geo_res == "state"  # Make sure geo_groups is for state level
+            # County/HRR/MSA level
+            for geo_res in PARENT_GEO_RESOLUTIONS:
+                geo_data, res_key = geo_map(geo_res, data)  # geo_groups still holds the state-level groups
+                for agegroup in AGE_GROUPS:
+                    for sensor in sensors:
+                        if agegroup == "total":
+                            sensor_name = sensor
+                        else:
+                            sensor_name = "_".join([sensor, agegroup])
+                        pool_results.append(
+                            pool.apply_async(
+                                generate_and_export_for_parent_geo,
+                                args=(
+                                    # generate_sensor_for_parent_geo args
+                                    geo_groups, geo_data, res_key,
+                                    smoothers[sensor][1], smoothers[sensor][0],
+                                    first_date, last_date, agegroup,
+                                    # create_export_csv args
+                                    geo_res, sensor_name, export_dir,
+                                    export_start_date, export_end_date,
+                                    # logger args
+                                    lock,
+                                    params["common"].get("log_filename"),
+                                    params["common"].get("log_exceptions", True)
+                                )
+                            )
+                        )
+            pool_results = [proc.get() for proc in pool_results]
+            for dates in pool_results:
+                if len(dates) > 0:
+                    stats.append((max(dates), len(dates)))

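Two details of the new worker functions deserve a note. The structured logger cannot be pickled and shipped to a Pool worker, so each task rebuilds it inside the child process from the plain log_filename/log_exceptions values, and the Manager lock serializes that setup step across workers. A stripped-down sketch of the same pattern, using the standard logging module in place of get_structured_logger (names here are illustrative, not from the commit):

    import logging
    from multiprocessing import Manager, Pool, current_process

    def make_logger(log_filename):
        # Stand-in for get_structured_logger: build a fresh logger per process.
        logger = logging.getLogger(f"worker-{current_process().pid}")
        if not logger.handlers:
            logger.addHandler(logging.FileHandler(log_filename))
            logger.setLevel(logging.INFO)
        return logger

    def worker(task_id, lock, log_filename):
        # A Manager lock is a picklable proxy, so it can travel through
        # apply_async args even though logger objects cannot.
        with lock:
            logger = make_logger(log_filename)
            logger.info("starting task %s", task_id)
        return task_id * 2

    if __name__ == "__main__":
        with Manager() as manager:
            lock = manager.Lock()
            with Pool(2) as pool:
                handles = [pool.apply_async(worker, args=(i, lock, "tasks.log"))
                           for i in range(4)]
                print([h.get() for h in handles])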

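The control flow in run_module is a fan-out/fan-in: every (geo resolution, age group, sensor) combination is submitted with pool.apply_async, which returns an AsyncResult immediately, and only the final proc.get() loop blocks, re-raising in the parent any exception a worker hit. A minimal sketch of that shape with a placeholder task list (function and signal names are illustrative):

    from multiprocessing import Pool, cpu_count

    def export_one(geo_res, sensor_name):
        # Placeholder for generate_and_export_for_*: returns the dates written.
        return [f"{geo_res}-{sensor_name}"]

    if __name__ == "__main__":
        tasks = [("state", "covid_ag_raw_pct_positive"),
                 ("county", "covid_ag_smoothed_pct_positive")]
        n_cpu = min(8, cpu_count())  # same cap as the commit uses
        with Pool(n_cpu) as pool:
            # Fan out: apply_async returns AsyncResult handles without blocking.
            handles = [pool.apply_async(export_one, args=t) for t in tasks]
            # Fan in: .get() blocks until each task finishes and re-raises any
            # worker exception, so failures are not silently dropped.
            results = [h.get() for h in handles]
        print(results)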
