# User Acceptance Test (UAT) for Pod Security Standards
This notebook verifies that the Kubernetes **Pod Security Standards (PSS)** are being correctly enforced on the current namespace. The test attempts to deploy a Pod (`privileged_pod.yaml`) that **violates** the `Restricted` and `Baseline` PSS profiles (e.g., by requesting privileged mode).

The expected outcome is determined dynamically by checking the `POD_SECURITY_STANDARDS_POLICY` environment variable:
- If the policy is **`privileged`**, the test expects the Pod creation to **succeed**.
- If the policy is **`baseline`**, the test expects the Pod creation to **fail** with a **`403 Forbidden`** error. 

In [None]:
!pip install -r requirements.txt

In [None]:
import os

# Resolve the policy under test in one step: a non-empty value of the
# POD_SECURITY_STANDARDS_POLICY environment variable wins; an unset or empty
# variable falls back to "privileged".
# The fallback corresponds to the default value of the "security-policy" option in
# `kubeflow-profiles-operator`.
# See: https://github.com/canonical/kubeflow-profiles-operator/blob/f820fb36eec8f75e32137e4d5d3da7df58107223/config.yaml#L15
POD_SECURITY_STANDARDS_POLICY = os.environ.get("POD_SECURITY_STANDARDS_POLICY") or "privileged"

In [None]:
import os

from lightkube import Client
from lightkube.resources.core_v1 import Namespace, Pod
from lightkube.core.exceptions import ApiError
import yaml

# Manifest of the Pod that the test tries to create.
YAML_FILE = "privileged_pod.yaml"
# File from which the namespace under test is read (the Kubernetes service account
# mount location inside a Pod).
NAMESPACE_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

# Only the file read is guarded; the success message is printed once NAMESPACE is set.
try:
    with open(NAMESPACE_PATH, "r") as f:
        NAMESPACE = f.read().strip()
except FileNotFoundError as e:
    # Name the missing path and keep the original error as the cause so the failure
    # is diagnosable (e.g. when the notebook is not running inside a Pod).
    raise AssertionError(f"Namespace file not found at '{NAMESPACE_PATH}'.") from e

print(f"\nTesting PSS enforcement in namespace: **{NAMESPACE}**")


def test_pss_restricted_violation(client: Client, expect_success: bool):
    """
    Attempt to create the Pod described in YAML_FILE and assert the outcome.

    Args:
        client: lightkube client used to create (and afterwards delete) the Pod.
        expect_success: True when the Pod is expected to be admitted; False when
            the create call is expected to be rejected with a 403 Forbidden error.

    Raises:
        AssertionError: if the manifest cannot be loaded or converted, if the
            create call's outcome does not match ``expect_success``, or if a
            non-API error occurs while creating the Pod.

    Any Pod that was actually created is deleted before returning, including a Pod
    that was admitted although a rejection was expected.
    """

    try:
        with open(YAML_FILE, "r") as f:
            pod_manifest_dict = yaml.safe_load(f)
        pod_model = Pod.from_dict(pod_manifest_dict)
        pod_name = pod_model.metadata.name  # Store the name for deletion
    except FileNotFoundError as e:
        raise AssertionError(f"Error: File '{YAML_FILE}' not found.") from e
    except Exception as e:
        raise AssertionError(f"Error loading or converting YAML file: {e}") from e
    print(f"Successfully loaded manifest from {YAML_FILE}")

    print(f"Attempting to create Pod '{pod_name}'...")

    # Cleanup is driven by what actually happened, not by what was expected.
    pod_created = False
    try:
        client.create(pod_model, namespace=NAMESPACE)

    except ApiError as e:
        # Execution reaches here ONLY if the API call failed
        if expect_success:
            raise AssertionError(
                f"\n❌ ASSERTION FAILED: Expected Pod creation success, but failed with status **{e.status}**."
            ) from e

        if e.status.code != 403:
            # Got an error, but it wasn't the expected 403
            raise AssertionError(
                f"\n❌ ASSERTION FAILED: Expected 403 Forbidden, but received unexpected status **{e.status}**."
            ) from e

        error_body = e.response.json()
        # Guard against a missing or null "message" field in the error body.
        message = error_body.get("message") or ""
        # Keep only the human-readable reason that follows "forbidden: ".
        details = (
            message.split("forbidden: ")[-1].split("code=403")[0].strip().replace("\\n", " ")
        )

        print("\n✅ **ASSERTION PASSED**")
        print("Pod creation failed with the expected **403 Forbidden** error.")
        print("\n--- PSA Rejection Details ---")
        print(details)

    except Exception as e:
        raise AssertionError(
            f"\n❌ ASSERTION FAILED: An unexpected non-API error occurred: {type(e).__name__}: {e}"
        ) from e

    else:
        # The `else` branch is outside the reach of the `except` clauses above, so
        # this assertion is reported as-is instead of being re-wrapped as a
        # "non-API error".
        pod_created = True
        if not expect_success:
            raise AssertionError(
                "\n❌ ASSERTION FAILED: Pod creation succeeded unexpectedly! PSS is NOT enforcing the policy."
            )
        print("\n✅ **ASSERTION PASSED**")
        print("Pod created successfully as expected.")

    finally:
        if pod_created and pod_name:
            # Best-effort deletion: failures are reported as warnings so they do
            # not mask the test result.
            print(f"\n--- CLEANUP: Deleting Pod '{pod_name}' ---")
            try:
                client.delete(Pod, name=pod_name, namespace=NAMESPACE)
                print(f"✅ Pod '{pod_name}' deleted successfully.")
            except ApiError as e:
                if e.status.code != 404:
                    print(f"⚠️ Warning: Failed to delete Pod {pod_name}. Status: {e.status}")
            except Exception as e:
                print(f"⚠️ Warning: Failed to delete Pod {pod_name}. Error: {e}")
        elif not pod_created and not expect_success:
            print("\n--- CLEANUP: No Pod was created (expected failure) ---")


# Build the lightkube client; any failure here is reported as a test failure.
try:
    client = Client()
except Exception as e:
    raise AssertionError(f"Could not create lightkube client: {e}") from e

# This header was previously indented under the `except` above, after the `raise`,
# so it never ran. The expectation is decided below from the configured policy.
print("\n--- Running Test: Pod Security Standards enforcement ---")
try:
    # Map the configured policy to the expected outcome of creating the Pod.
    if POD_SECURITY_STANDARDS_POLICY == "baseline":
        print("Policy is **baseline**. The privileged action is expected to FAIL.")
        expect_success = False
    elif POD_SECURITY_STANDARDS_POLICY == "privileged":
        print("Policy is **privileged**. The privileged action is expected to SUCCEED.")
        expect_success = True
    else:
        raise AssertionError("Value for security policy is missing or unknown.")
    test_pss_restricted_violation(client, expect_success=expect_success)
except AssertionError as e:
    print("\n--- TEST FAILED ---")
    print(e)
    # Bare `raise` re-raises the active exception with its original traceback.
    raise
except Exception as e:
    print("\n--- FATAL ERROR ---")
    print(f"An unhandled critical error occurred: {e}")
    raise