# Demo: Daft + Hugging Face

In [2]:
import daft

In [2]:
import os

# Provide a placeholder Hugging Face token only when none is configured, so a
# real token already exported in the environment is not clobbered.
os.environ.setdefault("HUGGINGFACE_API_TOKEN", "*** YOUR TOKEN HERE ***")

## Configure Hugging Face authentication for reads

In [None]:
from daft.io import IOConfig, HTTPConfig

# Authenticate hf:// reads by sending the token from the environment as an
# HTTP bearer token (None when the variable is unset).
io_config = IOConfig(
    http=HTTPConfig(bearer_token=os.environ.get("HUGGINGFACE_API_TOKEN")),
)

In [None]:
# Read every Parquet file matched by the glob in the laion/laion400m dataset
# repo on Hugging Face, authenticating with io_config.
df = daft.read_parquet("hf://datasets/laion/laion400m/**/*.parquet", io_config=io_config)

In [None]:
# Preview the raw dataset rows.
df.show()

In [None]:
# Keep a small sample so the downstream LLM calls stay cheap.
df = df.limit(25)
# Download each URL, decode the bytes, and normalize to RGB. A failure at the
# download or decode step yields a null image instead of raising.
df = df.with_column(
    "image",
    (
        df["url"]
        .url.download(on_error="null")
        .image.decode(on_error="null")
        .image.to_mode("RGB")
    ),
)

df.show()

## Let's score the captions with an LLM

In [None]:
# System prompt for the caption-scoring model. It describes the fields that
# CaptionScore parses the reply into (a quality score in [0, 1], a cleaned-up
# caption, and a reason), so keep the two in sync. Sent verbatim to the model.
SYSTEM_MESSAGE = """
You are an AI assistant which powers an API for scoring the quality of image captions. Your capabilities
includes determining the general overall quality of a caption,
generating a cleaned-up version of the caption as well as being able to provide a reason for these scores.

Caption quality is a scored float between 0 and 1, determined by:
1. How well does a caption describe the overall scene of the image?
2. Are semantically important focal points in the image captured by the caption?
3. Does the caption make grammatical and logical sense?
4. Is the caption not just describing what is in the image, but capturing important context around the significance of these items?
"""

In [3]:
from PIL import Image
import base64
import io
import asyncio
from openai import AsyncOpenAI
import instructor

# Cap on simultaneous OpenAI requests: score_caption sizes the asyncio.Semaphore
# that gates each batch's calls with this value.
max_concurrent_requests = 16

import pydantic

class CaptionScore(pydantic.BaseModel):
    """Structured reply the LLM must produce for one caption/image pair."""

    # Constrained to [0, 1] to match the range SYSTEM_MESSAGE asks for, so an
    # out-of-range reply fails validation instead of being silently accepted.
    # The description is also included in the model's JSON schema.
    caption_quality: float = pydantic.Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Overall caption quality, from 0 (worst) to 1 (best).",
    )
    # Cleaned-up rewrite of the input caption.
    clean_caption: str
    # Explanation for the score.
    reason: str

def resize_image(image, max_dimension=1024) -> Image.Image:
    """Downscale *image* so neither side exceeds *max_dimension* pixels.

    The longer side is set to exactly ``max_dimension`` and the shorter side is
    scaled proportionally (truncated to an int, but never below 1 pixel).
    Images already within the bound are returned unchanged (same object).

    Args:
        image: PIL image to shrink.
        max_dimension: Upper bound for both width and height, in pixels.

    Returns:
        The original image, or a resized copy.
    """
    width, height = image.size
    if width <= max_dimension and height <= max_dimension:
        return image
    # Clamp the short side to 1px: a very thin image (e.g. 5000x2) would
    # otherwise truncate to 0, and PIL rejects zero-sized resize targets.
    if width > height:
        new_width = max_dimension
        new_height = max(1, int(height * (max_dimension / width)))
    else:
        new_height = max_dimension
        new_width = max(1, int(width * (max_dimension / height)))
    return image.resize((new_width, new_height))

def convert_to_png(image):
    """Encode *image* and return the encoded bytes.

    Despite its name, this produces JPEG, which matches the
    ``data:image/jpeg`` URL that score_caption builds. The name is kept so
    existing callers keep working.

    Args:
        image: PIL image to encode.

    Returns:
        The JPEG-encoded image as ``bytes``.
    """
    # JPEG cannot store alpha or palettes, and PIL refuses to save modes such
    # as RGBA/LA/P as JPEG, so flatten anything that is not already RGB or L.
    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")
    with io.BytesIO() as output:
        image.save(output, format="jpeg")
        return output.getvalue()

