Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Merge pull request #12 from radibnia77/main
Update main_outlier.py
  • Loading branch information
xun-hu-at-futurewei-com committed Sep 13, 2021
2 parents 4d1d14a + 554b52f commit 2f142965e702d336d92e07e0eb60095ac94c0ca8
Showing 1 changed file with 4 additions and 4 deletions.
@@ -47,10 +47,10 @@ def run(hive_context, input_table_name, outlier_table):
df_result = df_result.na.fill(value=0)
ts_l = df_result.groupBy().sum().collect()[0]
ts_l = pd.Series(list(ts_l))
-outlier_indice = hampel(ts_l, window_size=5, n=6)
+outlier_indices = hampel(ts_l, window_size=5, n=6)

def _filter_outlier(x, ind_list):
-for i in range(len(x)):
+for i in range(1, len(x)-1):
if i in ind_list and x[i] != None and x[i + 1] != None and x[i - 1] != None:
x[i] = (x[i - 1] + x[i + 1]) / 2
return x
@@ -59,8 +59,8 @@ def _filter_outlier(x, ind_list):
SELECT * FROM {}
""".format(input_table_name)
df = hive_context.sql(command)
-df = df.withColumn("indice", array([fn.lit(x) for x in outlier_indice]))
-df = df.withColumn('ts', udf(_filter_outlier, ArrayType(IntegerType()))(df['ts'], df['indice']))
+df = df.withColumn("indices", array([fn.lit(int(x)) for x in outlier_indices]))
+df = df.withColumn('ts', udf(_filter_outlier, ArrayType(IntegerType()))(df['ts'], df['indices']))
write_to_table(df, outlier_table, mode='overwrite')


0 comments on commit 2f14296

Please sign in to comment.