Merge pull request #3 from radibnia77/main
Update lookalike application
xun-hu-at-futurewei-com committed Mar 16, 2021
2 parents 049f420 + ecf42a5 commit 421bd9ea33ae7f47dad2dfc5efc1e4792b2b07ea
Showing 10 changed files with 199 additions and 109 deletions.
@@ -659,12 +659,21 @@ For example in the following pyspark code, the `get_elapsed_time(df)` is called
return
```

#### Application testing

To test the application, run run.sh. It executes the following four `spark-submit` commands one after another.

```shell
spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g seed_user_selector.py config.yml "29" ;
spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g score_generator.py config.yml ;
spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g distance_table_list.py config.yml ;
spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g validation.py config.yml "29";
```

A brief description of run.sh is as follows:

> a. The first line takes `config.yml` and a keyword index as arguments, creates a list of seed users, and writes it to a Hive table. The number of seed users is configurable.
> b. The second line takes `config.yml` as an argument and the trainready table as input. `score_generator.py` sends the instances to the REST API and writes the responses to the score table.
> c. The third line takes the config file as an argument and the score table as input, and creates a distance table.
> d. The last line takes the config file and a keyword index as arguments. `validation.py` counts the clicks among the lookalike-extended users for the given keywords and compares that count with the number of clicks in a random selection of users, as sketched below.
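
As an illustration of step d, here is a minimal PySpark sketch of such a comparison. It is an assumption-laden example: the column names (`did`, `keyword_index`, `click_count`) and the helper name `compare_clicks` are hypothetical, not taken from the repository.

```python
from pyspark.sql.functions import col, rand, sum as sum_

def compare_clicks(df_logs, extended_dids, keyword, n):
    # Hypothetical: total clicks on `keyword` by the extended audience...
    extended_clicks = (df_logs
                       .filter(col('did').isin(extended_dids))
                       .filter(col('keyword_index') == keyword)
                       .agg(sum_('click_count'))
                       .first()[0])
    # ...versus an equally sized random audience.
    random_clicks = (df_logs
                     .filter(col('keyword_index') == keyword)
                     .orderBy(rand())
                     .limit(n)
                     .agg(sum_('click_count'))
                     .first()[0])
    return extended_clicks, random_clicks
```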
@@ -5,11 +5,11 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -1,11 +1,14 @@
input:
  log_table: "lookalike_03042021_logs"
  did_table: "lookalike_03042021_trainready"
  keywords_table: "din_ad_keywords_09172020"
  test_table: "lookalike_trainready_jimmy_test"
  din_model_tf_serving_url: "http://10.193.217.105:8506/v1/models/lookalike3:predict"
  din_model_length: 20
  seeduser_table: "lookalike_seeduser"
  number_of_seeduser: 1000
  extend: 2000
output:
  did_score_table: "lookalike_score_01112021"
  did_score_table_norm: "lookalike_score_norm_01112021"
  similarity_table: "lookalike_similarity"
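
For reference, a minimal sketch of how the scripts above typically load this file; the `argparse` call mirrors the usage visible in the code below, while the variable names and the positional argument are illustrative:

```python
import argparse
import yaml

parser = argparse.ArgumentParser(description=" ")
parser.add_argument('config_file')  # e.g. config.yml, as in run.sh
args = parser.parse_args()

with open(args.config_file, 'r') as f:
    cfg = yaml.safe_load(f)

# keys as defined in the YAML above
din_url = cfg['input']['din_model_tf_serving_url']
seed_count = cfg['input']['number_of_seeduser']
similarity_table = cfg['output']['similarity_table']
```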
@@ -5,7 +5,7 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
@@ -29,18 +29,18 @@
import time




def distance(l1):
    def _distance(l2):
        # dot product of two keyword->weight maps (assumes matching key sets)
        dist = sum(l1[el] * l2[el] for el in l1)
        return dist
    return _distance


def x(l1):
    # build a UDF that scores every row's keyword map against seed vector l1
    _udf_distance = udf(distance(l1), FloatType())
    return _udf_distance


def run(hive_context, cfg):
    # load dataframes
    lookalike_loaded_table_norm = cfg['output']['gucdocs_loaded_table_norm']
@@ -53,25 +53,27 @@ def run(hive_context, cfg):
    df_keywords = hive_context.sql(command.format(keywords_table))
    df_seed_user = hive_context.sql(command.format(seeduser_table))

    # create a (did, kws_norm) pair for each seed user
    df_seed_user = df_seed_user.join(df.select('did', 'kws_norm'), on=['did'], how='left')
    seed_user_list = df_seed_user.select('did', 'kws_norm').collect()
    # seed_user_list = [(did1, {k1: 0, k2: 0.2, ...}), (did2, ...)]

    # add one score column per seed user, capped at 850 columns
    c = 0
    for item in seed_user_list:
        c += 1
        if c > 850:
            break
        df = df.withColumn(item[0], x(item[1])(col('kws_norm')))

    df.write.option("header", "true").option(
        "encoding", "UTF-8").mode("overwrite").format('hive').saveAsTable(lookalike_score_table)



if __name__ == "__main__":
    start = time.time()
    parser = argparse.ArgumentParser(description=" ")
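
The loop above works because `distance(l1)` captures each seed user's keyword vector in a closure, yielding one scoring UDF per seed user. A minimal plain-Python sketch of the same idea, with made-up weights:

```python
# Hypothetical keyword -> normalized-weight maps.
seed = {'29': 0.8, '25': 0.2}
user = {'29': 0.5, '25': 0.5}

def distance(l1):
    def _distance(l2):
        # dot product over the seed's keywords
        return sum(l1[k] * l2[k] for k in l1)
    return _distance

score_against_seed = distance(seed)   # the closure fixes the seed vector
print(score_against_seed(user))       # 0.8*0.5 + 0.2*0.5 = 0.5
```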
@@ -5,29 +5,22 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import yaml
import argparse
import requests
import json
import time
from math import sqrt
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit, col, udf, array, mean
from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType
# from rest_client import predict, str_to_intlist


def dot(l1):
@@ -40,84 +33,76 @@ def _dot(l2):
    return _dot



def ux(l1):
    # build a UDF mapping each row's kws_norm_list to per-seed similarities
    _udf_similarity = udf(dot(l1), ArrayType(FloatType()))
    return _udf_similarity



def l(d):
    # flatten a keyword->score map into a plain list of scores
    s = [value for key, value in d.items()]
    return s


udf_tolist = udf(l, ArrayType(FloatType()))


def top_n(l):
    # keep the 10 largest values
    n = 10
    l.sort()
    return l[-n:]


udf_top_n = udf(top_n, ArrayType(FloatType()))


def _top_n(l1, l2):
    # merge two lists and keep the 10 largest values
    n = 10
    l = sorted(l1 + l2)
    return l[-n:]


_udf_top_n = udf(_top_n, ArrayType(FloatType()))


def _mean(l):
    ave = sum(l) / len(l)
    return ave


udf_mean = udf(_mean, FloatType())


def run(hive_context, cfg):
    # load dataframes
    lookalike_score_table_norm = cfg['output']['did_score_table_norm']
    keywords_table = cfg["input"]["keywords_table"]
    seeduser_table = cfg["input"]["seeduser_table"]
    lookalike_similarity_table = cfg["output"]["similarity_table"]

    command = "SELECT * FROM {}"
    df = hive_context.sql(command.format(lookalike_score_table_norm))
    df_keywords = hive_context.sql(command.format(keywords_table))
    df_seed_user = hive_context.sql(command.format(seeduser_table))

    # create a (did, kws_norm_list) pair for each seed user
    df = df.withColumn('kws_norm_list', udf_tolist(col('kws_norm')))
    df_seed_user = df_seed_user.join(df.select('did', 'kws_norm_list'), on=['did'], how='left')
    seed_user_list = df_seed_user.select('did', 'kws_norm_list').collect()

    # score each user against the seed users in batches of 800,
    # folding each batch into a running top-10 similarity list
    batch_length = 800
    c = 0
    total_c = len(seed_user_list)
    df = df.withColumn('top_10', array(lit(0.0)))
    while total_c > 0:
        len_tobe_p = min(batch_length, total_c)
        total_c -= len_tobe_p
        batched_user = [item[1] for item in seed_user_list[c: c + len_tobe_p]]
        df = df.withColumn("similarity_list", ux(batched_user)(col('kws_norm_list')))
        df = df.withColumn("top_10", _udf_top_n(col("similarity_list"), col("top_10")))
        c += len_tobe_p

    df = df.withColumn("mean_score", udf_mean(col("top_10")))
    df.write.option("header", "true").option(
        "encoding", "UTF-8").mode("overwrite").format('hive').saveAsTable(lookalike_similarity_table)
    extended_did = df.sort(col('mean_score').desc()).select('did', 'mean_score')



if __name__ == "__main__":
    start = time.time()
    parser = argparse.ArgumentParser(description=" ")
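
The while-loop bounds memory by scoring 800 seed users at a time and folding each batch into a running top-10 via `_top_n`. A small plain-Python sketch of that accumulation, with made-up similarity scores:

```python
# Hypothetical similarities for one user against three batches of seeds.
batches = [[0.1, 0.9, 0.3], [0.7, 0.2], [0.95, 0.5, 0.4]]

def merge_top_n(l1, l2, n=10):
    # same idea as _top_n: merge and keep the n largest values
    return sorted(l1 + l2)[-n:]

top_10 = [0.0]  # initial value, as in the Spark code
for batch in batches:
    top_10 = merge_top_n(batch, top_10)

mean_score = sum(top_10) / len(top_10)
print(top_10, mean_score)
```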
@@ -5,7 +5,7 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
@@ -70,7 +70,7 @@ def run(cfg):
    url = cfg['input']['din_model_tf_serving_url']
    # fields: time_interval, did, click_counts, show_counts, media_category, net_type_index, gender, age, keyword
    record = {"did": 0, "show_counts": ['25:3', '29:6,25:2', '29:1,25:2,14:2', '14:1,29:2,25:2',
                                        '29:1', '26:1,14:2,25:4', '14:1,25:3'], "show_clicks": [], "age": '10', "gender": '3'}
    record['show_counts'] = str_to_intlist(record['show_counts'])
    new_keyword = [26, 27, 29]
    response = predict(serving_url=url, record=record, length=length, new_keyword=new_keyword)
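
`str_to_intlist` comes from the repository's `rest_client` module (its import is commented out above), so its exact behavior is not shown here. A hedged guess, assuming each entry such as `'29:6,25:2'` encodes comma-separated `keyword:count` pairs:

```python
def str_to_intlist(entries):
    # Assumed behavior: '29:6,25:2' -> [(29, 6), (25, 2)], one list per entry.
    result = []
    for entry in entries:
        pairs = []
        for part in entry.split(','):
            keyword, count = part.split(':')
            pairs.append((int(keyword), int(count)))
        result.append(pairs)
    return result

print(str_to_intlist(['25:3', '29:6,25:2']))
# [[(25, 3)], [(29, 6), (25, 2)]]
```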
@@ -1,5 +1,6 @@
#!/bin/bash
spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g seed_user_selector.py config.yml "29" ;
spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g score_generator.py config.yml ;
spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g distance_table_list.py config.yml ;
spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g validation.py config.yml "29";
