In [None]:
# Install the `spaces` package; its `spaces.GPU` decorator is applied to the
# inference function defined further down in this notebook.
%pip install spaces

In [19]:
import gradio as gr
import spaces
from transformers import Idefics3ForConditionalGeneration, AutoProcessor
import torch
from PIL import Image
from datetime import datetime
import numpy as np
import os

In [None]:
# Interactive Hugging Face Hub login; the dataset loads in the next cell are
# commented as requiring an authenticated session.
!huggingface-cli login

In [None]:
import spaces
from datasets import load_dataset

# Login using e.g. `huggingface-cli login` to access these datasets.
# Each call loads one dataset repository by its Hub identifier.
# NOTE(review): none of these three datasets is referenced again in the visible
# part of this notebook — confirm they are still needed.
indian_monuments_ds = load_dataset("AIMLOps-C4-G16/indian_monuments")
indian_festivals_ds = load_dataset("AIMLOps-C4-G16/IndianFestivals")
indian_sports_ds = load_dataset("AIMLOps-C4-G16/IndianSports")

In [8]:
# Download the Flickr8k image archive and the caption-text archive (quiet mode).
!wget -q https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_Dataset.zip
!wget -q https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_text.zip
# Extract both archives into the current working directory.
!unzip -qq Flickr8k_Dataset.zip
!unzip -qq Flickr8k_text.zip
# Remove the archives once extracted.
# NOTE(review): re-running this cell downloads everything again because the
# archives are deleted here.
!rm Flickr8k_Dataset.zip Flickr8k_text.zip

In [7]:
# Path to the images
# NOTE(review): the directory is spelled "Flicker8k_Dataset" — presumably the
# folder name produced by unzipping the image archive; confirm before renaming.
import tensorflow as tf
IMAGES_PATH = "Flicker8k_Dataset"

# Desired image dimensions (passed to tf.image.resize in decode_and_resize)
IMAGE_SIZE = (299, 299)

# Vocabulary size (max_tokens of the TextVectorization layer)
VOCAB_SIZE = 10000

# Fixed length allowed for any sequence.
# Also used by load_captions_data as the maximum number of tokens per caption.
SEQ_LENGTH = 25

# Dimension for the image embeddings and token embeddings
# NOTE(review): not referenced in the visible part of this notebook.
EMBED_DIM = 512

# Per-layer units in the feed-forward network
# NOTE(review): not referenced in the visible part of this notebook.
FF_DIM = 512

# Other training parameters
BATCH_SIZE = 64
EPOCHS = 3
AUTOTUNE = tf.data.AUTOTUNE

In [9]:
import os
import numpy as np
def load_captions_data(filename, images_path=None, seq_length=None):
    """Loads captions (text) data and maps them to corresponding images.

    Each non-blank line of the file is expected to look like
    ``<image name>#<caption number>\\t<caption>``.

    Args:
        filename: Path to the text file containing caption data.
        images_path: Directory prepended to every image name. Defaults to the
            module-level ``IMAGES_PATH``.
        seq_length: Maximum number of whitespace-separated tokens a caption may
            have. Defaults to the module-level ``SEQ_LENGTH``.

    Returns:
        caption_mapping: Dictionary mapping image paths to their list of
            captions (each wrapped in ``<start>`` / ``<end>`` markers). An image
            is dropped entirely if any one of its captions is too short
            (fewer than 5 tokens) or too long.
        text_data: List of every caption that was accepted while reading. It
            may still contain captions of images that were dropped later in
            the file, exactly as in the original implementation.

    Raises:
        ValueError: If a non-blank line contains no tab separator.
    """
    if images_path is None:
        images_path = IMAGES_PATH
    if seq_length is None:
        seq_length = SEQ_LENGTH

    caption_mapping = {}
    text_data = []
    images_to_skip = set()

    with open(filename, encoding="utf-8") as caption_file:
        for line in caption_file:
            line = line.rstrip("\n")

            # Blank lines (e.g. a trailing newline at the end of the file)
            # carry no caption and must not abort the whole load.
            if not line.strip():
                continue

            # Image name and caption are separated by the FIRST tab; any
            # further tab is treated as part of the caption text.
            img_name, caption = line.split("\t", 1)

            # Each image is repeated five times for the five different captions.
            # Each image name has a suffix `#(caption_number)`
            img_name = img_name.split("#")[0]
            img_name = os.path.join(images_path, img_name.strip())

            # Captions that are either too short or too long disqualify the image
            caption = caption.strip()
            tokens = caption.split()

            if len(tokens) < 5 or len(tokens) > seq_length:
                images_to_skip.add(img_name)
                continue

            if img_name.endswith("jpg") and img_name not in images_to_skip:
                # Add a start and an end token to each caption
                caption = "<start> " + caption + " <end>"
                text_data.append(caption)
                caption_mapping.setdefault(img_name, []).append(caption)

    # An image may have been accepted before one of its later captions
    # disqualified it, so purge those entries after the full pass.
    for img_name in images_to_skip:
        caption_mapping.pop(img_name, None)

    return caption_mapping, text_data


