# OneLake Migration Diagnostics & Analysis

## 🔍 Purpose
This notebook analyzes the OneLake migration failures and provides diagnostic tools to find out why file uploads fail even though the target directories were created successfully.

## 🎯 Key Objectives
1. **Validate Directory Structure** - Confirm all directories were created correctly
2. **Analyze Migration Logs** - Identify the root cause of 0% success rates
3. **Test API Connectivity** - Verify authentication and API access paths
4. **Performance Analysis** - Generate reports on migration progress and bottlenecks
5. **Path Mapping Validation** - Ensure directory paths match between creation and migration scripts

## 📊 Current Status
- **Directories Created**: ✅ 84 directories successfully created via Fabric notebook
- **Migration Status**: ❌ 0% success rate in Python migration script
- **Suspected cause** (not yet confirmed): a path mismatch or an API access problem between directory creation and file upload

## 📚 Section 1: Import Required Libraries
Import necessary libraries for analysis, visualization, and API testing.

In [None]:
# Import Required Libraries
import json
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import logging
import os
import requests
from datetime import datetime
import re
from pathlib import Path
import numpy as np

# Configure logging for better output
# Timestamped, level-tagged format for any logger output produced by this notebook
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

# Plot appearance: matplotlib's default style with seaborn's "husl" colour palette
plt.style.use('default')
sns.set_palette("husl")

# Environment banner: library versions and the analysis start time, one per line
print(
    "✅ All libraries imported successfully!",
    f"📊 Using pandas version: {pd.__version__}",
    f"📈 Using matplotlib version: {plt.matplotlib.__version__}",
    f"🎨 Using seaborn version: {sns.__version__}",
    f"📅 Analysis started at: {datetime.now():%Y-%m-%d %H:%M:%S}",
    sep="\n",
)

## 📁 Section 2: Load Migration Configuration and Environment
Load the `onelake_directories.json` file and the variables from the first `.env` file found, to understand the migration setup.

In [None]:
# Load migration configuration and environment
import json
import os
from pathlib import Path

def load_environment(env_paths=None):
    """Load KEY=VALUE pairs from the first existing ``.env`` file into ``os.environ``.

    Candidate paths are checked in order (relative ones against the current
    working directory) and only the first regular file found is read.  Blank
    lines and ``#`` comment lines are skipped.  An optional leading ``export``
    keyword is accepted, whitespace around keys and values is stripped, and a
    value wrapped in one pair of matching single or double quotes is unquoted.
    Values from the file overwrite existing environment variables of the same
    name.

    Args:
        env_paths: Optional sequence of candidate paths. Defaults to the
            notebook's standard search list.

    Returns:
        The path that was loaded, or ``None`` when no candidate file exists.
    """
    if env_paths is None:
        env_paths = [
            "config/.env",
            ".env",
            "../.env",
            "../../config/.env",
            "../config/.env",
        ]

    for env_path in env_paths:
        # isfile (not exists): a directory called ".env" must not be opened
        if not os.path.isfile(env_path):
            continue

        print(f"📄 Loading environment from: {env_path}")
        # utf-8-sig transparently drops a BOM that some editors write
        with open(env_path, 'r', encoding='utf-8-sig') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue

                key, value = line.split('=', 1)
                key = key.strip()
                if key.startswith('export '):
                    key = key[len('export '):].strip()
                value = value.strip()
                # Remove one pair of matching surrounding quotes: "..." or '...'
                if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                    value = value[1:-1]

                if key:
                    os.environ[key] = value
        return env_path

    print("⚠️ No .env file found in any expected location")
    return None

# Populate os.environ from the first .env file found; remember which one (None if none)
env_file_used = load_environment()

# Directory map written by the directory-creation step
directories_file = "onelake_directories.json"
if not os.path.exists(directories_file):
    print(f"❌ {directories_file} not found")
    onelake_dirs = {}
else:
    with open(directories_file, 'r') as fh:
        onelake_dirs = json.load(fh)
    print(f"✅ Loaded {len(onelake_dirs)} OneLake directories from {directories_file}")

# Configuration overview
print("\n🔧 Configuration Summary:")
print(f"Environment file: {env_file_used}")
for label, env_var in (("Workspace ID", 'FABRIC_WORKSPACE_ID'), ("Lakehouse ID", 'FABRIC_LAKEHOUSE_ID')):
    print(f"{label}: {os.environ.get(env_var, 'Not set')}")
