Skip to content

Commit

Permalink
Merge pull request #115 from pagreene/mesh-update
Browse files Browse the repository at this point in the history
Update mesh table
  • Loading branch information
pagreene committed Jun 11, 2020
2 parents ca39293 + d002c7c commit 500187a
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 73 deletions.
131 changes: 81 additions & 50 deletions indra_db/databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,13 +885,90 @@ def _form_pg_args(self):
'-w', # Don't prompt for a password, forces use of env.
'-d', self.url.database]

def pg_dump(self, dump_file, **options):
    """Use the pg_dump command to dump part of the database onto s3.

    The `pg_dump` tool must be installed, and must be a compatible version
    with the database(s) being used.

    All keyword arguments are converted into flags/arguments of pg_dump. For
    documentation run `pg_dump --help`. This will also confirm you have
    `pg_dump` installed. A True boolean value becomes a bare flag
    (``--opt``), a False boolean value is omitted entirely, and any other
    value becomes ``--opt=value``.

    By default, the "General" and "Connection" options are already set. The
    most likely specification you will want to use is `--table` or
    `--schema`, specifying either a particular table or schema to dump.

    Parameters
    ----------
    dump_file : S3Path or str
        The location on s3 where the content should be dumped.

    Returns
    -------
    S3Path
        The location of the dump on s3, as an S3Path object.

    Raises
    ------
    ValueError
        If `dump_file` is None or not a str/S3Path.
    """
    if isinstance(dump_file, str):
        dump_file = S3Path.from_string(dump_file)
    if not isinstance(dump_file, S3Path):
        # This also rejects None explicitly: previously None slipped past
        # the check and crashed later at `dump_file.to_string()` with an
        # obscure AttributeError.
        raise ValueError("Argument `dump_file` must be appropriately "
                         "formatted string or S3Path object, not %s."
                         % type(dump_file))

    from subprocess import check_call
    from os import environ

    # Make sure the session is fresh and any previous sessions are done.
    self.session.close()
    self.grab_session()

    # Add the password to the env; pg_dump reads PGPASSWORD.
    my_env = environ.copy()
    my_env['PGPASSWORD'] = self.url.password

    # Convert the kwargs into pg_dump flags. Note that a False boolean is
    # now skipped (it used to produce the invalid flag `--opt=False`).
    option_list = []
    for opt, val in options.items():
        if isinstance(val, bool):
            if val:
                option_list.append(f'--{opt}')
        else:
            option_list.append(f'--{opt}={val}')

    # Dump the database onto s3, piping through this machine (errors if
    # anything went wrong).
    # NOTE(review): this command is run through the shell, so option values
    # must come from trusted callers — no quoting/escaping is applied.
    cmd = ' '.join(["pg_dump", *self._form_pg_args(), *option_list, '-Fc',
                    '|', 'aws', 's3', 'cp', '-', dump_file.to_string()])
    check_call(cmd, shell=True, env=my_env)
    return dump_file

def vacuum(self, analyze=True):
    """Run VACUUM on the database, optionally with ANALYZE.

    Parameters
    ----------
    analyze : bool
        If True (default), run ``VACUUM ANALYZE`` to also refresh planner
        statistics; otherwise run a plain ``VACUUM``.
    """
    conn = self.engine.raw_connection()
    try:
        # VACUUM cannot run inside a transaction block, so force
        # autocommit on this raw psycopg2 connection.
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = conn.cursor()
        try:
            cursor.execute('VACUUM' + (' ANALYZE;' if analyze else ''))
        finally:
            cursor.close()
    finally:
        # Return the raw connection to the pool; it was previously leaked.
        conn.close()
    return

