In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://mirror.its.dal.ca/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xvf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark
#===============
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
!update-alternatives --config java
!java -version

In [0]:
import findspark

# Add the Spark distribution's Python libraries to sys.path so that `pyspark`
# can be imported. With no explicit path, findspark looks up SPARK_HOME
# (set in the setup cell).
findspark.init(spark_home=None)

In [0]:
#===============

In [0]:
from pyspark.sql import SparkSession

# One local Spark session using every available core. The RDD-based MLlib API
# used below needs the underlying SparkContext. Take it from the session
# instead of calling SparkContext.getOrCreate() separately.
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [0]:
import os
# `import urllib` alone does not guarantee the `request` submodule is loaded.
import urllib.request
import zipfile

datasets_path = os.path.join('.', 'datasets')
complete_dataset_url = 'https://files.grouplens.org/datasets/movielens/ml-latest.zip'
complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')

# Idempotent replacement for `!mkdir datasets`, which errors when the cell is re-run.
os.makedirs(datasets_path, exist_ok=True)

# Download only once. Write to a temporary name first so an interrupted
# download never looks like a complete archive on the next run.
if not os.path.exists(complete_dataset_path):
    partial_path = complete_dataset_path + '.part'
    urllib.request.urlretrieve(complete_dataset_url, partial_path)
    os.replace(partial_path, complete_dataset_path)

# The archive unpacks into ./datasets/ml-latest/, which the loading cells read from.
if not os.path.isdir(os.path.join(datasets_path, 'ml-latest')):
    with zipfile.ZipFile(complete_dataset_path, "r") as z:
        z.extractall(datasets_path)

In [0]:
#=========================

In [9]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
# The first line is the CSV header; keep it so it can be filtered out below.
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]


def _rating_triple(line):
    """Parse one ratings line into (user_id, movie_id, rating); extra columns are ignored."""
    fields = line.split(",")
    return (int(fields[0]), int(fields[1]), float(fields[2]))


# Cached because both ALS training and the per-movie statistics reuse it.
complete_ratings_data = (
    complete_ratings_raw_data
    .filter(lambda row: row != complete_ratings_raw_data_header)
    .map(_rating_triple)
    .cache()
)

rating_total = complete_ratings_data.count()
print("There are %s recommendations in the complete dataset" % (rating_total,))

There are 27753444 recommendations in the complete dataset


In [13]:
# Load the complete movie titles file
import csv


def _parse_movie_line(line):
    """Parse one movies.csv data line into (movie_id, title, genres).

    Uses the csv module so that quoted fields are handled: under standard CSV
    quoting, a title that contains a comma is wrapped in double quotes, and a
    plain ``line.split(",")`` would split it across the title and genre fields.
    """
    tokens = next(csv.reader([line]))
    return (int(tokens[0]), tokens[1], tokens[2])


complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

complete_movies_data = complete_movies_raw_data.filter(lambda line: line != complete_movies_raw_data_header)\
    .map(_parse_movie_line).cache()

# (movie_id, title) pairs; the id is already an int after parsing.
complete_movies_titles = complete_movies_data.map(lambda x: (x[0], x[1]))

print("There are %s movies in the complete dataset" % (complete_movies_titles.count()))

There are 58098 movies in the complete dataset


In [0]:
from pyspark.mllib.recommendation import ALS
import math  # not used in this cell

# ALS hyperparameters, same values as the original training run.
ALS_RANK = 20        # number of latent factors per user / movie
ALS_ITERATIONS = 10
ALS_LAMBDA = 0.1     # regularization parameter
ALS_SEED = 0         # fixed seed so the factorization is reproducible

complete_model = ALS.train(
    complete_ratings_data,
    rank=ALS_RANK,
    seed=ALS_SEED,
    iterations=ALS_ITERATIONS,
    lambda_=ALS_LAMBDA,
)

In [0]:
#===========Training complete==========

