# EOEPCA IAM Validation and Usage Notebook

## Setup

In [None]:
import os
import requests
import json
from pathlib import Path

import sys
sys.path.append('../')
from modules.helpers import get_access_token, load_eoepca_state, test_cell, test_results

Load `eoepca state` environment

In [None]:
# Invoke the project helper imported from `modules.helpers`.
# NOTE(review): presumably this populates `os.environ` with the deployment
# settings that the following cells read — confirm against modules/helpers.
load_eoepca_state()

In [None]:
# Credentials for the throwaway user this notebook creates; the defaults apply
# when the corresponding environment variables are not set.
temporary_user = os.getenv("TEMPORARY_USER", "notebook_user")
temporary_user_password = os.getenv("TEMPORARY_USER_PASSWORD", "notebook_user_password")

# Deployment settings read from the process environment (None when unset).
platform_domain = os.getenv("INGRESS_HOST")
iam_realm = os.getenv("REALM")

# Base URL of the IAM service, assembled as <scheme>://<keycloak host>.
iam_domain = f"{os.getenv('HTTP_SCHEME')}://{os.getenv('KEYCLOAK_HOST')}"

print(f"IAM (Keycloak) URL: {iam_domain}")

## Validate Keycloak API Endpoints

In [None]:
# Keycloak endpoints to probe, as (label, URL) pairs.
endpoints = [
    ("Keycloak Home", iam_domain),
    ("OIDC Discovery", f"{iam_domain}/realms/{iam_realm}/.well-known/openid-configuration"),
    ("OAuth2 Authorization", f"{iam_domain}/realms/{iam_realm}/protocol/openid-connect/auth"),
    ("OAuth2 Token", f"{iam_domain}/realms/{iam_realm}/protocol/openid-connect/token"),
    ("Admin Console", f"{iam_domain}/admin/"),
]

# Issue a plain GET against each endpoint and report the HTTP status code.
# A timeout is set because `requests` otherwise waits indefinitely, so a single
# unresponsive endpoint would hang the whole validation cell.
for name, url in endpoints:
    response = requests.get(url, timeout=30)
    print(f"{name} ({url}): {response.status_code}")

## Obtain Admin Access Token

In [None]:
# Admin credentials are taken from the environment.
KEYCLOAK_ADMIN_USER = os.environ.get("KEYCLOAK_ADMIN_USER")
KEYCLOAK_ADMIN_PASSWORD = os.environ.get("KEYCLOAK_ADMIN_PASSWORD")

# Password-grant request against the `master` realm using the `admin-cli` client.
token_url = f"{iam_domain}/realms/master/protocol/openid-connect/token"
payload = {
    "username": KEYCLOAK_ADMIN_USER,
    "password": KEYCLOAK_ADMIN_PASSWORD,
    "grant_type": "password",
    "client_id": "admin-cli",
}

response = requests.post(token_url, data=payload, timeout=30)
# Fail here with the HTTP error (e.g. 401 for bad credentials) instead of
# letting a missing token surface later as "'NoneType' is not subscriptable".
response.raise_for_status()
admin_token = response.json()["access_token"]
# Only a prefix of the token is printed, to confirm retrieval.
print(f"Admin Token Retrieved: {admin_token[:50]}...")

## Create and Validate a Test User

In [None]:
# Admin endpoint for the users of the configured realm.
create_user_url = f"{iam_domain}/admin/realms/{iam_realm}/users"

# The admin token authorises the call; the body is sent as JSON.
headers = {
    "Authorization": f"Bearer {admin_token}",
    "Content-Type": "application/json",
}

# Definition of the temporary user, including a non-temporary password.
user_payload = {
    "username": temporary_user,
    "enabled": True,
    "credentials": [
        {
            "type": "password",
            "value": temporary_user_password,
            "temporary": False,
        },
    ],
    "emailVerified": True,
    "firstName": "Temporary",
    "lastName": "User",
    "email": "temporary@eoepca.org",
}

response = requests.post(create_user_url, headers=headers, json=user_payload)

# Report the outcome: status code first, then the raw response body.
print(f"Create User Status: {response.status_code}")
print(response.text)

## Authenticate as Test User

In [None]:
# Token endpoint of the configured realm. It is built from `iam_realm` rather
# than by replacing "master" with a hard-coded "eoepca" in `token_url`: that
# replacement targeted the wrong realm for any other realm name, would also
# rewrite a hostname containing "master", and silently did nothing whenever
# `token_url` had already been rebound to a realm URL by a later cell.
test_user_token_url = f"{iam_domain}/realms/{iam_realm}/protocol/openid-connect/token"

