In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as f

In [2]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [3]:
!hdfs dfs -rm -r /ml-latest-small
!hdfs dfs -put ml-latest-small /

Deleted /ml-latest-small


## 1. Spark Application


Два Executor

In [6]:
conf = (
    SparkConf()
    .set("spark.executor.instances", "1")
    .set("spark.executor.cores", "10")
    .set("spark.executor.memory", "1g")
)
spark = (
    SparkSession
    .builder
    .master(master="local")
    .config(conf=conf)
    .appName("sibagatulin_spark")
    .getOrCreate()
)
spark

In [5]:
spark.stop()
del spark, conf

YARN application:

<img src="img/task_1 spark app in yarn.png" alt="image" width="700" height="auto">

Spark UI:

<img src="img/task_1 spark ui.png" alt="image" width="700" height="auto">

In [38]:
tags_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("tag", StringType()),
    StructField("timestamp", LongType()),
])

tags_df = (
    spark
    .read
    .format("csv")
    .option("header", "True")
    .schema(tags_schema)
    .load("/ml-latest-small/tags.csv")
)
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [39]:
ratings_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])

ratings_df = (
    spark
    .read
    .format("csv")
    .option("header", "True")
    .schema(ratings_schema)
    .load("/ml-latest-small/ratings.csv")
)

ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [17]:
tags_df.select(f.count("userId")).show()

+-------------+
|count(userId)|
+-------------+
|         3683|
+-------------+



In [82]:
tags_df.count()

3683

In [40]:
ratings_df.select(f.count("userId")).show()

+-------------+
|count(userId)|
+-------------+
|       100836|
+-------------+



In [83]:
ratings_df.count()

100836

<img src="img/task_1 job_1.png" alt="image" width="300" height="auto"> <img src="img/task_1 job_2.png" alt="image" width="300" height="auto">

Выполонено 1 стейдж и 1 таска

## 2. Работа с данными

- Посчитать количество уникальных фильмов и уникальных юзеров в таблице “ratings”.

In [36]:
ratings_df.groupby("userId").count().show(5)

+------+-----+
|userId|count|
+------+-----+
|   148|   48|
|   463|   33|
|   471|   28|
|   496|   29|
|   243|   36|
+------+-----+
only showing top 5 rows



In [37]:
ratings_df.groupby("movieId").count().show(5)

+-------+-----+
|movieId|count|
+-------+-----+
|   1580|  165|
|   2366|   25|
|   3175|   75|
|   1088|   42|
|  32460|    4|
+-------+-----+
only showing top 5 rows



- Посчитать, сколько было поставлено оценок >= 4.0

In [45]:
ratings_df.filter(ratings_df.rating >= 4).select(f.count("rating")).show()

[Stage 54:>                                                         (0 + 1) / 1]

+-------------+
|count(rating)|
+-------------+
|        48580|
+-------------+



                                                                                

- Вывести топ100 фильмов с самым высоким рейтингом

In [49]:
(
    ratings_df
    .select("movieId", "rating")
    .orderBy("rating", ascending=False)
    .limit(100)
    .show(100)
)

+-------+------+
|movieId|rating|
+-------+------+
|   1573|   5.0|
|   3147|   5.0|
|   1732|   5.0|
|   1617|   5.0|
|   3053|   5.0|
|   1625|   5.0|
|     50|   5.0|
|   1804|   5.0|
|    151|   5.0|
|   1927|   5.0|
|    163|   5.0|
|   1954|   5.0|
|    231|   5.0|
|   2005|   5.0|
|    333|   5.0|
|   2018|   5.0|
|    457|   5.0|
|   2033|   5.0|
|    553|   5.0|
|   2048|   5.0|
|    608|   5.0|
|   2058|   5.0|
|    919|   5.0|
|   2078|   5.0|
|    940|   5.0|
|   2090|   5.0|
|   1023|   5.0|
|   2094|   5.0|
|   1025|   5.0|
|   2115|   5.0|
|   1031|   5.0|
|   2116|   5.0|
|   1049|   5.0|
|   2137|   5.0|
|   1080|   5.0|
|   2139|   5.0|
|   1092|   5.0|
|   2141|   5.0|
|   1136|   5.0|
|   2161|   5.0|
|   1197|   5.0|
|   2291|   5.0|
|   1206|   5.0|
|   2329|   5.0|
|   1213|   5.0|
|   2353|   5.0|
|   1222|   5.0|
|   2387|   5.0|
|   1226|   5.0|
|   2395|   5.0|
|   1256|   5.0|
|   2427|   5.0|
|   1275|   5.0|
|   2459|   5.0|
|   1282|   5.0|
|   2470|   5.