print(f"OneLake directories loaded: {len(onelake_dirs)}")

# Preview at most five entries so the stored path format can be inspected
if onelake_dirs:
    print("\n📂 Sample directory paths:")
    for position, (key, path) in enumerate(list(onelake_dirs.items())[:5], start=1):
        print(f"  {position}. {key}: {path}")
    hidden = len(onelake_dirs) - 5
    if hidden > 0:
        print(f"  ... and {hidden} more directories")

## 🔍 Section 3: Analyze Directory Creation vs Migration Results
Compare the created directories with the entries in the migration progress file and with the migration log files to identify discrepancies.

In [None]:
# Analyze migration logs and compare with created directories
import glob
from datetime import datetime

def analyze_migration_logs(patterns=("*.log", "logs/*.log"), preview_chars=500):
    """Collect metadata for log files that look migration-related.

    A file is kept when its text contains "migration" or "upload"
    (case-insensitive). Files that cannot be read are reported and skipped.

    Args:
        patterns: Glob patterns to search. Relative patterns resolve against the
            current working directory. Defaults to ``*.log`` and ``logs/*.log``.
        preview_chars: Number of characters kept from the start
            (``content_preview``) and from the end (``content_tail``) of each file.

    Returns:
        A list of dicts with keys 'file', 'size' (bytes), 'modified' (local
        ``datetime`` of last modification), 'content_preview' and 'content_tail'.
    """
    # De-duplicate overlapping pattern matches and sort for a stable order
    log_files = sorted({match for pattern in patterns for match in glob.glob(pattern)})
    migration_logs = []

    print("📋 Analyzing migration logs...")
    for log_file in log_files:
        try:
            # errors='ignore' drops undecodable bytes, so only OS-level errors remain
            with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            size = os.path.getsize(log_file)
            modified = datetime.fromtimestamp(os.path.getmtime(log_file))
        except OSError as e:
            print(f"⚠️ Error reading {log_file}: {e}")
            continue

        lowered = content.lower()
        if 'migration' not in lowered and 'upload' not in lowered:
            continue

        migration_logs.append({
            'file': log_file,
            'size': size,
            'modified': modified,
            'content_preview': content[:preview_chars],
            # The end of a log is where the most recent errors usually are
            'content_tail': content[-preview_chars:] if preview_chars > 0 else '',
        })

    return migration_logs

# Load the migration progress file written by the migration script.
# NOTE(review): the checks below assume a JSON object mapping an item key to a
# dict with optional 'success' / 'error' fields — confirm against the script.
from collections import Counter

progress_file = "migration_progress_optimized.json"
migration_progress = {}

if os.path.exists(progress_file):
    try:
        with open(progress_file, 'r', encoding='utf-8') as f:
            migration_progress = json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers malformed JSON / bad encoding
        print(f"❌ Could not load {progress_file}: {e}")
        migration_progress = {}
    else:
        if isinstance(migration_progress, dict):
            print(f"✅ Loaded migration progress: {len(migration_progress)} entries")
        else:
            print(f"⚠️ {progress_file} holds a {type(migration_progress).__name__}, "
                  f"expected a JSON object - ignoring it")
            migration_progress = {}
else:
    print(f"❌ {progress_file} not found")


def _progress_record(value):
    """Return a progress entry as a dict; non-dict entries carry no success/error info."""
    return value if isinstance(value, dict) else {}


# Find log files that mention migration/upload activity
logs = analyze_migration_logs()
print(f"📋 Found {len(logs)} migration-related log files")

# Compare directory creation vs migration attempts (needs both inputs)
if onelake_dirs and migration_progress:
    created_dirs = set(onelake_dirs.keys())
    attempted_migrations = set(migration_progress.keys())

    print(f"\n📊 Directory Analysis:")
    print(f"Directories created: {len(created_dirs)}")
    print(f"Migration attempts: {len(attempted_migrations)}")
    print(f"Overlap: {len(created_dirs.intersection(attempted_migrations))}")

    # Check for successful migrations
    successful_migrations = [
        k for k, v in migration_progress.items() if _progress_record(v).get('success', False)
    ]
    print(f"Successful migrations: {len(successful_migrations)}")

    if successful_migrations:
        print("✅ Sample successful migrations:")
        for success in successful_migrations[:3]:
            print(f"  - {success}")
    else:
        print("❌ No successful migrations found")

        # Group failures by error text; str() keeps unhashable payloads (dicts/lists) countable.
        # most_common() orders by count, ties in first-seen order.
        failure_reasons = Counter(
            str(_progress_record(v).get('error', 'Unknown error')) for v in migration_progress.values()
        )

        print("\n🔴 Failure patterns:")
        for reason, count in failure_reasons.most_common():
            print(f"  {count:3d}x: {reason}")

