pipeline_queries.py
"""
Helpers for tracking per-user pipeline state: computing the time range of
entries that each pipeline stage still needs to process, and marking stages
as done or failed once they run.
"""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()
from builtins import *

import logging
import time
import datetime as pydt

import emission.core.get_database as edb
import emission.core.wrapper.pipelinestate as ps
import emission.storage.timeseries.timequery as estt

# Fuzz factor, in seconds, applied to time range boundaries: added to the last
# processed timestamp when a stage is marked done, and subtracted from the
# current time when computing the next query range (see
# get_time_range_for_stage below).
END_FUZZ_AVOID_LTE = 5

def mark_usercache_done(user_id, last_processed_ts):
    if last_processed_ts is None:
        mark_stage_done(user_id, ps.PipelineStages.USERCACHE, None)
    else:
        mark_stage_done(user_id, ps.PipelineStages.USERCACHE,
                        last_processed_ts + END_FUZZ_AVOID_LTE)

# Note that there is no mark_usercache_failed: we move failed entries to the
# error timeseries, so usercache runs never fail.

def get_time_range_for_usercache(user_id):
    tq = get_time_range_for_stage(user_id, ps.PipelineStages.USERCACHE)
    return tq

def get_time_range_for_accuracy_filtering(user_id):
    return get_time_range_for_stage(user_id, ps.PipelineStages.ACCURACY_FILTERING)

def mark_accuracy_filtering_done(user_id, last_processed_ts):
    if last_processed_ts is None:
        mark_stage_done(user_id, ps.PipelineStages.ACCURACY_FILTERING, None)
    else:
        mark_stage_done(user_id, ps.PipelineStages.ACCURACY_FILTERING,
                        last_processed_ts + END_FUZZ_AVOID_LTE)

def mark_accuracy_filtering_failed(user_id):
    mark_stage_failed(user_id, ps.PipelineStages.ACCURACY_FILTERING)

def get_time_range_for_segmentation(user_id):
    return get_time_range_for_stage(user_id, ps.PipelineStages.TRIP_SEGMENTATION)

def mark_segmentation_done(user_id, last_processed_ts):
    if last_processed_ts is None:
        mark_stage_done(user_id, ps.PipelineStages.TRIP_SEGMENTATION, None)
    else:
        mark_stage_done(user_id, ps.PipelineStages.TRIP_SEGMENTATION,
                        last_processed_ts + END_FUZZ_AVOID_LTE)

def mark_segmentation_failed(user_id):
    mark_stage_failed(user_id, ps.PipelineStages.TRIP_SEGMENTATION)

def get_time_range_for_sectioning(user_id):
    # Returns the time range for the trips that have not yet been converted
    # into sections. Note that this is a query against the trip data, so we
    # cannot search using the "write_ts" query. Instead, we change the query
    # to be against the trip's end_ts.
    tq = get_time_range_for_stage(user_id, ps.PipelineStages.SECTION_SEGMENTATION)
    tq.timeType = "data.end_ts"
    return tq

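# Illustrative sketch (not part of the pipeline): the TimeQuery returned above
# filters on "data.end_ts", so it can be handed to a timeseries lookup to pull
# the unprocessed trips for this stage. The accessor and key below are
# assumptions for illustration; the actual call sites live in the intake
# pipeline code.
#
#   import emission.storage.timeseries.abstract_timeseries as esta
#   ts = esta.TimeSeries.get_time_series(user_id)
#   unprocessed_trips = ts.find_entries(["segmentation/raw_trip"], time_query=tq)
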
def mark_sectioning_done(user_id, last_trip_done):
    if last_trip_done is None:
        mark_stage_done(user_id, ps.PipelineStages.SECTION_SEGMENTATION, None)
    else:
        mark_stage_done(user_id, ps.PipelineStages.SECTION_SEGMENTATION,
                        last_trip_done.data.end_ts + END_FUZZ_AVOID_LTE)

def mark_sectioning_failed(user_id):
    mark_stage_failed(user_id, ps.PipelineStages.SECTION_SEGMENTATION)

def get_time_range_for_smoothing(user_id):
    # type: (uuid.UUID) -> emission.storage.timeseries.timequery.TimeQuery
    """
    Returns the time range for the sections that have not yet been smoothed.
    Note that this is a query against the section data, so we cannot search
    using the "write_ts" query. Instead, we change the query to be against
    the section's end_ts.

    :rtype: emission.storage.timeseries.timequery.TimeQuery
    """
    tq = get_time_range_for_stage(user_id, ps.PipelineStages.JUMP_SMOOTHING)
    tq.timeType = "data.end_ts"
    return tq

def mark_smoothing_done(user_id, last_section_done):
    if last_section_done is None:
        mark_stage_done(user_id, ps.PipelineStages.JUMP_SMOOTHING, None)
    else:
        mark_stage_done(user_id, ps.PipelineStages.JUMP_SMOOTHING,
                        last_section_done.data.end_ts + END_FUZZ_AVOID_LTE)

def mark_smoothing_failed(user_id):
    mark_stage_failed(user_id, ps.PipelineStages.JUMP_SMOOTHING)

def get_complete_ts(user_id):
    # The last processed ts of the CLEAN_RESAMPLING stage is treated as the
    # timestamp up to which this user's data has been fully processed.
    return get_current_state(user_id, ps.PipelineStages.CLEAN_RESAMPLING).last_processed_ts

def get_time_range_for_clean_resampling(user_id):
    # type: (uuid.UUID) -> emission.storage.timeseries.timequery.TimeQuery
    """
    Returns the time range for the raw trips and sections that have not yet
    been cleaned and resampled. Note that this is a query against processed
    objects rather than the raw timeseries, so we cannot search using the
    "write_ts" query. Instead, we change the query to be against the object's
    end_ts.

    :rtype: emission.storage.timeseries.timequery.TimeQuery
    """
    tq = get_time_range_for_stage(user_id, ps.PipelineStages.CLEAN_RESAMPLING)
    tq.timeType = "data.end_ts"
    return tq

def mark_clean_resampling_done(user_id, last_section_done):
    if last_section_done is None:
        mark_stage_done(user_id, ps.PipelineStages.CLEAN_RESAMPLING, None)
    else:
        mark_stage_done(user_id, ps.PipelineStages.CLEAN_RESAMPLING,
                        last_section_done.data.enter_ts + END_FUZZ_AVOID_LTE)

def mark_clean_resampling_failed(user_id):
    mark_stage_failed(user_id, ps.PipelineStages.CLEAN_RESAMPLING)

def get_time_range_for_output_gen(user_id):
    return get_time_range_for_stage(user_id, ps.PipelineStages.OUTPUT_GEN)

def mark_output_gen_done(user_id, last_processed_ts):
    if last_processed_ts is None:
        mark_stage_done(user_id, ps.PipelineStages.OUTPUT_GEN, None)
    else:
        mark_stage_done(user_id, ps.PipelineStages.OUTPUT_GEN,
                        last_processed_ts + END_FUZZ_AVOID_LTE)

def mark_output_gen_failed(user_id):
    mark_stage_failed(user_id, ps.PipelineStages.OUTPUT_GEN)

def mark_stage_done(user_id, stage, last_processed_ts):
    curr_state = get_current_state(user_id, stage)
    assert curr_state is not None
    assert curr_state.curr_run_ts is not None
    curr_state.last_ts_run = curr_state.curr_run_ts
    # It is incorrect to assume that we have processed all the data up to the
    # start of the last run. In particular, due to network connectivity or
    # other issues, there may be outstanding data on phones that was collected
    # before the last run started, and if we made that assumption, that data
    # would simply be skipped. The same logic applies to all decorators that
    # are based on client-collected data (trip start ts, etc.) - it is only
    # accurate for server-generated data. So for maximum generality, we let
    # the stage pass in last_processed_ts.
    if last_processed_ts is not None:
        logging.info("For stage %s, last_ts_processed = %s" %
                     (stage, pydt.datetime.utcfromtimestamp(last_processed_ts).isoformat()))
        curr_state.last_processed_ts = last_processed_ts
    else:
        logging.info("For stage %s, last_ts_processed is unchanged" % stage)
    curr_state.curr_run_ts = None
    logging.debug("About to save object %s" % curr_state)
    edb.save(edb.get_pipeline_state_db(), curr_state)
    logging.debug("After saving state %s, list is %s" % (curr_state,
        list(edb.get_pipeline_state_db().find({"user_id": user_id}))))

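# Worked example (illustrative numbers, not from the codebase): suppose a
# segmentation run starts with curr_run_ts = 1000.0, but the newest entry it
# actually consumed has metadata.write_ts = 980.0. The stage then calls
# mark_segmentation_done(user_id, 980.0), which stores
# last_processed_ts = 980 + END_FUZZ_AVOID_LTE = 985. The next call to
# get_time_range_for_stage() starts its query at 985 rather than at 1000, so
# a point the phone uploaded late (say with metadata.write_ts = 990) is still
# picked up instead of being skipped.
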
def mark_stage_failed(user_id, stage):
    curr_state = get_current_state(user_id, stage)
    assert curr_state is not None
    assert curr_state.curr_run_ts is not None
    # last_ts_run remains unchanged since this run did not succeed;
    # the next query will start from the start_ts of this run.
    # We also reset curr_run_ts to indicate that we are not currently running.
    curr_state.curr_run_ts = None
    logging.debug("About to save object %s" % curr_state)
    edb.save(edb.get_pipeline_state_db(), curr_state)
    logging.debug("After saving state %s, list is %s" % (curr_state,
        list(edb.get_pipeline_state_db().find({"user_id": user_id}))))

def get_time_range_for_stage(user_id, stage):
    """
    Returns the time query (start ts and end ts) for the entries that this
    stage still needs to process.
    """
    curr_state = get_current_state(user_id, stage)
    if curr_state is None:
        start_ts = None
        curr_state = ps.PipelineState()
        curr_state.user_id = user_id
        curr_state.pipeline_stage = stage
        curr_state.curr_run_ts = None
        curr_state.last_processed_ts = None
        curr_state.last_ts_run = None
    else:
        start_ts = curr_state.last_processed_ts

    if start_ts is None:
        logging.info("For stage %s, start_ts is None" % stage)
    else:
        logging.info("For stage %s, start_ts = %s" %
                     (stage, pydt.datetime.utcfromtimestamp(start_ts).isoformat()))

    assert curr_state.curr_run_ts is None, "curr_state.curr_run_ts = %s" % curr_state.curr_run_ts
    # Pick a point 5 secs in the past as the end of the range. If we don't do
    # this, we will read all entries up to the current ts, which may lead to
    # lost data. For example, say that the current ts is t1 and, at the time
    # we read the data, there are 4 entries for t1. By the time we finish
    # copying, there are 6 entries for t1; we would end up deleting all 6 and
    # so lose 2 entries.
    end_ts = time.time() - END_FUZZ_AVOID_LTE
    ret_query = estt.TimeQuery("metadata.write_ts", start_ts, end_ts)

    curr_state.curr_run_ts = end_ts
    logging.debug("About to save object %s" % curr_state)
    edb.save(edb.get_pipeline_state_db(), curr_state)
    logging.debug("After saving state %s, list is %s" % (curr_state,
        list(edb.get_pipeline_state_db().find({"user_id": user_id}))))
    return ret_query

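# Typical stage lifecycle, as a sketch (the real call sites are in the intake
# pipeline; the process_entries() helper below is hypothetical):
#
#   tq = get_time_range_for_stage(user_id, ps.PipelineStages.TRIP_SEGMENTATION)
#   try:
#       last_processed_ts = process_entries(user_id, tq)   # stage-specific work
#       mark_stage_done(user_id, ps.PipelineStages.TRIP_SEGMENTATION, last_processed_ts)
#   except Exception:
#       logging.exception("trip segmentation failed")
#       mark_stage_failed(user_id, ps.PipelineStages.TRIP_SEGMENTATION)
#
# get_time_range_for_stage() sets curr_run_ts as a lock-like marker (hence the
# assert above that it is None); both mark_stage_done() and mark_stage_failed()
# clear it, and a failed run leaves last_processed_ts untouched so the next run
# retries the same range.
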
def get_current_state(user_id, stage):
    curr_state_doc = edb.get_pipeline_state_db().find_one({"user_id": user_id,
                                                           "pipeline_stage": stage.value})
    # logging.debug("returning curr_state_doc %s for stage %s " % (curr_state_doc, stage))
    if curr_state_doc is not None:
        return ps.PipelineState(curr_state_doc)
    else:
        return None