## Training Data Generation (Augmented Text & Image Data)

In [9]:
# Importing useful dependencies
import io
import os
import random
import re
from io import BytesIO
from typing import List

import boto3
import nltk
import numpy as np
import open_clip
import requests
import torch
import torchvision.transforms.functional as TF
from PIL import Image
from torchvision import transforms

# Download the WordNet corpus; nltk reports "already up-to-date" when a local copy exists
nltk.download('wordnet')
# WordNet reader used by get_synonyms for the synonym lookup
from nltk.corpus import wordnet

# Set a seed for reproducibility: every random number generator imported by this
# notebook is seeded with the same value.
SEED = 10721
random.seed(SEED)  # Python's built-in `random` module (used by the text augmentations)
np.random.seed(SEED)  # NumPy global random state
torch.manual_seed(SEED)  # PyTorch random number generator
torch.cuda.manual_seed_all(SEED)  # PyTorch CUDA random number generators

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\SakuraSnow\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [2]:
# Setup S3 client for MinIO (MinIO implements Amazon S3 API).
# The connection settings are read from environment variables so that real
# credentials never have to be written into the notebook. When a variable is
# not set, the local development value this notebook was written with is used.
s3 = boto3.client(
    "s3",
    endpoint_url=os.environ.get("MINIO_ENDPOINT_URL", "http://127.0.0.1:9000"),  # MinIO API endpoint
    aws_access_key_id=os.environ.get("MINIO_ACCESS_KEY", "minioadmin"),  # User name
    aws_secret_access_key=os.environ.get("MINIO_SECRET_KEY", "minioadmin"),  # Password
)

In [4]:
# We create a new bucket in MinIO to store our augmented training data

# Names of all buckets that currently exist on the server
buckets = [bucket["Name"] for bucket in s3.list_buckets()["Buckets"]]

# Create the bucket `name` unless it already appears in `list_buckets`
def createBucket(name, list_buckets):
    """Create bucket ``name`` through the module-level ``s3`` client if needed.

    Args:
        name: Name of the bucket to create.
        list_buckets: Collection of bucket names that already exist.

    Prints a message telling whether the bucket was created or already existed.
    """
    needs_creation = name not in list_buckets
    if needs_creation:
        s3.create_bucket(Bucket=name)
        print(f"Created bucket: {name}")
        return
    print(f"Bucket '{name}' already exists!")

# Create the bucket named "training-data-construction-zone" (only a message is printed if it already exists)
createBucket("training-data-construction-zone", buckets)
# Folder for the text + image augmented training data: an empty object whose key ends with "/"
s3.put_object(Bucket="training-data-construction-zone", Key="text_image_augmented-training-data/")

Bucket 'training-data-construction-zone' already exists!


{'ResponseMetadata': {'RequestId': '187A6635E1FD8D25',
  'HostId': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'accept-ranges': 'bytes',
   'content-length': '0',
   'etag': '"d41d8cd98f00b204e9800998ecf8427e"',
   'server': 'MinIO',
   'strict-transport-security': 'max-age=31536000; includeSubDomains',
   'vary': 'Origin, Accept-Encoding',
   'x-amz-checksum-crc32': 'AAAAAA==',
   'x-amz-checksum-type': 'FULL_OBJECT',
   'x-amz-id-2': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8',
   'x-amz-request-id': '187A6635E1FD8D25',
   'x-content-type-options': 'nosniff',
   'x-ratelimit-limit': '2101',
   'x-ratelimit-remaining': '2101',
   'x-xss-protection': '1; mode=block',
   'date': 'Sat, 22 Nov 2025 18:03:05 GMT'},
  'RetryAttempts': 0},
 'ETag': '"d41d8cd98f00b204e9800998ecf8427e"',
 'ChecksumCRC32': 'AAAAAA==',
 'ChecksumType': 'FULL_OBJECT'}

In this notebook, we create an augmented dataset by applying augmentation techniques to both the text and the image data. The following cell re-implements the text and image augmentation functions that we used before.

In [11]:
# A simple tokenizer: words become tokens and every punctuation mark is its own token
def simple_tokenize(text: str) -> List[str]:
    """Split ``text`` into a list of word and punctuation tokens.

    A token is either a run of word characters or one single character that is
    neither a word character nor whitespace. Whitespace is dropped.
    """
    token_pattern = re.compile(r"\w+|[^\w\s]", re.UNICODE)
    return token_pattern.findall(text)

