In [0]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import BucketedRandomProjectionLSH

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, udf, max, collect_list, lit, expr, coalesce, pow, sum
from pyspark.sql.types import *

import pandas as pd
import numpy as np

import math
import shutil

In [0]:
@udf(VectorUDT())
def to_vector(size, index_list, value_list):
    ind, val = zip(*sorted(zip(index_list, value_list)))
    return Vectors.sparse(size, ind, val)
    
_ = spark.udf.register('to_vector', to_vector)

# generate ratings vectors 
ratings_vectors = spark.sql('''
  SELECT 
      user_id,
      to_vector(size, index_list, value_list) as ratings,
      size,
      index_list,
      value_list
    FROM ( 
      SELECT
        user_id,
        (SELECT max(product_id) + 1 FROM instacart.products) as size,
        COLLECT_LIST(product_id) as index_list,
        COLLECT_LIST(normalized_purchases) as value_list
      FROM ( -- all users, ratings
        SELECT
          user_id,
          product_id,
          normalized_purchases
        FROM instacart.user_ratings
        WHERE split = 'calibration'
        )
      GROUP BY user_id
      )
    ''')

In [0]:
bucket_length = 0.0025
lsh_tables = 5

lsh = BucketedRandomProjectionLSH(
  inputCol = 'ratings', 
  outputCol = 'hash', 
  numHashTables = lsh_tables, 
  bucketLength = bucket_length
  )

# fit the algorithm to the dataset
fitted_lsh = lsh.fit(ratings_vectors)

# assign LSH buckets to users
hashed_vectors = (
  fitted_lsh
    .transform(ratings_vectors)
    ).cache()

hashed_vectors.createOrReplaceTempView('hashed_vectors')

In [0]:
user_148 = (
  hashed_vectors
    .filter('user_id=148')
    .select('user_id','ratings')
  )

user_148.collect()[0]['ratings']

