In [3]:
import os
import time

# spark imports
import findspark
import pickle
findspark.init('C:/Users/Lenovo/Downloads/spark-3.0.0-preview2-bin-hadoop3.2')

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

from pyspark.sql.functions import col

from pyspark.mllib.recommendation import ALS

In [4]:
sc = SparkContext('local')
spark = SparkSession(sc)

In [5]:
books = spark.read.load(r"C:\Users\Lenovo\Documents\Springboard\Capstone\Dataset\BX-CSV-Dump\BX-Books.csv",
                     format="csv", sep=";", inferSchema="true", header="true", encoding ='latin1')

In [6]:
win_row_number = Window.orderBy("ISBN")
isbn = books.select(F.row_number().over(win_row_number).alias("book_id"), "ISBN")
isbn.cache().show(3)

+-------+----------+
|book_id|      ISBN|
+-------+----------+
|      1|0000913154|
|      2|0001010565|
|      3|0001046438|
+-------+----------+
only showing top 3 rows



In [7]:
ratings = spark.read.load(r"C:\Users\Lenovo\Documents\Springboard\Capstone\Dataset\BX-CSV-Dump\BX-Book-Ratings.csv",
                     format="csv", sep=";", inferSchema="true", header="true", encoding ='latin1')

In [8]:
ratings = ratings.withColumn("Book-Rating", ratings["Book-Rating"].cast("Float"))
ratings_with_bid = ratings.join(isbn, on=["ISBN"], how="inner").select(col("User-ID").alias("userID"), col("book_id").alias("productID"), col("Book-Rating").alias("rating"))
ratings_with_bid.show(3)

+------+---------+------+
|userID|productID|rating|
+------+---------+------+
|276725|    46116|   0.0|
|276726|    22830|   5.0|
|276727|    92994|   0.0|
+------+---------+------+
only showing top 3 rows



In [9]:
sample_ratings_with_bid = ratings_with_bid.sample(False, 0.0001, 3)
sample_ratings_with_bid.show(3)

+------+---------+------+
|userID|productID|rating|
+------+---------+------+
|277107|    62084|  10.0|
|277523|   109110|  10.0|
|  1172|   257519|  10.0|
+------+---------+------+
only showing top 3 rows



In [10]:
model = ALS.train(sample_ratings_with_bid, rank=5, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False, seed=None)

In [12]:
def recommendedBookNames(x):
    print(x.product)

In [11]:
model.recommendProducts(1172, 10)

[Rating(user=1172, product=257519, rating=9.998900536474139),
 Rating(user=1172, product=117255, rating=7.642835490199193),
 Rating(user=1172, product=27203, rating=6.55736177675994),
 Rating(user=1172, product=250921, rating=5.407352481218171),
 Rating(user=1172, product=64744, rating=4.225755884348793),
 Rating(user=1172, product=89231, rating=4.1350484026784144),
 Rating(user=1172, product=103524, rating=3.9064897578146445),
 Rating(user=1172, product=138582, rating=3.8655452130343804),
 Rating(user=1172, product=199279, rating=3.405959692941238),
 Rating(user=1172, product=253763, rating=2.7431275813945035)]

In [27]:
x = model.recommendProducts(1172, 10)
for i in x:
    print(isbn.filter(isbn['book_id'] = i.product).select(col('ISBN')))

+----------+
|      ISBN|
+----------+
|0000913154|
|0001010565|
|0001046438|
|0001046713|
|000104687X|
|0001046934|
|0001047213|
|0001047647|
|0001047663|
|0001047868|
+----------+
only showing top 10 rows



In [13]:
# save the model to disk
filename = r'C:\Users\Lenovo\Documents\Springboard\Capstone\Model\ALS\model.sav'
pickle.dump(model, open(filename, 'wb'))

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.