# Delete each token with probability p.
def random_deletion(tokens: List[str], p: float = 0.1) -> List[str]:
    """Return a copy of ``tokens`` in which each token was dropped with probability ``p``.

    Args:
        tokens: Input tokens; the sequence itself is never modified.
        p: Probability of deleting each individual token.

    Returns:
        A new list. Empty and single-token inputs are returned unchanged (as a
        copy). If every token happens to be deleted, one randomly chosen token
        is kept so that the result of a non-empty input is never empty.
    """
    # Nothing can sensibly be deleted from zero or one token. The empty case
    # must be handled here: otherwise the fallback below would call
    # random.choice on an empty sequence and raise IndexError.
    if len(tokens) <= 1:
        return list(tokens)

    kept = [t for t in tokens if random.random() > p]
    if not kept:
        kept.append(random.choice(tokens))
    return kept
    
# Randomly swap a small portion of tokens.
def random_swap(tokens: List[str], ratio: float = 0.05) -> List[str]:
    """Return a copy of ``tokens`` in which a few randomly chosen pairs were swapped.

    Args:
        tokens: Input tokens; the caller's list is left untouched.
        ratio: Fraction of the token count that determines the number of swaps
            (at least one swap is always performed when there are two or more
            tokens).

    Returns:
        A new list with the same tokens in a partially shuffled order.
    """
    # Work on a copy so that the caller's list is not modified as a side effect.
    swapped = list(tokens)
    n = len(swapped)
    if n < 2:
        return swapped

    n_swaps = max(1, int(ratio * n))

    for _ in range(n_swaps):
        i, j = random.sample(range(n), 2)  # two distinct positions
        swapped[i], swapped[j] = swapped[j], swapped[i]
    return swapped

# Introduce a simple spelling error in a single word
def corrupt_word(word: str) -> str:
    """Return ``word`` with one random character-level edit applied.

    One of three edits is picked at random: delete a character, substitute a
    character with a random lowercase letter, or duplicate a character. An
    empty word is returned as is, and a one-character word is never deleted.
    """
    if not word:
        return word

    ALPHABET = "abcdefghijklmnopqrstuvwxyz"

    operation = random.choice(["delete", "substitute", "duplicate"])

    if operation == "delete":
        # Deleting the only character would leave nothing, so keep the word.
        if len(word) <= 1:
            return word
        idx = random.randrange(len(word))
        return word[:idx] + word[idx + 1:]
    elif operation == "substitute":
        idx = random.randrange(len(word))
        replacement = random.choice(ALPHABET)
        return word[:idx] + replacement + word[idx + 1:]
    elif operation == "duplicate":
        idx = random.randrange(len(word))
        # Everything up to and including idx, followed by everything from idx on
        return word[:idx + 1] + word[idx:]

    return word

# For each alphabetical token, apply a spelling error with probability p.
def random_spelling_error(tokens: List[str], p: float = 0.1) -> List[str]:
    """Return a new token list where alphabetic tokens are misspelled with probability ``p``.

    Tokens that are not purely alphabetic (numbers, punctuation, ...) are
    always copied unchanged; the corruption itself is done by ``corrupt_word``.
    """
    return [
        corrupt_word(token) if token.isalpha() and random.random() < p else token
        for token in tokens
    ]

# Collect synonyms from WordNet
def get_synonyms(word: str) -> List[str]:
    """Return the WordNet synonyms of ``word`` as a sorted list without duplicates.

    Lemma names of every synset of ``word`` are collected, underscores are
    replaced by spaces, and names equal to ``word`` (ignoring case) are left out.

    The result is sorted on purpose: the iteration order of a set of strings
    can differ from one Python process to the next, so returning the raw set
    order would make a seeded ``random.choice`` over the synonyms pick
    different words in different kernel sessions.
    """
    target = word.lower()
    synonyms = {
        lemma.name().replace("_", " ")
        for synset in wordnet.synsets(word)
        for lemma in synset.lemmas()
    }
    return sorted(name for name in synonyms if name.lower() != target)

