In [None]:
from pyspark.sql import SparkSession
from IPython.display import display, HTML

# enlarge cell width
display(HTML("<style>.container { width:100% !important; }</style>"))

# create a spark session
spark = SparkSession.builder.config("spark.master", 'local').getOrCreate()

# Data Schema

Brazilian E-Commerce Public Dataset by Olist

https://www.kaggle.com/olistbr/brazilian-ecommerce

In [None]:
df = spark.read.option('header', 'true').csv('/shared-data/olist_order_items_dataset.csv')
df.show(5)

In [None]:
df.explain()

## Load data with a proper schema

`df.printSchema()` lists the data type of each column. Loading our csv file directly into Spark creates a dataframe where every column is a String. This is not ideal, as we would prefer numeric types for `price` and `freight_value` and a datetime type for `shipping_limit_date`.

```
root
 |-- order_id: string (nullable = true)
 |-- order_item_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: string (nullable = true)
 |-- price: string (nullable = true)
 |-- freight_value: string (nullable = true)
```


In [None]:
df.printSchema()

### Let Spark infer the schema

We can use the `inferSchema` option to ask Spark to guess the most suitable schema for the input data.

In [None]:
df_infer = spark.read.option('header', 'true').option('inferSchema', 'true').csv('/shared-data/olist_order_items_dataset.csv')
df_infer.printSchema()

### Use a user-specified schema

In [None]:
from pyspark.sql.types import StructType, StringType, TimestampType, DoubleType

my_schema = StructType() \
    .add("order_id",StringType(),True) \
    .add("order_item_id",StringType(),True) \
    .add("product_id",StringType(),True) \
    .add("seller_id",StringType(),True) \
    .add("shipping_limit_date",TimestampType(),True) \
    .add("price",DoubleType(),True) \
    .add("freight_value",DoubleType(),True)

df_user = spark.read \
    .option('header', 'true') \
    .schema(my_schema) \
    .csv('/shared-data/olist_order_items_dataset.csv')

df_user.printSchema()

## Data transform based on columns' schema

