Skip to content

Commit

Permalink
Merge pull request #1564 from gem/special-assets
Browse files Browse the repository at this point in the history
Implemented losses per event per asset
  • Loading branch information
micheles committed Oct 22, 2014
2 parents 09f5f86 + 8659b1d commit d2643a6
Show file tree
Hide file tree
Showing 16 changed files with 306 additions and 138 deletions.
49 changes: 1 addition & 48 deletions openquake/engine/calculators/risk/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
Base RiskCalculator class.
"""

import collections
import psutil

from openquake.commonlib import risk_parsers
Expand Down Expand Up @@ -254,7 +253,7 @@ def calculator_parameters(self):
celery task function. A calculator must override this to
provide custom arguments to its celery task
"""
return []
return self.job.get_oqparam()

def get_risk_models(self):
# regular risk models
Expand Down Expand Up @@ -282,49 +281,3 @@ def get_workflow(self, vulnerability_functions):
class Workflow():
vulnerability_functions = {}
return Workflow()

#: Calculator parameters are used to compute derived outputs like loss
#: maps, disaggregation plots, quantile/mean curves. See
#: :class:`openquake.engine.db.models.RiskCalculation` for a description

CalcParams = collections.namedtuple(
'CalcParams', [
'conditional_loss_poes',
'poes_disagg',
'sites_disagg',
'insured_losses',
'quantiles',
'asset_life_expectancy',
'interest_rate',
'mag_bin_width',
'distance_bin_width',
'coordinate_bin_width',
'damage_state_ids'
])


def make_calc_params(conditional_loss_poes=None,
poes_disagg=None,
sites_disagg=None,
insured_losses=None,
quantiles=None,
asset_life_expectancy=None,
interest_rate=None,
mag_bin_width=None,
distance_bin_width=None,
coordinate_bin_width=None,
damage_state_ids=None):
"""
Constructor of CalculatorParameters
"""
return CalcParams(conditional_loss_poes,
poes_disagg,
sites_disagg,
insured_losses,
quantiles,
asset_life_expectancy,
interest_rate,
mag_bin_width,
distance_bin_width,
coordinate_bin_width,
damage_state_ids)
24 changes: 7 additions & 17 deletions openquake/engine/calculators/risk/classical_risk/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def do_classical(risk_model, getters, outputdict, params, monitor):
outputs_per_loss_type = risk_model.compute_outputs(
getters, monitor.copy('getting data'))
stats_per_loss_type = risk_model.compute_stats(
outputs_per_loss_type, params.quantiles, post_processing)
outputs_per_loss_type, params.quantile_loss_curves, post_processing)

for loss_type, outputs in outputs_per_loss_type.iteritems():
stats = stats_per_loss_type[loss_type]
Expand Down Expand Up @@ -161,17 +161,19 @@ def save_statistical_output(outputdict, stats, params):

# quantile curves, maps and fractions
outputdict.write_all(
"quantile", params.quantiles,
"quantile", params.quantile_loss_curves,
[(c, a) for c, a in itertools.izip(
stats.quantile_curves, stats.quantile_average_losses)],
stats.assets, output_type="loss_curve", statistics="quantile")

for quantile, maps in zip(params.quantiles, stats.quantile_maps):
for quantile, maps in zip(
params.quantile_loss_curves, stats.quantile_maps):
outputdict.write_all("poe", params.conditional_loss_poes, maps,
stats.assets, output_type="loss_map",
statistics="quantile", quantile=quantile)

for quantile, fractions in zip(params.quantiles, stats.quantile_fractions):
for quantile, fractions in zip(
params.quantile_loss_curves, stats.quantile_fractions):
outputdict.write_all("poe", params.poes_disagg, fractions,
stats.assets, [a.taxonomy for a in stats.assets],
output_type="loss_fraction",
Expand All @@ -186,7 +188,7 @@ def save_statistical_output(outputdict, stats, params):
output_type="loss_curve", statistics="mean", insured=True)

outputdict.write_all(
"quantile", params.quantiles,
"quantile", params.quantile_loss_curves,
[(c, a) for c, a in itertools.izip(
stats.quantile_insured_curves,
stats.quantile_average_insured_losses)],
Expand Down Expand Up @@ -219,15 +221,3 @@ def get_workflow(self, vulnerability_functions):
self.rc.conditional_loss_poes,
self.rc.poes_disagg,
self.rc.insured_losses)

@property
def calculator_parameters(self):
"""
Specific calculator parameters returned as list suitable to be
passed in task_arg_gen
"""

