
Add support for deleting mode inference results

May be worthwhile for each pipeline step to know how to clean up after itself;
let's start with this one and see how well it is used.

Also:
- log any exceptions encountered during the analysis pipeline run
- fix the assert so that it prints a meaningful error message
- remove pipeline results before testing the pipeline
- also copy the seed model over as part of testing
shankari committed Feb 24, 2018
1 parent b390ee0 · commit 97c3ac0938bc32fef70d3b2c8e371bef15f9379a
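
For illustration, a minimal sketch (not part of the commit) of how the new cleanup hook might be invoked outside the tests. The module path is assumed from the repo layout, and the UUID and timestamp are placeholders; the test change below calls the hook with reset_ts=0.

    # Sketch only: roll back mode inference output for one user after reset_ts.
    # The module path is an assumption; uuid and timestamp values are placeholders.
    import uuid
    import emission.analysis.classification.inference.mode.pipeline as pipeline

    user_id = uuid.uuid4()        # placeholder; normally the UUID of the user being reset
    reset_ts = 1470787200         # placeholder epoch timestamp (2016-08-10 UTC)

    # Dry run first: only logs what would be deleted, does not modify the database
    pipeline.del_objects_after(user_id, reset_ts, is_dry_run=True)

    # Then perform the actual deletion
    pipeline.del_objects_after(user_id, reset_ts, is_dry_run=False)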
@@ -45,8 +45,31 @@ def predict_mode(user_id):
        else:
            epq.mark_mode_inference_done(user_id, mip.getLastSectionDone())
    except:
        logging.exception("Error while inferring modes, timestamp is unchanged")
        epq.mark_mode_inference_failed(user_id)

# Delete the objects created by this pipeline step
def del_objects_after(user_id, reset_ts, is_dry_run):
    del_query = {}
    # handle the user
    del_query.update({"user_id": user_id})

    del_query.update({"metadata.key": {"$in": ["inference/prediction", "analysis/inferred_section"]}})
    # all objects inserted here have start_ts and end_ts and are trip-like
    del_query.update({"data.start_ts": {"$gt": reset_ts}})
    logging.debug("After all updates, del_query = %s" % del_query)

    logging.info("About to delete %d entries"
                 % edb.get_analysis_timeseries_db().find(del_query).count())
    logging.info("About to delete entries with keys %s"
                 % edb.get_analysis_timeseries_db().find(del_query).distinct("metadata.key"))

    if is_dry_run:
        logging.info("this is a dry-run, returning from del_objects_after without modifying anything")
    else:
        result = edb.get_analysis_timeseries_db().remove(del_query)
        logging.info("this is not a dry-run, result of deleting analysis entries is %s" % result)

class ModeInferencePipeline:
    def __init__(self):
        self.featureLabels = ["distance", "duration", "first filter mode", "sectionId", "avg speed",
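
For a given user and reset point, the del_query assembled step by step above ends up equivalent to the following (a sketch with placeholder variables, not code from the commit):

    # Equivalent fully-assembled query; user_id and reset_ts are placeholders
    del_query = {
        "user_id": user_id,
        "metadata.key": {"$in": ["inference/prediction", "analysis/inferred_section"]},
        "data.start_ts": {"$gt": reset_ts}    # only objects that start after the reset point
    }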
@@ -53,7 +53,7 @@ def _get_inference_entry_for_section(user_id, section_id, entry_key, section_id_
logging.debug("About to query %s" % combo_query)
ret_list = list(edb.get_analysis_timeseries_db().find(combo_query))
# We currently have only one algorithm
assert(len(ret_list) <= 1)
assert len(ret_list) <= 1, "Found len(ret_list) = %d, expected <=1" % len(ret_list)
if len(ret_list) == 0:
logging.debug("Found no inferred prediction, returning None")
return None
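
With the message attached, a failing assertion now reports the observed count rather than a bare AssertionError; a hypothetical failure would read:

    AssertionError: Found len(ret_list) = 2, expected <=1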
@@ -12,6 +12,7 @@
import arrow
import logging
import numpy as np
import os
from datetime import datetime, timedelta

# Our imports
@@ -42,18 +43,26 @@ class TestPipeline(unittest.TestCase):
    def setUp(self):
        # Thanks to M&J for the number!
        np.random.seed(61297777)
        self.copied_model_path = etc.copy_dummy_seed_for_inference()
        dataFile = "emission/tests/data/real_examples/shankari_2016-08-10"
        start_ld = ecwl.LocalDate({'year': 2016, 'month': 8, 'day': 9})
        end_ld = ecwl.LocalDate({'year': 2016, 'month': 8, 'day': 10})
        cacheKey = "diary/trips-2016-08-10"
        etc.setupRealExample(self, dataFile)
        etc.runIntakePipeline(self.testUUID)
        # Default intake pipeline now includes mode inference
        # this is correct in general, but causes errors while testing the mode inference
        # because then that step is effectively run twice. This code
        # rolls back the results of running the mode inference as part of the
        # pipeline and allows us to correctly test the mode inference pipeline again.
        pipeline.del_objects_after(self.testUUID, 0, is_dry_run=False)
        self.pipeline = pipeline.ModeInferencePipeline()
        self.pipeline.loadModelStage()

    def tearDown(self):
        logging.debug("Clearing related databases")
        self.clearRelatedDb()
        os.remove(self.copied_model_path)

    def clearRelatedDb(self):
        edb.get_timeseries_db().delete_many({"user_id": self.testUUID})
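
A clarifying note on the rollback call in setUp above (an editorial reading, not from the commit): passing reset_ts=0 effectively deletes everything this step produced for the test user, since every data.start_ts in the analysis timeseries is a positive epoch timestamp and therefore matches the {"$gt": 0} filter in del_objects_after.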
