In [1]:
from pathlib import Path
from typing import List, Dict, Tuple
from PIL import Image, ImageEnhance, ImageOps, ImageDraw, ImageFilter
from pydantic import BaseModel
from enum import Enum
import pandas as pd
import random

In [2]:
from enum import Enum
from pydantic import BaseModel
from typing import Dict, List

class AugmType(Enum):
    H_FLIP = "h_flip"
    V_FLIP = "v_flip"
    CROP = "crop"
    PAD = "padding"
    BRIGHT = "brightness"
    CONTRAST = "contrast"
    SAT = "saturation"
    ROT = "rotate"
    FLASH = "flash"
    BW = "bw"


class Label(Enum):
    DOG = "dog"
    BIKE = "bike"
    BALL = "ball"
    WATER = "water"    



class ImageDir(BaseModel):
    img_stem: str
    label: Label
    caption: str
    imgs_paths: Dict[str, List[AugmType]]
    

In [3]:
flicker_dir = Path("../data/flicker")
flicker_dir_2 = Path("../data/flickr_long_subset")

data_dir = flicker_dir

flicker_imgs_dir = data_dir / "images"
caption_csv_path = data_dir / "captions.csv"


augmented_dir = Path("../data/augmented")
augmented_dir_no = Path("../data/augmented_no")
augmented_dir_2 = Path("../data/augmented_2")
augmented_dir_3 = Path("../data/augmented_3")

ag = augmented_dir_3

flicker_dir.mkdir(parents=True, exist_ok=True)
ag.mkdir(parents=True, exist_ok=True)


In [9]:
df = pd.read_csv(caption_csv_path)
df2 = pd.read_csv(flicker_dir / "captions.csv")
df.columns

Index(['image_path', 'label', 'caption'], dtype='object')

In [7]:
def load_image(img_path: Path) -> Image.Image:
    return Image.open(img_path).convert('RGB')

def random_horizontal_flip(img: Image.Image, flip: float) -> Tuple[float, Image.Image]:
    random_flip = random.random()
    if random_flip < flip:
        return random_flip, ImageOps.mirror(img)
    return random_flip, img

def random_vertical_flip(img: Image.Image, flip: float) -> Tuple[float, Image.Image]:
    random_flip = random.random()
    if random_flip < flip:
        return random_flip, ImageOps.flip(img)  
    return random_flip, img

def random_crop(img: Image.Image, scale: float) -> Tuple[Tuple[int], Image.Image]:
    w, h = img.size
    crop_w = int(w * scale)
    crop_h = int(h * scale)

    x = random.randint(0, w - crop_w)
    y = random.randint(0, h - crop_h)

    cropped = img.crop((x, y, x+crop_w, y+crop_h))
    return ((x, y), cropped.resize((w,h), Image.BILINEAR))

def random_padding(img: Image.Image, padding_range: int) -> Tuple[int, Image.Image]:
    pad = random.randint(0, padding_range)
    w, h = img.size
    padded = ImageOps.expand(img, border=pad, fill=(0,0,0))
    return (pad, padded.resize((w,h), Image.BILINEAR))

def random_brightness(img: Image.Image, min_range: int, max_range: int) -> Tuple[float, Image.Image]:
    amount = random.uniform(min_range, max_range)
    factor = 1 + (amount / 100.0)
    enhancer = ImageEnhance.Brightness(img)
    return (factor, enhancer.enhance(factor))


def random_contrast(img: Image.Image, min_range: int, max_range: int) -> Tuple[float, Image.Image]:
    amount = random.uniform(min_range, max_range)
    factor = 1 + (amount / 100.0)
    enhancer = ImageEnhance.Contrast(img)
    return (factor, enhancer.enhance(factor))


def random_saturation(img: Image.Image, min_range: int, max_range: int) -> Tuple[float, Image.Image]:
    amount = random.uniform(min_range, max_range)
    factor = 1 + (amount / 100.0)
    enhancer = ImageEnhance.Color(img)
    return (factor, enhancer.enhance(factor))


def random_rotate(img: Image.Image, min_angle: int, max_angle: int) -> Tuple[float, Image.Image]:
    angle = random.uniform(min_angle, max_angle)
    return (angle, img.rotate(angle, resample=Image.BILINEAR, expand=True).resize(img.size))


def random_flash(img: Image.Image, max_radius: float, max_intensity: float) -> Image.Image:
    w, h= img.size

    flash_mask = Image.new("L", (w, h), 0)
    draw = ImageDraw.Draw(flash_mask)

    cx = random.randint(0, w)
    cy = random.randint(0, h)

    radius = int(min(w,h) * random.uniform(0.1, max_radius))

    intensity = int(255 * random.uniform(0.3, max_intensity))

    draw.ellipse((cx - radius, cy - radius, cx + radius, cy + radius), fill=intensity)

    flash_mask = flash_mask.filter(ImageFilter.GaussianBlur(radius / 2))
    white_layer = Image.new("RGB", (w, h), (255, 255, 255))

    return Image.composite(white_layer, img, flash_mask)

