In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql.types import StructType,StructField,IntegerType,FloatType,LongType,StringType
from reco_utils.common.spark_utils import start_or_get_spark
from reco_utils.dataset.spark_splitters import spark_random_split
from pyspark.sql.functions import regexp_replace, col


In [2]:
sc = pyspark.SparkContext()
sql = SQLContext(sc)

In [3]:
spark = SparkSession \
    .builder \
    .appName("ALS") \
    .getOrCreate()

In [4]:
spark.version

'3.1.1'

In [5]:
df = spark.read.csv('BX-Book-Ratings.csv',header=True
                   ,sep=';')


In [6]:
df.show(10)

+-------+----------+-----------+
|User-ID|      ISBN|Book-Rating|
+-------+----------+-----------+
| 276725|034545104X|          0|
| 276726|0155061224|          5|
| 276727|0446520802|          0|
| 276729|052165615X|          3|
| 276729|0521795028|          6|
| 276733|2080674722|          0|
| 276736|3257224281|          8|
| 276737|0600570967|          6|
| 276744|038550120X|          7|
| 276745| 342310538|         10|
+-------+----------+-----------+
only showing top 10 rows



### Remove Alphabet from aphanumeric ISBN Columns using 'withColumn' function

In [7]:
df=df.withColumn("ISBN",regexp_replace(col("ISBN"),"[a-zA-Z]",""))


In [8]:
COL_USER = "User-ID"
COL_ITEM = "ISBN"
COL_RATING = "Book-Rating"
COL_PREDICTION = "prediction"

schema=StructType(
    (
    StructField(COL_USER,LongType()),
    StructField(COL_ITEM,LongType()),
    StructField(COL_RATING,FloatType())))





In [9]:
df.first()

Row(User-ID='276725', ISBN='034545104', Book-Rating='0')

<h2>Changing the schema of the Dataframe using 'withColumn' method</h2>

In [10]:
df2 = df.withColumn("User-ID",col("User-ID").cast(IntegerType())) \
    .withColumn("ISBN",col("ISBN").cast(IntegerType())) \
    .withColumn("Book-Rating",col("Book-Rating").cast(FloatType()))

In [11]:
df.printSchema()

root
 |-- User-ID: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Book-Rating: string (nullable = true)



In [12]:
df2.printSchema()

root
 |-- User-ID: integer (nullable = true)
 |-- ISBN: integer (nullable = true)
 |-- Book-Rating: float (nullable = true)



In [13]:
df2.select('ISBN').show(5)

+---------+
|     ISBN|
+---------+
| 34545104|
|155061224|
|446520802|
| 52165615|
|521795028|
+---------+
only showing top 5 rows



In [14]:
df2.show(4)

+-------+---------+-----------+
|User-ID|     ISBN|Book-Rating|
+-------+---------+-----------+
| 276725| 34545104|        0.0|
| 276726|155061224|        5.0|
| 276727|446520802|        0.0|
| 276729| 52165615|        3.0|
+-------+---------+-----------+
only showing top 4 rows



In [15]:
df2.describe().show()

+-------+------------------+--------------------+------------------+
|summary|           User-ID|                ISBN|       Book-Rating|
+-------+------------------+--------------------+------------------+
|  count|           1149780|             1062289|           1149780|
|   mean|140386.39512602412|5.3162145050241506E8|2.8669501991685364|
| stddev| 80562.27771851176| 3.929768082443173E8| 3.854183859201656|
|    min|                 2|                   0|               0.0|
|    max|            278854|          2130530508|              10.0|
+-------+------------------+--------------------+------------------+



In [16]:
from pyspark.sql.functions import isnan, when, count, col

df2.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+-------+----+-----------+
|User-ID|ISBN|Book-Rating|
+-------+----+-----------+
|      0|   0|          0|
+-------+----+-----------+



<h1 style="text-align:center">Runnning SQL queries Programmatically</h1>

In [17]:
df.createOrReplaceTempView("Book")

In [18]:
sqlDF=spark.sql("select `User-ID`,count(*) as Frequency from Book group by `User-ID`")