def train_val_split(caption_data, train_size=0.8, shuffle=True, seed=None):
    """Split the captioning dataset into train and validation sets.

    Args:
        caption_data (dict): Dictionary containing the mapped caption data
        train_size (float): Fraction of the full dataset to use as training data
        shuffle (bool): Whether to shuffle the dataset before splitting
        seed (int | None): When given, the shuffle uses a dedicated NumPy
            generator seeded with this value so the split is reproducible.
            When None, NumPy's global random state is used (original behavior).

    Returns:
        Training and validation datasets as two separate dicts
    """

    # 1. Get the list of all image names
    all_images = list(caption_data.keys())

    # 2. Shuffle if necessary
    if shuffle:
        if seed is None:
            np.random.shuffle(all_images)
        else:
            np.random.default_rng(seed).shuffle(all_images)

    # 3. Split into training and validation sets
    n_train = int(len(all_images) * train_size)

    training_data = {
        img_name: caption_data[img_name] for img_name in all_images[:n_train]
    }
    validation_data = {
        img_name: caption_data[img_name] for img_name in all_images[n_train:]
    }

    # 4. Return the splits (the full training dict is no longer printed; it
    #    flooded the cell output with thousands of entries)
    return training_data, validation_data

In [10]:
# Vocabulary size
# NOTE(review): repeats the VOCAB_SIZE assignment from the configuration cell
# above with the same value.
VOCAB_SIZE = 10000

In [11]:
# Load the dataset
# `captions_mapping` maps each image path to its list of retained captions;
# `text_data` is the flat caption list later used to adapt the vectorizer.
# NOTE(review): all retained captions are kept per image here; only the
# evaluation loop further down writes just the first caption of each image.
captions_mapping, text_data = load_captions_data("Flickr8k.token.txt")

# Split the dataset into training and validation sets
train_data, valid_data = train_val_split(captions_mapping)
print("Number of training samples: ", len(train_data))
print("Number of validation samples: ", len(valid_data))

Number of training samples:  6114
Number of validation samples:  1529


In [12]:
import tensorflow as tf
import re
from tensorflow import keras
from keras.layers import TextVectorization
# Punctuation characters deleted during caption standardization.
# The backslash is spelled `\\`: a bare `\]` is not a recognized escape
# sequence in a normal string literal and triggers an invalid-escape warning
# on recent Python versions. The resulting string value is unchanged.
strip_chars = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"
# Keep the angle brackets so the `<start>` / `<end>` caption markers survive.
strip_chars = strip_chars.replace("<", "")
strip_chars = strip_chars.replace(">", "")

def custom_standardization(input_string):
    """Lowercase the input and delete every character listed in `strip_chars`.

    Args:
        input_string: String tensor handed over by the TextVectorization layer.

    Returns:
        The lowercased string tensor with the listed characters removed.
    """
    # Character class matching any single character to strip; `re.escape`
    # neutralizes regex metacharacters contained in `strip_chars`.
    removal_pattern = "[%s]" % re.escape(strip_chars)
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, removal_pattern, "")

