# Introduction to PySpark on Jupyter Notebook

Use <kbd>CTRL</kbd>+<kbd>⏎</kbd> to run cells:

In [None]:
# the spark object is pre-initialized in PySpark Jupyter notebooks
spark

In [None]:
from datetime import datetime, timedelta
import time
ts = datetime.now()
# epoch seconds; named so that it does not shadow the time module
epoch_seconds = int(time.time())

# number of days to go back: `today` below is `minus` days before the current date
minus = 2

today = (datetime.today() - timedelta(days=minus)).strftime("%Y%m%d")
yesterday = (datetime.today() - timedelta(days=minus+1)).strftime("%Y%m%d")
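
The date keys above depend on the wall clock, which makes them hard to check by hand. As a minimal sketch, assuming a hypothetical helper called `date_keys` (it is not part of the original notebook), the same two strings can be computed from an explicit reference date:

```python
from datetime import date, timedelta

def date_keys(reference, minus=2):
    """Return the (today, yesterday) keys in YYYYMMDD format, shifted back by `minus` days.

    `date_keys` is a hypothetical helper used only for illustration.
    """
    today_key = (reference - timedelta(days=minus)).strftime("%Y%m%d")
    yesterday_key = (reference - timedelta(days=minus + 1)).strftime("%Y%m%d")
    return today_key, yesterday_key

# With a reference date of 3 April 2020 and minus=2, "today" is 1 April 2020
example_today, example_yesterday = date_keys(date(2020, 4, 3))
print(example_today, example_yesterday)
```

Passing the reference date explicitly also makes it easy to re-run the notebook for a past day.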

### Set your bucket name

In [None]:
bucket_name = "INSERT_YOUR_BUCKET_NAME_HERE"

## Data loading: load data into the distributed file system
To work properly, **Spark** needs **data** that can be **reached from each node of the cluster**. That is not the only requirement: we must also access the files in a **file-system-like manner** so that the work can be partitioned, which is why we use a **Distributed File System**.
There are several alternatives. The native one for the **Hadoop ecosystem** is **HDFS (Hadoop Distributed File System)**, but cloud vendors now expose their own richer solutions, such as **S3 on AWS** or, in this case, **Cloud Storage on GCP**.

In [None]:
import os
#download data locally
os.popen('wget -O data.csv https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-province/dpc-covid19-ita-province-%s.csv' % today).read()
#load it into Google Cloud Storage
os.popen('gsutil mv data.csv gs://%s/dpc-covid19-ita-province-%s.csv' % (bucket_name,today)).read()
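
`os.popen` silently ignores a failed download or upload. The sketch below is one possible alternative, not the notebook's own method: it uses `subprocess.run` with `check=True` so that a non-zero exit status raises an exception. The helper names `source_url`, `bucket_uri` and `upload_daily_csv` are hypothetical, and the function still assumes that `wget` and `gsutil` are installed on the machine running the notebook.

```python
import subprocess

BASE_URL = "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-province"

def source_url(day):
    """URL of the daily CSV for a YYYYMMDD day key (hypothetical helper)."""
    return "%s/dpc-covid19-ita-province-%s.csv" % (BASE_URL, day)

def bucket_uri(bucket, day):
    """Destination object in the bucket for a YYYYMMDD day key (hypothetical helper)."""
    return "gs://%s/dpc-covid19-ita-province-%s.csv" % (bucket, day)

def upload_daily_csv(bucket, day, local_path="data.csv"):
    """Download the CSV locally, then move it to the bucket."""
    # check=True raises CalledProcessError if wget or gsutil exits with a non-zero status
    subprocess.run(["wget", "-O", local_path, source_url(day)], check=True)
    subprocess.run(["gsutil", "mv", local_path, bucket_uri(bucket, day)], check=True)
```

Calling `upload_daily_csv(bucket_name, today)` would then replace the two `os.popen` lines.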

## ETL example
The first example consists of building an ETL process with Spark.

### Extraction phase
The first phase is the **extraction**: we load data from OLTP sources, such as databases or files, into a **DataFrame**.
A **DataFrame** is a **Dataset** organized into named columns.

*A **Dataset** is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.
A **DataFrame** is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.*

The documentation on how to load and save files in Spark is here: [https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html)

In [None]:
source_df = spark.read \
                .option("header",True)\
                .csv("gs://%s/dpc-covid19-ita-province-%s.csv" % (bucket_name, today))

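#### Visualize the DataFrame
In Jupyter notebooks, a convenient way to inspect the content of a DataFrame is to convert it to a pandas DataFrame. Note that `toPandas()` collects all the rows on the driver, so use it only when the data is small.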

In [None]:
source_df.toPandas()

### Transformation
In this phase we want to generate the **output DataFrame** by selecting a subset of the original data and adding new columns.
To do this we import [*pyspark.sql functions*](https://spark.apache.org/docs/2.3.0/api/sql/index.html), a collection of functions used to manipulate the columns of a DataFrame.

In [None]:
from pyspark.sql import functions as F
small_source_df = source_df.select("stato","codice_regione","denominazione_regione","codice_provincia","denominazione_provincia","totale_casi",F.lit(today).alias("data"))

In [None]:
small_source_df.toPandas()

### Loading
We have loaded our file into Spark through the Dataset interface and manipulated it with sql.functions; now we want to store the new view in the data warehouse. In this case, our DW is made of **Parquet files**, an **Apache columnar file format** that is widely used in big data contexts. This format is **self-describing** (the schema is stored together with the data) and optimized to be partitioned, so multiple workers can consume the same file and work on it.

Before storing our DataFrame, let's look inside it: a DataFrame is an **abstraction over a dataset** and **it doesn't contain data**! A DataFrame **describes the Spark procedure** that generates a list of rows. Spark uses the **parallel skeleton** approach: the developer writes the DataFrame, which is converted into a **task graph (DAG)**. When we invoke DataFrame actions such as collect(), first() or toPandas(), we submit the **DAG** to the cluster's master node, which distributes the tasks across the workers; then we wait until the master gathers all the data into a single list of rows.

In [None]:
small_source_df.explain(True)
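
To make the "a DataFrame doesn't contain data" idea concrete, here is a toy sketch in plain Python. It is not how Spark is implemented: `LazyFrame` and `load_rows` are hypothetical names, and the rows are made up. The point it illustrates is only that transformations record steps, while the source is read when `collect()` is called.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: it stores a plan, not data."""

    def __init__(self, source, steps=()):
        self.source = source        # callable that produces the rows when invoked
        self.steps = tuple(steps)   # transformations recorded so far

    def select(self, *columns):
        # record a projection step and return a new plan
        step = lambda rows: [{c: r[c] for c in columns} for r in rows]
        return LazyFrame(self.source, self.steps + (step,))

    def where(self, predicate):
        # record a filter step and return a new plan
        step = lambda rows: [r for r in rows if predicate(r)]
        return LazyFrame(self.source, self.steps + (step,))

    def collect(self):
        # only here is the source read and the recorded steps applied
        rows = self.source()
        for step in self.steps:
            rows = step(rows)
        return rows

reads = []

def load_rows():
    # made-up data; every call is logged so we can see when the read happens
    reads.append("read")
    return [{"region": "A", "cases": 10}, {"region": "B", "cases": 0}]

plan = LazyFrame(load_rows).where(lambda r: r["cases"] > 0).select("region")
print(len(reads))      # the source has not been read yet
print(plan.collect())  # the read happens here
```

In Spark the recorded plan is additionally optimized and split into tasks, which is what `explain(True)` prints.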

Before loading, I want to focus on the concept of partitions: as we will see in the bucket, we can apply optimizations based on partitions. If we know that our data has a regional structure, we can split the DataFrame into smaller files that can be processed in parallel when we compute aggregates over the partition fields.

In [None]:
small_source_df.write.partitionBy("data","stato","codice_regione","codice_provincia")\
    .mode("append")\
    .option("mergeSchema", "true")\
    .parquet("gs://%s/dpc-covid19-ita-province" % bucket_name)

Explore your bucket to see how this dataframe is stored and what is a parquet file.
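
As a rough guide to what you will find, `partitionBy` writes one directory level per partition column, named `column=value`, with the Parquet part files inside the innermost directory. The sketch below only builds such a path as a string; `partition_path`, the bucket name `my-bucket` and the row values are hypothetical and made up for illustration.

```python
def partition_path(base, row, partition_columns):
    """Build the directory for one row: base/col1=value1/col2=value2/..."""
    parts = ["%s=%s" % (column, row[column]) for column in partition_columns]
    return "/".join([base] + parts)

# Hypothetical row; the values are made up for illustration
example_row = {
    "data": "20200401",
    "stato": "ITA",
    "codice_regione": "13",
    "codice_provincia": "069",
}
example_path = partition_path(
    "gs://my-bucket/dpc-covid19-ita-province",
    example_row,
    ["data", "stato", "codice_regione", "codice_provincia"],
)
print(example_path)
```

Because the partition values live in the directory names, a filter on `data` lets Spark read only the matching directories.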

## Analytics and OLAP

### Load data from DW
Load today's data from the Parquet directory, which is partitioned by *data*, into a DataFrame.

In [None]:
small_source_df = spark.read.parquet("gs://%s/dpc-covid19-ita-province" % bucket_name).where(F.col("data") == today)

### Execute Query
In these cells we compute some aggregate metrics. We can do it in two different ways:
1. Using Spark SQL functions to manipulate the DataFrame functionally
2. Using SQL by mapping the DataFrame to a table

In [None]:
#1
df_by_region = small_source_df.groupBy("codice_regione").agg(
        F.first(F.col("denominazione_regione")).alias("denominazione_regione"),
        F.first(F.col("data")).alias("date"),
        F.sum(F.col("totale_casi")).alias("totale_casi"),
    ).select("denominazione_regione", "totale_casi", "date")
df_by_region.toPandas()

In [None]:
#2
small_source_df.createOrReplaceTempView("sds")
spark.sql("select first(denominazione_regione) as denominazione_regione, first(data) as date, sum(totale_casi) as totale_casi from sds group by codice_regione").toPandas()
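
For intuition, both variants compute the same thing as the plain-Python loop sketched below: one output row per `codice_regione`, keeping the first region name seen and summing `totale_casi`. The function name `cases_by_region` and the sample rows are hypothetical, and the numbers are made up.

```python
def cases_by_region(rows):
    """Group rows by codice_regione, keep the first name seen and sum totale_casi."""
    totals = {}
    for row in rows:
        entry = totals.setdefault(
            row["codice_regione"],
            {"denominazione_regione": row["denominazione_regione"], "totale_casi": 0},
        )
        entry["totale_casi"] += int(row["totale_casi"])
    return list(totals.values())

# Made-up province rows: two provinces in region 1, one in region 2
sample_rows = [
    {"codice_regione": "1", "denominazione_regione": "Region one", "totale_casi": "5"},
    {"codice_regione": "1", "denominazione_regione": "Region one", "totale_casi": "7"},
    {"codice_regione": "2", "denominazione_regione": "Region two", "totale_casi": "3"},
]
print(cases_by_region(sample_rows))
```

The difference is that Spark runs the grouping in parallel across the workers instead of in a single loop.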

In [None]:
df_time = df_by_region

The cell below shows an example of a more complex computation and of DataFrame composition.
*As you can see, some parts call Java code: that is because PySpark is only a wrapper around the underlying Java Spark/Hadoop system.*

In [None]:
# Obtain a reference to the Hadoop File System
sc = spark.sparkContext
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
    sc._jvm.java.net.URI.create("gs://%s" % bucket_name),
    sc._jsc.hadoopConfiguration(),
)
# check whether old data is present (it may not exist on the first run)
if fs.exists(sc._jvm.org.apache.hadoop.fs.Path("gs://%s/dpc-covid19-ita-region" % bucket_name)):
    # extract old data from parquet
    yesterday_data = spark.read.parquet("gs://%s/dpc-covid19-ita-region" % bucket_name).where(F.col("date") == yesterday)
    # print data
    yesterday_data.show()
    # left join today's data with yesterday's data
    joined = df_time.join(yesterday_data, yesterday_data.denominazione_regione == df_time.denominazione_regione,how='left')
    # print joined data
    joined.show()
    # compute differences into 'delta' column
    region_recap_df = joined\
        .withColumn("delta",df_time.totale_casi - F.when(yesterday_data.totale_casi.isNull(),0).otherwise(yesterday_data.totale_casi))\
        .select(df_time.denominazione_regione, df_time.totale_casi, "delta", df_time.date)
else:
    region_recap_df = df_time.withColumn("delta",F.lit(0))
region_recap_df = region_recap_df.orderBy(F.desc("delta"),F.desc("totale_casi"))
# explore Dataframe
region_recap_df.toPandas()
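
The logic of the cell above (left join, missing value treated as 0, ordering by delta and then by total) can be sketched in plain Python. This is an illustration with made-up numbers, not a replacement for the Spark code; `add_delta` is a hypothetical name.

```python
def add_delta(today_rows, yesterday_rows):
    """Attach to each of today's rows the difference from yesterday's total."""
    previous = {r["denominazione_regione"]: r["totale_casi"] for r in yesterday_rows}
    result = []
    for row in today_rows:
        # a region missing from yesterday is like a NULL after the left join: use 0
        old_total = previous.get(row["denominazione_regione"], 0)
        result.append(dict(row, delta=row["totale_casi"] - old_total))
    # same ordering as the notebook: largest delta first, then largest total
    return sorted(result, key=lambda r: (r["delta"], r["totale_casi"]), reverse=True)

# Made-up totals for three regions; region "C" has no data for yesterday
today_rows = [
    {"denominazione_regione": "A", "totale_casi": 100},
    {"denominazione_regione": "B", "totale_casi": 50},
    {"denominazione_regione": "C", "totale_casi": 7},
]
yesterday_rows = [
    {"denominazione_regione": "A", "totale_casi": 90},
    {"denominazione_regione": "B", "totale_casi": 50},
]
for row in add_delta(today_rows, yesterday_rows):
    print(row)
```

A left join is used so that regions with no row for yesterday are kept instead of being dropped.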

### Save result for future usage and analytics
Now we want to create a new table to keep track of the increments. As in the example above, we store the data in a Parquet table.

In [None]:
# materialize this view for future analytics
region_recap_df.write.partitionBy("date").mode("append").option("mergeSchema", "true").parquet("gs://%s/dpc-covid19-ita-region" % bucket_name)

Now we also want to serve our data like an API by creating a JSON file that represents today's increments. You may have read in the documentation linked above that we can use Spark's JSON write primitive. I suggest you try it to see what kind of data is produced.
You will notice that this write does not produce a single file containing a JSON array, but a particular format called [JSON Lines](https://jsonlines.org/), with one JSON object per line, which makes it possible to process a list of JSON objects in parallel. A similar one-document-per-line layout is commonly used to import and export data in document databases like [MongoDB](https://www.mongodb.com/).

In [None]:
region_recap_df.write.json("gs://%s/dpc-covid19-ita-region-%s.json" % (bucket_name,today))
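
The difference between a JSON array and JSON Lines is easy to see with the standard `json` module. The records below are made up, and the variable names are hypothetical; the sketch only shows the two layouts and how a JSON Lines text is parsed line by line.

```python
import json

# Made-up records standing in for the rows of the recap DataFrame
records = [
    {"denominazione_regione": "A", "delta": 3},
    {"denominazione_regione": "B", "delta": 0},
]

# A single JSON document: one array that must be parsed as a whole
as_array = json.dumps(records)

# JSON Lines: one independent JSON object per line
as_json_lines = "\n".join(json.dumps(record) for record in records)

print(as_array)
print(as_json_lines)

# Each line can be parsed on its own, so different workers can handle different lines
parsed = [json.loads(line) for line in as_json_lines.splitlines()]
```

Note also that Spark writes a directory containing one or more part files at the given path, not a single file.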

Since the aggregated data is small, we can generate the JSON directly in Python and save it to the storage.
This step highlights the most common use of Spark as "accelerated computing": I write a calculation procedure (the DataFrame), send it to the cluster to be computed in parallel over the large amount of data, and get back the recap/aggregate values (the collect()).
Having obtained the results as a list of objects, I process them further in the main language, and so on.

In [None]:
import json

# collect() brings the (small) aggregated result to the driver as a list of Rows
recap = region_recap_df.collect()
json_recap = [row.asDict() for row in recap]
print(json.dumps(json_recap))
with open('data.json', 'w') as outfile:
    json.dump(json_recap, outfile)
# the with block has closed the file, so it can now be moved to the bucket
os.popen('gsutil mv data.json gs://%s/dpc-covid19-ita-region-recap.json' % bucket_name).read()

Now you can write your own procedure to explore the generated data. Have fun!

In [None]:
generated_table = spark.read.parquet("gs://%s/dpc-covid19-ita-region" % bucket_name)
generated_table.toPandas()