<a href="https://colab.research.google.com/github/eder1985/pismo_recruiting_technical_case/blob/main/work/notbooks/Colab_Pismo_Recruiting_Technical_Case.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1><center>Pismo Recruiting Technical Case</center></h1>

---



## Objective
The objective of this notebook is to:
><li>Give a proper understanding of the different PySpark functions available.</li>
><li>Give a short introduction to Google Colab, the platform on which this notebook is written.</li>

Once you complete this notebook, you should be able to write PySpark programs efficiently. The ideal way to use it is to go through the examples given and then try them on Colab. At the end there are a few hands-on questions that you can use to evaluate yourself.

## 1. Pre-requisites

### Installing Spark

Install Dependencies:


1.   Java 8
2.   Apache Spark with Hadoop
3.   Findspark (used to locate Spark on the system)


In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

Set Environment Variables:

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
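
Before calling `findspark.init()`, it can help to confirm that both variables point at directories that actually exist, since a wrong path tends to surface later as a less obvious error. The following is a minimal sketch; the helper name `missing_env_dirs` is hypothetical and not part of the notebook.

```python
import os

def missing_env_dirs(names, env=None):
    """Return the variable names that are unset or do not point to a directory."""
    env = os.environ if env is None else env
    return [name for name in names if not os.path.isdir(env.get(name, ""))]

# Hypothetical usage: an empty list means both paths look usable.
problems = missing_env_dirs(["JAVA_HOME", "SPARK_HOME"])
print("Missing or invalid:", problems)
```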

In [3]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [20]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

### Creating data folders

In [5]:
!mkdir -p work/data/raw/events/
!mkdir -p work/data/processed/events/
!mkdir -p work/data/trusted/events/

## 2. Generate Fake Data

### Installing libs

In [6]:
!pip install -q faker

### Imports

In [7]:
from faker import Faker
from faker.providers import BaseProvider
from datetime import datetime
from json import dumps
import pandas as pd
import random
import collections
import glob
import os

### Generating fake `event_id`: random UUIDs

In [8]:
fake = Faker()
Faker.seed(random.randrange(0, 99999999999999999999, 1))
fake_event_id = fake.uuid4()
print(fake_event_id)

c8d133ae-2da6-4400-b9d2-bf6d2ed4e6d6


### Generating fake `timestamp`: random timestamps within the last 3 years

In [9]:
fake_timestamp = datetime.strftime(fake.date_time_between(start_date='-3y', end_date='now'),"%Y-%m-%dT%H:%M:%S")
print(fake_timestamp)

2022-04-17T19:26:20
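
The format string `%Y-%m-%dT%H:%M:%S` yields second-precision ISO 8601 timestamps without a timezone. The sketch below reproduces the idea with the standard library only (no Faker), approximating three years as 3 * 365 days. The helper name `random_timestamp` and the constant `TS_FORMAT` are hypothetical.

```python
import random
from datetime import datetime, timedelta

TS_FORMAT = "%Y-%m-%dT%H:%M:%S"

def random_timestamp(now=None, max_age_days=3 * 365):
    """Return a formatted timestamp drawn uniformly from the last max_age_days days."""
    now = now or datetime.now()
    offset = timedelta(seconds=random.randint(0, max_age_days * 24 * 3600))
    return (now - offset).strftime(TS_FORMAT)

ts = random_timestamp()
# The string parses back with the same format string.
parsed = datetime.strptime(ts, TS_FORMAT)
print(ts, parsed <= datetime.now())
```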


### Generating fake `status`: random values based on list

In [10]:
class StatusTypeProvider(BaseProvider):
    def status_type(self):
        list_status_types = ['ACTIVE','INACTIVE','SUSPENDED','BLOCKED', 'DELETED']
        return random.choice(list_status_types)

fake.add_provider(StatusTypeProvider)

fake_status_type = fake.status_type()
print(fake_status_type)

SUSPENDED


### Generating custom fake `uuid`: random values based on list



In [11]:
class CustomUUIDProvider(BaseProvider):
    def custom_uuid(self):
        list_uuids = [
            '1a1a1a1a-1a1a-1a1a-1a1a-1a1a1a1a1a1a',
            '2b2b2b2b-2b2b-2b2b-2b2b-2b2b2b2b2b2b'
            ]
        return random.choice(list_uuids)
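
Drawing `event_id` from a two-element list makes collisions unavoidable as soon as more than two events are generated, which gives the later de-duplication step something to work on. A small sketch of that effect, using only the standard library (the names `FIXED_IDS` and `sample_ids` are hypothetical):

```python
import random
from collections import Counter

FIXED_IDS = [
    "1a1a1a1a-1a1a-1a1a-1a1a-1a1a1a1a1a1a",
    "2b2b2b2b-2b2b-2b2b-2b2b-2b2b2b2b2b2b",
]

def sample_ids(n, ids=FIXED_IDS):
    """Draw n ids with replacement from a small fixed pool."""
    return [random.choice(ids) for _ in range(n)]

counts = Counter(sample_ids(10))
# With 10 draws from 2 values, at least one id must repeat (pigeonhole principle).
print(counts.most_common())
```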

### Generating custom fake `event_type`: random values based on list


In [12]:
class EventTypeProvider(BaseProvider):
    def event_type(self):
        list_event_types = ['account-status-change','transaction-new-value']
        return random.choice(list_event_types)

fake.add_provider(EventTypeProvider)

fake_event_type = fake.event_type()
print(fake_event_type)

account-status-change


### Generating custom fake `data`: values based on dict

In [13]:
class CustomDataProvider(BaseProvider):
    def custom_data(self):
        # Note: this method uses the module-level `fake`, which must already have
        # StatusTypeProvider registered. It builds one payload per event type.
        dict_data = {
            "account-status-change": collections.OrderedDict([
                ('id', fake.random_number(digits=6)),
                ('old_status', fake.status_type()),
                ('new_status', fake.status_type()),
                ('reason', fake.sentence(nb_words=5))
            ]),
            "transaction-new-value": collections.OrderedDict([
                ('id', fake.random_number(digits=6)),
                ('account_orig_id', fake.random_number(digits=6)),
                ('account_dest_id', fake.random_number(digits=6)),
                ('amount', fake.pyfloat(positive=True)),
                ('currency', fake.currency_code())
            ])
        }
        return dict_data

fake.add_provider(CustomDataProvider)

fake_custom_data = fake.custom_data().get(fake.event_type())
print(fake_custom_data)

OrderedDict([('id', 267296), ('account_orig_id', 684407), ('account_dest_id', 533302), ('amount', 4022317013.11924), ('currency', 'AWG')])


### Defining `write_fake_data` and `read_fake_data` functions

In [14]:
def write_fake_data(fake, length, destination_path, unique_uuid = True):
    """Write `length` fake events to one timestamped JSON file in destination_path."""
    database = []
    current_time = datetime.now().strftime("%Y%m%d%H%M%S")
    filename = 'fake_events_'+current_time

    for _ in range(length):
        # unique_uuid=False draws from CustomUUIDProvider's fixed list to force duplicates
        uuid = fake.uuid4() if unique_uuid else fake.custom_uuid()
        event_type = fake.event_type()
        # The domain is the prefix of the event type: 'account' or 'transaction'
        project_domain_name = event_type.split('-')[0]

        database.append(collections.OrderedDict([
            ('event_id', uuid),
            ('timestamp', datetime.strftime(fake.date_time_between(start_date='-3y', end_date='now'),"%Y-%m-%dT%H:%M:%S")),
            ('domain', project_domain_name),
            ('event_type', event_type),
            ('data', fake.custom_data().get(event_type))
        ]))

    # destination_path is expected to end with a path separator
    with open('%s%s.json' % (destination_path, filename), 'w') as output:
        output.write(dumps(database, indent=4, sort_keys=False, default=str))

    print("Done.")

def read_fake_data(json_filepath):
    """Read every JSON file matching the glob pattern into a single pandas DataFrame."""
    json_files = [os.path.normpath(i) for i in glob.glob(json_filepath)]
    df = pd.concat([pd.read_json(f) for f in json_files])
    return df
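
Note that `dumps(database, indent=4, ...)` writes the whole batch as a single pretty-printed JSON array rather than one object per line, which is why the Spark reader later in this notebook sets the `multiline` option. The sketch below is a standard-library round trip over files of that shape; `read_json_arrays` and the sample file name are hypothetical.

```python
import glob
import json
import os
import tempfile

def read_json_arrays(pattern):
    """Load every file matching pattern as a JSON array and concatenate the records."""
    records = []
    for path in sorted(glob.glob(pattern)):
        with open(path) as handle:
            records.extend(json.load(handle))
    return records

# Hypothetical round trip in a temporary folder, mirroring the notebook's file layout.
with tempfile.TemporaryDirectory() as tmp:
    sample = [{"event_id": "x", "event_type": "account-status-change", "data": {"id": 1}}]
    with open(os.path.join(tmp, "fake_events_sample.json"), "w") as out:
        out.write(json.dumps(sample, indent=4))
    loaded = read_json_arrays(os.path.join(tmp, "*.json"))
    print(len(loaded), loaded[0]["data"])
```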

### Writing and reading fake data

In [15]:
def run(length, unique_uuid = True):
    fake = Faker()
    Faker.seed(random.randrange(0, 99999999999999999999, 1))
    fake.add_provider(StatusTypeProvider)
    fake.add_provider(CustomUUIDProvider)
    fake.add_provider(EventTypeProvider)
    fake.add_provider(CustomDataProvider)

    destination_path = 'work/data/raw/events/'
    write_fake_data(fake, length, destination_path,unique_uuid)

    json_filepath = destination_path+'*.json'
    fake_data = read_fake_data(json_filepath)
    print(fake_data)

In [16]:
run(1000)

Done.
                                 event_id           timestamp       domain  \
0    440a6b7f-2186-47f3-bff2-5fc9a6eb74f9 2020-12-10 01:40:07      account   
1    ee6fd071-20c4-4b6a-b0dc-47492d45f383 2022-04-14 01:26:11  transaction   
2    151f094b-0c9a-4a09-b041-eafd14b068bd 2021-12-02 09:50:49      account   
3    c29baf36-6206-49b8-8a99-db71a613b3bf 2022-11-29 08:13:38      account   
4    d53f93a3-b2a9-498c-ae0a-28f3f7fc7f96 2021-01-25 01:21:50      account   
..                                    ...                 ...          ...   
995  092ab738-f08a-497b-b37c-6ccc1ef9702a 2021-12-21 08:55:55  transaction   
996  4c08818f-99f0-48d0-897c-5711e95220c5 2022-06-23 00:50:28  transaction   
997  5eceb16f-ff5a-460a-8f88-69413d649d2c 2022-11-08 04:10:47  transaction   
998  29ff1726-e9a7-4bad-89d1-a24c3bc56578 2023-02-11 07:43:55      account   
999  8d950701-c44c-4155-8205-a2eb211665cc 2022-10-08 06:30:46  transaction   

                event_type                               

In [17]:
run(10,unique_uuid = False)

Done.
                                 event_id           timestamp       domain  \
0    2b2b2b2b-2b2b-2b2b-2b2b-2b2b2b2b2b2b 2021-02-18 08:09:19  transaction   
1    1a1a1a1a-1a1a-1a1a-1a1a-1a1a1a1a1a1a 2022-02-24 22:24:29  transaction   
2    2b2b2b2b-2b2b-2b2b-2b2b-2b2b2b2b2b2b 2022-06-09 02:16:59  transaction   
3    1a1a1a1a-1a1a-1a1a-1a1a-1a1a1a1a1a1a 2021-09-06 22:24:19      account   
4    1a1a1a1a-1a1a-1a1a-1a1a-1a1a1a1a1a1a 2020-11-08 18:40:37  transaction   
..                                    ...                 ...          ...   
995  092ab738-f08a-497b-b37c-6ccc1ef9702a 2021-12-21 08:55:55  transaction   
996  4c08818f-99f0-48d0-897c-5711e95220c5 2022-06-23 00:50:28  transaction   
997  5eceb16f-ff5a-460a-8f88-69413d649d2c 2022-11-08 04:10:47  transaction   
998  29ff1726-e9a7-4bad-89d1-a24c3bc56578 2023-02-11 07:43:55      account   
999  8d950701-c44c-4155-8205-a2eb211665cc 2022-10-08 06:30:46  transaction   

                event_type                               

## 3. Exploring the Raw Dataframe

### Loading the Dataframe:

- List the raw JSON files
- Define `raw_events_schema` with only `StringType` columns so that no data is lost
- Load the `raw_events` dataframe
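
Declaring `data` as a string keeps the nested payload as raw JSON text, so two differently shaped payloads do not have to be forced into one struct at load time. The plain-Python sketch below illustrates how little the two payloads share; the two strings are copied from the `raw_events.show` output in this section, and the variable names are hypothetical.

```python
import json

# Payloads copied from the sample output of this notebook.
account_payload = (
    '{"id":454455,"old_status":"INACTIVE","new_status":"DELETED",'
    '"reason":"To product heavy probably suddenly."}'
)
transaction_payload = (
    '{"id":26561,"account_orig_id":660456,"account_dest_id":532831,'
    '"amount":2.787653028486E11,"currency":"XAF"}'
)

account_keys = set(json.loads(account_payload))
transaction_keys = set(json.loads(transaction_payload))

# Fields present in both payloads, then fields specific to one event type.
print(sorted(account_keys & transaction_keys))
print(sorted(account_keys ^ transaction_keys))
```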

In [18]:
!ls work/data/raw/events/ -la

total 392
drwxr-xr-x 2 root root   4096 Jul 26 13:28 .
drwxr-xr-x 3 root root   4096 Jul 26 13:26 ..
-rw-r--r-- 1 root root 387403 Jul 26 13:28 fake_events_20230726132807.json
-rw-r--r-- 1 root root   3877 Jul 26 13:28 fake_events_20230726132813.json


In [45]:

from pyspark.sql.types import StructType,StructField, StringType

raw_events_schema = StructType([
      StructField("data",StringType(),True),
      StructField("domain",StringType(),True),
      StructField("event_id",StringType(),True),
      StructField("event_type",StringType(),True),
      StructField("timestamp",StringType(),True)
  ])

In [24]:
raw_events = spark.read.option("multiline","true").schema(raw_events_schema).json('work/data/raw/events/')
raw_events.show(5, truncate = False)

+-----------------------------------------------------------------------------------------------------------+-----------+------------------------------------+---------------------+-------------------+
|data                                                                                                       |domain     |event_id                            |event_type           |timestamp          |
+-----------------------------------------------------------------------------------------------------------+-----------+------------------------------------+---------------------+-------------------+
|{"id":454455,"old_status":"INACTIVE","new_status":"DELETED","reason":"To product heavy probably suddenly."}|account    |440a6b7f-2186-47f3-bff2-5fc9a6eb74f9|account-status-change|2020-12-10T01:40:07|
|{"id":26561,"account_orig_id":660456,"account_dest_id":532831,"amount":2.787653028486E11,"currency":"XAF"} |transaction|ee6fd071-20c4-4b6a-b0dc-47492d45f383|transaction-new-value|2022-04-14T01:26

In [25]:
raw_events.count()

1010

### Dataframe Raw Schema

In [26]:
raw_events.printSchema()

root
 |-- data: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



## 4. Applying transformations

### Columns transformations

In [27]:
partial_events = raw_events\
  .withColumn("timestamp",to_timestamp("timestamp"))\
  .withColumn("day",to_date("timestamp"))

In [28]:
partial_events.printSchema()

root
 |-- data: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- day: date (nullable = true)



### Drop duplicate events

In [29]:
from pyspark.sql.functions import countDistinct

# Count the distinct (event_id, event_type) pairs
partial_events.select(countDistinct("event_id", "event_type").alias("distinct_events")).show()


+---------------+
|distinct_events|
+---------------+
|           1003|
+---------------+



In [30]:
# `max` here is pyspark.sql.functions.max (brought in by the wildcard import), not the Python builtin
grouped_events = partial_events \
  .groupBy(col("event_id"), col("event_type")) \
  .agg(max(col("timestamp")))

grouped_events.show(truncate = False)

+------------------------------------+---------------------+-------------------+
|event_id                            |event_type           |max(timestamp)     |
+------------------------------------+---------------------+-------------------+
|ca46ac03-5e85-4f68-989d-7ab875365e68|transaction-new-value|2021-08-16 15:11:46|
|54602ccc-22c5-4964-aa15-8c54afcd37a5|account-status-change|2020-08-13 22:31:54|
|41e2c325-ed9a-4324-8d84-91c13fe7397e|account-status-change|2023-06-02 04:37:16|
|3d6a0f50-fcf9-4f36-87f5-eed07e88b74f|account-status-change|2021-01-15 16:37:53|
|524fc350-a248-4106-9b57-a8481d826322|transaction-new-value|2023-05-05 04:15:53|
|a3795b5c-a0b3-4e49-863b-338b884fdd8a|transaction-new-value|2022-06-08 15:42:20|
|713b7478-b734-4496-afe9-a5d5475b384a|transaction-new-value|2020-09-28 19:09:40|
|b84c8fed-4c19-473d-96ec-3c3ac7dbce96|transaction-new-value|2022-06-18 07:16:17|
|27d35379-c896-446b-96f4-d7eccad2d55c|account-status-change|2020-10-29 04:28:38|
|19c556db-ffc6-4828-8406-168

In [31]:
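# Join on the timestamp as well, so that data, domain and day come from the latest event
# of each (event_id, event_type) pair and not from an arbitrary duplicate.
# Rows whose timestamp is null would not match this join.
final_events = grouped_events \
    .withColumnRenamed("max(timestamp)", "timestamp") \
    .join(partial_events, ["event_id","event_type","timestamp"]) \
    .dropDuplicates(["event_id","event_type"])
final_events.show()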

+--------------------+--------------------+-------------------+--------------------+-----------+----------+
|            event_id|          event_type|          timestamp|                data|     domain|       day|
+--------------------+--------------------+-------------------+--------------------+-----------+----------+
|3d6a0f50-fcf9-4f3...|account-status-ch...|2021-01-15 16:37:53|{"id":448893,"old...|    account|2021-01-15|
|41e2c325-ed9a-432...|account-status-ch...|2023-06-02 04:37:16|{"id":123927,"old...|    account|2023-06-02|
|54602ccc-22c5-496...|account-status-ch...|2020-08-13 22:31:54|{"id":760608,"old...|    account|2020-08-13|
|ca46ac03-5e85-4f6...|transaction-new-v...|2021-08-16 15:11:46|{"id":690905,"acc...|transaction|2021-08-16|
|27d35379-c896-446...|account-status-ch...|2020-10-29 04:28:38|{"id":248336,"old...|    account|2020-10-29|
|524fc350-a248-410...|transaction-new-v...|2023-05-05 04:15:53|{"id":889154,"acc...|transaction|2023-05-05|
|713b7478-b734-449...|transa

In [32]:
final_events.count()

1003
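
The de-duplication rule in this section is "keep one row per (`event_id`, `event_type`), the one with the latest timestamp". To make the semantics concrete, here is the same rule as a plain-Python sketch over a small list of dicts; `latest_per_key` and the sample rows are hypothetical and are not the notebook's Spark code.

```python
def latest_per_key(events):
    """Keep, for each (event_id, event_type), the event with the greatest timestamp."""
    latest = {}
    for event in events:
        key = (event["event_id"], event["event_type"])
        # ISO 8601 strings of equal precision sort chronologically.
        if key not in latest or event["timestamp"] > latest[key]["timestamp"]:
            latest[key] = event
    return list(latest.values())

events = [
    {"event_id": "a", "event_type": "t", "timestamp": "2021-01-01T00:00:00", "data": "old"},
    {"event_id": "a", "event_type": "t", "timestamp": "2022-01-01T00:00:00", "data": "new"},
    {"event_id": "b", "event_type": "t", "timestamp": "2020-06-01T00:00:00", "data": "only"},
]
for row in latest_per_key(events):
    print(row["event_id"], row["timestamp"], row["data"])
```

In Spark, one common way to express the same rule is `row_number()` over a window partitioned by the key and ordered by timestamp descending, keeping only the first row of each partition.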

## 5. Flatten `data` column by event-type

> Each event-type has its own schema

In [41]:
final_events.groupBy('event_type').agg(count(col('event_type'))).show(truncate=False)

+---------------------+-----------------+
|event_type           |count(event_type)|
+---------------------+-----------------+
|account-status-change|512              |
|transaction-new-value|491              |
+---------------------+-----------------+



In [67]:
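def distinct_values(df, col_name):
    # `col_name` avoids shadowing pyspark's `col` function inside this helper
    list_distinct_values = df.select(col_name).distinct().toPandas()[col_name].to_list()
    return list_distinct_values

def flatten_df(df):
    # Infer the schema of the JSON strings in `data` for this subset of events
    json_schema = spark.read.json(df.rdd.map(lambda row: row.data)).schema
    # Parse `data` into a struct column using the inferred schema
    df2 = df.withColumn("data", from_json("data", json_schema))
    # Keep every other column and promote each field of `data` to a top-level column
    other_cols = [c for c in df2.columns if c != "data"]
    data_cols = ["data." + c for c in df2.select("data.*").columns]
    df3 = df2.select(*other_cols, *data_cols)
    return df3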

def write_parquet_data(df):
    df.printSchema()
    df.write \
    .partitionBy("event_type", "day") \
    .mode("append") \
    .parquet("work/data/trusted/events/")
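
`flatten_df` promotes the fields of the parsed `data` payload to top-level columns. The plain-Python sketch below does the equivalent for a single record, to show the intended shape of the result; `flatten_event` is a hypothetical helper, and the sample record is abridged from the notebook output. In this sketch a payload key that matched an envelope key would overwrite it, which does not happen with the fields generated here.

```python
import json

def flatten_event(event):
    """Return a copy of event with the JSON string in `data` expanded into top-level keys."""
    flat = {key: value for key, value in event.items() if key != "data"}
    flat.update(json.loads(event["data"]))
    return flat

event = {
    "event_id": "440a6b7f-2186-47f3-bff2-5fc9a6eb74f9",
    "event_type": "account-status-change",
    "data": '{"id":454455,"old_status":"INACTIVE","new_status":"DELETED"}',
}
print(flatten_event(event))
```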

## 6. Write transformed data in parquet format





In [68]:
for value in distinct_values(final_events, 'event_type'):
    filtered_df = final_events.filter(col('event_type') == value)
    flattened_df = flatten_df(filtered_df)
    write_parquet_data(flattened_df)

root
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- domain: string (nullable = true)
 |-- day: date (nullable = true)
 |-- id: long (nullable = true)
 |-- new_status: string (nullable = true)
 |-- old_status: string (nullable = true)
 |-- reason: string (nullable = true)

root
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- domain: string (nullable = true)
 |-- day: date (nullable = true)
 |-- account_dest_id: long (nullable = true)
 |-- account_orig_id: long (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- id: long (nullable = true)



## 7. Read transformed data in parquet format

In [73]:
!ls -la work/data/trusted/events/

total 52
drwxr-xr-x   4 root root  4096 Jul 26 16:10  .
drwxr-xr-x   3 root root  4096 Jul 26 13:26  ..
drwxr-xr-x 418 root root 20480 Jul 26 16:10 'event_type=account-status-change'
drwxr-xr-x 386 root root 20480 Jul 26 16:10 'event_type=transaction-new-value'
-rw-r--r--   1 root root     0 Jul 26 16:10  _SUCCESS
-rw-r--r--   1 root root     8 Jul 26 16:10  ._SUCCESS.crc
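
The folder names in this listing follow the `key=value` layout produced by `partitionBy("event_type", "day")`: the partition values live in the path, not inside the Parquet files. Because the reads in this section point directly at one `event_type=...` folder, `event_type` does not appear as a column in the results, while `day` still does. Spark's `basePath` read option is one way to keep it. The sketch below only illustrates how such a path encodes the values; `partition_values` and the file name in `example` are hypothetical.

```python
def partition_values(path):
    """Extract key=value path segments (Hive-style partitions) into a dict."""
    values = {}
    for segment in path.strip("/").split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            values[key] = value
    return values

# Hypothetical file path following the layout written by partitionBy("event_type", "day").
example = (
    "work/data/trusted/events/"
    "event_type=transaction-new-value/day=2021-11-21/part-0000.parquet"
)
print(partition_values(example))
```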


In [74]:
trusted_transaction_events = spark.read.parquet('work/data/trusted/events/event_type=transaction-new-value')
trusted_transaction_events.show()

+--------------------+-------------------+-----------+---------------+---------------+-------------------+--------+------+----------+
|            event_id|          timestamp|     domain|account_dest_id|account_orig_id|             amount|currency|    id|       day|
+--------------------+-------------------+-----------+---------------+---------------+-------------------+--------+------+----------+
|5044852d-fb35-44a...|2021-11-21 12:08:40|transaction|         214886|         976948|    7107716.6935541|     CRC|183871|2021-11-21|
|8cfc8c24-d227-410...|2020-09-29 21:59:11|transaction|         932979|         805258|   41.6929606665716|     NGN|924010|2020-09-29|
|4c85e7e2-836d-421...|2022-07-16 08:25:00|transaction|         543600|         988545| 2.0699236805332E12|     CUP|694442|2022-07-16|
|dfc54258-c843-4f7...|2021-10-22 19:37:21|transaction|         896601|         910735|    579.93692554022|     ZMW|804403|2021-10-22|
|b94149a0-f294-4a0...|2021-04-15 16:31:44|transaction|        

In [75]:
trusted_transaction_events.count()

491

In [76]:
trusted_account_events = spark.read.parquet('work/data/trusted/events/event_type=account-status-change')
trusted_account_events.show()

+--------------------+-------------------+-------+------+----------+----------+--------------------+----------+
|            event_id|          timestamp| domain|    id|new_status|old_status|              reason|       day|
+--------------------+-------------------+-------+------+----------+----------+--------------------+----------+
|df0c68a8-a6ec-47b...|2021-11-09 04:14:35|account| 44324|  INACTIVE|   BLOCKED|Choose service co...|2021-11-09|
|832c7add-2881-405...|2020-10-20 01:42:12|account|174884|  INACTIVE| SUSPENDED|Environment disco...|2020-10-20|
|cf8f6836-25fa-4d1...|2021-06-22 21:39:07|account|575438| SUSPENDED|  INACTIVE|Result administra...|2021-06-22|
|27839a5b-7f57-492...|2021-07-27 13:52:51|account|395226| SUSPENDED| SUSPENDED|International cas...|2021-07-27|
|a02f2099-1dde-4cd...|2022-10-05 14:53:49|account|986889|  INACTIVE|    ACTIVE|Structure hospita...|2022-10-05|
|89f61c64-3508-422...|2022-06-12 15:36:01|account|282756| SUSPENDED| SUSPENDED|Concern whom comp...|2022

In [77]:
trusted_account_events.count()

512

## 8. Move raw data to processed data folder

In [32]:
!mv work/data/raw/events/* work/data/processed/events/
!ls work/data/processed/events/ -la

total 384
drwxr-xr-x 2 root root   4096 Jul 26 07:36 .
drwxr-xr-x 3 root root   4096 Jul 26 07:29 ..
-rw-r--r-- 1 root root 380467 Jul 26 07:33 fake_events_20230726073333.json
-rw-r--r-- 1 root root   3809 Jul 26 07:33 fake_events_20230726073340.json
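
If the move needs to run outside a notebook shell, a Python equivalent is straightforward. The sketch below is one possible version under stated assumptions: it moves only `*.json` files (the `mv` command above moves everything) and skips a file when the target name already exists instead of overwriting it. `move_files` is a hypothetical helper.

```python
import glob
import os
import shutil
import tempfile

def move_files(src_dir, dst_dir, pattern="*.json"):
    """Move files matching pattern from src_dir to dst_dir, skipping name clashes."""
    moved = []
    for path in sorted(glob.glob(os.path.join(src_dir, pattern))):
        target = os.path.join(dst_dir, os.path.basename(path))
        if os.path.exists(target):
            continue  # leave the source in place rather than overwrite
        shutil.move(path, target)
        moved.append(target)
    return moved

# Hypothetical demonstration with temporary folders standing in for raw/ and processed/.
with tempfile.TemporaryDirectory() as raw, tempfile.TemporaryDirectory() as processed:
    open(os.path.join(raw, "fake_events_sample.json"), "w").close()
    print([os.path.basename(p) for p in move_files(raw, processed)])
```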
