# Multimodal Structured Outputs with Daft, Gemma-3, and vLLM

*An end-to-end example of **Multimodal Structured Outputs** with Daft's high performance data engine.*


<a target="_blank" href="https://colab.research.google.com/github/everettVT/daft-examples-1/blob/main/notebooks/mm_structured_outputs.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

## Introduction

**Structured Outputs** refers to a family of features that enable language models to respond in a constrained format. While language models continue to improve and demonstrate emergent abilities, their unpredictable nature makes them difficult to integrate with traditional software systems. Most real-world AI use cases leverage structured outputs to some extent, whether to execute tool calls or to adhere to Pydantic models. The underlying technology that makes structured outputs possible is called guided decoding.

Guided decoding uses logits to control the output of a language model by adjusting the probabilities of the next possible tokens to enforce constraints or guide the generation process. This can be done through various methods, such as applying a logit bias to penalize or promote specific tokens, filtering invalid tokens based on rules like a Finite State Machine (FSM), or by using more advanced techniques to interact with the model's internal probability distribution.
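To make the token-filtering idea concrete, here is a minimal sketch of logit masking on a toy vocabulary. It is an illustration only, not how vLLM implements guided decoding: the vocabulary, the scores, and the helper names `mask_logits` and `softmax` are all made up for this example.

```python
import math


def mask_logits(logits: dict[str, float], allowed: set[str]) -> dict[str, float]:
    """Set the logit of every disallowed token to -inf so it can never be sampled."""
    return {tok: (score if tok in allowed else -math.inf) for tok, score in logits.items()}


def softmax(logits: dict[str, float]) -> dict[str, float]:
    """Convert logits to probabilities; masked (-inf) tokens get probability 0."""
    top = max(v for v in logits.values() if v != -math.inf)
    exps = {t: (math.exp(v - top) if v != -math.inf else 0.0) for t, v in logits.items()}
    total = sum(exps.values())
    return {t: e / total for t, e in exps.items()}


# Toy next-token logits (hypothetical): unconstrained, the model would prefer "The"
logits = {"A": 1.2, "B": 0.3, "C": 2.0, "D": -0.5, "The": 3.1, " answer": 2.7}
allowed = {"A", "B", "C", "D"}  # the constraint: a multiple-choice letter

probs = softmax(mask_logits(logits, allowed))
best = max(probs, key=probs.get)
print(best, probs)
```

After masking, all probability mass is redistributed over the allowed tokens, so the highest-scoring valid token wins even though an invalid token had the largest raw logit.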

Structured outputs come in five strategies, each defining the desired output type:

- Basic Python Types: `int`, `float`, `bool`...
- Multiple Choices: using `Literal` or `Enum`
- JSON Schemas: using Pydantic models or dataclasses
- Regex
- Context-free Grammars
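As a rough sketch, each strategy could be expressed as a request constraint along the following lines. Only `guided_choice` is used later in this notebook; the field names `guided_regex`, `guided_json`, and `guided_grammar` are assumptions based on vLLM's guided decoding options and may differ between vLLM versions, and the grammar syntax depends on the decoding backend. The dictionary name `STRATEGY_EXAMPLES` and the helper `matches` are hypothetical.

```python
import re

# Assumed vLLM request fields (sent via `extra_body`); names may vary by vLLM version.
STRATEGY_EXAMPLES = {
    # Basic Python type: an integer, expressed here as a regex constraint
    "basic_type_int": {"guided_regex": r"-?\d+"},
    # Multiple choice: the model must emit exactly one of these strings
    "multiple_choice": {"guided_choice": ["A", "B", "C", "D"]},
    # JSON Schema: typically generated from a Pydantic model
    "json_schema": {
        "guided_json": {
            "type": "object",
            "properties": {"choice": {"type": "string", "enum": ["A", "B", "C", "D"]}},
            "required": ["choice"],
        }
    },
    # Regex: an ISO-style date
    "regex": {"guided_regex": r"\d{4}-\d{2}-\d{2}"},
    # Context-free grammar: syntax shown is illustrative and backend-dependent
    "grammar": {"guided_grammar": 'root ::= "yes" | "no"'},
}


def matches(strategy: str, text: str) -> bool:
    """Locally check a candidate output against a regex-based constraint."""
    pattern = STRATEGY_EXAMPLES[strategy]["guided_regex"]
    return re.fullmatch(pattern, text) is not None


print(matches("basic_type_int", "-42"), matches("regex", "Jan 31"))
```

The local `matches` check is only a convenience for reasoning about what a regex constraint admits; the server enforces the constraint during decoding.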

While there are a tremendous number of examples in pure Python, few tutorials demonstrate structured outputs within a large-scale processing context. Even fewer, if any, demonstrate how to run batch structured outputs with multimodal data on your own inference server. Here, we will walk you through the entire process using your own OpenAI-compatible server with [vLLM](https://docs.vllm.ai/en/v0.6.3.post1/serving/openai_compatible_server.html).

Large-scale multimodal structured outputs is a real-world use case that many enterprise teams face when attempting to work with massive amounts of internal/private data. These teams face significant hurdles with traditional tooling, especially for cutting-edge use cases like batch tool calls for background agents or reinforcement learning with verifiable rewards.

Daft's unified multimodal data processing engine is purpose built to support workloads like this and is rapidly becoming the default engine of choice for teams deploying frontier AI solutions in production.

In this notebook, we will leverage Daft to evaluate the image understanding accuracy of [Google's Gemma-3-4b-it](https://ai.google.dev/gemma/docs/core) using Hugging Face's [the_cauldron dataset](https://huggingface.co/datasets/HuggingFaceM4/the_cauldron). By the end of this notebook, you will be ready to implement your own distributed batch structured outputs pipeline with a copy-paste script you can use in your own environment.

> NOTE:
  This notebook contains an advanced path where you use vLLM as your inference solution. In this case, Google Colab's A100 GPU instance is recommended. Additionally, to access [google/gemma-3-4b-it](https://huggingface.co/google/gemma-3-4B-it) you will need to accept Google's usage policy and authenticate with Hugging Face.

### Table of Contents

1. [Setup](#1-setup)
2. [Choose an Inference Solution](#2-choose-an-inference-solution)
3. [Sanity Check OpenAI Client Requests](#3-sanity-check-openai-client-requests)
4. [Dataset Preprocessing](#4-dataset-preprocessing)
5. [Multimodal Structured Outputs with `prompt`](#5-multimodal-inference-with-structured-outputs)
6. [Post Processing and Analysis](#analysis)
7. [Evaluation](#putting-everything-together-evaluating-gemma-across-the-ai2d-dataset)
8. [Conclusion](#conclusion)



## 1. Setup

### Install Dependencies

In [None]:
!pip install -q daft openai numpy

### Configure Parameters

In [None]:
import daft

# PICK ONE
MODEL_ID = "google/gemma-3-4b-it"        # vLLM & OpenRouter
# MODEL_ID = "google/gemma-3-4b-it:free" # OpenRouter free version (Low Rate Limits)
# MODEL_ID = "google/gemma-3-4b"         # LM Studio
DATASET_URI = "HuggingFaceM4/the_cauldron"

# Inference Parameters
ROW_LIMIT = 100
TEMPERATURE = 0.1
CONCURRENCY = 4
BATCH_SIZE = 32

## 2. Choose an Inference Solution

### Option 1: Use OpenRouter (provider)

[OpenRouter](https://openrouter.ai/models?max_price=0.5&order=top-weekly) has model endpoints for [google/gemma-3-4b-it](https://openrouter.ai/google/gemma-3-4b-it). That means if you don't have access to an A100 GPU or a PRO Google Colab subscription, you can still walk through this notebook without spinning up a production vLLM server.

In [None]:
# `!export` only affects a throwaway subshell; `%env` sets the variable for this kernel
%env OPENROUTER_API_KEY=...

In [None]:
import os

OPENAI_API_KEY = os.environ.get("OPENROUTER_API_KEY")
OPENAI_BASE_URL = "https://openrouter.ai/api/v1/"
MODEL_ID = "google/gemma-3-4b-it"  # OpenRouter model ID

### Option 2: Use LM Studio (local)

Similarly, if you are running on a Mac, [LM Studio](https://lmstudio.ai/) is a particularly attractive option, since [gemma-3-4b](https://lmstudio.ai/models/google/gemma-3-4b) only takes up 2 GB of storage and is available in both [MLX](https://github.com/ml-explore/mlx) and [GGUF](https://huggingface.co/docs/hub/en/gguf) variants. Daft already supports LM Studio as a provider, which means you can take advantage of [Apple Metal Performance Shaders](https://developer.apple.com/documentation/metalperformanceshaders) on your local machine.

In [None]:
OPENAI_API_KEY = "none"
OPENAI_BASE_URL = "http://127.0.0.1:1234/v1"
MODEL_ID = "google/gemma-3-4b"

### Option 3: Launch vLLM OpenAI Compatible Server (Advanced)

#### Install vLLM

After installing vLLM you will be prompted to restart the session. Restart it, then proceed to the next step.

In [None]:
!pip install -q vllm

#### Log in to Hugging Face to access google/gemma-3-4b-it

The [google/gemma-3-4b-it repository](https://huggingface.co/google/gemma-3-4B-it) is publicly accessible, but you will need to log in to Hugging Face and accept Google's conditions to access its files and content. Requests are processed immediately.

In [None]:
!hf auth login

#### Launch vLLM OpenAI Compatible Server

Run the following vLLM CLI command in your terminal.

If you are in Google Colab, you can open a terminal by clicking the terminal icon in the bottom left of the UI.

```bash
 python -m vllm.entrypoints.openai.api_server \
  --model google/gemma-3-4b-it \
  --enable-chunked-prefill \
  --guided-decoding-backend guidance \
  --dtype bfloat16 \
  --gpu-memory-utilization 0.85 \
  --host 0.0.0.0 --port 8000
```

* This config is optimized for Google Colab's A100 instance and gemma-3-4b-it.
* For vLLM online serving, set `api_key = "none"` and `base_url = "http://0.0.0.0:8000/v1"`.
* Server readiness may take ~7–8 minutes. `guided_choice` requires guided decoding to be enabled.
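Since startup can take several minutes, it may help to poll the server before moving on. The sketch below is one possible approach using only the standard library; the helper name `wait_for_server` and its default timeout and interval are hypothetical, and it assumes the server answers `GET /v1/models` with HTTP 200 once it is ready and does not require an API key.

```python
import time
import urllib.request


def wait_for_server(base_url: str, timeout_s: float = 600.0, interval_s: float = 5.0) -> bool:
    """Poll `{base_url}/models` until it returns HTTP 200 or the timeout expires."""
    url = base_url.rstrip("/") + "/models"
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=5) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            # Covers connection refused, timeouts, and HTTP errors: server not ready yet
            pass
        time.sleep(interval_s)
    return False
```

For example, `wait_for_server("http://0.0.0.0:8000/v1")` returns `True` as soon as the model list endpoint responds, or `False` if the timeout expires first.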

In [None]:
OPENAI_API_KEY = "none"
OPENAI_BASE_URL = "http://0.0.0.0:8000/v1"
MODEL_ID = "google/gemma-3-4b-it"

## 3. Sanity Check OpenAI Client Requests

Configuring an inference server for a new model can be a long and painful process. Image inputs and guided decoding are not standard options, and tuning a particular model to specific hardware takes multiple iterations to get right. Along the way, we need to make sure our inference server handles every type of request we expect to send.

In [None]:
from openai import OpenAI

client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)

# Test Client connects to Server
result = client.models.list()
print(result)

In [None]:
# Test Simple Text Completion
chat_completion = client.chat.completions.create(
    messages=[{"role": "user", "content": "How many strawberries are in the word r?"}],
    model=MODEL_ID,
)

result = chat_completion.choices[0].message.content
print(result)

In [None]:
# Test Structured Output
completion = client.chat.completions.create(
    model=MODEL_ID,
    messages=[{"role": "user", "content": "Classify this sentiment: Daft is wicked fast!"}],
    extra_body={"guided_choice": ["positive", "negative"]},
)
print(completion.choices[0].message.content)

In [None]:
# Test Image Understanding
IMAGE_URL = "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/bee.jpg"

completion = client.chat.completions.create(
    model=MODEL_ID,
    messages=[
        {"role": "system", "content": [{"type": "text", "text": "You are a helpful assistant."}]},
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": IMAGE_URL}},
                {"type": "text", "text": "Describe this image in detail."},
            ],
        },
    ],
)
print(completion.choices[0].message.content)

### Test Combining Image Inputs with Structured Output

We can experiment with prompting and structured outputs to understand how each affects the results.

Try commenting out the `response_format` argument or uncommenting the third text prompt to see how results change.

vLLM also supports a simpler usage pattern of `extra_body={"guided_choice": ["A", "B", "C", "D"]}`, but for compatibility with OpenRouter we use the Pydantic JSON Schema approach.

In [None]:
from pydantic import BaseModel, Field


# Define a Pydantic model describing the expected JSON response
class ChoiceResponse(BaseModel):
    choice: str = Field(..., description="Provide the letter of the correct choice with no other text.")


# Test Image Understanding combined with Structured Output
completion = client.chat.completions.create(
    model=MODEL_ID,
    messages=[
        {"role": "system", "content": [{"type": "text", "text": "You are a helpful assistant."}]},
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": IMAGE_URL}},
                {
                    "type": "text",
                    "text": "Which insect is portrayed in the image: A. Ladybug, B. Beetle, C. Bee, D. Wasp ",
                },
                # {"type": "text", "text": "Answer with only the letter from the multiple choice. "} # Try uncommenting me
            ],
        },
    ],
    response_format={
        "type": "json_schema",
        "json_schema": {
            "name": "choice-response",
            "schema": ChoiceResponse.model_json_schema(),
        },
    },
)
response = completion.choices[0].message.content
print(response)

In [None]:
# Pydantic Validation
choice_obj = ChoiceResponse.model_validate_json(response)
print(choice_obj)

## 4: Dataset Preprocessing

[HuggingFaceM4/the_cauldron](https://huggingface.co/datasets/HuggingFaceM4/the_cauldron/viewer?views%5B%5D=ai2d) is a massive collection of 50 vision-language datasets spanning millions of rows across:

1. General visual question answering
2. OCR document understanding & text transcription
3. Chart/figure understanding
4. Table understanding
5. Reasoning, logic, maths
6. Textbook/academic questions
7. Differences between 2 images
8. Screenshot to code

For now, we will begin with the general visual Q&A subset, AI2D.

In [None]:
import daft

df_raw = daft.read_huggingface("HuggingFaceM4/the_cauldron/ai2d").collect()
df_raw.show(3)

Taking a look at the schema, we can see the familiar nested messages datatype we are used to in chat completions inside the `texts` column.


In [None]:
print(df_raw.schema())

Let's decode the image bytes to see a preview of the images and add one more column for the base64 encoding.

Note: You can click on any cell to preview its contents.

In [None]:
from daft import col

df_img = df_raw.explode(col("images")).with_columns(
    {
        "image": col("images").struct.get("bytes").image.decode(),          # For viewing the images
        "image_base64": col("images").struct.get("bytes").encode("base64"), # For openai requests
    }
)
df_img.show(3)

#### Preprocessing the 'texts' column to extract Question, Choices, and Answer Columns

Copy/Pasting an entry from the `texts` column yields an openai messages list of dicts of the form:

```python
[{
    "user": """Question:
            
        From the above food web diagram, what cause kingfisher to increase

        Choices:
            A. decrease in fish
            B. decrease in water boatman
            C. increase in fish
            D. increase in algae

        Answer with the letter.""",

    "assistant": "Answer: C",
    "source": "AI2D",
}, ...]
```

In [None]:
# Explode the List of Dicts inside "texts" to extract "user" and "assistant" messages
df_text = df_img.explode(col("texts")).collect()

# Extract User and Assistant Messages
df_text = df_text.with_columns(
    {"user": df_text["texts"].struct.get("user"), "assistant": df_text["texts"].struct.get("assistant")}
).collect()
df_text.select("user", "assistant", "image").show(3)

We can also go above and beyond to parse each text input into individual question, choices, and answer columns.

In [None]:
# Parse "user" and "assistant" messages into question, choices, and answer columns
df_prepped = df_text.with_columns(
    {
        "question": col("user")
        .str.extract(r"(?s)Question:\s*(.*?)\s*Choices:")
        .str.replace("Choices:", "")
        .str.replace("Question:", ""),
        "choices_string": col("user")
        .str.extract(r"(?s)Choices:\s*(.*?)\s*Answer?\.?")
        .str.replace("Choices:\n", "")
        .str.replace("Answer", ""),
        "answer": col("assistant").str.extract(r"Answer:\s*(.*)$").str.replace("Answer:", ""),
    }
).collect()

df_prepped.show(3)
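To see what these patterns extract from a single entry, here is a plain-Python sketch using the standard `re` module on the sample message shown earlier. This is an illustration outside Daft: the helper `parse_example` is hypothetical, it uses capture groups directly instead of the follow-up `replace` calls, and the sample string is a whitespace-normalized copy of the example above.

```python
import re

QUESTION_RE = re.compile(r"(?s)Question:\s*(.*?)\s*Choices:")
CHOICES_RE = re.compile(r"(?s)Choices:\s*(.*?)\s*Answer")
ANSWER_RE = re.compile(r"Answer:\s*(.*)$")


def parse_example(user: str, assistant: str) -> dict:
    """Split one user/assistant pair into question, choices, and answer."""
    q = QUESTION_RE.search(user)
    c = CHOICES_RE.search(user)
    a = ANSWER_RE.search(assistant)
    return {
        "question": q.group(1) if q else None,
        "choices": c.group(1).splitlines() if c else None,
        "answer": a.group(1).strip() if a else None,
    }


user = (
    "Question:\n"
    "From the above food web diagram, what cause kingfisher to increase\n"
    "Choices:\n"
    "A. decrease in fish\n"
    "B. decrease in water boatman\n"
    "C. increase in fish\n"
    "D. increase in algae\n"
    "Answer with the letter."
)
parsed = parse_example(user, "Answer: C")
print(parsed)
```

The `(?s)` flag lets `.` match newlines, which is what allows the lazy groups to span the multi-line question and choices blocks.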

## 5. Multimodal Inference with Structured Outputs

Now we will move on to scaling our OpenAI client calls with Daft's new `prompt` function.

In [None]:
daft.set_provider("openai", api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)

In [None]:
from daft import col
from daft.functions import format, prompt
import time

start = time.time()
df = df_prepped.with_column(
    "result",
    prompt(
        [col("image"), col("user")],
        system_message = "Observe the attached image and respond to the multiple choice question with just the letter corresponding to the correct answer.",
        model = MODEL_ID,
        provider = "openai",
        use_chat_completions=True,
        extra_body={"guided_choice": ["A", "B", "C", "D"]},
    )
).collect()
end = time.time()
print(f"Processed {df.count_rows()} rows in {end-start} seconds")

In [None]:
df.show()

___
# Analysis
Evaluating Gemma-3's performance on image understanding by comparing structured output responses to the answer.

In [None]:
df = df.with_column("is_correct", col("result").lstrip().rstrip() == col("answer").lstrip().rstrip()) # strip whitespace

In [None]:
pass_fail_rate = df.where(col("is_correct")).count_rows() / df.count_rows()
print(f"Pass/Fail Rate: {pass_fail_rate}")

In [None]:
# Let's investigate some of the failures

# `.show()` returns None, so keep the DataFrame in its own variable before displaying it
df_failures = df.where(~col("is_correct")).select("user", "image", "answer", "result")
df_failures.show(8)

In [None]:
# How do these results compare without images?
start = time.time()
df_no_image = df_prepped.with_column(
    "result",
    prompt(
        col("user"),
        system_message = "Observe the attached image and respond to the multiple choice question with just the letter corresponding to the correct answer.",
        model = MODEL_ID,
        provider = "openai",
        use_chat_completions=True,
        extra_body={"guided_choice": ["A", "B", "C", "D"]},
    )
).with_column("is_correct", col("result").lstrip().rstrip() == col("answer").lstrip().rstrip()).collect()
end = time.time()

print(f"Processed {df_no_image.count_rows()} rows in {end-start} seconds")


In [None]:
pass_fail_rate_no_image = df_no_image.where(col("is_correct")).count_rows() / df_no_image.count_rows()
print(f"Pass/Fail Rate: \n With Image: {pass_fail_rate} \n Without Image: {pass_fail_rate_no_image} ")

In [None]:
# How do results change at different temperatures?
temps = [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.2]
pf_rates = []
for temp in temps:
    start = time.time()
    df_temp = df_prepped.with_column(
        "result",
        prompt(
            [col("image"), col("user")],
            system_message = "Observe the attached image and respond to the multiple choice question with just the letter corresponding to the correct answer.",
            model = MODEL_ID,
            provider = "openai",
            use_chat_completions=True,
            extra_body={"guided_choice": ["A", "B", "C", "D"]},
            temperature=temp,
        )
    ).with_column("is_correct", col("result").lstrip().rstrip() == col("answer").lstrip().rstrip()).collect()
    end = time.time()

    # Score this temperature's run (not the earlier `df`) and record the rate
    total_rows = df_temp.count_rows()
    rate = df_temp.where(col("is_correct")).count_rows() / total_rows
    pf_rates.append(rate)

    print(f"temperature={temp}: processed {total_rows} rows in {end-start:.1f} seconds, pass rate {rate:.3f}")


---
# Putting everything together: Evaluating Gemma across the AI2D Dataset
Now that we have walked through implementing this image understanding evaluation pipeline from end to end, let's put it all together so we can take full advantage of lazy evaluation and provide opportunities for future extensibility and re-use.

In [None]:
import time
from typing import Any

import daft
from daft import col
from daft.functions import format, prompt


class MultimodalStructuredOutputsEval:
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key

    def __call__(
        self,
        model_id: str,
        dataset_uri: str,
        sampling_params: dict[str, Any] | None = None,
        concurrency: int = 4,
        row_limit: int | None = None,
        is_eager: bool = False,
    ) -> daft.DataFrame:
        """Executes dataset loading, preprocessing, inference, and post-processing.

        Evaluation must be run separately since it requires materialization.
        """
        if is_eager:
            # Load Dataset and Materialize
            df = self.load_dataset(dataset_uri)
            df = df.limit(row_limit) if row_limit else df
            df = self._log_processing_time(df)

            # Preprocess
            df = self.preprocess(df)
            df = self._log_processing_time(df)

            # Perform Inference
            df = self.infer(df, model_id, sampling_params)
            df = self._log_processing_time(df)

            # Post-Process
            df = self.postprocess(df)
            df = self._log_processing_time(df)
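        else:
            # Stay lazy, but apply the row limit right after loading (as in the eager branch)
            # so inference does not run on rows that would be discarded afterwards
            df = self.load_dataset(dataset_uri)
            df = df.limit(row_limit) if row_limit else df
            df = self.preprocess(df)
            df = self.infer(df, model_id, sampling_params)
            df = self.postprocess(df)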

        return df

    @staticmethod
    def _log_processing_time(df: daft.DataFrame):
        start = time.time()
        df_materialized = df.collect()
        end = time.time()
        num_rows = df_materialized.count_rows()
        print(f"Processed {num_rows} rows in {end-start} sec, {num_rows/(end-start)} rows/s")
        return df_materialized

    def load_dataset(self, uri: str) -> daft.DataFrame:
        return daft.read_parquet(uri)

    def preprocess(self, df: daft.DataFrame) -> daft.DataFrame:
        # Convert png image byte string to base64
        df = df.explode(col("images")).with_column(
            "image_base64",
            df["images"].struct.get("bytes").encode("base64"),
        )

        # Explode Lists of User Prompts and Assistant Answer Pairs
        df = df.explode(col("texts")).with_columns(
            {"user": df["texts"].struct.get("user"), "assistant": df["texts"].struct.get("assistant")}
        )

        # Parse the Question/Answer Strings
        df = df.with_columns(
            {
                "question": df["user"]
                .str.extract(r"(?s)Question:\s*(.*?)\s*Choices:")
                .str.replace("Choices:", "")
                .str.replace("Question:", ""),
                "choices_string": df["user"]
                .str.extract(r"(?s)Choices:\s*(.*?)\s*Answer?\.?")
                .str.replace("Choices:\n", "")
                .str.replace("Answer", ""),
                "answer": df["assistant"].str.extract(r"Answer:\s*(.*)$").str.replace("Answer:", ""),
            }
        )
        return df

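    def infer(
        self,
        df: daft.DataFrame,
        model_id: str = "google/gemma-3n-e4b-it",
        sampling_params: dict[str, Any] | None = None,
        concurrency: int = 4,  # kept for interface compatibility; not used by `prompt` here
        extra_body: dict[str, Any] | None = None,
    ) -> daft.DataFrame:
        # Avoid mutable default arguments
        sampling_params = sampling_params or {"temperature": 0.0}
        extra_body = extra_body or {"guided_choice": ["A", "B", "C", "D"]}

        # Point Daft's OpenAI provider at the configured server
        daft.set_provider("openai", api_key=self.api_key, base_url=self.base_url)

        return df.with_column(
            "result",
            prompt(
                [
                    col("images").struct.get("bytes").image.decode(),  # Decoded image input
                    format("{} \n {}", col("question"), col("choices_string")),  # Prompt Template
                ],
                system_message="Observe the attached image and respond to the multiple choice question with just the letter corresponding to the correct answer.",
                model=model_id,
                provider="openai",
                use_chat_completions=True,
                extra_body=extra_body,
                **sampling_params,
            ),
        )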

    def postprocess(self, df: daft.DataFrame) -> daft.DataFrame:
        df = df.with_column(
            "is_correct", col("result").str.lstrip().str.rstrip() == col("answer").str.lstrip().str.rstrip()
        )
        return df

    def evaluate(self, df: daft.DataFrame) -> float:
        pass_fail_rate = df.where(col("is_correct")).count_rows() / df.count_rows()
        return pass_fail_rate

In [None]:
# Our entire pipeline collapses into three statements
dataset_uri = "hf://datasets/HuggingFaceM4/the_cauldron/ai2d/train-00000-of-00001-2ce340398c113b79.parquet"
pipeline = MultimodalStructuredOutputsEval(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)
df = pipeline(
    model_id=MODEL_ID,
    dataset_uri=dataset_uri,
    sampling_params={"temperature": 0.1},
    row_limit=ROW_LIMIT,
    is_eager=True,
)

In [None]:
# Materialize if not eager
df_mat = df.collect()

In [None]:
# Print the Pass/Fail Rate
print(f"Pass/Fail Rate: {pipeline.evaluate(df_mat)}")

---
## Conclusion

In this notebook we explored how to evaluate Gemma-3's image understanding using a subset of Hugging Face's The Cauldron dataset. The AI2D subset we used is just one of a massive collection of 50 vision-language datasets, totaling millions of rows, that can be used for evaluating or training vision language models. You can also leverage this pipeline to evaluate model performance across sampling parameters or model variants. Please note that not all Gemma-3 series models support image inputs, and leveraging datasets outside of The Cauldron would require different preprocessing stages.

A natural next step would be to parallelize this pipeline across multiple datasets using multiple GPUs. In this scenario, I recommend switching Daft's execution context to Ray, a distributed compute framework.

```bash
pip install "daft[huggingface,ray]"
```

You can set Daft's execution context to Ray by adding the `ray` optional dependency during installation and running the following at the top of your script.

```python
import daft

daft.set_runner_ray()
```

Simply run your pipeline across each dataset URI and collect the results. Daft will orchestrate Ray in the background for you.
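One way to organize that loop is sketched below. The helper `evaluate_subsets` is hypothetical, and `run_eval` stands in for whatever callable runs your pipeline on one URI and returns its score; the URIs in the example are placeholders, not real paths.

```python
from typing import Callable


def evaluate_subsets(run_eval: Callable[[str], float], dataset_uris: list[str]) -> dict[str, float]:
    """Run `run_eval` on each dataset URI and collect the scores keyed by URI."""
    results: dict[str, float] = {}
    for uri in dataset_uris:
        results[uri] = run_eval(uri)
    return results


# Stand-in evaluator for illustration: replace with a call into your own pipeline
fake_scores = {"subset-a": 0.5, "subset-b": 0.75}
results = evaluate_subsets(lambda uri: fake_scores[uri], ["subset-a", "subset-b"])
print(results)
```

With the class defined above, `run_eval` could be something like `lambda uri: pipeline.evaluate(pipeline(model_id=MODEL_ID, dataset_uri=uri).collect())`.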