# Diagnostic Pipeline Checker (Databricks + Local hybrid)

This notebook checks each layer of the Employment Contract ETL pipeline (Bronze → Silver Raw → Silver Clean → Gold) and reports issues preventing downstream training. Set `RUN_MODE` in the configuration cell to `'DATABRICKS'` or `'LOCAL'` to force a mode, or to `'AUTO'` to have the notebook **autodetect** whether it is running inside Databricks (Spark available) or locally and adapt accordingly.

It will:
- Verify presence and row counts for Bronze (raw files), Silver Raw (OCR text), Silver Clean (cleaned text), and Gold (clauses, features)
- Show sample rows
- Detect empty tables, missing columns, or pipeline breakage
- Provide actionable remediation tips

## How to run
- **Databricks**: Upload this notebook to Databricks Repos and run the cells. Edit the paths in the configuration cell, or let the defaults run.
- **Local**: Install dependencies (`pandas`, `pyarrow`, optionally `pyspark` if you want to read delta via Spark locally). Then run in Jupyter/VSCode. Provide local paths (parquet/txt) via configuration below.

**Created:** 2025-12-07T21:15:51.326546


In [None]:
# Configuration (edit before running)
import os

# Execution mode: 'AUTO' (detect at runtime), 'DATABRICKS' or 'LOCAL'.
RUN_MODE = 'DATABRICKS'

# Default DBFS / ADLS paths (used in Databricks)
raw_path = "abfss://contracts@ragstorage4122025.dfs.core.windows.net/"
bronze_path = "abfss://bronze@ragstorage4122025.dfs.core.windows.net/"
checkpoint = "dbfs:/checkpoints/contracts/bronze/"
silver_path = "abfss://silver@ragstorage4122025.dfs.core.windows.net/"
gold_path = "abfss://gold@ragstorage4122025.dfs.core.windows.net/"

# Names that run_all_checks() reads in Databricks mode. They were previously
# undefined, which made the Databricks run fail with NameError.
DEFAULT_BRONZE_PATH = bronze_path
# NOTE(review): only one silver container is configured above, so both silver
# layers point at it. Append the table sub-folder of each layer if the raw and
# clean tables live in separate folders — confirm against the ETL notebooks.
DEFAULT_SILVER_RAW_PATH = silver_path
DEFAULT_SILVER_CLEAN_PATH = silver_path
DEFAULT_GOLD_PATH = gold_path

# Local fallback paths (if running locally put your exported parquet files here)
LOCAL_BRONZE_PATH = './data/bronze/'
LOCAL_SILVER_RAW_PATH = './data/silver_raw/'
LOCAL_SILVER_CLEAN_PATH = './data/silver_clean/'
LOCAL_GOLD_PATH = './data/gold/'
print('RUN_MODE:', RUN_MODE)


In [None]:
# Helper: detect Databricks (Spark available) and prepare readers
def in_databricks():
    """Report whether the code is executing on a Databricks runtime.

    Detection relies on the ``DATABRICKS_RUNTIME_VERSION`` environment
    variable, which the Databricks Runtime sets on its clusters.

    The earlier probes were unreliable: ``import dbutils`` looks for an
    importable module rather than the notebook-injected ``dbutils`` object,
    and building a ``SparkSession`` succeeds on any machine with pyspark
    installed, so a local run could be reported as Databricks (and a Spark
    session was started as a side effect of the probe).

    Returns:
        bool: True when the variable is present and non-empty, else False.
    """
    import os  # local import keeps the helper usable on its own

    return bool(os.environ.get('DATABRICKS_RUNTIME_VERSION'))

# Resolve the execution mode once; only 'AUTO' triggers runtime detection.
if RUN_MODE == 'AUTO':
    IS_DATABRICKS = in_databricks()
else:
    # 'DATABRICKS' forces Spark mode; 'LOCAL' and any other value mean local.
    IS_DATABRICKS = RUN_MODE == 'DATABRICKS'
