## Part 2

### Запуск спарк сессии

In [1]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
conf = SparkConf().set("spark.executor.instances", "2").set("spark.executor.cores", "1").set("spark.executor.memory", "1g")

spark = SparkSession.builder.config(conf=conf).master(master="yarn").appName("Porsev_spark").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/10 19:31:08 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


### Работа с данными

In [4]:
!hdfs dfs -put ml-latest-small .

put: `ml-latest-small/README.txt': File exists
put: `ml-latest-small/ratings.csv': File exists
put: `ml-latest-small/links.csv': File exists
put: `ml-latest-small/tags.csv': File exists
put: `ml-latest-small/movies.csv': File exists


In [5]:
from pyspark.sql.types import *

In [13]:
r_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])

ratings = spark.read\
               .format("csv")\
               .option("header", "True")\
               .schema(r_schema)\
                .load("ml-latest-small/ratings.csv")

t_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("tag", StringType()),
    StructField("timestamp", LongType()),
])

tags = spark.read\
            .format("csv")\
            .option("header", "True")\
            .schema(t_schema)\
            .load("ml-latest-small/tags.csv")

print('Number of lines (rating):', ratings.count())

print('Number of lines (tags): ',tags.count())

Number of lines (rating): 100836
Number of lines (tags):  3683


Загрузить оставшиеся датасеты

In [14]:
l_schema = StructType(fields=[
    StructField("movieId", IntegerType()),
    StructField("imdbId", IntegerType()),
    StructField("tmdbId", IntegerType()),
])

links = spark.read\
               .format("csv")\
               .option("header", "True")\
               .schema(l_schema)\
                .load("ml-latest-small/links.csv")

m_schema = StructType(fields=[
    StructField("movieId", IntegerType()),
    StructField("title", StringType()),
    StructField("genres", StringType()),
])

movies = spark.read\
               .format("csv")\
               .option("header", "True")\
               .schema(m_schema)\
                .load("ml-latest-small/movies.csv")

### Задачи

Посчитать количество уникальных фильмов и уникальных юзеров в таблице “ratings”. (5 баллов)

In [22]:
unique_movies_count = ratings.select("movieId").distinct().count()
print(f"Uniques movies: {unique_movies_count}")

unique_users_count = ratings.select("userId").distinct().count()
print(f"Uniques users: {unique_users_count}")

Uniques movies: 9724
Uniques users: 610


Посчитать, сколько было поставлено оценок >= 4.0. (5 баллов)

In [25]:
above4_ratings_count = ratings.filter(ratings["rating"] >= 4.0).count()
print(f"Ratings >= 4.0: {above4_ratings_count}")

Ratings >= 4.0: 48580


In [29]:
tags.take(5)

[Row(userId=2, movieId=60756, tag='funny', timestamp=1445714994),
 Row(userId=2, movieId=60756, tag='Highly quotable', timestamp=1445714996),
 Row(userId=2, movieId=60756, tag='will ferrell', timestamp=1445714992),
 Row(userId=2, movieId=89774, tag='Boxing story', timestamp=1445715207),
 Row(userId=2, movieId=89774, tag='MMA', timestamp=1445715200)]

Вывести топ100 фильмов с самым высоким рейтингом. (6 баллов)

In [27]:
from pyspark.sql.functions import avg

# Соединим два датафрейма
joined = ratings.join(movies, on="movieId")

# Вычислим средний рейтинг для каждого фильма
avg_ratings = joined.groupBy("movieId", "title").agg(avg("rating").alias("avg_rating"))

# Получим 100 лучших фильмов
top_100_movies = avg_ratings.orderBy("avg_rating", ascending=False).limit(100)
top_100_movies.show(100, truncate=False)

+-------+--------------------------------------------------------------------+----------+
|movieId|title                                                               |avg_rating|
+-------+--------------------------------------------------------------------+----------+
|150554 |The Love Bug (1997)                                                 |5.0       |
|134796 |Bitter Lake (2015)                                                  |5.0       |
|4813   |When Worlds Collide (1951)                                          |5.0       |
|67618  |Strictly Sexual (2008)                                              |5.0       |
|2972   |Red Sorghum (Hong gao liang) (1987)                                 |5.0       |
|78836  |Enter the Void (2009)                                               |5.0       |
|26350  |Passenger, The (Professione: reporter) (1975)                       |5.0       |
|104780 |Mystery of the Third Planet, The (Tayna tretey planety) (1981)      |5.0       |
|136556 |K

Посчитать разницу во времени в секундах между временем тегирования пользователя данного фильма и временем, когда пользователь поставил оценку фильму. В качестве ответа выведете среднюю дельту по времени. (7 баллов)

In [41]:
from pyspark.sql.functions import col

# Соединим два датафрейма
joined = ratings.alias("ratings").join(tags.alias("tags"), on=["userId", "movieId"])

# # Посчитаем разницу во времени
time_diff = joined.withColumn("time_diff", col("ratings.timestamp") - col("tags.timestamp"))

# Средняя дельта времени
avg_delta = abs(time_diff.selectExpr("avg(time_diff) as avg_delta").collect()[0]["avg_delta"])
print("Average delta over time (s):", avg_delta)

Average delta over time (s): 26243727.372266974


Посчитать среднюю оценку от каждого пользователя, в качестве ответа выведете среднее от всех усредненных оценок всех пользователей. (7 баллов)

In [44]:
# Средний рейтинг пользователей
avg_rating = ratings.groupBy("userId").agg(avg("rating").alias("avg_rating"))

# Средний рейтинг всех средних по пользователям
overall_avg_rating = avg_rating.select(avg("avg_rating")).collect()[0][0]
print("Average ratings of all users:", overall_avg_rating)

Average ratings of all users: 3.6572223377474016
