Skip to content

Commit

Permalink
Merge pull request #142 from pagreene/get-ro-dump
Browse files Browse the repository at this point in the history
Get the latest dump file
  • Loading branch information
pagreene committed Oct 22, 2020
2 parents 69877bd + 1013423 commit fbe4327
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 62 deletions.
52 changes: 0 additions & 52 deletions indra_db/databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -1238,58 +1238,6 @@ def _clear(self, tbl_list=None, force=False):
else:
return False

@staticmethod
def get_latest_dump_file():
    """Return the S3Path of the most recent readonly dump file on s3.

    Keys are expected to look like ``readonly-<date>.dump``; anything else
    under the dump prefix is skipped. The winner must be newest both by the
    date string embedded in its name AND by its s3 last-modified timestamp;
    if those two orderings disagree, S3DumpTimeAmbiguityError is raised.
    """
    import boto3
    from indra.util.aws import iter_s3_keys
    from indra_db.config import get_s3_dump

    s3 = boto3.client('s3')
    s3_path = get_s3_dump()

    logger.debug("Looking for the latest dump file on s3 to %s." % s3_path)

    # Track the running maximum by name-date and by last-modified date.
    max_date_str = None
    max_lm_date = None
    latest_key = None
    for key, lm_date in iter_s3_keys(s3, with_dt=True, **s3_path.kw()):

        # Get the date string from the name, ignoring non-standard files.
        # NOTE: raw string with escaped dot; the previous pattern
        # 'readonly-(\S+).dump' let '.' match any character and triggered a
        # DeprecationWarning for the un-raw '\S' escape.
        suffix = key.split('/')[-1]
        m = re.match(r'readonly-(\S+)\.dump', suffix)
        if m is None:
            logger.debug("{key} is not a standard key, will not be "
                         "considered.".format(key=key))
            continue
        date_str, = m.groups()

        # Compare to the current maxes. If the date_str and the
        # last-modified date don't agree, raise an error.
        if not max_lm_date \
                or date_str > max_date_str and lm_date > max_lm_date:
            max_date_str = date_str
            max_lm_date = lm_date
            latest_key = key
        elif max_lm_date \
                and (date_str > max_date_str or lm_date > max_lm_date):
            raise S3DumpTimeAmbiguityError(key, date_str > max_date_str,
                                           lm_date > max_lm_date)
    logger.debug("Latest dump file from %s was found to be %s."
                 % (s3_path, latest_key))

    return S3Path(s3_path.bucket, latest_key)


class S3DumpTimeAmbiguityError(Exception):
    """Raised when a dump key's name-date ordering disagrees with the
    ordering of its s3 last-modified timestamp."""

    def __init__(self, key, is_latest_str, is_last_modified):
        str_neg = '' if is_latest_str else 'not '
        lm_neg = '' if is_last_modified else 'not '
        msg = (f'{key} is {str_neg}the largest date string but is '
               f'{lm_neg}the latest time stamp.')
        super().__init__(msg)


class ReadonlyDatabaseManager(DatabaseManager):
"""This class represents the readonly database."""
Expand Down
72 changes: 62 additions & 10 deletions indra_db/managers/dump_manager.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import re
import json

import boto3
import pickle
import logging
from datetime import datetime
from argparse import ArgumentParser

from indra.statements import get_all_descendants
from indra.statements.io import stmts_from_json
from indra.util.aws import iter_s3_keys
from indra_db.belief import get_belief
from indra_db.config import CONFIG
from indra_db.config import get_s3_dump
from indra_db.config import CONFIG, get_s3_dump
from indra_db.util import get_db, get_ro, S3Path
from indra_db.util.aws import get_role_kwargs
from indra_db.util.dump_sif import dump_sif, get_source_counts
Expand All @@ -22,15 +25,55 @@
aws_lambda_function = CONFIG['lambda']['function']


dump_names = ['sif', 'belief']

def list_dumps(started=None, ended=None):
    """List all dumps, optionally filtered by their status.

    Parameters
    ----------
    started : Optional[bool]
        If True, find dumps that have started. If False, find dumps that have
        NOT been started. If None, do not filter by start status.
    ended : Optional[bool]
        The same as `started`, but checking whether the dump is ended or not.

    Returns
    -------
    list of S3Path
        One path per dump "directory" prefix under the configured s3 dump
        location that passes the filters.
    """
    # Get all the dump "directories".
    s3_base = get_s3_dump()
    s3 = boto3.client('s3')
    res = s3.list_objects_v2(Delimiter='/', **s3_base.kw(prefix=True))
    # 'CommonPrefixes' is omitted from the response when there are no
    # matching prefixes at all, so default to an empty list.
    dumps = [S3Path.from_key_parts(s3_base.bucket, d['Prefix'])
             for d in res.get('CommonPrefixes', [])]

    # Filter to those that have "started" (a Start marker file exists).
    if started is not None:
        dumps = [p for p in dumps
                 if p.get_element_path(Start.file_name()).exists(s3) == started]

    # Filter to those that have "ended" (an End marker file exists).
    if ended is not None:
        dumps = [p for p in dumps
                 if p.get_element_path(End.file_name()).exists(s3) == ended]

    return dumps


