# Install and Prepare Apache Spark

## Install Spark

You can download Apache Spark from the [downloads page](https://spark.apache.org/downloads.html).
We will download the stable version v2.4.1 (the latest at the time of writing), pre-built for Apache Hadoop 2.7 and later, from
this [link](https://www.apache.org/dyn/closer.lua/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz). Then extract the archive and set the environment variables:


```
tar -xzvf spark-2.4.1-bin-hadoop2.7.tgz
cd spark-2.4.1-bin-hadoop2.7

export SPARK_HOME='<SPARK_INSTALLATION_FOLDER>/spark-2.4.1-bin-hadoop2.7'
export PATH=$SPARK_HOME/bin:$PATH
```

You can now run interactive `spark-shell` or `pyspark` sessions.

You can add `SPARK_HOME` and `PATH` permanently to your profile (`~/.bashrc` on Ubuntu).

## Start the Spark History Server

The Spark History Server provides a web user interface (the Spark UI) to inspect and analyse completed and running Spark applications.

Before starting the Spark History Server, we must make sure that Spark applications are configured to write their event logs.

```
cd $SPARK_HOME
cp conf/spark-defaults.conf.template conf/spark-defaults.conf
vi conf/spark-defaults.conf
```

In the `spark-defaults.conf` configuration file, uncomment the following line:

```
spark.eventLog.enabled           true
```

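By default, Spark event logs are written to `/tmp/spark-events`, and this directory must exist before you run a job
(for example, create it with `mkdir -p /tmp/spark-events`). You can override this location by setting `spark.eventLog.dir`
to a local folder (fine for a standalone deployment) or to an HDFS folder. The History Server reads event logs from
`spark.history.fs.logDirectory`, which also defaults to `/tmp/spark-events`, so set it to the same location if you change `spark.eventLog.dir`.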

Now start the Spark History server with:

```
$SPARK_HOME/sbin/start-history-server.sh
```

Open your browser and go to `http://localhost:18080`.

## Install and prepare Jupyter

In this tutorial, we will use [Jupyter](https://jupyter.org/) notebook as our
interactive development environment.

You can follow the instructions in the [install](https://jupyter.org/install.html)
guide to install Jupyter on your machine.

To use `pyspark` from a Jupyter notebook, we need to set the following environment variables:

```
export PYSPARK_DRIVER_PYTHON="jupyter"
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
```

You can add these environment variables permanently to your profile.

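Now, running `pyspark` starts Jupyter. Notebooks started this way have a `SparkContext` (`sc`) and a `SparkSession` (`spark`) ready to use, as in the `pyspark` shell: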

```
pyspark
```

You can check this [article](https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f) for more details.

# Working with Apache Spark

In this tutorial, we will use Apache Spark for `WebLog` analysis.
We will be using the `dataframe` API as it is the most common Structured API in
Spark. A dataframe represents a table of data with rows and columns. A dataframe
always has a `schema` defining the column data types and some additional `metadata`
like `nullable` indicating if the column accepts `nulls`.

## Load the web logs

The `Apache server log` is a text-based format with a custom structure (similar to
a tabular format). Spark has no built-in reader that turns it directly into typed
columns, so we need to parse it line by line.

This can be done by reading the text file into an `RDD`, mapping every line to a
`pyspark.sql.Row`, and transforming the `RDD` into a Spark `dataframe`.

An Apache server log line can be parsed using:

```python
import re  # regular expressions
from pyspark.sql import Row

# Regular expression to parse an Apache Server log line.
# It is constructed as follows:
# IP_Address client_id user_id [date_time] "method endpoint protocol" response_code content_size
LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'

# Returns a dataframe Row built from named arguments. In Spark 2.x, the fields
# of a Row built this way are sorted by name. The fields of the returned row
# correspond to the Apache Server Log fields.
def log_parser(line):
  # Match the given input line against the regular expression
  match = re.search(LOG_PATTERN, line)

  # If there is no match, the input line is invalid: raise an error
  if match is None:
    raise ValueError("Invalid input line: %s" % line)

  # Return a pyspark.sql.Row
  return Row(
    ip_address    = match.group(1), # IP address of the client (IPv4 or IPv6)
    client_id     = match.group(2), # Client id: mostly '-'. This info should never be used as it is unreliable.
    user_id       = match.group(3), # The user id as defined in HTTP authentication.
                                    # If the endpoint is not password protected, it will be '-'
    date_time     = match.group(4), # The time the server received the request
                                    # date_time format: day/month/year:hour:minute:second zone
    method        = match.group(5), # The HTTP method
    endpoint      = match.group(6), # The requested endpoint
    protocol      = match.group(7), # The protocol in use, usually HTTP/version
    response_code = int(match.group(8)), # One of HTTP's response codes (100 to 599)
    content_size  = int(match.group(9))  # Size of the response body in bytes, as reported in the log
  )
```
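Before running the parser on the whole file, it can help to try the regular expression in plain Python on a single line. The sketch below only uses the standard library: `parse_line` and `FIELDS` are hypothetical helpers that return a `dict` instead of a `Row`, and the sample line is a hypothetical one rebuilt from the first record displayed later in this tutorial.

```python
import re

# Same pattern as LOG_PATTERN in the parser above
LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'

# Field names in the order of the regular expression groups
FIELDS = ['ip_address', 'client_id', 'user_id', 'date_time', 'method',
          'endpoint', 'protocol', 'response_code', 'content_size']


def parse_line(line):
    """Hypothetical plain-Python counterpart of log_parser: returns a dict, not a Row."""
    match = re.search(LOG_PATTERN, line)
    if match is None:
        raise ValueError("Invalid input line: %s" % line)
    record = dict(zip(FIELDS, match.groups()))
    record['response_code'] = int(record['response_code'])
    record['content_size'] = int(record['content_size'])
    return record


# Hypothetical log line rebuilt from the first record shown in this tutorial
sample = ('::ffff:54.243.49.1 - - [01/Mar/2018:23:07:31 +0000] '
          '"POST /v1/data/write/Ras_Beirut/statusRas_Beirut4 HTTP/1.1" 200 4')
print(parse_line(sample))
```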


Now load the weblog data. Don't forget to set `logfile` to the full path
of the weblog file.

```python
# Path to the weblog data: replace with the full path on your machine,
# e.g. '/home/ubuntu/Desktop/dev/datafiles/weblog.log'
logfile = 'REPLACE_WITH_PATH_TO_WEBLOG.LOG'

# sc is the SparkContext created by pyspark.
# 1. textFile reads the weblog as an RDD of lines
# 2. map applies the log_parser function to every line of the RDD
# 3. toDF transforms the RDD of Rows into a Spark dataframe
input_df = sc.textFile(logfile).map(log_parser).toDF()
```
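Because `log_parser` raises an error on any line that does not match `LOG_PATTERN`, a single malformed line makes the whole Spark job fail. One option is to filter out non-matching lines before parsing; in PySpark this could be written as `sc.textFile(logfile).filter(lambda line: re.search(LOG_PATTERN, line) is not None).map(log_parser)`. The plain-Python sketch below (hypothetical helper `split_valid`, hypothetical sample lines) shows which lines such a filter would keep. Note that Apache writes `-` instead of `0` in the size field when no bytes are sent, and `LOG_PATTERN` rejects those lines too.

```python
import re

# Same pattern as LOG_PATTERN in the parser above
LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'


def split_valid(lines):
    """Hypothetical helper: separate lines matching LOG_PATTERN from malformed ones."""
    valid, rejected = [], []
    for line in lines:
        target = valid if re.search(LOG_PATTERN, line) else rejected
        target.append(line)
    return valid, rejected


# Hypothetical input lines
lines = [
    '::ffff:54.243.49.1 - - [01/Mar/2018:23:07:31 +0000] '
    '"POST /v1/data/write/Ras_Beirut/statusRas_Beirut4 HTTP/1.1" 200 4',
    # '-' instead of a byte count: rejected by the (\d+) group
    '::ffff:54.243.49.1 - - [01/Mar/2018:23:07:32 +0000] "GET / HTTP/1.1" 304 -',
    'not a log line',
    '',
]
valid, rejected = split_valid(lines)
print(len(valid), len(rejected))  # 1 3
```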


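The next sections apply multiple transformations to the weblog dataframe.
In such a case, it is beneficial to `cache` the dataframe to speed up later
accesses. Note that caching is lazy: the data is only kept in memory once a first
action (such as `show()` or `count()`) computes it.

```python
df = input_df.cache()
```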

## Explore the Weblog data

Now that the log is cached in a dataframe, let's explore some of its content:


```python
df.show()  # prints up to 20 rows (default) of the dataframe
```

```
+---------+------------+--------------------+--------------------+--------------------+------+--------+-------------+-------+
|client_id|content_size|           date_time|            endpoint|          ip_address|method|protocol|response_code|user_id|
+---------+------------+--------------------+--------------------+--------------------+------+--------+-------------+-------+
|        -|           4|01/Mar/2018:23:07...|/v1/data/write/Ra...|  ::ffff:54.243.49.1|  POST|HTTP/1.1|          200|      -|
|        -|           4|01/Mar/2018:23:07...|/v1/data/write/Ra...|  ::ffff:54.243.49.1|  POST|HTTP/1.1|          200|      -|
|        -|           4|01/Mar/2018:23:07...|/v1/data/publish/...|::ffff:54.221.205.80|  POST|HTTP/1.1|          200|      -|
|        -|           4|01/Mar/2018:23:07...|/v1/data/write/Aa...| ::ffff:84.85.189.28|  POST|HTTP/1.1|          200|      -|
|        -|           4|01/Mar/2018:23:07...|/v1/data/publish/...|::ffff:54.221.205.80|  POST|HTTP/1.1|          200|
...
```

Jupyter renders `pandas` dataframes as rich tables, which makes them easier to explore.

> This requires converting the Spark dataframe into a pandas dataframe, which collects all its rows into the driver's memory. Only run this conversion on a subset of the dataframe.

This can be done as follows:

```python
pdf = df.limit(1000).toPandas()

# pdf is a pandas dataframe: the whole pandas API can be used on it.
display(pdf)
```

|   | client_id | content_size | date_time | endpoint | ip_address | method | protocol | response_code | user_id |
|---|---|---|---|---|---|---|---|---|---|
| 0 | - | 4 | 01/Mar/2018:23:07:31 +0000 | /v1/data/write/Ras_Beirut/statusRas_Beirut4 | ::ffff:54.243.49.1 | POST | HTTP/1.1 | 200 | - |
| 1 | - | 4 | 01/Mar/2018:23:07:31 +0000 | /v1/data/write/Ras_Beirut/statusRas_Beirut1 | ::ffff:54.243.49.1 | POST | HTTP/1.1 | 200 | - |
| 2 | - | 4 | 01/Mar/2018:23:07:31 +0000 | /v1/data/publish/ISS/position | ::ffff:54.221.205.80 | POST | HTTP/1.1 | 200 | - |
| 3 | - | 4 | 01/Mar/2018:23:07:31 +0000 | /v1/data/write/Aanbouw/CVSturing | ::ffff:84.85.189.28 | POST | HTTP/1.1 | 200 | - |
| 4 | - | 4 | 01/Mar/2018:23:07:32 +0000 | /v1/data/publish/ISS/position | ::ffff:54.221.205.80 | POST | HTTP/1.1 | 200 | - |
| 5 | - | 4 | 01/Mar/2018:23:07:33 +0000 | /v1/data/write/Klima/Temperatur | ::ffff:86.103.139.9 | POST | HTTP/1.1 | 200 | - |
| 6 | - | 4 | 01/Mar/2018:23:07:33 +0000 | /v1/data/write/Klima/Luftfeuchtigkeit | ::ffff:86.103.139.9 | POST | HTTP/1.1 | 200 | - |
| 7 | - | 4 | 01/Mar/2018:23:07:33 +0000 | /v1/data/publish/ISS/position | ::ffff:54.221.205.80 | POST | HTTP/1.1 | 200 | - |
| 8 | - | 4 | 01/Mar/2018:23:07:34 +0000 | /v1/data/write/bbt_raspi/cpu | ::ffff:54.221.205.80 | POST | HTTP/1.1 | 200 | - |
| 9 | - | 4 | 01/Mar/2018:23:07:34 +0000 | /v1/data/write/bbt_raspi/memory | ::ffff:54.221.205.80 | POST | HTTP/1.1 | 200 | - |


The `schema` of the dataframe was inferred by Spark from the `Row` objects returned by our custom parser (Python `int` values become `long` columns). We can check it using:

```python
df.printSchema()
```

```
root
 |-- client_id: string (nullable = true)
 |-- content_size: long (nullable = true)
 |-- date_time: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- method: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- response_code: long (nullable = true)
 |-- user_id: string (nullable = true)
```



When exploring a data set, it is useful to get per-column statistics such
as `count`, `mean`, `standard deviation`, `min` and `max` values. This can be done using:

```python
# In plain Spark
df.describe().show()

# OR using a toPandas conversion
display(df.describe().toPandas())
```

```
+-------+---------+------------------+--------------------+--------------------+--------------------+------+--------+------------------+-------+
|summary|client_id|      content_size|           date_time|            endpoint|          ip_address|method|protocol|     response_code|user_id|
+-------+---------+------------------+--------------------+--------------------+--------------------+------+--------+------------------+-------+
|  count|   336895|            336895|              336895|              336895|              336895|336895|  336895|            336895| 336895|
|   mean|     null| 38.14462072752638|                null|                null|                null|  null|    null|200.03355348105492|   null|
| stddev|     null|1670.5260288494235|                null|                null|                null|  null|    null| 2.602415337476263|   null|
|    min|        -|                 2|01/Mar/2018:23:07...|                   /|::ffff:101.141.22.75|   GET|HTTP/1.1|             
...
```

|   | summary | client_id | content_size | date_time | endpoint | ip_address | method | protocol | response_code | user_id |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 336895 | 336895.0 | 336895 | 336895 | 336895 | 336895 | 336895 | 336895.0 | 336895 |
| 1 | mean |  | 38.14462072752638 |  |  |  |  |  | 200.03355348105487 |  |
| 2 | stddev |  | 1670.5260288494237 |  |  |  |  |  | 2.602415337476263 |  |
| 3 | min | - | 2.0 | 01/Mar/2018:23:07:31 +0000 | / | ::ffff:101.141.22.75 | GET | HTTP/1.1 | 200.0 | - |
| 4 | max | - | 685173.0 | 02/Mar/2018:21:34:45 +0000 | /v1/public/data/read/jarooony/Azubibuero/tempe... | ::ffff:96.231.156.111 | POST | HTTP/1.1 | 404.0 | - |
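For numeric columns, `describe()` reports the count, the mean, the sample standard deviation (with `n - 1` in the denominator), the minimum and the maximum; for string columns the mean and standard deviation are `null`, and `min`/`max` are compared lexicographically. The plain-Python sketch below reproduces the numeric statistics for one column; `describe_column` is a hypothetical helper and the content sizes are made-up values.

```python
import statistics


def describe_column(values):
    """Hypothetical helper: numeric statistics of one column, like the rows of describe()."""
    return {
        'count': len(values),
        'mean': statistics.mean(values),
        'stddev': statistics.stdev(values),  # sample standard deviation (n - 1)
        'min': min(values),
        'max': max(values),
    }


# Hypothetical content sizes, not taken from the weblog
sizes = [4, 4, 4, 2, 10]
print(describe_column(sizes))
```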


## Cleaning up Weblog data

### Removing empty columns

Some of the fields in the Apache log are optional; this is the case for `client_id`
and `user_id`. Check whether these 2 columns are actually reported in the logs, and
remove them if they are not.

> Hint: you might want to check the unique values of these columns.


```python
# check distinct user and client ids
ids = df.select('user_id', 'client_id').distinct().collect()

print(ids)
```

```
[Row(user_id='-', client_id='-')]
```


As you can see, both columns have just one unique value: `-`. Let's now remove
the columns from the dataframe:


```python
# Dropping user_id and client_id as they have no actual values, just the '-'.
ndf = df.drop('user_id', 'client_id')

ndf.show()
```

```
+------------+--------------------+--------------------+--------------------+------+--------+-------------+
|content_size|           date_time|            endpoint|          ip_address|method|protocol|response_code|
+------------+--------------------+--------------------+--------------------+------+--------+-------------+
|           4|01/Mar/2018:23:07...|/v1/data/write/Ra...|  ::ffff:54.243.49.1|  POST|HTTP/1.1|          200|
|           4|01/Mar/2018:23:07...|/v1/data/write/Ra...|  ::ffff:54.243.49.1|  POST|HTTP/1.1|          200|
|           4|01/Mar/2018:23:07...|/v1/data/publish/...|::ffff:54.221.205.80|  POST|HTTP/1.1|          200|
|           4|01/Mar/2018:23:07...|/v1/data/write/Aa...| ::ffff:84.85.189.28|  POST|HTTP/1.1|          200|
|           4|01/Mar/2018:23:07...|/v1/data/publish/...|::ffff:54.221.205.80|  POST|HTTP/1.1|          200|
|           4|01/Mar/2018:23:07...|/v1/data/write/Kl...| ::ffff:86.103.139.9|  POST|HTTP/1.1|          200|
...
```
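The same check generalizes to every column: a column holding a single distinct value carries no information. In PySpark, the number of distinct values per column could be computed with `df.agg(*[F.countDistinct(c).alias(c) for c in df.columns])` (with `from pyspark.sql import functions as F`). The sketch below shows the logic in plain Python on a few hypothetical records; `constant_columns` and `drop_columns` are hypothetical helpers.

```python
def constant_columns(records):
    """Hypothetical helper: names of the columns holding a single distinct value."""
    distinct = {}
    for record in records:
        for column, value in record.items():
            distinct.setdefault(column, set()).add(value)
    return sorted(column for column, values in distinct.items() if len(values) == 1)


def drop_columns(records, columns):
    """Hypothetical plain-Python counterpart of DataFrame.drop(*columns)."""
    return [{k: v for k, v in record.items() if k not in columns} for record in records]


# Hypothetical records shaped like the weblog rows
records = [
    {'ip_address': '::ffff:54.243.49.1', 'client_id': '-', 'user_id': '-', 'method': 'POST'},
    {'ip_address': '::ffff:54.221.205.80', 'client_id': '-', 'user_id': '-', 'method': 'POST'},
    {'ip_address': '::ffff:84.85.189.28', 'client_id': '-', 'user_id': '-', 'method': 'GET'},
]
empty = constant_columns(records)
print(empty)                            # ['client_id', 'user_id']
print(drop_columns(records, empty)[0])  # {'ip_address': '::ffff:54.243.49.1', 'method': 'POST'}
```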

### Casting from string to time

As the dataframe schema shows, the `date_time` column is a string formatted
as `dd/MMM/yyyy:HH:mm:ss Z`. When manipulating dates or date-time data, it is
better to store them in the corresponding type.

Spark provides utility functions to work with dates and times. Let's create two columns:
* `ts`: the `date_time` column converted into a timestamp (`TimestampType`)
* `day`: the `date_time` column converted into the `dd-MM-yyyy` format.

We will use the following functions (assuming Spark version >= v2.2):
* `to_timestamp`: converts a string into a `TimestampType` value
  using an optionally provided format. This function was added in version 2.2.
* `date_format`: converts a date or timestamp into a string in the specified format.

```python
from pyspark.sql import functions as F

ndf = ndf.withColumn('ts', F.to_timestamp(ndf.date_time, 'dd/MMM/yyyy:HH:mm:ss Z'))
ndf = ndf.withColumn('day', F.date_format(ndf.ts, 'dd-MM-yyyy'))

ndf.show()
```

```
+------------+--------------------+--------------------+--------------------+------+--------+-------------+-------------------+----------+
|content_size|           date_time|            endpoint|          ip_address|method|protocol|response_code|                 ts|       day|
+------------+--------------------+--------------------+--------------------+------+--------+-------------+-------------------+----------+
|           4|01/Mar/2018:23:07...|/v1/data/write/Ra...|  ::ffff:54.243.49.1|  POST|HTTP/1.1|          200|2018-03-02 00:07:31|02-03-2018|
|           4|01/Mar/2018:23:07...|/v1/data/write/Ra...|  ::ffff:54.243.49.1|  POST|HTTP/1.1|          200|2018-03-02 00:07:31|02-03-2018|
|           4|01/Mar/2018:23:07...|/v1/data/publish/...|::ffff:54.221.205.80|  POST|HTTP/1.1|          200|2018-03-02 00:07:31|02-03-2018|
|           4|01/Mar/2018:23:07...|/v1/data/write/Aa...| ::ffff:84.85.189.28|  POST|HTTP/1.1|          200|2018-03-02 00:07:31|02-03-2018|
...
```
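Notice in the output that `01/Mar/2018:23:07:31 +0000` is displayed as `2018-03-02 00:07:31`: Spark renders timestamps in the session time zone (`spark.sql.session.timeZone`, which defaults to the JVM time zone), apparently UTC+1 on the machine used here, so the `day` column reads `02-03-2018`. The plain-Python sketch below reproduces the conversion with `datetime`; the helpers `to_timestamp` and `day` are hypothetical, and UTC+1 is an assumption read off the output above.

```python
from datetime import datetime, timedelta, timezone

# strptime counterpart of the Spark pattern 'dd/MMM/yyyy:HH:mm:ss Z'
# (%b expects English month abbreviations, as in the default C locale)
APACHE_FORMAT = '%d/%b/%Y:%H:%M:%S %z'


def to_timestamp(date_time):
    """Hypothetical helper: parse an Apache date_time into a time-zone-aware datetime."""
    return datetime.strptime(date_time, APACHE_FORMAT)


def day(ts, tz):
    """Hypothetical helper: format ts as dd-MM-yyyy after converting it to time zone tz."""
    return ts.astimezone(tz).strftime('%d-%m-%Y')


ts = to_timestamp('01/Mar/2018:23:07:31 +0000')
print(day(ts, timezone.utc))                  # 01-03-2018
print(day(ts, timezone(timedelta(hours=1))))  # 02-03-2018 (assumed UTC+1 session time zone)
```

If you prefer days computed in UTC, you can set the session time zone before creating the `day` column, for example with `spark.conf.set('spark.sql.session.timeZone', 'UTC')`.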

## Analysing Weblog data

Let's now analyse the weblog data to produce some reports. We will try to identify:
* Top IP addresses: IP addresses with the largest number of queries. Remember that a
  web query corresponds to one weblog entry. For every IP address, report the
  query count and the total content size.
* Top endpoints: endpoints with the largest number of requests. For every endpoint,
  report the query count and the total content size.
* Top methods: number of queries and total content size per method.
* Requests per day: number of requests per day.
* Distribution of response codes.

### Top IP addresses

To get the top IP addresses with respect to query count and content size,
we need to `groupBy` IP address, then aggregate each group by row `count` and by
the `sum` of the content size. This can be done as follows:



```python
from pyspark.sql import functions as F

top_ips = ndf.groupBy('ip_address').agg(F.count(ndf.method).alias('count'), F.sum(ndf.content_size).alias('size'))

top_ips.sort(F.desc('count')).limit(10).show()
```

```
+--------------------+-----+-------+
|          ip_address|count|   size|
+--------------------+-----+-------+
|::ffff:54.221.205.80|93234| 372936|
|::ffff:171.4.246.231|52924| 212611|
|::ffff:114.182.22...|42035|4161435|
|  ::ffff:54.243.49.1|29892| 119568|
|::ffff:86.103.139.38|26022| 104088|
|::ffff:190.192.57...|17026|  68104|
| ::ffff:84.85.189.28| 4608|  18432|
|::ffff:146.198.22...| 4444|  17776|
|   ::ffff:2.30.7.202| 4303|  17212|
| ::ffff:86.103.139.9| 3861|  15444|
+--------------------+-----+-------+
```



In this report we used the following:
* [pyspark.sql.DataFrame.groupBy](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy)
  to group the weblog data by the `ip_address` column.
* [pyspark.sql.GroupedData.agg](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.GroupedData.agg)
  to aggregate the grouped data with the specified aggregation functions.
* [pyspark.sql.functions.count](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.count)
  to count the number of items in each group.
* [pyspark.sql.functions.sum](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.sum)
  to sum all values in each group, in our case the content size.
* [pyspark.sql.Column.alias](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.Column.alias)
  to rename a column.
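To make the semantics of `groupBy` followed by `agg` concrete, here is a plain-Python counterpart of the top IP addresses report, sketched over a few hypothetical records (`group_count_sum` and `top` are hypothetical helpers). One difference: `F.count(column)` only counts non-null values, while the sketch counts every record, which is equivalent here because `method` is never null. The same pattern underlies the endpoint, method, day and response code reports below.

```python
from collections import defaultdict


def group_count_sum(records, key, value):
    """Hypothetical plain-Python counterpart of
    df.groupBy(key).agg(F.count(...).alias('count'), F.sum(value).alias('size'))."""
    groups = defaultdict(lambda: {'count': 0, 'size': 0})
    for record in records:
        group = groups[record[key]]
        group['count'] += 1
        group['size'] += record[value]
    return dict(groups)


def top(groups, n, by='count'):
    """Hypothetical plain-Python counterpart of .sort(F.desc(by)).limit(n)."""
    return sorted(groups.items(), key=lambda item: item[1][by], reverse=True)[:n]


# Hypothetical records with the two columns used by the report
records = [
    {'ip_address': '::ffff:54.221.205.80', 'content_size': 4},
    {'ip_address': '::ffff:54.243.49.1', 'content_size': 4},
    {'ip_address': '::ffff:54.221.205.80', 'content_size': 4},
    {'ip_address': '::ffff:84.85.189.28', 'content_size': 10},
]
groups = group_count_sum(records, 'ip_address', 'content_size')
print(top(groups, 1))          # [('::ffff:54.221.205.80', {'count': 2, 'size': 8})]
print(top(groups, 1, 'size'))  # [('::ffff:84.85.189.28', {'count': 1, 'size': 10})]
```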

### Top Endpoints

Similar to top IP addresses, we can get top endpoints as follows:


```python
from pyspark.sql import functions as F

top_endpoints = ndf.groupBy('endpoint').agg(F.count(ndf.method).alias('count'))

top_endpoints.sort(F.desc('count')).limit(10).show()
```

```
+--------------------+-----+
|            endpoint|count|
+--------------------+-----+
|/v1/data/publish/...|79870|
|/v1/data/read/app...|42035|
|/v1/data/write/Kl...|13646|
|/v1/data/write/Kl...|13639|
|/v1/data/write/Ra...|13501|
|/v1/data/write/IS...| 7984|
|/v1/data/write/Ra...| 7077|
|/v1/data/write/Ra...| 7076|
|/v1/data/write/Ra...| 7071|
|/v1/data/write/Ra...| 4774|
+--------------------+-----+
```



### Top Methods

Top methods can be computed as follows:

```python
from pyspark.sql import functions as F

top_methods = ndf.groupBy('method').agg(F.count(ndf.method).alias('count'), F.sum(ndf.content_size).alias('size'))

top_methods.sort(F.desc('count')).limit(10).show()
```

```
+------+------+-------+
|method| count|   size|
+------+------+-------+
|  POST|293601|6984063|
|   GET| 43289|5866513|
|  HEAD|     5|    156|
+------+------+-------+
```



### Queries per day

The number of queries per day can be calculated by grouping rows by the `day` column and counting them:

```python
from pyspark.sql import functions as F

daily_requests = ndf.groupBy('day').agg(F.count(ndf.method).alias('count'))

daily_requests.show()
```

```
+----------+------+
|       day| count|
+----------+------+
|02-03-2018|336895|
+----------+------+
```



### Response code distribution

When analyzing Web server logs, it is always important to analyze response codes.
The number or ratio of `4xx` and `5xx` response codes can be indicators of problems
affecting the services. We can calculate the distribution of response codes as
follows:

```python
from pyspark.sql import functions as F

codes = ndf.groupBy('response_code').agg(F.count(ndf.method).alias('count'))

codes.show()
```

```
+-------------+------+
|response_code| count|
+-------------+------+
|          404|    26|
|          200|336839|
|          400|    30|
+-------------+------+
```
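Using the counts above, the share of error responses can be computed directly. The sketch below is plain Python with the three counts copied from the output (`error_ratios` is a hypothetical helper). In Spark, a similar classification could group on `(F.col('response_code') / 100).cast('int')` to get the response class (2, 4, 5, ...).

```python
# Counts copied from the response code distribution above
codes = {200: 336839, 400: 30, 404: 26}


def error_ratios(codes):
    """Hypothetical helper: share of client (4xx) and server (5xx) errors among all requests."""
    total = sum(codes.values())
    client_errors = sum(n for code, n in codes.items() if 400 <= code < 500)
    server_errors = sum(n for code, n in codes.items() if 500 <= code < 600)
    return {'total': total, '4xx': client_errors / total, '5xx': server_errors / total}


ratios = error_ratios(codes)
print(ratios['total'])                 # 336895
print('{:.3%}'.format(ratios['4xx']))  # 0.017%
```

With these counts, about 0.017% of the requests are `4xx` errors and none are `5xx` errors.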



## Writing Data to File System

Spark provides the `DataFrameWriter` interface to write data to storage systems
such as HDFS, the local file system, key-value stores, etc.

The `DataFrame.write` property returns a `DataFrameWriter` for the dataframe.
Out of the box, Spark can write to the local file system and to HDFS.

> Writing to the local file system: use this only on a single-instance cluster or
for development. On a multi-node Spark cluster, every executor writes its part files
to its own local disk, which scatters the output across machines.
You should rather use HDFS or another distributed and durable store.


### Writing in CSV format

You can write to CSV format as follows:

```python
ndf.write \
  .format('csv') \
  .save('PATH_TO_FOLDER/myweblog.csv')
```

This is equivalent to:

```python
ndf.write \
  .csv('PATH_TO_FOLDER/myweblog.csv')
```

Try to run the write operation again! You should get a `path ... already exists` error.
By default, the Spark writer raises an error when the destination already exists.
You can override this behavior by setting the write mode. You can choose from:
* `append`: appends the content of the dataframe to the existing data.
* `overwrite`: replaces the existing data with the content of the dataframe.
* `ignore`: does nothing if the destination already exists.
* `error` (or `errorifexists`): the default mode. It throws an error if the destination already exists.


To overwrite the destination with new data:

```python
ndf.write \
  .mode("overwrite") \
  .csv('PATH_TO_FOLDER/myweblog.csv')
```


The overwrite mode is idempotent: writing the same dataframe multiple times leads
to the same data on the file system. This is not the case with the `append` mode.
Change your code to write in append mode, run it multiple times and see how the
data changes on disk.
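The save modes can be illustrated with a toy model in plain Python. This is not Spark's implementation: `save` and `part_files` are hypothetical helpers that write one uniquely named part file per call into a local directory, roughly like Spark's `part-*` files, and apply the four modes described above.

```python
import os
import tempfile
import uuid

MODES = ('error', 'errorifexists', 'append', 'overwrite', 'ignore')


def save(lines, path, mode='error'):
    """Hypothetical toy model of the DataFrameWriter save modes on a local directory."""
    if mode not in MODES:
        raise ValueError('Unknown save mode: %s' % mode)
    if os.path.exists(path):
        if mode in ('error', 'errorifexists'):
            raise FileExistsError('path %s already exists.' % path)
        if mode == 'ignore':
            return
        if mode == 'overwrite':
            for name in os.listdir(path):
                os.remove(os.path.join(path, name))
        # 'append' keeps the existing part files
    else:
        os.makedirs(path)
    # Every write adds a new, uniquely named part file
    part = os.path.join(path, 'part-%s.csv' % uuid.uuid4().hex)
    with open(part, 'w') as f:
        f.write('\n'.join(lines) + '\n')


def part_files(path):
    """Hypothetical helper: list the part files of an output directory."""
    return sorted(os.listdir(path))


out = os.path.join(tempfile.mkdtemp(), 'myweblog.csv')
save(['a,1', 'b,2'], out)                    # first write creates the directory
save(['a,1', 'b,2'], out, mode='append')
print(len(part_files(out)))                  # 2: append duplicated the data
save(['a,1', 'b,2'], out, mode='overwrite')
print(len(part_files(out)))                  # 1: overwrite is idempotent
save(['c,3'], out, mode='ignore')
print(len(part_files(out)))                  # 1: ignore left the data untouched
```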

> Selecting the right write mode depends on the use case. Choose it carefully:
`overwrite` deletes the existing data, and `append` can duplicate it.

Spark writes `myweblog.csv` as a directory containing one or more `part-*` CSV files.
Open any of them in a text editor and notice that the data was saved without the
column names. To include a header line containing the column names, set the
`header` option:

```python
ndf.write \
  .mode("overwrite") \
  .csv('PATH_TO_FOLDER/myweblog.csv', header=True)
```

By default, writing to csv will not compress data. You can add compression by
setting the `compression` option:

```python
ndf.write \
  .mode("overwrite") \
  .option("compression", "gzip") \
  .csv('PATH_TO_FOLDER/myweblog.csv', header=True)
```


### Writing in Parquet format

Apart from the CSV-specific serialization options, writing to Parquet (or any other
format) is almost identical to writing to CSV. You can write your dataframe in
Parquet format as follows:

```python
ndf.write \
  .format('parquet') \
  .save('PATH_TO_FOLDER/myweblog.parquet')
```

This is equivalent to:

```python
ndf.write \
  .parquet('PATH_TO_FOLDER/myweblog.parquet')
```

Check the local file system where the data was written and look at the size of the
Parquet data. Compare it to the on-disk size of the CSV output to see how much space
Parquet saves thanks to its columnar layout, encodings and compression.

Now use `parquet-tools` to inspect the files and answer the following questions:
* What is the size of every column on disk?
* What type of statistics does Parquet store in the column metadata?
* Does Parquet store the data schema? If so, what is the schema? Compare it to
  the schema of the Spark dataframe.
* Use the `compression` option to test `none`, `snappy` and `gzip`. What can
  you say about the compression of Parquet data?


### Writing in ORC format

Writing to ORC format is similar to writing to Parquet format. The only thing
that changes is the format:

```python
ndf.write \
  .format('orc') \
  .save('PATH_TO_FOLDER/myweblog.orc')
```

This is equivalent to:

```python
ndf.write \
  .orc('PATH_TO_FOLDER/myweblog.orc')
```

Now use `orc-tools` to inspect the files and answer the following questions:
* What is the size of every column on disk?
* What type of statistics does ORC store in the column metadata?
* Does ORC store the data schema? If so, what is the schema? Compare it to
  the schema of the Spark dataframe.
* Use the `compression` option to test `none`, `snappy` and `zlib` (ORC has no
  `gzip` codec; `zlib` is its deflate-based codec). What can you say about the
  compression of ORC data? How does it compare to Parquet?


### Writing in Avro format

Avro is a major binary row-based format in the Hadoop ecosystem. Avro support was
originally provided by Databricks as a third-party package; since Spark 2.4, it is
part of Apache Spark as the `spark-avro` module. However, this module is external:
it is not loaded by default, so you need to add it explicitly.

Writing to Avro can be done as follows:


```python
ndf.write \
  .format('avro') \
  .save('PATH_TO_FOLDER/myweblog.avro')
```


As you will notice, writing to avro will fail with the following message:

> AnalysisException: 'Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'

In order to overcome this error, you need to explicitly load the Spark Avro package as described in this [guide](https://spark.apache.org/docs/2.4.1/sql-data-sources-avro.html).

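Stop your `pyspark` Jupyter Notebook session and start it again with the package matching the Scala version of your Spark build (the pre-built Spark 2.4.1 binaries use Scala 2.11):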

```
## For Scala v2.12
pyspark --packages org.apache.spark:spark-avro_2.12:2.4.1

## For Scala v2.11
pyspark --packages org.apache.spark:spark-avro_2.11:2.4.1
```

## Organizing Data on File System

Organizing stored data is a key point to consider when creating a data lake.
How data is organized on the file system affects the performance of both read
and write operations. It also has a direct impact on the volume of stored data.


### Partition by Columns

Partitioning data by columns physically divides the data on the file system
according to the column values. The advantage of partitioning by columns is much
faster data access for queries that filter on the partition columns. The drawback
is the risk of creating too many (small) files when the columns have a large number
of unique values.

Therefore, partitioning by columns should be based on the use case requirements.

Let's see how partitioning by column works:

We will first try to partition our data by `method` column. The objective is to
have data files per `method`:


```python
ndf.write \
  .partitionBy("method") \
  .mode("overwrite") \
  .parquet("PATH_TO_FOLDER/myweblog_partitioned.parquet")
```


Open a terminal and check the data structure in your destination output. You
should have something similar to:


```
%%bash
ls -all -R PATH_TO_FOLDER/myweblog_partitioned.parquet/
```

```
PATH_TO_FOLDER/myweblog_partitioned.parquet/:
total 24
drwxr-xr-x 5 ubuntu ubuntu 4096 avril 28 01:18 .
drwxr-xr-x 8 ubuntu ubuntu 4096 avril 28 01:17 ..
drwxr-xr-x 2 ubuntu ubuntu 4096 avril 28 01:18 method=GET
drwxr-xr-x 2 ubuntu ubuntu 4096 avril 28 01:17 method=HEAD
drwxr-xr-x 2 ubuntu ubuntu 4096 avril 28 01:18 method=POST
-rw-r--r-- 1 ubuntu ubuntu    0 avril 28 01:18 _SUCCESS
-rw-r--r-- 1 ubuntu ubuntu    8 avril 28 01:18 ._SUCCESS.crc

PATH_TO_FOLDER/myweblog_partitioned.parquet/method=GET:
total 356
drwxr-xr-x 2 ubuntu ubuntu   4096 avril 28 01:18 .
drwxr-xr-x 5 ubuntu ubuntu   4096 avril 28 01:18 ..
-rw-r--r-- 1 ubuntu ubuntu  46960 avril 28 01:17 part-00000-ba45b85e-71d9-45cc-8cd6-20f06a9d6ad0.c000.snappy.parquet
-rw-r--r-- 1 ubuntu ubuntu    376 avril 28 01:17 .part-00000-ba45b85e-71d9-45cc-8cd6-20f06a9d6ad0.c000.snappy.parquet.crc
-rw-r--r-- 1 ubuntu ubuntu 296923 avril 28 01:18 part-00001-ba45b85e-71d9-45cc-8cd6-20f06a9d6ad0.c000.snappy.parquet
...
```

See how the file system layout perfectly reflects the content of the `method`
column. Now, any query with a condition on the `method` column will only read the
corresponding partitions.
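The directory layout above follows the `column=value` naming convention (often called Hive-style partitioning), and the partition column values are stored in the directory names rather than in the data files. The plain-Python sketch below builds such a layout from a few hypothetical records and keeps only the matching directory for a filter, which is the idea behind partition pruning; `partition_layout` and `prune` are hypothetical helpers, not Spark APIs.

```python
from collections import defaultdict


def partition_layout(records, partition_by):
    """Hypothetical helper: group records into 'column=value' directories.
    The partition column values live in the directory names, not in the stored rows."""
    layout = defaultdict(list)
    for record in records:
        path = '/'.join('%s=%s' % (column, record[column]) for column in partition_by)
        row = {k: v for k, v in record.items() if k not in partition_by}
        layout[path].append(row)
    return dict(layout)


def prune(layout, column, value):
    """Hypothetical helper: keep only the directories matching column=value."""
    wanted = '%s=%s' % (column, value)
    return {path: rows for path, rows in layout.items() if wanted in path.split('/')}


# Hypothetical records
records = [
    {'method': 'POST', 'response_code': 200, 'endpoint': '/v1/data/publish/ISS/position'},
    {'method': 'GET', 'response_code': 404, 'endpoint': '/'},
    {'method': 'POST', 'response_code': 200, 'endpoint': '/v1/data/write/Klima/Temperatur'},
    {'method': 'HEAD', 'response_code': 200, 'endpoint': '/'},
]
layout = partition_layout(records, ['method'])
print(sorted(layout))                  # ['method=GET', 'method=HEAD', 'method=POST']
print(prune(layout, 'method', 'GET'))  # {'method=GET': [{'response_code': 404, 'endpoint': '/'}]}
```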

Now try to partition by `ip_address`, check the destination folder and answer
the following questions:
* Why is partitioning by IP address a bad idea in our example?
* Which columns in our data set do you consider good choices to partition data on?

It is also possible to partition by multiple columns. How would you partition
data based on `method` and `response_code`?

```python
## partition by method and response_code
## Your code goes here
```

Check the output destination on your file system.
* Does the column order in `partitionBy` have any impact?
* In the case of the `method` and `response_code` columns, what would be a good order?
* Assume you have the following columns: `year`, `month`, `day`. How would you
  partition based on these columns? Why?

### Writing Sorted data

Saving sorted data to the file system can have a double benefit:
* it reduces the required disk space when using columnar formats
* it improves the read performance of queries with conditions on the sorted columns
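Sorting helps columnar formats because equal values end up next to each other, which makes encodings such as run-length encoding (storing each value once together with the length of its run) much more effective. The sketch below, in plain Python with a hypothetical `method` column and a hypothetical `run_length_encode` helper, counts the runs before and after sorting; it is a simplification, not the actual Parquet or ORC encoder.

```python
from itertools import groupby


def run_length_encode(column):
    """Hypothetical helper: encode a column as (value, run length) pairs."""
    return [(value, len(list(run))) for value, run in groupby(column)]


# Hypothetical 'method' column in arrival order
methods = ['POST', 'GET', 'POST', 'POST', 'GET', 'POST', 'HEAD', 'POST']

print(len(run_length_encode(methods)))     # 7 runs
print(run_length_encode(sorted(methods)))  # [('GET', 2), ('HEAD', 1), ('POST', 5)]
```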


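```python
from pyspark.sql import functions as F

# repartition(1) shuffles all rows into a single partition, i.e. a single output file.
# The rows are sorted after that shuffle with sortWithinPartitions, since the
# shuffle performed by repartition does not preserve the order of an earlier sort.
ndf \
  .repartition(1) \
  .sortWithinPartitions(F.desc('date_time')) \
  .write \
  .mode("overwrite") \
  .parquet("PATH_TO_FOLDER/myweblog_sorted.parquet")
```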


Now compare the on-disk size of the sorted and unsorted Parquet data.
* What is the impact of sorting on the `date_time` column on the file size?
  Why, in your opinion?

Sorting on *poorly selected* columns might have a negative impact. Now sort your
data by the `endpoint` column, then write it to Parquet.
* What is the impact of sorting on the `endpoint` column on the file size?
  Why, in your opinion?

You can also sort data on multiple columns. Consider the `endpoint`, `date_time` and
`ip_address` columns. Find the sorting strategy that minimizes disk space
requirements.
* Does the sort order have an impact?
* What sorting order minimizes disk space? Why?

> Hint: you can use `parquet-tools` to debug the output files.
