In [1]:

import os
import json
import boto3
import sklearn
import socket
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


print('user:', os.environ['JUPYTERHUB_SERVICE_PREFIX'])

def uiWebUrl(self):
    from urllib.parse import urlparse
    web_url = self._jsc.sc().uiWebUrl().get()
    port = urlparse(web_url).port
    return "{}proxy/{}/jobs/".format(os.environ['JUPYTERHUB_SERVICE_PREFIX'], port)

# small fix to enable UI views
SparkContext.uiWebUrl = property(uiWebUrl)

# spark configurtion in local regime 
conf = SparkConf().set('spark.master', 'local[*]').set('spark.driver.memory', '8g')

#some needed objects
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
spark

user: /user/st057275/


# Коллаборативный подход

In [2]:
df = spark.read.option("header",True) \
     .csv("rating.csv")
df.show(10,truncate=True)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
|     1|    112|   3.5|2004-09-10 03:09:00|
|     1|    151|     4|2004-09-10 03:08:54|
|     1|    223|     4|2005-04-02 23:46:13|
|     1|    253|     4|2005-04-02 23:35:40|
|     1|    260|     4|2005-04-02 23:33:46|
+------+-------+------+-------------------+
only showing top 10 rows



In [3]:
nd=df.select(df['userId'],df['movieId'],df['rating'])
nd.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     32|   3.5|
|     1|     47|   3.5|
|     1|     50|   3.5|
|     1|    112|   3.5|
|     1|    151|     4|
|     1|    223|     4|
|     1|    253|     4|
|     1|    260|     4|
|     1|    293|     4|
|     1|    296|     4|
|     1|    318|     4|
|     1|    337|   3.5|
|     1|    367|   3.5|
|     1|    541|     4|
|     1|    589|   3.5|
|     1|    593|   3.5|
|     1|    653|     3|
|     1|    919|   3.5|
+------+-------+------+
only showing top 20 rows



In [4]:
from pyspark.sql.types import IntegerType
ndd = nd.withColumn("movieId", nd["movieId"].cast(IntegerType()))

In [5]:
ndd2 = ndd.withColumn("userId", nd["userId"].cast(IntegerType()))
ndd3 = ndd2.withColumn("rating", nd["rating"].cast(IntegerType()))
ndd = ndd3      

In [6]:
ndd

DataFrame[userId: int, movieId: int, rating: int]

In [7]:
ndd.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|     3|
|     1|     29|     3|
|     1|     32|     3|
|     1|     47|     3|
|     1|     50|     3|
|     1|    112|     3|
|     1|    151|     4|
|     1|    223|     4|
|     1|    253|     4|
|     1|    260|     4|
|     1|    293|     4|
|     1|    296|     4|
|     1|    318|     4|
|     1|    337|     3|
|     1|    367|     3|
|     1|    541|     4|
|     1|    589|     3|
|     1|    593|     3|
|     1|    653|     3|
|     1|    919|     3|
+------+-------+------+
only showing top 20 rows



In [8]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [9]:
(training,test)=ndd.randomSplit([0.7, 0.3])

# Данные разбиваются в соотношении 70/30, что является одним из наиболее встречающихся вариантов разбивки датасетов. 

In [10]:
als=ALS(maxIter=5,regParam=0.09,rank=25,userCol="userId",itemCol="movieId",ratingCol="rating",coldStartStrategy="drop",nonnegative=True)
model=als.fit(training)

In [11]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))
predictions.show()

RMSE=0.860721942337323
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
| 22684|    148|     4|  2.739209|
| 88527|    148|     2| 2.2417731|
|108726|    148|     3|  2.414578|
| 20132|    148|     3| 2.5993564|
| 22884|    148|     3| 2.4478512|
| 81218|    148|     1| 2.2755466|
| 91782|    148|     3| 2.7478151|
| 13170|    148|     3| 1.1208135|
| 80168|    148|     4| 3.0527987|
|136453|    148|     2|  2.919178|
| 94994|    148|     4| 2.4690006|
| 90757|    148|     3| 3.0414684|
|125969|    148|     3|  2.988244|
| 60334|    148|     4|  2.894019|
| 21326|    148|     4| 2.8244214|
| 46380|    148|     4| 2.7570164|
| 75781|    148|     3|  2.842508|
|101628|    148|     1| 2.9236124|
| 78276|    148|     2| 2.9510891|
|108929|    148|     1| 2.7803798|
+------+-------+------+----------+
only showing top 20 rows



# В качестве метрики была использована среднеквадратическая ошибка, так как это одна из базовых метрик для ALS. RMSE = 0.86, что является неплохим результатом. 

In [12]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(als.regParam, [0.09, 0.1, 0.2])
             .addGrid(als.rank, [25, 30, 35])
             .build())

In [13]:
from time import *
start_time = time()


cv = CrossValidator(estimator=als,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)  


