fix bugs of predictor model pipeline
fix denoise
fix distribution ratio
radibnia77 committed Nov 21, 2021
1 parent b5cc16c commit 5dd0225384319db3065536b0d9ee95e6f1dfaff1
Showing 2 changed files with 29 additions and 21 deletions.
@@ -147,13 +147,13 @@ def denoise(df, percentile):
df = df.withColumn('nonzero_p', udf(
lambda ts: 1.0 * sum(ts) / len([_ for _ in ts if _ != 0]) if len(
[_ for _ in ts if _ != 0]) != 0 else 0.0, FloatType())(df.ts))
-df = df.withColumn('nonzero_sd', udf(
-    lambda ts: stdev([_ for _ in ts if _ != 0]))(df.ts))
+# df = df.withColumn('nonzero_sd', udf(
+#     lambda ts: stdev([_ for _ in ts if _ != 0]), FloatType())(df.ts))

df = df.withColumn('ts', udf(lambda ts, nonzero_p: [i if i and i > (nonzero_p / percentile) else 0 for i in ts],
ArrayType(IntegerType()))(df.ts, df.nonzero_p))
-df = df.withColumn('ts', udf(lambda ts, nonzero_sd: [i if i and i < (nonzero_sd * 2) else 0 for i in ts],
-                             ArrayType(IntegerType()))(df.ts, df.nonzero_sd))
+# df = df.withColumn('ts', udf(lambda ts, nonzero_sd: [i if i and i < (nonzero_sd * 2) else 0 for i in ts],
+#                              ArrayType(IntegerType()))(df.ts, df.nonzero_sd))
return df
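For context, a minimal standalone sketch of the filter this hunk keeps (hypothetical input values; in the pipeline `ts` is a per-uckey traffic time series): entries survive only if they exceed the nonzero mean divided by `percentile`. The stdev-based second pass stays commented out in the new version; note the removed call also lacked its `FloatType()` return type, and `statistics.stdev` raises on fewer than two data points.

```python
from statistics import stdev

ts = [0, 3, 0, 50, 4, 0, 5]   # hypothetical traffic series
percentile = 10.0

nonzero = [v for v in ts if v != 0]
nonzero_p = sum(ts) / len(nonzero) if nonzero else 0.0  # mean over nonzero entries
denoised = [v if v and v > (nonzero_p / percentile) else 0 for v in ts]

# What the commented-out pass would add: zero out spikes above 2 * stdev.
nonzero_sd = stdev(nonzero) if len(nonzero) > 1 else 0.0
capped = [v if v and v < (nonzero_sd * 2) else 0 for v in denoised]
print(denoised)  # [0, 3, 0, 50, 4, 0, 5]
print(capped)    # [0, 3, 0, 0, 4, 0, 5]
```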


@@ -5,7 +5,7 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
@@ -16,13 +16,17 @@

# Accumulate traffic from sparse uckeys into group (cluster) uckeys and generate distribution ratios.
import argparse
+from statistics import mode
import yaml

from pyspark.sql import HiveContext
from pyspark.context import SparkContext
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType, StringType
from util import resolve_placeholder
+from pyspark.sql.window import Window

+import pyspark.sql.functions as fn


def load_df(hc, table_name):
@@ -40,7 +44,7 @@ def __calcualte_distrib_ratio(uckey_sum, cluster_uckey_sum):
# the major entry function for generating distributions between sparse uckeys and cluster (virtual) uckeys.


-def run(df_pre_cluster, df_cluster, table_distrib_output, table_distrib_detail, is_sparse=True):
+def run(df_pre_cluster, df_cluster, table_distrib_output, table_distrib_detail, is_sparse=True, mode='overwrite'):
"""
pipeline_pre_cluster: (cn is the cluster id, an integer; it appears as the string uckey in df_cluster)
+--------------------+---------+--------------------+---+---+---+---+---+---+-----------+------------+------+---+
@@ -57,21 +61,26 @@ def run(df_pre_cluster, df_cluster, table_distrib_output, table_distrib_detail,
| 1039| 1|[2385, 1846, 2182...|[0 -> 0.071428575...|[ -> 0.04761905, ...|[4G -> 0.16666667...|[11 -> 0.02380952...|[ -> 1.0]|153697|1707.7444|
+-----+---------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+------+---------+
"""
-# filter out the sparse uckeys and make sure type of the "cn" column is string.
-df_pre = df_pre_cluster.filter(col("sparse") == is_sparse).select('uckey', 'price_cat', 'imp', 'cn')
-df_pre = df_pre.withColumn("cn", df_pre["cn"].cast(StringType()))
-df_cluster = df_cluster.withColumnRenamed('uckey', 'cluster_uckey').withColumnRenamed('imp', 'cluster_imp')
-# join the two tables to make all the data ready and remove some repeated columns.
-df_join = df_pre.join(df_cluster, [df_pre.cn == df_cluster.cluster_uckey if is_sparse else
-                                   df_pre.uckey == df_cluster.cluster_uckey,
-                                   df_pre.price_cat == df_cluster.price_cat],
-                      how="inner").drop(df_cluster.price_cat)
-# calculate the distribution ratio with sparse uckey's total imp and the cluster's total imp.
-df_join = df_join.withColumn("ratio", udf(__calcualte_distrib_ratio, FloatType())(df_join.imp, df_join.cluster_imp))

+if is_sparse:
+    df_sparse = df_pre_cluster.filter(col("sparse") == is_sparse).select('cn', 'uckey', 'price_cat')
+    df_sparse = df_sparse.withColumn("cluster_uckey", df_sparse["cn"].cast(StringType()))
+    df_cluster = df_cluster.withColumnRenamed('uckey', 'cluster_uckey')
+    df_join = df_sparse.join(df_cluster, on=['cluster_uckey', 'price_cat'], how="inner")
+    df_join = df_join.withColumn('cluster_imp', fn.sum('imp').over(Window.partitionBy('cluster_uckey')))
+    df_join = df_join.withColumn("ratio", udf(__calcualte_distrib_ratio, FloatType())(df_join.imp, df_join.cluster_imp))
+else:
+    df_dense = df_pre_cluster.filter(col("sparse") == is_sparse).select('uckey', 'price_cat')
+    df_join = df_dense.join(df_cluster, on=['uckey', 'price_cat'], how="inner")
+    df_join = df_join.withColumn('ratio', fn.lit(1.0))
+    df_join = df_join.withColumn('cluster_uckey', col('uckey'))
+    df_join = df_join.withColumn('cluster_imp', col('imp'))

# output the final result, save the distribution table and the details table.
-mode = 'overwrite' if is_sparse else 'append'

df_distrib_output = df_join.select('uckey', 'cluster_uckey', 'price_cat', 'ratio')
df_distrib_output.write.option("header", "true").option("encoding", "UTF-8").mode(mode).format('hive').saveAsTable(table_distrib_output)

df_distrib_detail = df_join.select('uckey', 'cluster_uckey', 'price_cat', 'ratio', 'imp', 'cluster_imp')
df_distrib_detail.write.option("header", "true").option("encoding", "UTF-8").mode(mode).format('hive').saveAsTable(table_distrib_detail)
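Since the sparse branch now derives cluster_imp as a window sum of the joined imp column, the ratios within each cluster_uckey partition sum to exactly 1.0 by construction (assuming __calcualte_distrib_ratio divides imp by cluster_imp, as its argument names suggest). A hypothetical sanity check on df_join before the writes, not part of the commit:

```python
import pyspark.sql.functions as fn

# Per-cluster ratios should sum to ~1.0 now that cluster_imp is the
# window sum of the same imp column used in the numerator.
bad = (df_join
       .groupBy('cluster_uckey')
       .agg(fn.sum('ratio').alias('ratio_sum'))
       .filter(fn.abs(fn.col('ratio_sum') - 1.0) > 1e-3))
assert bad.count() == 0, "some clusters' ratios do not sum to 1"
```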

@@ -104,8 +113,7 @@ def run(df_pre_cluster, df_cluster, table_distrib_output, table_distrib_detail,
df_pre_cluster = load_df(hive_context, pre_cluster_table_name)
df_cluster = load_df(hive_context, cluster_table_name)

-run(df_pre_cluster, df_cluster, output_table_name, output_detail_table_name, is_sparse=True)
-# comment out the following line to generate distribution for sparse uckeys only, or execute both.
-run(df_pre_cluster, df_cluster, output_table_name, output_detail_table_name, is_sparse=False)
+run(df_pre_cluster, df_cluster, output_table_name, output_detail_table_name, is_sparse=True, mode='overwrite')
+run(df_pre_cluster, df_cluster, output_table_name, output_detail_table_name, is_sparse=False, mode='append')
finally:
sc.stop()
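Because both run() calls write to the same Hive tables, the call order matters: the sparse pass runs first with mode='overwrite' to reset the tables, then the dense pass appends its rows. A hypothetical read-back, placed before sc.stop(), to confirm both passes landed:

```python
# Dense rows map a uckey to itself (ratio 1.0); sparse rows map to a
# numeric cluster-id string with fractional ratios. Hypothetical check.
df_out = hive_context.sql('SELECT * FROM {}'.format(output_table_name))
df_out.groupBy(df_out.uckey == df_out.cluster_uckey).count().show()
```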