Out[5]: SparseVector(49689, {132: 0.0814, 274: 0.0814, 781: 0.0814, 1203: 0.0814, 1915: 0.0814, 3376: 0.0814, 4025: 0.0814, 4477: 0.0814, 5201: 0.0814, 6891: 0.0814, 7412: 0.1628, 7539: 0.0814, 7870: 0.1628, 8362: 0.0814, 10129: 0.0814, 10451: 0.0814, 10983: 0.1628, 11109: 0.0814, 11212: 0.0814, 12023: 0.0814, 13577: 0.0814, 14804: 0.0814, 15008: 0.0814, 15438: 0.0814, 17284: 0.0814, 17450: 0.1628, 17828: 0.1628, 18629: 0.1628, 19385: 0.0814, 20310: 0.0814, 20564: 0.0814, 20632: 0.1628, 21295: 0.0814, 21806: 0.2441, 22035: 0.0814, 22217: 0.0814, 22721: 0.0814, 23383: 0.1628, 23459: 0.1628, 24118: 0.0814, 24230: 0.0814, 24413: 0.2441, 25821: 0.0814, 26313: 0.0814, 26638: 0.0814, 27156: 0.0814, 27219: 0.0814, 27845: 0.0814, 28568: 0.0814, 29406: 0.0814, 29721: 0.0814, 29871: 0.0814, 29926: 0.0814, 30391: 0.0814, 30461: 0.0814, 31504: 0.3255, 31990: 0.0814, 32420: 0.0814, 32426: 0.0814, 33407: 0.0814, 34524: 0.0814, 36953: 0.0814, 37119: 0.0814, 37514: 0.0814, 38288: 0.0814, 38481: 0.0814

In [0]:
number_of_customers = 10

# retrieve n nearest customers 
similar_k_users = (
  fitted_lsh.approxNearestNeighbors(
    hashed_vectors, 
    user_148.collect()[0]['ratings'], # must be a vector value (not a dataframe)
    number_of_customers, 
    distCol='distance'
    )
    .select('user_id', 'distance')
  )
  
display(similar_k_users)

user_id,distance
148,0.0
75604,1.2190600561395566
183757,1.2228734335140283
4900,1.2415711561941944
44259,1.24379822773884
78995,1.2447867934396355
112316,1.2533600998598375
59880,1.2639667426747596
195292,1.2702051276573023
58390,1.270427581764804


In [0]:
max_distance_from_target = 1.3

# retreive all users within a distance range
similar_d_users = (
    fitted_lsh.approxSimilarityJoin(
      user_148,
      hashed_vectors,  
      threshold = max_distance_from_target, 
      distCol='distance'
      )
    .selectExpr('datasetA.user_id as user_a', 'datasetB.user_id as user_b', 'distance')
    .orderBy('distance', ascending=True)
    )
  
display(similar_d_users)

user_a,user_b,distance
148,148,0.0
148,75604,1.2190600561395566
148,183757,1.2228734335140283
148,4900,1.2415711561941944
148,44259,1.24379822773884
148,78995,1.2447867934396355
148,112316,1.2533600998598375
148,59880,1.2639667426747596
148,195292,1.2702051276573023
148,58390,1.270427581764804


In [0]:
# calculate similarity score
similar_users = (
  similar_k_users
    .withColumn('similarity', lit(1) / (lit(1) + col('distance')))
  )

display(similar_users)

user_id,distance,similarity
148,0.0,1.0
75604,1.2190600561395566,0.4506412511158778
183757,1.2228734335140283,0.449868168346027
4900,1.2415711561941944,0.4461156618814766
44259,1.24379822773884,0.4456728718462968
78995,1.2447867934396355,0.4454766051379529
112316,1.2533600998598375,0.4437817107271055
59880,1.2639667426747596,0.4417026015225611
195292,1.2702051276573023,0.4404888297613582
58390,1.270427581764804,0.4404456711289156


In [0]:
# calculate lowest possible unscaled similarity score
min_score = 1 / (1 + math.sqrt(2))

# calculate similarity score
similar_users = (
  similar_users
    .withColumn(
       'similarity_rescaled', 
       (col('similarity') - lit(min_score)) / lit(1.0 - min_score)
       )
     )

# make available for SQL query
similar_users.createOrReplaceTempView('similar_users')

display(similar_users)

user_id,distance,similarity,similarity_rescaled
148,0.0,1.0,1.0
75604,1.2190600561395566,0.4506412511158778,0.0621859544757572
183757,1.2228734335140283,0.449868168346027,0.0608662196369265
4900,1.2415711561941944,0.4461156618814766,0.0544602904048462
44259,1.24379822773884,0.4456728718462968,0.0537044005331488
78995,1.2447867934396355,0.4454766051379529,0.0533693523044138
112316,1.2533600998598375,0.4437817107271055,0.050475986562261
59880,1.2639667426747596,0.4417026015225611,0.0469267251403558
195292,1.2702051276573023,0.4404888297613582,0.0448546871359937
58390,1.270427581764804,0.4404456711289156,0.0447810107418841


In [0]:
similar_ratings = spark.sql('''
      SELECT
        m.user_id,
        m.product_id,
        COALESCE(n.normalized_purchases, 0.0) as normalized_purchases,
        m.similarity_rescaled
      FROM ( -- get complete list of products across similar users
        SELECT
          x.user_id,
          y.product_id,
          x.similarity_rescaled
        FROM (
          SELECT user_id, similarity_rescaled
          FROM similar_users
          ) x
        CROSS JOIN instacart.products y
        ) m
      LEFT OUTER JOIN ( -- retrieve ratings actually provided by similar users
        SELECT x.user_id, x.product_id, x.normalized_purchases 
        FROM instacart.user_ratings x 
        LEFT SEMI JOIN similar_users y 
          ON x.user_id=y.user_id 
        WHERE x.split = 'calibration'
          ) n
        ON m.user_id=n.user_id AND m.product_id=n.product_id
      ''')

display(similar_ratings)

user_id,product_id,normalized_purchases,similarity_rescaled
112316,15,0.0,0.050475986562261
78995,44,0.0,0.0533693523044138
148,59,0.0,1.0
59880,63,0.0,0.0469267251403558
4900,94,0.0,0.0544602904048462
59880,127,0.0,0.0469267251403558
4900,131,0.0,0.0544602904048462
4900,142,0.0,0.0544602904048462
148,146,0.0,1.0
75604,157,0.0,0.0621859544757572


In [0]:
product_ratings = ( 
   similar_ratings
    .groupBy('product_id')
      .agg( 
        sum(col('normalized_purchases') * col('similarity_rescaled')).alias('weighted_rating'),
        sum('similarity_rescaled').alias('total_weight')
        )
    .withColumn('recommendation_score', col('weighted_rating')/col('total_weight'))
    #.select('product_id', 'recommendation_score')
    .orderBy('recommendation_score', ascending=False)
  )

display(product_ratings)

product_id,weighted_rating,total_weight,recommendation_score
24413,0.385672560750228,1.4716246269355877,0.2620726465779026
31504,0.3472890202416005,1.4716246269355877,0.2359902205257137
44557,0.3360943125094087,1.4716246269355877,0.2283831803013985
18629,0.2909664374978424,1.4716246269355877,0.1977178365815551
21806,0.2733979472203292,1.4716246269355877,0.1857796765671384
10983,0.1627576917542318,1.4716246269355873,0.1105972873620276
23383,0.1627576917542318,1.4716246269355873,0.1105972873620276
17828,0.1627576917542318,1.4716246269355877,0.1105972873620276
7412,0.1627576917542318,1.4716246269355877,0.1105972873620276
7870,0.1627576917542318,1.4716246269355877,0.1105972873620276


In [0]:
# retreive actual ratings from this user
user_product_ratings = (
  spark
    .table('instacart.user_ratings')
    .filter("user_id = 148 and split = 'calibration'")
  )

# combine with recommender ratings
product_ratings_for_user = (
    product_ratings
      .join( user_product_ratings, on='product_id', how='outer')
      .selectExpr('product_id', 'COALESCE(normalized_purchases, 0.0) as user_score', 'recommendation_score')
      .orderBy('recommendation_score', ascending=False)
  )

display(product_ratings_for_user)

product_id,user_score,recommendation_score
24413,0.2441365376313478,0.2620726465779026
31504,0.3255153835084637,0.2359902205257137
44557,0.2441365376313478,0.2283831803013985
18629,0.1627576917542318,0.1977178365815551
21806,0.2441365376313478,0.1857796765671384
10983,0.1627576917542318,0.1105972873620276
23383,0.1627576917542318,0.1105972873620276
20632,0.1627576917542318,0.1105972873620276
23459,0.1627576917542318,0.1105972873620276
7412,0.1627576917542318,0.1105972873620276


In [0]:
max_partition_count = sc.defaultParallelism * 100
spark.conf.set('spark.sql.shuffle.partitions', max_partition_count) 

In [0]:
# ratio of customers to sample
sample_fraction = 0.10

# calculate max possible distance between users
max_distance = math.sqrt(2)

# calculate min possible similarity (unscaled)
min_score = 1 / (1 + math.sqrt(2))

# remove any old comparisons that might exist
shutil.rmtree('/dbfs/mnt/instacart/gold/similarity_results', ignore_errors=True)

# perform similarity join for sample of users
sample_comparisons = (
  fitted_lsh.approxSimilarityJoin(
    hashed_vectors.sample(withReplacement=False, fraction=sample_fraction), # use a random sample for our target users
    hashed_vectors,
    threshold = max_distance,
    distCol = 'distance'
    )
    .withColumn('similarity', lit(1)/(lit(1)+col('distance')))
    .withColumn('similarity_rescaled', (col('similarity') - lit(min_score)) / lit(1.0 - min_score))
    .selectExpr(
      'datasetA.user_id as user_a',
      'datasetB.user_id as user_b',
      'similarity_rescaled as similarity'
      )
  )

# write output for reuse
(
  sample_comparisons
    .write
    .format('delta')
    .mode('overwrite')
    .save('/mnt/instacart/gold/similarity_results')
  )

display(
  spark.table( 'DELTA.`/mnt/instacart/gold/similarity_results`' )
  )

In [0]:
number_of_customers = 10

# get k number of similar users for each sample user
similar_users =  (
    spark.sql('''
      SELECT 
        user_a, 
        user_b, 
        similarity
      FROM (
        SELECT
          user_a,
          user_b,
          similarity,
          ROW_NUMBER() OVER (PARTITION BY user_a ORDER BY similarity DESC) as seq
        FROM DELTA.`/mnt/instacart/gold/similarity_results`
        )
      WHERE seq <= {0}
      '''.format(number_of_customers)
      )
    )

similar_users = similar_users.createOrReplaceTempView('similar_users')
display(similar_users)

In [0]:
similar_ratings = spark.sql('''
    SELECT
      m.user_a,
      m.user_b,
      m.product_id,
      COALESCE(n.normalized_purchases, 0.0) as normalized_purchases,
      m.similarity
    FROM (
      SELECT
        x.user_a,
        x.user_b,
        y.product_id,
        x.similarity
      FROM similar_users x
      CROSS JOIN instacart.products y
      ) m
    LEFT OUTER JOIN ( -- retrieve ratings actually provided by similar users
      SELECT 
        user_id as user_b, 
        product_id, 
        normalized_purchases 
      FROM instacart.user_ratings
      WHERE split = 'calibration'
        ) n
      ON m.user_b=n.user_b AND m.product_id=n.product_id
      ''')

display(similar_ratings)

In [0]:
product_ratings = ( 
   similar_ratings
    .groupBy('user_a','product_id')
      .agg( 
        sum(col('normalized_purchases') * col('similarity')).alias('weighted_rating'),
        sum('similarity').alias('total_weight')
        )
    .withColumn('recommendation_score', col('weighted_rating')/col('total_weight'))
    .select('user_a', 'product_id', 'recommendation_score')
    .orderBy(['user_a','recommendation_score'], ascending=[True,False])
  )

product_ratings.createOrReplaceTempView('product_ratings')

display(
  product_ratings
  )  

In [0]:
%sql

SELECT
  user_a as user_id,
  product_id,
  recommendation_score,
  PERCENT_RANK() OVER (PARTITION BY user_a ORDER BY recommendation_score DESC) as rank_ui
FROM product_ratings
ORDER BY user_a, recommendation_score DESC

In [0]:
eval_set = (
  spark
    .sql('''
    SELECT 
      x.user_id,
      x.product_id,
      x.r_t_ui,
      y.rank_ui
    FROM (
      SELECT
        user_id,
        product_id,
        normalized_purchases as r_t_ui
      FROM instacart.user_ratings 
      WHERE split = 'evaluation' -- the test period
        ) x
    INNER JOIN (
      SELECT
        user_a as user_id,
        product_id,
        PERCENT_RANK() OVER (PARTITION BY user_a ORDER BY recommendation_score DESC) as rank_ui
      FROM product_ratings
      ) y
      ON x.user_id=y.user_id AND x.product_id=y.product_id
      ''').cache()
  )

display(
  eval_set
    .withColumn('weighted_r', col('r_t_ui') * col('rank_ui') )
    .groupBy()
      .agg(
        sum('weighted_r').alias('numerator'),
        sum('r_t_ui').alias('denominator')
        )
    .withColumn('mean_percent_rank', col('numerator')/col('denominator'))
    .select('mean_percent_rank')
  )

In [0]:
%sql

SELECT
  product_id,
  PERCENT_RANK() OVER (ORDER BY normalized_purchases DESC) as rank_ui
FROM (
  SELECT
    x.product_id,
    COALESCE(y.normalized_purchases,0.0) as normalized_purchases
  FROM (SELECT product_id FROM instacart.products) x
  LEFT OUTER JOIN instacart.naive_ratings y
    ON x.product_id=y.product_id
  WHERE split = 'calibration'
  )

product_id,rank_ui
24852,0.0
13176,2.0130445285449716e-05
21137,4.026089057089943e-05
21903,6.0391335856349146e-05
47209,8.052178114179885e-05
47766,0.00010065222642724856
47626,0.00012078267171269828
16797,0.000140913116998148
26209,0.0001610435622835977
27845,0.00018117400756904745


In [0]:
%sql

SELECT
  SUM(r_t_ui * rank_ui) / SUM(rank_ui) as mean_percent_rank
FROM (
  SELECT 
    x.user_id,
    x.product_id,
    x.r_t_ui,
    y.rank_ui
  FROM (
    SELECT
      p.user_id,
      p.product_id,
      p.normalized_purchases as r_t_ui
    FROM instacart.user_ratings p
    INNER JOIN (SELECT DISTINCT user_a as user_id FROM similar_users) q
      ON p.user_id=q.user_id
    WHERE p.split = 'evaluation' -- the test period
      ) x
  INNER JOIN (
    SELECT
      product_id,
      PERCENT_RANK() OVER (ORDER BY normalized_purchases DESC) as rank_ui
    FROM (
      SELECT
        x.product_id,
        COALESCE(y.normalized_purchases,0.0) as normalized_purchases
      FROM (SELECT product_id FROM instacart.products) x
      LEFT OUTER JOIN instacart.naive_ratings y
        ON x.product_id=y.product_id
      WHERE split = 'calibration'
      )
    ) y
    ON x.product_id=y.product_id
    )

In [0]:
def list_cached_dataframes():
    return [(k,v) for (k,v) in [(k,v) for (k, v) in globals().items() if isinstance(v, DataFrame)] if v.is_cached]
  
for name, obj in list_cached_dataframes():
  obj.unpersist()