In [1]:
import pyspark
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import lit
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.functions import array_contains
from pyspark.sql.functions import abs
from pyspark.sql import Row

from sklearn.preprocessing import StandardScaler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import silhouette_score
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import BucketedRandomProjectionLSH

In [2]:
spark = SparkSession \
        .builder \
        .appName('Test') \
        .getOrCreate()

sc = spark.sparkContext

dbutils.widgets.text("targetDateStr", "")
targetDateStr = dbutils.widgets.get("targetDateStr")

lookAlikeResult = "dbfs:/mnt/tgam_adc_s3/wecloud/LookAlikeResult/date=" + targetDateStr

In [3]:
df_read = spark.read.format("parquet").option('inferSchema', 'true').load('dbfs:/mnt/tgam_adc_s3/wecloud/feature_table/feature_90.parquet/part-0*.parquet')
                                                                          
#'dbfs:/mnt/tgam_adc/data/la_features/part-0*.parquet'

In [4]:
#Rename / drop duplicate columns / fill N/A's

features = df_read.drop("DATE36","ADC_UID34","CLICK_COUNT54","PAGEVIEW_ID55","AUDIENCESEGMENTIDS60", \
                       "DATE38","ADC_UID36","max_Date", "AUDIENCESEGMENTIDS32", "AUDIENCESEGMENTIDS62", "AUDIENCESEGMENTIDS_LIST") \
                   .withColumnRenamed("ADC_UID27", "ADC_UID") \
                   .withColumnRenamed("DATE0", "DATE") \
                   .withColumnRenamed("PAGEVIEW_ID26", "PAGEVIEW_ID") \
                   .withColumnRenamed("CLICK_COUNT28", "CLICK_COUNT") \
                   .dropDuplicates(subset =['ADC_UID']) \
                   .fillna(0)

features.persist(pyspark.StorageLevel.DISK_ONLY)

columns = features.schema.names

In [5]:
input_data = features.rdd.map(lambda x:(x['adc_uid'],\
                                        Vectors.dense([x[col] for col in columns if col not in ['adc_uid']])))\
                         .toDF(["ADC_UID", "features"])

input_data.persist(pyspark.StorageLevel.DISK_ONLY)

In [6]:
scaler = MinMaxScaler(inputCol="features",\
                      outputCol="scaledFeatures")
scalerModel =  scaler.fit(input_data.select("features"))
scaledData = scalerModel.transform(input_data)

trainingDataClust = scaledData.drop('features') \
                              .withColumnRenamed("scaledFeatures", "features") \
                              .drop_duplicates() \

trainingDataClust.persist(pyspark.StorageLevel.DISK_ONLY)

In [7]:
#Full map path: dbfs:/mnt/tgam_adc_s3/wecloud/data/seg_adc_map.csv
#dbfs:/mnt/tgam_adc_s3/wecloud/data/SEG_ADC.csv

df_seg_defs = spark.read.format("csv").option('inferSchema', 'true').load('dbfs:/mnt/tgam_adc_s3/wecloud/segments/date=2020-05-29/5f823f07-a07c-4058-89b3-a23b2257f57e.csv') \
              .withColumnRenamed("_c0", "Seg_Def_ID") \
              .withColumnRenamed("_c1", "ADC_UID_2")

display(df_seg_defs)

Seg_Def_ID,ADC_UID_2
5f823f07-a07c-4058-89b3-a23b2257f57e,ffb2616a-b5fe-492f-a713-b30aab21f37e
5f823f07-a07c-4058-89b3-a23b2257f57e,6444ee08-e0ce-b9b4-a495-ade1a25615e5
5f823f07-a07c-4058-89b3-a23b2257f57e,96ec6697-6edb-6f4b-b439-4fa4c3a0a121
5f823f07-a07c-4058-89b3-a23b2257f57e,25d52b37-13a5-4e63-b7dc-ddd8dd41a91f
5f823f07-a07c-4058-89b3-a23b2257f57e,33f17b16-be2d-6308-2ce6-2abcb765d5c8
5f823f07-a07c-4058-89b3-a23b2257f57e,8d7119e2dfaa15605b0e3870e8479b8f
5f823f07-a07c-4058-89b3-a23b2257f57e,2fbb64b4-d770-4c64-9336-19b2113f5769
5f823f07-a07c-4058-89b3-a23b2257f57e,30f1ba87-d45f-4daf-9391-ebb5437daa6d
5f823f07-a07c-4058-89b3-a23b2257f57e,67477a55-4022-4818-8240-561516966698
5f823f07-a07c-4058-89b3-a23b2257f57e,978955d4-3055-45a4-8b1c-208aab4da0f2


In [8]:
#After the cells above have been run once they should not be run again
#Doing so will overwrite any model ouputs to that point and will rerun steps unnecessarily

#SET SEG_ID HERE!
#Takes list of strings (1 string is acceptable)

seg_id = df_seg_defs.select('Seg_Def_ID').distinct().rdd.map(lambda r: r[0]).collect()[0]
seg_id

#Soft coded to take distinct segment ID's from SEG_ADC.csv (aka df_seg_defs)
#'5f823f07-a07c-4058-89b3-a23b2257f57e' - good for dist, z-score=0/-0.25
#same for seg_map_adc[20]

In [9]:
seg_users = trainingDataClust.join(df_seg_defs.where(df_seg_defs['Seg_Def_ID'] == seg_id), \
                                           [trainingDataClust.ADC_UID == df_seg_defs.ADC_UID_2], \
                                           how = "inner") \
                                     .drop_duplicates()

seg_users.persist(pyspark.StorageLevel.DISK_ONLY)

pool_users = trainingDataClust.join(
                                    seg_users, \
                                    [trainingDataClust.ADC_UID == seg_users.ADC_UID], \
                                    how = "leftanti")

pool_users.persist(pyspark.StorageLevel.DISK_ONLY)

In [10]:
row_schema = StructType([StructField('Seg_Id', StringType()), StructField('Seg_User_Count', IntegerType()), \
                         StructField('Pool_User_Count', IntegerType()), StructField('Diff', IntegerType())])
seg_size_df = sqlContext.createDataFrame(sc.emptyRDD(), row_schema)

row = [Row(Seg_Id = seg_id, Seg_User_Count = seg_users.count(), Pool_User_Count = pool_users.count(), \
           Diff = seg_users.count() + pool_users.count() - trainingDataClust.count())]
rowdf = spark.createDataFrame(row, row_schema)
seg_size_df = seg_size_df.union(rowdf).drop_duplicates()

seg_size_df.persist(pyspark.StorageLevel.DISK_ONLY)

display(seg_size_df)

Seg_Id,Seg_User_Count,Pool_User_Count,Diff
5f823f07-a07c-4058-89b3-a23b2257f57e,532577,52966863,0


In [11]:
#Maximum number of clusters considered for each segment 
#v unlikely to be above 6, could use 10 to be safe

#Some segments only have one user and will cause this stage to error (because we can't subcluster 1 user)
#Recommended to disregard because model should never be fed a segment of 0 or 1
#This issue may extend to small custers of <50 users

max_clusters = 10

k_schema = StructType([StructField('k', IntegerType()), StructField('silhouette', FloatType())])
sil_scores = spark.createDataFrame(sc.emptyRDD(), k_schema)

for k in range(2, max_clusters):

  # Trains a k-means model.
  kmeans = KMeans().setK(k).setSeed(1)
  model = kmeans.fit(seg_users)

  # Make predictions
  predictions = model.transform(seg_users)

  # Evaluate clustering by computing Silhouette score
  evaluator = ClusteringEvaluator()
  silhouette = evaluator.evaluate(predictions)

  #Store silhouette scores
  row = [Row(k=k, silhouette=silhouette)]
  rowdf = spark.createDataFrame(row, k_schema)
  sil_scores = sil_scores.union(rowdf)

sil_scores.persist(pyspark.StorageLevel.DISK_ONLY)

optimal_k = sil_scores.orderBy(["silhouette"], ascending=[False,True]).select('k').collect()[0][0]

In [12]:
display(sil_scores).orderBy(["silhouette"], ascending=[False,True])

k,silhouette
2,0.9491807
3,0.92779267
4,0.94191927
5,0.95088047
6,0.9182099
7,0.8698506
8,0.9139011
9,0.86377645


In [13]:
kmeans = KMeans().setK(optimal_k).setSeed(1)
model = kmeans.fit(seg_users)

centers = model.clusterCenters()

import json
centroids_json = json.dumps(list(list(i) for i in centers))
centers_list = json.loads(centroids_json)
#I believe you requested centroids_json

In [14]:
#The entire pool of users is considered as lookalikes
#Minimum Euclidean distance from cluster centers are returned for each potential lookalike

NUM_NEIGH = pool_users.count() - 10000
#10000 is just a buffer so the model doesn't search for more lookalikes than there are

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)

model = brp.fit(seg_users)

schema = StructType([StructField('ADC_UID', StringType()), StructField('distCol', DoubleType())])
lookalikes = spark.createDataFrame(sc.emptyRDD(), schema)

