From b38366beb8d716b10d4e0a14d64a321150969375 Mon Sep 17 00:00:00 2001
From: "K. Shankari"
Date: Wed, 10 Jan 2018 21:55:38 -0800
Subject: [PATCH] Store pipeline state + switch to argparse

The raw data and the analysis results do not constitute the entire state of
a pipeline. In particular, if we store only the raw + analysis results, and
then we try to run the pipeline again, we will end up with two copies of the
analysis results. Instead, when we transfer data, it should include the raw
data, the pipeline state, and the analysis results.

Change this code to store the pipeline state as well. And since I am in there
changing things anyway, switch to argparse to handle the arguments as well.

```
$ ./e-mission-py.bash bin/debug/extract_timeline_for_day_range_and_user.py -e test_output_gen_curr_ts -- 2010-01-01 2020-01-01 /tmp/test_dump
storage not configured, falling back to sample, default configuration
Connecting to database URL localhost
INFO:root:==================================================
INFO:root:Extracting timeline for user d4dfcc42-b6fc-4b6b-a246-d1abec1d039f day 2010-01-01 -> 2020-01-01 and saving to file /tmp/test_dump
DEBUG:root:start_day_ts = 1262304000 (2010-01-01T00:00:00+00:00), end_day_ts = 1577836800 (2020-01-01T00:00:00+00:00)
DEBUG:root:curr_query = {'user_id': UUID('d4dfcc42-b6fc-4b6b-a246-d1abec1d039f'), 'data.ts': {'$lte': 1577836800, '$gte': 1262304000}}, sort_key = data.ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None
DEBUG:root:finished querying values for None
DEBUG:root:curr_query = {'user_id': UUID('d4dfcc42-b6fc-4b6b-a246-d1abec1d039f'), 'data.start_ts': {'$lte': 1577836800, '$gte': 1262304000}}, sort_key = data.start_ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None
DEBUG:root:finished querying values for None
DEBUG:root:curr_query = {'user_id': UUID('d4dfcc42-b6fc-4b6b-a246-d1abec1d039f'), 'data.enter_ts': {'$lte': 1577836800, '$gte': 1262304000}}, sort_key = data.enter_ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None
DEBUG:root:finished querying values for None
INFO:root:Found 1449 loc entries, 27 trip-like entries, 19 place-like entries = 1495 total entries
INFO:root:timeline has unique keys = {'stats/server_api_error', 'statemachine/transition', 'analysis/cleaned_stop', 'background/filtered_location', 'segmentation/raw_trip', 'background/location', 'segmentation/raw_stop', 'segmentation/raw_section', 'stats/client_time', 'background/motion_activity', 'analysis/recreated_location', 'segmentation/raw_place', 'analysis/cleaned_trip', 'background/battery', 'analysis/cleaned_section', 'stats/server_api_time', 'analysis/cleaned_place', 'stats/pipeline_time', 'stats/client_nav_event'}
INFO:root:Found 6 pipeline states [6, 1, 2, 3, 11, 9]

$ ls -1 /tmp/test_dump_*
/tmp/test_dump_d4dfcc42-b6fc-4b6b-a246-d1abec1d039f.gz
/tmp/test_dump_pipelinestate_d4dfcc42-b6fc-4b6b-a246-d1abec1d039f.gz
```
---
 ...extract_timeline_for_day_range_and_user.py | 64 ++++++++++++++-----
 1 file changed, 47 insertions(+), 17 deletions(-)

diff --git a/bin/debug/extract_timeline_for_day_range_and_user.py b/bin/debug/extract_timeline_for_day_range_and_user.py
index 7077a5f4c..d1916dc88 100644
--- a/bin/debug/extract_timeline_for_day_range_and_user.py
+++ b/bin/debug/extract_timeline_for_day_range_and_user.py
@@ -19,6 +19,7 @@
 import arrow
 import argparse
 
+import emission.core.wrapper.user as ecwu
 import emission.storage.timeseries.abstract_timeseries as esta
 import emission.storage.timeseries.timequery as estt
 import emission.storage.decorations.user_queries as esdu
@@ -53,9 +54,25 @@ def export_timeline(user_id, start_day_str, end_day_str, file_name):
     if len(combined_list) == 0 or unique_key_list == set(['stats/pipeline_time']):
         logging.info("No entries found in range for user %s, skipping save" % user_id)
     else:
+        # Also dump the pipeline state, since that's where we have analysis results up to
+        # This allows us to copy data to a different *live system*, not just
+        # duplicate for analysis
         combined_filename = "%s_%s.gz" % (file_name, user_id)
-        json.dump(combined_list,
-            gzip.open(combined_filename, "wb"), default=bju.default, allow_nan=False, indent=4)
+        with gzip.open(combined_filename, "wt") as gcfd:
+            json.dump(combined_list,
+                gcfd, default=bju.default, allow_nan=False, indent=4)
+
+        import emission.core.get_database as edb
+
+        pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
+        logging.info("Found %d pipeline states %s" %
+            (len(pipeline_state_list),
+             list([ps["pipeline_stage"] for ps in pipeline_state_list])))
+
+        pipeline_filename = "%s_pipelinestate_%s.gz" % (file_name, user_id)
+        with gzip.open(pipeline_filename, "wt") as gpfd:
+            json.dump(pipeline_state_list,
+                gpfd, default=bju.default, allow_nan=False, indent=4)
 
 def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list):
     MAX_LIMIT = 25 * 10000
@@ -70,20 +87,33 @@ def export_timeline_for_users(user_id_list, args):
     for curr_uuid in user_id_list:
         if curr_uuid != '':
             logging.info("=" * 50)
-            export_timeline(user_id=curr_uuid, start_day_str=sys.argv[2], end_day_str=sys.argv[3], file_name=sys.argv[4])
-
+            export_timeline(user_id=curr_uuid, start_day_str=args.start_day,
+                            end_day_str= args.end_day, file_name=args.file_prefix)
 
 if __name__ == '__main__':
-    if len(sys.argv) != 5:
-        print("Usage: %s [|'all'|'file_XXX'] " % (sys.argv[0]))
-    else:
-        user_id_str = sys.argv[1]
-        if user_id_str == "all":
-            all_uuids = esdu.get_all_uuids()
-            export_timeline_for_users(all_uuids, sys.argv)
-        elif user_id_str.startswith("file_"):
-            uuid_strs = json.load(open(user_id_str))
-            uuids = [uuid.UUID(ustr) for ustr in uuid_strs]
-            export_timeline_for_users(uuids, sys.argv)
-        else:
-            export_timeline(user_id=uuid.UUID(sys.argv[1]), start_day_str=sys.argv[2], end_day_str=sys.argv[3], file_name=sys.argv[4])
+    logging.basicConfig(level=logging.DEBUG)
+    parser = argparse.ArgumentParser(prog="extract_timeline_for_day_range_and_user")
+
+    group = parser.add_mutually_exclusive_group(required=True)
+    group.add_argument("-e", "--user_email", nargs="+")
+    group.add_argument("-u", "--user_uuid", nargs="+")
+    group.add_argument("-a", "--all", action="store_true")
+    group.add_argument("-f", "--file")
+
+    parser.add_argument("start_day", help="start day in utc - e.g. 'YYYY-MM-DD'")
+    parser.add_argument("end_day", help="end day in utc - e.g. 'YYYY-MM-DD'")
+    parser.add_argument("file_prefix", help="prefix for the filenames generated - e.g. /tmp/dump_ will generate files /tmp/dump_.gz, /tmp/dump_.gz...")
+
+    args = parser.parse_args()
+
+    if args.user_uuid:
+        uuid_list = [uuid.UUID(uuid_str) for uuid_str in args.user_uuid]
+    elif args.user_email:
+        uuid_list = [ecwu.User.fromEmail(uuid_str).uuid for uuid_str in args.user_email]
+    elif args.all:
+        uuid_list = esdu.get_all_uuids()
+    elif args.file:
+        with open(args.file) as fd:
+            uuid_strs = json.load(fd)
+            uuid_list = [uuid.UUID(ustr) for ustr in uuid_strs]
+    export_timeline_for_users(uuid_list, args)
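For reference, a minimal sketch (not part of this patch) of how the two dumped
files could be read back on the receiving side. It assumes only what the diff
shows: gzipped JSON written with `default=bju.default`, so
`bson.json_util.object_hook` can undo that encoding. The `load_dump` helper is
illustrative, and the file names are copied from the usage log above.

```
import gzip
import json

import bson.json_util as bju

def load_dump(filename):
    # Illustrative helper: the dumps are gzipped JSON written with
    # default=bju.default, so object_hook=bju.object_hook restores
    # UUIDs, ObjectIds and datetimes on the way back in.
    with gzip.open(filename, "rt") as gfd:
        return json.load(gfd, object_hook=bju.object_hook)

# file names taken from the `ls -1 /tmp/test_dump_*` output above
entries = load_dump("/tmp/test_dump_d4dfcc42-b6fc-4b6b-a246-d1abec1d039f.gz")
states = load_dump("/tmp/test_dump_pipelinestate_d4dfcc42-b6fc-4b6b-a246-d1abec1d039f.gz")
print(len(entries), sorted(ps["pipeline_stage"] for ps in states))
```

A real load script would additionally write the entries into the timeseries and
the states into the pipeline state db, which is why the pipeline state has to be
part of the dump in the first place: without it, re-running the pipeline on the
copied data would regenerate, and therefore duplicate, the analysis results.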