cvModel = cv.fit(training)

end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 2199.303 seconds


In [14]:
bestModel = cvModel.bestModel
bestModel.extractParamMap()

{Param(parent='ALS_9de86f895e2f', name='blockSize', doc='block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.'): 4096,
 Param(parent='ALS_9de86f895e2f', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='ALS_9de86f895e2f', name='coldStartStrategy', doc="strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'."): 'drop',
 Param(parent='ALS_9de86f895e2f', name='itemCol', doc='column name for item ids. Ids must be within the integer value range.'): 'movieId',
 Param(parent='ALS_9de86f895e2f', name='userCol', doc='column name for user ids. Ids must be within the integer value range.'): 'userId'}

In [15]:

hold_out_preds = bestModel.transform(test)

In [16]:

print(evaluator.evaluate(hold_out_preds))
print(evaluator.getMetricName())
regr_model = evaluator.evaluate(hold_out_preds)

0.8609080014752772
rmse


# С помощью кросс-валидации можно попробовать найти лучшие параметры, при заданных параметрах RMSE однако не улучшилась. 

# Коллаборативный + контентный метод 

In [17]:
import pandas as pd
movie = pd.read_csv('movie.csv')
tag = pd.read_csv('tag.csv')
df = pd.read_csv('rating.csv')

In [18]:
df

Unnamed: 0,userId,movieId,rating,timestamp
0,1,2,3.5,2005-04-02 23:53:47
1,1,29,3.5,2005-04-02 23:31:16
2,1,32,3.5,2005-04-02 23:33:39
3,1,47,3.5,2005-04-02 23:32:07
4,1,50,3.5,2005-04-02 23:29:40
...,...,...,...,...
20000258,138493,68954,4.5,2009-11-13 15:42:00
20000259,138493,69526,4.5,2009-12-03 18:31:48
20000260,138493,69644,3.0,2009-12-07 18:10:57
20000261,138493,70286,5.0,2009-11-13 15:42:24


In [19]:
df = pd.read_csv('rating.csv')
item_feat = movie.merge(tag)

In [20]:
item_feat['year'] = (item_feat.title.str.extract('(\(\d\d\d\d\))', expand=False)
                              .str.extract('(\d\d\d\d)', expand=False))  


item_feat['title'] = (item_feat.title.str.replace('(\(\d\d\d\d\))', '')
                               .apply(lambda x: x.strip()))


item_feat['genres'] = item_feat.genres.str.split('|')
item_feat.head()

Unnamed: 0,movieId,title,genres,userId,tag,timestamp,year
0,1,Toy Story,"[Adventure, Animation, Children, Comedy, Fantasy]",1644,Watched,2014-12-04 23:44:40,1995
1,1,Toy Story,"[Adventure, Animation, Children, Comedy, Fantasy]",1741,computer animation,2007-07-08 13:59:15,1995
2,1,Toy Story,"[Adventure, Animation, Children, Comedy, Fantasy]",1741,Disney animated feature,2007-07-08 22:21:47,1995
3,1,Toy Story,"[Adventure, Animation, Children, Comedy, Fantasy]",1741,Pixar animation,2007-07-08 22:46:10,1995
4,1,Toy Story,"[Adventure, Animation, Children, Comedy, Fantasy]",1741,TÃ©a Leoni does not star in this movie,2009-06-15 19:19:33,1995


In [21]:
item_feat['genres'] = [','.join(map(str, l)) for l in item_feat['genres']]
item_feat

Unnamed: 0,movieId,title,genres,userId,tag,timestamp,year
0,1,Toy Story,"Adventure,Animation,Children,Comedy,Fantasy",1644,Watched,2014-12-04 23:44:40,1995
1,1,Toy Story,"Adventure,Animation,Children,Comedy,Fantasy",1741,computer animation,2007-07-08 13:59:15,1995
2,1,Toy Story,"Adventure,Animation,Children,Comedy,Fantasy",1741,Disney animated feature,2007-07-08 22:21:47,1995
3,1,Toy Story,"Adventure,Animation,Children,Comedy,Fantasy",1741,Pixar animation,2007-07-08 22:46:10,1995
4,1,Toy Story,"Adventure,Animation,Children,Comedy,Fantasy",1741,TÃ©a Leoni does not star in this movie,2009-06-15 19:19:33,1995
...,...,...,...,...,...,...,...
465559,131258,The Pirates,Adventure,28906,bandits,2015-03-30 19:57:01,2014
465560,131258,The Pirates,Adventure,28906,Korea,2015-03-30 19:58:32,2014
465561,131258,The Pirates,Adventure,28906,mutiny,2015-03-30 19:59:02,2014
465562,131258,The Pirates,Adventure,28906,pirates,2015-03-30 19:56:59,2014


In [22]:
tag