# Text vectorizer: maps each caption to a fixed-length integer sequence
# (SEQ_LENGTH entries) over a vocabulary capped at VOCAB_SIZE tokens, using the
# custom standardization above instead of the layer's default.
vectorization = TextVectorization(
    max_tokens=VOCAB_SIZE,
    output_mode="int",
    output_sequence_length=SEQ_LENGTH,
    standardize=custom_standardization,
)
# Build the vocabulary from every caption returned by load_captions_data.
vectorization.adapt(text_data)

def decode_and_resize(img_path):
    """Read a JPEG file and return it as a 3-channel tensor of IMAGE_SIZE.

    Args:
        img_path: Path of the JPEG file to read.

    Returns:
        The image after decoding, resizing to IMAGE_SIZE and conversion to
        tf.float32.
    """
    # NOTE(review): the resize output is presumably already floating point, in
    # which case the final dtype conversion does not rescale pixel values —
    # confirm the value range expected downstream.
    raw_bytes = tf.io.read_file(img_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, IMAGE_SIZE)
    return tf.image.convert_image_dtype(resized, tf.float32)

def process_input(img_path, captions):
    """Turn one (image path, captions) pair into (image tensor, token ids).

    Args:
        img_path: Path of the image file.
        captions: Caption text(s) belonging to that image.

    Returns:
        Tuple of the decoded/resized image and the vectorized captions.
    """
    image_tensor = decode_and_resize(img_path)
    caption_tokens = vectorization(captions)
    return image_tensor, caption_tokens

def make_dataset(images, captions):
    """Build the batched tf.data pipeline for image/caption pairs.

    Args:
        images: Sequence of image paths.
        captions: Sequence of caption lists, aligned with `images`.

    Returns:
        A dataset that shuffles with a buffer of BATCH_SIZE * 8 elements, maps
        each element through `process_input`, batches by BATCH_SIZE and
        prefetches.
    """
    return (
        tf.data.Dataset.from_tensor_slices((images, captions))
        .shuffle(BATCH_SIZE * 8)
        .map(process_input, num_parallel_calls=AUTOTUNE)
        .batch(BATCH_SIZE)
        .prefetch(AUTOTUNE)
    )

In [13]:
# Flatten the train/validation dicts into parallel lists: image paths (keys)
# and their caption lists (values), in matching order.
list_of_images = list(train_data.keys())
list_of_captions = list(train_data.values())
list_of_v_images = list(valid_data.keys())
list_of_v_captions = list(valid_data.values())

# Build the tf.data pipelines. The validation lists are also reused further
# down by the SmolVLM caption-generation loop.
train_dataset = make_dataset(list_of_images,list_of_captions)
valid_dataset = make_dataset(list_of_v_images,list_of_v_captions)

In [14]:
import PIL
# Size handed to `read_image` by the caption-generation loop further down.
target_size = (224, 224)
def crop_and_resize(image, target_size):
    """Resize the centered square region of `image` to `target_size`.

    Args:
        image: Object exposing `.size` as (width, height) and a
            `.resize(size, box=...)` method (a PIL image in this notebook).
        target_size: Size forwarded unchanged to `image.resize`.

    Returns:
        Whatever `image.resize` returns for the centered square box whose side
        equals the shorter image dimension.
    """
    width, height = image.size
    side = min(width, height)
    # Center the square on the image using integer midpoints.
    left = width // 2 - side // 2
    top = height // 2 - side // 2
    crop_box = (left, top, left + side, top + side)
    return image.resize(target_size, box=crop_box)

def read_image(im, target_size):
    """Load an image file as a center-cropped, resized 3-channel array.

    Args:
        im: Path (or file object) accepted by `PIL.Image.open`.
        target_size: Size forwarded to `crop_and_resize`.

    Returns:
        NumPy array with three channels on the last axis.
    """
    # The context manager releases the underlying file handle once the pixel
    # data has been materialized.
    with PIL.Image.open(im) as source:
        # Grayscale / palette / CMYK images would otherwise produce an array
        # without a 3- or 4-channel last axis and break the indexing below.
        if source.mode in ("RGB", "RGBA"):
            image = source
        else:
            image = source.convert("RGB")
        image = crop_and_resize(image, target_size)
        pixels = np.array(image)

    # Remove alpha channel if necessary.
    if pixels.shape[2] == 4:
        pixels = pixels[:, :, :3]
    return pixels