In [0]:
def get_counts_and_averages(ID_and_ratings_tuple):
    """Map a (key, ratings) pair to (key, (rating_count, mean_rating)).

    ``ratings`` must support ``len()`` and iteration, as the groupByKey values
    passed in below do. The mean is computed in floating point. An empty
    ratings collection raises ZeroDivisionError.
    """
    key = ID_and_ratings_tuple[0]
    ratings = ID_and_ratings_tuple[1]
    how_many = len(ratings)
    total = float(sum(ratings))
    return key, (how_many, total / how_many)

# (movie_id, rating) pairs, grouped so each movie id maps to all of its ratings.
movie_ID_with_ratings_RDD = (
    complete_ratings_data
    .map(lambda triple: (triple[1], triple[2]))
    .groupByKey()
)
# (movie_id, (rating_count, mean_rating))
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
# (movie_id, rating_count)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda kv: (kv[0], kv[1][0]))

In [0]:
import numpy as np
from numpy import linalg as LA
def cosineSimilarity(vec1, vec2):
    """Cosine similarity of two equal-length vectors.

    Accepts any array-like input (numpy arrays, lists, or the ``array('d')``
    vectors returned by ``productFeatures()``). Returns 0.0 when either vector
    has zero norm. Without that guard the result would be NaN, which makes the
    ``takeOrdered`` ranking over similarities unreliable.
    """
    norm_product = LA.norm(vec1) * LA.norm(vec2)
    if norm_product == 0:
        return 0.0
    return np.dot(vec1, vec2) / norm_product

In [0]:
itemId = 25800
top_k = 10  # how many of the most similar movies to keep

# calculating the similarity coefficients of all the movies against the chosen movie ID
product_features = complete_model.productFeatures()  # RDD of (movie_id, factor vector)
item_factor_matches = product_features.lookup(itemId)
if not item_factor_matches:
    # Otherwise indexing the empty lookup result fails with an opaque IndexError.
    raise ValueError("Movie ID %s has no latent-factor vector in complete_model.productFeatures()" % itemId)
complete_itemFactor = np.asarray(item_factor_matches[0])

complete_sims = product_features.map(
    lambda products: (products[0], cosineSimilarity(np.asarray(products[1]), complete_itemFactor)))
# Disabled: enrich each result with title, rating count and average rating.
#                                        .join(complete_movies_titles).join(movie_rating_counts_RDD)\
#                                        .join(movie_ID_with_avg_ratings_RDD)
#complete_sims = complete_sims.map(lambda r: (r[1][0][0][1], r[1][0][0][0], r[0], r[1][0][1], r[1][1][1]))

# Highest similarity first; the query movie itself is included (similarity ~1.0).
complete_sortedSims = complete_sims.takeOrdered(top_k, key=lambda x: -x[1])

In [18]:
# Show the top matches as (movie_id, cosine_similarity) pairs.
print(str(complete_sortedSims))

[(25800, 0.9999999999999999), (91393, 0.9847022404735565), (49663, 0.9793896787553978), (120266, 0.9786059788039633), (135228, 0.9785959186879162), (101245, 0.9776765074703059), (93441, 0.9772415245472528), (109598, 0.9771253884509674), (115531, 0.977088073595721), (124813, 0.9766609054879636)]


In [19]:
# Number of movies that have a latent-factor vector, and therefore a similarity score.
n_scored_movies = complete_sims.count()
n_scored_movies

53889

In [0]:
# Intended per-movie result fields: movie title, similarity, movie ID, number of ratings, average rating

In [20]:
# The productFeatures matrix will be used to create an item-item collaborative filtering recommendation model
# Only a cell's last expression is displayed, so report the two counts explicitly
# instead of running both Spark jobs and discarding their results.
n_user_factors = complete_model.userFeatures().count()
n_movie_factors = complete_model.productFeatures().count()
print("user factor vectors: %d, movie factor vectors: %d" % (n_user_factors, n_movie_factors))
# Latent-factor vector of one example movie (ID 1084).
complete_model.productFeatures().lookup(1084)[0]

