From b60e8e21d26d062d398f683bfae324eb645f8629 Mon Sep 17 00:00:00 2001
From: Noel Merket
Date: Tue, 4 May 2021 13:31:08 -0600
Subject: [PATCH 1/4] moving dask output files to results/output directory

---
 buildstockbatch/localdocker.py    |  4 ++++
 buildstockbatch/postprocessing.py | 11 ++++++++---
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/buildstockbatch/localdocker.py b/buildstockbatch/localdocker.py
index b30fe801..aa158c36 100644
--- a/buildstockbatch/localdocker.py
+++ b/buildstockbatch/localdocker.py
@@ -11,6 +11,7 @@
 """
 import argparse
+from dask.distributed import Client, LocalCluster
 import docker
 import functools
 from fsspec.implementations.local import LocalFileSystem
@@ -217,6 +218,9 @@ def results_dir(self):
             os.makedirs(results_dir)
         return results_dir
 
+    def get_dask_client(self):
+        cluster = LocalCluster(local_directory=os.path.join(self.results_dir, 'dask-tmp'))
+        return Client(cluster)
 
 @log_error_details()
 def main():
diff --git a/buildstockbatch/postprocessing.py b/buildstockbatch/postprocessing.py
index a02be786..a16c7f50 100644
--- a/buildstockbatch/postprocessing.py
+++ b/buildstockbatch/postprocessing.py
@@ -29,6 +29,7 @@
 import random
 import re
 from s3fs import S3FileSystem
+import tempfile
 import time
 
 logger = logging.getLogger(__name__)
@@ -325,7 +326,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
             ts_filenames = fs.glob(f'{ts_in_dir}/up{upgrade_id:02d}/bldg*.parquet')
 
             if not ts_filenames:
-                logger.info(f"There are no timeseries files for upgrade{upgrade_id}.")
+                logger.warning(f"There are no timeseries files for upgrade{upgrade_id}.")
                 continue
 
             # Calculate the mean and estimate the total memory usage
@@ -359,8 +360,12 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
                 partial(read_and_concat_enduse_timeseries_parquet, fs, all_ts_cols_sorted, ts_out_loc)
             )
             group_ids = list(range(npartitions))
-            with performance_report(filename=f'dask_combine_report{upgrade_id}.html'):
-                dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))
+            with tempfile.TemporaryDirectory() as tmpdir:
+                tmpfilepath = Path(tmpdir, 'dask-report.html')
+                with performance_report(filename=str(tmpfilepath)):
+                    dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))
+                if tmpfilepath.exists():
+                    fs.put_file(str(tmpfilepath), f'{results_dir}/dask_combine_report{upgrade_id}.html')
 
             logger.info(f"Finished combining and saving timeseries for upgrade{upgrade_id}.")
 

From 916e32d678788ddb33cc76c0b9efbd70093245b5 Mon Sep 17 00:00:00 2001
From: Noel Merket
Date: Tue, 4 May 2021 13:31:44 -0600
Subject: [PATCH 2/4] adding test when an upgrade outputs no timeseries

---
 buildstockbatch/test/test_postprocessing.py | 22 +++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/buildstockbatch/test/test_postprocessing.py b/buildstockbatch/test/test_postprocessing.py
index 183de211..a2053b6b 100644
--- a/buildstockbatch/test/test_postprocessing.py
+++ b/buildstockbatch/test/test_postprocessing.py
@@ -1,6 +1,8 @@
 from fsspec.implementations.local import LocalFileSystem
 import gzip
 import json
+import logging
+import os
 import pandas as pd
 import pathlib
 import re
@@ -110,3 +112,23 @@ def test_keep_individual_timeseries(keep_individual_timeseries, basic_residentia
     ts_path = simout_path / 'timeseries'
 
     assert ts_path.exists() == keep_individual_timeseries
+
+
+def test_upgrade_missing_ts(basic_residential_project_file, mocker, caplog):
+    caplog.set_level(logging.WARNING, logger='buildstockbatch.postprocessing')
+
+    project_filename, results_dir = basic_residential_project_file()
+    results_path = pathlib.Path(results_dir)
+    for filename in (results_path / 'simulation_output' / 'timeseries' / 'up01').glob('*.parquet'):
+        os.remove(filename)
+
+    mocker.patch.object(BuildStockBatchBase, 'weather_dir', None)
+    mocker.patch.object(BuildStockBatchBase, 'get_dask_client')
+    mocker.patch.object(BuildStockBatchBase, 'results_dir', results_dir)
+    bsb = BuildStockBatchBase(project_filename)
+    bsb.process_results()
+
+    assert len(caplog.records) == 1
+    record = caplog.records[0]
+    assert record.levelname == 'WARNING'
+    assert record.message == 'There are no timeseries files for upgrade1.'

From ad4c71d9fd2b27201adaadd7a357bd45034a9677 Mon Sep 17 00:00:00 2001
From: Noel Merket
Date: Tue, 4 May 2021 13:35:56 -0600
Subject: [PATCH 3/4] adding changelog entry

---
 docs/changelog/changelog_dev.rst | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/docs/changelog/changelog_dev.rst b/docs/changelog/changelog_dev.rst
index 892de955..76a4fd89 100644
--- a/docs/changelog/changelog_dev.rst
+++ b/docs/changelog/changelog_dev.rst
@@ -100,3 +100,13 @@ Development Changelog
 
         Modifies docs to specify that the ``eagle.postprocessing.n_workers`` key is
         for how many Eagle nodes are used and indicates the default of 2.
+
+    .. change::
+        :tags: postprocessing, bugfix
+        :pullreq: 230
+        :tickets: 199
+
+        Previously the postprocessing would fail if an upgrade scenario didn't
+        have any timeseries simulation output. Now it will skip it and post a
+        warning message. This was fixed previously, but now we have tests for
+        it.

From 109d4830d5fcdb1f40305e878c587989ae152e2a Mon Sep 17 00:00:00 2001
From: Noel Merket
Date: Tue, 4 May 2021 15:55:34 -0600
Subject: [PATCH 4/4] style fixes

---
 buildstockbatch/localdocker.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/buildstockbatch/localdocker.py b/buildstockbatch/localdocker.py
index aa158c36..110fcf7b 100644
--- a/buildstockbatch/localdocker.py
+++ b/buildstockbatch/localdocker.py
@@ -222,6 +222,7 @@ def get_dask_client(self):
         cluster = LocalCluster(local_directory=os.path.join(self.results_dir, 'dask-tmp'))
         return Client(cluster)
 
+
 @log_error_details()
 def main():
     logging.config.dictConfig({
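
The four patches above reduce to one pattern: create the Dask client with its scratch space under the results directory, write the performance report into a temporary directory, and copy the finished report into results/ through the fsspec filesystem rather than leaving dask_combine_report*.html in the current working directory. The standalone sketch below illustrates that pattern outside of buildstockbatch. It is not part of the patch series: run_combine_demo and its placeholder Dask tasks are hypothetical, while LocalCluster, Client, performance_report, and LocalFileSystem.put_file are the same calls the diffs use.

import os
import tempfile
from pathlib import Path

import dask
from dask.distributed import Client, LocalCluster, performance_report
from fsspec.implementations.local import LocalFileSystem


def run_combine_demo(results_dir, upgrade_id=1):
    """Run some Dask work, then move the performance report into results_dir."""
    fs = LocalFileSystem()
    # Keep Dask worker scratch files under the results directory rather than
    # the current working directory, mirroring LocalDocker.get_dask_client().
    cluster = LocalCluster(local_directory=os.path.join(results_dir, 'dask-tmp'))
    client = Client(cluster)
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            tmpfilepath = Path(tmpdir, 'dask-report.html')
            with performance_report(filename=str(tmpfilepath)):
                # Placeholder tasks standing in for the real
                # read_and_concat_enduse_timeseries_parquet work.
                dask.compute([dask.delayed(sum)(range(n)) for n in range(100)])
            # Guard the copy in case no report was produced.
            if tmpfilepath.exists():
                fs.put_file(str(tmpfilepath), f'{results_dir}/dask_combine_report{upgrade_id}.html')
    finally:
        client.close()
        cluster.close()


if __name__ == '__main__':
    run_combine_demo(tempfile.mkdtemp())

Because fs is an fsspec filesystem (postprocessing.py also imports S3FileSystem), the same stage-to-temp-file-then-put_file step works when the results live on S3 rather than on local disk, which is why the report is written to a local temporary directory first instead of directly to its final location.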