Skip to content

Commit

Permalink
Add a new script to remove only the inference modes
Browse files Browse the repository at this point in the history
So that we can experiment with other options such as different feature
selection, etc
  • Loading branch information
shankari committed Feb 25, 2018
1 parent 8b3541e commit 236f89c
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 1 deletion.
70 changes: 70 additions & 0 deletions bin/analysis/remove_inferred_modes.py
@@ -0,0 +1,70 @@
"""
Reset only the mode inference objects
This helps us experiment with different mode inference methods without
resetting the entire pipeline.
Useful while mode inference is the most recent data object, and we need to
experiment with it to make it better.
"""

"""
This is similar to `bin/reset_pipeline.py` but *much* easier, since the mode
inference results are not linked to anything else. And the start_ts of the
inference result = start_ts of the section it maps to. We just need to delete
everything after the reset point and set the pipeline state accordingly.
"""
import logging
import argparse
import uuid
import arrow

import emission.analysis.classification.inference.mode.pipeline as eacimp
import emission.core.get_database as edb
import emission.storage.decorations.user_queries as esdu

# The helpers below resolve the command line selection into a list of user
# ids. The script called `_get_user_list` without defining it, so every
# invocation other than `--all` without `--date` failed with a NameError.
# NOTE(review): the helpers mirror the ones in `bin/reset_pipeline.py`, which
# the module docstring says this script is modelled on; confirm that the
# `esdu`, `edb` and `ecwu` calls match the current project API.

def _find_all_users():
    """Return the ids of all known users."""
    return esdu.get_all_uuids()


def _find_platform_users(platform):
    """Return the ids of the users whose profile lists `platform`."""
    # presumably every client registers a profile with `curr_platform` set;
    # verify against the profile schema
    return edb.get_profile_db().find(
        {"curr_platform": platform}).distinct("user_id")


def _email_2_user_list(email_list):
    """Map a list of email addresses to the matching user ids."""
    # Imported locally because the file-level imports do not include it
    import emission.core.wrapper.user as ecwu
    return [ecwu.User.fromEmail(e).uuid for e in email_list]


def _get_user_list(args):
    """
    Convert the mutually exclusive selection options into a list of user ids.

    :param args: the parsed arguments; reads `all`, `platform`, `email_list`
        and `user_list`
    :return: list of user ids to reset
    :raises ValueError: if none of the selection options is set, or if an
        entry in `user_list` is not a valid UUID string
    """
    if args.all:
        return _find_all_users()
    if args.platform:
        return _find_platform_users(args.platform)
    if args.email_list:
        return _email_2_user_list(args.email_list)
    if args.user_list is None:
        raise ValueError("no users specified; use one of -a, -p, -u or -e")
    return [uuid.UUID(u) for u in args.user_list]


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)

    parser = argparse.ArgumentParser()
    # Options corresponding to
    # https://github.com/e-mission/e-mission-server/issues/333#issuecomment-312464984
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("-a", "--all", action="store_true", default=False,
        help="reset the pipeline for all users")
    group.add_argument("-p", "--platform", choices = ['android', 'ios'],
        help="reset the pipeline for all on the specified platform")
    group.add_argument("-u", "--user_list", nargs='+',
        help="user ids to reset the pipeline for")
    group.add_argument("-e", "--email_list", nargs='+',
        help="email addresses to reset the pipeline for")
    parser.add_argument("-d", "--date",
        help="date to reset the pipeline to. Format 'YYYY-mm-dd' e.g. 2016-02-17. Interpreted in UTC, so 2016-02-17 will reset the pipeline to 2016-02-16T16:00:00-08:00 in the pacific time zone")
    parser.add_argument("-n", "--dry_run", action="store_true", default=False,
        help="do everything except actually perform the operations")

    args = parser.parse_args()
    print(args)

    # Handle the first row in the table
    if args.date is None:
        if args.all:
            # No reset point and no user filter: drop everything in one shot
            eacimp.del_all_objects(args.dry_run)
        else:
            user_list = _get_user_list(args)
            # Log the count, not the (potentially huge) list itself
            logging.info("received list with %s users" % len(user_list))
            logging.info("first few entries are %s" % user_list[0:5])
            for user_id in user_list:
                logging.info("resetting user %s to start" % user_id)
                eacimp.del_objects_after(user_id, 0, args.dry_run)
    else:
        # Handle the second row in the table
        day_dt = arrow.get(args.date, "YYYY-MM-DD")
        logging.debug("day_dt is %s" % day_dt)
        day_ts = day_dt.timestamp
        if callable(day_ts):
            # arrow >= 1.0 turned `timestamp` from a property into a method;
            # without the call we would pass a bound method as the reset ts
            day_ts = day_ts()
        logging.debug("day_ts is %s" % day_ts)
        user_list = _get_user_list(args)
        logging.info("received list with %s users" % len(user_list))
        logging.info("first few entries are %s" % user_list[0:5])
        for user_id in user_list:
            logging.info("resetting user %s to ts %s" % (user_id, day_ts))
            eacimp.del_objects_after(user_id, day_ts, args.dry_run)