# Show the opening lines of the most recently modified migration log
if logs:
    latest_log = max(logs, key=lambda x: x['modified'])
    print(f"\n📄 Latest migration log ({latest_log['file']}):")
    print("=" * 50)
    print(latest_log['content_preview'])
    print("=" * 50)

## 🔗 Section 4: API Path and Authentication Analysis
Test authentication against the Fabric and OneLake endpoints, and compare the path formats used for directory creation (`abfss://`) with those used for file uploads (`https://`).

In [None]:
# Compare API paths and authentication methods
import requests

def test_fabric_authentication():
    """Probe the Fabric REST API and the OneLake DFS endpoint with the configured token.

    Reads FABRIC_WORKSPACE_ID, FABRIC_LAKEHOUSE_ID and FABRIC_ACCESS_TOKEN from
    the environment, prints which are set, then sends one request per probe and
    prints the HTTP status.

    Returns:
        False if any of the three settings is missing. Otherwise a dict keyed by
        probe name ('workspace_api', 'onelake_api'); each value has 'success' plus
        either the HTTP details ('status', 'url', ...) or 'error' when the request
        itself raised.
    """
    workspace_id = os.environ.get('FABRIC_WORKSPACE_ID')
    lakehouse_id = os.environ.get('FABRIC_LAKEHOUSE_ID')
    access_token = os.environ.get('FABRIC_ACCESS_TOKEN')

    print("🔐 Authentication Test:")
    print(f"Workspace ID: {'✅' if workspace_id else '❌'} {workspace_id}")
    print(f"Lakehouse ID: {'✅' if lakehouse_id else '❌'} {lakehouse_id}")
    print(f"Access Token: {'✅' if access_token else '❌'} {'Set' if access_token else 'Not set'}")

    if not all([workspace_id, lakehouse_id, access_token]):
        print("❌ Missing required authentication parameters")
        return False

    test_results = {}

    # Test 1: Fabric REST API - can the token read the workspace?
    fabric_headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json',
    }
    url = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"
    try:
        response = requests.get(url, headers=fabric_headers, timeout=30)
        ok = response.status_code == 200
        test_results['workspace_api'] = {'status': response.status_code, 'success': ok, 'url': url}
        print(f"Workspace API: {'✅' if ok else '❌'} {response.status_code}")
    except Exception as e:  # diagnostic boundary: record the failure instead of aborting the notebook
        test_results['workspace_api'] = {'error': str(e), 'success': False}
        print(f"Workspace API: ❌ {e}")

    # Test 2: OneLake DFS endpoint (ADLS Gen2 protocol).
    # HEAD = "Path - Get Properties", which is defined for directories, so the
    # lakehouse's Files root should answer 200. A 404 means the workspace/lakehouse
    # path does not resolve - a failure, not an "empty directory".
    # Storage requests authorised with a bearer token must send x-ms-version.
    # NOTE(review): a token issued for api.fabric.microsoft.com may not be accepted
    # by OneLake, which is usually called with a storage-audience token
    # (https://storage.azure.com/) - confirm how FABRIC_ACCESS_TOKEN is obtained.
    onelake_headers = {
        'Authorization': f'Bearer {access_token}',
        'x-ms-version': '2023-11-03',
    }
    onelake_url = f"https://onelake.dfs.fabric.microsoft.com/{workspace_id}/{lakehouse_id}.Lakehouse/Files"
    try:
        response = requests.head(onelake_url, headers=onelake_headers, timeout=30)
        ok = response.status_code == 200
        # HEAD responses have no body; storage reports the failure reason in this header
        error_code = response.headers.get('x-ms-error-code')
        test_results['onelake_api'] = {
            'status': response.status_code,
            'success': ok,
            'url': onelake_url,
            'error_code': error_code,
        }
        detail = '' if ok else f" ({error_code or 'no error code'})"
        print(f"OneLake API: {'✅' if ok else '❌'} {response.status_code}{detail}")
    except Exception as e:  # diagnostic boundary, as above
        test_results['onelake_api'] = {'error': str(e), 'success': False}
        print(f"OneLake API: ❌ {e}")

    return test_results

