In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [0]:
%run /Workspace/Users/arunbabuprakash@gmail.com/E_Comm/Utilities

In [0]:
# Sanity check: echo the catalog name that the bronze tables below are written to.
# NOTE(review): `catalog_name` is not defined in this notebook; presumably it is
# provided by the Utilities notebook executed via %run above — confirm.
catalog_name


## Brands

In [0]:
# Explicit schema for the brands CSV: every column is declared as a string.
# Each pair below is (column name, nullable flag), listed in file column order.
brand_schema = StructType([
    StructField(column, StringType(), nullable)
    for column, nullable in (
        ("brand_code", False),
        ("brand_name", True),
        ("category_code", True),
    )
])

In [0]:
# Glob over the raw brand CSV files in the volume.
# The pattern is "*.csv" (with the dot) so that only files carrying the .csv
# extension are matched, consistent with the customers / date / order_items /
# products paths used further down in this notebook.
raw_data_path = "/Volumes/e_comm/oldp_data/raw/brands/*.csv"

In [0]:
# Load the brand CSV files with the explicit schema; the first line of each file is a header.
df = (
    spark.read
    .schema(brand_schema)
    .option("header", True)
    .csv(raw_data_path)
)

In [0]:
# Tag every row with the path of the file it was read from and the ingestion timestamp.
df = (
    df
    .withColumn("file_name", F.col("_metadata.file_path"))
    .withColumn("ingested_at", F.current_timestamp())
)

# Preview a handful of rows.
display(df.limit(5))

In [0]:
# Persist the brands frame as a Delta table, overwriting existing data and schema.
# NOTE(review): `catalog_name` and `bronze` are not defined in this notebook;
# presumably they come from the Utilities notebook run at the top — confirm.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog_name}.{bronze}.brz_brands")
)

## Category

In [0]:
# Explicit schema for the category CSV: both columns are declared as strings.
# Each pair below is (column name, nullable flag).
category_schema = StructType([
    StructField(column, StringType(), nullable)
    for column, nullable in (
        ("category_code", False),
        ("category_name", True),
    )
])

In [0]:
# Glob over the raw category CSV files in the volume.
# The pattern is "*.csv" (with the dot) so that only files carrying the .csv
# extension are matched, consistent with the customers / date / order_items /
# products paths used further down in this notebook.
raw_data_path = "/Volumes/e_comm/oldp_data/raw/category/*.csv"

In [0]:
# Load the category CSV files with the explicit schema; the first line of each file is a header.
df = (
    spark.read
    .schema(category_schema)
    .option("header", True)
    .csv(raw_data_path)
)

# Tag every row with the path of the file it was read from and the ingestion timestamp.
df = (
    df
    .withColumn("file_name", F.col("_metadata.file_path"))
    .withColumn("ingested_at", F.current_timestamp())
)

# Preview a handful of rows.
display(df.limit(5))


In [0]:
# Persist the category frame as a Delta table, overwriting existing data and schema.
# NOTE(review): `catalog_name` and `bronze` are not defined in this notebook;
# presumably they come from the Utilities notebook run at the top — confirm.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog_name}.{bronze}.brz_category")
)

## Customers

In [0]:
# Glob over the raw customer CSV files in the volume.
raw_data_path = "/Volumes/e_comm/oldp_data/raw/customers/*.csv"

In [0]:
# Explicit schema for the customers CSV: every column is declared as a string.
# Each pair below is (column name, nullable flag), listed in file column order.
customers_schema = StructType([
    StructField(column, StringType(), nullable)
    for column, nullable in (
        ("customer_id", False),
        ("phone", True),
        ("country_code", True),
        ("country", True),
        ("state", True),
    )
])

In [0]:
# Load the customer CSV files with the explicit schema; the first line of each file is a header.
df = (
    spark.read
    .schema(customers_schema)
    .option("header", True)
    .csv(raw_data_path)
)

# Tag every row with the path of the file it was read from and the ingestion timestamp.
df = (
    df
    .withColumn("file_name", F.col("_metadata.file_path"))
    .withColumn("ingested_at", F.current_timestamp())
)

# Preview a handful of rows.
display(df.limit(5))



In [0]:
# Persist the customers frame as a Delta table, overwriting existing data and schema.
# NOTE(review): `catalog_name` and `bronze` are not defined in this notebook;
# presumably they come from the Utilities notebook run at the top — confirm.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog_name}.{bronze}.brz_customers")
)

