# Working with UDFs with Apache Spark (PySpark)

In this notebook we walk through using UDFs (user-defined functions) with both Spark SQL and the Spark DataFrame (DF) API. We will learn how to write and register custom UDFs, how to create UDFs from lambda functions without defining named functions, how to write custom UDFs that take parameters other than column names, and how to borrow from the PySpark source code to call Scala functions directly.

`
@author: Anindya Saha  
@email: mail.anindya@gmail.com
`

## Table of contents

A brief synopsis of each UDF use case and the functionality it demonstrates.

| Section                                                                             |        Demonstrates |
|:------------------------------------------------------------------------------------|:--------------------|
|[1. Understanding the Data Set](#1.-Understanding-the-Data-Set:)||
|[2. Creating the Spark Session](#2.-Creating-the-Spark-Session:)||
|[3. Load the Data From Files Into DataFrames](#3.-Load-the-Data-From-Files-Into-DataFrames:)||
|[3.1. Fix the Headers and Provide a Schema](#3.1.-Fix-the-Headers-and-Provide-a-Schema:)|Providing Custom Schema to Text File|
|[4.1. Extract Columns using In-built Regex Functions](#4.1.-Extract-Columns-using-In-built-Regex-Functions:)|Using REGEXP_EXTRACT from pyspark.sql.functions on DF API|
|[4.2 Extract Columns using Custom Regex Functions](#4.2-Extract-Columns-using-Custom-Regex-Functions:)||
|[4.2.1. Specific Extraction Functions](#4.2.1.-Specific-Extraction-Functions:)|Use Regex to Extract Specific Field|
|[4.2.2. Generic Extraction Functions](#4.2.2.-Generic-Extraction-Functions:)|Custom Regex functions taking Extra Parameter|
|[4.2.3. Register the function as an UDF to be used with DF API](#4.2.3.-Register-the-function-as-an-UDF-to-be-used-with-DF-API:)|Using UDF with DF API|
|[4.2.4. Curry up function that takes more than One Parameter](#4.2.4.-Curry-up-function-that-takes-more-than-One-Parameter:)|Workaround for Custom UDFs in the DF API that take an extra Parameter besides the Column name|
|[4.2.5. Register UDF on the Fly](#4.2.5.-Register-UDF-on-the-Fly:)|Create Anonymous UDFs for DF API|
|[4.2.6. Hack PySpark Codes to call Scala functions directly](#4.2.6.-Hack-PySpark-Codes-to-call-Scala-functions-directly:)|Drawing inspiration from the PySpark source code|
|[4.3. Putting together everything in one place](#4.3.-Putting-together-everything-in-one-place:)|Concluding the DF API use case|
|[5.1. Extract Columns using In-built Regex Functions](#5.1.-Extract-Columns-using-In-built-Regex-Functions:)|Using the built-in REGEXP_EXTRACT function in Spark SQL|
|[5.2. Extract Columns using Custom Regex Functions](#5.2.-Extract-Columns-using-Custom-Regex-Functions:)||
|[5.2.1. Register the function to the SQLContext as an udf to be used within SQL:](#5.2.1.-Register-the-function-to-the-SQLContext-as-an-udf-to-be-used-within-SQL:)|Using UDF with Spark SQL|
|[5.2.2. Function that takes more than One Parameter](#5.2.2.-Function-that-takes-more-than-One-Parameter:)|Spark SQL supports Custom UDFs that take extra Parameters besides the Column name|
|[5.2.3. Register UDF on the Fly](#5.2.3.-Register-UDF-on-the-Fly:)|Create Anonymous UDFs for Spark SQL|
|[5.3. Putting together everything in one place](#5.3.-Putting-together-everything-in-one-place:)|Concluding the Spark SQL use case|

In [1]:
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

In [2]:
pd.set_option('display.max_columns', 50)
pd.set_option('display.max_colwidth', 20)

In [3]:
# setting random seed for notebook reproducibility
rnd_seed = 42
np.random.seed(rnd_seed)

## 1. Understanding the Data Set:

**restore_log_contents** - a collection of events, one per line, where each event records a file being restored from backup.

Below are two log lines from the file. Note that the log lines are not all the same length: some contain more information than others.

```
2018083015552003,AUDIT-LOG,Thu Aug 30 09:56:02 CEST 2018 RestoreTransfer[Aug 30 09:56:02]:616daf5c-ac23-11e8-946b-00a098de77e9 Operation-Uuid=23cfa2c4-ac2a-11e8-946b-00a098de77e9 Group=none Operation-Cookie=0 action=Start source=172.19.32.86:/share/afftest_testnfs destination=afftest:testnfs
```

```
2018083100500289,AUDIT-LOG,Thu Aug 30 23:43:24 PDT 2018 RestoreTransfer[Aug 30 23:43:12]:1e74b7a8-ace9-11e8-8278-00a098aa1cdd Operation-Uuid=1e74b79b-ace9-11e8-8278-00a098aa1cdd Group=none Operation-Cookie=0 action=End source=MAN-PRD-FS:LHR_PRD_FS_LHR_PRD_FS_candidateRM_mirror_vault destination=MAN-PRD-FS:Gerry_LHR_PRD_FS1_candidateRM status=Success bytes_transferred=268838556 network_compression_ratio=1.0:1 transfer_desc=Logical Transfer with Storage Efficiency - Optimized Directory Mode
```

We are interested in extracting the following sections from each log line. We will use regular expressions extensively to capture these patterns.

![](assets/sections_of_interest.png)

Use [regex101](https://regex101.com/) to try out your regular expressions before coding them.

[Back to Top](#Table-of-contents)

## 2. Creating the Spark Session:

In [4]:
# The following must be set (and exported) in your .bashrc file
#export SPARK_HOME="/home/ubuntu/spark-2.4.0-bin-hadoop2.7"
#export ANACONDA_HOME="/home/ubuntu/anaconda3/envs/pyspark"
#export PYSPARK_PYTHON="$ANACONDA_HOME/bin/python"
#export PYSPARK_DRIVER_PYTHON="$ANACONDA_HOME/bin/python"
#export PYTHONPATH="$ANACONDA_HOME/bin/python"
#export PATH="$ANACONDA_HOME/bin:$SPARK_HOME/bin:$PATH"

In [5]:
os.environ['SPARK_HOME']

'D:\\Work\\spark-2.3.0-bin-hadoop2.7'

In [6]:
spark = (SparkSession
         .builder
         .master('local[*]')
         .appName('working-with-udfs')
         .getOrCreate())

In [7]:
spark

In [8]:
sc = spark.sparkContext
sc

In [9]:
sqlContext = SQLContext(spark.sparkContext)
sqlContext

<pyspark.sql.context.SQLContext at 0x211138b8978>

## 3. Load the Data From Files Into DataFrames:

In [10]:
audit_log = spark.read.csv(path='data/restore_log_contents.csv', header=False, inferSchema=True).cache()

In [11]:
audit_log.show(5, truncate=False)

+----------------+---------+--------------------
|_c0             |_c1      |_c2
+----------------+---------+--------------------

### 3.1. Fix the Headers and Provide a Schema:

The log file has no header row, so Spark named the columns `_c0`, `_c1` and `_c2`, and we did not provide a schema either. Let's fix both by defining an explicit schema.
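To make concrete what the schema adds, here is a plain-Python sketch (it does not use Spark; Python's `csv` module stands in for Spark's CSV reader) that splits one line from the file into its three comma-separated fields. Without names the fields can only be addressed by position, just like `_c0`, `_c1` and `_c2`; pairing them with the field names used in the `StructType` in the next cell gives each one a meaningful label. The sample line is the first one from section 1, shortened for readability.

```python
import csv
import io

# Field names matching the StructType defined in the next cell
FIELD_NAMES = ['event_id', 'log_type', 'log_text']

# First sample line from section 1, shortened for readability
raw_line = ("2018083015552003,AUDIT-LOG,Thu Aug 30 09:56:02 CEST 2018 "
            "RestoreTransfer[Aug 30 09:56:02]:616daf5c-ac23-11e8-946b-00a098de77e9 action=Start")

# Without a header or schema, fields can only be addressed by position (Spark's _c0, _c1, _c2)
positional = next(csv.reader(io.StringIO(raw_line)))

# Pairing positions with names is what the header/schema contributes
named = dict(zip(FIELD_NAMES, positional))
print(named['event_id'], named['log_type'])
```

With an explicit schema Spark also avoids the extra pass over the data that `inferSchema=True` requires, and every column keeps the type we declared (all strings here).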

In [12]:
# define the schema, corresponding to a line in the csv data file for log
log_schema = StructType([StructField('event_id', StringType(), nullable=False), 
                         StructField('log_type', StringType(), nullable=False),
                         StructField('log_text', StringType(), nullable=False)])

In [13]:
audit_log = spark.read.csv(path='data/restore_log_contents.csv', header=False, schema=log_schema).cache()

In [14]:
audit_log.show(5, truncate=False)

+----------------+---------+--------------------
|event_id        |log_type |log_text
+----------------+---------+--------------------

In [15]:
audit_log.createOrReplaceTempView('audit_log')

## 4. Using UDFs with DF API:

### 4.1. Extract Columns using In-built Regex Functions:

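To start with, we will use the built-in `regexp_extract` function from the `pyspark.sql.functions` module. It takes the column, the pattern, and the index of the capture group to return. When the pattern does not match, it returns an empty string rather than null (notice the empty `status` and `bytes_transferred` values for the `Start` rows below).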

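**Note:** Notice the `\\` in the regex patterns. These are ordinary Python string literals, in which `\\` is the escape sequence for a single backslash, so `'\\w'` reaches Spark as `\w`. On [regex101](https://regex101.com/) you type the pattern itself, so a single `\` is enough. A Python raw string such as `r'\w{3}'` lets you write single backslashes in the code too.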
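A quick way to check that the doubled backslashes are only Python string escaping: the escaped literal and a raw string produce the identical pattern. Python's `re` module is used here as a stand-in for Spark's Java regex engine (both treat these simple constructs, `\w`, `\s`, `\d`, `{n}` and `^`, the same way), and it extracts the same `log_time` from a shortened version of the first sample line.

```python
import re

# Shortened log_text from the first sample line in section 1
log_text = ("Thu Aug 30 09:56:02 CEST 2018 RestoreTransfer[Aug 30 09:56:02]:"
            "616daf5c-ac23-11e8-946b-00a098de77e9 Operation-Uuid=23cfa2c4-ac2a-11e8-946b-00a098de77e9")

# Escaped form, as used in the cell below: each '\\' becomes one '\' in the string
escaped = '^(\\w{3}\\s\\w{3}\\s\\d{2}\\s\\d{2}:\\d{2}:\\d{2}\\s\\w{3,4}\\s\\d{4})'
# Raw-string form: backslashes are kept literally, so a single '\' suffices
raw = r'^(\w{3}\s\w{3}\s\d{2}\s\d{2}:\d{2}:\d{2}\s\w{3,4}\s\d{4})'

# Both literals produce exactly the same pattern string
assert escaped == raw

match = re.search(raw, log_text)
print(match.group(1))  # Thu Aug 30 09:56:02 CEST 2018
```

The raw form can be passed to Spark unchanged, e.g. `F.regexp_extract('log_text', raw, idx=1)`.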

In [16]:
audit_log_inbuilt_regex = (audit_log
            .withColumn('log_time', 
                F.regexp_extract('log_text', '^(\\w{3}\\s\\w{3}\\s\\d{2}\\s\\d{2}:\\d{2}:\\d{2}\\s\\w{3,4}\\s\\d{4})', idx=1))
            .withColumn('event_time', 
                F.regexp_extract('log_text', '\\[(\\w{3}\\s\\d{2}\\s\\d{2}:\\d{2}:\\d{2})]', idx=1))
            .withColumn('relationship_id', 
                F.regexp_extract('log_text', '\\]\\:(.*?)\\s+', idx=1))
            .withColumn('operation_uuid', 
                F.regexp_extract('log_text', 'Operation-Uuid\\=(.*?)\\s+', idx=1))
            .withColumn('action', 
                F.regexp_extract('log_text', 'action\\=(.*?)\\s+', idx=1))
            .withColumn('source', 
                F.regexp_extract('log_text', 'source\\=(.*?)\\s+', idx=1))
            .withColumn('destination', 
                F.regexp_extract('log_text', 'destination\\=(.*?)(?=\\s|$)', idx=1))
            .withColumn('status', 
                F.regexp_extract('log_text', 'status\\=(.*?)\\s+', idx=1))
            .withColumn('bytes_transferred', 
                F.regexp_extract('log_text', 'bytes_transferred\\=(.*?)\\s+', idx=1))
               )
audit_log_inbuilt_regex.cache()

DataFrame[event_id: string, log_type: string, log_text: string, log_time: string, event_time: string, relationship_id: string, operation_uuid: string, action: string, source: string, destination: string, status: string, bytes_transferred: string]

In [17]:
(audit_log_inbuilt_regex
 .select('event_id', 'log_time', 'event_time', 'relationship_id', 'operation_uuid',  
         'action', 'source', 'destination', 'status', 'bytes_transferred')
.limit(5).toPandas())

|   | event_id | log_time | event_time | relationship_id | operation_uuid | action | source | destination | status | bytes_transferred |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018083015552003 | Thu Aug 30 09:56... | Aug 30 09:56:02 | 616daf5c-ac23-11... | 23cfa2c4-ac2a-11... | Start | 172.19.32.86:/sh... | afftest:testnfs |  |  |
| 1 | 2018083015552003 | Thu Aug 30 09:56... | Aug 30 09:56:02 | 616daf5c-ac23-11... | 23cfa2c4-ac2a-11... | End | 172.19.32.86:/sh... | afftest:testnfs | Success | 0 |
| 2 | 2018083100220028 | Thu Aug 30 13:44... | Aug 30 13:44:10 | 6f3486ed-ac95-11... | 6f3486d7-ac95-11... | Start | tiblab-fs1:resto... | eaclab-fs1:tibla... |  |  |
| 3 | 2018083100220028 | Thu Aug 30 13:44... | Aug 30 13:44:10 | 6f3486ed-ac95-11... | 6f3486d7-ac95-11... | End | tiblab-fs1:resto... | eaclab-fs1:tibla... | Success | 513743160 |
| 4 | 2018083100500332 | Thu Aug 30 23:40... | Aug 30 23:40:12 | b2f4d3bc-ace8-11... | b2f4d3ae-ace8-11... | Start | MAN-PRD-FS:LHR_P... | MAN-PRD-FS1:Gerr... |  |  |


### 4.2 Extract Columns using Custom Regex Functions:

We will now develop our own custom functions.

#### 4.2.1. Specific Extraction Functions:

Custom function to extract the `log_time` section in the log.

In [18]:
import re
def extract_log_time(log):
    # Spark passes SQL NULL values to Python UDFs as None
    if log is None:
        return None
    pattern = re.compile('^(\\w{3}\\s\\w{3}\\s\\d{2}\\s\\d{2}:\\d{2}:\\d{2}\\s\\w{3,4}\\s\\d{4})')
    match = re.search(pattern, log)
    if match:
        return match.group(1)
    else:
        return None

In [19]:
# Let's apply that to extract the log_time
extract_log_time("Thu Aug 30 09:56:02 CEST 2018 RestoreTransfer[Aug 30 09:56:02]:616daf5c-ac23-11e8-946b-00a098de77e9 Operation-Uuid=23cfa2c4-ac2a-11e8-946b-00a098de77e9 Group=none Operation-Cookie=0 action=Start source=172.19.32.86:/share/afftest_testnfs destination=afftest:testnfs")

'Thu Aug 30 09:56:02 CEST 2018'

Custom function to extract the `status` section in the log.

In [20]:
import re
def extract_status(log):
    # Spark passes SQL NULL values to Python UDFs as None
    if log is None:
        return None
    pattern = re.compile('status\\=(.*?)\\s+')
    match = re.search(pattern, log)
    if match:
        return match.group(1)
    else:
        return None

In [21]:
# Let's apply that to extract the status
extract_status("Thu Aug 30 09:56:02 CEST 2018 RestoreTransfer[Aug 30 09:56:02]:616daf5c-ac23-11e8-946b-00a098de77e9 Operation-Uuid=23cfa2c4-ac2a-11e8-946b-00a098de77e9 Group=none Operation-Cookie=0 action=Start source=172.19.32.86:/share/afftest_testnfs destination=afftest:testnfs")

**Oops!** What happened? The cell produced no output because the function returned `None`: this log line has no `status` section. As mentioned at the beginning, not every section is present in every log line.

Let's try another log line.

In [22]:
# Let's apply that to extract the status of another log line
extract_status("Thu Aug 30 13:44:22 PDT 2018 RestoreTransfer[Aug 30 13:44:10]:6f3486ed-ac95-11e8-ac6e-00a0982d695e Operation-Uuid=6f3486d7-ac95-11e8-ac6e-00a0982d695e Group=none Operation-Cookie=0 action=End source=tiblab-fs1:restore1_vault destination=eaclab-fs1:tiblab_fs1_restore1_vault_restore status=Success bytes_transferred=513743160 network_compression_ratio=1.0:1 transfer_desc=Logical Transfer with Storage Efficiency - Optimized Directory Mode")

'Success'

Okay! This log line has a `status` section. Because not every section is present in every line, we added the `if match` block to our functions; without it, calling `match.group(1)` after a failed search (where `match` is `None`) would raise an exception.
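To see why the guard matters, here is a hypothetical unguarded variant, `extract_status_unguarded` (not part of the notebook), applied to a shortened version of the first log line, which has no `status` section. `re.search` returns `None`, and calling `.group(1)` on it raises `AttributeError`. Inside a Spark UDF the same exception would fail the task running it instead of producing a null.

```python
import re

# Shortened version of the first log line; it has no "status=" section
log_without_status = ("Thu Aug 30 09:56:02 CEST 2018 RestoreTransfer[Aug 30 09:56:02]:"
                      "616daf5c-ac23-11e8-946b-00a098de77e9 action=Start "
                      "source=172.19.32.86:/share/afftest_testnfs destination=afftest:testnfs")

def extract_status_unguarded(log):
    # Hypothetical variant WITHOUT the `if match` check, for illustration only
    return re.search('status\\=(.*?)\\s+', log).group(1)

try:
    extract_status_unguarded(log_without_status)
    error_name = None
except AttributeError as exc:
    # re.search returned None, and None has no .group() method
    error_name = type(exc).__name__

print(error_name)  # AttributeError
```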

#### 4.2.2. Generic Extraction Functions:

A generic function that takes a regular expression and a log string, applies the pattern to the string, and returns the first capture group of the first match (or `None` if there is no match).

In [23]:
import re
def extract_pattern(regex, log):
    # Spark passes SQL NULL values to Python UDFs as None
    if log is None:
        return None
    pattern = re.compile(regex)
    match = re.search(pattern, log)
    if match:
        return match.group(1)
    else:
        return None

In [24]:
# Let's apply that to extract the operation_uuid
extract_pattern('Operation-Uuid\\=(.*?)\\s+', "Thu Aug 30 13:44:22 PDT 2018 RestoreTransfer[Aug 30 13:44:10]:6f3486ed-ac95-11e8-ac6e-00a0982d695e Operation-Uuid=6f3486d7-ac95-11e8-ac6e-00a0982d695e Group=none Operation-Cookie=0 action=End source=tiblab-fs1:restore1_vault destination=eaclab-fs1:tiblab_fs1_restore1_vault_restore status=Success bytes_transferred=513743160 network_compression_ratio=1.0:1 transfer_desc=Logical Transfer with Storage Efficiency - Optimized Directory Mode")

'6f3486d7-ac95-11e8-ac6e-00a0982d695e'

In [25]:
# Let's apply that to extract the status where the section is absent
extract_pattern('status\\=(.*?)\\s+', "Thu Aug 30 09:56:02 CEST 2018 RestoreTransfer[Aug 30 09:56:02]:616daf5c-ac23-11e8-946b-00a098de77e9 Operation-Uuid=23cfa2c4-ac2a-11e8-946b-00a098de77e9 Group=none Operation-Cookie=0 action=Start source=172.19.32.86:/share/afftest_testnfs destination=afftest:testnfs")

In [26]:
# Let's apply that to extract the status of another log line
extract_pattern('status\\=(.*?)\\s+', "Thu Aug 30 13:44:22 PDT 2018 RestoreTransfer[Aug 30 13:44:10]:6f3486ed-ac95-11e8-ac6e-00a0982d695e Operation-Uuid=6f3486d7-ac95-11e8-ac6e-00a0982d695e Group=none Operation-Cookie=0 action=End source=tiblab-fs1:restore1_vault destination=eaclab-fs1:tiblab_fs1_restore1_vault_restore status=Success bytes_transferred=513743160 network_compression_ratio=1.0:1 transfer_desc=Logical Transfer with Storage Efficiency - Optimized Directory Mode")

'Success'

#### 4.2.3. Register the function as an UDF to be used with DF API:

In [27]:
# Wrap extract_log_time as a UDF for use with the DF API
udf_extract_log_time = udf(extract_log_time, StringType())

In [28]:
# Wrap extract_status as a UDF for use with the DF API
udf_extract_status = udf(extract_status, StringType())

In [29]:
# Use the above UDFs to extract the fields
(audit_log
 .withColumn('log_time', udf_extract_log_time('log_text'))
 .withColumn('status', udf_extract_status('log_text'))
).show(5)

+----------------+---------+--------------------+--------------------+-------+
|        event_id| log_type|            log_text|            log_time| status|
+----------------+---------+--------------------+--------------------+-------+
|2018083015552003|AUDIT-LOG|Thu Aug 30 09:56:...|Thu Aug 30 09:56:...|   null|
|2018083015552003|AUDIT-LOG|Thu Aug 30 09:56:...|Thu Aug 30 09:56:...|Success|
|2018083100220028|AUDIT-LOG|Thu Aug 30 13:44:...|Thu Aug 30 13:44:...|   null|
|2018083100220028|AUDIT-LOG|Thu Aug 30 13:44:...|Thu Aug 30 13:44:...|Success|
|2018083100500332|AUDIT-LOG|Thu Aug 30 23:40:...|Thu Aug 30 23:40:...|   null|
+----------------+---------+--------------------+--------------------+-------+
only showing top 5 rows



#### 4.2.4. Curry up function that takes more than One Parameter:

**Unfortunately, PySpark DataFrame UDFs accept only columns (or column names) as arguments, not plain Python values.** So we cannot wrap `extract_pattern` as `udf(extract_pattern, StringType())` and pass the regex pattern as a Python string alongside the column name in DF API calls. One workaround is to curry the function (fix the pattern in advance) so that the only argument in the DataFrame call is the column on which we want the function to act. Another is to turn the pattern into a literal column with `F.lit(pattern)` and pass it as an ordinary argument.
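Outside Spark, the currying idea is easy to see. In this sketch, `make_extractor` (a hypothetical helper, not part of PySpark) fixes the regex up front and returns a one-argument function; that function is what would be handed to `udf(..., StringType())`, just like the lambda in the next cell. `functools.partial(extract_pattern, event_time_pattern)` would build an equivalent one-argument callable.

```python
import re

def extract_pattern(regex, log):
    # Generic extractor from section 4.2.2 (with a None guard): first capture group or None
    if log is None:
        return None
    match = re.search(regex, log)
    return match.group(1) if match else None

def make_extractor(regex):
    # Fix (curry) the regex now; return a function that needs only the log text
    def extract(log):
        return extract_pattern(regex, log)
    return extract

event_time_pattern = '\\[(\\w{3}\\s\\d{2}\\s\\d{2}:\\d{2}:\\d{2})]'
extract_event_time = make_extractor(event_time_pattern)

sample = ("Thu Aug 30 13:44:22 PDT 2018 RestoreTransfer[Aug 30 13:44:10]:"
          "6f3486ed-ac95-11e8-ac6e-00a0982d695e")
print(extract_event_time(sample))  # Aug 30 13:44:10
print(extract_event_time(None))    # None
```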

In [30]:
# Build a wrapper around extract_pattern to be used as an UDF in DF API
event_time_pattern = '\\[(\\w{3}\\s\\d{2}\\s\\d{2}:\\d{2}:\\d{2})]'
udf_extract_event_time = udf(lambda log_text : extract_pattern(event_time_pattern, log_text), StringType())

In [31]:
(audit_log
  .withColumn('event_time', udf_extract_event_time('log_text'))
 ).show(5)

+----------------+---------+--------------------+---------------+
|        event_id| log_type|            log_text|     event_time|
+----------------+---------+--------------------+---------------+
|2018083015552003|AUDIT-LOG|Thu Aug 30 09:56:...|Aug 30 09:56:02|
|2018083015552003|AUDIT-LOG|Thu Aug 30 09:56:...|Aug 30 09:56:02|
|2018083100220028|AUDIT-LOG|Thu Aug 30 13:44:...|Aug 30 13:44:10|
|2018083100220028|AUDIT-LOG|Thu Aug 30 13:44:...|Aug 30 13:44:10|
|2018083100500332|AUDIT-LOG|Thu Aug 30 23:40:...|Aug 30 23:40:12|
+----------------+---------+--------------------+---------------+
only showing top 5 rows



#### 4.2.5. Register UDF on the Fly:

We can also create a UDF on the fly, right where it is used. If we want to reuse the custom function many times with different parameters, defining a separate named UDF for each parameter is cumbersome. Instead, we can wrap a lambda in `udf(...)` inline and pass whatever parameter we need.
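One caveat when building UDFs on the fly for many patterns in a loop: Python closures look up variables when they are called, not when they are created, so lambdas built in a loop and used after it finishes all see the loop variable's final value. Binding the pattern as a default argument (or using a factory function) pins each one. The plain-Python sketch below shows the effect without Spark; the `patterns` dictionary and the shortened sample line (some fields omitted) are illustrative. With Spark the outcome also depends on when each UDF's function gets serialized, so pinning the pattern is simply the safer habit.

```python
import re

def extract_pattern(regex, log):
    # Generic extractor: first capture group of the first match, or None
    if log is None:
        return None
    match = re.search(regex, log)
    return match.group(1) if match else None

patterns = {
    'action': 'action\\=(.*?)\\s+',
    'status': 'status\\=(.*?)\\s+',
}

# Shortened sample line (some fields omitted)
log = ("Thu Aug 30 13:44:22 PDT 2018 RestoreTransfer[Aug 30 13:44:10]:6f3486ed-ac95-11e8-ac6e-00a0982d695e "
       "action=End source=tiblab-fs1:restore1_vault status=Success bytes_transferred=513743160 ")

# Pitfall: every lambda reads `regex` when called, i.e. after the loop has finished
late_bound = {name: (lambda log_text: extract_pattern(regex, log_text))
              for name, regex in patterns.items()}

# Fix: the default argument captures the current pattern at creation time
pinned = {name: (lambda log_text, regex=regex: extract_pattern(regex, log_text))
          for name, regex in patterns.items()}

print(late_bound['action'](log), pinned['action'](log))  # Success End
```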

In [32]:
(audit_log
  .withColumn('relationship_id', udf(lambda log : extract_pattern('\\]\\:(.*?)\\s+', log), StringType())('log_text'))
 ).show(5)

+----------------+---------+--------------------+--------------------+
|        event_id| log_type|            log_text|     relationship_id|
+----------------+---------+--------------------+--------------------+
|2018083015552003|AUDIT-LOG|Thu Aug 30 09:56:...|616daf5c-ac23-11e...|
|2018083015552003|AUDIT-LOG|Thu Aug 30 09:56:...|616daf5c-ac23-11e...|
|2018083100220028|AUDIT-LOG|Thu Aug 30 13:44:...|6f3486ed-ac95-11e...|
|2018083100220028|AUDIT-LOG|Thu Aug 30 13:44:...|6f3486ed-ac95-11e...|
|2018083100500332|AUDIT-LOG|Thu Aug 30 23:40:...|b2f4d3bc-ace8-11e...|
+----------------+---------+--------------------+--------------------+
only showing top 5 rows



#### 4.2.6. Hack PySpark Codes to call Scala functions directly:

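If we are still unhappy with having to curry our PySpark UDFs, we can look at the PySpark source code and write our own column functions.

Here I copied PySpark's own `regexp_extract` wrapper and created `custom_regexp_extract`. It calls the Scala function `org.apache.spark.sql.functions.regexp_extract` on the JVM through Py4J and wraps the result in a `Column`, so the pattern is passed as an ordinary argument and no Python UDF (and no per-row data transfer to a Python worker) is involved.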

In [33]:
from pyspark.sql.column import Column, _to_java_column

In [34]:
def custom_regexp_extract(column, pattern, idx):
    # Mirrors pyspark.sql.functions.regexp_extract: call the JVM function directly via Py4J
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.regexp_extract(_to_java_column(column), pattern, idx)
    return Column(jc)

In [35]:
(audit_log
  .withColumn('operation_uuid', custom_regexp_extract('log_text', 'Operation-Uuid\\=(.*?)\\s+', 1))
 ).show(5)

+----------------+---------+--------------------+--------------------+
|        event_id| log_type|            log_text|      operation_uuid|
+----------------+---------+--------------------+--------------------+
|2018083015552003|AUDIT-LOG|Thu Aug 30 09:56:...|23cfa2c4-ac2a-11e...|
|2018083015552003|AUDIT-LOG|Thu Aug 30 09:56:...|23cfa2c4-ac2a-11e...|
|2018083100220028|AUDIT-LOG|Thu Aug 30 13:44:...|6f3486d7-ac95-11e...|
|2018083100220028|AUDIT-LOG|Thu Aug 30 13:44:...|6f3486d7-ac95-11e...|
|2018083100500332|AUDIT-LOG|Thu Aug 30 23:40:...|b2f4d3ae-ace8-11e...|
+----------------+---------+--------------------+--------------------+
only showing top 5 rows



### 4.3. Putting together everything in one place:

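Here we combine all four kinds of custom functions discussed so far with the built-in `regexp_extract` in a single chain of DF API calls, showing that they work seamlessly together. Keep in mind that built-in functions (including `custom_regexp_extract`, which just calls one) run entirely on the JVM, while Python UDFs require serializing each row to a Python worker process and back, so prefer a built-in when one fits.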

In [36]:
audit_log_custom_regex = (audit_log
            .withColumn('log_time', udf_extract_log_time('log_text'))
            .withColumn('event_time', udf_extract_event_time('log_text'))
            .withColumn('relationship_id', udf(lambda log : extract_pattern('\\]\\:(.*?)\\s+', log), StringType())('log_text'))
            .withColumn('operation_uuid', custom_regexp_extract('log_text', 'Operation-Uuid\\=(.*?)\\s+', 1))
            .withColumn('action', 
                F.regexp_extract('log_text', 'action\\=(.*?)\\s+', idx=1))
            .withColumn('source', custom_regexp_extract('log_text', 'source\\=(.*?)\\s+', 1))
            .withColumn('destination', udf(lambda log : extract_pattern('destination\\=(.*?)(?=\\s|$)', log), StringType())('log_text'))
            .withColumn('status', 
                F.regexp_extract('log_text', 'status\\=(.*?)\\s+', idx=1))
            .withColumn('bytes_transferred', 
                F.regexp_extract('log_text', 'bytes_transferred\\=(.*?)\\s+', idx=1))
               )

In [37]:
audit_log_custom_regex.cache()

DataFrame[event_id: string, log_type: string, log_text: string, log_time: string, event_time: string, relationship_id: string, operation_uuid: string, action: string, source: string, destination: string, status: string, bytes_transferred: string]

In [38]:
audit_log_custom_regex.limit(5).toPandas()

|   | event_id | log_type | log_text | log_time | event_time | relationship_id | operation_uuid | action | source | destination | status | bytes_transferred |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018083015552003 | AUDIT-LOG | Thu Aug 30 09:56... | Thu Aug 30 09:56... | Aug 30 09:56:02 | 616daf5c-ac23-11... | 23cfa2c4-ac2a-11... | Start | 172.19.32.86:/sh... | afftest:testnfs |  |  |
| 1 | 2018083015552003 | AUDIT-LOG | Thu Aug 30 09:56... | Thu Aug 30 09:56... | Aug 30 09:56:02 | 616daf5c-ac23-11... | 23cfa2c4-ac2a-11... | End | 172.19.32.86:/sh... | afftest:testnfs | Success | 0 |
| 2 | 2018083100220028 | AUDIT-LOG | Thu Aug 30 13:44... | Thu Aug 30 13:44... | Aug 30 13:44:10 | 6f3486ed-ac95-11... | 6f3486d7-ac95-11... | Start | tiblab-fs1:resto... | eaclab-fs1:tibla... |  |  |
| 3 | 2018083100220028 | AUDIT-LOG | Thu Aug 30 13:44... | Thu Aug 30 13:44... | Aug 30 13:44:10 | 6f3486ed-ac95-11... | 6f3486d7-ac95-11... | End | tiblab-fs1:resto... | eaclab-fs1:tibla... | Success | 513743160 |
| 4 | 2018083100500332 | AUDIT-LOG | Thu Aug 30 23:40... | Thu Aug 30 23:40... | Aug 30 23:40:12 | b2f4d3bc-ace8-11... | b2f4d3ae-ace8-11... | Start | MAN-PRD-FS:LHR_P... | MAN-PRD-FS1:Gerr... |  |  |


## 5. Using UDFs with PySpark SQL:

In [39]:
spark.sql("select * from audit_log").show(5)

+----------------+---------+--------------------+
|        event_id| log_type|            log_text|
+----------------+---------+--------------------+
|2018083015552003|AUDIT-LOG|Thu Aug 30 09:56:...|
|2018083015552003|AUDIT-LOG|Thu Aug 30 09:56:...|
|2018083100220028|AUDIT-LOG|Thu Aug 30 13:44:...|
|2018083100220028|AUDIT-LOG|Thu Aug 30 13:44:...|
|2018083100500332|AUDIT-LOG|Thu Aug 30 23:40:...|
+----------------+---------+--------------------+
only showing top 5 rows



### 5.1. Extract Columns using In-built Regex Functions:

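We can use the same regular expressions with the built-in SQL function `regexp_extract` to extract the fields from the `log_text` column.

**Note:** Here the patterns need an extra level of escaping. Spark SQL's parser treats `\` as an escape character inside string literals, so the SQL text must contain `\\w` for the regex engine to receive `\w`. In the regular (non-raw) Python string below, `\\\w` produces exactly that `\\w`: the `\\` becomes one backslash, and Python leaves the unrecognised escape `\w` untouched. Recent Python versions warn about such invalid escape sequences, so writing `\\\\w`, or `\\w` inside a raw string `r"""..."""`, is more explicit. On [regex101](https://regex101.com/) you still type a single `\`.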
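The two escaping layers can be sketched in plain Python. `unescape_sql_backslashes` below is a hypothetical, deliberately simplified stand-in for Spark SQL's string-literal parsing (it only collapses a doubled backslash into one; the real parser handles more escape sequences), and Python's `re` stands in for the JVM regex engine. A raw string lets the Python source show exactly the `\\w` that the SQL text needs.

```python
import re

# The SQL text needs two backslashes before each regex escape; a raw string shows them as-is
sql_literal = r'^(\\w{3}\\s\\w{3})'
# The same characters written as a regular Python string need four backslashes each
assert sql_literal == '^(\\\\w{3}\\\\s\\\\w{3})'

def unescape_sql_backslashes(literal):
    """Hypothetical, simplified model of SQL string-literal parsing:
    only collapses each doubled backslash into a single one."""
    return literal.replace('\\\\', '\\')

regex_for_engine = unescape_sql_backslashes(sql_literal)
print(regex_for_engine)  # ^(\w{3}\s\w{3})
print(re.search(regex_for_engine, "Thu Aug 30 09:56:02 CEST 2018").group(1))  # Thu Aug
```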

In [40]:
spark.sql(
"""
SELECT 
regexp_extract(log_text, '^(\\\w{3}\\\s\\\w{3}\\\s\\\d{2}\\\s\\\d{2}:\\\d{2}:\\\d{2}\\\s\\\w{3,4}\\\s\\\d{4})', 1) as log_time,
regexp_extract(log_text, '\\\[(\\\w{3}\\\s\\\d{2}\\\s\\\d{2}:\\\d{2}:\\\d{2})]', 1) as event_time,
regexp_extract(log_text, '\\\]\\\:(.*?)\\\s+', 1) as relationship_id,
regexp_extract(log_text, 'Operation-Uuid\\\=(.*?)\\\s+', 1) as operation_uuid,
regexp_extract(log_text, 'action\\\=(.*?)\\\s+', 1) as action,
regexp_extract(log_text, 'source\\\=(.*?)\\\s+', 1) as source,
regexp_extract(log_text, 'destination\\\=(.*?)(?=\\\s|$)', 1) as destination,
regexp_extract(log_text, 'status\\\=(.*?)\\\s+', 1) as status,
regexp_extract(log_text, 'bytes_transferred\\\=(.*?)\\\s+', 1) as bytes_transferred
FROM audit_log
""").limit(5).toPandas()

|   | log_time | event_time | relationship_id | operation_uuid | action | source | destination | status | bytes_transferred |
|---|---|---|---|---|---|---|---|---|---|
| 0 | Thu Aug 30 09:56... | Aug 30 09:56:02 | 616daf5c-ac23-11... | 23cfa2c4-ac2a-11... | Start | 172.19.32.86:/sh... | afftest:testnfs |  |  |
| 1 | Thu Aug 30 09:56... | Aug 30 09:56:02 | 616daf5c-ac23-11... | 23cfa2c4-ac2a-11... | End | 172.19.32.86:/sh... | afftest:testnfs | Success | 0 |
| 2 | Thu Aug 30 13:44... | Aug 30 13:44:10 | 6f3486ed-ac95-11... | 6f3486d7-ac95-11... | Start | tiblab-fs1:resto... | eaclab-fs1:tibla... |  |  |
| 3 | Thu Aug 30 13:44... | Aug 30 13:44:10 | 6f3486ed-ac95-11... | 6f3486d7-ac95-11... | End | tiblab-fs1:resto... | eaclab-fs1:tibla... | Success | 513743160 |
| 4 | Thu Aug 30 23:40... | Aug 30 23:40:12 | b2f4d3bc-ace8-11... | b2f4d3ae-ace8-11... | Start | MAN-PRD-FS:LHR_P... | MAN-PRD-FS1:Gerr... |  |  |


### 5.2. Extract Columns using Custom Regex Functions:

#### 5.2.1. Register the function to the SQLContext as an udf to be used within SQL:

In [41]:
# Register extract_log_time as a UDF callable from Spark SQL
spark.udf.register('udf_extract_log_time', extract_log_time, StringType())

<function __main__.extract_log_time>

In [42]:
# Register extract_status as a UDF callable from Spark SQL
spark.udf.register('udf_extract_status', extract_status, StringType())

<function __main__.extract_status>

In [43]:
# Use the above UDFs to extract the fields
spark.sql(
"""
SELECT 
udf_extract_log_time(log_text) as log_time,
udf_extract_status(log_text) as status
FROM audit_log
""").show(5, truncate=False)

+-----------------------------+-------+
|log_time                     |status |
+-----------------------------+-------+
|Thu Aug 30 09:56:02 CEST 2018|null   |
|Thu Aug 30 09:56:02 CEST 2018|Success|
|Thu Aug 30 13:44:10 PDT 2018 |null   |
|Thu Aug 30 13:44:22 PDT 2018 |Success|
|Thu Aug 30 23:40:12 PDT 2018 |null   |
+-----------------------------+-------+
only showing top 5 rows



#### 5.2.2. Function that takes more than One Parameter:

**Fortunately, Spark SQL UDFs can take additional arguments besides column names**, because in SQL a string literal is just another expression that can be passed to a function. So we can register `extract_pattern` with `spark.udf.register('udf_extract_pattern', extract_pattern, StringType())` and pass the regex pattern as well as the column name in the SQL query. We do not need to curry the function.
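Because the pattern is now just an argument, one generic function covers every field, with each SQL expression supplying its own pattern. A plain-Python sketch of that reuse, using patterns from this notebook (in DF API escaping) and the log line from section 4.2.1, shortened at the end; `FIELD_PATTERNS` is an illustrative name, not something Spark requires.

```python
import re

def extract_pattern(regex, log):
    # Generic extractor: first capture group of the first match, or None
    if log is None:
        return None
    match = re.search(regex, log)
    return match.group(1) if match else None

# Field name -> regex, reusing patterns from this notebook
FIELD_PATTERNS = {
    'operation_uuid': 'Operation-Uuid\\=(.*?)\\s+',
    'action': 'action\\=(.*?)\\s+',
    'source': 'source\\=(.*?)\\s+',
    'status': 'status\\=(.*?)\\s+',
    'bytes_transferred': 'bytes_transferred\\=(.*?)\\s+',
}

log_text = ("Thu Aug 30 13:44:22 PDT 2018 RestoreTransfer[Aug 30 13:44:10]:6f3486ed-ac95-11e8-ac6e-00a0982d695e "
            "Operation-Uuid=6f3486d7-ac95-11e8-ac6e-00a0982d695e Group=none Operation-Cookie=0 action=End "
            "source=tiblab-fs1:restore1_vault destination=eaclab-fs1:tiblab_fs1_restore1_vault_restore "
            "status=Success bytes_transferred=513743160 network_compression_ratio=1.0:1")

# One two-argument function serves every field; only the pattern argument changes,
# much as each udf_extract_pattern(...) call in SQL supplies its own literal
row = {name: extract_pattern(regex, log_text) for name, regex in FIELD_PATTERNS.items()}
print(row)
```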

In [44]:
# Register the generic extract_pattern function as a UDF callable from Spark SQL
spark.udf.register('udf_extract_pattern', extract_pattern, StringType())

<function __main__.extract_pattern>

In [45]:
spark.sql(
"""
SELECT 
udf_extract_pattern('\\\[(\\\w{3}\\\s\\\d{2}\\\s\\\d{2}:\\\d{2}:\\\d{2})]', log_text) as event_time
FROM audit_log
""").show(5, truncate=False)

+---------------+
|event_time     |
+---------------+
|Aug 30 09:56:02|
|Aug 30 09:56:02|
|Aug 30 13:44:10|
|Aug 30 13:44:10|
|Aug 30 23:40:12|
+---------------+
only showing top 5 rows



#### 5.2.3. Register UDF on the Fly:

Registering a UDF on the fly works in Spark SQL too. Rather than writing a separate Python function for each pattern, we wrap `extract_pattern` in a lambda that fixes the pattern, register the lambda under a new name, and call that name from SQL.

In [46]:
spark.udf.register('udf_relationship_id', lambda log : extract_pattern('\\]\\:(.*?)\\s+', log), StringType())

<function __main__.<lambda>>

In [47]:
spark.sql(
"""
SELECT udf_relationship_id(log_text) as relationship_id
FROM audit_log
""").show(5, truncate=False)

+------------------------------------+
|relationship_id                     |
+------------------------------------+
|616daf5c-ac23-11e8-946b-00a098de77e9|
|616daf5c-ac23-11e8-946b-00a098de77e9|
|6f3486ed-ac95-11e8-ac6e-00a0982d695e|
|6f3486ed-ac95-11e8-ac6e-00a0982d695e|
|b2f4d3bc-ace8-11e8-8278-00a098aa1cdd|
+------------------------------------+
only showing top 5 rows



### 5.3. Putting together everything in one place:

Here we combine all three kinds of custom UDFs discussed so far (registered named functions, the generic function that takes the pattern as an argument, and a registered lambda) with the built-in `regexp_extract` in one query, showing that they work seamlessly together.

In [48]:
spark.sql(
"""
SELECT 
udf_extract_log_time(log_text) as log_time,
udf_extract_pattern('\\\[(\\\w{3}\\\s\\\d{2}\\\s\\\d{2}:\\\d{2}:\\\d{2})]', log_text) as event_time,
udf_relationship_id(log_text) as relationship_id,
regexp_extract(log_text, 'Operation-Uuid\\\=(.*?)\\\s+', 1) as operation_uuid,
regexp_extract(log_text, 'action\\\=(.*?)\\\s+', 1) as action,
udf_extract_pattern('source\\\=(.*?)\\\s+', log_text) as source,
regexp_extract(log_text, 'destination\\\=(.*?)(?=\\\s|$)', 1) as destination,
udf_extract_status(log_text) as status,
regexp_extract(log_text, 'bytes_transferred\\\=(.*?)\\\s+', 1) as bytes_transferred
FROM audit_log
""").limit(5).toPandas()

|   | log_time | event_time | relationship_id | operation_uuid | action | source | destination | status | bytes_transferred |
|---|---|---|---|---|---|---|---|---|---|
| 0 | Thu Aug 30 09:56... | Aug 30 09:56:02 | 616daf5c-ac23-11... | 23cfa2c4-ac2a-11... | Start | 172.19.32.86:/sh... | afftest:testnfs |  |  |
| 1 | Thu Aug 30 09:56... | Aug 30 09:56:02 | 616daf5c-ac23-11... | 23cfa2c4-ac2a-11... | End | 172.19.32.86:/sh... | afftest:testnfs | Success | 0 |
| 2 | Thu Aug 30 13:44... | Aug 30 13:44:10 | 6f3486ed-ac95-11... | 6f3486d7-ac95-11... | Start | tiblab-fs1:resto... | eaclab-fs1:tibla... |  |  |
| 3 | Thu Aug 30 13:44... | Aug 30 13:44:10 | 6f3486ed-ac95-11... | 6f3486d7-ac95-11... | End | tiblab-fs1:resto... | eaclab-fs1:tibla... | Success | 513743160 |
| 4 | Thu Aug 30 23:40... | Aug 30 23:40:12 | b2f4d3bc-ace8-11... | b2f4d3ae-ace8-11... | Start | MAN-PRD-FS:LHR_P... | MAN-PRD-FS1:Gerr... |  |  |


In [49]:
spark.stop()