# Spark 

### Spark Motivation and Concepts

Spark offers:
- <b>Lazy computation</b>
    - The job is optimized as a whole before anything is executed
- <b>In-memory data caching</b>
    - Read from disk once, then serve subsequent scans from RAM
- <b>Efficient pipelining</b>
    - Intermediate results are kept off the disk wherever possible

Two main abstractions of Spark:
- <b>RDD (Resilient Distributed Dataset)</b>:
    - Collection of data items split into partitions and stored in memory on the worker nodes of the cluster
    - Is an interface for data transformation
    - Refers to data stored in persistent storage (HDFS, Cassandra, HBase, etc.), in a cache (memory, memory+disk, disk only, etc.), or in another RDD
    - Partitions are recomputed on failure or cache eviction
    - Metadata stored for the interface:
        - *Partitions* - set of data splits associated with this RDD
        - *Dependencies* - list of parent RDDs involved in the computation
        - *Compute* - function that computes a partition of the RDD given the parent partitions from the Dependencies
        - *Preferred locations* - the best place to run the computation on this partition (data locality)
        - *Partitioner* - how the data is split into partitions
    - Two classes of interfaces:
        - Transformations
        - Actions
- <b>DAG (Directed Acyclic Graph)</b>: sequence of computations performed on data
    - Node: RDD partition
    - Edge: transformation applied to the data
    - Acyclic: the graph never returns to an earlier partition
    - Directed: each transformation moves a data partition from one state to the next (from A to B)
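
The ideas above (lazy transformations, actions that trigger work, and lineage kept as parent dependencies) can be illustrated with a toy, single-machine model. This is a minimal sketch in plain Python, not Spark's implementation: `ToyRDD`, `lineage`, and `traced_double` are hypothetical names introduced only for illustration, and partitioning is left out entirely.

```python
class ToyRDD:
    """Toy stand-in for an RDD: records how to compute data, computes only on demand."""

    def __init__(self, source=None, parent=None, compute=None, name="source"):
        self._source = source    # raw data, only set on the root node
        self.parent = parent     # "Dependencies": the parent ToyRDD, if any
        self._compute = compute  # "Compute": derives this node's data from the parent's
        self.name = name

    # Transformations: build a new node in the graph, do no work yet.
    def map(self, fn):
        return ToyRDD(parent=self, compute=lambda it: (fn(x) for x in it), name="map")

    def filter(self, pred):
        return ToyRDD(parent=self, compute=lambda it: (x for x in it if pred(x)), name="filter")

    def _iterate(self):
        if self.parent is None:
            return iter(self._source)
        return self._compute(self.parent._iterate())

    # Action: walks the chain of parents and actually runs the computation.
    def collect(self):
        return list(self._iterate())

    def lineage(self):
        node, names = self, []
        while node is not None:
            names.append(node.name)
            node = node.parent
        return list(reversed(names))


calls = []


def traced_double(x):
    calls.append(x)  # lets us observe when the function really runs
    return x * 2


pipeline = ToyRDD(source=[1, 2, 3, 4, 5]).map(traced_double).filter(lambda x: x > 4)
calls_before_action = list(calls)  # still empty: nothing has executed yet
result = pipeline.collect()        # the action triggers the whole chain
```

Because each node only remembers its parent and its compute function, a lost result can always be rebuilt by replaying the lineage, which is the idea behind recomputing partitions on failure.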

In [1]:
from typing import Tuple
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    collect_set,
    col,
    avg,
    count,
    split,
    explode,
    udf,
    min,
    first,
)
from pyspark.sql.types import IntegerType, ArrayType, StringType
from conf import catalog
import os
import time

In [5]:
spark = (
    SparkSession.builder.appName("Next Watch EDA")
    .master("local[3]")
    .config("spark.executor.memory", "3g")
    .getOrCreate()
)



In [6]:
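# The accumulator starts at 1, so the final value is 1 + (1 + 2 + 3 + 4 + 5) = 16
acc = spark.sparkContext.accumulator(1)
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
rdd.foreach(lambda x: acc.add(x))  # foreach is an action, so it runs immediately
print(acc.value)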

16



### 1. Load datasets

In [10]:
movies_path = catalog.get_external_dataset_path(catalog.Sources.MOVIELENS, catalog.Datasets.MOVIES)
ratings_path = catalog.get_external_dataset_path(catalog.Sources.MOVIELENS, catalog.Datasets.RATINGS)
links_path = catalog.get_external_dataset_path(catalog.Sources.MOVIELENS, catalog.Datasets.LINKS)
tags_path = catalog.get_external_dataset_path(catalog.Sources.MOVIELENS, catalog.Datasets.TAGS)

In [11]:
movies = spark.read.load(
    str(movies_path), format="csv", header=True, inferSchema=True
)
ratings = spark.read.load(
    str(ratings_path), format="csv", header=True, inferSchema=True
)
links = spark.read.load(
    str(links_path), format="csv", header=True, inferSchema=True
)
tags = spark.read.load(
    str(tags_path), format="csv", header=True, inferSchema=True
)

                                                                                

In [10]:
ratings.createOrReplaceTempView("ratings")
movies.createOrReplaceTempView("movies")

In [11]:
movies.show(1)

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
+-------+----------------+--------------------+
only showing top 1 row



In [12]:
ratings.show(1)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
+------+-------+------+----------+
only showing top 1 row



In [13]:
links.show(1)

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
+-------+------+------+
only showing top 1 row



In [14]:
tags.show(1)

+------+-------+----+----------+
|userId|movieId| tag| timestamp|
+------+-------+----+----------+
|    14|    110|epic|1443148538|
+------+-------+----+----------+
only showing top 1 row



### 2. EDA: Exploratory Data Analysis

#### 2.1 Aggregate and list ratings

In [15]:
ratings.groupBy("movieId").agg(collect_set("rating")).show(5, False)

                                                                                

+-------+--------------------------------------------------+
|movieId|collect_set(rating)                               |
+-------+--------------------------------------------------+
|1      |[2.0, 3.5, 5.0, 1.0, 4.5, 2.5, 4.0, 0.5, 3.0, 1.5]|
|3      |[2.0, 3.5, 5.0, 1.0, 4.5, 2.5, 4.0, 0.5, 3.0, 1.5]|
|5      |[2.0, 3.5, 5.0, 1.0, 4.5, 2.5, 4.0, 0.5, 3.0, 1.5]|
|6      |[2.0, 3.5, 5.0, 1.0, 4.5, 2.5, 4.0, 0.5, 3.0, 1.5]|
|12     |[2.0, 3.5, 5.0, 1.0, 4.5, 2.5, 4.0, 0.5, 3.0, 1.5]|
+-------+--------------------------------------------------+
only showing top 5 rows



In [16]:
ratings.select("rating").distinct().rdd.sortBy(lambda r: r[0]).collect()

                                                                                

[Row(rating=0.5),
 Row(rating=1.0),
 Row(rating=1.5),
 Row(rating=2.0),
 Row(rating=2.5),
 Row(rating=3.0),
 Row(rating=3.5),
 Row(rating=4.0),
 Row(rating=4.5),
 Row(rating=5.0)]

#### 2.2 Find the most popular movies

In [17]:
ratings.groupBy("movieId").count().join(
    movies,
    ratings.movieId == movies.movieId,
    "left",  # LEFT JOIN keeps rated movies even if they have no title in movies (if any exist)
).drop(movies.movieId).select(
    "movieId", "title", col("count").alias("n_of_ratings")
).orderBy(
    col("n_of_ratings").desc()
).show(
    truncate=False
)



+-------+------------------------------------------------------------------------------+------------+
|movieId|title                                                                         |n_of_ratings|
+-------+------------------------------------------------------------------------------+------------+
|318    |Shawshank Redemption, The (1994)                                              |97999       |
|356    |Forrest Gump (1994)                                                           |97040       |
|296    |Pulp Fiction (1994)                                                           |92406       |
|593    |Silence of the Lambs, The (1991)                                              |87899       |
|2571   |Matrix, The (1999)                                                            |84545       |
|260    |Star Wars: Episode IV - A New Hope (1977)                                     |81815       |
|480    |Jurassic Park (1993)                                                     

                                                                                

In [18]:
spark.sql(
    """
    SELECT 
        r.movieId,m.title,COUNT(r.movieId) AS n_of_ratings
    FROM
        ratings AS r
    LEFT JOIN 
        movies AS m
    ON
        r.movieId = m.movieId
    GROUP BY
        r.movieId,m.title
    ORDER BY
        n_of_ratings DESC
    """
).show(truncate=False)



+-------+------------------------------------------------------------------------------+------------+
|movieId|title                                                                         |n_of_ratings|
+-------+------------------------------------------------------------------------------+------------+
|318    |Shawshank Redemption, The (1994)                                              |97999       |
|356    |Forrest Gump (1994)                                                           |97040       |
|296    |Pulp Fiction (1994)                                                           |92406       |
|593    |Silence of the Lambs, The (1991)                                              |87899       |
|2571   |Matrix, The (1999)                                                            |84545       |
|260    |Star Wars: Episode IV - A New Hope (1977)                                     |81815       |
|480    |Jurassic Park (1993)                                                     
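
In a LEFT JOIN, the place where the match condition lives matters. With the condition in `ON`, left rows without a match survive with a NULL title; with the condition in `WHERE`, those rows are filtered out and the query behaves like an inner join. The sketch below shows the difference on a tiny made-up dataset, using Python's built-in `sqlite3` as a stand-in for Spark SQL (an assumption made only so the example runs anywhere; movie 99 is invented and has no entry in `movies`).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE ratings (userId INTEGER, movieId INTEGER, rating REAL);
    CREATE TABLE movies (movieId INTEGER, title TEXT);
    INSERT INTO ratings VALUES (1, 1, 4.0), (2, 1, 5.0), (1, 99, 3.0);
    INSERT INTO movies VALUES (1, 'Toy Story (1995)');
    """
)

# Match condition in ON: the rating for the unknown movie 99 is kept.
query_on = """
    SELECT r.movieId, m.title, COUNT(r.movieId) AS n_of_ratings
    FROM ratings AS r
    LEFT JOIN movies AS m ON r.movieId = m.movieId
    GROUP BY r.movieId, m.title
    ORDER BY n_of_ratings DESC
"""

# Match condition in WHERE: rows without a matching movie are dropped.
query_where = """
    SELECT r.movieId, m.title, COUNT(r.movieId) AS n_of_ratings
    FROM ratings AS r
    LEFT JOIN movies AS m
    WHERE r.movieId = m.movieId
    GROUP BY r.movieId, m.title
    ORDER BY n_of_ratings DESC
"""

rows_on = conn.execute(query_on).fetchall()
rows_where = conn.execute(query_where).fetchall()
```

Here `rows_on` contains a row for movie 99 with a `None` title, while `rows_where` does not.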

                                                                                

#### 2.3 Find the highest-rated movies

In [19]:
ratings.groupBy("movieId").agg(
    avg("rating").alias("avg_rating"), count("rating").alias("n_of_ratings")
).join(movies, ratings.movieId == movies.movieId, "left").drop(movies.movieId).select(
    "movieId",
    "title",
    "avg_rating",
    "n_of_ratings",
).orderBy(
    col("avg_rating").desc(), col("n_of_ratings").desc()
).show(
    truncate=False
)



+-------+-------------------------------------------------------+----------+------------+
|movieId|title                                                  |avg_rating|n_of_ratings|
+-------+-------------------------------------------------------+----------+------------+
|152711 |Who Killed Chea Vichea? (2010)                         |5.0       |2           |
|164771 |Love Finds You in Valentine (2016)                     |5.0       |2           |
|171821 |3-D Sex and Zen: Extreme Ecstasy (2011)                |5.0       |2           |
|140369 |War Arrow (1954)                                       |5.0       |2           |
|164787 |You Cast A Spell On Me (2015)                          |5.0       |2           |
|193527 |The Enclosed Valley (1995)                             |5.0       |2           |
|139547 |Placebo: Soulmates Never Die: Live in Paris 2003 (2004)|5.0       |2           |
|137593 |Hooligan (1998)                                        |5.0       |2           |
|143422 |2
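
The top of this ranking is filled with titles that have only two ratings, so a perfect average here says little. One common remedy is to require a minimum number of ratings before ranking. The sketch below shows the idea in plain Python on made-up data; `rank_by_avg` and the threshold `min_ratings` are hypothetical and would need tuning for the real dataset.

```python
from collections import defaultdict
from statistics import mean

# Made-up (movieId, rating) pairs, for illustration only.
sample_ratings = [
    (1, 5.0), (1, 5.0),
    (2, 4.5), (2, 4.0), (2, 4.5), (2, 5.0),
    (3, 3.0), (3, 3.5), (3, 4.0),
]


def rank_by_avg(pairs, min_ratings=1):
    """Return (movieId, avg_rating, n_of_ratings), best average first."""
    by_movie = defaultdict(list)
    for movie_id, rating in pairs:
        by_movie[movie_id].append(rating)
    stats = [
        (movie_id, mean(rs), len(rs))
        for movie_id, rs in by_movie.items()
        if len(rs) >= min_ratings  # drop movies with too few ratings
    ]
    # Same ordering as the notebook: average descending, then count descending.
    return sorted(stats, key=lambda s: (-s[1], -s[2]))


unfiltered = rank_by_avg(sample_ratings)
filtered = rank_by_avg(sample_ratings, min_ratings=3)
```

In the DataFrame pipeline, the analogous step would be a `.filter(col("n_of_ratings") >= ...)` placed before `.orderBy(...)`.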



#### 2.4 Find the most active users

In [20]:
ratings.groupBy("userId").count().select(
    "userId", col("count").alias("n_of_ratings")
).orderBy(col("n_of_ratings").desc()).show(5)



+------+------------+
|userId|n_of_ratings|
+------+------------+
|123100|       23715|
|117490|        9279|
|134596|        8381|
|212343|        7884|
|242683|        7515|
+------+------------+
only showing top 5 rows



                                                                                

#### 2.5 Top rated movies by genre

In [21]:
@udf(returnType=ArrayType(StringType()))
def split_pipe(x):
    return x.split("|")

In [22]:
# Display genres (show() returns None, so there is nothing to assign)
movies.select(
    explode(split_pipe("genres")).alias("genres"),
).distinct().show()

+------------------+
|            genres|
+------------------+
|             Crime|
|           Romance|
|          Thriller|
|         Adventure|
|             Drama|
|               War|
|       Documentary|
|           Fantasy|
|           Mystery|
|           Musical|
|         Animation|
|         Film-Noir|
|(no genres listed)|
|              IMAX|
|            Horror|
|           Western|
|            Comedy|
|          Children|
|            Action|
|            Sci-Fi|
+------------------+
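
What `explode(split(...))` does to each row can be mirrored in plain Python: the pipe-separated string is split into a list, and the row is then repeated once per list element. This is only a sketch of the semantics on made-up rows; `explode_genres` is a hypothetical helper, not a Spark function.

```python
# Made-up rows shaped like the movies table: (movieId, title, genres).
sample_movies = [
    (1, "Movie A", "Adventure|Comedy"),
    (2, "Movie B", "Drama"),
    (3, "Movie C", "Comedy|Drama|Romance"),
]


def explode_genres(rows):
    """Emit one (movieId, title, genre) row per genre in the pipe-separated string."""
    return [
        (movie_id, title, genre)
        for movie_id, title, genres in rows
        for genre in genres.split("|")  # str.split takes a literal separator
    ]


exploded = explode_genres(sample_movies)
distinct_genres = sorted({genre for _, _, genre in exploded})
```

One difference worth keeping in mind: Python's `str.split` takes a literal separator, whereas Spark's `split` takes a regular expression, which is why the native version has to escape the pipe as `"\\|"`.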




In [23]:
genre_to_compare = "Film-Noir"


def genre_top_rated_movies(udf_flag: bool = False) -> Tuple[list, float]:
    """Return the top 10 rows for genre_to_compare and the elapsed time in seconds."""
    s = time.perf_counter()
    rows = (
        movies.select(
            movies.colRegex("`^(?!genres).*$`"),  # do not select genres column
            explode(
                split(col("genres"), "\\|") if not udf_flag else split_pipe("genres")
            ).alias("genre"),
        )
        .filter(col("genre") == genre_to_compare)
        .join(ratings, movies.movieId == ratings.movieId, "inner")
        .drop(ratings.movieId)
        .groupBy("movieId", "title", "genre")
        .agg(avg("rating").alias("avg_rating"), count("rating").alias("n_of_ratings"))
        .coalesce(3)  # maybe we do not need many partitions...
        .orderBy(col("avg_rating").desc(), col("n_of_ratings").desc())
        .take(10)  # action: returns a list of Row objects, not a DataFrame
    )
    t = time.perf_counter() - s
    return rows, t


spark_take, spark_time = genre_top_rated_movies(udf_flag=False)
udf_take, udf_time = genre_top_rated_movies(udf_flag=True)

print(f"Time taken with a User Defined Function: {udf_time:.4f}s")
print(f"Time taken with native Spark functions: {spark_time:.4f}s")

udf_take



Time taken with a User Defined Function: 8.6007s
Time taken with native Spark functions: 9.0201s




[Row(movieId=1212, title='Third Man, The (1949)', genre='Film-Noir', avg_rating=4.20375939849624, n_of_ratings=7980),
 Row(movieId=3435, title='Double Indemnity (1944)', genre='Film-Noir', avg_rating=4.199261675824176, n_of_ratings=5824),
 Row(movieId=922, title='Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)', genre='Film-Noir', avg_rating=4.195425943852856, n_of_ratings=8264),
 Row(movieId=930, title='Notorious (1946)', genre='Film-Noir', avg_rating=4.163581345674618, n_of_ratings=5618),
 Row(movieId=1260, title='M (1931)', genre='Film-Noir', avg_rating=4.161491297468355, n_of_ratings=5056),
 Row(movieId=1284, title='Big Sleep, The (1946)', genre='Film-Noir', avg_rating=4.159891808346213, n_of_ratings=6470),
 Row(movieId=1252, title='Chinatown (1974)', genre='Film-Noir', avg_rating=4.1524313561098305, n_of_ratings=19084),
 Row(movieId=1248, title='Touch of Evil (1958)', genre='Film-Noir', avg_rating=4.15122820176261, n_of_ratings=5333),
 Row(movieId=913, title='Maltese Falcon, The (19
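
The two timings above come from a single run each, and the native version ran first, so it may have absorbed one-off costs such as warm-up and file caching. A fairer comparison usually discards a few warm-up runs and reports the median of several timed runs. The sketch below shows such a harness in plain Python; `benchmark` and `workload` are hypothetical names, and `workload` is a trivial stand-in for a call like `genre_top_rated_movies`.

```python
import statistics
import time


def benchmark(fn, repeats=5, warmup=1):
    """Run fn a few times untimed, then return (median seconds, all samples)."""
    for _ in range(warmup):
        fn()  # warm-up runs are executed but not measured
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples), samples


call_log = []


def workload():
    # Stand-in for the real job; records each invocation.
    call_log.append(1)
    return sum(range(10_000))


median_s, samples = benchmark(workload, repeats=5, warmup=2)
```

With the real functions, one would pass something like `lambda: genre_top_rated_movies(udf_flag=True)` as `fn` and compare the two medians.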

#### 2.6 Minimum number of ratings per user and per movie

In [24]:
ratings.groupBy("userId").count().agg(min("count").alias("min")).show()

ratings.groupBy("movieId").count().agg(min("count").alias("min")).show()

                                                                                

+---+
|min|
+---+
|  1|
+---+





+---+
|min|
+---+
|  1|
+---+



                                                                                

#### 2.7 Movies with only one rating

In [25]:
n_1_rating_movies = ratings.groupBy("movieId").count().filter(col("count") == 1).count()
n_all_movies = ratings.select("movieId").distinct().count()

perc_1_rating = (n_1_rating_movies / n_all_movies) * 100

print(f"{perc_1_rating:.2f}% of the movies have only 1 rating")



18.84% of the movies have only 1 rating


                                                                                

#### 2.8 Total number of users in the dataset

In [26]:
ratings.select("userId").distinct().count()

                                                                                

283228

#### 2.9 Total number of movies in the dataset

In [27]:
movies.select("movieId").distinct().count()

58098

#### 2.10 Movies that are not yet rated

In [28]:
movies.join(ratings, movies.movieId == ratings.movieId, "left_anti").select(
    "movieId"
).distinct().count()

# OR

movies.select("movieId").subtract(
    ratings.select("movieId")
).count()  # subtract is the same as EXCEPT DISTINCT in SQL...

                                                                                

4209
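
The two approaches above agree here because `movieId` is unique in `movies`. Their semantics differ slightly, though: a left anti join keeps every left row that has no match, duplicates included, while `subtract` returns distinct rows. The sketch below mirrors both behaviors with plain Python collections on made-up ids; it only illustrates the semantics and is not how Spark executes either operation.

```python
rated_ids = [1, 1, 3]  # movie 1 was rated twice
rated = set(rated_ids)

# Unique left side, as in the movies table: both approaches agree.
movie_ids = [1, 2, 3, 4]
anti_join = [m for m in movie_ids if m not in rated]  # left_anti: keep unmatched left rows
subtracted = sorted(set(movie_ids) - rated)           # subtract: distinct set difference

# Left side with a duplicate: the results diverge.
movie_ids_dup = [2, 2, 4]
anti_join_dup = [m for m in movie_ids_dup if m not in rated]
subtracted_dup = sorted(set(movie_ids_dup) - rated)
```

This is why the anti-join version calls `.distinct()` before counting, while the `subtract` version does not need it.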

#### 2.11 Number of movies in each genre

In [29]:
movies.select("movieId", explode(split_pipe("genres")).alias("genres")).groupby(
    "genres"
).count().sort(col("count").desc()).show()

+------------------+-----+
|            genres|count|
+------------------+-----+
|             Drama|24144|
|            Comedy|15956|
|          Thriller| 8216|
|           Romance| 7412|
|            Action| 7130|
|            Horror| 5555|
|       Documentary| 5118|
|             Crime| 5105|
|(no genres listed)| 4266|
|         Adventure| 4067|
|            Sci-Fi| 3444|
|           Mystery| 2773|
|          Children| 2749|
|         Animation| 2663|
|           Fantasy| 2637|
|               War| 1820|
|           Western| 1378|
|           Musical| 1113|
|         Film-Noir|  364|
|              IMAX|  197|
+------------------+-----+