In [19]:
sqlDF.show()

+-------+---------+
|User-ID|Frequency|
+-------+---------+
| 277594|        1|
| 277840|        1|
| 278220|        2|
| 278659|        1|
|   1436|       12|
|   2136|       10|
|   3959|        2|
|   4032|        2|
|   4821|        3|
|   4937|        5|
|   5325|        1|
|   5925|        1|
|   6613|        1|
|   6731|       26|
|   7711|        1|
|   8433|        1|
|   9030|        1|
|   9583|        1|
|   9586|        2|
|   9993|        1|
+-------+---------+
only showing top 20 rows



In [20]:
df2=df2.dropna()


<h1 style="color:red;text-align:center;">Simple Algorithm for Recommendation (SAR)</h1>

In [21]:
from pyspark.ml.recommendation import ALS

In [22]:
# dataset split into training and testing set
(training, test) = df2.randomSplit([0.8, 0.2])
# training the model
als = ALS(maxIter=5, implicitPrefs=True,userCol="User-ID", itemCol="ISBN", ratingCol="Book-Rating",coldStartStrategy="drop")
model = als.fit(training)
# predict using the testing datatset
predictions = model.transform(test)
predictions.show()

+-------+--------+-----------+-------------+
|User-ID|    ISBN|Book-Rating|   prediction|
+-------+--------+-----------+-------------+
| 189334| 2250810|        0.0|   0.01370871|
| 226393| 6019250|        0.0| 0.0076020802|
| 116170| 6019250|       10.0| 1.2555723E-4|
| 230259| 6019250|        9.0|  1.849127E-4|
| 153662| 6101379|        8.0|          0.0|
| 149908| 6722253|        0.0| 1.2253312E-4|
|  81210|14027877|       10.0| -1.294015E-7|
| 258152|19281785|        0.0|          0.0|
|  94242|20427115|        9.0| 1.3043522E-4|
| 234828|20427115|        9.0|  0.018266551|
| 103630|20427115|        0.0| 0.0011910278|
| 197012|20427115|        0.0| 0.0011818362|
|  11676|28604458|        8.0|   0.00577456|
| 213350|28616340|        0.0|-1.0370857E-6|
| 167759|30080037|        5.0|          0.0|
| 206219|31021792|        8.0| 3.3554045E-4|
| 160541|34071252|        0.0|-2.1652492E-4|
| 204864|34543448|        0.0|   0.09448918|
| 251019|34543448|        8.0|   0.01636376|
| 172742|3

In [24]:
from reco_utils.evaluation.spark_evaluation import SparkRankingEvaluation, SparkRatingEvaluation

evaluations = SparkRankingEvaluation(
    test, 
    predictions,
    col_user='User-ID',
    col_item='ISBN',
    col_rating='Book-Rating',
    col_prediction='prediction',
   
    k=10
)

print(
    "Precision@k = {}".format(evaluations.precision_at_k()),
    "Recall@k = {}".format(evaluations.recall_at_k()),
    "NDCG@k = {}".format(evaluations.ndcg_at_k()),
    "Mean average precision = {}".format(evaluations.map_at_k()),
    sep="\n"
)

Precision@k = 0.31443872128198835
Recall@k = 0.865966405332303
NDCG@k = 0.9511033680061365
Mean average precision = 0.8661343422900482


In [25]:
books = spark.read.csv('BX-Books.csv',header=True
                   ,sep=';')

In [27]:
books.show(2)

+----------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|      ISBN|         Book-Title|         Book-Author|Year-Of-Publication|           Publisher|         Image-URL-S|         Image-URL-M|         Image-URL-L|
+----------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|0195153448|Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|http://images.ama...|http://images.ama...|http://images.ama...|
|0002005018|       Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|http://images.ama...|http://images.ama...|http://images.ama...|
+----------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows



## Combining Ratings table with Books table

In [33]:
df_combine=df2.join(books,df2.ISBN ==  books.ISBN,"inner")

In [45]:
df_combine.createOrReplaceTempView("Book")

In [54]:

sqlDF=spark.sql("select distinct(`Book-Title`),`Book-Author`,`Publisher`  from Book")

