In [3]:
import sys
import math
from annoy import AnnoyIndex
import timeit
import numpy as np
from sklearn.metrics.pairwise import euclidean_distances

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.recommendation import ALS
from pyspark.context import SparkContext
from pyspark.mllib.evaluation import RankingMetrics


In [5]:
spark = SparkSession.builder.appName("ALS").getOrCreate()

In [6]:
data_train = spark.read.parquet('cf_train.parquet')
data_val = spark.read.parquet('cf_validation.parquet')
data_test = spark.read.parquet('cf_test.parquet')

df_train = data_train.select('user_id','track_id','count')

df_val = data_val.select('user_id','track_id','count')

df_test = data_test.select('user_id','track_id','count')

frac = 1.0

df_train = data_train.sample(withReplacement=False, fraction=frac)
df_val = data_val.sample(withReplacement=False, fraction=frac)
df_test = data_test.sample(withReplacement=False, fraction=frac)

df_train = df_train.withColumn('user_id', F.hash(col('user_id')))
df_train = df_train.withColumn('track_id', F.hash(col('track_id')))
df_val = df_val.withColumn('user_id', F.hash(col('user_id')))
df_val = df_val.withColumn('track_id', F.hash(col('track_id')))
df_test = df_test.withColumn('user_id', F.hash(col('user_id')))
df_test = df_test.withColumn('track_id', F.hash(col('track_id')))

rank = 20
reg_param = 0.05
alpha = 15

als = ALS(rank = rank, regParam = reg_param, alpha = alpha, userCol = "user_id", itemCol = "track_id", ratingCol = "count", implicitPrefs = True)

als_model = als.fit(df_train)


In [7]:
item_factors = als_model.itemFactors
item_factors.show(5)
item_factors['features']