print('IS_DATABRICKS:', IS_DATABRICKS)


In [None]:
# Readers for Delta (Databricks) and Parquet/CSV locally
def read_delta_table(path):
    """Load ``path`` with Spark, as Delta first and as plain parquet second.

    Args:
        path: Location passed unchanged to the Spark readers.

    Returns:
        The Spark DataFrame produced by whichever reader succeeded.

    Raises:
        Exception: The parquet reader's error, when both attempts fail.
    """
    from pyspark.sql import SparkSession

    session = SparkSession.builder.getOrCreate()
    print('Reading Delta table from', path)
    try:
        return session.read.format('delta').load(path)
    except Exception as delta_error:
        print('Delta read failed:', delta_error)
        # Second attempt: treat the same location as plain parquet files.
        try:
            return session.read.parquet(path)
        except Exception as parquet_error:
            print('Parquet read also failed:', parquet_error)
            raise

def read_parquet_local(path):
    """Read local parquet data into a single pandas DataFrame.

    Args:
        path: Either a directory (every ``*.parquet`` file directly inside it
            is read) or a single file whose name ends with ``.parquet``.

    Returns:
        pandas.DataFrame: All files concatenated with a fresh 0..n-1 index.

    Raises:
        FileNotFoundError: If no parquet file is found at ``path``.
    """
    import glob
    import os

    import pandas as pd

    if os.path.isdir(path):
        # glob yields files in arbitrary order; sort so that the row order
        # (and the sample rows the checkers print) is stable between runs.
        files = sorted(glob.glob(os.path.join(path, '*.parquet')))
    elif os.path.isfile(path) and path.endswith('.parquet'):
        files = [path]
    else:
        files = []
    if not files:
        raise FileNotFoundError(f'No parquet files found at {path}')
    return pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)

def read_text_files_local(path):
    """Load every ``*.txt`` file directly inside ``path`` into a DataFrame.

    Args:
        path: Directory to scan.

    Returns:
        pandas.DataFrame: One row per file with columns ``filename`` (base
        name) and ``text`` (file content decoded as UTF-8, undecodable bytes
        dropped). The two columns are present even when no file matched.

    Raises:
        FileNotFoundError: If ``path`` is not a directory.
    """
    import glob
    import os

    if not os.path.isdir(path):
        raise FileNotFoundError(f'Path not found: {path}')

    texts = []
    # Sorted so the row order does not depend on the file system's listing.
    for f in sorted(glob.glob(os.path.join(path, '*.txt'))):
        with open(f, 'r', encoding='utf-8', errors='ignore') as fh:
            texts.append({'filename': os.path.basename(f), 'text': fh.read()})

    import pandas as pd
    # Explicit columns keep the schema stable for an empty directory.
    return pd.DataFrame(texts, columns=['filename', 'text'])


In [None]:
# Checker functions for each layer
def check_bronze(path):
    """Check the Bronze layer and report whether it holds any data.

    In Databricks mode the table at ``path`` is read through
    ``read_delta_table``, counted and previewed with the notebook's
    ``display``. In local mode ``path`` must be a directory and its entries
    are listed instead.

    Args:
        path: Table location (Databricks) or directory (local).

    Returns:
        bool: True when at least one row/entry exists; False when the layer
        is empty, the local directory is missing, or any error occurred.
    """
    import os

    print('--- Bronze check ---')
    try:
        if IS_DATABRICKS:
            bronze_df = read_delta_table(path)
            row_total = bronze_df.count()
            print('Bronze rows (files):', row_total)
            display(bronze_df.limit(5))
            if row_total == 0:
                print('WARNING: Bronze table has 0 rows — check Autoloader and raw storage path')
                return False
            return True

        # Local mode: the bronze layer is a folder of raw files.
        if not os.path.isdir(path):
            print('Local bronze path not found:', path)
            return False
        entries = os.listdir(path)
        print('Files found:', len(entries))
        print(entries[:10])
        if len(entries) == 0:
            print('WARNING: No raw files found in local bronze folder')
            return False
        return True
    except Exception as err:
        # Any failure counts as an unhealthy layer rather than a crash.
        print('Bronze check failed:', err)
        return False