def random_black_and_white(img: Image.Image) -> Tuple[float, Image.Image]:    
    bw = ImageOps.grayscale(img).convert("RGB")
    return bw
    

def sequence_aug_spatial(img: Image.Image, prob: float) -> Image.Image:
    list_aug : List[str] = []
    if random.random() < prob:
        i, img = random_horizontal_flip(img, flip=0.5)
        list_aug.append("h_flip")
    if random.random() < prob:
        i, img = random_vertical_flip(img, flip=0.5)
        list_aug.append("v_flip")
    if random.random() < prob:
        i, img = random_crop(img, scale=0.9)
        list_aug.append("crop")
    # if random.random() < prob:
    #     img = random_padding(img, padding_range=20)

    if random.random() < prob:
        i, img = random_rotate(img, -10, 10)
        list_aug.append("rotate")

    return list_aug, img

def sequence_aug_colors(img: Image.Image, prob: float) -> Image.Image:
    list_aug : List[str] = []
    if random.random() < 0.3:
        img = random_black_and_white(img)
        list_aug.append("bw")
        return list_aug, img
    if random.random() < prob:
        img = random_flash(img, 0.4, 1.7)
        list_aug.append(("flash"))
    if random.random() < prob:
        i, img = random_brightness(img, -20, 40)
        list_aug.append("brightness")
    if random.random() < prob:
        i, img = random_saturation(img, -20, 40)
        list_aug.append("saturation")
    if random.random() < prob:
        i, img = random_contrast(img, 0, 40)
        list_aug.append("contrast")

    return list_aug, img







In [8]:
def to_augm_enum_list(list_str: List[str]) -> List[AugmType]:
    return [AugmType(s) for 
    s in list_str]


In [None]:
for index, row in df.iterrows(): 
    img_path = Path(row["image_path"])
    label = Label(row["label"])
    caption = row["caption"]
    path = data_dir / img_path
    # path = flicker_dir_2 / img_path
    
    type_dir = ag / label.value
    type_dir.mkdir(parents=True, exist_ok=True)

    img_dir = type_dir / img_path.stem
    img_dir.mkdir(parents=True, exist_ok=True)
    
    img = load_image(path)
    new_img_path = img_dir / f"{img_path.stem}.jpg"
    img.save(new_img_path)

    # list_spatial_aug, spatial_aug_img = sequence_aug_spatial(img, 0.7)
    # img_spatial_aug_path = img_dir / f"{img_path.stem}_spatial.jpg"
    # spatial_aug_img.save(img_spatial_aug_path)

    # list_color_aug, color_aug_img = sequence_aug_colors(img, 0.7)
    # img_color_aug_path = img_dir / f"{img_path.stem}_color.jpg"
    # color_aug_img.save(img_color_aug_path)

    # spatial_enum = to_augm_enum_list(list_spatial_aug)
    # color_enum = to_augm_enum_list(list_color_aug)

    imgs_paths = {
        str(new_img_path): [],
        # str(img_spatial_aug_path): spatial_enum,
        # str(img_color_aug_path): color_enum,
    }

    img_infos = ImageDir(
        img_stem=img_path.stem,
        label=label,
        caption=caption,
        imgs_paths=imgs_paths
    )

    infos_path = img_dir / "infos.json"
    infos_path.write_text(img_infos.model_dump_json(indent=4))


In [12]:
import json


rows = []
for type_dir in ag.iterdir():
    for img_dir in type_dir.iterdir():
        json_path = Path(f"{img_dir}/infos.json")
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
            imgs_infos = ImageDir(**data)
        
        for k, v in imgs_infos.imgs_paths.items():
            rows.append({
                "image_path": Path(k).name,
                "label": imgs_infos.label,
                "caption": imgs_infos.caption,

            })

df = pd.DataFrame(rows)
print(df)

pd_output_path = Path(f"{ag}/metadata.csv")

df.to_csv(pd_output_path, index=False)


NotADirectoryError: [Errno 20] Not a directory: '../data/augmented_3/metadata.csv'

### CONCATENATION

In [27]:
import requests
import io 
import os
import zipfile
flicker_dir = Path("../data/new_dataset")
flicker_dir.mkdir(parents=True, exist_ok=True)
'''Get dataset from flicker'''
# url = "https://www.lirmm.fr/~poncelet/Ressources/flickr_subset2.zip"
url_augmented = "https://www.lirmm.fr/~poncelet/Ressources/flickr_long_subset.zip"
print("Requesting...\n")
response = requests.get(url_augmented)
if response.status_code == 200:
    print("Téléchargement réussi. Extraction...")
    with zipfile.ZipFile(io.BytesIO(response.content)) as zip_ref:
        # Extraire sans ajouter de sous-dossier supplémentaire
        for member in zip_ref.namelist():
            # Corrige les chemins pour ignorer un éventuel prefixe flickr_subset2/
            member_path = member
            if member.startswith("flickr_subset2/"):
                member_path = member[len("flickr_subset2/"):]
            target_path = flicker_dir / member_path

            # Si c'est un répertoire, on le crée
            if member.endswith("/"):
                target_path.mkdir(exist_ok=True, parents=True)
            else:
                os.makedirs(os.path.dirname(target_path), exist_ok=True)
                with zip_ref.open(member) as source, open(target_path, "wb") as target:
                    target.write(source.read())
    print(f"Données extraites dans : {flicker_dir}")
