# Python Data Science

> Introduction to PySpark

Kuo, Yao-Jen from [DATAINPOINT](https://www.datainpoint.com/)

## TL;DR

> In this lecture, you'll learn how to use Spark from Python via PySpark.

## About Spark

## What is Spark?

> Spark (Apache Spark) is a platform for **cluster computing**, written in the Scala programming language. Spark spreads data and computations over clusters with multiple nodes (think of each node as a separate computer).

Source: <http://spark.apache.org/>

## Why Spark?

Splitting up data makes it easier to work with very large datasets, because each node only works with its own small subset of the total data. Since the nodes process their subsets in parallel, certain types of programming tasks can run much faster.
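The idea can be illustrated without Spark at all. The plain-Python sketch below splits a list into partitions, lets each hypothetical "node" compute a partial sum over its own subset, and then combines the partial results. `split_into_partitions` and `partial_sum` are illustrative names, not Spark APIs, and Spark's real partitioning is more sophisticated than this round-robin split.

```python
from typing import List


def split_into_partitions(data: List[int], num_partitions: int) -> List[List[int]]:
    """Split data into num_partitions chunks, assigning values round-robin."""
    partitions: List[List[int]] = [[] for _ in range(num_partitions)]
    for i, value in enumerate(data):
        partitions[i % num_partitions].append(value)
    return partitions


def partial_sum(partition: List[int]) -> int:
    """The work a single node would do on its own subset of the data."""
    return sum(partition)


data = list(range(1, 101))                        # 1, 2, ..., 100
partitions = split_into_partitions(data, num_partitions=4)
partials = [partial_sum(p) for p in partitions]   # each one is independent of the others
total = sum(partials)                             # combine the partial results

print([len(p) for p in partitions], partials, total)
# [25, 25, 25, 25] [1225, 1250, 1275, 1300] 5050
```

Because every partial sum depends only on its own partition, the four calls could run on four different machines; only the small list of partial results has to travel back.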

## How does Spark work?

The cluster is usually hosted on remote machines. One computer, called the **master**, manages splitting up the data and the computations. The master is connected to the rest of the computers in the cluster, which are called **workers**. The master sends the workers data and calculations to run, and the workers send their results back to the master.
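On a single machine, the master/worker division of labour can be mimicked with Python's standard `concurrent.futures` module. In this sketch (an analogy only, not how Spark is implemented) the main thread plays the master: it splits the data, hands one chunk to each thread in a pool of "workers", and combines what they send back. The task `count_words` is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def count_words(lines: List[str]) -> int:
    """Task run by a 'worker': count the words in its chunk of lines."""
    return sum(len(line.split()) for line in lines)


def master(lines: List[str], num_workers: int = 3) -> int:
    # The 'master' splits the data into at most one chunk per worker ...
    chunk_size = -(-len(lines) // num_workers)  # ceiling division
    chunks = [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]
    # ... sends each chunk to a worker ...
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = list(pool.map(count_words, chunks))
    # ... and combines the results the workers send back.
    return sum(results)


lines = [
    "Spark spreads data over a cluster",
    "each node works on a subset",
    "the master combines the results",
    "workers send results back",
]
print(master(lines))  # 21
```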

## Different languages for interacting with Spark

- Scala.
- Java.
- Python.
- R.

## What is PySpark and why PySpark?

> PySpark is the Python API for Spark. It lets Python users work with Spark without having to learn another programming language such as Scala.

## The first step in using Spark in Python

Connect to a cluster by creating an instance of the `SparkContext` class.

```python
from pyspark import SparkContext

# Initialize SparkContext
sc = SparkContext()
```

```python
# Verify SparkContext
print(type(sc))

# Print Spark version
print(sc.version)
```

```
<class 'pyspark.context.SparkContext'>
3.0.0
```


## Using Spark DataFrames on top of RDDs

- Spark's core data structure is the Resilient Distributed Dataset (RDD).
- We'll be using the Spark DataFrame built on top of RDDs.

## The first step in using Spark DataFrames

- Create a `SparkSession`.
- Think of the `SparkContext` as a connection to the cluster and the `SparkSession` as an interface to that connection (the session wraps a `SparkContext`, available as `spark.sparkContext`).

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print(type(spark))
```

```
<class 'pyspark.sql.session.SparkSession'>
```


## Using the `catalog.listTables` method of `SparkSession` to list the tables registered in the catalog

```python
# Print the tables in the catalog
tables_in_spark = spark.catalog.listTables()
print(len(tables_in_spark))  # 0 means no table or view is registered in the catalog yet
print(tables_in_spark)
```

```
0
[]
```


## Data manipulation with PySpark

## The four basic functions of persistent storage

- Create(C)
- Read(R)
- Update(U)
- Delete(D)

## We can create a Spark DataFrame via

- a list of tuples.
- a tabular external file.
- a `pandas` DataFrame.

## Create a Spark DataFrame via a list of tuples

```python
columns = ["title", "release_year", "imdb_rating"]
rows = [
    ("The Shawshank Redemption", 1994, 9.2),
    ("The Dark Knight", 2008, 9.0),
    ("Schindler's List", 1993, 8.9),
    ("Forrest Gump", 1994, 8.8)
]
movies_from_list = spark.createDataFrame(rows).toDF(*columns)
movies_from_list.show()
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|The Shawshank Red...|        1994|        9.2|
|     The Dark Knight|        2008|        9.0|
|    Schindler's List|        1993|        8.9|
|        Forrest Gump|        1994|        8.8|
+--------------------+------------+-----------+
```



## Why is there still no table?

```python
print(type(movies_from_list))
tables_in_spark = spark.catalog.listTables()
print(len(tables_in_spark))
```

```
<class 'pyspark.sql.dataframe.DataFrame'>
0
```


## We need to REGISTER the DataFrame as a table (temporary view) in the catalog

```python
movies_from_list.createOrReplaceTempView('movies_from_list')
tables_in_spark = spark.catalog.listTables()
print(len(tables_in_spark))
print(tables_in_spark)
```

```
1
[Table(name='movies_from_list', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
```


## Create a Spark DataFrame via a tabular external file

```python
movies_from_csv = spark.read.csv('imdb_top_rated.csv', header=True, inferSchema=True)
movies_from_csv.show(5)
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|The Shawshank Red...|        1994|        9.2|
|       The Godfather|        1972|        9.1|
|The Godfather: Pa...|        1974|        9.0|
|     The Dark Knight|        2008|        9.0|
|        12 Angry Men|        1957|        8.9|
+--------------------+------------+-----------+
only showing top 5 rows
```



```python
movies_from_csv.createOrReplaceTempView('movies_from_csv')
tables_in_spark = spark.catalog.listTables()
print(len(tables_in_spark))
print(tables_in_spark)
```

```
2
[Table(name='movies_from_csv', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='movies_from_list', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
```


## Create a Spark DataFrame via a `pandas` DataFrame

```python
import pandas as pd

list_of_dicts = [
    {"title": "The Shawshank Redemption", "release_year": 1994, "imdb_rating": 9.2},
    {"title": "The Dark Knight", "release_year": 2008, "imdb_rating": 9.0},
    {"title": "Schindler's List", "release_year": 1993, "imdb_rating": 8.9},
    {"title": "Forrest Gump", "release_year": 1994, "imdb_rating": 8.8}
]
pandas_df = pd.DataFrame(list_of_dicts)
print(type(pandas_df))
```

```
<class 'pandas.core.frame.DataFrame'>
```


```python
movies_from_pddf = spark.createDataFrame(pandas_df)
movies_from_pddf.show()
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|The Shawshank Red...|        1994|        9.2|
|     The Dark Knight|        2008|        9.0|
|    Schindler's List|        1993|        8.9|
|        Forrest Gump|        1994|        8.8|
+--------------------+------------+-----------+
```



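## We can update a Spark DataFrame via

- inserting new records.
- modifying existing records.

Spark DataFrames are immutable, so each "update" actually returns a new DataFrame, which we assign back to the same variable.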

```python
movies_from_pddf.show()
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|The Shawshank Red...|        1994|        9.2|
|     The Dark Knight|        2008|        9.0|
|    Schindler's List|        1993|        8.9|
|        Forrest Gump|        1994|        8.8|
+--------------------+------------+-----------+
```



## Inserting new records with `union`

```python
columns = ["title", "release_year", "imdb_rating"]
rows = [
    ("Inception", 2010, 8.7)
]
inception = spark.createDataFrame(rows).toDF(*columns)
movies_from_pddf = movies_from_pddf.union(inception)
movies_from_pddf.show()
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|The Shawshank Red...|        1994|        9.2|
|     The Dark Knight|        2008|        9.0|
|    Schindler's List|        1993|        8.9|
|        Forrest Gump|        1994|        8.8|
|           Inception|        2010|        8.7|
+--------------------+------------+-----------+
```



## Modifying existing records with `pyspark.sql.functions`

Here we replace *The Shawshank Redemption* with its Taiwanese release title, 刺激1995.

```python
from pyspark.sql import functions as funcs

# Replace the title where it matches; keep every other title unchanged
movies_from_pddf = movies_from_pddf.withColumn(
    "title",
    funcs.when(funcs.col("title") == 'The Shawshank Redemption', '刺激1995')
         .otherwise(funcs.col("title"))
)
movies_from_pddf.show()
```

```
+----------------+------------+-----------+
|           title|release_year|imdb_rating|
+----------------+------------+-----------+
|        刺激1995|        1994|        9.2|
| The Dark Knight|        2008|        9.0|
|Schindler's List|        1993|        8.9|
|    Forrest Gump|        1994|        8.8|
|       Inception|        2010|        8.7|
+----------------+------------+-----------+
```



## We can delete data from a Spark DataFrame via

- removing existing records.
- removing existing columns.
- dropping a registered temporary view from the catalog.

## Removing existing records with `where`

```python
condition = """
title != '刺激1995'
"""
movies_from_pddf = movies_from_pddf.where(condition)
movies_from_pddf.show()
```

```
+----------------+------------+-----------+
|           title|release_year|imdb_rating|
+----------------+------------+-----------+
| The Dark Knight|        2008|        9.0|
|Schindler's List|        1993|        8.9|
|    Forrest Gump|        1994|        8.8|
|       Inception|        2010|        8.7|
+----------------+------------+-----------+
```



## Removing existing columns with `drop`

```python
columns_to_drop = ['release_year', 'imdb_rating']
movies_from_pddf = movies_from_pddf.drop(*columns_to_drop)
movies_from_pddf.show()
```

```
+----------------+
|           title|
+----------------+
| The Dark Knight|
|Schindler's List|
|    Forrest Gump|
|       Inception|
+----------------+
```



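## Dropping a temporary view from the catalog

`dropTempView` only removes the name from the catalog; the Python variable still refers to a usable DataFrame.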

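```python
# movies_from_pddf was never registered, so register it first;
# dropTempView can only remove views that exist in the catalog
movies_from_pddf.createOrReplaceTempView("movies_from_pddf")
print(len(spark.catalog.listTables()))

# Drop the view and list the remaining tables
spark.catalog.dropTempView("movies_from_pddf")
tables_in_spark = spark.catalog.listTables()
print(len(tables_in_spark))
print(tables_in_spark)
```

```
3
2
[Table(name='movies_from_csv', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='movies_from_list', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
```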


## Basic attributes or methods of a Spark `DataFrame`

## Check data types of each column via `dtypes`

```python
movies_from_csv.dtypes
```

```
[('title', 'string'), ('release_year', 'int'), ('imdb_rating', 'double')]
```

## Find the shape `(m, n)` via

- `len(df.dtypes)` for number of columns.
- `count()` for number of rows.

```python
m = movies_from_csv.count()
n = len(movies_from_csv.dtypes)
print("Shape: ({}, {})".format(m, n))
```

```
Shape: (250, 3)
```


## Find column names of a Spark DataFrame via `schema.names`

```python
print(movies_from_csv.schema.names)
```

```
['title', 'release_year', 'imdb_rating']
```


## Show the first `n` observations via

- `show(n)`
- `head(n)`

```python
movies_from_csv.show(5)
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|The Shawshank Red...|        1994|        9.2|
|       The Godfather|        1972|        9.1|
|The Godfather: Pa...|        1974|        9.0|
|     The Dark Knight|        2008|        9.0|
|        12 Angry Men|        1957|        8.9|
+--------------------+------------+-----------+
only showing top 5 rows
```



```python
movies_from_csv.head(5)
```

```
[Row(title='The Shawshank Redemption', release_year=1994, imdb_rating=9.2),
 Row(title='The Godfather', release_year=1972, imdb_rating=9.1),
 Row(title='The Godfather: Part II', release_year=1974, imdb_rating=9.0),
 Row(title='The Dark Knight', release_year=2008, imdb_rating=9.0),
 Row(title='12 Angry Men', release_year=1957, imdb_rating=8.9)]
```

## Show the last `n` observations via `tail(n)` (available since Spark 3.0)

```python
movies_from_csv.tail(5)
```

```
[Row(title='The Battle of Algiers', release_year=1966, imdb_rating=8.0),
 Row(title='The Terminator', release_year=1984, imdb_rating=8.0),
 Row(title='Aladdin', release_year=1992, imdb_rating=8.0),
 Row(title='A Silent Voice: The Movie', release_year=2016, imdb_rating=8.0),
 Row(title='Tangerines', release_year=2013, imdb_rating=8.0)]
```

## Show the summary via `summary().show()`

```python
movies_from_csv.summary().show()
```

```
+-------+------------+------------------+-------------------+
|summary|       title|      release_year|        imdb_rating|
+-------+------------+------------------+-------------------+
|  count|         250|               250|                250|
|   mean|      1917.0|          1986.968|  8.256399999999973|
| stddev|         NaN|25.019489912906025|0.23002322215576926|
|    min|12 Angry Men|              1921|                8.0|
|    25%|      1917.0|              1968|                8.1|
|    50%|      1917.0|              1994|                8.2|
|    75%|      1917.0|              2007|                8.4|
|    max|  Your Name.|              2020|                9.2|
+-------+------------+------------------+-------------------+
```

For the string column `title`, `min` and `max` are compared alphabetically, while `mean` and the percentiles only use values that can be cast to a number, which here is just the movie title *1917*.



## Select a column from a Spark DataFrame via `select`

```python
movies_from_csv.select("title").show(5)
```

```
+--------------------+
|               title|
+--------------------+
|The Shawshank Red...|
|       The Godfather|
|The Godfather: Pa...|
|     The Dark Knight|
|        12 Angry Men|
+--------------------+
only showing top 5 rows
```



## Select multiple columns from a Spark DataFrame via `select`

```python
movies_from_csv.select("title", "imdb_rating").show(5)
```

```
+--------------------+-----------+
|               title|imdb_rating|
+--------------------+-----------+
|The Shawshank Red...|        9.2|
|       The Godfather|        9.1|
|The Godfather: Pa...|        9.0|
|     The Dark Knight|        9.0|
|        12 Angry Men|        8.9|
+--------------------+-----------+
only showing top 5 rows
```



## Select multiple columns from a Spark DataFrame with the same syntax as in pandas

```python
movies_from_csv[["title", "imdb_rating"]].show(5)
```

```
+--------------------+-----------+
|               title|imdb_rating|
+--------------------+-----------+
|The Shawshank Red...|        9.2|
|       The Godfather|        9.1|
|The Godfather: Pa...|        9.0|
|     The Dark Knight|        9.0|
|        12 Angry Men|        8.9|
+--------------------+-----------+
only showing top 5 rows
```



## Filter row(s) from a Spark DataFrame via `filter`

```python
movies_from_csv.filter(movies_from_csv.imdb_rating >= 9).show()
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|The Shawshank Red...|        1994|        9.2|
|       The Godfather|        1972|        9.1|
|The Godfather: Pa...|        1974|        9.0|
|     The Dark Knight|        2008|        9.0|
+--------------------+------------+-----------+
```



## Filter row(s) from a Spark DataFrame via `filter` with more than one condition

```python
condition_1 = movies_from_csv.imdb_rating >= 9
condition_2 = movies_from_csv.release_year >= 2000
movies_from_csv.filter(condition_1 & condition_2).show()
```

```
+---------------+------------+-----------+
|          title|release_year|imdb_rating|
+---------------+------------+-----------+
|The Dark Knight|        2008|        9.0|
+---------------+------------+-----------+
```



## Filter row(s) from a Spark DataFrame with the same syntax as in pandas

```python
condition_1 = movies_from_csv.imdb_rating >= 9
condition_2 = movies_from_csv.release_year >= 2000
movies_from_csv[condition_1 & condition_2].show()
```

```
+---------------+------------+-----------+
|          title|release_year|imdb_rating|
+---------------+------------+-----------+
|The Dark Knight|        2008|        9.0|
+---------------+------------+-----------+
```



## Sort a Spark DataFrame via `sort`

```python
movies_from_csv.sort("release_year").show(5)
```

```
+-------------+------------+-----------+
|        title|release_year|imdb_rating|
+-------------+------------+-----------+
|      The Kid|        1921|        8.2|
| Sherlock Jr.|        1924|        8.1|
|The Gold Rush|        1925|        8.1|
|  The General|        1926|        8.1|
|   Metropolis|        1927|        8.2|
+-------------+------------+-----------+
only showing top 5 rows
```



## Sort a Spark DataFrame via `sort` with descending order

```python
movies_from_csv.sort("release_year", ascending=False).show(5)
```

```
+-----------------+------------+-----------+
|            title|release_year|imdb_rating|
+-----------------+------------+-----------+
|         Hamilton|        2020|        8.5|
|            Joker|        2019|        8.4|
|             1917|        2019|        8.3|
|         Parasite|        2019|        8.5|
|Avengers: Endgame|        2019|        8.3|
+-----------------+------------+-----------+
only showing top 5 rows
```



## Create a new column in a Spark DataFrame via `withColumn`

```python
# release_year_roc: the release year in the Republic of China (Minguo) calendar,
# which counts 1912 as year 1, so ROC year = Gregorian year - 1911
movies_from_csv.withColumn("release_year_roc", movies_from_csv.release_year - 1911).show(5)
```

```
+--------------------+------------+-----------+----------------+
|               title|release_year|imdb_rating|release_year_roc|
+--------------------+------------+-----------+----------------+
|The Shawshank Red...|        1994|        9.2|              83|
|       The Godfather|        1972|        9.1|              61|
|The Godfather: Pa...|        1974|        9.0|              63|
|     The Dark Knight|        2008|        9.0|              97|
|        12 Angry Men|        1957|        8.9|              46|
+--------------------+------------+-----------+----------------+
only showing top 5 rows
```



## Summarizing a Spark DataFrame with aggregate methods

```python
movies_from_csv.groupBy().min('release_year').show()
```

```
+-----------------+
|min(release_year)|
+-----------------+
|             1921|
+-----------------+
```



```python
movies_from_csv.groupBy().avg('imdb_rating').show()
```

```
+-----------------+
| avg(imdb_rating)|
+-----------------+
|8.256399999999973|
+-----------------+
```



## Grouping and summarizing a Spark DataFrame with aggregate methods

```python
(movies_from_csv.groupBy('release_year')
    .max('imdb_rating')
    .sort('release_year', ascending=False)
    .show(5))
```

```
+------------+----------------+
|release_year|max(imdb_rating)|
+------------+----------------+
|        2020|             8.5|
|        2019|             8.5|
|        2018|             8.4|
|        2017|             8.3|
|        2016|             8.3|
+------------+----------------+
only showing top 5 rows
```



## Concatenating Spark DataFrames vertically via `union`

```python
columns = ["title", "release_year", "imdb_rating"]
rows = [
    ("The Shawshank Redemption", 1994, 9.2),
    ("The Dark Knight", 2008, 9.0)
]
first_two_movies = spark.createDataFrame(rows).toDF(*columns)
first_two_movies.show()
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|The Shawshank Red...|        1994|        9.2|
|     The Dark Knight|        2008|        9.0|
+--------------------+------------+-----------+
```



```python
columns = ["title", "release_year", "imdb_rating"]
rows = [
    ("Schindler's List", 1993, 8.9),
    ("Forrest Gump", 1994, 8.8)
]
last_two_movies = spark.createDataFrame(rows).toDF(*columns)
last_two_movies.show()
```

```
+----------------+------------+-----------+
|           title|release_year|imdb_rating|
+----------------+------------+-----------+
|Schindler's List|        1993|        8.9|
|    Forrest Gump|        1994|        8.8|
+----------------+------------+-----------+
```



```python
movies_union = first_two_movies.union(last_two_movies)
movies_union.show()
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|The Shawshank Red...|        1994|        9.2|
|     The Dark Knight|        2008|        9.0|
|    Schindler's List|        1993|        8.9|
|        Forrest Gump|        1994|        8.8|
+--------------------+------------+-----------+
```



## Concatenating Spark DataFrames horizontally via `join`

```python
columns = ["title", "release_year"]
rows = [
    ("The Avengers", 2012),
    ("Avengers: Age of Ultron", 2015),
    ("Avengers: Infinity War", 2018)
]
first_three_avenger_movies = spark.createDataFrame(rows).toDF(*columns)
first_three_avenger_movies.show()
```

```
+--------------------+------------+
|               title|release_year|
+--------------------+------------+
|        The Avengers|        2012|
|Avengers: Age of ...|        2015|
|Avengers: Infinit...|        2018|
+--------------------+------------+
```



```python
columns = ["title", "imdb_rating"]
rows = [
    ("Avengers: Age of Ultron", 7.3),
    ("Avengers: Infinity War", 8.4),
    ("Avengers: Endgame", 8.4)
]
last_three_avenger_movies = spark.createDataFrame(rows).toDF(*columns)
last_three_avenger_movies.show()
```

```
+--------------------+-----------+
|               title|imdb_rating|
+--------------------+-----------+
|Avengers: Age of ...|        7.3|
|Avengers: Infinit...|        8.4|
|   Avengers: Endgame|        8.4|
+--------------------+-----------+
```



## The default join type is an inner join

```python
first_three_avenger_movies.join(last_three_avenger_movies, on='title').show()
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|Avengers: Infinit...|        2018|        8.4|
|Avengers: Age of ...|        2015|        7.3|
+--------------------+------------+-----------+
```



## Performing different kinds of joins via `how`

```python
# left join
first_three_avenger_movies.join(last_three_avenger_movies, on='title', how='left').show()
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|        The Avengers|        2012|       null|
|Avengers: Infinit...|        2018|        8.4|
|Avengers: Age of ...|        2015|        7.3|
+--------------------+------------+-----------+
```



```python
# right join
first_three_avenger_movies.join(last_three_avenger_movies, on='title', how='right').show()
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|Avengers: Infinit...|        2018|        8.4|
|Avengers: Age of ...|        2015|        7.3|
|   Avengers: Endgame|        null|        8.4|
+--------------------+------------+-----------+
```



```python
# full join
first_three_avenger_movies.join(last_three_avenger_movies, on='title', how='full').show()
```

```
+--------------------+------------+-----------+
|               title|release_year|imdb_rating|
+--------------------+------------+-----------+
|        The Avengers|        2012|       null|
|Avengers: Infinit...|        2018|        8.4|
|Avengers: Age of ...|        2015|        7.3|
|   Avengers: Endgame|        null|        8.4|
+--------------------+------------+-----------+
```
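The four join types differ only in which unmatched keys they keep. A plain-Python sketch (no Spark) of that logic on the same Avengers data, using dictionaries keyed by title; `join_on_title` is a hypothetical helper and `None` stands in for Spark's `null`. It assumes each title appears at most once per side, whereas a real join also pairs up duplicate keys, and unlike Spark it returns rows in a fixed order.

```python
from typing import Dict, List, Optional, Tuple

JoinedRow = Tuple[str, Optional[int], Optional[float]]


def join_on_title(left: Dict[str, int], right: Dict[str, float], how: str = "inner") -> List[JoinedRow]:
    """Join two title-keyed dicts the way inner/left/right/full joins treat keys."""
    if how == "inner":
        keys = [k for k in left if k in right]        # keys present on both sides
    elif how == "left":
        keys = list(left)                             # every key from the left side
    elif how == "right":
        keys = list(right)                            # every key from the right side
    elif how == "full":
        keys = list(left) + [k for k in right if k not in left]  # union of keys
    else:
        raise ValueError(f"unsupported join type: {how}")
    # A key missing on one side gets None (null) for that side's column
    return [(k, left.get(k), right.get(k)) for k in keys]


release_years = {"The Avengers": 2012, "Avengers: Age of Ultron": 2015, "Avengers: Infinity War": 2018}
ratings = {"Avengers: Age of Ultron": 7.3, "Avengers: Infinity War": 8.4, "Avengers: Endgame": 8.4}

for how in ["inner", "left", "right", "full"]:
    print(how, join_on_title(release_years, ratings, how))
```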



## Feeling confused by all the different syntaxes in PySpark?

![](https://media.giphy.com/media/lkdH8FmImcGoylv3t3/giphy.gif)

Source: <https://giphy.com>

## Actually, we can leverage SQL or pandas to interact with Spark DataFrames

- Sending SQL queries to the Spark cluster.
- Pandafying a Spark DataFrame.

## Sending SQL queries via `sql`

```python
print(spark.catalog.listTables())
```

```
[Table(name='movies_from_csv', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='movies_from_list', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
```


```python
query = """
SELECT release_year,
       title,
       MAX(imdb_rating) AS max_imdb_rating
  FROM movies_from_csv
 GROUP BY release_year, title
 ORDER BY max_imdb_rating DESC
 LIMIT 10;
"""
spark.sql(query).show()
```

```
+------------+--------------------+---------------+
|release_year|               title|max_imdb_rating|
+------------+--------------------+---------------+
|        1994|The Shawshank Red...|            9.2|
|        1972|       The Godfather|            9.1|
|        2008|     The Dark Knight|            9.0|
|        1974|The Godfather: Pa...|            9.0|
|        2003|The Lord of the R...|            8.9|
|        1957|        12 Angry Men|            8.9|
|        1993|    Schindler's List|            8.9|
|        1966|The Good, the Bad...|            8.8|
|        1994|        Forrest Gump|            8.8|
|        1999|          Fight Club|            8.8|
+------------+--------------------+---------------+
```



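## Pandafying a Spark DataFrame

`toPandas()` collects every row to the driver, so call it only on data small enough to fit in the driver's memory.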

```python
print(type(movies_from_csv))
print(type(movies_from_csv.toPandas()))
```

```
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>
```


```python
movies_pddf = movies_from_csv.toPandas()
movies_pddf.groupby(['release_year', 'title'])['imdb_rating'].max().sort_values(ascending=False)[:10]
```

```
release_year  title                                            
1994          The Shawshank Redemption                             9.2
1972          The Godfather                                        9.1
1974          The Godfather: Part II                               9.0
2008          The Dark Knight                                      9.0
1993          Schindler's List                                     8.9
1957          12 Angry Men                                         8.9
2003          The Lord of the Rings: The Return of the King        8.9
1994          Pulp Fiction                                         8.8
2001          The Lord of the Rings: The Fellowship of the Ring    8.8
1999          Fight Club                                           8.8
Name: imdb_rating, dtype: float64
```

The movies rated 8.8 differ from the SQL result above because neither Spark nor pandas guarantees an order among tied values, so `LIMIT 10` and `[:10]` can keep different tied movies.