def check_silver_raw(path):
    """Check the Silver Raw (OCR output) layer and report whether it has data.

    In Databricks mode the table at ``path`` is read through
    ``read_delta_table``, counted, and its ``filename`` / ``text_raw``
    columns are previewed. In local mode ``path`` must be a directory; it is
    read as parquet first and, if that fails, as a folder of ``.txt`` files.

    Both modes now treat zero rows as a failure; previously the local branch
    reported success for an empty layer.

    Args:
        path: Table location (Databricks) or directory (local).

    Returns:
        bool: True when the layer could be read and has at least one row;
        False otherwise (including on any error).
    """
    import os

    print('--- Silver Raw (OCR) check ---')
    try:
        if IS_DATABRICKS:
            df = read_delta_table(path)
            cnt = df.count()
            print('Silver Raw rows:', cnt)
            display(df.select('filename', 'text_raw').limit(5))
            if cnt == 0:
                print('WARNING: Silver Raw is empty — check OCR step')
                return False
            return True

        if not os.path.isdir(path):
            print('Local silver raw path not found:', path)
            return False

        try:
            df = read_parquet_local(path)
            print('Loaded local parquet rows:', len(df))
            print(df.head())
        except Exception:
            # Deliberate best-effort: no readable parquet, so try .txt files.
            try:
                df = read_text_files_local(path)
                print('Loaded local text files:', len(df))
                print(df.head())
            except Exception as e:
                print('No usable files in local silver raw path:', e)
                return False

        # Mirror the Databricks branch: an empty layer is not healthy.
        if len(df) == 0:
            print('WARNING: Silver Raw is empty — check OCR step')
            return False
        return True
    except Exception as e:
        print('Silver Raw check failed:', e)
        return False

def check_silver_clean(path):
    """Check the Silver Clean layer and report whether it has data.

    In Databricks mode the table at ``path`` is read through
    ``read_delta_table``, counted, previewed (``filename`` / ``text_clean``)
    and scanned for null or empty ``text_clean`` values. In local mode the
    data is read with ``read_parquet_local``.

    The local branch now applies the same emptiness check as the Databricks
    branch, and the same null/empty-text warning when a ``text_clean``
    column is present; previously it returned True for any readable data.

    Args:
        path: Table location (Databricks) or parquet file/directory (local).

    Returns:
        bool: True when the layer could be read and has at least one row;
        False otherwise. Null/empty texts only produce a warning.
    """
    print('--- Silver Clean check ---')
    try:
        if IS_DATABRICKS:
            df = read_delta_table(path)
            cnt = df.count()
            print('Silver Clean rows:', cnt)
            display(df.select('filename', 'text_clean').limit(5))
            if cnt == 0:
                print('WARNING: Silver Clean is empty — check cleaning & PII redaction')
                return False
            # sanity checks
            nulls = df.filter((df.text_clean.isNull()) | (df.text_clean == '')).count()
            print('Null/empty cleaned texts:', nulls)
            if nulls > 0:
                print('WARNING: Some cleaned texts are empty — inspect OCR and cleaning regex rules')
            return True

        try:
            df = read_parquet_local(path)
            print('Local cleaned rows:', len(df))
            print(df.head())
        except Exception as e:
            print('Silver Clean local read failed:', e)
            return False

        if len(df) == 0:
            print('WARNING: Silver Clean is empty — check cleaning & PII redaction')
            return False
        # Same sanity check as the Databricks branch, applied only when the
        # column exists because local exports were never required to have it.
        if 'text_clean' in df.columns:
            blank = df['text_clean'].isna() | (df['text_clean'] == '')
            nulls = int(blank.sum())
            print('Null/empty cleaned texts:', nulls)
            if nulls > 0:
                print('WARNING: Some cleaned texts are empty — inspect OCR and cleaning regex rules')
        return True
    except Exception as e:
        print('Silver Clean check failed:', e)
        return False