# Randomly choose a small portion of tokens and replace them with synonyms.
def random_synonym_replacement(tokens: List[str], ratio: float = 0.05) -> List[str]:
    """Return a copy of ``tokens`` where a few words were replaced by a synonym.

    Args:
        tokens: Input tokens; the caller's list is left untouched.
        ratio: Fraction of the token count that determines how many tokens are
            selected for replacement (at least one).

    Returns:
        A new list. Only alphabetic tokens longer than two characters are
        eligible; a selected token stays unchanged when ``get_synonyms`` finds
        no synonym for it.
    """
    replaced = list(tokens)  # copy: do not modify the caller's list
    candidate_indices = [
        i for i, t in enumerate(replaced)
        if t.isalpha() and len(t) > 2  # skip punctuation & very short tokens
    ]
    if not candidate_indices:
        return replaced

    # random.sample raises ValueError when asked for more items than exist, so
    # the requested count is capped at the number of eligible tokens.
    n_replacements = min(max(1, int(ratio * len(replaced))), len(candidate_indices))

    for idx in random.sample(candidate_indices, n_replacements):
        syns = get_synonyms(replaced[idx])
        if syns:
            replaced[idx] = random.choice(syns)
    return replaced
###########################################################################

# This function applies an image augmentation technique to the given image
def augment_image(image: Image.Image, transformer, suffix: str = "crop", output_size=(512, 512)):
    """Apply one augmentation technique to ``image`` and return the result.

    Args:
        image: Image to augment.
        transformer: Transform object matching ``suffix``. For "crop" it must
            provide ``get_params`` and ``size`` (as ``transforms.RandomCrop``
            does); for the other techniques it is simply called on the image.
        suffix: Name of the technique: "crop", "flip-x", "flip-y" or "blur".
        output_size: ``(height, width)`` the cropped patch is resized to.
            Defaults to 512x512.

    Returns:
        The augmented image, or ``image`` itself when ``suffix`` is not one of
        the supported technique names.
    """
    if suffix not in ("crop", "flip-x", "flip-y", "blur"):
        return image

    # "flip-x", "flip-y", "blur": the transformer is applied directly
    if suffix != "crop":
        return transformer(image)

    # For random crops we sample a region per image, so we capture the crop
    # parameters to know exactly what was applied.
    top, left, height, width = transformer.get_params(image, output_size=transformer.size)
    patch = TF.crop(image, top, left, height, width)

    # Resize the patch to the requested output size
    return transforms.Resize(output_size)(patch)

In the following cells, we will construct a dataset that includes both augmented images and augmented text. Since we applied four augmentation techniques to each data type, this notebook will organize and store every generated sample according to the specific technique used. For illustration, an image produced with a horizontal flip will be paired with a text augmented using random word deletion, while an image generated with a vertical flip will be paired with a text created through random word swapping, and so on.

In [6]:
# We can use this function to retrieve a text from our bucket
def get_text(bucket, key):
    """Download the object ``key`` from ``bucket`` and return its content as a UTF-8 decoded string."""
    response = s3.get_object(Bucket=bucket, Key=key)
    return response["Body"].read().decode("utf-8")

# We can use this function to retrieve an image from our bucket in PIL Image format
def get_image(bucket, key):
    """Download the object ``key`` from ``bucket`` and open its bytes with ``Image.open``."""
    raw_bytes = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    return Image.open(BytesIO(raw_bytes))

In [14]:
# Serialise an image into an in-memory PNG buffer that is ready to be read from the start
def _png_buffer(image):
    buffer = io.BytesIO()
    image.save(buffer, format="PNG")
    buffer.seek(0)  # rewind so that the upload reads from the first byte
    return buffer