def compare_api_paths():
    """Print the OneLake/Fabric URL formats side by side for the configured IDs.

    Reads FABRIC_WORKSPACE_ID and FABRIC_LAKEHOUSE_ID from the environment. For
    up to three entries of the notebook-level ``onelake_dirs`` mapping, it shows
    the stored path and, when that path contains both IDs, its https:// REST
    form. Prints only; returns None.
    """
    workspace_id = os.environ.get('FABRIC_WORKSPACE_ID')
    lakehouse_id = os.environ.get('FABRIC_LAKEHOUSE_ID')

    if not workspace_id or not lakehouse_id:
        print("❌ Missing workspace or lakehouse ID for path comparison")
        return

    print("\n🔗 API Path Comparison:")

    # Path formats used in different contexts.
    # NOTE(review): the documented OneLake ABFS form puts the workspace before the
    # '@' (abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<item>/Files/);
    # the first entry mirrors the layout this notebook assumes for stored paths.
    # 'OneLake REST API' and 'Azure Data Lake Gen2' are the same DFS endpoint.
    paths = {
        'Fabric Notebook (mssparkutils)': f"abfss://COE_F_EUC_P2@onelake.dfs.fabric.microsoft.com/{workspace_id}/{lakehouse_id}.Lakehouse/Files/",
        'OneLake REST API': f"https://onelake.dfs.fabric.microsoft.com/{workspace_id}/{lakehouse_id}.Lakehouse/Files/",
        'Fabric REST API': f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items/{lakehouse_id}/",
        'Azure Data Lake Gen2': f"https://onelake.dfs.fabric.microsoft.com/{workspace_id}/{lakehouse_id}.Lakehouse/Files/"
    }

    for method, path in paths.items():
        print(f"\n{method}:")
        print(f"  {path}")

    # Show sample directory paths from onelake_directories.json
    if onelake_dirs:
        print(f"\n📂 Sample created directory paths:")
        for key, path in list(onelake_dirs.items())[:3]:
            print(f"  {key}:")
            print(f"    Created: {path}")

            # Show what the migration REST API would use
            if workspace_id in path and lakehouse_id in path:
                # Strip only the 'abfss://<container>@' prefix so the host stays
                # onelake.dfs.fabric.microsoft.com; rewriting '@onelake.' instead
                # yields a non-existent '<container>.dfs.fabric.microsoft.com' host.
                rest_path = re.sub(r'^abfss://[^@/]+@', 'https://', path, count=1)
                print(f"    REST API: {rest_path}")

# Probe the endpoints: False when credentials are missing, otherwise per-probe results
auth_results = test_fabric_authentication()

# Print the path formats side by side
compare_api_paths()

# Summary of findings
print("\n📋 API Analysis Summary:")
if not auth_results:
    print("❌ Authentication test failed - missing credentials")
else:
    working_apis = len([probe for probe in auth_results.values() if probe.get('success', False)])
    print(f"Working API endpoints: {working_apis}/{len(auth_results)}")

## 🧪 Section 5: Test File Upload to Identify Issues
Perform a controlled test upload to identify the exact failure point.

In [None]:
# Test file upload to identify exact failure points
import tempfile
import base64

def create_test_file():
    """Write a short diagnostic text file to a new temporary location.

    The file is created with ``delete=False`` so it outlives the handle; the
    caller is responsible for removing it.

    Returns:
        Tuple ``(path, content)``: the temporary file's path and the text written.
    """
    body = f"Test file created at {datetime.now()}\nThis is a diagnostic test file."
    handle = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
    with handle:
        handle.write(body)
    return handle.name, body

