In [66]:
!pip install pyspark



In [68]:
import pyspark

In [69]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [70]:
spark  = (SparkSession.builder
                      .appName('Dubovik_Spark')
                      .enableHiveSupport()
                      .config("spark.executor.instances", "2")
                      .config("spark.executor.cores","1")
                      .config("spark.executor.memory", "4G")
                      .config("spark.default.parallelism","2")
                      .getOrCreate())

In [71]:
spark

In [72]:
ratings_txt = spark.read.text("/kaggle/input/movielens/ratings.csv")

In [73]:
ratings_txt.take(5)

[Row(value='userId,movieId,rating,timestamp'),
 Row(value='1,31,2.5,1260759144'),
 Row(value='1,1029,3.0,1260759179'),
 Row(value='1,1061,3.0,1260759182'),
 Row(value='1,1129,2.0,1260759185')]

## Блок 1

### Количество строчек в датасете movielens/ratings.csv

In [74]:
ratings_csv = spark.read.option("header", True).csv("/kaggle/input/movielens/ratings.csv")

In [75]:
ratings_csv.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
|     1|   1263|   2.0|1260759151|
|     1|   1287|   2.0|1260759187|
|     1|   1293|   2.0|1260759148|
|     1|   1339|   3.5|1260759125|
|     1|   1343|   2.0|1260759131|
|     1|   1371|   2.5|1260759135|
|     1|   1405|   1.0|1260759203|
|     1|   1953|   4.0|1260759191|
|     1|   2105|   4.0|1260759139|
|     1|   2150|   3.0|1260759194|
|     1|   2193|   2.0|1260759198|
|     1|   2294|   2.0|1260759108|
|     1|   2455|   2.5|1260759113|
|     1|   2968|   1.0|1260759200|
|     1|   3671|   3.0|1260759117|
+------+-------+------+----------+
only showing top 20 rows



In [76]:
ratings_csv.count()

100004

### Количество строчек в датасете movielens/tags.csv

In [77]:
tags_csv = spark.read.option("header", True).csv("/kaggle/input/movielens/tags.csv")

In [78]:
tags_csv.show()

+------+-------+--------------------+----------+
|userId|movieId|                 tag| timestamp|
+------+-------+--------------------+----------+
|    15|    339|sandra 'boring' b...|1138537770|
|    15|   1955|             dentist|1193435061|
|    15|   7478|            Cambodia|1170560997|
|    15|  32892|             Russian|1170626366|
|    15|  34162|         forgettable|1141391765|
|    15|  35957|               short|1141391873|
|    15|  37729|          dull story|1141391806|
|    15|  45950|          powerpoint|1169616291|
|    15| 100365|            activist|1425876220|
|    15| 100365|         documentary|1425876220|
|    15| 100365|              uganda|1425876220|
|    23|    150|          Ron Howard|1148672905|
|    68|   2174|               music|1249808064|
|    68|   2174|               weird|1249808102|
|    68|   8623|        Steve Martin|1249808497|
|    73| 107999|              action|1430799184|
|    73| 107999|               anime|1430799184|
|    73| 107999|    

In [79]:
tags_csv.count()

1296

## Блок 2

In [80]:
ratings_csv.createOrReplaceTempView("ratings_csv")
tags_csv.createOrReplaceTempView("tags_csv")

### Количество уникальных фильмов и юзеров в таблице “ratings”

In [81]:
ratings_csv.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [82]:
query = ''' SELECT 
                COUNT(DISTINCT userId) AS Number_of_unique_users,
                COUNT(DISTINCT movieId) AS Number_of_unique_movies
            FROM ratings_csv
        '''

In [83]:
unique_elems = spark.sql(query)

In [84]:
unique_elems.show()

+----------------------+-----------------------+
|Number_of_unique_users|Number_of_unique_movies|
+----------------------+-----------------------+
|                   671|                   9066|
+----------------------+-----------------------+



### Количество оценок >= 4.0

