In [0]:
from pyspark.sql.types import ( StructType,StructField,StringType,IntegerType,FloatType,TimestampType)

import pyspark.sql.functions as F 

In [0]:
# Unity Catalog that every bronze table in this notebook is saved under.
catalog_name = 'ecommerce'

# Explicit read schema for the raw brand CSV files.
# NOTE(review): the False nullability on brand_code is likely not enforced by a
# file-based CSV read — confirm if non-null brand codes are required downstream.
brand_schema = StructType(
    [
        StructField('brand_code', StringType(), False),
        StructField('brand_name', StringType(), True),
        StructField('category_code', StringType(), True),
    ]
)

In [0]:
# Location of the raw brand CSV files (every *.csv in the folder is read).
raw_data_path = '/Volumes/ecommerce/source_data/raw/brands/*.csv'

# Read the raw files with the explicit brand schema.
# 'delimiter' is the Spark CSV option name; the previous 'delimeter' spelling
# was not a recognised option and was silently ignored.
df = (
    spark.read
    .option('header', 'true')
    .option('delimiter', ',')
    .schema(brand_schema)
    .csv(raw_data_path)
)

# Add ingestion metadata: the originating file path and the load timestamp.
df = (
    df
    .withColumn("_source_file", F.col("_metadata.file_path"))
    .withColumn("ingested_at", F.current_timestamp())
)

In [0]:
# Preview the brand DataFrame: the schema columns plus _source_file / ingested_at.
display(df)

In [0]:
# Persist the brands as a Delta table in the bronze schema, replacing its contents.
(
    df.write
    .format("delta")
    .mode('overwrite')
    .option('mergeSchema', 'true')
    .saveAsTable(f"{catalog_name}.bronze.brz_brands")
)

In [0]:
# Re-declares the catalog name with the same value as the first configuration
# cell. This presumably lets the category/customer/date/product cells below
# run without re-running that cell.
# NOTE(review): duplicate definition — consider keeping one config cell at the top.
catalog_name ='ecommerce'

In [0]:
# Explicit read schema for the raw category CSV file.
category_schema = StructType(
    [
        StructField('category_code', StringType(), False),
        StructField('category_name', StringType(), True),
    ]
)

In [0]:
# Location of the raw category CSV file.
raw_data_Category = '/Volumes/ecommerce/source_data/raw/category/category.csv'

# Read the raw file with the explicit category schema.
# 'delimiter' is the Spark CSV option name; the previous 'delimeter' spelling
# was not a recognised option and was silently ignored.
df_category = (
    spark.read
    .option('header', 'true')
    .option('delimiter', ',')
    .schema(category_schema)
    .csv(raw_data_Category)
)

# Add ingestion metadata: the originating file path and the load timestamp.
df_category = (
    df_category
    .withColumn("_source_file", F.col("_metadata.file_path"))
    .withColumn("ingested_at", F.current_timestamp())
)



In [0]:
# Preview the category DataFrame: the schema columns plus _source_file / ingested_at.
display(df_category)

In [0]:
# Persist the categories as a Delta table in the bronze schema, replacing its contents.
(
    df_category.write
    .format("delta")
    .mode('overwrite')
    .option('mergeSchema', 'true')
    .saveAsTable(f"{catalog_name}.bronze.brz_category")
)

In [0]:
# Explicit read schema for the raw customer CSV file.
# NOTE(review): 'Country' is the only capitalised column name among this
# notebook's schemas. It is kept as-is so the brz_customers table schema
# stays unchanged.
customer_schema = StructType(
    [
        StructField('customer_id', StringType(), False),
        StructField('phone', StringType(), True),
        StructField('country_code', StringType(), True),
        StructField('Country', StringType(), True),
        StructField('state', StringType(), True),
    ]
)

In [0]:
# Location of the raw customer CSV file.
raw_data_customers = '/Volumes/ecommerce/source_data/raw/customers/customers.csv'

# Read the raw file with the explicit customer schema.
# 'delimiter' is the Spark CSV option name; the previous 'delimeter' spelling
# was not a recognised option and was silently ignored.
df_customers = (
    spark.read
    .option('header', 'true')
    .option('delimiter', ',')
    .schema(customer_schema)
    .csv(raw_data_customers)
)

