Skip to content
Permalink
Browse files

Add a classification step to the pipeline

Addressed the simple issues; there are no further failures.

```
2018-02-22T16:12:35.936949-08:00**********UUID 495a7860-36d2-4c56-a2bb-749414434e6c: cleaning and resampling timeline**********
2018-02-22T16:12:37.302180-08:00**********UUID 495a7860-36d2-4c56-a2bb-749414434e6c: inferring transportation mode**********
2018-02-22T16:12:37.430458-08:00**********UUID 495a7860-36d2-4c56-a2bb-749414434e6c: checking active mode trips to autocheck habits**********
```

Inference is generated, but every entry gets the same prediction:
'BUS' (0.2) versus 'WALKING' (0.8). That looks suspicious...

```
[{'_id': ObjectId('5a8f5c75f6858ffa4994910e'),
  'data': {'predicted_mode_map': {'BUS': 0.2, 'WALKING': 0.8}}},
 {'_id': ObjectId('5a8f5c75f6858ffa49949110'),
  'data': {'predicted_mode_map': {'BUS': 0.2, 'WALKING': 0.8}}},
 {'_id': ObjectId('5a8f5c75f6858ffa49949112'),
  'data': {'predicted_mode_map': {'BUS': 0.2, 'WALKING': 0.8}}},
 {'_id': ObjectId('5a8f5c75f6858ffa49949114'),
  'data': {'predicted_mode_map': {'BUS': 0.2, 'WALKING': 0.8}}},
 {'_id': ObjectId('5a8f5c75f6858ffa49949116'),
  'data': {'predicted_mode_map': {'BUS': 0.2, 'WALKING': 0.8}}},
 {'_id': ObjectId('5a8f5c75f6858ffa49949118'),
  'data': {'predicted_mode_map': {'BUS': 0.2, 'WALKING': 0.8}}}]
```

Ah! The model that was loaded was the simplified model used for testing.
Using one of the generated models instead, we get:

```
[{'_id': ObjectId('5a8f5e80f6858ffad3611ea2'),
  'data': {'predicted_mode_map': {'AIR_OR_HSR': 0.1,
    'BICYCLING': 0.5,
    'WALKING': 0.4}}},
 {'_id': ObjectId('5a8f5e80f6858ffad3611ea4'),
  'data': {'predicted_mode_map': {'WALKING': 1.0}}},
 {'_id': ObjectId('5a8f5e80f6858ffad3611ea6'),
  'data': {'predicted_mode_map': {'BICYCLING': 0.1,
    'CAR': 0.1,
    'WALKING': 0.8}}},
 {'_id': ObjectId('5a8f5e80f6858ffad3611ea8'),
  'data': {'predicted_mode_map': {'BICYCLING': 0.9, 'CAR': 0.1}}},
 {'_id': ObjectId('5a8f5e80f6858ffad3611eaa'),
  'data': {'predicted_mode_map': {'WALKING': 1.0}}},
 {'_id': ObjectId('5a8f5e80f6858ffad3611eac'),
  'data': {'predicted_mode_map': {'BICYCLING': 0.1, 'WALKING': 0.9}}}]
```
  • Loading branch information...
shankari committed Feb 23, 2018
1 parent ead85e3 commit ce6cec989aa54f62474dcb225ac4dd2077ec13c8
@@ -32,18 +32,18 @@
# problem, so we don't need to solve it right now.
minTrainingSetSize = 1000

def predictMode(user_id):
time_query = epq.get_time_range_for_segmentation(user_id)
def predict_mode(user_id):
time_query = epq.get_time_range_for_mode_inference(user_id)
try:
mip = ModeInferencePipeline()
mip.user_id = user_id
mip.runPredictionPipeline(user_id, time_query)
if mip.getLastTimestamp() == 0:
logging.debug("after, run, last_timestamp == 0, must be early return")
if mip.getLastSectionDone() is None:
logging.debug("after, run, last_section_done == None, must be early return")
epq.mark_mode_inference_done(user_id, None)
return
else:
epq.mark_mode_inference_done(user_id, mip.getLastTimestamp())
epq.mark_mode_inference_done(user_id, mip.getLastSectionDone())
except:
epq.mark_mode_inference_failed(user_id)