In [55]:
sqlDF.show(10)

+--------------------+--------------------+-------------------+
|          Book-Title|         Book-Author|          Publisher|
+--------------------+--------------------+-------------------+
|The Cat Who Came ...|Lilian Jackson Braun|   Putnam Pub Group|
|The Martian Chron...|        RAY BRADBURY|            Spectra|
|       Primal Scream|       Michael Slade|        Signet Book|
|Picture of Dorian...|         Oscar Wilde|Penguin Putnam~mass|
|Rachel's Tears: T...|       Darrell Scott|       Nelson Books|
|Sacred Diary of A...|        Adrian Plass|          Zondervan|
|HISTORY OF THE SO...|       Clement Eaton|         Free Press|
|     Thing of Beauty|       Stephen Fried|             Pocket|
|             Rain Uk|   Stephen Gallagher|   Trafalgar Square|
|Fg on Our Immigra...|             J Smith|               Avon|
+--------------------+--------------------+-------------------+
only showing top 10 rows



<h3> Getting Recommendations
Now it's time to actually get some recommendations! The ALS model has built-in methods called .recommendForUserSubset() and .recommendForAllUsers(). We'll start off with using a subset of users.</h3>

###  Top K recommendation


#### Top  k for all users

In [57]:
model.recommendForAllUsers(10).show()

+-------+--------------------+
|User-ID|     recommendations|
+-------+--------------------+
|    463|[{312195516, 0.00...|
|    496|[{440241073, 0.07...|
|   1829|[{312195516, 0.00...|
|   2366|[{345342968, 0.03...|
|   3175|[{671027360, 0.00...|
|   3918|[{60928336, 0.021...|
|   4900|[{385504209, 0.18...|
|   5300|[{0, 0.0}, {10, 0...|
|   6336|[{60928336, 0.130...|
|   6357|[{0, 0.0}, {10, 0...|
|   6397|[{440214041, 0.00...|
|   6466|[{142001740, 0.01...|
|   6654|[{385504209, 2.70...|
|   7253|[{1400034779, 0.0...|
|   7340|[{0, 0.0}, {10, 0...|
|   7754|[{316666343, 0.00...|
|   7982|[{0, 0.0}, {10, 0...|
|   8086|[{60928336, 0.419...|
|   9427|[{60934417, 5.658...|
|   9465|[{312195516, 4.28...|
+-------+--------------------+
only showing top 20 rows



In [58]:
als.getUserCol()

'User-ID'

In [85]:
users = training_combine.select(als.getUserCol()).distinct().orderBy(col("User-ID").desc()).limit(5)

dfs_rec_subset = model.recommendForUserSubset(users, 10)

In [121]:
dfs_rec_subset.where(dfs_rec_subset['User-ID'] == 278854).collect()

[Row(User-ID=278854, recommendations=Row(ISBN=316601950, rating=0.059437818825244904)),
 Row(User-ID=278854, recommendations=Row(ISBN=312195516, rating=0.04924558475613594)),
 Row(User-ID=278854, recommendations=Row(ISBN=142001740, rating=0.047789402306079865)),
 Row(User-ID=278854, recommendations=Row(ISBN=60930535, rating=0.04427557811141014)),
 Row(User-ID=278854, recommendations=Row(ISBN=446672211, rating=0.04125460982322693)),
 Row(User-ID=278854, recommendations=Row(ISBN=440241073, rating=0.03986551612615585)),
 Row(User-ID=278854, recommendations=Row(ISBN=316666343, rating=0.03674742206931114)),
 Row(User-ID=278854, recommendations=Row(ISBN=316284955, rating=0.036407168954610825)),
 Row(User-ID=278854, recommendations=Row(ISBN=446310786, rating=0.03554272651672363)),
 Row(User-ID=278854, recommendations=Row(ISBN=375700757, rating=0.03380738943815231))]

In [106]:
recs=dfs_rec_subset.take(5)

pyspark.sql.dataframe.DataFrame

In [110]:
recs[4]['recommendations']

Row(ISBN=446672211, rating=0.04125460982322693)