# Password-grant request for the temporary user via the `admin-cli` client.
test_user_payload = {
    "username": temporary_user,
    "password": temporary_user_password,
    "grant_type": "password",
    "client_id": "admin-cli",
}

response = requests.post(test_user_token_url, data=test_user_payload, timeout=30)
# Surface authentication failures as an HTTP error instead of a later
# "'NoneType' is not subscriptable" when slicing a missing token.
response.raise_for_status()
test_user_token = response.json()["access_token"]
# Only a prefix of the token is printed, to confirm retrieval.
print(f"Test User Token Retrieved: {test_user_token[:50]}...")

## Open Policy Agent

Check that authorisation policies are evaluated correctly by querying the OPA endpoint.

In [None]:
# Base URL of the OPA service: the `opa.` subdomain of the ingress host, using
# the scheme configured in the environment.
OPA_URL = f"{os.getenv('HTTP_SCHEME')}://opa.{os.getenv('INGRESS_HOST')}"

### Obtain session as the test user (`KEYCLOAK_TEST_USER`, e.g. `eoepcauser`)

In [None]:
# Test-user credentials and OPA client credentials, read from the environment.
KEYCLOAK_TEST_USER = os.getenv("KEYCLOAK_TEST_USER")
KEYCLOAK_TEST_PASSWORD = os.getenv("KEYCLOAK_TEST_PASSWORD")
OPA_CLIENT_ID = os.getenv("OPA_CLIENT_ID")
OPA_CLIENT_SECRET = os.getenv("OPA_CLIENT_SECRET")

# Token endpoint of the configured realm (rebinds the notebook-level `token_url`).
token_url = f"{iam_domain}/realms/{iam_realm}/protocol/openid-connect/token"

# Single HTTP session object shared by the OPA requests in this notebook.
_opa_session = requests.Session()

def opa_session():
    """Return the shared session carrying a newly obtained bearer token.

    Each call performs a password-grant request for the test user against
    ``token_url`` using the OPA client credentials, stores the resulting
    access token in the ``Authorization`` header of the module-level
    ``_opa_session``, prints a prefix of the token and returns that session.

    Raises:
        requests.HTTPError: if the token endpoint answers with an error status.
        KeyError: if the token response contains no ``access_token``.
    """
    payload = {
        "username": KEYCLOAK_TEST_USER,
        "password": KEYCLOAK_TEST_PASSWORD,
        "grant_type": "password",
        "client_id": OPA_CLIENT_ID,
        "client_secret": OPA_CLIENT_SECRET,
    }
    # The timeout keeps an unresponsive token endpoint from hanging the cell.
    response = requests.post(token_url, data=payload, timeout=30)
    # Validate before touching the shared session, so a failed login neither
    # stores "Bearer None" nor fails later with an opaque TypeError.
    response.raise_for_status()
    access_token = response.json()["access_token"]
    _opa_session.headers.update({"Authorization": f"Bearer {access_token}"})
    print(f"Access Token Retrieved: {access_token[:50]}...")
    return _opa_session

### Simple `allow all` test query

In [None]:
# Fetch the `example/allow_all` document from OPA through an authenticated
# session; the trailing expression shows the raw response body as cell output.
response = opa_session().get(
    f"{OPA_URL}/v1/data/example/allow_all",
)
response.text

### User `bob` is a privileged user

In [None]:
# Query `example/privileged_user` with an input identity whose
# preferred_username is "bob"; the raw response body is the cell output.
payload = {
    "input": {
        "identity": {
            "attributes": {
                "preferred_username": ["bob"],
            },
        },
    },
}
response = opa_session().post(
    f"{OPA_URL}/v1/data/example/privileged_user",
    json=payload,
)
response.text

### User `larry` is NOT a privileged user

In [None]:
# Query `example/privileged_user` with an input identity whose
# preferred_username is "larry"; the raw response body is the cell output.
payload = {
    "input": {
        "identity": {
            "attributes": {
                "preferred_username": ["larry"],
            },
        },
    },
}
response = opa_session().post(
    f"{OPA_URL}/v1/data/example/privileged_user",
    json=payload,
)
response.text

### User `larry` has a verified email

In [None]:
# Query `example/email_verified` with an input identity for "larry" that
# carries email_verified = "true"; the raw response body is the cell output.
payload = {
    "input": {
        "identity": {
            "attributes": {
                "preferred_username": ["larry"],
                "email_verified": ["true"],
            },
        },
    },
}
response = opa_session().post(
    f"{OPA_URL}/v1/data/example/email_verified",
    json=payload,
)
response.text