Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed the SES table #1399

Merged
merged 2 commits into from
Apr 11, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 2 additions & 9 deletions openquake/engine/calculators/hazard/event_based/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def compute_ses_and_gmfs(job_id, src_seeds, lt_model, gsims_by_rlz, task_no):
ses_coll = models.SESCollection.objects.get(lt_model=lt_model)

hc = models.HazardCalculation.objects.get(oqjob=job_id)
all_ses = models.SES.objects.filter(ses_collection=ses_coll)
all_ses = list(ses_coll)
imts = map(from_string, hc.intensity_measure_types)
params = dict(
correl_model=general.get_correl_model(hc),
Expand Down Expand Up @@ -334,14 +334,7 @@ def initialize_ses_db_records(self, lt_model):
output_type='gmf')
models.Gmf.objects.create(output=output, lt_realization=rlz)

all_ses = []
for i in xrange(1, self.hc.ses_per_logic_tree_path + 1):
all_ses.append(
models.SES.objects.create(
ses_collection=ses_coll,
investigation_time=self.hc.investigation_time,
ordinal=i))
return all_ses
return ses_coll

def pre_execute(self):
"""
Expand Down
4 changes: 2 additions & 2 deletions openquake/engine/calculators/risk/event_based/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ def post_process(self):
lt_model=hazard_output.output_container.
lt_realization.lt_model)
rupture_ids = models.SESRupture.objects.filter(
ses__ses_collection=ses_coll).values_list(
'id', flat=True)
rupture__ses_collection=ses_coll).values_list(
'id', flat=True)
for rupture_id in rupture_ids:
if rupture_id in event_loss_table:
inserter.add(
Expand Down
85 changes: 49 additions & 36 deletions openquake/engine/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1568,8 +1568,15 @@ def __iter__(self):
"""
Iterator for walking through all child :class:`SES` objects.
"""
return SES.objects.filter(ses_collection=self.id).order_by('ordinal') \
.iterator()
hc = self.output.oq_job.hazard_calculation
for ordinal in xrange(1, hc.ses_per_logic_tree_path + 1):
yield SES(self, ordinal)

def __len__(self):
"""
Return the ses_per_logic_tree_path parameter
"""
return self.output.oq_job.hazard_calculation.ses_per_logic_tree_path

@property
def sm_lt_path(self):
Expand All @@ -1579,29 +1586,32 @@ def sm_lt_path(self):
return tuple(self.lt_model.sm_lt_path)


class SES(djm.Model):
class SES(object):
"""
Stochastic Event Set: A container for 1 or more ruptures associated with a
specific investigation time span.

See also :class:`SESRupture`.
"""
ses_collection = djm.ForeignKey('SESCollection')
investigation_time = djm.FloatField()
# Order number of this Stochastic Event Set in a series of SESs
# (for a given logic tree realization).
ordinal = djm.IntegerField(null=True)
# the ordinal must be > 0: the reason is that it appears in the
# exported XML file and the schema constrains the number to be
# nonzero
def __init__(self, ses_collection, ordinal=1):
self.ses_collection = ses_collection
self.ordinal = ordinal

class Meta:
db_table = 'hzrdr\".\"ses'
ordering = ['ordinal']
@property
def investigation_time(self):
hc = self.ses_collection.output.oq_job.hazard_calculation
return hc.investigation_time

def __cmp__(self, other):
return cmp(self.ordinal, other.ordinal)

def __iter__(self):
"""
Iterator for walking through all child :class:`SESRupture` objects.
"""
return SESRupture.objects.filter(ses=self.id).order_by('tag') \
.iterator()
return SESRupture.objects.filter(
ses_id=self.ordinal).order_by('tag').iterator()


def is_from_fault_source(rupture):
Expand Down Expand Up @@ -1826,7 +1836,7 @@ class SESRupture(djm.Model):
A rupture as part of a Stochastic Event Set.
"""
rupture = djm.ForeignKey('ProbabilisticRupture')
ses = djm.ForeignKey('SES')
ses_id = djm.IntegerField(null=False)
tag = djm.TextField(null=False)
seed = djm.IntegerField(null=False)

Expand Down Expand Up @@ -1856,7 +1866,7 @@ def create(cls, prob_rupture, ses, source_id, rupt_no, rupt_occ, seed):
ses.ses_collection.ordinal, ses.ordinal, source_id, rupt_no,
rupt_occ)
return cls.objects.create(
rupture=prob_rupture, ses=ses, tag=tag, seed=seed)
rupture=prob_rupture, ses_id=ses.ordinal, tag=tag, seed=seed)


class _Point(object):
Expand Down Expand Up @@ -1904,36 +1914,39 @@ def __iter__(self):
If a SES does not generate any GMF, it is ignored.
"""
hc = self.output.oq_job.hazard_calculation
for ses in SES.objects.filter(
ses_collection__output__oq_job=self.output.oq_job):
query = """
for ses_coll in SESCollection.objects.filter(
output__oq_job=self.output.oq_job):
for ses in ses_coll:
query = """
SELECT imt, sa_period, sa_damping, tag,
array_agg(gmv) AS gmvs,
array_agg(ST_X(location::geometry)) AS xs,
array_agg(ST_Y(location::geometry)) AS ys
FROM (SELECT imt, sa_period, sa_damping,
unnest(rupture_ids) as rupture_id, location, unnest(gmvs) AS gmv
FROM hzrdr.gmf_data, hzrdi.hazard_site
WHERE site_id = hzrdi.hazard_site.id AND hazard_calculation_id=%s
WHERE site_id = hzrdi.hazard_site.id AND hazard_calculation_id=%s
AND gmf_id=%d) AS x, hzrdr.ses_rupture AS y
WHERE x.rupture_id = y.id AND y.ses_id=%d
GROUP BY imt, sa_period, sa_damping, tag
ORDER BY imt, sa_period, sa_damping, tag;
""" % (hc.id, self.id, ses.id)
with transaction.commit_on_success(using='job_init'):
curs = getcursor('job_init')
curs.execute(query)
# a set of GMFs generated by the same SES, one per rupture
gmfset = []
for imt, sa_period, sa_damping, rupture_tag, gmvs, xs, ys in curs:
# using a generator here saves a lot of memory
nodes = (_GroundMotionFieldNode(gmv, _Point(x, y))
for gmv, x, y in zip(gmvs, xs, ys))
gmfset.append(
_GroundMotionField(
imt, sa_period, sa_damping, rupture_tag, nodes))
if gmfset:
yield GmfSet(ses, gmfset)
""" % (hc.id, self.id, ses.ordinal)
with transaction.commit_on_success(using='job_init'):
curs = getcursor('job_init')
curs.execute(query)
                # a set of GMFs generated by the same SES, one per rupture
gmfset = []
for (imt, sa_period, sa_damping, rupture_tag, gmvs,
xs, ys) in curs:
# using a generator here saves a lot of memory
nodes = (_GroundMotionFieldNode(gmv, _Point(x, y))
for gmv, x, y in zip(gmvs, xs, ys))
gmfset.append(
_GroundMotionField(
imt, sa_period, sa_damping, rupture_tag,
nodes))
if gmfset:
yield GmfSet(ses, gmfset)


class GmfSet(object):
Expand Down
2 changes: 1 addition & 1 deletion openquake/engine/db/schema/indexes.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ CREATE INDEX hzrdr_ses_collection_ouput_id_idx on hzrdr.ses_collection(output_id
CREATE INDEX hzrdr_ses_ses_collection_id_idx on hzrdr.ses(ses_collection_id);

-- ses_rupture
CREATE UNIQUE INDEX hzrdr_ses_rupture_tag_uniq_idx ON hzrdr.ses_rupture(ses_id, tag);
CREATE UNIQUE INDEX hzrdr_ses_rupture_tag_uniq_idx ON hzrdr.ses_rupture(rupture_id, tag);
CREATE INDEX hzrdr_ses_rupture_ses_id_idx on hzrdr.ses_rupture(ses_id);
CREATE INDEX hzrdr_ses_rupture_tag_idx ON hzrdr.ses_rupture (tag);

Expand Down
8 changes: 1 addition & 7 deletions openquake/engine/db/schema/openquake.sql
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ SELECT AddGeometryColumn('hzrdr', 'probabilistic_rupture', 'hypocenter', 4326, '

CREATE TABLE hzrdr.ses_rupture (
id SERIAL PRIMARY KEY,
ses_id INTEGER NOT NULL, -- FK to ses.id
ses_id INTEGER NOT NULL,
rupture_id INTEGER NOT NULL, -- FK to probabilistic_rupture.id
tag VARCHAR NOT NULL,
seed INTEGER NOT NULL
Expand Down Expand Up @@ -1013,12 +1013,6 @@ ADD CONSTRAINT hzrdr_ses_rupture_probabilistic_rupture_fk
FOREIGN KEY (rupture_id) REFERENCES hzrdr.probabilistic_rupture(id)
ON DELETE CASCADE;

-- hzrdr.ses_rupture to hzrdr.ses FK
ALTER TABLE hzrdr.ses_rupture
ADD CONSTRAINT hzrdr_ses_rupture_ses_fk
FOREIGN KEY (ses_id) REFERENCES hzrdr.ses(id)
ON DELETE CASCADE;

ALTER TABLE riskr.loss_map
ADD CONSTRAINT riskr_loss_map_output_fk
FOREIGN KEY (output_id) REFERENCES uiapi.output(id) ON DELETE CASCADE;
Expand Down
4 changes: 4 additions & 0 deletions openquake/engine/db/schema/upgrades/1.0.1/21/01-drop-ses.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP INDEX hzrdr.hzrdr_ses_rupture_tag_uniq_idx;
CREATE UNIQUE INDEX hzrdr_ses_rupture_tag_uniq_idx ON hzrdr.ses_rupture(rupture_id, tag);
ALTER TABLE hzrdr.ses_rupture DROP CONSTRAINT hzrdr_ses_rupture_ses_fk;
DROP TABLE hzrdr.ses;
20 changes: 9 additions & 11 deletions openquake/engine/tests/calculators/hazard/event_based/core_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,12 @@ def test_initialize_ses_db_records(self):

# With this job configuration, we have 2 logic tree realizations
# for the GMPEs
lt_rlzs = models.LtRealization.objects.filter(
lt_model__hazard_calculation=hc)
self.assertEqual(2, len(lt_rlzs))

sess = models.SES.objects.filter(
ses_collection__lt_model=lt_rlzs[0].lt_model)
self.assertEqual(hc.ses_per_logic_tree_path, len(sess))
for ses in sess:
[lt_model] = models.LtSourceModel.objects.filter(hazard_calculation=hc)
self.assertEqual(2, len(list(lt_model)))

ses_coll = models.SESCollection.objects.get(lt_model=lt_model)
self.assertEqual(hc.ses_per_logic_tree_path, len(ses_coll))
for ses in ses_coll:
# The only metadata in the SES is investigation time.
self.assertEqual(hc.investigation_time, ses.investigation_time)

Expand All @@ -189,16 +187,16 @@ def test_complete_event_based_calculation_cycle(self):
finally:
del os.environ['OQ_NO_DISTRIBUTE']
hc = job.hazard_calculation
rlz1, rlz2 = models.LtRealization.objects.filter(
lt_model__hazard_calculation=hc.id).order_by('ordinal')
[(rlz1, rlz2)] = models.LtSourceModel.objects.filter(
hazard_calculation=hc.id)

# check that the parameters are read correctly from the files
self.assertEqual(hc.ses_per_logic_tree_path, 5)

# check that we generated the right number of ruptures
# (this is fixed if the seeds are fixed correctly)
num_ruptures = models.SESRupture.objects.filter(
ses__ses_collection__output__oq_job=job.id).count()
rupture__ses_collection__output__oq_job=job.id).count()
self.assertEqual(num_ruptures, 96)

# check that we generated the right number of rows in GmfData
Expand Down
5 changes: 1 addition & 4 deletions openquake/engine/tests/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,10 +689,6 @@ def create_ses_ruptures(job, ses_collection, num):
Each rupture has a magnitude ranging from 0 to 10 and no geographic
information.
"""
ses = models.SES.objects.create(
ses_collection=ses_collection,
investigation_time=job.hazard_calculation.investigation_time,
ordinal=1)
rupture = ParametricProbabilisticRupture(
mag=1 + 10. / float(num), rake=0,
tectonic_region_type="test region type",
Expand All @@ -703,6 +699,7 @@ def create_ses_ruptures(job, ses_collection, num):
occurrence_rate=1,
temporal_occurrence_model=PoissonTOM(10),
source_typology=object())
ses = models.SES(ses_collection, ordinal=1)
seed = 42
pr = models.ProbabilisticRupture.create(rupture, ses_collection)
return [models.SESRupture.create(pr, ses, 'test', 1, i, seed + i)
Expand Down
23 changes: 11 additions & 12 deletions openquake/engine/tools/extract_gmvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,22 @@ def extract(hc_id, a_writer):
sa_period_fix = sa_period

ruptures = sorted(
[r.id
for r in models.SESRupture.objects.filter(
ses__ses_collection__lt_realization=lt)])
[r.id for r in models.SESRupture.objects.filter(
rupture__ses_collection__lt_realization=lt)])

for site in hc.hazardsite_set.all().order_by('id'):
gmvs = []
gmvs_data = dict()

for ses in models.SES.objects.filter(
ses_collection__lt_realization=lt).order_by('id'):

for gmf in models.GmfData.objects.filter(
ses=ses,
site=site,
imt=imt_type, sa_period=sa_period):

gmvs_data.update(dict(zip(gmf.rupture_ids, gmf.gmvs)))
for ses_coll in models.SESCollection.objects.filter(
lt_realization=lt).order_by('id'):
for ses in ses_coll:
for gmf in models.GmfData.objects.filter(
ses_id=ses.ordinal,
site=site,
imt=imt_type, sa_period=sa_period):
gmvs_data.update(
dict(zip(gmf.rupture_ids, gmf.gmvs)))
gmvs.extend([gmvs_data.get(r, 0.0) for r in ruptures])
a_writer.writerow([lt.id, site.location.x, site.location.y,
imt_type_fix, sa_period_fix] + gmvs)
Expand Down
4 changes: 2 additions & 2 deletions openquake/server/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ def update_calculation(callback_url=None, **query):
'ses': DbInterface(
"""SELECT tag, magnitude, St_AsText(hypocenter)
FROM hzrdr.ses_rupture r
JOIN hzrdr.ses ses ON ses.id = r.ses_id
JOIN hzrdr.ses_collection sc ON ses.ses_collection_id = sc.id
JOIN hzrdr.probabilistic_rupture pr ON r.id = r.rupture_id
JOIN hzrdr.ses_collection sc ON pr.ses_collection_id = sc.id
JOIN uiapi.output o ON o.id = sc.output_id
WHERE o.id = %(output_id)d""",
"icebox_ses",
Expand Down
2 changes: 1 addition & 1 deletion qa_tests/hazard/event_based/blocksize/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def run_with_concurrent_tasks(self, n):
cfg = os.path.join(os.path.dirname(__file__), 'job.ini')
job = self.run_hazard(cfg)
tags = models.SESRupture.objects.filter(
ses__ses_collection__output__oq_job=job
rupture__ses_collection__output__oq_job=job
).values_list('tag', flat=True)
# gets the GMFs for all the ruptures in the only existing SES
[gmfs_per_ses] = list(models.Gmf.objects.get(output__oq_job=job))
Expand Down