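# WIP - Pipeline - Append snapshots with schema evolution

Lands CSV profile snapshots in a landing folder and appends every landed file to a bronze Delta table. Columns may be added or dropped between snapshots. The landing folder is then cleared. The silver and gold layers are not built yet.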

## Links
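- [Databricks blog (2022-06-24): Data warehousing modeling techniques and their implementation on the Databricks Lakehouse Platform](https://www.databricks.com/blog/2022/06/24/data-warehousing-modeling-techniques-and-their-implementation-on-the-databricks-lakehouse-platform.html)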

## Setup

In [2]:
# Automatically reload edited project modules (e.g. utils.data_generation) so changes
# are picked up without restarting the kernel; reload_ext keeps this cell re-runnable.
%reload_ext autoreload
%autoreload 2

In [8]:
from pathlib import Path 
from utils.data_generation import FakerProfileDataSnapshot

from pyspark.sql import SparkSession
from  pyspark.sql.functions import input_file_name, split, regexp_extract

In [4]:
from delta import *

In [5]:
from pyspark.sql import SparkSession
from delta import *

# Spark session with the Delta Lake SQL extension and the Delta catalog registered.
# configure_spark_with_delta_pip (from the `delta` package) adds the Delta Lake jars
# matching the pip-installed delta-spark version before the session is created.
builder = (
    SparkSession.builder
    .appName("data-pipeline")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [21]:
# object for data generation
datagen = FakerProfileDataSnapshot(
    data_dir=str(Path('./data').resolve())
)
# clean landing storage
datagen.delete_batches_in_landing_dir()
print(datagen.bronze_dir)
datagen.delete_bronze_dir()

## Parameters

In [22]:
# Spark CSV reader options for landed snapshots. multiLine is needed because quoted
# fields (e.g. addresses) contain embedded newlines.
# NOTE(review): inferSchema runs separately for each file in the ingest loop, so a column
# could be inferred with different types in different snapshots — confirm this is acceptable.
read_csv_options = dict(
    header='true',
    inferSchema='true',
    delimiter=',',
    quote='"',
    multiLine='true',
)

In [26]:
# Landing and bronze paths are taken from the data generator rather than hard-coded.
location_landing, location_bronze = datagen.landing_dir, datagen.bronze_dir

## Pipeline

### Land new data

In [37]:
# Land snapshot batch 1 into the landing folder; the returned value (the path of
# the landed CSV file) is displayed as the cell output.
batch_id = 1
path_csv = datagen.land_batch(batch_id)
path_csv

'/home/jovyan/work/data/landing/snapshot_2022-08-19.csv'

### Ingest data from landing into bronze layer

In [27]:
# Read every CSV file in the landing folder into one Spark DataFrame.
# - Schema evolution: unionByName(allowMissingColumns=True) matches columns by name
#   and fills columns that are missing from a file with nulls.
# - Adds a `_delivery` column holding the snapshot date parsed from the file name
#   (snapshot_2022-08-19.csv -> 2022-08-19).
# Files are sorted so the union order does not depend on filesystem listing order.
file_list = sorted(Path(location_landing).glob('*.csv'))
if not file_list:
    # Fail loudly instead of silently re-using (and later re-appending) a `df`
    # left over from an earlier run of this cell.
    raise FileNotFoundError(f'No CSV files to ingest in landing dir: {location_landing}')

df = None
for i, file in enumerate(file_list):
    print(i, file)
    df_file = (
        spark.read.format('csv').options(**read_csv_options).load(str(file))
        # escaped dot: match only a literal ".csv" suffix
        .withColumn("_delivery", regexp_extract(input_file_name(), r'.*_(.*)\.csv$', 1))
    )
    df = df_file if df is None else df.unionByName(df_file, allowMissingColumns=True)

# Preview only the first rows; toPandas() collects everything to the driver.
df.limit(20).toPandas()

0 /home/jovyan/work/data/landing/snapshot_2022-08-19.csv


Unnamed: 0,id,job,company,name,sex,address,mail,birthdate,_delivery
0,0,Contractor,"Boone, Gallagher and Scott",Misty Phillips,F,"4978 Chapman Bypass\nSanchezfurt, TN 23177",ana51@yahoo.com,1996-12-15,2022-08-19
1,1,Rural practice surveyor,Osborne PLC,Alfred Hall,M,"11070 Wright Creek Apt. 541\nEast Jonathan, TN...",shawn57@yahoo.com,1922-02-28,2022-08-19
2,2,Aeronautical engineer,Hughes-Fisher,Teresa Manning,F,"397 Maria Ways\nSouth Jamesville, OK 88863",fvargas@gmail.com,1997-04-03,2022-08-19
3,3,Newspaper journalist,Gordon-Smith,Kyle Randall,M,"98150 Jones Way Apt. 251\nJonesside, OK 11215",michael49@hotmail.com,1922-03-06,2022-08-19


In [28]:
# Append the ingested snapshot(s) to the bronze Delta table.
# mergeSchema lets Delta add columns that first appear in a newer snapshot; without it,
# Delta's schema enforcement rejects such an append with an AnalysisException.
(
    df.write.format('delta')
    .mode('append')
    .option('mergeSchema', 'true')
    .save(location_bronze)
)

In [33]:
# (Re)register the bronze Delta directory as the metastore table `bronze_data`.
# The table is created with an explicit LOCATION, so dropping it should only remove
# the metastore entry, not the Delta files.
drop_stmt = 'DROP TABLE IF EXISTS bronze_data'
create_stmt = f'CREATE TABLE IF NOT EXISTS bronze_data USING DELTA LOCATION "{location_bronze}"'
spark.sql(drop_stmt)
spark.sql(create_stmt)

DataFrame[]

In [35]:
# Show the Delta transaction log of the bronze table (one row per table version).
bronze_history = spark.sql('DESCRIBE HISTORY bronze_data')
bronze_history.toPandas()

Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,0,2022-08-15 14:16:21.628,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,,Serializable,True,"{'numOutputRows': '4', 'numOutputBytes': '3333...",,Apache-Spark/3.2.1 Delta-Lake/2.0.0


### Remove processed files from landing

In [38]:
# Clear the landing folder now that its files have been appended to bronze, so the
# next run of the ingest cell (which reads every *.csv in landing) does not append them again.
datagen.delete_batches_in_landing_dir()

### Build silver layer

**TODO**

### Build gold layer

**TODO**