def pg_restore(self, dump_file, **options):
    """Load content into the database from a dump file on s3.

    All keyword arguments are converted into flags/arguments of
    `pg_restore`, in the same way as for :meth:`pg_dump`: a True boolean
    becomes a bare flag, a False boolean is omitted, and any other value
    becomes ``--opt=value``.

    Parameters
    ----------
    dump_file : S3Path or str
        The location on s3 of the dump to be loaded.

    Returns
    -------
    S3Path
        The location of the dump that was loaded.

    Raises
    ------
    ValueError
        If `dump_file` is None or not a str/S3Path.
    """
    if isinstance(dump_file, str):
        dump_file = S3Path.from_string(dump_file)
    if not isinstance(dump_file, S3Path):
        # Reject None explicitly: previously it slipped through and failed
        # later at `dump_file.to_string()` with an AttributeError.
        raise ValueError("Argument `dump_file` must be appropriately "
                         "formatted string or S3Path object, not %s."
                         % type(dump_file))

    from subprocess import run
    from os import environ

    # Make sure the session is fresh and any previous sessions are done.
    self.session.close()
    self.grab_session()

    # Add the password to the env; pg_restore reads PGPASSWORD.
    my_env = environ.copy()
    my_env['PGPASSWORD'] = self.url.password

    # Convert the kwargs into pg_restore flags, consistently with pg_dump
    # (a False boolean is skipped rather than emitting `--opt=False`).
    option_list = []
    for opt, val in options.items():
        if isinstance(val, bool):
            if val:
                option_list.append(f'--{opt}')
        else:
            option_list.append(f'--{opt}={val}')

    # Pipe the database dump from s3 through this machine into the database.
    # NOTE(review): run through the shell — option values must be trusted.
    logger.info("Dumping into the database.")
    run(' '.join(['aws', 's3', 'cp', dump_file.to_string(), '-', '|',
                  'pg_restore', *self._form_pg_args(), *option_list,
                  '--no-owner']),
        env=my_env, shell=True, check=True)

    # Refresh the session now that the schema content has changed.
    self.session.close()
    self.grab_session()
    return dump_file


class PrincipalDatabaseManager(DatabaseManager):
"""This class represents the methods special to the principal database."""
Expand Down Expand Up @@ -966,39 +1043,14 @@ def iter_names():

def dump_readonly(self, dump_file=None):
    """Dump the readonly schema to s3.

    Parameters
    ----------
    dump_file : S3Path or str or None
        The location on s3 where the dump should be placed. If None
        (default), a timestamped name of the form
        ``readonly-%Y-%m-%d-%H-%M-%S.dump`` is generated under the
        configured s3 dump location.

    Returns
    -------
    S3Path
        The location of the dump on s3.
    """
    # Form the name of the s3 file, if not given.
    if dump_file is None:
        from indra_db.config import get_s3_dump
        now_str = datetime.utcnow().strftime('%Y-%m-%d-%H-%M-%S')
        dump_loc = get_s3_dump()
        dump_file = dump_loc.get_element_path('readonly-%s.dump' % now_str)

    # Delegate the actual work to the generic pg_dump, restricted to the
    # readonly schema.
    return self.pg_dump(dump_file, schema='readonly')

@staticmethod
def get_latest_dump_file():
Expand Down Expand Up @@ -1079,22 +1131,6 @@ def __getattribute__(self, item):

def load_dump(self, dump_file, force_clear=True):
"""Load from a dump of the readonly schema on s3."""
if isinstance(dump_file, str):
dump_file = S3Path.from_string(dump_file)
elif dump_file is not None and not isinstance(dump_file, S3Path):
raise ValueError("Argument `dump_file` must be appropriately "
"formatted string or S3Path object, not %s."
% type(dump_file))

from subprocess import run
from os import environ

self.session.close()
self.grab_session()

# Add the password to the env
my_env = environ.copy()
my_env['PGPASSWORD'] = self.url.password

# Make sure the database is clear.
if 'readonly' in self.get_schemas():
Expand All @@ -1105,13 +1141,8 @@ def load_dump(self, dump_file, force_clear=True):
raise IndraDbException("Tables already exist and force_clear "
"is False.")

# Pipe the database dump from s3 through this machine into the database
logger.info("Dumping into the database.")
run(' '.join(['aws', 's3', 'cp', dump_file.to_string(), '-', '|',
'pg_restore', *self._form_pg_args(), '--no-owner']),
env=my_env, shell=True, check=True)
self.session.close()
self.grab_session()
# Do the restore
self.pg_restore(dump_file)