30 changes: 29 additions & 1 deletion emission/analysis/classification/inference/mode/pipeline.py
Expand Up @@ -23,6 +23,7 @@
import emission.core.wrapper.entry as ecwe
import emission.core.wrapper.modeprediction as ecwm
import emission.core.wrapper.motionactivity as ecwma
import emission.core.wrapper.pipelinestate as ecwp

from uuid import UUID

Expand All @@ -48,7 +49,28 @@ def predict_mode(user_id):
logging.exception("Error while inferring modes, timestamp is unchanged")
epq.mark_mode_inference_failed(user_id)

# Delete the objects created by this pipeline step
# Delete the objects created by this pipeline step (across users)
def del_all_objects(is_dry_run):
    """
    Delete the mode inference output for all users.

    Removes every analysis timeseries entry whose `metadata.key` is
    `inference/prediction` or `analysis/inferred_section`, and every pipeline
    state entry for the MODE_INFERENCE stage. The counts and keys that would
    be affected are logged before anything is changed.

    :param is_dry_run: if true, only log what would be deleted and return
        without modifying the database
    """
    del_query = {"metadata.key": {"$in": ["inference/prediction",
                                          "analysis/inferred_section"]}}
    # presumably the accessor returns the same collection on every call, so
    # fetch it once instead of once per statement -- TODO confirm
    analysis_ts_db = edb.get_analysis_timeseries_db()
    logging.info("About to delete %d entries"
        % analysis_ts_db.find(del_query).count())
    logging.info("About to delete entries with keys %s"
        % analysis_ts_db.find(del_query).distinct("metadata.key"))

    del_pipeline_query = {"pipeline_stage": ecwp.PipelineStages.MODE_INFERENCE.value}
    logging.info("About to delete pipeline entries for stage %s" %
        ecwp.PipelineStages.MODE_INFERENCE)

    if is_dry_run:
        # The message used to name `del_objects_after` (copy-paste slip),
        # which made dry-run logs point at the wrong function
        logging.info("this is a dry-run, returning from del_all_objects without modifying anything")
        return

    result = analysis_ts_db.delete_many(del_query)
    logging.info("this is not a dry-run, result of deleting analysis entries is %s" % result.raw_result)
    result = edb.get_pipeline_state_db().delete_many(del_pipeline_query)
    logging.info("this is not a dry-run, result of deleting pipeline state is %s" % result.raw_result)

# Delete the objects created by this pipeline step (for a particular user)
def del_objects_after(user_id, reset_ts, is_dry_run):
del_query = {}
# handle the user
Expand All @@ -59,6 +81,12 @@ def del_objects_after(user_id, reset_ts, is_dry_run):
del_query.update({"data.start_ts": {"$gt": reset_ts}})
logging.debug("After all updates, del_query = %s" % del_query)

reset_pipeline_query = {"pipeline_stage": ecwp.PipelineStages.MODE_INFERENCE.value}
reset_pipeline_update = {'$set': {'last_processed_ts': reset_ts + FUZZ_FACTOR}}
logging.info("About to reset stage %s to %s"
% (ecwp.PipelineStages.MODE_INFERENCE, reset_ts))


logging.info("About to delete %d entries"
% edb.get_analysis_timeseries_db().find(del_query).count())
logging.info("About to delete entries with keys %s"
Expand Down

0 comments on commit 236f89c

Please sign in to comment.