
PP bug fix and crash log #154

Merged · 10 commits · May 20, 2020
2 changes: 2 additions & 0 deletions buildstockbatch/aws/aws.py
@@ -40,6 +40,7 @@
from buildstockbatch.base import ValidationError
from buildstockbatch.aws.awsbase import AwsJobBase
from buildstockbatch import postprocessing
from ..utils import log_error_details

logger = logging.getLogger(__name__)

@@ -2112,6 +2113,7 @@ def run_job(cls, job_id, bucket, prefix, job_name, region):
os.remove(item)


@log_error_details()
def main():
logging.config.dictConfig({
'version': 1,
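The @log_error_details() decorator applied to main() here is defined in the new buildstockbatch/utils.py at the bottom of this diff; the same decorator is added to the Eagle and local Docker entry points below, so any uncaught crash in an entry point gets logged with full details.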
2 changes: 2 additions & 0 deletions buildstockbatch/eagle.py
@@ -36,6 +36,7 @@

from .base import BuildStockBatchBase, SimulationExists
from .sampler import ResidentialSingularitySampler, CommercialSobolSampler
from .utils import log_error_details
from . import postprocessing

logger = logging.getLogger(__name__)
@@ -650,6 +651,7 @@ def user_cli(argv=sys.argv[1:]):
# eagle.sh calls main()


@log_error_details()
def main():
"""
Determines which piece of work is to be run right now, on this process, on
2 changes: 2 additions & 0 deletions buildstockbatch/localdocker.py
@@ -30,6 +30,7 @@
from buildstockbatch.base import BuildStockBatchBase, SimulationExists
from buildstockbatch.sampler import ResidentialDockerSampler, CommercialSobolSampler
from buildstockbatch import postprocessing
from .utils import log_error_details

logger = logging.getLogger(__name__)

@@ -231,6 +232,7 @@ def results_dir(self):
return results_dir


@log_error_details()
def main():
logging.config.dictConfig({
'version': 1,
5 changes: 4 additions & 1 deletion buildstockbatch/postprocessing.py
@@ -34,6 +34,8 @@

logger = logging.getLogger(__name__)

MAX_PARQUET_MEMORY = 1e9  # maximum target size (bytes) of a partition in memory when combining parquet files


def read_data_point_out_json(fs, reporting_measures, filename):
try:
@@ -325,7 +327,8 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
total_mem = mean_mem * len(ts_filenames)

# Determine how many files should be in each partition and group the files
-    npartitions = math.ceil(total_mem / 1e9)  # 1 GB per partition
+    npartitions = math.ceil(total_mem / MAX_PARQUET_MEMORY)  # 1 GB per partition
+    npartitions = min(len(ts_filenames), npartitions)  # don't create more partitions than there are files
Member:
Do you have a test for this? To avoid having to make files that are actually big enough to hit this issue, you could mock one of the functions in the calculation of total_mem.

Contributor Author:
I have added the test.

ts_files_in_each_partition = np.array_split(ts_filenames, npartitions)

# Read the timeseries into a dask dataframe
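For illustration, here is a minimal sketch of the partition arithmetic above, using invented file names and sizes (none of these values come from the PR), showing why the new min() clamp is needed:

import math
import numpy as np

MAX_PARQUET_MEMORY = 1e9  # 1 GB target per partition, as in postprocessing.py

# Hypothetical case: two very large timeseries parquet files, ~3 GB each in memory
ts_filenames = ['up00/bldg0000001.parquet', 'up00/bldg0000002.parquet']
total_mem = 3e9 * len(ts_filenames)  # 6e9 bytes

npartitions = math.ceil(total_mem / MAX_PARQUET_MEMORY)  # ceil(6.0) = 6
# Without the clamp, np.array_split would produce six groups, four of them empty:
print([len(g) for g in np.array_split(ts_filenames, npartitions)])  # [1, 1, 0, 0, 0, 0]

npartitions = min(len(ts_filenames), npartitions)  # clamped to 2
print([len(g) for g in np.array_split(ts_filenames, npartitions)])  # [1, 1]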
20 changes: 20 additions & 0 deletions buildstockbatch/test/test_postprocessing.py
@@ -8,6 +8,7 @@

from buildstockbatch import postprocessing
from buildstockbatch.base import BuildStockBatchBase
from unittest.mock import patch


def test_report_additional_results_csv_columns(basic_residential_project_file):
@@ -55,3 +56,22 @@ def test_report_additional_results_csv_columns(basic_residential_project_file):
assert (df['reporting_measure1.column_2'] == 2).all()
assert (df['reporting_measure2.column_3'] == 3).all()
assert (df['reporting_measure2.column_4'] == 4).all()


def test_large_parquet_combine(basic_residential_project_file):
    # Test a simulated scenario in which the individual timeseries parquet files are larger than the
    # maximum memory per partition allocated for combining the parquet files.

post_process_config = {
'postprocessing': {
'aggregate_timeseries': True
}
}
project_filename, results_dir = basic_residential_project_file(post_process_config)

with patch.object(BuildStockBatchBase, 'weather_dir', None), \
patch.object(BuildStockBatchBase, 'get_dask_client'), \
patch.object(BuildStockBatchBase, 'results_dir', results_dir),\
patch.object(postprocessing, 'MAX_PARQUET_MEMORY', 1e6): # set the max memory to just 1MB
bsb = BuildStockBatchBase(project_filename)
        bsb.process_results()  # this would raise an exception if postprocessing could not handle the situation
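As an aside on the mocking technique used above (a standalone sketch, not part of the PR): patch.object can temporarily rebind a module-level constant, which is what lets this test exercise the multi-partition code path without creating gigabyte-sized fixture files.

from unittest.mock import patch
from buildstockbatch import postprocessing

# The constant is rebound only inside the with-block; the original 1e9 is restored on exit.
with patch.object(postprocessing, 'MAX_PARQUET_MEMORY', 1e6):
    assert postprocessing.MAX_PARQUET_MEMORY == 1e6
assert postprocessing.MAX_PARQUET_MEMORY == 1e9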
51 changes: 51 additions & 0 deletions buildstockbatch/test/test_utils.py
@@ -0,0 +1,51 @@
import tempfile
from buildstockbatch.utils import log_error_details, _str_repr
import pytest
import os


def test_str_repr():
test_obj = [{1, 2, 3, 4, 5, 6}, {"List1": ["Item1", ('a', 'b', 'c', 'd'), "item3"],
"long_name_list": ["long_name_one_two_three", "long_name"],
"dict": {"key1": ["List_item1", "List_item2", "List_item3"], "Key2": "value2",
"key3": "value3", "key4": "val4"}}]

gen_repr = _str_repr(test_obj, list_max=2, dict_max=3, string_max=10)
true_repr = "[{'1','2','3' ...6},{'List1': ['Item1',('a','b' ...4) ...3],'long_...14..._list': ['long_...23..."\
"three','long_name'],'dict': {'key1': ['List_item1','List_item2' ...3],'Key2': 'value2',"\
"'key3': 'value3' ...4}}]"

assert true_repr == gen_repr


def test_get_error_details():
tf = tempfile.NamedTemporaryFile('w+', delete=False)
tf.close()

@log_error_details(tf.name)
def failing_function1(arg1):
level_1_string = f"string1_{arg1}"
level_1_dict = {"key1": "value1"}

def failing_function2(arg2):
level_2_string = f"string2_{arg2}"
level_2_list = ["level_2_str1", "level_2_str2"]
if level_2_string and level_2_list:
raise KeyError("actual dummy exception")

if level_1_dict and level_1_string:
failing_function2("my_arg2")

with pytest.raises(KeyError) as ex_info:
failing_function1("my_arg1")

assert "actual dummy exception" in str(ex_info.value)
with open(tf.name, 'r') as f:
error_log = f.read()
assert "'arg1':'my_arg1'" in error_log
assert "'level_1_string':'string1_my_arg1'" in error_log
assert "'level_1_dict':{'key1': 'value1'}" in error_log
assert "'arg2':'my_arg2'" in error_log
assert "'level_2_string':'string2_my_arg2'" in error_log
assert "'level_2_list':['level_2_str1','level_2_str2']" in error_log
os.remove(tf.name)
68 changes: 68 additions & 0 deletions buildstockbatch/utils.py
@@ -0,0 +1,68 @@
import traceback
import inspect


def _str_repr(obj, list_max=20, dict_max=20, string_max=100):
    """Compact repr that truncates long strings and containers so crash logs stay readable."""
if type(obj) is str:
if len(obj) <= string_max:
return f"'{obj}'"
else:
return f"'{obj[0:string_max//2]}...{len(obj)}...{obj[-string_max//2:]}'"
elif type(obj) in [int, float]:
return _str_repr(str(obj), list_max, dict_max, string_max)
elif type(obj) is list:
txt = "[" + ",".join([_str_repr(item, list_max, dict_max, string_max) for item in obj[0:list_max]])
if len(obj) > list_max:
txt += f" ...{len(obj)}"
txt += "]"
return txt
elif type(obj) is tuple:
txt = "(" + ",".join([_str_repr(item, list_max, dict_max, string_max) for item in obj[0:list_max]])
if len(obj) > list_max:
txt += f" ...{len(obj)}"
txt += ")"
return txt
elif type(obj) is set:
obj = list(obj)
txt = "{" + ",".join([_str_repr(item, list_max, dict_max, string_max) for item in obj[0:dict_max]])
if len(obj) > dict_max:
txt += f" ...{len(obj)}"
txt += "}"
return txt
elif type(obj) is dict:
keys = list(obj.keys())
txt = "{" + ",".join([f"{_str_repr(key, list_max, dict_max, string_max)}:"
f" {_str_repr(obj[key], list_max, dict_max, string_max)}" for key in keys[0:dict_max]])
if len(keys) > dict_max:
txt += f" ...{len(keys)}"
txt += "}"
return txt
else:
return str(obj)


def _get_error_details():
    """Return the formatted traceback plus the local variables of every frame in it."""
text = ""
text += traceback.format_exc()
frames = inspect.trace()
for frame in frames:
        text += f'\nIn file: {frame[1]}, function: {str(frame[3])}, line: {frame[2]}\n'
text += "Local Variables: "
for var, value in frame[0].f_locals.items():
            text += _str_repr(var) + ":" + _str_repr(value) + ", "  # separate one local from the next
text += "\n"
return text


def log_error_details(output_file="buildstockbatch_crash_details.log"):
    """Decorator that writes the traceback and the local variables of each frame to output_file on a crash."""
def log_error_decorator(func):
def run_with_error_capture(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception:
with open(output_file, "w") as f:
f.write(_get_error_details())
raise
return run_with_error_capture

return log_error_decorator
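For reference, a minimal usage sketch of the utilities above (the log file name and entry point here are invented for illustration):

from buildstockbatch.utils import log_error_details, _str_repr

# _str_repr truncates long strings and containers so crash logs stay readable.
print(_str_repr(list(range(25)), list_max=3))  # ['0','1','2' ...25]
print(_str_repr('a' * 50, string_max=10))      # 'aaaaa...50...aaaaa'

@log_error_details('my_crash.log')  # hypothetical log file path
def entry_point():
    batch_size = 42  # locals like this are captured in the crash log
    raise RuntimeError('boom')

try:
    entry_point()
except RuntimeError:
    pass  # the exception still propagates; my_crash.log now holds the traceback and locals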