# Purview Ingestion Notebook

This notebook ingests Atlas entities into Microsoft Purview.

**Parameters:**
- `atlas_json_path`: Path to Atlas JSON file
- `purview_endpoint`: Purview endpoint URL (read from Key Vault at run time, not passed as a notebook argument)

In [None]:
# Import libraries
from notebookutils import mssparkutils
from datetime import datetime
import json
import logging

# Configure logging at INFO level and create the logger that the remaining
# cells use for progress, warning, and error messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
# Get parameters
# `atlas_json_path` is the only notebook argument read here; it falls back to
# an empty string when the caller does not supply it.
atlas_json_path = mssparkutils.notebook.getArgument("atlas_json_path", "")

logger.info(f"Atlas JSON Path: {atlas_json_path}")

# Fail fast with an actionable message instead of letting the later
# `open("")` call surface an opaque file-not-found error.
if not atlas_json_path:
    raise ValueError(
        "Required notebook parameter 'atlas_json_path' was not provided"
    )

In [None]:
# Load Atlas JSON
# Reads the file named by `atlas_json_path` and exposes two globals used by
# the later cells:
#   entities - list under the "entities" key (empty list when absent or null)
#   metadata - dict under the "metadata" key (empty dict when absent or null)
logger.info(f"Loading Atlas JSON from {atlas_json_path}...")

# Decode explicitly as UTF-8 so the result does not depend on the platform's
# default text encoding.
with open(atlas_json_path, 'r', encoding='utf-8') as f:
    atlas_data = json.load(f)

# `or` also covers an explicit JSON null, which `.get(key, default)` would
# pass through as None and break the `len()` / `.get()` calls below.
entities = atlas_data.get('entities') or []
metadata = atlas_data.get('metadata') or {}

logger.info(f"Loaded {len(entities)} entities")
logger.info(f"Source Type: {metadata.get('source_type')}")
logger.info(f"Collection: {metadata.get('collection')}")

In [None]:
# Get Purview credentials from Key Vault
# Looks up the "purview-endpoint" secret in "purview-connector-kv". If the
# lookup raises, the failure is logged and a placeholder endpoint is used so
# the notebook keeps running.
# NOTE(review): the fallback URL looks like a template placeholder - confirm
# it is intended to be reachable.
logger.info("Retrieving Purview credentials...")

try:
    purview_endpoint = mssparkutils.credentials.getSecret(
        "purview-connector-kv", "purview-endpoint"
    )
except Exception as lookup_error:
    logger.error(f"Failed to get Purview endpoint from Key Vault: {lookup_error}")
    purview_endpoint = "https://your-purview.purview.azure.com"
    logger.warning(f"Using default endpoint: {purview_endpoint}")
else:
    logger.info(f"Purview endpoint: {purview_endpoint}")

In [None]:
# Initialize Purview client
# Makes the lakehouse libs folder importable, then builds the `purview_client`
# global used by the connection-test and ingestion cells.
import sys

# Notebook cells are often re-run; only add the folder once so `sys.path`
# does not accumulate duplicate entries.
libs_dir = "/lakehouse/default/Files/libs"
if libs_dir not in sys.path:
    sys.path.append(libs_dir)

# Imported after the path tweak above.
# NOTE(review): presumably the SDK is deployed under `libs_dir` - confirm.
from purview_connector_sdk import PurviewClient

logger.info("Initializing Purview client with Managed Identity...")

purview_client = PurviewClient(
    endpoint=purview_endpoint,
    use_managed_identity=True
)

logger.info("✓ Purview client initialized")

In [None]:
# Test Purview connection
# Fetches the account info once as a smoke test. Any failure is logged and
# re-raised so the notebook stops before attempting ingestion.
try:
    account_info = purview_client.get_account_info()
    account_name = account_info.get('name')
    logger.info(f"✓ Connected to Purview: {account_name}")
except Exception as connect_error:
    logger.error(f"✗ Failed to connect to Purview: {connect_error}")
    raise

In [None]:
# Ingest entities in batches
# Sends `entities` to Purview in slices of `batch_size` and accumulates the
# outcome in the `results` global:
#   total_entities - number of entities loaded from the Atlas JSON
#   created        - sum of per-batch created counts
#   updated        - reserved counter
#   failed         - entities belonging to batches whose call raised
#   errors         - one {"batch", "error"} record per failed batch
# A failing batch is logged and counted, and the loop continues with the next.
logger.info(f"Starting ingestion of {len(entities)} entities...")