for center in centers:
  key = Vectors.dense(center)
  modelo = model.approxNearestNeighbors(pool_users, key, int(NUM_NEIGH)).drop('features','hashes')
  lookalikes = lookalikes.union(modelo).repartition(200)

lookalikes.persist(pyspark.StorageLevel.DISK_ONLY)

In [15]:
lookalikes_pd = lookalikes.groupby('ADC_UID').agg(F.min(F.col('distCol')).alias('minDist'))\
                                      .orderBy(["minDist"], ascending=[True,True]).drop_duplicates(["ADC_UID"])

lookalikes_pd.persist(pyspark.StorageLevel.DISK_ONLY)

print(f'# of Lookalikes for {seg_id} = {lookalikes_pd.count()}')

In [16]:
#Use if returning a list is preferable to a DF
#It is ordered and can be sliced as needed
#E.g. lookalikes_list[:10] returns top 10 lookalikes

lookalikes_list = lookalikes_pd.select("ADC_UID").rdd.flatMap(lambda x: x).collect()

In [17]:
#Z-Scores calculate number of standard deviations from mean
#Used to filter outliers 

dist_std = lookalikes_pd.agg(F.stddev(lookalikes_pd.minDist)).first()[0]
dist_mean = lookalikes_pd.agg(F.mean(lookalikes_pd.minDist)).first()[0]

lookalikes_pd =\
lookalikes_pd.withColumn('Z_Score',\
                                  (lookalikes_pd.minDist - dist_mean) / \
                                  dist_std
                              )

lookalikes_pd.persist(pyspark.StorageLevel.DISK_ONLY)

In [18]:
from pyspark.sql.functions import *
#Create similarity score by scaling distance column
#Min/Max Scaler
#subtract 1, * 100, and take absolute to reverse scaling (i.e. closest neighbor now 100 not 0)
#decimal points on similarity score are optional

OUTLIER_THRESHOLD = 0

#of standard deviations from the mean of Euclidean distance column that will be considered for lookalikes
#Any outliers beyond this will be scored as 0 

dist_mean = lookalikes_pd.where(lookalikes_pd.Z_Score <= OUTLIER_THRESHOLD)\
                               .agg(F.mean(lookalikes_pd.minDist)).first()[0]
dist_min = lookalikes_pd.where(lookalikes_pd.Z_Score <= OUTLIER_THRESHOLD)\
                              .agg(F.min(lookalikes_pd.minDist)).first()[0]
dist_max = lookalikes_pd.where(lookalikes_pd.Z_Score <= OUTLIER_THRESHOLD) \
                              .agg(F.max(lookalikes_pd.minDist)).first()[0]

lookalikes_pd =\
lookalikes_pd.withColumn('Similarity_Score', when(lookalikes_pd.Z_Score <= OUTLIER_THRESHOLD,\
                                                        ((lookalikes_pd.minDist - dist_min) / \
                                                        (dist_max - dist_min) \
                                                        -1) * 100) \
                                                  .otherwise(0)
                                                  )

lookalikes_pd = lookalikes_pd.withColumn('Similarity_Score', F.round(abs(lookalikes_pd.Similarity_Score), 2))

lookalikes_pd.persist(pyspark.StorageLevel.DISK_ONLY)

display(lookalikes_pd)

ADC_UID,minDist,Z_Score,Similarity_Score
e9167841-1518-472b-b1f3-0d7fefbf7bdd,0.0025342352618233,-0.9253972263952536,100.0
7dc989c2-dca7-4239-a4a9-64ada9ca9429,0.0025407294685925,-0.9253100343151373,99.99
03d1904c-1ae1-4c4b-8eda-fd683f7baf1a,0.0025461734858193,-0.9252369422188944,99.98
7bdbdd3e-88ba-4618-abea-0e8cf0a0212c,0.0025499745962388,-0.925185908007018,99.98
687b53e0-58ee-4d42-a3cb-bf7b9d4e0932,0.0025549373665929,-0.9251192771950204,99.97
6e06d527-3e8c-451c-9dfd-da7c2fb40d5d,0.0025566035961758,-0.9250969061759808,99.97
4ecc44b5-19db-4f5c-ad29-5a7773cc6208,0.0025569454309471,-0.925092316657072,99.97
113c69d9-198c-44ca-a695-8624a8353668,0.0025579047458537,-0.9250794367680923,99.97
7f1e5cde-4e01-4f59-9cc5-e268e463a370,0.0025588482404517,-0.925066769284672,99.96
223e10c5-50e0-49b6-a57e-95c753799d07,0.0025632644266765,-0.9250074769835794,99.96


In [19]:
#To verify furthest distance = score of 0
display(lookalikes_pd.collect()[-100:])

