Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run scenario_risk/damage from a shakemap #3608

Merged
merged 21 commits into from Apr 12, 2018
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 34 additions & 5 deletions openquake/calculators/base.py
Expand Up @@ -36,6 +36,7 @@
from openquake.commonlib import readinput, source, calc, writers
from openquake.baselib.parallel import Starmap
from openquake.baselib.python3compat import with_metaclass
from openquake.hazardlib.shakemap import get_sitecol_shakemap, to_gmfs
from openquake.calculators.export import export as exp
from openquake.calculators.getters import GmfDataGetter, PmapGetter

Expand Down Expand Up @@ -324,7 +325,8 @@ def can_read_parent(self):
read_access = (
config.distribution.oq_distribute in ('no', 'processpool') or
config.directory.shared_dir)
if self.oqparam.hazard_calculation_id and read_access:
if (self.oqparam.hazard_calculation_id and read_access
and 'gmf_data' not in self.datastore.hdf5):
self.datastore.parent.close() # make sure it is closed
return self.datastore.parent

Expand Down Expand Up @@ -410,11 +412,11 @@ def init(self):
"""
To be overridden to initialize the datasets needed by the calculation
"""
if not self.oqparam.imtls:
if not self.oqparam.risk_imtls:
if self.datastore.parent:
self.oqparam.risk_imtls = (
self.datastore.parent['oqparam'].risk_imtls)
else:
elif not self.oqparam.imtls:
raise ValueError('Missing intensity_measure_types!')
if self.precalc:
self.rlzs_assoc = self.precalc.rlzs_assoc
Expand Down Expand Up @@ -588,6 +590,33 @@ class RiskCalculator(HazardCalculator):
attributes .riskmodel, .sitecol, .assetcol, .riskinputs in the
pre_execute phase.
"""
def read_shakemap(self):
"""
Enabled only if there is a shakemap_id parameter in the job.ini.
Download, unzip, parse USGS shakemap files and build a corresponding
set of GMFs which are then filtered with the hazard site collection
and stored in the datastore.
"""
oq = self.oqparam
E = oq.number_of_ground_motion_fields
haz_sitecol = self.datastore.parent['sitecol']

logging.info('Getting/reducing shakemap')
with self.monitor('getting/reducing shakemap'):
smap = oq.shakemap_id if oq.shakemap_id else numpy.load(
oq.inputs['shakemap'])
self.sitecol, shakemap = get_sitecol_shakemap(
smap, haz_sitecol, oq.asset_hazard_distance)

logging.info('Building GMFs')
with self.monitor('building/saving GMFs'):
gmfs = to_gmfs(shakemap, oq.site_effects, oq.truncation_level, E,
oq.random_seed)
save_gmf_data(self.datastore, self.sitecol, gmfs)
events = numpy.zeros(E, readinput.stored_event_dt)
events['eid'] = numpy.arange(E, dtype=U64)
self.datastore['events'] = events

def make_eps(self, num_ruptures):
"""
:param num_ruptures: the size of the epsilon array for each asset
Expand Down Expand Up @@ -743,7 +772,7 @@ def get_gmfs(calculator):
elif calculator.precalc: # from previous step
num_assocs = dstore['csm_info'].get_num_rlzs()
E = oq.number_of_ground_motion_fields
eids = numpy.arange(E)
eids = numpy.arange(E, dtype=U64)
gmfs = numpy.zeros((num_assocs, N, E, I))
for g, gsim in enumerate(calculator.precalc.gsims):
gmfs[g, sitecol.sids] = calculator.precalc.gmfa[gsim]
Expand All @@ -758,7 +787,7 @@ def save_gmf_data(dstore, sitecol, gmfs):
"""
:param dstore: a :class:`openquake.baselib.datastore.DataStore` instance
:param sitecol: a :class:`openquake.hazardlib.site.SiteCollection` instance
:param gmfs: an array of shape (R, N, E, I)
:param gmfs: an array of shape (R, N, E, M)
"""
offset = 0
dstore['gmf_data/data'] = gmfa = get_gmv_data(sitecol.sids, gmfs)
Expand Down
30 changes: 19 additions & 11 deletions openquake/calculators/scenario.py
Expand Up @@ -16,6 +16,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import collections
import logging
import numpy

