Skip to content

Commit

Permalink
Bug in prepro cli and add a timeout to entity tasks (#784)
Browse files Browse the repository at this point in the history
* Bug in prepro cli and add a timeout to entity tasks

* fix test
  • Loading branch information
fmaussion committed May 27, 2019
1 parent e1063f8 commit d77e82a
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 7 deletions.
1 change: 1 addition & 0 deletions oggm/cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ def initialize(file=None, logging_level='INFO'):
grids[grid_json] = salem.Grid.from_json(fp)
DATA['dem_grids'] = grids


def oggm_static_paths():
"""Initialise the OGGM paths from the config file."""

Expand Down
23 changes: 17 additions & 6 deletions oggm/cli/prepro_levels.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ def _rename_dem_folder(gdir, source=''):
"""

# open tif-file to check if it's worth it
dem_f = gdir.get_filepath('dem')
try:
dem_dr = rasterio.open(gdir.get_filepath('dem'), 'r', driver='GTiff')
dem_dr = rasterio.open(dem_f, 'r', driver='GTiff')
dem = dem_dr.read(1).astype(rasterio.float32)
except IOError:
# No file, no problem
# No file, no problem - still, delete the file if needed
if os.path.exists(dem_f):
os.remove(dem_f)
return

# Grid
Expand All @@ -53,7 +56,9 @@ def _rename_dem_folder(gdir, source=''):
dem[dem <= min_z] = np.NaN
isfinite = np.isfinite(dem)
if np.all(~isfinite) or (np.min(dem) == np.max(dem)):
# Nothing to do, return
# Remove the file and return
if os.path.exists(dem_f):
os.remove(dem_f)
return

# Create a source dir and move the files
Expand All @@ -68,7 +73,8 @@ def run_prepro_levels(rgi_version=None, rgi_reg=None, border=None,
output_folder='', working_dir='', dem_source='',
is_test=False, demo=False, test_rgidf=None,
test_intersects_file=None, test_topofile=None,
test_crudir=None, disable_mp=False, max_level=4):
test_crudir=None, disable_mp=False, timeout=0,
max_level=4):
"""Does the actual job.
Parameters
Expand Down Expand Up @@ -138,6 +144,9 @@ def _time_log():
# Set to True for operational runs
cfg.PARAMS['continue_on_error'] = True

# Timeout
cfg.PARAMS['task_timeout'] = timeout

# For statistics
climate_periods = [1920, 1960, 2000]

Expand Down Expand Up @@ -187,7 +196,6 @@ def _time_log():
if dem_source.upper() == 'ALL':
# This is the complex one, just do the job an leave
log.workflow('Running prepro on ALL sources')
cfg.PARAMS['use_intersects'] = False
for i, s in enumerate(utils.DEM_SOURCES):
rs = i == 0
rgidf['DEM_SOURCE'] = s
Expand Down Expand Up @@ -346,6 +354,9 @@ def parse_args(args):
'the default OGGM DEM.')
parser.add_argument('--disable-mp', nargs='?', const=True, default=False,
help='if you want to disable multiprocessing.')
parser.add_argument('--timeout', type=int, default=0,
help='apply a timeout to the entity tasks '
'(in seconds).')
parser.add_argument('--demo', nargs='?', const=True, default=False,
help='if you want to run the prepro for the '
'list of demo glaciers.')
Expand Down Expand Up @@ -389,7 +400,7 @@ def parse_args(args):
border=border, output_folder=output_folder,
working_dir=working_dir, is_test=args.test,
demo=args.demo, dem_source=args.dem_source,
max_level=args.max_level,
max_level=args.max_level, timeout=args.timeout,
disable_mp=args.disable_mp)


Expand Down
4 changes: 4 additions & 0 deletions oggm/params.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ continue_on_error = False
# (works only for entity tasks)
auto_skip_task = False

# Apply a timeout check to entity tasks?
# 0 means no timeout, positive values give timeout threshold in seconds
task_timeout = 0

# Use compression for the intermediate pickles? (might slow down I/O a bit)
# Both the performance loss (0% ?) and the space gain (-10%) seem to be low
use_compression = True
Expand Down
8 changes: 7 additions & 1 deletion oggm/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -846,8 +846,14 @@ def test_source_run(self):
from oggm.cli.prepro_levels import run_prepro_levels

# Read in the RGI file
inter = gpd.read_file(utils.get_demo_file('rgi_intersect_oetztal.shp'))
rgidf = gpd.read_file(utils.get_demo_file('rgi_oetztal.shp'))

rgidf['RGIId'] = [rid.replace('RGI50', 'RGI60') for rid in rgidf.RGIId]
inter['RGIId_1'] = [rid.replace('RGI50', 'RGI60')
for rid in inter.RGIId_1]
inter['RGIId_2'] = [rid.replace('RGI50', 'RGI60')
for rid in inter.RGIId_2]
rgidf = rgidf.iloc[:4]

wdir = os.path.join(self.testdir, 'wd')
Expand All @@ -856,7 +862,7 @@ def test_source_run(self):
topof = utils.get_demo_file('srtm_oetztal.tif')
run_prepro_levels(rgi_version=None, rgi_reg='11', border=20,
output_folder=odir, working_dir=wdir, is_test=True,
test_rgidf=rgidf,
test_rgidf=rgidf, test_intersects_file=inter,
test_topofile=topof, dem_source='ALL')

rid = rgidf.iloc[0].RGIId
Expand Down
10 changes: 10 additions & 0 deletions oggm/utils/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import shutil
import tarfile
import sys
import signal
import datetime
import logging
import pickle
Expand Down Expand Up @@ -347,6 +348,10 @@ def __exit__(self, a, b, c):
logging.disable(logging.NOTSET)


def _timeout_handler(signum, frame):
raise TimeoutError('This task was killed because of timeout')


class entity_task(object):
"""Decorator for common job-controlling logic.
Expand Down Expand Up @@ -411,7 +416,12 @@ def _entity_task(gdir, *, reset=None, print_log=True, **kwargs):

# Run the task
try:
if cfg.PARAMS['task_timeout'] > 0:
signal.signal(signal.SIGALRM, _timeout_handler)
signal.alarm(cfg.PARAMS['task_timeout'])
out = task_func(gdir, **kwargs)
if cfg.PARAMS['task_timeout'] > 0:
signal.alarm(0)
if task_name != 'gdir_to_tar':
gdir.log(task_name)
except Exception as err:
Expand Down

0 comments on commit d77e82a

Please sign in to comment.