# **CUSTOMERS**

In [0]:
# Load the raw customer CSVs from the bronze volume: the header row supplies
# column names and Spark infers column types from the data.
df_cust = spark.read.csv(
    "/Volumes/my_first_catalog/bronze/bronze_volume2/customers",
    header=True,
    inferSchema=True,
)

display(df_cust)



In [0]:
from pyspark.sql.functions import current_timestamp

# Tag each customer row with the timestamp of this processing run.
# NOTE(review): current_timestamp() is evaluated per query execution, so the
# value shown by display() can differ from the one later written to Delta.
df_cust = df_cust.withColumn("process_date", current_timestamp())

display(df_cust)


In [0]:
# Import explicitly so this cell does not depend on imports made elsewhere;
# without it `split` / `col` raise NameError on a fresh kernel.
from pyspark.sql.functions import col, split, when

# Derive the e-mail domain: the segment that follows the first "@".
# The contains("@") guard yields null for addresses without "@" (the same result
# non-ANSI Spark gives), instead of indexing past the end of the split array,
# which raises an error when ANSI mode is enabled.
df_cust = df_cust.withColumn(
    "domain",
    when(col("email").contains("@"), split(col("email"), "@")[1]),
)
display(df_cust)

In [0]:
from delta.tables import DeltaTable

In [0]:
# Upsert customers into the silver Delta table, keyed on customer_id.
# First run: create the table from df_cust. Later runs: MERGE, updating
# matching rows and inserting new ones.
# NOTE(review): Delta MERGE fails if several source rows match one target row,
# so customer_id is presumably unique in the CSV — TODO confirm.
target_table = "my_first_catalog.silver.customers_enr"

if not spark.catalog.tableExists(target_table):
    (
        df_cust.write.format("delta")
        .mode("append")
        .saveAsTable(target_table)
    )