- Посчитать разницу во времени в секундах между временем тегирования пользователя данного фильма и временем, когда пользователь поставил оценку фильму. В качестве ответа выведете среднюю дельту по времени.

In [55]:
ratings_with_tags_df = ratings_df.alias("r").join(tags_df.alias("t"), on=["userId", "movieId"], how="inner")
ratings_with_tags_df.show(5)

+------+-------+------+----------+---------------+----------+
|userId|movieId|rating| timestamp|            tag| timestamp|
+------+-------+------+----------+---------------+----------+
|     2|  60756|   5.0|1445714980|   will ferrell|1445714992|
|     2|  60756|   5.0|1445714980|Highly quotable|1445714996|
|     2|  60756|   5.0|1445714980|          funny|1445714994|
|     2|  89774|   5.0|1445715189|      Tom Hardy|1445715205|
|     2|  89774|   5.0|1445715189|            MMA|1445715200|
+------+-------+------+----------+---------------+----------+
only showing top 5 rows



In [62]:
(
    ratings_with_tags_df
    .select("userId", (f.col("t.timestamp") - f.col("r.timestamp")).alias("delta"))
    .groupby("userId")
    .avg("delta")
    .show(5)
)

+------+-------------------+
|userId|         avg(delta)|
+------+-------------------+
|   513|               75.0|
|   193|-107.41176470588235|
|   300|              326.0|
|    76|              -10.0|
|   606| 2441534.8333333335|
+------+-------------------+
only showing top 5 rows



- Посчитать среднюю оценку от каждого пользователя, в качестве ответа выведете среднее от всех усредненных оценок всех пользователей.

In [73]:
(
    ratings_df
    .groupby("userId")
    .avg("rating")
    .select(f.avg("avg(rating)"))
    .show()
)

+------------------+
|  avg(avg(rating))|
+------------------+
|3.6572223377474016|
+------------------+



## 3. UDF

In [87]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDRegressor
from sklearn.model_selection import train_test_split

In [107]:
data_df = ratings_with_tags_df.select("tag", "rating")
data_df.show(3)

+---------------+------+
|            tag|rating|
+---------------+------+
|   will ferrell|   5.0|
|Highly quotable|   5.0|
|          funny|   5.0|
+---------------+------+
only showing top 3 rows



In [108]:
df = data_df.toPandas()
df.head(3)

Unnamed: 0,tag,rating
0,will ferrell,5.0
1,Highly quotable,5.0
2,funny,5.0


In [109]:
vectorizer = TfidfVectorizer()
vectors = vectorizer.fit_transform(df["tag"])
vectors.shape

(3476, 1708)

In [110]:
model = SGDRegressor()
model.fit(vectors, df["rating"])

In [132]:
@f.udf(FloatType())
def predict_rating(tag):
    x = vectorizer.transform([tag])
    return model.predict(x).item()

In [154]:
data_df = data_df.withColumn("predicted_rating", predict_rating(f.col("tag")))
data_df.show(50)

[Stage 205:>                                                        (0 + 1) / 1]

+--------------------+------+----------------+
|                 tag|rating|predicted_rating|
+--------------------+------+----------------+
|        will ferrell|   5.0|        4.026239|
|     Highly quotable|   5.0|       3.9530454|
|               funny|   5.0|        4.370658|
|           Tom Hardy|   5.0|       3.8381026|
|                 MMA|   5.0|       3.4290197|
|        Boxing story|   5.0|        3.925313|
|     Martin Scorsese|   5.0|       3.8527648|
|   Leonardo DiCaprio|   5.0|        4.237917|
|               drugs|   5.0|        4.290987|
|        way too long|   1.0|       3.4317873|
|               mafia|   4.0|       4.1640844|
|            gangster|   4.0|       3.6265683|
|           Al Pacino|   4.0|          3.8612|
|               Mafia|   5.0|       4.1640844|
|           Al Pacino|   5.0|          3.8612|
|          true story|   4.5|       3.8489664|
|           holocaust|   4.5|       3.9758172|
|        twist ending|   4.5|        4.837204|
|        twis

                                                                                

In [155]:
(
    data_df
    .select((f.pow(f.col("rating") - f.col("predicted_rating"), 2).alias("diff_pow")))
    .select(f.sqrt(f.sum("diff_pow") / data_df.count()).alias("RMSE"))
    .show()
)

[Stage 211:>                                                        (0 + 1) / 1]

+------------------+
|              RMSE|
+------------------+
|0.8734757249097925|
+------------------+



                                                                                