ADC_UID,minDist,Z_Score,Similarity_Score
6b05eb221076c0c1247968fa7ae5bda8,2.049723975758137,26.560443340454736,0.0
b39778dc8c399161c879aef9ce74f2ed,2.0497251566596963,26.560459195395456,0.0
9c366f6b1605b7a7fd4e63a9237e7cd5,2.0497252119048275,26.560459937123905,0.0
a34dda5b5dbd9d12d24c9fea9a935145,2.049726103834931,26.560471912295597,0.0
321b1d1edb42be08e6ce373184bd1f51,2.049726761162215,26.56048073765876,0.0
a34ea5013861d6781cad1b3c71b90e05,2.0497271627734657,26.5604861297446,0.0
33f7ee494285049972ad482f00fd86aa,2.0497323448176057,26.560555704555483,0.0
eac5df06abe33d0b0441c0efeab99da7,2.049735734153616,26.560601210229606,0.0
c68a525b8769c2f9131cc83c1ad055fd,2.0497360688113653,26.56060570338887,0.0
c91f005a748aec91c43d885f41faa6b0,2.04973664665448,26.560613461587053,0.0


In [20]:
#LOOKALIKE PREVIEW for END USER
#Likely on a sliding scale

scores = [90,80,70,60,50,40,30,20,10,0]

long = []

for score in scores:
  long.append(lookalikes_pd.where(lookalikes_pd['Similarity_Score'] >= score).count())
  data_tuples = zip(scores, long)
  sim_scores = pd.DataFrame(data_tuples, columns=['Score','# of Lookalikes'])

display(sim_scores)

Score,# of Lookalikes
90,7998
80,419293
70,1284758
60,2491388
50,4563562
40,4668998
30,9965840
20,46387678
10,46873814
0,52966863


In [21]:
#NN_REQUESTED = # of neighbors requested for segment by END USER in ADC tool

NN_REQUESTED = 10000

final_nn_dfs = lookalikes_pd[['ADC_UID', 'Similarity_Score']].collect()[:NN_REQUESTED]
final_lookalike_lists = lookalikes_list[:NN_REQUESTED]

display(final_nn_dfs)

#Currently displaying DF, but DE team probably just wants the ordered list of ADC_UIDS (i.e. final_lookalike_lists)

ADC_UID,Similarity_Score
e9167841-1518-472b-b1f3-0d7fefbf7bdd,100.0
7dc989c2-dca7-4239-a4a9-64ada9ca9429,99.99
03d1904c-1ae1-4c4b-8eda-fd683f7baf1a,99.98
7bdbdd3e-88ba-4618-abea-0e8cf0a0212c,99.98
687b53e0-58ee-4d42-a3cb-bf7b9d4e0932,99.97
6e06d527-3e8c-451c-9dfd-da7c2fb40d5d,99.97
4ecc44b5-19db-4f5c-ad29-5a7773cc6208,99.97
113c69d9-198c-44ca-a695-8624a8353668,99.97
7f1e5cde-4e01-4f59-9cc5-e268e463a370,99.96
223e10c5-50e0-49b6-a57e-95c753799d07,99.96


In [22]:
#SCORE_THRESHOLD = include all neighbors above this similarity score threshold as set by END USER in ADC tool

SCORE_THRESHOLD = 80

final_nn_dfs = lookalikes_pd.where(lookalikes_pd['Similarity_Score'] >= SCORE_THRESHOLD) \
                                          .select('ADC_UID', 'Similarity_Score')

#Similarity Score can be easily removed if desired
#then add.collect() to return ordered list of ADC_UIDs

final_nn_dfs.persist(pyspark.StorageLevel.DISK_ONLY)
  
display(final_nn_dfs)

#Currently displaying DF, but DE team probably just wants the ordered list of ADC_UIDS (i.e. final_lookalike_lists)

ADC_UID,Similarity_Score
e9167841-1518-472b-b1f3-0d7fefbf7bdd,100.0
7dc989c2-dca7-4239-a4a9-64ada9ca9429,99.99
03d1904c-1ae1-4c4b-8eda-fd683f7baf1a,99.98
7bdbdd3e-88ba-4618-abea-0e8cf0a0212c,99.98
687b53e0-58ee-4d42-a3cb-bf7b9d4e0932,99.97
6e06d527-3e8c-451c-9dfd-da7c2fb40d5d,99.97
4ecc44b5-19db-4f5c-ad29-5a7773cc6208,99.97
113c69d9-198c-44ca-a695-8624a8353668,99.97
7f1e5cde-4e01-4f59-9cc5-e268e463a370,99.96
223e10c5-50e0-49b6-a57e-95c753799d07,99.96
