<a id='1'></a>
## 1 · Setup & Spark

In [None]:
import sys, os, warnings, json
# Silence library warnings (e.g. deprecation notices) for readable output.
# NOTE(review): this hides *all* warnings, including ones that may matter.
warnings.filterwarnings('ignore')

# Resolve the project root only once. This cell calls os.chdir(), so
# re-evaluating os.path.abspath('..') on a re-run would climb one directory
# higher each time and break the `src.*` imports below.
if 'PROJECT_ROOT' not in globals():
    PROJECT_ROOT = os.path.abspath('..')
# Avoid piling up duplicate sys.path entries when the cell is re-executed.
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)
os.chdir(PROJECT_ROOT)

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import seaborn as sns
from IPython.display import display, HTML

sns.set_theme(style='whitegrid', palette='viridis', font_scale=1.1)
plt.rcParams.update({'figure.figsize': (14, 5), 'figure.dpi': 110,
                     'axes.titlesize': 14, 'axes.titleweight': 'bold'})

from src.utils.spark_utils import get_spark_session
spark = get_spark_session(app_name='NB02-DataQuality', master='local[*]', driver_memory='4g')
print(f'‚úÖ SparkSession ready  ¬∑  v{spark.version}')

<a id='2'></a>
## 2 · Data Generation & Spark DataFrames

In [None]:
from src.utils.data_generator import generate_customers, generate_products, generate_orders

# Synthetic source data. Customers and products are generated first because
# generate_orders() is given their ID lists.
print('‚è≥ Generating data ‚Ä¶')
customers_pdf = generate_customers(n=10_000)
products_pdf = generate_products(n=2_000)

customer_id_list = customers_pdf['customer_id'].tolist()
product_id_list = products_pdf['product_id'].tolist()
orders_pdf = generate_orders(
    n=50_000,
    customer_ids=customer_id_list,
    product_ids=product_id_list,
)

# Lift the pandas frames into Spark so the DQ tooling below can run on them.
orders_sdf = spark.createDataFrame(orders_pdf)
customers_sdf = spark.createDataFrame(customers_pdf)

n_orders = orders_sdf.count()
n_customers = customers_sdf.count()
print(f'‚úÖ Orders: {n_orders:,} rows  ¬∑  Customers: {n_customers:,} rows')
orders_sdf.printSchema()

<a id='3'></a>
## 3 · Great Expectations — ExpectationSuite

The `DataQualityFramework` creates an 8-rule `ExpectationSuite` that encodes business rules:

| # | Rule | GE Expectation |
|---|------|----------------|
| R1 | 9 required columns in order | `expect_table_columns_to_match_ordered_list` |
| R2 | `order_id` not null | `expect_column_values_to_not_be_null` |
| R3 | `customer_id` not null | `expect_column_values_to_not_be_null` |
| R4 | `order_value` not null | `expect_column_values_to_not_be_null` |
| R5 | `order_id` unique | `expect_column_values_to_be_unique` |
| R6 | `order_value` ∈ [0, 1M] for ≥ 99 % of rows (`mostly`) | `expect_column_values_to_be_between` |
| R7 | `payment_method` ‚àà valid set | `expect_column_values_to_be_in_set` |
| R8 | `delivery_pincode` matches `^\d{6}$` | `expect_column_values_to_match_regex` |

In [None]:
from src.quality.dq_framework import DataQualityFramework

dqf = DataQualityFramework(spark)
suite = dqf.create_ecommerce_expectations()

# Horizontal rule used to frame the per-rule listing.
rule_sep = '‚îÄ' * 60

print(f'‚úÖ ExpectationSuite created: "{suite.expectation_suite_name}"')
print(f'   Number of expectations: {len(suite.expectations)}')
print('\n' + rule_sep)
for rule_no, expectation in enumerate(suite.expectations, start=1):
    # e.g. 'expect_column_values_to_not_be_null' -> 'column values to not be null'
    pretty_type = expectation.expectation_type.replace('expect_', '').replace('_', ' ')
    target_col = expectation.kwargs.get('column', '‚Äî')
    print(f'   R{rule_no}: {pretty_type:40s}  col={target_col}')
print(rule_sep)

<a id='4'></a>
## 4 · GE Validation & Quarantine

Records that fail validation are quarantined to a Delta Lake table.

In [None]:
import tempfile, shutil

# Quarantined rows are written under a fresh temporary directory.
quarantine_path = os.path.join(tempfile.mkdtemp(), 'quarantine_orders')

# The framework's validate_and_quarantine uses GE's SparkDFDataset, which may be
# deprecated/missing on GE >= 0.18. Any failure is reported (with its real type,
# so genuine errors are not mistaken for the deprecation case) and ge_ran=False
# tells the next cell to fall back to manual Spark rule checks.
results = None  # reset so a stale result from an earlier run is never reused
try:
    results = dqf.validate_and_quarantine(orders_sdf, suite, quarantine_path)
    ge_ran = True
    print('‚úÖ Great Expectations validation completed')
    print(f'\n   Total rows:     {results["metrics"]["total_rows"]:>10,}')
    print(f'   Valid rows:     {results["metrics"]["valid_rows"]:>10,}')
    print(f'   Failed rows:    {results["metrics"]["failed_rows"]:>10,}')
    print(f'   Success rate:   {results["metrics"]["success_rate"]:>9.1f}%')

    if results['metrics'].get('failed_rules'):
        print('\n   Failed Rules:')
        for rule in results['metrics']['failed_rules']:
            print(f'     ‚úó {rule}')

except Exception as e:  # deliberately broad: this cell must not abort the notebook
    ge_ran = False
    print(f'‚ö†Ô∏è  GE validation did not run ({type(e).__name__}): {e}')
    print('   (An ImportError/AttributeError here is expected with GE >= 0.18 ‚Äî SparkDFDataset was deprecated)')
    print('   Proceeding with manual Spark rule checks ‚Ä¶')

In [None]:
# Visualise GE results (or a manual Spark rule check if GE couldn't run)
from pyspark.sql import functions as F

if ge_ran:
    valid_n  = results['metrics']['valid_rows']
    failed_n = results['metrics']['failed_rows']
    rate     = results['metrics']['success_rate']
    source_label = 'GE Validation'
else:
    # Fallback: count each row at most once, however many rules it breaks.
    # Summing per-rule counts double-counts rows that fail several rules and
    # can drive valid_n negative, which makes ax.pie raise.
    # The isNull() terms make the OR evaluate to true for null values instead
    # of propagating SQL NULL (which filter() would treat as false).
    row_fails_any_rule = (
        F.col('order_id').isNull()
        | F.col('customer_id').isNull()
        | F.col('order_value').isNull()
        | (F.col('order_value') < 0)
        | (F.col('order_value') > 1_000_000)
    )
    total    = orders_sdf.count()
    failed_n = orders_sdf.filter(row_fails_any_rule).count()
    valid_n  = total - failed_n
    rate     = valid_n / total * 100 if total else 0.0  # guard empty input
    source_label = 'Manual Rule Check'

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Donut chart
ax1.pie([valid_n, failed_n], labels=['Valid', 'Failed'],
        autopct='%1.1f%%', colors=['#27ae60', '#e74c3c'],
        startangle=90, pctdistance=0.75,
        wedgeprops=dict(width=0.4, edgecolor='white', linewidth=2))
ax1.text(0, 0, f'{rate:.1f}%', ha='center', va='center',
         fontsize=22, fontweight='bold', color='#2c3e50')
# Title reflects which check actually produced the numbers.
ax1.set_title(f'{source_label} ‚Äî Pass Rate')

# Rule-level breakdown (per-rule counts on the pandas source; a row can
# appear under several rules here, unlike the pass/fail donut above).
rule_checks = {
    'Null order_id':     int(orders_pdf['order_id'].isna().sum()),
    'Null customer_id':  int(orders_pdf['customer_id'].isna().sum()),
    'Null order_value':  int(orders_pdf['order_value'].isna().sum()),
    'Negative value':    int((orders_pdf['order_value'] < 0).sum()),
    'Extreme value':     int((orders_pdf['order_value'] > 1_000_000).sum()),
    'Duplicate order_id':int(orders_pdf['order_id'].duplicated().sum()),
}
rule_df = pd.DataFrame(list(rule_checks.items()), columns=['Rule', 'Failures'])
rule_df = rule_df.sort_values('Failures', ascending=True)
rule_df.plot.barh(x='Rule', y='Failures', ax=ax2, color='#e74c3c', edgecolor='white', legend=False)
ax2.set_xlabel('Failed Records')
ax2.set_title('Rule-Level Failure Breakdown')

plt.tight_layout()
plt.show()

<a id='5'></a>
## 5 · QualityMetrics — 5-Dimension Scoring

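The framework measures data quality across **5 dimensions**. Completeness, Consistency and Timeliness correspond to ISO/IEC 25012 characteristics; Timeliness appears there as *Currentness*. Uniqueness and Validity come from the DAMA dimension set.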

| Dimension | Description | Method |
|-----------|-------------|--------|
| **Completeness** | % non-null values | `measure_completeness` |
| **Uniqueness** | % distinct values | `measure_uniqueness` |
| **Validity** | % values passing rules | `measure_validity` |
| **Timeliness** | Freshness of data | `measure_timeliness` |
| **Consistency** | Cross-field rule adherence | `measure_consistency` |

In [None]:
from src.quality.quality_metrics import QualityMetrics

qm = QualityMetrics(spark)

# Validity rules for orders: each SQL predicate doubles as its own display name.
validity_rules = {
    predicate: predicate
    for predicate in (
        'order_value >= 0',
        'order_value <= 1000000',
        'order_id IS NOT NULL',
        'customer_id IS NOT NULL',
    )
}

# Cross-field consistency rules, keyed the same way.
consistency_rules = {
    predicate: predicate
    for predicate in ('delivery_date >= order_timestamp',)
}

# Compute overall quality score
quality_report = qm.compute_overall_score(
    df=orders_sdf,
    table_name='orders',
    validity_rules=validity_rules,
    consistency_rules=consistency_rules,
    timestamp_col='order_timestamp',
)

overall = quality_report['overall_score']
dims = quality_report['dimensions']

double_rule = '‚ïê' * 60
print(f'\n{double_rule}')
print(f'  QUALITY SCORE: {overall:.1f} / 100')
print(double_rule)
for dim_name, dim_score in dims.items():
    filled = int(dim_score / 5)  # one cell per 5 points -> 20 cells at 100
    gauge = '‚ñà' * filled + '‚ñë' * (20 - filled)
    print(f'  {dim_name:15s}  {gauge}  {dim_score:.1f}%')
print(double_rule)

In [None]:
# Radar chart of the quality dimensions scored in the previous cell.
dim_names = list(dims.keys())
dim_values = list(dims.values())

# One spoke per dimension; the first point is repeated so the polygon closes.
spokes = np.linspace(0, 2 * np.pi, len(dim_names), endpoint=False).tolist()
angles = spokes + spokes[:1]
dim_values_plot = dim_values + [dim_values[0]]

fig, ax = plt.subplots(figsize=(7, 7), subplot_kw={'polar': True})
ax.fill(angles, dim_values_plot, alpha=0.25, color='#3498db')
ax.plot(angles, dim_values_plot, 'o-', linewidth=2, color='#2980b9', markersize=8)

ax.set_xticks(spokes)
ax.set_xticklabels(dim_names, fontsize=12, fontweight='bold')
ax.set_ylim(0, 100)
ax.set_title(f'Data Quality Radar ‚Äî Overall {overall:.1f}%', fontsize=14,
             fontweight='bold', pad=20)
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

<a id='6'></a>
## 6 · Anomaly Detection — Z-Score

Statistical Z-score method: flags values that lie more than 3 standard deviations from the mean.

In [None]:
from src.quality.anomaly_detector import AnomalyDetector

# One detector instance is reused by the Z-score, IQR and Isolation Forest sections.
Z_THRESHOLD = 3.0
ad = AnomalyDetector(spark, z_threshold=Z_THRESHOLD, iqr_factor=1.5, contamination=0.05)

# Z-Score detection
zscore_df = ad.z_score_detection(orders_sdf, column='order_value')
z_anomalies = zscore_df.filter(F.col('_is_anomaly_zscore') == True)
z_count = z_anomalies.count()
z_total = zscore_df.count()
z_pct = z_count / z_total * 100 if z_total else 0.0  # guard empty input

print(f'üî¨ Z-Score Anomaly Detection (threshold = {Z_THRESHOLD}œÉ)')
print(f'   Total rows:    {z_total:>10,}')
print(f'   Anomalies:     {z_count:>10,}  ({z_pct:.2f}%)')

# Plot every row at its real position (toPandas() yields a 0..n-1 RangeIndex).
# Drawing normals at 0..k-1 and appending anomalies afterwards piled every
# anomaly up on the far right, misrepresenting where they occur.
z_pdf = zscore_df.select('order_value', '_is_anomaly_zscore').toPandas()
# .eq(True) treats null flags as "not anomalous" (matching the Spark filter
# above) and, unlike `~`, also works if nulls turn the column into object dtype.
z_flag = z_pdf['_is_anomaly_zscore'].eq(True)
normal = z_pdf[~z_flag]
anomal = z_pdf[z_flag]

fig, ax = plt.subplots(figsize=(14, 5))
ax.scatter(normal.index, normal['order_value'], s=1, alpha=0.3, c='#3498db', label='Normal')
ax.scatter(anomal.index, anomal['order_value'], s=8, alpha=0.6, c='#e74c3c', label='Anomaly')
ax.set_xlabel('Row position')
ax.set_ylabel('Order Value (‚Çπ)')
ax.set_title(f'Z-Score Anomaly Detection ‚Äî {z_count:,} anomalies flagged')
ax.legend()
ax.yaxis.set_major_formatter(mtick.StrMethodFormatter('‚Çπ{x:,.0f}'))
plt.tight_layout()
plt.show()

<a id='7'></a>
## 7 · Anomaly Detection — IQR

In [None]:
# IQR detection (reuses the detector `ad` built in the Z-score section)
iqr_df = ad.iqr_detection(orders_sdf, column='order_value')
iqr_anomalies = iqr_df.filter(F.col('_is_anomaly_iqr') == True)
iqr_count = iqr_anomalies.count()
# Use this method's own row count rather than borrowing the Z-score total.
iqr_total = iqr_df.count()
iqr_pct = iqr_count / iqr_total * 100 if iqr_total else 0.0  # guard empty input

print('üî¨ IQR Anomaly Detection (factor = 1.5)')
print(f'   Total rows:    {iqr_total:>10,}')
print(f'   Anomalies:     {iqr_count:>10,}  ({iqr_pct:.2f}%)')

# Plot every row at its real position (toPandas() yields a 0..n-1 RangeIndex)
# instead of pushing all anomalies to the right-hand end of the x-axis.
iqr_pdf = iqr_df.select('order_value', '_is_anomaly_iqr').toPandas()
# .eq(True): null flags count as normal, matching the Spark filter above.
iqr_flag = iqr_pdf['_is_anomaly_iqr'].eq(True)
norm = iqr_pdf[~iqr_flag]
anom = iqr_pdf[iqr_flag]

fig, ax = plt.subplots(figsize=(14, 5))
ax.scatter(norm.index, norm['order_value'], s=1, alpha=0.3, c='#3498db', label='Normal')
ax.scatter(anom.index, anom['order_value'], s=8, alpha=0.6, c='#e74c3c', label='Anomaly')
ax.set_xlabel('Row position')
ax.set_ylabel('Order Value (‚Çπ)')
ax.set_title(f'IQR Anomaly Detection ‚Äî {iqr_count:,} anomalies flagged')
ax.legend()
ax.yaxis.set_major_formatter(mtick.StrMethodFormatter('‚Çπ{x:,.0f}'))
plt.tight_layout()
plt.show()

<a id='8'></a>
## 8 · Anomaly Detection — Isolation Forest

ML-based unsupervised anomaly detection using scikit-learn's `IsolationForest`.

In [None]:
# Isolation Forest (operates on numeric columns)
if_df = ad.isolation_forest_detection(
    orders_sdf,
    columns=['order_value'],
    contamination=0.05,
)
if_anomalies = if_df.filter(F.col('_is_anomaly_iforest') == True)
if_count = if_anomalies.count()
# Use this method's own row count rather than borrowing the Z-score total.
if_total = if_df.count()
if_pct = if_count / if_total * 100 if if_total else 0.0  # guard empty input

print('üî¨ Isolation Forest Detection (contamination = 0.05)')
print(f'   Total rows:    {if_total:>10,}')
print(f'   Anomalies:     {if_count:>10,}  ({if_pct:.2f}%)')

if_pdf = if_df.select('order_value', '_is_anomaly_iforest', '_anomaly_score').toPandas()
# .eq(True): null flags count as normal, matching the Spark filter above.
if_flag = if_pdf['_is_anomaly_iforest'].eq(True)
norm_if = if_pdf[~if_flag]
anom_if = if_pdf[if_flag]

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 5))

# Plot every row at its real position (toPandas() yields a 0..n-1 RangeIndex)
# instead of pushing all anomalies to the right-hand end of the x-axis.
ax1.scatter(norm_if.index, norm_if['order_value'], s=1, alpha=0.3, c='#3498db', label='Normal')
ax1.scatter(anom_if.index, anom_if['order_value'], s=8, alpha=0.6, c='#e74c3c', label='Anomaly')
ax1.set_xlabel('Row position')
ax1.set_ylabel('Order Value (‚Çπ)')
ax1.set_title(f'Isolation Forest ‚Äî {if_count:,} anomalies')
ax1.legend()
ax1.yaxis.set_major_formatter(mtick.StrMethodFormatter('‚Çπ{x:,.0f}'))

# Drop missing scores: hist cannot auto-range over NaN values.
ax2.hist(if_pdf['_anomaly_score'].dropna(), bins=60, color='#9b59b6', edgecolor='white', alpha=0.8)
# NOTE(review): x=0 is only the decision boundary if _anomaly_score follows
# sklearn's decision_function convention (negative = outlier) — confirm in AnomalyDetector.
ax2.axvline(x=0, color='red', linestyle='--', linewidth=2, label='Decision boundary')
ax2.set_xlabel('Anomaly Score')
ax2.set_ylabel('Frequency')
ax2.set_title('Anomaly Score Distribution')
ax2.legend()

plt.tight_layout()
plt.show()

<a id='9'></a>
## 9 · Anomaly Method Comparison

In [None]:
# All three detectors ran on orders_sdf, so z_total is the shared denominator.
comparison = pd.DataFrame({
    'Method':    ['Z-Score', 'IQR', 'Isolation Forest'],
    'Anomalies': [z_count, iqr_count, if_count],
})
comparison['Rate (%)'] = (comparison['Anomalies'] / z_total * 100) if z_total else 0.0

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
colors_method = ['#3498db', '#f39c12', '#9b59b6']

# bar_label offsets labels in points, so they sit just above each bar at any
# y-scale (fixed data-unit offsets like `v + 20` break for small or large counts).
count_bars = ax1.bar(comparison['Method'], comparison['Anomalies'], color=colors_method, edgecolor='white')
ax1.bar_label(count_bars, labels=[f'{v:,}' for v in comparison['Anomalies']],
              padding=3, fontweight='bold')
ax1.margins(y=0.12)  # headroom so the labels are not clipped
ax1.set_ylabel('Anomalies Detected')
ax1.set_title('Anomaly Count by Method')

rate_bars = ax2.bar(comparison['Method'], comparison['Rate (%)'], color=colors_method, edgecolor='white')
ax2.bar_label(rate_bars, labels=[f'{v:.2f}%' for v in comparison['Rate (%)']],
              padding=3, fontweight='bold')
ax2.margins(y=0.12)
ax2.set_ylabel('Anomaly Rate (%)')
ax2.set_title('Anomaly Rate by Method')

plt.tight_layout()
plt.show()

display(comparison.style.set_caption('üî¨ Anomaly Detection ‚Äî Method Comparison')
        .format({'Anomalies':'{:,}', 'Rate (%)':'{:.2f}%'}))

<a id='10'></a>
## 10 · AdaptiveDQScorer — Weight Learning

The `AdaptiveDQScorer` uses historical quality scores to **learn optimal dimension weights** via least-squares regression, and adaptively sets pass/fail thresholds.

In [None]:
from src.quality.adaptive_scorer import AdaptiveDQScorer

# NOTE(review): history_dir is a path inside the project tree, so scoring
# history presumably persists across runs — re-running this notebook may
# change the learned weights/threshold. Confirm against AdaptiveDQScorer.
scorer = AdaptiveDQScorer(
    history_dir='data/metrics/adaptive',
    baseline_window=20,
    sensitivity=1.5,
)

# Score the 5-dimension report produced in section 5.
adaptive_result = scorer.score(quality_report)

banner = '‚ïê' * 60
print(f'\n{banner}')
print(f'  ADAPTIVE DQ SCORE: {adaptive_result["overall_score"]:.1f}')
print(f'  Status:            {adaptive_result["status"]}')
print(f'  Threshold:         {adaptive_result["threshold"]:.1f}')
print(f'  History count:     {adaptive_result["history_count"]}')
print(banner)
print('\n  Adaptive Weights (learned from history):')
for dim_name, w in adaptive_result['weights'].items():
    w_pct = w * 100
    # One block per 2 percentage points, left-aligned in a 20-char field.
    print(f'    {dim_name:15s}  {"‚ñà" * int(w_pct / 2):20s}  {w_pct:.1f}%')

# Detect trend
trend = scorer.detect_trend(adaptive_result)
print(f'\n  Trend:  {trend["trend"]}  (severity: {trend["severity"]})')

In [None]:
# Visualise adaptive weights
weights = adaptive_result['weights']
# Materialise as plain lists: Axes.pie needs a real array-like, and a
# dict_values view does not convert to a float array (pie raises TypeError).
weight_names = list(weights.keys())
weight_vals = [float(w) for w in weights.values()]
palette = sns.color_palette('Set2', len(weight_vals))

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Weight chart: pie when all weights are non-negative, otherwise bars.
if all(w >= 0 for w in weight_vals):
    ax1.pie(weight_vals, labels=weight_names, autopct='%1.1f%%',
            colors=palette,
            startangle=140, wedgeprops=dict(edgecolor='white', linewidth=1.5))
else:
    # Regression-learned weights may come out negative; pie wedges cannot be.
    ax1.barh(weight_names, weight_vals, color=palette, edgecolor='white')
    ax1.axvline(x=0, color='grey', linewidth=1)
    ax1.set_xlabel('Weight')
ax1.set_title('Adaptive Dimension Weights')

# Score vs threshold gauge
score = adaptive_result['overall_score']
thresh = adaptive_result['threshold']
ax2.barh(['Score'], [score], color='#27ae60' if score >= thresh else '#e74c3c',
         edgecolor='white', height=0.4)
ax2.axvline(x=thresh, color='orange', linewidth=3, linestyle='--', label=f'Threshold ({thresh:.0f})')
ax2.set_xlim(0, 100)
ax2.set_xlabel('Score')
ax2.set_title(f'Adaptive Score: {score:.1f} vs Threshold: {thresh:.0f}')
ax2.legend()

plt.tight_layout()
plt.show()

<a id='11'></a>
## 11 · Data Contract Enforcement

YAML-based data contracts define schema and business rules. The `ContractEnforcer` splits data into valid and quarantined DataFrames.

In [None]:
from src.governance.data_contracts import ContractRegistry, ContractEnforcer

registry = ContractRegistry(contracts_dir='config/data_contracts')

# List registered contracts
contracts = registry.list_all()
print(f'üìú Registered Data Contracts: {len(contracts)}')
for c in contracts:
    print(f'   ‚Ä¢ {c["name"]} v{c["version"]}  ({c.get("description", "")})')

# Enforce on orders data.
# NOTE(review): unlike section 4 (temporary directory), this quarantine path is
# inside the project tree and the clean-up cell does not remove it — confirm intended.
enforcer = ContractEnforcer(spark, registry=registry, quarantine_path='data/quarantine')

try:
    valid_df, quarantined_df, report = enforcer.enforce(orders_sdf, 'ecommerce_orders')
    v_count = valid_df.count()
    q_count = quarantined_df.count()
    checked = v_count + q_count
    # Guard empty output so a ZeroDivisionError is not misreported below
    # as a contract-enforcement problem.
    pass_rate = v_count / checked * 100 if checked else 0.0

    print(f'\n‚úÖ Contract Enforcement Complete')
    print(f'   Valid records:       {v_count:>10,}')
    print(f'   Quarantined records: {q_count:>10,}')
    print(f'   Pass rate:           {pass_rate:>9.1f}%')

    if report.get('violations'):
        print('\n   Violations:')
        for v in report['violations'][:10]:  # cap output at the first 10
            print(f'     ‚úó {v}')
except Exception as e:  # best-effort section: report and keep the notebook running
    print(f'‚ö†Ô∏è  Contract enforcement note: {e}')
    print('   (Contract may expect different column names than raw data)')

<a id='12'></a>
## 12 · Executive Summary

In [None]:
# Derive headline counts from the objects computed above instead of
# hard-coding them, so the summary cannot drift from the actual suite,
# dimension set, or list of anomaly methods.
n_ge_rules   = len(suite.expectations)
n_dimensions = len(dims)
n_methods    = len(comparison)
status_color = '#27ae60' if adaptive_result['status'] == 'PASS' else '#e74c3c'

display(HTML(f'''
<div style="background:linear-gradient(135deg,#0f3460,#16213e,#1a1a2e);color:white;
            padding:30px;border-radius:12px;font-family:sans-serif;">
  <h2 style="text-align:center;margin:0 0 20px;">üîç Data Quality ‚Äî Executive Summary</h2>
  <div style="display:grid;grid-template-columns:repeat(3,1fr);gap:15px;">
    <div style="background:rgba(39,174,96,0.2);padding:18px;border-radius:10px;text-align:center;">
      <div style="font-size:11px;opacity:0.7;">OVERALL DQ SCORE</div>
      <div style="font-size:32px;font-weight:bold;color:#27ae60;">{overall:.1f}%</div></div>
    <div style="background:rgba(255,255,255,0.08);padding:18px;border-radius:10px;text-align:center;">
      <div style="font-size:11px;opacity:0.7;">GE EXPECTATIONS</div>
      <div style="font-size:32px;font-weight:bold;">{n_ge_rules} rules</div></div>
    <div style="background:rgba(255,255,255,0.08);padding:18px;border-radius:10px;text-align:center;">
      <div style="font-size:11px;opacity:0.7;">ADAPTIVE STATUS</div>
      <div style="font-size:32px;font-weight:bold;color:{status_color};">
        {adaptive_result['status']}</div></div>
    <div style="background:rgba(52,152,219,0.2);padding:18px;border-radius:10px;text-align:center;">
      <div style="font-size:11px;opacity:0.7;">Z-SCORE ANOMALIES</div>
      <div style="font-size:28px;font-weight:bold;">{z_count:,}</div></div>
    <div style="background:rgba(243,156,18,0.2);padding:18px;border-radius:10px;text-align:center;">
      <div style="font-size:11px;opacity:0.7;">IQR ANOMALIES</div>
      <div style="font-size:28px;font-weight:bold;">{iqr_count:,}</div></div>
    <div style="background:rgba(155,89,182,0.2);padding:18px;border-radius:10px;text-align:center;">
      <div style="font-size:11px;opacity:0.7;">ISOLATION FOREST</div>
      <div style="font-size:28px;font-weight:bold;">{if_count:,}</div></div>
    <div style="background:rgba(255,255,255,0.08);padding:18px;border-radius:10px;text-align:center;">
      <div style="font-size:11px;opacity:0.7;">QUALITY DIMENSIONS</div>
      <div style="font-size:28px;font-weight:bold;">{n_dimensions}</div></div>
    <div style="background:rgba(255,255,255,0.08);padding:18px;border-radius:10px;text-align:center;">
      <div style="font-size:11px;opacity:0.7;">ANOMALY METHODS</div>
      <div style="font-size:28px;font-weight:bold;">{n_methods}</div></div>
    <div style="background:rgba(255,255,255,0.08);padding:18px;border-radius:10px;text-align:center;">
      <div style="font-size:11px;opacity:0.7;">DATA CONTRACTS</div>
      <div style="font-size:28px;font-weight:bold;">{len(contracts)}</div></div>
  </div>
  <p style="text-align:center;margin:20px 0 0;opacity:0.6;font-size:12px;">
    Proceed to <b>Notebook 03</b> for PII Detection & Privacy analysis</p>
</div>
'''))

In [None]:
# Clean up the temporary GE quarantine directory created in section 4.
# Imported here so the cell still works if section 4 was skipped.
import os, shutil

# rmtree(ignore_errors=True) already swallows filesystem errors, so no
# try/except is needed (a bare `except:` would also hide KeyboardInterrupt).
tmp_quarantine = globals().get('quarantine_path')
if tmp_quarantine:
    shutil.rmtree(os.path.dirname(tmp_quarantine), ignore_errors=True)

spark.stop()
print('‚úÖ SparkSession stopped ‚Äî Notebook 02 complete')