# Demo

In [2]:
# These are the required modules

import base64
import datetime
import json
import os
import time
import urllib.parse
import uuid

import requests

Make sure we read in the correct environment values...

In [3]:
env_vars = !cat .env
for var in env_vars:
    key, value = var.split('=')
    os.environ[key] = value

**Zitadel** is the class that connects to my zitadel instance

In [4]:
# https://zitadel.com/docs/apis/resources/mgmt

class Zitadel:
    """Connection settings for a Zitadel instance.

    The settings are read from the environment variables ZITADEL_URL,
    ZITADEL_ID and ZITADEL_TOKEN; a variable that is not set yields None.

    Args:
        debug: stored as-is in ``self.DEBUG``.
    """

    def __init__(self, debug=True):
        # Each setting is exposed as an attribute named after its environment variable.
        for setting in ('ZITADEL_URL', 'ZITADEL_ID', 'ZITADEL_TOKEN'):
            setattr(self, setting, os.environ.get(setting))

        self.DEBUG = debug

**API** is a helper class for API requests to Zitadel

In [5]:
class API(Zitadel):
    """Small HTTP helper that sends requests to Zitadel and returns the decoded JSON.

    HTTP error statuses are NOT turned into exceptions: whatever JSON document
    the server answers with (including an error document) is returned to the caller.
    """

    # Seconds to wait for the server before giving up; without a timeout a
    # stalled connection would block the notebook forever.
    TIMEOUT = 30

    @classmethod
    def basic_auth(cls, username, password):
        """Return the value of an HTTP Basic ``Authorization`` header."""
        token = base64.b64encode(f"{username}:{password}".encode('utf-8')).decode("ascii")
        return f'Basic {token}'

    def process(self, request, method="GET", headers=None, payload=None):
        """Send one HTTP request and return the JSON-decoded response body.

        Args:
            request: absolute URL, or a path that is appended to ``self.ZITADEL_URL``.
            method: HTTP method name.
            headers: extra headers; they override the default ``Authorization``
                (bearer ``self.ZITADEL_TOKEN``) and ``Content-Type`` (JSON) headers.
            payload: request body. It is JSON-encoded when the effective
                Content-Type is ``application/json``; otherwise it is sent as given.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Whatever ``requests`` raises, e.g. on connection problems, on a
            timeout, or when the response body is not valid JSON.
        """
        headers = {
            "Authorization": f"Bearer {self.ZITADEL_TOKEN}",
            "Content-Type": "application/json"
        } | (headers or {})

        # Test against None (not truthiness) so empty JSON payloads such as {}
        # or [] are still serialised instead of being sent as an empty body.
        if payload is not None and headers["Content-Type"] == "application/json":
            payload = json.dumps(payload)

        if not request.startswith("http"):
            request = f"{self.ZITADEL_URL}{request}"

        if self.DEBUG:
            print('='*80)
            print(f"{method} : {request}")

        response = requests.request(
            method, request, headers=headers, data=payload, timeout=self.TIMEOUT
        )
        result = response.json()

        # NOTE: debug output shows the full response, which may include tokens
        # and client secrets; clear the cell output before sharing the notebook.
        if self.DEBUG:
            print('='*80)
            print(json.dumps(result, indent=2))
            print('='*80)

        return result

    def get(self, request, headers=None):
        """Send a GET request; see :meth:`process`."""
        return self.process(request, headers=headers)

    def put(self, request, headers=None, payload=None):
        """Send a PUT request; see :meth:`process`."""
        return self.process(request, method='PUT', headers=headers, payload=payload)

    def post(self, request, headers=None, payload=None):
        """Send a POST request; see :meth:`process`."""
        return self.process(request, method='POST', headers=headers, payload=payload)

    def delete(self, request, headers=None, payload=None):
        """Send a DELETE request; see :meth:`process`."""
        return self.process(request, method='DELETE', headers=headers, payload=payload)


**OIDC** is a helper class to make the relevant OIDC requests

