# Databricks Data Preparation in ML - Notebook 02
## Data Validation

**Part of the Databricks Data Preparation in ML Training Series**

---

## Objectives

This notebook demonstrates essential data validation techniques for the Databricks ML Associate certification:

- **Schema Validation** - Verifying data types and structure consistency
- **Quality Metrics** - Measuring completeness, validity, and reliability
- **Business Rules** - Implementing and validating business logic constraints
- **Data Monitoring** - Setting up basic data quality monitoring workflows

## Duration: ~30 minutes
## Level: Intermediate

---

## Why is Data Validation Critical?

**Data quality** forms the foundation of every successful ML project:
- **Garbage In, Garbage Out** - Poor data quality leads to unreliable models
- **Early Detection** - Identifying issues before they impact downstream processes
- **Trust & Reliability** - Building confidence in data-driven decisions
- **Cost Efficiency** - Preventing expensive fixes in production systems

---

## Data Quality Dimensions

### Core Quality Metrics:

#### **Completeness**
- Are all required values present in the dataset?
- What percentage of records have missing critical fields?

#### **Validity** 
- Do values conform to expected formats and ranges?
- Are categorical values from the expected domain?

#### **Uniqueness**
- Are there unwanted duplicate records?
- Do business keys maintain their intended uniqueness?

#### **Business Rules Compliance**
- Do data relationships make business sense?
- Are domain-specific constraints satisfied?

### Validation in ML Context:
- **Feature Quality** - Ensure features meet model expectations
- **Training Data Integrity** - Validate consistency across training sets
- **Inference Monitoring** - Detect data drift in production
- **Pipeline Reliability** - Maintain data quality throughout ML workflows

## Environment Setup

In [0]:
# Basic imports for Databricks ML
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, isnan, isnull, mean, stddev, min as spark_min, max as spark_max, percentile_approx, length
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
import numpy as np
from datetime import datetime, timedelta


In [0]:
%pip install faker

In [0]:
from faker import Faker
import random
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import Row

# Init
fake = Faker()
random.seed(42)
Faker.seed(42)

n_rows = 1000
generated_names = []
generated_emails = []
generated_salaries = []
sources = ["CRM_SYSTEM", "HR_SYSTEM", "ERP_SYSTEM", "WEB_PORTAL", "MOBILE_APP"]

rows = []

for i in range(1, n_rows + 1):
    # --- full_name ---
    name_choice = random.choices(["new", "duplicate", "null"], weights=[0.75, 0.2, 0.05])[0]
    if name_choice == "new":
        full_name = fake.name()
        generated_names.append(full_name)
    elif name_choice == "duplicate" and generated_names:
        full_name = random.choice(generated_names)
    else:
        full_name = None

    # --- age (as string) ---
    age_choice = random.choices(["valid", "invalid", "null"], weights=[0.85, 0.1, 0.05])[0]
    if age_choice == "valid":
        age = str(random.randint(18, 65))
    elif age_choice == "invalid":
        age = "xyz"
    else:
        age = None

    # --- email ---
    email_choice = random.choices(["new", "duplicate", "null"], weights=[0.75, 0.2, 0.05])[0]
    if email_choice == "new" and full_name:
        email = f"{full_name.replace(' ', '.').lower()}@example.com"
        generated_emails.append(email)
    elif email_choice == "duplicate" and generated_emails:
        email = random.choice(generated_emails)
    else:
        email = None

    # --- salary (as string) ---
    salary_choice = random.choices(["new", "duplicate", "null"], weights=[0.7, 0.25, 0.05])[0]
    if salary_choice == "new":
        salary = str(round(random.uniform(30000, 120000), 2))
        generated_salaries.append(salary)
    elif salary_choice == "duplicate" and generated_salaries:
        salary = str(random.choice(generated_salaries))
    else:
        salary = None

    # --- registration_date ---
    registration_date = fake.date_between(start_date="-2y", end_date="today").strftime("%Y-%m-%d")

    # --- source_system ---
    source_system = random.choice(sources)

    rows.append((i, full_name, age, email, salary, registration_date, source_system))

# --- Define schema ---
bronze_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("full_name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("email", StringType(), True),
    StructField("salary", StringType(), True),
    StructField("registration_date", StringType(), True),
    StructField("source_system", StringType(), True)
])

# --- Create DataFrame ---
df_raw = spark.createDataFrame(rows, schema=bronze_schema)

# --- Display ---
display(df_raw)

# Schema Validation

## Theory

**Schema Validation** ensures that data conforms to the expected structure and format:
- **Data types** - Verify columns have correct data types (integer, string, double, etc.)
- **Nullable constraints** - Check if required fields are properly populated
- **Column presence** - Ensure all expected columns exist in the dataset
- **Column ordering** - Validate consistent column positioning for downstream processes

### 🎯 Schema Validation Benefits:
- **Early Error Detection** - Catch structural issues before processing
- **Pipeline Stability** - Prevent runtime errors in downstream applications
- **Data Contract Enforcement** - Ensure producers meet consumer expectations
- **ML Model Consistency** - Validate feature schema matches training expectations
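
The checks listed above (column presence, data types, column ordering) can be sketched as one helper. This is a minimal sketch, not the notebook's own method: `validate_schema` is a hypothetical function, and it works on plain `(column, type)` pairs such as those returned by `DataFrame.dtypes`, so it does not cover nullable constraints. The sample contract below is invented for illustration.

```python
from typing import Dict, List, Tuple


def validate_schema(
    actual_dtypes: List[Tuple[str, str]],
    expected_dtypes: List[Tuple[str, str]],
    check_order: bool = True,
) -> Dict[str, list]:
    """Compare (column, type) pairs, e.g. the output of DataFrame.dtypes."""
    actual = dict(actual_dtypes)
    expected = dict(expected_dtypes)

    issues = {
        # Columns the consumer expects but the data does not provide.
        "missing": sorted(set(expected) - set(actual)),
        # Columns present in the data but absent from the contract.
        "extra": sorted(set(actual) - set(expected)),
        # Columns present on both sides whose types differ.
        "type_mismatch": [
            (name, expected[name], actual[name])
            for name, _ in expected_dtypes
            if name in actual and actual[name] != expected[name]
        ],
        "order_mismatch": [],
    }

    if check_order:
        # Compare the relative order of the shared columns only.
        shared_expected = [n for n, _ in expected_dtypes if n in actual]
        shared_actual = [n for n, _ in actual_dtypes if n in expected]
        if shared_expected != shared_actual:
            issues["order_mismatch"] = shared_actual
    return issues


# Hypothetical contract for illustration; with Spark, pass df_raw.dtypes as `actual`.
expected = [("customer_id", "int"), ("full_name", "string"), ("age", "int")]
actual = [
    ("full_name", "string"),
    ("customer_id", "int"),
    ("age", "string"),
    ("email", "string"),
]

report = validate_schema(actual, expected)
print(report)
```

Returning a dictionary of issues rather than printing directly makes it possible to fail a pipeline step when any list is non-empty.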

In [0]:
# Basic schema validation
expected_columns = ["customer_id", "full_name", "age", "email", "salary"]
actual_columns = df_raw.columns

display(df_raw.columns)

In [0]:

missing_columns = set(expected_columns) - set(actual_columns)
extra_columns = set(actual_columns) - set(expected_columns)

if missing_columns:
    print(f"❌ Missing columns: {missing_columns}")
if extra_columns:
    print(f"⚠️ Extra columns: {extra_columns}")
if not missing_columns and not extra_columns:
    print("✅ All expected columns present")

# Check data types
print("📊 Current data types:")
for col_name, col_type in df_raw.dtypes:
    print(f"   {col_name}: {col_type}")

## Profiling

The cell below computes summary statistics (count, mean, stddev, min, max) for the numeric and string columns in `df_raw`.

In [0]:
# Summary statistics (count, mean, stddev, min, max) for numeric and string columns.
# Note: min and max on string columns such as age and salary are lexicographic, not numeric.
display(df_raw.describe())

In [0]:
df_raw.columns

In [0]:
from pyspark.sql.functions import col

# Check for null or empty strings in each string column
print("Empty string values or null value:")
string_columns = [col_name for col_name, col_type in df_raw.dtypes if col_type == 'string']
for column in string_columns:
    empty_count = df_raw.filter((col(column) == "") | (col(column).isNull())).count()
    print(f"   {column}: {empty_count} empty/null values")

## Data Type Validation

This section shows how to verify that each column in your dataset has the expected data type. Data type validation ensures that numerical fields are not accidentally stored as strings, that dates are properly formatted, and that all columns match the schema required for downstream ML tasks. The code below compares each column's actual type against an expected type and reports any mismatches.

In [0]:
# Data type validation for each column
expected_types = {
    "customer_id": "int",
    "full_name": "string",
    "age": "string",
    "email": "string",
    # Salaries are generated with two decimal places, so the target type is double
    "salary": "double",
    "registration_date": "string",
    "source_system": "string"
}

actual_types = dict(df_raw.dtypes)

type_mismatches = []
for col_name, expected_type in expected_types.items():
    actual_type = actual_types.get(col_name)
    if actual_type != expected_type:
        type_mismatches.append((col_name, expected_type, actual_type))

if type_mismatches:
    print("❌ Data type mismatches found:")
    for col_name, expected, actual in type_mismatches:
        print(f"   {col_name}: expected {expected}, found {actual}")
else:
    print("✅ All column data types are correct")

# Data Quality Metrics

## Theory

**Data Quality Metrics** provide quantitative measures of data health across multiple dimensions. These metrics help establish baselines and monitor data quality over time.

### 📊 Key Quality Dimensions:

#### **Completeness**
- Percentage of non-null values in each column
- Critical for identifying missing data patterns
- Threshold: Typically >95% for critical fields

#### **Uniqueness**
- Percentage of unique values in key fields
- Essential for primary keys and unique identifiers
- Threshold: 100% for business keys

#### **Validity**
- Percentage of values conforming to expected format/range
- Includes format validation (emails, phone numbers)
- Includes range validation (age between 0 and 120, positive salaries)

#### **Consistency**
- Uniform representation across similar fields
- Standardized formatting and naming conventions
- Cross-field relationship validation

### 🎯 Quality Scoring:
- **Excellent**: >95% - Production ready
- **Good**: 85-95% - Minor issues, acceptable with monitoring
- **Poor**: <85% - Requires immediate attention
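
The scoring bands above can be expressed as a small function. This is a sketch under one assumption: the list does not say which band a score of exactly 95% or 85% belongs to, so the hypothetical `quality_status` helper below places both boundary values in "Good".

```python
def quality_status(score_pct: float) -> str:
    """Map a quality percentage to the scoring bands listed above."""
    if not 0 <= score_pct <= 100:
        raise ValueError("score_pct must be between 0 and 100")
    if score_pct > 95:
        return "Excellent"
    if score_pct >= 85:
        # Assumption: exactly 95% and exactly 85% both count as "Good".
        return "Good"
    return "Poor"


for score in (99.2, 95.0, 84.9):
    print(f"{score:.1f}% -> {quality_status(score)}")
```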

The cell below counts the total number of records in `df_raw` and computes the completeness (percentage of non-null values) of each column, flagging each result against the 95% and 85% thresholds.

In [0]:
# Calculate basic quality metrics
total_records = df_raw.count()

print(f"Data Quality Assessment: Total records: {total_records}")
print()

# Completeness check for each column
print("Completeness (% non-null values):")
for column in df_raw.columns:
    null_count = df_raw.filter(col(column).isNull()).count()
    completeness = (total_records - null_count) / total_records * 100
    status = "✅" if completeness >= 95 else "⚠️" if completeness >= 85 else "❌"
    print(f"   {column}: {completeness:.1f}% {status}")

## Completeness

**Completeness** measures the extent to which all required data is present in a dataset. It focuses on identifying missing or null values in critical fields, ensuring that essential information is available for analysis and modeling. High completeness is crucial for reliable ML outcomes, as missing data can lead to biased models and inaccurate predictions. Typical completeness checks include calculating the percentage of non-null values per column and flagging records with missing mandatory fields.
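
Flagging records with missing mandatory fields can be sketched by generating a SQL predicate from a list of column names. The helper name `missing_field_condition` and the choice of `full_name` and `email` as mandatory are assumptions made for illustration. The predicate treats blank strings as missing, so it is intended for string columns only.

```python
from typing import Iterable


def missing_field_condition(mandatory_columns: Iterable[str]) -> str:
    """Build a SQL predicate that is true when any mandatory column is null or blank."""
    checks = [
        f"(`{name}` IS NULL OR trim(`{name}`) = '')"
        for name in mandatory_columns
    ]
    if not checks:
        raise ValueError("at least one mandatory column is required")
    return " OR ".join(checks)


# Assumed mandatory fields for illustration; adjust to the real data contract.
condition = missing_field_condition(["full_name", "email"])
print(condition)

# With Spark available, the predicate could be applied as a flag column:
#   from pyspark.sql.functions import expr
#   flagged = df_raw.withColumn("missing_mandatory", expr(condition))
```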

## Uniqueness

**Uniqueness** ensures that each record or key field in a dataset is distinct and not duplicated. This is critical for maintaining data integrity, especially for primary keys, unique identifiers, or business keys. Duplicate records can lead to inaccurate analytics, skewed model training, and operational errors. Typical uniqueness checks involve identifying duplicate rows or values in key columns and quantifying the percentage of unique entries. High uniqueness is essential for reliable ML outcomes and trustworthy data pipelines.

In [0]:
# Uniqueness check for ID columns
print("🔍 Uniqueness check:")
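# Count distinct non-null IDs so that a null value is not counted as an extra distinct ID
unique_ids = df_raw.filter(col("customer_id").isNotNull()).select("customer_id").distinct().count()
total_ids = df_raw.filter(col("customer_id").isNotNull()).count()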
uniqueness = unique_ids / total_ids * 100 if total_ids > 0 else 0

print(f"Customer ID uniqueness: {uniqueness:.1f}%")
if uniqueness < 100:
    duplicate_count = total_ids - unique_ids
    print(f"   Found {duplicate_count} duplicate customer IDs")
    
    # Show duplicate IDs
    duplicates = df_raw.groupBy("customer_id").count().filter(col("count") > 1)
    duplicates.show()

## Validity - Business Rule Validation

## Theory

**Business Rules** are domain-specific constraints that data must satisfy to be considered valid for business use. These rules encode organizational knowledge and operational requirements.

### 🏢 Types of Business Rules:

#### **Range Constraints**
- Age must be between 18 and 100 years for employee records
- Salary must be positive and within reasonable bounds
- Dates must be within expected business periods

#### **Format Requirements**
- Email addresses must follow standard email format
- Phone numbers must match regional patterns
- Postal codes must conform to country standards

#### **Relationship Rules**
- Start date must be before end date
- Manager salary should be higher than direct reports
- Department codes must exist in reference tables

#### **Business Logic Constraints**
- Minimum experience requirements for certain roles
- Credit limits based on customer categories
- Inventory levels must be non-negative

### 🎯 Implementation Strategy:
- **Declarative Rules** - Define rules as SQL conditions
- **Threshold-based** - Set acceptable compliance percentages
- **Exception Handling** - Document and track rule violations
- **Business Context** - Rules should reflect real business needs
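
The declarative, threshold-based strategy above might look like the following sketch. The `Rule` class, the rule definitions, and the 90% thresholds are hypothetical. The `CAST` expressions assume that unparseable strings cast to null (Spark with ANSI mode off), and `violation_filter` deliberately counts rows where the condition evaluates to null as violations.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rule:
    name: str
    condition: str    # SQL predicate that a valid row satisfies
    threshold: float  # minimum acceptable compliance, in percent


# Hypothetical rules and thresholds, chosen for illustration only.
RULES = [
    Rule("age_range", "CAST(age AS INT) BETWEEN 18 AND 100", 90.0),
    Rule("salary_positive", "CAST(salary AS DOUBLE) > 0", 90.0),
]


def violation_filter(rule: Rule) -> str:
    """SQL predicate selecting rows where the rule is not provably satisfied.

    Rows where the condition is NULL (missing or unparseable values) are
    counted as violations as well.
    """
    return f"NOT coalesce({rule.condition}, false)"


def evaluate(rule: Rule, total: int, violations: int) -> dict:
    """Turn a violation count into a compliance percentage and a pass/fail flag."""
    if total <= 0:
        raise ValueError("total must be positive")
    compliance = (total - violations) / total * 100
    return {
        "rule": rule.name,
        "compliance": round(compliance, 1),
        "passed": compliance >= rule.threshold,
    }


# With Spark available, the count would come from:
#   violations = df_raw.filter(violation_filter(rule)).count()
# The count below is made up to show the evaluation step.
result = evaluate(RULES[0], total=1000, violations=150)
print(violation_filter(RULES[0]))
print(result)
```

Keeping the rules in a list makes exception tracking straightforward: each result dictionary can be appended to a log table along with the run date.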

### Business Rule 1: Age must be between 18 and 100

In [0]:

print("🏢 Business Rule Validation:")
print()

# Age validation. "age" is stored as a string, so cast it explicitly.
# Non-numeric values such as "xyz" cast to null (ANSI mode off) and are counted as invalid.
age_int = col("age").cast("int")
age_violation = col("age").isNotNull() & (
    age_int.isNull() | (age_int < 18) | (age_int > 100)
)
invalid_age = df_raw.filter(age_violation).count()

age_compliance = (total_records - invalid_age) / total_records * 100
print(f"Age Range (18-100): {age_compliance:.1f}% compliant")
if invalid_age > 0:
    print(f"   Found {invalid_age} invalid ages")
    df_raw.filter(age_violation).select("customer_id", "age").show()

### Business Rule 2: Salary must be positive and reasonable

In [0]:
# Business Rule 2: Salary must be positive and reasonable
invalid_salary = df_raw.filter(
    col("salary").isNotNull() & 
    ((col("salary") <= 0) | (col("salary") > 500000))
).count()

salary_compliance = (total_records - invalid_salary) / total_records * 100
print(f"Salary Range (>0, <500k): {salary_compliance:.1f}% compliant")
if invalid_salary > 0:
    print(f"   Found {invalid_salary} invalid salaries")
    df_raw.filter((col("salary") <= 0) | (col("salary") > 500000)).select("customer_id", "salary").show()

### Business Rule 3: Email format validation

In [0]:
# Business Rule 3: Email format validation
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
invalid_email = df_raw.filter(
    col("email").isNotNull() & 
    (~col("email").rlike(email_pattern))
).count()

email_compliance = (total_records - invalid_email) / total_records * 100
print(f"Email Format: {email_compliance:.1f}% compliant")
if invalid_email > 0:
    print(f"   Found {invalid_email} invalid emails")
    df_raw.filter(col("email").isNotNull() & (~col("email").rlike(email_pattern))).select("customer_id", "email").show()

# Statistical Validation

## Theory

**Statistical Validation** uses statistical methods to detect anomalies, outliers, and distribution changes that may indicate data quality issues or unexpected patterns.

### 📊 Statistical Validation Techniques:

#### **Descriptive Statistics**
- Mean, median, standard deviation for central tendency
- Min/max values for range validation
- Percentiles for distribution understanding

#### **Outlier Detection**
- **IQR Method** - Values beyond 1.5 * IQR from Q1/Q3
- **Z-Score** - Values more than 3 standard deviations from mean
- **Business Context** - Domain-specific outlier definitions

#### **Distribution Analysis**
- **Skewness** - Measure of asymmetry in data distribution
- **Kurtosis** - Measure of tail heaviness
- **Normality Tests** - Validate distributional assumptions

#### **Temporal Validation**
- **Trend Analysis** - Detect unusual patterns over time
- **Seasonality** - Validate expected seasonal patterns
- **Change Point Detection** - Identify significant distribution shifts

### 🎯 ML Applications:
- **Feature Quality** - Ensure features have expected statistical properties
- **Data Drift Detection** - Monitor changes in production data
- **Anomaly Detection** - Identify unusual patterns for investigation
- **Model Validation** - Verify training data statistical assumptions
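
The Z-score rule listed under Outlier Detection can be sketched as a pair of bounds derived from the mean and standard deviation. The helper name `zscore_bounds` and the sample statistics are assumptions for illustration. In Spark, the two inputs would come from `mean` and `stddev` aggregates on a numeric column.

```python
from typing import Tuple


def zscore_bounds(mean: float, std: float, k: float = 3.0) -> Tuple[float, float]:
    """Return (lower, upper); values outside are more than k standard deviations from the mean."""
    if std < 0:
        raise ValueError("std must be non-negative")
    return mean - k * std, mean + k * std


def is_zscore_outlier(value: float, mean: float, std: float, k: float = 3.0) -> bool:
    """True when value lies outside the k-sigma band around the mean."""
    lower, upper = zscore_bounds(mean, std, k)
    return value < lower or value > upper


# Illustrative statistics, not computed from df_raw.
lower, upper = zscore_bounds(mean=41.0, std=14.0)
print(lower, upper)
print(is_zscore_outlier(90, mean=41.0, std=14.0))
```

Because the mean and standard deviation are themselves sensitive to extreme values, the IQR method is often preferred for heavily skewed columns.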

In [0]:
from pyspark.sql.functions import mean, stddev, min as spark_min, max as spark_max, percentile_approx, col

# Basic statistical validation for age.
# "age" is a string column, so cast it to int first; otherwise min/max compare
# strings lexicographically and values such as "xyz" distort the result.
age_num = col("age").cast("int")

age_stats_df = df_raw.filter(age_num.isNotNull()).select(
    mean(age_num).alias("mean"),
    stddev(age_num).alias("std"),
    spark_min(age_num).alias("min_val"),
    spark_max(age_num).alias("max_val"),
    percentile_approx(age_num, 0.25).alias("q1"),
    percentile_approx(age_num, 0.75).alias("q3")
)

age_stats = age_stats_df.collect()[0]

display(age_stats_df)

### Outlier Detection Using IQR Method (for age column)

This section applies the Interquartile Range (IQR) method to detect outliers in the age column of the dataset.

Step-by-step explanation:

1. **Compute the IQR.** The IQR is the range between the third quartile (Q3) and the first quartile (Q1):
   $\text{IQR} = Q3 - Q1$
2. **Define bounds for outliers.** Any value below the lower bound or above the upper bound is considered an outlier:
   $\text{Lower Bound} = Q1 - 1.5 \times \text{IQR}$
   $\text{Upper Bound} = Q3 + 1.5 \times \text{IQR}$
3. **Filter outliers in the age column.**
   - Null values are ignored.
   - Only values outside the [lower_bound, upper_bound] range are selected.
4. **Display the results.** The filtered DataFrame contains only customers whose age is considered an outlier.

Use case:

This method is commonly used in data cleaning and feature engineering to identify values that may require further inspection, correction, or removal before training machine learning models.
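
As a worked example of the bounds, the sketch below plugs in illustrative quartiles. The values Q1 = 30 and Q3 = 53 are assumptions, not results computed from `df_raw`; they are roughly what ages drawn uniformly between 18 and 65 would produce. The helper name `iqr_bounds` is hypothetical.

```python
from typing import Tuple


def iqr_bounds(q1: float, q3: float, k: float = 1.5) -> Tuple[float, float]:
    """Return (lower, upper) outlier bounds for the IQR method."""
    if q3 < q1:
        raise ValueError("q3 must be greater than or equal to q1")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


# Illustrative quartiles: IQR = 53 - 30 = 23, and 1.5 * 23 = 34.5.
lower, upper = iqr_bounds(30, 53)
print(lower, upper)  # -4.5 87.5
```

With bounds this wide, every age between 18 and 65 falls inside the range, so an empty outlier result on such data is expected rather than a sign of a broken check.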

In [0]:
iqr = age_stats['q3'] - age_stats['q1']
lower_bound = age_stats['q1'] - 1.5 * iqr
upper_bound = age_stats['q3'] + 1.5 * iqr

outliers_df = df_raw.filter(
    col("age").isNotNull() & 
    ((col("age") < lower_bound) | (col("age") > upper_bound))
)


# Rows whose age lies inside the bounds (both conditions must hold, so use & rather than |)
in_outliers_df = df_raw.filter(
    col("age").isNotNull() & 
    ((col("age") >= lower_bound) & (col("age") <= upper_bound))
)

display(outliers_df)

In [0]:
df_raw.filter(col("age").isNotNull()).count()

In [0]:
df_raw.count()

In [0]:
in_outliers_df.count()

In [0]:
outliers_df.count()

In [0]:
from pyspark.sql.functions import col

df_raw = df_raw.withColumn("age", col("age").cast("int"))

# Comprehensive Data Quality Report

## Production-Ready Data Quality Scorecard

Create a comprehensive data quality assessment that can be used for:
- **Executive Reporting** - High-level data health summary
- **Operational Monitoring** - Daily/weekly quality tracking
- **ML Pipeline Validation** - Pre-training data quality checks
- **Compliance Documentation** - Audit trail for data governance

In [0]:
# Simple Data Quality Report
total_records = df_raw.count()

# 1. Completeness check
print("📊 COMPLETENESS:")
completeness_scores = []
for column in df_raw.columns:
    null_count = df_raw.filter(col(column).isNull()).count()
    completeness = (total_records - null_count) / total_records * 100
    completeness_scores.append(completeness)
    status = "✅" if completeness >= 95 else "⚠️" if completeness >= 85 else "❌"
    print(f"   {column}: {completeness:.1f}% {status}")

avg_completeness = sum(completeness_scores) / len(completeness_scores)

# 2. Business rules compliance
print("\n🏢 BUSINESS RULES:")
age_violations = df_raw.filter((col("age") < 18) | (col("age") > 100)).count()
age_compliance = (total_records - age_violations) / total_records * 100
print(f"   Age (18-100): {age_compliance:.1f}% {'✅' if age_compliance >= 90 else '❌'}")

salary_violations = df_raw.filter((col("salary") <= 0) | (col("salary") > 500000)).count()
salary_compliance = (total_records - salary_violations) / total_records * 100
print(f"   Salary (>0, <500k): {salary_compliance:.1f}% {'✅' if salary_compliance >= 90 else '❌'}")

avg_compliance = (age_compliance + salary_compliance) / 2

# 3. Overall score
overall_score = (avg_completeness * 0.6 + avg_compliance * 0.4)

if overall_score >= 90:
    grade = "A (Excellent)"
elif overall_score >= 80:
    grade = "B (Good)"
elif overall_score >= 70:
    grade = "C (Fair)"
else:
    grade = "D (Poor)"

print(f"\n🎯 OVERALL QUALITY SCORE: {overall_score:.1f}% - {grade}")

# Simple recommendations
print("\n💡 RECOMMENDATIONS:")
if overall_score < 90:
    print("   • Review and improve data quality before ML training")
    print("   • Fix business rule violations")
    print("   • Handle missing values appropriately")
else:
    print("   • Data quality is good - proceed with ML pipeline")

print("\n" + "=" * 50)
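
For reuse outside the notebook, the scoring step of the report can be factored into plain functions. This is a sketch that mirrors the 60/40 weighting and the grade bands used in the report cell; the function names and the sample inputs are assumptions for illustration.

```python
def overall_quality_score(
    avg_completeness: float,
    avg_compliance: float,
    completeness_weight: float = 0.6,
) -> float:
    """Weighted average of completeness and business-rule compliance, in percent."""
    if not 0 <= completeness_weight <= 1:
        raise ValueError("completeness_weight must be between 0 and 1")
    return (
        avg_completeness * completeness_weight
        + avg_compliance * (1 - completeness_weight)
    )


def quality_grade(score: float) -> str:
    """Map an overall score to the letter grades used in the report."""
    if score >= 90:
        return "A (Excellent)"
    if score >= 80:
        return "B (Good)"
    if score >= 70:
        return "C (Fair)"
    return "D (Poor)"


# Made-up inputs: 0.6 * 95 + 0.4 * 85 = 57 + 34 = 91
score = overall_quality_score(avg_completeness=95.0, avg_compliance=85.0)
print(f"{score:.1f}% - {quality_grade(score)}")
```

Separating the arithmetic from the Spark queries makes the thresholds easy to unit test and to adjust per project.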

## QUICK REFERENCE for Databricks ML Associate

### **1️⃣ Check for Null Values:**
```python
null_count = df.filter(col('column').isNull()).count()
completeness = (total - null_count) / total * 100
```

### **2️⃣ Business Rule Validation:**
```python
violations = df.filter((col('age') < 18) | (col('age') > 100)).count()
compliance = (total - violations) / total * 100
```

### **3️⃣ Email Format Validation:**
```python
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
valid_emails = df.filter(col('email').rlike(pattern))
```

### **4️⃣ Schema Validation:**
```python
expected_columns = ["id", "name", "email"]
actual_columns = df.columns
missing = set(expected_columns) - set(actual_columns)
```

---

## Congratulations!

You have completed **Data Validation in Databricks ML**!  

### Key Skills Acquired:
- ✅ Schema validation techniques
- ✅ Data quality metrics calculation  
- ✅ Business rule implementation
- ✅ Simple monitoring frameworks