def test_onelake_upload(test_file_path, test_content):
    """Try a small upload to OneLake using two protocols and report each outcome.

    Target: ``test_diagnostic_file.txt`` inside the first directory of the
    notebook-level ``onelake_dirs`` mapping.

    * 'direct_upload': one Put Blob request (``x-ms-blob-type: BlockBlob``)
      sent to the OneLake *blob* endpoint.
    * 'gen2_upload': the ADLS Gen2 DFS sequence, i.e. create
      (``PUT ?resource=file``) -> append (``PATCH ?action=append``) -> flush
      (``PATCH ?action=flush``).

    Args:
        test_file_path: Path of the local test file. Not read; the upload sends
            ``test_content`` directly. Kept for interface compatibility.
        test_content: Text to upload, sent UTF-8 encoded.

    Returns:
        False when credentials or target directories are missing. Otherwise a
        dict keyed by approach name; each value has 'success' plus either the
        HTTP details ('url', 'status', 'response', ...) or 'error' when the
        request raised.
    """
    workspace_id = os.environ.get('FABRIC_WORKSPACE_ID')
    lakehouse_id = os.environ.get('FABRIC_LAKEHOUSE_ID')
    access_token = os.environ.get('FABRIC_ACCESS_TOKEN')

    if not all([workspace_id, lakehouse_id, access_token]):
        print("❌ Missing required credentials for upload test")
        return False

    # Choose a test directory that we know exists
    if not onelake_dirs:
        print("❌ No OneLake directories available for testing")
        return False
    test_dir_key = next(iter(onelake_dirs))
    test_dir_path = onelake_dirs[test_dir_key]
    print(f"🎯 Testing upload to: {test_dir_key}")
    print(f"Directory path: {test_dir_path}")

    payload = test_content.encode('utf-8')
    # Azure Storage rejects bearer-token requests that omit x-ms-version
    storage_version = '2023-11-03'
    test_results = {}

    # Approach 1: single-request Put Blob
    try:
        # abfss://<container>@onelake.dfs.fabric.microsoft.com/<path> -> https://onelake.dfs.fabric.microsoft.com/<path>
        # The container is dropped, as elsewhere in this notebook, because stored
        # paths are assumed to already embed the workspace ID (TODO confirm).
        https_path = re.sub(r'^abfss://[^@/]+@', 'https://', test_dir_path, count=1)
        # x-ms-blob-type is a Blob-service header. The DFS host would treat this PUT
        # as "Path - Create" (which needs ?resource=...), so use the blob host.
        blob_path = https_path.replace('://onelake.dfs.', '://onelake.blob.', 1)
        upload_url = f"{blob_path.rstrip('/')}/test_diagnostic_file.txt"

        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'text/plain; charset=utf-8',
            'x-ms-blob-type': 'BlockBlob',
            'x-ms-version': storage_version,
        }

        print(f"🔄 Testing upload to: {upload_url}")
        response = requests.put(upload_url, data=payload, headers=headers, timeout=30)
        ok = response.status_code in (200, 201)

        test_results['direct_upload'] = {
            'url': upload_url,
            'status': response.status_code,
            'success': ok,
            'response': response.text[:200] if response.text else 'No response text',
        }

        print(f"Direct upload result: {'✅' if ok else '❌'} {response.status_code}")
        if not ok:
            print(f"Error response: {response.text[:200]}")

    except Exception as e:  # diagnostic boundary: record and continue with the next approach
        test_results['direct_upload'] = {'error': str(e), 'success': False}
        print(f"Direct upload error: ❌ {e}")

    # Approach 2: ADLS Gen2 create -> append -> flush
    try:
        # NOTE(review): this URL uses the directory *key* while approach 1 uses the
        # stored *path*; they only coincide if keys are lakehouse-relative paths.
        gen2_url = f"https://onelake.dfs.fabric.microsoft.com/{workspace_id}/{lakehouse_id}.Lakehouse/Files/{test_dir_key}/test_diagnostic_file.txt"

        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/octet-stream',
            'x-ms-version': storage_version,
        }

        steps = {}
        # Step 1: Path - Create; the resource=file query parameter is mandatory
        last = requests.put(f"{gen2_url}?resource=file", headers=headers, timeout=30)
        steps['create'] = last
        if last.status_code in (200, 201):
            # Step 2: Path - Update (append) at offset 0; the service answers 202
            last = requests.patch(f"{gen2_url}?action=append&position=0",
                                  data=payload, headers=headers, timeout=30)
            steps['append'] = last
            if last.status_code in (200, 202):
                # Step 3: Path - Update (flush) commits the appended bytes
                last = requests.patch(f"{gen2_url}?action=flush&position={len(payload)}",
                                      headers=headers, timeout=30)
                steps['flush'] = last

        final_status = last.status_code
        # Only a completed flush means the file content was committed
        ok = 'flush' in steps and final_status in (200, 201)
        failed_step = None if ok else list(steps)[-1]

        test_results['gen2_upload'] = {
            'url': gen2_url,
            'status': final_status,
            'success': ok,
            'create_status': steps['create'].status_code,
            'failed_step': failed_step,
            # Body of the step that produced final_status, so status and text match
            'response': last.text[:200] if last.text else 'No response text',
        }

        suffix = '' if ok else f" (failed at {failed_step})"
        print(f"Gen2 API upload result: {'✅' if ok else '❌'} {final_status}{suffix}")

    except Exception as e:  # diagnostic boundary, as above
        test_results['gen2_upload'] = {'error': str(e), 'success': False}
        print(f"Gen2 upload error: ❌ {e}")

    return test_results