In [6]:
class OIDC(API):
    """Helper for the OIDC endpoints advertised by the Zitadel instance.

    The endpoint URLs are taken from the discovery document that is fetched
    once in the constructor and kept in ``self.config``.
    """

    def __init__(self, debug=True):
        super().__init__(debug)
        self.config = self.get("/.well-known/openid-configuration")

    def _form_post(self, endpoint, fields):
        """POST ``fields`` as a form-urlencoded body to the configured endpoint.

        Every key and value is percent-encoded, so secrets or tokens that
        contain characters such as '&', '+', '=' or '%' reach the server intact.
        """
        return self.post(
            self.config[endpoint],
            headers={
                "Content-Type": "application/x-www-form-urlencoded"
            },
            payload=urllib.parse.urlencode(fields, quote_via=urllib.parse.quote)
        )

    def userinfo(self, access_token):
        """Return the response of the userinfo endpoint for ``access_token``."""
        return self.post(
            self.config["userinfo_endpoint"],
            headers={
                "Authorization": f"Bearer {access_token}"
            }
        )

    def device_code(self, client_id, client_secret, scope):
        """Start a device authorization request and return the server response.

        Args:
            client_id: client identifier of the application.
            client_secret: client secret of the application.
            scope: space separated list of requested scopes.
        """
        return self._form_post(
            "device_authorization_endpoint",
            {
                "client_id": client_id,
                "client_secret": client_secret,
                "scope": scope,
            }
        )

    def token(self, client_id, client_secret, grant_type, **kwargs):
        """Call the token endpoint and return the server response.

        Args:
            client_id: client identifier of the application.
            client_secret: client secret of the application.
            grant_type: value of the ``grant_type`` form field.
            **kwargs: additional form fields, sent after the three fields above.
        """
        return self._form_post(
            "token_endpoint",
            {
                "client_id": client_id,
                "client_secret": client_secret,
                "grant_type": grant_type,
                **kwargs,
            }
        )

    def refresh_token(self, client_id, client_secret, token):
        """Exchange the refresh token ``token`` at the token endpoint; return the response."""
        return self.token(client_id, client_secret, "refresh_token", refresh_token=token)


