# Getting Started with Spark

### Analyzing IMDB Data

Public data available at https://datasets.imdbws.com.

First, we set up the SparkSession.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.appName("IMDBData").getOrCreate()

# set log level to ERROR so we don't have many outputs
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/04 17:56:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Then, we define the schema for each table. With an explicit schema, Spark does not need to infer the column types from the data, which can make reading the files considerably faster.

In [2]:
schema_names = "nconst string, primaryName string, birthYear int, deathYear int, primaryProfession string, knownForTitles string"
schema_basics = """
tconst string, titleType string, primaryTitle string, originalTitle string, isAdult int, startYear int, endYear int,
runtimeMinutes double, genres string
"""
schema_crew = "tconst string, directors string, writers string"
schema_principals = "tconst string, ordering int, nconst string, category string, job string, characters string"
schema_ratings = "tconst string, averageRating double, numVotes int"

Reading the tables.

In [3]:
names = (
    spark
    .read
    .schema(schema_names)
    .options(header=True, delimiter="\t")
    .csv('data/imdb/names.tsv.gz')
)

basics = (
    spark
    .read
    .schema(schema_basics)
    .options(header=True, delimiter="\t")
    .csv('data/imdb/basics.tsv.gz')
)

crew = (
    spark
    .read
    .schema(schema_crew)
    .options(header=True, delimiter="\t")
    .csv('data/imdb/crew.tsv.gz')
)

principals = (
    spark
    .read
    .schema(schema_principals)
    .options(header=True, delimiter="\t")
    .csv('data/imdb/principals.tsv.gz')
)

ratings = (
    spark
    .read
    .schema(schema_ratings)
    .options(header=True, delimiter="\t")
    .csv('data/imdb/ratings.tsv.gz')
)
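
To see what the declared schema does to each row, the sketch below mimics it in plain Python on single lines of tab-separated text. It is a simplified model, not Spark's reader: `parse_ddl` and `apply_schema` are hypothetical helpers, the `tt_example` row is made up, and the sketch assumes that a value which cannot be cast to its declared type becomes null (`None`). That assumption matches the `null` birth years that appear in the outputs later in this notebook, where IMDB's missing-value marker `\N` sits in an `int` column.

```python
# Plain-Python model of applying a declared schema to one TSV row.
# `parse_ddl` and `apply_schema` are hypothetical helpers, not PySpark APIs.

CASTS = {"string": str, "int": int, "double": float}


def parse_ddl(ddl):
    """Turn 'a string, b int' into [('a', 'string'), ('b', 'int')]."""
    fields = []
    for part in ddl.split(","):
        name, type_name = part.split()
        fields.append((name, type_name))
    return fields


def apply_schema(line, ddl, delimiter="\t"):
    """Cast each field to its declared type; a failed cast becomes None."""
    row = {}
    for (name, type_name), raw in zip(parse_ddl(ddl), line.split(delimiter)):
        try:
            row[name] = CASTS[type_name](raw)
        except ValueError:
            row[name] = None
    return row


schema_ratings = "tconst string, averageRating double, numVotes int"

# A ratings row taken from this notebook's own output.
good = apply_schema("tt0108160\t6.8\t188925", schema_ratings)
# A made-up row that uses IMDB's `\N` marker for missing values.
missing = apply_schema("tt_example\t\\N\t\\N", schema_ratings)

print(good)
print(missing)
```

Note that string columns keep the literal `\N`, since any text is a valid string. If that gets in the way, the `nullValue` option of Spark's CSV reader may be worth trying.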

Now, we check that the schemas were applied correctly.

In [4]:
print("NAMES Schema")
names.printSchema()
print("BASICS Schema")
basics.printSchema()
print("CREW Schema")
crew.printSchema()
print("PRINCIPALS Schema")
principals.printSchema()
print("RATINGS Schema")
ratings.printSchema()

NAMES Schema
root
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: integer (nullable = true)
 |-- deathYear: integer (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)

BASICS Schema
root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: integer (nullable = true)
 |-- startYear: integer (nullable = true)
 |-- endYear: integer (nullable = true)
 |-- runtimeMinutes: double (nullable = true)
 |-- genres: string (nullable = true)

CREW Schema
root
 |-- tconst: string (nullable = true)
 |-- directors: string (nullable = true)
 |-- writers: string (nullable = true)

PRINCIPALS Schema
root
 |-- tconst: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- nconst: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: s

We take a look at the `names` table and filter for people with the name **Keanu** who are **actors**.

In [5]:
names.filter("primaryName LIKE 'Keanu%' AND primaryProfession LIKE '%actor%'").show()

+----------+---------------+---------+---------+--------------------+--------------------+
|    nconst|    primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+----------+---------------+---------+---------+--------------------+--------------------+
| nm0000206|   Keanu Reeves|     1964|     null|actor,producer,so...|tt0102685,tt02342...|
|nm10158822|    Keanu Anoai|     null|     null|               actor|           tt8174116|
|nm10263947|  Keanu Yamaoka|     null|     null|               actor|           tt9287504|
|nm10456419|    Keanu Parks|     null|     null|               actor|           tt3208026|
|nm10515123|     Keanu Blye|     null|     null|producer,actor,wr...|tt23133930,tt2240...|
|nm10670161|    Keanu Kulin|     null|     null|               actor|           tt9483838|
|nm10806151|   Keanu Peyran|     null|     null|               actor|tt14250484,tt1600...|
|nm10823300|          Keanu|     null|     null|               actor|                  \N|

                                                                                

We can see that the column `knownForTitles` holds a comma-separated list of title IDs stored as a single string. We need one title per row if we want to join this table with the titles' `basics` table.

In [6]:
names = names.select(
    'nconst', 'primaryName', 'birthYear', 'deathYear', 
    f.explode(f.split('knownForTitles', ',')).alias('knownForTitles')
)
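
As a rough picture of what `f.split` followed by `f.explode` does, here is a plain-Python model. The helper `explode_titles` is hypothetical and only illustrates the row-multiplying behavior; the two sample rows are taken from the outputs in this notebook.

```python
# Plain-Python model of f.explode(f.split('knownForTitles', ',')).
# `explode_titles` is a hypothetical helper used only for illustration.


def explode_titles(rows):
    """Yield one output row per comma-separated title ID."""
    for nconst, name, titles in rows:
        for title in titles.split(","):
            yield (nconst, name, title)


rows = [
    ("nm0000206", "Keanu Reeves", "tt0102685,tt0234215,tt0133093,tt0111257"),
    ("nm10823300", "Keanu", "\\N"),  # IMDB's marker for "no value"
]

exploded = list(explode_titles(rows))
for row in exploded:
    print(row)
```

The person with four titles becomes four rows, while the `\N` entry stays a single row whose title is the literal string `\N`. Such a row simply finds no match when joined on `tconst`.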

We need to do exactly the same with the `directors` column of the `crew` table.

In [7]:
crew.filter("directors LIKE '%,%'").show()

+---------+--------------------+-------------------+
|   tconst|           directors|            writers|
+---------+--------------------+-------------------+
|tt0000007| nm0005690,nm0374658|                 \N|
|tt0000012| nm0525908,nm0525910|                 \N|
|tt0000017| nm1587194,nm0804434|                 \N|
|tt0000030| nm0010291,nm0666972|                 \N|
|tt0000089| nm0525908,nm0698645|                 \N|
|tt0000093| nm0525908,nm0525910|                 \N|
|tt0000247|nm2156608,nm00056...|nm0000636,nm0002504|
|tt0000287| nm0085865,nm0807236|                 \N|
|tt0000335| nm0095714,nm0675140|                 \N|
|tt0000380| nm0634629,nm0954087|          nm0674518|
|tt0000387| nm0617588,nm0881616|                 \N|
|tt0000399| nm2092030,nm0692105|                 \N|
|tt0000420| nm0832948,nm0378408|nm0140902,nm0378408|
|tt0000436| nm0095816,nm0666972|                 \N|
|tt0000447| nm2092030,nm0692105|          nm0692105|
|tt0000498| nm0280432,nm0378408|          nm15

In [8]:
crew = crew.select(
    'tconst', f.explode(f.split('directors', ',')).alias('directors'), 'writers'
)

Now, let's list the titles Keanu Reeves is known for.

In [9]:
only_keanu = names.filter("primaryName = 'Keanu Reeves'")
only_keanu.show()

+---------+------------+---------+---------+--------------+
|   nconst| primaryName|birthYear|deathYear|knownForTitles|
+---------+------------+---------+---------+--------------+
|nm0000206|Keanu Reeves|     1964|     null|     tt0102685|
|nm0000206|Keanu Reeves|     1964|     null|     tt0234215|
|nm0000206|Keanu Reeves|     1964|     null|     tt0133093|
|nm0000206|Keanu Reeves|     1964|     null|     tt0111257|
+---------+------------+---------+---------+--------------+



                                                                                

In [10]:
keanus_movies = (
    basics.select('tconst', 'primaryTitle', 'startYear')
    .join(
        only_keanu.select('primaryName', 'knownForTitles'), 
        basics.tconst == names.knownForTitles, how='inner'
    )
)

In [11]:
keanus_movies.explain('formatted')

== Physical Plan ==
AdaptiveSparkPlan (11)
+- SortMergeJoin Inner (10)
   :- Sort (4)
   :  +- Exchange (3)
   :     +- Filter (2)
   :        +- Scan csv  (1)
   +- Sort (9)
      +- Exchange (8)
         +- Generate (7)
            +- Filter (6)
               +- Scan csv  (5)


(1) Scan csv 
Output [3]: [tconst#12, primaryTitle#14, startYear#17]
Batched: false
Location: InMemoryFileIndex [file:/Users/neylsoncrepalde/Bigdata-on-Kubernetes/Chapter 5/data/imdb/basics.tsv.gz]
PushedFilters: [IsNotNull(tconst)]
ReadSchema: struct<tconst:string,primaryTitle:string,startYear:int>

(2) Filter
Input [3]: [tconst#12, primaryTitle#14, startYear#17]
Condition : isnotnull(tconst#12)

(3) Exchange
Input [3]: [tconst#12, primaryTitle#14, startYear#17]
Arguments: hashpartitioning(tconst#12, 200), ENSURE_REQUIREMENTS, [plan_id=84]

(4) Sort
Input [3]: [tconst#12, primaryTitle#14, startYear#17]
Arguments: [tconst#12 ASC NULLS FIRST], false, 0

(5) Scan csv 
Output [2]: [primaryName#1, knownForTitles#

Notice that Spark chose a Sort Merge Join, which shuffles and sorts both tables on the join key. Now, let's try to reduce the processing time by broadcasting the small Keanu Reeves table so that Spark can use a Broadcast Hash Join instead.

In [12]:
keanus_movies2 = (
    basics.select(
        'tconst', 'primaryTitle', 'startYear'
    ).join(
        f.broadcast(only_keanu.select('primaryName', 'knownForTitles')), 
        basics.tconst == names.knownForTitles, how='inner'
    )
)
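
To make the difference between the two strategies concrete, here is a toy version of each in plain Python. Both helpers are hypothetical and run on a single machine, so they only illustrate the core idea: a sort merge join sorts both sides and walks them in step, while a broadcast hash join builds a lookup table from the small side and streams the big side past it. The last row of `titles` is made up to show a non-matching title.

```python
# Toy versions of two join strategies in plain Python.
# Both helpers are hypothetical; Spark's real operators are distributed.


def sort_merge_join(left, right):
    """Sort both sides by key, then walk them together with two cursors."""
    left, right = sorted(left), sorted(right)
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lkey, rkey = left[i][0], right[j][0]
        if lkey < rkey:
            i += 1
        elif lkey > rkey:
            j += 1
        else:
            # Emit every right row that shares this key, then advance left.
            k = j
            while k < len(right) and right[k][0] == lkey:
                out.append((lkey, left[i][1], right[k][1]))
                k += 1
            i += 1
    return out


def broadcast_hash_join(big, small):
    """Build a hash table from the small side, then stream the big side."""
    table = {}
    for key, value in small:
        table.setdefault(key, []).append(value)
    return [(key, lval, sval) for key, lval in big for sval in table.get(key, [])]


# (tconst, primaryTitle) rows; the last one is a made-up non-matching title.
titles = [("tt0111257", "Speed"), ("tt0133093", "The Matrix"),
          ("tt0102685", "Point Break"), ("tt_example", "Unrelated Title")]
# (knownForTitles, primaryName) rows for the small side.
keanu_titles = [("tt0102685", "Keanu Reeves"), ("tt0133093", "Keanu Reeves"),
                ("tt0111257", "Keanu Reeves")]

merged = sort_merge_join(titles, keanu_titles)
hashed = broadcast_hash_join(titles, keanu_titles)
print(sorted(merged) == sorted(hashed))  # same rows, different order
```

Both functions return the same three matches. The hash version never sorts or rearranges the big side, which is the intuition behind broadcasting a table that is small enough to copy to every executor.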

In [13]:
keanus_movies2.explain('formatted')

== Physical Plan ==
AdaptiveSparkPlan (8)
+- BroadcastHashJoin Inner BuildRight (7)
   :- Filter (2)
   :  +- Scan csv  (1)
   +- BroadcastExchange (6)
      +- Generate (5)
         +- Filter (4)
            +- Scan csv  (3)


(1) Scan csv 
Output [3]: [tconst#12, primaryTitle#14, startYear#17]
Batched: false
Location: InMemoryFileIndex [file:/Users/neylsoncrepalde/Bigdata-on-Kubernetes/Chapter 5/data/imdb/basics.tsv.gz]
PushedFilters: [IsNotNull(tconst)]
ReadSchema: struct<tconst:string,primaryTitle:string,startYear:int>

(2) Filter
Input [3]: [tconst#12, primaryTitle#14, startYear#17]
Condition : isnotnull(tconst#12)

(3) Scan csv 
Output [2]: [primaryName#1, knownForTitles#5]
Batched: false
Location: InMemoryFileIndex [file:/Users/neylsoncrepalde/Bigdata-on-Kubernetes/Chapter 5/data/imdb/names.tsv.gz]
PushedFilters: [IsNotNull(primaryName), EqualTo(primaryName,Keanu Reeves)]
ReadSchema: struct<primaryName:string,knownForTitles:string>

(4) Filter
Input [2]: [primaryName#1, knownFor

In [14]:
keanus_movies3 = (
    basics.select(
        'tconst', 'primaryTitle', 'startYear'
    ).join(
        only_keanu.select('primaryName', 'knownForTitles').hint("shuffle_hash"),
        basics.tconst == names.knownForTitles, how='inner'
    )
)
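
The `shuffle_hash` hint asks Spark to prefer a shuffled hash join. As a rough single-machine model (the helper names are hypothetical), both sides are first routed to partitions by hashing the join key, and then each pair of matching partitions is joined with a hash table. `zlib.crc32` is used here only as a convenient deterministic hash, and the last row of `titles` is made up.

```python
# Toy shuffled hash join in plain Python; helper names are hypothetical.
import zlib


def partition(rows, num_partitions):
    """Route each (key, value) row to a partition chosen by hashing the key."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in rows:
        parts[zlib.crc32(key.encode()) % num_partitions].append((key, value))
    return parts


def shuffled_hash_join(left, right, num_partitions=4):
    """Co-partition both sides, then hash-join each pair of partitions."""
    out = []
    for lpart, rpart in zip(partition(left, num_partitions),
                            partition(right, num_partitions)):
        table = {}
        for key, value in rpart:      # build side (the right table here)
            table.setdefault(key, []).append(value)
        for key, lval in lpart:       # probe side
            out.extend((key, lval, rval) for rval in table.get(key, []))
    return out


# (tconst, primaryTitle) rows; the last one is a made-up non-matching title.
titles = [("tt0111257", "Speed"), ("tt0234215", "The Matrix Reloaded"),
          ("tt0102685", "Point Break"), ("tt0133093", "The Matrix"),
          ("tt_example", "Unrelated Title")]
keanu_titles = [("tt0102685", "Keanu Reeves"), ("tt0234215", "Keanu Reeves"),
                ("tt0133093", "Keanu Reeves"), ("tt0111257", "Keanu Reeves")]

joined = shuffled_hash_join(titles, keanu_titles)
for row in sorted(joined):
    print(row)
```

Because equal keys always hash to the same partition, no match is lost by joining partition by partition. Unlike the broadcast case, both sides still have to be shuffled first.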

In [15]:
keanus_movies3.explain("formatted")

== Physical Plan ==
AdaptiveSparkPlan (9)
+- ShuffledHashJoin Inner BuildRight (8)
   :- Exchange (3)
   :  +- Filter (2)
   :     +- Scan csv  (1)
   +- Exchange (7)
      +- Generate (6)
         +- Filter (5)
            +- Scan csv  (4)


(1) Scan csv 
Output [3]: [tconst#12, primaryTitle#14, startYear#17]
Batched: false
Location: InMemoryFileIndex [file:/Users/neylsoncrepalde/Bigdata-on-Kubernetes/Chapter 5/data/imdb/basics.tsv.gz]
PushedFilters: [IsNotNull(tconst)]
ReadSchema: struct<tconst:string,primaryTitle:string,startYear:int>

(2) Filter
Input [3]: [tconst#12, primaryTitle#14, startYear#17]
Condition : isnotnull(tconst#12)

(3) Exchange
Input [3]: [tconst#12, primaryTitle#14, startYear#17]
Arguments: hashpartitioning(tconst#12, 200), ENSURE_REQUIREMENTS, [plan_id=144]

(4) Scan csv 
Output [2]: [primaryName#1, knownForTitles#5]
Batched: false
Location: InMemoryFileIndex [file:/Users/neylsoncrepalde/Bigdata-on-Kubernetes/Chapter 5/data/imdb/names.tsv.gz]
PushedFilters: [IsNo

In [16]:
keanus_movies.show()

                                                                                

+---------+-------------------+---------+------------+--------------+
|   tconst|       primaryTitle|startYear| primaryName|knownForTitles|
+---------+-------------------+---------+------------+--------------+
|tt0111257|              Speed|     1994|Keanu Reeves|     tt0111257|
|tt0234215|The Matrix Reloaded|     2003|Keanu Reeves|     tt0234215|
|tt0102685|        Point Break|     1991|Keanu Reeves|     tt0102685|
|tt0133093|         The Matrix|     1999|Keanu Reeves|     tt0133093|
+---------+-------------------+---------+------------+--------------+



In [17]:
keanus_movies2.show()

+---------+-------------------+---------+------------+--------------+
|   tconst|       primaryTitle|startYear| primaryName|knownForTitles|
+---------+-------------------+---------+------------+--------------+
|tt0102685|        Point Break|     1991|Keanu Reeves|     tt0102685|
|tt0111257|              Speed|     1994|Keanu Reeves|     tt0111257|
|tt0133093|         The Matrix|     1999|Keanu Reeves|     tt0133093|
|tt0234215|The Matrix Reloaded|     2003|Keanu Reeves|     tt0234215|
+---------+-------------------+---------+------------+--------------+



                                                                                

In [18]:
keanus_movies3.show()

                                                                                

+---------+-------------------+---------+------------+--------------+
|   tconst|       primaryTitle|startYear| primaryName|knownForTitles|
+---------+-------------------+---------+------------+--------------+
|tt0111257|              Speed|     1994|Keanu Reeves|     tt0111257|
|tt0234215|The Matrix Reloaded|     2003|Keanu Reeves|     tt0234215|
|tt0102685|        Point Break|     1991|Keanu Reeves|     tt0102685|
|tt0133093|         The Matrix|     1999|Keanu Reeves|     tt0133093|
+---------+-------------------+---------+------------+--------------+



In [19]:
basics.createOrReplaceTempView('basics')
names.createOrReplaceTempView('names')

keanus_movies4 = spark.sql("""
    SELECT
        b.primaryTitle,
        b.startYear,
        n.primaryName
    FROM basics b
    INNER JOIN names n
        ON b.tconst = n.knownForTitles
    WHERE n.primaryName = 'Keanu Reeves'
""")

In [20]:
keanus_movies4.show()

                                                                                

+-------------------+---------+------------+
|       primaryTitle|startYear| primaryName|
+-------------------+---------+------------+
|              Speed|     1994|Keanu Reeves|
|The Matrix Reloaded|     2003|Keanu Reeves|
|        Point Break|     1991|Keanu Reeves|
|         The Matrix|     1999|Keanu Reeves|
+-------------------+---------+------------+
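
For readers who want to experiment with the query itself without starting Spark, the sketch below runs the same join on a tiny in-memory SQLite database. SQLite only stands in for Spark SQL here, so this checks the query logic and says nothing about Spark's execution. The sample rows come from the outputs above, except the last `basics` row, which is made up, and an `ORDER BY` was added to make the output order predictable.

```python
# Running the same query shape on a tiny in-memory SQLite database.
# SQLite is only a stand-in for Spark SQL in this sketch.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE basics (tconst TEXT, primaryTitle TEXT, startYear INTEGER)")
conn.execute("CREATE TABLE names (nconst TEXT, primaryName TEXT, knownForTitles TEXT)")

conn.executemany("INSERT INTO basics VALUES (?, ?, ?)", [
    ("tt0111257", "Speed", 1994),
    ("tt0234215", "The Matrix Reloaded", 2003),
    ("tt0102685", "Point Break", 1991),
    ("tt0133093", "The Matrix", 1999),
    ("tt_example", "Unrelated Title", 2000),  # made-up non-matching row
])
conn.executemany("INSERT INTO names VALUES (?, ?, ?)", [
    ("nm0000206", "Keanu Reeves", "tt0102685"),
    ("nm0000206", "Keanu Reeves", "tt0234215"),
    ("nm0000206", "Keanu Reeves", "tt0133093"),
    ("nm0000206", "Keanu Reeves", "tt0111257"),
    ("nm10158822", "Keanu Anoai", "tt8174116"),
])

rows = conn.execute("""
    SELECT b.primaryTitle, b.startYear, n.primaryName
    FROM basics b
    INNER JOIN names n
        ON b.tconst = n.knownForTitles
    WHERE n.primaryName = 'Keanu Reeves'
    ORDER BY b.startYear
""").fetchall()
conn.close()

for row in rows:
    print(row)
```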



## Tom Hanks and Meg Ryan's movies

One last question to answer:

Who were the directors, producers and writers of the movies in which Tom Hanks and Meg Ryan acted together, and which of those movies has the highest rating?

In [21]:
(
    names
    .filter("primaryName in ('Tom Hanks', 'Meg Ryan')")
    .select('nconst', 'primaryName', 'knownForTitles')
    .show()
)

+----------+-----------+--------------+
|    nconst|primaryName|knownForTitles|
+----------+-----------+--------------+
| nm0000158|  Tom Hanks|     tt0094737|
| nm0000158|  Tom Hanks|     tt1535109|
| nm0000158|  Tom Hanks|     tt0162222|
| nm0000158|  Tom Hanks|     tt0109830|
| nm0000212|   Meg Ryan|     tt0120632|
| nm0000212|   Meg Ryan|     tt0128853|
| nm0000212|   Meg Ryan|     tt0098635|
| nm0000212|   Meg Ryan|     tt0108160|
|nm12744293|   Meg Ryan|    tt10918860|
|nm14023001|   Meg Ryan|            \N|
| nm7438089|   Meg Ryan|     tt4837202|
| nm9013931|   Meg Ryan|     tt6917076|
| nm9253135|   Meg Ryan|     tt7309462|
| nm9621674|   Meg Ryan|     tt7993310|
+----------+-----------+--------------+



                                                                                

The Tom Hanks and Meg Ryan we want are the people with the IDs `nm0000158` and `nm0000212`. Let's find the movies they made together.

In [22]:
# Get the movies they worked on together
movies_together = (
    principals
    .filter("nconst in ('nm0000158', 'nm0000212')")
    .groupBy('tconst')
    .agg(f.count('nconst').alias('nactors'))
    .filter('nactors > 1')
)
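
One thing to keep in mind with this approach: counting rows per title is not quite the same as counting distinct people. The plain-Python model below shows the difference on a few rows. The `tt_example` rows are hypothetical and exist only to show the pitfall; whether `principals` actually contains such repeated credits is not checked here.

```python
# Plain-Python model of groupBy('tconst') with a row count vs. a distinct count.
# The rows for "tt_example" are hypothetical.
from collections import defaultdict

principal_rows = [
    ("tt0128853", "nm0000158"),   # Tom Hanks
    ("tt0128853", "nm0000212"),   # Meg Ryan
    ("tt_example", "nm0000158"),  # same person credited twice, e.g. in two roles
    ("tt_example", "nm0000158"),
]

rows_per_title = defaultdict(int)
people_per_title = defaultdict(set)
for tconst, nconst in principal_rows:
    rows_per_title[tconst] += 1
    people_per_title[tconst].add(nconst)

# Titles kept when filtering on the number of rows.
by_count = sorted(t for t, n in rows_per_title.items() if n > 1)
# Titles kept when filtering on the number of distinct people.
by_distinct = sorted(t for t, people in people_per_title.items() if len(people) > 1)

print(by_count)     # ['tt0128853', 'tt_example']
print(by_distinct)  # ['tt0128853']
```

If repeated credits turn out to be an issue in the real data, `f.countDistinct('nconst')` is the PySpark counterpart of the second filter.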

In [23]:
movies_together.show()

+---------+-------+
|   tconst|nactors|
+---------+-------+
|tt2831414|      2|
|tt0128853|      2|
|tt0099892|      2|
|tt1185238|      2|
|tt0108160|      2|
|tt7875572|      2|
|tt0689545|      2|
+---------+-------+



                                                                                

Now, we find the information about those movies.

In [24]:
subjoin = (
    principals
    .join(movies_together.select('tconst'), on='tconst', how='inner')
    .join(names.select('nconst', 'primaryName'), 
          on='nconst', how='inner')
    .join(basics.select('tconst', 'primaryTitle', 'startYear'),
         on='tconst', how='inner')
    .dropDuplicates()
)

In [27]:
subjoin.show()

                                                                                

+---------+---------+--------+---------------+-----------------+--------------------+--------------------+------------------+---------+
|   tconst|   nconst|ordering|       category|              job|          characters|         primaryName|      primaryTitle|startYear|
+---------+---------+--------+---------------+-----------------+--------------------+--------------------+------------------+---------+
|tt0128853|nm0001427|       3|          actor|               \N|   ["Frank Navasky"]|        Greg Kinnear|   You've Got Mail|     1998|
|tt0128853|nm0511979|      10|cinematographer|               \N|                  \N|        John Lindley|   You've Got Mail|     1998|
|tt0128853|nm0000205|       4|        actress|               \N|   ["Patricia Eden"]|        Parker Posey|   You've Got Mail|     1998|
|tt0128853|nm0000212|       2|        actress|               \N|  ["Kathleen Kelly"]|            Meg Ryan|   You've Got Mail|     1998|
|tt0128853|nm0258286|       7|         writer|  

In [38]:
subjoin.cache()

DataFrame[tconst: string, nconst: string, ordering: int, category: string, job: string, characters: string, primaryName: string, primaryTitle: string, startYear: int]

In [37]:
(
    subjoin
    .select('primaryTitle', 'startYear')
    .dropDuplicates()
    .orderBy(f.col('startYear').desc())
    .show(truncate=False)
)

+-----------------------------------------+---------+
|primaryTitle                             |startYear|
+-----------------------------------------+---------+
|Everything Is Copy                       |2015     |
|Delivering 'You've Got Mail'             |2008     |
|You've Got Mail                          |1998     |
|Episode dated 10 December 1998           |1998     |
|Sleepless in Seattle                     |1993     |
|Joe Versus the Volcano                   |1990     |
|Joe Versus the Volcano: Behind the Scenes|1990     |
+-----------------------------------------+---------+



Let's check the directors, producers and writers of those movies:

In [39]:
(
    subjoin
    .filter("category in ('director', 'producer', 'writer')")
    .select('primaryTitle', 'startYear', 'primaryName', 'category')
    .show()
)

+--------------------+---------+--------------------+--------+
|        primaryTitle|startYear|         primaryName|category|
+--------------------+---------+--------------------+--------+
|  Everything Is Copy|     2015|     Jacob Bernstein|director|
|  Everything Is Copy|     2015|          Carly Hugo|producer|
|  Everything Is Copy|     2015|      Matthew Parker|producer|
|  Everything Is Copy|     2015|         Nick Hooker|director|
|     You've Got Mail|     1998|        Delia Ephron|  writer|
|     You've Got Mail|     1998|       Miklós László|  writer|
|     You've Got Mail|     1998|Lauren Shuler Donner|producer|
|     You've Got Mail|     1998|         Nora Ephron|director|
|Joe Versus the Vo...|     1990|       Teri Schwartz|producer|
|Joe Versus the Vo...|     1990|John Patrick Shanley|director|
|Delivering 'You'v...|     2008|  Christina Hacopian|producer|
|Sleepless in Seattle|     1993|           Jeff Arch|  writer|
|Sleepless in Seattle|     1993|         Gary Foster|pr

Now, let's check which of those titles has the highest rating.

In [43]:
(
    subjoin.select('tconst', 'primaryTitle')
    .dropDuplicates()
    .join(ratings, on='tconst', how='inner')
    .orderBy(f.col('averageRating').desc())
    .show()
)

+---------+--------------------+-------------+--------+
|   tconst|        primaryTitle|averageRating|numVotes|
+---------+--------------------+-------------+--------+
|tt7875572|Joe Versus the Vo...|          7.8|      12|
|tt2831414|  Everything Is Copy|          7.4|    1123|
|tt1185238|Delivering 'You'v...|          7.0|      17|
|tt0108160|Sleepless in Seattle|          6.8|  188925|
|tt0128853|     You've Got Mail|          6.7|  227513|
|tt0099892|Joe Versus the Vo...|          5.9|   39532|
|tt0689545|Episode dated 10 ...|          3.8|      11|
+---------+--------------------+-------------+--------+
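
When reading this ranking, note that the top entry has only 12 votes. The sketch below uses the rows from the table above to show how the answer changes once a minimum number of votes is required. The helper `top_rated` and the threshold of 1000 votes are arbitrary choices made for illustration, not part of the original analysis.

```python
# Ranking the titles above with and without a minimum-votes threshold.
# Rows are (tconst, averageRating, numVotes), copied from the table above.
rated = [
    ("tt7875572", 7.8, 12),
    ("tt2831414", 7.4, 1123),
    ("tt1185238", 7.0, 17),
    ("tt0108160", 6.8, 188925),
    ("tt0128853", 6.7, 227513),
    ("tt0099892", 5.9, 39532),
    ("tt0689545", 3.8, 11),
]

MIN_VOTES = 1000  # arbitrary threshold, chosen only for this illustration


def top_rated(rows, min_votes=0):
    """Return the highest-rated row among those with at least min_votes votes."""
    eligible = [row for row in rows if row[2] >= min_votes]
    return max(eligible, key=lambda row: row[1])


print(top_rated(rated))             # ('tt7875572', 7.8, 12)
print(top_rated(rated, MIN_VOTES))  # ('tt2831414', 7.4, 1123)
```

In the Spark version, the same idea would amount to adding a filter such as `.filter('numVotes >= 1000')` before the `orderBy`.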



                                                                                

### Now, with Spark SQL

This code is not in the book. You should try writing it yourself first. If you get stuck, check the code below.