# Add ingestion metadata: the originating file path and the load timestamp.
df_customers = (
    df_customers
    .withColumn("_source_file", F.col("_metadata.file_path"))
    .withColumn("ingested_at", F.current_timestamp())
)

In [0]:
# Preview the customer DataFrame: the schema columns plus _source_file / ingested_at.
display(df_customers)

In [0]:
# Persist the customers as a Delta table in the bronze schema, replacing its contents.
# The schema-merge option was previously misspelled 'mergeSchmema', which is not
# a recognised option, so schema merging was never enabled for this table. It is
# now 'mergeSchema', matching the other bronze writes in this notebook.
(
    df_customers.write
    .format("delta")
    .mode('overwrite')
    .option('mergeSchema', 'true')
    .saveAsTable(f"{catalog_name}.bronze.brz_customers")
)

In [0]:
# Explicit read schema for the raw date CSV file.
# 'date' is read as a plain string; this cell performs no date parsing.
date_schema = StructType(
    [
        StructField('date', StringType(), True),
        StructField('year', IntegerType(), True),
        StructField('day_name', StringType(), True),
        StructField('quarter', IntegerType(), True),
        StructField('week_of_year', IntegerType(), True),
    ]
)



In [0]:
# Location of the raw date CSV file (a plain string; nothing to interpolate).
raw_data_date = "/Volumes/ecommerce/source_data/raw/date/date.csv"

# Read the raw file with the explicit date schema.
# 'delimiter' is the Spark CSV option name; the previous 'delimeter' spelling
# was not a recognised option and was silently ignored.
df_date = (
    spark.read
    .option('header', 'true')
    .option('delimiter', ',')
    .schema(date_schema)
    .csv(raw_data_date)
)

# Add ingestion metadata: the originating file path and the load timestamp.
df_date = (
    df_date
    .withColumn("_source_file", F.col("_metadata.file_path"))
    .withColumn("ingested_at", F.current_timestamp())
)

    



In [0]:
# Preview the date DataFrame, then persist it as a Delta table in the bronze schema.
display(df_date)

(
    df_date.write
    .format("delta")
    .mode('overwrite')
    .option('mergeSchema', 'true')
    .saveAsTable(f"{catalog_name}.bronze.brz_date")
)

In [0]:
# Explicit read schema for the raw product CSV files.
# weight_grams and length_cm are read as strings, while width_cm / height_cm are
# floats. NOTE(review): presumably the raw weight/length values are not always
# clean numbers — confirm before casting them in a downstream layer.
#
# The ingestion metadata columns (file_name, ingest_timestamp) are intentionally
# not declared here. Both are set by withColumn below, so whatever the CSV read
# would have placed in them was overwritten anyway. Declaring them as
# non-nullable read-schema fields was misleading.
products_schema = StructType([
    StructField("product_id", StringType(), False),
    StructField("sku", StringType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand_code", StringType(), True),
    StructField("color", StringType(), True),
    StructField("size", StringType(), True),
    StructField("material", StringType(), True),
    StructField("weight_grams", StringType(), True),
    StructField("length_cm", StringType(), True),
    StructField("width_cm", FloatType(), True),
    StructField("height_cm", FloatType(), True),
    StructField("rating_count", IntegerType(), True),
])

# Location of the raw product CSV files (every *.csv in the folder is read).
raw_data_product = "/Volumes/ecommerce/source_data/raw/products/*.csv"

# Read the raw files and append ingestion metadata (source path + load time).
# NOTE(review): these metadata columns are named file_name / ingest_timestamp,
# whereas the other bronze tables use _source_file / ingested_at. The names are
# kept so the brz_products schema does not change for downstream consumers.
df_product = (
    spark.read
    .option("header", "true")
    .option("delimiter", ",")
    .schema(products_schema)
    .csv(raw_data_product)
    .withColumn("file_name", F.col("_metadata.file_path"))
    .withColumn("ingest_timestamp", F.current_timestamp())
)

# Persist the products as a Delta table in the bronze schema, replacing its contents.
(
    df_product.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_products")
)

In [0]:
# Preview the product DataFrame: the schema columns plus file_name / ingest_timestamp.
display(df_product)