In [97]:
query = ''' SELECT COUNT(userId) AS Score_greater_than_4
            FROM ratings_csv
            WHERE rating >= 4
        '''

In [98]:
score_greater_than_4 = spark.sql(query)

In [99]:
score_greater_than_4.show()

+--------------------+
|Score_greater_than_4|
+--------------------+
|               51568|
+--------------------+



### Топ 100 фильмов с самым высоким рейтингом 

In [123]:
query = ''' SELECT 
                movieId,
                AVG(rating) AS Average_rating,
                COUNT(userId) AS Number_of_assessments
            FROM ratings_csv
            GROUP BY movieId
            ORDER BY 
                Average_rating DESC,
                Number_of_assessments DESC
        '''

In [124]:
top_movies = spark.sql(query)

In [128]:
top_movies.show(100)

+-------+--------------+---------------------+
|movieId|Average_rating|Number_of_assessments|
+-------+--------------+---------------------+
|   3038|           5.0|                    4|
|    309|           5.0|                    3|
|   3112|           5.0|                    3|
|  32525|           5.0|                    2|
|   7087|           5.0|                    2|
|  74727|           5.0|                    2|
|  99764|           5.0|                    2|
|   9010|           5.0|                    2|
|    759|           5.0|                    2|
|   6598|           5.0|                    2|
|   1859|           5.0|                    2|
|   6918|           5.0|                    2|
|   4466|           5.0|                    1|
|  26843|           5.0|                    1|
|  77291|           5.0|                    1|
|   7208|           5.0|                    1|
|  26094|           5.0|                    1|
|   5960|           5.0|                    1|
| 112577|    

### Средняя разница между timestamp'ами проставления тегов и рейтинга

In [212]:
query = ''' SELECT 
                t.*,
                ratings_csv.timestamp AS timestamp_rate,
                timestamp_tag-timestamp_rate AS timedelta_between_tag_and_rate
            FROM 
                (SELECT 
                    MAX(tags_csv.movieId) AS movieId,
                    MAX(tags_csv.userId) AS userId,
                    AVG(tags_csv.timestamp) AS timestamp_tag
                FROM tags_csv
                GROUP BY
                    tags_csv.movieId,
                    tags_csv.userId) AS t
            LEFT JOIN ratings_csv
            WHERE 
                t.movieId = ratings_csv.movieId
                AND t.userId = ratings_csv.userId
        '''

In [213]:
timedelta_between_tag_and_rate = spark.sql(query)

In [214]:
timedelta_between_tag_and_rate.show()

+-------+------+--------------+--------------+------------------------------+
|movieId|userId| timestamp_tag|timestamp_rate|timedelta_between_tag_and_rate|
+-------+------+--------------+--------------+------------------------------+
|    339|    15|  1.13853777E9|    1122576622|                   1.5961148E7|
|   1955|    15| 1.193435061E9|    1338698424|                 -1.45263363E8|
|  34162|    15| 1.141391765E9|    1141391741|                          24.0|
|  35957|    15| 1.141391873E9|    1141391844|                          29.0|
|  37729|    15| 1.141391806E9|    1141391812|                          -6.0|
|  45950|    15| 1.169616291E9|    1169616283|                           8.0|
| 100365|    15|  1.42587622E9|    1361831156|                   6.4045064E7|
|    150|    23| 1.148672905E9|    1148672933|                         -28.0|
|   2174|    68| 1.249808083E9|    1249808034|                          49.0|
|   8623|    68| 1.249808497E9|    1249808534|                  

In [221]:
timedelta_between_tag_and_rate.agg({'timedelta_between_tag_and_rate': 'avg'}).show()

+-----------------------------------+
|avg(timedelta_between_tag_and_rate)|
+-----------------------------------+
|                  415369.8411245346|
+-----------------------------------+



### Средняя оценка по пользователям

In [225]:
query = ''' SELECT 
                AVG(userId) AS userId,
                AVG(rating) AS average_rating
            FROM 
                ratings_csv
            GROUP BY userId
        '''

