In [2]:
import random

from datetime import date, timedelta

from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType, StringType, StructType, StructField

In [3]:
# Session settings: the application name and the Spark master URL to run against.
app_name = "SeQura Data Engineer"
master = "local[8]"

# getOrCreate() hands back the already-active session when there is one,
# otherwise it starts a new session configured by the builder.
builder = SparkSession.builder.appName(app_name).master(master)
spark = builder.getOrCreate()

print(spark.version)

3.2.1


## Load Mock Data into the Landing Zone

In [4]:
# Build the mock rows: one row per (day, record id), starting on 2022-01-01.
record_date = date(2022, 1, 1)
today = date.today()
# Exclusive upper bound, evaluated once so it cannot move if the clock passes
# midnight while the loop is running. The last generated day is today - 2.
end_date = today - timedelta(days=1)

data = []
while record_date < end_date:

    # Each letter must appear exactly once so that every day gets 26 distinct
    # ids, "Aaaaaa" through "Zzzzzz".
    for letter in 'abcdefghijklmnopqrstuvwxyz':
        record_id = (letter * 6).capitalize()
        data.append(
            {"Id": record_id,
             "record_date": record_date,
             # Three independent random integers in 0..9.
             "dim1": random.randrange(10),
             "dim2": random.randrange(10),
             "dim3": random.randrange(10)})

    record_date = record_date + timedelta(days=1)
    
# Explicit schema for the mock rows: a string id, the snapshot date and three
# integer dimensions. Every field is declared non-nullable.
dimension_fields = [
    StructField(dimension, IntegerType(), nullable=False)
    for dimension in ('dim1', 'dim2', 'dim3')
]
schema = StructType(
    [
        StructField('Id', StringType(), nullable=False),
        StructField('record_date', DateType(), nullable=False),
        *dimension_fields,
    ]
)

# Materialise the list of row dicts as a DataFrame and preview it.
df = spark.createDataFrame(data, schema=schema)
df.show()

+------+-----------+----+----+----+
|    Id|record_date|dim1|dim2|dim3|
+------+-----------+----+----+----+
|Aaaaaa| 2022-01-01|   4|   1|   2|
|Bbbbbb| 2022-01-01|   7|   1|   8|
|Cccccc| 2022-01-01|   7|   6|   9|
|Dddddd| 2022-01-01|   6|   3|   3|
|Eeeeee| 2022-01-01|   0|   1|   4|
|Ffffff| 2022-01-01|   9|   9|   0|
|Gggggg| 2022-01-01|   1|   1|   5|
|Hhhhhh| 2022-01-01|   7|   1|   0|
|Iiiiii| 2022-01-01|   8|   7|   1|
|Jjjjjj| 2022-01-01|   8|   5|   7|
|Kkkkkk| 2022-01-01|   1|   7|   7|
|Llllll| 2022-01-01|   1|   9|   8|
|Mmmmmm| 2022-01-01|   7|   4|   5|
|Nnnnnn| 2022-01-01|   2|   7|   2|
|Oooooo| 2022-01-01|   9|   9|   9|
|Pppppp| 2022-01-01|   9|   7|   5|
|Qqqqqq| 2022-01-01|   6|   9|   8|
|Rrrrrr| 2022-01-01|   5|   6|   9|
|Ssssss| 2022-01-01|   6|   3|   0|
|Tttttt| 2022-01-01|   3|   3|   8|
+------+-----------+----+----+----+
only showing top 20 rows



In [5]:
filename = "data/landing/credit-status-snapshots.parquet"

# Shuffle so that all rows of one record_date sit in the same Spark partition
# before the partitioned write below.
df = df.repartition("record_date")

# Write once, laid out as record_date=<value>/ directories. "overwrite"
# replaces whatever already exists at the path, so an additional unpartitioned
# write to the same location beforehand would only be thrown away.
df.write.partitionBy("record_date").mode("overwrite").parquet(filename)

