##How to import Data and Creating brz_bronze

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType, FloatType
import pyspark.sql.functions as F

In [0]:
catalog_name = 'ecommerce'

# Define schema for the data file

brand_schema = StructType([
    StructField('brand_code', StringType(), False),
    StructField('brand_name', StringType(), False),
    StructField('category_code', StringType(), True),
])

In [0]:
raw_data_path = "/Volumes/ecommerce/source_data/raw/brands/*.csv"

df = spark.read.option('header', "true").option("delimeter", ",").schema(brand_schema).csv(raw_data_path)

#add metadata columns
df = df.withColumn("_source_file", F.col("_metadata.file_path")) \
    .withColumn("ingested_at", F.current_timestamp())
    
display(df.limit(5))

In [0]:
df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(f"{catalog_name}.bronze.brz_brands")

##Creating brz_category

In [0]:
category_schema = StructType([
    StructField('category_code', StringType(), False),
    StructField('category_name', StringType(), True),
])

raw_data_path = "/Volumes/ecommerce/source_data/raw/category/*.csv"

df = spark.read.option('header', "true").option("delimeter", ",").schema(category_schema).csv(raw_data_path)

#add metadata columns
df = df.withColumn("_source_file", F.col("_metadata.file_path")) \
    .withColumn("ingested_at", F.current_timestamp())

df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(f"{catalog_name}.bronze.brz_category")

## Creating brz_products

In [0]:
products_schema = StructType([
    StructField('product_id', StringType(), False),
    StructField('sku', StringType(), True),
    StructField('category_code', StringType(), True),
    StructField('brand_code', StringType(), True),
    StructField('color', StringType(), True),
    StructField('size', StringType(), True),
    StructField('material', StringType(), True),
    StructField('weight_grams', StringType(), True),
    StructField('length_cm', StringType(), True),
    StructField('width_cm', FloatType(), True),
    StructField('height_cm', FloatType(), True),
    StructField('rating_count', IntegerType(), True),
    StructField('file_name', StringType(), False),
    StructField('ingest_timestamp', TimestampType(), False),
])

raw_data_path = "/Volumes/ecommerce/source_data/raw/products/*.csv"

df = spark.read.option('header', "true").option("delimeter", ",").schema(products_schema).csv(raw_data_path)

#add metadata columns
df = df.withColumn("file_name", F.col("_metadata.file_path")) \
    .withColumn("ingest_timestamp", F.current_timestamp())

df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(f"{catalog_name}.bronze.brz_products")


## Creating brz_customers

In [0]:
customers_schema = StructType([
    StructField('customer_id', StringType(), False),
    StructField('phone', StringType(), True),
    StructField('country_code', StringType(), True),
    StructField('country', StringType(), True),
    StructField('state', StringType(), True),
])

raw_data_path = "/Volumes/ecommerce/source_data/raw/customers/*.csv"

df = spark.read.option('header', "true").option("delimeter", ",").schema(customers_schema).csv(raw_data_path)

#add metadata columns
df = df.withColumn("file_name", F.col("_metadata.file_path")) \
    .withColumn("ingest_timestamp", F.current_timestamp())

df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(f"{catalog_name}.bronze.brz_customers")

## Creating Date


In [0]:
date_schema = StructType([
    StructField('date', StringType(), True),
    StructField('year', StringType(), True),
    StructField('day_name', StringType(), True),
    StructField('quarter', StringType(), True),
    StructField('week_of_year', StringType(), True),
])

raw_data_path = "/Volumes/ecommerce/source_data/raw/date/*.csv"

df = spark.read.option('header', "true").option("delimeter", ",").schema(date_schema).csv(raw_data_path)

#add metadata columns
df = df.withColumn("_source_file", F.col("_metadata.file_path")) \
    .withColumn("ingested_at", F.current_timestamp())

df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(f"{catalog_name}.bronze.brz_date")

    