update
remove residency
remove region mapping for ipl
radibnia77 committed Nov 10, 2021
1 parent d40c934 commit b5cc16cee37b2d1cbb1c8ff2503791eb4d1a9628
Showing 27 changed files with 2,935 additions and 625 deletions.
@@ -9,4 +9,9 @@

### 1.6
1. Add region and IPL features
2. Add TAG to config file. The whole set of tmp tables is named from product_tag and pipeline_tag, so the user no longer needs to review those table names (see the sketch below).
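
A minimal sketch (not part of the commit) of how the tag-based naming resolves, assuming plain `str.format` substitution over the `{product_tag}_{pipeline_tag}_...` templates that appear in the config below. Changing `pipeline_tag` before each run therefore yields a fresh set of table names, which is what the config comment relies on to prevent record duplication.

```python
# Hypothetical illustration of the '{product_tag}_{pipeline_tag}_...' templates
# used in the config below; the substitution mechanism is an assumption.
product_tag = 'dlpm'
pipeline_tag = '111021_no_residency_no_mapping'

templates = [
    '{product_tag}_{pipeline_tag}_config',
    '{product_tag}_{pipeline_tag}_tmp_area_map',
    '{product_tag}_{pipeline_tag}_tmp_ts',
    '{product_tag}_{pipeline_tag}_model_stat',
]
for template in templates:
    # e.g. 'dlpm_111021_no_residency_no_mapping_tmp_ts'
    print(template.format(product_tag=product_tag, pipeline_tag=pipeline_tag))
```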

### 1.7
1. Remove residency from UCKey. The residency value is replaced by an empty string; the number of commas stays the same (see the sketch after this list).
2. Remove region mapping for IPL.
3. Remove normalization of residency and IPL in main_norm.
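
A minimal sketch (not part of the commit) of what item 1 means for a uckey string: the residency field is blanked while the comma count, and therefore the position of every other field, stays the same. The uckey layout and the field index below are hypothetical.

```python
# Hypothetical uckey with residency as the last field (layout is an assumption).
def blank_residency(uckey, residency_index):
    # Replace the residency field with an empty string, keeping all commas.
    parts = uckey.split(',')
    parts[residency_index] = ''
    return ','.join(parts)

before = 'native,a47eavw7ex,WIFI,g_m,5,CPM,40,71'   # hypothetical example
after = blank_residency(before, residency_index=7)  # index 7 is an assumption
assert before.count(',') == after.count(',')        # same number of commas
print(after)                                        # 'native,a47eavw7ex,WIFI,g_m,5,CPM,40,'
```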

This file was deleted.

@@ -0,0 +1,63 @@
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext, SQLContext
from pyspark.sql.functions import count, lit, col, udf, expr, collect_list, explode, broadcast
from pyspark.sql.types import IntegerType, StringType, MapType, ArrayType, BooleanType, FloatType
from datetime import datetime, timedelta

def _list_to_map(count_array):
    # Each entry is expected to look like '<key>:<count>'; collect them into a dict.
    count_map = {}
    for item in count_array:
        key_value = item.split(':')
        count_map[key_value[0]] = key_value[1]
    return count_map


def add_count_map(df):
    # Convert count_array to count_map
    list_to_map_udf = udf(_list_to_map, MapType(StringType(), StringType(), False))
    df = df.withColumn('count_map', list_to_map_udf(df.count_array))
    return df

def variance(plist):
    # Population variance: E[x^2] - (E[x])^2
    l = len(plist)
    ex = sum(plist) / l
    ex2 = sum([i * i for i in plist]) / l
    return ex2 - ex * ex



query="select count_array,day,uckey from factdata where day in ('2020-05-15','2020-05-14','2020-05-13','2020-05-12','2020-05-11','2020-05-10','2020-05-09')"
sc = SparkContext()
hive_context = HiveContext(sc)

df = hive_context.sql(query)
df = add_count_map(df)

df = df.select('uckey', 'day', explode(df.count_map)).withColumnRenamed("value", "impr_count")

df = df.withColumn('impr_count', udf(lambda x: int(x), IntegerType())(df.impr_count))
df = df.groupBy('uckey', 'day').sum('impr_count').withColumnRenamed("sum(impr_count)", 'impr_count')


# The second comma-separated field of the uckey is used as the slot id.
split_uckey_udf = udf(lambda x: x.split(","), ArrayType(StringType()))
df = df.withColumn('col', split_uckey_udf(df.uckey))
df = df.select('uckey', 'impr_count', 'day', df.col[1]).withColumnRenamed("col[1]", 'slot_id')


# Total impressions per slot and day; broadcast it for the join back onto the per-uckey rows.
df_slot = df.select('slot_id', 'impr_count', 'day')
df_slot = df_slot.groupBy('slot_id', 'day').sum('impr_count').withColumnRenamed("sum(impr_count)", "impr_total")
bc_df_slot = broadcast(df_slot)

df_new = df.join(bc_df_slot, on=["slot_id", 'day'], how="inner")

# Share (in percent) of the slot's daily traffic contributed by each uckey.
df_new = df_new.withColumn('percent', udf(lambda x, y: (x * 100.0) / y, FloatType())(df_new.impr_count, df_new.impr_total))


# Variance of each uckey's share across the 7 days; high variance means an unstable traffic share.
df2 = df_new.groupBy("uckey").agg(collect_list('percent').alias('percent'))
df2 = df2.withColumn('var', udf(lambda x: variance(x), FloatType())(df2.percent))
df2.select("uckey", "var").orderBy(["var"], ascending=False).show(300, truncate=False)
df2.cache()
print("% of uckeys with variance <= 0.01: ", df2.filter((df2.var <= 0.01)).count() * 100.0 / df2.count())

@@ -1,54 +1,51 @@
product_tag: 'dlpm'
pipeline_tag: '08092021_1200' # IMPORTANT: The pipeline tag has to be changed before each run to prevent record duplication.
factdata_table_name: 'factdata_09202021' #factdata_hq_09222020
pipeline_tag: '111021_no_residency_no_mapping' # IMPORTANT: The pipeline tag has to be changed before each run to prevent record duplication.
factdata_table_name: 'factdata_10202021' #factdata_hq_09222020

log:
level: 'WARN' # log level for spark and app
level: 'warn' # log level for spark and app

pipeline:
config_table: '{product_tag}_{pipeline_tag}_config'
filter: # This is for data filtering on si and region
percentile: 10 # This is for filtering out traffic less than 1/10 of the average traffic
region_mapping_table: 'ipl_region_mapping_09282021' #region_mapping_01012020
output_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
init_start_bucket: 0
bucket_size: 1000
bucket_step: 100
new_bucket_size: 10
condition: ''
new_si_list: ['15e9ddce941b11e5bdec00163e291137',
'66bcd2720e5011e79bc8fa163e05184e',
'7b0d7b55ab0c11e68b7900163e3e481d',
'a8syykhszz',
'w3wx3nv9ow5i97',
'x2fpfbm8rt',
'17dd6d8098bf11e5bdec00163e291137',
'5cd1c663263511e6af7500163e291137',
'68bcd2720e5011e79bc8fa163e05184e',
'71bcd2720e5011e79bc8fa163e05184e',
'a290af82884e11e5bdec00163e291137',
'a47eavw7ex',
'b6le0s4qo8',
'd4d7362e879511e5bdec00163e291137',
'd971z9825e',
'd9jucwkpr3',
'e351de37263311e6af7500163e291137',
'f1iprgyl13',
'j1430itab9wj3b',
'k4werqx13k',
'l03493p0r3',
'l2d4ec6csv',
'p7gsrebd4m',
's4z85pd1h8',
'w9fmyd5r0i',
'x0ej5xhk60kjwq',
'z041bf6g4s']
new_si_list: ['a47eavw7ex',
'66bcd2720e5011e79bc8fa163e05184e',
'x0ej5xhk60kjwq',
'l03493p0r3',
'7b0d7b55ab0c11e68b7900163e3e481d',
'b6le0s4qo8',
'e351de37263311e6af7500163e291137',
'a290af82884e11e5bdec00163e291137',
'68bcd2720e5011e79bc8fa163e05184e',
'f1iprgyl13',
'w9fmyd5r0i',
'w3wx3nv9ow5i97',
'd971z9825e',
'l2d4ec6csv',
'z041bf6g4s',
'71bcd2720e5011e79bc8fa163e05184e',
'5cd1c663263511e6af7500163e291137',
'x2fpfbm8rt',
'd9jucwkpr3',
'k4werqx13k',
'j1430itab9wj3b',
'a8syykhszz',
's4z85pd1h8',
'17dd6d8098bf11e5bdec00163e291137',
'd4d7362e879511e5bdec00163e291137']

time_series: # This is done on whole bucketized data
input_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
conditions: []
yesterday: "2020-06-10" # data is used for training from -<prepare_past_days> to -1(yesterday)
prepare_past_days: 102
yesterday: "2021-07-21" # data is used for training from -<prepare_past_days> to -1(yesterday)
prepare_past_days: 82 # this should be equal to duration.tfrecorder_reader
bucket_size: 10 # maximum number of buckets to process starting from 0
bucket_step: 1 # size of bucket batch that is processed in one iteration
output_table_name: '{product_tag}_{pipeline_tag}_tmp_ts' # name of the hive table that keeps cleansed and normalized data before writing it into tfrecords; overwrites the existing table
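
A minimal sketch (not part of the commit) of the training window implied by `yesterday` and `prepare_past_days` above, and of the constraint linking `prepare_past_days` to `tfrecorder_reader.duration` further down in this config. The inclusive handling of the window boundary is an assumption based on the comment.

```python
# Sketch of the training window implied by yesterday / prepare_past_days.
# Assumption: the window includes 'yesterday' and spans exactly
# prepare_past_days calendar days.
from datetime import datetime, timedelta

yesterday = datetime.strptime('2021-07-21', '%Y-%m-%d')
prepare_past_days = 82
duration = 82  # tfrecorder_reader.duration further down in this config

start = yesterday - timedelta(days=prepare_past_days - 1)
print(start.date(), '->', yesterday.date())  # 2021-05-01 -> 2021-07-21

# The comments tie the two settings together: the time-series length read
# back from the tfrecords must not exceed the number of days prepared.
assert duration <= prepare_past_days
```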
@@ -73,36 +70,32 @@ pipeline:
'a': ['','1','2','3','4','5','6'],
'g':['','g_f','g_m','g_x'],
't':['UNKNOWN','3G','4G','WIFI','2G'],
'r':['', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82'],
'ipl':['', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82'],
'si':[
'15e9ddce941b11e5bdec00163e291137',
'66bcd2720e5011e79bc8fa163e05184e',
'7b0d7b55ab0c11e68b7900163e3e481d',
'a8syykhszz',
'w3wx3nv9ow5i97',
'x2fpfbm8rt',
'17dd6d8098bf11e5bdec00163e291137',
'5cd1c663263511e6af7500163e291137',
'68bcd2720e5011e79bc8fa163e05184e',
'71bcd2720e5011e79bc8fa163e05184e',
'a290af82884e11e5bdec00163e291137',
'a47eavw7ex',
'b6le0s4qo8',
'd4d7362e879511e5bdec00163e291137',
'd971z9825e',
'd9jucwkpr3',
'e351de37263311e6af7500163e291137',
'f1iprgyl13',
'j1430itab9wj3b',
'k4werqx13k',
'l03493p0r3',
'l2d4ec6csv',
'p7gsrebd4m',
's4z85pd1h8',
'w9fmyd5r0i',
'x0ej5xhk60kjwq',
'z041bf6g4s']
'66bcd2720e5011e79bc8fa163e05184e',
'x0ej5xhk60kjwq',
'l03493p0r3',
'7b0d7b55ab0c11e68b7900163e3e481d',
'b6le0s4qo8',
'e351de37263311e6af7500163e291137',
'a290af82884e11e5bdec00163e291137',
'68bcd2720e5011e79bc8fa163e05184e',
'f1iprgyl13',
'w9fmyd5r0i',
'w3wx3nv9ow5i97',
'd971z9825e',
'l2d4ec6csv',
'z041bf6g4s',
'71bcd2720e5011e79bc8fa163e05184e',
'5cd1c663263511e6af7500163e291137',
'x2fpfbm8rt',
'd9jucwkpr3',
'k4werqx13k',
'j1430itab9wj3b',
'a8syykhszz',
's4z85pd1h8',
'17dd6d8098bf11e5bdec00163e291137',
'd4d7362e879511e5bdec00163e291137']
}
holidays: ['2019-11-09', '2019-11-10', '2019-11-11', '2019-11-25', '2019-11-26', '2019-11-27','2019-11-28', '2019-12-24','2019-12-25', '2019-12-26','2019-12-31', '2020-01-01', '2020-01-02', '2020-01-19','2020-01-20', '2020-01-21', '2020-01-22', '2020-01-23', '2020-01-24', '2020-01-25', '2020-02-08']
tfrecords:
@@ -121,7 +114,7 @@ tfrecorder_reader:
end: '' # help="Effective end date. Data past the end is dropped"
corr_backoffset: 0 # default=0, type=int, help="Offset for correlation calculation"
batch_size: 11880 # batch size of examples in tfrecord
duration: 90 # time series length; this has to be less than or equal to prepare_past_days
duration: 82 # time series length; this has to be less than or equal to prepare_past_days
tf_statistics_path: './tf_statistics_{pipeline_tag}.pkl'

trainer:
@@ -149,7 +142,7 @@ trainer:
max_steps: 20000 # type=int, help="Stop training after max steps"
save_from_step: 100 # type=int, help="Save model on each evaluation (10 evals per epoch), starting from this step"
predict_window: 10 # default=3, type=int, help="Number of days to predict"
back_offset: 20
back_offset: 0 # don't change it.

save_model:
table: '{product_tag}_{pipeline_tag}_model_stat'
