## Practica 1 - Computer Vision

In [7]:
import os
import gdown

# Crear el directorio de datos si no existe
data_dir = "data"
if not os.path.exists(data_dir):
    os.makedirs(data_dir)

# URL de Google Drive en formato correcto para gdown
url = "https://drive.google.com/uc?id=1iGBv-VT5mm1RiouD-U2qWcU3BYqp2OwE"
zip_filename = "practica_1_dataset.zip"
zip_path = os.path.join(data_dir, zip_filename)

# Descargar el archivo
if not os.path.exists(zip_path):
    gdown.download(url, zip_path, quiet=False)
else:
    print("Data zipfile already exists")


Data zipfile already exists


In [6]:
import os
from pathlib import Path
from zipfile import ZipFile
from concurrent.futures import ThreadPoolExecutor

data_dir = "data"
zip_filename = "practica_1_dataset.zip"
zip_path = os.path.join(data_dir, zip_filename)
subfolders = ["test", "train", "valid"]
full_paths = [os.path.join(data_dir, folder) for folder in subfolders]

if not all(os.path.isdir(path) for path in full_paths):
    with ZipFile(zip_path, 'r') as zf:
        with ThreadPoolExecutor() as exe:
            for file in zf.namelist():
                if not file.startswith("__MACOSX"):
                    exe.submit(zf.extract, file, path=data_dir)
else:
    print("test, train and valid folders already exist")

test, train and valid folders already exist


In [None]:
import tensorflow as tf

for folder in subfolders:
    folder_path = os.path.join(data_dir, folder)
    ds_files = tf.data.Dataset.list_files(folder_path + "/*.jpg", shuffle=False)
    print(f"Total image files in {folder}: {len(ds_files)}")
    print("Filenames shape:", ds_files.element_spec)

example = next(ds_files.take(1).as_numpy_iterator())
example

Total image files in test: 63
Filenames shape: TensorSpec(shape=(), dtype=tf.string, name=None)
Total image files in train: 448
Filenames shape: TensorSpec(shape=(), dtype=tf.string, name=None)
Total image files in valid: 127
Filenames shape: TensorSpec(shape=(), dtype=tf.string, name=None)


b'data/valid/IMG_2277_jpeg_jpg.rf.86c72d6192da48d941ffa957f4780665.jpg'

In [None]:
import pandas as pd
df = pd.read_csv("data/test/annotations.csv")
count_class = df.groupby(["filename", "class"]).size()
print(f"{count_class.shape = }")
count_class.head()

count_class.shape = (83,)


filename                                                   class  
IMG_2289_jpeg_jpg.rf.fe2a7a149e7b11f2313f5a7b30386e85.jpg  puffin      1
IMG_2301_jpeg_jpg.rf.2c19ae5efbd1f8611b5578125f001695.jpg  penguin    23
IMG_2319_jpeg_jpg.rf.6e20bf97d17b74a8948aa48776c40454.jpg  penguin     8
IMG_2347_jpeg_jpg.rf.7c71ac4b9301eb358cd4a832844dedcb.jpg  penguin     2
IMG_2354_jpeg_jpg.rf.396e872c7fb0a95e911806986995ee7a.jpg  penguin     5
dtype: int64

In [None]:
df["area"] = (df["xmax"] - df["xmin"]) * (df["ymax"] - df["ymin"])
sum_area = df.groupby(["filename", "class"])["area"].sum()
print(f"{sum_area.shape = }")
sum_area.head()

sum_area.shape = (83,)


filename                                                   class  
IMG_2289_jpeg_jpg.rf.fe2a7a149e7b11f2313f5a7b30386e85.jpg  puffin      94864
IMG_2301_jpeg_jpg.rf.2c19ae5efbd1f8611b5578125f001695.jpg  penguin     32549
IMG_2319_jpeg_jpg.rf.6e20bf97d17b74a8948aa48776c40454.jpg  penguin     29583
IMG_2347_jpeg_jpg.rf.7c71ac4b9301eb358cd4a832844dedcb.jpg  penguin    250311
IMG_2354_jpeg_jpg.rf.396e872c7fb0a95e911806986995ee7a.jpg  penguin     14881
Name: area, dtype: int64

In [5]:
score = pd.Series(sum_area * count_class, name="score").reset_index()
print(f"{score.shape = }")
score.head()

score.shape = (83, 3)


