# PySpark job to process event files

## 1. Introduction  
This notebook shows how to process events stored in files using PySpark.

## 2. Connection with Spark cluster  
Create a Spark session to connect to the cluster. We need to pass the master URL and give the application a name (shown in the cluster web UI).  
This object will be used to perform actions on the cluster.

In [1]:
from pyspark.sql import SparkSession

#Create Spark session
spark = (
    SparkSession.builder
    .appName("file-processing")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/28 03:05:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 3. First things first: Data exploration
Let's take a first look at the raw data that we are going to process. As described in the docs ([link](https://github.com/ShimabukuroA/file-processing-pipeline/tree/master)), every event follows a well-defined schema of fields. The uncompressed data are JSON files, so it seems reasonable to read them with the PySpark JSON API.  
To work with JSON data, we can define a schema that lists each field to parse. For this first exploration, we define all fields as String to avoid conversion errors, although this may affect processing performance.

In [2]:
from pyspark.sql.types import StructType, StructField, StringType

# Define a schema
event_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("domain", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("data", StringType(), True)
])

# Read JSON files
df = spark.read.schema(event_schema).json("file:///opt/sandbox/data/input/*.json")

In [3]:
# Show 10 samples of the data
df.show(10)

                                                                                

+--------------------+-------------------+-----------+-------------+--------------------+
|            event_id|          timestamp|     domain|   event_type|                data|
+--------------------+-------------------+-----------+-------------+--------------------+
|d2b9bccc-c344-425...|2021-01-19T14:11:46|    account|status-change|{"id":169213,"old...|
|dcf60394-3234-408...|2021-01-04T12:28:11|transaction|     creation|{"id":561507,"typ...|
|18f6958a-52e9-488...|2021-01-29T14:09:25|transaction|     creation|{"id":735142,"typ...|
|de369510-8874-4f7...|2021-02-19T04:23:58|transaction|     creation|{"id":756239,"typ...|
|8bcb0d71-a0af-42e...|2021-01-24T17:40:16|transaction|     creation|{"id":492073,"typ...|
|0a384c35-7690-4ff...|2021-02-03T10:26:05|    account|     creation|{"id":73838,"pers...|
|f10a1651-1341-423...|2021-02-24T00:59:40|transaction|     creation|{"id":519650,"typ...|
|a4c95c51-65d7-43d...|2021-02-14T20:19:07|transaction|     creation|{"id":956317,"typ...|
|7b2b3e34-

In [4]:
# Count number of rows(events)
n_rows = df.count()
print(f"Number of rows: {n_rows}")



Number of rows: 14


                                                                                

### So we only have 14 rows of events to process?
This does not look correct, so let's take a look at the files.

In [5]:
# List files inside directory
! ls -lhart /opt/sandbox/data/input/

total 44M
-rw-rw-r-- 1 1000 1000 2.2M Oct 27 11:52 events-1f7df57a-e1c9-4b76-b795-962de9cc292e.json
-rw-rw-r-- 1 1000 1000 2.0M Oct 27 11:52 events-2924398a-d266-4f0e-b9f9-d7d35979ec7f.json
-rw-rw-r-- 1 1000 1000 3.1M Oct 27 11:52 events-5a1d977f-d295-4b8b-aa8f-3a0dbb661dfb.json
-rw-rw-r-- 1 1000 1000 2.5M Oct 27 11:52 events-6dea23dd-f46e-4f35-b948-4abe9cd6d1d7.json
-rw-rw-r-- 1 1000 1000 2.0M Oct 27 11:52 events-6feecebf-d7a1-4cf1-a153-ffb120d88e8a.json
-rw-rw-r-- 1 1000 1000 2.8M Oct 27 11:52 events-845d5e63-78ff-471e-b045-5f1faeb8ed79.json
-rw-rw-r-- 1 1000 1000 3.0M Oct 27 11:52 events-88d5d460-d3ab-4058-9abd-fd07e033a337.json
-rw-rw-r-- 1 1000 1000 3.0M Oct 27 11:52 events-8b908f99-0c4b-40f7-ba0c-70928dd95aeb.json
-rw-rw-r-- 1 1000 1000 3.0M Oct 27 11:52 events-a17ec8ab-8207-47e4-b632-77067f2622ae.json
-rw-rw-r-- 1 1000 1000 2.2M Oct 27 11:52 events-b9cd4135-4cba-44c6-8e00-3a46e97fc915.json
-rw-rw-r-- 1 1000 1000 3.0M Oct 27 11:52 events-c3c321e9-d229-4434-b8d2-a561698b184f.json


In [6]:
# Count JSON files
! ls -lhart /opt/sandbox/data/input/*.json | wc -l

14


We have 14 JSON files of at least 2 MB each.  
Let's take a look inside one file to analyze it.

In [None]:
# Get first line from file
! head -1 /opt/sandbox/data/input/events-1f7df57a-e1c9-4b76-b795-962de9cc292e.json

# The output is omitted here because the file content is very large.

In [8]:
# Count lines in files
! wc -l /opt/sandbox/data/input/*.json

       0 /opt/sandbox/data/input/events-1f7df57a-e1c9-4b76-b795-962de9cc292e.json
       0 /opt/sandbox/data/input/events-2924398a-d266-4f0e-b9f9-d7d35979ec7f.json
       0 /opt/sandbox/data/input/events-5a1d977f-d295-4b8b-aa8f-3a0dbb661dfb.json
       0 /opt/sandbox/data/input/events-6dea23dd-f46e-4f35-b948-4abe9cd6d1d7.json
       0 /opt/sandbox/data/input/events-6feecebf-d7a1-4cf1-a153-ffb120d88e8a.json
       0 /opt/sandbox/data/input/events-845d5e63-78ff-471e-b045-5f1faeb8ed79.json
       0 /opt/sandbox/data/input/events-88d5d460-d3ab-4058-9abd-fd07e033a337.json
       0 /opt/sandbox/data/input/events-8b908f99-0c4b-40f7-ba0c-70928dd95aeb.json
       0 /opt/sandbox/data/input/events-a17ec8ab-8207-47e4-b632-77067f2622ae.json
       0 /opt/sandbox/data/input/events-b9cd4135-4cba-44c6-8e00-3a46e97fc915.json
       0 /opt/sandbox/data/input/events-c3c321e9-d229-4434-b8d2-a561698b184f.json
       0 /opt/sandbox/data/input/events-d9107ca9-619e-4bf2-b281-4b4a34f7fb44.json
       0 /opt/sa

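As we can see above, wc -l reports zero lines for every file. Since wc -l counts newline characters, each file holds all of its data on a single line with no newline at the end.  
By default, Spark's JSON reader expects one JSON object per line (JSON Lines), which is consistent with it returning only one row per file. The files therefore need some fixes before Spark can read them correctly.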

## 4. Preprocessing files
Since the files are not in a format that Spark reads correctly, we need a preprocessing step to fix them.  
The proposed solution for this scenario is the jq command ([reference](https://stedolan.github.io/jq/)), which parses each file and writes the result to a staging folder with one compact JSON value per line.
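For readers without jq, the same reshaping can be sketched in plain Python. This is only an illustration, not what the notebook runs: it assumes each input file holds several JSON values written back to back on one line (inferred from the line counts before and after staging), and the names `split_concatenated_json`, `to_json_lines`, and the sample string are hypothetical.

```python
import json


def split_concatenated_json(text):
    """Yield each top-level JSON value found back to back in `text`."""
    decoder = json.JSONDecoder()
    pos, end = 0, len(text)
    while pos < end:
        # raw_decode does not skip leading whitespace, so do it here
        while pos < end and text[pos].isspace():
            pos += 1
        if pos >= end:
            break
        # raw_decode returns the parsed value and the index where it ended
        value, pos = decoder.raw_decode(text, pos)
        yield value


def to_json_lines(text):
    """Return the values in `text` as compact JSON, one value per line."""
    return "".join(
        json.dumps(value, separators=(",", ":"), ensure_ascii=False) + "\n"
        for value in split_concatenated_json(text)
    )


# Hypothetical sample: two events on a single line, no trailing newline
sample = '{"event_id":"a","domain":"account"}{"event_id":"b","domain":"transaction"}'
json_lines = to_json_lines(sample)
print(json_lines, end="")
```

Each printed line is a complete JSON object, which is the layout Spark's JSON reader handles by default.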

In [9]:
# Install jq command line
! apt-get install -y jq

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
jq is already the newest version (1.6-2.1).
0 upgraded, 0 newly installed, 0 to remove and 12 not upgraded.


In [10]:
# Create folder to stage files
! mkdir -p /opt/sandbox/data/staging;

In [11]:
# Parse json files and save in the staging folder
! for json_file in /opt/sandbox/data/input/*.json; do \
name=${json_file##*/}; \
jq -c . "$json_file" > "/opt/sandbox/data/staging/staging_$name" \
&& echo "Parsed file $json_file successfully"; \
done

Parsed file /opt/sandbox/data/input/events-1f7df57a-e1c9-4b76-b795-962de9cc292e.json successfully
Parsed file /opt/sandbox/data/input/events-2924398a-d266-4f0e-b9f9-d7d35979ec7f.json successfully
Parsed file /opt/sandbox/data/input/events-5a1d977f-d295-4b8b-aa8f-3a0dbb661dfb.json successfully
Parsed file /opt/sandbox/data/input/events-6dea23dd-f46e-4f35-b948-4abe9cd6d1d7.json successfully
Parsed file /opt/sandbox/data/input/events-6feecebf-d7a1-4cf1-a153-ffb120d88e8a.json successfully
Parsed file /opt/sandbox/data/input/events-845d5e63-78ff-471e-b045-5f1faeb8ed79.json successfully
Parsed file /opt/sandbox/data/input/events-88d5d460-d3ab-4058-9abd-fd07e033a337.json successfully
Parsed file /opt/sandbox/data/input/events-8b908f99-0c4b-40f7-ba0c-70928dd95aeb.json successfully
Parsed file /opt/sandbox/data/input/events-a17ec8ab-8207-47e4-b632-77067f2622ae.json successfully
Parsed file /opt/sandbox/data/input/events-b9cd4135-4cba-44c6-8e00-3a46e97fc915.json successfully
Parsed file /opt/san

In [12]:
# Validate number of rows
! wc -l /opt/sandbox/data/staging/*.json

    6225 /opt/sandbox/data/staging/staging_events-1f7df57a-e1c9-4b76-b795-962de9cc292e.json
    5447 /opt/sandbox/data/staging/staging_events-2924398a-d266-4f0e-b9f9-d7d35979ec7f.json
    8750 /opt/sandbox/data/staging/staging_events-5a1d977f-d295-4b8b-aa8f-3a0dbb661dfb.json
    6732 /opt/sandbox/data/staging/staging_events-6dea23dd-f46e-4f35-b948-4abe9cd6d1d7.json
    5639 /opt/sandbox/data/staging/staging_events-6feecebf-d7a1-4cf1-a153-ffb120d88e8a.json
    7623 /opt/sandbox/data/staging/staging_events-845d5e63-78ff-471e-b045-5f1faeb8ed79.json
    8461 /opt/sandbox/data/staging/staging_events-88d5d460-d3ab-4058-9abd-fd07e033a337.json
    8372 /opt/sandbox/data/staging/staging_events-8b908f99-0c4b-40f7-ba0c-70928dd95aeb.json
    8417 /opt/sandbox/data/staging/staging_events-a17ec8ab-8207-47e4-b632-77067f2622ae.json
    5919 /opt/sandbox/data/staging/staging_events-b9cd4135-4cba-44c6-8e00-3a46e97fc915.json
    8427 /opt/sandbox/data/staging/staging_events-c3c321e9-d229-4434-b8d2-a56169

The total number of rows is now 101821, which seems correct.

## 5. Process data
We are going to read the events data from the staging folder and load it into a DataFrame. Rows that are invalid with schema are dis

In [13]:
from pyspark.sql.types import TimestampType

# This time we are going to define a more specific schema
schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("domain", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("data", StringType(), True)
])

# Read JSON files
## mode=PERMISSIVE keeps rows that fail to parse and sets their malformed fields to null.
## The raw text goes to _corrupt_record only if the schema declares a string field with that name.
raw_data_df = spark.read.schema(schema)\
                   .option("mode", "PERMISSIVE")\
                   .option("columnNameOfCorruptRecord", "_corrupt_record")\
                   .json("file:///opt/sandbox/data/staging/*.json")

In [14]:
# Show 10 samples of the data
raw_data_df.show(10)

+--------------------+-------------------+-----------+-------------+--------------------+
|            event_id|          timestamp|     domain|   event_type|                data|
+--------------------+-------------------+-----------+-------------+--------------------+
|d2b9bccc-c344-425...|2021-01-19 14:11:46|    account|status-change|{"id":169213,"old...|
|d2b9bccc-c344-425...|2021-01-19 14:11:55|    account|status-change|{"id":169213,"old...|
|c882e155-52a1-4aa...|2021-01-19 14:14:40|    account|     creation|{"id":444852,"per...|
|36fa4df5-9804-46d...|2021-01-19 14:14:51|transaction|     creation|{"id":527401,"typ...|
|36fa4df5-9804-46d...|2021-01-19 14:14:52|transaction|     creation|{"id":527401,"typ...|
|143fc0a0-8d08-4cc...|2021-01-19 14:15:28|    account|status-change|{"id":533320,"old...|
|143fc0a0-8d08-4cc...|2021-01-19 14:15:37|    account|status-change|{"id":533320,"old...|
|c9886f51-d1d4-4b9...|2021-01-19 14:16:36|    account|status-change|{"id":253571,"old...|
|0c303b06-

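The DataFrame has no column named _corrupt_record, but this alone does not prove that every row is valid: with a user-defined schema, Spark keeps the corrupt-record text only when the schema declares a string field with that name, and the schema above does not. Malformed rows would instead appear with null fields.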
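As an independent cross-check outside Spark, the staged lines can be validated in plain Python. This is a minimal sketch under stated assumptions: the function name `find_invalid_lines` and the sample lines are hypothetical, the field names come from the schema above, and the timestamp format is assumed from the sample output shown earlier.

```python
import json
from datetime import datetime

# Field names taken from the schema used in this notebook
EXPECTED_FIELDS = {"event_id", "timestamp", "domain", "event_type", "data"}


def find_invalid_lines(lines):
    """Return (line_number, reason) pairs for lines that fail the checks."""
    problems = []
    for number, line in enumerate(lines, start=1):
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            problems.append((number, "not valid JSON"))
            continue
        if not isinstance(event, dict):
            problems.append((number, "not a JSON object"))
            continue
        missing = EXPECTED_FIELDS - set(event)
        if missing:
            problems.append((number, "missing fields: " + ", ".join(sorted(missing))))
            continue
        try:
            # Assumes ISO 8601 timestamps such as 2021-01-19T14:11:46
            datetime.fromisoformat(event["timestamp"])
        except (TypeError, ValueError):
            problems.append((number, "unparseable timestamp"))
    return problems


# Hypothetical sample lines: one valid, one with a bad timestamp, one truncated
sample_lines = [
    '{"event_id":"a","timestamp":"2021-01-19T14:11:46","domain":"account","event_type":"creation","data":{}}',
    '{"event_id":"b","timestamp":"not-a-date","domain":"account","event_type":"creation","data":{}}',
    '{"event_id":"c","domain":"account"',
]
invalid = find_invalid_lines(sample_lines)
print(invalid)
```

An empty result on the real staging files would support the conclusion that no row violates the schema.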

In [15]:
# Count number of rows(events)
n_rows = raw_data_df.count()
print(f"Number of rows: {n_rows}")


Number of rows: 101821


                                                                                

In [16]:
# Validate types of events
from pyspark.sql.functions import col, concat, lit

event_types = raw_data_df.withColumn("type_of_event", concat(col("domain"), lit("_"), col("event_type")))\
                         .select(col("type_of_event"))\
                         .distinct()\
                         .orderBy(col("type_of_event").asc())
event_types.show(10,False)



+---------------------+
|type_of_event        |
+---------------------+
|account_creation     |
|account_status-change|
|transaction_creation |
+---------------------+



                                                                                

As described in the docs, some events may be duplicated, so we are going to deduplicate them. For each event_id, the event with the latest timestamp is the one written to the output.

In [17]:
# Deduplicate events based on fields event_id and timestamp.
# Let's use windowing and row_number to perform this.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

windowed_df = raw_data_df.withColumn("row_number", row_number().over(Window.partitionBy(col("event_id")).orderBy(col("timestamp").desc())))

windowed_df.show(10)



+--------------------+-------------------+-----------+-------------+--------------------+----------+
|            event_id|          timestamp|     domain|   event_type|                data|row_number|
+--------------------+-------------------+-----------+-------------+--------------------+----------+
|00157e0f-231f-42a...|2021-01-06 09:50:33|transaction|     creation|{"id":610767,"typ...|         1|
|0015cc39-a7fb-476...|2021-01-21 05:54:15|transaction|     creation|{"id":120125,"typ...|         1|
|0015cc39-a7fb-476...|2021-01-21 05:54:11|transaction|     creation|{"id":120125,"typ...|         2|
|001ff7e3-652f-42e...|2021-01-02 23:42:15|transaction|     creation|{"id":609165,"typ...|         1|
|0046bb0e-bcaf-475...|2021-02-16 12:28:25|transaction|     creation|{"id":137506,"typ...|         1|
|0049f2dd-89db-4bf...|2021-01-29 21:20:36|    account|status-change|{"id":769266,"old...|         1|
|0058a933-e064-469...|2021-01-09 04:58:36|transaction|     creation|{"id":322886,"typ...|  

                                                                                

In [18]:
windowed_df.where(col("event_id") == "0015cc39-a7fb-4761-806c-b676dfc8c385").show(10)

+--------------------+-------------------+-----------+----------+--------------------+----------+
|            event_id|          timestamp|     domain|event_type|                data|row_number|
+--------------------+-------------------+-----------+----------+--------------------+----------+
|0015cc39-a7fb-476...|2021-01-21 05:54:15|transaction|  creation|{"id":120125,"typ...|         1|
|0015cc39-a7fb-476...|2021-01-21 05:54:11|transaction|  creation|{"id":120125,"typ...|         2|
+--------------------+-------------------+-----------+----------+--------------------+----------+



We can see in the DataFrame above that, for a duplicated event_id, row_number indicates the order based on the timestamp field. When the row_number value is 1, the row is the latest for the given event_id.  
So, to deduplicate the events, we keep only the rows where row_number is 1.
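The keep-latest rule can also be illustrated in plain Python on a handful of records, independent of Spark. This is a sketch only: the function name `keep_latest` is hypothetical and the event IDs are shortened forms of the ones shown in the output above.

```python
from datetime import datetime


def keep_latest(events):
    """Keep, for each event_id, the event with the greatest timestamp."""
    latest = {}
    for event in events:
        current = latest.get(event["event_id"])
        # Replace the stored event only when the new one is strictly newer
        if current is None or event["timestamp"] > current["timestamp"]:
            latest[event["event_id"]] = event
    return list(latest.values())


# Shortened, illustrative event IDs; timestamps mirror the sample output
events = [
    {"event_id": "0015cc39", "timestamp": datetime(2021, 1, 21, 5, 54, 11)},
    {"event_id": "0015cc39", "timestamp": datetime(2021, 1, 21, 5, 54, 15)},
    {"event_id": "00157e0f", "timestamp": datetime(2021, 1, 6, 9, 50, 33)},
]
deduplicated = keep_latest(events)
for event in deduplicated:
    print(event["event_id"], event["timestamp"])
```

When two duplicates share the same timestamp, this sketch keeps the first one seen, whereas row_number does not guarantee which of the tied rows receives 1.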

In [19]:
# Perform deduplication
# Filter column row_number=1 and drop column row_number
dedup_df = windowed_df.where(col("row_number") == 1).drop(col("row_number"))
dedup_df.show(10)



+--------------------+-------------------+-----------+-------------+--------------------+
|            event_id|          timestamp|     domain|   event_type|                data|
+--------------------+-------------------+-----------+-------------+--------------------+
|00157e0f-231f-42a...|2021-01-06 09:50:33|transaction|     creation|{"id":610767,"typ...|
|0015cc39-a7fb-476...|2021-01-21 05:54:15|transaction|     creation|{"id":120125,"typ...|
|001ff7e3-652f-42e...|2021-01-02 23:42:15|transaction|     creation|{"id":609165,"typ...|
|0046bb0e-bcaf-475...|2021-02-16 12:28:25|transaction|     creation|{"id":137506,"typ...|
|0049f2dd-89db-4bf...|2021-01-29 21:20:36|    account|status-change|{"id":769266,"old...|
|0058a933-e064-469...|2021-01-09 04:58:36|transaction|     creation|{"id":322886,"typ...|
|005bd626-026d-4ff...|2021-01-06 11:27:49|transaction|     creation|{"id":532579,"typ...|
|00605d7c-3195-4b4...|2021-01-21 03:13:06|transaction|     creation|{"id":995574,"typ...|
|007d2ff6-

                                                                                

In [20]:
#Count unique events
dedup_df.count()

                                                                                

81609

## 6. Writing results
The result will be written to Parquet files partitioned by domain, event type, and date (yyyy-MM-dd).

In [21]:
# Write results.
# To partition by date we need to add a date column (yyyy-MM-dd) to the DataFrame.
from pyspark.sql.functions import date_format

final_df = dedup_df.withColumn("date", date_format(col("timestamp"), "yyyy-MM-dd"))

In [22]:
# Write to output folder
final_df.write\
        .partitionBy("domain", "event_type", "date")\
        .parquet("/opt/sandbox/data/output/")

                                                                                

In [24]:
! apt-get install -y tree && tree /opt/sandbox/data/output/

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
tree is already the newest version (1.8.0-1+b1).
0 upgraded, 0 newly installed, 0 to remove and 12 not upgraded.
/opt/sandbox/data/output/
├── _SUCCESS
├── domain=account
│   ├── event_type=creation
│   │   ├── date=2021-01-01
│   │   │   ├── part-00000-831ca2ff-2bb8-43d9-8ecb-7bf8a268a0ad.c000.snappy.parquet
│   │   │   ├── part-00001-831ca2ff-2bb8-43d9-8ecb-7bf8a268a0ad.c000.snappy.parquet
│   │   │   ├── part-00002-831ca2ff-2bb8-43d9-8ecb-7bf8a268a0ad.c000.snappy.parquet
│   │   │   ├── part-00003-831ca2ff-2bb8-43d9-8ecb-7bf8a268a0ad.c000.snappy.parquet
│   │   │   ├── part-00004-831ca2ff-2bb8-43d9-8ecb-7bf8a268a0ad.c000.snappy.parquet
│   │   │   ├── part-00005-831ca2ff-2bb8-43d9-8ecb-7bf8a268a0ad.c000.snappy.parquet
│   │   │   ├── part-00006-831ca2ff-2bb8-43d9-8ecb-7bf8a268a0ad.c000.snappy.parquet
│   │   │   ├── part-00007-831ca2ff-

As we previously validated, there are 3 types of event, and the output is indeed partitioned into 3 domain + event_type folder combinations.
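To relate a single event to its folder in the tree above, the key=value directory layout can be reproduced in plain Python. This is a minimal sketch; the helper name `partition_path` is hypothetical, and the values are taken from the first directory shown in the tree.

```python
from datetime import datetime


def partition_path(base, domain, event_type, timestamp):
    """Build the key=value directory path for one event."""
    # strftime pattern equivalent to the yyyy-MM-dd pattern used for the date column
    date = timestamp.strftime("%Y-%m-%d")
    return f"{base.rstrip('/')}/domain={domain}/event_type={event_type}/date={date}"


path = partition_path(
    "/opt/sandbox/data/output/", "account", "creation", datetime(2021, 1, 1)
)
print(path)  # /opt/sandbox/data/output/domain=account/event_type=creation/date=2021-01-01
```

Because the partition values are encoded in the directory names, a reader that filters on domain, event_type, or date only needs to open the matching folders.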