This repository has been archived by the owner. It is now read-only.
Merge pull request #13 from radibnia77/main
Update lookalike application
xun-hu-at-futurewei-com committed Sep 16, 2021
2 parents 2f14296 + e9316a4 commit 371e4b223ad64047b261907a64436ba81555a404
Showing 35 changed files with 1,376 additions and 591 deletions.
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
@@ -1,35 +1,31 @@
-product_tag: 'lookalike'
-pipeline_tag: '08042021'
+product_tag: 'lookalike_application'
+pipeline_tag: '08172021_1000'
 score_generator:
   input:
-    log_table: "lookalike_03042021_logs"
-    did_table: "lookalike_03042021_trainready"
+    log_table: "lookalike_08172021_1000_logs"
+    did_table: "lookalike_08172021_1000_trainready"
     keywords_table: "din_ad_keywords_09172020"
-    test_table: "lookalike_trainready_jimmy_test"
+    significant_keywords_table: "lookalike_08172021_1000_keywords"
     din_model_tf_serving_url: "http://10.193.217.105:8506/v1/models/lookalike:predict"
     din_model_length: 20
-    seeduser_table: "lookalike_seeduser"
-    number_of_seeduser: 1000
-    extend: 2000
-    alg: "euclidean"  # currently only "euclidean" and "dot" are supported
   output:
-    did_score_table: "{product_tag}_score_{pipeline_tag}"
-    score_norm_table: "{product_tag}_score_norm_{pipeline_tag}"
-    normalize: False
+    score_table: "{product_tag}_{pipeline_tag}_score"
+    normalize: False
 score_vector:
   keywords_table: "din_ad_keywords_09172020"
-  score_norm_table: "{product_tag}_score_norm_{pipeline_tag}"
-  score_vector_table: "{product_tag}_score_vector_{pipeline_tag}"
+  score_table: "{product_tag}_{pipeline_tag}_score"
+  score_vector_table: "{product_tag}_{pipeline_tag}_score_vector"
   did_bucket_size: 2
   did_bucket_step: 2
-score_vector_rebucketing:
+score_matrix_table:
   did_bucket_size: 2
   did_bucket_step: 2
-  alpha_did_bucket_size: 20  # default: 1000
-  score_vector_alpha_table: '{product_tag}_score_vector_alpha_{pipeline_tag}'
+  score_matrix_table: '{product_tag}_{pipeline_tag}_score_matrix'
 top_n_similarity:
-  did_bucket_step: 1
-  alpha_did_bucket_step: 10
+  did_bucket_size: 100
+  did_bucket_step: 100
+  cross_bucket_size: 1  # in production this should be the same as did_bucket_size
   top_n: 10
-  similarity_table: "{product_tag}_similarity_{pipeline_tag}"
+  similarity_table: "{product_tag}_{pipeline_tag}_similarity"
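The `{product_tag}` and `{pipeline_tag}` placeholders above are presumably substituted into the table names when the config is loaded. A minimal sketch of that expansion, assuming Python `str.format`-style substitution (the `resolve_placeholders` helper is hypothetical, not code from this repo):

import yaml

def resolve_placeholders(cfg):
    # Hypothetical helper: expand {product_tag}/{pipeline_tag} in every string value.
    tags = {'product_tag': cfg['product_tag'], 'pipeline_tag': cfg['pipeline_tag']}

    def walk(node):
        if isinstance(node, dict):
            return {k: walk(v) for k, v in node.items()}
        if isinstance(node, list):
            return [walk(v) for v in node]
        if isinstance(node, str):
            return node.format(**tags)
        return node

    return walk(cfg)

with open('config.yml') as f:
    cfg = resolve_placeholders(yaml.safe_load(f))

# cfg['score_generator']['output']['score_table']
# -> 'lookalike_application_08172021_1000_score'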
@@ -1,16 +1,10 @@
 #!/bin/bash

-# Not used as part of pipeline
-spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict seed_user_selector.py config.yml "29"
-
-spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_generator.py config.yml
+spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_generator.py config.yml

-spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml
+spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml

-spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.yml
+spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_matrix_table.py config.yml

-spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity_table_generator.py config.yml
-
-# Not used as part of pipeline
-spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict validation.py config.yml "29"
+spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity_table_generator.py config.yml
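Per the config above, these four spark-submit calls form a strictly ordered pipeline: each stage reads the Hive table written by the one before it (score -> score_vector -> score_matrix -> similarity), so a failure should stop the run. A sketch of that ordering in Python, with the tuning flags omitted for brevity:

import subprocess

STAGES = [
    'score_generator.py',
    'score_vector_table.py',
    'score_matrix_table.py',
    'top_n_similarity_table_generator.py',
]

for stage in STAGES:
    # check=True raises CalledProcessError, aborting the run on the first failure.
    subprocess.run(['spark-submit', '--master', 'yarn', stage, 'config.yml'], check=True)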

@@ -29,7 +29,7 @@
 from lookalike_model.pipeline.util import write_to_table, write_to_table_with_partition

 '''
-This process generates the score-norm-table with the following format.
+This process generates the score-table with the following format.
 DataFrame[age: int, gender: int, did: string, did_index: bigint,
 interval_starting_time: array<string>, interval_keywords: array<string>,
@@ -55,7 +55,7 @@ def str_to_intlist(table):
     return ji


-def inputData(record, keyword, length):
+def input_data(record, keyword, length):
     if len(record['show_counts']) >= length:
         hist = flatten(record['show_counts'][:length])
         instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': keyword, 'sl': len(hist)}
@@ -69,7 +69,7 @@ def inputData(record, keyword, length):
 def predict(serving_url, record, length, new_keyword):
     body = {'instances': []}
     for keyword in new_keyword:
-        instance = inputData(record, keyword, length)
+        instance = input_data(record, keyword, length)
         body['instances'].append(instance)
     body_json = json.dumps(body)
     result = requests.post(serving_url, data=body_json).json()
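For context, `predict` posts a standard TensorFlow Serving REST request. A sketch of the body it builds, where the instance fields come from `input_data` above but the record and keyword index are made-up values for illustration:

import json
import requests

serving_url = 'http://10.193.217.105:8506/v1/models/lookalike:predict'

# Hypothetical user record: three intervals of keyword-index show counts.
record = {'did': 'user_0001', 'show_counts': [[1, 2], [3], [4, 5]]}

body = {'instances': [{
    'hist_i': [1, 2, 3, 4, 5],  # flattened history, capped at din_model_length
    'u': record['did'],         # device id
    'i': 7,                     # keyword index to score (made-up value)
    'j': 7,
    'sl': 5,                    # sequence length of hist_i
}]}
result = requests.post(serving_url, data=json.dumps(body)).json()
# TF Serving's REST API answers with {'predictions': [...]}, one entry per instance.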
@@ -174,18 +174,21 @@ def __helper(did, kwi_show_counts, age, gender):
     hive_context = HiveContext(sc)

     # load dataframes
-    did_table, keywords_table, din_tf_serving_url, length = cfg['score_generator']['input']['did_table'], cfg['score_generator']['input'][
-        'keywords_table'], cfg['score_generator']['input']['din_model_tf_serving_url'], cfg['score_generator']['input']['din_model_length']
+    input_cfg = cfg['score_generator']['input']
+    did_table = input_cfg['did_table']
+    keywords_table = input_cfg['keywords_table']
+    significant_keywords_table = input_cfg['significant_keywords_table']
+    din_tf_serving_url = input_cfg['din_model_tf_serving_url']
+    length = input_cfg['din_model_length']

     command = 'SELECT * FROM {}'
     df_did = hive_context.sql(command.format(did_table))
-    df_keywords = hive_context.sql(command.format(keywords_table))
+
+    command = 'SELECT T1.keyword, T1.spread_app_id, T1.keyword_index FROM {} AS T1 JOIN {} AS T2 ON T1.keyword = T2.keyword'
+    df_keywords = hive_context.sql(command.format(keywords_table, significant_keywords_table))
+    # Temporary: filter to the currently active keywords.
+    df_keywords = df_keywords.filter(df_keywords.keyword.isin(
+        'video', 'shopping', 'info', 'social', 'reading', 'travel', 'entertainment'))

-    did_loaded_table = cfg['score_generator']['output']['did_score_table']
-    score_norm_table = cfg['score_generator']['output']['score_norm_table']
+    score_table = cfg['score_generator']['output']['score_table']

     # create a CTR score generator instance and run to get the loaded did
     ctr_score_generator = CTRScoreGenerator(df_did, df_keywords, din_tf_serving_url, length)
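The join above restricts the keyword table to keywords that also appear in the significant-keywords table. An equivalent DataFrame-API formulation (a sketch, not the committed code):

df_significant = hive_context.table(significant_keywords_table).select('keyword')
df_keywords = (hive_context.table(keywords_table)
               .join(df_significant, on='keyword', how='inner')
               .select('keyword', 'spread_app_id', 'keyword_index'))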
@@ -198,4 +201,4 @@ def __helper(did, kwi_show_counts, age, gender):
     df = df.withColumn('kws_norm', udf_normalize(col('kws')))

     # save the loaded did to hive table
-    write_to_table_with_partition(df, score_norm_table, partition=('did_bucket'), mode='overwrite')
+    write_to_table_with_partition(df, score_table, partition=('did_bucket'), mode='overwrite')
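`udf_normalize` is defined elsewhere in this file. As a rough illustration of what per-user score normalization typically looks like, here is a hedged sketch that assumes `kws` is a keyword-to-score map; the real UDF may normalize differently:

import math
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, MapType, StringType

@udf(returnType=MapType(StringType(), FloatType()))
def udf_normalize(kws):
    # Assumed shape: one user's keyword -> score map; L2-normalize the scores.
    if not kws:
        return kws
    norm = math.sqrt(sum(v * v for v in kws.values()))
    return {k: v / norm for k, v in kws.items()} if norm else kws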
