diff --git a/quidel_covidtest/.pylintrc b/quidel_covidtest/.pylintrc
index 854cf38d2..29bd9aac2 100644
--- a/quidel_covidtest/.pylintrc
+++ b/quidel_covidtest/.pylintrc
@@ -9,6 +9,7 @@ disable=logging-format-interpolation,
         no-self-use,
         # Allow pytest classes to have one test.
         too-few-public-methods
+enable=useless-suppression
 
 [BASIC]
 
diff --git a/quidel_covidtest/delphi_quidel_covidtest/geo_maps.py b/quidel_covidtest/delphi_quidel_covidtest/geo_maps.py
index d59dab692..0c5ac4f9b 100644
--- a/quidel_covidtest/delphi_quidel_covidtest/geo_maps.py
+++ b/quidel_covidtest/delphi_quidel_covidtest/geo_maps.py
@@ -75,11 +75,11 @@ def add_parent_state(data, geo_res, geo_key):
     """
     fips_to_state = GMPR.get_crosswalk(from_code="fips", to_code="state")
     if geo_res == "county":
-        mix_map = fips_to_state[["fips", "state_id"]]  # pylint: disable=unsubscriptable-object
+        mix_map = fips_to_state[["fips", "state_id"]]
     else:
         fips_to_geo_res = GMPR.get_crosswalk(from_code="fips", to_code=geo_res)
         mix_map = fips_to_geo_res[["fips", geo_res]].merge(
-            fips_to_state[["fips", "state_id"]],  # pylint: disable=unsubscriptable-object
+            fips_to_state[["fips", "state_id"]],
             on="fips",
             how="inner")
     mix_map = GMPR.add_population_column(mix_map, "fips").groupby(
diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py
index b74d617d7..314d8b567 100644
--- a/quidel_covidtest/delphi_quidel_covidtest/run.py
+++ b/quidel_covidtest/delphi_quidel_covidtest/run.py
@@ -6,6 +6,7 @@
 """
 import atexit
 from datetime import datetime
+from multiprocessing import Manager, Pool, cpu_count, current_process
 import time
 from typing import Dict, Any
 
@@ -55,6 +56,49 @@ def get_smooth_info(sensors, _SMOOTHERS):
             smoothers[sensor] = smoothers.pop(RAW_TEST_PER_DEVICE)
     return smoothers
 
+def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device,
+                                          first_date, last_date, suffix,  # generate args
+                                          geo_res, sensor_name, export_dir,
+                                          export_start_date, export_end_date,  # export args
+                                          lock, log_filename, log_exceptions):  # logger args
+    """Generate sensors, create the export CSV, then return stats."""
+    # the logger cannot be passed to child processes, so it has to be recreated
+    with lock:
+        logger = get_structured_logger(__name__, log_filename, log_exceptions)
+        logger.info("Generating signal and exporting to CSV",
+                    geo_res=geo_res,
+                    sensor=sensor_name,
+                    pid=current_process().pid)
+    res_df = generate_sensor_for_nonparent_geo(geo_groups, res_key, smooth, device,
+                                               first_date, last_date, suffix)
+    dates = create_export_csv(res_df, geo_res=geo_res,
+                              sensor=sensor_name, export_dir=export_dir,
+                              start_date=export_start_date,
+                              end_date=export_end_date)
+    return dates
+
+def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
+                                       first_date, last_date, suffix,  # generate args
+                                       geo_res, sensor_name, export_dir,
+                                       export_start_date, export_end_date,  # export args
+                                       lock, log_filename, log_exceptions):  # logger args
+    """Generate sensors, create the export CSV, then return stats."""
+    # the logger cannot be passed to child processes, so it has to be recreated
+    with lock:
+        logger = get_structured_logger(__name__, log_filename, log_exceptions)
+        logger.info("Generating signal and exporting to CSV",
+                    geo_res=geo_res,
+                    sensor=sensor_name,
+                    pid=current_process().pid)
+    res_df = generate_sensor_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
+                                            first_date, last_date, suffix)
+    dates = create_export_csv(res_df, geo_res=geo_res,
+                              sensor=sensor_name, export_dir=export_dir,
+                              start_date=export_start_date,
+                              end_date=export_end_date,
+                              remove_null_samples=True)  # for parent geos, drop rows with null sample size
+    return dates
+
 def run_module(params: Dict[str, Any]):
     """Run the quidel_covidtest indicator.
 
@@ -123,53 +167,71 @@ def run_module(params: Dict[str, Any]):
                          wip_signal=params["indicator"]["wip_signal"],
                          prefix="wip_")
     smoothers = get_smooth_info(sensors, SMOOTHERS)
-    for geo_res in NONPARENT_GEO_RESOLUTIONS:
-        geo_data, res_key = geo_map(geo_res, data)
-        geo_groups = geo_data.groupby(res_key)
-        for agegroup in AGE_GROUPS:
-            for sensor in sensors:
-                if agegroup == "total":
-                    sensor_name = sensor
-                else:
-                    sensor_name = "_".join([sensor, agegroup])
-                logger.info("Generating signal and exporting to CSV",
-                            geo_res=geo_res,
-                            sensor=sensor_name)
-                state_df = generate_sensor_for_nonparent_geo(
-                    geo_groups, res_key, smooth=smoothers[sensor][1],
-                    device=smoothers[sensor][0], first_date=first_date,
-                    last_date=last_date, suffix=agegroup)
-                dates = create_export_csv(
-                    state_df,
-                    geo_res=geo_res,
-                    sensor=sensor_name,
-                    export_dir=export_dir,
-                    start_date=export_start_date,
-                    end_date=export_end_date)
-                if len(dates) > 0:
-                    stats.append((max(dates), len(dates)))
-    assert geo_res == "state"  # Make sure geo_groups is for state level
-    # County/HRR/MSA level
-    for geo_res in PARENT_GEO_RESOLUTIONS:
-        geo_data, res_key = geo_map(geo_res, data)
-        for agegroup in AGE_GROUPS:
-            for sensor in sensors:
-                if agegroup == "total":
-                    sensor_name = sensor
-                else:
-                    sensor_name = "_".join([sensor, agegroup])
-                logger.info("Generating signal and exporting to CSV",
-                            geo_res=geo_res,
-                            sensor=sensor_name)
-                res_df = generate_sensor_for_parent_geo(
-                    geo_groups, geo_data, res_key, smooth=smoothers[sensor][1],
-                    device=smoothers[sensor][0], first_date=first_date,
-                    last_date=last_date, suffix=agegroup)
-                dates = create_export_csv(res_df, geo_res=geo_res,
-                                          sensor=sensor_name, export_dir=export_dir,
-                                          start_date=export_start_date,
-                                          end_date=export_end_date,
-                                          remove_null_samples=True)
+    n_cpu = min(8, cpu_count())  # for parallelization
+    with Manager() as manager:
+        # for using loggers safely in multiple processes;
+        # no-member is disabled due to a Pylint bug, resolved by version bump (#1886)
+        lock = manager.Lock()  # pylint: disable=no-member
+        logger.info("Parallelizing sensor generation", n_cpu=n_cpu)
+        with Pool(n_cpu) as pool:
+            pool_results = []
+            for geo_res in NONPARENT_GEO_RESOLUTIONS:
+                geo_data, res_key = geo_map(geo_res, data)
+                geo_groups = geo_data.groupby(res_key)
+                for agegroup in AGE_GROUPS:
+                    for sensor in sensors:
+                        if agegroup == "total":
+                            sensor_name = sensor
+                        else:
+                            sensor_name = "_".join([sensor, agegroup])
+                        pool_results.append(
+                            pool.apply_async(
+                                generate_and_export_for_nonparent_geo,
+                                args=(
+                                    # generate_sensor_for_nonparent_geo
+                                    geo_groups, res_key,
+                                    smoothers[sensor][1], smoothers[sensor][0],
+                                    first_date, last_date, agegroup,
+                                    # create_export_csv
+                                    geo_res, sensor_name, export_dir,
+                                    export_start_date, export_end_date,
+                                    # logger params
+                                    lock,
+                                    params["common"].get("log_filename"),
+                                    params["common"].get("log_exceptions", True)
+                                )
+                            )
+                        )
+            assert geo_res == "state"  # Make sure geo_groups is for state level
+            # County/HRR/MSA level
+            for geo_res in PARENT_GEO_RESOLUTIONS:
+                geo_data, res_key = geo_map(geo_res, data)  # reuses the state-level geo_groups
+                for agegroup in AGE_GROUPS:
+                    for sensor in sensors:
+                        if agegroup == "total":
+                            sensor_name = sensor
+                        else:
+                            sensor_name = "_".join([sensor, agegroup])
+                        pool_results.append(
+                            pool.apply_async(
+                                generate_and_export_for_parent_geo,
+                                args=(
+                                    # generate_sensor_for_parent_geo
+                                    geo_groups, geo_data, res_key,
+                                    smoothers[sensor][1], smoothers[sensor][0],
+                                    first_date, last_date, agegroup,
+                                    # create_export_csv
+                                    geo_res, sensor_name, export_dir,
+                                    export_start_date, export_end_date,
+                                    # logger params
+                                    lock,
+                                    params["common"].get("log_filename"),
+                                    params["common"].get("log_exceptions", True)
+                                )
+                            )
+                        )
+            pool_results = [proc.get() for proc in pool_results]
+            for dates in pool_results:
                 if len(dates) > 0:
                     stats.append((max(dates), len(dates)))
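
A note on the lint changes: enabling useless-suppression makes Pylint report any "# pylint: disable=..." comment that no longer suppresses a real message (I0021), which is why the two now-unneeded unsubscriptable-object suppressions in geo_maps.py are removed above. A minimal illustration, with a hypothetical file name:

# lint_demo.py -- hypothetical; with `enable=useless-suppression` in
# .pylintrc, Pylint flags the comment below with I0021
# ("Useless suppression of 'unsubscriptable-object'"),
# because the suppressed message never fires on this line.
pairs = [("fips", "state_id")]
first = pairs[0]  # pylint: disable=unsubscriptable-object
print(first)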
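
And a sketch of the fan-out pattern the run.py hunk introduces: queue every job with Pool.apply_async first, then block on .get(), passing a Manager().Lock() proxy so each child process can serialize its own logger setup. This is a minimal stand-in under stated assumptions, not indicator code; export_one and the integer tasks are illustrative.

# parallel_sketch.py -- minimal sketch of the Manager/Pool fan-out above;
# `export_one` and the integer tasks stand in for
# generate_and_export_for_*_geo and their real arguments.
import logging
from multiprocessing import Manager, Pool, cpu_count, current_process

def export_one(task, lock, log_filename):
    """Do one unit of work in a child process and return its "dates"."""
    # logger objects cannot be pickled and shipped to child processes,
    # so each worker (re)configures logging itself, under the shared lock
    with lock:
        logging.basicConfig(level=logging.INFO, filename=log_filename)
        logging.info("exporting task=%s pid=%s", task, current_process().pid)
    return [task]  # stands in for the list of exported dates

if __name__ == "__main__":
    n_cpu = min(8, cpu_count())
    stats = []
    with Manager() as manager:
        lock = manager.Lock()  # a picklable proxy, safe to pass to workers
        with Pool(n_cpu) as pool:
            # queue every job before collecting results so the pool stays
            # saturated; .get() re-raises any exception from a worker
            results = [pool.apply_async(export_one, args=(task, lock, None))
                       for task in range(10)]
            dates_per_job = [res.get() for res in results]
    for dates in dates_per_job:
        if len(dates) > 0:
            stats.append((max(dates), len(dates)))
    print(stats)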