# Run Vacuuming
logger.info("Running vacuuming.")
Expand Down
9 changes: 4 additions & 5 deletions indra_db/managers/content_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,22 +827,21 @@ def dump_annotations(self, db):
for pmid, annotation_list in self.annotations.items():
for annotation in annotation_list:
# Format the row.
copy_row = (pmid, annotation['mesh'], annotation['text'],
copy_row = (int(pmid), int(annotation['mesh'][1:]),
annotation['major_topic'])

# Handle the qualifier
qual = annotation['qualifier']
if qual is None:
copy_row += (None, None)
copy_row += (None,)
else:
copy_row += (qual['mesh'], qual['text'])
copy_row += (int(qual['mesh'][1:]),)

copy_rows.append(copy_row)

# Copy the results into the database
self.copy_into_db(db, 'mesh_ref_annotations', copy_rows,
('pmid', 'mesh_id', 'mesh_text', 'major_topic',
'qual_id', 'qual_text'))
('pmid_num', 'mesh_num', 'major_topic', 'qual_num'))
return True

def load_files_and_annotations(self, db, *args, **kwargs):
Expand Down
24 changes: 11 additions & 13 deletions indra_db/schemas/principal_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,21 +209,19 @@ def get_ref_dict(self):

class MeshRefAnnotations(Base, IndraDBTable):
    """MeSH annotations of papers, stored in compact numeric form.

    Both the PMID and the MeSH IDs are stored as integers (the MeSH ID
    with its leading letter stripped) rather than as strings, which
    shrinks the table and its indices considerably.
    """
    __tablename__ = 'mesh_ref_annotations'
    _always_disp = ['pmid_num', 'mesh_num', 'qual_num']
    _indices = [BtreeIndex('mesh_ref_annotations_pmid_idx', 'pmid_num'),
                BtreeIndex('mesh_ref_annotations_mesh_id_idx', 'mesh_num'),
                BtreeIndex('mesh_ref_annotations_qual_id_idx', 'qual_num')]
    id = Column(Integer, primary_key=True)
    # Numeric PMID of the annotated paper.
    pmid_num = Column(Integer, nullable=False)
    # MeSH descriptor number (ID with the leading letter removed).
    mesh_num = Column(Integer, nullable=False)
    # Optional MeSH qualifier number, same numeric convention.
    qual_num = Column(Integer)
    major_topic = Column(Boolean, default=False)
    __table_args__ = (
        UniqueConstraint('pmid_num', 'mesh_num', 'qual_num',
                         name='mesh-uniqueness'),
    )
table_dict[MeshRefAnnotations.__tablename__] = MeshRefAnnotations

class SourceFile(Base, IndraDBTable):
Expand Down
6 changes: 3 additions & 3 deletions indra_db/schemas/readonly_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,10 @@ class RawStmtMesh(Base, ReadonlyTable):
__tablename__ = 'raw_stmt_mesh'
__table_args__ = {'schema': 'readonly'}
__definition__ = ('SELECT DISTINCT raw_statements.id as sid,\n'
' SUBSTRING(mesh_id, 2)::int as mesh_num\n'
' mesh_num\n'
'FROM text_ref\n'
' JOIN mesh_ref_annotations\n'
' ON text_ref.pmid = mesh_ref_annotations.pmid\n'
' JOIN mesh_ref_annotations AS mra\n'
' ON text_ref.pmid_num = mra.pmid_num\n'
' JOIN text_content ON text_ref.id = text_ref_id\n'
' JOIN reading\n'
' ON text_content.id = text_content_id\n'
Expand Down
4 changes: 2 additions & 2 deletions indra_db/tests/test_content_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ def test_full_upload():
num_mra_exp = sum(len(ann) for ann in pm.annotations.values())
assert len(mra_list) == num_mra_exp,\
"Only %s/%s annotations added" % (len(mra_list), num_mra_exp)
assert all([hasattr(mra, 'mesh_id') for mra in mra_list]), \
'All MESH annotations should have a mesh ID.'
assert all([hasattr(mra, 'mesh_num') for mra in mra_list]), \
'All MESH annotations should have a mesh ID Number.'

# Test the pmc oa upload.
PmcOA(ftp_url=get_test_ftp_url(), local=True).populate(db)
Expand Down

0 comments on commit 500187a

Please sign in to comment.