In [18]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col

In [19]:
# Create the Spark session, or reuse one that is already running
spark = (
    SparkSession.builder
    .appName("CollaborativeFilteringExample")
    .getOrCreate()
)

In [20]:
# Load the three semicolon-separated CSV files.
# Each file has a header row; column types are inferred by Spark.
books = spark.read.csv(
    'data/BX-Books.csv',
    sep=";",
    header=True,
    inferSchema=True,
)
users = spark.read.csv(
    'data/BX-Users.csv',
    sep=";",
    header=True,
    inferSchema=True,
)
ratings = spark.read.csv(
    'data/BX-Book-Ratings.csv',
    sep=";",
    header=True,
    inferSchema=True,
)

In [4]:
# Display the schema of the freshly loaded books DataFrame
books

DataFrame[ISBN: string, Book-Title: string, Book-Author: string, Year-Of-Publication: int, Publisher: string, Image-URL-S: string, Image-URL-M: string, Image-URL-L: string]

In [21]:
# Drop the small and medium image-URL columns; every other column is kept
books = books.select(
    'ISBN',
    'Book-Title',
    'Book-Author',
    'Year-Of-Publication',
    'Publisher',
    'Image-URL-L',
)

In [6]:
# Display the schema again to confirm which columns remain
books

DataFrame[ISBN: string, Book-Title: string, Book-Author: string, Year-Of-Publication: int, Publisher: string, Image-URL-L: string]

In [7]:
# Preview the first three book rows
books.show(3)

+----------+--------------------+--------------------+-------------------+--------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|         Image-URL-L|
+----------+--------------------+--------------------+-------------------+--------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|http://images.ama...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|http://images.ama...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|http://images.ama...|
+----------+--------------------+--------------------+-------------------+--------------------+--------------------+
only showing top 3 rows



In [22]:
# Rename the hyphenated / capitalised column names to short lowercase ones
# (ISBN keeps its name).
books = (
    books
    .select('ISBN', 'Book-Title', 'Book-Author', 'Year-Of-Publication', 'Publisher', 'Image-URL-L')
    .withColumnRenamed('Book-Title', 'title')
    .withColumnRenamed('Book-Author', 'author')
    .withColumnRenamed('Year-Of-Publication', 'year')
    .withColumnRenamed('Publisher', 'publisher')
    .withColumnRenamed('Image-URL-L', 'image_url')
)

In [23]:
# Display the schema to confirm the new column names
books

DataFrame[ISBN: string, title: string, author: string, year: int, publisher: string, image_url: string]

In [12]:
# Display the schema of the ratings DataFrame
ratings

DataFrame[User-ID: int, ISBN: string, Book-Rating: int]

In [24]:
# Rename the hyphenated rating columns to snake_case (ISBN keeps its name)
ratings = (
    ratings
    .select('User-ID', 'ISBN', 'Book-Rating')
    .withColumnRenamed('User-ID', 'user_id')
    .withColumnRenamed('Book-Rating', 'rating')
)

In [25]:
# Preview the first five rating rows
ratings.show(5)

+-------+----------+------+
|user_id|      ISBN|rating|
+-------+----------+------+
| 276725|034545104X|     0|
| 276726|0155061224|     5|
| 276727|0446520802|     0|
| 276729|052165615X|     3|
| 276729|0521795028|     6|
+-------+----------+------+
only showing top 5 rows



In [28]:
# Keep only the ratings made by users who have more than 200 ratings
user_counts = ratings.groupBy("user_id").count()
popular_users = (
    user_counts
    .where(col("count") > 200)
    .select("user_id")
)
# The inner join drops every rating whose user is not in popular_users
ratings = ratings.join(popular_users, on="user_id", how="inner")
ratings.show(5)

+-------+----------+------+
|user_id|      ISBN|rating|
+-------+----------+------+
| 277427|002542730X|    10|
| 277427|0026217457|     0|
| 277427|003008685X|     8|
| 277427|0030615321|     0|
| 277427|0060002050|     0|
+-------+----------+------+
only showing top 5 rows



