Commit 37b56fd

Merge remote-tracking branch 'origin/master' into enable-com

nmerket committed May 20, 2020
2 parents: 744bae5 + 2124a82

Showing 7 changed files with 149 additions and 1 deletion.
2 changes: 2 additions & 0 deletions buildstockbatch/aws/aws.py
@@ -40,6 +40,7 @@
from buildstockbatch.base import ValidationError
from buildstockbatch.aws.awsbase import AwsJobBase
from buildstockbatch import postprocessing
from ..utils import log_error_details

logger = logging.getLogger(__name__)

@@ -2112,6 +2113,7 @@ def run_job(cls, job_id, bucket, prefix, job_name, region):
            os.remove(item)


@log_error_details()
def main():
    logging.config.dictConfig({
        'version': 1,
2 changes: 2 additions & 0 deletions buildstockbatch/eagle.py
@@ -36,6 +36,7 @@

from .base import BuildStockBatchBase, SimulationExists
from .sampler import ResidentialSingularitySampler, CommercialSobolSingularitySampler, PrecomputedSingularitySampler
from .utils import log_error_details
from . import postprocessing

logger = logging.getLogger(__name__)
@@ -691,6 +692,7 @@ def user_cli(argv=sys.argv[1:]):
# eagle.sh calls main()


@log_error_details()
def main():
"""
Determines which piece of work is to be run right now, on this process, on
2 changes: 2 additions & 0 deletions buildstockbatch/localdocker.py
@@ -30,6 +30,7 @@
from buildstockbatch.base import BuildStockBatchBase, SimulationExists
from buildstockbatch.sampler import ResidentialDockerSampler, CommercialSobolDockerSampler, PrecomputedDockerSampler
from buildstockbatch import postprocessing
from .utils import log_error_details

logger = logging.getLogger(__name__)

@@ -251,6 +252,7 @@ def results_dir(self):
        return results_dir


@log_error_details()
def main():
    logging.config.dictConfig({
        'version': 1,
5 changes: 4 additions & 1 deletion buildstockbatch/postprocessing.py
@@ -34,6 +34,8 @@

logger = logging.getLogger(__name__)

MAX_PARQUET_MEMORY = 1e9 # maximum size of the parquet file in memory when combining multiple parquets


def read_data_point_out_json(fs, reporting_measures, filename):
    try:
@@ -326,7 +328,8 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
        total_mem = mean_mem * len(ts_filenames)

        # Determine how many files should be in each partition and group the files
-        npartitions = math.ceil(total_mem / 1e9)  # 1 GB per partition
+        npartitions = math.ceil(total_mem / MAX_PARQUET_MEMORY)  # 1 GB per partition
+        npartitions = min(len(ts_filenames), npartitions)  # cannot have less than one file per partition
        ts_files_in_each_partition = np.array_split(ts_filenames, npartitions)

        # Read the timeseries into a dask dataframe
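
A quick worked example of the new partition sizing, with illustrative numbers that are not from the commit: suppose 10 timeseries parquet files average 2 GB each in memory.

import math

import numpy as np

MAX_PARQUET_MEMORY = 1e9                    # 1 GB per partition, as defined above
ts_filenames = [f"ts_{i}.parquet" for i in range(10)]  # hypothetical file list
mean_mem = 2e9                              # assumed average in-memory size per file
total_mem = mean_mem * len(ts_filenames)    # 2e10 bytes

npartitions = math.ceil(total_mem / MAX_PARQUET_MEMORY)  # asks for 20 partitions...
npartitions = min(len(ts_filenames), npartitions)        # ...clamped to the 10 files
ts_files_in_each_partition = np.array_split(np.array(ts_filenames), npartitions)
assert all(len(part) == 1 for part in ts_files_in_each_partition)

Without the min() clamp, array_split would return 20 groups, half of them empty, which is exactly the oversized-file scenario the new test below simulates.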
20 changes: 20 additions & 0 deletions buildstockbatch/test/test_postprocessing.py
@@ -8,6 +8,7 @@

from buildstockbatch import postprocessing
from buildstockbatch.base import BuildStockBatchBase
from unittest.mock import patch


def test_report_additional_results_csv_columns(basic_residential_project_file):
@@ -55,3 +56,22 @@ def test_report_additional_results_csv_columns(basic_residential_project_file):
    assert (df['reporting_measure1.column_2'] == 2).all()
    assert (df['reporting_measure2.column_3'] == 3).all()
    assert (df['reporting_measure2.column_4'] == 4).all()


def test_large_parquet_combine(basic_residential_project_file):
    # Test a simulated scenario where the individual timeseries parquet files are larger than the max memory
    # per partition allocated for combining them.

    post_process_config = {
        'postprocessing': {
            'aggregate_timeseries': True
        }
    }
    project_filename, results_dir = basic_residential_project_file(post_process_config)

    with patch.object(BuildStockBatchBase, 'weather_dir', None), \
            patch.object(BuildStockBatchBase, 'get_dask_client'), \
            patch.object(BuildStockBatchBase, 'results_dir', results_dir), \
            patch.object(postprocessing, 'MAX_PARQUET_MEMORY', 1e6):  # set the max memory to just 1 MB
        bsb = BuildStockBatchBase(project_filename)
        bsb.process_results()  # would raise an exception if postprocessing could not handle the oversized files
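
The patch.object(postprocessing, 'MAX_PARQUET_MEMORY', 1e6) line is what forces the edge case: it swaps the module-level constant for the duration of the with block, so even small test fixtures exceed the per-partition budget. A minimal standalone sketch of that mocking pattern, separate from the commit:

from unittest.mock import patch

from buildstockbatch import postprocessing

with patch.object(postprocessing, 'MAX_PARQUET_MEMORY', 1e6):
    assert postprocessing.MAX_PARQUET_MEMORY == 1e6  # patched inside the block
assert postprocessing.MAX_PARQUET_MEMORY == 1e9      # restored automatically on exit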
51 changes: 51 additions & 0 deletions buildstockbatch/test/test_utils.py
@@ -0,0 +1,51 @@
import tempfile
from buildstockbatch.utils import log_error_details, _str_repr
import pytest
import os


def test_str_repr():
    test_obj = [{1, 2, 3, 4, 5, 6}, {"List1": ["Item1", ('a', 'b', 'c', 'd'), "item3"],
                                     "long_name_list": ["long_name_one_two_three", "long_name"],
                                     "dict": {"key1": ["List_item1", "List_item2", "List_item3"], "Key2": "value2",
                                              "key3": "value3", "key4": "val4"}}]

    gen_repr = _str_repr(test_obj, list_max=2, dict_max=3, string_max=10)
    true_repr = "[{'1','2','3' ...6},{'List1': ['Item1',('a','b' ...4) ...3],'long_...14..._list': ['long_...23..." \
                "three','long_name'],'dict': {'key1': ['List_item1','List_item2' ...3],'Key2': 'value2'," \
                "'key3': 'value3' ...4}}]"

    assert true_repr == gen_repr
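
To decode abbreviations like 'long_...14..._list' in the expected string above: _str_repr renders any string longer than string_max as its first string_max//2 characters, the total length, then its last string_max//2 characters (see utils.py below). A small illustrative check:

from buildstockbatch.utils import _str_repr

assert _str_repr("long_name_list", string_max=10) == "'long_...14..._list'"  # 14 characters total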


def test_get_error_details():
    tf = tempfile.NamedTemporaryFile('w+', delete=False)
    tf.close()

    @log_error_details(tf.name)
    def failing_function1(arg1):
        level_1_string = f"string1_{arg1}"
        level_1_dict = {"key1": "value1"}

        def failing_function2(arg2):
            level_2_string = f"string2_{arg2}"
            level_2_list = ["level_2_str1", "level_2_str2"]
            if level_2_string and level_2_list:
                raise KeyError("actual dummy exception")

        if level_1_dict and level_1_string:
            failing_function2("my_arg2")

    with pytest.raises(KeyError) as ex_info:
        failing_function1("my_arg1")

    assert "actual dummy exception" in str(ex_info.value)
    with open(tf.name, 'r') as f:
        error_log = f.read()
    assert "'arg1':'my_arg1'" in error_log
    assert "'level_1_string':'string1_my_arg1'" in error_log
    assert "'level_1_dict':{'key1': 'value1'}" in error_log
    assert "'arg2':'my_arg2'" in error_log
    assert "'level_2_string':'string2_my_arg2'" in error_log
    assert "'level_2_list':['level_2_str1','level_2_str2']" in error_log
    os.remove(tf.name)
68 changes: 68 additions & 0 deletions buildstockbatch/utils.py
@@ -0,0 +1,68 @@
import traceback
import inspect


def _str_repr(obj, list_max=20, dict_max=20, string_max=100):
    if type(obj) is str:
        if len(obj) <= string_max:
            return f"'{obj}'"
        else:
            return f"'{obj[0:string_max//2]}...{len(obj)}...{obj[-string_max//2:]}'"
    elif type(obj) in [int, float]:
        return _str_repr(str(obj), list_max, dict_max, string_max)
    elif type(obj) is list:
        txt = "[" + ",".join([_str_repr(item, list_max, dict_max, string_max) for item in obj[0:list_max]])
        if len(obj) > list_max:
            txt += f" ...{len(obj)}"
        txt += "]"
        return txt
    elif type(obj) is tuple:
        txt = "(" + ",".join([_str_repr(item, list_max, dict_max, string_max) for item in obj[0:list_max]])
        if len(obj) > list_max:
            txt += f" ...{len(obj)}"
        txt += ")"
        return txt
    elif type(obj) is set:
        obj = list(obj)
        txt = "{" + ",".join([_str_repr(item, list_max, dict_max, string_max) for item in obj[0:dict_max]])
        if len(obj) > dict_max:
            txt += f" ...{len(obj)}"
        txt += "}"
        return txt
    elif type(obj) is dict:
        keys = list(obj.keys())
        txt = "{" + ",".join([f"{_str_repr(key, list_max, dict_max, string_max)}:"
                              f" {_str_repr(obj[key], list_max, dict_max, string_max)}" for key in keys[0:dict_max]])
        if len(keys) > dict_max:
            txt += f" ...{len(keys)}"
        txt += "}"
        return txt
    else:
        return str(obj)


def _get_error_details():
    text = ""
    text += traceback.format_exc()
    frames = inspect.trace()
    for frame in frames:
        text += f'\nIn file: {frame[1]}, module {str(frame[3])} line: {frame[2]} \n'
        text += "Local Variables: "
        for var, value in frame[0].f_locals.items():
            text += _str_repr(var) + ":" + _str_repr(value)
            text += "\n"
    return text


def log_error_details(output_file="buildstockbatch_crash_details.log"):
    def log_error_decorator(func):
        def run_with_error_capture(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                with open(output_file, "w") as f:
                    f.write(_get_error_details())
                raise
        return run_with_error_capture

    return log_error_decorator
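
A minimal usage sketch of the new decorator, mirroring how the three main() entry points above apply it; the script and log-file name here are hypothetical, not from the commit:

from buildstockbatch.utils import log_error_details

@log_error_details("my_crash.log")
def main():
    n_sims = 42  # locals like this are captured in the crash log
    raise RuntimeError("boom")

if __name__ == '__main__':
    main()  # re-raises RuntimeError; my_crash.log holds the traceback plus "'n_sims':'42'"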