return base.make_calc_params(
conditional_loss_poes=self.rc.conditional_loss_poes or [],
quantiles=self.rc.quantile_loss_curves or [],
poes_disagg=self.rc.poes_disagg or [])
112 changes: 77 additions & 35 deletions openquake/engine/calculators/risk/event_based_risk/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from openquake.hazardlib.geo import mesh
from openquake.risklib import scientific, workflows
from openquake.risklib.utils import numpy_map

from openquake.engine.calculators import post_processing
from openquake.engine.calculators.risk import (
Expand Down Expand Up @@ -66,13 +67,11 @@ def event_based(job_id, risk_model, getters, outputdict, params):

def get_output(risk_model, getter, outputdict, params, monitor):
output = risk_model.compute_output(getter, monitor.copy('getting data'))
# keep in memory the loss_matrix only when doing disaggregation
risk_model.workflow.return_loss_matrix = bool(params.sites_disagg)

# save outputs and stats and populate the event loss table
for loss_type, out in output.iteritems():

if params.sites_disagg:
if params.specific_assets and params.sites_disagg:
with monitor.copy('disaggregating results'):
ruptures = [models.SESRupture.objects.get(pk=rid)
for rid in getter.rupture_ids]
Expand All @@ -90,12 +89,25 @@ def get_output(risk_model, getter, outputdict, params, monitor):
return output


def _filter_loss_matrix_assets(loss_matrix, assets, specific_assets):
# reduce loss_matrix and assets to the specific_assets
mask = numpy.array([a.asset_ref in specific_assets for a in assets])
return loss_matrix[mask], numpy.array(assets)[mask]


def do_event_based(risk_model, getters, outputdict, params, monitor):
"""
See `event_based` for a description of the params
:returns: the event loss table generated by risk_model
"""
specific_assets = set(params.specific_assets)
# keep in memory the loss_matrix only when specific_assets are set
risk_model.workflow.return_loss_matrix = bool(specific_assets)

# the insert here will work only if specific_assets is set
inserter = writer.CacheInserter(
models.EventLossAsset, max_cache_size=10000)
outputs = []
# NB: event_loss_table is a dictionary (loss_type, out_id) -> loss,
# out_id can be None, and in that case it stores the statistics
Expand All @@ -104,12 +116,34 @@ def do_event_based(risk_model, getters, outputdict, params, monitor):
output = get_output(risk_model, getter, outputdict, params, monitor)
for loss_type, out in output.iteritems():
event_loss_table[loss_type, out.hid] = out.output.event_loss_table
if specific_assets:
loss_matrix, assets = _filter_loss_matrix_assets(
out.output.loss_matrix, out.output.assets, specific_assets)
if len(assets) == 0: # no specific_assets
continue
# compute the loss per rupture per asset
event_loss = models.EventLoss.objects.get(
output__oq_job=monitor.job_id,
output__output_type='event_loss_asset',
loss_type=loss_type, hazard_output=getter.hazard_output)
# losses is E x n matrix, where E is the number of ruptures
# and n the number of assets in the specific_assets set
losses = (loss_matrix.transpose() *
numpy_map(lambda a: a.value(loss_type), assets))
# save an EventLossAsset record for each specific asset
for rup_id, losses_per_rup in zip(getter.rupture_ids, losses):
for asset, loss_per_rup in zip(assets, losses_per_rup):
ela = models.EventLossAsset(
event_loss=event_loss, rupture_id=rup_id,
asset=asset, loss=loss_per_rup)
inserter.add(ela)
outputs.append(output)
inserter.flush()

outputs_by_loss_type = {loss_type: [out[loss_type] for out in outputs]
for loss_type in risk_model.loss_types}
stats_by_loss_type = risk_model.compute_stats(
outputs_by_loss_type, params.quantiles, post_processing)
outputs_by_loss_type, params.quantile_loss_curves, post_processing)
for loss_type, stats in stats_by_loss_type.iteritems():
if stats is not None:
with monitor.copy('saving risk statistics'):
Expand Down Expand Up @@ -202,13 +236,14 @@ def save_statistical_output(outputdict, stats, params):

# quantile curves and maps
outputdict.write_all(
"quantile", params.quantiles,
"quantile", params.quantile_loss_curves,
[(c, a) for c, a in itertools.izip(stats.quantile_curves,
stats.quantile_average_losses)],
stats.assets, output_type="loss_curve", statistics="quantile")

if params.quantiles:
for quantile, maps in zip(params.quantiles, stats.quantile_maps):
if params.quantile_loss_curves:
for quantile, maps in zip(
params.quantile_loss_curves, stats.quantile_maps):
outputdict.write_all(
"poe", params.conditional_loss_poes, maps,
stats.assets, output_type="loss_map",
Expand All @@ -222,7 +257,7 @@ def save_statistical_output(outputdict, stats, params):
output_type="loss_curve", statistics="mean", insured=True)