Unnamed: 0,filename,class,score
0,IMG_2289_jpeg_jpg.rf.fe2a7a149e7b11f2313f5a7b3...,puffin,94864
1,IMG_2301_jpeg_jpg.rf.2c19ae5efbd1f8611b5578125...,penguin,748627
2,IMG_2319_jpeg_jpg.rf.6e20bf97d17b74a8948aa4877...,penguin,236664
3,IMG_2347_jpeg_jpg.rf.7c71ac4b9301eb358cd4a8328...,penguin,500622
4,IMG_2354_jpeg_jpg.rf.396e872c7fb0a95e911806986...,penguin,74405


In [6]:
df_labels = score.groupby("filename").max().drop("score", axis=1)
print(f"{df_labels.shape = }")
df_labels.head()

df_labels.shape = (63, 1)


Unnamed: 0_level_0,class
filename,Unnamed: 1_level_1
IMG_2289_jpeg_jpg.rf.fe2a7a149e7b11f2313f5a7b30386e85.jpg,puffin
IMG_2301_jpeg_jpg.rf.2c19ae5efbd1f8611b5578125f001695.jpg,penguin
IMG_2319_jpeg_jpg.rf.6e20bf97d17b74a8948aa48776c40454.jpg,penguin
IMG_2347_jpeg_jpg.rf.7c71ac4b9301eb358cd4a832844dedcb.jpg,penguin
IMG_2354_jpeg_jpg.rf.396e872c7fb0a95e911806986995ee7a.jpg,penguin


In [94]:
import os
import tensorflow as tf
import pandas as pd
ds_files = tf.data.Dataset.list_files("data/test/" + '*.jpg', shuffle=False)
df = pd.read_csv("data/test/annotations.csv")
df["class"] = pd.Categorical(df["class"])
def get_image(image_path):
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3)
    target_size = 1024
    image_padded = tf.image.resize_with_pad(image, target_size, target_size)
    return image_padded

ds_iter = ds_files.take(1).as_numpy_iterator()
example = next(ds_iter)
image = get_image(example)
image.shape

TensorShape([1024, 1024, 3])

In [113]:
import os
import tensorflow as tf
import pandas as pd
ds_files = tf.data.Dataset.list_files("data/test/" + '*.jpg', shuffle=False)
df = pd.read_csv("data/test/annotations.csv")
df["class"] = pd.Categorical(df["class"])

def get_label(image_path, df):
    # parts = tf.strings.split(image_path, os.path.sep)
    # filename = parts[-1].numpy().decode("utf-8")
    _, filename = os.path.split(image_path.decode("utf-8"))
    return filename
    image_annotations = df[df["filename"] == filename].copy(deep=True)
    image_annotations["area"] = (image_annotations["xmax"] - image_annotations["xmin"]) * (image_annotations["ymax"] - image_annotations["ymin"]).values
    class_area = image_annotations.groupby(["filename", "class"], observed=True)["area"].sum()
    return class_area[class_area == class_area.max()].reset_index("class")["class"].cat.codes.values

ds_iter = ds_files.take(1).as_numpy_iterator()
example = next(ds_iter)
label = get_label(example, df)
label[0] 

'I'

In [106]:
AUTOTUNE = tf.data.AUTOTUNE

ds_files = tf.data.Dataset.list_files("data/test/" + '*.jpg', shuffle=False)
df = pd.read_csv("data/test/annotations.csv")
df["class"] = pd.Categorical(df["class"])
ds_images = (
    ds_files
    .shuffle(len(ds_files))
    .cache()
    .map(lambda x: (get_image(x), get_label(x, df)), num_parallel_calls=AUTOTUNE)
)

print("Total images:", len(ds_images))
print("Image shape:", ds_images.element_spec)

Total images: 63
Image shape: (TensorSpec(shape=(1024, 1024, 3), dtype=tf.float32, name=None), TensorSpec(shape=<unknown>, dtype=tf.int64, name=None))


In [None]:
import os
import tensorflow as tf
import pandas as pd

def get_label(image_path, df):
    def _lookup_label(image_path):
        image_path = image_path.decode("utf-8")  # Convert Tensor to string
        parts = image_path.split(os.path.sep)
        filename = parts[-1]

        # Process DataFrame
        image_annotations = df[df["filename"] == filename].copy(deep=True)
        image_annotations["area"] = (image_annotations["xmax"] - image_annotations["xmin"]) * (image_annotations["ymax"] - image_annotations["ymin"]).values
        class_area = image_annotations.groupby(["filename", "class"], observed=True)["area"].sum()
        
        # Return the class with max area
        return class_area[class_area == class_area.max()].reset_index("class")["class"].cat.codes.values

    # Wrap the function inside `tf.py_function`
    label = tf.py_function(func=_lookup_label, inp=[image_path], Tout=tf.int64)
    return label

AUTOTUNE = tf.data.AUTOTUNE

