Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Update main_cluster.py
update denoising
  • Loading branch information
radibnia77 committed Oct 4, 2021
1 parent 039eb1c commit 60de731f2182fc6962b5ffeebcd8837feec3a633
Showing 1 changed file with 5 additions and 8 deletions.
@@ -31,6 +31,7 @@
from pyspark.sql import HiveContext
from datetime import datetime, timedelta
from util import resolve_placeholder
from statistics import stdev


import transform as transform
@@ -54,7 +55,6 @@ def __save_as_table(df, table_name, hive_context, create_table):
hive_context.sql(command)



def estimate_number_of_non_dense_clusters(df, median_popularity_of_dense, cluster_dense_num_ratio_cap):
# find avg of non-dense popularity
median_non_dense_p = df.filter('sparse=True').agg(
@@ -146,14 +146,15 @@ def denoise(df, percentile):
df = df.withColumn('nonzero_p', udf(
lambda ts: 1.0 * sum(ts) / len([_ for _ in ts if _ != 0]) if len(
[_ for _ in ts if _ != 0]) != 0 else 0.0, FloatType())(df.ts))
df = df.withColumn('nonzero_sd', udf(lambda ts: stdev([_ for _ in ts if _ !=0]))(df.ts))

df = df.withColumn('ts', udf(lambda ts, nonzero_p: [i if i and i > (nonzero_p / percentile) else 0 for i in ts],
ArrayType(IntegerType()))(df.ts, df.nonzero_p))
df = df.withColumn('ts', udf(lambda ts, nonzero_sd: [i if i and i < (nonzero_sd * 2) else 0 for i in ts],
ArrayType(IntegerType()))(df.ts, df.nonzero_sd))
return df




def run(hive_context, cluster_size_cfg, input_table_name,
pre_cluster_table_name, output_table_name, percentile, create_pre_cluster_table):

@@ -166,7 +167,6 @@ def run(hive_context, cluster_size_cfg, input_table_name,
popularity_th = cluster_size_cfg['popularity_th']
datapoints_min_th = cluster_size_cfg['datapoints_min_th']


# Read factdata table
command = """
SELECT ts, price_cat, uckey, a, g, t, si, r, ipl FROM {}
@@ -254,11 +254,9 @@ def run(hive_context, cluster_size_cfg, input_table_name,

df = df.filter(udf(lambda p_n, ts: not is_spare(datapoints_th_clusters, -sys.maxsize - 1)(p_n, ts), BooleanType())(df.p_n, df.ts))

# denoising uckeys: remove some datapoints of the uckey
# denoising uckeys: remove some datapoints of the uckey. keep the data between upper and lower bound
df = denoise(df, percentile)



__save_as_table(df, output_table_name, hive_context, True)


@@ -287,7 +285,6 @@ def run(hive_context, cluster_size_cfg, input_table_name,
input_table_name = cfg['time_series']['outlier_table']
cluster_size_cfg = cfg['uckey_clustering']['cluster_size']


run(hive_context=hive_context,
cluster_size_cfg=cluster_size_cfg,
input_table_name=input_table_name,

0 comments on commit 60de731

Please sign in to comment.