from openquake.hazardlib.calc.gmf import GmfComputer
Expand All @@ -37,22 +38,26 @@ def pre_execute(self):
"""
super(ScenarioCalculator, self).pre_execute()
oq = self.oqparam
trunc_level = oq.truncation_level
correl_model = oq.get_correl_model()
gsim_lt = readinput.get_gsim_lt(oq)
cinfo = source.CompositionInfo.fake(gsim_lt)
self.datastore['csm_info'] = cinfo
self.datastore['oqparam'] = oq
self.rlzs_assoc = cinfo.get_rlzs_assoc()
if 'rupture_model' not in oq.inputs:
logging.warn('There is no rupture_model, the calculator will just '
'import data without performing any calculation')
return
ebr, self.sitecol = readinput.get_rupture_sitecol(oq, self.sitecol)
self.gsims = readinput.get_gsims(oq)
self.datastore['events'] = ebr.events
rupser = calc.RuptureSerializer(self.datastore)
rupser.save([ebr])
rupser.close()
trunc_level = oq.truncation_level
correl_model = oq.get_correl_model()
self.computer = GmfComputer(
ebr, self.sitecol, oq.imtls, ContextMaker(self.gsims),
trunc_level, correl_model)
gsim_lt = readinput.get_gsim_lt(oq)
cinfo = source.CompositionInfo.fake(gsim_lt)
self.datastore['csm_info'] = cinfo
self.datastore['oqparam'] = oq
self.rlzs_assoc = cinfo.get_rlzs_assoc()

def init(self):
pass
Expand All @@ -62,6 +67,8 @@ def execute(self):
Compute the GMFs and return a dictionary gsim -> array(N, E, I)
"""
self.gmfa = collections.OrderedDict()
if 'rupture_model' not in self.oqparam.inputs:
return self.gmfa
with self.monitor('computing gmfs', autoflush=True):
n = self.oqparam.number_of_ground_motion_fields
for gsim in self.gsims:
Expand All @@ -70,7 +77,8 @@ def execute(self):
return self.gmfa

def post_execute(self, dummy):
with self.monitor('saving gmfs', autoflush=True):
base.save_gmf_data(
self.datastore, self.sitecol,
numpy.array(list(self.gmfa.values())))
if self.gmfa:
with self.monitor('saving gmfs', autoflush=True):
base.save_gmf_data(
self.datastore, self.sitecol,
numpy.array(list(self.gmfa.values())))
12 changes: 8 additions & 4 deletions openquake/calculators/scenario_damage.py
Expand Up @@ -120,12 +120,16 @@ def pre_execute(self):
if 'gmfs' in self.oqparam.inputs:
self.pre_calculator = None
base.RiskCalculator.pre_execute(self)
eids, self.R = base.get_gmfs(self)
self.param['number_of_ground_motion_fields'] = (
self.oqparam.number_of_ground_motion_fields)
if self.oqparam.shakemap_id or 'shakemap' in self.oqparam.inputs:
self.read_shakemap()
self.R = 1
else:
_, self.R = base.get_gmfs(self)
E = self.oqparam.number_of_ground_motion_fields
self.param['number_of_ground_motion_fields'] = E
self.param['consequence_models'] = riskmodels.get_risk_models(
self.oqparam, 'consequence')
self.riskinputs = self.build_riskinputs('gmf', num_events=len(eids))
self.riskinputs = self.build_riskinputs('gmf', num_events=E)
self.param['tags'] = list(self.assetcol.tagcol)

def post_execute(self, result):
Expand Down
6 changes: 5 additions & 1 deletion openquake/calculators/scenario_risk.py
Expand Up @@ -95,7 +95,11 @@ def pre_execute(self):
if 'gmfs' in self.oqparam.inputs:
self.pre_calculator = None
base.RiskCalculator.pre_execute(self)
eids, self.R = base.get_gmfs(self)
if self.oqparam.shakemap_id or 'shakemap' in self.oqparam.inputs:
self.read_shakemap()
self.R = 1
else:
_, self.R = base.get_gmfs(self)
self.assetcol = self.datastore['assetcol']
A = len(self.assetcol)
E = self.oqparam.number_of_ground_motion_fields
Expand Down
10 changes: 9 additions & 1 deletion openquake/calculators/tests/scenario_risk_test.py
Expand Up @@ -20,7 +20,7 @@
import numpy
from openquake.qa_tests_data.scenario_risk import (
case_1, case_2, case_2d, case_1g, case_1h, case_3, case_4, case_5,
case_6a, case_7, case_8, occupants, case_master)
case_6a, case_7, case_8, occupants, case_master, case_shakemap)

from openquake.baselib.general import writetmp
from openquake.calculators.tests import CalculatorTestCase
Expand Down Expand Up @@ -212,3 +212,11 @@ def test_case_8(self):

# make sure the fullreport can be extracted
view('fullreport', self.calc.datastore)

@attr('qa', 'risk', 'scenario_risk')
def test_case_shakemap(self):
self.run_calc(case_shakemap.__file__, 'pre-job.ini')
self.run_calc(case_shakemap.__file__, 'job.ini',
hazard_calculation_id=str(self.calc.datastore.calc_id))
sitecol = self.calc.datastore['sitecol']
self.assertEqual(len(sitecol), 1)
9 changes: 9 additions & 0 deletions openquake/commands/reduce.py
Expand Up @@ -21,6 +21,7 @@
import csv
import random
import shutil
import numpy
from openquake.hazardlib import valid, nrml, InvalidFile
from openquake.baselib.python3compat import encode
from openquake.baselib import sap
Expand Down Expand Up @@ -73,6 +74,14 @@ def reduce(fname, reduction_factor):
_save_csv(fname, lines, header)
print('Extracted %d lines out of %d' % (len(lines), len(all_lines)))
return
elif fname.endswith('.npy'):
array = numpy.load(fname)
shutil.copy(fname, fname + '.bak')
print('Copied the original file in %s.bak' % fname)
arr = numpy.array(random_filter(array, reduction_factor))
numpy.save(fname, arr)
print('Extracted %d rows out of %d' % (len(arr), len(array)))
return
node = nrml.read(fname)
model = node[0]
if model.tag.endswith('exposureModel'):
Expand Down
11 changes: 9 additions & 2 deletions openquake/commonlib/oqvalidation.py
Expand Up @@ -127,7 +127,8 @@ class OqParam(valid.ParamSet):
ses_per_logic_tree_path = valid.Param(valid.positiveint, 1)
ses_seed = valid.Param(valid.positiveint, 42)
max_site_model_distance = valid.Param(valid.positivefloat, 5) # by Graeme
shakemap_id = valid.Param(valid.simple_id, None)
shakemap_id = valid.Param(valid.nice_string, None)
site_effects = valid.Param(valid.boolean, True) # shakemap amplification
sites = valid.Param(valid.NoneOr(valid.coordinates), None)
sites_disagg = valid.Param(valid.NoneOr(valid.coordinates), [])
sites_per_tile = valid.Param(valid.positiveint, 30000) # by M. Simionato
Expand Down Expand Up @@ -470,6 +471,12 @@ def job_type(self):
'damage' in self.calculation_mode or
'bcr' in self.calculation_mode) else 'hazard'

def is_valid_shakemap(self):
"""
hazard_calculation_id must be set if shakemap_id is set
"""
return self.hazard_calculation_id if self.shakemap_id else True

def is_valid_truncation_level_disaggregation(self):
"""
Truncation level must be set for disaggregation calculations
Expand Down Expand Up @@ -663,7 +670,7 @@ def check_uniform_hazard_spectra(self):

def check_source_model(self):
if ('hazard_curves' in self.inputs or 'gmfs' in self.inputs or
'rupture_model' in self.inputs):
self.calculation_mode.startswith('scenario')):
return
if 'source' not in self.inputs and not self.hazard_calculation_id:
raise ValueError('Missing source_model_logic_tree in %s '
Expand Down
6 changes: 3 additions & 3 deletions openquake/commonlib/readinput.py
Expand Up @@ -332,11 +332,11 @@ def get_site_collection(oqparam):
sm, operator.itemgetter('lon'), operator.itemgetter('lat'))
sitecol = site.SiteCollection.from_points(
mesh.lons, mesh.lats, mesh.depths)
dic = site_model_params.assoc(
sids, params = site_model_params.assoc(
sitecol, oqparam.max_site_model_distance, 'warn')
for sid in dic:
for sid, param in zip(sids, params):
for name in site_model_dt.names[2:]: # all names except lon, lat
sitecol.array[sid][name] = dic[sid][name]
sitecol.array[sid][name] = param[name]
return sitecol

# else use the default site params
Expand Down
6 changes: 4 additions & 2 deletions openquake/hazardlib/geo/utils.py
Expand Up @@ -35,6 +35,7 @@
EARTH_RADIUS, geodetic_distance, min_idx_dst)
from openquake.baselib.slots import with_slots

U32 = numpy.uint32
SphericalBB = collections.namedtuple('SphericalBB', 'west east north south')


Expand Down Expand Up @@ -90,7 +91,7 @@ def assoc(self, sitecol, assoc_dist, mode='error'):
:param: a (filtered) site collection
:param assoc_dist: the maximum distance for association
:param mode: 'strict', 'error', 'warn' or 'ignore'
:returns: a dictionary site_id -> array of associated objects
:returns: two arrays (site_ids, associated objects)
"""
dic = {}
for sid, lon, lat in zip(sitecol.sids, sitecol.lons, sitecol.lats):
Expand All @@ -112,7 +113,8 @@ def assoc(self, sitecol, assoc_dist, mode='error'):
if not dic and mode == 'error':
raise SiteAssociationError(
'No sites could be associated within %s km' % assoc_dist)
return dic
sids = sorted(dic)
return numpy.array(sids, U32), numpy.array([dic[sid] for sid in sids])


def clean_points(points):
Expand Down