In [31]:
# Attach the book columns to every rating; ratings whose ISBN has no
# matching book row are dropped by the inner join
ratings_with_books = ratings.join(books, on="ISBN", how="inner")

# Count the ratings recorded for each title
num_rating = (
    ratings_with_books
    .groupBy('title')
    .agg({"rating": "count"})
    .withColumnRenamed("count(rating)", "num_of_rating")
)

# Add the per-title count to every row, keep titles with at least 50 ratings,
# then keep a single row per (user_id, title) pair.
# Note: num_of_rating is computed before the duplicates are removed.
final_rating = (
    ratings_with_books
    .join(num_rating, on="title", how="inner")
    .where(col("num_of_rating") >= 50)
    .dropDuplicates(['user_id', 'title'])
)

In [32]:
# Display the schema of the joined and filtered ratings table
final_rating

DataFrame[title: string, ISBN: string, user_id: int, rating: int, author: string, year: int, publisher: string, image_url: string, num_of_rating: bigint]

In [37]:
# Count the rows left after the filtering and de-duplication above
final_rating.count()

59850

In [39]:
# Convert the Spark DataFrame into a pandas DataFrame.
# The same name is rebound, so the conversion is guarded: re-running this cell
# after it has already executed would otherwise raise AttributeError, because
# a pandas DataFrame has no toPandas() method.
if hasattr(final_rating, "toPandas"):
    final_rating = final_rating.toPandas()

In [40]:
import pandas as pd
# Preview the first rows of the ratings table (a pandas DataFrame at this point)
final_rating.head()

