In [1]:
# Set the PySpark environment variables
# Note! Change the SPARK_HOME value to your real pyspark location
import os
os.environ['/Users/97252/anaconda3/Lib/site-packages/pyspark'] = "your_pyspark_home"
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

In [2]:
# Import PySpark
from pyspark.sql import SparkSession

In [3]:
# Import PySpark
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
    .appName("PySpark-jupyter-demo") \
    .getOrCreate()

In [4]:
df_lines = spark.read.format("com.databricks.spark.csv").options(header='true') \
 .load("simpsons_script_lines.csv")
df_lines.createOrReplaceTempView("lines_table")

In [5]:
## print lines file schema
df_lines.printSchema()


root
 |-- id: string (nullable = true)
 |-- episode_id: string (nullable = true)
 |-- number: string (nullable = true)
 |-- raw_text: string (nullable = true)
 |-- timestamp_in_ms: string (nullable = true)
 |-- speaking_line: string (nullable = true)
 |-- character_id: string (nullable = true)
 |-- location_id: string (nullable = true)
 |-- raw_character_text: string (nullable = true)
 |-- raw_location_text: string (nullable = true)
 |-- spoken_words: string (nullable = true)
 |-- normalized_text: string (nullable = true)
 |-- word_count: string (nullable = true)



In [6]:
## print all fields for first 5 rows
spark.sql("SELECT * from lines_table").show(5)