def _report_gold_columns(cols):
    """Print which Gold columns needed by the downstream models are present."""
    print('Columns:', cols)
    if 'clause_text' in cols:
        print('Found clause_text column — good for classifier')
    else:
        print('Missing clause_text column — classifier needs clause_text & label')
    if 'embedding_mean' in cols and 'clause_count' in cols:
        print('Found features columns — good for risk model')
    else:
        print('Missing feature columns for risk model — create features from clauses')


def check_gold(path):
    """Check the Gold layer (clauses & features) and report whether it has data.

    In Databricks mode the table at ``path`` is read through
    ``read_delta_table``, counted and previewed; in local mode it is read
    with ``read_parquet_local``. Both modes report the presence of the
    ``clause_text`` and ``embedding_mean`` / ``clause_count`` columns and
    treat zero rows as a failure. Previously the local branch did neither.

    Args:
        path: Table location (Databricks) or parquet file/directory (local).

    Returns:
        bool: True when the layer could be read and has at least one row;
        False otherwise. Missing columns only produce messages.
    """
    print('--- Gold check (clauses & features) ---')
    try:
        if IS_DATABRICKS:
            df = read_delta_table(path)
            cnt = df.count()
            print('Gold rows:', cnt)
            display(df.limit(10))
            # expect columns: clause_text or embedding_mean/clause_count/num_negations
            _report_gold_columns(df.columns)
            if cnt == 0:
                print('WARNING: Gold table has 0 rows — segmentation produced no clauses')
                return False
            return True

        try:
            df = read_parquet_local(path)
            print('Local gold rows:', len(df))
            print(df.head())
        except Exception as e:
            print('Gold local read failed:', e)
            return False

        # Apply the same column and emptiness checks as the Databricks branch.
        _report_gold_columns(list(df.columns))
        if len(df) == 0:
            print('WARNING: Gold table has 0 rows — segmentation produced no clauses')
            return False
        return True
    except Exception as e:
        print('Gold check failed:', e)
        return False


In [None]:
# Run the checks with chosen paths
def run_all_checks():
    """Run the four layer checks in pipeline order and summarise the result.

    The Databricks default paths are used when ``IS_DATABRICKS`` is true,
    the local fallback paths otherwise. Every check is executed even if an
    earlier one fails.

    Returns:
        bool: True only when all four checks returned True.
    """
    if IS_DATABRICKS:
        bronze_path = DEFAULT_BRONZE_PATH
        silver_raw_path = DEFAULT_SILVER_RAW_PATH
        silver_clean_path = DEFAULT_SILVER_CLEAN_PATH
        gold_path = DEFAULT_GOLD_PATH
    else:
        bronze_path = LOCAL_BRONZE_PATH
        silver_raw_path = LOCAL_SILVER_RAW_PATH
        silver_clean_path = LOCAL_SILVER_CLEAN_PATH
        gold_path = LOCAL_GOLD_PATH

    print('\nUsing paths:')
    print('bronze_path ->', bronze_path)
    print('silver_raw_path ->', silver_raw_path)
    print('silver_clean_path ->', silver_clean_path)
    print('gold_path ->', gold_path)

    # Build the full result list first so no check is skipped on failure.
    plan = (
        (check_bronze, bronze_path),
        (check_silver_raw, silver_raw_path),
        (check_silver_clean, silver_clean_path),
        (check_gold, gold_path),
    )
    outcomes = [checker(location) for checker, location in plan]
    healthy = all(outcomes)

    print('\nOverall pipeline health:', 'OK' if healthy else 'ISSUES FOUND')
    if not healthy:
        print('Read the warnings above and inspect the specific notebook where the failure occurred. Use the sample rows printed to reproduce the problem locally.')
    return healthy

# Execute every layer check now that all checker functions are defined.
run_all_checks()
