# L2C Workbook -- Local to Cloud Migration

Migrate your local **Apache Polaris** PoC to **AWS S3 + Snowflake External Iceberg Tables**.

This workbook walks through the full lifecycle:

1. Inspect local Polaris tables
2. Review migration infrastructure (AWS + Snowflake)
3. Verify synced data on S3
4. Query from Snowflake
5. Incremental update flow
6. Reset and re-demo

> **Prerequisites:**
> * Run `task setup:all WORK_DIR=<your-project>` before opening this notebook.
> * L2C sections require `$PROJECT_HOME/bin/plf l2c setup` to have been run.
> * **AWS Credentials:** Ensure AWS credentials are properly configured for S3 access:
>   - **AWS SSO:** Run `aws sso login --profile <your-profile>` if using SSO
>   - **AWS Profile:** Set `AWS_PROFILE=<your-profile>` or configure default credentials
>   - **Verify Access:** Test with `aws sts get-caller-identity` to confirm authentication

## How L2C Works

**L2C Migration Flow:**

**INITIAL SETUP:**
1. **Local Apache Polaris** (Iceberg REST) stores tables via RustFS (S3-compatible)
2. **AWS S3 bucket** configured for Iceberg storage  
3. **Snowflake External Volume** points to S3 bucket

**MIGRATION STEPS:**
1. **SYNC:** Copy Iceberg data/metadata from local RustFS to AWS S3
2. **REGISTER:** Create Snowflake External Iceberg Tables pointing to S3 paths
3. **QUERY:** Access data through Snowflake while maintaining Iceberg format

**DAY-2 OPERATIONS:**
1. **UPDATE:** Local table changes → sync to S3 → refresh Snowflake metadata
2. **VERIFY:** Query both local and Snowflake to confirm consistency

The `plf l2c` CLI orchestrates each step. This workbook lets you inspect
the state at each stage and verify end-to-end.
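As a rough illustration of how the per-table state drives these stages, the hypothetical helper below maps one entry of `.snow-utils/l2c-state.json` to its next step. It assumes the `sync.status` / `register.status` fields and the `pending` / `done` values that the cells in this workbook read. It also assumes that `plf l2c migrate` covers both sync and register, as the hints printed by those cells suggest. The sample table names are made up.

```python
def next_l2c_step(table_info: dict) -> str:
    """Hypothetical helper: suggest the next L2C stage for one table in l2c-state.json.

    Assumes a stage is finished when its status is "done"; anything else counts as pending.
    """
    sync_status = table_info.get("sync", {}).get("status", "pending")
    register_status = table_info.get("register", {}).get("status", "pending")
    if sync_status != "done":
        return "SYNC: run plf l2c migrate"
    if register_status != "done":
        return "REGISTER: run plf l2c migrate"
    return "QUERY/UPDATE: query in Snowflake; run plf l2c update after local changes"


# Made-up entry in the shape the workbook cells expect
example = {
    "namespace": "demo_ns",
    "table": "orders",
    "sync": {"status": "done"},
    "register": {"status": "pending"},
}
print(next_l2c_step(example))
```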

> **⚠️ AWS Credentials Required:** L2C migration requires active AWS credentials with S3 access.
> The notebook will fail at sync/verification steps if AWS authentication is not properly configured.
> Common issues:
> - **SSO Expired:** Re-run `aws sso login --profile <your-profile>`
> - **Profile Missing:** Check `~/.aws/config` and set `AWS_PROFILE` if needed
> - **Permissions:** Ensure your AWS role has S3 read/write access to the configured bucket

## Setup

Load project configuration from `.env` and L2C state from `.snow-utils/l2c-state.json`.
Credentials are read from `work/principal.txt` (masked in output).

In [None]:
import json
from pathlib import Path

from dotenv import dotenv_values

project_root = Path("..").resolve()

# --- .env (required) ---
env_file = project_root / ".env"
if not env_file.exists():
    print(f"ERROR: {env_file} not found.")
    print("  Run: task setup:all WORK_DIR=<your-project>")
    raise SystemExit(1)

cfg = dotenv_values(env_file)
has_env = True

# --- work/principal.txt (needs cluster running) ---
principal_file = project_root / "work" / "principal.txt"
has_principal = principal_file.exists()
realm = client_id = client_secret = None
if has_principal:
    content = principal_file.read_text().strip()
    if "," in content:
        # Comma-separated format: REALM,CLIENT_ID,CLIENT_SECRET
        # (maxsplit=2 keeps any commas inside the secret intact)
        parts = [p.strip() for p in content.split(",", 2)]
    else:
        # Line-separated format (fallback): one value per line
        parts = [line.strip() for line in content.splitlines() if line.strip()]
    if len(parts) >= 3:
        realm, client_id, client_secret = parts[:3]
    else:
        print(f"WARNING: {principal_file} is malformed (expected REALM, CLIENT_ID, CLIENT_SECRET).")
        has_principal = False
else:
    print("WARNING: work/principal.txt not found.")
    print("  Run: task setup:all WORK_DIR=<your-project>")

# --- .snow-utils/l2c-state.json (needs plf l2c setup) ---
state_file = project_root / ".snow-utils" / "l2c-state.json"
state = None
has_l2c_state = state_file.exists()
if has_l2c_state:
    state = json.loads(state_file.read_text())
else:
    print("INFO: .snow-utils/l2c-state.json not found.")
    print(f"  Run: {project_root}/bin/plf l2c setup")

# --- Derived config ---
polaris_url = cfg.get("POLARIS_HOST", "http://localhost:18181")
catalog_name = cfg.get("PLF_POLARIS_CATALOG_NAME", "polardb")

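# --- Summary (the client secret is never printed; the client ID is masked) ---
masked_client_id = ("*" * max(len(client_id) - 4, 0) + client_id[-4:]) if client_id else "N/A"
print(f"Project root:  {project_root}")
print(f"Polaris URL:   {polaris_url}")
print(f"Catalog:       {catalog_name}")
print(f"Client ID:     {masked_client_id}")
print(f"has_principal: {has_principal}")
print(f"has_l2c_state: {has_l2c_state}")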
if has_l2c_state and state:
    aws = state.get("aws", {})
    sf = state.get("snowflake", {})
    print(f"AWS bucket:    {aws.get('bucket', 'N/A')}")
    print(f"AWS region:    {aws.get('region', 'N/A')}")
    print(f"SF database:   {sf.get('database', 'N/A')}")
    print(f"SF schema:     {sf.get('schema', 'N/A')}")
    print(f"SF SA_ROLE:    {sf.get('sa_role', 'N/A')}")

## Utility Functions

Shared helpers used by the sections below: Polaris OAuth tokens, DuckDB access to the Polaris catalog, and Snowflake connections.
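When debugging the token call outside Python (for example with `curl`), it helps to see the exact form body that `get_polaris_token()` sends. This sketch rebuilds the same fields with the standard library. `build_token_request` is a hypothetical name. The explicit `Content-Type` mirrors what `requests` sets when `data=` is a dict.

```python
from urllib.parse import urlencode


def build_token_request(polaris_url: str, client_id: str, client_secret: str, realm: str):
    """Return (url, body, headers) for the Polaris OAuth2 client_credentials token call."""
    url = f"{polaris_url}/api/catalog/v1/oauth/tokens"
    # Same fields as get_polaris_token(); urlencode percent-escapes special characters
    body = urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "PRINCIPAL_ROLE:ALL",
    })
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Polaris-Realm": realm,
    }
    return url, body, headers


url, body, hdrs = build_token_request("http://localhost:18181", "id1", "s3cr&t", "POLARIS")
print(url)
```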

In [None]:
import requests
import duckdb
import snowflake.connector

def get_polaris_token(polaris_url, client_id, client_secret, realm):
    """Get OAuth token from Polaris REST API."""
    token_resp = requests.post(
        f"{polaris_url}/api/catalog/v1/oauth/tokens",
        data={
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
            "scope": "PRINCIPAL_ROLE:ALL",
        },
        headers={"Polaris-Realm": realm},
    )
    token_resp.raise_for_status()
    return token_resp.json()["access_token"]

def create_polaris_headers(polaris_url, client_id, client_secret, realm):
    """Create headers for Polaris REST API calls."""
    token = get_polaris_token(polaris_url, client_id, client_secret, realm)
    return {
        "Authorization": f"Bearer {token}", 
        "Content-Type": "application/json"
    }

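def setup_duckdb_polaris_connection(polaris_url, catalog_name, client_id, client_secret, realm):
    """Set up an in-memory DuckDB connection with the Polaris catalog attached as `polaris_catalog`.

    Note: `realm` is not passed to DuckDB (the secret below carries no Polaris-Realm
    header), so this assumes the catalog lives in Polaris's default realm.
    """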
    conn_ddb = duckdb.connect(":memory:")
    
    # Install and load iceberg extension
    conn_ddb.execute("INSTALL iceberg")
    conn_ddb.execute("LOAD iceberg")
    
    # Create secret for OAuth2 authentication
    oauth_server = f"{polaris_url}/api/catalog/v1/oauth/tokens"
    conn_ddb.execute(f"""
        CREATE OR REPLACE SECRET polaris_secret (
            TYPE iceberg,
            CLIENT_ID '{client_id}',
            CLIENT_SECRET '{client_secret}',
            OAUTH2_SERVER_URI '{oauth_server}'
        )
    """)
    
    # Attach catalog
    catalog_endpoint = f"{polaris_url}/api/catalog"
    conn_ddb.execute(f"""
        ATTACH '{catalog_name}' AS polaris_catalog (
            TYPE iceberg,
            SECRET polaris_secret,
            ENDPOINT '{catalog_endpoint}'
        )
    """)
    
    return conn_ddb

def get_table_count_via_duckdb(polaris_url, catalog_name, client_id, client_secret, realm, namespace, table):
    """Return the row count for namespace.table via DuckDB, or "Error" if the query fails."""
    conn_ddb = None
    try:
        conn_ddb = setup_duckdb_polaris_connection(polaris_url, catalog_name, client_id, client_secret, realm)
        result = conn_ddb.execute(f"SELECT COUNT(*) FROM polaris_catalog.{namespace}.{table}").fetchone()
        return result[0] if result else 0
    except Exception as e:
        print(f"DuckDB query failed for {namespace}.{table}: {e}")
        return "Error"
    finally:
        # Always release the in-memory DuckDB connection, even on failure
        if conn_ddb is not None:
            conn_ddb.close()

def create_snowflake_connection(cfg, sf_config):
    """Open a Snowflake connection using a named connection from ~/.snowflake/connections.toml.

    Returns (connection, True) on success or (None, False) on failure.
    """
    connection_name = cfg.get("SNOWFLAKE_DEFAULT_CONNECTION_NAME", "default")
    print(f"Attempting Snowflake connection using: {connection_name}")
    try:
        conn = snowflake.connector.connect(
            connection_name=connection_name,
            role=sf_config["sa_role"],
            database=sf_config["database"],
            schema=sf_config["schema"],
            login_timeout=10,  # fail fast instead of hanging the notebook
        )
        print("✅ Snowflake connection successful")
        return conn, True
    except Exception as e:
        print(f"⚠️  Snowflake connection failed: {e}")
        if "browser" in str(e).lower() or "oauth" in str(e).lower():
            print(f"   Connection '{connection_name}' requires browser authentication")
            print("   For notebook use, consider configuring a key-pair connection")
        print("   Check ~/.snowflake/connections.toml configuration")
        return None, False

print("✅ Utility functions loaded")

## 1. Local Inventory

Discover all namespaces and tables in the local Polaris catalog via the Iceberg REST API.
These are the tables available for migration.
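The Iceberg REST specification encodes a multi-level namespace in URL paths by joining its levels with the unit separator byte (`0x1F`), which is percent-encoded as `%1F`. A dotted name such as `sales.emea` is only a display convention. The helper names below are hypothetical; the sample namespace is made up.

```python
from urllib.parse import quote


def namespace_path(parts: list[str]) -> str:
    """Encode namespace levels for an Iceberg REST URL path segment (levels joined by 0x1F)."""
    # safe="" makes quote() escape everything outside unreserved chars, including 0x1F -> %1F
    return quote("\x1f".join(parts), safe="")


def display_name(parts: list[str]) -> str:
    """Human-readable dotted form, as shown in the inventory table."""
    return ".".join(parts)


print(namespace_path(["sales", "emea"]), display_name(["sales", "emea"]))
```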

<!-- Skill action: l2c-inventory -->

In [None]:
import pandas as pd
from urllib.parse import quote

if not has_principal:
    print("\u23ed\ufe0f  Skipping -- Polaris credentials not available.")
    print("   Run: task setup:all WORK_DIR=<your-project>")
else:
    headers = create_polaris_headers(polaris_url, client_id, client_secret, realm)
    base = f"{polaris_url}/api/catalog/v1/{catalog_name}"

    rows = []
    ns_resp = requests.get(f"{base}/namespaces", headers=headers)
    ns_resp.raise_for_status()
    for ns_parts in ns_resp.json().get("namespaces", []):
        ns_name = ".".join(ns_parts)  # display form
        # Iceberg REST paths join multi-level namespaces with the 0x1F unit separator
        ns_path = quote("\x1f".join(ns_parts), safe="")
        tbl_resp = requests.get(f"{base}/namespaces/{ns_path}/tables", headers=headers)
        tbl_resp.raise_for_status()
        for ident in tbl_resp.json().get("identifiers", []):
            table_name = ident["name"]
            meta_resp = requests.get(
                f"{base}/namespaces/{ns_path}/tables/{quote(table_name, safe='')}",
                headers=headers,
            )
            meta_resp.raise_for_status()
            metadata = meta_resp.json().get("metadata", {})
            schemas = metadata.get("schemas", [])
            # Count columns of the *current* schema, falling back to the last one listed
            current_id = metadata.get("current-schema-id")
            schema = next(
                (s for s in schemas if s.get("schema-id") == current_id),
                schemas[-1] if schemas else {},
            )
            rows.append({
                "namespace": ns_name,
                "table": table_name,
                "columns": len(schema.get("fields", [])),
                "location": metadata.get("location", ""),
            })

    df = pd.DataFrame(rows)
    display(df)

## 2. Migration Status

Current state of the L2C pipeline -- AWS infrastructure, Snowflake resources,
and per-table sync/register status.
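For a quick overview before reading the per-table rows, the statuses can be tallied across the whole state file. This is a minimal sketch assuming the `tables` / `sync.status` / `register.status` layout that the status cell reads. `status_summary` is a hypothetical helper, and the sample state is made up.

```python
from collections import Counter


def status_summary(state: dict) -> dict[str, Counter]:
    """Count sync and register statuses across all tables in l2c-state.json.

    Missing statuses count as "pending", the same default the status cell uses.
    """
    tables = state.get("tables", {}).values()  # a view, so it can be iterated twice
    return {
        "sync": Counter(t.get("sync", {}).get("status", "pending") for t in tables),
        "register": Counter(t.get("register", {}).get("status", "pending") for t in tables),
    }


sample_state = {
    "tables": {
        "a": {"sync": {"status": "done"}, "register": {"status": "done"}},
        "b": {"sync": {"status": "done"}},
    }
}
print(status_summary(sample_state))
```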

> Requires `$PROJECT_HOME/bin/plf l2c setup` to have been run.  
<!-- Skill action: l2c-status -->

In [None]:
import pandas as pd

if not has_l2c_state:
    print("\u23ed\ufe0f  Skipping -- L2C not configured yet.")
    print(f"   Run: {project_root}/bin/plf l2c setup")
else:
    aws = state.get("aws", {})
    sf = state.get("snowflake", {})

    print("=== AWS ===")
    print(f"  Bucket:  {aws.get('bucket', 'N/A')}")
    print(f"  Region:  {aws.get('region', 'N/A')}")
    print(f"  Role:    {aws.get('role_arn', 'N/A')}")
    print()
    print("=== Snowflake ===")
    print(f"  SA_ROLE:         {sf.get('sa_role', 'N/A')}")
    print(f"  Ext Volume:      {sf.get('external_volume', 'N/A')}")
    print(f"  Catalog Int:     {sf.get('catalog_integration', 'N/A')}")
    print(f"  Database.Schema: {sf.get('database', 'N/A')}.{sf.get('schema', 'N/A')}")
    print()

    tables = state.get("tables", {})
    if tables:
        rows = []
        for key, info in tables.items():
            rows.append({
                "table": key,
                "namespace": info.get("namespace", ""),
                "sync_status": info.get("sync", {}).get("status", "pending"),
                "last_sync": info.get("sync", {}).get("last_sync", ""),
                "register_status": info.get("register", {}).get("status", "pending"),
            })
        df = pd.DataFrame(rows)
        display(df)
    else:
        print(f"No tables in state yet. Run: {project_root}/bin/plf l2c migrate")

## 3. Sync Verification

Compare object counts between local RustFS and the AWS S3 migration bucket.
Matching counts are a quick check that all Iceberg data files and metadata were synced; they do not compare object contents.
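Two details matter when comparing counts. First, `count_objects()` in the cell below returns `-1` when a listing fails, so two failed listings must not be reported as a match. Second, equal counts can hide one missing file offset by one extra file. A key-set difference catches that case. The keys could be collected from the `Key` field of each entry in a `list_objects_v2` page's `Contents`. Both helpers are hypothetical sketches, and the sample keys are made up.

```python
def compare_object_counts(local_count: int, remote_count: int) -> str:
    """Classify a local-vs-S3 object count; a negative count means the listing failed."""
    if local_count < 0 or remote_count < 0:
        return "ERROR"
    return "Yes" if local_count == remote_count else "NO"


def missing_on_remote(local_keys: set[str], remote_keys: set[str]) -> set[str]:
    """Keys present locally but absent in S3 (assumes the same prefix layout on both sides)."""
    return local_keys - remote_keys


local = {"demo_ns/orders/metadata/v1.metadata.json", "demo_ns/orders/data/part-0.parquet"}
remote = {"demo_ns/orders/metadata/v1.metadata.json", "demo_ns/orders/data/part-9.parquet"}
print(compare_object_counts(len(local), len(remote)), missing_on_remote(local, remote))
```

In this made-up example the counts match, but the key diff still reports the missing data file.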

> Requires `$PROJECT_HOME/bin/plf l2c migrate` to have been run.  
<!-- Skill action: l2c-verify -->

In [None]:
import boto3
import pandas as pd
import os
from contextlib import contextmanager
from botocore.config import Config as BotoConfig

if not has_l2c_state:
    print("\u23ed\ufe0f  Skipping -- L2C not configured yet.")
    print(f"   Run: {project_root}/bin/plf l2c setup")
else:
    # Duplicate L2C session functions (same as CLI implementation)
    _AWS_ENV_VARS = [
        "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN",
        "AWS_ENDPOINT_URL", "AWS_DEFAULT_REGION", "AWS_REGION",
        "AWS_PROFILE", "AWS_DEFAULT_PROFILE",
        "AWS_CONFIG_FILE", "AWS_SHARED_CREDENTIALS_FILE",
    ]

    @contextmanager
    def scrubbed_aws_env():
        """Context manager that strips RustFS AWS_* vars for the entire block."""
        saved = {k: os.environ.pop(k) for k in _AWS_ENV_VARS if k in os.environ}
        try:
            yield
        finally:
            os.environ.update(saved)

    def create_rustfs_session(cfg: dict):
        """Create an isolated boto3 S3 client for local RustFS."""
        session = boto3.Session(
            aws_access_key_id=cfg.get("AWS_ACCESS_KEY_ID", "admin"),
            aws_secret_access_key=cfg.get("AWS_SECRET_ACCESS_KEY", "password"),
            region_name="us-east-1",
        )
        endpoint = cfg.get("AWS_ENDPOINT_URL", "http://localhost:19000")
        return session.client(
            "s3",
            endpoint_url=endpoint,
            region_name="us-east-1",
            config=BotoConfig(s3={"addressing_style": "path"}),
        )

    def _resolve_profile(aws_profile: str | None = None) -> str | None:
        """Resolve AWS profile: explicit value > L2C_AWS_PROFILE env var > None (default chain)."""
        return aws_profile or os.environ.get("L2C_AWS_PROFILE") or None

    def create_cloud_session(aws_profile: str | None = None, region: str = "us-east-1"):
        """Create an isolated boto3 session for real AWS and report the caller's account."""
        profile = _resolve_profile(aws_profile)
        if profile is None:
            # No profile configured: fall back to boto3's default credential chain
            # (AWS_* env vars are scrubbed, so this uses the shared config files, SSO cache, etc.)
            session = boto3.Session(region_name=region)
            profile_name = "default-chain"
        else:
            session = boto3.Session(profile_name=profile, region_name=region)
            profile_name = profile

        cloud_s3 = session.client("s3", region_name=region)

        # Confirm which AWS account the credentials resolve to (the CLI does the same check)
        cloud_sts = session.client("sts")
        account_id = cloud_sts.get_caller_identity()["Account"]
        print(f"Cloud session: AWS account {account_id}, profile '{profile_name}', region '{region}'")

        return cloud_s3, None, cloud_sts

    # Use L2C state for configuration
    aws = state.get("aws", {})
    bucket = aws.get("bucket", "")
    region = aws.get("region", "us-west-2")
    profile = aws.get("profile")  # Can be None

    # Use exact same session creation as L2C CLI - scrub environment for both
    with scrubbed_aws_env():
        rustfs_s3 = create_rustfs_session(cfg)

    # Real AWS S3 - use exact same pattern as L2C CLI
    try:
        with scrubbed_aws_env():
            cloud_s3, _, _ = create_cloud_session(profile, region)
        aws_available = True
    except Exception as e:
        print(f"‚ö†Ô∏è  AWS session creation failed: {e}")
        print(f"   Run: {project_root}/bin/plf l2c setup")
        print(f"   Or check AWS credentials: aws sts get-caller-identity")
        aws_available = False

    def count_objects(s3_client, bucket_name, prefix=""):
        try:
            paginator = s3_client.get_paginator("list_objects_v2")
            count = 0
            for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
                count += page.get("KeyCount", 0)
            return count
        except Exception:
            return -1  # Error indicator

    local_bucket = cfg.get("POLARIS_CATALOG_NAME", "polardb")
    rows = []
    for key, info in state.get("tables", {}).items():
        ns = info.get("namespace", "")
        tbl = info.get("table", "")
        prefix = f"{ns}/{tbl}/"
        local_count = count_objects(rustfs_s3, local_bucket, prefix)
        
        if aws_available:
            s3_count = count_objects(cloud_s3, bucket, prefix)
            match_status = "Yes" if local_count == s3_count else "NO"
        else:
            s3_count = "N/A"
            match_status = "AWS not configured"
            
        rows.append({
            "table": key,
            "local_objects": local_count,
            "s3_objects": s3_count,
            "match": match_status,
        })

    df = pd.DataFrame(rows)
    display(df)

## 4. Query from Snowflake

Run the same count query both **locally** (DuckDB against the Polaris Iceberg REST catalog) and in **Snowflake** to
confirm the data matches end-to-end.

Uses `snowflake-connector-python` with a named connection from
`~/.snowflake/connections.toml` -- no hardcoded credentials needed.

> Requires `$PROJECT_HOME/bin/plf l2c migrate` to have been run.  
<!-- Skill action: l2c-query -->

In [None]:
import pandas as pd

if not has_l2c_state:
    print("\u23ed\ufe0f  Skipping -- L2C not configured yet.")
    print(f"   Run: {project_root}/bin/plf l2c setup")
else:
    sf = state["snowflake"]
    database = sf["database"]
    schema = sf["schema"]

    # Named connection from connections.toml; failures (e.g. browser-only auth) are reported, not raised
    conn, snowflake_available = create_snowflake_connection(cfg, sf)

    rows = []
    if snowflake_available:
        try:
            for key, info in state.get("tables", {}).items():
                if info.get("register", {}).get("status") != "done":
                    continue
                # Use the fully qualified Snowflake table name recorded at register time
                sf_table = info.get("register", {}).get(
                    "sf_table", f"{database}.{schema}.{key.upper()}"
                )

                # Snowflake count
                cur = conn.cursor()
                try:
                    cur.execute(f"SELECT COUNT(*) FROM {sf_table}")
                    sf_count = cur.fetchone()[0]
                finally:
                    cur.close()

                # Local count via DuckDB against the Polaris REST catalog
                ns = info["namespace"]
                tbl = info["table"]
                local_count = get_table_count_via_duckdb(
                    polaris_url, catalog_name, client_id, client_secret, realm, ns, tbl,
                )

                # ⚠️ = local count unavailable, ✅ = counts match, ❌ = counts differ
                if not isinstance(local_count, int):
                    match = "⚠️"
                elif local_count == sf_count:
                    match = "✅"
                else:
                    match = "❌"

                rows.append({
                    "table": f"{ns}.{tbl}",
                    "local_count": local_count,
                    "snowflake_count": sf_count,
                    "match": match,
                })
        finally:
            conn.close()

        df = pd.DataFrame(rows)
        display(df)
    else:
        print("Snowflake connection not available - skipping comparison.")

## 5. Incremental Update

Demonstrate the day-2 workflow:

1. Insert new rows into the **local** Polaris table (left as a placeholder in the cell below, which only records the current local row count)
2. Run `plf l2c update --force` to sync changes to S3 and refresh Snowflake
3. Query Snowflake to confirm the new rows appear

This shows that the L2C bridge supports ongoing development, not just
one-shot migrations.
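Step 3 compares counts right after the update. If the Snowflake count might trail briefly (for instance, while refreshed metadata becomes visible), a bounded poll is a simple guard. Below is a minimal sketch that is not part of `plf`. `wait_for_count` is hypothetical, and `fetch_count` would wrap the `SELECT COUNT(*)` query the cell below runs. `sleep` is injectable so the logic can be exercised without waiting.

```python
import time
from typing import Callable


def wait_for_count(fetch_count: Callable[[], int], expected: int,
                   attempts: int = 5, delay_s: float = 2.0,
                   sleep: Callable[[float], None] = time.sleep) -> tuple[bool, int]:
    """Poll fetch_count() until it returns `expected` or attempts run out.

    Returns (matched, last_seen_count). At most `attempts` fetches are made.
    """
    last = fetch_count()
    for _ in range(attempts - 1):
        if last == expected:
            return True, last
        sleep(delay_s)
        last = fetch_count()
    return last == expected, last
```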

> Requires both local cluster and `$PROJECT_HOME/bin/plf l2c setup` to have been run.  
<!-- Skill action: l2c-update -->

In [None]:
import subprocess

import pandas as pd

if not has_l2c_state or not has_principal:
    missing = []
    if not has_principal:
        missing.append("Polaris credentials (run: task setup:all WORK_DIR=<your-project>)")
    if not has_l2c_state:
        missing.append(f"L2C state (run: {project_root}/bin/plf l2c setup)")
    print(f"\u23ed\ufe0f  Skipping -- missing: {', '.join(missing)}")
else:
    sf = state["snowflake"]
    database = sf["database"]
    schema = sf["schema"]

    # Pick the first registered table for the demo
    demo_key = None
    demo_info = None
    for key, info in state.get("tables", {}).items():
        if info.get("register", {}).get("status") == "done":
            demo_key = key
            demo_info = info
            break

    if not demo_key:
        print(f"No registered tables found. Run: {project_root}/bin/plf l2c migrate")
    else:
        ns = demo_info["namespace"]
        tbl = demo_info["table"]
        # Use the fully qualified Snowflake table name recorded at register time
        sf_table = demo_info.get("register", {}).get("sf_table", f"{database}.{schema}.{demo_key.upper()}")

        # "Before" count via DuckDB (same helper as the Snowflake comparison section)
        before_count = get_table_count_via_duckdb(
            polaris_url, catalog_name, client_id, client_secret, realm, ns, tbl,
        )
        print(f"Current row count in {ns}.{tbl}: {before_count}")
        print("Note: Incremental data insertion requires PyIceberg or direct SQL - skipping for demo")
        print("In a real scenario, you would insert data here, then run l2c update")

        # No rows are inserted in this demo, so the local "after" count equals "before"
        after_local = before_count

        # Sync to cloud (raises CalledProcessError if the CLI fails)
        plf = str(project_root / "bin" / "plf")
        print("\nRunning: plf l2c update --force --yes")
        subprocess.run([plf, "l2c", "update", "--force", "--yes"], cwd=project_root, check=True)

        # Query Snowflake through the shared connection helper
        conn, snowflake_available = create_snowflake_connection(cfg, sf)
        if not snowflake_available:
            print("Snowflake connection not available - skipping verification.")
        else:
            try:
                cur = conn.cursor()
                cur.execute(f"SELECT COUNT(*) FROM {sf_table}")
                sf_count = cur.fetchone()[0]
                cur.close()
            finally:
                conn.close()

            df = pd.DataFrame([{
                "table": sf_table,
                "before": before_count,
                "after_local": after_local,
                "after_snowflake": sf_count,
                "match": "Yes" if after_local == sf_count else "NO",
            }])
            display(df)

## 6. Reset and Reload (Demo Reset)

Drop and recreate the local catalog to regenerate sample data, then clear
the S3 bucket and Snowflake tables. This lets you re-run the full L2C
migration demo **without** tearing down the k3d cluster.
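The reset is an ordered list of `plf` commands, and the S3/Snowflake clear runs only when L2C has been set up. The sketch below separates "what would run" from "run it", so the plan can be reviewed before anything is deleted. The commands are the ones the cell below uses. `build_reset_plan` and `run_plan` are hypothetical helpers.

```python
import shlex
import subprocess


def build_reset_plan(plf: str, has_l2c_state: bool) -> list[list[str]]:
    """Return the plf commands the reset cell runs, in order."""
    plan = [
        [plf, "catalog", "cleanup", "--yes"],
        [plf, "catalog", "setup"],
    ]
    if has_l2c_state:
        # Only clear S3 + Snowflake tables when L2C has been set up
        plan.append([plf, "l2c", "clear", "--yes"])
    return plan


def run_plan(plan: list[list[str]], cwd: str, dry_run: bool = True) -> list[str]:
    """Run commands in order, stopping at the first failure (check=True raises).

    With dry_run=True nothing is executed; the shell-quoted command lines are returned.
    """
    lines = []
    for cmd in plan:
        if not dry_run:
            subprocess.run(cmd, cwd=cwd, check=True)
        lines.append(shlex.join(cmd))
    return lines


for line in run_plan(build_reset_plan("bin/plf", True), cwd="."):
    print(line)
```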

<!-- Skill action: l2c-reset -->

In [None]:
import subprocess

if not has_principal:
    print("⏭️  Skipping -- Polaris credentials not available.")
    print("   Run: task setup:all WORK_DIR=<your-project>")
else:
    plf = str(project_root / "bin" / "plf")
    print(f"Using PLF binary: {plf}")
    print(f"Working directory: {project_root}")
    print()

    def run_plf(*args):
        """Run a plf subcommand in the project root; raises CalledProcessError on failure."""
        return subprocess.run([plf, *args], cwd=project_root, check=True, capture_output=True, text=True)

    try:
        print("=== Step 1: Drop and recreate catalog ===")
        run_plf("catalog", "cleanup", "--yes")
        print("Catalog cleanup completed.")

        run_plf("catalog", "setup")
        print("Catalog recreated with sample data.")
        print()

        if has_l2c_state:
            print("=== Step 2: Clear S3 + Snowflake tables ===")
            run_plf("l2c", "clear", "--yes")
            print("S3 objects and Snowflake tables cleared.")
        else:
            print("L2C not configured -- skipping S3/Snowflake clear.")

        print()
        print("✅ Ready to re-run L2C migration demo.")

    except FileNotFoundError:
        print(f"❌ PLF binary not found: {plf}")
        print("   Run: task setup:all WORK_DIR=<your-project>")
    except subprocess.CalledProcessError as e:
        print(f"❌ Command failed: {e.cmd}")
        print(f"Return code: {e.returncode}")
        if e.stdout:
            print(f"STDOUT: {e.stdout}")
        if e.stderr:
            print(f"STDERR: {e.stderr}")
        print()
        print("💡 Troubleshooting:")
        print(f"   - Check if PLF binary exists: ls -la {plf}")
        print(f"   - Check working directory: ls -la {project_root}")
        print("   - Ensure cluster is running: task status")