# Movie Recommendation with Spark MLlib

이 튜토리얼은 edx의 Anthony D. Joseph에 의해 Apache Spark에서 CS100.1x 빅 데이터 소개에서 제안 된 연습 중 하나에 대한 내 솔루션을 참고한 자료입니다 (Spark Summit 2014 참고). 구성은 아래와 같습니다.

- MovieLens Data: https://grouplens.org/datasets/movielens/
- Spark의 Alternating Least Saqures 구현을 사용한 Collaborative Filtering 기반의 추천 시스템
- 영화 및 평점 데이터를 가져 와서 Spark RDD로 파싱하는 부분
- 웹 어플리케이션에 추천 시스템 적용하는 부분

## Getting and processing data

In [6]:
import os
import zipfile
from urllib import request

In [8]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

In [9]:
datasets_path = os.path.join('..', 'datasets')
complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

In [10]:
complete_f = request.urlretrieve (complete_dataset_url, complete_dataset_path)
small_f = request.urlretrieve (small_dataset_url, small_dataset_path)

In [11]:
with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

## Loading and Parsing datasets

- ratings.csv: userId/movieId/rating/timestamp
- movies.csv: movieId/title/genres
- genres (Genre1/Genre2/Genre3)
- tags.csv: userId/movieId/tag/timestamp
- links.csv: movieId/imdbId,/tmdbId

In [3]:
from pyspark import SparkContext

In [4]:
sc = SparkContext()

In [14]:
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [16]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [17]:
small_ratings_data.take(3)

[('1', '31', '2.5'), ('1', '1029', '3.0'), ('1', '1061', '3.0')]

In [18]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

## Collaborative Filtering with Spark MLlib

협업 필터링에서 우리는 많은 사용자로부터 선호도 또는 취향 정보를 수집하여 (협업) 사용자의 관심에 대한 예측 (필터링)을 수행합니다. 기본 가정은 사용자 A가 문제에 대해 사용자 B와 동일한 의견을 가지고있는 경우 A가 임의로 선택한 사용자의 x에 대한 의견을 갖는 것보다 다른 문제 x에 대한 B의 의견을 가질 가능성이 높다는 것입니다.

Spark MLlib에서는 Alternating Least Squares를 사용하여 구현된 Collaborative Filtering이 있습니다.
- numBlocks: 계산을 병렬 처리하는 데 사용되는 블록 수입니다 (자동 구성하려면 -1로 설정)
- rank: 모델의 잠재 요인 수
- iterations: 실행할 반복 횟수
- lambda: ALS의 정규화 매개 변수를 지정
- implicitPrefs: 명시 적 피드백 ALS 변형을 사용할지 또는 암시 적 피드백 데이터에 적용 할지를 지정
- alpha: 선호 관찰에서의 기본 신뢰를 관리하는 ALS의 암시 적 피드백 변형에 적용 할 수 있는 매개 변수

## Selecting ALS parameters using the small dataset

In [25]:
from pyspark.mllib.recommendation import ALS
import math

In [29]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [30]:
seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

In [35]:
min_error = float('inf')
best_rank = -1
best_iteration = -1

for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank {} the RMSE is {}'.format(rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank {}'.format(best_rank))

For rank 4 the RMSE is 0.9405925542574993
For rank 8 the RMSE is 0.9451745059144596
For rank 12 the RMSE is 0.9435903947376889
The best model was trained with rank 4


In [36]:
predictions.take(3)

[((564, 1084), 3.9107053607351405),
 ((468, 1084), 3.2612206066842706),
 ((65, 1084), 3.6045833547509223)]

In [37]:
rates_and_preds.take(3)

[((378, 8376), (3.5, 2.8265380038046892)),
 ((239, 2805), (2.0, 3.913040044088734)),
 ((57, 1663), (4.0, 3.4910132330495314))]

In [38]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.9449699901041986