In [226]:
average_rating = spark.sql(query)

In [227]:
average_rating.show()

+------+------------------+
|userId|    average_rating|
+------+------------------+
| 296.0|             3.975|
| 467.0|             3.875|
| 125.0| 4.023809523809524|
| 451.0|3.6538461538461537|
| 666.0|              2.95|
|   7.0| 3.465909090909091|
|  51.0| 3.967741935483871|
| 124.0|3.7588235294117647|
| 447.0|3.0229885057471266|
| 591.0|3.2333333333333334|
| 307.0| 4.263888888888889|
| 475.0| 2.761832061068702|
| 574.0|3.5116959064327484|
| 613.0| 3.792452830188679|
| 169.0| 4.110619469026549|
| 205.0|3.4101941747572817|
| 334.0| 3.985294117647059|
| 544.0| 4.472014925373134|
| 577.0| 4.345878136200717|
| 581.0|1.4591836734693877|
+------+------------------+
only showing top 20 rows



In [228]:
average_rating.agg({'average_rating': 'avg'}).show()

+-------------------+
|avg(average_rating)|
+-------------------+
| 3.6575868932068687|
+-------------------+



## Блок 3

In [539]:
from pyspark.sql import functions as f
from pyspark.sql.types import *

In [540]:
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDRegressor

In [541]:
df_tags = tags_csv.toPandas()
df_ratings = ratings_csv.toPandas()

In [542]:
df_tags = df_tags.groupby(['movieId'])['tag'].apply(lambda x: ' '.join(x)).reset_index()
df_ratings[['rating']] = df_ratings[['rating']].astype(float)
df_ratings = df_ratings.groupby(['movieId'])['rating'].mean().reset_index()

df = df_tags.merge(df_ratings, on='movieId')
df

Unnamed: 0,movieId,tag,rating
0,1,Pixar,3.872470
1,100365,activist documentary uganda,2.500000
2,101285,getdvd,3.166667
3,101525,toplist12,3.900000
4,101895,toplist13,3.166667
...,...,...,...
625,97938,Ang Lee India ocean visually appealing toplist12,3.772727
626,98154,toplist12,3.428571
627,98961,toplist12,3.590909
628,99114,toplist12,3.977273


In [543]:
X_train, X_test, y_train, y_test = train_test_split(df['tag'], 
                                                    df['rating'], 
                                                    train_size=0.8, 
                                                    random_state=42)

In [544]:
vect_word = TfidfVectorizer(
    max_features=1000,
    lowercase=True,
    analyzer="word",
    ngram_range=(1, 6),
    dtype=np.float32
)

In [545]:
# Train tf-idf
X_train_tfidf = vect_word.fit_transform(X_train)

In [546]:
# Init SGDRegressor
sgd_reg = SGDRegressor(random_state=42)

In [547]:
sgd_reg.fit(X_train_tfidf, y_train)

In [548]:
df_test = pd.concat([X_test, y_test], axis=1)

In [549]:
df_test

Unnamed: 0,tag,rating
497,martial arts,3.863636
244,forgettable,3.547619
552,toplist10,3.925000
213,controversial,3.322785
549,toplist10,3.630435
...,...,...
388,toplist07,4.222222
322,jesus,4.750000
218,stupid,2.026316
462,kung fu martial arts well done,3.958333


In [550]:
spark_df_test = spark.createDataFrame(df_test)

In [551]:
spark_df_test.show()

+--------------------+------------------+
|                 tag|            rating|
+--------------------+------------------+
|        martial arts|3.8636363636363638|
|         forgettable|3.5476190476190474|
|           toplist10|             3.925|
|       controversial|3.3227848101265822|
|           toplist10| 3.630434782608696|
|           toplist09| 3.480769230769231|
|aliens franchise ...|3.1923076923076925|
|          space star|3.5172413793103448|
|            holes70s|3.3333333333333335|
|demons horror pos...|         3.7265625|
|          dull story| 3.738095238095238|
|action anime kung fu|               3.5|
|           toplist14|2.7777777777777777|
|              getdvd|               2.0|
|           toplist08|3.9473684210526314|
|    getdvd toplist15|3.7903225806451615|
|              getdvd|3.1666666666666665|
|sci fi spock Star...|3.8333333333333335|
|cool and great mu...|               4.2|
|                 dvd|               4.0|
+--------------------+------------

