In [None]:
import io
import json
import re
from datetime import datetime, timezone

from databricks.sdk import WorkspaceClient


def strip_scheme(host_or_url: str) -> str:
    """Return *host_or_url* without a leading ``http://`` / ``https://`` scheme.

    Surrounding whitespace is ignored, the scheme is matched
    case-insensitively and only at the start of the value, and leading or
    trailing ``/`` characters are removed from the result. A value without
    a scheme (a bare host or app name) is returned otherwise unchanged.
    """
    # Anchor the match so only a real scheme prefix is removed, never an
    # "http://" that happens to appear later in the string.
    without_scheme = re.sub(r"^https?://", "", host_or_url.strip(), flags=re.IGNORECASE)
    return without_scheme.strip("/")


# Notebook parameters. Each widget is declared with its default value and
# read back below (api_path is read further down, where it is validated).
dbutils.widgets.text("app_url", "<app-name> OR <full-app-host-or-url>")
dbutils.widgets.text("scope", "")
dbutils.widgets.text("api_path", "/api/sample")
dbutils.widgets.text("expected_header_text", "Hello World")
dbutils.widgets.text("expected_api_message", "Hello from API sample")
dbutils.widgets.text("artifacts_volume_path", "")

raw_app_url = dbutils.widgets.get("app_url").strip()
scope = dbutils.widgets.get("scope").strip()
expected_header_text = dbutils.widgets.get("expected_header_text").strip()
expected_api_message = dbutils.widgets.get("expected_api_message").strip()
artifacts_volume_path = dbutils.widgets.get("artifacts_volume_path").strip()

# The scope widget defaults to an empty string; fail fast with a clear
# message instead of passing an empty scope to the secrets lookup.
if not scope:
    raise ValueError(
        "scope is required: set the 'scope' widget to the secret scope "
        "that holds 'client_id' and 'client_secret'"
    )

client_id = dbutils.secrets.get(scope=scope, key="client_id")
client_secret = dbutils.secrets.get(scope=scope, key="client_secret")

# Get workspace host FIRST - needed for proper OAuth token generation
workspace_host = spark.conf.get("spark.databricks.workspaceUrl")

# Create WorkspaceClient with explicit host for M2M OAuth flow
workspace = WorkspaceClient(
    host=f"https://{workspace_host}",
    client_id=client_id,
    client_secret=client_secret,
)

# Request headers produced by the SDK's authentication; the Authorization
# entry (if present) is reused later for the browser session.
auth_headers = workspace.config.authenticate()

def resolve_app_host(app_url: str, workspace_host: str) -> str:
    """Turn the ``app_url`` parameter into the app's host name.

    A value containing a dot (after scheme stripping) is treated as an
    already complete host and returned as is. A dot-free value is treated
    as a bare app name, and the host is derived from the workspace host,
    which must look like ``adb-<workspace id>.<shard>.azuredatabricks.net``.

    Raises:
        ValueError: if ``app_url`` is empty, or if a bare app name was given
            and the workspace host does not match the expected pattern.
    """
    candidate = strip_scheme(app_url)
    if not candidate:
        raise ValueError("app_url is required")

    if "." not in candidate:
        # Bare app name: build the host from the workspace id and shard.
        bare_workspace_host = strip_scheme(workspace_host)
        parsed = re.fullmatch(r"adb-(\d+)\.(\d+)\.azuredatabricks\.net", bare_workspace_host)
        if parsed is None:
            raise ValueError(f"Unsupported workspace host format: {bare_workspace_host}")
        workspace_id = parsed.group(1)
        shard = parsed.group(2)
        return f"{candidate}-{workspace_id}.{shard}.azure.databricksapps.com"

    return candidate

# Normalise the API path to exactly one leading slash and require the
# "/api/" prefix.
raw_api_path = dbutils.widgets.get("api_path").strip()
api_path = "/" + raw_api_path.lstrip("/")
if not api_path.startswith("/api/"):
    raise ValueError(f"api_path must start with '/api/': {api_path}")

app_host = resolve_app_host(raw_app_url, workspace_host)

base_url = f"https://{app_host}".rstrip("/")
api_url = f"{base_url}{api_path}"

# Consume the listing before announcing success, so the message is only
# printed once the request has actually gone through (the SDK listing may
# be evaluated lazily).
items = list(workspace.workspace.list(path="/"))
print("Authentication successful. Workspace root contents:")
for item in items:
    print(f"- {item.path} ({item.object_type})")

print("\nTest targets:")
print(f"  Landing page: {base_url}/")
print(f"  API endpoint: {api_url}")

# Only the Authorization header is forwarded to the browser session.
browser_headers = {}
if "Authorization" in auth_headers:
    browser_headers["Authorization"] = auth_headers["Authorization"]

# Artifact output setup
artifacts_enabled = False
run_output_dir = None
run_id = "manual"
run_timestamp = datetime.now(timezone.utc)