+-----------+--------------------+
|         id|            features|
+-----------+--------------------+
|-2147356230|[0.002952819, -0....|
|-2147124630|[5.4565535E-5, -3...|
|-2146869950|[-1.3237651E-27, ...|
|-2145200330|[-1.4980401E-4, 1...|
|-2145140630|[-3.205877E-5, -8...|
+-----------+--------------------+
only showing top 5 rows



Column<b'features'>

In [8]:
item_factors = item_factors.withColumn('annoy_id', F.row_number().over(Window.orderBy('id')))

item_factors.show(5)
#item_factors.toPandas().to_csv('item_factors.csv')

+-----------+--------------------+--------+
|         id|            features|annoy_id|
+-----------+--------------------+--------+
|-2147427751|[-1.1584E-28, 2.5...|       1|
|-2147410333|[2.3735834E-5, 3....|       2|
|-2147356230|[0.002952819, -0....|       3|
|-2147354705|[1.5179839E-28, 2...|       4|
|-2147351338|[-2.4785043E-4, 5...|       5|
+-----------+--------------------+--------+
only showing top 5 rows



In [9]:
annoy_id_list = item_factors.select(['id', 'annoy_id'])     # 
item_factors = item_factors.select(['annoy_id', 'features'])

# 1. Annoy

In [10]:
# build
tic = timeit.default_timer()

annoy_tree = AnnoyIndex(als_model.rank, 'dot')

for item_factor in item_factors.collect():
    annoy_tree.add_item(item_factor.annoy_id, item_factor.features)

annoy_tree.build(10)

toc = timeit.default_timer()
build_time = toc-tic

print('Time to build tree: {} seconds'.format(build_time))

Time to build tree: 3.7635909999999626 seconds


In [11]:
val_user = df_val.select('user_id').distinct()
user_factors = als_model.userFactors.withColumnRenamed('id', 'user_id')
val_user_factors = val_user.join(user_factors, on='user_id', how='inner')

In [12]:
print(int(val_user.collect()[0].user_id))
val_user_factors.show()

-545358818
+-----------+--------------------+
|    user_id|            features|
+-----------+--------------------+
|-2090272930|[-5.840485E-19, -...|
|-1948602990|[-0.0047930637, 0...|
|-1773662890|[0.11956502, -1.2...|
|-1680509340|[-5.948269E-4, -0...|
|-1480121900|[0.3016885, 0.567...|
|-1364968800|[-0.0011344405, 0...|
|-1155265040|[9.781499E-4, -7....|
|-1051642160|[-0.0807854, 0.01...|
| -941150950|[-0.054587103, 0....|
| -776472750|[0.96694446, 0.38...|
| -699194540|[0.2584578, -0.09...|
| -249000270|[-0.0031910143, -...|
|   -6962990|[0.65244216, -0.0...|
|  608204120|[0.38735414, 0.15...|
| 1104141380|[1.3534796, 0.090...|
| 1365028600|[0.08714541, -0.1...|
| 1573842130|[-0.08874205, 0.0...|
| 1612213650|[-0.012219498, 0....|
| 1662925200|[8.322276E-5, 8.8...|
| 1818913200|[0.0013753459, -1...|
+-----------+--------------------+
only showing top 20 rows



In [39]:
#search

sc = SparkContext.getOrCreate()

tic = timeit.default_timer()

recommend_list = [(user.user_id, annoy_tree.get_nns_by_vector(user.features, 1000, search_k=-1, include_distances=False)) for user in
                      val_user_factors.collect()]

toc = timeit.default_timer()

search_time = toc-tic

print('Time to build recommendations for all users in: {} seconds'.format(search_time))

temp = sc.parallelize(recommend_list)
temp

Time to build recommendations for all users in: 0.3339261000000988 seconds


ParallelCollectionRDD[385] at parallelize at PythonRDD.scala:195

In [0]:
candidatesDF = temp.toDF(['user_id', 'candidates'])
candidatesDF.printSchema()

In [0]:
candidatesDF = candidatesDF.select("user_id",F.explode("candidates").alias('annoy_id'))
candidatesDF = candidatesDF.join(annoy_id_list, on='annoy_id', how='inner')
candidatesDF = candidatesDF.select('user_id', col('id').alias('track_id'))


In [0]:
predictions = als_model.transform(candidatesDF)
predictions = predictions.select('user_id','track_id','prediction')
predictions.createOrReplaceTempView("predictions")

In [0]:
ground_truth = df_val.groupBy('user_id').agg(F.collect_set("track_id").alias('tracks'))
ground_truth = spark.sql('SELECT user_id, collect_list(track_id) AS truth_tracks FROM predictions GROUP BY user_id')
ground_truth.createOrReplaceTempView('ground_truth')

val_users = df_val.select('user_id').distinct()
recommended = als_model.recommendForUserSubset(val_users, 500)
recommended.createOrReplaceTempView("recommended")

explode_recommended = (recommended.select("user_id", explode("recommendations").alias("recommendation")).select("user_id", "recommendation.*"))
explode_recommended.createOrReplaceTempView("explode_recommended")

agg_recommended = spark.sql('SELECT user_id, collect_list(track_id) AS recommended_tracks FROM explode_recommended GROUP BY user_id')
agg_recommended.createOrReplaceTempView("agg_recommended")

ground_truth_recommended = spark.sql('SELECT agg_recommended.recommended_tracks AS recommended_tracks, ground_truth.truth_tracks as truth_tracks FROM agg_recommended INNER JOIN ground_truth ON agg_recommended.user_id = ground_truth.user_id')

ground_truth_recommended

In [0]:
ground_truth_recommended.show()

In [0]:
ground_truth_recommended_rdd = ground_truth_recommended.select("recommended_tracks", "truth_tracks").rdd

ranking_metrics = RankingMetrics(ground_truth_recommended_rdd)

precision_at_K = ranking_metrics.precisionAt(500)
mean_average_preision = ranking_metrics.meanAveragePrecision
ncdg_at_K = ranking_metrics.ndcgAt(500)

print("Precision At K = ", precision_at_K)
print("Mean Average Precision = ", mean_average_preision)
print("ncdg At K = ", ncdg_at_K)

# 2. Brute Force

In [26]:
# we want to measure the distance of the vectors from the other indices to the vector of the source index
# define the source vector to be at index = idx = 100

# build
tic = timeit.default_timer()

source = item_factors.toPandas().loc[0]
distances = []

for item_factor in item_factors.collect():
    distance = np.dot(item_factor.features,source.features)
    distances.append(distance)
    
toc = timeit.default_timer()
build_time = toc - tic

annoy_id                                                    1
features    [-1.1584000373771495e-28, 2.516452806145417e-2...
Name: 0, dtype: object


In [32]:
print('Time to build distances array: {} seconds'.format(build_time))

Time to build distances array: 3.5557589999999664 seconds


In [30]:
# search
tic = timeit.default_timer()
nearest_neighbors = np.argmin(distances)

toc = timeit.default_timer()
search_time = toc-tic

In [36]:
print('Time to get nearest neighbors for item single item in: {} seconds'.format(search_time))
print('Time to get nearest neighbors for all items in: {} seconds'.format(search_time*len(item_factors.collect())))

Time to get nearest neighbors for item single item in: 0.008238499999833948 seconds
Time to get nearest neighbors for all items in: 880.102477982261 seconds


In [None]:
print('Approximate nearest neighbors build time was approximately {}x faster'.format(/3.5557589999999664/3.7635909999999626))
print('Approximate nearest neighbors search performed approximately {}x faster'.format(880.102477982261/0.000515899999982139))