# Create a throwaway local file, run the upload probes, and always remove the file afterwards
print("🧪 Creating test file for upload diagnosis...")
test_file_path, test_content = create_test_file()
print(f"✅ Test file created: {test_file_path}")
# len() of a str counts characters; report the encoded size that is actually uploaded
print(f"Content length: {len(test_content.encode('utf-8'))} bytes")

try:
    # Only attempt uploads when at least one authentication probe succeeded
    if auth_results and any(result.get('success', False) for result in auth_results.values()):
        print("\n🚀 Running upload tests...")
        upload_results = test_onelake_upload(test_file_path, test_content)

        # Summarize upload test results (False means prerequisites were missing)
        if upload_results:
            print(f"\n📊 Upload Test Summary:")
            successful_methods = [method for method, result in upload_results.items() if result.get('success', False)]
            print(f"Successful upload methods: {len(successful_methods)}/{len(upload_results)}")

            if successful_methods:
                print(f"✅ Working methods: {', '.join(successful_methods)}")
            else:
                print("❌ No upload methods worked")
                print("\n🔍 Failure details:")
                for method, result in upload_results.items():
                    if 'error' in result:
                        print(f"  {method}: {result['error']}")
                    else:
                        print(f"  {method}: HTTP {result.get('status', 'Unknown')} - {result.get('response', 'No details')}")
    else:
        print("❌ Skipping upload tests - authentication failed")
finally:
    # Clean up even if an upload probe raised, so temp files do not accumulate
    try:
        os.unlink(test_file_path)
        print(f"\n🧹 Cleaned up test file: {test_file_path}")
    except OSError as e:
        print(f"⚠️ Could not clean up test file: {e}")

## 📝 Section 6: Recommendations and Next Steps
Based on the diagnostic analysis, provide actionable recommendations to fix the migration issues.

