# AIMS Data Ingestion Pipeline with DQ Gatekeeping

This notebook ingests Bronze-layer Parquet files into the Silver layer, gating each file on Data Quality (DQ) checks.

## Purpose
- Load Parquet files from the Bronze source directory (`settings.bronze_dir`)
- Validate each file against its DQ config
- If PASSED: write the data to the Silver layer and record a watermark
- If FAILED: Quarantine and log for review
- Track processed files with watermarks so they are skipped on re-runs

## Local Execution
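When run locally, the notebook reads the sample data in `data/Samples_LH_Bronze_Aims_26_parquet/` (the configured Bronze directory). The same code also runs in Microsoft Fabric, where all paths come from the centralized settings.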

In [None]:
# Step 1: Configuration & Environment Setup
# Uses centralized settings - no wheel installation needed

from pathlib import Path

# Import shared utilities
from notebooks.config import settings
from notebooks.lib import platform_utils
from notebooks.lib.storage import StorageManager

# Report which runtime and settings profile this run is using.
IS_FABRIC = platform_utils.IS_FABRIC
print(
    f"Running in {'Microsoft Fabric' if IS_FABRIC else 'Local Development'}",
    f"Environment: {settings.environment}",
    f"Storage Format: {settings.storage_format}",
    sep="\n",
)

# One shared StorageManager performs the Silver and quarantine writes in later cells.
storage_manager = StorageManager()

In [None]:
# Step 2: Configuration & Setup
import os
import json
from datetime import datetime
import glob
from pathlib import Path
from tqdm import tqdm
import pyarrow.parquet as pq  # For empty file detection

# Layer and state locations, all taken from the centralized settings object.
DATA_PATH = settings.bronze_dir           # Source: Bronze layer
SILVER_DIR = settings.silver_dir          # Target: Silver layer
CONFIG_DIR = settings.config_dir          # DQ validation configs
STATE_DIR = settings.state_dir            # Watermarks and logs
QUARANTINE_DIR = settings.quarantine_dir  # Quarantined files

# Create the directories this notebook writes into (DQ configs are only read).
platform_utils.ensure_directory(SILVER_DIR)
platform_utils.ensure_directory(STATE_DIR)
platform_utils.ensure_directory(QUARANTINE_DIR)

# State files kept under STATE_DIR.
WATERMARK_FILE = STATE_DIR / "watermarks.json"   # file name -> processed-at timestamp
DQ_LOG_FILE = STATE_DIR / "dq_results.jsonl"     # one JSON result object per line

# Pass/fail cut-off applied to each file's DQ score (from settings).
DQ_THRESHOLD = settings.dq_threshold

print("\nConfiguration:")
print("\n".join(
    f"  {label}: {value}"
    for label, value in (
        ("Bronze (Source)", DATA_PATH),
        ("Silver (Target)", SILVER_DIR),
        ("Config Dir", CONFIG_DIR),
        ("State Dir", STATE_DIR),
        ("DQ Threshold", f"{DQ_THRESHOLD}%"),
    )
))

# Step 3: Import DQ Libraries
try:
    from aims_data_platform.dq_framework import DataLoader, DataQualityValidator
    print("✅ Libraries imported from aims_data_platform")
except ImportError:
    from dq_framework import DataLoader, DataQualityValidator
    print("✅ Libraries imported from dq_framework (fallback)")

Configuration:
   Environment: Local
   Bronze (Source): /home/sanmi/Documents/HS2/HS2_PROJECTS_2025/AIMS_LOCAL/data/Samples_LH_Bronze_Aims_26_parquet
   Silver (Target): /home/sanmi/Documents/HS2/HS2_PROJECTS_2025/AIMS_LOCAL/data/Silver
   Gold (Target):   /home/sanmi/Documents/HS2/HS2_PROJECTS_2025/AIMS_LOCAL/data/Gold
   DQ Configs: /home/sanmi/Documents/HS2/HS2_PROJECTS_2025/AIMS_LOCAL/dq_great_expectations/generated_configs
   Watermarks: /home/sanmi/Documents/HS2/HS2_PROJECTS_2025/AIMS_LOCAL/data/state/watermarks.json
   DQ Logs: /home/sanmi/Documents/HS2/HS2_PROJECTS_2025/AIMS_LOCAL/data/state/dq_logs.jsonl


In [None]:
# Step 4: Watermark & Logging Helper Functions
# Uses platform_utils for cross-platform file operations

def load_watermarks():
    """Return the watermark map stored in WATERMARK_FILE.

    Returns:
        dict: the parsed JSON content (file name -> timestamp string, as
        written by save_watermark), or an empty dict when the file does
        not exist yet.
    """
    if not platform_utils.file_exists(WATERMARK_FILE):
        return {}
    with open(WATERMARK_FILE, 'r') as handle:
        return json.load(handle)

def save_watermark(file_name):
    """Mark a file as processed by recording the current local timestamp.

    The whole watermark map is re-read, updated and rewritten. The new
    content goes to a temporary sibling file that is then renamed over
    WATERMARK_FILE. An interrupted write therefore cannot leave a truncated
    JSON file behind, which would make load_watermarks() raise on every
    subsequent run.

    Args:
        file_name: Source file name (not a full path) used as the watermark key.
    """
    watermarks = load_watermarks()
    watermarks[file_name] = datetime.now().isoformat()

    target = Path(WATERMARK_FILE)
    tmp_path = target.with_name(target.name + ".tmp")
    with open(tmp_path, 'w') as f:
        json.dump(watermarks, f, indent=2)
    # os.replace overwrites the destination; on POSIX the rename is atomic.
    # NOTE(review): confirm rename semantics on the Fabric lakehouse mount.
    os.replace(tmp_path, target)

def is_processed(file_name):
    """Return True when `file_name` already has a watermark entry.

    The watermark file is re-read on every call. The answer therefore
    reflects watermarks saved earlier in the same run.
    """
    return file_name in load_watermarks()

def log_dq_result(file_name, status, score, details=None, log_file=None):
    """Append one validation result as a JSON line to the DQ log.

    Args:
        file_name: Source file name the result refers to.
        status: Outcome label; this notebook writes "PASSED", "FAILED" or "ERROR".
        score: Validation score for the file.
        details: Optional dict of extra information; ``None`` is stored as ``{}``.
        log_file: Target JSONL path. Defaults to DQ_LOG_FILE; it can be
            overridden, e.g. for tests.

    Values JSON cannot encode natively are written as their str() form
    instead of raising TypeError. Examples are datetimes, or numpy integer
    scalars that may appear in validator failure payloads (not verified).
    Previously such a value dropped the log line and turned a completed
    validation into a pipeline ERROR.
    """
    entry = {
        "timestamp": datetime.now().isoformat(),
        "file": file_name,
        "status": status,
        "score": score,
        "details": details or {},
    }
    # Serialize before opening the file so a failure cannot leave a partial line.
    line = json.dumps(entry, default=str) + "\n"
    target = DQ_LOG_FILE if log_file is None else log_file
    with open(target, "a") as f:
        f.write(line)

def quarantine_file(file_path, reason, df=None):
    """Quarantine a file that failed DQ checks.

    Args:
        file_path: Path of the source file being quarantined.
        reason: Human-readable failure reason (printed and persisted).
        df: Optional DataFrame. When given, it is handed to
            StorageManager.quarantine_data. When omitted, only a record is
            appended to QUARANTINE_DIR/quarantine_log.jsonl and the source
            file itself is not touched.
    """
    name = Path(file_path).name
    print(f"   QUARANTINED: {name} -> Reason: {reason}")

    if df is None:
        # No data supplied: record the quarantine decision only.
        record = {
            "timestamp": datetime.now().isoformat(),
            "file": name,
            "reason": reason,
            "source_path": str(file_path),
        }
        with open(QUARANTINE_DIR / "quarantine_log.jsonl", "a") as log_handle:
            log_handle.write(json.dumps(record) + "\n")
        return

    destination = storage_manager.quarantine_data(
        df=df,
        table_name=name.replace('.parquet', ''),
        reason=reason,
    )
    print(f"   -> Quarantined to: {destination}")

print("✅ Helper functions defined")

Helper functions defined


In [None]:
# Step 5: Execute Ingestion Pipeline with DQ Gatekeeping
#
# Per Bronze Parquet file:
#   1. skip it if it is already watermarked or has 0 rows;
#   2. validate it against its `<name>_validation.yml` config, if one exists;
#   3. PASSED -> write the full file to Silver and watermark it;
#      FAILED -> quarantine it (no watermark, so it is re-checked next run).

import pandas as pd

# Discover all parquet files in source directory (Bronze).
# Sorted so files are processed and logged in a deterministic order;
# Path.glob does not guarantee any particular ordering.
source_files = sorted(DATA_PATH.glob("*.parquet"))
print(f"Starting Ingestion Pipeline for {len(source_files)} files...\n")

processed_count = 0
passed_count = 0
failed_count = 0   # DQ failures plus pipeline errors
skipped_count = 0

# Use tqdm for progress bar
for file_path in tqdm(source_files, desc="Processing Files"):
    file_name = file_path.name

    # Check Watermark (Skip if already processed)
    if is_processed(file_name):
        print(f"Skipping {file_name} (Already Processed)")
        skipped_count += 1
        continue

    # Check for Empty File (0 rows) from the Parquet footer metadata,
    # without reading any row data.
    try:
        if platform_utils.file_exists(file_path):
            if pq.read_metadata(file_path).num_rows == 0:
                print(f"Skipping {file_name} (Empty File - 0 rows)")
                skipped_count += 1
                continue
    except Exception as e:
        # Best-effort check: carry on and let the main pipeline surface any
        # real problem, but report why the check could not run instead of
        # swallowing the error silently.
        print(f"   Warning: empty-file check failed for {file_name}: {e}")

    print(f"Processing: {file_name}...")

    try:
        # --- PHASE 1: DQ GATEKEEPING (Validation) ---
        config_name = file_name.replace('.parquet', '_validation.yml')
        config_path = CONFIG_DIR / config_name
        has_dq_config = platform_utils.file_exists(config_path)

        if not has_dq_config:
            print("   Warning: No validation config found. Skipping DQ check.")
            validation_passed = True
            score = 0.0
            failures = []
            df_batch = None
        else:
            # Load data and validate.
            # NOTE: load_data is given a sample_size, so df_batch may hold only
            # a sample of the file's rows (presumably - confirm against
            # DataLoader). It is used for validation, not as the Silver payload.
            validator = DataQualityValidator(config_path=str(config_path))
            df_batch = DataLoader.load_data(str(file_path), sample_size=settings.sample_size or 100000)
            result = validator.validate(df_batch)

            validation_passed = result['success']
            score = result['success_rate']
            failures = result.get('failed_expectations', [])

            # Use centralized DQ threshold
            if score < DQ_THRESHOLD:
                validation_passed = False

            # Log the result
            log_dq_result(
                file_name,
                "PASSED" if validation_passed else "FAILED",
                score,
                {"failed_count": len(failures), "failures": failures[:5]}
            )

        # --- PHASE 2: ACTION (Ingest to Silver) ---
        if validation_passed:
            if has_dq_config:
                print(f"   DQ Passed (Score: {score:.1f}%). Ingesting to Silver...")
            else:
                # No score was computed; don't report a fake "0.0%" pass.
                print("   No DQ config. Ingesting to Silver without validation...")

            table_name = file_name.replace('.parquet', '')

            # Always write the complete file. The validated frame may be a
            # sample (see Phase 1), and writing it would silently truncate
            # the Silver table.
            df_batch = pd.read_parquet(file_path)

            # Use StorageManager for cross-platform write (works in both local and Fabric)
            output_path = storage_manager.write_to_silver(
                df=df_batch,
                table_name=table_name
            )
            print(f"   -> Written to: {output_path}")

            # Mark as processed
            save_watermark(file_name)
            print("   Marked as processed.")
            passed_count += 1

        else:
            print(f"   DQ FAILED (Score: {score:.1f}%). Blocked from ingestion.")
            # Quarantine with data if available
            quarantine_file(file_path, f"Failed {len(failures)} checks (Score: {score:.1f}%)", df_batch)
            failed_count += 1

        processed_count += 1

    except Exception as e:
        print(f"   Pipeline Error: {e}")
        log_dq_result(file_name, "ERROR", 0.0, {"error": str(e)})
        failed_count += 1

print(f"\n{'='*60}")
print("Pipeline Execution Complete")
print(f"{'='*60}")
print(f"Total Files: {len(source_files)}")
print(f"Processed: {processed_count}")
print(f"Passed DQ: {passed_count}")
print(f"Failed DQ: {failed_count}")
print(f"Skipped (Already Processed or Empty): {skipped_count}")
print("\nView results in Notebook 03 (Monitoring Dashboard)")

Starting Ingestion Pipeline for 0 files...



Processing Files: 0it [00:00, ?it/s]


Pipeline Execution Complete
Total Files: 0
Processed: 0
Passed DQ: 0
Failed DQ: 0
Skipped (Already Processed or Empty): 0

View results in Notebook 03 (Monitoring Dashboard)



