
Commit

Merge pull request #15 from radibnia77/main
update
radibnia77 committed Nov 10, 2021
2 parents d40c934 + b5cc16c commit ccbbf99
Showing 27 changed files with 2,935 additions and 625 deletions.
7 changes: 6 additions & 1 deletion Model/predictor-dl-model/VERSION.md
@@ -9,4 +9,9 @@

### 1.6
1. Add region and IPL features
2. Add TAG to config file. The whole set of tmp tables are named by product_tag and pipeline_tag. The user does not need to review the name of those tables anymore.

### 1.7
1. Remove residency from UCKey. The value of residency is replaced by an empty string; the number of commas stays the same (see the sketch below).
2. Remove region mapping for IPL.
3. Remove normalization of residency and IPL in main_norm.
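
A minimal sketch of what change 1 implies, assuming a comma-delimited uckey and an illustrative residency position (the real index may differ):

```python
# Hypothetical illustration: blank the residency field of a comma-delimited
# uckey without changing the number of commas.
RESIDENCY_INDEX = 6  # assumed position, for illustration only

def remove_residency(uckey, residency_index=RESIDENCY_INDEX):
    parts = uckey.split(',')
    parts[residency_index] = ''  # value removed, comma slot preserved
    return ','.join(parts)

before = 'native,a47eavw7ex,4G,g_m,3,CPM,35,12'
after = remove_residency(before)
assert after.count(',') == before.count(',')
```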
76 changes: 0 additions & 76 deletions Model/predictor-dl-model/dags/dlpm_data_prep_dag_05142021_1500.py

This file was deleted.

@@ -0,0 +1,63 @@
from datetime import datetime, timedelta

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import broadcast, collect_list, explode, udf
from pyspark.sql.types import ArrayType, FloatType, IntegerType, MapType, StringType

def _list_to_map(count_array):
    # Each count_array entry is a '<key>:<count>' string; build a dict from it
    # (values stay strings here and are cast to int later).
    count_map = {}
    for item in count_array:
        key_value = item.split(':')
        count_map[key_value[0]] = key_value[1]
    return count_map


def add_count_map(df):
    # Convert count_array to count_map
    list_to_map_udf = udf(_list_to_map, MapType(
        StringType(), StringType(), False))
    df = df.withColumn('count_map', list_to_map_udf(df.count_array))
    return df

def variance(plist):
    # Population variance via the identity Var(X) = E[X^2] - (E[X])^2
    l = len(plist)
    ex = sum(plist) / l
    ex2 = sum([i * i for i in plist]) / l
    return ex2 - ex * ex



query="select count_array,day,uckey from factdata where day in ('2020-05-15','2020-05-14','2020-05-13','2020-05-12','2020-05-11','2020-05-10','2020-05-09')"
sc = SparkContext()
hive_context = HiveContext(sc)

df = hive_context.sql(query)
df = add_count_map(df)

# Explode count_map into one row per (uckey, day, key), then sum to daily totals.
df = df.select('uckey', 'day', explode(df.count_map)).withColumnRenamed('value', 'impr_count')

df = df.withColumn('impr_count', udf(lambda x: int(x), IntegerType())(df.impr_count))
df = df.groupBy('uckey', 'day').sum('impr_count').withColumnRenamed('sum(impr_count)', 'impr_count')


# uckey is comma-delimited; index 1 holds the slot_id
split_uckey_udf = udf(lambda x: x.split(','), ArrayType(StringType()))
df = df.withColumn('col', split_uckey_udf(df.uckey))
df = df.select('uckey', 'impr_count', 'day', df.col[1]).withColumnRenamed('col[1]', 'slot_id')


df_slot = df.select('slot_id', 'impr_count', 'day')
df_slot = df_slot.groupBy('slot_id', 'day').sum('impr_count').withColumnRenamed('sum(impr_count)', 'impr_total')
# df_slot is small (one row per slot_id/day pair), so broadcast it to avoid a shuffle join
bc_df_slot = broadcast(df_slot)

df_new = df.join(bc_df_slot, on=['slot_id', 'day'], how='inner')

# Share of the slot's daily impressions carried by this uckey;
# 100.0 keeps the division floating-point under Python 2.
df_new = df_new.withColumn('percent', udf(lambda x, y: (x * 100.0) / y, FloatType())(df_new.impr_count, df_new.impr_total))


df2 = df_new.groupBy('uckey').agg(collect_list('percent').alias('percent'))
df2 = df2.withColumn('var', udf(variance, FloatType())(df2.percent))
df2.select('uckey', 'var').orderBy(['var'], ascending=False).show(300, truncate=False)
df2.cache()
print('% uckeys having variance <= 0.01', df2.filter(df2.var <= 0.01).count() * 100.0 / df2.count())

119 changes: 56 additions & 63 deletions Model/predictor-dl-model/predictor_dl_model/config.yml
@@ -1,54 +1,51 @@
product_tag: 'dlpm'
pipeline_tag: '08092021_1200' # IMPORTANT: The pipeline tag has to be changed before each run to prevent record duplication.
factdata_table_name: 'factdata_09202021' #factdata_hq_09222020
pipeline_tag: '111021_no_residency_no_mapping' # IMPORTANT: The pipeline tag has to be changed before each run to prevent record duplication.
factdata_table_name: 'factdata_10202021' #factdata_hq_09222020

log:
level: 'WARN' # log level for spark and app
level: 'warn' # log level for spark and app

pipeline:
config_table: '{product_tag}_{pipeline_tag}_config'
filter: # This is for data filtering: si and region
percentile: 10 # This is for filtering traffic less than 1/10 of average traffic
region_mapping_table: 'ipl_region_mapping_09282021' #region_mapping_01012020
output_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
init_start_bucket: 0
bucket_size: 1000
bucket_step: 100
new_bucket_size: 10
condition: ''
new_si_list: ['15e9ddce941b11e5bdec00163e291137',
'66bcd2720e5011e79bc8fa163e05184e',
'7b0d7b55ab0c11e68b7900163e3e481d',
'a8syykhszz',
'w3wx3nv9ow5i97',
'x2fpfbm8rt',
'17dd6d8098bf11e5bdec00163e291137',
'5cd1c663263511e6af7500163e291137',
'68bcd2720e5011e79bc8fa163e05184e',
'71bcd2720e5011e79bc8fa163e05184e',
'a290af82884e11e5bdec00163e291137',
'a47eavw7ex',
'b6le0s4qo8',
'd4d7362e879511e5bdec00163e291137',
'd971z9825e',
'd9jucwkpr3',
'e351de37263311e6af7500163e291137',
'f1iprgyl13',
'j1430itab9wj3b',
'k4werqx13k',
'l03493p0r3',
'l2d4ec6csv',
'p7gsrebd4m',
's4z85pd1h8',
'w9fmyd5r0i',
'x0ej5xhk60kjwq',
'z041bf6g4s']
new_si_list: ['a47eavw7ex',
'66bcd2720e5011e79bc8fa163e05184e',
'x0ej5xhk60kjwq',
'l03493p0r3',
'7b0d7b55ab0c11e68b7900163e3e481d',
'b6le0s4qo8',
'e351de37263311e6af7500163e291137',
'a290af82884e11e5bdec00163e291137',
'68bcd2720e5011e79bc8fa163e05184e',
'f1iprgyl13',
'w9fmyd5r0i',
'w3wx3nv9ow5i97',
'd971z9825e',
'l2d4ec6csv',
'z041bf6g4s',
'71bcd2720e5011e79bc8fa163e05184e',
'5cd1c663263511e6af7500163e291137',
'x2fpfbm8rt',
'd9jucwkpr3',
'k4werqx13k',
'j1430itab9wj3b',
'a8syykhszz',
's4z85pd1h8',
'17dd6d8098bf11e5bdec00163e291137',
'd4d7362e879511e5bdec00163e291137']

time_series: # This is done on whole bucketized data
input_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
conditions: []
yesterday: "2020-06-10" # data is used for training from -<prepare_past_days> to -1(yesterday)
prepare_past_days: 102
yesterday: "2021-07-21" # data is used for training from -<prepare_past_days> to -1(yesterday)
prepare_past_days: 82 # this should be equal to duration.tfrecorder_reader
bucket_size: 10 # maximum number of buckets to process starting from 0
bucket_step: 1 # size of bucket batch that is processed in one iteration
output_table_name: '{product_tag}_{pipeline_tag}_tmp_ts' # name of the hive table that keeps cleansed and normalized data before writing it into tfrecords, over-writes the existing table
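
The '{product_tag}_{pipeline_tag}_...' templates above are how the 1.6 release notes say tmp tables get their names. A minimal sketch of that resolution, assuming plain str.format substitution and PyYAML for loading (both are assumptions, not confirmed by this diff):

```python
# Hedged sketch: resolve a templated table name from config.yml.
import yaml  # assumes the pipeline reads its config with PyYAML

with open('config.yml') as f:
    cfg = yaml.safe_load(f)

config_table = cfg['pipeline']['config_table'].format(
    product_tag=cfg['product_tag'], pipeline_tag=cfg['pipeline_tag'])
# With the new tags above this would yield, e.g.,
# 'dlpm_111021_no_residency_no_mapping_config'
print(config_table)
```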
@@ -73,36 +70,32 @@ pipeline:
'a': ['','1','2','3','4','5','6'],
'g':['','g_f','g_m','g_x'],
't':['UNKNOWN','3G','4G','WIFI','2G'],
'r':['', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82'],
'ipl':['', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82'],
'si':[
'15e9ddce941b11e5bdec00163e291137',
'66bcd2720e5011e79bc8fa163e05184e',
'7b0d7b55ab0c11e68b7900163e3e481d',
'a8syykhszz',
'w3wx3nv9ow5i97',
'x2fpfbm8rt',
'17dd6d8098bf11e5bdec00163e291137',
'5cd1c663263511e6af7500163e291137',
'68bcd2720e5011e79bc8fa163e05184e',
'71bcd2720e5011e79bc8fa163e05184e',
'a290af82884e11e5bdec00163e291137',
'a47eavw7ex',
'b6le0s4qo8',
'd4d7362e879511e5bdec00163e291137',
'd971z9825e',
'd9jucwkpr3',
'e351de37263311e6af7500163e291137',
'f1iprgyl13',
'j1430itab9wj3b',
'k4werqx13k',
'l03493p0r3',
'l2d4ec6csv',
'p7gsrebd4m',
's4z85pd1h8',
'w9fmyd5r0i',
'x0ej5xhk60kjwq',
'z041bf6g4s']
'66bcd2720e5011e79bc8fa163e05184e',
'x0ej5xhk60kjwq',
'l03493p0r3',
'7b0d7b55ab0c11e68b7900163e3e481d',
'b6le0s4qo8',
'e351de37263311e6af7500163e291137',
'a290af82884e11e5bdec00163e291137',
'68bcd2720e5011e79bc8fa163e05184e',
'f1iprgyl13',
'w9fmyd5r0i',
'w3wx3nv9ow5i97',
'd971z9825e',
'l2d4ec6csv',
'z041bf6g4s',
'71bcd2720e5011e79bc8fa163e05184e',
'5cd1c663263511e6af7500163e291137',
'x2fpfbm8rt',
'd9jucwkpr3',
'k4werqx13k',
'j1430itab9wj3b',
'a8syykhszz',
's4z85pd1h8',
'17dd6d8098bf11e5bdec00163e291137',
'd4d7362e879511e5bdec00163e291137']
}
holidays: ['2019-11-09', '2019-11-10', '2019-11-11', '2019-11-25', '2019-11-26', '2019-11-27','2019-11-28', '2019-12-24','2019-12-25', '2019-12-26','2019-12-31', '2020-01-01', '2020-01-02', '2020-01-19','2020-01-20', '2020-01-21', '2020-01-22', '2020-01-23', '2020-01-24', '2020-01-25', '2020-02-08']
tfrecords:
@@ -121,7 +114,7 @@ tfrecorder_reader:
end: '' # help="Effective end date. Data past the end is dropped"
corr_backoffset: 0 # default=0, type=int, help="Offset for correlation calculation"
batch_size: 11880 # batch size of examples in tfrecord
duration: 90 # time series length; this has to be less than or equal to prepare_past_days
duration: 82 # time series length; this has to be less than or equal to prepare_past_days
tf_statistics_path: './tf_statistics_{pipeline_tag}.pkl'
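
Given the comment that duration must not exceed prepare_past_days (both are 82 after this change), a small guard could catch a mismatch before training. A hedged sketch, with the config paths assumed from the section names in this diff:

```python
# Hypothetical pre-flight check: tfrecorder_reader.duration <= prepare_past_days.
import yaml

with open('config.yml') as f:
    cfg = yaml.safe_load(f)

duration = cfg['tfrecorder_reader']['duration']  # assumed top-level section
past_days = cfg['pipeline']['time_series']['prepare_past_days']
assert duration <= past_days, (
    'duration (%d) must be <= prepare_past_days (%d)' % (duration, past_days))
```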

trainer:
@@ -149,7 +142,7 @@ trainer:
max_steps: 20000 # type=int, help="Stop training after max steps"
save_from_step: 100 # type=int, help="Save model on each evaluation (10 evals per epoch), starting from this step"
predict_window: 10 # default=3, type=int, help="Number of days to predict"
back_offset: 20
back_offset: 0 # don't change it.

save_model:
table: '{product_tag}_{pipeline_tag}_model_stat'
