# Week 6 - SMM695

Matteo Devigili

June 24th, 2020

[_PySpark_](https://spark.apache.org/docs/latest/api/python/index.html#): during this lecture, we will approach Spark through Python

<img src="images/_1.png" width="20%">

**Agenda**:
1. Introduction to Spark
1. Installing PySpark
1. PySpark Basics
1. PySpark and Pandas
1. PySpark and SQL
1. Load data from your DBMS

# Introduction to Spark

**Big Data Challenge**:

* Cost of storing data has dropped
* The need for parallel computation has increased

![IBM Blue Gene/L](https://www.ibm.com/ibm/history/ibm100/images/icp/U225116Q82800V30/us__en_us__ibm100__blue_gene__man_next_blue_gene__620x350.jpg)
**Note**: [IBM Blue Gene/L](https://www.ibm.com/ibm/history/ibm100/us/en/icons/bluegene/)

**What is [Apache Spark](https://spark.apache.org)**?

> "Apache Spark is a unified computing engine and a set of libraries for parallel data processing on computer clusters"

[Chambers and Zaharia 2018](#references)

**Programming Languages Supported**:
<img src="images/_0.png" width="50%">

**Spark's philosophy**:

* *Unified*: Spark offers a large variety of data analytics tools
* *Computing Engine*: Spark focuses on computing, not on storage
* *Libraries*: Spark has different libraries to perform several tasks

**Apache Spark Libraries**:

* *Spark SQL*
* *Spark Streaming*
* *Spark MLlib*
* *Spark GraphX*

[Third-party projects](https://spark.apache.org/third-party-projects.html)

**Spark Application**:

| Component | Role |
|----|----|
| *Spark Driver* | Runs the user's program and schedules its tasks on the executors |
| *Cluster Manager* | Manages the worker nodes and allocates their resources |
| *Executors* | Execute the tasks assigned by the driver |

<img src="images/_5.png" width=80%>

**From Python to Spark code and back**:

![The relationship between the SparkSession and Spark’s Language API](https://www.oreilly.com/library/view/spark-the-definitive/9781491912201/assets/spdg_0202.png)

Source: _Bill Chambers, Matei Zaharia 2018_ (p. 23)

# Installing PySpark

There are several ways to set up PySpark on your local machine. Two methods are discussed here:
* Pure-Python (pip) users:
```shell
pip install pyspark
```
* Conda users:
```shell
conda install pyspark
```
Further info at [Spark Download page](https://spark.apache.org/downloads.html).

## Requirements

Pay attention to the following:

> Spark runs on Java 8

Check which Java version is installed on your machine by typing the following in your terminal:
```shell
java -version
```

If you are running a different Java version, install Java 8. See the [Spark downloading info](https://spark.apache.org/docs/latest/#downloading).

# PySpark - Basics

## Libraries

```python
# to create a SparkSession object
from pyspark.sql import SparkSession

# built-in functions
import pyspark.sql.functions as F

# data types
from pyspark.sql.types import *

# the date class, aliased as dt
from datetime import date as dt
```

* More info on **functions**: [PySpark `functions` module](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) and [Spark SQL built-in functions](https://spark.apache.org/docs/2.3.0/api/sql/index.html#year)
* More info on **data types**: [Spark SQL data types](https://spark.apache.org/docs/latest/sql-reference.html#tab_python_0)

## Opening a Session

The **SparkSession** is the driver process through which we:

* control our Spark Application
* execute user-defined manipulations across the cluster

Check this [link](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession) for further reference.

```python
# to open a Session
spark = SparkSession.builder.appName('last_dance').getOrCreate()
```

**Spark UI**

<img src="images/_6.png" width=60%>

The Spark UI is useful for monitoring your application. It has the following tabs:

* *Jobs*: info on Spark jobs
* *Stages*: info on individual stages and their tasks
* *Storage*: info on the data cached (persisted) in our Spark application
* *Environment*: info on the configuration and current settings of our application
* *Executors*: info on the executors that run our application
* *SQL*: info on the queries run through both SQL and DataFrames

```python
# in a notebook, evaluating the session displays its details and a link to the Spark UI
spark
```

## Create Dataframe

To create a DataFrame from scratch, we need to:
1. Create a schema, passing for each column:
   * its name
   * its data type
   * whether it can contain null values
1. Pass the values as a list of tuples, one tuple per row

```python
# Here, I define a schema
# .add(field, data_type=None, nullable=True, metadata=None)
schema = StructType().add("id", "integer", True).add("first_name", "string", True).add(
    "last_name", "string", True).add("dob", "date", True)

# Equivalent definition, using data type objects instead of strings:
# schema = StructType().add("id", IntegerType(), True).add("first_name", StringType(), True).add(
#     "last_name", StringType(), True).add("dob", DateType(), True)

# Then, I can pass some values
df = spark.createDataFrame([(1, 'Michael', "Jordan", dt(1963, 2, 17)),
                            (2, 'Scottie', "Pippen", dt(1965, 9, 25)),
                            (3, 'Dennis', "Rodman", dt(1961, 5, 16))],
                           schema=schema)

# Let's explore the schema structure
df.printSchema()
```

```
root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: date (nullable = true)
```



```python
# We can also use built-in functions to create a new column
# (approximate age: current year minus birth year)
df = df.withColumn('age', F.year(F.current_date()) - F.year(df.dob))

df.show()
```

```
+---+----------+---------+----------+---+
| id|first_name|last_name|       dob|age|
+---+----------+---------+----------+---+
|  1|   Michael|   Jordan|1963-02-17| 57|
|  2|   Scottie|   Pippen|1965-09-25| 55|
|  3|    Dennis|   Rodman|1961-05-16| 59|
+---+----------+---------+----------+---+
```



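**Transformations** (e.g., `withColumn`, `filter`, `drop`) describe how to derive a new DataFrame from an existing one:

* Immutability: once created, a DataFrame cannot be changed; a transformation returns a new DataFrame
* Lazy evaluation: transformations are not executed immediately; Spark records them in a plan and runs it only when an action is called

**Actions** trigger the computation. They are used to:

* view data (e.g., `show`)
* collect data (e.g., `take`, `collect`)
* write to output data sources (e.g., `df.write`)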

# PySpark and Pandas

## Load a csv

To load a CSV file from your computer, type:
* Pandas:
  * `db = pd.read_csv('path/to/movies.csv')`
* PySpark:
  * `df = spark.read.csv('path/to/movies.csv', header=True, inferSchema=True)`

Here, we will import a csv directly from GitHub. Data are provided by [FiveThirtyEight](https://github.com/fivethirtyeight)
[<img src="images/_2.png" width="50%">](https://fivethirtyeight.com/features/the-dollar-and-cents-case-against-hollywoods-exclusion-of-women/)

```python
# import pandas
import pandas as pd

# import SparkFiles
from pyspark import SparkFiles

# target dataset
url = 'https://raw.githubusercontent.com/fivethirtyeight/data/master/bechdel/movies.csv'
```

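```python
# loading data with pandas
db = pd.read_csv(url)

# loading data with pyspark:
# addFile downloads the file so that it is available on every node,
# SparkFiles.get returns the local path of the downloaded copy
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get('movies.csv'), header=True, inferSchema=True)
```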

## Inspecting dataframes

```python
# pandas info
db.info()
```

```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1794 entries, 0 to 1793
Data columns (total 15 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   year            1794 non-null   int64  
 1   imdb            1794 non-null   object 
 2   title           1794 non-null   object 
 3   test            1794 non-null   object 
 4   clean_test      1794 non-null   object 
 5   binary          1794 non-null   object 
 6   budget          1794 non-null   int64  
 7   domgross        1777 non-null   float64
 8   intgross        1783 non-null   float64
 9   code            1794 non-null   object 
 10  budget_2013$    1794 non-null   int64  
 11  domgross_2013$  1776 non-null   float64
 12  intgross_2013$  1783 non-null   float64
 13  period code     1615 non-null   float64
 14  decade code     1615 non-null   float64
dtypes: float64(6), int64(3), object(6)
memory usage: 210.4+ KB
```


```python
# pyspark schema
df.printSchema()
```

```
root
 |-- year: integer (nullable = true)
 |-- imdb: string (nullable = true)
 |-- title: string (nullable = true)
 |-- test: string (nullable = true)
 |-- clean_test: string (nullable = true)
 |-- binary: string (nullable = true)
 |-- budget: integer (nullable = true)
 |-- domgross: string (nullable = true)
 |-- intgross: string (nullable = true)
 |-- code: string (nullable = true)
 |-- budget_2013$: integer (nullable = true)
 |-- domgross_2013$: string (nullable = true)
 |-- intgross_2013$: string (nullable = true)
 |-- period code: integer (nullable = true)
 |-- decade code: integer (nullable = true)
```



```python
# pandas fetch 5
db.head(5)
```

| | year | imdb | title | test | clean_test | binary | budget | domgross | intgross | code | budget_2013\$ | domgross_2013\$ | intgross_2013\$ | period code | decade code |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2013 | tt1711425 | 21 &amp; Over | notalk | notalk | FAIL | 13000000 | 25682380.0 | 42195766.0 | 2013FAIL | 13000000 | 25682380.0 | 42195766.0 | 1.0 | 1.0 |
| 1 | 2012 | tt1343727 | Dredd 3D | ok-disagree | ok | PASS | 45000000 | 13414714.0 | 40868994.0 | 2012PASS | 45658735 | 13611086.0 | 41467257.0 | 1.0 | 1.0 |
| 2 | 2013 | tt2024544 | 12 Years a Slave | notalk-disagree | notalk | FAIL | 20000000 | 53107035.0 | 158607035.0 | 2013FAIL | 20000000 | 53107035.0 | 158607035.0 | 1.0 | 1.0 |
| 3 | 2013 | tt1272878 | 2 Guns | notalk | notalk | FAIL | 61000000 | 75612460.0 | 132493015.0 | 2013FAIL | 61000000 | 75612460.0 | 132493015.0 | 1.0 | 1.0 |
| 4 | 2013 | tt0453562 | 42 | men | men | FAIL | 40000000 | 95020213.0 | 95020213.0 | 2013FAIL | 40000000 | 95020213.0 | 95020213.0 | 1.0 | 1.0 |


```python
# pyspark fetch 5
df.show(5)   # prints the first 5 rows as a table

df.take(5)   # returns the first 5 rows as a list of Row objects
```

```
+----+---------+----------------+---------------+----------+------+--------+--------+---------+--------+------------+--------------+--------------+-----------+-----------+
|year|     imdb|           title|           test|clean_test|binary|  budget|domgross| intgross|    code|budget_2013$|domgross_2013$|intgross_2013$|period code|decade code|
+----+---------+----------------+---------------+----------+------+--------+--------+---------+--------+------------+--------------+--------------+-----------+-----------+
|2013|tt1711425|   21 &amp; Over|         notalk|    notalk|  FAIL|13000000|25682380| 42195766|2013FAIL|    13000000|      25682380|      42195766|          1|          1|
|2012|tt1343727|        Dredd 3D|    ok-disagree|        ok|  PASS|45000000|13414714| 40868994|2012PASS|    45658735|      13611086|      41467257|          1|          1|
|2013|tt2024544|12 Years a Slave|notalk-disagree|    notalk|  FAIL|20000000|53107035|158607035|2013FAIL|    20000000|      53107035|     158

[Row(year=2013, imdb='tt1711425', title='21 &amp; Over', test='notalk', clean_test='notalk', binary='FAIL', budget=13000000, domgross='25682380', intgross='42195766', code='2013FAIL', budget_2013$=13000000, domgross_2013$='25682380', intgross_2013$='42195766', period code=1, decade code=1),
 Row(year=2012, imdb='tt1343727', title='Dredd 3D', test='ok-disagree', clean_test='ok', binary='PASS', budget=45000000, domgross='13414714', intgross='40868994', code='2012PASS', budget_2013$=45658735, domgross_2013$='13611086', intgross_2013$='41467257', period code=1, decade code=1),
 Row(year=2013, imdb='tt2024544', title='12 Years a Slave', test='notalk-disagree', clean_test='notalk', binary='FAIL', budget=20000000, domgross='53107035', intgross='158607035', code='2013FAIL', budget_2013$=20000000, domgross_2013$='53107035', intgross_2013$='158607035', period code=1, decade code=1),
 Row(year=2013, imdb='tt1272878', title='2 Guns', test='notalk', clean_test='notalk', binary='FAIL', budget=610000
```

```python
# pandas filtering:
db[db.year == 1970]
```

| | year | imdb | title | test | clean_test | binary | budget | domgross | intgross | code | budget_2013\$ | domgross_2013\$ | intgross_2013\$ | period code | decade code |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1793 | 1970 | tt0065466 | Beyond the Valley of the Dolls | ok | ok | PASS | 1000000 | 9000000.0 | 9000000.0 | 1970PASS | 5997631 | 53978683.0 | 53978683.0 | | |


```python
# pyspark filtering:
df[df.year == 1970].show()
```

```
+----+---------+--------------------+----+----------+------+-------+--------+--------+--------+------------+--------------+--------------+-----------+-----------+
|year|     imdb|               title|test|clean_test|binary| budget|domgross|intgross|    code|budget_2013$|domgross_2013$|intgross_2013$|period code|decade code|
+----+---------+--------------------+----+----------+------+-------+--------+--------+--------+------------+--------------+--------------+-----------+-----------+
|1970|tt0065466|Beyond the Valley...|  ok|        ok|  PASS|1000000| 9000000| 9000000|1970PASS|     5997631|      53978683|      53978683|       null|       null|
+----+---------+--------------------+----+----------+------+-------+--------+--------+--------+------------+--------------+--------------+-----------+-----------+
```



```python
# get columns and data types
print("""
Pandas db.columns:
===================
{}

PySpark df.columns:
===================
{}

Pandas db.dtypes:
===================
{}

PySpark df.dtypes:
===================
{}

""".format(db.columns, df.columns, db.dtypes, df.dtypes), flush=True)
```


```
Pandas db.columns:
===================
Index(['year', 'imdb', 'title', 'test', 'clean_test', 'binary', 'budget',
       'domgross', 'intgross', 'code', 'budget_2013$', 'domgross_2013$',
       'intgross_2013$', 'period code', 'decade code'],
      dtype='object')

PySpark df.columns:
===================
['year', 'imdb', 'title', 'test', 'clean_test', 'binary', 'budget', 'domgross', 'intgross', 'code', 'budget_2013$', 'domgross_2013$', 'intgross_2013$', 'period code', 'decade code']

Pandas db.dtypes:
===================
year                int64
imdb               object
title              object
test               object
clean_test         object
binary             object
budget              int64
domgross          float64
intgross          float64
code               object
budget_2013$        int64
domgross_2013$    float64
intgross_2013$    float64
period code       float64
decade code       float64
dtype: object

PySpark df.dtypes:
===================
[('year', 'int'), ('imdb', 'string'), ('title', 'string'), ('test', 'string'), ('clean_test', 'string'), ('bin
```

## Columns

```python
# pandas add a column
db['newcol'] = db.domgross/db.intgross

# pyspark add a column
df = df.withColumn('newcol', df.domgross/df.intgross)
```

```python
# pandas rename columns
db.rename(columns={'newcol': 'dgs/igs'}, inplace=True)

# pyspark rename columns
df = df.withColumnRenamed('newcol', 'dgs/igs')
```

## Drop

```python
# pandas drop `code` column
db.drop('code', axis=1, inplace=True)

# pyspark drop `code` column
df = df.drop('code')
```

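```python
# pandas dropna()
db.dropna(subset=['domgross'], inplace=True)

# pyspark dropna(): only real nulls are dropped; here `domgross` was read
# as a string column, so its text-encoded missing values are kept
df = df.dropna(subset=['domgross'])
```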

## Stats

```python
# pandas describe
db.describe()
```

| | year | budget | domgross | intgross | budget_2013\$ | domgross_2013\$ | intgross_2013\$ | period code | decade code | dgs/igs |
|---|---|---|---|---|---|---|---|---|---|---|
| count | 1777.0 | 1777.0 | 1777.0 | 1777.0 | 1777.0 | 1776.0 | 1777.0 | 1601.0 | 1601.0 | 1777.0 |
| mean | 2002.541925 | 45159120.0 | 69132050.0 | 150850200.0 | 55869370.0 | 95174780.0 | 198457500.0 | 2.425984 | 1.939413 | 0.591354 |
| std | 8.956449 | 48286770.0 | 80367310.0 | 210537100.0 | 55010930.0 | 125965300.0 | 283784600.0 | 1.197257 | 0.691521 | 0.260125 |
| min | 1970.0 | 7000.0 | 0.0 | 828.0 | 8632.0 | 899.0 | 899.0 | 1.0 | 1.0 | 0.0 |
| 25% | 1998.0 | 12000000.0 | 16311570.0 | 26341900.0 | 16234220.0 | 20546590.0 | 33703830.0 | 1.0 | 1.0 | 0.400251 |
| 50% | 2005.0 | 30000000.0 | 42194060.0 | 76954310.0 | 37157440.0 | 55993640.0 | 96846560.0 | 2.0 | 2.0 | 0.540907 |
| 75% | 2009.0 | 60000000.0 | 93354920.0 | 190400000.0 | 78942730.0 | 121678400.0 | 241919400.0 | 3.0 | 2.0 | 0.789224 |
| max | 2013.0 | 425000000.0 | 760507600.0 | 2783919000.0 | 461435900.0 | 1771683000.0 | 3171931000.0 | 5.0 | 3.0 | 1.0 |


```python
# pyspark describe
df.describe(['year', 'budget']).show()
```

```
+-------+-----------------+--------------------+
|summary|             year|              budget|
+-------+-----------------+--------------------+
|  count|             1794|                1794|
|   mean|2002.552396878484|4.4826462614269786E7|
| stddev|8.979730993075055| 4.818602611895356E7|
|    min|             1970|                7000|
|    max|             2013|           425000000|
+-------+-----------------+--------------------+
```



# PySpark and SQL

```python
# pyspark rename 'budget_2013$'
df = df.withColumnRenamed('budget_2013$', 'budget_2013')
```

```python
# Create a temporary table
df.createOrReplaceTempView('bechdel')

# Run a simple SQL command
sql = spark.sql("""SELECT imdb, year, title, budget FROM bechdel LIMIT 5""")
sql.show()
```

```
+---------+----+----------------+--------+
|     imdb|year|           title|  budget|
+---------+----+----------------+--------+
|tt1711425|2013|   21 &amp; Over|13000000|
|tt1343727|2012|        Dredd 3D|45000000|
|tt2024544|2013|12 Years a Slave|20000000|
|tt1272878|2013|          2 Guns|61000000|
|tt0453562|2013|              42|40000000|
+---------+----+----------------+--------+
```



```python
# average budget by Bechdel test result (binary), vs. the whole-sample average
sql_avg = spark.sql(
    """
    SELECT 
    binary, 
    COUNT(*) AS count, 
    format_number(AVG(budget),2) AS avg_budget, 
    format_number((SELECT AVG(budget) FROM bechdel),2) AS avg_budget_samp,
    format_number(AVG(budget_2013),2) AS avg_budget2013,
    format_number((SELECT AVG(budget_2013) FROM bechdel),2) AS avg_budget2013_samp
    FROM bechdel GROUP BY binary
    """
)

sql_avg.show()
```

```
+------+-----+-------------+---------------+--------------+-------------------+
|binary|count|   avg_budget|avg_budget_samp|avg_budget2013|avg_budget2013_samp|
+------+-----+-------------+---------------+--------------+-------------------+
|  FAIL|  991|50,415,289.27|  44,826,462.61| 62,911,555.33|      55,464,608.45|
|  PASS|  803|37,929,168.45|  44,826,462.61| 46,274,167.16|      55,464,608.45|
+------+-----+-------------+---------------+--------------+-------------------+
```



# Load data from DBMS

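Before running the following cells, restart the notebook kernel. Otherwise `getOrCreate()` returns the session opened above, and options such as `spark.jars` only take effect when a new session is started.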

```python
# to create a spark session object
from pyspark.sql import SparkSession
```

## PostgreSQL

To interact with PostgreSQL you need to:

* Download the JDBC driver, i.e. the *postgresql-42.2.14.jar* file, [here](https://jdbc.postgresql.org/download.html)
* Pass the path to the downloaded jar file to the SparkSession through the `spark.jars` configuration option

```python
# Open a session that can read data from PostgreSQL;
# spark.jars points to the JDBC driver (replace with the path on your machine)
spark_postgre = SparkSession \
    .builder \
    .appName("last_dance_postgre") \
    .config("spark.jars", "/Users/matteo/py3_venvs/smm695/share/py4j/postgresql-42.2.14.jar") \
    .getOrCreate()
```

```python
spark_postgre
```

```python
# Read data from PostgreSQL running at localhost
df = spark_postgre.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5433/pagila") \
    .option("dbtable", "film") \
    .option("user", "postgres") \
    .option("password", "smm695") \
    .option("driver", "org.postgresql.Driver") \
    .load()

df.printSchema()
```

```
root
 |-- film_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- language_id: short (nullable = true)
 |-- original_language_id: short (nullable = true)
 |-- rental_duration: short (nullable = true)
 |-- rental_rate: decimal(4,2) (nullable = true)
 |-- length: short (nullable = true)
 |-- replacement_cost: decimal(5,2) (nullable = true)
 |-- rating: string (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- special_features: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fulltext: string (nullable = true)
```
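Hard-coding the password in the notebook is convenient for a local demo but easy to leak. The sketch below uses a hypothetical helper, `pagila_jdbc_options`, and assumes the credentials are stored in the environment variables `PGUSER` and `PGPASSWORD` (a common PostgreSQL convention; here they are only read by this helper).

The helper also shows the `query` option (Spark 2.4+), which sends a SQL query to PostgreSQL instead of reading a whole table. Spark does not allow `query` and `dbtable` at the same time.

```python
import os

def pagila_jdbc_options(table=None, query=None,
                        host="localhost", port=5433, database="pagila"):
    """Hypothetical helper: build JDBC reader options for the local pagila DB."""
    if (table is None) == (query is None):
        raise ValueError("pass exactly one of table or query")
    opts = {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        # credentials come from the environment instead of the notebook
        "user": os.environ.get("PGUSER", "postgres"),
        "password": os.environ.get("PGPASSWORD", ""),
        "driver": "org.postgresql.Driver",
    }
    if table is not None:
        opts["dbtable"] = table
    else:
        opts["query"] = query
    return opts

opts = pagila_jdbc_options(query="SELECT film_id, title, rating FROM film WHERE length > 120")
print({k: v for k, v in opts.items() if k != "password"})
```

With the PostgreSQL session opened above, the options would be passed as `spark_postgre.read.format("jdbc").options(**opts).load()`.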



```python
# get some stats
df.describe(['release_year', 'rental_rate', 'rental_duration']).show()
```

```
+-------+------------+-----------------+------------------+
|summary|release_year|      rental_rate|   rental_duration|
+-------+------------+-----------------+------------------+
|  count|        1000|             1000|              1000|
|   mean|      2006.0|         2.980000|             4.985|
| stddev|         0.0|1.646393212635005|1.4116542663725307|
|    min|        2006|             0.99|                 3|
|    max|        2006|             4.99|                 7|
+-------+------------+-----------------+------------------+
```



```python
# Create a temporary table
df.createOrReplaceTempView('film')

# Run a simple SQL command
sql = spark_postgre.sql("""SELECT title, release_year, length, rating FROM film LIMIT 1""")
sql.show()
```

```
+----------------+------------+------+------+
|           title|release_year|length|rating|
+----------------+------------+------+------+
|ACADEMY DINOSAUR|        2006|    86|    PG|
+----------------+------------+------+------+
```



## MongoDB

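For further reference, check the [Python guide provided by MongoDB](https://docs.mongodb.com/spark-connector/current/python-api/) or the [page of the mongo-spark connector](https://spark-packages.org/package/mongodb/mongo-spark). The `_2.11` suffix in the connector's package name is the Scala version; it must match the Scala version of your Spark build.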

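```python
# Open a session configured for MongoDB: the input/output URIs point to the
# `music` collection of the `amazon` database on the local server, and
# spark.jars.packages downloads the connector
spark_mongo = SparkSession \
    .builder \
    .appName("last_dance_mongo") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/amazon.music") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/amazon.music") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.4.1') \
    .getOrCreate()
```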

```python
spark_mongo
```

```python
# load data from MongoDB
df = spark_mongo.read.format("mongo").load()

df.printSchema()
```

```
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- overall: integer (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: integer (nullable = true)
```



```python
# get some stats
df.describe(['overall', 'unixReviewTime']).show()
```

```
+-------+------------------+--------------------+
|summary|           overall|      unixReviewTime|
+-------+------------------+--------------------+
|  count|           1097592|             1097592|
|   mean| 4.294394456227815|1.1741746688824263E9|
| stddev|1.0737318641546316| 1.363032651590121E8|
|    min|                 1|           879292800|
|    max|                 5|          1406073600|
+-------+------------------+--------------------+
```



```python
# Create a temporary table
df.createOrReplaceTempView('music')

# Run a simple SQL command
sql = spark_mongo.sql("""SELECT asin, date, helpful, overall, unixReviewTime FROM music LIMIT 1""")
sql.show()
```

```
+----------+-------------------+--------+-------+--------------+
|      asin|               date| helpful|overall|unixReviewTime|
+----------+-------------------+--------+-------+--------------+
|0307141985|2005-10-06 02:00:00|[14, 15]|      5|    1128556800|
+----------+-------------------+--------+-------+--------------+
```



# References

* Bill Chambers, Matei Zaharia 2018, ["Spark: The Definitive Guide"](https://www.oreilly.com/library/view/spark-the-definitive/9781491912201/) <img src="images/_3.png" width="20%">
* Pramod Singh 2019, ["Learn PySpark: Build Python-based Machine Learning and Deep Learning Models"](https://www.ibs.it/learn-pyspark-build-python-based-libro-inglese-pramod-singh/e/9781484249604) <img src="images/_4.png" width="18%">