Unnamed: 0,title,ISBN,user_id,rating,author,year,publisher,image_url,num_of_rating
0,Chocolat,014100018X,254,0,Joanne Harris,2000,Penguin Books,http://images.amazon.com/images/P/014100018X.0...,103
1,The Fellowship of the Ring (The Lord of the Ri...,0618002227,254,8,J. R. R. Tolkien,1999,Houghton Mifflin Company,http://images.amazon.com/images/P/0618002227.0...,107
2,The Brethren,0440236673,2276,10,John Grisham,2000,Island,http://images.amazon.com/images/P/0440236673.0...,169
3,The King of Torts,0440241537,2276,0,JOHN GRISHAM,2003,Dell,http://images.amazon.com/images/P/0440241537.0...,117
4,Chocolat,014100018X,2766,0,Joanne Harris,2000,Penguin Books,http://images.amazon.com/images/P/014100018X.0...,103


In [41]:
# Reshape to a title-by-user table: one row per title, one column per user,
# each cell holding that user's rating (NaN where the user did not rate it)
book_pivot = final_rating.pivot_table(
    index='title',
    columns='user_id',
    values='rating',
)
book_pivot

user_id,254,2276,2766,2977,3363,3757,4017,4385,6242,6251,...,274004,274061,274301,274308,274808,275970,277427,277478,277639,278418
title,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
"""O"" Is for Outlaw",,,,,,,,,,,...,,,8.0,,,,,,,
1984,9.0,,,,,,,,,,...,,,,,,0.0,,,,
1st to Die: A Novel,,,,,,,,,,,...,,,,,,,,,,
2nd Chance,,10.0,,,,,,,,,...,,,,0.0,,,,,0.0,
4 Blondes,,,,,,,,,,0.0,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
Wuthering Heights,,,,,,,,,,,...,,,,0.0,,,,,,
Year of Wonders,,,,7.0,,,,,7.0,,...,,,,,,0.0,,,,
You Belong To Me,,,,,,,,,,,...,,,,,,,,,,
Zen and the Art of Motorcycle Maintenance: An Inquiry into Values,,,,,0.0,,,,,0.0,...,,,,,,0.0,,,,


In [42]:
# Replace the missing (user, title) cells with 0 so the table is fully numeric.
# Rebinding the name instead of using inplace=True avoids mutating the frame
# that the previous cell displayed.
book_pivot = book_pivot.fillna(0)
book_pivot

user_id,254,2276,2766,2977,3363,3757,4017,4385,6242,6251,...,274004,274061,274301,274308,274808,275970,277427,277478,277639,278418
title,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
"""O"" Is for Outlaw",0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,8.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1984,9.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1st to Die: A Novel,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2nd Chance,0.0,10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4 Blondes,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
Wuthering Heights,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
Year of Wonders,0.0,0.0,0.0,7.0,0.0,0.0,0.0,0.0,7.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
You Belong To Me,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
Zen and the Art of Motorcycle Maintenance: An Inquiry into Values,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [43]:
from scipy.sparse import csr_matrix

# Store the title-by-user rating table in compressed sparse row format
book_sparse = csr_matrix(book_pivot.values)
book_sparse

<742x888 sparse matrix of type '<class 'numpy.float64'>'
	with 14966 stored elements in Compressed Sparse Row format>

In [45]:
# Nearest-neighbour search (an unsupervised method, not a clustering algorithm);
# algorithm='brute' selects scikit-learn's brute-force search
from sklearn.neighbors import NearestNeighbors
model = NearestNeighbors(algorithm= 'brute')

In [46]:
# Fit the neighbour index on the sparse matrix; each row of book_sparse is one title
model.fit(book_sparse)

In [47]:
# Titles in the same order as the rows of book_pivot
book_names = book_pivot.index

In [48]:
import os
import pickle

# Create the output directory if it is missing; open(..., 'wb') would
# otherwise fail with FileNotFoundError.
os.makedirs('artifacts', exist_ok=True)

# Persist the fitted model and the tables needed to serve recommendations.
# Each file is opened in a `with` block so the handle is flushed and closed
# as soon as its object has been written.
for _path, _artifact in (
    ('artifacts/model.pkl', model),
    ('artifacts/book_names.pkl', book_names),
    ('artifacts/final_rating.pkl', final_rating),
    ('artifacts/book_pivot.pkl', book_pivot),
):
    with open(_path, 'wb') as _fh:
        pickle.dump(_artifact, _fh)

In [49]:
def recommend_book(book_name, n_recommendations=5):
    """Print the titles whose rating vectors are closest to ``book_name``.

    The row for ``book_name`` is looked up in the global ``book_pivot`` table
    and passed to the global fitted ``model`` to find its nearest neighbours.

    Args:
        book_name: Title to search for; must be present in ``book_pivot.index``.
        n_recommendations: Maximum number of suggestions to print. The default
            of 5 matches the previously hard-coded ``n_neighbors=6`` (the
            searched title plus five others).

    Returns:
        None. The header and the suggested titles are printed.

    Raises:
        IndexError: If ``book_name`` is not present in ``book_pivot.index``.
    """
    # Boolean-mask lookup on the index; uses the ndarray method so the function
    # does not depend on a module-level ``np`` name being imported first.
    matches = (book_pivot.index == book_name).nonzero()[0]
    if matches.size == 0:
        raise IndexError(f"'{book_name}' was not found in book_pivot.index")
    book_id = matches[0]

    # Request one extra neighbour because the searched title is itself a
    # candidate result; never ask for more neighbours than there are rows.
    n_neighbors = min(n_recommendations + 1, len(book_pivot.index))
    query = book_pivot.iloc[book_id, :].values.reshape(1, -1)
    _, suggestion = model.kneighbors(query, n_neighbors=n_neighbors)

    # A single query row was passed, so only suggestion[0] is populated.
    # Remove the searched title wherever it ranks, so the header below is
    # always printed first and exactly once.
    titles = [title for title in book_pivot.index[suggestion[0]] if title != book_name]

    print(f"You searched '{book_name}'\n")
    print("The suggestion books are: \n")
    for title in titles[:n_recommendations]:
        print(title)

In [51]:
import numpy as np
# Example query using a title that is present in book_pivot
recommend_book("4 Blondes")

You searched '4 Blondes'

The suggestion books are: 

No Safe Place
Pleading Guilty
Exclusive
Lake Wobegon days
Long After Midnight
