forked from gem/oq-engine
-
Notifications
You must be signed in to change notification settings - Fork 0
/
vulnerability.py
147 lines (104 loc) · 5.06 KB
/
vulnerability.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010-2012, GEM Foundation.
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
Codec for processing vulnerability curves
from XML files.
A DOM version of the vulnerability model parser,
that takes into account the really small size of this input file.
"""
from lxml import etree
from openquake import kvs
from openquake import shapes
from openquake import producer
from openquake import xml
from openquake.nrml.utils import nrml_schema_file
from openquake.xml import NRML
def _parse_set_attributes(vulnerability_set):
    """Collect the attributes shared by all the vulnerability
    functions defined in the given set of the NRML file.

    :param vulnerability_set: the vulnerability set element. It must
        contain an `IML` descendant (with an `IMT` attribute) and carry
        the `vulnerabilitySetID`, `assetCategory` and `lossCategory`
        attributes.
    :returns: a dictionary with the keys `IMT`, `IML` (a list of floats
        parsed from the whitespace separated text of the `IML` element),
        `vulnerabilitySetID`, `assetCategory` and `lossCategory`.
    """
    iml_element = vulnerability_set.find(".//%sIML" % NRML)

    # The entries are evaluated top to bottom, so a malformed element
    # fails on the same lookup it would fail on when filled key by key.
    return {
        "IMT": iml_element.attrib["IMT"],
        "IML": list(map(float, iml_element.text.strip().split())),
        "vulnerabilitySetID": vulnerability_set.attrib["vulnerabilitySetID"],
        "assetCategory": vulnerability_set.attrib["assetCategory"],
        "lossCategory": vulnerability_set.attrib["lossCategory"],
    }
class VulnerabilityModelFile(producer.FileProducer):
    """This class parses a vulnerability model NRML file.

    The class is implemented as a generator. For each vulnerability
    function in the parsed instance document it yields a dictionary
    with all the data defined for that function.
    """

    def __init__(self, path):
        """Load the document stored at `path` and validate it.

        :param path: location of the NRML file to parse.
        :raises xml.XMLValidationError: if the document does not
            validate against the NRML schema.
        :raises xml.XMLMismatchError: if the first element found under
            the document root is not a `vulnerabilityModel` (or if the
            root has no element children at all).
        """
        producer.FileProducer.__init__(self, path)
        nrml_schema = etree.XMLSchema(etree.parse(nrml_schema_file()))
        self.vuln_model = etree.parse(self.path).getroot()

        if not nrml_schema.validate(self.vuln_model):
            raise xml.XMLValidationError(
                nrml_schema.error_log.last_error, path)

        # Only look at real elements: plain child access (and the
        # deprecated getchildren()) also returns comments and processing
        # instructions, whose `tag` is not a string, so a valid document
        # starting with a comment would be reported as a mismatch.
        model_el = next(
            self.vuln_model.iterchildren(tag=etree.Element), None)

        if (model_el is None
                or model_el.tag != "%svulnerabilityModel" % NRML):
            if model_el is None:
                found_tag = ""
            else:
                # Drop the namespace prefix from the reported tag name.
                found_tag = str(model_el.tag)[len(NRML):]

            raise xml.XMLMismatchError(
                path, 'vulnerabilityModel', found_tag)

    def filter(self, region_constraint=None, attribute_constraint=None):
        """Filtering is not needed/supported for the vulnerability model."""

    def _parse(self):
        """Parse the vulnerability model.

        For each `discreteVulnerability` element found in each
        `discreteVulnerabilitySet` this generator yields a new dictionary
        holding the attributes of the set (see `_parse_set_attributes`)
        plus the keys `ID`, `probabilisticDistribution`, `lossRatio` and
        `coefficientsVariation` (the last two as lists of floats).
        """
        # The lookup paths do not change between iterations.
        set_path = ".//%sdiscreteVulnerabilitySet" % NRML
        function_path = ".//%sdiscreteVulnerability" % NRML
        loss_ratio_tag = "%slossRatio" % NRML
        coefficients_tag = "%scoefficientsVariation" % NRML

        for vuln_set in self.vuln_model.findall(set_path):
            set_attributes = _parse_set_attributes(vuln_set)

            for raw_vuln_function in vuln_set.findall(function_path):
                loss_ratios = [
                    float(x) for x in
                    raw_vuln_function.find(loss_ratio_tag).text.split()]

                coefficients_variation = [
                    float(x) for x in
                    raw_vuln_function.find(coefficients_tag).text.split()]

                # Every function gets its own dictionary, so the caller
                # can keep or modify what it receives.
                vuln_function = dict(set_attributes)
                vuln_function["ID"] = \
                    raw_vuln_function.attrib["vulnerabilityFunctionID"]
                vuln_function["probabilisticDistribution"] = \
                    raw_vuln_function.attrib["probabilisticDistribution"]
                vuln_function["lossRatio"] = loss_ratios
                vuln_function["coefficientsVariation"] = \
                    coefficients_variation

                yield vuln_function
# TODO (ac): These two functions should probably be moved elsewhere
def load_vulnerability_model(job_id, path, retrofitted=False):
    """Parse the vulnerability model defined in the given NRML file
    and store it in the underlying kvs system.

    The model is stored as a single JSON encoded dictionary mapping
    the ID of each vulnerability function to its JSON representation,
    under the key given by `kvs.tokens.vuln_key(job_id, retrofitted)`.

    :param job_id: the job the model belongs to, used to build the key.
    :param path: location of the NRML file to load.
    :param retrofitted: passed through to `kvs.tokens.vuln_key`.
    """
    functions_by_id = {}

    for curve in VulnerabilityModelFile(path):
        function = shapes.VulnerabilityFunction(
            curve['IML'],
            curve['lossRatio'],
            curve['coefficientsVariation'])

        functions_by_id[curve["ID"]] = function.to_json()

    kvs.set_value_json_encoded(
        kvs.tokens.vuln_key(job_id, retrofitted), functions_by_id)
def load_vuln_model_from_kvs(job_id, retrofitted=False):
    """Read back from kvs the vulnerability model of the given job.

    :param job_id: the job the model belongs to, used to build the key.
    :param retrofitted: passed through to `kvs.tokens.vuln_key`.
    :returns: a dictionary mapping each stored key to the
        `shapes.VulnerabilityFunction` rebuilt from its JSON value;
        an empty dictionary when nothing is stored under the key.
    """
    stored_model = kvs.get_value_json_decoded(
        kvs.tokens.vuln_key(job_id, retrofitted))

    # Nothing stored for this job: there are no curves to rebuild.
    if stored_model is None:
        return {}

    return dict(
        (function_id, shapes.VulnerabilityFunction.from_json(encoded))
        for function_id, encoded in stored_model.items())