outputdict.write_all(
"quantile", params.quantiles,
"quantile", params.quantile_loss_curves,
[(c, a) for c, a in itertools.izip(
stats.quantile_insured_curves,
stats.quantile_average_insured_losses)],
Expand Down Expand Up @@ -270,9 +305,8 @@ def disaggregate_site(site, loss_ratios):

assets_disagg = []
disagg_matrix = []

for asset, losses in zip(outputs.assets, outputs.loss_matrix):
if asset.site in params.sites_disagg:
if (asset.site.x, asset.site.y) in params.sites_disagg:
disagg_matrix.extend(list(disaggregate_site(asset.site, losses)))

# FIXME. the functions in
Expand Down Expand Up @@ -316,6 +350,33 @@ def __init__(self, job):
super(EventBasedRiskCalculator, self).__init__(job)
# accumulator for the event loss tables
self.acc = collections.defaultdict(collections.Counter)
self.sites_disagg = self.job.get_param('sites_disagg')
self.specific_assets = self.job.get_param('specific_assets')

def pre_execute(self):
"""
Base pre_execute + build Event Loss Asset outputs if needed
"""
super(EventBasedRiskCalculator, self).pre_execute()
for hazard_output in self.rc.hazard_outputs():
for loss_type in self.loss_types:
models.EventLoss.objects.create(
output=models.Output.objects.create_output(
self.job,
"Event Loss Table type=%s, hazard=%s" % (
loss_type, hazard_output.id),
"event_loss"),
loss_type=loss_type,
hazard_output=hazard_output)
if self.specific_assets:
models.EventLoss.objects.create(
output=models.Output.objects.create_output(
self.job,
"Event Loss Asset type=%s, hazard=%s" % (
loss_type, hazard_output.id),
"event_loss_asset"),
loss_type=loss_type,
hazard_output=hazard_output)

@EnginePerformanceMonitor.monitor
def agg_result(self, acc, event_loss_table):
Expand All @@ -337,21 +398,16 @@ def post_process(self):
Compute aggregate loss curves and event loss tables
"""
with EnginePerformanceMonitor('post processing', self.job.id):

inserter = writer.CacheInserter(models.EventLossData,
max_cache_size=10000)
time_span, tses = self.hazard_times()
for (loss_type, out_id), event_loss_table in self.acc.items():
if out_id: # values for individual realizations
hazard_output = models.Output.objects.get(pk=out_id)

event_loss = models.EventLoss.objects.create(
output=models.Output.objects.create_output(
self.job,
"Event Loss Table. type=%s, hazard=%s" % (
loss_type, hazard_output.id),
"event_loss"),
loss_type=loss_type,
hazard_output=hazard_output)
inserter = writer.CacheInserter(models.EventLossData, 9999)
event_loss = models.EventLoss.objects.get(
output__oq_job=self.job,
output__output_type='event_loss',
loss_type=loss_type, hazard_output=hazard_output)
if isinstance(hazard_output.output_container,
models.SESCollection):
ses_coll = hazard_output.output_container
Expand Down Expand Up @@ -420,17 +476,3 @@ def hazard_times(self):
"""
return (self.rc.investigation_time,
self.hc.ses_per_logic_tree_path * self.hc.investigation_time)

@property
def calculator_parameters(self):
"""
Calculator specific parameters
"""
return base.make_calc_params(
conditional_loss_poes=self.rc.conditional_loss_poes or [],
quantiles=self.rc.quantile_loss_curves or [],
insured_losses=self.rc.insured_losses,
sites_disagg=self.rc.sites_disagg or [],
mag_bin_width=self.rc.mag_bin_width,
distance_bin_width=self.rc.distance_bin_width,
coordinate_bin_width=self.rc.coordinate_bin_width)
9 changes: 6 additions & 3 deletions openquake/engine/calculators/risk/scenario_damage/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ def get_risk_models(self):
@property
def calculator_parameters(self):
"""
Provides calculator specific params coming from
:class:`openquake.engine.db.RiskCalculation`
The specific calculation parameters passed as args to the
celery task function. A calculator must override this to
provide custom arguments to its celery task
"""
return base.make_calc_params(damage_state_ids=self.damage_state_ids)
oqparam = self.job.get_oqparam()
oqparam.damage_state_ids = self.damage_state_ids
return oqparam
2 changes: 1 addition & 1 deletion openquake/engine/calculators/risk/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ def individual_outputs(self, loss_type, hazard_output):
variables = ["magnitude_distance", "coordinate"]

loss_fractions = []
if self.calc.rc.sites_disagg:
if self.calc.sites_disagg:
for variable in variables:
name = ("loss fractions. type=%s variable=%s "
"hazard=%s" % (loss_type, hazard_output, variable))
Expand Down

0 comments on commit d2643a6

Please sign in to comment.