<div style="text-align: right;">
  <img src="https://raw.githubusercontent.com/exasol/ai-lab/refs/heads/main/assets/Exasol_Logo_2025_Dark.svg" style="width:200px; margin: 10px;" />
</div>

# GPU Resource Considerations

In this tutorial, we will learn how to manage access to GPUs from UDFs.

## Open Secure Configuration Storage


In [None]:
import os
%run ../utils/access_store_ui.ipynb
display(get_access_store_ui('../'))

### Prerequisites
- Run the [S3 VS tutorial](../cloud/02_s3_vs_reuters.ipynb) in order to populate the input data
- Run the [Advanced UDF with GPU Support](./advanced_udf_with_gpu.ipynb) in order to populate the ML models

### Instantiate ScriptLanguageContainer

The following cell creates an instance of the `ScriptLanguageContainer` class from the notebook-connector,
which enables us to use the Script Language Container (SLC).

In [None]:
from exasol.nb_connector.slc import ScriptLanguageContainer
slc = ScriptLanguageContainer(secrets=ai_lab_config, name="gpu_slc")

### Import Python modules

In [None]:
from exasol.nb_connector.connections import open_pyexasol_connection
from exasol.nb_connector.language_container_activation import open_pyexasol_connection_with_lang_definitions
import textwrap
import pandas as pd

### Check for Database compatibility

This notebook works correctly only on Exasol Database version 2025.2.0 or later.

In [None]:
from packaging.version import Version
with open_pyexasol_connection(ai_lab_config, compression=True, schema=ai_lab_config.db_schema) as conn:
    res = conn.execute("SELECT PARAM_VALUE FROM EXA_METADATA WHERE PARAM_NAME='databaseProductVersion';").fetchall()
dbVersion = Version(res[0][0])
if dbVersion < Version("2025.2.0"):
    popup_message(f"This tutorial requires Exasol 2025.2.0 or later, but the database reports version {dbVersion}. The queries in this notebook will not show the correct results.")

### How GPU resources are managed

In each select/sub-select statement, only a single UDF call is allowed to use all available GPUs exclusively. The GPU resource management reserves resources individually for each UDF call.

When multiple queries, each containing a single GPU-accelerated UDF call, are executed concurrently, these UDF calls are serialized: each call waits until the other calls have released the accelerator resources and exclusive access is possible.

Consequently, multiple GPU-accelerated UDF calls cannot run simultaneously within a single select/sub-select statement. In that case, all UDF calls except one either fail or fall back to CPU usage, depending on their configuration.
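
The serialization of concurrent UDF calls can be pictured with a small local simulation. This is a hypothetical toy model, not how Exasol implements its resource management: a `threading.Lock` stands in for the exclusive GPU reservation, and `ExclusiveGpu` and `run_concurrent_queries` are illustrative names.

```python
import threading
import time


class ExclusiveGpu:
    """Hypothetical toy model: one UDF call at a time holds all GPUs exclusively."""

    def __init__(self):
        self._lock = threading.Lock()
        self.log = []  # entries: (call name, start time, end time)

    def run_udf_call(self, name, work_seconds):
        # Wait until no other UDF call holds the GPUs, then "compute" exclusively.
        with self._lock:
            start = time.monotonic()
            time.sleep(work_seconds)  # stand-in for the GPU workload
            end = time.monotonic()
            self.log.append((name, start, end))


def run_concurrent_queries(n_queries=2, work_seconds=0.2):
    """Start one thread per simulated query and return the log sorted by start time."""
    gpu = ExclusiveGpu()
    threads = [
        threading.Thread(target=gpu.run_udf_call, args=(f"query-{i}", work_seconds))
        for i in range(1, n_queries + 1)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sorted(gpu.log, key=lambda entry: entry[1])


log = run_concurrent_queries()
for name, start, end in log:
    print(f"{name}: {start:.2f}s -> {end:.2f}s")
```

In this model, each simulated call starts only after the previous one has ended, which mirrors the behavior the following cells demonstrate against the database.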

To simulate two simultaneous SQL queries, we first create a long-running UDF:

In [None]:
sql = f"""
CREATE OR REPLACE {slc.language_alias} SCALAR SCRIPT
long_running_gpu(id INTEGER)
EMITS (ID INT, START_TIME VARCHAR(1000), END_TIME VARCHAR(1000), NVIDIA_VISIBLE_DEVICES VARCHAR(10000)) AS
 %perInstanceRequiredAcceleratorDevices GpuNvidia|None;

import datetime
import os
import time

def timestamp():
    # Wall-clock time of day, e.g. "10:15:03.123456"
    return str(datetime.datetime.now().time())

def run(ctx):
    start_timestamp = timestamp()
    time.sleep(5)  # simulate a long-running GPU workload
    nvidia_vis_devices = os.getenv('NVIDIA_VISIBLE_DEVICES', "<invalid>")
    end_timestamp = timestamp()
    ctx.emit(ctx.id, start_timestamp, end_timestamp, nvidia_vis_devices)
/
"""

with open_pyexasol_connection_with_lang_definitions(ai_lab_config, compression=True, schema=ai_lab_config.db_schema) as connection:
    connection.execute(sql)

Now we execute this UDF from two separate pyexasol connections, which run in parallel:

In [None]:
import multiprocessing

def run_long_running_udf(id):
    with open_pyexasol_connection_with_lang_definitions(ai_lab_config, schema=ai_lab_config.db_schema, compression=True) as local_conn:
        sql = f"SELECT long_running_gpu({id});"
        result = local_conn.execute(sql)
        df = pd.DataFrame(result.fetchall(), columns=result.column_names())
        return df

with multiprocessing.Pool(processes=2) as pool:
    results = pool.map(run_long_running_udf, [1, 2])

res = pd.concat(results, ignore_index=True)
res.sort_values(by='START_TIME')


As the START_TIME and END_TIME columns above show, the second UDF call started only after the first one had finished: the UDF calls were executed sequentially.
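
To check this programmatically instead of by eye, you can test whether any two time intervals overlap. A minimal sketch, assuming the timestamps are time-of-day strings as produced by `str(datetime.time)` and that all calls ran on the same day; `calls_were_serialized` is a hypothetical helper and the example values are made up.

```python
import datetime


def calls_were_serialized(intervals):
    """Return True if no two (start, end) intervals overlap.

    Each interval holds time-of-day strings as produced by str(datetime.time),
    e.g. "10:15:03.123456". Assumes all calls ran on the same day.
    """
    parsed = sorted(
        (datetime.time.fromisoformat(start), datetime.time.fromisoformat(end))
        for start, end in intervals
    )
    # Every call must start at or after the end of the previous call.
    return all(
        previous_end <= next_start
        for (_, previous_end), (next_start, _) in zip(parsed, parsed[1:])
    )


example = [("10:00:00.000000", "10:00:05.001000"), ("10:00:05.002000", "10:00:10.003000")]
print(calls_were_serialized(example))  # True
```

With the DataFrame from the previous cell, `calls_were_serialized(res[["START_TIME", "END_TIME"]].itertuples(index=False))` would perform the same check.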

### Multiple GPU-accelerated UDF calls in a single query

Using more than one GPU-accelerated UDF call in a single query either raises an error or makes the additional calls fall back to CPU usage.

Let's define a UDF which requires a GPU and then use the UDF twice in the same query.

In [None]:
sql = textwrap.dedent(f"""
CREATE OR REPLACE {slc.language_alias} SCALAR SCRIPT
gpu_required()
RETURNS VARCHAR(10000) AS
 %perInstanceRequiredAcceleratorDevices GpuNvidia;

import os

def run(ctx):
    return os.getenv('NVIDIA_VISIBLE_DEVICES', "<invalid>")
/
""")
with open_pyexasol_connection_with_lang_definitions(ai_lab_config, schema=ai_lab_config.db_schema, compression=True) as conn:
    conn.execute(sql)

try:
    with open_pyexasol_connection_with_lang_definitions(ai_lab_config, schema=ai_lab_config.db_schema, compression=True) as conn:
        stmt = conn.execute("SELECT gpu_required(), gpu_required();")
except Exception as e:
    print(e)


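The same problem can occur when a query contains only one UDF that requires a GPU alongside other UDFs with CPU fallback: the execution order is unpredictable, so a fallback-capable UDF may claim the GPU first, and the UDF that requires the GPU then fails.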
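
The effect of an unpredictable execution order can be sketched with a simple model. It is hypothetical: it assumes that the first UDF call to start claims the GPU, that later calls declared with `GpuNvidia|None` fall back to the CPU, and that later calls declared with `GpuNvidia` fail. `run_statement` is an illustrative name, not an Exasol API.

```python
from itertools import permutations


def run_statement(calls):
    """Hypothetical model of GPU assignment within ONE statement.

    `calls` lists (udf_name, supports_cpu_fallback) in the order the database
    happens to start them. The first call gets exclusive GPU access; later calls
    fall back to the CPU if allowed, otherwise the whole statement fails.
    """
    assignment = {}
    gpu_taken = False
    for name, supports_fallback in calls:
        if not gpu_taken:
            assignment[name] = "GPU"
            gpu_taken = True
        elif supports_fallback:
            assignment[name] = "CPU"
        else:
            raise RuntimeError(f"{name} requires a GPU, but another UDF call already holds it")
    return assignment


# One UDF that requires a GPU and one with CPU fallback, tried in every start order.
udfs = [("gpu_required", False), ("gpu_optional", True)]
outcomes = {}
for order in permutations(udfs):
    label = " -> ".join(name for name, _ in order)
    try:
        outcomes[label] = run_statement(order)
    except RuntimeError as error:
        outcomes[label] = f"error: {error}"

for label, outcome in outcomes.items():
    print(f"{label}: {outcome}")

# Four fallback-capable calls: exactly one of them gets the GPU in this model.
all_optional = run_statement([(f"gpu_optional_{i}", True) for i in range(1, 5)])
print(all_optional)
```

Only one of the two start orders succeeds, which is why a query mixing required and optional GPU UDFs may fail intermittently.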

In [None]:
sql = textwrap.dedent(f"""
CREATE OR REPLACE {slc.language_alias} SCALAR SCRIPT
gpu_optional()
RETURNS VARCHAR(10000) AS
 %perInstanceRequiredAcceleratorDevices GpuNvidia|None;

import os

def run(ctx):
    return os.getenv('NVIDIA_VISIBLE_DEVICES', "<invalid>")
/
""")

with open_pyexasol_connection_with_lang_definitions(ai_lab_config, schema=ai_lab_config.db_schema, compression=True) as conn:
    conn.execute(sql)

try:
    with open_pyexasol_connection_with_lang_definitions(ai_lab_config, schema=ai_lab_config.db_schema, compression=True) as conn:
        stmt = conn.execute("SELECT gpu_required(), gpu_optional();")
except Exception as e:
    print(e)

If your query contains only UDFs that support CPU fallback, one of the UDF calls is assigned the GPU accelerator and the others fall back to the CPU. However, it is not possible to predict which one gets the GPU.
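
Since you cannot predict which call gets the GPU, you can determine it afterwards from the returned `NVIDIA_VISIBLE_DEVICES` values. A minimal sketch, assuming that a call that fell back to the CPU returns either the UDF's default `<invalid>` (variable not set) or one of the values the NVIDIA Container Toolkit uses for "no GPU" (empty, `none`, `void`); `calls_with_gpu` is a hypothetical helper and the example row is made up.

```python
# Values treated as "no GPU assigned" (assumption, see above).
NO_GPU_VALUES = {"<invalid>", "", "none", "void"}


def calls_with_gpu(row):
    """Return the column names whose NVIDIA_VISIBLE_DEVICES value points to a GPU."""
    return [
        column
        for column, value in row.items()
        if value is not None and value.strip().lower() not in NO_GPU_VALUES
    ]


# Illustrative row in the shape of the result above (values are made up).
example_row = {"UDF1": "<invalid>", "UDF2": "0", "UDF3": "<invalid>", "UDF4": "<invalid>"}
print(calls_with_gpu(example_row))  # ['UDF2']
```

In the notebook, `calls_with_gpu(res.iloc[0].to_dict())` would apply it to the DataFrame returned by the next cell.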

In [None]:
with open_pyexasol_connection_with_lang_definitions(ai_lab_config, schema=ai_lab_config.db_schema, compression=True) as conn:
    res = conn.export_to_pandas("SELECT gpu_optional() as udf1, gpu_optional() as udf2, gpu_optional() as udf3, gpu_optional() as udf4;")
res

### GPU usage across UDF instances

If there are multiple UDF instances of a single UDF call within the same query, all those instances have access to all (GPU) accelerators. To reduce the risk of running out of GPU memory, we recommend that you use UDF instance limiting, so that the instances do not compete for the same accelerators, and that you configure the instance limit based on the number of accelerators and your use case.

#### Scalar UDFs
For scalar UDFs, there is no mechanism to assign a UDF instance to a specific GPU. Hence, we recommend using scalar UDFs only to read GPU device information, not to do any processing on the GPUs. A good example is running the `nvidia-smi` tool in a scalar UDF.

In [None]:
sql = textwrap.dedent(f"""
CREATE OR REPLACE {slc.language_alias} SCALAR SCRIPT
gpu_nvidia_smi()
RETURNS VARCHAR(10000) AS
 %perInstanceRequiredAcceleratorDevices GpuNvidia;

import subprocess

def run(ctx):
    # List the GPUs visible to this UDF instance
    result = subprocess.run(["nvidia-smi"], capture_output=True, text=True)
    if result.returncode != 0:
        raise Exception(f"nvidia-smi returned non-zero exit code {{result.returncode}}: '{{result.stderr}}'")
    return result.stdout

/
""")
with open_pyexasol_connection_with_lang_definitions(ai_lab_config, schema=ai_lab_config.db_schema, compression=True) as conn:
    conn.execute(sql)

with open_pyexasol_connection_with_lang_definitions(ai_lab_config, schema=ai_lab_config.db_schema, compression=True) as conn:
    res = conn.execute("SELECT gpu_nvidia_smi();").fetchall()
print(res[0][0])
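
If you need the GPU information in a structured form rather than as the human-readable table, `nvidia-smi` can print selected fields as CSV using `--query-gpu=...` together with `--format=csv,noheader,nounits`. A minimal sketch of parsing such output in Python; the field selection and the sample string are illustrative, not output captured from this setup, and `GpuInfo` and `parse_gpu_query` are hypothetical names.

```python
import csv
import io
from dataclasses import dataclass

# Command a UDF could run instead of plain "nvidia-smi" (fields chosen for illustration).
NVIDIA_SMI_QUERY = [
    "nvidia-smi",
    "--query-gpu=index,name,memory.total",
    "--format=csv,noheader,nounits",
]


@dataclass
class GpuInfo:
    index: int
    name: str
    memory_total_mib: int


def parse_gpu_query(output):
    """Parse the CSV lines produced by NVIDIA_SMI_QUERY into GpuInfo objects."""
    gpus = []
    for row in csv.reader(io.StringIO(output), skipinitialspace=True):
        if not row:
            continue  # skip blank lines
        index, name, memory = row
        gpus.append(GpuInfo(int(index), name, int(memory)))
    return gpus


sample = "0, NVIDIA A10G, 23028\n1, NVIDIA A10G, 23028\n"
print(parse_gpu_query(sample))
```

Inside a scalar UDF like `gpu_nvidia_smi`, you could run `NVIDIA_SMI_QUERY` with `subprocess.run(..., capture_output=True, text=True)` and return or emit the parsed fields.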

#### Set UDFs

For set UDFs, it is possible to create one group per GPU and pass the GPU id to the UDF as a parameter.

For this purpose we modify the example from the [Advanced UDF with GPU Support](./advanced_udf_with_gpu.ipynb) notebook.

First, we need the number of available GPUs. This can be achieved by reading the `EXA_METADATA` parameter `acceleratorDeviceGpuNvidiaCount`.

In [None]:
with open_pyexasol_connection_with_lang_definitions(ai_lab_config, schema=ai_lab_config.db_schema, compression=True) as conn:
    nAvailableGPU = conn.execute("SELECT PARAM_VALUE FROM EXA_METADATA WHERE PARAM_NAME='acceleratorDeviceGpuNvidiaCount'").fetchall()[0][0]
nAvailableGPU
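
`PARAM_VALUE` in `EXA_METADATA` is a string, so `nAvailableGPU` above holds text such as `'2'`. That works when it is interpolated into SQL, but converting and checking it once avoids surprises, for example when no GPU is configured. A minimal sketch; `parse_gpu_count` is a hypothetical helper.

```python
def parse_gpu_count(param_value):
    """Convert the acceleratorDeviceGpuNvidiaCount PARAM_VALUE into a positive int."""
    if param_value is None:
        raise ValueError("acceleratorDeviceGpuNvidiaCount is not set in EXA_METADATA")
    count = int(str(param_value).strip())
    if count < 1:
        # Without a GPU, neither the GPU ids nor the instance limit make sense.
        raise ValueError(f"no NVIDIA GPU available (count={count})")
    return count


print(parse_gpu_count("2"))  # 2
```

In the notebook, `nAvailableGPU = parse_gpu_count(nAvailableGPU)` would apply it to the value read above.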

Now we need to modify the `SEMANTIC_JOIN` UDF and add a GPU id parameter:
```
CREATE OR REPLACE {slc.language_alias} SET SCRIPT "SEMANTIC_JOIN"(gpu_id INTEGER, text1 VARCHAR(2000000), text2 VARCHAR(2000000))
```

Also, we set the UDF instance limit to the number of available GPUs to prevent different UDF instances from accessing the same GPU in parallel.
```
%perNodeAndCallInstanceLimit {nAvailableGPU};
```

Finally, we take the GPU id passed to the UDF and use it when we instantiate the PyTorch device:
```
gpu_id = df["gpu_id"][0] #Just take the first one. The caller needs to ensure that column gpu_id has only identical values
device = torch.device(f"cuda:{{gpu_id}}")
```
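
Taking the first value relies on the caller to send only one GPU id per group. If you prefer the UDF to verify that contract, a stricter variant could look like the following minimal sketch. `single_gpu_id` is a hypothetical helper; when you paste it into the f-string that defines the UDF, the braces of its own f-string must be doubled.

```python
def single_gpu_id(gpu_ids):
    """Return the single GPU id shared by all rows of a batch; raise if they differ."""
    unique_ids = {int(gpu_id) for gpu_id in gpu_ids}
    if len(unique_ids) != 1:
        raise ValueError(f"expected exactly one GPU id per group, got {sorted(unique_ids)}")
    return unique_ids.pop()


print(single_gpu_id([1, 1, 1]))  # 1
```

In the UDF, `gpu_id = single_gpu_id(df["gpu_id"])` would replace the line that takes the first value.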

In [None]:
sql_semantic_join = f"""
--/
CREATE OR REPLACE {slc.language_alias} SET SCRIPT "SEMANTIC_JOIN"(gpu_id INTEGER, text1 VARCHAR(2000000), text2 VARCHAR(2000000))
EMITS(gpu_id INTEGER, text1 VARCHAR(2000000), text2 VARCHAR(2000000), similarity_score DOUBLE) AS
%perInstanceRequiredAcceleratorDevices GpuNvidia;
%perNodeAndCallInstanceLimit {nAvailableGPU};

import pandas as pd
from transformers import AutoTokenizer, AutoModel, Pipeline, pipeline
import torch
import exasol.bucketfs as bfs
from pathlib import Path
from exasol_transformers_extension.utils import device_management
from exasol.python_extension_common.connections.bucketfs_location import (
    create_bucketfs_location_from_conn_object)
from exasol_transformers_extension.utils.bucketfs_model_specification import (
    BucketFSModelSpecification)
from exasol_transformers_extension.utils.huggingface_hub_bucketfs_model_transfer_sp import (
    HuggingFaceHubBucketFSModelTransferSP)
from exasol_transformers_extension.utils.load_local_model import LoadLocalModel


def get_bucketfs_location(exa, bucketfs_conn_name: str) -> bfs.path.PathLike:
    return create_bucketfs_location_from_conn_object(
        exa.get_connection(bucketfs_conn_name))

def load_transformers_pipline(exa,
                              bucketfs_conn_name: str,
                              sub_dir: str,
                              device: str,
                              task_type: str,
                              model_name: str,
                              model_factory,
                              tokenizer_factory=AutoTokenizer) -> Pipeline:
    model_loader = LoadLocalModel(pipeline,
                                  base_model_factory=model_factory,
                                  tokenizer_factory=tokenizer_factory,  # type: ignore
                                  task_type=task_type,
                                  device=device)    # type: ignore

    model_spec = BucketFSModelSpecification(model_name, task_type, bucketfs_conn_name,
                                            Path(sub_dir))

    bucketfs_location = get_bucketfs_location(exa, bucketfs_conn_name)

    model_loader.clear_device_memory()
    model_loader.set_current_model_specification(model_spec)
    model_loader.set_bucketfs_model_cache_dir(bucketfs_location)
    return model_loader.load_models()

# Function to calculate embeddings
def batch_get_embeddings(batch_texts, model_pipeline, device):
    # Tokenize the batch of texts
    inputs = model_pipeline.tokenizer(batch_texts, padding=True, truncation=True, return_tensors='pt').to(device)  # Send to GPU
    # Get embeddings from the model
    with torch.no_grad():
        outputs = model_pipeline.model(**inputs)
    # Use the outputs pooler_output or last_hidden_state to get the embedding
    return outputs.pooler_output if 'pooler_output' in outputs else outputs.last_hidden_state.mean(dim=1)

# Function to compute row-by-row cosine similarity using PyTorch
def row_by_row_cosine_similarity_gpu(embeddings1, embeddings2):
    # Normalized embeddings can be calculated separately
    normalized_embeddings1 = embeddings1 / embeddings1.norm(dim=1)[:, None]
    normalized_embeddings2 = embeddings2 / embeddings2.norm(dim=1)[:, None]

    # Compute pair-wise cosine similarity for each corresponding pair
    similarities = (normalized_embeddings1 * normalized_embeddings2).sum(dim=1)
    return similarities


def get_model_pipeline(device):
    return load_transformers_pipline(
        exa,
        bucketfs_conn_name='{ai_lab_config.bfs_connection_name}',
        sub_dir='{ai_lab_config.bfs_model_subdir}',
        device=device,
        model_name="sentence-transformers/all-mpnet-base-v2",
        task_type='feature-extraction',
        model_factory=AutoModel)

def run(ctx):
    
    # Batch processing configurations
    batch_size = 100  # Adjust this based on your memory capacity
    model_pipeline = None
    while True:
        df = ctx.get_dataframe(batch_size)
        if df is None:
            break

        data1 = df["text1"]
        data2 = df["text2"]
        gpu_id = df["gpu_id"][0] #Just take the first one. The caller needs to ensure that column gpu_id has only identical values
        device = torch.device(f"cuda:{{gpu_id}}")
        if model_pipeline is None:
            model_pipeline = get_model_pipeline(device)

        # Compute embeddings for each dataset in batches
        embeddings1 = batch_get_embeddings(data1.to_list(), model_pipeline, device)
        embeddings2 = batch_get_embeddings(data2.to_list(), model_pipeline, device)

        # Calculate row-by-row cosine similarity using PyTorch on GPU
        similarity_scores = row_by_row_cosine_similarity_gpu(embeddings1, embeddings2)

        # Attach the scores to the batch; the similarity threshold is applied later in the SQL query
        df["SIMILARITY_SCORE"] = similarity_scores.cpu().numpy()
        ctx.emit(df)
/
"""
with open_pyexasol_connection_with_lang_definitions(ai_lab_config, schema=ai_lab_config.db_schema, compression=True) as conn:
    conn.execute(sql_semantic_join)

Next, we modify the SQL query from the semantic join example and group the data by GPU id and Exasol node.
For this purpose, we assign each row an integer between 0 and the number of available GPUs minus one with:
```
MOD(ROW_NUMBER() OVER (), {nAvailableGPU}) AS GPU_ID
```
Then we add this integer value to the UDF call:
```
"SEMANTIC_JOIN"(GPU_ID, TITLE, TOPIC)
```
The data we pass to the UDF then looks like this:

In [None]:
sql = textwrap.dedent(f"""
SELECT 
    MOD(ROW_NUMBER() OVER (), {nAvailableGPU}) AS GPU_ID, TOPIC, TITLE
FROM (
    SELECT 
        distinct TOPIC, TITLE
    FROM (
        SELECT distinct title as TITLE FROM NEWS WHERE TITLE IS NOT NULL AND BODY IS NOT NULL
    )
    CROSS JOIN (
        SELECT distinct topic AS TOPIC FROM NEWS WHERE topic IS NOT NULL
    )
)
""")
with open_pyexasol_connection_with_lang_definitions(ai_lab_config, schema=ai_lab_config.db_schema, compression=True) as conn:
    res = conn.export_to_pandas(sql)
res
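
`ROW_NUMBER()` starts at 1, so `MOD(ROW_NUMBER() OVER (), n)` cycles through the GPU ids 1, 2, ..., n-1, 0, and the number of rows per GPU id differs by at most one. Because `OVER ()` has no `ORDER BY`, which row gets which id is not deterministic, only the balance is. A minimal Python sketch of the same assignment; `assign_gpu_ids` is a hypothetical name, and Python's `%` matches `MOD` here because all values are non-negative.

```python
from collections import Counter


def assign_gpu_ids(n_rows, n_gpus):
    """Mimic MOD(ROW_NUMBER() OVER (), n_gpus); ROW_NUMBER() starts at 1."""
    return [row_number % n_gpus for row_number in range(1, n_rows + 1)]


gpu_ids = assign_gpu_ids(n_rows=5, n_gpus=2)
print(gpu_ids)           # [1, 0, 1, 0, 1]
print(Counter(gpu_ids))  # rows per GPU id
```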

Finally, we call the UDF to calculate the semantic join, distributed safely across multiple GPUs and nodes:

In [None]:
sql_semantic_join = textwrap.dedent(f"""
SELECT *
FROM (
    SELECT
        TEXT1 AS TITLE, TEXT2 AS TOPIC,
        RANK() OVER (PARTITION BY TEXT1 ORDER BY SIMILARITY_SCORE DESC) as r,
        SIMILARITY_SCORE
    FROM (
        SELECT
            "SEMANTIC_JOIN"(GPU_ID, TITLE, TOPIC)
        FROM (
            SELECT 
                MOD(ROW_NUMBER() OVER (), {nAvailableGPU}) AS GPU_ID, TOPIC, TITLE
            FROM (
                SELECT 
                    distinct TOPIC, TITLE
                FROM (
                    SELECT distinct title as TITLE FROM NEWS WHERE TITLE IS NOT NULL AND BODY IS NOT NULL
                )
                CROSS JOIN (
                    SELECT distinct topic AS TOPIC FROM NEWS WHERE topic IS NOT NULL
                )
            )
        ) GROUP BY iproc(), GPU_ID
    )
    WHERE SIMILARITY_SCORE > 0.5
)
WHERE r<=5
ORDER BY TITLE, r ASC;
""")
with open_pyexasol_connection_with_lang_definitions(ai_lab_config, schema=ai_lab_config.db_schema, compression=True) as conn:
    res = conn.export_to_pandas(sql_semantic_join)
res
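
The outer part of the query filters by score, ranks the topics per title and keeps the top five. Note that the `WHERE SIMILARITY_SCORE > 0.5` condition sits in the same `SELECT` as `RANK()`, so it removes rows before they are ranked. A pure-Python sketch of these semantics on `(title, topic, score)` tuples can help when checking the result on a small sample; `top_matches` is a hypothetical helper and the example rows are made up.

```python
from collections import defaultdict


def top_matches(rows, threshold=0.5, k=5):
    """Mimic the outer query on (title, topic, score) tuples.

    1. Keep rows with score > threshold (applied before the window function).
    2. RANK() OVER (PARTITION BY title ORDER BY score DESC): ties share a rank.
    3. Keep rank <= k and order by title, rank.
    """
    by_title = defaultdict(list)
    for title, topic, score in rows:
        if score > threshold:
            by_title[title].append((topic, score))

    result = []
    for title in sorted(by_title):
        candidates = by_title[title]
        for topic, score in sorted(candidates, key=lambda c: c[1], reverse=True):
            # RANK = 1 + number of rows in the partition with a strictly higher score
            rank = 1 + sum(1 for _, other in candidates if other > score)
            if rank <= k:
                result.append((title, topic, rank, score))
    return result


example_rows = [("t1", "a", 0.9), ("t1", "b", 0.7), ("t1", "c", 0.7), ("t1", "d", 0.4), ("t2", "a", 0.6)]
for match in top_matches(example_rows, k=2):
    print(match)
```

Because tied scores share a rank, a title can yield more than `k` rows, exactly as with `RANK()` in SQL.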

The following is a more generic approach that lets you assign UDF instances to GPUs on multiple nodes.