Store pipeline state + switch to argparse
The raw data and the analysis results do not constitute the entire state of a
pipeline. In particular, if we store only the raw + analysis results and then
run the pipeline again on the restored data, the pipeline has no record of what
has already been processed, so we end up with two copies of the analysis
results.

Instead, when we transfer data, it should include the raw data, the pipeline
state, and the analysis results.

Change this code to store the pipeline state as well.

And since I am in there changing things anyway, switch to argparse to handle
the arguments as well.

```
$ ./e-mission-py.bash bin/debug/extract_timeline_for_day_range_and_user.py -e test_output_gen_curr_ts -- 2010-01-01 2020-01-01 /tmp/test_dump
storage not configured, falling back to sample, default configuration
Connecting to database URL localhost
INFO:root:==================================================
INFO:root:Extracting timeline for user d4dfcc42-b6fc-4b6b-a246-d1abec1d039f day 2010-01-01 -> 2020-01-01 and saving to file /tmp/test_dump
DEBUG:root:start_day_ts = 1262304000 (2010-01-01T00:00:00+00:00), end_day_ts = 1577836800 (2020-01-01T00:00:00+00:00)
DEBUG:root:curr_query = {'user_id': UUID('d4dfcc42-b6fc-4b6b-a246-d1abec1d039f'), 'data.ts': {'$lte': 1577836800, '$gte': 1262304000}}, sort_key = data.ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None
DEBUG:root:finished querying values for None
DEBUG:root:curr_query = {'user_id': UUID('d4dfcc42-b6fc-4b6b-a246-d1abec1d039f'), 'data.start_ts': {'$lte': 1577836800, '$gte': 1262304000}}, sort_key = data.start_ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None
DEBUG:root:finished querying values for None
DEBUG:root:curr_query = {'user_id': UUID('d4dfcc42-b6fc-4b6b-a246-d1abec1d039f'), 'data.enter_ts': {'$lte': 1577836800, '$gte': 1262304000}}, sort_key = data.enter_ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None
DEBUG:root:finished querying values for None
INFO:root:Found 1449 loc entries, 27 trip-like entries, 19 place-like entries = 1495 total entries
INFO:root:timeline has unique keys = {'stats/server_api_error', 'statemachine/transition', 'analysis/cleaned_stop', 'background/filtered_location', 'segmentation/raw_trip', 'background/location', 'segmentation/raw_stop', 'segmentation/raw_section', 'stats/client_time', 'background/motion_activity', 'analysis/recreated_location', 'segmentation/raw_place', 'analysis/cleaned_trip', 'background/battery', 'analysis/cleaned_section', 'stats/server_api_time', 'analysis/cleaned_place', 'stats/pipeline_time', 'stats/client_nav_event'}
INFO:root:Found 6 pipeline states [6, 1, 2, 3, 11, 9]

$ ls -1 /tmp/test_dump_*
/tmp/test_dump_d4dfcc42-b6fc-4b6b-a246-d1abec1d039f.gz
/tmp/test_dump_pipelinestate_d4dfcc42-b6fc-4b6b-a246-d1abec1d039f.gz
```
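This commit only covers the dump side. As a rough sketch of the matching restore on a target system, assuming the same gzip + bson.json_util round trip: only `get_pipeline_state_db()` appears in this diff, while `get_timeseries_db()`, `get_analysis_timeseries_db()`, the `load_dump_for_user` helper, and the key-prefix routing of analysis entries are illustrative assumptions, not part of the commit.

```
import gzip
import json

import bson.json_util as bju

import emission.core.get_database as edb

def load_dump_for_user(file_prefix, user_id_str):
    # Hypothetical restore of the two files written by the extract script
    timeline_filename = "%s_%s.gz" % (file_prefix, user_id_str)
    pipeline_filename = "%s_pipelinestate_%s.gz" % (file_prefix, user_id_str)

    # object_hook undoes the bju.default encoding used on the dump side,
    # so UUIDs, ObjectIds and datetimes come back as BSON-compatible types
    with gzip.open(timeline_filename, "rt") as tfd:
        entries = json.load(tfd, object_hook=bju.object_hook)
    with gzip.open(pipeline_filename, "rt") as pfd:
        pipeline_states = json.load(pfd, object_hook=bju.object_hook)

    # Simplified routing: raw entries back into the raw timeseries, analysis
    # results into the analysis timeseries, and the pipeline state into its
    # own collection so a later pipeline run resumes where the source stopped
    raw_entries = [e for e in entries
                   if not e["metadata"]["key"].startswith("analysis/")]
    analysis_entries = [e for e in entries
                        if e["metadata"]["key"].startswith("analysis/")]

    if raw_entries:
        edb.get_timeseries_db().insert_many(raw_entries)
    if analysis_entries:
        edb.get_analysis_timeseries_db().insert_many(analysis_entries)
    if pipeline_states:
        edb.get_pipeline_state_db().insert_many(pipeline_states)
```

For the run above, this would be something like `load_dump_for_user("/tmp/test_dump", "d4dfcc42-b6fc-4b6b-a246-d1abec1d039f")`, restoring both `/tmp/test_dump_<uuid>.gz` and `/tmp/test_dump_pipelinestate_<uuid>.gz`.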
shankari committed Jan 11, 2018
1 parent 869a43d commit b38366b
Showing 1 changed file with 47 additions and 17 deletions.
bin/debug/extract_timeline_for_day_range_and_user.py
@@ -19,6 +19,7 @@
import arrow
import argparse

import emission.core.wrapper.user as ecwu
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.timeseries.timequery as estt
import emission.storage.decorations.user_queries as esdu
@@ -53,9 +54,25 @@ def export_timeline(user_id, start_day_str, end_day_str, file_name):
if len(combined_list) == 0 or unique_key_list == set(['stats/pipeline_time']):
logging.info("No entries found in range for user %s, skipping save" % user_id)
else:
# Also dump the pipeline state, since that is what records how far the
# analysis has run. This allows us to copy data to a different *live
# system*, not just duplicate it for analysis
combined_filename = "%s_%s.gz" % (file_name, user_id)
json.dump(combined_list,
gzip.open(combined_filename, "wb"), default=bju.default, allow_nan=False, indent=4)
with gzip.open(combined_filename, "wt") as gcfd:
json.dump(combined_list,
gcfd, default=bju.default, allow_nan=False, indent=4)

import emission.core.get_database as edb

pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
logging.info("Found %d pipeline states %s" %
(len(pipeline_state_list),
list([ps["pipeline_stage"] for ps in pipeline_state_list])))

pipeline_filename = "%s_pipelinestate_%s.gz" % (file_name, user_id)
with gzip.open(pipeline_filename, "wt") as gpfd:
json.dump(pipeline_state_list,
gpfd, default=bju.default, allow_nan=False, indent=4)

def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list):
MAX_LIMIT = 25 * 10000
@@ -70,20 +87,33 @@ def export_timeline_for_users(user_id_list, args):
for curr_uuid in user_id_list:
if curr_uuid != '':
logging.info("=" * 50)
export_timeline(user_id=curr_uuid, start_day_str=sys.argv[2], end_day_str=sys.argv[3], file_name=sys.argv[4])

export_timeline(user_id=curr_uuid, start_day_str=args.start_day,
end_day_str=args.end_day, file_name=args.file_prefix)

if __name__ == '__main__':
if len(sys.argv) != 5:
print("Usage: %s [<user>|'all'|'file_XXX'] <start_day> <end_day> <file_prefix>" % (sys.argv[0]))
else:
user_id_str = sys.argv[1]
if user_id_str == "all":
all_uuids = esdu.get_all_uuids()
export_timeline_for_users(all_uuids, sys.argv)
elif user_id_str.startswith("file_"):
uuid_strs = json.load(open(user_id_str))
uuids = [uuid.UUID(ustr) for ustr in uuid_strs]
export_timeline_for_users(uuids, sys.argv)
else:
export_timeline(user_id=uuid.UUID(sys.argv[1]), start_day_str=sys.argv[2], end_day_str=sys.argv[3], file_name=sys.argv[4])
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser(prog="extract_timeline_for_day_range_and_user")

group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-e", "--user_email", nargs="+")
group.add_argument("-u", "--user_uuid", nargs="+")
group.add_argument("-a", "--all", action="store_true")
group.add_argument("-f", "--file")

parser.add_argument("start_day", help="start day in utc - e.g. 'YYYY-MM-DD'" )
parser.add_argument("end_day", help="start day in utc - e.g. 'YYYY-MM-DD'" )
parser.add_argument("file_prefix", help="prefix for the filenames generated - e.g /tmp/dump_ will generate files /tmp/dump_<uuid1>.gz, /tmp/dump_<uuid2>.gz..." )

args = parser.parse_args()

if args.user_uuid:
uuid_list = [uuid.UUID(uuid_str) for uuid_str in args.user_uuid]
elif args.user_email:
uuid_list = [ecwu.User.fromEmail(email_str).uuid for email_str in args.user_email]
elif args.all:
uuid_list = esdu.get_all_uuids()
elif args.file:
with open(args.file) as fd:
uuid_strs = json.load(fd)
uuid_list = [uuid.UUID(ustr) for ustr in uuid_strs]
export_timeline_for_users(uuid_list, args)