In [15]:
# Markdown shown at the top of the Gradio demo.
DESCRIPTION = """
# SmolVLM-trl-dpo-rlaif-v Demo
This is a demo Space for a fine-tuned version of [SmolVLM](https://huggingface.co/HuggingFaceTB/SmolVLM-Instruct) trained using [rlaif-v dataset](https://huggingface.co/datasets/HuggingFaceH4/rlaif-v_formatted).
The corresponding model is located [here](https://huggingface.co/HuggingFaceTB/SmolVLM-Instruct-DPO).
For a full tutorial of fine-tuning using DPO, check out [this link](https://huggingface.co/learn/cookbook/index).
"""

# Base model, loaded in bfloat16 with automatic device placement.
model_id = "HuggingFaceTB/SmolVLM-Instruct"
model = Idefics3ForConditionalGeneration.from_pretrained(
    model_id,
    device_map="auto",
    torch_dtype=torch.bfloat16,
    #_attn_implementation="flash_attention_2",
)

# The processor comes from the base model id; the DPO adapter is then loaded
# on top of the base model.
processor = AutoProcessor.from_pretrained(model_id)
adapter_path = "HuggingFaceTB/SmolVLM-Instruct-DPO"
model.load_adapter(adapter_path)

def array_to_image_path(image_array):
    """Save an image array as a PNG in the working directory and return its path.

    Args:
        image_array: Pixel data accepted by `PIL.Image.fromarray` after being
            cast to uint8.

    Returns:
        Absolute path of the PNG file that was written.

    Raises:
        ValueError: If `image_array` is None.
    """
    if image_array is None:
        raise ValueError("No image provided. Please upload an image before submitting.")
    # Convert numpy array to PIL Image
    img = Image.fromarray(np.uint8(image_array))

    # Include microseconds in the timestamp: with second resolution, several
    # calls within the same second produced the same filename and silently
    # overwrote each other's image.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    filename = f"image_{timestamp}.png"

    # Save the image
    img.save(filename)

    # Return the full path of the saved image
    return os.path.abspath(filename)

config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/4.49G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/136 [00:00<?, ?B/s]

processor_config.json:   0%|          | 0.00/68.0 [00:00<?, ?B/s]

