In this stage we will prepare data to be ready to transform to Silver level , thats include: 
  - Validate schema (check datatypes) and convert object type to correct types (dates, string, number, category)
  - Check nulls & duplicates (decide whether to keep, impute, or drop).
  - Data entry correction if required.
  - Ingestion new data from Mongo like products English name
  - Join related tables for consistency checks.
  - Move the cleaned data to the Silver layer of the Azure Data Lake.

In [0]:
spark

<pyspark.sql.connect.session.SparkSession at 0x7fd90f5a8fa0>

Establish a Databricks connection configuration to the Bronze storage layer in the Data Lake.

In [0]:
# Parameters
storage_account = "storage_account"
application_id = "application_id"
directory_id = "directory_id"


spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", "")
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")

In [0]:
import pandas as pd

In [0]:
#%pip install pymongo

Extract product category names from the external MongoDB resource hosted on filess.io.

In [0]:
# importing module
from pymongo import MongoClient

hostname = "hostname"
database = "database"
port = "27018"
username = "username"
password = "password"

uri = "mongodb://" + username + ":" + password + "@" + hostname + ":" + port + "/" + database

# Connect with the portnumber and host
client = MongoClient(uri)

# Access database
mydatabase = client[database]

collection = mydatabase["product_category"]
product_cat_data_df = spark.createDataFrame([dict(row, _id=str(row['_id'])) for row in collection.find()])

In [0]:
# Load data from the Bronze layer of the Data Lake into Spark DataFrames.

customers_df = spark.read.csv(f"abfss://ecomdataplatformcontainer@{storage_account}.dfs.core.windows.net/Bronze/customers.csv", header=True, inferSchema=True)

sellers_df = spark.read.csv(f"abfss://ecomdataplatformcontainer@{storage_account}.dfs.core.windows.net/Bronze/sellers.csv", header=True, inferSchema=True)

orders_df = spark.read.csv(f"abfss://ecomdataplatformcontainer@{storage_account}.dfs.core.windows.net/Bronze/orders.csv", header=True, inferSchema=True)

order_items_df = spark.read.csv(f"abfss://ecomdataplatformcontainer@{storage_account}.dfs.core.windows.net/Bronze/order_items.csv", header=True, inferSchema=True)

order_payments_df = spark.read.csv(f"abfss://ecomdataplatformcontainer@{storage_account}.dfs.core.windows.net/Bronze/order_payments.csv", header=True, inferSchema=True)

order_reviews_df = spark.read.csv(f"abfss://ecomdataplatformcontainer@{storage_account}.dfs.core.windows.net/Bronze/order_reviews.csv", header=True, inferSchema=True)

products_df = spark.read.csv(f"abfss://ecomdataplatformcontainer@{storage_account}.dfs.core.windows.net/Bronze/products.csv", header=True, inferSchema=True)

Validate schema (check datatypes match expectations)

In [0]:
from pyspark.sql.functions import col

# Orders
orders_df = (orders_df
    .withColumn("order_id", col("order_id").cast("string"))
    .withColumn("customer_id", col("customer_id").cast("string"))
    .withColumn("order_status", col("order_status").cast("string"))  
)

# Order Items
order_items_df = (order_items_df
    .withColumn("order_id", col("order_id").cast("string"))
    .withColumn("product_id", col("product_id").cast("string"))
    .withColumn("seller_id", col("seller_id").cast("string"))
)

# Payments
order_payments_df = (order_payments_df
    .withColumn("order_id", col("order_id").cast("string"))
    .withColumn("payment_type", col("payment_type").cast("string"))  
)

# Reviews
order_reviews_df = (order_reviews_df
    .withColumn("order_id", col("order_id").cast("string"))
    .withColumn("review_id", col("review_id").cast("string"))
    .withColumn("review_score", col("review_score").cast("string"))
    .withColumn("review_comment_title", col("review_comment_title").cast("string"))
    .withColumn("review_comment_message", col("review_comment_message").cast("string"))
    .withColumn("review_creation_date", col("review_creation_date").cast("timestamp"))
    .withColumn("review_answer_timestamp", col("review_answer_timestamp").cast("timestamp"))
)

# Products
products_df = (products_df
    .withColumn("product_id", col("product_id").cast("string"))
    .withColumn("product_category_name", col("product_category_name").cast("string"))
)

# Sellers
sellers_df = (sellers_df
    .withColumn("seller_id", col("seller_id").cast("string"))
    .withColumn("seller_city", col("seller_city").cast("string"))
    .withColumn("seller_state", col("seller_state").cast("string"))
)

# Customers
customers_df = (customers_df
    .withColumn("customer_id", col("customer_id").cast("string"))
    .withColumn("customer_unique_id", col("customer_unique_id").cast("string"))
    .withColumn("customer_city", col("customer_city").cast("string"))
    .withColumn("customer_state", col("customer_state").cast("string"))
)

# Product category translation
product_cat_data_df = (product_cat_data_df
    .withColumn("product_category_name", col("product_category_name").cast("string"))
    .withColumn("product_category_name_english", col("product_category_name_english").cast("string"))
)


An Overview from the Data

In [0]:
from pyspark.sql import functions as F

