In [0]:
import uuid
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# Column names selected for each target table (product, customer, sales fact)
prdt_lst = ["product_id", "product_line", "unit_price"]
cust_lst = ["customer_id", "customer_type", "gender", "rating"]
sales_fact_lst = [
    "invoice_id",
    "customer_id",
    "product_id",
    "branch",
    "city",
    "quantity",
    "date",
    "time",
    "payment",
    "cogs",
    "tax",
    "total",
    "gross_income",
    "gross_margin_percentage",
]

# Load the raw sales CSV into a Spark DataFrame.
# "header" makes the first line supply the column names; "inferSchema" lets
# Spark work out each column's data type instead of leaving it to the default.
csv_options = {"header": "true", "inferSchema": "true"}
df = (
    spark.read.format("csv")
    .options(**csv_options)
    .load("/FileStore/input/supermarket_sales.csv")
)

# Surrogate keys for customer_id and product_id.
# Despite the "uuid" naming, each key is a random number in [0, 10**10)
# derived from a fresh UUID4 and stored as LongType, not a UUID string.
def _random_surrogate_id():
    """Return a random integer in [0, 10**10) taken from a new UUID4."""
    return uuid.uuid4().int % 10**10


# Spark treats UDFs as deterministic unless told otherwise, which allows the
# optimizer to drop or repeat calls. These UDFs return a new value on every
# call, so they must be flagged as non-deterministic.
uuidUdf_cust = udf(_random_surrogate_id, LongType()).asNondeterministic()
uuidUdf_prdt = udf(_random_surrogate_id, LongType()).asNondeterministic()

# Add the customer_id and product_id columns, then materialise the result once.
# Without this, every downstream action would re-run the UDFs and produce a
# different set of IDs, so keys written to separate tables from this DataFrame
# would not match each other. An eager local checkpoint truncates the lineage,
# so the generated IDs cannot be silently recomputed (unlike cache(), which
# recomputes evicted partitions).
df_ids = (
    df.withColumn("customer_id", uuidUdf_cust())
    .withColumn("product_id", uuidUdf_prdt())
    .localCheckpoint(eager=True)
)

# Project df_ids onto each table's column list: product, customer, sales fact
df_product, df_customer, df_fact = (
    df_ids.select(*table_columns)
    for table_columns in (prdt_lst, cust_lst, sales_fact_lst)
)

# Append each DataFrame to its Delta table: customer, then product, then fact
for table_df, table_name in (
    (df_customer, "supermarketsales_db.customer"),
    (df_product, "supermarketsales_db.product"),
    (df_fact, "supermarketsales_db.sales_fact"),
):
    delta_writer = table_df.write.format("delta").mode("append")
    delta_writer.saveAsTable(table_name)