batch_size = 100
# Ceiling division; computed once because it does not change between batches.
total_batches = (len(entities) + batch_size - 1) // batch_size
results = {
    "total_entities": len(entities),
    "created": 0,
    # NOTE(review): nothing in this cell increments "updated" - confirm
    # whether the SDK response exposes an update count.
    "updated": 0,
    "failed": 0,
    "errors": []
}

for batch_num, i in enumerate(range(0, len(entities), batch_size), start=1):
    batch = entities[i:i + batch_size]

    logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} entities)...")

    try:
        # Bulk create entities
        result = purview_client.bulk_create_entities(batch)

        # When the response carries no 'entities_created' key, the whole
        # batch is counted as created.
        # NOTE(review): verify this default against the SDK response shape.
        created = result.get('entities_created', len(batch))
        results['created'] += created

        logger.info(f"✓ Batch {batch_num} completed: {created} entities created")

    except Exception as e:
        logger.error(f"✗ Batch {batch_num} failed: {e}")
        results['failed'] += len(batch)
        results['errors'].append({
            "batch": batch_num,
            "error": str(e)
        })

logger.info("Ingestion complete")
logger.info(f"  Created: {results['created']}")
logger.info(f"  Failed: {results['failed']}")

In [None]:
# Display results
# Prints a summary of the `results` dict between two rule lines, followed by
# at most the first five batch errors.
rule = "=" * 60
report_lines = [
    rule,
    "Purview Ingestion Results",
    rule,
    f"Total Entities: {results['total_entities']}",
    f"Successfully Created: {results['created']}",
    f"Failed: {results['failed']}",
]

batch_errors = results['errors']
if batch_errors:
    report_lines.append(f"\nErrors ({len(batch_errors)}):")
    # Show first 5 errors
    for error in batch_errors[:5]:
        report_lines.append(f"  Batch {error['batch']}: {error['error']}")

report_lines.append(rule)
print("\n".join(report_lines))

In [None]:
# Save ingestion results
# Writes a JSON report of this run to the lakehouse logs folder. The file name
# carries a local-time timestamp; `timestamp` and `results_path` are reused by
# the final output cell.
import os

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
logs_dir = "/lakehouse/default/Files/logs"
results_path = f"{logs_dir}/ingestion_{timestamp}.json"

results_data = {
    "timestamp": timestamp,
    "atlas_json_path": atlas_json_path,
    "purview_endpoint": purview_endpoint,
    "results": results,
    "metadata": metadata
}

# The logs folder may not exist on a first run; create it so `open` does not
# fail with FileNotFoundError.
os.makedirs(logs_dir, exist_ok=True)

# `default=str` stringifies any value json cannot serialize natively.
with open(results_path, 'w', encoding='utf-8') as f:
    json.dump(results_data, f, indent=2, default=str)

logger.info(f"Results saved to: {results_path}")

In [None]:
# Archive processed Atlas JSON if successful
# Moves the input file out of its '/processed/' folder: to '/archive/' when no
# entity failed, otherwise to '/errors/' for investigation. The destination is
# derived by substituting the '/processed/' path segment.
import os
import shutil

ingestion_succeeded = results['failed'] == 0
target_segment = '/archive/' if ingestion_succeeded else '/errors/'
destination_path = atlas_json_path.replace('/processed/', target_segment)

if destination_path == atlas_json_path:
    # No '/processed/' segment: the substitution changed nothing, so a move
    # would target the source path itself and the log would report a
    # relocation that never happened.
    logger.warning(
        f"Atlas JSON path has no '/processed/' segment; leaving file in place: {atlas_json_path}"
    )
else:
    # The destination folder may not exist yet on a first run.
    os.makedirs(os.path.dirname(destination_path), exist_ok=True)
    shutil.move(atlas_json_path, destination_path)

    if ingestion_succeeded:
        archive_path = destination_path
        logger.info(f"Atlas JSON archived to: {archive_path}")
    else:
        # Move to errors folder for investigation
        error_path = destination_path
        logger.warning(f"Atlas JSON moved to errors: {error_path}")

In [None]:
# Return output
# Builds the summary handed back to the caller of this notebook: "success"
# when no entity failed, otherwise "partial", plus the counts and the location
# of the saved results file. The summary is logged and then passed to
# `mssparkutils.notebook.exit` as a JSON string.
if results['failed'] == 0:
    run_status = "success"
else:
    run_status = "partial"

output = dict(
    status=run_status,
    entities_created=results['created'],
    entities_failed=results['failed'],
    results_path=results_path,
    timestamp=timestamp,
)

logger.info("Purview ingestion workflow complete")
pretty_output = json.dumps(output, indent=2)
logger.info(f"Output: {pretty_output}")

exit_payload = json.dumps(output)
mssparkutils.notebook.exit(exit_payload)