In [None]:
# Generate recommendations based on diagnostic results
def generate_recommendations():
    """Print a diagnostic summary, likely root causes and suggested fixes.

    Reads the notebook-level results ``onelake_dirs``, ``migration_progress`` and
    ``auth_results`` produced by the earlier cells. Prints only; returns None.
    """
    print("🎯 MIGRATION DIAGNOSTIC SUMMARY")
    print("=" * 50)

    # Check what we discovered
    has_directories = len(onelake_dirs) > 0
    has_migration_progress = len(migration_progress) > 0
    # auth_results is False when credentials were missing, else a dict of probe results
    has_auth = bool(auth_results) and any(result.get('success', False) for result in auth_results.values())

    print(f"{'✅' if has_directories else '❌'} OneLake directories created: {len(onelake_dirs)} directories")
    print(f"{'✅' if has_migration_progress else '❌'} Migration progress file: {'Found' if has_migration_progress else 'Not found'}")
    print(f"{'✅' if has_auth else '❌'} API authentication: {'Working' if has_auth else 'Failed'}")

    # Analyze the gap
    print(f"\n🔍 ROOT CAUSE ANALYSIS:")

    if has_directories and not has_migration_progress:
        print("1. ❌ ISSUE: Migration progress file missing or empty")
        print("   → The migration script may not be running or saving progress")
        print("   → Check if the migration script is using the correct file paths")

    if has_directories and has_migration_progress:
        failed_migrations = [k for k, v in migration_progress.items() if not v.get('success', False)]
        print(f"2. ❌ ISSUE: {len(failed_migrations)} migrations failed despite directories existing")
        print("   → API path mismatch between directory creation and file upload")
        print("   → Directory creation uses mssparkutils (abfss://) but upload uses REST API (https://)")

    if not has_auth:
        print("3. ❌ ISSUE: API authentication failing")
        print("   → Access token may be expired or invalid")
        print("   → Workspace/Lakehouse IDs may be incorrect")

    print(f"\n💡 RECOMMENDED SOLUTIONS:")
    print("1. 🔧 PATH STANDARDIZATION:")
    print("   → Modify migration script to use consistent API approach")
    print("   → Convert abfss:// paths to https:// for REST API uploads")
    print("   → Test path conversion: abfss://COE_F_EUC_P2@onelake.dfs.fabric.microsoft.com → https://onelake.dfs.fabric.microsoft.com")

    print("\n2. 🔐 AUTHENTICATION REFRESH:")
    print("   → Regenerate Fabric access token")
    print("   → Confirm the token is accepted by OneLake (storage calls normally use a https://storage.azure.com/ audience token)")
    print("   → Verify workspace and lakehouse IDs are correct")
    print("   → Test authentication with simple API call before bulk migration")

    print("\n3. 📁 MIGRATION STRATEGY:")
    print("   → Use Azure Data Lake Gen2 API for file uploads (create → append → flush)")
    print("   → Create files with ?resource=file and send an x-ms-version header on every call")
    print("   → Implement proper error handling and retry logic")
    print("   → Add progress tracking with detailed logging")

    print("\n4. 🧪 TESTING APPROACH:")
    print("   → Start with single file upload test")
    print("   → Verify directory listing works before file upload")
    print("   → Implement batch upload with progress monitoring")

    # Generate specific code fixes
    print(f"\n🛠️ SPECIFIC CODE CHANGES NEEDED:")

    if onelake_dirs:
        abfss_path = next(iter(onelake_dirs.values()))
        # Drop only the 'abfss://<container>@' prefix, whatever the container name is
        https_path = re.sub(r'^abfss://[^@/]+@', 'https://', abfss_path, count=1)

        print("\nPath conversion example:")
        print(f"  Original (mssparkutils): {abfss_path}")
        print(f"  For REST API:           {https_path}")

        print("\nRecommended upload function:")
        print("""
def upload_to_onelake_fixed(file_path, destination_path, access_token):
    # Convert abfss://<container>@onelake.dfs.fabric.microsoft.com/... to https://onelake.dfs.fabric.microsoft.com/...
    api_path = re.sub(r'^abfss://[^@/]+@', 'https://', destination_path)

    # Azure Data Lake Gen2 (DFS) protocol; bearer-token calls must carry x-ms-version
    headers = {'Authorization': f'Bearer {access_token}', 'x-ms-version': '2023-11-03'}

    with open(file_path, 'rb') as f:
        data = f.read()

    # Step 1: Create the (empty) file - resource=file is required
    r = requests.put(f"{api_path}?resource=file", headers=headers, timeout=30)
    if r.status_code not in (200, 201):
        return False

    # Step 2: Append data at offset 0 (the service answers 202 Accepted)
    r = requests.patch(f"{api_path}?action=append&position=0", data=data, headers=headers, timeout=30)
    if r.status_code not in (200, 202):
        return False

    # Step 3: Flush to commit the appended bytes
    r = requests.patch(f"{api_path}?action=flush&position={len(data)}", headers=headers, timeout=30)
    return r.status_code in (200, 201)
        """)

# Generate final recommendations, then print the closing checklist.
# Real "\n" escapes: the previous "\\n" printed a literal backslash-n.
print("🔬 DIAGNOSTIC ANALYSIS COMPLETE")
print("=" * 50)
generate_recommendations()

print(f"\n⏰ Analysis completed at: {datetime.now()}")
print("\n📋 NEXT STEPS:")
print("1. Run this diagnostic notebook to identify specific issues")
print("2. Implement the recommended path conversion fixes")
print("3. Test single file upload before bulk migration")
print("4. Monitor progress with detailed logging")
print("\n🎯 Expected outcome: 100% migration success rate with proper path handling")