**RSC** represents [ResearchCloud](https://portal.live.surfresearchcloud.nl/); this is the SURF Virtual Machine orchestrator

In [10]:
class RSC(OIDC):
    """Demo stand-in for ResearchCloud: keeps track of VMs and their bondings.

    A *bonding* links a parent VM to its workers. It consists of a freshly
    created Zitadel project (``op``), an OIDC application inside that project
    (``rp``), the list of worker VMs (``rs``) and the tokens obtained per
    user (``users``).

    Use the instance as a context manager: on exit the project and application
    of every bonding are deleted again.
    """

    # Upper bound (seconds) on how long authenticate() waits for the user to log in.
    AUTH_TIMEOUT = 60

    # Polling interval (seconds) used when the server does not supply one.
    DEFAULT_POLL_INTERVAL = 5

    def __init__(self, debug=False):
        super().__init__(debug)
        self.VMS = {}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Delete the application and project of every bonding, then forget all VMs."""
        try:
            for name, vm in self.VMS.items():
                print(f"[RSC] Shutdown: {name}")

                bonding = vm['bonding']
                if not bonding:
                    continue

                project = f"/management/v1/projects/{bonding['op']['id']}"
                application = f"{project}/apps/{bonding['rp']['appId']}"

                # Best-effort cleanup: one failing delete must not prevent the
                # remaining resources from being removed.
                for path in (application, project):
                    try:
                        self.delete(path)
                    except Exception as error:
                        print(f"[RSC] Cleanup of {path} failed: {error}")
        finally:
            self.VMS = {}

    def register(self, vm):
        """Record ``vm`` under its name, initially without a bonding."""
        self.VMS[vm.name] = { 'vm': vm, 'bonding': None }

    def Register_Workers(self, parent, workers):
        """Create a bonding between ``parent`` and ``workers``.

        A new project and a new OIDC application (device-code and
        refresh-token grants) are created and stored as the bonding of the parent.

        Raises:
            Exception: when the parent or a worker is not a registered VM, or
                when the project or the application could not be created.
        """
        print("Setup bonding...")

        print(f"- Register, parent: {parent.name}")

        if parent.name not in self.VMS:
            raise Exception(f"Parent '{parent.name}' not an existing VM")

        for worker in workers:
            print(f"- Register, worker: {worker.name}")
            if worker.name not in self.VMS:
                raise Exception(f"Worker '{worker.name}' not an existing VM")

        print("- Creating OP...")
        OP = self.post(
            "/management/v1/projects",
            payload={
                "name": str(uuid.uuid4())
            }
        )

        # Error statuses are returned as ordinary JSON documents, so the
        # response must be checked before its fields are used.
        if 'id' not in OP:
            raise Exception(f"Creating OP failed: {OP.get('message', 'unexpected response')}")

        print("- Creating RP...")
        RP = self.post(
            f"/management/v1/projects/{OP['id']}/apps/oidc",
            payload={
              "name": str(uuid.uuid4()),
              "responseTypes": [
                "OIDC_RESPONSE_TYPE_CODE"
              ],
              "grantTypes": [
                "OIDC_GRANT_TYPE_DEVICE_CODE",
                "OIDC_GRANT_TYPE_REFRESH_TOKEN"
              ],
              "appType": "OIDC_APP_TYPE_WEB",
              "authMethodType": "OIDC_AUTH_METHOD_TYPE_BASIC",
              "version": "OIDC_VERSION_1_0",
              "devMode": True,
              "accessTokenType": "OIDC_TOKEN_TYPE_BEARER",
              "accessTokenRoleAssertion": True,
              "idTokenRoleAssertion": True,
              "idTokenUserinfoAssertion": True,
              "clockSkew": "1s",
              "skipNativeAppSuccessPage": True
            }
        )

        # These three fields are needed later for authentication and cleanup.
        if any(field not in RP for field in ('appId', 'clientId', 'clientSecret')):
            # The project is not recorded in a bonding yet, so __exit__ would
            # never remove it; delete it here to avoid leaving it behind.
            self.delete(f"/management/v1/projects/{OP['id']}")
            raise Exception(f"Creating RP failed: {RP.get('message', 'unexpected response')}")

        print("- Creating Bonding...")

        self.VMS[parent.name]['bonding'] = {
            'op': OP,
            'rp': RP,
            'rs': workers,
            'users': {}
        }

    def AccessToken(self, worker, parent, user=None):
        """Return an access token that ``worker`` may use on behalf of a user.

        Without ``user`` an interactive device-code authentication is started.
        With ``user`` (the ``sub`` of an earlier authenticated user) the stored
        refresh token is exchanged for a fresh access token.

        Raises:
            Exception: when parent/worker are unknown, no bonding exists, the
                worker is not part of the bonding, the user has no stored
                refresh token, or the refresh request is rejected.
        """
        if parent.name not in self.VMS:
            raise Exception(f"Parent VM '{parent.name}' not an existing VM")

        if worker.name not in self.VMS:
            raise Exception(f"Worker VM '{worker.name}' not an existing VM")

        bonding = self.VMS[parent.name]['bonding']
        if not bonding:
            raise Exception(f"No bonding found for parent: {parent.name}")

        if worker not in bonding['rs']:
            raise Exception(f"worker: {worker.name} not a registered worker for: {parent.name}")

        if not user:
            return self.authenticate(bonding)

        print(f"[{parent.name}] Refreshing token...")

        current = bonding['users'].get(user)
        if not current or 'refresh_token' not in current:
            raise Exception(f"No refresh token available for user: {user}")

        refreshed = self.refresh_token(
            bonding['rp']['clientId'],
            bonding['rp']['clientSecret'],
            current['refresh_token']
        )

        # Only replace the stored tokens when the refresh succeeded; otherwise
        # an error response would wipe out the still usable refresh token.
        if 'access_token' not in refreshed:
            raise Exception(
                f"Refreshing token failed for user: {user} ({refreshed.get('error', 'unknown error')})"
            )

        # Keep the previous refresh token when the response does not carry a new one.
        refreshed.setdefault('refresh_token', current['refresh_token'])
        bonding['users'][user] = refreshed

        return refreshed['access_token']

    def delegate(self, parent, worker):
        """Let ``worker`` run its job on behalf of ``parent``.

        Raises:
            Exception: when parent or worker is not a registered VM.
        """
        print(f"Delegating from {parent.name} to {worker.name}")

        if parent.name not in self.VMS:
            raise Exception(f"Parent '{parent.name}' not an existing VM")
        if worker.name not in self.VMS:
            raise Exception(f"Child '{worker.name}' not an existing VM")

        print(f"[{parent.name}] Delegating to: {worker.name}")

        self.VMS[worker.name]['vm'].job(parent)

    def authenticate(self, bonding):
        """Run the device-code flow for the bonding's application.

        Prints the verification URL, then polls the token endpoint until the
        user has logged in. The resulting token set is stored in
        ``bonding['users']`` under the user's ``sub``.

        Returns:
            The access token of the authenticated user.

        Raises:
            Exception: when the user denies the request, the device code
                expires, or no login happened within the waiting period.
        """
        print(f"Authenticating client: {bonding['rp']['clientId']}")

        device_code = self.device_code(
            bonding['rp']['clientId'],
            bonding['rp']['clientSecret'],
            'openid profile offline_access'
        )

        print(f"Authenticate here: {device_code['verification_uri_complete']}")

        # Wait at most AUTH_TIMEOUT seconds, and never beyond the lifetime the
        # server granted to the device code.
        lifetime = min(
            float(device_code.get('expires_in', self.AUTH_TIMEOUT)),
            self.AUTH_TIMEOUT
        )
        interval = device_code.get('interval', self.DEFAULT_POLL_INTERVAL)

        # A monotonic clock is unaffected by wall-clock adjustments.
        deadline = time.monotonic() + lifetime

        print("start waiting...")

        while time.monotonic() < deadline:
            time.sleep(interval)

            token = self.token(
               bonding['rp']['clientId'],
               bonding['rp']['clientSecret'],
               "urn:ietf:params:oauth:grant-type:device_code",
               device_code=device_code['device_code']
            )

            if 'access_token' in token:
                userinfo = self.userinfo(token['access_token'])

                user = userinfo['sub']

                bonding['users'][user] = token

                return token['access_token']

            error = token.get('error')
            if error == 'slow_down':
                # RFC 8628: the client must increase its polling interval by 5 seconds.
                interval += 5
            elif error in ('access_denied', 'expired_token'):
                # Terminal errors: further polling can never succeed.
                raise Exception(f"Authentication failed: {error}")

        raise Exception("Time out during Authentication...")


**VM** represents the VM that is created via ResearchCloud (RSC)

In [11]:
class VM:
    """A virtual machine deployed through ResearchCloud (RSC).

    Creating a VM registers it with the given ``rsc`` object. A VM can
    delegate work to another VM, or carry out a job on behalf of a parent VM.
    """

    def __init__(self, rsc, name):
        self.rsc, self.name = rsc, name

        # Make this VM known to the orchestrator before announcing it.
        rsc.register(self)

        print(f"VM {name} is deployed")

    def delegate(self, worker):
        """Ask the orchestrator to delegate work from this VM to ``worker``."""
        self.rsc.delegate(self, worker)

    def job(self, parent):
        """Run the demo job on behalf of ``parent``.

        An access token is requested via the orchestrator first. Then, in two
        rounds, the token is used to look up the user and is refreshed for
        that user afterwards.
        """
        orchestrator = self.rsc
        tag = f"[{self.name}]"

        print(f"{tag} Starting job, called from: {parent.name}")
        print(f"{tag} I need an Access Token from: {parent.name}, asking him...")

        current_token = orchestrator.AccessToken(self, parent)

        rounds_left = 2
        while rounds_left > 0:
            rounds_left -= 1

            identity = orchestrator.userinfo(current_token)

            print(f"{tag} Working on behalf of: {identity['preferred_username']}")
            print(f"{tag} To guarantee valid access token, i ask {parent.name} to refresh it...")

            current_token = orchestrator.AccessToken(self, parent, identity['sub'])



This is the start for running the demo.

**Tip:** Run it once with **debug=False**, when that runs OK, then run it again with **debug=True** to see what actually goes on...

In [12]:
# Legend:
# RSC = ResearchCloud
# ABC = A VM that initiates a (background) job on behalf of an authenticated user on worker node DEF
# DEF = The worker node

# Leaving the `with` block removes everything the bonding created in Zitadel.
with RSC(debug=True) as rsc:
    try:
        # Deploy the parent VM and the worker VM.
        ABC = VM(rsc, 'ABC')
        DEF = VM(rsc, 'DEF')

        # Bond the worker to the parent, then let the parent hand over a job.
        rsc.Register_Workers(ABC, [DEF])

        ABC.delegate(DEF)

    except Exception as error:
        # Report the problem but still let the context manager clean up.
        print(f"Exception: {error}")

GET : https://kodden-instance-mnfyca.zitadel.cloud/.well-known/openid-configuration
{
  "issuer": "https://kodden-instance-mnfyca.zitadel.cloud",
  "authorization_endpoint": "https://kodden-instance-mnfyca.zitadel.cloud/oauth/v2/authorize",
  "token_endpoint": "https://kodden-instance-mnfyca.zitadel.cloud/oauth/v2/token",
  "introspection_endpoint": "https://kodden-instance-mnfyca.zitadel.cloud/oauth/v2/introspect",
  "userinfo_endpoint": "https://kodden-instance-mnfyca.zitadel.cloud/oidc/v1/userinfo",
  "revocation_endpoint": "https://kodden-instance-mnfyca.zitadel.cloud/oauth/v2/revoke",
  "end_session_endpoint": "https://kodden-instance-mnfyca.zitadel.cloud/oidc/v1/end_session",
  "device_authorization_endpoint": "https://kodden-instance-mnfyca.zitadel.cloud/oauth/v2/device_authorization",
  "jwks_uri": "https://kodden-instance-mnfyca.zitadel.cloud/oauth/v2/keys",
  "scopes_supported": [
    "openid",
    "profile",
    "email",
    "phone",
    "address",
    "offline_access"
  ],