else:
    dlt_obj = DeltaTable.forName(spark, target_table)
    (
        dlt_obj.alias("trg")
        .merge(df_cust.alias("src"), "trg.customer_id == src.customer_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Inspect the silver customers table after the upsert.
SELECT *
FROM my_first_catalog.silver.customers_enr

# **PRODUCTS**

In [0]:
# Load the raw product CSVs from the bronze volume (header row, inferred types).
df_prod = spark.read.csv(
    "/Volumes/my_first_catalog/bronze/bronze_volume2/products/",
    header=True,
    inferSchema=True,
)
display(df_prod)

In [0]:
# Tag each product row with the timestamp of this processing run.
df_prod = df_prod.withColumn("process_date", current_timestamp())

display(df_prod)

In [0]:
# Import explicitly so this cell does not depend on imports made elsewhere;
# without it `avg` raises NameError on a fresh kernel.
from pyspark.sql.functions import avg

# Average price per category, rendered as a Databricks chart. The result column
# keeps Spark's default name `avg(price)`, so a saved visualization bound to
# that column keeps working.
display(df_prod.groupBy("category").agg(avg("price")))

Databricks visualization. Run in Databricks to view.

In [0]:
# Upsert products into the silver Delta table, keyed on product_id.
# First run: create the table from df_prod. Later runs: MERGE, updating
# matching rows and inserting new ones.
# NOTE(review): Delta MERGE fails if several source rows match one target row,
# so product_id is presumably unique in the CSV — TODO confirm.
target_table = "my_first_catalog.silver.products_enr"

if not spark.catalog.tableExists(target_table):
    (
        df_prod.write.format("delta")
        .mode("append")
        .saveAsTable(target_table)
    )
else:
    dlt_obj = DeltaTable.forName(spark, target_table)
    (
        dlt_obj.alias("trg")
        .merge(df_prod.alias("src"), "trg.product_id == src.product_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Inspect the silver products table after the upsert.
SELECT *
FROM my_first_catalog.silver.products_enr

# **STORES**

In [0]:
# Load the raw store CSVs from the bronze volume (header row, inferred types).
df_store = spark.read.csv(
    "/Volumes/my_first_catalog/bronze/bronze_volume2/stores",
    header=True,
    inferSchema=True,
)

display(df_store)


In [0]:
# Import explicitly so this cell does not depend on imports made elsewhere;
# without it `regexp_replace` / `col` raise NameError on a fresh kernel.
from pyspark.sql.functions import col, regexp_replace

# Remove every underscore from store_name (regexp_replace replaces all matches).
# NOTE(review): underscores are deleted, not turned into spaces — confirm this
# is the intended clean-up.
df_store = df_store.withColumn(
    "store_name",
    regexp_replace(col("store_name"), "_", ""),
)
display(df_store)

In [0]:
# Tag each store row with the timestamp of this processing run.
df_store = df_store.withColumn("process_date", current_timestamp())
display(df_store)

In [0]:
# Upsert stores into the silver Delta table, keyed on store_id.
# The existence check, the MERGE target and the initial write must all use the
# same table name. If they differ, the check never succeeds and every run
# appends a full duplicate copy of the data. `store_enr` is the name the write
# branch has always created and the name the query cell below reads.
# NOTE(review): a table written by earlier runs may already hold duplicate
# store_id rows. MERGE updates such rows but does not remove them, so consider
# a one-off dedupe or overwrite.
STORES_TABLE = "my_first_catalog.silver.store_enr"

if spark.catalog.tableExists(STORES_TABLE):

    dlt_obj = DeltaTable.forName(spark, STORES_TABLE)

    dlt_obj.alias("trg").merge(
        df_store.alias("src"),
        "trg.store_id == src.store_id"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

else:

    df_store.write.format("delta") \
        .mode("append") \
        .saveAsTable(STORES_TABLE)

In [0]:
%sql
-- Inspect the silver stores table after the upsert.
SELECT *
FROM my_first_catalog.silver.store_enr

# **SALES**

In [0]:
# Load the raw sales CSVs from the bronze volume (header row, inferred types).
df_sales = spark.read.csv(
    "/Volumes/my_first_catalog/bronze/bronze_volume2/sales",
    header=True,
    inferSchema=True,
)

display(df_sales)

In [0]:
# Import explicitly so this cell does not depend on imports made elsewhere.
# Spark's `round` is aliased because the bare name `round` is Python's builtin,
# which cannot round a Column and raises TypeError.
from pyspark.sql.functions import col, round as spark_round, when

# Unit price per sale (total_amount / quantity), rounded to 2 decimals.
# Rows with quantity 0 (or null) get null, matching non-ANSI Spark's result for
# division by zero, instead of failing the job with DIVIDE_BY_ZERO when ANSI
# mode is enabled.
df_sales = df_sales.withColumn(
    "pricePersale",
    when(
        col("quantity") != 0,
        spark_round(col("total_amount") / col("quantity"), 2),
    ),
)
display(df_sales)

In [0]:
# Tag each sales row with the timestamp of this processing run.
df_sales = df_sales.withColumn("process_date", current_timestamp())
display(df_sales)

In [0]:
# Upsert sales into the silver Delta table, keyed on sales_id.
# First run: create the table from df_sales. Later runs: MERGE, updating
# matching rows and inserting new ones.
# NOTE(review): Delta MERGE fails if several source rows match one target row,
# so sales_id is presumably unique in the CSV — TODO confirm.
target_table = "my_first_catalog.silver.sales_enr"

if not spark.catalog.tableExists(target_table):
    (
        df_sales.write.format("delta")
        .mode("append")
        .saveAsTable(target_table)
    )
else:
    dlt_obj = DeltaTable.forName(spark, target_table)
    (
        dlt_obj.alias("trg")
        .merge(df_sales.alias("src"), "trg.sales_id == src.sales_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Inspect the silver sales table after the upsert.
SELECT *
FROM my_first_catalog.silver.sales_enr