Skip to content

Commit

Permalink
Segments from data puller not in db #350 (#351)
Browse files Browse the repository at this point in the history
* improve logging

* set constant window size

* fix

* improve logging, save detected segments from push/pull process

* fix

* fix

* fix according to review

* save segments, improve bucket management in analytics
  • Loading branch information
amper43 authored and jonyrock committed Jan 21, 2019
1 parent 6bf1114 commit 7731007
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 16 deletions.
16 changes: 5 additions & 11 deletions analytics/analytics/detectors/pattern_detector.py
Expand Up @@ -36,9 +36,9 @@ def __init__(self, pattern_type: str, analytic_unit_id: AnalyticUnitId):
self.analytic_unit_id = analytic_unit_id
self.pattern_type = pattern_type
self.model = resolve_model_by_pattern(self.pattern_type)
self.window_size = 150
self.max_window_size = 150
self.window_size = 0
self.bucket = DataBucket()
self.bucket_full_reported = False

def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache:
# TODO: pass only part of dataframe that has segments
Expand Down Expand Up @@ -66,19 +66,13 @@ def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) ->

def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
    """Buffer incoming data and run detection once enough has accumulated.

    Appends the (NaN-dropped) rows of ``data`` to the internal bucket.
    When a trained cache is available and the bucket holds at least
    ``window_size`` rows, runs :meth:`detect` on the buffered data,
    trims the bucket down to ``max_window_size`` rows, and returns the
    detection result. Otherwise logs the bucket fill level and returns
    ``None``.

    Parameters:
        data: new data chunk; NaN rows are discarded before buffering.
        cache: trained model cache; expected to carry 'WINDOW_SIZE'.
            Detection is skipped while it is None.

    Returns:
        The result of ``self.detect`` when detection ran, else None.
    """
    self.bucket.receive_data(data.dropna())

    # Adopt the trained window size from the cache the first time one arrives.
    if cache and self.window_size == 0:
        self.window_size = cache['WINDOW_SIZE']

    if len(self.bucket.data) >= self.window_size and cache is not None:
        if not self.bucket_full_reported:
            logging.debug('{} unit`s bucket full, run detect'.format(self.analytic_unit_id))
            self.bucket_full_reported = True

        res = self.detect(self.bucket.data, cache)

        # Keep at most max_window_size rows of history for the next round.
        excess_data = len(self.bucket.data) - self.max_window_size
        self.bucket.drop_data(excess_data)
        return res
    else:
        # Guard against division by zero: window_size stays 0 until a
        # cache has been received.
        if self.window_size > 0:
            filling = len(self.bucket.data) * 100 / self.window_size
            logging.debug('bucket for {} {}% full'.format(self.analytic_unit_id, filling))
        else:
            logging.debug('bucket for {}: waiting for cache with WINDOW_SIZE'.format(self.analytic_unit_id))

    return None
12 changes: 9 additions & 3 deletions server/src/controllers/analytics_controller.ts
Expand Up @@ -44,8 +44,14 @@ function onTaskResult(taskResult: TaskResult) {
}
}

function onDetect(detectionResult: DetectionResult) {
processDetectionResult(detectionResult.analyticUnitId, detectionResult);
async function onDetect(detectionResult: DetectionResult) {
let id = detectionResult.analyticUnitId;
let payload = await processDetectionResult(id, detectionResult);
await Promise.all([
Segment.insertSegments(payload.segments),
AnalyticUnitCache.setData(id, payload.cache),
AnalyticUnit.setDetectionTime(id, payload.lastDetectionTime),
]);
}

async function onMessage(message: AnalyticsMessage) {
Expand All @@ -58,7 +64,7 @@ async function onMessage(message: AnalyticsMessage) {
}

if(message.method === AnalyticsMessageMethod.DETECT) {
onDetect(message.payload.payload);
await onDetect(message.payload.payload);
methodResolved = true;
}

Expand Down
2 changes: 0 additions & 2 deletions server/src/services/data_puller.ts
Expand Up @@ -43,9 +43,7 @@ export class DataPuller {
panelUrl = unit.panelUrl;
}

let startTime = Date.now();
let data = queryByMetric(unit.metric, panelUrl, from, to, HASTIC_API_KEY);
console.log(`data puller: query took ${Date.now() - startTime}ms for unit id ${unit.id}`);
return data;

}
Expand Down

0 comments on commit 7731007

Please sign in to comment.