# Settings

## Imports

In [111]:
import numpy as np
import pyspark

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.functions import col

## Disable safemode

In [3]:
!hdfs dfsadmin -safemode leave

2023-12-13 19:18:42 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Safe mode is OFF


# SparkSession (30 points)

In [4]:
conf = SparkConf().set("spark.executor.instances", "2").set("spark.executor.cores", "1").set("spark.executor.memory", "1g")

In [5]:
spark = SparkSession.builder.master("yarn").appName("Denisov_spark").config(conf=conf).getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/13 19:18:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/13 19:18:48 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [7]:
spark

# View datasets (10 points)

## HDFS work

In [8]:
!hdfs dfs -rm -r ml-latest-small

2023-12-13 19:24:42 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted ml-latest-small


In [9]:
!hdfs dfs -ls .

2023-12-13 19:24:55 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
drwxr-xr-x   - root supergroup          0 2023-12-13 19:18 .sparkStaging


In [10]:
!hdfs dfs -put ml-latest-small .

2023-12-13 19:25:04 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [11]:
!hdfs dfs -ls .

2023-12-13 19:25:10 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwxr-xr-x   - root supergroup          0 2023-12-13 19:18 .sparkStaging
drwxr-xr-x   - root supergroup          0 2023-12-13 19:25 ml-latest-small


In [12]:
!hdfs dfs -ls ml-latest-small

2023-12-13 19:26:55 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
-rw-r--r--   3 root supergroup       8342 2023-12-13 19:25 ml-latest-small/README.txt
-rw-r--r--   3 root supergroup     197979 2023-12-13 19:25 ml-latest-small/links.csv
-rw-r--r--   3 root supergroup     494431 2023-12-13 19:25 ml-latest-small/movies.csv
-rw-r--r--   3 root supergroup    2483723 2023-12-13 19:25 ml-latest-small/ratings.csv
-rw-r--r--   3 root supergroup     118660 2023-12-13 19:25 ml-latest-small/tags.csv


## PySpark work

In [14]:
ratings_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])

In [15]:
%%time
ratings_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(ratings_schema)\
    .load("ml-latest-small/ratings.csv")

CPU times: user 3.57 ms, sys: 1.46 ms, total: 5.02 ms
Wall time: 650 ms


In [16]:
ratings_df

DataFrame[userId: int, movieId: int, rating: double, timestamp: bigint]

In [17]:
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)



In [18]:
ratings_df.show(5)

                                                                                

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [20]:
ratings_df.summary().show()

23/12/13 19:28:05 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 1:>                                                          (0 + 1) / 1]

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|           828124615|
|    25%|               177|            1199|               3.0|          1018535155|
|    50%|               325|            2991|               3.5|          1186086516|
|    75%|               477|            8092|               4.0|          1435993828|
|    max|               610|          193609|               5.0|          1537799250|
+-------+------------------+----------------+---------

                                                                                

In [21]:
ratings_df.count() # 2 stages, 2 tasks

                                                                                

100836

In [98]:
ratings_df.summary().show()

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|           828124615|
|    25%|               177|            1199|               3.0|          1018535155|
|    50%|               325|            2991|               3.5|          1186086516|
|    75%|               477|            8092|               4.0|          1435993828|
|    max|               610|          193609|               5.0|          1537799250|
+-------+------------------+----------------+---------

In [22]:
%%time
tags_df = spark.read.format("csv").option("header", "True").load("ml-latest-small/tags.csv")

CPU times: user 10.4 ms, sys: 1.85 ms, total: 12.3 ms
Wall time: 320 ms


In [92]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [23]:
tags_df.count() # 2 stages, 2 tasks

3683

In [93]:
tags_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("tag", StringType()),
    StructField("timestamp", LongType()),
])

In [94]:
%%time
tags_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(tags_schema)\
    .load("ml-latest-small/tags.csv")

CPU times: user 5.66 ms, sys: 1.43 ms, total: 7.09 ms
Wall time: 32.7 ms


In [99]:
tags_df.summary().show()

+-------+------------------+-----------------+-----------+--------------------+
|summary|            userId|          movieId|        tag|           timestamp|
+-------+------------------+-----------------+-----------+--------------------+
|  count|              3683|             3683|       3683|                3683|
|   mean| 431.1493347814282|27252.01357588922|       NULL| 1.320031966823785E9|
| stddev|158.47255348483532|43490.55880276775|       NULL|1.7210245043712625E8|
|    min|                 2|                1|"""artsy"""|          1137179352|
|    25%|               424|             1262|       NULL|          1137521204|
|    50%|               474|             4454|       NULL|          1269832564|
|    75%|               477|            39292|       NULL|          1498456766|
|    max|               610|           193565|    zombies|          1537098603|
+-------+------------------+-----------------+-----------+--------------------+



# Work with data (30 points)

## Count number of unique films and unique users in `ratings` (5 points)

In [24]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [65]:
ratings_df.groupby("movieId").count().count()

9724

In [66]:
ratings_df.groupby("userId").count().count()

610

## Count number of rating >= 4.0 (5 points)

In [67]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [71]:
ratings_df.filter(f.col("rating") >= 4.0).count()

48580

## Print top-100 films with the highest rating (6 points)

In [72]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [81]:
ratings_df.summary().show()

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|           828124615|
|    25%|               177|            1199|               3.0|          1018535155|
|    50%|               325|            2991|               3.5|          1186086516|
|    75%|               477|            8092|               4.0|          1435993828|
|    max|               610|          193609|               5.0|          1537799250|
+-------+------------------+----------------+---------

In [89]:
ratings_df.orderBy("rating", ascending=False).show(100)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|   1617|   5.0|964982951|
|     1|   1625|   5.0|964983504|
|     1|     47|   5.0|964983815|
|     1|   1732|   5.0|964981125|
|     1|    101|   5.0|964980868|
|     1|   1804|   5.0|964983034|
|     1|    157|   5.0|964984100|
|     1|   1927|   5.0|964981497|
|     1|    216|   5.0|964981208|
|     1|   1954|   5.0|964982176|
|     1|    260|   5.0|964981680|
|     1|   2005|   5.0|964981710|
|     1|    362|   5.0|964982588|
|     1|   2018|   5.0|964980523|
|     1|    527|   5.0|964984002|
|     1|   2033|   5.0|964982903|
|     1|    596|   5.0|964982838|
|     1|   2048|   5.0|964982791|
|     1|    661|   5.0|964982838|
|     1|   2058|   5.0|964982400|
|     1|    923|   5.0|964981529|
|     1|   2078|   5.0|964982838|
|     1|    954|   5.0|964983219|
|     1|   2090|   5.0|964982838|
|     1|   1024|   5.0|964982876|
|     1|   2094|   5.0|964982653|
|     1|   102

## Calculate diff in seconds between user tagging and user feedback. Print mean diff (7 points)

In [101]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [103]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [106]:
ratings_with_tags_df = ratings_df.alias("r").join(tags_df.alias("t"), on=["userId", "movieId"], how="inner")

In [107]:
ratings_with_tags_df.show(5)

+------+-------+------+----------+---------------+----------+
|userId|movieId|rating| timestamp|            tag| timestamp|
+------+-------+------+----------+---------------+----------+
|     2|  60756|   5.0|1445714980|   will ferrell|1445714992|
|     2|  60756|   5.0|1445714980|Highly quotable|1445714996|
|     2|  60756|   5.0|1445714980|          funny|1445714994|
|     2|  89774|   5.0|1445715189|      Tom Hardy|1445715205|
|     2|  89774|   5.0|1445715189|            MMA|1445715200|
+------+-------+------+----------+---------------+----------+
only showing top 5 rows



In [127]:
diffs = ratings_with_tags_df.select((col("t.timestamp") - col("r.timestamp")).alias("diff"), col("t.timestamp").alias("t"), col("r.timestamp").alias("r"))

In [128]:
diffs.show(5)

+----+----------+----------+
|diff|         t|         r|
+----+----------+----------+
|  12|1445714992|1445714980|
|  16|1445714996|1445714980|
|  14|1445714994|1445714980|
|  16|1445715205|1445715189|
|  11|1445715200|1445715189|
+----+----------+----------+
only showing top 5 rows



In [129]:
diffs.orderBy(col("diff"), ascending=False).show(5)

+---------+----------+---------+
|     diff|         t|        r|
+---------+----------+---------+
|163364688|1138032197|974667509|
|163364688|1138032197|974667509|
|163364502|1138032178|974667676|
|163364494|1138031795|974667301|
|163364330|1138031745|974667415|
+---------+----------+---------+
only showing top 5 rows



In [139]:
diffs.orderBy(col("diff"), ascending=True).show(5)

+----------+----------+----------+
|      diff|         t|         r|
+----------+----------+----------+
|-201719480|1242160404|1443879884|
|-201719457|1242160427|1443879884|
|-158138193|1266408634|1424546827|
|-158138189|1266408638|1424546827|
|-158138186|1266408641|1424546827|
+----------+----------+----------+
only showing top 5 rows



In [138]:
diffs.select(f.avg(f.col("diff"))).show()

+--------------------+
|           avg(diff)|
+--------------------+
|2.6243727372266974E7|
+--------------------+



In [142]:
diffs.select(f.avg(f.col("diff"))).collect()[0][0] / 1000 # milliseconds to seconds. Approximately 18 hours...

26243.727372266974

## Calculate mean rating by every user. Print mean of that mean ratings (7 points)

In [144]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [153]:
avg_ratings_by_users = ratings_df.groupby("userId").avg("rating")

In [155]:
avg_ratings_by_users.show(5)

+------+------------------+
|userId|       avg(rating)|
+------+------------------+
|   148|3.7395833333333335|
|   463| 3.787878787878788|
|   471|             3.875|
|   496| 3.413793103448276|
|   243| 4.138888888888889|
+------+------------------+
only showing top 5 rows



In [161]:
avg_ratings_by_users.agg(f.avg("avg(rating)")).collect()[0][0] # answer

3.6572223377474016

In [163]:
ratings_df.agg(f.avg("rating")).show() # just want to compare

+-----------------+
|      avg(rating)|
+-----------------+
|3.501556983616962|
+-----------------+



# Stop spark session

In [12]:
spark.stop()