release-1.6
Have both IPL and residency in Pre-Process
Add pipeline and product tag to config.yml
Save model-stats in Hive file
spyglass700 committed Nov 13, 2020
1 parent c0a9552 commit 704f463edd4789e60fef06601cb55304bf557fc5
Showing 38 changed files with 983 additions and 743 deletions.
@@ -17,14 +17,16 @@ pip install -r requirements.txt
1. Download the blue-martin/models project
2. Transfer the predictor_dl_model directory to ~/code/predictor_dl_model/ on a GPU machine which also has Spark Client.
3. cd predictor_dl_model
4. pip install -r requirements.txt (to install required packages)
4. pip install -r requirements.txt to install required packages. These packages are installed on top of Python using pip.
5. python setup.py install (to install the predictor_dl_model package)
3. Follow the steps in ~/code/predictor_dl_model/datagen/README.md to generate data
4. Go to directory ~/code/predictor_dl_model/predictor_dl_model
5. Run run.sh or each script individually
6. (optional) python setup.py bdist_egg (to create a .egg file to provide to spark-submit)
7. Follow the steps in ~/code/predictor_dl_model/datagen/README.md to generate data
8. Go to directory ~/code/predictor_dl_model/predictor_dl_model
9. Run run.sh or each script individually


### Documentation
Documentation is provided through comments in config.yml and README files

###note
### Note
saved_model_cli show --dir <model_dir>/<version> --all
@@ -5,4 +5,8 @@
a. Remove invalid si
b. Remove region
c. Remap ip based on ip-mapping table
d. Recalculate bucket-id

### 1.6
1. Add residency and IPL features
2. Add pipeline and product tags to the config file. All temporary tables are now named from product_tag and pipeline_tag, so the user no longer needs to set those table names manually.
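The scripts in this commit import resolve_placeholder from util to expand these tags, but the helper itself is not shown in the diff. The following is only a minimal hypothetical sketch of what such a resolver might do (name reuse, traversal, and formatting behavior are assumptions):

```python
# Hypothetical sketch of a placeholder resolver similar to util.resolve_placeholder:
# walk the loaded config and substitute {product_tag} and {pipeline_tag} into
# every string value. The real helper in util.py may differ; strings containing
# other brace patterns are not handled here.
def resolve_placeholder(in_dict):
    product_tag = in_dict.get('product_tag', '')
    pipeline_tag = in_dict.get('pipeline_tag', '')
    stack = [in_dict]
    while stack:
        node = stack.pop()
        for key, value in node.items():
            if isinstance(value, dict):
                stack.append(value)
            elif isinstance(value, str):
                node[key] = value.format(product_tag=product_tag,
                                         pipeline_tag=pipeline_tag)
```

With, for example, product_tag 'dlpm' and pipeline_tag '11092020' (the values set in config.yml below), a value such as '{product_tag}_{pipeline_tag}_tmp_ts' would resolve to 'dlpm_11092020_tmp_ts'.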
@@ -1,14 +1,17 @@
product_tag: 'dlpm'
pipeline_tag: '11092020'
factdata_table_name: 'factdata_hq_09222020_bucket_0'

log:
level: 'INFO' # log level for spark and app
level: 'WARN' # log level for spark and app

pipeline:
filter: # This is for data filtering on si and region
factdata_table_name: 'factdata_hq_09222020'
region_mapping_table: 'region_mapping'
output_table_name: "factdata_hq_09222020_filtered_si_region_bucket_09292020_test_1"
output_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
init_start_bucket: 0
bucket_size: 1000
bucket_step: 50
bucket_step: 100
new_bucket_size: 10
condition: ''
new_si_list: [
@@ -53,17 +56,17 @@ pipeline:
'd4d7362e879511e5bdec00163e291137',
'15e9ddce941b11e5bdec00163e291137' ]
time_series: # This is done on whole bucketized data
factdata_table_name: "factdata_hq_09222020_filtered_si_region_bucket_09292020_test_1"
input_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
conditions: []
yesterday: "2020-05-31" # data is used for training from -<prepare_past_days> to -1 (yesterday)
prepare_past_days: 90
bucket_size: 10 # maximum number of buckets to process starting from 0
bucket_step: 1 # size of bucket batch that is processed in one iteration
output_table_name: 'pipeline_ts_tmp_06262020_test_1' # name of the hive table that keeps cleansed and normalized data before writing it into tfrecords, over-writes the existing table
output_table_name: '{product_tag}_{pipeline_tag}_tmp_ts' # name of the hive table that keeps cleansed and normalized data before writing it into tfrecords, over-writes the existing table
uckey_clustring: # This is done on whole data, not slicing on buckets
pre_cluster_table_name: 'pipeline_pre_cluster_tmp_09292020_test_1'
pre_cluster_table_name: '{product_tag}_{pipeline_tag}_tmp_pre_cluster'
create_pre_cluster_table: True
output_table_name: 'pipeline_cluster_tmp_09292020_test_1'
output_table_name: '{product_tag}_{pipeline_tag}_tmp_cluster'
cluster_size:
number_of_virtual_clusters: 1000
datapoints_min_th: 0.15
@@ -73,25 +76,26 @@ pipeline:
popularity_th: 5
median_popularity_of_dense: 1856.2833251953125 # median imp of sparse=False, calculated once
normalization: # This is done on whole data, not slicing on buckets
output_table_name: 'trainready_tmp_09292020_test_1'
output_table_name: '{product_tag}_{pipeline_tag}_trainready'
columns: {
'price_cat':['1','2','3'],
'a': ['','1','2','3','4','5','6'],
'g':['','g_f','g_m','g_x'],
't':['UNKNOWN','3G','4G','WIFI','2G'],
'r':['', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82'],
'ipl':['', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82'],
'si':['d4d7362e879511e5bdec00163e291137', 'b6le0s4qo8', 'd47737w664', 'd971z9825e', '72bcd2720e5011e79bc8fa163e05184e', 'j1430itab9wj3b', 'w3wx3nv9ow5i97', 'g9iv6p4sjy', '71bcd2720e5011e79bc8fa163e05184e', '7b0d7b55ab0c11e68b7900163e3e481d', 'm1040xexan', 'x2fpfbm8rt', '05', '66bcd2720e5011e79bc8fa163e05184e', 'g7m2zuits8', 'l2d4ec6csv', 'a8syykhszz', 'w9fmyd5r0i', 'a47eavw7ex', 'p7gsrebd4m', 'q4jtehrqn2', '03', 'l03493p0r3', 's4z85pd1h8', 'f1iprgyl13', '17dd6d8098bf11e5bdec00163e291137', 'e351de37263311e6af7500163e291137', '68bcd2720e5011e79bc8fa163e05184e', '5cd1c663263511e6af7500163e291137', 'k4werqx13k', 'x0ej5xhk60kjwq', '04', 'a290af82884e11e5bdec00163e291137', '15e9ddce941b11e5bdec00163e291137', 'z041bf6g4s', 'd9jucwkpr3', 'c4n08ku47t']
}
holidays: ['2019-11-09', '2019-11-10', '2019-11-11', '2019-11-25', '2019-11-26', '2019-11-27','2019-11-28', '2019-12-24','2019-12-25', '2019-12-26','2019-12-31', '2020-01-01', '2020-01-02', '2020-01-19','2020-01-20', '2020-01-21', '2020-01-22', '2020-01-23', '2020-01-24', '2020-01-25', '2020-02-08']
tfrecords:
tfrecords_hdfs_path: 'factdata.tfrecord.09292020_test_1' # it is hdfs location for tfrecords, over-writes the existing files
tf_statistics_path: 'tf_statistics.pkl'
tfrecords_hdfs_path: 'factdata.tfrecord.{pipeline_tag}' # the HDFS location for tfrecords; overwrites the existing files
tf_statistics_path: './tf_statistics_{pipeline_tag}.pkl'
distribution:
output_table_name: 'pipeline_distribution_tmp_09292020_test_1'
output_detail_table_name: 'pipeline_distribution_detail_tmp_09292020_test_1'
output_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution'
output_detail_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution_detail'

tfrecorder_reader:
tfrecords_local_path: 'factdata.tfrecord.09292020_test_1' # it us local path for tfrecords, over-writes the existing files
tfrecords_local_path: './factdata.tfrecord.{pipeline_tag}' # the local path for tfrecords; overwrites the existing files
data_dir: 'data/vars'
valid_threshold: 0.0 # default=0.0, type=float, help="Series minimal length threshold (pct of data length)"
add_days: 0 # default=64, type=int, help="Add N days in the future for prediction"
@@ -100,7 +104,7 @@ tfrecorder_reader:
corr_backoffset: 0 # default=0, type=int, help="Offset for correlation calculation"
batch_size: 11880 # batch size of examples in tfrecord
duration: 90 # time series length; this has to be less than or equal to prepare_past_days
tf_statistics_path: '/home/faezeh/tf_statistics_test_1.pkl'
tf_statistics_path: './tf_statistics_{pipeline_tag}.pkl'

trainer:
name: 's32' # default='s32', help='Model name to identify different logs/checkpoints'
@@ -129,10 +133,12 @@ trainer:
predict_window: 10 # default=3, type=int, help="Number of days to predict"

save_model:
table: '{product_tag}_{pipeline_tag}_model_stat'
data_dir: data/vars
ckpt_dir: data/cpt/s32
saved_dir: data/vars
model_version: 1
model_version: 'version_{pipeline_tag}'
model_name: 'model_{product_tag}_{pipeline_tag}'
train_window: 60 # Should be same as the one in hparams

elastic_search:
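The commit message mentions saving model stats in Hive, and save_model.table above points at '{product_tag}_{pipeline_tag}_model_stat'. As a hedged illustration only — the column set, values, and session setup below are assumptions, not the pipeline's actual schema — writing such a table from PySpark could look like this:

```python
# Illustrative sketch: persisting model statistics to the configured Hive table.
# Table name shown resolved for product_tag='dlpm', pipeline_tag='11092020';
# columns and values are hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
stats = [('model_dlpm_11092020', 'version_11092020', 'smape', 0.12)]
df_stats = spark.createDataFrame(
    stats, ['model_name', 'model_version', 'metric', 'value'])
df_stats.write.mode('overwrite').format('hive').saveAsTable('dlpm_11092020_model_stat')
```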
@@ -7,3 +7,15 @@ Pipeline takes the following steps:
4. Writes model into local directory
5. Compare the new model and the old model (new model evaluation) (future)
6. Set the predictor to use the new model - the predictor reads the name of the model it uses from Elasticsearch (future)

### UCKEY Elements
A uckey consists of the following parts:

ucdoc.m = parts[0] #media-type
ucdoc.si = parts[1] #slot-id
ucdoc.t = parts[2] #connection-type
ucdoc.g = parts[3] #gender
ucdoc.a = parts[4] #age
ucdoc.pm = parts[5] #price-model
ucdoc.r = parts[6] #resident-location
ucdoc.ipl = parts[7] #ip-location
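For readers new to this structure, here is a minimal hedged sketch of turning a uckey string into the named fields listed above; the comma delimiter and the sample values are assumptions made for illustration only.

```python
# Hypothetical helper: split a uckey into the parts listed above.
# The actual delimiter used by the pipeline may differ.
UCKEY_FIELDS = ['m', 'si', 't', 'g', 'a', 'pm', 'r', 'ipl']

def parse_uckey(uckey, delimiter=','):
    parts = uckey.split(delimiter)
    return dict(zip(UCKEY_FIELDS, parts))

# Example (made-up values):
# parse_uckey('native,a47eavw7ex,4G,g_m,4,CPC,40,40')
# -> {'m': 'native', 'si': 'a47eavw7ex', 't': '4G', 'g': 'g_m',
#     'a': '4', 'pm': 'CPC', 'r': '40', 'ipl': '40'}
```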
@@ -30,12 +30,13 @@
from pyspark.sql.types import IntegerType, StringType, MapType, ArrayType, FloatType, BooleanType
from pyspark.sql import HiveContext
from datetime import datetime, timedelta
from util import resolve_placeholder


import transform as transform


def _save_as_table(df, table_name, hive_context, create_table):
def __save_as_table(df, table_name, hive_context, create_table):

if create_table:
command = """
@@ -93,7 +94,7 @@ def agg_ts(mlist):

def agg_on_uckey_price_cat(df):

column_names = ['ts', 'a', 'g', 't', 'si', 'r']
column_names = ['ts', 'a', 'g', 't', 'si', 'r', 'ipl']
agg_exprs = [collect_list(col).alias(col) for col in column_names]
df = df.groupBy('uckey', 'price_cat').agg(*agg_exprs)

@@ -151,7 +152,7 @@ def run(hive_context, cluster_size_cfg, input_table_name, pre_cluster_table_name

# Read factdata table
command = """
select ts,price_cat,uckey,a,g,t,si,r from {}
select ts, price_cat, uckey, a, g, t, si, r, ipl from {}
""".format(input_table_name)

# DataFrame[uckey: string, price_cat: string, ts: array<int>, a: string, g: string, t: string, si: string, r: string, ipl: string]
@@ -182,7 +183,7 @@ def run(hive_context, cluster_size_cfg, input_table_name, pre_cluster_table_name
number_of_virtual_clusters).cast('int'))

if create_pre_cluster_table:
_save_as_table(df, pre_cluster_table_name, hive_context, True)
__save_as_table(df, pre_cluster_table_name, hive_context, True)

# change the uckey of sparse to cn
df = df.withColumn('new_uckey', udf(lambda uckey, cn, sparse: str(
@@ -209,7 +210,7 @@ def run(hive_context, cluster_size_cfg, input_table_name, pre_cluster_table_name
df = df.filter(udf(lambda p_n, ts: not is_spare(datapoints_th_clusters, -
sys.maxsize-1)(p_n, ts), BooleanType())(df.p_n, df.ts))

_save_as_table(df, output_table_name, hive_context, True)
__save_as_table(df, output_table_name, hive_context, True)


if __name__ == "__main__":
@@ -220,7 +221,8 @@ def run(hive_context, cluster_size_cfg, input_table_name, pre_cluster_table_name

# Load config file
with open(args.config_file, 'r') as ymlfile:
cfg = yaml.load(ymlfile)
cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
resolve_placeholder(cfg)

cfg_log = cfg['log']
cfg = cfg['pipeline']
@@ -27,12 +27,14 @@
from pyspark.context import SparkContext
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType, StringType
from util import resolve_placeholder


def load_df(hc, table_name):
command = """select * from {}""".format(table_name)
return hc.sql(command)


def __calcualte_distrib_ratio(uckey_sum, cluster_uckey_sum):
if cluster_uckey_sum:
return 1.0 * uckey_sum / cluster_uckey_sum
@@ -41,7 +43,9 @@ def __calcualte_distrib_ratio(uckey_sum, cluster_uckey_sum):
# multiplying by 1.0 forces float division under Python 2, so the ratio is not truncated to 0.

# the major entry function for generating distributions between sparse uckeys and cluster (virtual) uckeys.
def run(df_pre_cluster, df_cluster, table_distrib_output, table_distrib_detail, is_sparse = True):


def run(df_pre_cluster, df_cluster, table_distrib_output, table_distrib_detail, is_sparse=True):
"""
pipeline_pre_cluster: (cn is the cluster id, an integer; it appears as a string uckey in df_cluster)
+--------------------+---------+--------------------+---+---+---+---+---+---+-----------+------------+------+---+
@@ -58,34 +62,35 @@ def run(df_pre_cluster, df_cluster, table_distrib_output, table_distrib_detail,
| 1039| 1|[2385, 1846, 2182...|[0 -> 0.071428575...|[ -> 0.04761905, ...|[4G -> 0.16666667...|[11 -> 0.02380952...|[ -> 1.0]|153697|1707.7444|
+-----+---------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+------+---------+
"""
#filter out the sparse uckeys and make sure type of the "cn" column is string.
# filter out the sparse uckeys and make sure type of the "cn" column is string.
df_pre = df_pre_cluster.filter(col("sparse") == is_sparse).select('uckey', 'price_cat', 'imp', 'cn')
df_pre = df_pre.withColumn("cn", df_pre["cn"].cast(StringType()))
df_cluster = df_cluster.withColumnRenamed('uckey', 'cluster_uckey').withColumnRenamed('imp', 'cluster_imp')
#join the two tables to make all the data ready and remove some repeated columns.
df_join = df_pre.join(df_cluster, [df_pre.cn == df_cluster.cluster_uckey if is_sparse else
df_pre.uckey == df_cluster.cluster_uckey,
df_pre.price_cat == df_cluster.price_cat],
how = "inner").drop(df_cluster.price_cat)
#calculate the distribution ratio with sparse uckey's total imp and the cluster's total imp.
# join the two tables to make all the data ready and remove some repeated columns.
df_join = df_pre.join(df_cluster, [df_pre.cn == df_cluster.cluster_uckey if is_sparse else
df_pre.uckey == df_cluster.cluster_uckey,
df_pre.price_cat == df_cluster.price_cat],
how="inner").drop(df_cluster.price_cat)
# calculate the distribution ratio with sparse uckey's total imp and the cluster's total imp.
df_join = df_join.withColumn("ratio", udf(__calcualte_distrib_ratio, FloatType())(df_join.imp, df_join.cluster_imp))
#output the final result, save the distribtion table and the details table.
# output the final result, save the distribution table and the details table.
mode = 'overwrite' if is_sparse else 'append'
df_distrib_output = df_join.select('uckey', 'cluster_uckey', 'price_cat', 'ratio')
df_distrib_output.write.option("header", "true").option("encoding", "UTF-8").mode(mode).format('hive').saveAsTable(table_distrib_output)
df_distrib_detail = df_join.select('uckey', 'cluster_uckey', 'price_cat', 'ratio', 'imp', 'cluster_imp')
df_distrib_detail.write.option("header", "true").option("encoding", "UTF-8").mode(mode).format('hive').saveAsTable(table_distrib_detail)


if __name__ == '__main__':

parser = argparse.ArgumentParser(description='gen distribution')
parser.add_argument('config_file')
args = parser.parse_args()

# Load config file
with open(args.config_file, 'r') as ymlfile:
cfg = yaml.load(ymlfile)
cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
resolve_placeholder(cfg)

cfg_log = cfg['log']
cfg = cfg['pipeline']
@@ -98,15 +103,14 @@ def run(df_pre_cluster, df_cluster, table_distrib_output, table_distrib_detail,
cluster_table_name = cfg['uckey_clustring']['output_table_name']
output_table_name = cfg['distribution']['output_table_name']
output_detail_table_name = cfg['distribution']['output_detail_table_name']

try:
#prepare the two required data frames.
# prepare the two required data frames.
df_pre_cluster = load_df(hive_context, pre_cluster_table_name)
df_cluster = load_df(hive_context, cluster_table_name)

run(df_pre_cluster, df_cluster, output_table_name, output_detail_table_name, is_sparse = True)
#comment out the following line to generate distribution for sparse uckeys only, or execute both.
run(df_pre_cluster, df_cluster, output_table_name, output_detail_table_name, is_sparse = False)
run(df_pre_cluster, df_cluster, output_table_name, output_detail_table_name, is_sparse=True)
# comment out the following line to generate distributions for sparse uckeys only; otherwise both calls run.
run(df_pre_cluster, df_cluster, output_table_name, output_detail_table_name, is_sparse=False)
finally:
sc.stop()
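To make the ratio computed in this file concrete, a short hedged example with made-up numbers: if a sparse uckey accumulated imp = 50 impressions and its virtual cluster accumulated cluster_imp = 1000, the row written to the distribution table carries ratio = 0.05, i.e. 5% of the cluster-level prediction is later attributed back to that uckey.

```python
# Made-up numbers: ratio assigned to a sparse uckey within its virtual cluster.
imp, cluster_imp = 50, 1000
ratio = 1.0 * imp / cluster_imp   # 0.05 -> 5% of the cluster prediction
```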
