In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .config('spark.driver.memory', '15g') \
    .appName('cafe-build-recommender') \
    .getOrCreate()

sc = spark.sparkContext

In [2]:
import os
datasets_path = os.path.join('..', 'data')

In [3]:
rating_file = os.path.join(datasets_path, 'rating.csv')
ratings_raw_data = sc.textFile(rating_file)
ratings_raw_data_header = ratings_raw_data.take(1)[0]

In [4]:
ratings_data = ratings_raw_data.filter(lambda line: line != ratings_raw_data_header)\
    .map(lambda line: line.split(","))\
    .map(lambda tokens: (tokens[1], tokens[0].replace("-", ""), tokens[2])).cache()

In [5]:
ratings_data.take(3)

[('1', '5610000104201900272', '10'),
 ('1', '5610000104201900212', '10'),
 ('1', '3770000104201900160', '10')]

데이터순서: (유저ID, 카페ID, LIKE)
(3760000-104-2019-00185) 이런식의 카페 ID 에서 -를 삭제해서 사용한다.
(7)-(3)-(4)-(5) 형태의 총 19 자리의 관리번호.

In [6]:
cafe_file = os.path.join(datasets_path, 'data.csv')

cafe_raw_data = sc.textFile(cafe_file)
cafe_raw_data_header = cafe_raw_data.take(1)[0]

headerList = str(cafe_raw_data_header).split(",") # 관리번호 = PK
need_column = ['관리번호', '영업상태구분코드', '영업상태명', '소재지전화',\
        '소재지면적','소재지전체주소', '도로명전체주소', '사업장명', '최종수정시점',\
        '업태구분명', '좌표정보(X)', '좌표정보(Y)', '시설총규모', '홈페이지']
need_column_index = []
column_list = []
for i, value in enumerate(headerList):
    if str(value) in need_column:
        need_column_index.append(i)
        column_list.append(str(value))

cafe_data = cafe_raw_data.filter(lambda line: line!=cafe_raw_data_header) \
    .map(lambda line: line.split(",")) \
    .filter(lambda tokens: tokens[24] == '커피숍' and tokens[7] =='영업/정상')\
    .map(lambda tokens: [tokens[i] for i in need_column_index]) \
    .filter(lambda tokens: tokens[5].split()[0] == '경기도' or tokens[5].split()[0] == '충청북도')\
    .map(lambda tokens: (tokens[0].replace("-", ""), tokens[7]))

cafe_data.take(3)

[('5610000104201800133', '커피에반하다영통구청2호점'),
 ('3770000104201900191', '시에커피(sie coffee)'),
 ('3760000104201900185', '날쌘카페(탑동점)')]

여기까지 데이터 세팅 완료

In [7]:
###### cafe 데이터 integer mapping #######

users = ratings_data.map(lambda r: r[0]).distinct()
items = cafe_data.map(lambda r: r[0]).distinct()

print("users :", users.take(3))
print("items :", items.take(3))

# Zips the RDDs and creates 'mirrored' RDDs to facilitate reverse mapping 
user_int = users.zipWithIndex()
int_user = user_int.map(lambda u: (u[1], u[0]))
item_int = items.zipWithIndex()
int_item = item_int.map(lambda i: (i[1], i[0]))

print(ratings_data.take(3))

ratings = ratings_data.map(lambda r: (r[1], (r[0], r[2])))
print(ratings.take(3))
ratings = ratings.join(item_int).map(lambda r: (r[1][0][0], r[1][1], r[1][0][1]))
print(ratings.take(3))

users : ['1', '4', '8']
items : ['3760000104201900173', '3760000104201900158', '3750000104201900109']
[('1', '5610000104201900272', '10'), ('1', '5610000104201900212', '10'), ('1', '3770000104201900160', '10')]
[('5610000104201900272', ('1', '10')), ('5610000104201900212', ('1', '10')), ('3770000104201900160', ('1', '10'))]
[('1', 10661, '10'), ('2', 10661, '10'), ('3', 10661, '10')]


integer_map_list << ALS 에서 training_RDD 의 값이 integer 만 사용가능해서 mapping 한 데이터를 사용해서 ratings 의 값들을 integer 값으로 변경.

In [8]:
training_RDD, validation_RDD, test_RDD = ratings.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x:(x[0], x[1]))

In [9]:
training_RDD.take(2)

[('1', 10661, '10'), ('2', 10661, '10')]

In [11]:
from pyspark.mllib.recommendation import ALS
import math

