Merge pull request #47 from radibnia77/main
add rti transform module
The module transforms the request table into the impression table.
#44
radibnia77 committed Feb 12, 2022
2 parents b38dcd7 + 004ab49 commit b0d7d70d93cda539c0e83468878d88137c0f8151
Showing 3 changed files with 225 additions and 9 deletions.
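
For orientation, the sketch below is not part of the commit; it shows the row-level mapping the new module performs, using the defaults added to config.yml in this change (default_hour=7, default_price_cat='1', new_bucket_size=2). The helper name and the sample uckey are illustrative only.

import hashlib

def request_row_to_impression_row(uckey, total, pt_d,
                                   default_hour=7, default_price_cat='1', new_bucket_size=2):
    # A request row carries only (uckey, total, pt_d); the impression-style row
    # additionally needs count_array, hour and bucket_id, filled with the configured defaults.
    bucket_id = int(hashlib.sha256(uckey.encode('utf-8')).hexdigest(), 16) % new_bucket_size
    return {'uckey': uckey,
            'count_array': ['{}:{}'.format(default_price_cat, total)],  # 'price_cat:count'
            'hour': default_hour,
            'day': pt_d,
            'bucket_id': bucket_id}

# e.g. request_row_to_impression_row('banner,1,3G,g_f,1,pt,1002,icc', 12, '2020-01-01')
# -> {'uckey': 'banner,1,3G,g_f,1,pt,1002,icc', 'count_array': ['1:12'], 'hour': 7,
#     'day': '2020-01-01', 'bucket_id': 0 or 1 (deterministic per uckey)}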
@@ -1,12 +1,23 @@
product_tag: 'dlpm'
pipeline_tag: '111021_no_residency_no_mapping' # IMPORTANT: The pipeline tag has to be changed before each run to prevent record duplication.
factdata_table_name: 'factdata_10202021' #factdata_hq_09222020
factdata_table_name: 'factdata_test_02112022' # factdata_10202021 #factdata_hq_09222020

log:
level: 'warn' # log level for spark and app

pipeline:
config_table: '{product_tag}_{pipeline_tag}_config'

rti_transform: # This transforms request-based factdata into impression-based factdata by filling in the missing fields
default_hour: 7
default_price_cat: '1'
day_step: 2
start_day: '2020-01-01'
end_day: '2020-01-02'
new_bucket_size: 2
input_table: 'factdata_request_01012022'
# output_table is factdata_table_name

filter: # This is for data filtering on si and region
percentile: 10 # This filters out traffic less than 1/10 of the average traffic
output_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
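
As an aside (not part of the commit), the sketch below shows how start_day, end_day and day_step in the rti_transform block above drive the batching; the date arithmetic mirrors advance_date() and the while-loop in pipeline/main_rti_transform.py later in this diff, while the generator itself is illustrative.

from datetime import datetime, timedelta

def advance_date(date, day_added):
    return (datetime.strptime(date, "%Y-%m-%d") + timedelta(days=day_added)).strftime("%Y-%m-%d")

def date_windows(start_day, end_day, day_step):
    # Yields the inclusive [start, end] windows processed per loop iteration.
    _st = start_day
    while True:
        _end = min(end_day, advance_date(_st, day_step))
        if _st > _end:
            break
        yield (_st, _end)
        _st = advance_date(_end, 1)

# With the values above: list(date_windows('2020-01-01', '2020-01-02', 2))
# -> [('2020-01-01', '2020-01-02')], i.e. the whole range fits in a single batch.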
@@ -51,9 +62,9 @@ pipeline:
output_table_name: '{product_tag}_{pipeline_tag}_tmp_ts' # name of the Hive table that keeps cleansed and normalized data before writing it into tfrecords; overwrites the existing table
outlier_table: '{product_tag}_{pipeline_tag}_tmp_outlier'
uckey_clustering: # This is done on whole data, not slicing on buckets
pre_cluster_table_name: '{product_tag}_{pipeline_tag}_tmp_pre_cluster'
pre_cluster_table_name: '{product_tag}_{pipeline_tag}_tmp_pre_cluster_test_12212021'
create_pre_cluster_table: True
output_table_name: '{product_tag}_{pipeline_tag}_tmp_cluster'
output_table_name: '{product_tag}_{pipeline_tag}_tmp_cluster_test_12212021'
cluster_size:
number_of_virtual_clusters: 1000
cluster_dense_num_ratio_cap: 0.01
@@ -64,7 +75,7 @@ pipeline:
popularity_th: 4
median_popularity_of_dense: 1856.2833251953125 # median imp of sparse=False, calculate once
normalization: # This is done on whole data, not slicing on buckets
output_table_name: '{product_tag}_{pipeline_tag}_trainready'
output_table_name: '{product_tag}_{pipeline_tag}_trainready_test_12212021'
columns: {
'price_cat':['1','2','3'],
'a': ['','1','2','3','4','5','6'],
@@ -102,8 +113,8 @@ pipeline:
tfrecords_hdfs_path: 'factdata.tfrecord.{pipeline_tag}' # HDFS location for tfrecords; overwrites the existing files
tf_statistics_path: './tf_statistics_{pipeline_tag}.pkl'
distribution:
output_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution'
output_detail_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution_detail'
output_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution_test_12212021'
output_detail_table_name: '{product_tag}_{pipeline_tag}_tmp_distribution_detail_test_12212021'

tfrecorder_reader:
tfrecords_local_path: './factdata.tfrecord.{pipeline_tag}' # local path for tfrecords; overwrites the existing files
@@ -113,7 +124,7 @@ tfrecorder_reader:
start: '' # help="Effective start date. Data before the start is dropped"
end: '' # help="Effective end date. Data past the end is dropped"
corr_backoffset: 0 # default=0, type=int, help="Offset for correlation calculation"
batch_size: 11880 # batch size of examples in the tfrecord
batch_size: 155000 # batch size of examples in the tfrecord
duration: 82 # time series length; this has to be less than or equal to prepare_past_days
tf_statistics_path: './tf_statistics_{pipeline_tag}.pkl'

@@ -145,12 +156,12 @@ trainer:
back_offset: 0 # don't change it.

save_model:
table: '{product_tag}_{pipeline_tag}_model_stat'
table: '{product_tag}_{pipeline_tag}_model_stat_test_12212021'
data_dir: data/vars
ckpt_dir: data/cpt/s32
saved_dir: data/vars
model_version: 'version_{pipeline_tag}'
model_name: 'model_{product_tag}_{pipeline_tag}'
model_name: 'model_{product_tag}_{pipeline_tag}_test_12212021'
train_window: 60 # Should be same as the one in hparams

elastic_search:
@@ -0,0 +1,199 @@
# Copyright 2019, Futurewei Technologies
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import yaml
import argparse

from pyspark import SparkContext
from pyspark.sql.functions import lit, udf
from pyspark.sql import HiveContext
from pyspark.sql.types import StringType, ArrayType
from datetime import datetime, timedelta
from util import resolve_placeholder
import hashlib


'''
This module transforms
T1 : request-based factdata
into
T2 : impression-style factdata compatible with main_ts.py
T1
+-----------------------+---------+-------+
|col_name |data_type|comment|
+-----------------------+---------+-------+
|uckey |string |null |
|total |int |null |
|pt_d |string |null |
|# Partition Information| | |
|# col_name |data_type|comment|
|pt_d |string |null |
+-----------------------+---------+-------+
CREATE TABLE table_name ( uckey string, total int)
PARTITIONED BY (pt_d string)
T2
+-----------------------+-------------+-------+
|col_name |data_type |comment|
+-----------------------+-------------+-------+
|uckey |string |null |
|count_array |array<string>|null |
|hour |int |null |
|day |string |null |
|bucket_id |int |null |
|# Partition Information| | |
|# col_name |data_type |comment|
|day |string |null |
|bucket_id |int |null |
+-----------------------+-------------+-------+
spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/main_rti_transform.py config.yml
'''
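# Illustrative example (not from production data), assuming the defaults in config.yml
# (default_hour=7, default_price_cat='1'): a T1 row
#   (uckey='banner,1,3G,g_f,1,pt,1002,icc', total=12, pt_d='2020-01-01')
# becomes the T2 row
#   (uckey='banner,1,3G,g_f,1,pt,1002,icc', count_array=['1:12'], hour=7,
#    day='2020-01-01', bucket_id=sha256(uckey) % new_bucket_size)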


def advance_date(date, day_added):
    _time = datetime.strptime(date, "%Y-%m-%d")
    _time = _time + timedelta(days=day_added)
    return _time.strftime("%Y-%m-%d")


def assign_new_bucket_id(df, n):
    def __hash_sha256(s):
        hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest()
        return int(hex_value, 16)
    _udf = udf(lambda x: __hash_sha256(x) % n)
    df = df.withColumn('bucket_id', _udf(df.uckey))
    return df


def __save_as_table(df, table_name, hive_context, create_table):

    if create_table:
        command = """
        DROP TABLE IF EXISTS {}
        """.format(table_name)

        hive_context.sql(command)

        command = """
        CREATE TABLE IF NOT EXISTS {}
        (
        uckey string, count_array array<string>, hour int, day string
        ) PARTITIONED BY (bucket_id int)
        """.format(table_name)

        hive_context.sql(command)

    df.select('uckey',
              'count_array',
              'hour',
              'day',
              'bucket_id'
              ).write.format('hive').option("header", "true").option("encoding", "UTF-8").mode('append').insertInto(table_name)


def run(hive_context,
        input_table, output_table,
        start_day, end_day, day_step,
        new_bucket_size,
        default_hour, default_price_cat):

    _st = start_day
    first_round = True

    while True:

        _end = min(end_day, advance_date(_st, day_step))

        if _st > _end:
            break

        # Read factdata table
        command = """
        SELECT uckey, total, pt_d FROM {} WHERE pt_d BETWEEN '{}' AND '{}'
        """.format(input_table, _st, _end)

        _st = advance_date(_end, 1)

        df = hive_context.sql(command)
        print(command)

        df = df.withColumnRenamed('pt_d', 'day')

        # add count_array
        # [Row(count_array=[u'0:0', u'1:0', u'2:0', u'3:0'], day=u'2018-03-09', hour=0, uckey=u'banner,1,3G,g_f,1,pt,1002,icc')]
        df = df.withColumn('count_array', udf(lambda x: [default_price_cat + ':' + str(x)], ArrayType(StringType()))(df.total))

        # add hour
        df = df.withColumn('hour', lit(default_hour))

        df = assign_new_bucket_id(df, new_bucket_size)

        # we want to keep the partitions for a batch under 200
        df = df.repartition(200)

        # Writing into partitions might throw some exceptions but does not corrupt the data.
        __save_as_table(df, output_table, hive_context, first_round)

        first_round = False

    return


if __name__ == "__main__":

parser = argparse.ArgumentParser(description='Prepare data')
parser.add_argument('config_file')
args = parser.parse_args()

# Load config file
with open(args.config_file, 'r') as ymlfile:
cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
resolve_placeholder(cfg)

cfg_log = cfg['log']
cfg_rti = cfg['pipeline']['rti_transform']

sc = SparkContext()
hive_context = HiveContext(sc)
sc.setLogLevel(cfg_log['level'])

default_hour = cfg_rti['default_hour']
default_price_cat = cfg_rti['default_price_cat']
day_step = cfg_rti['day_step']
start_day = cfg_rti['start_day']
end_day = cfg_rti['end_day']
new_bucket_size = cfg_rti['new_bucket_size']
input_table = cfg_rti['input_table']
output_table = cfg['factdata_table_name']

run(hive_context=hive_context,
input_table=input_table, output_table=output_table,
start_day=start_day, end_day=end_day, day_step=day_step,
new_bucket_size=new_bucket_size,
default_hour=default_hour, default_price_cat=default_price_cat)

sc.stop()
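
Once the job has run, a quick sanity check of the output table can be done from a pyspark shell with Hive support; this snippet is illustrative and assumes the output table name configured above (factdata_test_02112022) and an existing HiveContext named hive_context.

# Count rows per partition to confirm days and bucket_ids were written as expected.
check = hive_context.sql("""
    SELECT day, bucket_id, COUNT(*) AS row_count
    FROM factdata_test_02112022
    GROUP BY day, bucket_id
    ORDER BY day, bucket_id
""")
check.show(50, truncate=False)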
@@ -5,6 +5,12 @@ then
spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G pipeline/show_config.py config.yml
fi

# This module transforms T1 (request-based factdata) to T2 (factdata compatible with the rest of the pipeline)
if false
then
spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/main_rti_transform.py config.yml
fi

# Preparing the data by filtering reliable si, remapping r and ipl, and recalculating bucket-ids
# This part might be optional if uckeys have a stable slot-id with region data
if false
