In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F 
from pyspark.sql.types import *
from delta import *

In [20]:
# Build the Spark session used by this notebook, with Delta Lake enabled.
# - spark.sql.extensions / spark.sql.catalog.spark_catalog register Delta's SQL
#   extension and catalog so `format('delta')` reads/writes work.
# - arrow.pyspark.enabled turns on Arrow-based Spark <-> pandas conversion.
# - repl.eagerEval.enabled makes a bare DataFrame expression render as a table.
builder = (
    SparkSession.builder.appName("olist_demo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.repl.eagerEval.enabled", "true")
    .config("spark.executor.memory", "2g")
    # NOTE(review): driver memory only takes effect if this call launches the
    # driver JVM; if a session already exists in the kernel, getOrCreate reuses it.
    .config("spark.driver.memory", "4g")
)

# configure_spark_with_delta_pip comes from the `delta` package; presumably it adds
# the Delta Lake package to the session config (see delta-spark docs) before creation.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

## Config

In [21]:
environment = ""  # Papermill parameter: target environment, one of dev / uat / prd

In [22]:
base_path = "../../../data"  # local default; Papermill overwrites this parameter at run time

In [23]:
# Delta table locations under base_path: silver ('slv') inputs and the gold sink ('snk').
path = dict(
    slv=dict(
        orders=f'{base_path}/silver/evt/evt_orders',
        order_items=f'{base_path}/silver/evt/evt_order_items',
        customer=f'{base_path}/silver/mtd/mtd_customer',
    ),
    snk=f'{base_path}/gold/fct/fct_avg_sales',
)

## Load

In [24]:
# Read every silver input as a Delta DataFrame, keyed by the same names as path['slv']
# ('orders', 'order_items', 'customer').
df = {
    name: spark.read.format('delta').load(location)
    for name, location in path['slv'].items()
}

In [25]:
# Item value and freight totals/averages per customer state.
# Both joins are left joins, so every order row is kept even without matching
# items or customer; the resulting nulls are ignored by F.sum / F.avg, and orders
# with no matching customer land in a null sg_customer_state group.
# Averages are taken over the joined order-item rows, not per order.
df_snk = (
    df['orders'].alias('ord')
    .join(df['order_items'].alias('ori'), F.col('ord.id_order') == F.col('ori.id_order'), 'left')
    .join(df['customer'].alias('cst'), F.col('ord.id_customer') == F.col('cst.id_customer'), 'left')
    .select('cst.sg_customer_state', 'ori.vl_item', 'ori.vl_freight')
    .groupBy('sg_customer_state')
    .agg(
        F.sum('vl_item').alias('vl_total_item'),
        F.sum('vl_freight').alias('vl_total_freight'),
        F.avg('vl_item').alias('vl_avg_item'),
        F.avg('vl_freight').alias('vl_avg_freight'),
    )
)

In [26]:
# Preview the aggregate; with spark.sql.repl.eagerEval.enabled set on the session,
# a bare DataFrame expression is rendered as a table (this triggers a Spark job).
df_snk

                                                                                

sg_customer_state,vl_total_item,vl_total_freight,vl_avg_item,vl_avg_freight
SC,520553.339999996,89660.25999999979,124.65357758620594,21.47036877394631
RO,46140.63999999997,11417.38,165.973525179856,41.06971223021583
PI,86914.08000000013,21218.2,160.35808118081206,39.14797047970479
AM,22356.83999999999,5478.89,135.49599999999992,33.20539393939394
RR,7829.429999999999,2235.19,150.56596153846152,42.98442307692308
GO,294591.9499999997,53114.98000000004,126.2717316759536,22.766815259322776
TO,49621.73999999998,11732.680000000002,157.52933333333328,37.24660317460318
MT,156453.5300000002,29715.43,148.2971848341234,28.16628436018957
SP,5202955.049999561,718723.0700000112,109.65362915972014,15.147275390419424
ES,275037.3099999999,49764.60000000005,121.9137012411347,22.058776595744703


In [27]:
# Stamp each gold row with the load time (replaces the column if it already exists).
df_snk = df_snk.withColumn(colName="dh_insert_gld", col=F.current_timestamp())

## Sink

In [28]:
# Persist the aggregate to the gold Delta table, replacing its previous contents.
(
    df_snk.write
    .format('delta')
    .mode('overwrite')
    .save(path['snk'])
)

                                                                                