chat_template.json:   0%|          | 0.00/429 [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/486 [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

added_tokens.json:   0%|          | 0.00/92.0 [00:00<?, ?B/s]

special_tokens_map.json: 0.00B [00:00, ?B/s]

adapter_config.json:   0%|          | 0.00/783 [00:00<?, ?B/s]

adapter_model.safetensors:   0%|          | 0.00/108M [00:00<?, ?B/s]

In [20]:
# No `@spaces.GPU` here: this function only appends a line to a text file and
# never touches the model, so it should not request a GPU.
def run_rlhf(input_img, output_text, feedback_text_input="y", alternate_caption=""):
    """Append one human-feedback record to `rlhf.txt`.

    Each record is a single tab-separated line:
    image reference, model output, feedback flag, alternate caption.

    Args:
        input_img: Image the feedback refers to. A string is logged as-is, an
            array is first saved through `array_to_image_path` and its path is
            logged, and None is logged as an empty field.
        output_text: Caption produced by the model.
        feedback_text_input: Reviewer verdict (the UI asks for "y"/"n").
        alternate_caption: Optional caption proposed by the reviewer.

    Returns:
        None.
    """
    if input_img is None:
        image_ref = ""
    elif isinstance(input_img, str):
        image_ref = input_img
    else:
        # Concatenating an array with a string raises TypeError, so persist the
        # image and log the path of the saved file instead.
        image_ref = array_to_image_path(input_img)

    def _field(value):
        # Tabs and line breaks inside a field would corrupt the one-record-per-
        # line, tab-separated layout.
        if value is None:
            return ""
        text = str(value)
        for separator in ("\r\n", "\n", "\r", "\t"):
            text = text.replace(separator, " ")
        return text

    record = "\t".join(
        _field(part)
        for part in (image_ref, output_text, feedback_text_input, alternate_caption)
    )

    # Append instead of truncating so earlier feedback is not lost.
    with open('rlhf.txt', 'a', encoding="utf-8") as f:
        f.write(record + "\n")
    return

In [21]:
@spaces.GPU
def run_example(image, text_input=None):
    """Generate text for an image with the SmolVLM model.

    Args:
        image: Image as a NumPy array. When None, a random validation image is
            loaded from `list_of_v_images` instead.
        text_input: Prompt sent along with the image. When empty or None, a
            generic description prompt is used, because a missing prompt would
            otherwise be passed to the chat template as None.

    Returns:
        Tuple `(output_img, output_text)`: the image array that was captioned
        and the decoded model output for it.
    """
    if image is None:
        print("No image provided.Selecting random")
        # `list_of_v_images` holds file paths, so the random pick has to be
        # loaded into a pixel array before it can be used as an image.
        random_path = str(np.random.choice(list_of_v_images))
        image = read_image(random_path, target_size)
    output_img = image

    # The image is no longer written to disk here: the saved path was never
    # used and every call left a PNG file behind.
    pil_image = Image.fromarray(image).convert("RGB")

    prompt = text_input if text_input else "Describe this image."

    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "image",
                    "text": None,
                },
                {
                    "text": prompt,
                    "type": "text"
                },
            ],
        }
    ]

    # Preparation for inference
    text = processor.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    inputs = processor(
        text=text,
        images=[[pil_image]],
        padding=True,
        return_tensors="pt",
    )
    # Follow the device the model was placed on (it is loaded with
    # device_map="auto") rather than assuming "cuda".
    inputs = inputs.to(model.device)

    # Inference: Generation of the output
    generated_ids = model.generate(**inputs, max_new_tokens=1024)
    # Drop the prompt tokens so that only newly generated tokens are decoded.
    generated_ids_trimmed = [
        out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    output_text = processor.batch_decode(
        generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )
    return output_img, output_text[0]

In [None]:
# Caption every validation image with the model and store, per image, one
# tab-separated line: image path, first reference caption, generated text.
with open('smolvlm_rlaif_output.txt', 'w') as f:
    for img_path, reference_captions in zip(list_of_v_images, list_of_v_captions):
        image = read_image(img_path, target_size)
        output_img, output_text = run_example(image)
        print(output_text)
        # Only the first caption of each image is written as the reference
        record = "\t".join((img_path, reference_captions[0], output_text))
        f.write(record + "\n")

In [None]:
# Download the generated captions file to the local machine.
# NOTE(review): `google.colab` is presumably only importable inside Google
# Colab — this cell will fail elsewhere.
from google.colab import files
files.download('smolvlm_rlaif_output.txt')

In [None]:
# Styling for an element with id "output".
# NOTE(review): no component below sets elem_id="output", so this rule does
# not appear to be attached to anything — confirm whether it is still wanted.
css = """
  #output {
    height: 500px;
    overflow: auto;
    border: 1px solid #ccc;
  }
"""

# Two-column demo: inputs on the left, model output plus feedback on the right.
with gr.Blocks(css=css) as demo:
    gr.Markdown(DESCRIPTION)
    with gr.Tab(label="SmolVLM-Instruct-DPO Input"):
        with gr.Row():
            with gr.Column():
                input_img = gr.Image(label="Input Picture")
                text_input = gr.Textbox(label="Question")
                submit_btn = gr.Button(value="Submit")
            with gr.Column():
                output_text = gr.Textbox(label="Output Text")
                output_img = gr.Image(label="Output Picture")
                feedback_text_input = gr.Textbox(label="Correct? y/n")
                alternate_caption = gr.Textbox(label="Alternate Caption")
                feedback_btn = gr.Button(value="Submit Feedback")
        # "Submit" runs the model only.
        submit_btn.click(run_example, [input_img, text_input], [output_img, output_text])
        # Feedback is logged by its own button. It used to be attached to
        # "Submit" as well, which logged before any caption or feedback existed
        # and left "Submit Feedback" unconnected. The logged image is the one
        # shown in the output panel, i.e. the image the caption was actually
        # generated for (also when a random image was selected).
        feedback_btn.click(
            run_rlhf,
            [output_img, output_text, feedback_text_input, alternate_caption],
            None,
        )

demo.launch(debug=True)