diff --git a/buildstockbatch/aws/aws.py b/buildstockbatch/aws/aws.py index 16cf1447..8f1ea03d 100644 --- a/buildstockbatch/aws/aws.py +++ b/buildstockbatch/aws/aws.py @@ -40,6 +40,7 @@ from buildstockbatch.base import ValidationError from buildstockbatch.aws.awsbase import AwsJobBase from buildstockbatch import postprocessing +from ..utils import log_error_details logger = logging.getLogger(__name__) @@ -2112,6 +2113,7 @@ def run_job(cls, job_id, bucket, prefix, job_name, region): os.remove(item) +@log_error_details() def main(): logging.config.dictConfig({ 'version': 1, diff --git a/buildstockbatch/eagle.py b/buildstockbatch/eagle.py index 8e01969f..2a5487e4 100644 --- a/buildstockbatch/eagle.py +++ b/buildstockbatch/eagle.py @@ -36,6 +36,7 @@ from .base import BuildStockBatchBase, SimulationExists from .sampler import ResidentialSingularitySampler, CommercialSobolSampler +from .utils import log_error_details from . import postprocessing logger = logging.getLogger(__name__) @@ -650,6 +651,7 @@ def user_cli(argv=sys.argv[1:]): # eagle.sh calls main() +@log_error_details() def main(): """ Determines which piece of work is to be run right now, on this process, on diff --git a/buildstockbatch/localdocker.py b/buildstockbatch/localdocker.py index 6bafa9e7..97079169 100644 --- a/buildstockbatch/localdocker.py +++ b/buildstockbatch/localdocker.py @@ -30,6 +30,7 @@ from buildstockbatch.base import BuildStockBatchBase, SimulationExists from buildstockbatch.sampler import ResidentialDockerSampler, CommercialSobolSampler from buildstockbatch import postprocessing +from .utils import log_error_details logger = logging.getLogger(__name__) @@ -231,6 +232,7 @@ def results_dir(self): return results_dir +@log_error_details() def main(): logging.config.dictConfig({ 'version': 1, diff --git a/buildstockbatch/postprocessing.py b/buildstockbatch/postprocessing.py index ebb0f7de..c29d3173 100644 --- a/buildstockbatch/postprocessing.py +++ b/buildstockbatch/postprocessing.py @@ -34,6 
+34,8 @@ logger = logging.getLogger(__name__) +MAX_PARQUET_MEMORY = 1e9 # maximum size of the parquet file in memory when combining multiple parquets + def read_data_point_out_json(fs, reporting_measures, filename): try: @@ -325,7 +327,8 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True): total_mem = mean_mem * len(ts_filenames) # Determine how many files should be in each partition and group the files - npartitions = math.ceil(total_mem / 1e9) # 1 GB per partition + npartitions = math.ceil(total_mem / MAX_PARQUET_MEMORY) # at most MAX_PARQUET_MEMORY bytes per partition + npartitions = min(len(ts_filenames), npartitions) # don't create more partitions than there are files ts_files_in_each_partition = np.array_split(ts_filenames, npartitions) # Read the timeseries into a dask dataframe diff --git a/buildstockbatch/test/test_postprocessing.py b/buildstockbatch/test/test_postprocessing.py index de1d7a71..aedfcc71 100644 --- a/buildstockbatch/test/test_postprocessing.py +++ b/buildstockbatch/test/test_postprocessing.py @@ -8,6 +8,7 @@ from buildstockbatch import postprocessing from buildstockbatch.base import BuildStockBatchBase +from unittest.mock import patch def test_report_additional_results_csv_columns(basic_residential_project_file): @@ -55,3 +56,22 @@ def test_report_additional_results_csv_columns(basic_residential_project_file): assert (df['reporting_measure1.column_2'] == 2).all() assert (df['reporting_measure2.column_3'] == 3).all() assert (df['reporting_measure2.column_4'] == 4).all() + + +def test_large_parquet_combine(basic_residential_project_file): + # Test a simulated scenario where the individual timeseries parquet are larger than the max memory per partition + # allocated for the parquet file combining. 
+ + post_process_config = { + 'postprocessing': { + 'aggregate_timeseries': True + } + } + project_filename, results_dir = basic_residential_project_file(post_process_config) + + with patch.object(BuildStockBatchBase, 'weather_dir', None), \ + patch.object(BuildStockBatchBase, 'get_dask_client'), \ + patch.object(BuildStockBatchBase, 'results_dir', results_dir),\ + patch.object(postprocessing, 'MAX_PARQUET_MEMORY', 1e6): # set the max memory to just 1MB + bsb = BuildStockBatchBase(project_filename) + bsb.process_results() # this would raise exception if the postprocessing could not handle the situation diff --git a/buildstockbatch/test/test_utils.py b/buildstockbatch/test/test_utils.py new file mode 100644 index 00000000..62c5d215 --- /dev/null +++ b/buildstockbatch/test/test_utils.py @@ -0,0 +1,51 @@ +import tempfile +from buildstockbatch.utils import log_error_details, _str_repr +import pytest +import os + + +def test_str_repr(): + test_obj = [{1, 2, 3, 4, 5, 6}, {"List1": ["Item1", ('a', 'b', 'c', 'd'), "item3"], + "long_name_list": ["long_name_one_two_three", "long_name"], + "dict": {"key1": ["List_item1", "List_item2", "List_item3"], "Key2": "value2", + "key3": "value3", "key4": "val4"}}] + + gen_repr = _str_repr(test_obj, list_max=2, dict_max=3, string_max=10) + true_repr = "[{'1','2','3' ...6},{'List1': ['Item1',('a','b' ...4) ...3],'long_...14..._list': ['long_...23..."\ + "three','long_name'],'dict': {'key1': ['List_item1','List_item2' ...3],'Key2': 'value2',"\ + "'key3': 'value3' ...4}}]" + + assert true_repr == gen_repr + + +def test_get_error_details(): + tf = tempfile.NamedTemporaryFile('w+', delete=False) + tf.close() + + @log_error_details(tf.name) + def failing_function1(arg1): + level_1_string = f"string1_{arg1}" + level_1_dict = {"key1": "value1"} + + def failing_function2(arg2): + level_2_string = f"string2_{arg2}" + level_2_list = ["level_2_str1", "level_2_str2"] + if level_2_string and level_2_list: + raise KeyError("actual dummy exception") + 
+ if level_1_dict and level_1_string: + failing_function2("my_arg2") + + with pytest.raises(KeyError) as ex_info: + failing_function1("my_arg1") + + assert "actual dummy exception" in str(ex_info.value) + with open(tf.name, 'r') as f: + error_log = f.read() + assert "'arg1':'my_arg1'" in error_log + assert "'level_1_string':'string1_my_arg1'" in error_log + assert "'level_1_dict':{'key1': 'value1'}" in error_log + assert "'arg2':'my_arg2'" in error_log + assert "'level_2_string':'string2_my_arg2'" in error_log + assert "'level_2_list':['level_2_str1','level_2_str2']" in error_log + os.remove(tf.name) diff --git a/buildstockbatch/utils.py b/buildstockbatch/utils.py new file mode 100644 index 00000000..d8ddf56b --- /dev/null +++ b/buildstockbatch/utils.py @@ -0,0 +1,68 @@ +import traceback +import inspect + + +def _str_repr(obj, list_max=20, dict_max=20, string_max=100): + if type(obj) is str: + if len(obj) <= string_max: + return f"'{obj}'" + else: + return f"'{obj[0:string_max//2]}...{len(obj)}...{obj[-string_max//2:]}'" + elif type(obj) in [int, float]: + return _str_repr(str(obj), list_max, dict_max, string_max) + elif type(obj) is list: + txt = "[" + ",".join([_str_repr(item, list_max, dict_max, string_max) for item in obj[0:list_max]]) + if len(obj) > list_max: + txt += f" ...{len(obj)}" + txt += "]" + return txt + elif type(obj) is tuple: + txt = "(" + ",".join([_str_repr(item, list_max, dict_max, string_max) for item in obj[0:list_max]]) + if len(obj) > list_max: + txt += f" ...{len(obj)}" + txt += ")" + return txt + elif type(obj) is set: + obj = list(obj) + txt = "{" + ",".join([_str_repr(item, list_max, dict_max, string_max) for item in obj[0:dict_max]]) + if len(obj) > dict_max: + txt += f" ...{len(obj)}" + txt += "}" + return txt + elif type(obj) is dict: + keys = list(obj.keys()) + txt = "{" + ",".join([f"{_str_repr(key, list_max, dict_max, string_max)}:" + f" {_str_repr(obj[key], list_max, dict_max, string_max)}" for key in keys[0:dict_max]]) + if 
len(keys) > dict_max: + txt += f" ...{len(keys)}" + txt += "}" + return txt + else: + return str(obj) + + +def _get_error_details(): + text = "" + text += traceback.format_exc() + frames = inspect.trace() + for frame in frames: + text += f'\nIn file: {frame[1]}, module {str(frame[3])} line: {frame[2]} \n' + text += "Local Variables: " + for var, value in frame[0].f_locals.items(): + text += _str_repr(var) + ":" + _str_repr(value) + text += "\n" + return text + + +def log_error_details(output_file="buildstockbatch_crash_details.log"): + def log_error_decorator(func): + def run_with_error_capture(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception: + with open(output_file, "w") as f: + f.write(_get_error_details()) + raise + return run_with_error_capture + + return log_error_decorator