array('d', [0.10791780054569244, -0.4079083204269409, -0.5271019339561462, 0.2901724576950073, 0.09610442072153091, -0.356342613697052, -0.39169052243232727, 0.21541601419448853, 0.1799866408109665, -0.8911468386650085, -0.3575962483882904, -0.11121373623609543, -0.31809961795806885, -0.5217965841293335, -0.467618465423584, 0.5852058529853821, 0.65548175573349, 0.07391628623008728, 0.48091229796409607, 0.17626503109931946])

In [0]:
#================SAVE===================

In [0]:
import pickle

# Just Save Matrix: bring the movie factors to the driver as a list of
# (movie_id, factor_vector) pairs and pickle them.
# Note: the ".txt" file holds binary pickle data, not text.
movie_feature_matrix = complete_model.productFeatures().collect()
with open("movie_feature_matrix.txt", "wb") as out_file:
    out_file.write(pickle.dumps(movie_feature_matrix))

In [0]:
#================LOAD===================

In [0]:
# Just Load Matrix
# Import here too, so this cell works on a fresh kernel without running the SAVE cell first.
import pickle

# NOTE: pickle.load can execute arbitrary code; only load files this notebook wrote itself.
with open("movie_feature_matrix.txt", "rb") as fp:   # Unpickling (binary pickle despite the .txt name)
    movie_feature_matrix = pickle.load(fp)

In [0]:
#================SAVE===================

In [0]:
# Save
import os
import shutil

model_path = './save_model'
# MatrixFactorizationModel.save writes its output with saveAsTextFile/parquet,
# which fail when the path already exists. Clear any previous save so this cell
# can be re-run.
if os.path.exists(model_path):
    shutil.rmtree(model_path)
complete_model.save(sc, model_path)

In [0]:
#================LOAD===================

In [0]:
from google.colab import drive
drive.mount('/content/drive')
!cp -r "/content/drive/My Drive/save_model/" "./save_model/"

In [0]:
# Load: restore the ALS model from the directory written by the save cell
# (or copied from Google Drive).
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = './save_model'
complete_model = MatrixFactorizationModel.load(sc, model_path)

In [0]:
import numpy as np
from numpy import linalg as LA
def cosineSimilarity(vec1, vec2):
    """Cosine similarity of two equal-length vectors.

    Accepts any array-like input (numpy arrays, lists, or the ``array('d')``
    vectors returned by ``productFeatures()``). Returns 0.0 when either vector
    has zero norm. Without that guard the result would be NaN, which makes the
    ``takeOrdered`` ranking over similarities unreliable.
    """
    norm_product = LA.norm(vec1) * LA.norm(vec2)
    if norm_product == 0:
        return 0.0
    return np.dot(vec1, vec2) / norm_product

In [0]:
itemId = 25800
top_k = 20  # how many of the most similar movies to keep
# calculating the similarity coefficients of all the movies against the chosen movie ID
product_features = complete_model.productFeatures()  # RDD of (movie_id, factor vector)
item_factor_matches = product_features.lookup(itemId)
if not item_factor_matches:
    # Otherwise indexing the empty lookup result fails with an opaque IndexError.
    raise ValueError("Movie ID %s has no latent-factor vector in complete_model.productFeatures()" % itemId)
complete_itemFactor = np.asarray(item_factor_matches[0])

complete_sims = product_features.map(
    lambda products: (products[0], cosineSimilarity(np.asarray(products[1]), complete_itemFactor)))
# Disabled: enrich each result with title, rating count and average rating.
#                                        .join(complete_movies_titles).join(movie_rating_counts_RDD)\
#                                        .join(movie_ID_with_avg_ratings_RDD)
#complete_sims = complete_sims.map(lambda r: (r[1][0][0][1], r[1][0][0][0], r[0], r[1][0][1], r[1][1][1]))

# Highest similarity first; the query movie itself is included in the results.
complete_sortedSims = complete_sims.takeOrdered(top_k, key=lambda x: -x[1])

In [0]:
# Display the nearest movies as (movie_id, cosine_similarity) pairs, most similar first.
list(complete_sortedSims)