In [None]:
import requests
import jwt
from pprint import pprint

# Client Credentials for Machine-to-Machine Authentication

## A: Client-Side

In [None]:
# Identifiers and endpoint used by the client to obtain a token.
CATALOG_CLIENT_ID = "iceberg-catalog"
CLIENT_ID = "iceberg-machine-client"
# NOTE(review): presumably a demo secret for a local Keycloak instance - real
# credentials belong in environment variables or a secrets manager, not in a notebook.
CLIENT_SECRET = "M2HyiuJRJupuOEaCVni2wLDXAf2GZeCi"
TOKEN_URL = "http://keycloak:8080/realms/iceberg/protocol/openid-connect/token"
# Upper bound (seconds) for the token request; without it, requests waits indefinitely.
TOKEN_REQUEST_TIMEOUT_SECONDS = 10

oauth2_properties = {
    "client-id": CLIENT_ID,
    "client-secret": CLIENT_SECRET,
    "token-endpoint": TOKEN_URL,
    "scope": CATALOG_CLIENT_ID,
    "grant-type": "client-credentials",
}

# ---------------- Handled in Iceberg Package ----------------
# "catalog" should be the default scope to preserve current behaviour.
# To enable separation of AuthN and AuthZ, clients should offer a way to not request any scope.
# Here that is done by setting "scope" explicitly to None: a missing key falls back to "catalog".
scope = oauth2_properties.get("scope", "catalog")

required_params = {
    "grant_type": "client_credentials",
    "client_id": oauth2_properties["client-id"],
    "client_secret": oauth2_properties["client-secret"],
}

optional_params = {}
if scope is not None:
    optional_params["scope"] = scope

response = requests.post(
    url=oauth2_properties["token-endpoint"],
    data={**required_params, **optional_params},
    headers={"Content-type": "application/x-www-form-urlencoded"},
    timeout=TOKEN_REQUEST_TIMEOUT_SECONDS,
)
# Fail loudly on any 4xx/5xx answer instead of continuing without a token.
response.raise_for_status()

# Parse the body once and reuse it for display and for extracting the token.
token_payload = response.json()

# The returned token has an "expires_in" field which should be respected.
# Clients should fetch a new token if a previously assigned token is expired.
print("**Response from Token Endpoint**")
pprint(token_payload)
access_token = token_payload["access_token"]

# The signature is deliberately NOT verified here: the claims are decoded only to
# display them. Never base an authorization decision on an unverified decode.
print("\n\n**Token Introspection**")
pprint(jwt.decode(access_token, options={"verify_signature": False}))

## B: Server-Side

In [None]:
# ------------------- Server side -------------------
# Information only needed by the server:
TOKEN_INTROSPECTION_URI = (
    "http://keycloak:8080/realms/iceberg/protocol/openid-connect/token/introspect"
)
CATALOG_AUTH_CHECK_CLIENT_ID = "iceberg-catalog-authenticator"
CATALOG_AUTH_CHECK_CLIENT_SECRET = "UDeCLaqjSisBBcL8h4JctCXcXdP9f0Jo"
# Upper bound (seconds) for the introspection request; without it, requests waits indefinitely.
INTROSPECTION_TIMEOUT_SECONDS = 10

# 1. Validate token - there are different ways to do this including token introspection and local JWT introspection
# We are using token introspection endpoint as an example here, because it allows the use of opaque tokens as well.
# If only jwt tokens are used, local jwks validation is typically more performant but doesn't offer point-in-time logout.
response = requests.post(
    url=TOKEN_INTROSPECTION_URI,
    data={"token": access_token},
    headers={"Content-type": "application/x-www-form-urlencoded"},
    auth=(CATALOG_AUTH_CHECK_CLIENT_ID, CATALOG_AUTH_CHECK_CLIENT_SECRET),
    timeout=INTROSPECTION_TIMEOUT_SECONDS,
)
# Only catches transport/HTTP-level failures; token validity is checked below.
response.raise_for_status()
introspection = response.json()

# An expired, revoked or unknown token is still answered with HTTP 200 and a body
# of {"active": false} (RFC 7662), so the "active" flag must be checked explicitly.
token_is_active = introspection.get("active") is True

# 2. Check Audience / Scope / Resource
# Depending on your specific setup, check if the token is intended for this service by
# checking the scope, audience (aud) or resource field.
# "aud" may be a single string or a list of strings. Normalize it to a list so the
# membership test below is an exact match: a substring test on a plain string would
# accept e.g. "iceberg-catalog-authenticator" as a match for "iceberg-catalog".
audience = introspection.get("aud") or []
if isinstance(audience, str):
    audience = [audience]

if token_is_active and CATALOG_CLIENT_ID in audience:
    # "client_id" and "sub" are optional in an introspection response, hence .get().
    print(f"You may proceed {introspection.get('client_id', '<unknown>')} :)")
    print(f"Subject: {introspection.get('sub', '<unknown>')}")
else:
    print("Unauthenticated!")