Update lookalike application
radibnia77 committed Aug 5, 2021
1 parent a8f3e4d commit 1661f3d963f69facc5606d05e3900e3f5d77a8de
Showing 17 changed files with 526 additions and 148 deletions.
@@ -0,0 +1,66 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow import DAG
import datetime as dt
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta

default_args = {
    'owner': 'lookalike_application',
    'depends_on_past': False,
    'start_date': dt.datetime(2021, 7, 28),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'lookalike_application',
    default_args=default_args,
    schedule_interval=None,
)


def sparkOperator(
        file,
        task_id,
        **kwargs
):
    return SparkSubmitOperator(
        application='/home/airflow/airflow-apps/lookalike-model/lookalike_model/application/pipeline/{}'.format(file),
        application_args=['/home/airflow/airflow-apps/lookalike-model/lookalike_model/application/pipeline/config.yml'],
        conn_id='spark_default',
        executor_memory='8G',
        conf={'spark.driver.maxResultSize': '5g', 'spark.hadoop.hive.exec.dynamic.partition': True, 'spark.hadoop.hive.exec.dynamic.partition.mode': 'nonstrict'},
        driver_memory='8G',
        executor_cores=5,
        num_executors=20,
        task_id=task_id,
        dag=dag,
        **kwargs
    )



score_generator = sparkOperator('score_generator.py', 'lookalike_score_generator')
score_vector_table = sparkOperator('score_vector_table.py', 'lookalike_score_vector_table')
score_vector_rebucketing = sparkOperator('score_vector_rebucketing.py', 'lookalike_score_vector_rebucketing')
top_n_similarity_table_generator = sparkOperator('top_n_similarity_table_generator.py', 'lookalike_top_n_similarity_table_generator')


score_generator >> score_vector_table >> score_vector_rebucketing >> top_n_similarity_table_generator
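Note: with schedule_interval=None the DAG never fires on a timer; it has to be triggered manually (e.g. airflow trigger_dag lookalike_application on the Airflow 1.10-era CLI that the airflow.contrib import implies).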
Binary file not shown.
@@ -1,31 +1,35 @@
product_tag: 'lookalike'
pipeline_tag: '08042021'
score_generator:
  input:
    log_table : "lookalike_03042021_logs"
    did_table: "lookalike_03042021_trainready"
    keywords_table: "din_ad_keywords_09172020"
    test_table: "lookalike_trainready_jimmy_test"
    din_model_tf_serving_url: "http://10.193.217.105:8506/v1/models/lookalike3:predict"
    din_model_tf_serving_url: "http://10.193.217.105:8506/v1/models/lookalike:predict"
    din_model_length: 20
    seeduser_table : "lookalike_seeduser"
    number_of_seeduser: 1000
    extend: 2000
    alg: "euclidean" ##### currently only "euclidean" and "dot" are supported
  output:
    did_score_table: "lookalike_score_01112021"
    score_norm_table: "lookalike_score_norm_01112021"
    did_score_table: "{product_tag}_score_{pipeline_tag}"
    score_norm_table: "{product_tag}_score_norm_{pipeline_tag}"
  normalize: False

score_vector:
  keywords_table: "din_ad_keywords_09172020"
  score_norm_table: "lookalike_score_norm_01112021"
  score_vector_table: "lookalike_score_vector_01112021"
  score_norm_table: "{product_tag}_score_norm_{pipeline_tag}"
  score_vector_table: "{product_tag}_score_vector_{pipeline_tag}"
  did_bucket_size: 2
  did_bucket_step: 2
score_vector_rebucketing:
  did_bucket_size: 2
  did_bucket_step: 2
  alpha_did_bucket_size: 1000
  score_vector_alpha_table: 'lookalike_score_vector_alpha_01112021'
  alpha_did_bucket_size: 20 #default=1000
  score_vector_alpha_table: '{product_tag}_score_vector_alpha_{pipeline_tag}'
top_n_similarity:
  alpha_did_bucket_step: 100
  top_n: 100
  similarity_table: "lookalike_similarity_01112021"
  did_bucket_step: 1
  alpha_did_bucket_step: 10
  top_n: 10
  similarity_table: "{product_tag}_similarity_{pipeline_tag}"
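The new config keys rely on {product_tag} and {pipeline_tag} placeholders that the pipeline scripts expand at runtime through the resolve_placeholder helper imported below. The helper itself is not part of this diff; a minimal sketch of the idea, assuming an in-place substitution driven by the top-level string entries of the config, might look like:

def resolve_placeholder(cfg):
    # Hypothetical sketch, not the repository's actual implementation:
    # expand '{product_tag}' / '{pipeline_tag}' style placeholders in place,
    # using the top-level string entries of the config as the substitution map.
    tags = {k: v for k, v in cfg.items() if isinstance(v, str)}

    def _expand(value):
        if isinstance(value, str):
            for name, tag in tags.items():
                value = value.replace('{' + name + '}', tag)
            return value
        if isinstance(value, dict):
            for k in value:
                value[k] = _expand(value[k])
            return value
        if isinstance(value, list):
            return [_expand(v) for v in value]
        return value

    _expand(cfg)

# With the values above, "{product_tag}_score_{pipeline_tag}" resolves to "lookalike_score_08042021".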
@@ -1,14 +1,16 @@
#!/bin/bash

spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g seed_user_selector.py config.yml "29"
# Not used as part of pipeline
spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict seed_user_selector.py config.yml "29"

spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g score_generator.py config.yml
spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_generator.py config.yml

spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml

spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.
spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.yml

spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity_table_generator.py config.yml

spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g validation.py config.yml "29"
# Not used as part of pipeline
spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict validation.py config.yml "29"

@@ -3,13 +3,13 @@
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# 'License'); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -25,6 +25,8 @@
import json
import argparse
from math import sqrt
from util import resolve_placeholder
from lookalike_model.pipeline.util import write_to_table, write_to_table_with_partition

'''
This process generates the score-norm-table with the following format.
@@ -44,10 +46,10 @@ def flatten(lst):

def str_to_intlist(table):
    ji = []
    for k in [table[j].split(",") for j in range(len(table))]:
    for k in [table[j].split(',') for j in range(len(table))]:
        s = []
        for a in k:
            b = int(a.split(":")[0])
            b = int(a.split(':')[0])
            s.append(b)
        ji.append(s)
    return ji
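For reference, str_to_intlist turns each comma-separated string of keyword-index:show-count pairs into a list of the keyword indices alone; a quick illustration with made-up values:

rows = ['1:5,2:3', '4:1']          # hypothetical kwi_show_counts entries
print(str_to_intlist(rows))        # -> [[1, 2], [4]]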
@@ -65,7 +67,7 @@ def inputData(record, keyword, length):


def predict(serving_url, record, length, new_keyword):
    body = {"instances": []}
    body = {'instances': []}
    for keyword in new_keyword:
        instance = inputData(record, keyword, length)
        body['instances'].append(instance)
@@ -80,9 +82,9 @@ def predict(serving_url, record, length, new_keyword):

def gen_mappings_media(hive_context, cfg):
    # this function generates mappings between the media category and the slots.
    media_category_list = cfg['score_generator']["mapping"]["new_slot_id_media_category_list"]
    media_category_list = cfg['score_generator']['mapping']['new_slot_id_media_category_list']
    media_category_set = set(media_category_list)
    slot_id_list = cfg['score_generator']["mapping"]["new_slot_id_list"]
    slot_id_list = cfg['score_generator']['mapping']['new_slot_id_list']
    # 1 vs 1: slot_id : media_category
    media_slot_mapping = dict()
    for media_category in media_category_set:
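The rest of gen_mappings_media falls outside this hunk. Under the assumption that new_slot_id_list and new_slot_id_media_category_list are parallel (slot_id_list[i] belongs to media_category_list[i], the "1 vs 1" in the comment), the grouping it builds would look roughly like:

# Sketch only; the actual loop body is not shown in the diff.
media_slot_mapping = dict()
for media_category in media_category_set:
    media_slot_mapping[media_category] = [
        slot_id for slot_id, category in zip(slot_id_list, media_category_list)
        if category == media_category
    ]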
@@ -110,9 +112,6 @@ def normalize(x):
return result


udf_normalize = udf(normalize, MapType(StringType(), FloatType()))


class CTRScoreGenerator:
    def __init__(self, df_did, df_keywords, din_model_tf_serving_url, din_model_length):
        self.df_did = df_did
@@ -125,9 +124,9 @@ def __init__(self, df_did, df_keywords, din_model_tf_serving_url, din_model_leng
    def get_keywords(self):
        keyword_index_list, keyword_list = list(), list()
        for dfk in self.df_keywords.collect():
            if not dfk["keyword_index"] in keyword_index_list:
                keyword_index_list.append(dfk["keyword_index"])
                keyword_list.append(dfk["keyword"])
            if not dfk['keyword_index'] in keyword_index_list:
                keyword_index_list.append(dfk['keyword_index'])
                keyword_list.append(dfk['keyword'])
        return keyword_index_list, keyword_list

    def run(self):
@@ -159,43 +158,44 @@ def __helper(did, kwi_show_counts, age, gender):
keyword_index_list=self.keyword_index_list,
keyword_list=self.keyword_list),
MapType(StringType(), FloatType()))
(col('did_index'), col('kwi_show_counts'),
col('age'), col('gender')))
(col('did_index'), col('kwi_show_counts'), col('age'), col('gender')))


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Performance Forecasting: CTR Score Generator")
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Performance Forecasting: CTR Score Generator')
parser.add_argument('config_file')
args = parser.parse_args()
with open(args.config_file, 'r') as yml_file:
cfg = yaml.safe_load(yml_file)
resolve_placeholder(cfg)

sc = SparkContext.getOrCreate()
sc.setLogLevel('WARN')
hive_context = HiveContext(sc)

# load dataframes
did_table, keywords_table, din_tf_serving_url, length = cfg['score_generator']["input"]["did_table"],
cfg['score_generator']["input"]["keywords_table"],
cfg['score_generator']["input"]["din_model_tf_serving_url"],
cfg['score_generator']["input"]["din_model_length"]
did_table, keywords_table, din_tf_serving_url, length = cfg['score_generator']['input']['did_table'], cfg['score_generator']['input'][
'keywords_table'], cfg['score_generator']['input']['din_model_tf_serving_url'], cfg['score_generator']['input']['din_model_length']

command = "SELECT * FROM {}"
command = 'SELECT * FROM {}'
df_did = hive_context.sql(command.format(did_table))
df_keywords = hive_context.sql(command.format(keywords_table))
# temporary adding to filter based on active keywords
df_keywords = df_keywords.filter((df_keywords.keyword == "video") | (df_keywords.keyword == "shopping") | (df_keywords.keyword == "info") |
(df_keywords.keyword == "social") | (df_keywords.keyword == "reading") | (df_keywords.keyword == "travel") |
(df_keywords.keyword == "entertainment"))
df_keywords = df_keywords.filter((df_keywords.keyword == 'video') | (df_keywords.keyword == 'shopping') | (df_keywords.keyword == 'info') |
(df_keywords.keyword == 'social') | (df_keywords.keyword == 'reading') | (df_keywords.keyword == 'travel') |
(df_keywords.keyword == 'entertainment'))
did_loaded_table = cfg['score_generator']['output']['did_score_table']
score_norm_table = cfg['score_generator']['output']['score_norm_table']

# create a CTR score generator instance and run to get the loaded did
ctr_score_generator = CTRScoreGenerator(df_did, df_keywords, din_tf_serving_url, length)
ctr_score_generator.run()
df_did_loaded = ctr_score_generator.df_did_loaded
df_did_loaded_norm = df_did_loaded.withColumn('kws_norm', udf_normalize(col('kws')))
df = ctr_score_generator.df_did_loaded

# normalization is required
udf_normalize = udf(normalize, MapType(StringType(), FloatType()))
if cfg['score_generator']['normalize']:
df = df.withColumn('kws_norm', udf_normalize(col('kws')))

# save the loaded did to hive table
df_did_loaded_norm.write.option("header", "true").option(
"encoding", "UTF-8").mode("overwrite").format('hive').saveAsTable(score_norm_table)
write_to_table_with_partition(df, score_norm_table, partition=('did_bucket'), mode='overwrite')
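The commit replaces the direct saveAsTable call with the shared write_to_table_with_partition helper so the score-norm table is written partitioned by did_bucket. The helper's body is outside this diff; in plain PySpark the assumed behaviour would be roughly (a sketch, not the helper's actual code):

def write_to_table_with_partition(df, table_name, partition, mode='overwrite'):
    # Sketch under the assumption that the helper simply does a partitioned
    # Hive write; the repository implementation may differ.
    (df.write
       .option('header', 'true')
       .option('encoding', 'UTF-8')
       .mode(mode)
       .partitionBy(partition)
       .format('hive')
       .saveAsTable(table_name))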
@@ -28,6 +28,10 @@
from math import sqrt
import time
import hashlib
from util import resolve_placeholder


from lookalike_model.pipeline.util import write_to_table, write_to_table_with_partition

'''
@@ -39,24 +43,6 @@
'''


def __save_as_table(df, table_name, hive_context, create_table):

if create_table:
command = """
DROP TABLE IF EXISTS {}
""".format(table_name)

hive_context.sql(command)

df.createOrReplaceTempView("r907_temp_table")

command = """
CREATE TABLE IF NOT EXISTS {} as select * from r907_temp_table
""".format(table_name)

hive_context.sql(command)


def assign_new_bucket_id(df, n, new_column_name):
    def __hash_sha256(s):
        hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest()
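assign_new_bucket_id is only partially visible in this hunk; the visible lines suggest each did is hashed with SHA-256 and mapped onto one of n buckets. A self-contained sketch of that technique (the exact repository code may differ):

import hashlib
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

def assign_new_bucket_id(df, n, new_column_name):
    # Deterministically map every did to one of n buckets via a SHA-256 hash.
    def __hash_sha256(s):
        hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest()
        return int(hex_value, 16)

    bucket_udf = udf(lambda did: __hash_sha256(did) % n, IntegerType())
    return df.withColumn(new_column_name, bucket_udf(col('did')))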
@@ -75,13 +61,15 @@ def run(hive_context, cfg):
    score_vector_alpha_table = cfg['score_vector_rebucketing']['score_vector_alpha_table']

    first_round = True
    for start_bucket in range(0, bucket_size, bucket_step):
        command = "SELECT did, did_bucket, score_vector FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_vector_table, start_bucket, start_bucket+bucket_size-1)
    for did_bucket in range(0, bucket_size, bucket_step):
        command = "SELECT did, did_bucket, score_vector, c1 FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_vector_table, did_bucket, did_bucket+bucket_step-1)

        df = hive_context.sql(command)
        df = assign_new_bucket_id(df, alpha_bucket_size, 'alpha_did_bucket')
        df = df.select('did', 'did_bucket', 'score_vector', 'alpha_did_bucket')
        __save_as_table(df, table_name=score_vector_alpha_table, hive_context=hive_context, create_table=first_round)

        mode = 'overwrite' if first_round else 'append'
        write_to_table_with_partition(df.select('did', 'score_vector', 'c1', 'did_bucket', 'alpha_did_bucket'),
                                      score_vector_alpha_table, partition=('did_bucket', 'alpha_did_bucket'), mode=mode)
        first_round = False
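        # Note: the first pass writes with mode='overwrite' (presumably to replace output
        # from any previous run); every later pass appends, hence first_round flips to False here.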


@@ -92,7 +80,7 @@ def run(hive_context, cfg):
args = parser.parse_args()
with open(args.config_file, 'r') as yml_file:
cfg = yaml.safe_load(yml_file)

resolve_placeholder(cfg)
sc = SparkContext.getOrCreate()
sc.setLogLevel('WARN')
hive_context = HiveContext(sc)