ds_files = tf.data.Dataset.list_files("data/test/" + '*.jpg', shuffle=False)
df = pd.read_csv("data/test/annotations.csv")
df["class"] = pd.Categorical(df["class"])

ds_images = (
    ds_files
    .shuffle(len(ds_files))
    .cache()
    .map(lambda x: (get_image(x), get_label(x, df)), num_parallel_calls=AUTOTUNE)
)

In [107]:
for im, label in ds_images.take(1):
    print(im.shape, label)

2025-03-28 17:16:07.484640: W tensorflow/core/framework/op_kernel.cc:1844] UNKNOWN: AttributeError: 'tensorflow.python.framework.ops.EagerTensor' object has no attribute 'decode'
Traceback (most recent call last):

  File "/home/alf/git-repos/UFVDeepLearning/.venv/lib/python3.10/site-packages/tensorflow/python/ops/script_ops.py", line 267, in __call__
    return func(device, token, args)

  File "/home/alf/git-repos/UFVDeepLearning/.venv/lib/python3.10/site-packages/tensorflow/python/ops/script_ops.py", line 145, in __call__
    outputs = self._call(device, args)

  File "/home/alf/git-repos/UFVDeepLearning/.venv/lib/python3.10/site-packages/tensorflow/python/ops/script_ops.py", line 152, in _call
    ret = self._func(*args)

  File "/home/alf/git-repos/UFVDeepLearning/.venv/lib/python3.10/site-packages/tensorflow/python/autograph/impl/api.py", line 643, in wrapper
    return func(*args, **kwargs)

  File "/tmp/__autograph_generated_filewhy2r9h7.py", line 16, in _lookup_label
    image

UnknownError: {{function_node __wrapped__IteratorGetNext_output_types_2_device_/job:localhost/replica:0/task:0/device:CPU:0}} Error in user-defined function passed to ParallelMapDatasetV2:295 transformation with iterator: Iterator::Root::Prefetch::FiniteTake::ParallelMapV2: AttributeError: 'tensorflow.python.framework.ops.EagerTensor' object has no attribute 'decode'
Traceback (most recent call last):

  File "/home/alf/git-repos/UFVDeepLearning/.venv/lib/python3.10/site-packages/tensorflow/python/ops/script_ops.py", line 267, in __call__
    return func(device, token, args)

  File "/home/alf/git-repos/UFVDeepLearning/.venv/lib/python3.10/site-packages/tensorflow/python/ops/script_ops.py", line 145, in __call__
    outputs = self._call(device, args)

  File "/home/alf/git-repos/UFVDeepLearning/.venv/lib/python3.10/site-packages/tensorflow/python/ops/script_ops.py", line 152, in _call
    ret = self._func(*args)

  File "/home/alf/git-repos/UFVDeepLearning/.venv/lib/python3.10/site-packages/tensorflow/python/autograph/impl/api.py", line 643, in wrapper
    return func(*args, **kwargs)

  File "/tmp/__autograph_generated_filewhy2r9h7.py", line 16, in _lookup_label
    image_path = ag__.converted_call(ag__.ld(image_path).decode, ('utf-8',), None, fscope_1)

  File "/home/alf/git-repos/UFVDeepLearning/.venv/lib/python3.10/site-packages/tensorflow/python/framework/tensor.py", line 260, in __getattr__
    self.__getattribute__(name)

AttributeError: 'tensorflow.python.framework.ops.EagerTensor' object has no attribute 'decode'


	 [[{{node EagerPyFunc}}]] [Op:IteratorGetNext] name: 

In [8]:
df_labels.value_counts()

class    
fish         13
jellyfish    11
shark        11
stingray     10
penguin       7
puffin        6
starfish      5
Name: count, dtype: int64

In [None]:
import os
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm
import shutil

# Rutas de los datos originales
base_dir = "data/practica_1_dataset"
train_dir = os.path.join(base_dir, "train")
valid_dir = os.path.join(base_dir, "valid")

# Rutas de los datos procesados (donde se guardarán las imágenes originales y sintéticas)
processed_base = "data/processed"
processed_train_dir = os.path.join(processed_base, "train")
processed_validation_dir = os.path.join(processed_base, "validation")

# Se crean los directorios de salida si no existen
os.makedirs(processed_train_dir, exist_ok=True)
os.makedirs(processed_validation_dir, exist_ok=True)

# Mapeo de clases a números (para la máscara)
class_mapping = {
    "fish": 1,
    "jellyfish": 2,
    "penguin": 3,
    "shark": 4,
    "puffin": 5,
    "stingray": 6,
    "starfish": 7
}

