# Structuring the log file

<ins>Aim</ins> - Transform the raw log file into a structured table suitable for analysis.

<ins>Input</ins> - A file containing Apache Web server logs.

<ins>Output</ins> - After the notebook has run, the directory `access.csv.d/` contains CSV files with a structured version of the logs. The columns of the CSV table are the fields `remote_host`, `timestamp_utc`, `request_first_line`, `request_header_referer`, `request_header_user_agent`, `request_method`, `response_bytes_clf` and `status`.

<ins>Method</ins> - Parse the log file `access.log`, identify the fields (remote host, received time, ...) of each log entry, construct a dataframe containing the data and write it to disk.

<ins>Tools</ins> - SparkSQL from Spark 2.4.3, apache_log_parser 1.7.0

**Table of contents**
* [Initialisation](#init)
* [Parse the log file](#parse)
* [Convert received time](#convert)
* [Write to disk](#write)

## Initialisation <a class="anchor" id="init"></a>
Import libraries and start a Spark session.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import to_timestamp
import apache_log_parser

In [2]:
spark = SparkSession.builder.config('spark.driver.memory','8G').getOrCreate()
spark

## Parse the log file <a class="anchor" id="parse"></a>
Construct a list where each element is a log entry.

In [3]:
%%time
log_file = './access.log'
with open(log_file) as OPEN_LOG_FILE:
    entries = OPEN_LOG_FILE.readlines()

CPU times: user 1.88 s, sys: 1.04 s, total: 2.92 s
Wall time: 2.96 s


Construct a function using the Apache log parser to convert a log entry to a dictionary.

In [4]:
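# Apache "combined" log format string (a format specification, not a regular expression).
log_format = '%h %l %u %t "%r" %>s %b "%{Referer}i" "%{User-agent}i"'
log_parser = apache_log_parser.make_parser(log_format)

def line2dict(entry, fields):
    """Parse one log line and keep only the requested fields that the parser produced."""
    entryd = log_parser(entry)
    return {field: entryd[field]
            for field in fields
            if field in entryd}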
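
To make the field extraction more concrete, here is a minimal sketch that uses only the standard library. It is not how `apache_log_parser` works internally: the regular expression, the function name `parse_combined_line` and the sample line are all assumptions made for illustration, with group names chosen to mirror the field names used in this notebook. It also assumes that quoted fields contain no escaped double quotes.

```python
import re

# Hypothetical regular expression mirroring the format string passed to make_parser.
COMBINED_LOG_PATTERN = re.compile(
    r'(?P<remote_host>\S+) '                 # %h
    r'(?P<remote_logname>\S+) '              # %l
    r'(?P<remote_user>\S+) '                 # %u
    r'\[(?P<time_received>[^\]]+)\] '        # %t
    r'"(?P<request_first_line>[^"]*)" '      # "%r"
    r'(?P<status>\d{3}) '                    # %>s
    r'(?P<response_bytes_clf>\S+) '          # %b
    r'"(?P<request_header_referer>[^"]*)" '  # "%{Referer}i"
    r'"(?P<request_header_user_agent>[^"]*)"'  # "%{User-agent}i"
)

def parse_combined_line(line):
    """Return a dict of named fields, or None if the line does not match."""
    match = COMBINED_LOG_PATTERN.match(line)
    return match.groupdict() if match else None

# Made-up sample entry, not taken from access.log.
sample = ('127.0.0.1 - - [22/Jan/2019:03:56:14 +0330] '
          '"GET /index.html HTTP/1.1" 200 5667 '
          '"-" "Mozilla/5.0"')
fields = parse_combined_line(sample)
print(fields['remote_host'], fields['status'], fields['request_first_line'])
```

Returning `None` for lines that do not match lets the caller decide whether to skip or report malformed entries.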

Construct a Spark dataframe whose columns are a subset of the log entry fields.

In [5]:
%%time

some_entry_fields = ['remote_host',
                     'request_first_line',
                     'request_header_referer',
                     'request_header_user_agent',
                     'request_method',
                     'response_bytes_clf',
                     'status']
selected_entry_fields = some_entry_fields + ['time_received_utc_isoformat']

entries_df = spark.createDataFrame(Row(**line2dict(entry, selected_entry_fields)) for entry in entries[1:])

CPU times: user 14min, sys: 11.8 s, total: 14min 12s
Wall time: 14min 22s


## Convert received time <a class="anchor" id="convert"></a>
Convert the UTC timestamp string to the number of seconds since the Unix epoch.<br>
Remove entries whose timestamp does not match the expected format.

In [6]:
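# The '+00:00' offset is matched as a literal, so Spark interprets the time in the
# session time zone; set it to UTC so that the epoch seconds are correct.
spark.conf.set('spark.sql.session.timeZone', 'UTC')
time_format = "yyyy-MM-dd'T'HH:mm:ss'+00:00'"
entries_df = entries_df.withColumn('timestamp_utc',
                                   to_timestamp(entries_df['time_received_utc_isoformat'],
                                                format=time_format).cast('long'))
# to_timestamp returns null for strings that do not match the format.
entries_df = entries_df.where(entries_df['timestamp_utc'].isNotNull())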
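
The conversion can be illustrated in plain Python on a single value. This is only a sketch: the helper `iso_utc_to_epoch` and the `strptime` pattern are assumptions chosen to mimic the Spark pattern `yyyy-MM-dd'T'HH:mm:ss'+00:00'`, and the input strings are made up.

```python
from datetime import datetime, timezone

# Assumed Python counterpart of the Spark pattern; '+00:00' is matched literally.
ISO_UTC_FORMAT = '%Y-%m-%dT%H:%M:%S+00:00'

def iso_utc_to_epoch(text):
    """Return seconds since the Unix epoch, or None if text does not match the format."""
    try:
        parsed = datetime.strptime(text, ISO_UTC_FORMAT)
    except (TypeError, ValueError):
        return None
    # The parsed value is naive, so attach UTC explicitly before converting.
    return int(parsed.replace(tzinfo=timezone.utc).timestamp())

print(iso_utc_to_epoch('1970-01-02T00:00:00+00:00'))  # 86400
print(iso_utc_to_epoch('2019-01-22T03:56:14+03:30'))  # None
```

The second call returns `None` because its offset is not `+00:00`, which corresponds to the rows that the notebook filters out.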

## Write to disk <a class="anchor" id="write"></a>
Write dataframe to CSV files.

In [7]:
%%time
output_directory = './access.csv.d'
csv_entry_fields = some_entry_fields + ['timestamp_utc']
entries_df.select(csv_entry_fields).write.option('header','true').csv(output_directory, mode='overwrite')

CPU times: user 10.5 ms, sys: 5.46 ms, total: 16 ms
Wall time: 38 s
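
Spark writes the result as a directory of part files rather than a single CSV file. As a rough sketch of how the output could be read back without Spark, the hypothetical helper `read_csv_directory` below assumes the usual `part-*.csv` file naming and that every part file starts with a header row, as requested by `option('header','true')`.

```python
import csv
import glob
import os

def read_csv_directory(directory):
    """Yield one dict per row from every part file in a Spark CSV output directory."""
    # Only part-*.csv files are read; marker files such as _SUCCESS are ignored.
    for path in sorted(glob.glob(os.path.join(directory, 'part-*.csv'))):
        with open(path, newline='') as part_file:
            # Each part file carries its own header row, which DictReader consumes.
            yield from csv.DictReader(part_file)
```

For example, `rows = list(read_csv_directory('./access.csv.d'))` would load all rows into memory, with every value as a string.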


Close the Spark session.

In [8]:
spark.stop()

*(end of the Structuration notebook)*