# Pipeline Cleanup

Remove the tables, the schemas, or the entire catalog created by the pipeline, depending on the selected cleanup level.

## Warning
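This notebook will **permanently delete** data. Use with caution. Note that the final `%sql` cell drops the `healthcare_dev` catalog unconditionally; it does not honor the `dry_run`, `cleanup_level`, or `catalog` widgets.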

## Cleanup Options
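- **tables_only**: Drop all tables in the configured bronze, silver, and gold schemas but keep the schemas themselves
- **schemas**: Drop the configured bronze, silver, and gold schemas (and everything in them)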
- **catalog**: Drop the entire catalog (including all schemas and tables)

In [0]:
# Cell 1: Cleanup Parameters

# Target catalog; defaults to "healthcare_dev".
dbutils.widgets.text("catalog", "healthcare_dev", "Catalog Name")

# One text widget per layer: named "<layer>_schema", defaulting to the layer
# name and labelled "<Layer> Schema".
for _layer in ("bronze", "silver", "gold"):
    dbutils.widgets.text(f"{_layer}_schema", _layer, f"{_layer.capitalize()} Schema")

# How much to remove; the default is the narrowest option.
dbutils.widgets.dropdown(
    "cleanup_level",
    "tables_only",
    ["tables_only", "schemas", "catalog"],
    "Cleanup Level",
)

# Preview switch; defaults to "true".
dbutils.widgets.dropdown(
    "dry_run",
    "true",
    ["true", "false"],
    "Dry Run (Preview Only)",
)

In [0]:
# Cell 2: Configuration

from dataclasses import dataclass
from typing import List, Tuple

@dataclass(frozen=True)
class CleanupConfig:
    """Immutable settings for one cleanup run.

    Attributes:
        catalog: Name of the catalog the cleanup targets.
        bronze_schema: Name of the bronze schema.
        silver_schema: Name of the silver schema.
        gold_schema: Name of the gold schema.
        cleanup_level: One of 'tables_only', 'schemas', 'catalog'.
        dry_run: When True, operations are only previewed.
    """
    catalog: str
    bronze_schema: str
    silver_schema: str
    gold_schema: str
    cleanup_level: str  # 'tables_only', 'schemas', 'catalog'
    dry_run: bool

    @property
    def schemas(self) -> Tuple[str, ...]:
        """Return the three schema names in bronze, silver, gold order."""
        ordered = [self.bronze_schema, self.silver_schema, self.gold_schema]
        return tuple(ordered)


def create_cleanup_config() -> CleanupConfig:
    """Build a CleanupConfig from the current notebook widget values.

    The five text/dropdown values are passed through unchanged; the
    ``dry_run`` widget is converted to a bool that is True only when its
    lower-cased value equals "true".
    """
    read_widget = dbutils.widgets.get

    # Widgets whose string value maps straight onto a field of the same name.
    passthrough = (
        "catalog",
        "bronze_schema",
        "silver_schema",
        "gold_schema",
        "cleanup_level",
    )
    values = {name: read_widget(name) for name in passthrough}
    values["dry_run"] = read_widget("dry_run").lower() == "true"

    return CleanupConfig(**values)


config = create_cleanup_config()

# Assemble the whole banner first, then emit it with a single print.
_banner = "=" * 60
_summary_lines = [
    _banner,
    "CLEANUP CONFIGURATION",
    _banner,
    f"Catalog:       {config.catalog}",
    f"Schemas:       {', '.join(config.schemas)}",
    f"Cleanup Level: {config.cleanup_level}",
    f"Dry Run:       {config.dry_run}",
]
if config.dry_run:
    _summary_lines.append("\n*** DRY RUN MODE - No changes will be made ***")
_summary_lines.append(_banner)
print("\n".join(_summary_lines))

In [0]:
# Cell 3: Discovery Functions

from typing import List, Dict
from pyspark.sql import Row

def catalog_exists(catalog_name: str) -> bool:
    """Check if a catalog exists.

    Runs ``SHOW CATALOGS`` and looks for an exact match of ``catalog_name``
    in the ``catalog`` column of the result.

    Args:
        catalog_name: Name of the catalog to look for.

    Returns:
        True if the catalog is listed. False if it is not listed, or if the
        listing could not be obtained; in that case a warning is printed so
        that a failed lookup is distinguishable from a missing catalog.
    """
    try:
        catalogs = {row.catalog for row in spark.sql("SHOW CATALOGS").collect()}
    except Exception as e:
        # Best effort: keep returning False as before, but surface the reason
        # instead of silently reporting "does not exist".
        print(f"Warning: Could not list catalogs while checking '{catalog_name}': {e}")
        return False
    return catalog_name in catalogs


def schema_exists(catalog_name: str, schema_name: str) -> bool:
    """Check if a schema exists in a catalog.

    Lists the schemas with ``SHOW SCHEMAS IN <catalog>`` so the check does
    not change the session's current catalog.

    Args:
        catalog_name: Catalog to search in.
        schema_name: Schema name to look for (exact match).

    Returns:
        True if the schema is listed. False if it is not listed, or if the
        listing could not be obtained (for example because the catalog is
        missing); in that case a warning is printed.
    """
    try:
        rows = spark.sql(f"SHOW SCHEMAS IN {catalog_name}").collect()
        # Read the single result column by position so the check does not
        # depend on its name (NOTE(review): reported as `databaseName` on
        # Databricks and `namespace` on open-source Spark 3 - confirm).
        schemas = {row[0] for row in rows}
    except Exception as e:
        # Best effort: keep returning False as before, but say why.
        print(f"Warning: Could not list schemas in {catalog_name}: {e}")
        return False
    return schema_name in schemas


def list_tables_in_schema(catalog_name: str, schema_name: str) -> List[str]:
    """List all tables in a schema.

    Uses ``SHOW TABLES IN <catalog>.<schema>`` so the session's current
    catalog and schema are left untouched. Rows flagged ``isTemporary`` are
    skipped: they are session-scoped temporary views, not objects stored in
    the schema, so they must not be offered for dropping.

    Args:
        catalog_name: Catalog that contains the schema.
        schema_name: Schema whose tables are listed.

    Returns:
        The ``tableName`` of every non-temporary row, or an empty list (after
        printing a warning) if the listing fails.
    """
    try:
        rows = spark.sql(f"SHOW TABLES IN {catalog_name}.{schema_name}").collect()
        return [row.tableName for row in rows if not row.isTemporary]
    except Exception as e:
        print(f"Warning: Could not list tables in {catalog_name}.{schema_name}: {e}")
        return []


def get_table_row_count(catalog_name: str, schema_name: str, table_name: str) -> int:
    """Return the number of rows in a table, or -1 if it cannot be counted.

    Args:
        catalog_name: Catalog that contains the table.
        schema_name: Schema that contains the table.
        table_name: Name of the table.

    Returns:
        The row count, or -1 when reading or counting the table raises.
    """
    qualified_name = f"{catalog_name}.{schema_name}.{table_name}"
    try:
        frame = spark.table(qualified_name)
        return frame.count()
    except Exception:
        # -1 is the "unknown" sentinel that callers test for with `>= 0`.
        return -1


def discover_all_objects(config: CleanupConfig) -> Dict[str, List[str]]:
    """Map each configured schema to the tables currently found in it.

    Args:
        config: Cleanup settings providing the catalog and schema names.

    Returns:
        An empty dict when the catalog does not exist (a message is printed).
        Otherwise one entry per configured schema: its table names, or an
        empty list when the schema does not exist.
    """
    if not catalog_exists(config.catalog):
        print(f"Catalog '{config.catalog}' does not exist.")
        return {}

    return {
        schema_name: (
            list_tables_in_schema(config.catalog, schema_name)
            if schema_exists(config.catalog, schema_name)
            else []
        )
        for schema_name in config.schemas
    }


# Find what currently exists, then print it grouped by schema.
discovered = discover_all_objects(config)

print("\nDiscovered Objects:")
print("-" * 60)
total_tables = 0
for schema_name, table_names in discovered.items():
    print(f"\n{config.catalog}.{schema_name}:")
    if not table_names:
        print("  (no tables or schema does not exist)")
        continue
    for table_name in sorted(table_names):
        n_rows = get_table_row_count(config.catalog, schema_name, table_name)
        # A negative count is the "could not count" sentinel.
        size_note = "unknown" if n_rows < 0 else f"{n_rows:,} rows"
        print(f"  - {table_name} ({size_note})")
    total_tables += len(table_names)

print(f"\nTotal tables found: {total_tables}")

In [0]:
# Cell 4: Cleanup Functions

from dataclasses import dataclass, field
from typing import List

@dataclass
class CleanupResult:
    """Outcome of a single cleanup operation.

    Attributes:
        action: The statement kind, e.g. "DROP TABLE".
        object_type: Kind of object acted on, e.g. "table".
        object_name: Name of the object acted on.
        success: Whether the operation (or its dry-run preview) succeeded.
        message: Free-form detail; empty unless provided.
    """
    action: str
    object_type: str
    object_name: str
    success: bool
    message: str = field(default="")


def drop_table(catalog: str, schema: str, table: str, dry_run: bool) -> CleanupResult:
    """Drop one table, or only report the drop when ``dry_run`` is set.

    Args:
        catalog: Catalog containing the table.
        schema: Schema containing the table.
        table: Table name.
        dry_run: When True, no SQL is executed.

    Returns:
        A CleanupResult for ``catalog.schema.table``; on failure ``success``
        is False and ``message`` holds the exception text.
    """
    target = f"{catalog}.{schema}.{table}"
    # Fields shared by every outcome of this operation.
    common = dict(action="DROP TABLE", object_type="table", object_name=target)

    if dry_run:
        return CleanupResult(success=True, message="[DRY RUN] Would drop", **common)

    try:
        spark.sql(f"DROP TABLE IF EXISTS {target}")
    except Exception as err:
        return CleanupResult(success=False, message=str(err), **common)
    return CleanupResult(success=True, message="Dropped", **common)


def drop_schema(catalog: str, schema: str, dry_run: bool) -> CleanupResult:
    """Drop one schema with CASCADE, or only report it when ``dry_run`` is set.

    Args:
        catalog: Catalog containing the schema.
        schema: Schema name.
        dry_run: When True, no SQL is executed.

    Returns:
        A CleanupResult for ``catalog.schema``; on failure ``success`` is
        False and ``message`` holds the exception text.
    """
    target = f"{catalog}.{schema}"
    # Fields shared by every outcome of this operation.
    common = dict(action="DROP SCHEMA", object_type="schema", object_name=target)

    if dry_run:
        return CleanupResult(
            success=True, message="[DRY RUN] Would drop with CASCADE", **common
        )

    try:
        spark.sql(f"DROP SCHEMA IF EXISTS {target} CASCADE")
    except Exception as err:
        return CleanupResult(success=False, message=str(err), **common)
    return CleanupResult(success=True, message="Dropped with CASCADE", **common)


def drop_catalog(catalog: str, dry_run: bool) -> CleanupResult:
    """Drop a catalog with CASCADE, or only report it when ``dry_run`` is set.

    Args:
        catalog: Catalog name.
        dry_run: When True, no SQL is executed.

    Returns:
        A CleanupResult for the catalog; on failure ``success`` is False and
        ``message`` holds the exception text.
    """
    # Fields shared by every outcome of this operation.
    common = dict(action="DROP CATALOG", object_type="catalog", object_name=catalog)

    if dry_run:
        return CleanupResult(
            success=True, message="[DRY RUN] Would drop with CASCADE", **common
        )

    try:
        spark.sql(f"DROP CATALOG IF EXISTS {catalog} CASCADE")
    except Exception as err:
        return CleanupResult(success=False, message=str(err), **common)
    return CleanupResult(success=True, message="Dropped with CASCADE", **common)

In [0]:
# Cell 5: Execute Cleanup

def run_cleanup(config: CleanupConfig, discovered: Dict[str, List[str]]) -> List[CleanupResult]:
    """Run the cleanup selected by ``config.cleanup_level`` and report progress.

    Args:
        config: Cleanup settings (catalog, schemas, level, dry-run flag).
        discovered: Mapping of schema name to table names; only consulted
            when individual tables are dropped.

    Returns:
        One CleanupResult per attempted operation, in execution order.
    """
    def status_of(outcome):
        return "OK" if outcome.success else "FAILED"

    outcomes = []
    rule = "=" * 60

    print("\n" + rule)
    print(f"EXECUTING CLEANUP - Level: {config.cleanup_level.upper()}")
    if config.dry_run:
        print("*** DRY RUN MODE - No actual changes ***")
    print(rule)

    level = config.cleanup_level

    if level == "catalog":
        # A single statement removes the catalog and everything inside it.
        print(f"\nDropping catalog: {config.catalog}")
        outcome = drop_catalog(config.catalog, config.dry_run)
        outcomes.append(outcome)
        print(f"  {status_of(outcome)}: {outcome.message}")
        return outcomes

    if level == "schemas":
        # Each configured schema is dropped, which also removes its tables.
        for schema_name in config.schemas:
            print(f"\nDropping schema: {config.catalog}.{schema_name}")
            outcome = drop_schema(config.catalog, schema_name, config.dry_run)
            outcomes.append(outcome)
            print(f"  {status_of(outcome)}: {outcome.message}")
        return outcomes

    # Any other level (i.e. tables_only): drop the discovered tables one by one.
    for schema_name, table_names in discovered.items():
        if not table_names:
            continue
        print(f"\nDropping tables in {config.catalog}.{schema_name}:")
        for table_name in sorted(table_names):
            outcome = drop_table(config.catalog, schema_name, table_name, config.dry_run)
            outcomes.append(outcome)
            print(f"  {table_name}: {status_of(outcome)} - {outcome.message}")

    return outcomes


# Execute cleanup for the loaded configuration and the objects discovered
# earlier; the per-operation results are kept in `cleanup_results`.
cleanup_results = run_cleanup(config, discovered)

In [0]:
# Cell 6: Cleanup Summary

_rule = "=" * 60
print("\n" + _rule)
print("CLEANUP SUMMARY")
print(_rule)

if config.dry_run:
    print("\n*** THIS WAS A DRY RUN - NO CHANGES WERE MADE ***")
    print("Set 'dry_run' to 'false' to execute the cleanup.\n")

# Split the results into successes and failures in one pass.
successful, failed = [], []
for outcome in cleanup_results:
    (successful if outcome.success else failed).append(outcome)

print(f"Total operations: {len(cleanup_results)}")
print(f"Successful: {len(successful)}")
print(f"Failed: {len(failed)}")

if failed:
    print("\nFailed operations:")
    for outcome in failed:
        print(f"  - {outcome.action} {outcome.object_name}: {outcome.message}")

# Bucket the successful operations by the kind of object they touched.
by_type = {}
for outcome in successful:
    if outcome.object_type not in by_type:
        by_type[outcome.object_type] = []
    by_type[outcome.object_type].append(outcome)

if by_type:
    print("\nObjects affected:")
    for obj_type, group in by_type.items():
        print(f"  {obj_type}s: {len(group)}")

print("\n" + _rule)

In [0]:
# Cell 7: Verification (Optional)
# Re-discover to verify cleanup

# Verify the outcome of a real (non dry-run) cleanup by querying the metastore
# again rather than trusting the recorded results.
if not config.dry_run:
    print("\nVerifying cleanup...")
    print("-" * 60)

    if config.cleanup_level == "catalog":
        if catalog_exists(config.catalog):
            print(f"WARNING: Catalog '{config.catalog}' still exists!")
        else:
            print(f"Catalog '{config.catalog}' successfully removed.")
    else:
        if config.cleanup_level == "schemas":
            # The table count below cannot tell a dropped schema from one that
            # survived a failed DROP while empty, so check each schema itself.
            for schema in config.schemas:
                if schema_exists(config.catalog, schema):
                    print(f"WARNING: Schema '{config.catalog}.{schema}' still exists!")
                else:
                    print(f"Schema '{config.catalog}.{schema}' successfully removed.")

        remaining = discover_all_objects(config)
        total_remaining = sum(len(tables) for tables in remaining.values())

        if total_remaining == 0:
            print("All tables successfully removed.")
        else:
            print(f"WARNING: {total_remaining} tables still remain:")
            for schema, tables in remaining.items():
                if tables:
                    print(f"  {schema}: {', '.join(tables)}")
else:
    print("\nSkipping verification (dry run mode)")

In [0]:
# Cell 8: Quick Cleanup Commands
# Copy these SQL commands if you want to run cleanup manually

_rule = "=" * 60
print("\n" + _rule)
print("MANUAL CLEANUP COMMANDS")
print("Copy and run these in a SQL cell if needed:")
print(_rule)

# One DROP TABLE per table recorded in `discovered`, tables sorted within each schema.
print("\n-- Drop all tables (preserves schemas):")
_table_statements = [
    f"DROP TABLE IF EXISTS {config.catalog}.{schema_name}.{table_name};"
    for schema_name, table_names in discovered.items()
    for table_name in sorted(table_names)
]
for _statement in _table_statements:
    print(_statement)

# One DROP SCHEMA per configured schema, whether or not it was discovered.
print("\n-- Drop schemas (removes all tables):")
_schema_statements = [
    f"DROP SCHEMA IF EXISTS {config.catalog}.{schema_name} CASCADE;"
    for schema_name in config.schemas
]
for _statement in _schema_statements:
    print(_statement)

print("\n-- Drop entire catalog (removes everything):")
print(f"DROP CATALOG IF EXISTS {config.catalog} CASCADE;")

print("\n" + _rule)

In [0]:
%sql
-- NOTE(review): this statement is hard-coded to `healthcare_dev` and runs
-- unconditionally. It does not read the `catalog`, `cleanup_level`, or
-- `dry_run` widgets, so executing this cell drops the catalog and everything
-- in it even when Dry Run is "true". Confirm this cell is meant to be here.
DROP CATALOG IF EXISTS healthcare_dev CASCADE;