improve training time
1. Use the new tfrecords generator to train directly from TFRecords.
2. Sort keywords in the TFRecords, which improves accuracy (see the short sketch below the file summary).
radibnia77 committed Jan 26, 2022
1 parent 11e55ed commit 69477315f86165190ac0c857a5e6fa37cb7cbb21
Showing 9 changed files with 494 additions and 331 deletions.
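The keyword sorting mentioned in point 2 operates on the 'keyword_index:count' strings stored in the trainready table (see sort_kwi_counts in the diff below). A minimal standalone sketch of the idea, with made-up sample values:

def sort_kwi_counts_example(unsorted_x):
    # Sort 'index:count' pairs by keyword index, e.g. '12:3,5:1,9:2' -> '5:1,9:2,12:3'.
    pairs = [p.split(":") for p in unsorted_x.split(",")]
    pairs.sort(key=lambda p: int(p[0]))
    return ",".join("{}:{}".format(k, v) for k, v in pairs)

print(sort_kwi_counts_example("12:3,5:1,9:2"))  # 5:1,9:2,12:3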
@@ -166,7 +166,8 @@ pipeline:
    trainready_output_table: '{product_tag}_{pipeline_tag}_trainready'
  tfrecords:
    tfrecords_statistics_path: '{product_tag}_{pipeline_tag}_tfrecord_statistics.pkl'
    tfrecords_hdfs_path: '{product_tag}_{pipeline_tag}_tfrecord' # HDFS location for the TFRecords; overwrites any existing files
    tfrecords_hdfs_path_train: '{product_tag}_{pipeline_tag}_train_tfrecord' # HDFS location for the TFRecords; overwrites any existing files
    tfrecords_hdfs_path_test: '{product_tag}_{pipeline_tag}_test_tfrecord'
  cutting_date: 1627171200 # 07/25/2021 (UTC). Data before this date is used for training samples and data after it for testing samples. <Improvement is required.>
  length: 10
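As a quick sanity check of the cutting_date value above (plain Python, not part of the pipeline):

from datetime import datetime
print(datetime.utcfromtimestamp(1627171200))  # 2021-07-25 00:00:00 (UTC)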

@@ -0,0 +1,170 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
spark-submit --master yarn --executor-memory 16G --driver-memory 24G --num-executors 10 --executor-cores 5 --jars spark-tensorflow-connector_2.11-1.15.0.jar --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/main_tfrecord_generator.py config.yml

input: trainready table
output: dataset readable by the trainer in TFRecord format
"""

import yaml
import argparse
import os
import timeit
from pyspark import SparkContext
from pyspark.sql import functions as fn
from pyspark.sql.functions import lit, col, udf, expr, count, collect_list, concat_ws, first, create_map, explode, monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, ArrayType, StringType, BooleanType, FloatType, DoubleType
from pyspark.sql import HiveContext
from pyspark.sql.session import SparkSession
from datetime import datetime, timedelta
from itertools import chain
from lookalike_model.pipeline.util import write_to_table, write_to_table_with_partition, print_batching_info, resolve_placeholder, load_config, load_batch_config, load_df
from util import write_to_table, write_to_table_with_partition, save_pickle_file


def generate_tfrecord(sc, hive_context, tf_statis_path, keyword_table, cutting_date, length, trainready_table, tfrecords_hdfs_path_train, tfrecords_hdfs_path_test):

    def str_to_intlist(table):
        # Parse a list of byte strings like b'12:3,5:1' into lists of click counts.
        ji = []
        for k in [table[j].decode().split(",") for j in range(len(table))]:
            s = []
            for a in k:
                b = int(a.split(":")[1])
                s.append(b)
            ji.append(s)
        return ji

    def list_of_list_toint(table):
        # Parse a list of byte strings like b'5,9,12' into lists of keyword indices.
        ji = []
        for k in [table[j].decode().split(",") for j in range(len(table))]:
            s = [int(a) for a in k]
            ji.append(s)
        return ji

    def flatten(lst):
        f = [y for x in lst for y in x]
        return f

    def padding(kwlist, length):
        # Right-pad the keyword list with zeros up to the requested length.
        diff = length - len(kwlist)
        temp_list = [0 for i in range(diff)]
        padded_keyword = kwlist + temp_list
        return padded_keyword

    def create_dataset(df_panda, click, keyword):
        # Build (aid_index, keyword context, target keyword, label) samples.
        # A non-zero click count yields a positive sample; a subset of the
        # zero-click slots (every 5th interval, every 2nd keyword) yields negatives.
        # The context is the flattened keyword list of the following `length` intervals.
        t_set = []
        for i in range(len(df_panda.aid_index)):
            click_counts = click[i]
            keyword_int = keyword[i]
            aid_index = df_panda.aid_index[i]
            for m in range(len(click_counts)):
                for n in range(len(click_counts[m])):
                    if click_counts[m][n] != 0:
                        pos = (aid_index, flatten(keyword_int[m + 1:m + 1 + length]), keyword_int[m][n], 1)
                        if len(pos[1]) >= 1:
                            t_set.append(pos)
                    elif m % 5 == 0 and n % 2 == 0:
                        neg = (aid_index, flatten(keyword_int[m + 1:m + 1 + length]), keyword_int[m][n], 0)
                        if len(neg[1]) >= 1:
                            t_set.append(neg)
        return t_set

    def generating_dataframe(dataset, spark):
        # Turn the sample tuples into a DataFrame and pad the keyword context
        # to the longest context in the dataset.
        data_set = [(int(tup[0]), tup[1], int(tup[2]), int(tup[3])) for tup in dataset]
        df = spark.createDataFrame(data=data_set, schema=deptColumns)
        df = df.withColumn("sl", udf(lambda x: len(x), IntegerType())(df.keyword_list))
        df = df.where(df.sl > 5)
        df = df.withColumn('max_length', lit(df.agg({'sl': 'max'}).collect()[0][0]))
        df = df.withColumn('keyword_list_padded',
                           udf(padding, ArrayType(IntegerType()))(df.keyword_list, df.max_length))
        return df

    def generate_tf_statistics(testsetDF, trainDF, keyword_df, tf_statis_path):
        # Save dataset sizes and vocabulary size for the trainer.
        tfrecords_statistics = {}
        tfrecords_statistics['test_dataset_count'] = testsetDF.count()
        tfrecords_statistics['train_dataset_count'] = trainDF.count()
        tfrecords_statistics['user_count'] = trainDF.select('aid').distinct().count()
        tfrecords_statistics['item_count'] = keyword_df.distinct().count() + 1
        save_pickle_file(tfrecords_statistics, tf_statis_path)

command = """SELECT * FROM {}"""
df = hive_context.sql(command.format(trainready_table))

df = df.withColumn('interval_starting_time', df['interval_starting_time'].cast(ArrayType(IntegerType())))
df = df.withColumn('_kwi', udf(list_of_list_toint, ArrayType(ArrayType(IntegerType())))(df.kwi))
df = df.withColumn('click_counts', udf(str_to_intlist, ArrayType(ArrayType(IntegerType())))(df['kwi_click_counts']))
df = df.withColumn('total_click', udf(lambda x: sum([item for sublist in x for item in sublist]), IntegerType())(df.click_counts))
df = df.where(df.total_click != 0)
df = df.withColumn('indicing', udf(lambda y: len([x for x in y if x >= cutting_date]), IntegerType())(df.interval_starting_time))
df = df.withColumn('keyword_int_train', udf(lambda x, y: x[y:],ArrayType(ArrayType(IntegerType())))(df._kwi, df.indicing))
df = df.withColumn('keyword_int_test', udf(lambda x, y: x[:y],ArrayType(ArrayType(IntegerType())))(df._kwi, df.indicing))
df = df.withColumn('click_counts_train', udf(lambda x, y: x[y:],ArrayType(ArrayType(IntegerType())))(df.click_counts, df.indicing))
df = df.withColumn('click_counts_test', udf(lambda x, y: x[:y],ArrayType(ArrayType(IntegerType())))(df.click_counts, df.indicing))

spark = SparkSession(sc)
deptColumns = ["aid", "keyword_list", "keyword", "label"]

df_panda = df.select('click_counts_train', 'keyword_int_train', 'aid_index').toPandas()
train_set = create_dataset(df_panda,df_panda.click_counts_train, df_panda.keyword_int_train)
trainDF = generating_dataframe(train_set, spark = spark)
trainDF.write.format("tfrecords").option("recordType", "Example").mode("overwrite").save(tfrecords_hdfs_path_train)


df_panda = df.select('click_counts_test', 'keyword_int_test', 'aid_index').toPandas()
test_set = create_dataset(df_panda, df_panda.click_counts_test, df_panda.keyword_int_test)
testsetDF = generating_dataframe(test_set, spark = spark)
testsetDF.write.format("tfrecords").option("recordType", "Example").mode("overwrite").save(tfrecords_hdfs_path_test)


command = "SELECT * from {}"
keyword_df = hive_context.sql(command.format(keyword_table))
generate_tf_statistics(testsetDF, trainDF, keyword_df, tf_statis_path)

def run(sc, hive_context, cfg):
    cfgp = cfg['pipeline']
    cfg_train = cfg['pipeline']['main_trainready']
    trainready_table = cfg_train['trainready_output_table']
    cfg_tfrecord = cfg['pipeline']['tfrecords']
    tfrecords_hdfs_path_train = cfg_tfrecord['tfrecords_hdfs_path_train']
    tfrecords_hdfs_path_test = cfg_tfrecord['tfrecords_hdfs_path_test']
    cutting_date = cfg['pipeline']['cutting_date']
    length = cfg['pipeline']['length']
    tf_statis_path = cfgp['tfrecords']['tfrecords_statistics_path']
    keyword_table = cfgp['main_keywords']['keyword_output_table']

    generate_tfrecord(sc, hive_context, tf_statis_path, keyword_table, cutting_date, length, trainready_table, tfrecords_hdfs_path_train, tfrecords_hdfs_path_test)


if __name__ == "__main__":
    """
    This program reads the trainready table, splits it into training and testing
    samples around the cutting date, and writes both sets as TFRecords for the trainer.
    """
    sc, hive_context, cfg = load_config(description="pre-processing train ready data")
    resolve_placeholder(cfg)
    run(sc=sc, hive_context=hive_context, cfg=cfg)
    sc.stop()
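Since the trainer is expected to read these files directly (point 1 of the commit message), the records written above can be parsed along these lines. This is a hedged sketch, not the project's trainer: the feature names follow the DataFrame schema above (aid, keyword, label, keyword_list_padded), while the int64 dtypes and the file pattern are assumptions about what spark-tensorflow-connector writes.

import tensorflow as tf

def parse_example(serialized):
    # Feature names taken from the Spark schema above; dtypes assumed to be int64.
    features = {
        'aid': tf.io.FixedLenFeature([], tf.int64),
        'keyword': tf.io.FixedLenFeature([], tf.int64),
        'label': tf.io.FixedLenFeature([], tf.int64),
        'keyword_list_padded': tf.io.VarLenFeature(tf.int64),
    }
    parsed = tf.io.parse_single_example(serialized, features)
    parsed['keyword_list_padded'] = tf.sparse.to_dense(parsed['keyword_list_padded'])
    return parsed

train_pattern = '<tfrecords_hdfs_path_train>/part-r-*'  # placeholder path and file pattern
files = tf.data.Dataset.list_files(train_pattern)
dataset = tf.data.TFRecordDataset(files).map(parse_example).batch(512)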
@@ -40,9 +40,11 @@ def save_tfrecords(hive_context, trainready_table, tfrecords_hdfs_path, tf_stati
    sc, hive_context, cfg = load_config(description="generate tf records")
    resolve_placeholder(cfg)
    cfgp = cfg['pipeline']
    trainready_table = cfgp['main_trainready']['trainready_output_table']
    # trainready_table = cfgp['main_trainready']['trainready_output_table']
    tfrecords_hdfs_path = cfgp['tfrecords']['tfrecords_hdfs_path']
    tf_statis_path = cfgp['tfrecords']['tfrecords_statistics_path']
    trainready_table = "lookalike_trainready_lookalike_faezeh"

    # save selected columns of train ready table as tfrecords.
    save_tfrecords(hive_context, trainready_table, tfrecords_hdfs_path, tf_statis_path)
    sc.stop()
@@ -5,7 +5,7 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
@@ -18,7 +18,7 @@
import argparse
import os
import timeit

import collections
from pyspark import SparkContext
from pyspark.sql import functions as fn
from pyspark.sql.functions import lit, col, udf, collect_list, concat_ws, first, create_map, monotonically_increasing_id, row_number
@@ -41,6 +41,12 @@ def generate_trainready(hive_context, batch_config,
                        interval_time_in_seconds,
                        logs_table_name, trainready_table, aid_bucket_num):

    def remove_no_show_records(df):
        # Keep only records whose (aid, interval_starting_time, keyword_index) group has at least one show (is_click == 0) event.
        w = Window.partitionBy('aid', 'interval_starting_time', 'keyword_index')
        df = df.withColumn('_show_counts', fn.sum(fn.when(col('is_click') == 0, 1).otherwise(0)).over(w))
        df = df.filter(fn.udf(lambda x: x > 0, BooleanType())(df._show_counts))
        return df

    def group_batched_logs(df_logs):
        # group logs by aid + interval_time + keyword.
        # group 1: group by aid + interval_starting_time + keyword
@@ -52,12 +58,11 @@ def group_batched_logs(df_logs):
            fn.sum(col('is_click')).alias('kw_clicks_count'),
            fn.sum(fn.when(col('is_click') == 0, 1).otherwise(0)).alias('kw_shows_count'),
        )

        # df = df.orderBy('keyword_index')
        df = df.withColumn('kwi_clicks_count', concat_ws(":", col('keyword_index'), col('kw_clicks_count')))
        df = df.withColumn('kwi_shows_count', concat_ws(":", col('keyword_index'), col('kw_shows_count')))
        df = df.withColumn('kw_clicks_count', concat_ws(":", col('keyword'), col('kw_clicks_count')))
        df = df.withColumn('kw_shows_count', concat_ws(":", col('keyword'), col('kw_shows_count')))

        # group 2: group by aid + interval_starting_time
        df = df.groupBy('aid', 'interval_starting_time').agg(
            concat_ws(",", collect_list('keyword_index')).alias('kwi'),
@@ -73,22 +78,33 @@ def group_batched_logs(df_logs):

        return df

    def sort_kwi_counts(unsorted_x):
        # Sort a 'keyword_index:count' string by keyword index, e.g. '12:3,5:1' -> '5:1,12:3'.
        unsorted_x = "{" + unsorted_x + "}"
        d = eval(unsorted_x)
        f = collections.OrderedDict(sorted(d.items()))
        k = [str(i) + ':' + str(j) for i, j in f.items()]
        sorted_x = ','.join(k)
        return sorted_x

    def sort_kwi(unsorted_kwi):
        # Sort a comma-separated keyword-index string, e.g. '12,5' -> '5,12'.
        l = [int(el) for el in unsorted_kwi.split(",")]
        l.sort()
        l = [str(item) for item in l]
        sorted_kwi = ','.join(l)
        return sorted_kwi

    def collect_trainready(df_trainready_batched_temp):
        # group 3: group by aid with the temp batched aid-interval rows.

        df = df_trainready_batched_temp

        features = ['interval_starting_time', 'interval_keywords', 'kwi', 'kwi_click_counts', 'kwi_show_counts']
        agg_attr_list = list(chain(*[(lit(attr), col(attr)) for attr in df.columns if attr in features]))
        df = df.withColumn('attr_map', create_map(agg_attr_list))

        df = df.groupBy('aid').agg(
            collect_list('attr_map').alias('attr_map_list'),
            first('age').alias('age'),
            first('gender_index').alias('gender_index'),
            first('aid_bucket').alias('aid_bucket')
        )

        return df

    def build_feature_array(df):
@@ -163,7 +179,12 @@ def udf_function(attr_map_list):
            aid_bucket= '{}' """
        df_logs = hive_context.sql(command.format(logs_table_name, day_lower, day_upper, interval_point, aid_bucket))

        df_logs = remove_no_show_records(df_logs)

        df_trainready = group_batched_logs(df_logs)
        df_trainready = df_trainready.withColumn('kwi_click_counts', udf(sort_kwi_counts, StringType())(df_trainready.kwi_click_counts))
        df_trainready = df_trainready.withColumn('kwi_show_counts', udf(sort_kwi_counts, StringType())(df_trainready.kwi_show_counts))
        df_trainready = df_trainready.withColumn('kwi', udf(sort_kwi, StringType())(df_trainready.kwi))
        mode = 'overwrite' if batched_round == 1 else 'append'
        write_to_table_with_partition(df_trainready, trainready_table_temp, partition=('aid_bucket'), mode=mode)
        batched_round += 1
@@ -201,7 +222,7 @@ def udf_function(attr_map_list):
    # Add aid_index
    w = Window.orderBy("aid_bucket", "aid")
    df = df.withColumn('row_number', row_number().over(w))
    df = df.withColumn('aid_index', udf(lambda x: shift+ x, LongType())(col('row_number')))
    df = df.withColumn('aid_index', udf(lambda x: shift + x, LongType())(col('row_number')))
    # df = df.withColumn('aid_index', udf(lambda x: aid_bucket * (MAX_USER_IN_BUCKET) + x, LongType())(col('row_number')))
    df = df.select('age', 'gender_index', 'aid', 'aid_index', 'interval_starting_time', 'interval_keywords',
                   'kwi', 'kwi_show_counts', 'kwi_click_counts', 'aid_bucket')
@@ -31,5 +31,5 @@ if false
then
# generate TF records: DIN TFRecords in HDFS.
# after the TFRecords folder is generated in HDFS, use 'hadoop fs -copyToLocal' to copy it to the local filesystem.
spark-submit --master yarn --num-executors 10 --executor-cores 5 --jars spark-tensorflow-connector_2.11-1.15.0.jar pipeline/main_tfrecords.py config.yml
spark-submit --master yarn --executor-memory 16G --driver-memory 24G --num-executors 10 --executor-cores 5 --jars spark-tensorflow-connector_2.11-1.15.0.jar pipeline/main_tfrecord_generator.py config.yml
fi
