In [1]:
import pyspark

In [2]:
from pyspark import SparkContext, SparkConf

In [2]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [11]:
!hdfs dfs -put ../ml-latest-small /

In [3]:
!hdfs dfs -ls /ml-latest-small

Found 5 items
-rw-r--r--   3 root supergroup       8342 2023-12-10 22:02 /ml-latest-small/README.txt
-rw-r--r--   3 root supergroup     197979 2023-12-10 22:02 /ml-latest-small/links.csv
-rw-r--r--   3 root supergroup     494431 2023-12-10 22:02 /ml-latest-small/movies.csv
-rw-r--r--   3 root supergroup    2483723 2023-12-10 22:02 /ml-latest-small/ratings.csv
-rw-r--r--   3 root supergroup     118660 2023-12-10 22:02 /ml-latest-small/tags.csv


In [4]:
conf = SparkConf().set("spark.executor.instances", "2").set("spark.executor.cores", "1").set("spark.executor.memory", "1g")

In [5]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.master("yarn").appName("danyaev_spark").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/11 01:58:41 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [7]:
spark

In [9]:
from pyspark.sql.types import *

In [12]:
ratings_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])

In [13]:
ratings_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(ratings_schema)\
    .load("/ml-latest-small/ratings.csv")

In [15]:
ratings_df

DataFrame[userId: int, movieId: int, rating: double, timestamp: bigint]

In [20]:
ratings_df.count()

100836

1 stage, 1 task

In [16]:
tags_schema = StructType([
    StructField("userId", IntegerType(), True), 
    StructField("movieId", IntegerType(), True), 
    StructField("tag", StringType(), True), 
    StructField("timestamp", LongType(), True)])

In [17]:
tags_df = spark.read.csv("/ml-latest-small/tags.csv", header=True, schema=tags_schema)

In [18]:
tags_df

DataFrame[userId: int, movieId: int, tag: string, timestamp: bigint]

In [19]:
tags_df.count()

3683

1 stage, 1 task

Посчитать количество уникальных фильмов и уникальных юзеров в таблице “ratings”

In [22]:
ratings_df.select("movieId").distinct().count()

9724

In [23]:
ratings_df.select("userId").distinct().count()

610

Посчитать, сколько было поставлено оценок >= 4.0

In [24]:
import pyspark.sql.functions as f

In [25]:
ratings_df.select("rating").filter(f.col("rating") >= 4.0).count()

48580

Вывести топ100 фильмов с самым высоким рейтингом. (6 баллов)

In [32]:
ratings_df.groupBy(f.col("movieId"))\
    .agg(f.avg(f.col("rating")).alias("avg_rating"))\
    .orderBy("avg_rating", ascending=False)\
    .show(100)
    

+-------+----------+
|movieId|avg_rating|
+-------+----------+
|  80124|       5.0|
|  74226|       5.0|
| 175387|       5.0|
|    148|       5.0|
| 158027|       5.0|
|  67618|       5.0|
|  42556|       5.0|
| 142444|       5.0|
|  44943|       5.0|
|  27523|       5.0|
| 113829|       5.0|
|   3073|       5.0|
| 152711|       5.0|
| 132333|       5.0|
|     53|       5.0|
|  27704|       5.0|
| 147300|       5.0|
| 187717|       5.0|
|    633|       5.0|
|   8238|       5.0|
|    876|       5.0|
| 120138|       5.0|
|  53578|       5.0|
| 163072|       5.0|
| 160644|       5.0|
| 172909|       5.0|
|   5490|       5.0|
| 172583|       5.0|
|   2972|       5.0|
|   2196|       5.0|
|  25887|       5.0|
| 118834|       5.0|
|   3795|       5.0|
| 156025|       5.0|
|   3941|       5.0|
| 120130|       5.0|
|    626|       5.0|
|   1349|       5.0|
|  26928|       5.0|
| 122092|       5.0|
|  40491|       5.0|
|  71268|       5.0|
| 167064|       5.0|
|  99636|       5.0|
|  25906|    

Посчитать разницу во времени в секундах между временем тегирования пользователя данного фильма и временем, когда пользователь поставил оценку фильму. В качестве ответа выведете среднюю дельту по времени

In [33]:
ratings_with_tags_df = ratings_df.alias("r").join(tags_df.alias("t"), on=["userId", "movieId"], how="inner")

In [34]:
ratings_with_tags_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: long (nullable = true)



In [42]:
ratings_with_tags_with_delta = ratings_with_tags_df\
    .withColumn("delta", f.abs(f.col("r.timestamp")-f.col("t.timestamp")))

In [43]:
ratings_with_tags_with_delta.show(10)

+------+-------+------+----------+-----------------+----------+-----+
|userId|movieId|rating| timestamp|              tag| timestamp|delta|
+------+-------+------+----------+-----------------+----------+-----+
|     2|  60756|   5.0|1445714980|     will ferrell|1445714992|   12|
|     2|  60756|   5.0|1445714980|  Highly quotable|1445714996|   16|
|     2|  60756|   5.0|1445714980|            funny|1445714994|   14|
|     2|  89774|   5.0|1445715189|        Tom Hardy|1445715205|   16|
|     2|  89774|   5.0|1445715189|              MMA|1445715200|   11|
|     2|  89774|   5.0|1445715189|     Boxing story|1445715207|   18|
|     2| 106782|   5.0|1445714966|  Martin Scorsese|1445715056|   90|
|     2| 106782|   5.0|1445714966|Leonardo DiCaprio|1445715051|   85|
|     2| 106782|   5.0|1445714966|            drugs|1445715054|   88|
|     7|  48516|   1.0|1169687318|     way too long|1169687325|    7|
+------+-------+------+----------+-----------------+----------+-----+
only showing top 10 

In [45]:
ratings_with_tags_with_delta.agg(f.avg(f.col("delta"))).show()

+--------------------+
|          avg(delta)|
+--------------------+
|2.9203715568469506E7|
+--------------------+



Посчитать среднюю оценку от каждого пользователя, в качестве ответа выведете среднее от всех усредненных оценок всех пользователей

In [40]:
ratings_df.groupBy(f.col("userId"))\
    .agg(f.avg(f.col("rating")).alias("avg_rating"))\
    .agg(f.avg(f.col("avg_rating")))\
    .show()

+------------------+
|   avg(avg_rating)|
+------------------+
|3.6572223377474016|
+------------------+