def process_image(image) -> str:
    """Return *image*, downscaled and JPEG-encoded, as a base64 text string."""
    encoded = base64.b64encode(convert_to_png(resize_image(image)))
    return encoded.decode('utf-8')

@daft.udf(return_dtype={
    # NOTE(review): CaptionScore has no `caption_similarity` field, so the
    # dicts returned below never carry this key and it is expected to come
    # back null. It is kept so existing references to it still resolve.
    "caption_similarity": float,
    "caption_quality": float,
    "clean_caption": str,
    "reason": str,
})
def score_caption(captions, images):
    """Score every (caption, image) row with an OpenAI vision model.

    Each row becomes one chat-completion request, and instructor parses the
    reply into a ``CaptionScore``. At most ``max_concurrent_requests`` requests
    are in flight at once.

    Args:
        captions: Daft Series of caption strings.
        images: Daft Series of images. Each element is passed to
            ``PIL.Image.fromarray`` (presumably a numpy array; confirm).

    Returns:
        A list aligned with the input rows. Each entry is the ``CaptionScore``
        fields as a dict, or ``None`` for a row whose caption or image is null.
    """
    openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    client = instructor.from_openai(openai_client)

    async def score_single_caption(caption, image_arr):
        # A null caption would be sent as `"text": None` (presumably rejected
        # by the API). Because asyncio.gather propagates the first error, one
        # such row would fail the whole batch, so emit a null result instead.
        if caption is None or image_arr is None:
            return None
        image = Image.fromarray(image_arr)
        base64_encoded_image = process_image(image)
        result = await client.chat.completions.create(
            model="gpt-4.1-mini",
            response_model=CaptionScore,
            messages=[
                {"role": "system", "content": SYSTEM_MESSAGE},
                {"role": "user", "content": [
                    {"type": "text", "text": caption},
                    # process_image produces JPEG bytes, matching this MIME type.
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_encoded_image}"}},
                ]},
            ],
        )
        return result.model_dump()

    async def analyze_with_semaphore(semaphore, *args):
        # Bound concurrency so a large batch does not fire every request at once.
        async with semaphore:
            return await score_single_caption(*args)

    async def run_tasks():
        semaphore = asyncio.Semaphore(max_concurrent_requests)
        # Materialize the Series as Python lists for row-wise iteration.
        tasks = [
            analyze_with_semaphore(semaphore, caption, image)
            for caption, image in zip(captions.to_pylist(), images.to_pylist())
        ]
        # gather returns results in task order, so they line up with input rows.
        return await asyncio.gather(*tasks)

    # The UDF itself is synchronous, so run the event loop here until every row is scored.
    results = asyncio.run(run_tasks())
    return results

In [None]:
# Score only rows that have both a decoded image and a caption to judge.
df = df.where("image IS NOT NULL AND caption IS NOT NULL")
df = df.with_column("results", score_caption(df["caption"], df["image"]))
# Promote the fields the model actually returns (CaptionScore) to top-level
# columns. `caption_similarity` is not unpacked because CaptionScore never fills it.
df = df.with_columns({
    c: df["results"][c]
    for c in ["caption_quality", "clean_caption", "reason"]
})
df = df.select(
    "key",
    "url",
    "image",
    "caption",
    "clean_caption",
    # NOTE(review): `similarity` is not produced in this notebook, so it must
    # come from the source dataset (presumably LAION's CLIP score). Confirm.
    "similarity",
    "caption_quality",
    "reason",
)

In [None]:
# Preview the scored sample.
df.show()

In [None]:
# Persist the scored rows without the decoded image column (it is rebuilt from
# `url` when reloading). The output path is relative to the working directory.
df.exclude("image").write_parquet("laion-400M-sample/cleaned-captions")

## Load into PyTorch for training

Now that your data is "cleaned" and saved to your data lake, you can use Daft to stream it into your model during training.

In [None]:
cleaned = daft.read_parquet("laion-400M-sample/cleaned-captions")
# Rebuild the image column, which was excluded when the scored rows were
# written. Reference `cleaned`, not the earlier `df`, so this cell also works
# in a fresh kernel where only the saved Parquet data is available.
cleaned = cleaned.with_column(
    "image",
    cleaned["url"].url.download().image.decode(),
)

In [None]:
# Inspect the lowest-scored captions first (ascending caption_quality).
cleaned.sort("caption_quality", desc=False).show()

In [None]:
# Keep only captions scored above 0.5 and expose them as a PyTorch iterable dataset.
ds = cleaned.where("caption_quality > 0.5").to_torch_iter_dataset()

In [None]:
# Pull a single example to check that the dataset yields rows.
it = iter(ds)
next(it)