# Multimodal Structured Generation with Daft, Gemma-3, and vLLM

This notebook walks through a practical example of evaluating model performance on structured generation and multimodal reasoning.

Specifically, we will explore how to scale multimodal information extraction using a combination of powerful technologies:
1. Daft for data processing
2. The Gemma-3n-E4B model (`google/gemma-3n-e4b-it`) for multimodal capabilities
3. vLLM/OpenRouter for efficient inference serving.


---

### Table of Contents
1. Set up and install dependencies
2. Launch a vLLM OpenAI-API-compatible server
3. Test the OpenAI client against Gemma-3n using an API key and a new base URL
4. Preprocess the ai2d dataset from Hugging Face's Cauldron collection



<span style="color:yellow;">
NOTE: An A100 GPU is recommended.
</span>
---

### First we will install and import dependencies

In [None]:
!pip install "daft[huggingface]==0.5.21" vllm

#### Log in to Hugging Face for access to Gemma-3n
Requires an `HF_TOKEN`.

In [None]:
!hf auth login

# Online Serving - Launch vLLM OpenAI Compatible Server

Run the following in your terminal
```bash
 python -m vllm.entrypoints.openai.api_server \
  --model google/gemma-3n-e4b-it \
  --guided-decoding-backend guidance \
  --dtype bfloat16 \
  --gpu-memory-utilization 0.85 \
  --host 0.0.0.0 --port 8000
```

Note: If you are in Google Colab, you can open a terminal by clicking the terminal icon in the bottom left of the UI.

It usually takes at least **7.5** minutes before the vLLM server is ready.
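
Because startup can take several minutes, it may be more convenient to poll the server than to guess when it is ready. The sketch below is one way to do that with only the standard library. `http_probe` and `wait_until_ready` are hypothetical helpers, not part of vLLM or the OpenAI client, and the sketch assumes the server answers `GET /v1/models` with HTTP 200 once the model is loaded.

```python
import time
import urllib.error
import urllib.request


def http_probe(url: str) -> bool:
    """Return True if `url` answers with HTTP 200, False on any connection error."""
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        return False


def wait_until_ready(url, probe=http_probe, timeout_s=900.0, interval_s=10.0, sleep=time.sleep):
    """Poll `probe(url)` until it succeeds or `timeout_s` elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if probe(url):
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval_s)
```

Calling `wait_until_ready("http://0.0.0.0:8000/v1/models")` before creating the client blocks until the server responds or the timeout expires. The `probe` and `sleep` parameters exist only so the loop can be exercised without a live server.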


---
## Verify you can connect to vLLM Online Serving using OpenAI Client

In [None]:
from openai import OpenAI

model_id = "google/gemma-3n-e4b-it"  # must match the --model passed to the vLLM server
api_key = "none"
base_url = "http://0.0.0.0:8000/v1"
client = OpenAI(api_key=api_key, base_url=base_url)

## Test that the client's model list contains `google/gemma-3n-e4b-it`

In [None]:
result = client.models.list()

print(result)

## Test Simple Text Completion

In [None]:
# Text-only inference
chat_completion = client.chat.completions.create(
    messages=[{"role": "user", "content": "What's the coolest thing about daft dataframes?"}],
    model=model_id,
)

result = chat_completion.choices[0].message.content
print("Chat completion output: \n", result)

## Test Structured Output

In [None]:
completion = client.chat.completions.create(
    model=model_id,
    messages=[
        {"role": "user", "content": "Classify this sentiment: Daft is wicked fast!"}
    ],
    extra_body={"guided_choice": ["positive", "negative"]},
)
print(completion.choices[0].message.content)

## Test Image Understanding

In [None]:
image_url = "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/bee.jpg"
completion = client.chat.completions.create(
    model=model_id,
    messages = [
        {
            "role": "system",
            "content": [{"type": "text", "text": "You are a helpful assistant."}]
        },
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": image_url}},
                {"type": "text", "text": "Describe this image in detail."}
            ]
        }
    ]
)
print(completion.choices[0].message.content)

## Test Combining Image Inputs with Structured Output

We can use prompt ablation to see how prompting and structured outputs each affect the results.

Try commenting out the `extra_body` argument, or uncommenting the third user content item (the instruction to answer with only the letter), to see how the results change.

In [None]:
completion = client.chat.completions.create(
    model=model_id,
    messages = [
        {
            "role": "system",
            "content": [{"type": "text", "text": "You are a helpful assistant."}]
        },
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url":image_url}},
                {"type": "text", "text": "Which insect is portrayed in the image: A. Ladybug, B. Beetle, C. Bee, D. Wasp "},
                # {"type": "text", "text": "Answer with only the letter from the multiple choice."},  # Try uncommenting me
            ]
        }
    ],
    extra_body={"guided_choice": ["A", "B", "C", "D"]},  # Try commenting this out
)
print(completion.choices[0].message.content)
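
To run all four ablation combinations without editing the cell by hand, the request arguments can be assembled programmatically. The following is a minimal sketch: `build_request` is a hypothetical helper, and the example URL is a placeholder. It only constructs the keyword arguments and sends nothing to the server.

```python
def build_request(image_url: str, with_instruction: bool, with_guided_choice: bool) -> dict:
    """Assemble chat-completion kwargs for one ablation variant."""
    content = [
        {"type": "image_url", "image_url": {"url": image_url}},
        {"type": "text", "text": "Which insect is portrayed in the image: A. Ladybug, B. Beetle, C. Bee, D. Wasp"},
    ]
    if with_instruction:
        # Optional extra prompt that asks for a bare letter.
        content.append({"type": "text", "text": "Answer with only the letter from the multiple choice."})
    kwargs = {"messages": [{"role": "user", "content": content}]}
    if with_guided_choice:
        # Structured output: constrain the answer to one of four letters.
        kwargs["extra_body"] = {"guided_choice": ["A", "B", "C", "D"]}
    return kwargs


# One entry per (with_instruction, with_guided_choice) combination.
variants = {
    (instr, guided): build_request("https://example.com/bee.jpg", instr, guided)
    for instr in (False, True)
    for guided in (False, True)
}
print(sorted(variants))
```

Each entry can then be passed on as `client.chat.completions.create(model=model_id, **variants[key])`.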

---
#  Dataset Preprocessing

### Prepping the [HuggingFaceM4/the_cauldron](https://huggingface.co/datasets/HuggingFaceM4/the_cauldron/viewer?views%5B%5D=ai2d) dataset for inference (ai2d subset)

We can read directly from Hugging Face datasets by using the `hf://` prefix in the URL string.

In [None]:
import daft

# There are a total of 2,434 images in this dataset, at a size of ~ 500 MB
df = daft.read_parquet('hf://datasets/HuggingFaceM4/the_cauldron/ai2d/train-00000-of-00001-2ce340398c113b79.parquet').collect()
df.show(3)

Taking a look at the schema, we can see the familiar nested messages datatype from chat completions inside the `texts` column.


In [None]:
print(df.schema())

Let's decode the image bytes to see a preview of the images. You can click on a cell to have a preview pop up.

In [None]:
from daft import col
from daft import DataType
df = df.explode(col("images")).with_column("image_png", df["images"].struct.get("bytes").image.decode()).collect()
df.show(3)

#### Preprocessing the 'texts' column to extract Question, Choices, and Answer Columns

Copying an entry from the `texts` column yields a list of dicts, similar to an OpenAI messages list, of the form:

```python
[{
    "user": """Question:
            
        From the above food web diagram, what cause kingfisher to increase

        Choices:
            A. decrease in fish
            B. decrease in water boatman
            C. increase in fish
            D. increase in algae

        Answer with the letter.""",

    "assistant": "Answer: C",
    "source": "AI2D",
}, ...]
```

In [None]:

# Explode the List of Dicts inside "texts" to extract "user" and "assistant" messages
df = df.explode(col("texts")).collect()

# Extract User and Assistant Messages
df = df.with_columns({
    "user": df["texts"].struct.get("user"),
    "assistant": df["texts"].struct.get("assistant")
}).collect()
df.show(8)

We can also go above and beyond and parse each text input into individual question, choices, and answer columns.

In [None]:
# Parse the "user" and "assistant" messages into question, choices, and answer
df = df.with_columns({
    "question": df["user"]
        .str.extract(r"(?s)Question:\s*(.*?)\s*Choices:")
        .str.replace("Choices:", "")
        .str.replace("Question:",""),
    "choices_string": df["user"]
        .str.extract(r"(?s)Choices:\s*(.*?)\s*Answer?\.?")
        .str.replace("Choices:\n", "")
        .str.replace("Answer",""),
    "answer": df["assistant"]
        .str.extract(r"Answer:\s*(.*)$")
        .str.replace("Answer:",""),
}).collect()

# Split choices_string into a list; regex=True is needed because the delimiter is a pattern
df = df.with_column("choices_list", df["choices_string"].str.split(r"\n[A-Z]\.\s*", regex=True)).collect()

df.show()
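
To sanity-check the patterns outside of Daft, the same parsing idea can be tried on a single entry with Python's built-in `re` module. This is a sketch under assumptions: `parse_entry` is a hypothetical helper, the sample string has its whitespace simplified, and capture groups are read directly rather than mirroring the extract-then-replace chain used above.

```python
import re

sample_user = (
    "Question:\nFrom the above food web diagram, what cause kingfisher to increase\n"
    "Choices:\nA. decrease in fish\nB. decrease in water boatman\n"
    "C. increase in fish\nD. increase in algae\nAnswer with the letter."
)
sample_assistant = "Answer: C"


def parse_entry(user: str, assistant: str) -> dict:
    """Split one user/assistant pair into question, choices, and answer."""
    question = re.search(r"(?s)Question:\s*(.*?)\s*Choices:", user).group(1)
    choices_string = re.search(r"(?s)Choices:\s*(.*?)\s*Answer", user).group(1)
    answer = re.search(r"Answer:\s*(.*)$", assistant).group(1).strip()
    # Split on the "A. ", "B. ", ... markers at the start of each line;
    # the first piece is the empty text before "A." and is dropped.
    choices = [c.strip() for c in re.split(r"(?m)^[A-Z]\.\s*", choices_string)[1:]]
    return {"question": question, "choices": choices, "answer": answer}


parsed = parse_entry(sample_user, sample_assistant)
print(parsed)
```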

# Formalizing the Workload
We've implemented the evaluation pipeline from end to end. Now let's put it all together in a tidy package so we can take full advantage of lazy evaluation.

In [None]:
# Import Dependencies & Define Variables
import asyncio
import base64
import daft
from daft import col, lit
from daft.functions import format, llm_generate
from openai import OpenAI, AsyncOpenAI
import time

model_id = 'google/gemma-3n-e4b-it'
api_key = "none"
base_url = "http://0.0.0.0:8000/v1"
client = AsyncOpenAI(api_key=api_key, base_url=base_url)

# Benchmarking

No more demonstrations. Here we break the workload down into atomic sections that we can rerun over and over.


In [None]:
import time
start = time.time()
df_raw = daft.read_parquet('hf://datasets/HuggingFaceM4/the_cauldron/ai2d/train-00000-of-00001-2ce340398c113b79.parquet').collect()

end = time.time()
num_rows = df_raw.count_rows()
print(f"Processed {num_rows} rows in {end-start} seconds")

In [None]:
from daft import col
import base64
# Preprocess the workload just as before, but this time materialize the result once so downstream jobs can reuse it.
start = time.time()

df = df_raw.explode(col("images")).with_column("image_base64", df_raw["images"].struct.get("bytes").apply(
        lambda x: base64.b64encode(x).decode('utf-8'),
        return_dtype=daft.DataType.string()
    )
)
df = df.explode(col("texts")).with_columns({
    "user": df["texts"].struct.get("user"),
    "assistant": df["texts"].struct.get("assistant")
})

# Parse the Question/Answer Strings
df_prepped = df.with_columns({
    "question": df["user"]
        .str.extract(r"(?s)Question:\s*(.*?)\s*Choices:")
        .str.replace("Choices:", "")
        .str.replace("Question:",""),
    "choices_string": df["user"]
        .str.extract(r"(?s)Choices:\s*(.*?)\s*Answer?\.?")
        .str.replace("Choices:\n", "")
        .str.replace("Answer",""),
    "answer": df["assistant"]
        .str.extract(r"Answer:\s*(.*)$")
        .str.replace("Answer:",""),
}).collect()


end = time.time()
num_rows = df_prepped.count_rows()
print(f"Processed {num_rows} rows in {end-start} seconds")


# Inference / Model Performance Evaluation across permutations
- Row-Wise vs Batch UDF (Which one is faster?)
- With/Without Image (Can the model guess the right answer without the image?)
    - llm_generate on just text
- With/Without Prompt Template (Does the formatting matter?)
- With/Without Extra Body (Do structured outputs matter?)

Analyzing both model accuracy and workload processing time across the entire dataset.
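
The permutations listed above form a small experiment grid. As a rough sketch, the grid can be enumerated with `itertools.product` so that each configuration is an explicit dict. The axis names here are hypothetical labels chosen for illustration and are not Daft or vLLM parameters.

```python
from itertools import product

# Each axis mirrors one bullet from the list above.
axes = {
    "udf": ["row_wise", "batch"],
    "with_image": [True, False],
    "with_prompt_template": [True, False],
    "with_extra_body": [True, False],
}

# The Cartesian product of all axes gives every experiment configuration.
experiments = [dict(zip(axes, combo)) for combo in product(*axes.values())]

print(len(experiments))  # 2 * 2 * 2 * 2 = 16
print(experiments[0])
```

Running only a subset of the grid, as this notebook does, is a reasonable way to keep GPU time down.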

If you are interested in diving deep into the lazy optimization, you can run:

```python
df.explain(show_all=True)
```

In [None]:
row_limit = 2000

## Minimal Row-Wise UDF

In [None]:
@daft.func(return_dtype=daft.DataType.string())
async def struct_output_rowwise(model_id: str, text_col: str, image_col: str, extra_body: dict | None = None) -> str:

    content = [{"type": "text", "text": text_col}]
    if image_col:
        content.append({
            "type": "image_url",
            "image_url": {"url": f"data:image/png;base64,{image_col}"},
        })


    result = await client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": content
            }
        ],
        model=model_id,
        extra_body=extra_body,
    )
    return result.choices[0].message.content

Note: this row-wise UDF cannot run at `row_limit = 2000`.

In [None]:
# 1. Run the Rowwise UDF
start = time.time()
df_rowwise_udf = df_prepped.with_column("result", struct_output_rowwise(
    model_id = model_id,
    text_col = format("{} \n {}", col("question"), col("choices_string")), # Prompt Template
    image_col = col("image_base64"),
    extra_body={"guided_choice": ["A", "B", "C", "D"]}
)).with_column("is_correct", col("result").str.lstrip().str.rstrip() == col("answer").str.lstrip().str.rstrip()).limit(row_limit).collect()
end = time.time()
print(f"Row wise UDF (Image + Text + Full User String) Processed {df_rowwise_udf.count_rows()} rows in {end-start} seconds")

Row_wise UDF (Image + Text + Full User String) Processed 1500 rows in 80.16509366035461 seconds
Row wise UDF (Image + Text + Full User String) Processed 1500 rows in 78.8978271484375 seconds

## Minimal Batch UDF

In [None]:
@daft.udf(return_dtype=daft.DataType.string())
def struct_output_batch(
        model_id: str,
        text_col: daft.Series,
        image_col: daft.Series,
        extra_body: dict | None = None
    ) -> list[str]:


    async def generate(text: str, image: str) -> str:

        content = [{"type": "text", "text": text}]
        if image:
            content.append({
                "type": "image_url",
                "image_url": {"url": f"data:image/png;base64,{image}"},
            })

        result = await client.chat.completions.create(
            messages=[
                {
                    "role": "user",
                    "content": content
                }
            ],
            model=model_id,
            extra_body=extra_body,
        )
        return result.choices[0].message.content

    texts = text_col.to_pylist()
    images = image_col.to_pylist()

    async def gather_completions() -> list[str]:
        tasks = [generate(t,i) for t,i in zip(texts,images) ]
        return await asyncio.gather(*tasks)

    return asyncio.run(gather_completions())

Note: this batch UDF does not work at `row_limit = 2000`.

In [None]:
# 2. Run the Batch UDF
start = time.time()
df_batch_udf = df_prepped.with_column("result", struct_output_batch(
    model_id = model_id,
    text_col = format("{} \n {}", col("question"), col("choices_string")), # Prompt Template
    image_col = col("image_base64"),
    extra_body={"guided_choice": ["A", "B", "C", "D"]}
)).with_column("is_correct", col("result").str.lstrip().str.rstrip() == col("answer").str.lstrip().str.rstrip()).limit(row_limit).collect()
end = time.time()
print(f"Batch UDF (Image + Text + Prompt Template) \n Processed {df_batch_udf.count_rows()} rows in {end-start} seconds")

In [None]:
batch_size = 32
concurrency = 4
max_conn = 32

## Production UDF

In [None]:
@daft.udf(return_dtype=daft.DataType.string(), concurrency=concurrency, batch_size= batch_size)
class StructuredOutputsProdUDF:
    def __init__(self, base_url: str, api_key: str, max_conn: int = 32):
        self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
        self.semaphore = asyncio.Semaphore(max_conn)
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)


    def __call__(self, model_id: str, text_col: daft.Series, image_col: daft.Series, extra_body: dict) -> list[str]:

        async def generate(text: str, image: str) -> str:
                content = [{"type": "text", "text": text}]
                if image:
                    content.append({
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{image}"},
                    })

                result = await self.client.chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": content
                        }
                    ],
                    model=model_id,
                    extra_body=extra_body
                )
                return result.choices[0].message.content

        async def infer_with_semaphore(t, i):
            async with self.semaphore:
                return await generate(t,i)

        async def gather_completions(texts,images) -> list[str]:
            tasks = [infer_with_semaphore(t,i) for t,i in zip(texts,images)]
            return await asyncio.gather(*tasks)

        texts = text_col.to_pylist()
        images = image_col.to_pylist()

        return self.loop.run_until_complete(gather_completions(texts,images))

This runs the full dataset with:
- batch_size = 32
- concurrency = 4
- max_conn = 32
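
The `max_conn` setting caps how many requests are in flight at once through an `asyncio.Semaphore`. The sketch below isolates that pattern with a fake request function in place of the real API call. `bounded_gather` and `fake_request` are hypothetical names, and the cap of 4 is chosen only to make the effect easy to see.

```python
import asyncio


async def bounded_gather(items, worker, max_conn: int):
    """Run `worker(item)` for every item with at most `max_conn` in flight."""
    semaphore = asyncio.Semaphore(max_conn)

    async def guarded(item):
        async with semaphore:
            return await worker(item)

    # gather preserves input order in its result list.
    return await asyncio.gather(*(guarded(i) for i in items))


in_flight = 0
peak = 0


async def fake_request(i: int) -> int:
    """Stand-in for an API call; records how many calls overlap."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return i * 2


results = asyncio.run(bounded_gather(range(32), fake_request, max_conn=4))
print(results[:3], "peak in flight:", peak)
```

Inside a notebook, where an event loop is already running, `await bounded_gather(...)` would replace the `asyncio.run` call.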

In [None]:
# 3. Production UDF
start = time.time()
df_prod_udf = df_prepped.with_column("result", StructuredOutputsProdUDF.with_init_args(
    base_url=base_url,
    api_key=api_key,
    max_conn=max_conn
).with_concurrency(concurrency)(
    model_id = model_id,
    text_col = format("{} \n {}", col("question"), col("choices_string")), # Prompt Template
    image_col = col("image_base64"),
    extra_body={"guided_choice": ["A", "B", "C", "D"]}
)).with_column("is_correct", col("result").str.lstrip().str.rstrip() == col("answer").str.lstrip().str.rstrip()).collect()
end = time.time()
print(f"Production UDF (Image + Text + Prompt Template) \n Processed {df_prod_udf.count_rows()} rows in {end-start} seconds")

Processed 1500 rows in 77.09290647506714 seconds

# Analysis

In [None]:
# Calculate accuracy: the fraction of rows where the model's answer matches the ground truth
accuracy = df_prod_udf.where(col("is_correct")).count_rows() / df_prod_udf.count_rows()
print(f"Accuracy: {accuracy}")
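
The `is_correct` column compares the model output and the reference answer after stripping whitespace on both sides. For intuition, here is the same metric as a plain-Python sketch on made-up values. `compute_accuracy` is a hypothetical helper and the sample lists are illustrative, not results from the model.

```python
def compute_accuracy(results, answers):
    """Fraction of rows whose stripped prediction equals the stripped answer."""
    pairs = list(zip(results, answers))
    if not pairs:
        return 0.0
    correct = sum(r.strip() == a.strip() for r, a in pairs)
    return correct / len(pairs)


# Two of the three illustrative predictions match once whitespace is ignored.
sample_score = compute_accuracy(["C", " A", "B"], [" C", "A", "D"])
print(sample_score)
```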

In [None]:
# How does this compare without images
from daft.functions import llm_generate
start = time.time()
df_prod_no_img = df_prepped.with_column("result", llm_generate(
    input_column = format("{} \n {}", col("question"), col("choices_string")), # Prompt Template
    model = model_id,
    extra_body={"guided_choice": ["A", "B", "C", "D"]},
    api_key=api_key,
    base_url=base_url,
    provider = "openai"
)).with_column("is_correct", col("result").str.lstrip().str.rstrip() == col("answer").str.lstrip().str.rstrip()).collect()
end = time.time()
print(f"llm_generate (Text only + Prompt Template) \n Processed {df_prod_no_img.count_rows()} rows in {end-start} seconds")

In [None]:
# Accuracy of the text-only run (no images)
accuracy_no_img = df_prod_no_img.where(col("is_correct")).count_rows() / df_prod_no_img.count_rows()
print(f"Accuracy without images: {accuracy_no_img}")

Let's verify the model can see the image.

In [None]:
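# Assumption: the failures are the rows the text-only run answered incorrectly.
df_failures = df_prod_no_img.where(~col("is_correct"))

# `show()` returns None, so keep the DataFrame and display it separately.
df_sees_image = df_failures.limit(3).with_column("sees_image", struct_output_batch(
    model_id = model_id,
    text_col = lit("describe the image. If you see no image, say I can't see an image"), # Prompt Template
    image_col = col("image_base64"),
    extra_body = None
))
df_sees_image.show(3)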

---
## Appendix




From the Gemma model card:

In [None]:
from transformers import AutoProcessor, Gemma3nForConditionalGeneration
from PIL import Image
import requests
import torch

model_id = "google/gemma-3n-e4b-it"
model = Gemma3nForConditionalGeneration.from_pretrained(model_id, device_map="auto", torch_dtype=torch.bfloat16,).eval()
processor = AutoProcessor.from_pretrained(model_id)

messages = [
    {
        "role": "system",
        "content": [{"type": "text", "text": "You are a helpful assistant."}]
    },
    {
        "role": "user",
        "content": [
            {"type": "image", "image": "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/bee.jpg"},
            {"type": "text", "text": "Describe this image in detail."}
        ]
    }
]

inputs = processor.apply_chat_template(
    messages,
    add_generation_prompt=True,
    tokenize=True,
    return_dict=True,
    return_tensors="pt",
).to(model.device)

input_len = inputs["input_ids"].shape[-1]

with torch.inference_mode():
    generation = model.generate(**inputs, max_new_tokens=100, do_sample=False)
    generation = generation[0][input_len:]

decoded = processor.decode(generation, skip_special_tokens=True)
print(decoded)

# vLLM usage patterns for multimodal inputs

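In [None]:
import base64

import requests
from openai import OpenAI

# The functions below use the synchronous client; `client` was rebound to AsyncOpenAI earlier.
client = OpenAI(api_key="none", base_url="http://0.0.0.0:8000/v1")


def encode_base64_content_from_url(content_url: str) -> str:
    """Download the content at a URL and return it base64-encoded."""
    with requests.get(content_url) as response:
        response.raise_for_status()
        return base64.b64encode(response.content).decode("utf-8")


# Multi-image input inference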
def run_multi_image(model: str) -> None:
    image_url_duck = "https://upload.wikimedia.org/wikipedia/commons/d/da/2015_Kaczka_krzy%C5%BCowka_w_wodzie_%28samiec%29.jpg"
    image_url_lion = "https://upload.wikimedia.org/wikipedia/commons/7/77/002_The_lion_king_Snyggve_in_the_Serengeti_National_Park_Photo_by_Giles_Laurent.jpg"
    chat_completion_from_url = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "What are the animals in these images?"},
                    {
                        "type": "image_url",
                        "image_url": {"url": image_url_duck},
                    },
                    {
                        "type": "image_url",
                        "image_url": {"url": image_url_lion},
                    },
                ],
            }
        ],
        model=model,
        max_completion_tokens=64,
    )

    result = chat_completion_from_url.choices[0].message.content
    print("Chat completion output:", result)


# Video input inference
def run_video(model: str) -> None:
    video_url = "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerFun.mp4"
    video_base64 = encode_base64_content_from_url(video_url)

    ## Use video url in the payload
    chat_completion_from_url = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "What's in this video?"},
                    {
                        "type": "video_url",
                        "video_url": {"url": video_url},
                    },
                ],
            }
        ],
        model=model,
        max_completion_tokens=64,
    )

    result = chat_completion_from_url.choices[0].message.content
    print("Chat completion output from video url:", result)

    ## Use base64 encoded video in the payload
    chat_completion_from_base64 = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "What's in this video?"},
                    {
                        "type": "video_url",
                        "video_url": {"url": f"data:video/mp4;base64,{video_base64}"},
                    },
                ],
            }
        ],
        model=model,
        max_completion_tokens=64,
    )

    result = chat_completion_from_base64.choices[0].message.content
    print("Chat completion output from base64 encoded video:", result)


# Audio input inference
def run_audio(model: str) -> None:
    from vllm.assets.audio import AudioAsset

    audio_url = AudioAsset("winning_call").url
    audio_base64 = encode_base64_content_from_url(audio_url)

    # OpenAI-compatible schema (`input_audio`)
    chat_completion_from_base64 = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "What's in this audio?"},
                    {
                        "type": "input_audio",
                        "input_audio": {
                            # Any format supported by librosa is supported
                            "data": audio_base64,
                            "format": "wav",
                        },
                    },
                ],
            }
        ],
        model=model,
        max_completion_tokens=64,
    )

    result = chat_completion_from_base64.choices[0].message.content
    print("Chat completion output from input audio:", result)

    # HTTP URL
    chat_completion_from_url = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "What's in this audio?"},
                    {
                        "type": "audio_url",
                        "audio_url": {
                            # Any format supported by librosa is supported
                            "url": audio_url
                        },
                    },
                ],
            }
        ],
        model=model,
        max_completion_tokens=64,
    )

    result = chat_completion_from_url.choices[0].message.content
    print("Chat completion output from audio url:", result)

    # base64 URL
    chat_completion_from_base64 = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "What's in this audio?"},
                    {
                        "type": "audio_url",
                        "audio_url": {
                            # Any format supported by librosa is supported
                            "url": f"data:audio/ogg;base64,{audio_base64}"
                        },
                    },
                ],
            }
        ],
        model=model,
        max_completion_tokens=64,
    )

    result = chat_completion_from_base64.choices[0].message.content
    print("Chat completion output from base64 encoded audio:", result)


In [None]:
import base64

# Example image bytes (first few bytes of a PNG)
image_bytes = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHD"

# Encode the bytes to base64
base64_string = base64.b64encode(image_bytes).decode('utf-8')

print(base64_string)

In this code:
- We import the `base64` module.
- We have a sample `image_bytes` byte string.
- `base64.b64encode(image_bytes)` encodes the byte string into a base64 byte string.
- `.decode('utf-8')` decodes the base64 byte string into a UTF-8 string, which is the standard representation for base64.
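
The UDFs earlier in this notebook wrap the base64 string in a `data:image/png;base64,...` URL before sending it to the server. As a small sketch, that step and its inverse can be written as two helpers. `to_data_url` and `from_data_url` are hypothetical names, and the PNG signature bytes serve only as sample input.

```python
import base64


def to_data_url(raw: bytes, mime: str = "image/png") -> str:
    """Encode raw bytes as a base64 data URL."""
    encoded = base64.b64encode(raw).decode("utf-8")
    return f"data:{mime};base64,{encoded}"


def from_data_url(data_url: str) -> bytes:
    """Recover the raw bytes from a base64 data URL."""
    _header, _, payload = data_url.partition(",")
    return base64.b64decode(payload)


png_signature = b"\x89PNG\r\n\x1a\n"
url = to_data_url(png_signature)
print(url)
print(from_data_url(url) == png_signature)
```

The round trip is a quick way to confirm that nothing is lost between the raw image bytes and the string placed in the request.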

In [None]:
import daft
import base64
from daft import col

df_raw = daft.read_parquet('hf://datasets/HuggingFaceM4/the_cauldron/ai2d/train-00000-of-00001-2ce340398c113b79.parquet')

# To Get Daft Image
df = df_raw.explode(col("images")).with_column("image_png", col("images").struct.get("bytes").image.decode())

# To Get Base64
df = df.with_column(
    "image_base64", df["images"].struct.get("bytes").apply(
        lambda x: base64.b64encode(x).decode('utf-8'),
        return_dtype=daft.DataType.string()
    )
).collect()
df.show()
