## Задание 1

In [1]:
import pyspark
from pyspark import SparkContext, SparkConf

In [2]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [3]:
conf = (
    SparkConf()
    .set("spark.executor.instances", "2")
    .set("spark.executor.cores", "2")
    .set("spark.executor.memory", "512m")
)

sc = SparkContext(appName="lopatin_spark_2", master="yarn")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/30 00:28:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/12/30 00:28:39 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [None]:
!hdfs dfs -put ml-latest-small .

In [5]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("yarn")
    .appName("lopatin_spark_sql")
    .getOrCreate()
)
spark

In [6]:
ratings_df = (
    spark
    .read
    .format("csv")
    .option("header", "True")
    .load("ml-latest-small/ratings.csv")
)
ratings_df.count()

                                                                                

100836

In [7]:
tags_df = (
    spark
    .read
    .format("csv")
    .option("header", "True")
    .load("ml-latest-small/tags.csv")
)
tags_df.count()

                                                                                

3683

для каждого подсчета было выполнено 3 стейджа, в каждом по одной таски

## Задание 2

In [10]:
import pyspark.sql.functions as f

In [11]:
# количество уникальных фильмов
ratings_df.groupby("movieId").count().count()

                                                                                

9724

In [12]:
# количество уникальных юзеров
ratings_df.groupby("userId").count().count()

                                                                                

610

In [13]:
# количество оценок >= 4.0
ratings_df.filter(f.col("rating") >= 4.0).count()

                                                                                

48580

In [14]:
# топ-100 фильмов с самым высоким рейтингом
(
    ratings_df
    .groupBy('movieId')
    .agg(f.avg(f.col("rating")).alias("avg_rating"))
    .orderBy("avg_rating", ascending=False)
    .show(100)
)

                                                                                

+-------+----------+
|movieId|avg_rating|
+-------+----------+
| 102084|       5.0|
| 126921|       5.0|
|   5723|       5.0|
|    467|       5.0|
|  95149|       5.0|
| 140627|       5.0|
|    633|       5.0|
| 147330|       5.0|
| 103602|       5.0|
|  96608|       5.0|
| 118270|       5.0|
|   1349|       5.0|
| 147410|       5.0|
|    496|       5.0|
| 136355|       5.0|
| 136341|       5.0|
| 173351|       5.0|
|    495|       5.0|
| 131724|       5.0|
|  47736|       5.0|
|   6835|       5.0|
|  69469|       5.0|
| 131098|       5.0|
| 151769|       5.0|
| 172875|       5.0|
|   5745|       5.0|
|  26147|       5.0|
|  73822|       5.0|
|   4495|       5.0|
|    626|       5.0|
|   3939|       5.0|
|   3496|       5.0|
|  44851|       5.0|
| 140265|       5.0|
|  53280|       5.0|
| 107951|       5.0|
| 141718|       5.0|
|  72692|       5.0|
|  27751|       5.0|
| 175397|       5.0|
|    876|       5.0|
|  26073|       5.0|
|  85295|       5.0|
| 113829|       5.0|
| 136834|    

In [17]:
# временная разница между тегированием и оценкой
(
    ratings_df
    .alias("r")
    .join(tags_df.alias("t"), on=["userId", "movieId"], how="inner")
    .select((f.col("t.timestamp") - f.col("r.timestamp")).alias("delta"))
    .select(f.avg(f.col("delta")))
    .show()
)

[Stage 39:>                                                         (0 + 1) / 1]

+--------------------+
|          avg(delta)|
+--------------------+
|2.6243727372266974E7|
+--------------------+



                                                                                

In [16]:
# усредненная по пользователям средняя оценка фильма
(
    ratings_df
    .groupBy("userId")
    .agg(f.avg(f.col("rating")).alias("avg_rating"))
    .select(f.avg(f.col("avg_rating")))
    .show()
)

[Stage 32:>                                                         (0 + 1) / 1]

+------------------+
|   avg(avg_rating)|
+------------------+
|3.6572223377474007|
+------------------+



                                                                                

## Задание 3

In [8]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDRegressor

import pandas as pd
from pyspark.sql.types import FloatType

<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
<jemalloc>: (This is the expected behaviour if you are running under QEMU)


In [9]:
joined_df = ratings_df.join(tags_df, on=["userId", "movieId"], how="inner").toPandas()

                                                                                

In [10]:
vectorizer = TfidfVectorizer()
vectorizer.fit(joined_df["tag"])

In [11]:
model = SGDRegressor()
X = vectorizer.transform(joined_df["tag"])
y = joined_df["rating"]
model.fit(X, y)

In [32]:
@f.pandas_udf(FloatType())
def predict_rating(tag):
    return pd.Series(model.predict(vectorizer.transform(tag)))

In [33]:
(
    tags_df
    .withColumn("rating_predicted", predict_rating(f.col("tag")))
    .show(50)
)

                                                                                

+------+-------+--------------------+----------+----------------+
|userId|movieId|                 tag| timestamp|rating_predicted|
+------+-------+--------------------+----------+----------------+
|     2|  60756|               funny|1445714994|        4.364758|
|     2|  60756|     Highly quotable|1445714996|       3.9516141|
|     2|  60756|        will ferrell|1445714992|       4.0286427|
|     2|  89774|        Boxing story|1445715207|       3.9261923|
|     2|  89774|                 MMA|1445715200|        3.419473|
|     2|  89774|           Tom Hardy|1445715205|       3.8319461|
|     2| 106782|               drugs|1445715054|       4.2879524|
|     2| 106782|   Leonardo DiCaprio|1445715051|       4.2352877|
|     2| 106782|     Martin Scorsese|1445715056|       3.8444886|
|     7|  48516|        way too long|1169687325|       3.4328892|
|    18|    431|           Al Pacino|1462138765|       3.8567297|
|    18|    431|            gangster|1462138749|       3.6196296|
|    18|  

In [37]:
(
    ratings_df
    .join(tags_df, on=["userId", "movieId"], how="inner")
    .withColumn("rating_predicted", predict_rating(f.col("tag")))
    .withColumn("squared_error", f.pow(f.col("rating_predicted") - f.col("rating"), f.lit(2)))
    .select(f.avg("squared_error").alias("mse"))
    .show()
)

[Stage 22:>                                                         (0 + 1) / 1]

+------------------+
|               mse|
+------------------+
|0.7645033112216735|
+------------------+



                                                                                

было выполнено 3 стейджа с одной таской каждая