# Demonstration of Dask requesting GPU resources

GPU resources are currently available for STACK services at the Central site and at the EUMETSAT bridge.
The GPU configuration differs between Central and EUMETSAT.

STACK services at **Central** can currently make use of the following configuration:
- GPU RTX A6000 cards (no MIG support)
- One GPU node providing 1/4 of an RTX A6000 GPU (reported by `nvidia-smi` as `RTXA6000-12Q`)
- A second GPU node with GPU time-slicing, which provides *2* slices


For STACK services at the **EUMETSAT** bridge, the following configuration is available:
- One GPU (H100) node with MIG (Multi-Instance GPU).
- This GPU offers 7 GPU slices; accordingly, up to 7 workers (pods) can be started using this MIG profile.

In [1]:
from dask_gateway.auth import GatewayAuth
from getpass import getpass
import requests
import jwt
from jwt import PyJWKClient
from datetime import datetime

class DEDLAuth(GatewayAuth):
    """Dask Gateway authenticator that obtains bearer tokens from the DEDL identity service.

    Constructing an instance prompts for the user's password, exchanges the
    credentials for a token set and decodes the access token. Before each
    gateway request the access token's expiry is checked and, if it has
    passed, the token set is renewed with the stored refresh token.
    """

    # Seconds to wait for the identity service before giving up, so a hung
    # connection cannot block the notebook indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, username):
        """Authenticate *username* interactively and store the resulting tokens.

        Args:
            username: DEDL account name used for the password grant.

        Raises:
            requests.HTTPError: if the identity service rejects the login.
        """
        super().__init__()
        self.username = username
        self.client_id = "dedl-stack-public-client"
        self.token_url = "https://identity.data.destination-earth.eu/auth/realms/dedl/protocol/openid-connect/token"
        self.cert_url = "https://identity.data.destination-earth.eu/auth/realms/dedl/protocol/openid-connect/certs"
        self.token = self.get_token()
        self.access_token_decoded = self.decode_access_token()

    def _request_token(self, payload):
        """POST *payload* to the token endpoint and return the parsed JSON body.

        Raises:
            requests.HTTPError: if the endpoint answers with an error status,
                instead of letting an error document pass for a token set.
        """
        response = requests.post(
            self.token_url, data=payload, timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.json()

    def get_token(self):
        """Prompt for the password and request a token set via the password grant.

        Returns:
            The JSON response of the token endpoint as a dict.
        """
        payload = {
            "grant_type": "password",
            "client_id": self.client_id,
            "username": self.username,
            "password": getpass(prompt="Your DEDL Password:")
        }
        return self._request_token(payload)

    def decode_access_token(self):
        """Verify the current access token's signature and return its claims.

        The signing key is looked up from the identity service's certificate
        endpoint; the token is validated for audience ``account`` with RS256.
        """
        jwks_client = PyJWKClient(self.cert_url)
        signing_key = jwks_client.get_signing_key_from_jwt(self.token["access_token"])
        return jwt.decode(self.token["access_token"], signing_key.key, audience="account", algorithms=["RS256"])

    def token_expired(self):
        """Return True when the access token's ``exp`` claim lies in the past."""
        return datetime.now() > datetime.fromtimestamp(self.access_token_decoded["exp"])

    def refresh_token_exchange(self):
        """Request a new token set using the stored refresh token.

        Returns:
            The JSON response of the token endpoint as a dict.
        """
        payload = {
            "grant_type": "refresh_token",
            # Use the configured client id rather than repeating the literal.
            "client_id": self.client_id,
            "refresh_token": self.token["refresh_token"],
        }
        return self._request_token(payload)

    def refresh(self):
        """Replace the stored token set with a refreshed one and re-decode it."""
        self.token = self.refresh_token_exchange()
        self.access_token_decoded = self.decode_access_token()

    def pre_request(self, resp):
        """Build the authorization header for a gateway request.

        Refreshes the token first if it has expired. ``resp`` is not used.

        Returns:
            A tuple ``(headers, None)`` where ``headers`` carries the bearer token.
        """
        if self.token_expired():
            self.refresh()
        headers = {"Authorization": "Bearer " + self.token["access_token"]}
        return headers, None

In [2]:
from dask_gateway import Gateway
from distributed import Client

In [3]:
# Read the DEDL account name from standard input, then build the authenticator
# (its constructor asks for the password and fetches the first token).
username = input()
authenticator = DEDLAuth(username)

 christoph.reimer@eodc.eu
Your DEDL Password: ········


## Dask Gateway - Staging Central

In [4]:
# Connect to the Dask Gateway of the Central staging site using the DEDL authenticator.
dgw_central = Gateway(
    address="http://217.71.193.55.nip.io",
    proxy_address="tcp://217.71.193.55:80",
    auth=authenticator,
)

In [5]:
# Fetch the cluster options offered by the Central gateway and display them.
options = dgw_central.cluster_options()
options

VBox(children=(HTML(value='<h2>Cluster Options</h2>'), GridBox(children=(HTML(value="<p style='font-weight: bo…

Options<worker_cores=1,
        worker_memory=2.0,
        image='ghcr.io/eodcgmbh/container-images/dedl-dask:2023.08.3',
        gpu_enabled=False>


In [6]:
# Start a new Dask cluster on the Central gateway with the options held in `options`.
cluster_central = dgw_central.new_cluster(options)

In [7]:
# Display the cluster widget.
cluster_central

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [9]:
# Scale the Central cluster to one worker.
cluster_central.scale(1)

In [8]:
# Obtain a client connected to the Central cluster.
client_central = cluster_central.get_client()


+---------+--------+-----------+---------+
| Package | Client | Scheduler | Workers |
+---------+--------+-----------+---------+
| numpy   | 1.26.4 | 1.25.2    | 1.25.2  |
| pandas  | 2.2.3  | 2.0.3     | 2.0.3   |
+---------+--------+-----------+---------+


In [9]:
import subprocess

# Define a function that runs a shell command
def run_command(cmd):
    """Run *cmd* through the shell and return its exit status and captured text output.

    Returns a dict with the keys ``returncode``, ``stdout`` and ``stderr``.
    """
    completed = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    return dict(
        returncode=completed.returncode,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )

# Execute `nvidia-smi -L` on every worker; `results` is keyed by worker address.
results = client_central.run(run_command, "nvidia-smi -L")

# Print one report per worker: exit status, then captured stdout and stderr.
for worker in results:
    output = results[worker]
    print(
        f"\nWorker: {worker}",
        f"Return code: {output['returncode']}",
        f"STDOUT:\n{output['stdout']}",
        f"STDERR:\n{output['stderr']}",
        sep="\n",
    )


Worker: tls://10.100.11.13:37033
Return code: 0
STDOUT:
GPU 0: NVIDIA RTXA6000-12Q (UUID: GPU-9b477c69-77c0-11f0-bf5e-abaf983175e8)

STDERR:



In [10]:
# Shut down the Central cluster once the demonstration is finished.
cluster_central.shutdown()

## Dask Gateway - Staging EUMETSAT

In [4]:
# Connect to the Dask Gateway of the EUMETSAT staging bridge using the DEDL authenticator.
dgw_eum = Gateway(
    address="http://194.153.73.129.nip.io",
    proxy_address="tcp://194.153.73.129:80",
    auth=authenticator,
)

RAPIDS image for testing RAPIDS MIG support: `ghcr.io/eodcgmbh/container-images/eodc-rapids-dask:2025.9.1`

In [5]:
# Fetch the cluster options offered by the EUMETSAT gateway and display them.
cluster_options = dgw_eum.cluster_options()
cluster_options

VBox(children=(HTML(value='<h2>Cluster Options</h2>'), GridBox(children=(HTML(value="<p style='font-weight: bo…

Options<worker_cores=1,
        worker_memory=2.0,
        image='ghcr.io/eodcgmbh/container-images/dedl-dask:2023.08.3',
        gpu_enabled=False>


In [20]:
# Start a new Dask cluster on the EUMETSAT gateway with the options held in `cluster_options`.
cluster_eum = dgw_eum.new_cluster(cluster_options)

In [21]:
# Display the cluster widget.
cluster_eum

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [14]:
# Scale the EUMETSAT cluster to one worker.
cluster_eum.scale(1)

In [22]:
# Obtain a client connected to the EUMETSAT cluster.
client_eum = cluster_eum.get_client()


+---------+--------+-----------+---------+
| Package | Client | Scheduler | Workers |
+---------+--------+-----------+---------+
| numpy   | 1.26.4 | 1.25.2    | 1.25.2  |
| pandas  | 2.2.3  | 2.0.3     | 2.0.3   |
+---------+--------+-----------+---------+


Run nvidia-smi command on all workers to see GPU availability on each worker node.

In [23]:
import subprocess

# Define a function that runs a shell command
def run_command(cmd):
    """Run *cmd* through the shell and return its exit status and captured text output.

    Returns a dict with the keys ``returncode``, ``stdout`` and ``stderr``.
    """
    completed = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    return dict(
        returncode=completed.returncode,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )

# Execute `nvidia-smi -L` on every worker and keep only the MIG device identifiers;
# `results` is keyed by worker address.
results = client_eum.run(run_command, "nvidia-smi -L | grep -oP 'MIG-[0-9a-f-]+'")
# Alternative queries (disabled):
#results = client_eum.run(run_command, "nvidia-smi --query-gpu=gpu_uuid --format=csv")
#results = client_eum.run(run_command, "nvidia-smi --query=mig_uuid --format=csv")

# Print one report per worker: exit status, then captured stdout and stderr.
for worker in results:
    output = results[worker]
    print(
        f"\nWorker: {worker}",
        f"Return code: {output['returncode']}",
        f"STDOUT:\n{output['stdout']}",
        f"STDERR:\n{output['stderr']}",
        sep="\n",
    )


Worker: tls://10.100.18.135:34555
Return code: 0
STDOUT:
MIG-a5064957-71b0-52d6-abff-1f93683d2997

STDERR:


Worker: tls://10.100.18.69:46527
Return code: 0
STDOUT:
MIG-7e533c01-a08b-58d5-b593-f963114abe91

STDERR:



In [24]:
# Shut down the EUMETSAT cluster once the demonstration is finished.
cluster_eum.shutdown()