In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType, FloatType
import pyspark.sql.functions as F

In [0]:
# Catalog used as the first part of every bronze table name saved below.
catalog_name = "ecommerce"

### Brands

In [0]:
# Explicit read schema for the brands CSV files: three string columns, with
# only `brand_code` declared as non-nullable.
brand_schema = (
    StructType()
    .add('brand_code', StringType(), False)
    .add('brand_name', StringType(), True)
    .add('category_code', StringType(), True)
)

In [0]:
# Glob matching every raw brands CSV file.
raw_data_path = "/Volumes/ecommerce/source_data/raw/brands/*.csv"

# Read the files with the explicit schema and, in the same chain, tag each row
# with the path of the file it came from and the time of this load.
# NOTE(review): `ingested_at` has no leading underscore, unlike `_source_file`
# here and `_ingested_at` in the category/date loads — confirm before renaming,
# since the name becomes a table column.
df = (
    spark.read
    .options(header="true", delimiter=",")
    .schema(brand_schema)
    .csv(raw_data_path)
    .withColumn("_source_file", F.col("_metadata.file_path"))
    .withColumn("ingested_at", F.current_timestamp())
)

# Preview the first five rows.
display(df.limit(5))

In [0]:
# Save the brands DataFrame as a Delta table, overwriting existing contents;
# the mergeSchema option is turned on for the write.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_brands")
)
    


### Category

In [0]:
# Explicit read schema for the category CSV files: two string columns, with
# `category_code` declared as non-nullable.
category_schema = (
    StructType()
    .add('category_code', StringType(), False)
    .add('category_name', StringType(), True)
)

# Glob matching every raw category CSV file.
raw_data_path = "/Volumes/ecommerce/source_data/raw/category/*.csv"

# Read with the explicit schema, then add the source file path and load time.
df_raw = (
    spark.read
    .options(header="true", delimiter=",")
    .schema(category_schema)
    .csv(raw_data_path)
    .withColumn("_source_file", F.col("_metadata.file_path"))
    .withColumn("_ingested_at", F.current_timestamp())
)

# Preview the first five rows.
display(df_raw.limit(5))

# Save as a Delta table, overwriting existing contents (mergeSchema enabled).
(
    df_raw.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_category")
)
    

### Products

In [0]:
# Read schema for the raw products CSV files.
#
# Only columns parsed from the files are declared here. The `file_name` and
# `ingest_timestamp` audit columns are produced after the read with
# `withColumn`, so declaring them in the CSV schema would only describe
# columns that are immediately overwritten and would make the schema wider
# than the data being parsed.
products_schema = StructType([
    StructField("product_id", StringType(), False),
    StructField("sku", StringType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand_code", StringType(), True),
    StructField("color", StringType(), True),
    StructField("size", StringType(), True),
    StructField("material", StringType(), True),
    # Read as strings while the other dimensions are floats — presumably the
    # raw values are not clean numerics; TODO confirm against the source files.
    StructField("weight_grams", StringType(), True),
    StructField("length_cm", StringType(), True),
    StructField("width_cm", FloatType(), True),
    StructField("height_cm", FloatType(), True),
    StructField("rating_count", IntegerType(), True),
])


# Glob matching every raw products CSV file.
raw_data_path = "/Volumes/ecommerce/source_data/raw/products/*.csv"

# Read with the explicit schema, then set the source file path and load time.
df_prod = (
    spark.read
    .options(header="true", delimiter=",")
    .schema(products_schema)
    .csv(raw_data_path)
    .withColumn("file_name", F.col("_metadata.file_path"))
    .withColumn("ingest_timestamp", F.current_timestamp())
)

# Preview the first five rows.
display(df_prod.limit(5))

# Save as a Delta table, overwriting existing contents (mergeSchema enabled).
(
    df_prod.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_products")
)

### Customers

In [0]:
# Explicit read schema for the customers CSV files: five string columns, with
# only `customer_id` declared as non-nullable.
customers_schema = (
    StructType()
    .add("customer_id", StringType(), False)
    .add("phone", StringType(), True)
    .add("country_code", StringType(), True)
    .add("country", StringType(), True)
    .add("state", StringType(), True)
)


# Glob matching every raw customers CSV file.
raw_data_path = "/Volumes/ecommerce/source_data/raw/customers/*.csv"

# Read with the explicit schema, then add the source file path and load time.
df_cust = (
    spark.read
    .options(header="true", delimiter=",")
    .schema(customers_schema)
    .csv(raw_data_path)
    .withColumn("file_name", F.col("_metadata.file_path"))
    .withColumn("ingest_timestamp", F.current_timestamp())
)

# Preview the first five rows.
display(df_cust.limit(5))

# Save as a Delta table, overwriting existing contents (mergeSchema enabled).
(
    df_cust.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_customers")
)

### Date

In [0]:

# Explicit read schema for the date CSV files. `date` and `day_name` are read
# as strings; `year`, `quarter` and `week_of_year` as integers. All columns
# are nullable.
date_schema = (
    StructType()
    .add("date", StringType(), True)
    .add("year", IntegerType(), True)
    .add("day_name", StringType(), True)
    .add("quarter", IntegerType(), True)
    .add("week_of_year", IntegerType(), True)
)


# Glob matching every raw date CSV file. Written as a plain string literal,
# like the other source paths in this notebook, because nothing is
# interpolated into it.
raw_data_path = "/Volumes/ecommerce/source_data/raw/date/*.csv"

# Read with the explicit schema, then add the load time and source file path.
df_date = (
    spark.read
    .option("header", "true")
    .option("delimiter", ",")
    .schema(date_schema)
    .csv(raw_data_path)
    .withColumn("_ingested_at", F.current_timestamp())
    .withColumn("_source_file", F.col("_metadata.file_path"))
)

# Preview the first five rows.
display(df_date.limit(5))

# Save as a Delta table, overwriting existing contents (mergeSchema enabled).
# The target table is `brz_calendar`, although the source folder is `date`.
(
    df_date.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_calendar")
)