Unnamed: 0,userId,movieId,tag,timestamp
0,18,4141,Mark Waters,2009-04-24 18:19:40
1,65,208,dark hero,2013-05-10 01:41:18
2,65,353,dark hero,2013-05-10 01:41:19
3,65,521,noir thriller,2013-05-10 01:39:43
4,65,592,dark hero,2013-05-10 01:41:18
...,...,...,...,...
465559,138446,55999,dragged,2013-01-23 23:29:32
465560,138446,55999,Jason Bateman,2013-01-23 23:29:38
465561,138446,55999,quirky,2013-01-23 23:29:38
465562,138446,55999,sad,2013-01-23 23:29:32


In [23]:
from lightfm.data import Dataset
dataset1 = Dataset()

In [24]:
item_feat['ratings'] = df['rating']

In [25]:
item_feat

Unnamed: 0,movieId,title,genres,userId,tag,timestamp,year,ratings
0,1,Toy Story,"Adventure,Animation,Children,Comedy,Fantasy",1644,Watched,2014-12-04 23:44:40,1995,3.5
1,1,Toy Story,"Adventure,Animation,Children,Comedy,Fantasy",1741,computer animation,2007-07-08 13:59:15,1995,3.5
2,1,Toy Story,"Adventure,Animation,Children,Comedy,Fantasy",1741,Disney animated feature,2007-07-08 22:21:47,1995,3.5
3,1,Toy Story,"Adventure,Animation,Children,Comedy,Fantasy",1741,Pixar animation,2007-07-08 22:46:10,1995,3.5
4,1,Toy Story,"Adventure,Animation,Children,Comedy,Fantasy",1741,TÃ©a Leoni does not star in this movie,2009-06-15 19:19:33,1995,3.5
...,...,...,...,...,...,...,...,...
465559,131258,The Pirates,Adventure,28906,bandits,2015-03-30 19:57:01,2014,3.5
465560,131258,The Pirates,Adventure,28906,Korea,2015-03-30 19:58:32,2014,3.0
465561,131258,The Pirates,Adventure,28906,mutiny,2015-03-30 19:59:02,2014,4.0
465562,131258,The Pirates,Adventure,28906,pirates,2015-03-30 19:56:59,2014,4.5


In [26]:
item_f = item_feat.sample()

In [27]:
item_fe = []
col = ['genres']*len(item_f.genres.unique())+['ratings']*len(item_f.ratings.unique())+['tag']*len(item_f.tag.unique())
unique_f1 =  list(item_f.genres.unique())+list(item_f.ratings.unique())+list(item_f.tag.unique())

for x,y in zip(col, unique_f1):
    res = str(x)+ ":" +str(y)
    item_fe.append(res)
    print(res)

genres:Comedy,Drama,Romance
ratings:4.0
tag:Teen movie


In [28]:
dataset1.fit(
        df['userId'].unique(), 
        df['movieId'].unique(), 
        item_features = item_fe
        
)

In [29]:

(interactions, weights) = dataset1.build_interactions([(x[0], x[1], x[2]) for x in df.values ])

In [30]:
from lightfm.evaluation import precision_at_k
from lightfm.evaluation import auc_score
from lightfm.cross_validation import random_train_test_split


In [31]:
def feature_colon_value(my_list):

    result = []
    ll = [ 'genres:',  'ratings:', 'tag:']
    aa = my_list
    for x,y in zip(ll,aa):
        res = str(x) +""+ str(y)
        result.append(res)
    return result


In [32]:
ad_subset = item_f[['genres',  'ratings', 'tag']] 
ad_list = [list(x) for x in ad_subset.values]
feature_list = []
for item in ad_list:
    feature_list.append(feature_colon_value(item))

In [33]:
item_tuple = list(zip(df.movieId, feature_list))

In [34]:
item_features = dataset1.build_item_features(item_tuple, normalize= False)

In [35]:
(train, test) = random_train_test_split(interactions=interactions, test_percentage=0.2)


# Разбиение данных производится за счет встроенной функции random_train_test_split, процент тестовой выборки = 20%, что необходимо для обучения модели должным образом.

In [36]:
from lightfm import LightFM
model2 = LightFM(loss='warp', no_components = 30)
model2.fit(train, epochs=1)

<lightfm.lightfm.LightFM at 0x7fb41ce856d0>

# Увеличение параметра epochs оказывает существенное влияние на результаты как тренировочного, так и тестового сета. 

In [37]:
train_auc = auc_score(model2, train).mean()

In [38]:
train_auc

0.9815505

In [39]:
test_auc = auc_score(model2, test).mean()

In [40]:
test_auc

0.97867644

# ROC AUC -  одна из базовых метрик для гибридного подхода. Модель показала  хорошие результаты

# Во многих случаях гибридный подход будет показывать более точные результаты, так как он комбинирует наилучшие характеристики и разрешает проблемы отдельных вариантов. 