The right schema makes it much easier to process our data, because we can use the native functions Spark provides for each supported data type. Check the [Spark documentation](https://spark.apache.org/docs/2.3.0/api/sql/index.html) to learn more about the available functions.

For example, we can extract the `year` and `month` from the `shipping_limit_date` column and add them as new columns in our dataframe.

In Spark, we add a new column by calling the `withColumn` method of a dataframe:
 
```
from pyspark.sql.functions import year

df_user1 = df_user.withColumn('Y', year('shipping_limit_date'))
df_user1.show(5)
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|   Y|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+----+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|2017|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|2017|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|        17.87|2018|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|        12.79|2018|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51| 199.9|        18.14|2017|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+----+
```
 
If we need to add multiple columns, we can chain a series of `withColumn` calls together:
```
df_user1 = df_user \
    .withColumn('Y', year('shipping_limit_date')) \
    .withColumn('M', month('shipping_limit_date'))
```
 
__Note__: Spark dataframes are immutable, so you cannot modify one in place. Every transform returns a new dataframe, which is why we always assign the transformed result to a variable, e.g. `df_user1 = df_user.withColumn(...)`. Don't worry about this wasting RAM: transforms are lazy, so Spark only records the lineage between dataframes and does not copy the data for each one.

In [None]:
from pyspark.sql.functions import year, month

df_user1 = df_user \
    .withColumn('Y', year('shipping_limit_date')) \
    .withColumn('M', month('shipping_limit_date'))
df_user1.show(5)

# Data ETL

ETL stands for 'Extract, Transform, and Load', and it is one of the first data requirements in almost any project. Spark dataframes make ETL easy because many typical operations are already built into the dataframe interface.

## Filtering

`Filtering` is the most common `Extract` operation, where we want to focus only on data that meets certain criteria.
For example, if we want to look at orders in the year 2017, we can use the following filter:

```
df_user1.filter(df_user1.Y == 2017)
```

Similarly, you can also filter dataframes with `greater than` (`>`), `less than` (`<`), or `not equal` (`!=`) conditions.


__Note__: The `Y` column is of `integer` type. But if you use `df_user1.filter(df_user1.Y == "2017")`, you might be surprised that Spark still returns the right result. This is because Spark tries to cast the operands to a matching type so that the comparison can be executed.

In [None]:
print("*** Compare with an integer")
df_user1.filter(df_user1.Y == 2017).show(2)

print("*** Compare with a string")
df_user1.filter(df_user1.Y == "2017").show(2)

For columns of `string` type, we sometimes need to filter by prefix, suffix, or a particular pattern. Spark provides several string functions that make this logic simple to express.

For example, if we want to work with the `order_id` values that start with `"2"`, we can filter the dataframe using the `substring` function:

```
df_user.filter(substring(df_user.order_id, 1, 1) == '2').show()
```

`substring` extracts a segment of the string based on the start position and the length we provide. Positions are 1-based, so `substring(df_user.order_id, 1, 1)` returns the first character.


__<span style="color:red">Quiz:</span>__ How do we filter the rows whose `order_id` starts with `2002`?

In [None]:
from pyspark.sql.functions import substring

df_user.filter(substring(<FIXME>) == '2002').show(5)

So far we have been filtering rows. How should we filter the dataframe by columns, i.e. choose a subset of columns while keeping every row?

Column filtering is done with the `select` method. If we only want to keep `order_id`, `price`, and `Y`, we can `select` them in the following way:

```
df_user1.select('order_id', 'price', 'Y')
```

In [None]:
df_user1.select('order_id', 'price', 'Y').show(3)

__<span style="color:red">Quiz:</span>__ Try to filter the dataframe for rows whose `order_id` starts with `2002`, and keep only the `order_id` and `price` columns.

In [None]:
df_user.filter(<FIXME>).select(<FIXME>).show()

## Transform

We have previously shown how to add columns using the `withColumn` method. Now let's have a look at other transforms.

The simplest transforms are mathematical operations, e.g. addition, subtraction, multiplication, division, etc. Check the [Spark documentation](https://spark.apache.org/docs/2.3.0/api/sql/index.html) to learn more about the available math functions.
For example, if we want to express the `price` in thousands of dollars, we can write

```
df_user.withColumn('price_in_k', df_user.price/1000).show()
```

We can also combine the information from two columns. For example, we can get the `overall_price` by adding `price` and `freight_value` together.

```
df_user.withColumn('overall_price', df_user.price + df_user.freight_value).show()
```

In [None]:
df_user.withColumn('price_in_k', df_user.price/1000).show(5)

In [None]:
df_user.withColumn('overall_price', df_user.price + df_user.freight_value).show(5)

__<span style="color:red">Quiz:</span>__ Find the square root of each order's price

In [None]:
from pyspark.sql.functions import <FIXME>

df_user.withColumn('sqrt_price', <FIXME>).show(5)

Remember that Spark does not allow in-place transforms, so each transform is expressed by adding a new column to the dataframe. The good news is that we can always use `column` filtering to keep only the columns we want.

## Aggregation

Aggregation is a special `transform`: instead of altering cells on a per-row basis, aggregation works on a group of rows and generates new values from each group. Therefore, aggregation is typically used together with the `groupby` operation.



In [None]:
from pyspark.sql.functions import sum, count

df_user1.groupby('Y', 'M').agg(sum('price')).show(5)

Our new column's name is "sum(price)", which is not convenient to use later. We could use `alias` to give the column a better name.

In [None]:
df_user1.groupby('Y', 'M').agg(
    sum('price').alias('monthly_total')
).show(5)

It is also possible to compute multiple aggregations for each group. For example, we can get the monthly total price and the monthly count using the following call

```
df_user1.groupby('Y', 'M').agg(
    sum('price').alias('monthly_total'),
    count('price').alias('monthly_count'),
)
```

In [None]:
df_user1.groupby('Y', 'M').agg(
    sum('price').alias('monthly_total'),
    count('price').alias('monthly_count'),
).show(5)

You may have noticed that we use `show` to take a quick look at the dataframe content. However, `show` only computes as many rows as it needs to display. Spark transforms are lazy: to run the defined operations on the full dataframe we need a `trigger` (Spark calls these actions). The most frequently used trigger is `count`, which tells us how many records a dataframe has.

In [None]:
df_agg = df_user1.groupby('Y', 'M').agg(
    sum('price').alias('monthly_total'),
    count('price').alias('monthly_count'),
)

print(df_agg.count())

Another useful trigger is `collect`, which pulls the data out of the executors and into the driver's memory, so it should only be used on results small enough to fit there. For example, we may want the full content of `df_agg` to see the monthly trend.

In [None]:
agg_result = df_agg.collect()
print(len(agg_result))

The data we collect back are wrapped in the `Row` class. The `Row` class makes it easier for us to address individual fields in each record.

In [None]:
for i in agg_result:
    print("Type of the object: {}".format(type(i)))
    print(i)
    print(i.Y, i.M, i.monthly_total, i.monthly_count)
    break

There is also a convenient method to convert a Spark dataframe into a Pandas dataframe, `toPandas`.

In [None]:
pandas_df = df_agg.orderBy('Y', 'M').toPandas()
pandas_df['ym'] = pandas_df['Y'].apply(lambda x: '{:04d}'.format(x)) + '-' + pandas_df['M'].apply(lambda x: '{:02d}'.format(x))
pandas_df.plot(x='ym', y='monthly_total')

## Load

The `Load` operation refers to saving the processed data in a persistent storage engine, e.g. a relational database, key-value store, object store, or file system.

Spark supports writing data to most of the popular engines:
* MySQL
* PostgreSQL
* Elasticsearch
* Cassandra
* Kafka
* AWS S3
* Azure Blob
* HDFS
* Alluxio

Let's take writing to csv files as an example. 
```
df_agg.repartition(1).write.mode('overwrite').option('header', 'true').csv('/home/jovyan/output')
```
where
* `repartition(1)` reduces the number of output files to 1
* `mode('overwrite')` tells Spark to delete any existing data in the output directory
* `option('header', 'true')` controls whether a header line is added to the csv files

You can try modifying the arguments to observe how Spark's writing behavior changes.

In [None]:
df_agg.repartition(1).write.mode('overwrite').option('header', 'true').csv('/root/output')

In [None]:
!cat output/*.csv

# Spark SQL

Spark is very powerful and user friendly. However, as you might have already noticed, the Spark dataframe API still requires us to write `code`, which can be a blocker for people who are not familiar with programming.

To make Spark more accessible, Spark provides a SQL interface where most common data processing can be expressed as SQL statements. So if you have worked with any relational database before, there is almost nothing stopping you from using Spark to process your data.

In order to use Spark SQL, we need to register our dataframe as a `TempView` first.

Let's register our `df_user` dataframe as a TempView named `order_items`.

In [None]:
_ = df_user.createOrReplaceTempView('order_items')

Now we can use standard SQL to execute ETL jobs. 

For example, extracting `year`, `month`, `day`:

In [None]:
statement= '''
select
    shipping_limit_date
    ,year(shipping_limit_date) as Y
    ,month(shipping_limit_date) as M
    ,day(shipping_limit_date) as D 
from order_items 
limit 5
'''

spark.sql(statement).show()

Get the top 5 sellers with the largest number of orders:

In [None]:
statement= '''
select
    seller_id
    ,count(*) as total_order 
from order_items
group by seller_id
order by total_order desc
'''

spark.sql(statement).show(5)

Get the average price per order for each seller. __Note__: We use a nested query to get the result. Spark also supports CTEs (Common Table Expressions), which make the statement easier to read.

In [None]:
statement= '''
select
    seller_id
    ,total_price/total_order as avg_price_per_order
from (
    select
        seller_id
        ,sum(price) as total_price
        ,count(*) as total_order 
    from order_items
    group by seller_id
)
'''

spark.sql(statement).show(5)

Moreover, Spark SQL also makes it possible to run complicated logic that is harder to program using the dataframe interfaces.

For example, we can use a window query to rank the months of each seller by their number of orders:

In [None]:
statement= '''
select
    seller_id
    ,Y
    ,M
    ,total_order
    ,rank() over (
        partition by seller_id order by total_order desc
    ) as rank_total_order
from (
    select
        seller_id
        ,year(shipping_limit_date) as Y
        ,month(shipping_limit_date) as M
        ,count(*) as total_order
    from order_items
    group by Y, M, seller_id
)
'''

spark.sql(statement).show()

__<span style="color:red">Quiz:</span>__: Replace `rank` with `dense_rank` to see the difference.

In [None]:
statement= '''
select
    seller_id
    ,Y
    ,M
    ,total_order
    ,<FIXME> over (
        partition by seller_id order by total_order desc
    ) as rank_total_order
from (
    select
        seller_id
        ,year(shipping_limit_date) as Y
        ,month(shipping_limit_date) as M
        ,count(*) as total_order
    from order_items
    group by Y, M, seller_id
)
'''

spark.sql(statement).show()

__<span style="color:red">Quiz:</span>__ For each seller, find the month with the 2nd highest number of orders, keeping only months where the seller's total number of orders is greater than 10

In [None]:
statement= '''
select *
from (
    select
        seller_id
        ,Y
        ,M
        ,total_order
        ,rank() over (
            partition by seller_id order by total_order desc
        ) as rank_total_order
    from (
        select
            seller_id
            ,year(shipping_limit_date) as Y
            ,month(shipping_limit_date) as M
            ,count(*) as total_order
        from order_items
        group by Y, M, seller_id
    )
)
where 
    <FIXME>
'''
spark.sql(statement).show(truncate=False)

__<span style="color:red">Quiz:</span>__ Find the sellers who had more than 100 orders in each of two consecutive months

In [None]:
statement= '''
with t1 as (
    select
        seller_id
        ,year(shipping_limit_date) as Y
        ,month(shipping_limit_date) as M
        ,count(*) as total_order
    from order_items
    group by Y, M, seller_id
)
select
    a.seller_id
    ,a.Y
    ,a.M
    ,a.total_order
    ,b.M as prev_M
    ,b.total_order as prev_total_order
from 
    t1 as a
join
    t1 as b
on
    a.seller_id = b.seller_id

where
   <FIXME>
    
'''
spark.sql(statement).show(truncate=False)