## Date

In [0]:
# Glob over the raw date-dimension CSV files in the volume.
raw_data_path = "/Volumes/e_comm/oldp_data/raw/date/*.csv"

In [0]:
# Explicit schema for the date CSV. Every column is declared non-nullable;
# `year` is an integer and the remaining columns are strings.
# Each pair below is (column name, data type), listed in file column order.
date_schema = StructType([
    StructField(column, dtype, False)
    for column, dtype in (
        ("date", StringType()),
        ("year", IntegerType()),
        ("day_name", StringType()),
        ("quarter", StringType()),
        ("week_of_year", StringType()),
    )
])

In [0]:
# Load the date CSV files with the explicit schema; the first line of each file is a header.
df = (
    spark.read
    .schema(date_schema)
    .option("header", True)
    .csv(raw_data_path)
)

# Tag every row with the path of the file it was read from and the ingestion timestamp.
df = (
    df
    .withColumn("file_name", F.col("_metadata.file_path"))
    .withColumn("ingested_at", F.current_timestamp())
)

# Preview a handful of rows.
display(df.limit(5))

In [0]:
# Persist the date frame as a Delta table, overwriting existing data and schema.
# NOTE(review): `catalog_name` and `bronze` are not defined in this notebook;
# presumably they come from the Utilities notebook run at the top — confirm.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable(f"{catalog_name}.{bronze}.brz_date")
)

## Order Items


In [0]:
# Glob over the order-item CSV files in the landing folder of the volume.
raw_data_path = "/Volumes/e_comm/oldp_data/raw/order_items/landing/*.csv"

In [0]:
# Explicit schema for the order-items CSV: every column is declared as a string.
# Each pair below is (column name, nullable flag), listed in file column order.
order_items_schema = StructType([
    StructField(column, StringType(), nullable)
    for column, nullable in (
        ("dt", False),
        ("order_ts", False),
        ("customer_id", True),
        ("order_id", True),
        ("item_seq", True),
        ("product_id", True),
        ("quantity", True),
        ("unit_price_currency", True),
        ("unit_price", True),
        ("discount_pct", True),
        ("tax_amount", True),
        ("channel", True),
        ("coupon_code", True),
    )
])

# Load the order-item CSV files with the explicit schema; the first line of each file is a header.
df = (
    spark.read
    .schema(order_items_schema)
    .option("header", True)
    .csv(raw_data_path)
)

# Tag every row with the path of the file it was read from and the ingestion timestamp.
df = (
    df
    .withColumn("file_name", F.col("_metadata.file_path"))
    .withColumn("ingested_at", F.current_timestamp())
)

# Preview a handful of rows.
display(df.limit(5))

In [0]:
# Persist the order-items frame as a Delta table, overwriting existing data and schema.
# NOTE(review): `catalog_name` and `bronze` are not defined in this notebook;
# presumably they come from the Utilities notebook run at the top — confirm.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable(f"{catalog_name}.{bronze}.brz_order_items")
)

## Products

In [0]:
# Glob over the raw product CSV files in the volume.
raw_data_path = "/Volumes/e_comm/oldp_data/raw/products/*.csv"

In [0]:
# Explicit schema for the products CSV: every column is declared as a string.
# Each pair below is (column name, nullable flag), listed in file column order.
product_schema = StructType([
    StructField(column, StringType(), nullable)
    for column, nullable in (
        ("product_id", False),
        ("sku", True),
        ("category_code", True),
        ("brand_code", False),
        ("color", True),
        ("size", True),
        ("material", True),
        ("weight_grams", True),
        ("length_cm", True),
        ("width_cm", True),
        ("height_cm", True),
        ("rating_count", True),
    )
])

# Load the product CSV files with the explicit schema; the first line of each file is a header.
df = (
    spark.read
    .schema(product_schema)
    .option("header", True)
    .csv(raw_data_path)
)

# Tag every row with the path of the file it was read from and the ingestion timestamp.
df = (
    df
    .withColumn("file_name", F.col("_metadata.file_path"))
    .withColumn("ingested_at", F.current_timestamp())
)

# Preview a handful of rows.
display(df.limit(5))

In [0]:
# Persist the products frame as a Delta table, overwriting existing data and schema.
# NOTE(review): `catalog_name` and `bronze` are not defined in this notebook;
# presumably they come from the Utilities notebook run at the top — confirm.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable(f"{catalog_name}.{bronze}.brz_products")
)