# AIS-ETL Examples

This notebook shows how to initialize ETLs using the AIStore SDK.

For details, see the [ETL documentation](https://github.com/NVIDIA/aistore/blob/main/docs/etl.md).

Sample transformers are available at [ais-etl/transformers](https://github.com/NVIDIA/ais-etl/tree/main/transformers).

> **Note:** Install ETL support with:
> ```bash
> pip install aistore[etl]
> ```


In [5]:
from aistore import Client

# AIS-ETL requires an AIStore cluster running on Kubernetes.
# Replace the endpoint below with your cluster's address, in the form http://<ip>:<port>
client = Client("http://10.52.160.25:51080")

## ETL Webserver Framework

**AIS ETL** is language- and framework-agnostic. You can deploy your own custom web server as a transformation pod, supporting both **inline transformations** (real-time GET requests) and **offline batch transformations** (bucket-to-bucket). 

However, building such a server from scratch involves more than just writing transformation logic. It must also be capable of:

* Performing health checks
* Communicating with AIStore targets
* Parsing [`etl args`](https://github.com/NVIDIA/aistore/blob/main/docs/cli/etl.md#transform-object-with-arguments)—user-defined parameters that control the transformation behavior
* Supporting [`direct put`](#direct-put-optimization-faster-bucket-to-bucket-etl-transformation), which allows transformed objects to be directly written to the target bucket without going through the client
* Managing HTTP and WebSocket protocols with proper concurrency control
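For a sense of what the framework saves you, here is a minimal sketch of a from-scratch transformation server built only on Python's standard library. It covers just two of the duties listed above: a health check and per-request argument parsing. The `/health` path mirrors the `readinessProbe` in the Pod spec example later in this notebook; the `etl_args` query-parameter name and the PUT-with-body request shape are assumptions for illustration, not a specification of the protocol AIStore targets actually speak. Direct put, hpull-style fetching from targets, and WebSocket support are all omitted.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import parse_qs, urlparse


def transform(data: bytes, etl_args: str) -> bytes:
    """The transformation logic: the only part you actually care about."""
    return data.upper() + etl_args.encode()


class ETLHandler(BaseHTTPRequestHandler):
    def _reply(self, status: int, body: bytes) -> None:
        self.send_response(status)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        # Health check (assumed path, polled by a Kubernetes readiness probe).
        if urlparse(self.path).path == "/health":
            self._reply(200, b"Running")
        else:
            self._reply(404, b"not found")

    def do_PUT(self):
        # Push-style request (assumed shape): object bytes arrive in the body,
        # and per-request arguments arrive as an `etl_args` query parameter.
        query = parse_qs(urlparse(self.path).query)
        etl_args = query.get("etl_args", [""])[0]
        data = self.rfile.read(int(self.headers.get("Content-Length", 0)))
        self._reply(200, transform(data, etl_args))

    def log_message(self, *args):
        pass  # silence per-request logging


# Port 0 lets the OS pick a free port; one thread per connection.
server = ThreadingHTTPServer(("127.0.0.1", 0), ETLHandler)
port = server.server_address[1]
threading.Thread(target=server.serve_forever, daemon=True).start()


def call(method: str, path: str, body: bytes = None):
    """Send one request on a fresh connection; return (status, body)."""
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    conn.request(method, path, body=body)
    resp = conn.getresponse()
    result = (resp.status, resp.read())
    conn.close()
    return result


health = call("GET", "/health")
transformed = call("PUT", "/test.txt?etl_args=!", body=b"hello")
server.shutdown()
server.server_close()
print(health, transformed)  # (200, b'Running') (200, b'HELLO!')
```

Everything in this sketch except `transform` is boilerplate, and it still lacks most of the production concerns above.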

Selecting the right web server and communication strategy depends on factors like object size and volume, desired concurrency model, and whether you need a synchronous ([WSGI](https://peps.python.org/pep-3333/)) or asynchronous ([ASGI](https://asgi.readthedocs.io/en/latest/introduction.html)) stack. Each option has its own trade-offs.
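The WSGI/ASGI choice ultimately comes down to the shape of the application callable. Below is a framework-free sketch of the same uppercase transform written both ways and driven by hand with minimal `environ`/`scope` stand-ins instead of a real server. Flask apps are WSGI and FastAPI apps are ASGI, so this is roughly the difference between the `FlaskServer` and `FastAPIServer` classes used in this notebook; the sketch does not reflect those classes' internals.

```python
import asyncio
import io


# WSGI (PEP 3333): a synchronous callable. Each request occupies its worker
# until it returns, so concurrency comes from many worker threads/processes.
def wsgi_app(environ, start_response):
    length = int(environ.get("CONTENT_LENGTH") or 0)
    body = environ["wsgi.input"].read(length)
    start_response("200 OK", [("Content-Type", "application/octet-stream")])
    return [body.upper()]


# ASGI: an async callable. While one request awaits I/O, the same event loop
# can make progress on other requests.
async def asgi_app(scope, receive, send):
    assert scope["type"] == "http"
    body, more = b"", True
    while more:  # the request body may arrive in several chunks
        message = await receive()
        body += message.get("body", b"")
        more = message.get("more_body", False)
    await send({"type": "http.response.start", "status": 200,
                "headers": [(b"content-type", b"application/octet-stream")]})
    await send({"type": "http.response.body", "body": body.upper()})


# Drive the WSGI app by hand.
data = b"hello"
environ = {"CONTENT_LENGTH": str(len(data)), "wsgi.input": io.BytesIO(data)}
statuses = []
wsgi_out = b"".join(wsgi_app(environ, lambda status, headers: statuses.append(status)))


# Drive the ASGI app by hand, delivering the body in two chunks.
async def run_asgi():
    incoming = [
        {"type": "http.request", "body": b"hel", "more_body": True},
        {"type": "http.request", "body": b"lo", "more_body": False},
    ]
    sent = []

    async def receive():
        return incoming.pop(0)

    async def send(message):
        sent.append(message)

    await asgi_app({"type": "http"}, receive, send)
    return sent


asgi_sent = asyncio.run(run_asgi())
asgi_out = asgi_sent[-1]["body"]
print(statuses[0], wsgi_out, asgi_out)  # 200 OK b'HELLO' b'HELLO'
```

The ASGI version yields control at every `await`, which is what lets a single process keep many slow transfers in flight; the WSGI version is simpler to reason about but needs more workers for the same concurrency.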

To simplify this, we've introduced the **AIS-ETL Web Server Framework** in both **[Go](https://github.com/NVIDIA/aistore/tree/main/ext/etl/webserver#readme)** and **[Python](https://github.com/NVIDIA/aistore/blob/main/python/aistore/sdk/etl/webserver/README.md)**. These SDKs abstract away the boilerplate, so you can build and deploy custom ETL containers in minutes: you write only the transformation logic, and the SDK handles networking, protocol handling, and high-throughput optimizations.

In [2]:
# Import ETL webserver classes
# See ETL webserver docs: https://github.com/NVIDIA/aistore/tree/main/python/aistore/sdk/etl/webserver
from aistore.sdk.etl.webserver.fastapi_server import FastAPIServer
from aistore.sdk.etl.webserver.flask_server import FlaskServer
from aistore.sdk.etl.webserver.http_multi_threaded_server import HTTPMultiThreadedServer

## Initializing ETLs with the Python SDK

You can set up an ETL in three ways:

1. **`init_class`**  
   - For pure-Python transforms.  
   - Decorate an `ETLServer` subclass (implementing `transform`) to register it.  
   - Best when you only need Python and PyPI dependencies.

2. **`init`**  
   - Use a container image for your ETL logic.  
   - Configure options like `comm_type`, timeouts, commands, etc.  
   - Works with built-in transformers or your own image.  
   - Also available via the CLI.

3. **`init_spec`** (_Advanced Usage_)
   - Supply a full Kubernetes Pod spec.  
   - Allows advanced tweaks (health checks, init containers, etc.).  
   - Kept for backward compatibility or deep customizations.


## Example: Uppercase Transformer (`init_class`)

Convert text to uppercase with a Python class.

In [6]:
etl_upper_case = client.etl("etl-upper-case")


@etl_upper_case.init_class()
class UpperCaseETL(FastAPIServer):
    """
    ETL webserver that converts data to uppercase.
    """

    def transform(self, data, *_args):
        return data.upper()

In [9]:
# Test the UpperCase ETL

# Create a bucket and upload an object to test
bucket = client.bucket("etl-examples").create(exist_ok=True)
obj = bucket.object("test.txt")
# Write content to the object
obj.get_writer().put_content(b"Hello ais-etl!")

print("Original Object Content:")
print(obj.get_reader().read_all())

from aistore.sdk.etl import ETLConfig

print("\nTransformed Object Content (Uppercase):")
print(obj.get_reader(etl=ETLConfig(etl_upper_case.name)).read_all())

Original Object Content:
b'Hello ais-etl!'

Transformed Object Content (Uppercase):
b'HELLO AIS-ETL!'


## Example: XXHash Transformer (`init_class`)

Hash bytes with a seed passed per request.

This example shows how to use external (PyPI) packages and per-request arguments. The XXHash transformer reads the ETL argument from the inline transform request and uses it as the hash seed, falling back to a default seed from the `SEED_DEFAULT` environment variable. This is how you can pass custom per-object metadata to your ETL pod.
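In the cell below, `transform` receives `etl_args` typed as `str` (the client passes `args=1000`, and the handler converts it with `int(...)`), so validating it is up to you. Here is a hedged sketch of a slightly more defensive parser than the one-liner used there: it strips whitespace and falls back to the default on missing or non-integer input. `parse_seed` is a hypothetical helper, not part of the SDK.

```python
def parse_seed(etl_args, default: int) -> int:
    """Return the seed carried in etl_args, or `default` if it is missing or malformed.

    Hypothetical helper: etl_args is assumed to be a str (or None).
    """
    if etl_args is None:
        return default
    text = str(etl_args).strip()
    if not text:
        return default
    try:
        return int(text)
    except ValueError:
        return default


print(parse_seed("1000", 500), parse_seed("", 500), parse_seed("oops", 500))  # 1000 500 500
```

Falling back silently keeps requests from failing on bad input, but it also hides client mistakes; raising a `ValueError` instead is a reasonable alternative if you would rather surface them.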

In [17]:
import os
import xxhash

hash_etl = client.etl("etl-xxhash")

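# xxhash is a fast non-cryptographic hash function that is not in the standard library.
# `dependencies=["xxhash"]` below installs it in the ETL container; the import above
# also requires it in your local environment (`pip install xxhash`).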

# This example uses communication type "hpull".
# See communication options: https://github.com/NVIDIA/aistore/blob/main/docs/etl.md#communication-mechanisms


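# Extra keyword arguments (here `SEED_DEFAULT`) are set as environment variables in the ETL container;
# `SEED_DEFAULT` provides the fallback seed when a request carries no etl_args.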
@hash_etl.init_class(comm_type="hpull", dependencies=["xxhash"], SEED_DEFAULT="500")
class XXHash(FastAPIServer):
    def __init__(self):
        super().__init__()
        self.default_seed = int(os.getenv("SEED_DEFAULT", "0"))

    def transform(
        self,
        data: bytes,
        _path: str,
        etl_args: str,
    ) -> bytes:
        seed = int(etl_args) if etl_args else self.default_seed
        hasher = xxhash.xxh64(seed=seed)
        hasher.update(data)
        return hasher.hexdigest().encode("ascii")

In [6]:
# Use the ETL defined above

# Read original and transformed object content
print("Original Object Content:")
print(obj.get_reader().read_all())

print("\nTransformed Object Content (XXHash with default seed 500):")
print(obj.get_reader(etl=ETLConfig(hash_etl.name)).read_all())

# To use a custom seed (1000), pass it via etl_args:
print("\nTransformed Object Content (XXHash with seed value 1000):")

print(obj.get_reader(etl=ETLConfig(hash_etl.name, args=1000)).read_all())

Original Object Content:
b'Hello ais-etl!'

Transformed Object Content (XXHash with default seed 500):
b'5a1c0264c777ae72'

Transformed Object Content (XXHash with seed value 1000):
b'337eb4735c14a118'


## Example: FFmpeg Transformer (`init`)

Run a pre-built FFmpeg container to convert audio to WAV.

This example uses a pre-built image from [`ais-etl/transformers`](https://github.com/NVIDIA/ais-etl/tree/main/transformers).

The [FFmpeg transformer](https://github.com/NVIDIA/ais-etl/tree/main/transformers/FFmpeg) converts audio files to WAV format, with control over the number of audio channels (`AC`) and the audio sample rate (`AR`).
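`AC` and `AR` mirror FFmpeg's own `-ac` (number of audio channels) and `-ar` (sample rate in Hz) options. The sketch below shows one way a transformer *could* turn those environment variables into an FFmpeg command that reads audio from stdin and writes WAV to stdout. It is an illustration under that assumption, not the actual code of the `transformer_ffmpeg` image; `build_ffmpeg_cmd` and its fallback defaults are hypothetical, and the command is only built here, not executed.

```python
import os
import shlex


def build_ffmpeg_cmd(env=os.environ) -> list:
    """Build an ffmpeg argv that converts audio on stdin to WAV on stdout (hypothetical)."""
    channels = env.get("AC", "1")    # hypothetical default
    rate = env.get("AR", "44100")    # hypothetical default
    # Validate early: both must be positive integers.
    for name, value in (("AC", channels), ("AR", rate)):
        if not value.isdigit() or int(value) <= 0:
            raise ValueError(f"{name} must be a positive integer, got {value!r}")
    return [
        "ffmpeg", "-loglevel", "error",
        "-i", "pipe:0",    # input: raw bytes on stdin (wav/flac/mp3, ...)
        "-ac", channels,   # number of output audio channels
        "-ar", rate,       # output sample rate in Hz
        "-f", "wav",       # output container format
        "pipe:1",          # output: stdout
    ]


cmd = build_ffmpeg_cmd({"AR": "16000", "AC": "1"})
print(shlex.join(cmd))  # ffmpeg -loglevel error -i pipe:0 -ac 1 -ar 16000 -f wav pipe:1
```

With the command in hand, a transformer would typically run it with `subprocess.run(cmd, input=data, capture_output=True, check=True)` and return the resulting `stdout` bytes.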

In [7]:
ffmpeg_etl = client.etl("etl-ffmpeg")

ffmpeg_etl.init(image="aistorage/transformer_ffmpeg:latest", AR="16000", AC="1")

# To test this, you will need an audio file in a bucket.
# wav_bytes = client.bucket("<audio-files-bck>").object("<audio-file>.<wav/flac/mp3>").get_reader(etl=ETLConfig(ffmpeg_etl.name)).read_all()

'etl-tbh6yoAuh'

## Example: Pod Spec (`init_spec`) — Advanced usage

Use a full Kubernetes Pod template only if you need to customize the Pod spec (resources, init containers, labels, etc.).
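Because `init_spec` takes the Pod spec as a plain string, you can keep one template and fill in the values that vary between deployments. Below is a minimal sketch using the standard library's `string.Template`. The placeholder names (`$comm_type`, `$image`, `$ar`, `$ac`) and `render_pod_spec` are hypothetical, the template is trimmed to the fields being substituted, and the allowed communication types are taken from the comment in the full template below.

```python
from string import Template

# Trimmed, hypothetical template: only the parameterized fields are shown.
POD_TMPL = Template("""\
apiVersion: v1
kind: Pod
metadata:
  name: transformer-ffmpeg
  annotations:
    communication_type: "$comm_type"
    wait_timeout: 5m
spec:
  containers:
    - name: server
      image: $image
      env:
        - name: AR
          value: "$ar"
        - name: AC
          value: "$ac"
""")


def render_pod_spec(comm_type="hpull://", image="aistorage/transformer_ffmpeg:latest",
                    ar=16000, ac=1) -> str:
    if comm_type not in ("hpull://", "hpush://"):
        raise ValueError(f"unsupported communication type: {comm_type}")
    # substitute() raises KeyError if any placeholder is left unfilled.
    return POD_TMPL.substitute(comm_type=comm_type, image=image, ar=ar, ac=ac)


spec = render_pod_spec(ar=22050)
print(spec)
```

Note that the env values stay quoted in the template: Kubernetes expects environment variable values to be strings, so an unquoted `16000` would be rejected. The rendered string is passed exactly as before, e.g. `ffmpeg_spec_etl.init_spec(template=spec)`.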

In [9]:
ffmpeg_spec_etl = client.etl("etl-ffmpeg-pod-spec")

pod_spec_tmpl = """
apiVersion: v1
kind: Pod
metadata:
  name: transformer-nemo-ffmpeg
  annotations:
    # Supported values: "hpull://", "hpush://"
    communication_type: "hpull://"
    wait_timeout: 5m
    support_direct_put: "true"
spec:
  containers:
    - name: server
      image: aistorage/transformer_ffmpeg:latest
      imagePullPolicy: Always
      ports:
        - name: default
          containerPort: 8000
      # for flask based app
      # command: ["gunicorn", "flask_server:flask_app", "--bind", "0.0.0.0:8000", "--workers", "4", "--log-level", "debug"]
      # for http based app
      # command: ["python", "http_server.py"]
      # for fastapi based app
      command: ["uvicorn", "fastapi_server:fastapi_app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4", "--no-access-log"]
      readinessProbe:
        httpGet:
          path: /health
          port: default
      env:
        - name: AR
          value: "16000"
        - name: AC
          value: "1"
"""

ffmpeg_spec_etl.init_spec(template=pod_spec_tmpl)

# To test this, you will need an audio file in a bucket.
# wav_bytes = client.bucket("<audio-files-bck>").object("<audio-file>.<wav/flac/mp3>").get_reader(etl=ETLConfig(ffmpeg_spec_etl.name)).read_all()

'etl-WLzByoauh'

## ETL Pipelines

You can combine multiple ETLs into a pipeline using the `>>` operator. Pipelines allow you to compose transformations and apply them in sequence during a single object read.

- Define ETLs as usual (e.g., with `init_class`).
- Chain them: `etl_a >> etl_b >> etl_c`.
- Use the resulting pipeline with `ETLConfig(name=pipeline)` for inline transforms.

Below are examples of:
- Two-stage pipeline
- Three-stage pipeline
- Chaining a multi-stage pipeline with another ETL using `>>`
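Python lets a class define `>>` through the `__rshift__` method, which is the natural way to get this chaining syntax. The sketch below is a purely local illustration of the composition semantics: each stage's output feeds the next, and chaining a pipeline with another stage (or pipeline) flattens into one longer pipeline. It is not the SDK's implementation (in AIStore the stages are executed by the cluster's ETL pods, not in your client), and `Stage`/`Pipeline` are hypothetical classes.

```python
from typing import Callable, List

Transform = Callable[[bytes], bytes]


class Pipeline:
    def __init__(self, stages: List["Stage"]):
        self.stages = stages

    def __rshift__(self, other: "Stage | Pipeline") -> "Pipeline":
        extra = other.stages if isinstance(other, Pipeline) else [other]
        return Pipeline(self.stages + extra)  # new object; operands are unchanged

    def apply(self, data: bytes) -> bytes:
        for stage in self.stages:  # left to right: each output feeds the next stage
            data = stage.fn(data)
        return data

    @property
    def names(self) -> List[str]:
        return [s.name for s in self.stages]


class Stage:
    def __init__(self, name: str, fn: Transform):
        self.name, self.fn = name, fn

    def __rshift__(self, other) -> Pipeline:
        return Pipeline([self]) >> other


local_upper = Stage("etl-upper-case", bytes.upper)
local_reverse = Stage("etl-reverse", lambda b: b[::-1])
local_append = Stage("etl-append-done", lambda b: b + b"::DONE")

local_two_stage = local_upper >> local_reverse
print(local_two_stage.apply(b"Hello ais-etl!"))                   # b'!LTE-SIA OLLEH'
print((local_two_stage >> local_append).apply(b"Hello ais-etl!"))  # b'!LTE-SIA OLLEH::DONE'
```

Returning a new `Pipeline` from `>>` (rather than mutating the left operand) is what makes it safe to reuse a partial pipeline such as `local_two_stage` in several longer chains.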

In [7]:
# Register additional ETLs for pipeline demos
etl_reverse = client.etl("etl-reverse")

@etl_reverse.init_class()
class ReverseETL(FastAPIServer):
    """Reverse byte content."""
    def transform(self, data: bytes, *_args) -> bytes:
        return data[::-1]

etl_append = client.etl("etl-append-done")

@etl_append.init_class()
class AppendETL(FastAPIServer):
    """Append a marker to the end of the byte content."""
    def transform(self, data: bytes, *_args) -> bytes:
        return data + b"::DONE"

In [10]:
# Two-stage pipeline: Uppercase -> Reverse
two_stage = etl_upper_case >> etl_reverse

print("Original:")
print(obj.get_reader().read_all())

print("\nTwo-stage pipeline (Uppercase -> Reverse):")
print(obj.get_reader(etl=ETLConfig(name=two_stage)).read_all())

Original:
b'Hello ais-etl!'

Two-stage pipeline (Uppercase -> Reverse):
b'!LTE-SIA OLLEH'


In [11]:
# Three-stage pipeline and chaining a multi-stage pipeline with another ETL
from aistore.sdk.etl import ETLConfig

# Three-stage directly: Uppercase -> Reverse -> Append
three_stage_direct = etl_upper_case >> etl_reverse >> etl_append
print("Three-stage pipeline (direct chain Uppercase -> Reverse -> Append):")
print(obj.get_reader(etl=ETLConfig(name=three_stage_direct)).read_all())

# Chaining an existing multi-stage with another ETL using >>
full_pipeline = two_stage >> etl_append
print("\nChaining multi-stage with another ETL (two_stage >> etl_append):")
print(obj.get_reader(etl=ETLConfig(name=full_pipeline)).read_all())

Three-stage pipeline (direct chain Uppercase -> Reverse -> Append):
b'!LTE-SIA OLLEH::DONE'

Chaining multi-stage with another ETL (two_stage >> etl_append):
b'!LTE-SIA OLLEH::DONE'


In [12]:
# Finally, stop and remove all the ETLs created in this notebook
for etl in (etl_upper_case, etl_reverse, etl_append, hash_etl, ffmpeg_etl, ffmpeg_spec_etl):
    etl.stop()
    etl.delete()
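If a cell was skipped or an ETL never started, a call such as `stop()` may raise and leave the remaining ETLs behind. A small, hedged sketch of a more forgiving cleanup: `cleanup_etls` is a hypothetical helper that only relies on the `stop()`, `delete()`, and `name` members used in this notebook, and `_StandInETL` is a demo stand-in, not an SDK class.

```python
def cleanup_etls(*etls):
    """Stop, then delete, each ETL; keep going past failures and return them all."""
    failures = []
    for etl in etls:
        for action in ("stop", "delete"):
            try:
                getattr(etl, action)()
            except Exception as err:  # broad on purpose: report, then continue
                failures.append((getattr(etl, "name", repr(etl)), action, err))
    return failures


class _StandInETL:
    """Demo stand-in with the same stop()/delete() methods; not an SDK class."""

    def __init__(self, name, running=True):
        self.name, self.running, self.calls = name, running, []

    def stop(self):
        self.calls.append("stop")
        if not self.running:
            raise RuntimeError(f"{self.name} is not running")

    def delete(self):
        self.calls.append("delete")


demo = [_StandInETL("etl-a", running=False), _StandInETL("etl-b")]
failures = cleanup_etls(*demo)
print([(name, action) for name, action, _ in failures])  # [('etl-a', 'stop')]
```

Against the cluster, the call would be `cleanup_etls(etl_upper_case, etl_reverse, etl_append, hash_etl, ffmpeg_etl, ffmpeg_spec_etl)`; inspect the returned list to see which steps, if any, need a manual retry.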