In [0]:
# Read the raw bronze table and preview its first ten rows.
bronze_table_name = "supply_dev.bronze.makeup_supply_chain_raw"
df_bronze = spark.table(bronze_table_name)
bronze_preview = df_bronze.limit(10)
display(bronze_preview)

## Product & Sales Information
| Field                        | Description                                     |
| ---------------------------- | ----------------------------------------------- |
| **Product Type**             | Category or type of product in the supply chain |
| **SKU (Stock Keeping Unit)** | Unique identifier for each product              |
| **Price**                    | Selling price of the product                    |
| **Number of Products Sold**  | Quantity of units sold in a given period        |
| **Revenue Generated**        | Revenue from product sales                      |

## Customer Information
| Field                     | Description                                            |
| ------------------------- | ------------------------------------------------------ |
| **Customer Demographics** | Customer characteristics (age, gender, location, etc.) |

## Inventory & Stock
| Field            | Description                 |
| ---------------- | --------------------------- |
| **Availability** | Product availability status |
| **Stock Levels** | Quantity currently in stock |

## Orders & Shipping
| Field                    | Description                                 |
| ------------------------ | ------------------------------------------- |
| **Order Quantities**     | Number of units in each order               |
| **Shipping Times**       | Time taken to deliver products              |
| **Shipping Carriers**    | Carrier or service responsible for shipment |
| **Shipping Costs**       | Cost associated with shipping               |
| **Transportation Modes** | Mode of transport (air, sea, land)          |
| **Routes**               | Shipping paths used for delivery            |

## Suppliers & Manufacturing
| Field                       | Description                                   |
| --------------------------- | --------------------------------------------- |
| **Supplier Name**           | Vendor providing the product/material         |
| **Location**                | Warehouse, supplier, or distribution location |
| **Lead Time**               | Time required to receive goods from supplier  |
| **Production Volumes**      | Units produced in a given period              |
| **Manufacturing Lead Time** | Time required to manufacture a product        |
| **Manufacturing Costs**     | Costs associated with production              |

## Quality & Inspection
| Field                  | Description                               |
| ---------------------- | ----------------------------------------- |
| **Inspection Results** | Outcome of quality checks                 |
| **Defect Rates**       | Percentage or count of defective products |

## General Cost Information
| Field     | Description                                      |
| --------- | ------------------------------------------------ |
| **Costs** | Operational costs across supply chain activities |


In [0]:
# Shape of the bronze DataFrame as (row count, column count).
bronze_row_count = df_bronze.count()
bronze_column_count = len(df_bronze.columns)
(bronze_row_count, bronze_column_count)

In [0]:
# Render the distinct values of each selected column, one table per column,
# in the order listed here.
columns_to_profile = [
    "Product_type",
    "Customer_demographics",
    "Shipping_carriers",
    "Transportation_modes",
    "Location",
    "Supplier_name",
    "Inspection_results",
]
for profiled_column in columns_to_profile:
    display(df_bronze.select(profiled_column).distinct())

In [0]:
# Count missing (null) values for each column in the DataFrame
from pyspark.sql.functions import col, sum as spark_sum

# One aggregate per column: cast the isNull flag to 0/1 and sum it, keeping
# the source column name as the output column name.
null_count_exprs = [
    spark_sum(col(name).isNull().cast("int")).alias(name)
    for name in df_bronze.columns
]
missing_counts = df_bronze.select(null_count_exprs)

In [0]:
# Render the per-column null counts held in `missing_counts`.
display(missing_counts)

In [0]:
from pyspark.sql.functions import col, trim, length

# Per-column count of values that equal the empty string once leading and
# trailing whitespace has been trimmed.
blank_after_trim_exprs = [
    spark_sum((trim(col(name)) == "").cast("int")).alias(name)
    for name in df_bronze.columns
]
empty_string_counts = df_bronze.select(blank_after_trim_exprs)

display(empty_string_counts)

In [0]:
# Per-column count of values whose trimmed length is zero, i.e. values that
# are empty or consist only of whitespace.
zero_length_exprs = [
    spark_sum((length(trim(col(name))) == 0).cast("int")).alias(name)
    for name in df_bronze.columns
]
whitespace_counts = df_bronze.select(zero_length_exprs)

display(whitespace_counts)

In [0]:
# Count, as two independent totals, the rows where Price is zero and the rows where Order_Quantities is less than or equal to zero
# Each flag is cast to 0/1 and summed, so the two totals are computed
# independently of one another in a single pass.
zero_price_total = spark_sum((col("Price") == 0).cast("int")).alias("price_zero")
bad_quantity_total = spark_sum((col("Order_Quantities") <= 0).cast("int")).alias("bad_quantity")

df_bronze.select(zero_price_total, bad_quantity_total).show()

In [0]:
from pyspark.sql.functions import countDistinct

# Number of distinct values per column, reported under the column's own name.
distinct_count_exprs = [
    countDistinct(col(name)).alias(name)
    for name in df_bronze.columns
]
df_bronze.select(distinct_count_exprs).display()

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col, trim, length, sum as spark_sum, countDistinct, lit
)

def _resolve_dq_column(columns: list, requested: str):
    """Return the entry of `columns` that `requested` refers to, or None.

    An exact match wins; otherwise a case-insensitive match is accepted only
    when it is unambiguous (exactly one candidate).
    """
    if requested in columns:
        return requested
    candidates = [name for name in columns if name.lower() == requested.lower()]
    return candidates[0] if len(candidates) == 1 else None


def run_data_quality(df: DataFrame, table_name: str, numeric_rules: dict = None) -> DataFrame:
    """
    Runs generic data quality checks on a DataFrame.

    Column-level checks (one report row per column and check type):
    - "null_values": number of NULL values in the column.
    - "empty_or_whitespace": number of values whose trimmed length is 0.
    - "distinct_values": number of distinct values (informational only, so
      its "status" is NULL rather than pass/fail).

    Rule checks (one report row per entry of `numeric_rules` whose column can
    be resolved):
    - "rule_violation (<rule>)": number of rows for which the rule evaluates
      to false. Rows for which the rule evaluates to NULL (for example
      because the column value is NULL) are not counted by this check.

    Parameters:
    - df: Input Spark DataFrame
    - table_name: Logical table name for reporting
    - numeric_rules: Optional dict like {"price": "> 0", "quantity": "> 0"}.
      Keys are column names (matched exactly, or case-insensitively when that
      is unambiguous); values are Python expression suffixes applied to the
      column. Rules whose column cannot be resolved are skipped with a
      warning. Rules are passed to eval(), so they must come from trusted
      configuration only.

    Returns:
    - DataFrame containing DQ results with columns table_name, column_name,
      check_type, issue_count, total_rows and status (True when issue_count
      is 0, NULL for informational checks).
    """
    import warnings  # local import: only needed to report skipped rules

    total_rows = df.count()
    columns = df.columns
    results = []

    def add_result(column_name, check_type, issue_count, informational=False):
        # Spark's sum() yields NULL when it has no non-NULL input (empty table
        # or all-NULL column); report that as 0 issues instead of NULL.
        count_value = int(issue_count or 0)
        status = None if informational else count_value == 0
        results.append((table_name, column_name, check_type, count_value, total_rows, status))

    # -------- Column-level checks (single aggregation pass) --------
    if columns:
        aggregates = []
        for idx, c in enumerate(columns):
            # Index-based aliases keep the aggregate names unique whatever
            # the source column names look like.
            aggregates.append(spark_sum(col(c).isNull().cast("int")).alias(f"nulls_{idx}"))
            aggregates.append(
                spark_sum((length(trim(col(c))) == 0).cast("int")).alias(f"empty_{idx}")
            )
            aggregates.append(countDistinct(col(c)).alias(f"distinct_{idx}"))

        stats = df.select(*aggregates).collect()[0]

        for idx, c in enumerate(columns):
            add_result(c, "null_values", stats[f"nulls_{idx}"])
            add_result(c, "empty_or_whitespace", stats[f"empty_{idx}"])
            add_result(c, "distinct_values", stats[f"distinct_{idx}"], informational=True)

    # -------- Numeric sanity checks (config-driven) --------
    if numeric_rules:
        for col_name, rule in numeric_rules.items():
            target = _resolve_dq_column(columns, col_name)
            if target is None:
                warnings.warn(
                    f"run_data_quality: rule {rule!r} skipped because column "
                    f"{col_name!r} was not found in {table_name!r}",
                    stacklevel=2,
                )
                continue

            # SECURITY: eval() executes arbitrary Python. `rule` must come
            # from trusted, code-reviewed configuration, never from user input.
            condition = ~eval(f"col('{target}') {rule}")
            issue_count = df.filter(condition).count()
            add_result(target, f"rule_violation ({rule})", issue_count)

    # -------- Create result DataFrame --------
    # An explicit schema avoids type inference, which cannot succeed when
    # `results` is empty.
    result_schema = (
        "table_name string, column_name string, check_type string, "
        "issue_count bigint, total_rows bigint, status boolean"
    )
    return spark.createDataFrame(results, schema=result_schema)


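# Example usage (commented out). Rule keys should be the DataFrame's actual
# column names; other cells in this notebook reference "Price" and
# "Order_Quantities", so adjust the keys below before running.
# numeric_rules = {
#     "price": "> 0",
#     "quantity": "> 0",
#     "stock_levels": ">= 0"
# }

# dq_report = run_data_quality(df_bronze, "orders_raw", numeric_rules)
# display(dq_report)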


In [0]:
# Save the bronze DataFrame as a Delta table named supply_dev.silver.orders,
# using overwrite mode.
silver_writer = df_bronze.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("supply_dev.silver.orders")
