Closing multiprocessing pool objects
Updating repo to close all multiprocessing Pool objects to fix memory issues
BrianAvant committed Apr 20, 2021
1 parent a304cf3 commit 6387384
Showing 7 changed files with 58 additions and 55 deletions.
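
Every change below applies the same pattern: a bare `Pool(...)` whose workers were never reclaimed is replaced with a `with` block, whose `__exit__()` calls `terminate()` and releases the worker processes as soon as the block ends. A minimal sketch of the before/after pattern, where `do_work`, `items`, and the worker count are illustrative stand-ins rather than code from this repo:

```python
from multiprocessing import Pool

def do_work(item):
    # Stand-in for the repo's workers (run_system_command, run_inundation, ...)
    return item * 2

if __name__ == '__main__':
    items = list(range(10))

    # Before: the pool is never closed or terminated, so worker
    # processes (and their memory) linger until the parent exits.
    # pool = Pool(4)
    # results = pool.map(do_work, items)

    # After: Pool.__exit__() calls terminate() when the block ends,
    # releasing the workers immediately. map() blocks until all results
    # are back, so terminating here loses nothing.
    with Pool(processes=4) as pool:
        results = pool.map(do_work, items)

    print(results)
```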
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
All notable changes to this project will be documented in this file.
We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.
## v3.0.15.4 - 2021-04-20 - [PR #356](https://github.com/NOAA-OWP/cahaba/pull/356)

Close all multiprocessing Pool objects in the repo to fix memory issues.
<br/><br/>
## v3.0.15.3 - 2021-04-19 - [PR #358](https://github.com/NOAA-OWP/cahaba/pull/358)

Preprocess NHDPlus HR rasters for consistent projections and nodata values, and convert values from centimeters to meters.
14 changes: 7 additions & 7 deletions src/acquire_and_preprocess_inputs.py
@@ -107,8 +107,8 @@ def pull_and_prepare_wbd(path_to_saved_data_parent_dir,nwm_dir_name,nwm_file_to_
#wbd_gpkg_list.append(output_gpkg)
#procs_list.append(['ogr2ogr -overwrite -progress -f GPKG -t_srs "{projection}" {output_gpkg} {wbd_gdb_path} {wbd_layer}'.format(output_gpkg=output_gpkg, wbd_gdb_path=wbd_gdb_path, wbd_layer=wbd_layer, projection=PREP_PROJECTION)])

-#pool = Pool(num_workers)
-#pool.map(run_system_command, procs_list)
+# with Pool(processes=num_workers) as pool:
+#     pool.map(run_system_command, procs_list)

# Subset WBD layers to CONUS and add to single geopackage.
#print("Subsetting WBD layers to CONUS...")
@@ -150,9 +150,8 @@ def pull_and_prepare_nwm_hydrofabric(path_to_saved_data_parent_dir, path_to_prei
output_gpkg = os.path.join(nwm_hydrofabric_directory, nwm_layer + '_proj.gpkg')
procs_list.append(['ogr2ogr -overwrite -progress -f GPKG -t_srs "{projection}" {output_gpkg} {nwm_hydrofabric_gdb} {nwm_layer}'.format(projection=PREP_PROJECTION, output_gpkg=output_gpkg, nwm_hydrofabric_gdb=nwm_hydrofabric_gdb, nwm_layer=nwm_layer)])

-    pool = Pool(num_workers)
-    pool.map(run_system_command, procs_list)
-    pool.close()
+    with Pool(processes=num_workers) as pool:
+        pool.map(run_system_command, procs_list)


def pull_and_prepare_nhd_data(args):
@@ -349,8 +348,9 @@ def manage_preprocessing(hucs_of_interest, num_workers=1,overwrite_nhd=False, ov
nhd_procs_list.append([nhd_raster_download_url, nhd_raster_extraction_path, nhd_vector_download_url, nhd_vector_extraction_path, overwrite_nhd])

# Pull and prepare NHD data.
-    #pool = Pool(num_workers)
-    #pool.map(pull_and_prepare_nhd_data, nhd_procs_list)
+    # with Pool(processes=num_workers) as pool:
+    #     pool.map(pull_and_prepare_nhd_data, nhd_procs_list)

for huc in nhd_procs_list:
try:
pull_and_prepare_nhd_data(huc)
4 changes: 2 additions & 2 deletions src/aggregate_fim_outputs.py
@@ -218,5 +218,5 @@ def reproject_raster(raster_name):
procs_list.append([fim_outputs_directory,huc6,limited_huc_list])

print(f"aggregating {len(huc_list)} hucs to HUC6 scale using {number_of_jobs} jobs")
-    pool = Pool(number_of_jobs)
-    pool.map(aggregate_fim_outputs, procs_list)
+    with Pool(processes=number_of_jobs) as pool:
+        pool.map(aggregate_fim_outputs, procs_list)
46 changes: 23 additions & 23 deletions tools/cache_metrics.py
@@ -13,15 +13,15 @@


def process_alpha_test(args):

fim_run_dir = args[0]
version = args[1]
test_id = args[2]
magnitude = args[3]
archive_results = args[4]
mask_type = 'huc'

if archive_results == False:
compare_to_previous = True
else:
@@ -42,31 +42,31 @@ def process_alpha_test(args):
parser.add_argument('-j','--job-number',help='Number of processes to use. Default is 1.',required=False, default="1")
parser.add_argument('-s','--special-string',help='Add a special name to the end of the branch.',required=False, default="")
parser.add_argument('-b','--benchmark-category',help='Options include ble or ahps. Defaults to process both.',required=False, default=None)

test_cases_dir_list = os.listdir(TEST_CASES_DIR)

args = vars(parser.parse_args())

config = args['config']
fim_version = args['fim_version']
job_number = int(args['job_number'])
special_string = args['special_string']
benchmark_category = args['benchmark_category']

if fim_version != "all":
previous_fim_list = [fim_version]
else:
previous_fim_list = os.listdir(PREVIOUS_FIM_DIR)

if config == 'PREV':
archive_results = True
elif config == 'DEV':
archive_results = False
else:
print('Config (-c) option incorrectly set. Use "DEV" or "PREV"')

benchmark_category_list = []

if benchmark_category == None:
for d in test_cases_dir_list:
if 'test_cases' in d:
@@ -77,42 +77,42 @@
procs_list = []
for bench_cat in benchmark_category_list:
bench_cat_test_case_dir = os.path.join(TEST_CASES_DIR, bench_cat + '_test_cases')

bench_cat_test_case_list = os.listdir(bench_cat_test_case_dir)

for test_id in bench_cat_test_case_list:
if 'validation' not in test_id and 'other' not in test_id:

current_huc = test_id.split('_')[0]
if test_id.split('_')[1] in bench_cat:

for version in previous_fim_list:

if config == 'DEV':
fim_run_dir = os.path.join(OUTPUTS_DIR, version, current_huc)
elif config == 'PREV':
fim_run_dir = os.path.join(PREVIOUS_FIM_DIR, version, current_huc)

if not os.path.exists(fim_run_dir):
fim_run_dir = os.path.join(PREVIOUS_FIM_DIR, version, current_huc[:6]) # For previous versions of HAND computed at HUC6 scale

if os.path.exists(fim_run_dir):
if special_string != "":
version = version + '_' + special_string

if 'ble' in test_id:
magnitude = ['100yr', '500yr']
elif 'usgs' in test_id or 'nws' in test_id:
magnitude = ['action', 'minor', 'moderate', 'major']
else:
continue

print("Adding " + test_id + " to list of test_ids to process...")
if job_number > 1:
procs_list.append([fim_run_dir, version, test_id, magnitude, archive_results])
else:
process_alpha_test([fim_run_dir, version, test_id, magnitude, archive_results])

if job_number > 1:
-    pool = Pool(job_number)
-    pool.map(process_alpha_test, procs_list)
+    with Pool(processes=job_number) as pool:
+        pool.map(process_alpha_test, procs_list)
9 changes: 4 additions & 5 deletions tools/generate_categorical_fim_mapping.py
@@ -108,8 +108,8 @@ def generate_categorical_fim(fim_run_dir, source_flow_dir, output_cat_fim_dir, n

# Initiate multiprocessing
print(f"Running inundation for {len(procs_list)} sites using {number_of_jobs} jobs")
-    pool = Pool(number_of_jobs)
-    pool.map(run_inundation, procs_list)
+    with Pool(processes=number_of_jobs) as pool:
+        pool.map(run_inundation, procs_list)


def run_inundation(args):
@@ -153,7 +153,6 @@ def post_process_cat_fim_for_viz(number_of_jobs, output_cat_fim_dir, nws_lid_att
if not os.path.exists(gpkg_dir):
os.mkdir(gpkg_dir)


# Find the FIM version
fim_version = os.path.basename(output_cat_fim_dir)
merged_layer = os.path.join(output_cat_fim_dir, 'catfim_library.shp')
@@ -193,8 +192,8 @@ def post_process_cat_fim_for_viz(number_of_jobs, output_cat_fim_dir, nws_lid_att
pass

# Multiprocess with instructions
-    pool = Pool(number_of_jobs)
-    pool.map(reformat_inundation_maps, procs_list)
+    with Pool(processes=number_of_jobs) as pool:
+        pool.map(reformat_inundation_maps, procs_list)

# Merge all layers
print(f"Merging {len(os.listdir(gpkg_dir))} layers...")
8 changes: 4 additions & 4 deletions tools/rating_curve_comparison.py
@@ -38,13 +38,13 @@
def check_file_age(file):
'''
Checks if file exists, determines the file age, and recommends
updating if older than 1 month.
Returns
-------
None.
'''
file = Path(file)
if file.is_file():
modification_time = file.stat().st_mtime
@@ -419,8 +419,8 @@ def calculate_rc_stats_elev(rc,stat_groups=None):

# Initiate multiprocessing
print(f"Generating rating curve metrics for {len(procs_list)} hucs using {number_of_jobs} jobs")
-    pool = Pool(number_of_jobs)
-    pool.map(generate_rating_curve_metrics, procs_list)
+    with Pool(processes=number_of_jobs) as pool:
+        pool.map(generate_rating_curve_metrics, procs_list)

print(f"Aggregating rating curve metrics for {len(procs_list)} hucs")
aggregate_metrics(output_dir,procs_list,stat_groups)
28 changes: 14 additions & 14 deletions tools/synthesize_test_cases.py
@@ -76,16 +76,16 @@ def create_master_metrics_csv(master_metrics_csv_output, dev_comparison):
int(test_case.split('_')[0])

huc = test_case.split('_')[0]

for iteration in iteration_list:

if iteration == "official":
versions_to_crawl = os.path.join(benchmark_test_case_dir, test_case, 'official_versions')
versions_to_aggregate = os.listdir(PREVIOUS_FIM_DIR)
if iteration == "comparison":
versions_to_crawl = os.path.join(benchmark_test_case_dir, test_case, 'testing_versions')
versions_to_aggregate = [dev_comparison]

for magnitude in ['100yr', '500yr']:
for version in versions_to_aggregate:
if '_fr' in version:
@@ -100,7 +100,7 @@ def create_master_metrics_csv(master_metrics_csv_output, dev_comparison):
calibrated = "no"
version_dir = os.path.join(versions_to_crawl, version)
magnitude_dir = os.path.join(version_dir, magnitude)

if os.path.exists(magnitude_dir):
magnitude_dir_list = os.listdir(magnitude_dir)
for f in magnitude_dir_list:
@@ -119,7 +119,7 @@ def create_master_metrics_csv(master_metrics_csv_output, dev_comparison):
sub_list_to_append.append(benchmark_source)
sub_list_to_append.append(extent_config)
sub_list_to_append.append(calibrated)

list_to_write.append(sub_list_to_append)
except ValueError:
pass
@@ -132,9 +132,9 @@ def create_master_metrics_csv(master_metrics_csv_output, dev_comparison):
int(test_case.split('_')[0])

huc = test_case.split('_')[0]

for iteration in iteration_list:

if iteration == "official":
versions_to_crawl = os.path.join(benchmark_test_case_dir, test_case, 'official_versions')
versions_to_aggregate = os.listdir(PREVIOUS_FIM_DIR)
@@ -154,7 +154,7 @@ def create_master_metrics_csv(master_metrics_csv_output, dev_comparison):
calibrated = "yes"
else:
calibrated = "no"

version_dir = os.path.join(versions_to_crawl, version)
magnitude_dir = os.path.join(version_dir, magnitude)
if os.path.exists(magnitude_dir):
@@ -166,7 +166,7 @@ def create_master_metrics_csv(master_metrics_csv_output, dev_comparison):
full_json_path = os.path.join(magnitude_dir, f)
flow = ''
if os.path.exists(full_json_path):

# Get flow used to map.
flow_file = os.path.join(benchmark_test_case_dir, 'validation_data_' + benchmark_source, huc, nws_lid, magnitude, 'ahps_' + nws_lid + '_huc_' + huc + '_flows_' + magnitude + '.csv')
if os.path.exists(flow_file):
@@ -177,7 +177,7 @@ def create_master_metrics_csv(master_metrics_csv_output, dev_comparison):
flow = row[1]
if nws_lid == 'mcc01':
print(flow)

stats_dict = json.load(open(full_json_path))
for metric in metrics_to_write:
sub_list_to_append.append(stats_dict[metric])
@@ -186,7 +186,7 @@ def create_master_metrics_csv(master_metrics_csv_output, dev_comparison):
sub_list_to_append.append(benchmark_source)
sub_list_to_append.append(extent_config)
sub_list_to_append.append(calibrated)

list_to_write.append(sub_list_to_append)
except ValueError:
pass
@@ -321,12 +321,12 @@ def process_alpha_test(args):

# Multiprocess alpha test runs.
if job_number > 1:
-    pool = Pool(job_number)
-    pool.map(process_alpha_test, procs_list)
+    with Pool(processes=job_number) as pool:
+        pool.map(process_alpha_test, procs_list)

# Do aggregate_metrics.
print("Creating master metrics CSV...")

if config == 'DEV':
dev_comparison = fim_version + "_" + special_string
else:
