
Dump and restore Stochastic Event Set #1300

Merged: 33 commits, Nov 11, 2013. Changes shown below are from 12 of the commits.

Commits
d0ecad0
first pass
matley Nov 3, 2013
735913d
qa test passes
matley Nov 3, 2013
9351be0
removed old tests
matley Nov 3, 2013
050dac1
added new qa test and migration script
matley Nov 3, 2013
fcd0432
fix deadlock problem
matley Nov 3, 2013
00409d2
removed an old test
matley Nov 3, 2013
773172e
removed duplicated xml files
matley Nov 3, 2013
ec2b185
Merge branch 'master' of https://github.com/gem/oq-engine into dump-i…
matley Nov 4, 2013
dd82f1a
updated revision info
matley Nov 4, 2013
ec46f67
Merge branch 'master' of https://github.com/gem/oq-engine into dump-i…
matley Nov 4, 2013
d2f8a98
do not rely on field ordering
matley Nov 5, 2013
11342c1
merge commit
matley Nov 6, 2013
9317bf7
removed duplication
matley Nov 6, 2013
2407520
added docstring
matley Nov 6, 2013
1beea6f
removed an unnecessary conversion
matley Nov 6, 2013
47f7a04
guarded an expression
matley Nov 6, 2013
ebb7ac9
restore_hazard only used from openquake script
matley Nov 6, 2013
0ed3cd7
use copy expert
matley Nov 6, 2013
2513cd1
removed unused imports
matley Nov 6, 2013
4b8cc9b
fix oqscript to get an opened connection
matley Nov 7, 2013
cb015d3
changing openquake tables ownership
matley Nov 7, 2013
db386a0
added migration script to change ownership
matley Nov 7, 2013
2367c51
renamed dump/restore -> save/load
matley Nov 7, 2013
b853685
renamed restore->load
matley Nov 7, 2013
0ffd443
Merge branch 'master' of https://github.com/gem/oq-engine into dump-i…
matley Nov 8, 2013
a725726
Merge branch 'master' of https://github.com/gem/oq-engine into dump-i…
matley Nov 8, 2013
a9c3364
skip saving gmf
matley Nov 8, 2013
8925e60
do not setup a new connection when saving hazard calculations
matley Nov 8, 2013
3c207bb
added assertion to check for the existance of the hc
matley Nov 8, 2013
9eda7e3
change the API to support only one job/calculation
matley Nov 8, 2013
36d6d68
gzip csv dumps by default
matley Nov 8, 2013
95f54de
added comment explaining why I am temporarly altering the tables
matley Nov 8, 2013
69f228d
Merge branch 'master' of https://github.com/gem/oq-engine into dump-i…
matley Nov 8, 2013
18 changes: 18 additions & 0 deletions openquake/engine/bin/oqscript.py
@@ -98,6 +98,7 @@
from openquake.engine.input import source
from openquake.engine.tools.import_gmf_scenario import import_gmf_scenario
from openquake.engine.tools.import_hazard_curves import import_hazard_curves
from openquake.engine.tools import dump_hazards, restore_hazards

HAZARD_OUTPUT_ARG = "--hazard-output-id"
HAZARD_CALCULATION_ARG = "--hazard-calculation-id"
@@ -237,6 +238,17 @@ def set_up_arg_parser():
'desired output format. Defaults to "xml".')
)

dump_restore_grp = parser.add_argument_group('Dump/Restore')
dump_restore_grp.add_argument(
'--dump-hazard-calculation',
help=('Dump a hazard calculation to a newly created directory.'),
nargs=2, metavar=('HAZARD_CALCULATION_ID', 'DUMP_DIR'))
dump_restore_grp.add_argument(
'--restore-hazard-calculation',
help=("Restore a hazard calculation from a dump. "
"Only SES outputs currently supported"),
metavar=('DUMP_DIR'))

Contributor:
After yesterday's discussion, I think we should change the names to --save-hazard-calculation and --load-hazard-calculation. Paul and Matteo like those names better. Also restore_hazards.py and dump_hazards.py could be renamed.
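For illustration, a sketch of how the renamed options might look (option names taken from the comment above; not part of the submitted patch):

dump_restore_grp.add_argument(
    '--save-hazard-calculation',
    help='Save (dump) a hazard calculation to a newly created directory.',
    nargs=2, metavar=('HAZARD_CALCULATION_ID', 'DUMP_DIR'))
dump_restore_grp.add_argument(
    '--load-hazard-calculation',
    help='Load (restore) a hazard calculation from a dump directory. '
         'Only SES outputs are currently supported.',
    metavar='DUMP_DIR')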

import_grp = parser.add_argument_group('Import')
import_grp.add_argument(
'--load-gmf',
@@ -519,6 +531,12 @@ def main():
list_imported_outputs()
elif args.delete_uncompleted_calculations:
delete_uncompleted_calculations()
elif args.dump_hazard_calculation:
dump_hazards.main(*args.dump_hazard_calculation)
elif args.restore_hazard_calculation:
hc_ids = restore_hazards.django_restore(
args.restore_hazard_calculation)
print "Restore hazard calculation with IDs: %s" % hc_ids
else:
arg_parser.print_usage()

2 changes: 1 addition & 1 deletion openquake/engine/db/schema/load.sql
@@ -17,5 +17,5 @@


-- If a new database is being built, explicitly set the oq-engine DB schema version:
INSERT INTO admin.revision_info(artefact, revision, step) VALUES('oq-engine', '1.0.1', 7);
INSERT INTO admin.revision_info(artefact, revision, step) VALUES('oq-engine', '1.0.1', 8);

4 changes: 0 additions & 4 deletions openquake/engine/db/schema/openquake.sql
@@ -177,10 +177,6 @@ CREATE TABLE uiapi.job_phase_stats (


CREATE TABLE uiapi.hazard_calculation (
-- TODO(larsbutler): At the moment, this model only contains Classical
-- hazard parameters.
-- We'll need to update fields and constraints as we add the other
-- calculation modes.
id SERIAL PRIMARY KEY,
-- Contains the absolute path to the directory containing the job config
-- file
8 changes: 8 additions & 0 deletions openquake/engine/db/schema/security.sql
@@ -60,6 +60,14 @@ GRANT INSERT,UPDATE,DELETE ON ALL TABLES IN SCHEMA riski TO oq_admin;
GRANT INSERT,UPDATE,DELETE ON ALL TABLES IN SCHEMA riskr TO oq_admin;
GRANT INSERT,UPDATE,DELETE ON ALL TABLES IN SCHEMA uiapi TO oq_admin;

GRANT ALL ON SCHEMA admin TO oq_admin;
GRANT ALL ON SCHEMA htemp TO oq_admin;
GRANT ALL ON SCHEMA hzrdi TO oq_admin;
GRANT ALL ON SCHEMA hzrdr TO oq_admin;
GRANT ALL ON SCHEMA riski TO oq_admin;
GRANT ALL ON SCHEMA riskr TO oq_admin;
GRANT ALL ON SCHEMA uiapi TO oq_admin;

----------------------------------------------
-- Specific permissions for individual tables:
----------------------------------------------
New file (migration script changing table ownership to oq_admin):
@@ -0,0 +1,7 @@
GRANT ALL ON SCHEMA admin TO oq_admin;
GRANT ALL ON SCHEMA htemp TO oq_admin;
GRANT ALL ON SCHEMA hzrdi TO oq_admin;
GRANT ALL ON SCHEMA hzrdr TO oq_admin;
GRANT ALL ON SCHEMA riski TO oq_admin;
GRANT ALL ON SCHEMA riskr TO oq_admin;
GRANT ALL ON SCHEMA uiapi TO oq_admin;
132 changes: 69 additions & 63 deletions openquake/engine/tools/dump_hazards.py
@@ -43,7 +43,6 @@
import os
import shutil
import tarfile
import gzip
import argparse
import psycopg2
import tempfile
@@ -60,7 +59,7 @@ def _tuplestr(tup):
class Copier(object):
"""
Small wrapper around a psycopg2 cursor, with a .copy method
writing directly to .gz files. It remembers the copied filenames,
writing directly to csv files. It remembers the copied filenames,
which are stored in the attribute .filenames.
Contributor:
Why are we no longer generating .gz files? This makes a big difference in disk space usage; note that this tool is intended to be used even for large outputs.

Contributor Author:
This work is experimental, so I did not want to add too many features; for now I prefer to let the user decide whether to zip the result. However, I see where you are coming from, so I have added a ticket: https://bugs.launchpad.net/oq-engine/+bug/1248584
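For reference, the previous revision compressed on the fly by wrapping the destination file in gzip.GzipFile, as in the removed lines further below; a minimal sketch of that approach (assuming an open psycopg2 cursor `curs`):

import gzip

TIMESTAMP = 1378800715.0  # fixed mtime, so repeated dumps are byte-identical

def copy_to_gz(curs, query, fname):
    # stream the COPY TO output through a gzip wrapper
    with open(fname, 'w') as fileobj:
        with gzip.GzipFile('', fileobj=fileobj, mtime=TIMESTAMP) as z:
            curs.copy_expert(query, z)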

"""
def __init__(self, psycopg2_cursor):
@@ -79,22 +78,20 @@ def fetchall(self, query, *args):

def copy(self, query, dest, name, mode):
"""
Performs a COPY TO/FROM operation. Works directly with gzipped files.
Performs a COPY TO/FROM operation. Works directly with csv files.

:param str query: the COPY query
:param str dest: the destination directory
:param str name: the destination file name (no .gz)
:param str name: the destination file name
:param chr mode: 'w' (for COPY TO) or 'r' (for COPY FROM)
"""
fname = os.path.join(dest, name + '.gz')
fname = os.path.join(dest, name)
log.info('%s\n(-> %s)', query, fname)
TIMESTAMP = 1378800715.0 # some fake timestamp
# here is some trick to avoid storing filename and timestamp info
with open(fname, mode) as fileobj:
with gzip.GzipFile('', fileobj=fileobj, mtime=TIMESTAMP) as z:
self._cursor.copy_expert(query, z)
if fname not in self.filenames:
self.filenames.append(fname)
self._cursor.copy_expert(query, fileobj)
if fname not in self.filenames:
self.filenames.append(fname)
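A minimal usage sketch of Copier, assuming a reachable openquake database; the connection parameters and calculation id below are illustrative only:

import tempfile
import psycopg2

conn = psycopg2.connect(host='localhost', database='openquake',
                        user='oq_admin', password='openquake')
copier = Copier(conn.cursor())
outdir = tempfile.mkdtemp(prefix='hazard_calculation-')
copier.copy(
    "copy (select * from uiapi.oq_job where id in (42)) to stdout "
    "with (format 'csv', header true, encoding 'utf8')",
    outdir, 'uiapi.oq_job.csv', 'w')
print copier.filenames  # e.g. ['/tmp/hazard_calculation-XXXX/uiapi.oq_job.csv']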


class HazardDumper(object):
@@ -107,51 +104,39 @@ class HazardDumper(object):
print hd.mktar() # generate a tarfile
"""

def __init__(self, conn, outdir=None, format='text'):
def __init__(self, conn, outdir=None):
self.conn = conn
self.curs = Copier(conn.cursor())
self.format = format
# there is no binary format for geography in postgis 1.5,
# this is why we are requiring text format
assert format == 'text', format
if outdir:
if os.path.exists(outdir):
# cleanup previously dumped archives, if any
for fname in os.listdir(outdir):
if fname.endswith('.gz'):
os.remove(os.path.join(outdir, fname))
else:
os.mkdir(outdir)
outdir = outdir or "/tmp/hc-dump"
if os.path.exists(outdir):
# cleanup previously dumped archives, if any
for fname in os.listdir(outdir):
if fname.endswith('.csv'):
os.remove(os.path.join(outdir, fname))
else:
outdir = tempfile.mkdtemp(prefix='hazard_calculation-')
os.mkdir(outdir)
self.outdir = outdir

def hazard_calculation(self, ids):
"""Dump hazard_calculation, lt_realization, hazard_site"""
self.curs.copy(
"""copy (select * from uiapi.hazard_calculation where id in %s)
to stdout with (format '%s')""" % (ids, self.format),
to stdout
with (format 'csv', header true, encoding 'utf8')""" % ids,
self.outdir, 'uiapi.hazard_calculation.csv', 'w')
self.curs.copy(
"""copy (select * from hzrdr.lt_realization
where hazard_calculation_id in %s)
to stdout with (format '%s')""" % (ids, self.format),
where hazard_calculation_id in %s)
to stdout
with (format 'csv', header true, encoding 'utf8')""" % ids,
self.outdir, 'hzrdr.lt_realization.csv', 'w')
self.curs.copy(
"""copy (select * from hzrdi.hazard_site
where hazard_calculation_id in %s)
to stdout with (format '%s')""" % (ids, self.format),
where hazard_calculation_id in %s)
to stdout
with (format 'csv', header true, encoding 'utf8')""" % ids,
self.outdir, 'hzrdi.hazard_site.csv', 'w')

def performance(self, *job_ids):
"""Dump performance"""
ids = _tuplestr(job_ids)
self.oq_job(ids)
self.curs.copy(
"""copy (select * from uiapi.performance where oq_job_id in %s)
to stdout with (format '%s')""" % (ids, self.format),
self.outdir, 'uiapi.performance.csv', 'w')

def oq_job(self, ids):
"""Dump hazard_calculation, oq_job"""
hc_ids = self.curs.tuplestr(
@@ -163,71 +148,81 @@ def oq_job(self, ids):
self.hazard_calculation(hc_ids)
self.curs.copy(
"""copy (select * from uiapi.oq_job where id in %s)
to stdout with (format '%s')""" % (ids, self.format),
to stdout
with (format 'csv', header true, encoding 'utf8')""" % ids,
self.outdir, 'uiapi.oq_job.csv', 'w')
Contributor:
There is a lot of duplication; a template for the 'copy' SQL query would probably be a good idea.
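A possible shape for such a template, as a sketch (the helper name and placement are hypothetical, not part of this patch):

COPY_TEMPLATE = ("copy (select * from {table} where {field} in {ids}) "
                 "to stdout with (format 'csv', header true, encoding 'utf8')")

def copy_table(copier, outdir, table, field, ids, mode='a'):
    """Dump the rows of `table` whose `field` is in `ids` into <outdir>/<table>.csv"""
    copier.copy(COPY_TEMPLATE.format(table=table, field=field, ids=ids),
                outdir, table + '.csv', mode)

# e.g. copy_table(self.curs, self.outdir, 'hzrdr.ses', 'ses_collection_id', coll_ids)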


Contributor:
While dumping the performance table is not related to the load/save functionality, it is very common to want to save the performance info about a computation before deleting it. That is why the functionality was there.
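If the performance dump were reinstated, it could follow the removed method shown above, updated to the CSV options used elsewhere in this patch (a sketch, not part of the submitted changes):

def performance(self, *job_ids):
    """Dump uiapi.performance for the given jobs"""
    ids = _tuplestr(job_ids)
    self.oq_job(ids)
    self.curs.copy(
        """copy (select * from uiapi.performance where oq_job_id in %s)
           to stdout
           with (format 'csv', header true, encoding 'utf8')""" % ids,
        self.outdir, 'uiapi.performance.csv', 'w')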

def output(self, ids):
"""Dump output"""
self.curs.copy(
"""copy (select * from uiapi.output where id in %s)
to stdout with (format '%s')""" % (ids, self.format),
to stdout
with (format 'csv', header true, encoding 'utf8')""" % ids,
self.outdir, 'uiapi.output.csv', 'w')

def hazard_curve(self, output):
"""Dump hazard_curve, hazard_curve_data"""
self.curs.copy(
"""copy (select * from hzrdr.hazard_curve where output_id in %s)
to stdout with (format '%s')""" % (output, self.format),
to stdout
with (format 'csv', header true, encoding 'utf8')""" % output,
self.outdir, 'hzrdr.hazard_curve.csv', 'a')

ids = self.curs.tuplestr(
'select id from hzrdr.hazard_curve where output_id in %s' % output)

self.curs.copy(
"""copy (select * from hzrdr.hazard_curve_data
where hazard_curve_id in {})
to stdout with (format '{}')""".format(ids, self.format),
where hazard_curve_id in {})
to stdout
with (format 'csv', header true, encoding 'utf8')""".format(
ids),
self.outdir, 'hzrdr.hazard_curve_data.csv', 'a')

def gmf(self, output):
"""Dump gmf, gmf_data"""
self.curs.copy(
"""copy (select * from hzrdr.gmf
where output_id in %s)
to stdout with (format '%s')""" % (output, self.format),
where output_id in %s)
to stdout
with (format 'csv', header true, encoding 'utf8')""" % output,
self.outdir, 'hzrdr.gmf.csv', 'a')

coll_ids = self.curs.tuplestr('select id from hzrdr.gmf '
'where output_id in %s' % output)
self.curs.copy(
"""copy (select * from hzrdr.gmf_data
where gmf_id in %s)
to stdout with (format '%s')""" % (coll_ids, self.format),
where gmf_id in %s)
to stdout
with (format 'csv', header true, encoding 'utf8')""" % coll_ids,
self.outdir, 'hzrdr.gmf_data.csv', 'a')

def ses(self, output):
"""Dump ses_collection, ses, ses_rupture"""
self.curs.copy(
"""copy (select * from hzrdr.ses_collection
where output_id in %s)
to stdout with (format '%s')""" % (output, self.format),
where output_id in %s)
to stdout
with (format 'csv', header true, encoding 'utf8')""" % output,
self.outdir, 'hzrdr.ses_collection.csv', 'a')

coll_ids = self.curs.tuplestr('select id from hzrdr.ses_collection '
'where output_id in %s' % output)
self.curs.copy(
"""copy (select * from hzrdr.ses
where ses_collection_id in %s)
to stdout with (format '%s')""" % (coll_ids, self.format),
where ses_collection_id in %s)
to stdout
with (format 'csv', header true, encoding 'utf8')""" % coll_ids,
self.outdir, 'hzrdr.ses.csv', 'a')

ses_ids = self.curs.tuplestr(
'select id from hzrdr.ses where ses_collection_id in %s'
% coll_ids)
self.curs.copy(
"""copy (select * from hzrdr.ses_rupture
where ses_id in %s)
to stdout with (format '%s')""" % (ses_ids, self.format),
where ses_id in %s)
to stdout
with (format 'csv', header true, encoding 'utf8')""" % ses_ids,
self.outdir, 'hzrdr.ses_rupture.csv', 'a')

def dump(self, *hazard_calculation_ids):
@@ -268,7 +263,7 @@ def dump(self, *hazard_calculation_ids):
self.output(_tuplestr(all_outs))
for output_type, output_ids in outputs:
ids = _tuplestr(output_ids)
print "Dumping %s %s" % (output_type, ids)
print "Dumping %s %s in %s" % (output_type, ids, self.outdir)
if output_type in ['hazard_curve', 'hazard_curve_multi']:
self.hazard_curve(ids)
elif output_type in ('gmf', 'gmf_scenario'):
@@ -298,30 +293,41 @@ def mktar(self):


Contributor:
If the functionality is intended to be called only from bin/openquake, we should remove the main.

Contributor Author:
Removed.

Contributor:
And perhaps also add a comment/docstring saying that the dump/restore library is used in bin/openquake.

def main(hazard_calculation_id, outdir=None,
host='localhost', dbname='openquake',
user='admin', password='', port=None):
host=None, dbname=None, user=None, password=None, port=None):
"""
Dump a hazard_calculation and its relative outputs
"""
from openquake.engine.db.models import set_django_settings_module
set_django_settings_module()
from django.conf import settings
default_cfg = settings.DATABASES['default']
host = host or default_cfg.get('HOST', 'localhost')
dbname = dbname or default_cfg.get('NAME', 'openquake')
user = user or default_cfg.get('USER', 'oq_admin')
password = password or default_cfg.get('PASSWORD', 'openquake')
port = port or str(default_cfg.get('PORT', 5432))
# this is not using the predefined Django connections since
# the typical use case is to dump from a remote database
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.WARN)
conn = psycopg2.connect(
host=host, database=dbname, user=user, password=password, port=port)
hc = HazardDumper(conn, outdir)
hc.dump(hazard_calculation_id)
log.info('Written %s' % hc.outdir)
conn.close()
return hc.outdir


if __name__ == '__main__':
p = argparse.ArgumentParser()

p.add_argument('hazard_calculation_id')
p.add_argument('outdir')
p.add_argument('host', nargs='?', default='localhost')
p.add_argument('dbname', nargs='?', default='openquake')
p.add_argument('user', nargs='?', default='oq_admin')
p.add_argument('password', nargs='?', default='openquake')
p.add_argument('port', nargs='?', default='5432')
p.add_argument('outdir', nargs='?')
p.add_argument('host', nargs='?')
p.add_argument('dbname', nargs='?')
p.add_argument('user', nargs='?')
p.add_argument('password', nargs='?')
p.add_argument('port', nargs='?')
arg = p.parse_args()
main(arg.hazard_calculation_id, arg.outdir, arg.host,
arg.dbname, arg.user, arg.password, arg.port)
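
For completeness, a minimal sketch of driving the dumper from Python rather than through bin/openquake; the calculation id and output directory are illustrative, and the connection parameters default to the Django settings as in main() above:

from openquake.engine.tools import dump_hazards

outdir = dump_hazards.main(42, '/tmp/hc-42')  # dump hazard calculation 42
print "dumped to", outdir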