if artifacts_volume_path:
    if not artifacts_volume_path.startswith("/Volumes/"):
        raise ValueError(f"artifacts_volume_path must start with '/Volumes/': {artifacts_volume_path}")

    # Drop trailing slashes so the joined output path never contains "//".
    artifacts_volume_path = artifacts_volume_path.rstrip("/")

    # Verify volume path exists using workspace.files API. Fetching the
    # first entry is enough to trigger the request; the whole directory
    # does not need to be listed.
    try:
        next(iter(workspace.files.list_directory_contents(artifacts_volume_path)), None)
    except Exception as e:
        raise ValueError(
            f"Volume path does not exist or is not accessible: {artifacts_volume_path}. Error: {e}"
        ) from e

    # Get run_id from job context, fallback to "manual"
    try:
        run_id = spark.conf.get("spark.databricks.job.runId", "manual")
    except Exception:
        run_id = "manual"

    # Build timestamp-tagged output directory
    timestamp_folder = run_timestamp.strftime("%Y-%m-%d_%H-%M-%S") + f"-run-{run_id}"
    run_output_dir = f"{artifacts_volume_path}/{timestamp_folder}"

    # Create directory using workspace.files API
    workspace.files.create_directory(run_output_dir)

    artifacts_enabled = True
    print(f"\nArtifacts will be saved to: {run_output_dir}")
else:
    print("\nArtifact output disabled (artifacts_volume_path not set)")

In [None]:
import asyncio

from playwright.async_api import Error as PlaywrightError
from playwright.async_api import async_playwright

# Retry policy used when navigating: number of attempts and the pause
# (in seconds) between them.
MAX_RETRIES = 3
RETRY_DELAY_SECONDS = 5

# Per-test outcome, filled in as the run progresses and written to the
# summary. Every test starts as "pending" with no PDF recorded.
test_results = {
    test_name: {"status": "pending", "pdf": None}
    for test_name in ("landing_page", "api_endpoint")
}


async def capture_pdf(page, filename: str) -> str | None:
    """Render *page* as an A4 PDF and upload it into the run output directory.

    Returns *filename* once the upload has gone through. Returns ``None``
    when artifact output is disabled, or when rendering/uploading fails;
    a failure is reported as a printed warning and never raised.
    """
    if not (artifacts_enabled and run_output_dir):
        return None

    try:
        # Render in memory, then push the bytes through the SDK files API.
        rendered = await page.pdf(format="A4", print_background=True)
        destination = f"{run_output_dir}/{filename}"
        workspace.files.upload(destination, io.BytesIO(rendered), overwrite=True)
        print(f"  Saved PDF: {filename}")
    except Exception as exc:
        print(f"  Warning: Failed to capture PDF {filename}: {exc}")
        return None

    return filename


def write_summary():
    """Upload ``summary.json`` describing this run to the output directory.

    Does nothing when artifact output is disabled. The overall status is
    "pass" only if every entry in ``test_results`` has status "pass".
    """
    if not (artifacts_enabled and run_output_dir):
        return

    every_test_passed = all(
        outcome["status"] == "pass" for outcome in test_results.values()
    )

    report = {
        "run_id": run_id,
        "timestamp": run_timestamp.isoformat(),
        "base_url": base_url,
        "api_url": api_url,
        "tests": test_results,
        "overall_status": "pass" if every_test_passed else "fail",
    }

    # Serialise to UTF-8 JSON and upload through the SDK files API.
    summary_path = f"{run_output_dir}/summary.json"
    encoded_report = json.dumps(report, indent=2).encode("utf-8")
    workspace.files.upload(summary_path, io.BytesIO(encoded_report), overwrite=True)

    print(f"\nSummary written to: {summary_path}")


async def navigate_with_retry(page, url: str, description: str):
    """Navigate to URL with retry logic for transient errors."""
    for attempt in range(1, MAX_RETRIES + 1):
        response = await page.goto(url, wait_until="domcontentloaded")

        if response is None:
            raise RuntimeError(f"{description}: Navigation completed without an HTTP response.")

        if response.status in {502, 503, 504}:
            if attempt < MAX_RETRIES:
                print(f"{description}: Attempt {attempt}/{MAX_RETRIES}: HTTP {response.status}, retrying in {RETRY_DELAY_SECONDS}s...")
                await asyncio.sleep(RETRY_DELAY_SECONDS)
                continue
            else:
                raise AssertionError(
                    f"{description}: Service unavailable after {MAX_RETRIES} attempts. "
                    f"Last error: HTTP {response.status}. The app may not be running."
                )
        else:
            return response

    return response


def _require_http_ok(response, description: str, url: str) -> None:
    """Raise AssertionError unless *response* has HTTP status 200."""
    if response.status in {401, 403}:
        raise AssertionError(
            f"{description}: Authentication failed with HTTP {response.status}. "
            "Check scope and client secrets."
        )
    if response.status != 200:
        raise AssertionError(
            f"{description}: Expected HTTP 200 from {url}, got {response.status} {response.status_text}"
        )


