Merge pull request #17 from radibnia77/main
- Adopt the new data structure and add a filter to remove inactive users in the lookalike pipeline
- Fix the trainer
- Refactor files
radibnia77 committed Dec 9, 2021
2 parents c79cdc9 + 2ddd37c commit 0eaa7b813df11aaf6198b10ddca3b6ca0bf2daa4
Showing 48 changed files with 1,557 additions and 984 deletions.
@@ -0,0 +1,7 @@

IVF256,Flat
k = 10
1M users
We tuned the configuration. We need to index in 500k increments and search in 1M increments.
The overall time to process 1M users (including the Hive and Spark operations) is 10 minutes. I believe we can process 50M in less than one hour.
Clustering is good enough; we do not need vector optimization such as OP.
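
A minimal sketch of what these settings amount to in FAISS, using random vectors in place of the real score vectors; the 32-dimensional user vector is an assumption carried over from the GPU test script below:

import numpy as np
import faiss

dim_of_user_vector = 32   # assumed dimension, matching the GPU test script below
top_n = 10                # k = 10 nearest users per query
num_users = 1000000       # 1M users
index_step = 500000       # index in 500k increments
search_step = 1000000     # search in 1M increments

index = faiss.index_factory(dim_of_user_vector, "IVF256,Flat")

# Train the IVF quantizer on the first batch, then add users in 500k increments.
for start in range(0, num_users, index_step):
    xb = np.random.random((index_step, dim_of_user_vector)).astype('float32')
    if not index.is_trained:
        index.train(xb)
    index.add(xb)

# Search in 1M increments; D holds distances, I the indices of the top-n nearest users.
for start in range(0, num_users, search_step):
    xq = np.random.random((search_step, dim_of_user_vector)).astype('float32')
    D, I = index.search(xq, top_n)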
@@ -0,0 +1,57 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.ml.clustering import KMeans
# import numpy as np

# Experiments with clustering methods.

score_table = 'lookalike_application_score_vector_08192021_1m'
num_clusters = 100

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# num_features = 10
# num_rows = 10000
# data = np.random.rand(num_rows, num_features)
# spark.createDataFrame(data)

did_bucket = 0

command = "SELECT did, score_vector, did_bucket FROM {} WHERE did_bucket = {}".format(score_table, did_bucket)
df = spark.sql(command)

list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df = df.withColumn('score_vec', list_to_vector_udf(df.score_vector))

first_time = True
if first_time:
    kmeans = KMeans(k=num_clusters, featuresCol='score_vec')
    kmeans.setSeed(1)
    kmeans.setPredictionCol('cluster_id')
    model = kmeans.fit(df)
    first_time = False

df2 = model.transform(df)
df2.show()
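
A quick way to judge the clustering is to look at the cluster sizes and the silhouette score of the fitted model. A minimal sketch that continues from df2 above; ClusteringEvaluator is the standard pyspark.ml metric, and its use here is only illustrative:

from pyspark.ml.evaluation import ClusteringEvaluator

# Cluster sizes: how many users fall into each of the num_clusters clusters.
df2.groupBy('cluster_id').count().orderBy('count', ascending=False).show(num_clusters)

# Silhouette score over the score vectors; values closer to 1.0 mean tighter, better-separated clusters.
evaluator = ClusteringEvaluator(featuresCol='score_vec', predictionCol='cluster_id')
print('silhouette = {}'.format(evaluator.evaluate(df2)))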





@@ -5,6 +5,10 @@
import heapq
from pyspark.sql import Row

'''
Author: Reza
'''


def calculate_similarity(alpha_user_did_score, c2):
# we don't need did_list in dataframe
@@ -0,0 +1,21 @@
## total number of rows
df = sql('select * from ads_clicklog_11162021')
df.count()
##7,640,717

## number of row with empty industry_id
df = sql('select * from ads_clicklog_11162021 where Length(industry_id) == 0 ')
df.count()
##2,425,201

## number of impressions per industry_id (non-empty ids only)
df = sql('select industry_id, count(industry_id) as s from ads_showlog_11162021 where Length(industry_id) != 0 group by industry_id order by count(industry_id) ')
list_of_imp = df.select('s').collect()
l = 0
for row in list_of_imp:
    l += row[0]

df = df.withColumn('percentage', df.s / l)
df = df.filter(df.percentage > 0.01)
keyword_list = [str(row.industry_id) for row in df.select('industry_id').collect()]
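
The same industry-share computation can stay entirely in Spark, avoiding the driver-side loop over collected rows. A minimal sketch, assuming a Spark session with Hive support and the same ads_showlog_11162021 table; the DataFrame name df_show is hypothetical:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df_show = spark.sql('select industry_id from ads_showlog_11162021')

# Impressions per industry_id, ignoring rows with an empty id.
counts = (df_show.filter(F.length('industry_id') > 0)
                 .groupBy('industry_id')
                 .agg(F.count('industry_id').alias('s')))

# Share of total impressions, computed with a global window instead of a Python loop.
total_window = Window.partitionBy()
counts = counts.withColumn('percentage', F.col('s') / F.sum('s').over(total_window))

# Keep only industries that account for more than 1% of impressions.
keyword_list = [str(r.industry_id)
                for r in counts.filter(F.col('percentage') > 0.01).select('industry_id').collect()]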
@@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import numpy as np
import faiss
import time
import datetime

def testIndex(index, n, num_user, num_user_step, search_step):
    name = str(type(index)).split('.')[-1].split('\'')[0]
    print('Testing {}'.format(name))

    start = time.time()
    gpu_index = faiss.index_cpu_to_all_gpus(index)

    # Build the index in batches of num_user_step users.
    for i in range(0, num_user, num_user_step):
        xb = np.random.random((num_user_step, dim_of_user_vector)).astype('float32')
        # xb[:, 0] += np.arange(num_user) / 1000.
        # print(xb.shape)

        if not gpu_index.is_trained:
            gpu_index.train(xb)
            print('{}: {} training: {}'.format(i, name, time.time() - start))
        else:
            print('{}: No training needed.'.format(i))

        gpu_index.add(xb)
    end_load = time.time()

    # Search the index in batches of search_step query users.
    total_search_time = 0
    for i in range(0, num_user, search_step):
        xq = np.random.random((search_step, dim_of_user_vector)).astype('float32')
        print(i)

        # Search the top n users most similar to each query user.
        # gpu_index.search() returns the distances and the indices of the n nearest neighbors.
        search_start = time.time()
        DISTANCE, K_NEAREST_NEIGHBOR = gpu_index.search(xq, n)
        total_search_time += time.time() - search_start

    print(' {} test: {}'.format(name, str(datetime.timedelta(seconds=time.time() - start))))
    print(' Load index time: ', str(datetime.timedelta(seconds=end_load - start)))
    print(' Total search time:', str(datetime.timedelta(seconds=total_search_time)))
    print(' nprobe =', index.nprobe)

# Generate the test dataset.
dim_of_user_vector = 32
n = 10
nprobe = 1
num_user = 10000000
num_user_step = 500000
search_step = 1000000
np.random.seed(1234)

# Query the number of GPU resources.
no_gpus = faiss.get_num_gpus()
print('Number of available GPUs: {}'.format(no_gpus))

# Test different index subclasses.
index = faiss.index_factory(dim_of_user_vector, "IVF1024,Flat")
index.nprobe = nprobe
testIndex(index, n, num_user, num_user_step, search_step)
# testIndex(faiss.IndexLSH(dim_of_user_vector, 10))
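
nprobe trades recall for latency on IVF indexes: larger values scan more inverted lists per query. A small sweep using the testIndex helper and the parameters defined above; the probe values are illustrative:

# Sweep nprobe on the IVF index and compare search times.
for probe in (1, 8, 32):
    index = faiss.index_factory(dim_of_user_vector, "IVF1024,Flat")
    index.nprobe = probe
    testIndex(index, n, num_user, num_user_step, search_step)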



@@ -0,0 +1,39 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, FloatType, IntegerType, ArrayType, MapType

spark_session = SparkSession.builder \
    .config('hive.metastore.uris', "thrift://10.213.37.46:9083") \
    .config('hive.server2.thrift.http.port', 10002) \
    .config('hive.server2.thrift.port', 10016) \
    .config('hive.server2.transport.mode', 'binary') \
    .config('hive.metastore.client.socket.timeout', 1800) \
    .config('hive.metastore.client.connect.retry.delay', 5) \
    .config('spark.hadoop.hive.exec.dynamic.partition.mode', 'nonstrict') \
    .enableHiveSupport().getOrCreate()

data = [(1, 2), (3, 4), (5, 6)]

schema = StructType([
    StructField("a", IntegerType(), True),
    StructField("b", IntegerType(), True),
])

df = spark_session.createDataFrame(spark_session.sparkContext.parallelize(data), schema)
df.show()
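
Because the session sets dynamic partitioning to nonstrict, the same test DataFrame can also be written into a partitioned Hive table. A minimal sketch; the table name test_dynamic_partition is hypothetical:

# Write the test DataFrame into a Hive table partitioned by column "a".
# Nonstrict dynamic partition mode is already set in the session config above.
df.write.mode('overwrite').partitionBy('a').saveAsTable('test_dynamic_partition')
spark_session.sql('select * from test_dynamic_partition where a = 1').show()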

@@ -1,5 +1,5 @@
product_tag: 'lookalike_application'
pipeline_tag: '08172021_1000'
pipeline_tag: '08192021_1m'
score_generator:
input:
log_table : "lookalike_08172021_1000_logs"
@@ -17,15 +17,13 @@ score_vector:
keywords_table: "din_ad_keywords_09172020"
score_table: "{product_tag}_{pipeline_tag}_score"
score_vector_table: "{product_tag}_{pipeline_tag}_score_vector"
did_bucket_size: 2
did_bucket_step: 2
score_matrix_table:
did_bucket_size: 2
did_bucket_step: 2
score_matrix_table: '{product_tag}_{pipeline_tag}_score_matrix'
did_bucket_size: 100
did_bucket_step: 10
top_n_similarity:
did_bucket_size: 100
did_bucket_step: 100
cross_bucket_size: 1 # in production this should be the same as did_bucket_size
load_bucket_step: 20
search_bucket_step: 50
top_n: 10
similarity_table: "{product_tag}_{pipeline_tag}_similarity"
index_factory_string: "IVF256,Flat"
similarity_table: "{product_tag}_{pipeline_tag}_similarity"

@@ -4,25 +4,7 @@ spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memo

spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml

spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_matrix_table.py config.yml

# spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.yml

spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_matrix_table.py config.yml

# Run create_archive.sh before running this line.
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 16G --driver-memory 16G \
--conf spark.driver.maxResultSize=5g \
--conf spark.hadoop.hive.exec.dynamic.partition=true \
--conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
--deploy-mode client \
--conf "spark.yarn.dist.archives=lookalike-application-python-venv.tar.gz#environment" \
--conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/lookalike-application-python-venv/bin/python" \
--conf "spark.executorEnv.PYSPARK_PYTHON=./environment/lookalike-application-python-venv/bin/python" \
top_n_similarity_table_generator.py config.yml

# optional
spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict validation.py config.yml "29"
# Run top_n_similarity.py on a machine with a GPU.
#spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity.py config.yml
#spark-submit --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity.py config.yml

@@ -25,8 +25,7 @@
import json
import argparse
from math import sqrt
from util import resolve_placeholder
from lookalike_model.pipeline.util import write_to_table, write_to_table_with_partition
from util import resolve_placeholder, write_to_table_with_partition

'''
This process generates the score-table with the following format.

This file was deleted.

@@ -83,7 +83,7 @@ def run(hive_context, cfg):

mode = 'overwrite' if first_round else 'append'
write_to_table_with_partition(df.select('did', 'score_vector', 'c1', 'did_bucket'), score_vector_table, partition=('did_bucket'), mode=mode)
# write_to_table_with_partition(df.select('did', 'did_bucket'), score_vector_table, partition=('did_bucket'), mode=mode)

first_round = False
batch_num += 1
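
The write helper comes from util; a minimal sketch of what a write_to_table_with_partition-style helper can look like, assuming it wraps the stock DataFrameWriter (the project's real implementation may differ):

def write_to_table_with_partition(df, table_name, partition, mode='overwrite'):
    # Write df into a Hive table partitioned by the given column(s).
    # mode='overwrite' creates or replaces the table on the first batch;
    # mode='append' adds the remaining did_bucket batches to it.
    df.write.mode(mode).partitionBy(partition).saveAsTable(table_name)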
