# Implementing a Data Pipeline with Apache Spark - Demo 

In this demo we will see how to use [Apache Spark](https://spark.apache.org/) from Python ([PySpark](http://spark.apache.org/docs/latest/api/python/)) to process data from different sources that together represent a Data Lake.
For the purposes of the demo we will use data stored in:

* __MySQL__ - RDBMS database
* __MongoDB__ - NoSQL database
* __Parquet__ Files - [Apache Parquet](https://parquet.apache.org/) is a columnar data format often used in [Apache Hadoop](https://hadoop.apache.org/) environments, particularly suitable for analytics


The basic idea is to design and implement a Big Data Pipeline consisting of several **Jobs** and **Tasks**. 

* Job: a complete data transformation activity, from reading data from a source to saving the result somewhere
* Task: a single step of a job
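
The Job/Task distinction can be sketched in plain Python, independently of Spark. The `Job` class and the three toy task functions below are hypothetical names used only for illustration; the demo itself does not define them.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List

# A task is modelled here as a plain function: it receives the output of the
# previous task and returns its own output.
Task = Callable[[Any], Any]


@dataclass
class Job:
    """Hypothetical container: an ordered list of tasks run one after another."""
    name: str
    tasks: List[Task] = field(default_factory=list)

    def run(self, data: Any = None) -> Any:
        for task in self.tasks:
            data = task(data)
        return data


# Toy tasks standing in for "read", "transform" and "save".
def read_source(_):
    return [("Sports", 2.0), ("Drama", 1.5), ("Sports", 3.0)]


def total_per_category(rows):
    totals = {}
    for category, amount in rows:
        totals[category] = totals.get(category, 0.0) + amount
    return totals


saved = {}


def save(totals):
    saved.update(totals)  # stands in for writing to a file or a database
    return totals


job = Job("sales_per_category", [read_source, total_per_category, save])
result = job.run()
print(result)
```
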
___

To start the demo, run:

`docker-compose -f ./docker-compose-full.yml up -d`
 
To stop the demo, run:

`docker-compose -f ./docker-compose-full.yml down`

Docker will build up an ecosystem with:

* **MongoDB** with a populated database
* **MySQL** with a populated database
* **Apache Spark** deployed in Standalone Mode
* **Jupyter** enabled to work with Spark


### SparkSession
In the first cell we have to instantiate the __SparkSession__ object. Via the SparkSession we can read, manipulate, and store data from different data sources using both the RDD and DataFrame APIs.

> A Spark-powered program normally works with a single SparkSession, which is why the builder exposes `getOrCreate()`: it returns the existing session if there is one. Through its SparkContext, the SparkSession creates and handles the DAG and interacts with the executors to execute it.

In [1]:
from pyspark.sql import SparkSession
ss = SparkSession.builder \
.config("spark.mongodb.input.uri", "mongodb://root:example@mongo/test.coll?authSource=admin") \
.config("spark.mongodb.output.uri", "mongodb://root:example@mongo/test.coll?authSource=admin") \
.config('spark.jars.packages', 'mysql:mysql-connector-java:8.0.17,org.mongodb.spark:mongo-spark-connector_2.11:2.4.1') \
.getOrCreate()
ss.version
# Spark version 2.4.4 uses Scala 2.11

'2.4.4'

The [builder pattern](https://en.wikipedia.org/wiki/Builder_pattern) is used to create and initialize the SparkSession. Notice that we used the *config* method to store settings such as the MongoDB input and output URIs and a list of Java packages (specified as [Apache Maven](https://maven.apache.org/) coordinates).

> Note that Spark checks if the listed jar packages are available at executor level, otherwise it downloads them.  

Other metadata (hostname, username, password, etc.) are defined as Python variables. The next cell sets the MySQL connection parameters; the resulting **jdbcUrl** is used later to create DataFrames from MySQL.

In [2]:
jdbcHostname = "mysql"
jdbcDatabase = "esame"
username = "root"
password = "example"
jdbcPort = 3306
jdbcUrl = "jdbc:mysql://{0}:{1}/{2}?user={3}&password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, username, password)
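
Embedding the credentials in the URL works for this demo, but a password containing characters such as `&` or `=` would break the query string. A possible alternative, sketched below, keeps the URL free of credentials and passes `user` and `password` as separate JDBC options. The helper name `jdbc_options` is hypothetical, and the commented usage assumes the live session `ss` created above.

```python
def jdbc_options(host, port, database, user, password, query,
                 driver="com.mysql.jdbc.Driver"):
    """Build the option dict for a Spark JDBC read (hypothetical helper)."""
    return {
        # The URL carries only host, port and database
        "url": "jdbc:mysql://{0}:{1}/{2}".format(host, port, database),
        # Credentials travel as separate connection properties
        "user": user,
        "password": password,
        "query": query,
        "driver": driver,
    }


opts = jdbc_options("mysql", 3306, "esame", "root", "example", "SELECT 1 AS ok")
print(opts["url"])

# With a live session `ss`, the dict could be passed in one call:
#   df = ss.read.format("jdbc").options(**opts).load()
```
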

### 1. Total sales per film category

List the **total sales per film category**, considering only sales that refer to rented movies.
Measure the query execution time (you might want to use the Python `time` library).

You may need to use the following tables: 

1. category
2. film_category
3. inventory
4. payment
5. rental

![EER Diagram](figures/mysql_table1.png)


The result must have __two columns__:

1. the film_category
2. the total_sales

and has to be **sorted in descending order** with respect to the **total_sales**.

> For performance reasons it is recommended to write a SQL query rather than import the tables into separate Spark dataframes and join them in Spark. With the `query` option the whole statement, joins and aggregation included, is executed by MySQL, and only the result is transferred to Spark.



The following code snippet presents how to create a dataframe from MySQL. 

Note that it is necessary to provide:

1. The data source format and the driver class to use (jdbc, driver)
2. The connection string (url)
3. The query MySQL has to execute to export data to Spark

Moreover, note that the dataframe API is lazily evaluated: nothing is computed until an **action** is called. In the code snippet below:

* load() is not an action, so the dataframe is only defined, not computed
* show() is an action: the query is executed here and the results are returned to the main program (driver)
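
The difference between defining a computation and running it can be illustrated with a plain-Python analogy. The `LazyRows` class below is a toy written for this note; it is not how Spark is implemented, it only mimics the "record now, run on action" behaviour.

```python
class LazyRows:
    """Toy stand-in for a lazily evaluated dataset (not Spark's implementation)."""

    def __init__(self, source, steps=()):
        self._source = source       # callable that produces the raw rows
        self._steps = tuple(steps)  # recorded transformations, not yet applied

    def filter(self, predicate):
        # Transformation: only records the step and returns a new plan.
        return LazyRows(self._source, self._steps + (predicate,))

    def collect(self):
        # Action: the source is read and the recorded steps run now.
        rows = self._source()
        for predicate in self._steps:
            rows = [r for r in rows if predicate(r)]
        return rows


reads = []


def load_rows():
    reads.append("read")  # lets us observe when the source is touched
    return [5314.21, 4756.98, 3417.72]


plan = LazyRows(load_rows).filter(lambda amount: amount > 4000)
print(len(reads))      # 0: nothing has been read yet
print(plan.collect())  # the read and the filter happen here
print(len(reads))      # 1
```
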


In [4]:
# 1
import time

query1 =  '''
    SELECT 
        c.name AS category, 
        SUM(p.amount) AS total_sales
    FROM
        payment p
        JOIN rental r ON p.rental_id = r.rental_id
        JOIN inventory i ON r.inventory_id = i.inventory_id
        JOIN film f ON i.film_id = f.film_id
        JOIN film_category fc ON f.film_id = fc.film_id
        JOIN category c ON fc.category_id = c.category_id
    GROUP BY c.name
    ORDER BY total_sales DESC
    '''

salesCat = ss.read \
    .format("jdbc") \
    .option("url", jdbcUrl) \
    .option("query", query1)\
    .option("driver", "com.mysql.jdbc.Driver") \
    .load()


start = time.time()
salesCat.show() # this is an action, the dataframe is created only at this point.
end = time.time()
time_taken = end - start
print('Time: ',time_taken)

+-----------+-----------+
|   category|total_sales|
+-----------+-----------+
|     Sports|    5314.21|
|     Sci-Fi|    4756.98|
|  Animation|    4656.30|
|      Drama|    4587.39|
|     Comedy|    4383.58|
|     Action|    4375.85|
|        New|    4351.62|
|      Games|    4281.33|
|    Foreign|    4270.67|
|     Family|    4226.07|
|Documentary|    4217.52|
|     Horror|    3722.54|
|   Children|    3655.55|
|   Classics|    3639.59|
|     Travel|    3549.64|
|      Music|    3417.72|
+-----------+-----------+

Time:  135.85194158554077
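
The start/end timing pattern used in the cell above recurs in the following cells. One way to avoid repeating it is a small context manager; the name `timed` and the `results` dictionary are assumptions made for this sketch, and `time.sleep` merely stands in for an action such as `salesCat.show()`.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results=None):
    """Print the elapsed wall-clock time of the enclosed block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if results is not None:
            results[label] = elapsed  # keep the value for later comparison
        print("{0}: {1:.3f} s".format(label, elapsed))


timings = {}
with timed("sleep", timings):
    time.sleep(0.05)  # stands in for salesCat.show()
```

Collecting the measurements in a dictionary makes it easier to compare the runs before and after an optimization.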


### 2. Optimizing data loading with indexes

Indexes are extremely important for speeding up analytical processes, particularly when JOIN operations must be performed. Indexes are easy to implement in RDBMS and NoSQL systems, and a little less easy when dealing with large files in HDFS.

**Optimize** the query created in the previous cell by adding the appropriate indexes to the MySQL database.

**Report** as comments the SQL statements used to create the indexes.

**Re-execute** the query (creating the dataframe df2) and measure the time.
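
To see the effect of an index on a query plan without touching the demo database, the following sketch uses SQLite from the Python standard library in place of MySQL. The table is a reduced, made-up version of `payment`; only the index name `rental_id_idx` mirrors the statements reported in the next cell. The plan text differs between engines, so this illustrates the principle and does not reproduce MySQL's behaviour.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payment (payment_id INTEGER, rental_id INTEGER, amount REAL)")
conn.executemany(
    "INSERT INTO payment VALUES (?, ?, ?)",
    [(i, i % 1000, 0.99) for i in range(10000)],  # synthetic rows
)


def query_plan(sql):
    """Return the plan details SQLite reports for a statement."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    return " | ".join(row[-1] for row in rows)  # last column holds the description


lookup = "SELECT amount FROM payment WHERE rental_id = 42"

plan_before = query_plan(lookup)  # no index yet: the table is scanned
conn.execute("CREATE INDEX rental_id_idx ON payment (rental_id)")
plan_after = query_plan(lookup)   # the plan now refers to rental_id_idx

print(plan_before)
print(plan_after)
```
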


In [5]:
# 2

# ALTER TABLE `esame`.`inventory` CHANGE COLUMN `inventory_id` `inventory_id` MEDIUMINT(8) UNSIGNED NOT NULL AUTO_INCREMENT , ADD PRIMARY KEY (`inventory_id`);
# ALTER TABLE `esame`.`rental` CHANGE COLUMN `rental_id` `rental_id` INT(11) NOT NULL AUTO_INCREMENT ,ADD PRIMARY KEY (`rental_id`);
# ALTER TABLE `esame`.`inventory` ADD INDEX `store_id_idx` (`store_id` ASC) VISIBLE;
# ALTER TABLE `esame`.`payment` ADD INDEX `rental_id_idx` (`rental_id` ASC) VISIBLE;
# ALTER TABLE `esame`.`rental` ADD INDEX `inventory_id_idx` (`inventory_id` ASC) VISIBLE;
# ALTER TABLE `esame`.`inventory` ADD INDEX `index2` (`film_id` ASC) VISIBLE;
# ALTER TABLE `esame`.`film_category` ADD INDEX `index1` (`film_id` ASC) VISIBLE;
# ALTER TABLE `esame`.`category` CHANGE COLUMN `category_id` `category_id` TINYINT(3) UNSIGNED NOT NULL AUTO_INCREMENT ,ADD PRIMARY KEY (`category_id`);
# ALTER TABLE `esame`.`film_category` ADD PRIMARY KEY (`category_id`, `film_id`);

# To drop the indexes
# ALTER TABLE `esame`.`inventory` MODIFY inventory_id INT NOT NULL;
# ALTER TABLE `esame`.`inventory` DROP PRIMARY KEY;
# ALTER TABLE `esame`.`inventory` DROP INDEX `store_id_idx`;
# ALTER TABLE `esame`.`rental` MODIFY rental_id INT NOT NULL;
# ALTER TABLE `esame`.`rental` DROP PRIMARY KEY;
# ALTER TABLE `esame`.`payment` DROP INDEX `rental_id_idx`;
# ALTER TABLE `esame`.`rental` DROP INDEX `inventory_id_idx`;
# ALTER TABLE `esame`.`inventory` DROP INDEX `index2`;
# ALTER TABLE `esame`.`film_category` DROP INDEX `index1` ;
# ALTER TABLE `esame`.`category` MODIFY category_id INT NOT NULL;
# ALTER TABLE `esame`.`category` DROP PRIMARY KEY;
# ALTER TABLE `esame`.`film_category` DROP PRIMARY KEY ;

start = time.time()
salesCat.show()
end = time.time()
time_taken = end - start
print('Time: ',time_taken)

+-----------+-----------+
|   category|total_sales|
+-----------+-----------+
|     Sports|    5314.21|
|     Sci-Fi|    4756.98|
|  Animation|    4656.30|
|      Drama|    4587.39|
|     Comedy|    4383.58|
|     Action|    4375.85|
|        New|    4351.62|
|      Games|    4281.33|
|    Foreign|    4270.67|
|     Family|    4226.07|
|Documentary|    4217.52|
|     Horror|    3722.54|
|   Children|    3655.55|
|   Classics|    3639.59|
|     Travel|    3549.64|
|      Music|    3417.72|
+-----------+-----------+

Time:  0.3575015068054199


### 3. Optimizing data loading with views

Views are a great way to simplify the definition of a data pipeline because they define tasks to be executed at the database level; in some cases they can also improve overall query performance.

In the big data world, a view can be implemented as a **batch process**, for example through [Apache Hive](https://hive.apache.org/), which makes partially pre-processed data available.
This is very useful when you want to create a data pipeline for production, a bit less so when you are doing exploratory work.

1. **Create** a view called "total_sales" from the query implemented in the previous cells.
2. **Report** here the SQL statement used to create the view
3. **Load** in a spark dataframe all the rows of the view and show them
4. **Measure** the data loading time
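
A regular view stores the query, not its result, so reading from it re-runs the underlying statement each time. The sketch below shows this with SQLite from the Python standard library, used here in place of MySQL; the table contents are made up and the aggregated column is named `total` only for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE payment (category TEXT, amount REAL);
    INSERT INTO payment VALUES ('Sports', 2.0), ('Drama', 1.5), ('Sports', 3.0);
    CREATE VIEW total_sales AS
        SELECT category, SUM(amount) AS total
        FROM payment
        GROUP BY category;
""")

query = "SELECT category, total FROM total_sales ORDER BY total DESC"
first = conn.execute(query).fetchall()

# The view holds no data of its own: new rows show up on the next read.
conn.execute("INSERT INTO payment VALUES ('Drama', 9.0)")
second = conn.execute(query).fetchall()

print(first)
print(second)
```

Because the query is re-executed on every read, a view of this kind mainly simplifies the pipeline definition; the indexes created earlier remain what keeps the load fast.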


In [6]:
# 3

# put here the sql statement for the view creation

#  CREATE VIEW total_sales AS
#     SELECT 
#         c.name AS category, SUM(p.amount) AS total_sales
#     FROM
#         payment p
#             JOIN
#         rental r ON p.rental_id = r.rental_id
#             JOIN
#         inventory i ON r.inventory_id = i.inventory_id
#             JOIN
#         film f ON i.film_id = f.film_id
#             JOIN
#         film_category fc ON f.film_id = fc.film_id
#             JOIN
#         category c ON fc.category_id = c.category_id
#     GROUP BY c.name
#     ORDER BY total_sales DESC

import time

query2 = '''select * from total_sales'''

dfview = ss.read \
    .format("jdbc") \
    .option("url", jdbcUrl) \
    .option("query", query2)\
    .option("driver", "com.mysql.jdbc.Driver") \
    .load()

start = time.time()
dfview.show()
end = time.time()
time_taken = end - start
print('Time: ',time_taken)



+-----------+-----------+
|   category|total_sales|
+-----------+-----------+
|     Sports|    5314.21|
|     Sci-Fi|    4756.98|
|  Animation|    4656.30|
|      Drama|    4587.39|
|     Comedy|    4383.58|
|     Action|    4375.85|
|        New|    4351.62|
|      Games|    4281.33|
|    Foreign|    4270.67|
|     Family|    4226.07|
|Documentary|    4217.52|
|     Horror|    3722.54|
|   Children|    3655.55|
|   Classics|    3639.59|
|     Travel|    3549.64|
|      Music|    3417.72|
+-----------+-----------+

Time:  0.3496370315551758


### 4. Films per category

List films per category (create the dataframe called "film_category"). The table must have the following structure:

1. film_id
2. film_title
3. film_description
4. film_category
5. film_rental_rate
6. film_length
7. film_rating

and measure the query execution time.

In [7]:
# 4

query3 ='''
        SELECT 
        film.film_id AS FID,
        film.title AS title,
        film.description AS description,
        category.name AS category,
        film.rental_rate AS price,
        film.length AS length,
        film.rating AS rating
    FROM
        category
        LEFT JOIN film_category ON category.category_id = film_category.category_id
        LEFT JOIN film ON film_category.film_id = film.film_id
'''

film_category=ss.read \
    .format("jdbc") \
    .option("url", jdbcUrl) \
    .option("query", query3)\
    .option("driver", "com.mysql.jdbc.Driver") \
    .load()

start = time.time()
film_category.show()
end = time.time()
time_taken = end - start
print('Time: ',time_taken)

+---+-------------------+--------------------+--------+-----+------+------+
|FID|              title|         description|category|price|length|rating|
+---+-------------------+--------------------+--------+-----+------+------+
| 19|       AMADEUS HOLY|A Emotional Displ...|  Action| 0.99|   113|    PG|
| 21|    AMERICAN CIRCUS|A Insightful Dram...|  Action| 4.99|   129|     R|
| 29| ANTITRUST TOMATOES|A Fateful Yarn of...|  Action| 2.99|   168| NC-17|
| 38|      ARK RIDGEMONT|A Beautiful Yarn ...|  Action| 0.99|    68| NC-17|
| 56|BAREFOOT MANCHURIAN|A Intrepid Story ...|  Action| 2.99|   129|     G|
| 67|       BERETS AGENT|A Taut Saga of a ...|  Action| 2.99|    77| PG-13|
| 97|     BRIDE INTRIGUE|A Epic Tale of a ...|  Action| 0.99|    56|     G|
|105|     BULL SHAWSHANK|A Fanciful Drama ...|  Action| 0.99|   125| NC-17|
|111|    CADDYSHACK JEDI|A Awe-Inspiring E...|  Action| 0.99|    52| NC-17|
|115|    CAMPUS REMEMBER|A Astounding Dram...|  Action| 2.99|   167|     R|
|126|  CASUA

### 5. Actor - Film dataframe 

The database in MySQL is incomplete: it lacks information on addresses, cities and countries.
This information is stored in MongoDB, in the "esame" database, which contains the collections "denormalizedAddress", "actor" and "film_actor".

___

The following pictures depict the structure of a typical document within the "actor" and "film_actor" collections, respectively.

![Actor collection](figures/actor.png)

___

![Film_Actor collection](figures/film_actor.png)


___

**Join** the "actor" and "film_actor" collections (using a pipeline of `$lookup`, `$unwind`, `$project`, `$concat` and `$sort`) and extract a Spark DataFrame with the following columns:

1. actor_id
2. film_id
3. name i.e. "first_name last_name"

sorted by film_id

> As with the dataframe extracted from MySQL, for performance reasons it is recommended to execute operations such as filters, joins and concatenations within the database itself.


Notes:

* [`$lookup`](https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/). Performs a left outer join with another collection; it is the equivalent of a JOIN in the **MongoDB Query Language (MQL)**
* [`$unwind`](https://docs.mongodb.com/manual/reference/operator/aggregation/unwind/). Deconstructs an array field from the input documents to output a document for each element.
* [`$project`](https://docs.mongodb.com/manual/reference/operator/aggregation/project/). Passes along the documents with the requested fields to the next stage in the pipeline. The specified fields can be existing fields from the input documents or newly computed fields.
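
What each stage does to the documents can be followed step by step with plain Python lists of dictionaries. This is only an emulation of the stage semantics on a few made-up documents shaped like the two collections; it does not use MongoDB.

```python
# Toy documents with the same field names as the "actor" and "film_actor" collections
actors = [
    {"actor_id": 1, "first_name": "PENELOPE", "last_name": "GUINESS"},
    {"actor_id": 2, "first_name": "NICK", "last_name": "WAHLBERG"},
]
film_actor = [
    {"actor_id": 1, "film_id": 1},
    {"actor_id": 2, "film_id": 3},
    {"actor_id": 1, "film_id": 23},
]

# $lookup: attach to each actor the array of matching film_actor documents.
looked_up = [
    dict(a, film=[fa for fa in film_actor if fa["actor_id"] == a["actor_id"]])
    for a in actors
]

# $unwind: emit one document per element of the "film" array.
unwound = [dict(doc, film=f) for doc in looked_up for f in doc["film"]]

# $project: keep or compute only the requested fields.
projected = [
    {
        "actor_id": doc["actor_id"],
        "film_id": doc["film"]["film_id"],
        "name": doc["first_name"] + " " + doc["last_name"],  # $concat
    }
    for doc in unwound
]

# $sort: order by film_id ascending.
result = sorted(projected, key=lambda doc: doc["film_id"])
for row in result:
    print(row)
```
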


In [8]:
# 5

queryFA = [
    {
        '$lookup': {
            'from': 'film_actor', 
            'localField': 'actor_id', 
            'foreignField': 'actor_id', 
            'as': 'film'
        }
    }, {
        '$unwind': {
            'path': '$film'
        }
    }, {
        '$project': {
            '_id': 0, 
            'actor_id': 1, 
            'film_id': '$film.film_id', 
            'name': {
                '$concat': [
                    '$first_name', ' ', '$last_name'
                ]
            }
        }
    }, {
        '$sort': {
            'film_id': 1
        }
    }
]

FA = ss.read.format("mongo")\
.option("pipeline", queryFA)\
.option("uri","mongodb://root:example@mongo/esame.actor?authSource=admin&readPreference=primaryPreferred")\
.load()

FA.show()


+--------+-------+----------------+
|actor_id|film_id|            name|
+--------+-------+----------------+
|       1|      1|PENELOPE GUINESS|
|      10|      1| CHRISTIAN GABLE|
|      20|      1|   LUCILLE TRACY|
|      30|      1|     SANDRA PECK|
|      40|      1|     JOHNNY CAGE|
|      53|      1|     MENA TEMPLE|
|     108|      1|    WARREN NOLTE|
|     162|      1|    OPRAH KILMER|
|     188|      1|    ROCK DUKAKIS|
|     198|      1|     MARY KEITEL|
|      19|      2|     BOB FAWCETT|
|      85|      2|MINNIE ZELLWEGER|
|      90|      2|    SEAN GUINESS|
|     160|      2|      CHRIS DEPP|
|       2|      3|   NICK WAHLBERG|
|      19|      3|     BOB FAWCETT|
|      24|      3|  CAMERON STREEP|
|      64|      3|   RAY JOHANSSON|
|     123|      3|  JULIANNE DENCH|
|      41|      4| JODIE DEGENERES|
+--------+-------+----------------+
only showing top 20 rows
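
The cell above passes the pipeline to `option` as a Python list. Option values are sent to the connector as strings, so the list presumably travels as its Python representation, which happens to be readable for this pipeline. A more defensive variant, sketched here under that assumption, serializes the pipeline explicitly with `json.dumps`; this matters as soon as the pipeline contains Python booleans or `None`, whose representation is not valid JSON. The pipeline below is a made-up example.

```python
import json

pipeline = [
    {"$unwind": {"path": "$film", "preserveNullAndEmptyArrays": True}},
    {"$project": {"_id": 0, "actor_id": 1, "last_name": 1}},
]

as_repr = str(pipeline)         # Python representation: single quotes, True
as_json = json.dumps(pipeline)  # strict JSON: double quotes, true

print(as_repr)
print(as_json)

# With a live session `ss`, the JSON string could be passed instead of the list:
#   ss.read.format("mongo").option("pipeline", as_json)
```
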



### 6. Actor - Film - Category dataframe

Using the dataframe created in the previous cell ("FA") add to the "film_category" dataframe a column with the names (separated by commas) of the actors starring in each film.

> hint: you might want to use the **concat_ws** and **collect_list** functions to aggregate the actors' names

Note that in the code snippet below film_category and FA are registered as temporary relational tables; this means that we can use the Spark SQL API to manipulate them.
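
The aggregation requested by the hint can be pictured in plain Python: group the names by film, then join each group into a single string. The rows below are a small sample shaped like the "FA" dataframe; the sketch does not use Spark.

```python
from collections import defaultdict

# (film_id, name) pairs, a toy sample of the FA dataframe
fa_rows = [
    (1, "PENELOPE GUINESS"),
    (1, "CHRISTIAN GABLE"),
    (2, "BOB FAWCETT"),
    (1, "LUCILLE TRACY"),
]

# collect_list: gather the names of each film_id group into a list.
names_by_film = defaultdict(list)
for film_id, name in fa_rows:
    names_by_film[film_id].append(name)

# concat_ws(', ', ...): join each list into one comma-separated string.
actors_by_film = {
    film_id: ", ".join(names) for film_id, names in names_by_film.items()
}
print(actors_by_film)
```

In this sketch the names keep their input order. In Spark the order of the collected elements is not guaranteed after a shuffle, so the actor lists may come out in a different order from run to run.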

In [9]:
# 6

# createOrReplaceTempView supersedes registerTempTable, deprecated since Spark 2.0
film_category.createOrReplaceTempView('film_category')
FA.createOrReplaceTempView('FA')

full_film_category = ss.sql('''
select * from film_category
join 
(Select
film_id,
concat_ws(', ', collect_list(name)) as actor
from FA
group by film_id
)as inSel 
on inSel.film_id =film_category.FID
''')

full_film_category.show()

+---+-------------------+--------------------+-----------+-----+------+------+-------+--------------------+
|FID|              title|         description|   category|price|length|rating|film_id|               actor|
+---+-------------------+--------------------+-----------+-----+------+------+-------+--------------------+
|148|     CHOCOLATE DUCK|A Unbelieveable S...|    Foreign| 2.99|   132|     R|    148|JOE SWANK, CAMERO...|
|463|   INSTINCT AIRPORT|A Touching Docume...|     Sports| 2.99|   116|    PG|    463|JENNIFER DAVIS, G...|
|471|    ISLAND EXORCIST|A Fanciful Panora...|   Classics| 2.99|    84| NC-17|    471|RAY JOHANSSON, AN...|
|496|      KICK SAVANNAH|A Emotional Drama...|     Travel| 0.99|   179| PG-13|    496|ANGELA HUDSON, JE...|
|833|    SPLENDOR PATTON|A Taut Story of a...|   Children| 0.99|   134|     R|    833|UMA WOOD, SANDRA ...|
|243|    DOORS PRESIDENT|A Awe-Inspiring D...|  Animation| 4.99|    49| NC-17|    243|KARL BERRY, LUCIL...|
|392|       HALL CASSIDY|A B

### 7. Conclude the pipeline saving results as a parquet file

Save the dataframe created in the previous cell as a Parquet file named "film_category".


In [10]:
# 7
# put here the code
full_film_category.write.mode('overwrite').parquet('film_category.parquet')

Hurray! We have implemented our first Big Data Pipeline in Apache Spark. A graphical representation of the pipeline follows.

![Pipeline 1](figures/pipeline1.png)

### 8. More on indexing - Geospatial indexes in MongoDB

**Create** a geospatial index (type 2dsphere) in MongoDB on the "location.coordinates" field of the "denormalizedAddress" collection.

The following picture shows the typical structure of an address in the "denormalizedAddress" collection:

![denormalizedAddress collection](figures/address.png)

In [11]:
# 8
# put here the code
!pip install pymongo
import pymongo
import urllib.parse

# Percent-encode the credentials so that special characters are safe in the URI
username = urllib.parse.quote_plus('root')
password = urllib.parse.quote_plus('example')
client = pymongo.MongoClient("mongodb://%s:%s@mongo:27017" % (username, password))
db = client.esame
# Create the 2dsphere index; create_index returns the name of the index
db.denormalizedAddress.create_index([('location.coordinates', pymongo.GEOSPHERE)], name='geo_index')

Collecting pymongo
  Downloading https://files.pythonhosted.org/packages/ed/a3/eb6f1fe5299ba556be2c7d3d79bd6a387c6a8adf7246967eabf7eada9287/pymongo-3.12.1-cp37-cp37m-manylinux2014_x86_64.whl (527kB)
Installing collected packages: pymongo
Successfully installed pymongo-3.12.1


'geo_index'

### 9. Geospatial querying data from MongoDB to Spark

**Retrieve** all documents that contain coordinates within 200 km of the point [ 8.659, 45.955 ]

> you might want to use the **$geoNear** aggregation stage

**Select** only the following fields: address_id, address, city_name, district, country_name

Note that in the snippet below the selection is performed with the Spark dataframe API. It may seem a waste of time, but Spark uses the MongoDB connector to push down the projection operator. In this way Spark does not load all the data and then select the columns; the querying and selection happen in an optimized way within MongoDB.
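
With a GeoJSON point and a 2dsphere index, coordinates are given as [longitude, latitude] and `maxDistance` is expressed in meters, so 200 km becomes 200000. To get a feeling for what falls inside that radius, the distance can be approximated with the haversine formula. This is a rough sketch: the Earth radius is a mean value, and the city coordinates are approximate values assumed for the example, not taken from the collection.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_M = 6371000.0  # mean Earth radius, an approximation


def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in meters between two (longitude, latitude) points."""
    lon1, lat1, lon2, lat2 = map(radians, (lon1, lat1, lon2, lat2))
    a = (sin((lat2 - lat1) / 2) ** 2
         + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_M * asin(sqrt(a))


center = (8.659, 45.955)  # [longitude, latitude], as in the exercise
bergamo = (9.67, 45.70)   # approximate, assumed coordinates
rome = (12.50, 41.90)     # approximate, assumed coordinates

max_distance_m = 200 * 1000  # 200 km expressed in meters
for name, point in (("Bergamo", bergamo), ("Rome", rome)):
    d = haversine_m(*center, *point)
    status = "inside" if d <= max_distance_m else "outside"
    print(name, round(d / 1000), "km", status)
```
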


In [12]:
# 9

distPip =[
            {
        '$geoNear': {
            'near': {
                'type': 'Point', 
                'coordinates': [
                    8.659, 45.955
                ]
            }, 
            'distanceField': 'dist.calculated', 
            'maxDistance': 200000
        }
    }
]

disDF = ss.read.format("mongo")\
.option("pipeline", distPip)\
.option("uri","mongodb://root:example@mongo/esame.denormalizedAddress?authSource=admin&readPreference=primaryPreferred")\
.load().select('address_id', 'address', 'city_name', 'district', 'country_name')

disDF.show()

+----------+--------------------+-----------+-----------+-------------+
|address_id|             address|  city_name|   district| country_name|
+----------+--------------------+-----------+-----------+-------------+
|       444|231 Kaliningrad P...|    Bergamo|  Lombardia|        Italy|
|        37|127 Purnea (Purni...|Alessandria|   Piemonte|        Italy|
|       314|1224 Huejutla de ...|    Brescia|  Lombardia|        Italy|
|       159|  185 Novi Sad Place|       Bern|       Bern|  Switzerland|
|        61|    943 Tokat Street|      Vaduz|      Vaduz|Liechtenstein|
|       604| 1331 Usak Boulevard|   Lausanne|       Vaud|  Switzerland|
|        65|     915 Ponce Place|      Basel|Basel-Stadt|  Switzerland|
+----------+--------------------+-----------+-----------+-------------+



### 10. Customers within a certain area

Generate a table with information about the customers whose addresses were identified in the previous cell.

The table must have the following structure:

1. Customer name ("first_name last_name")
2. Customer address ("address, city_name")
3. district
4. country_name
5. active

The information about the customers can be found in the "customer.parquet" file

In [14]:
# 10. 

customers = ss.read.parquet("customer.parquet")
# createOrReplaceTempView supersedes registerTempTable, deprecated since Spark 2.0
customers.createOrReplaceTempView('customer')
disDF.createOrReplaceTempView('address')
customers_full = ss.sql('''
select 
concat(first_name,' ',last_name) AS name,
concat(address,',',city_name) as address,
district,
country_name,
active
from customer natural join address
''')
customers_full.show()

+-----------------+--------------------+-----------+-------------+------+
|             name|             address|   district| country_name|active|
+-----------------+--------------------+-----------+-------------+------+
|ALEXANDER FENNELL|231 Kaliningrad P...|  Lombardia|        Italy|  true|
|        ANNA HILL|127 Purnea (Purni...|   Piemonte|        Italy|  true|
|CHRISTOPHER GRECO|1224 Huejutla de ...|  Lombardia|        Italy|  true|
|      GAIL KNIGHT|185 Novi Sad Plac...|       Bern|  Switzerland|  true|
|    EVELYN MORGAN|943 Tokat Street,...|      Vaduz|Liechtenstein|  true|
|    WADE DELVALLE|1331 Usak Bouleva...|       Vaud|  Switzerland|  true|
| KATHERINE RIVERA|915 Ponce Place,B...|Basel-Stadt|  Switzerland|  true|
+-----------------+--------------------+-----------+-------------+------+
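
The query above relies on `natural join`, which matches rows on every column name the two tables have in common. The plain-Python sketch below emulates that rule on toy rows. It assumes that `address_id` is the only shared column, which is a guess about the schema of "customer.parquet"; the second customer is invented for the example.

```python
def natural_join(left, right):
    """Join two lists of dict rows on every column name they share."""
    if not left or not right:
        return []
    shared = sorted(set(left[0]) & set(right[0]))
    joined = []
    for lrow in left:
        for rrow in right:
            if all(lrow[c] == rrow[c] for c in shared):
                joined.append({**lrow, **rrow})
    return joined


customers = [
    {"first_name": "GAIL", "last_name": "KNIGHT", "address_id": 159, "active": True},
    {"first_name": "JOHN", "last_name": "DOE", "address_id": 1, "active": True},  # invented row
]
addresses = [
    {"address_id": 159, "address": "185 Novi Sad Place", "city_name": "Bern"},
]

rows = natural_join(customers, addresses)
print(rows)
```

If the two tables ever shared a second column name, it would silently become part of the join condition, so an explicit `join ... on` clause may be the safer choice when the schemas are not fully known.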



### 11. Conclude the pipeline saving results as a parquet file

Save the dataframe created in the previous cell as a Parquet file named "custormer_full".

In [15]:
customers_full.write.mode('overwrite').parquet('custormer_full.parquet')

The previous code concludes the second Spark-powered pipeline, representable as:
![Pipeline 2](figures/pipeline2.png)