# This function generates augmented data for both texts and images in the baseline training data
def augmented_training_data(src_bucket, dest_bucket, dest_prefix="text_image_augmented-training-data/"):
    """Copy every baseline sample to ``dest_prefix`` together with four augmented variants.

    Objects are listed under the "baseline-training-data/" prefix of
    ``src_bucket``. For each object the original is copied to
    ``dest_prefix + <file name>`` and four augmented versions are stored as
    ``dest_prefix + "aug<N>_" + <file name>``:

    * keys containing "text": aug1 random deletion, aug2 random swap,
      aug3 spelling errors, aug4 synonym replacement (uploaded as text/plain);
    * keys containing "image": aug1 horizontal flip, aug2 vertical flip,
      aug3 random crop, aug4 Gaussian blur (uploaded as PNG).

    Objects that are neither text nor image are skipped without a success
    message, because nothing is written for them.

    Args:
        src_bucket: Bucket holding the baseline training data.
        dest_bucket: Bucket receiving the copies and the augmented data.
        dest_prefix: Key prefix (folder) for everything written.
    """
    # Image transformers with the technique name expected by augment_image, in aug1..aug4 order
    image_augmenters = [
        (transforms.RandomHorizontalFlip(p=1.0), "flip-x"),  # always flip image horizontally (left-right)
        (transforms.RandomVerticalFlip(p=1.0), "flip-y"),  # always flip image vertically (top-bottom)
        (transforms.RandomCrop(400), "crop"),  # randomly crop a 400x400 patch from the original image
        (transforms.GaussianBlur(kernel_size=11, sigma=(1.0, 3.0)), "blur"),  # random sigma in this range
    ]
    # Text augmentations, in aug1..aug4 order
    text_augmenters = [random_deletion, random_swap, random_spelling_error, random_synonym_replacement]

    paginator = s3.get_paginator("list_objects_v2")  # It returns objects in pages and not all at once.
    for page in paginator.paginate(Bucket=src_bucket, Prefix="baseline-training-data/"):

        for obj in page.get("Contents", []):
            key = obj["Key"]

            if obj["Size"] == 0 and key.endswith("/"):  # skip the folder itself
                continue

            # Destination keys: one for the untouched original, four for the augmented variants
            file_name = key.split("/")[1]
            new_key = dest_prefix + file_name
            aug_keys = [f"{dest_prefix}aug{number}_{file_name}" for number in range(1, 5)]
            copy_source = {"Bucket": src_bucket, "Key": key}

            if "text" in key:
                description = get_text(src_bucket, key)

                # Every augmentation starts from a fresh tokenisation of the original description
                augmented_texts = [
                    " ".join(augment(simple_tokenize(description)))
                    for augment in text_augmenters
                ]

                # Copy the original without its top-level folder, then store the variants
                s3.copy_object(Bucket=dest_bucket, Key=new_key, CopySource=copy_source)
                for aug_key, text in zip(aug_keys, augmented_texts):
                    s3.put_object(
                        Bucket=dest_bucket,
                        Key=aug_key,
                        Body=io.BytesIO(text.encode("utf-8")),
                        ContentType="text/plain",
                    )

            elif "image" in key:
                img = get_image(src_bucket, key)

                augmented_images = [
                    _png_buffer(augment_image(img, transformer, suffix=suffix))
                    for transformer, suffix in image_augmenters
                ]

                # Copy the original without its top-level folder, then store the variants
                s3.copy_object(Bucket=dest_bucket, Key=new_key, CopySource=copy_source)
                for aug_key, buffer in zip(aug_keys, augmented_images):
                    s3.upload_fileobj(
                        Fileobj=buffer,
                        Bucket=dest_bucket,
                        Key=aug_key,
                        ExtraArgs={"ContentType": "image/png"},
                    )

            else:
                # Neither a description nor an image: nothing was generated,
                # so no success message must be reported for this object.
                continue

            print(f"✅ Augmented data for #{file_name} created successfully.")

    print("✅ All augmented image data have been successfully uploaded.")

In [13]:
# Build the augmented training dataset; source and destination are the same bucket
augmented_training_data(
    src_bucket="training-data-construction-zone",
    dest_bucket="training-data-construction-zone",
)

✅ Augmented data for #image_000001.png created successfully.
✅ Augmented data for #image_000002.png created successfully.
✅ Augmented data for #image_000003.png created successfully.
✅ Augmented data for #image_000004.png created successfully.
✅ Augmented data for #image_000005.png created successfully.
✅ Augmented data for #image_000006.png created successfully.
✅ Augmented data for #image_000007.png created successfully.
✅ Augmented data for #image_000008.png created successfully.
✅ Augmented data for #image_000009.png created successfully.
✅ Augmented data for #image_000010.png created successfully.
✅ Augmented data for #image_000011.png created successfully.
✅ Augmented data for #image_000012.png created successfully.
✅ Augmented data for #image_000013.png created successfully.
✅ Augmented data for #image_000014.png created successfully.
✅ Augmented data for #image_000015.png created successfully.
✅ Augmented data for #image_000016.png created successfully.
✅ Augmented data for #im