def process_folder(input_folder, output_folder, annotations_df):
    """
    Procesa las imágenes de input_folder y guarda en output_folder:
      - Una copia de la imagen original.
      - Una imagen sintética en la que se "borran" (reemplazan) los píxeles de la clase predominante 
        usando el color medio calculado fuera de dicha región. Esta imagen se guarda con el sufijo 
        "_synthetic" antes de la extensión.
    Además, registra en un CSV la información (nombre de imagen, clase predominante y área) de cada imagen.
    """
    dataset_info = []  # Lista para almacenar la información de cada imagen procesada

    # Obtener archivos de imagen (considerando las extensiones jpg, jpeg y png)
    image_files = [f for f in os.listdir(input_folder) if f.lower().endswith((".jpg", ".jpeg", ".png"))]
    image_files = annotations_df["image"].unique()

    for image_name in tqdm(image_files, desc=f"Procesando {os.path.basename(input_folder)}"):
        image_path = os.path.join(input_folder, image_name)
        try:
            image = Image.open(image_path).convert("RGB")
        except Exception as e:
            print(f"Error al abrir {image_name}: {e}")
            continue

        width, height = image.size

        # Crear la máscara con fondo = 0
        mask = np.zeros((height, width), dtype=np.uint8)

        # Filtrar las anotaciones para la imagen
        image_annotations = annotations_df[annotations_df['filename'] == image_name]
        for _, row in image_annotations.iterrows():
            cls = row['class']
            if cls in class_mapping:
                class_id = class_mapping[cls]
                xmin, ymin, xmax, ymax = int(row['xmin']), int(row['ymin']), int(row['xmax']), int(row['ymax'])
                mask[ymin:ymax, xmin:xmax] = class_id

        # Determinar la clase predominante (excluyendo el fondo)
        uniques, counts = np.unique(mask, return_counts=True)
        pixel_count_dict = dict(zip(uniques, counts))
        pixel_count_dict.pop(0, None)  # Se elimina el fondo
        if pixel_count_dict:
            predominant_class_id = max(pixel_count_dict, key=pixel_count_dict.get)  # id with more pixels
            predominant_area = pixel_count_dict[predominant_class_id]  # num pixels
            # predominant_class_label = [label for label, idx in class_mapping.items() if idx == predominant_class_id][0]
            predominant_class_label = class_mapping[predominant_class_id]
        else:
            predominant_class_label = None
            predominant_area = 0

        # Registrar la información en el dataset
        dataset_info.append({
            "filename": image_name,
            "predominant_class": predominant_class_label,
            "area": predominant_area
        })
        {
            "filename": [].append()
        }

        # Copiar la imagen original al directorio procesado
        dest_original_path = os.path.join(output_folder, image_name)
        shutil.copy(image_path, dest_original_path)

        # Generar la imagen sintética
        image_array = np.array(image)
        if predominant_class_label is not None:
            mask_predominant = (mask == predominant_class_id)
            # Calcular el color medio de los píxeles fuera de la región predominante
            if np.any(~mask_predominant):
                mean_color = image_array[~mask_predominant].mean(axis=0).astype(np.uint8)
            else:
                mean_color = np.array([0, 0, 0], dtype=np.uint8)
            synthetic_image_array = image_array.copy()
            synthetic_image_array[mask_predominant] = mean_color
        else:
            synthetic_image_array = image_array.copy()

        # Se genera el nombre de la imagen sintética con el sufijo "_synthetic" antes de la extensión .jpg
        file_base, _ = os.path.splitext(image_name)
        synthetic_name = f"{file_base}_synthetic.jpg"
        synthetic_image = Image.fromarray(synthetic_image_array)
        synthetic_image.save(os.path.join(output_folder, synthetic_name))
    
    # Guardar el CSV con la información de las imágenes en el mismo directorio de salida
    csv_output_path = os.path.join(output_folder, "new_dataset.csv")
    pd.DataFrame(dataset_info).to_csv(csv_output_path, index=False)
    print(f"Procesamiento completado para {os.path.basename(input_folder)}. CSV guardado en {csv_output_path}")

# Leer los archivos de anotaciones respectivos de cada carpeta
train_annotations_path = os.path.join(train_dir, "annotations.csv")
valid_annotations_path = os.path.join(valid_dir, "annotations.csv")

train_annotations = pd.read_csv(train_annotations_path)
valid_annotations = pd.read_csv(valid_annotations_path)

# Procesar el conjunto de entrenamiento
process_folder(train_dir, processed_train_dir, train_annotations)

# Procesar el conjunto de validación (se guarda en la carpeta "validation")
process_folder(valid_dir, processed_validation_dir, valid_annotations)

print("Todos los conjuntos han sido procesados correctamente.")