def get_latest_dump_s3_path(dumper_name):
    """Get the latest version of a dump file by the given name.

    `dumper_name` is indexed using the standardized `name` class attribute of
    the dumper object. Returns None when no started dump contains the file.
    """
    client = boto3.client('s3')
    target_file = dumpers[dumper_name].file_name()

    # Walk the properly-started dumps newest-first (prefixes sort by their
    # date stamp) and return the first that contains the sought file.
    for dump_dir in sorted(list_dumps(started=True), reverse=True):
        candidate = dump_dir.get_element_path(target_file)
        if candidate.exists(client):
            return candidate

    # No dump contained the file.
    return None


class Dumper(object):
Expand All @@ -50,10 +93,13 @@ def get_s3_path(self):
self.s3_dump_path = self._gen_s3_name()
return self.s3_dump_path

@classmethod
def file_name(cls):
    """Return this dumper's canonical s3 file name, e.g. ``'sif.pkl'``."""
    return f'{cls.name}.{cls.fmt}'

def _gen_s3_name(self):
    """Build the dated s3 path for this dump file:
    ``<dump base>/<date_stamp>/<name>.<fmt>``.
    """
    s3_base = get_s3_dump()
    s3_path = s3_base.get_element_path(self.date_stamp, self.file_name())
    return s3_path

@classmethod
Expand All @@ -77,6 +123,10 @@ def from_list(cls, s3_path_list):
def dump(self, continuing=False):
raise NotImplementedError()

def shallow_mock_dump(self, *args, **kwargs):
    """Upload an empty placeholder file to this dumper's s3 path (testing)."""
    client = boto3.client('s3')
    self.get_s3_path().upload(client, b'')


class Start(Dumper):
name = 'start'
Expand Down Expand Up @@ -316,6 +366,9 @@ def __exit__(self, exc_type, value, traceback):
"to Readonly." % exc_type)


# Registry of all concrete dumper classes, indexed by their canonical `name`.
dumpers = {d.name: d for d in get_all_descendants(Dumper)}


def parse_args():
parser = ArgumentParser(
description='Manage the materialized views.'
Expand Down Expand Up @@ -442,4 +495,3 @@ def main():

if __name__ == '__main__':
main()

85 changes: 85 additions & 0 deletions indra_db/tests/test_dump_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import boto3
import moto

from indra_db.config import get_s3_dump
from indra_db.managers import dump_manager as dm
from indra_db.util import S3Path


def _build_s3_test_dump(structure):
    """Populate a (mocked) s3 bucket with empty dump files for testing.

    The input is a structure of the following form:
    ```
    structure = {
        '2020-01-01': [
            'start',
            'readonly',
            'sif',
        ],
        '2020-02-01': [
            'start',
            'readonly',
            'sif',
            'belief',
            'end'
        ]
    }
    ```
    where the names given are the canonical names of dumpers (see class
    definitions or `dumpers` global for details).
    """
    client = boto3.client('s3')
    base = get_s3_dump()
    client.create_bucket(Bucket=base.bucket)
    for date_stamp, dump_list in structure.items():
        for name in dump_list:
            # Instantiate the dumper for this date and drop an empty file.
            dm.dumpers[name](date_stamp=date_stamp).shallow_mock_dump()


@moto.mock_s3
def test_list_dumps():
    """Test the dump listing feature."""
    _build_s3_test_dump({
        '2020-01-01': ['start'],         # a dump that is unfinished.
        '2020-02-01': ['start', 'end'],  # a dump that finished.
        '2020-03-01': ['sif']            # something strange but possible.
    })

    base = get_s3_dump()

    def _check(found, expected_timestamps):
        assert all(isinstance(s3p, S3Path) for s3p in found)
        assert all(s3p.key.startswith(base.key) for s3p in found)
        # The timestamp is the second-to-last element of the key.
        time_stamps = [s3p.key.split('/')[-2] for s3p in found]
        assert expected_timestamps == time_stamps, \
            f"Expected: {expected_timestamps}, Got: {time_stamps}"

    # No filters: everything shows up.
    _check(dm.list_dumps(), ['2020-01-01', '2020-02-01', '2020-03-01'])

    # Only dumps with a 'start' marker.
    _check(dm.list_dumps(started=True), ['2020-01-01', '2020-02-01'])

    # Started AND ended.
    _check(dm.list_dumps(started=True, ended=True), ['2020-02-01'])

    # Started but NOT ended.
    _check(dm.list_dumps(started=True, ended=False), ['2020-01-01'])


@moto.mock_s3
def test_get_latest():
    """Test the function used to get the latest version of a dump file."""
    _build_s3_test_dump({
        '2019-12-01': ['start', 'readonly', 'sif', 'end'],
        '2020-01-01': ['start', 'readonly'],
        '2020-02-01': ['start', 'sif', 'end']
    })

    # The newest 'readonly' file lives in the 2020-01-01 dump...
    latest_ro = dm.get_latest_dump_s3_path('readonly')
    assert '2020-01-01' in latest_ro.key, latest_ro.key

    # ...while the newest 'sif' file lives in the 2020-02-01 dump.
    latest_sif = dm.get_latest_dump_s3_path('sif')
    assert '2020-02-01' in latest_sif.key, latest_sif.key

0 comments on commit fbe4327

Please sign in to comment.