seed = 10
iterations = 10
regularization_parameter = 0.1
ranks = [1, 4, 8, 10, 12, 16]
errors = [0, 0, 0, 0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' % best_rank)

For rank 1 the RMSE is 0.05144809643752524
For rank 4 the RMSE is 0.03618933289488169
For rank 8 the RMSE is 0.03666640977318992
For rank 10 the RMSE is 0.036880709892221475
For rank 12 the RMSE is 0.035303254245323455
For rank 16 the RMSE is 0.03756299087562722
The best model was trained with rank 12


In [12]:
print(predictions.take(3))
print(rates_and_preds.take(3))

[((42, 3558), 9.962437001192278), ((126, 3558), 9.962437001192278), ((115, 3558), 9.962437001192278)]
[((145, 10687), (10.0, 9.962437001192278)), ((161, 10695), (10.0, 9.962437015639482)), ((1, 5295), (10.0, 9.962437015639482))]


In [13]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.03530325422218386


In [14]:
# Parse
complete_ratings_data = ratings.map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache() 
print(complete_ratings_data.take(2)) # userId, cafe_id (mapping), like.
print("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))

[(1, 10661, 10.0), (2, 10661, 10.0)]
There are 578360 recommendations in the complete dataset


In [15]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

# cafe_id, like > group By key (key = 카페아이디)
cafe_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey()) 
cafe_ID_with_avg_ratings_RDD = cafe_ID_with_ratings_RDD.map(get_counts_and_averages) # 결과 = (카페ID, (평가받은 수, 평가 평균))
cafe_rating_counts_RDD = cafe_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

cafe_rating_counts_RDD.take(3)

[(5296, 37), (5424, 41), (10840, 39)]

이제 유저의 레이팅을 추가해보자.

In [21]:
import random
new_user_ID = 0
new_user_ratings = []
ratings = ['3760000-104-2014-00043', '3760000-104-2012-00063', '3770000-104-2019-00087', '3760000-104-2019-00062', '3760000-104-2014-00177', '3760000-104-2014-00019', '3760000-104-2018-00014', '3760000-104-2020-00022', '3760000-104-2016-00083', '3760000-104-2014-00021', '3760000-104-2017-00150', '3760000-104-2011-00046', '3760000-104-2019-00050']
for i in range(len(ratings)):
    # index = random.randrange(0, 16000)
    new_user_ratings.append((new_user_ID, str(ratings[i]).replace("-", ""), 10))

new_user_ratings_RDD = sc.parallelize(new_user_ratings)
new_user_ratings_RDD = new_user_ratings_RDD.map(lambda r: (r[1], (r[0], r[2]))).join(item_int).map(lambda r: (r[1][0][0], r[1][1], r[1][0][1]))

print('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 157, 10), (0, 5347, 10), (0, 5357, 10), (0, 8106, 10), (0, 10828, 10), (0, 165, 10), (0, 33, 10), (0, 5407, 10), (0, 5366, 10), (0, 10690, 10)]


In [22]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [23]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print("New model trained in %s seconds" % round(tt,3))

New model trained in 23.941 seconds


In [24]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_cafe_RDD = (cafe_ID_with_ratings_RDD.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_cafe_RDD)

In [25]:
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_ratings_data).join(cafe_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(130, ((9.945325189734302, 10679), 36)),
 (130, ((9.945325189734302, 10702), 36)),
 (130, ((9.945325189734302, 5295), 36))]

In [26]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [27]:
top_cafe = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print('TOP recommended cafe (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_cafe)))

TOP recommended cafe (with more than 25 reviews):
(10679, 9.945325189734302, 36)
(10702, 9.945325189734302, 36)
(5295, 9.945325189734302, 36)
(43, 9.945325189734302, 36)
(5305, 9.945325189734302, 36)
(10730, 9.945325189734302, 36)
(10764, 9.945325189734302, 36)
(116, 9.945325189734302, 36)
(5399, 9.945325189734302, 36)
(10794, 9.945325189734302, 36)
(5424, 9.945325189734302, 36)
(214, 9.945325189734302, 36)
(5504, 9.945325189734302, 36)
(5524, 9.945325189734302, 36)
(10924, 9.945325189734302, 36)
(11024, 9.945325189734302, 36)
(11144, 9.945325189734302, 36)
(5775, 9.945325189734302, 36)
(5852, 9.945325189734302, 36)
(650, 9.945325189734302, 36)
(660, 9.945325189734302, 36)
(671, 9.945325189734302, 36)
(5911, 9.945325189734302, 36)
(11333, 9.945325189734302, 36)
(5955, 9.945325189734302, 36)


In [28]:
topcafe_with_managenum = sc.parallelize(top_cafe).join(int_item)
topcafe_managenum = topcafe_with_managenum.map(lambda tokens: (tokens[1][1], tokens[1][0]))
print(topcafe_managenum.take(3))
topcafe_managenum.join(cafe_data).take(25)

[('5610000104201100078', 9.945325189734302), ('3760000104201100041', 9.945325189734302), ('3860000104201233744', 9.945325189734302)]


[('3860000104201900035', (9.945325189734302, '카페베이직(CafeBasic)')),
 ('3770000104202000044', (9.945325189734302, '오늘도빙수')),
 ('3760000104201100041', (9.945325189734302, '보그너커피')),
 ('3770000104200600061', (9.945325189734302, '할리스동수원점')),
 ('3770000104201900040', (9.945325189734302, '하루(HARU)')),
 ('3840000104201200010', (9.945325189734302, '에스케이카페(기념관점)')),
 ('3860000104201800159', (9.945325189734302, '라잌커피(coffee)')),
 ('3760000104201900008', (9.945325189734302, '워나로스터즈')),
 ('5610000104201500215', (9.945325189734302, '앨리스커피')),
 ('3770000104201800060', (9.945325189734302, '카페델로')),
 ('3810000104201500011', (9.945325189734302, '아이키친')),
 ('3790000104201800065', (9.945325189734302, '위클리커피')),
 ('3760000104201800069', (9.945325189734302, '세븐일레븐')),
 ('3810000104201600187', (9.945325189734302, '쥬씨 분당상록점')),
 ('3860000104201700203', (9.945325189734302, '카페늘봄')),
 ('3760000104201400142', (9.945325189734302, '엔젤리너스')),
 ('3800000104201500065', (9.945325189734302, '커피샵(#)')),
 ('3820000104201

In [None]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('..', 'models', 'cafe_lens_als')

model.save(sc, model_path)
same_model = MatrixFactorizationModel.load(sc, model_path)