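# **PYSPARK**

Reads the raw customers, products, stores and sales CSV files from the bronze volume (`/Volumes/dba/bronze/bronze_volume/`), applies light enrichment, stamps each batch with a `processDate`, and upserts the results into `*_enr` Delta tables in the `dba.silver` schema. The final sections demonstrate Spark SQL over a temp view and a Python UDF.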

In [0]:
%sql
-- Switch to the `dba` catalog and make sure the target schemas exist.
-- IF NOT EXISTS makes this cell safe to re-run.
-- NOTE(review): `gold` is created here but not referenced by the cells below.
use catalog dba;
create schema if not exists silver;
create schema if not exists gold;

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *

### **CUSTOMERS**

In [0]:
# Load the raw customer CSV files from the bronze volume.
# The first row supplies column names; column types are inferred by Spark,
# which requires an extra pass over the input files.
df = spark.read.csv(
    "/Volumes/dba/bronze/bronze_volume/customers/",
    header=True,
    inferSchema=True,
)

In [0]:
# Preview the customers DataFrame as read from the CSV files.
display(df)

In [0]:
# Show the column names/types Spark inferred from the customers CSV.
df.printSchema()

In [0]:
# Normalise customer names to upper case; null names stay null.
df = df.withColumn("name", upper(df["name"]))
display(df)

In [0]:
# Derive the e-mail domain: the text after the first "@".
# Indexing element [1] of the split array is out of bounds for an address
# that has no "@", and with ANSI mode enabled Spark raises an
# INVALID_ARRAY_INDEX error for that instead of returning null.
# The `when` guard only indexes when an "@" is present, so such rows (and
# null e-mails) get a null domain regardless of the ANSI setting.
df = df.withColumn(
    "domain",
    when(col("email").contains("@"), split(col("email"), "@")[1]),
)
display(df)

In [0]:
# Number of customers per e-mail domain, largest domains first.
domain_counts = (
    df.groupBy("domain")
      .agg(count("customer_id").alias("total_customers"))
      .orderBy(desc("total_customers"))
)
display(domain_counts)

In [0]:
# Stamp every customer row with the time this batch was processed.
# current_timestamp() yields one value for the whole query.
df = df.withColumns({"processDate": current_timestamp()})
display(df)

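#### UPSERT

Each `*_enr` table is upserted with a Delta `MERGE` on its key column when it already exists; on the first run, when the table is missing, it is created from the DataFrame instead.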

In [0]:
from delta.tables import DeltaTable

In [0]:
# Upsert customers into dba.silver.customers_enr keyed on customer_id:
# matched ids have every column overwritten, unseen ids are inserted.
# On the first run the table does not exist yet, so it is created from df.
# NOTE(review): a Delta MERGE fails when several source rows share a
# customer_id that matches a target row — confirm the bronze files never
# repeat ids.
customers_table = "dba.silver.customers_enr"

if not spark.catalog.tableExists(customers_table):
    (df.write
        .format("delta")
        .mode("append")
        .saveAsTable(customers_table))
else:
    (DeltaTable.forName(spark, customers_table)
        .alias("trg")
        .merge(df.alias("src"), "trg.customer_id == src.customer_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

### **PRODUCTS**

In [0]:
# Load the raw product CSV files from the bronze volume (header row,
# inferred column types) and preview them.
df_prod = spark.read.csv(
    "/Volumes/dba/bronze/bronze_volume/products/",
    header=True,
    inferSchema=True,
)
display(df_prod)

In [0]:
# Stamp every product row with the time this batch was processed.
df_prod = df_prod.withColumns({"processDate": current_timestamp()})
display(df_prod)

In [0]:
# Average product price per category.
avg_price_by_category = df_prod.groupBy("category").agg(
    avg(col("price")).alias("avg_price")
)
display(avg_price_by_category)


In [0]:
# Upsert products into dba.silver.products_enr keyed on product_id:
# matched ids have every column overwritten, unseen ids are inserted.
# On the first run the table does not exist yet, so it is created from df_prod.
# NOTE(review): a Delta MERGE fails when several source rows share a
# product_id that matches a target row — confirm ids are unique per load.
products_table = "dba.silver.products_enr"

if not spark.catalog.tableExists(products_table):
    (df_prod.write
        .format("delta")
        .mode("append")
        .saveAsTable(products_table))
else:
    (DeltaTable.forName(spark, products_table)
        .alias("trg")
        .merge(df_prod.alias("src"), "trg.product_id == src.product_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

In [0]:
%sql
-- Spot-check the contents of the products silver table.
select * from dba.silver.products_enr

### **STORES**

In [0]:
# Load the raw store CSV files from the bronze volume (header row,
# inferred column types) and preview them.
df_str = spark.read.csv(
    "/Volumes/dba/bronze/bronze_volume/stores/",
    header=True,
    inferSchema=True,
)
display(df_str)

In [0]:
# Delete every underscore from store names (e.g. "a_b" -> "ab").
# translate() drops characters that have no replacement character, which is
# the same result as replacing the literal "_" with "".
# NOTE(review): this glues words together; confirm a space was not intended.
df_str = df_str.withColumn("store_name", translate(col("store_name"), "_", ""))
display(df_str)

In [0]:
# Stamp every store row with the time this batch was processed.
df_str = df_str.withColumns({"processDate": current_timestamp()})
display(df_str)


In [0]:
# Upsert stores into dba.silver.stores_enr keyed on store_id:
# matched ids have every column overwritten, unseen ids are inserted.
# On the first run the table does not exist yet, so it is created from df_str.
# NOTE(review): a Delta MERGE fails when several source rows share a
# store_id that matches a target row — confirm ids are unique per load.
stores_table = "dba.silver.stores_enr"

if not spark.catalog.tableExists(stores_table):
    (df_str.write
        .format("delta")
        .mode("append")
        .saveAsTable(stores_table))
else:
    (DeltaTable.forName(spark, stores_table)
        .alias("trg")
        .merge(df_str.alias("src"), "trg.store_id == src.store_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

### **SALES**

In [0]:
# Load the raw sales CSV files from the bronze volume (header row,
# inferred column types) and preview them.
df_sales = spark.read.csv(
    "/Volumes/dba/bronze/bronze_volume/sales/",
    header=True,
    inferSchema=True,
)
display(df_sales)

In [0]:
# Unit price of each sale: total_amount / quantity, rounded to 2 decimals.
# `round` here is pyspark.sql.functions.round (the star import shadows the
# builtin). With ANSI mode enabled (e.g. the Spark 4.0 default), dividing by a
# zero quantity raises DIVIDE_BY_ZERO and aborts the whole load; without ANSI
# it silently yields null. Dividing only when quantity != 0 gives the null
# result in both modes (null quantities also stay null).
df_sales = df_sales.withColumn(
    "pricePerSale",
    when(col("quantity") != 0, round(col("total_amount") / col("quantity"), 2)),
)
# Stamp every sales row with the time this batch was processed.
df_sales = df_sales.withColumn("processDate", current_timestamp())

display(df_sales)


In [0]:
# Upsert sales into dba.silver.sales_enr keyed on sales_id:
# matched ids have every column overwritten, unseen ids are inserted.
# On the first run the table does not exist yet, so it is created from df_sales.
# NOTE(review): a Delta MERGE fails when several source rows share a
# sales_id that matches a target row — confirm ids are unique per load.
sales_table = "dba.silver.sales_enr"

if not spark.catalog.tableExists(sales_table):
    (df_sales.write
        .format("delta")
        .mode("append")
        .saveAsTable(sales_table))
else:
    (DeltaTable.forName(spark, sales_table)
        .alias("trg")
        .merge(df_sales.alias("src"), "trg.sales_id == src.sales_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

### **SPARK SQL**

In [0]:
# Read the whole products silver table back as a DataFrame
# (equivalent to SELECT * FROM the table).
df = spark.table("dba.silver.products_enr")

In [0]:
# Preview the products silver table loaded in the previous cell.
display(df)

In [0]:
# Expose the products DataFrame to SQL as the temp view `temp_products`
# (tied to this Spark session; replaces any existing view of that name).
df.createOrReplaceTempView("temp_products")

In [0]:
# Add a `flag` column: 'YES' when category equals 'Toys' (case-sensitive),
# otherwise 'NO' — null categories also fall through to 'NO'.
toy_flag_sql = """
            SELECT *,
                    CASE 
                    WHEN category = 'Toys' THEN 'YES' ELSE 'NO' END AS flag
                    FROM temp_products
            """
df = spark.sql(toy_flag_sql)

In [0]:
# Preview the products together with the computed `flag` column.
display(df)

### **PySpark UDF**

In [0]:
def greet(p_input):
    """Return "Hello" immediately followed by ``p_input`` rendered as text.

    Intended as the body of a Spark UDF (see ``udf_greet``).

    Args:
        p_input: value to append after "Hello"; converted with ``str()``.

    Returns:
        ``"Hello" + str(p_input)``, or ``None`` when ``p_input`` is ``None``.
    """
    # Spark passes SQL NULL to Python UDFs as None; keep it null rather than
    # emitting the literal text "HelloNone".
    if p_input is None:
        return None
    return "Hello" + str(p_input)

In [0]:
# Wrap greet as a Spark UDF. StringType() is written out explicitly but is
# also udf()'s default return type.
udf_greet = udf(greet, StringType())

In [0]:
# Add a `greet` column by applying the UDF to each row's `flag` value.
df = df.withColumn("greet", udf_greet("flag"))
display(df)