@@ -55,13 +55,13 @@ def __init__(self):
"start lat", "start lng", "stop lat", "stop lng",
"start hour", "end hour", "close to bus stop", "close to train stop",
"close to airport"]
self.last_timestamp = 0
self.last_section_done = None
with open("emission/analysis/classification/inference/mode/mode_id_old2new.txt") as fp:
self.seed_modes_mapping = json.load(fp)
logging.debug("Loaded modes %s" % self.seed_modes_mapping)

def getLastTimestamp(self):
return self.last_timestamp
def getLastSectionDone(self):
return self.last_section_done

# At this point, none of the clients except for CCI are supporting ground
# truth, and even cci is only supporting trip-level ground truth. So this
@@ -75,7 +75,8 @@ def runPredictionPipeline(self, user_id, timerange):
time_query=None)
if (len(self.toPredictSections) == 0):
logging.debug("len(toPredictSections) == 0, early return")
assert(self.last_timestamp == 0)
assert self.last_section_done is None, ("self.last_section_done == %s, expecting None" % \
self.last_section_done)
return None

self.loadModelStage()
@@ -132,7 +133,7 @@ def updateFeatureMatrixRowWithSection(self, featureMatrix, i, section_entry):

speeds = section['speeds']

if speeds != None and len(speeds) > 0:
if speeds is not None and len(speeds) > 0:
featureMatrix[i, 5] = np.mean(speeds)
featureMatrix[i, 6] = np.std(speeds)
featureMatrix[i, 7] = np.max(speeds)
@@ -141,7 +142,7 @@ def updateFeatureMatrixRowWithSection(self, featureMatrix, i, section_entry):
pass

accels = easf.calAccels(section)
if accels != None and len(accels) > 0:
if accels is not None and len(accels) > 0:
featureMatrix[i, 8] = np.max(accels)
else:
# They will remain zero
@@ -171,8 +172,8 @@ def updateFeatureMatrixRowWithSection(self, featureMatrix, i, section_entry):
if (hasattr(self, "air_cluster")):
featureMatrix[i, 21] = easf.mode_start_end_coverage(section, self.air_cluster,600)

if self.last_timestamp < section.end_ts:
self.last_timestamp = section.end_ts
if self.last_section_done is None or self.last_section_done.data.end_ts < section_entry.data.end_ts:
self.last_section_done = section_entry

# Replace NaN and inf by zeros so that it doesn't crash later
featureMatrix[i] = np.nan_to_num(featureMatrix[i])
@@ -28,6 +28,7 @@
import emission.analysis.intake.segmentation.section_segmentation as eaiss
import emission.analysis.intake.cleaning.location_smoothing as eaicl
import emission.analysis.intake.cleaning.clean_and_resample as eaicr
import emission.analysis.classification.inference.mode.pipeline as eacimp
import emission.net.ext_service.habitica.executor as autocheck

import emission.storage.decorations.stats_queries as esds
@@ -139,6 +140,14 @@ def run_intake_pipeline_for_user(uuid):
esds.store_pipeline_time(uuid, ecwp.PipelineStages.CLEAN_RESAMPLING.name,
time.time(), crt.elapsed)

with ect.Timer() as crt:
logging.info("*" * 10 + "UUID %s: inferring transportation mode" % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: inferring transportation mode" % uuid + "*" * 10)
eacimp.predict_mode(uuid)

esds.store_pipeline_time(uuid, ecwp.PipelineStages.MODE_INFERENCE.name,
time.time(), crt.elapsed)

with ect.Timer() as act:
logging.info("*" * 10 + "UUID %s: checking active mode trips to autocheck habits" % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: checking active mode trips to autocheck habits" % uuid + "*" * 10)
@@ -135,7 +135,9 @@ def mark_clean_resampling_failed(user_id):
mark_stage_failed(user_id, ps.PipelineStages.CLEAN_RESAMPLING)

def get_time_range_for_mode_inference(user_id):
return get_time_range_for_stage(user_id, ps.PipelineStages.MODE_INFERENCE)
tq = get_time_range_for_stage(user_id, ps.PipelineStages.MODE_INFERENCE)
tq.timeType = "data.end_ts"
return tq

def mark_mode_inference_done(user_id, last_section_done):
if last_section_done is None:

0 comments on commit ce6cec9

Please sign in to comment.
You can’t perform that action at this time.