Add outlier module
radibnia77 committed Sep 7, 2021
1 parent 5811db7 commit 422e092c79d3937b4d7a24cb6287fd731111cb45
Showing 7 changed files with 176 additions and 5 deletions.
@@ -1,5 +1,5 @@
product_tag: 'dlpm'
pipeline_tag: '05182021_1500' # IMPORTANT: The pipeline tag has to be changed before each run to prevent record duplication.
pipeline_tag: '08092021_1200' # IMPORTANT: The pipeline tag has to be changed before each run to prevent record duplication.
factdata_table_name: 'factdata_hq_09222020'

log:
@@ -47,11 +47,12 @@ pipeline:
time_series: # This is done on whole bucketized data
input_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
conditions: []
yesterday: "2020-05-31" # data is used for training from -<prepare_past_days> to -1(yesterday)
prepare_past_days: 90
yesterday: "2020-06-10" # data is used for training from -<prepare_past_days> to -1(yesterday)
prepare_past_days: 102
bucket_size: 10 # maximum number of buckets to process starting from 0
bucket_step: 1 # size of bucket batch that is processed in one iteration
output_table_name: '{product_tag}_{pipeline_tag}_tmp_ts' # name of the hive table that keeps cleansed and normalized data before writing it into tfrecords, over-writes the existing table
outlier_table: '{product_tag}_{pipeline_tag}_tmp_outlier' # hive table holding the outlier-smoothed time series; written by main_outlier.py and read by the uckey clustering step
uckey_clustering: # This is done on whole data, not slicing on buckets
pre_cluster_table_name: '{product_tag}_{pipeline_tag}_tmp_pre_cluster'
create_pre_cluster_table: True
@@ -54,6 +54,7 @@ def __save_as_table(df, table_name, hive_context, create_table):
hive_context.sql(command)



def estimate_number_of_non_dense_clusters(df, median_popularity_of_dense, cluster_dense_num_ratio_cap):
# find avg of non-dense popularity
median_non_dense_p = df.filter('sparse=True').agg(
@@ -151,6 +152,8 @@ def denoise(df, percentile):
return df




def run(hive_context, cluster_size_cfg, input_table_name,
pre_cluster_table_name, output_table_name, percentile, create_pre_cluster_table):

@@ -163,6 +166,7 @@ def run(hive_context, cluster_size_cfg, input_table_name,
popularity_th = cluster_size_cfg['popularity_th']
datapoints_min_th = cluster_size_cfg['datapoints_min_th']


# Read factdata table
command = """
SELECT ts, price_cat, uckey, a, g, t, si, r, ipl FROM {}
@@ -253,6 +257,8 @@ def run(hive_context, cluster_size_cfg, input_table_name,
# denoising uckeys: remove some datapoints of the uckey
df = denoise(df, percentile)



__save_as_table(df, output_table_name, hive_context, True)


@@ -278,9 +284,10 @@ def run(hive_context, cluster_size_cfg, input_table_name,
output_table_name = cfg['uckey_clustering']['output_table_name']
pre_cluster_table_name = cfg['uckey_clustering']['pre_cluster_table_name']
create_pre_cluster_table = cfg['uckey_clustering']['create_pre_cluster_table']
input_table_name = cfg['time_series']['output_table_name']
input_table_name = cfg['time_series']['outlier_table']
cluster_size_cfg = cfg['uckey_clustering']['cluster_size']


run(hive_context=hive_context,
cluster_size_cfg=cluster_size_cfg,
input_table_name=input_table_name,
@@ -0,0 +1,97 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import yaml
import argparse
import pyspark.sql.functions as fn
from pyspark import SparkContext
from pyspark.sql.functions import udf, array
from pyspark.sql.types import IntegerType, ArrayType
from pyspark.sql import HiveContext
from util import resolve_placeholder, hampel
import pandas as pd


def write_to_table(df, table_name, mode='overwrite'):
df.write.option("header", "true").option("encoding", "UTF-8").mode(mode).format('hive').saveAsTable(table_name)


def run(hive_context, input_table_name, outlier_table):

# Read the ts (time-series) arrays from the cleansed time-series table
command = """
SELECT ts FROM {}
""".format(input_table_name)

# DataFrame[ts: array<int>]
df = hive_context.sql(command)

# Expand each ts array into one column per day, padding shorter arrays
# with nulls up to the longest array, then fill the nulls with 0.
columns = df.columns
df_sizes = df.select(*[fn.size(col).alias(col) for col in columns])
df_max = df_sizes.agg(*[fn.max(col).alias(col) for col in columns])
max_dict = df_max.collect()[0].asDict()
df_result = df.select(*[df[col][i] for col in columns for i in range(max_dict[col])])
df_result = df_result.na.fill(value=0)

# Sum every day column across all uckeys to get one aggregate daily series,
# then run the Hampel filter on it to find the indexes of outlier days.
ts_l = df_result.groupBy().sum().collect()[0]
ts_l = pd.Series(list(ts_l))
outlier_indices = hampel(ts_l, window_size=5, n=6)

def _filter_outlier(x, ind_list):
# Replace each flagged day with the mean of its two neighbours, cast back to
# int to match the declared ArrayType(IntegerType()); the first and last
# positions are skipped so the neighbour lookup stays in bounds.
for i in range(1, len(x) - 1):
if i in ind_list and x[i] is not None and x[i - 1] is not None and x[i + 1] is not None:
x[i] = int((x[i - 1] + x[i + 1]) / 2)
return x

command = """
SELECT * FROM {}
""".format(input_table_name)
df = hive_context.sql(command)
df = df.withColumn("indice", array([fn.lit(x) for x in outlier_indice]))
df = df.withColumn('ts', udf(_filter_outlier, ArrayType(IntegerType()))(df['ts'], df['indice']))
write_to_table(df, outlier_table, mode='overwrite')
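# A toy walk-through of the flow above (the numbers are illustrative
# assumptions, not pipeline data):
#
#   aggregate daily totals = [300, 310, 9000, 305, ...]  -> hampel() flags index 2
#   one uckey's ts         = [3, 4, 50, 6]               -> becomes [3, 4, 5, 6], since (4 + 6) / 2 = 5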


if __name__ == "__main__":

parser = argparse.ArgumentParser(description='Filter outliers in the time-series data')
parser.add_argument('config_file')
args = parser.parse_args()

# Load config file
with open(args.config_file, 'r') as ymlfile:
cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
resolve_placeholder(cfg)

cfg_log = cfg['log']
cfg = cfg['pipeline']

sc = SparkContext()
hive_context = HiveContext(sc)
sc.setLogLevel(cfg_log['level'])

# Only the time-series tables from the config are needed by this script.
input_table_name = cfg['time_series']['output_table_name']
outlier_table = cfg['time_series']['outlier_table']

run(hive_context=hive_context,
input_table_name=input_table_name,
outlier_table=outlier_table)

sc.stop()
@@ -17,6 +17,9 @@
import datetime
import math
import re
import pandas as pd
import numpy as np


def get_dow(day_list):
dow_list = []
@@ -48,5 +51,59 @@ def resolve_placeholder(in_dict):
new_value = new_value.replace('{'+item+'}', in_dict[item])
_dict[key] = new_value

def median_absolute_deviation(x):
"""
Returns the median absolute deviation from the window's median
:param x: Values in the window
:return: MAD
"""
return np.median(np.abs(x - np.median(x)))


def hampel(ts, window_size=5, n=3, imputation=False):
"""
Hampel filter: median-absolute-deviation (MAD) based outlier detection for time series.
:param ts: a pandas Series object representing the time series
:param window_size: half-window size; the rolling window spans 2*window_size samples (centered)
:param n: threshold in scaled MADs, default is 3 (Pearson's rule)
:param imputation: if False, the function only detects outliers;
if True, detected outliers are also imputed with the rolling median
:return: the outlier indices if imputation=False, or the corrected time series if imputation=True
"""

if type(ts) != pd.Series:
raise ValueError("Time series object must be of type pandas.Series.")

if type(window_size) != int:
raise ValueError("Window size must be of type integer.")
else:
if window_size <= 0:
raise ValueError("Window size must be greater than 0.")

if type(n) != int:
raise ValueError("n (threshold) must be of type integer.")
else:
if n < 0:
raise ValueError("n (threshold) must be equal to or greater than 0.")

# Copy the Series object. This will be the cleaned time series
ts_cleaned = ts.copy()

# Constant scale factor, which depends on the distribution
# In this case, we assume normal distribution
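# (k = 1 / Phi^-1(3/4) ≈ 1.4826 is the consistency constant that makes the
# scaled MAD estimate the standard deviation of normally distributed data.)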
k = 1.4826

rolling_ts = ts_cleaned.rolling(window_size*2, center=True)
rolling_median = rolling_ts.median().fillna(method='bfill').fillna(method='ffill')
rolling_sigma = k*(rolling_ts.apply(median_absolute_deviation).fillna(method='bfill').fillna(method='ffill'))

outlier_indices = list(
np.array(np.where(np.abs(ts_cleaned - rolling_median) >= (n * rolling_sigma))).flatten())

if imputation:
ts_cleaned[outlier_indices] = rolling_median[outlier_indices]
return ts_cleaned

return outlier_indices
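# A minimal usage sketch of hampel() (the series values below are synthetic
# assumptions, not pipeline data): the spike at index 10 falls far outside the
# rolling MAD band, so its index is returned; with imputation=True it would
# instead be replaced by the rolling median.
#
#   ts = pd.Series([10, 11, 9, 10, 12, 11, 10, 9, 11, 10, 500, 10, 11, 9, 10, 12, 11, 10, 9, 11])
#   hampel(ts, window_size=5, n=3)                   # -> [10]
#   hampel(ts, window_size=5, n=3, imputation=True)  # -> corrected series with the spike smoothed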

@@ -27,6 +27,15 @@ then
spark-submit --master yarn --py-files pipeline/transform.py --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G pipeline/main_ts.py config.yml
fi

#Run the outlier filter and save the results into the <config.pipeline.time_series.outlier_table> table ({product_tag}_{pipeline_tag}_tmp_outlier)
if false
then
# simple call
# spark-submit pipeline/main_outlier.py config.yml

spark-submit --master yarn --py-files pipeline/transform.py --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G pipeline/main_outlier.py config.yml
fi

#Preparing clustering
if false
then
@@ -93,6 +93,7 @@ def make_pred_input(duration, train_window, predict_window, full_record_exp, x_h
lag_mask = cropped_lags < 0
# Convert -1 to 0 for gather(); it doesn't accept anything exotic
cropped_lags = np.maximum(cropped_lags, 0)
cropped_lags = np.where(cropped_lags > len(full_record_exp) - 1, len(full_record_exp) - 1, cropped_lags)
# Translate lag indexes to hit values
lagged_hit = np.take(full_record_exp, cropped_lags)
# Convert masked (see above) or NaN lagged hits to zeros
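# A small numpy sketch of the clamp-then-take pattern above (toy values; the
# shapes are assumptions): negative lags are remembered in lag_mask, the indexes
# are clipped into [0, len(full_record_exp) - 1] so np.take stays in bounds, and
# the masked positions are zeroed afterwards.
#
#   full_record_exp = np.array([1.0, 2.0, 3.0])
#   cropped_lags    = np.array([-1, 0, 5])
#   lag_mask        = cropped_lags < 0                               # [True, False, False]
#   cropped_lags    = np.maximum(cropped_lags, 0)                    # [0, 0, 5]
#   cropped_lags    = np.where(cropped_lags > 2, 2, cropped_lags)    # [0, 0, 2]
#   lagged_hit      = np.take(full_record_exp, cropped_lags)         # [1.0, 1.0, 3.0]
#   lagged_hit[lag_mask] = 0                                         # [0.0, 1.0, 3.0]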
@@ -471,7 +471,6 @@ def create_model(scope, index, prefix, seed):
predict_completeness_threshold=train_completeness_threshold, train_window=train_window,
predict_window=predict_window,
rand_seed=seed, train_skip_first=hparams.train_skip_first,
# back_offset=predict_window if forward_split else 0)
back_offset=back_offset)
inp_scope.reuse_variables()
if side_split:
