# Part 1

In [1]:
import pyspark

In [2]:
from pyspark import SparkContext, SparkConf

In [3]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [5]:
conf = SparkConf().set("spark.executor.instances", "2").set("spark.executor.cores", "1").set("spark.executor.memory", "1g")

In [4]:
from pyspark.sql import SparkSession

In [9]:
spark = SparkSession.builder.master("yarn").appName("bondarenko_spark").getOrCreate()

23/12/10 19:10:14 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [10]:
spark

In [17]:
!hdfs dfs -put ../notebooks/ml-latest-small

In [20]:
%%time
ratings_df = spark.read.format("csv").option("header", "True").load("ml-latest-small/ratings.csv")

CPU times: user 1.25 ms, sys: 1.42 ms, total: 2.67 ms
Wall time: 205 ms


In [21]:
%%time
tags_df = spark.read.format("csv").option("header", "True").load("ml-latest-small/tags.csv")

CPU times: user 888 µs, sys: 1.01 ms, total: 1.9 ms
Wall time: 156 ms


In [22]:
ratings_df.count()

                                                                                

100836

In [27]:
tags_df.count()

3683

# Part 2

In [28]:
ratings_df.summary().show()

[Stage 18:>                                                         (0 + 1) / 1]

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|          1000129365|
|    25%|             177.0|          1199.0|               3.0|       1.018535155E9|
|    50%|             325.0|          2991.0|               3.5|       1.186086516E9|
|    75%|             477.0|          8092.0|               4.0|       1.435993828E9|
|    max|                99|           99992|               5.0|           999873731|
+-------+------------------+----------------+---------

                                                                                

In [65]:
ratings_df = ratings_df.repartition(2)

In [78]:
# 1
print("count of unique users: ", ratings_df.dropDuplicates(["userId"]).count())

count of unique users:  610


In [79]:
# 1
print("count of unique movies: ", ratings_df.dropDuplicates(["movieId"]).count())

count of unique movies:  9724


In [36]:
import pyspark.sql.functions as f

In [76]:
# 2
print("count of '>=4.0' ratings: ", ratings_df.filter((f.col("rating") >= 4.0)).dropDuplicates(["movieId"]).count())

count of '>=4.0' ratings:  6298


In [77]:
# 3
print("top 100 movies: ")
ratings_df\
    .groupBy(f.col("movieId"))\
    .agg(f.avg(f.col("rating")).alias("avg_rating"))\
    .orderBy("avg_rating", ascending=False)\
    .show(100)

top 100 movies: 
+-------+----------+
|movieId|avg_rating|
+-------+----------+
|   3940|       5.0|
| 140627|       5.0|
|  82744|       5.0|
|    467|       5.0|
| 136341|       5.0|
| 149508|       5.0|
|  94810|       5.0|
|   6402|       5.0|
|   4180|       5.0|
| 170597|       5.0|
| 149350|       5.0|
| 136447|       5.0|
| 128087|       5.0|
| 151769|       5.0|
| 124404|       5.0|
|  73822|       5.0|
|  59814|       5.0|
|    626|       5.0|
|   2824|       5.0|
|  76091|       5.0|
|  31522|       5.0|
|  50999|       5.0|
| 142020|       5.0|
| 147326|       5.0|
| 109687|       5.0|
| 113829|       5.0|
|     53|       5.0|
| 112512|       5.0|
| 146662|       5.0|
|  90943|       5.0|
|  26928|       5.0|
| 134109|       5.0|
|  26169|       5.0|
|  95175|       5.0|
| 143511|       5.0|
|     99|       5.0|
|   3678|       5.0|
| 142444|       5.0|
| 136445|       5.0|
|   8580|       5.0|
|   3851|       5.0|
|   5723|       5.0|
| 172705|       5.0|
| 179133|       5

In [89]:
# 4
ratings_with_tags_df = ratings_df.alias("r")\
    .join(tags_df.alias("t"), on=["userId", "movieId"], how="inner")\
    .agg(f.avg(f.col("t.timestamp") - f.col("r.timestamp")).alias("avg_delta"))

ratings_with_tags_df.show()

+--------------------+
|           avg_delta|
+--------------------+
|2.6243727372266974E7|
+--------------------+



In [96]:
# 5
avg_rating = ratings_df\
    .groupBy(f.col("userId"))\
    .agg(f.avg(f.col("rating")).alias("avg_rating_user"))\
    .agg(f.avg(f.col("avg_rating_user")).alias("avg_rating"))

avg_rating.show()

+------------------+
|        avg_rating|
+------------------+
|3.6572223377474007|
+------------------+