else:
    print("Échec du téléchargement. Code HTTP :", response.status_code)


Requesting...

Téléchargement réussi. Extraction...
Données extraites dans : ../data/new_dataset


In [4]:
import pandas as pd
base_dataset_dir = Path("../data/flicker")
new_dataset_dir = Path("../data/flickr_long_subset")

In [5]:
df_base = pd.read_csv(base_dataset_dir / "captions.csv")
df_new = pd.read_csv(new_dataset_dir / "captions.csv")

In [5]:
df_new.columns

Index(['image_path', 'label', 'caption'], dtype='object')

In [11]:
from pathlib import Path
final_dataset_dir = Path("../data/final_dataset_noaug2")
final_dataset_dir.mkdir(parents=True, exist_ok=True)
for index, row in df_new.iterrows(): 

    img_path = Path(row["image_path"])
    label = Label(row["label"])
    caption = row["caption"]
    path = new_dataset_dir / img_path
    # path = flicker_dir_2 / img_path
    
    type_dir = final_dataset_dir /  f"new_{label.value}"
    type_dir.mkdir(parents=True, exist_ok=True)

    img_dir = type_dir / img_path.stem
    img_dir.mkdir(parents=True, exist_ok=True)
    
    img = load_image(path)
    new_img_path = img_dir / f"new_{img_path.stem}.jpg"
    img.save(new_img_path)

    # list_spatial_aug, spatial_aug_img = sequence_aug_spatial(img, 0.7)
    # img_spatial_aug_path = img_dir / f"new_{img_path.stem}_spatial.jpg"
    # spatial_aug_img.save(img_spatial_aug_path)

    # list_color_aug, color_aug_img = sequence_aug_colors(img, 0.7)
    # img_color_aug_path = img_dir / f"new_{img_path.stem}_color.jpg"
    # color_aug_img.save(img_color_aug_path)

    # spatial_enum = to_augm_enum_list(list_spatial_aug)
    # color_enum = to_augm_enum_list(list_color_aug)

    imgs_paths = {
        str(new_img_path): [],
        # str(img_spatial_aug_path): spatial_enum,
        # str(img_color_aug_path): color_enum,
    }

    img_infos = ImageDir(
        img_stem=img_path.stem,
        label=label,
        caption=caption,
        imgs_paths=imgs_paths
    )

    infos_path = img_dir / "infos.json"
    infos_path.write_text(img_infos.model_dump_json(indent=4))


In [12]:
import json
from pathlib import Path
import pandas as pd
final_dataset_dir = Path("../data/final_dataset_noaug2")

rows = []
for type_dir in final_dataset_dir.iterdir():
    for img_dir in type_dir.iterdir():
        json_path = Path(f"{img_dir}/infos.json")
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
            imgs_infos = ImageDir(**data)
        
        for k, v in imgs_infos.imgs_paths.items():
            rows.append({
                "image_path": Path(k).name,
                "label": imgs_infos.label,
                "caption": imgs_infos.caption,

            })

df = pd.DataFrame(rows)
print(df)

pd_output_path = Path(f"{final_dataset_dir}/metadata.csv")

df.to_csv(pd_output_path, index=False)


             image_path       label  \
0      new_ball_683.jpg  Label.BALL   
1      new_ball_196.jpg  Label.BALL   
2      new_ball_092.jpg  Label.BALL   
3      new_ball_708.jpg  Label.BALL   
4      new_ball_190.jpg  Label.BALL   
...                 ...         ...   
1395  base_ball_084.jpg  Label.BALL   
1396  base_ball_015.jpg  Label.BALL   
1397  base_ball_049.jpg  Label.BALL   
1398  base_ball_031.jpg  Label.BALL   
1399  base_ball_137.jpg  Label.BALL   

                                                caption  
0     Two black dogs, their fur fluffed up in agitat...  
1     In the sun-drenched backyard, a joyful black a...  
2     A young boy, his eyes fixed on the ball in his...  
3     A sunny day at the local park, where families ...  
4     A young boy stands in front of the bathroom si...  
...                                                 ...  
1395       A laughing boy lies on a pit of blue balls .  
1396  A little boy points to the face of another lit...  
1397  A g

In [15]:
df_verify = pd.read_csv("../data/final_dataset_noaug2/metadata.csv")
len(df_verify)

1400

In [14]:
image_path = Path("/home/ubuntu/MiniCLIP/data/final_dataset_noaug/base_water/water_002/new_water_002.jpg")
Image.open(image_path).size

(500, 375)