update lookalike application
radibnia77 committed Feb 2, 2022
1 parent 4ee29bc commit eff880de5cacea28d3cdbb596990054ec60531dd
Showing 6 changed files with 200 additions and 97 deletions.
@@ -1,29 +1,33 @@
 product_tag: 'lookalike_application'
-pipeline_tag: '08192021_1m'
+pipeline_tag: '12132021'
 score_generator:
   input:
-    log_table : "lookalike_08172021_1000_logs"
-    did_table: "lookalike_08172021_1000_trainready"
-    keywords_table: "din_ad_keywords_09172020"
-    significant_keywords_table: "lookalike_08172021_1000_keywords"
-    din_model_tf_serving_url: "http://10.193.217.105:8506/v1/models/lookalike:predict"
+    aid_table: "lookalike_11192021_trainready"
+    keywords_table: "lookalike_11192021_keywords"
+    din_model_tf_serving_url: "http://10.193.217.126:8501/v1/models/lookalike_1119:predict"
     din_model_length: 20
     extend: 2000
     alg: "euclidean"  # currently only "euclidean" and "dot" are supported
   output:
     score_table: "{product_tag}_{pipeline_tag}_score"
   normalize: False
 score_vector:
-  keywords_table: "din_ad_keywords_09172020"
+  keywords_table: "lookalike_11192021_keywords"
   score_table: "{product_tag}_{pipeline_tag}_score"
   score_vector_table: "{product_tag}_{pipeline_tag}_score_vector"
-  did_bucket_size: 100
-  did_bucket_step: 10
+  aid_bucket_size: 100
+  aid_bucket_step: 10
+# Adds alpha_aid_bucket partition to allow for finer control of step size in top_n_similarity stage
 score_vector_rebucketing:
+  aid_bucket_size: 100
+  aid_bucket_step: 10
+  alpha_aid_bucket_size: 10  # The number of buckets to allocate for the alpha_aid_bucket column
   score_vector_alpha_table: "{product_tag}_{pipeline_tag}_score_vector_alpha"
 top_n_similarity:
-  did_bucket_size: 100
-  load_bucket_step: 20
-  search_bucket_step: 50
-  top_n: 10
-  index_factory_string: "IVF256,Flat"
+  aid_bucket_size: 100  # Total number of alpha buckets to process similarity of
+  load_bucket_step: 20  # Number of alpha buckets to load into index at a time
+  search_bucket_step: 50  # Number of alpha buckets to process top N at a time
+  top_n: 10  # Number of nearest neighbors to store for each aid
+  index_factory_string: "IVF256,Flat"  # See https://github.com/facebookresearch/faiss/wiki/The-index-factory for factory strings
   similarity_table: "{product_tag}_{pipeline_tag}_similarity"

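Note: index_factory_string is handed directly to Faiss when the similarity index is built. For reference, below is a minimal sketch of constructing and querying an "IVF256,Flat" index with the Faiss Python API; the dimension and the random vectors are illustrative stand-ins, not values from this pipeline.

import faiss
import numpy as np

d = 7                                            # score-vector dimension (one slot per keyword)
xb = np.random.rand(10000, d).astype('float32')  # stand-in for the loaded score vectors
xq = np.random.rand(5, d).astype('float32')      # stand-in for a search batch

index = faiss.index_factory(d, "IVF256,Flat")    # 256 inverted lists over exact (flat) vectors
index.train(xb)                                  # IVF indexes must be trained before adding
index.add(xb)
distances, ids = index.search(xq, 10)            # top_n = 10 nearest neighbors per query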
@@ -4,6 +4,8 @@ spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memo
 
 spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml
 
 spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.yml
 
+# Run top_n_similarity.py on a machine with a GPU.
-#spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity.py config.yml
+#spark-submit --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity.py config.yml
@@ -30,10 +30,10 @@
 '''
 This process generates the score-table with the following format.
-DataFrame[age: int, gender: int, did: string, did_index: bigint,
+DataFrame[age: int, gender: int, aid: string, aid_index: bigint,
 interval_starting_time: array<string>, interval_keywords: array<string>,
 kwi: array<string>, kwi_show_counts: array<string>, kwi_click_counts: array<string>,
-did_bucket: string, kws: map<string,float>, kws_norm: map<string,float>]
+aid_bucket: string, kws: map<string,float>, kws_norm: map<string,float>]
 '''

@@ -57,11 +57,11 @@ def str_to_intlist(table):
 def input_data(record, keyword, length):
     if len(record['show_counts']) >= length:
         hist = flatten(record['show_counts'][:length])
-        instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
+        instance = {'hist_i': hist, 'u': record['aid'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
     else:
         hist = flatten(record['show_counts'])
         # [hist.extend([0]) for i in range(length - len(hist))]
-        instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
+        instance = {'hist_i': hist, 'u': record['aid'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
     return instance


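For reference, here is what input_data produces for a toy record whose history is shorter than din_model_length. flatten is the module's own helper and is not part of this diff; the stand-in below assumes it concatenates the per-interval show-count lists.

def flatten(lists):  # stand-in for the module's helper (assumption)
    return [x for sub in lists for x in sub]

record = {'aid': 'user-1-aid', 'show_counts': [[1, 0], [2, 3]]}
instance = input_data(record, keyword=5, length=20)
# -> {'hist_i': [1, 0, 2, 3], 'u': 'user-1-aid', 'i': 5, 'j': 5, 'sl': 4}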
@@ -112,12 +112,12 @@ def normalize(x):
 
 
 class CTRScoreGenerator:
-    def __init__(self, df_did, df_keywords, din_model_tf_serving_url, din_model_length):
-        self.df_did = df_did
+    def __init__(self, df_aid, df_keywords, din_model_tf_serving_url, din_model_length):
+        self.df_aid = df_aid
         self.df_keywords = df_keywords
         self.din_model_tf_serving_url = din_model_tf_serving_url
         self.din_model_length = din_model_length
-        self.df_did_loaded = None
+        self.df_aid_loaded = None
         self.keyword_index_list, self.keyword_list = self.get_keywords()
 
     def get_keywords(self):
@@ -131,33 +131,35 @@ def get_keywords(self):
     def run(self):
 
         def predict_udf(din_model_length, din_model_tf_serving_url, keyword_index_list, keyword_list):
-            def __helper(did, kwi_show_counts, age, gender):
+            def __helper(aid, kwi_show_counts, age, gender):
                 kwi_show_counts = str_to_intlist(kwi_show_counts)
-                record = {'did': did,
+                record = {'aid': aid,
                           'show_counts': kwi_show_counts,
                           'a': str(age),
                           'g': str(gender)}
 
                 response = predict(serving_url=din_model_tf_serving_url, record=record,
                                    length=din_model_length, new_keyword=keyword_index_list)
+                # If the response is a string, there was an error.
                 assert not isinstance(response, str), 'Error occurred when retrieving keyword scores from URL.'
 
-                did_kw_scores = dict()
+                aid_kw_scores = dict()
                 for i in range(len(response)):
                     keyword = keyword_list[i]
                     keyword_score = response[i][0]
-                    did_kw_scores[keyword] = keyword_score
+                    aid_kw_scores[keyword] = keyword_score
 
-                return did_kw_scores
+                return aid_kw_scores
 
             return __helper
 
-        self.df_did_loaded = self.df_did.withColumn('kws',
+        self.df_aid_loaded = self.df_aid.withColumn('kws',
                                                     udf(predict_udf(din_model_length=self.din_model_length,
                                                                     din_model_tf_serving_url=self.din_model_tf_serving_url,
                                                                     keyword_index_list=self.keyword_index_list,
                                                                     keyword_list=self.keyword_list),
                                                         MapType(StringType(), FloatType()))
-                                                    (col('did_index'), col('kwi_show_counts'), col('age'), col('gender')))
+                                                    (col('aid_index'), col('kwi_show_counts'), col('age'), col('gender_index')))
 
 
 if __name__ == '__main__':
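The predict helper imported from rest_client wraps a TensorFlow Serving REST call. Below is a minimal sketch of such a request against the configured endpoint; the payload field layout is an assumption for illustration, since the real one is assembled inside rest_client.

import json
import requests

url = 'http://10.193.217.126:8501/v1/models/lookalike_1119:predict'
payload = {'instances': [
    # illustrative instance shape; the actual fields are built by rest_client
    {'hist_i': [1, 0, 2, 3], 'u': 'user-1-aid', 'i': 5, 'j': 5, 'sl': 4},
]}
r = requests.post(url, data=json.dumps(payload))
scores = r.json()['predictions']  # TF Serving returns one prediction per instance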
@@ -173,31 +175,28 @@ def __helper(aid, kwi_show_counts, age, gender):
     hive_context = HiveContext(sc)
 
     # load dataframes
-    did_table, keywords_table, significant_keywords_table, din_tf_serving_url, length = cfg['score_generator']['input']['did_table'], cfg['score_generator']['input'][
-        'keywords_table'], cfg['score_generator']['input'][
-        'significant_keywords_table'], cfg['score_generator']['input']['din_model_tf_serving_url'], cfg['score_generator']['input']['din_model_length']
+    aid_table = cfg['score_generator']['input']['aid_table']
+    keywords_table = cfg['score_generator']['input']['keywords_table']
+    din_tf_serving_url = cfg['score_generator']['input']['din_model_tf_serving_url']
+    length = cfg['score_generator']['input']['din_model_length']
 
-    command = 'SELECT * FROM {}'
-    df_did = hive_context.sql(command.format(did_table))
+    command = 'SELECT * FROM {}'.format(aid_table)
+    df_aid = hive_context.sql(command)
 
-    command = 'SELECT T1.keyword,T1.spread_app_id,T1.keyword_index FROM {} AS T1 JOIN {} AS T2 ON T1.keyword=T2.keyword'
-    df_keywords = hive_context.sql(command.format(keywords_table, significant_keywords_table))
-    # temporary addition to filter on active keywords
-    df_keywords = df_keywords.filter((df_keywords.keyword == 'video') | (df_keywords.keyword == 'shopping') | (df_keywords.keyword == 'info') |
-                                     (df_keywords.keyword == 'social') | (df_keywords.keyword == 'reading') | (df_keywords.keyword == 'travel') |
-                                     (df_keywords.keyword == 'entertainment'))
+    command = 'SELECT keyword, keyword_index FROM {}'.format(keywords_table)
+    df_keywords = hive_context.sql(command)
 
     score_table = cfg['score_generator']['output']['score_table']
 
-    # create a CTR score generator instance and run it to get the loaded did
-    ctr_score_generator = CTRScoreGenerator(df_did, df_keywords, din_tf_serving_url, length)
+    # create a CTR score generator instance and run it to get the loaded aid
+    ctr_score_generator = CTRScoreGenerator(df_aid, df_keywords, din_tf_serving_url, length)
     ctr_score_generator.run()
-    df = ctr_score_generator.df_did_loaded
+    df = ctr_score_generator.df_aid_loaded
 
     # normalize the keyword scores when configured
     udf_normalize = udf(normalize, MapType(StringType(), FloatType()))
     if cfg['score_generator']['normalize']:
         df = df.withColumn('kws_norm', udf_normalize(col('kws')))
 
-    # save the loaded did to the hive table
-    write_to_table_with_partition(df, score_table, partition=('did_bucket'), mode='overwrite')
+    # save the loaded aid to the hive table
+    write_to_table_with_partition(df, score_table, partition=('aid_bucket'), mode='overwrite')
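normalize itself is defined earlier in this file and is not part of the diff. A sketch consistent with the MapType(StringType(), FloatType()) UDF signature used above, assuming L2 normalization of the keyword-score map:

from math import sqrt

def normalize(kws):  # assumption: L2-normalize the keyword -> score map
    norm = sqrt(sum(v * v for v in kws.values()))
    if norm == 0.0:
        return kws
    return {k: v / norm for k, v in kws.items()}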
@@ -0,0 +1,96 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import hashlib
import time

import yaml
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from util import resolve_placeholder

from lookalike_model.pipeline.util import write_to_table_with_partition

'''
To run, execute the following in the application folder.
spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.yml

This process adds a secondary bucket id (alpha_aid_bucket) to every row of the score-vector table.
'''


def assign_new_bucket_id(df, n, new_column_name):
    # Deterministically map each aid to one of n alpha buckets via SHA-256.
    def __hash_sha256(s):
        hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest()
        return int(hex_value, 16)
    _udf = udf(lambda x: __hash_sha256(x) % n, IntegerType())
    df = df.withColumn(new_column_name, _udf(df.aid))
    return df


def run(hive_context, cfg):

    score_vector_table = cfg['score_vector']['score_vector_table']
    bucket_size = cfg['score_vector_rebucketing']['aid_bucket_size']
    bucket_step = cfg['score_vector_rebucketing']['aid_bucket_step']
    alpha_bucket_size = cfg['score_vector_rebucketing']['alpha_aid_bucket_size']
    score_vector_alpha_table = cfg['score_vector_rebucketing']['score_vector_alpha_table']

    first_round = True
    num_batches = (bucket_size + bucket_step - 1) // bucket_step  # ceiling division
    batch_num = 1
    for aid_bucket in range(0, bucket_size, bucket_step):
        print('Processing batch {} of {} bucket number: {}'.format(batch_num, num_batches, aid_bucket))

        command = "SELECT aid, aid_bucket, score_vector, c1 FROM {} WHERE aid_bucket BETWEEN {} AND {}".format(
            score_vector_table, aid_bucket, min(aid_bucket + bucket_step - 1, bucket_size))

        df = hive_context.sql(command)
        df = assign_new_bucket_id(df, alpha_bucket_size, 'alpha_aid_bucket')

        mode = 'overwrite' if first_round else 'append'
        write_to_table_with_partition(df.select('aid', 'score_vector', 'c1', 'aid_bucket', 'alpha_aid_bucket'),
                                      score_vector_alpha_table, partition=('aid_bucket', 'alpha_aid_bucket'), mode=mode)
        first_round = False
        batch_num += 1

if __name__ == "__main__":
    start = time.time()
    parser = argparse.ArgumentParser(description='')
    parser.add_argument('config_file')
    args = parser.parse_args()
    with open(args.config_file, 'r') as yml_file:
        cfg = yaml.safe_load(yml_file)
        resolve_placeholder(cfg)
    sc = SparkContext.getOrCreate()
    sc.setLogLevel('WARN')
    hive_context = HiveContext(sc)

    run(hive_context=hive_context, cfg=cfg)
    sc.stop()
    end = time.time()
    print('Runtime of the program is: {}'.format(end - start))
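Because assign_new_bucket_id hashes the aid with SHA-256, the alpha bucket assignment is deterministic across reruns and roughly uniform across the alpha_aid_bucket_size buckets. A quick standalone illustration of the same computation:

import hashlib

def alpha_bucket(aid, n=10):  # mirrors the UDF above; n = alpha_aid_bucket_size
    return int(hashlib.sha256(aid.encode('utf-8')).hexdigest(), 16) % n

print(alpha_bucket('user-1-aid'))  # always the same value in [0, n)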
@@ -42,11 +42,11 @@
 The top-n-similarity table is
-|user| score-vector | did-bucket
+|user| score-vector | aid-bucket
 |:-------------|:------------:|:------------:|
-|user-1-did| [similarity-score-11, similarity-score-12, similarity-score-13] | 1
-|user-2-did| [similarity-score-21, similarity-score-22, similarity-score-23] | 1
-|user-3-did| [similarity-score-31, similarity-score-32, similarity-score-33] | 2
+|user-1-aid| [similarity-score-11, similarity-score-12, similarity-score-13] | 1
+|user-2-aid| [similarity-score-21, similarity-score-22, similarity-score-23] | 1
+|user-3-aid| [similarity-score-31, similarity-score-32, similarity-score-33] | 2
 '''

@@ -56,8 +56,8 @@ def run(hive_context, cfg):
     keywords_table = cfg["score_vector"]["keywords_table"]
     score_table = cfg['score_vector']['score_table']
     score_vector_table = cfg['score_vector']['score_vector_table']
-    bucket_size = cfg['score_vector']['did_bucket_size']
-    bucket_step = cfg['score_vector']['did_bucket_step']
+    bucket_size = cfg['score_vector']['aid_bucket_size']
+    bucket_step = cfg['score_vector']['aid_bucket_step']
 
     # get kw list
     keywords = hive_context.sql("SELECT DISTINCT(keyword) FROM {}".format(keywords_table)).collect()
@@ -68,10 +68,10 @@ def run(hive_context, cfg):
     first_round = True
     num_batches = (bucket_size + bucket_step - 1) // bucket_step  # ceiling division
     batch_num = 1
-    for did_bucket in range(0, bucket_size, bucket_step):
-        print('Processing batch {} of {} bucket number: {}'.format(batch_num, num_batches, did_bucket))
+    for aid_bucket in range(0, bucket_size, bucket_step):
+        print('Processing batch {} of {} bucket number: {}'.format(batch_num, num_batches, aid_bucket))
 
-        command = "SELECT did, did_bucket, kws FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_table, did_bucket, min(did_bucket+bucket_step-1, bucket_size))
+        command = "SELECT aid, aid_bucket, kws FROM {} WHERE aid_bucket BETWEEN {} AND {}".format(score_table, aid_bucket, min(aid_bucket+bucket_step-1, bucket_size))
 
         # |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0 |
         # [social -> 0.24231663, entertainment -> 0.20828941, reading -> 0.44120282, video -> 0.34497723, travel -> 0.3453492, shopping -> 0.5347804, info -> 0.1978679]|
@@ -82,7 +82,7 @@ def run(hive_context, cfg):
         df = df.withColumn('c1', udf(lambda x: float(np.array(x).dot(np.array(x))), FloatType())(df.score_vector))
 
         mode = 'overwrite' if first_round else 'append'
-        write_to_table_with_partition(df.select('did', 'score_vector', 'c1', 'did_bucket'), score_vector_table, partition=('did_bucket'), mode=mode)
+        write_to_table_with_partition(df.select('aid', 'score_vector', 'c1', 'aid_bucket'), score_vector_table, partition=('aid_bucket'), mode=mode)
 
         first_round = False
         batch_num += 1

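The c1 column caches the squared L2 norm (x · x) of each score vector, presumably so the top_n_similarity stage can recover squared euclidean distances from dot products alone, via ||a - b||^2 = c1_a + c1_b - 2 (a · b). A small numpy check of that identity:

import numpy as np

a = np.array([0.2, 0.4, 0.1])
b = np.array([0.3, 0.1, 0.5])
c1_a, c1_b = a.dot(a), b.dot(b)  # what the pipeline stores as c1
assert np.isclose(c1_a + c1_b - 2 * a.dot(b), ((a - b) ** 2).sum())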