## Extract data from the Landing Zone

In [6]:
filename = "data/landing/credit-status-snapshots.parquet"

# Read every record_date=<value> directory under the landing path. Setting
# basePath to the dataset root lets Spark's partition discovery still expose
# record_date as a column even though the load path points below the root.
agg_df = spark.read.option("basePath", f"{filename}/").parquet(f"{filename}/record_date=*")

# Register the data as a temp view whose name carries the run date (YYYYMMDD).
today = date.today().strftime("%Y%m%d")
table_name = f"LandingAggregateTable{today}"
agg_df.createOrReplaceTempView(table_name)

# Sanity check: number of rows per snapshot day. GROUP BY already yields one
# row per record_date, so no DISTINCT is needed.
spark.sql(f"select record_date, count(1) from {table_name} group by 1 order by 1 asc").show()

+-----------+--------+
|record_date|count(1)|
+-----------+--------+
| 2022-01-01|      26|
| 2022-01-02|      26|
| 2022-01-03|      26|
| 2022-01-04|      26|
| 2022-01-05|      26|
| 2022-01-06|      26|
| 2022-01-07|      26|
| 2022-01-08|      26|
| 2022-01-09|      26|
| 2022-01-10|      26|
| 2022-01-11|      26|
| 2022-01-12|      26|
| 2022-01-13|      26|
| 2022-01-14|      26|
| 2022-01-15|      26|
| 2022-01-16|      26|
| 2022-01-17|      26|
| 2022-01-18|      26|
| 2022-01-19|      26|
| 2022-01-20|      26|
+-----------+--------+
only showing top 20 rows



## Transform

In [7]:
# Choose the snapshot days to keep: the last calendar day of every month from
# the month of the earliest record up to (but not including) the run date,
# plus the most recent record_date present in the landing view.
today = date.today().strftime("%Y%m%d")
table_name = f"LandingAggregateTable{today}"

# Fetch both bounds with a single aggregate query.
first_day, last_day = spark.sql(
    f"select min(record_date), max(record_date) from {table_name}"
).collect()[0]

days = []

# min()/max() return NULL (None) when the view has no rows; in that case there
# is nothing to select and `days` stays empty.
if first_day is not None:
    run_date = date.today()
    month_start = first_day.replace(day=1)
    while True:
        # Day 1 plus 32 days always lands in the following month; snapping back
        # to day 1 gives the first day of that month.
        next_month_start = (month_start + timedelta(days=32)).replace(day=1)
        month_end = next_month_start - timedelta(days=1)
        # A month end is only kept once the month after it has started.
        if month_end >= run_date:
            break
        days.append(month_end)
        month_start = next_month_start

    # Always keep the latest available snapshot as well.
    if last_day not in days:
        days.append(last_day)

In [8]:
# Keep only the rows whose record_date is one of the selected snapshot days.
new_df = agg_df.where(col("record_date").isin(days))

# Register the filtered data as a temp view whose name carries the run date.
today = date.today().strftime("%Y%m%d")
table_name = f"ProcessedAggregateTable{today}"
new_df.createOrReplaceTempView(table_name)

# Sanity check: number of rows per retained snapshot day. GROUP BY already
# yields one row per record_date, so no DISTINCT is needed.
spark.sql(f"select record_date, count(1) from {table_name} group by 1 order by 1 asc").show()

+-----------+--------+
|record_date|count(1)|
+-----------+--------+
| 2022-01-31|      26|
| 2022-02-28|      26|
| 2022-03-25|      26|
+-----------+--------+



## Load data into the Processed Zone

In [9]:
# Target path in the processed zone; `today` is the YYYYMMDD run-date string.
filename = f"data/processed/credit-status-snapshots-{today}.parquet"

# Write the retained snapshots as record_date=<value>/ directories, replacing
# any output already present at the path.
processed_writer = new_df.write.partitionBy("record_date").mode("overwrite")
processed_writer.parquet(filename)