In [552]:
@f.pandas_udf(DoubleType())
def pred_rate(tags):
    # Map tf-idf on test
    X_test_tfidf = vect_word.transform(tags).toarray()
    output = tags.apply(lambda x: sgd_reg.predict(X_test_tfidf))
    return pd.Series(output[0])

In [553]:
%%time
spark_df_test\
    .withColumn("pred", pred_rate(f.col("tag")))\
    .collect()

[Stage 290:>                                                        (0 + 2) / 2]

CPU times: user 18.1 ms, sys: 3.17 ms, total: 21.3 ms
Wall time: 1.46 s


                                                                                

[Row(tag='martial arts', rating=3.8636363636363638, pred=3.0397504225961516),
 Row(tag='forgettable', rating=3.5476190476190474, pred=1.5856514730325881),
 Row(tag='toplist10', rating=3.925, pred=3.8464636328436774),
 Row(tag='controversial', rating=3.3227848101265822, pred=1.5856514730325881),
 Row(tag='toplist10', rating=3.630434782608696, pred=3.8464636328436774),
 Row(tag='toplist09', rating=3.480769230769231, pred=3.827122694061394),
 Row(tag='aliens franchise funny time travel', rating=3.1923076923076925, pred=2.7968947307457435),
 Row(tag='space star', rating=3.5172413793103448, pred=1.7875792021300634),
 Row(tag='holes70s', rating=3.3333333333333335, pred=3.782419642486927),
 Row(tag='demons horror possession scary', rating=3.7265625, pred=1.9222805863598385),
 Row(tag='dull story', rating=3.738095238095238, pred=1.9766660389255515),
 Row(tag='action anime kung fu', rating=3.5, pred=3.0926351029386536),
 Row(tag='toplist14', rating=2.7777777777777777, pred=3.810116870515477),
 

In [554]:
%%time
spark_df_test = spark_df_test.withColumn("pred", pred_rate(f.col("tag")))

CPU times: user 2.14 ms, sys: 125 µs, total: 2.27 ms
Wall time: 6.98 ms


In [555]:
spark_df_test.show()

+--------------------+------------------+------------------+
|                 tag|            rating|              pred|
+--------------------+------------------+------------------+
|        martial arts|3.8636363636363638|3.0397504225961516|
|         forgettable|3.5476190476190474|1.5856514730325881|
|           toplist10|             3.925|3.8464636328436774|
|       controversial|3.3227848101265822|1.5856514730325881|
|           toplist10| 3.630434782608696|3.8464636328436774|
|           toplist09| 3.480769230769231| 3.827122694061394|
|aliens franchise ...|3.1923076923076925|2.7968947307457435|
|          space star|3.5172413793103448|1.7875792021300634|
|            holes70s|3.3333333333333335| 3.782419642486927|
|demons horror pos...|         3.7265625|1.9222805863598385|
|          dull story| 3.738095238095238|1.9766660389255515|
|action anime kung fu|               3.5|3.0926351029386536|
|           toplist14|2.7777777777777777| 3.810116870515477|
|              getdvd|  

In [556]:
spark_df_test.createOrReplaceTempView("spark_df_test")

In [568]:
query = ''' SELECT SQRT(AVG(squared_error)) AS RMSE
            FROM
                (SELECT 
                    *,
                    POW (rating - pred, 2) AS squared_error
                FROM 
                    spark_df_test)
        '''

In [569]:
rmse = spark.sql(query)

In [570]:
rmse.show()

+------------------+
|              RMSE|
+------------------+
|1.1030680377369404|
+------------------+

