# Lap5. Machine learning với Spark

In [1]:
# Khai báo thư viện
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import lit
from pyspark.ml.recommendation import ALS

In [4]:
# Load movieID và movieName từ file u.item thành một Dict
def loadMovieNames():
  movieNames = {}
  with open("ml-100k/u.item", encoding="latin1") as f:
    for line in f:
      fields = line.split('|')
      movieNames[int(fields[0])] = fields[1]
  return movieNames

movieNames = loadMovieNames()
# print(movieNames)
# Print 5 element of dictionary 
for idx, k in enumerate(movieNames):
  if idx == 5: break
  print((k, movieNames[k]))

(1, 'Toy Story (1995)')
(2, 'GoldenEye (1995)')
(3, 'Four Rooms (1995)')
(4, 'Get Shorty (1995)')
(5, 'Copycat (1995)')


In [5]:
# Sử dụng SparkSession

## Create a SparkSession
spark = SparkSession.builder.appName("MovieRecs").getOrCreate()

## This line is necessary on HDP 2.6.5:
spark.conf.set("spark.sql.crossJoin.enabled", "true")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/29 01:46:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


23/11/29 01:46:59 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
# Khởi tạo hàm ParseInput từ file u.data 
# và convert dữ liệu thành các rows
# và convert thành một RDD

## Get the raw data
lines = spark.read.text("ml-100k/u.data").rdd
# lines = spark.sparkContext.textFile("ml-100k/u.data") # cách của lap trước

## The parseInput
def parseInput(line):
    # fields = line.split() # thiếu .value thì lines = spark.read.text("ml-100k/u.data").rdd bị lỗi vì phải đọc value của Row
    fields = line.value.split()
    return Row(userID = int(fields[0]), movieID = int(fields[1]), rating = int(fields[2]))

## Convert it to a RDD of Row objects 
## with (userID, movieID, rating)
ratingsRDD = lines.map(parseInput)

In [7]:
type(lines)
# lines.foreach(lambda x: print(x))

pyspark.rdd.RDD

In [8]:
type(ratingsRDD)

pyspark.rdd.PipelinedRDD

In [9]:
ratingsRDD.take(5)

                                                                                

[Row(userID=196, movieID=242, rating=3),
 Row(userID=186, movieID=302, rating=3),
 Row(userID=22, movieID=377, rating=1),
 Row(userID=244, movieID=51, rating=2),
 Row(userID=166, movieID=346, rating=1)]

In [10]:
# Chuyển đổi RDD thành dataframe và cache nó
ratings = spark.createDataFrame(ratingsRDD).cache()

                                                                                

In [11]:
# Sử dụng ALS trong Spark để train model
als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID", ratingCol='rating')
model = als.fit(ratings)

23/11/29 01:51:06 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/29 01:51:06 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/11/29 01:51:07 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [12]:
# Hiển thị các phim mà userID = 1 đã xem
print("\nRatings for user ID 1:")
userRatings = ratings.filter("userID = 1")
index = 0
for rating in userRatings.collect():
    if index == 5: break
    print(movieNames[rating['movieID']], rating['rating'])
    index += 1


Ratings for user ID 1:
Three Colors: White (1994) 4
Grand Day Out, A (1992) 3
Desperado (1995) 4
Glengarry Glen Ross (1992) 4
Angels and Insects (1995) 4


In [13]:
# In tất cả các phim được rate trên 100 lần
ratingCounts = ratings.groupBy("movieID").count().filter("count > 100")
print(type(ratingCounts))
ratingCounts.take(5)

<class 'pyspark.sql.dataframe.DataFrame'>


                                                                                

[Row(movieID=474, count=194),
 Row(movieID=29, count=114),
 Row(movieID=65, count=115),
 Row(movieID=191, count=276),
 Row(movieID=418, count=129)]

In [14]:
# Tạo ra 1 dataframe có tên là popularMovies từ ratingCounts theo cột movieID.
# Khởi tạo thêm 1 cột có tên là userID có giá trị = 1
popularMovies = ratingCounts.select("movieID").withColumn('userID', lit(1))
print(type(popularMovies))
popularMovies.take(5)

<class 'pyspark.sql.dataframe.DataFrame'>


[Row(movieID=474, userID=1),
 Row(movieID=29, userID=1),
 Row(movieID=65, userID=1),
 Row(movieID=191, userID=1),
 Row(movieID=418, userID=1)]

In [16]:
# Biến đổi popularMovies theo model đã train 
recommendations = model.transform(popularMovies)

print(type(recommendations))

recommendations.take(5)

<class 'pyspark.sql.dataframe.DataFrame'>


                                                                                

[Row(movieID=474, userID=1, prediction=4.790917873382568),
 Row(movieID=29, userID=1, prediction=1.9533743858337402),
 Row(movieID=65, userID=1, prediction=3.744243621826172),
 Row(movieID=191, userID=1, prediction=4.196722507476807),
 Row(movieID=418, userID=1, prediction=3.472533702850342)]

In [17]:
# Sắp xếp data giảm dần trong recommendations theo cột 'prediction'
# để đề cử phim từ trên xuống
from pyspark.sql.functions import desc
# recommendations.sort('prediction', ascending = False)
recommendations = recommendations.orderBy(desc("prediction"))

In [18]:
recommendations.take(5)

                                                                                

[Row(movieID=169, userID=1, prediction=5.191006183624268),
 Row(movieID=48, userID=1, prediction=5.144643306732178),
 Row(movieID=408, userID=1, prediction=5.0885396003723145),
 Row(movieID=242, userID=1, prediction=5.073580741882324),
 Row(movieID=137, userID=1, prediction=5.020456314086914)]