async def run_playwright_test():
    """Check the landing page and the API endpoint in headless Chromium.

    Test 1 loads ``base_url`` and compares the ``<h1>`` text with
    ``expected_header_text``. Test 2 loads ``api_url`` and checks the JSON
    payload's ``status``, ``message`` and ``path``. Outcomes are recorded in
    ``test_results``; PDFs and the summary are written via ``capture_pdf``
    and ``write_summary``.

    Raises:
        RuntimeError: if Chromium cannot be launched.
        AssertionError: if a check fails. Any exception raised during the
            tests is re-raised after pending tests are marked as failed.
    """
    async with async_playwright() as p:
        browser = None
        page = None
        try:
            # --no-sandbox is required when running Chromium in a container as root
            browser = await p.chromium.launch(args=["--no-sandbox"])
        except PlaywrightError as exc:
            raise RuntimeError(f"Chromium launch failed: {exc}") from exc

        try:
            context = await browser.new_context()
            page = await context.new_page()

            # Intercept all requests and add Authorization header dynamically
            # This ensures the header is sent even on cross-origin redirects
            # NOTE(review): the "**/*" pattern means the bearer token is
            # attached to every request the page issues, including any to
            # other origins - confirm that this is intended.
            async def add_auth_header(route):
                headers = {**route.request.headers, **browser_headers}
                await route.continue_(headers=headers)

            await page.route("**/*", add_auth_header)

            # ========== Test 1: Landing Page ==========
            print("Testing landing page...")
            response = await navigate_with_retry(page, base_url, "Landing page")
            _require_http_ok(response, "Landing page", base_url)

            h1_text = await page.locator("h1").text_content()
            if h1_text is None:
                raise AssertionError("Landing page: Expected an <h1> element but none was found.")

            h1_text = h1_text.strip()
            # Explicit raise rather than `assert`, so the check still runs
            # when Python is started with -O.
            if h1_text != expected_header_text:
                raise AssertionError(
                    f"Landing page: Expected h1={expected_header_text!r}, got {h1_text!r}"
                )
            print(f"  OK: Found h1 = {h1_text!r}")

            # Capture landing page PDF
            pdf_file = await capture_pdf(page, "landing-page.pdf")
            test_results["landing_page"]["status"] = "pass"
            test_results["landing_page"]["pdf"] = pdf_file

            # ========== Test 2: API Endpoint ==========
            print(f"Testing API endpoint {api_path}...")
            response = await navigate_with_retry(page, api_url, "API endpoint")
            _require_http_ok(response, "API endpoint", api_url)

            content_type = response.headers.get("content-type", "")
            if "application/json" not in content_type.lower():
                raise AssertionError(
                    f"API endpoint: Expected JSON response content-type, got {content_type!r}"
                )

            payload = await response.json()
            # A JSON array or scalar has no .get(); report it as a test
            # failure instead of an AttributeError.
            if not isinstance(payload, dict):
                raise AssertionError(
                    f"API endpoint: Expected a JSON object, got {type(payload).__name__}"
                )
            if payload.get("status") != "ok":
                raise AssertionError(
                    f"API endpoint: Expected payload status='ok', got {payload.get('status')!r}"
                )
            if payload.get("message") != expected_api_message:
                raise AssertionError(
                    f"API endpoint: Expected message={expected_api_message!r}, got {payload.get('message')!r}"
                )
            if payload.get("path") != api_path:
                raise AssertionError(
                    f"API endpoint: Expected path={api_path!r}, got {payload.get('path')!r}"
                )
            print(f"  OK: API returned status='ok', message={payload.get('message')!r}")

            # Capture API endpoint PDF
            pdf_file = await capture_pdf(page, "api-endpoint.pdf")
            test_results["api_endpoint"]["status"] = "pass"
            test_results["api_endpoint"]["pdf"] = pdf_file

        except Exception:
            # Every test that has not passed yet is recorded as failed; this
            # includes a test that was never reached.
            for result in test_results.values():
                if result["status"] == "pending":
                    result["status"] = "fail"

            # Try to capture the page state at the time of failure.
            if page:
                try:
                    await capture_pdf(page, "failure.pdf")
                except Exception:
                    pass  # Best effort; the original failure is re-raised below

            raise
        finally:
            # Write summary before closing browser; the nested finally makes
            # sure the browser is closed even if writing the summary fails.
            try:
                write_summary()
            finally:
                if browser:
                    await browser.close()

# Run both browser checks. Top-level `await` is not valid in a plain Python
# module. NOTE(review): presumably this relies on the notebook cell being
# executed inside a running event loop - confirm.
await run_playwright_test()
# Only reached when run_playwright_test() returned without raising.
print("\nAll tests passed!")