+----+----------+------+--------------------+---------------+-------------+------------+-----------+--------------------+--------------------+--------------------+--------------------+----------+
|  id|episode_id|number|            raw_text|timestamp_in_ms|speaking_line|character_id|location_id|  raw_character_text|   raw_location_text|        spoken_words|     normalized_text|word_count|
+----+----------+------+--------------------+---------------+-------------+------------+-----------+--------------------+--------------------+--------------------+--------------------+----------+
|9549|        32|   209|Miss Hoover: No, ...|         848000|         true|         464|          3|         Miss Hoover|Springfield Eleme...|No, actually, it ...|no actually it wa...|        31|
|9550|        32|   210|Lisa Simpson: (NE...|         856000|         true|           9|          3|        Lisa Simpson|Springfield Eleme...|Where's Mr. Bergs...| wheres mr bergstrom|         3|
|9551|        32|   

In [7]:
## print the shortest 10 script lines:
spark.sql("SELECT int(word_count) AS wc, raw_text FROM lines_table WHERE int(word_count) > 0 ORDER BY wc").show(10, False)


+---+----------------------------+
|wc |raw_text                    |
+---+----------------------------+
|1  |Maude Flanders: Ohhh.       |
|1  |Eerie Voice: (SCARY) Oooooo!|
|1  |Lisa Simpson: Ooooh.        |
|1  |Eerie Voice: (SCARY) Oooooo!|
|1  |Kearney Zzyzwicz: Loser!    |
|1  |Immigration Official: Name! |
|1  |Moe Szyslak: Hello.         |
|1  |Bart Simpson: Ow.           |
|1  |Homer Simpson: Go! (LAUGHS) |
|1  |Carl Carlson: Yeah.         |
+---+----------------------------+
only showing top 10 rows



In [8]:
## count the scenes that took more than 10 minutes
spark.sql("SELECT count(*) FROM lines_table WHERE int(timestamp_in_ms) > (10 * 60 * 1000)").show()

+--------+
|count(1)|
+--------+
|   85671|
+--------+



In [9]:
df_lines = spark.read.format("com.databricks.spark.csv").options(header='true') \
 .load("simpsons_locations.csv")
df_lines.createOrReplaceTempView("locations_table")

In [16]:
## print the first 50 locations that has the word "Springfield" in them , ignoring letters case.
spark.sql(" SELECT id, name AS location FROM locations_table WHERE normalized_name LIKE '%springfield%' ").show(50, False)

+---+-----------------------------------------------+
|id |location                                       |
+---+-----------------------------------------------+
|3  |Springfield Elementary School                  |
|8  |Springfield Mall                               |
|10 |Springfield Nuclear Power Plant                |
|20 |Springfield Downs Dog Track                    |
|21 |SPRINGFIELD DOWNS                              |
|23 |SPRINGFIELD DOWN                               |
|24 |SPRINGFIELD DOWNS PARKING LOT                  |
|26 |Springfield Elementary School Playground       |
|48 |Springfield Town Hall                          |
|76 |Springfield Elementary School Hallway          |
|77 |Springfield Elementary School Cafeteria        |
|79 |Springfield Retirement Castle                  |
|80 |Grampa's Room at Springfield Retirement Castle |
|90 |Springfield Library                            |
|103|Springfield Elementary School Yard             |
|131|First Church of Springf

In [18]:
## print 20 quotes that are located in any place that has Jerusalem in its name.
## Note that Jerusalem may appear in any case and in any part of the location name.
## Use JOIN for this query on another table
spark.sql(""" SELECT l.raw_text AS quotes
                FROM lines_table l JOIN locations_table loc ON l.location_id = loc.id 
                WHERE loc.normalized_name LIKE '%jerusalem%'
                """).show(20)

+--------------------+
|              quotes|
+--------------------+
|(JERUSALEM ROYAL ...|
|Bart Simpson: I'm...|
|Krusty the Clown:...|
|Bart Simpson: Wha...|
|Krusty the Clown:...|
|Methuselah Grampa...|
|Bart Simpson: Met...|
|Methuselah Grampa...|
|Bart Simpson: (PU...|
|Methuselah Grampa...|
|Bart Simpson: (ME...|
|(Jerusalem: EXT. ...|
|Bart Simpson: (AN...|
|Santa's Little He...|
|Goliath/nelson: (...|
|Santa's Little He...|
|Bart Simpson: You...|
|"Crowd: (EXCITED ...|
|Goliath/nelson: Y...|
|"Crowd: (BIGGER, ...|
+--------------------+
only showing top 20 rows



In [21]:
## print first 20 most used locations with the count of lines spoken in them.
## use GROUP BY for that
spark.sql("""SELECT location_id, raw_location_text AS location, COUNT(*) AS lines 
             FROM lines_table 
             GROUP BY location_id, raw_location_text 
             ORDER BY lines DESC""").show(20,False) 

+-----------+--------------------+-----+
|location_id|            location|lines|
+-----------+--------------------+-----+
|          5|        Simpson Home|34568|
|          3|Springfield Eleme...| 6996|
|         15|        Moe's Tavern| 4567|
|         10|Springfield Nucle...| 3552|
|        136|         Kwik-E-Mart| 1452|
|        131|First Church of S...| 1397|
|         25| Simpson Living Room| 1354|
|        270|  Springfield Street| 1326|
|        216|         Springfield| 1299|
|        151|         Simpson Car| 1223|
|        188|       Flanders Home| 1151|
|          1|              Street| 1119|
|         48|Springfield Town ...| 1094|
|         79|Springfield Retir...| 1043|
|         53|         Burns Manor|  982|
|          8|    Springfield Mall|  827|
|        140|     Simpson Kitchen|  806|
|        194|           Courtroom|  800|
|         43|      Bart's Bedroom|  757|
|         84|    Bart's Treehouse|  755|
+-----------+--------------------+-----+
only showing top

In [13]:
df_lines = spark.read.format("com.databricks.spark.csv").options(header='true') \
 .load("simpsons_episodes.csv")
df_lines.createOrReplaceTempView("episodes_table")

In [14]:
## find the seasons in which the average imdb rating was the highest.
## Print the seasons number, the number of episodes in each one and the average rating
## in a descending order from highest average rating to lowest.
spark.sql(""" SELECT season, count(*) AS number_of_episodes, avg(imdb_rating) AS average_rating
                FROM episodes_table
                GROUP BY season
                ORDER BY average_rating DESC
                """).show(20, False)

+------+------------------+------------------+
|season|number_of_episodes|average_rating    |
+------+------------------+------------------+
|5     |22                |8.336363636363636 |
|7     |25                |8.324             |
|6     |25                |8.312             |
|4     |22                |8.268181818181818 |
|8     |25                |8.219999999999999 |
|3     |24                |8.154166666666665 |
|2     |22                |8.04090909090909  |
|9     |25                |7.8439999999999985|
|1     |13                |7.807692307692307 |
|10    |23                |7.569565217391306 |
|12    |21                |7.361904761904761 |
|11    |22                |7.290909090909093 |
|13    |22                |7.140909090909091 |
|14    |22                |7.077272727272727 |
|15    |22                |7.045454545454546 |
|16    |21                |7.042857142857143 |
|18    |22                |7.0               |
|19    |20                |6.935             |
|20    |21   

In [15]:
## which episodes have the title containing the word "Christmas"?

spark.sql("""
    SELECT title, season, number_in_season
    FROM episodes_table
    WHERE title LIKE '%Christmas%'
    ORDER BY season, number_in_season
""").show(truncate=False)


+-----------------------------+------+----------------+
|title                        |season|number_in_season|
+-----------------------------+------+----------------+
|Simpsons Christmas Stories   |17    |9               |
|The Fight Before Christmas   |22    |8               |
|White Christmas Blues        |25    |8               |
|I Won't Be Home for Christmas|26    |9               |
+-----------------------------+------+----------------+

