Merge pull request #157 from kkaris/sif-tweak
Sif tweaks
johnbachman committed Jan 19, 2021
2 parents e35b010 + 835af2b commit 64cfbb3
Showing 4 changed files with 309 additions and 55 deletions.
9 changes: 5 additions & 4 deletions indra_db/databases.py
@@ -387,15 +387,16 @@ def grab_session(self):
        if self.session is None or not self.session.is_active:
            logger.debug('Attempting to get session...')
            DBSession = sessionmaker(bind=self.__engine,
-                                    autoflush=self.__protected,
-                                    autocommit=self.__protected)
+                                    autoflush=not self.__protected,
+                                    autocommit=not self.__protected)
            logger.debug('Got session.')
            self.session = DBSession()
            if self.session is None:
                raise IndraDbException("Failed to grab session.")
            if self.__protected:
-               self.session.flush = \
-                   lambda *a, **k: logger.error("Write not allowed!")
+               def no_flush(*a, **k):
+                   logger.error("Write not allowed!")
+               self.session.flush = no_flush

    def get_tables(self):
        """Get a list of available tables."""
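An aside on the fix above (not part of the diff): the old code passed `autoflush=self.__protected` and `autocommit=self.__protected`, so a *protected* session had autoflush and autocommit turned on; negating the flags restores the intended read-only behavior, and the logging lambda becomes a named `no_flush` function. A minimal sketch of the intended semantics, using a stand-in session object rather than a real SQLAlchemy session:

    import logging

    logging.basicConfig()
    logger = logging.getLogger(__name__)

    class FakeSession:
        """Stand-in for a SQLAlchemy session; only flush matters here."""
        def flush(self):
            print("writing pending changes to the database")

    protected = True
    session = FakeSession()
    if protected:
        # Mirror the patched logic: replace flush with a no-op that logs.
        def no_flush(*a, **k):
            logger.error("Write not allowed!")
        session.flush = no_flush

    session.flush()  # logs "Write not allowed!" instead of writing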
112 changes: 92 additions & 20 deletions indra_db/managers/dump_manager.py
@@ -12,7 +12,7 @@
from indra_db.config import CONFIG, get_s3_dump, record_in_test
from indra_db.util import get_db, get_ro, S3Path
from indra_db.util.aws import get_role_kwargs
-from indra_db.util.dump_sif import dump_sif, get_source_counts
+from indra_db.util.dump_sif import dump_sif, get_source_counts, load_res_pos


logger = logging.getLogger(__name__)
@@ -33,6 +33,18 @@ def list_dumps(started=None, ended=None):
        NOT been started. If None, do not filter by start status.
    ended : Optional[bool]
        The same as `started`, but checking whether the dump is ended or not.
+
+   Returns
+   -------
+   list of S3Path objects
+       Each S3Path object contains the bucket and key prefix information for
+       a set of dump files, e.g.
+
+           [S3Path(bigmech, indra-db/dumps/2020-07-16/),
+            S3Path(bigmech, indra-db/dumps/2020-08-28/),
+            S3Path(bigmech, indra-db/dumps/2020-09-18/),
+            S3Path(bigmech, indra-db/dumps/2020-11-12/),
+            S3Path(bigmech, indra-db/dumps/2020-11-13/)]
    """
    # Get all the dump "directories".
    s3_base = get_s3_dump()
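As a usage sketch (assumed, not part of this diff; requires AWS credentials and the configured dump bucket), the two filters combine naturally, e.g. to find dumps that were started but never finished:

    from indra_db.managers.dump_manager import list_dumps

    # Started but not ended: candidates for cleanup or resumption.
    unfinished = list_dumps(started=True, ended=False)
    for s3_path in unfinished:
        print(s3_path.to_string())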
@@ -59,8 +71,20 @@
def get_latest_dump_s3_path(dumper_name):
    """Get the latest version of a dump file by the given name.

-   `dumper_name` is indexed using the standardized `name` class attribute of
-   the dumper object.
+   Searches dumps that have already been *started* and gets the full S3
+   file path for the latest version of the dump of that type (e.g. "sif",
+   "belief", "source_count", etc.)
+
+   Parameters
+   ----------
+   dumper_name : str
+       The standardized name for the dumper classes defined in this module,
+       defined in the `name` class attribute of the dumper object.
+       E.g., the standard dumper name "sif" can be obtained from ``Sif.name``.
+
+   Returns
+   -------
+   Union[S3Path, None]
    """
    # Get all the dumps that were properly started.
    s3 = boto3.client('s3')
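A corresponding usage sketch (assumed, not part of this diff):

    from indra_db.managers.dump_manager import get_latest_dump_s3_path, Sif

    # Look up the newest started "sif" dump; None if no such dump exists.
    sif_path = get_latest_dump_s3_path(Sif.name)
    if sif_path is not None:
        print(sif_path.to_string())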
@@ -245,6 +269,7 @@ def dump(self, continuing=False):


class Sif(Dumper):
+   """Dumps a pandas dataframe of preassembled statements"""
    name = 'sif'
    fmt = 'pkl'
    db_required = True
@@ -253,16 +278,20 @@ class Sif(Dumper):
    def __init__(self, use_principal=False, **kwargs):
        super(Sif, self).__init__(use_principal=use_principal, **kwargs)

-   def dump(self, continuing=False, include_src_counts=True):
+   def dump(self, src_counts_path, res_pos_path, belief_path,
+            continuing=False):
        s3_path = self.get_s3_path()
-       if include_src_counts:
-           srcc = SourceCount(ro=self.db)
-           dump_sif(s3_path, src_count_file=srcc.get_s3_path(), ro=self.db)
-       else:
-           dump_sif(s3_path, ro=self.db)
+       dump_sif(df_file=s3_path,
+                src_count_file=src_counts_path,
+                res_pos_file=res_pos_path,
+                belief_file=belief_path,
+                reload=True,
+                reconvert=True,
+                ro=self.db)
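The upshot of this change: `Sif.dump` no longer computes source counts itself via a nested `SourceCount` dumper; it now takes S3 paths to the already-produced source-count, residue/position, and belief dumps. A hedged sketch of the new call pattern (illustrative date stamp; in practice `dump()` further down wires these together):

    sif_dumper = Sif(db=principal_db, date_stamp='2021-01-19')
    sif_dumper.dump(src_counts_path=src_count_dump,  # S3Path from SourceCount
                    res_pos_path=res_pos_dump,       # S3Path from ResiduePosition
                    belief_path=belief_dump,         # S3Path from Belief
                    continuing=False)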


class Belief(Dumper):
+   """Dump a dict of belief scores keyed by hash"""
    name = 'belief'
    fmt = 'json'
    db_required = True
@@ -275,21 +304,43 @@ def dump(self, continuing=False):


class SourceCount(Dumper):
+   """Dumps a dict of dicts with source counts per source api per statement"""
    name = 'source_count'
    fmt = 'pkl'
    db_required = True
    db_options = ['principal', 'readonly']

    def __init__(self, use_principal=True, **kwargs):
-       super(SourceCount, self).__init__(use_principal=use_principal, **kwargs)
+       super(SourceCount, self).__init__(use_principal=use_principal,
+                                         **kwargs)

    def dump(self, continuing=False):
        get_source_counts(self.get_s3_path(), self.db)


+class ResiduePosition(Dumper):
+   """Dumps a dict of dicts with residue/position data from Modifications"""
+   name = 'res_pos'
+   fmt = 'pkl'
+   db_required = True
+   db_options = ['readonly', 'principal']
+
+   def __init__(self, use_principal=True, **kwargs):
+       super(ResiduePosition, self).__init__(use_principal=use_principal,
+                                             **kwargs)
+
+   def dump(self, continuing=False):
+       res_pos_dict = load_res_pos(ro=self.db)
+       s3 = boto3.client('s3')
+       logger.info(f'Uploading residue position dump to '
+                   f'{self.get_s3_path().to_string()}')
+       self.get_s3_path().upload(s3=s3, body=pickle.dumps(res_pos_dict))
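Since the res_pos artifact is just a pickled dict uploaded to S3, a consumer can pull it straight back. A sketch under assumptions (bucket and key below are illustrative, following the dump-path pattern shown earlier):

    import pickle

    import boto3

    s3 = boto3.client('s3')
    obj = s3.get_object(Bucket='bigmech',
                        Key='indra-db/dumps/2021-01-19/res_pos.pkl')
    res_pos = pickle.loads(obj['Body'].read())  # dict of dicts of residue/position data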


class FullPaJson(Dumper):
+   """Dumps all statements found in FastRawPaLink as jsonl"""
    name = 'full_pa_json'
-   fmt = 'json'
+   fmt = 'jsonl'
    db_required = True
    db_options = ['principal', 'readonly']

@@ -298,12 +349,13 @@ def __init__(self, use_principal=False, **kwargs):

    def dump(self, continuing=False):
        query_res = self.db.session.query(self.db.FastRawPaLink.pa_json.distinct())
-       json_list = [json.loads(js[0]) for js in query_res.all()]
+       jsonl_str = '\n'.join([js.decode() for js, in query_res.all()])
        s3 = boto3.client('s3')
-       self.get_s3_path().upload(s3, json.dumps(json_list).encode('utf-8'))
+       self.get_s3_path().upload(s3, jsonl_str.encode('utf-8'))
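Switching from one large JSON array to jsonl means each line is an independent JSON object, so consumers can stream statements without parsing the whole dump at once. A reader-side sketch (assumed local file name):

    import json

    with open('full_pa_json.jsonl') as fh:
        for line in fh:
            stmt_json = json.loads(line)  # one preassembled statement per line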


class FullPaStmts(Dumper):
+   """Dumps all statements found in FastRawPaLink as a pickle"""
    name = 'full_pa_stmts'
    fmt = 'pkl'
    db_required = True
@@ -431,6 +483,24 @@ def dump(principal_db, readonly_db, delete_existing=False, allow_continue=True,
    else:
        logger.info("Belief dump exists, skipping.")

+   res_pos_dump = ResiduePosition.from_list(starter.manifest)
+   if not allow_continue or not res_pos_dump:
+       logger.info("Dumping residue and position")
+       res_pos_dumper = ResiduePosition(db=readonly_db,
+                                        date_stamp=starter.date_stamp)
+       res_pos_dumper.dump(continuing=allow_continue)
+       res_pos_dump = res_pos_dumper.get_s3_path()
+   else:
+       logger.info("Residue position dump exists, skipping")
+
+   src_count_dump = SourceCount.from_list(starter.manifest)
+   if not allow_continue or not src_count_dump:
+       logger.info("Dumping source count")
+       src_count_dumper = SourceCount(db=readonly_db,
+                                      date_stamp=starter.date_stamp)
+       src_count_dumper.dump(continuing=allow_continue)
+       src_count_dump = src_count_dumper.get_s3_path()

    dump_file = Readonly.from_list(starter.manifest)
    if not allow_continue or not dump_file:
        logger.info("Generating readonly schema (est. a long time)")
@@ -445,15 +515,16 @@
    if not allow_continue or not Sif.from_list(starter.manifest):
        logger.info("Dumping sif from the readonly schema on principal.")
        Sif(db=principal_db, date_stamp=starter.date_stamp)\
-           .dump(continuing=allow_continue)
+           .dump(src_counts_path=src_count_dump,
+                 res_pos_path=res_pos_dump,
+                 belief_path=belief_dump,
+                 continuing=allow_continue)
    else:
        logger.info("Sif dump exists, skipping.")

-   if not allow_continue \
-           or not FullPaStmts.from_list(starter.manifest):
-       logger.info("Dumping all PA Statements as a pickle.")
-       FullPaStmts(db=principal_db,
-                   date_stamp=starter.date_stamp)\
+   if not allow_continue or not FullPaJson.from_list(starter.manifest):
+       logger.info("Dumping all PA Statements as jsonl.")
+       FullPaJson(db=principal_db, date_stamp=starter.date_stamp)\
        .dump(continuing=allow_continue)
    else:
        logger.info("Statement dump exists, skipping.")
@@ -534,7 +605,8 @@ def parse_args():


if __name__ == '__main__':
+   # Collect args and run the top-level dump script
    args = parse_args()
    dump(get_db(args.database, protected=False),
-        get_ro(args.readonly, protected=False), args.delet_existing,
+        get_ro(args.readonly, protected=False), args.delete_existing,
         args.allow_continue, args.load_only, args.dump_only)
2 changes: 2 additions & 0 deletions indra_db/schemas/readonly_schema.py
@@ -636,6 +636,7 @@ class TextMeta(Base, NamespaceLookup):
    activity = Column(String)
    is_active = Column(Boolean)
    agent_count = Column(Integer)
+   is_complex_dup = Column(Boolean)
ro_tables[TextMeta.__tablename__] = TextMeta

class NameMeta(Base, NamespaceLookup):
@@ -657,6 +658,7 @@ class NameMeta(Base, NamespaceLookup):
    activity = Column(String)
    is_active = Column(Boolean)
    agent_count = Column(Integer)
+   is_complex_dup = Column(Boolean)
ro_tables[NameMeta.__tablename__] = NameMeta

class OtherMeta(Base, ReadonlyTable):
