Commit

Keyerror pattern model #471 (#476)
VargBurz authored and rozetko committed Mar 19, 2019
1 parent 4d2ca2b commit 4c2efac
Showing 5 changed files with 29 additions and 13 deletions.
analytics/analytics/detectors/pattern_detector.py: 2 changes (2 additions, 0 deletions)
@@ -43,6 +43,8 @@ def __init__(self, pattern_type: str, analytic_unit_id: AnalyticUnitId):
     def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache:
         # TODO: pass only part of dataframe that has segments
         new_cache = self.model.fit(dataframe, segments, cache)
+        if new_cache == None or len(new_cache) == 0:
+            logging.warning('new_cache is empty with data: {}, segments: {}, cache: {}'.format(dataframe, segments, cache))
         return {
             'cache': new_cache
         }
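The new guard in train() only logs the problem and still returns the empty cache; note that it depends on `or` short-circuiting, since len(None) would raise a TypeError of its own. A minimal sketch of the same emptiness test, with a hypothetical helper name, assuming plain truthiness would also be acceptable here:

    def cache_is_empty(cache):
        # `not cache` is True for both None and an empty dict, matching the
        # committed `cache == None or len(cache) == 0` check, which is safe
        # only because `or` stops before len() is ever called on None.
        return not cache

    assert cache_is_empty(None)
    assert cache_is_empty({})
    assert not cache_is_empty({'pattern_model': [0, 1, 2]})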
analytics/analytics/models/general_model.py: 13 changes (7 additions, 6 deletions)
@@ -65,23 +65,24 @@ def do_fit(self, dataframe: pd.DataFrame, labeled_segments: list, deleted_segmen
     def do_detect(self, dataframe: pd.DataFrame) -> list:
         data = utils.cut_dataframe(dataframe)
         data = data['value']
-        pat_data = self.state['pattern_model']
+        pat_data = self.state.get('pattern_model', [])
         if pat_data.count(0) == len(pat_data):
             raise ValueError('Labeled patterns must not be empty')
 
         self.all_conv = []
         self.all_corr = []
-        for i in range(self.state['WINDOW_SIZE'], len(data) - self.state['WINDOW_SIZE']):
-            watch_data = data[i - self.state['WINDOW_SIZE']: i + self.state['WINDOW_SIZE'] + 1]
+        window_size = self.state.get('WINDOW_SIZE', 0)
+        for i in range(window_size, len(data) - window_size):
+            watch_data = data[i - window_size: i + window_size + 1]
             watch_data = utils.subtract_min_without_nan(watch_data)
             conv = scipy.signal.fftconvolve(watch_data, pat_data)
             correlation = pearsonr(watch_data, pat_data)
             self.all_corr.append(correlation[0])
             self.all_conv.append(max(conv))
-        all_conv_peaks = utils.peak_finder(self.all_conv, self.state['WINDOW_SIZE'] * 2)
-        all_corr_peaks = utils.peak_finder(self.all_corr, self.state['WINDOW_SIZE'] * 2)
+        all_conv_peaks = utils.peak_finder(self.all_conv, window_size * 2)
+        all_corr_peaks = utils.peak_finder(self.all_corr, window_size * 2)
         filtered = self.__filter_detection(all_corr_peaks, data)
-        return set(item + self.state['WINDOW_SIZE'] for item in filtered)
+        return set(item + window_size for item in filtered)
 
     def __filter_detection(self, segments: list, data: list):
         if len(segments) == 0 or len(self.state.get('pattern_center', [])) == 0:
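Replacing direct subscripting with dict.get is what closes the KeyError from issue #471: a cache that was saved without 'pattern_model' or 'WINDOW_SIZE' now falls back to a default instead of raising inside do_detect. A minimal sketch of the difference, with a hypothetical state dict:

    state = {}                                 # cache restored without the expected keys

    try:
        pat_data = state['pattern_model']      # old form
    except KeyError as e:
        print('subscripting raised', repr(e))  # KeyError: 'pattern_model'

    pat_data = state.get('pattern_model', [])  # new form: yields the default []
    window_size = state.get('WINDOW_SIZE', 0)  # same pattern for the window size

With the [] default, an empty pattern now trips the explicit ValueError('Labeled patterns must not be empty') above rather than an unhandled KeyError.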
analytics/analytics/models/model.py: 21 changes (16 additions, 5 deletions)
@@ -5,6 +5,7 @@
 from typing import Optional
 import pandas as pd
 import math
+import logging
 
 ModelCache = dict
 
@@ -64,7 +65,7 @@ def get_model_type(self) -> (str, bool):
 
     def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[ModelCache]) -> ModelCache:
         data = dataframe['value']
-        if type(cache) is ModelCache and cache:
+        if cache != None and len(cache) > 0:
             self.state = cache
         max_length = 0
         labeled = []
@@ -84,21 +85,31 @@ def fit(self, dataframe: pd.DataFrame, segments: list, cache: Optional[ModelCach
         model, model_type = self.get_model_type()
         learning_info = self.get_parameters_from_segments(dataframe, labeled, deleted, model, model_type)
         self.do_fit(dataframe, labeled, deleted, learning_info)
+        logging.debug('fit complete successful with self.state: {}'.format(self.state))
         return self.state
 
     def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
-        if type(cache) is ModelCache:
+        #If cache is None or empty dict - default parameters will be used instead
+        if cache != None and len(cache) > 0:
             self.state = cache
-
+        else:
+            logging.debug('get empty cache in detect')
+        if not self.state:
+            logging.warning('self.state is empty - skip do_detect')
+            return {
+                'segments': [],
+                'cache': {},
+            }
         result = self.do_detect(dataframe)
         segments = [(
             utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][x - 1]),
             utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][x + 1])
         ) for x in result]
-
+        if not self.state:
+            logging.warning('return empty self.state after detect')
         return {
             'segments': segments,
-            'cache': self.state
+            'cache': self.state,
         }
 
     def _update_fiting_result(self, state: dict, confidences: list, convolve_list: list, del_conv_list: list, height_list: list) -> None:
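Because ModelCache is a plain alias for dict, the old guard `type(cache) is ModelCache` accepts an empty {} as well, letting an empty cache overwrite self.state so that a later self.state['WINDOW_SIZE'] lookup raises the KeyError this commit targets. A small sketch contrasting the two checks (function names are illustrative):

    ModelCache = dict

    def old_check(cache):
        return type(cache) is ModelCache          # True for {} too

    def new_check(cache):
        return cache != None and len(cache) > 0   # False for None and for {}

    assert old_check({}) is True
    assert new_check({}) is False
    assert new_check(None) is False
    assert new_check({'WINDOW_SIZE': 10}) is True

Under the new check an empty cache leaves self.state untouched, and the added `if not self.state` branch returns empty segments instead of running do_detect without parameters.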
analytics/tests/test_dataset.py: 2 changes (1 addition, 1 deletion)
@@ -309,7 +309,7 @@ def test_random_dataset_for_random_model(self):
             'conv_del_min': 0,
             'conv_del_max': 0,
         }
-        ws = random.randint(0, int(len(data['value']/2)))
+        ws = random.randint(1, int(len(data['value']/2)))
         pattern_model = create_random_model(ws)
         convolve = scipy.signal.fftconvolve(pattern_model, pattern_model)
         confidence = 0.2 * (data['value'].max() - data['value'].min())
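random.randint is inclusive on both endpoints, so the old lower bound of 0 allowed a zero window size and thus an empty random pattern model; raising the bound to 1 keeps the test out of that degenerate case. A quick illustration, assuming a fixed seed:

    import random

    random.seed(42)
    # randint includes both endpoints: with a lower bound of 0 the window
    # size can legitimately be drawn as 0; with 1 it is always positive.
    old_draws = {random.randint(0, 3) for _ in range(1000)}
    new_draws = {random.randint(1, 3) for _ in range(1000)}
    assert 0 in old_draws       # practically certain over 1000 draws
    assert 0 not in new_draws   # guaranteed by the new lower bound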
server/src/services/data_puller.ts: 4 changes (3 additions, 1 deletion)
@@ -82,7 +82,9 @@ export class DataPuller {
       throw Error(`data puller can't push unit: ${unit} data: ${data}`);
     }
     let task = new AnalyticsTask(unit.id, AnalyticsTaskType.PUSH, data);
-
+    if(_.isEmpty(data.cache)) {
+      console.log('push empty cache to analytics')
+    }
     try {
       this.analyticsService.sendTask(task);
       let fromTime = new Date(data.from).toLocaleTimeString();