tables = {
    "customers": customers_df,
    "sellers": sellers_df,
    "orders": orders_df,
    "order_items": order_items_df,
    "order_payments": order_payments_df,
    "order_reviews": order_reviews_df,
    "products": products_df
}

summary = []

for name, df in tables.items():
    row_count = df.count()
    col_count = len(df.columns)
    
    # Count duplicates (by all columns)
    dup_count = row_count - df.dropDuplicates().count()
    
    # Count nulls per column
    nulls = {}
    for col_name in df.columns:
        null_count = df.filter(F.col(col_name).isNull()).count()
        if null_count > 0:
            nulls[col_name] = null_count
    
    summary.append({
        "table": name,
        "rows": row_count,
        "cols": col_count,
        "duplicates": dup_count,
        "nulls": nulls
    })

# Convert summary into Spark DataFrame
summary_df = spark.createDataFrame(pd.DataFrame(summary))
display(summary_df)


table,rows,cols,duplicates,nulls
customers,99441,5,0,"List(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)"
sellers,3095,4,0,"List(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)"
orders,99441,8,0,"List(160, 1783, 2965, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)"
order_items,112650,7,0,"List(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)"
order_payments,103886,5,0,"List(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)"
order_reviews,104162,7,85,"List(null, null, null, 2236, null, null, null, null, null, null, null, null, 8803, 63079, 92157, 8832, 1, 2380)"
products,32951,9,0,"List(null, null, null, null, 610, 610, 2, 2, 610, 610, 2, 2, null, null, null, null, null, null)"


No duplicate records were found except in the Order Reviews table, where they will be removed.

In [0]:
order_reviews_df = order_reviews_df.dropDuplicates()

In [0]:
details = []

for name, df in tables.items():
    row_count = df.count()

    # Null counts for all columns in one pass
    null_counts = df.select([
        F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns
    ]).collect()[0].asDict()

    for col_name, null_count in null_counts.items():
        if null_count > 0:
            details.append({
                "table": name,
                "column": col_name,
                "rows": row_count,
                "null_count": int(null_count),
                "null_percent": round(null_count / row_count * 100, 2)
            })

# Convert to Spark DataFrame
summary_df = spark.createDataFrame(pd.DataFrame(details))
display(summary_df)


table,column,rows,null_count,null_percent
orders,order_approved_at,99441,160,0.16
orders,order_delivered_carrier_date,99441,1783,1.79
orders,order_delivered_customer_date,99441,2965,2.98
order_reviews,review_id,104162,1,0.0
order_reviews,order_id,104162,2236,2.15
order_reviews,review_score,104162,2380,2.28
order_reviews,review_comment_title,104162,92157,88.47
order_reviews,review_comment_message,104162,63079,60.56
order_reviews,review_creation_date,104162,8832,8.48
order_reviews,review_answer_timestamp,104162,8803,8.45


**Orders Review Table**
- Since many customers provide only a score without written feedback, this explains the presence of null values in most columns.

**Product Table**
- The null values are consistent across all affected columns, indicating that 32,951 products are not categorized. This may suggest that these products do not belong to any defined category, or the category information is missing. Further checks and investigation will be conducted. Other nulls may have been inherited from the missing category information. For now, they will be left as is, since they do not affect the analysis, and will be eliminated later if they are not used.

In [0]:
# Orders + Customers (drop duplicate customer_id from right table)
orders_customers_df = orders_df.join(
    customers_df,
    orders_df.customer_id == customers_df.customer_id,
    "left"
).drop(customers_df.customer_id)

# Add Payments (drop duplicate order_id from right table)
orders_payments_df = orders_customers_df.join(
    order_payments_df,
    orders_customers_df.order_id == order_payments_df.order_id,
    "left"
).drop(order_payments_df.order_id)

# Add Order Items (drop duplicate order_id from right table)
orders_items_df = orders_payments_df.join(
    order_items_df,
    orders_payments_df.order_id == order_items_df.order_id,
    "left"
).drop(order_items_df.order_id)

# Add Reviews (drop duplicate order_id from right table)
orders_reviews_df = orders_items_df.join(
    order_reviews_df,
    orders_items_df.order_id == order_reviews_df.order_id,
    "left"
).drop(order_reviews_df.order_id)

# Add Products (drop duplicate product_id from right table)
orders_products_df = orders_reviews_df.join(
    products_df,
    orders_reviews_df.product_id == products_df.product_id,
    "left"
).drop(products_df.product_id)

# Add Product Categories (drop duplicate product_category_name from right table)
orders_products_all_df = orders_products_df.join(
    product_cat_data_df,
    orders_products_df.product_category_name == product_cat_data_df.product_category_name,
    "left"
).drop(product_cat_data_df.product_category_name)

# Add Sellers (drop duplicate seller_id from right table)
final_df = orders_products_all_df.join(
    sellers_df,
    orders_products_all_df.seller_id == sellers_df.seller_id,
    "left"
).drop(sellers_df.seller_id)

Transport the processed data final file to the silver folder in Azure cloude

In [0]:
final_df.write.mode("overwrite").parquet(f"abfss://ecomdataplatformcontainer@{storage_account}.dfs.core.windows.net/Silver")