# ETL para el dataset

ETL para el dataset de señales de tránsito capturadas con Mapillary en la ciudad de Cochabamba, Bolivia.

Elaborado por Alvaro Zambrana Sejas

Requirements:

- Anaconda
- Python 3.9 

Configurar:

```
conda create -n copiloto-virtual python=3.9
conda activate copiloto-virtual
```

Instalar:

```
pip install ffmpeg-python opencv-python piexif pillow
```

In [1]:
!pip install ffmpeg-python opencv-python piexif pillow



In [2]:
!python --version

Python 3.9.19


La dimensión de las imágenes tienen: 4624 x 3468 px.

Estás van a redimensionarse a 2312 x 1734 (50%), para luego recortar dos regiones de 1280 x 1280.

Casi todos los casos por varias características como la posición de la cámara, el ángulo, ubicación de los objetos a capturar, etc. se extraeran las regiones donde suelen existir estas señales de tránsito verticales ubicadas en la parte inferior izquierda y otras en la parte inferior derecha.

El tratamiento será diferente para las señales de tránsito horizontales, ya que estas se encuentran en la parte inferior de la imagen.
En el caso de los semáforos, estos se encuentran en la parte superior de la imagen.  Existen semáforos que se encuentran en la parte inferior derecha e izquierda de la imagen, pero estos son muy pocos.  En la ciudad de Cochabamba, la mayoría de los semáforos se encuentran en la parte superior de la imagen.  Los semáforos inteligentes instalados en la ciudad de Cochabamba están ubicados en la parte inferior izquierda o derecha de la imagen.

In [3]:
import ffmpeg
import os
import shutil
import cv2

def resize_image(image_path, output_path, width, height):
    """
    Función para redimensionar una imagen a un nuevo tamaño

    :param image_path: Ruta al archivo de la imagen
    :param output_path: Ruta al directorio de salida
    :param width: Nuevo ancho para redimensionar la imagen 
    :param height: Nueva altura para redimensionar la imagen
    :return: Ruta al archivo de la imagen redimensionada
    """
    image = cv2.imread(image_path)   
    
    resized = cv2.resize(image, (width, height), interpolation=cv2.INTER_AREA)
    new_image_path = image_path.replace('raw', 'interim')
    image_name = os.path.basename(new_image_path)
    image_name_without_extension = os.path.splitext(image_name)[0]
    image_extension = os.path.splitext(new_image_path)[1]
    new_image_path = f'{output_path}/{image_name_without_extension}_{width}x{height}{image_extension}'
    cv2.imwrite(new_image_path, resized)    
    
    return new_image_path

def resize_image_to_half(image_path, output_path):
    image = cv2.imread(image_path)
    height, width, _ = image.shape
    width_half = width//2
    height_half = height//2    
    return resize_image_v2(image_path,  build_file_path(image_path, output_path, width_half, height_half), "50%", True)

def resize_image_v2(image_path, out_path, resize_to, out_path_is_file=False):
    """
    
    Redimensiona una imagen a un nuevo tamaño.
    
    :param image_path: Ruta al archivo de la imagen a redimensionar
    :param out_path: Ruta al archivo de salida o directorio
    :param resize_to: Procentaje a redimensionar ("porcentaje%") o en pixeles
    :param out_path_is_file: True si out_path es un fichero, False si es un directorio
    """

    try:
        im = Image.open(image_path)
        path, ext = os.path.splitext(image_path)
        if out_path_is_file:
            resized_image_path = out_path
        else:
            resized_image_path = os.path.join(out_path, os.path.basename(image_path))

        width, height = im.size
        max_side = max(width, height)

        if isinstance(resize_to, str) and resize_to.endswith("%"):
            ratio = float(resize_to[:-1]) / 100.0
        else:
            ratio = float(resize_to) / float(max_side)

        resized_width = int(width * ratio)
        resized_height = int(height * ratio)

        im.thumbnail((resized_width, resized_height), Image.LANCZOS)

        driver = ext[1:].upper()
        if driver == 'JPG':
            driver = 'JPEG'

        if 'exif' in im.info:
            exif_dict = piexif.load(im.info['exif'])
            exif_dict['Exif'][piexif.ExifIFD.PixelXDimension] = resized_width
            exif_dict['Exif'][piexif.ExifIFD.PixelYDimension] = resized_height
            
            # https://github.com/hMatoba/Piexif/issues/95
            exif_dict['Exif'][41729] = b'1'

            im.save(resized_image_path, driver, exif=piexif.dump(exif_dict), quality=100)
        else:
            im.save(resized_image_path, driver, quality=100)

        im.close()
        
        print("{} ({}x{}) --> {} ({}x{})".format(image_path, width, height, resized_image_path, resized_width, resized_height))

        return resized_image_path
    except (IOError, ValueError) as e:
        im.close()
        print("Error: No se pudo redimensionar {}: {}.".format(image_path, str(e)))

    return None


# resize_image_to_half('../dataset/raw/zona-escolar-1/2024_09_11_14_18_50_849_-0400.jpg')
# resize_image_to_half('input/2024_09_08_17_18_09_167_-0400.jpg', './output')

In [4]:
import ffmpeg
import os
import shutil
import cv2

def crop_image_region(image_path, output_folder, width, height):
    
    """
    Recorta la imagen en dos regiones, una en la parte izquierda y otra en la parte derecha.

    :param image_path: Ruta al archivo de la imagen 
    :param output_folder: Ruta al directorio de salida
    :param width: Ancho de la región a recortar
    :param height: Altura de la región a recortar
    :return: La ruta a los archivos de las regiones recortadas
    """
    
    image = cv2.imread(image_path)

    current_image_width = image.shape[1]
    current_image_height = image.shape[0]

    y = current_image_height - height

    crop_1 = image[y:y + height, 0:width]
    image_path_crop1 = build_file_path(image_path, output_folder, width, height, '_LEFT')
    cv2.imwrite(image_path_crop1, crop_1)
    
    crop_2 = image[y:y+height, current_image_width - width:current_image_width]    
    image_path_crop2 = build_file_path(image_path, output_folder, width, height, '_RIGHT')
    cv2.imwrite(image_path_crop2, crop_2)

    image.close()

    return [image_path_crop1, image_path_crop2]
    

def crop_image_region_v2(image_path, output_folder, width, height):
    
    """
    Recorta la imagen en dos regiones, una en la parte izquierda y otra en la parte derecha.

    :param image_path: Ruta al archivo de la imagen 
    :param output_folder: Ruta al directorio de salida
    :param width: Ancho de la región a recortar
    :param height: Altura de la región a recortar
    :return: La ruta a los archivos de las regiones recortadas
    """

    print(f"Procesando {image_path}")

    image = Image.open(image_path)
    path, ext = os.path.splitext(image_path)

    current_image_width, current_image_height = image.size
    
    y = current_image_height - height

    crop_1 = image.crop((0, y, width, current_image_height))
    image_path_crop1 = build_file_path(image_path, output_folder, width, height, '_LEFT')

    driver = ext[1:].upper()
    if driver == 'JPG':
        driver = 'JPEG'

    crop_2 = image.crop((current_image_width - width, y, current_image_width, current_image_height))
    image_path_crop2 = build_file_path(image_path, output_folder, width, height, '_RIGHT')

    if 'exif' in image.info:
        exif_dict = piexif.load(image.info['exif'])
        exif_dict['Exif'][piexif.ExifIFD.PixelXDimension] = width
        exif_dict['Exif'][piexif.ExifIFD.PixelYDimension] = height
        exif_dict['Exif'][41729] = b'1'
        crop_1.save(image_path_crop1, driver, exif=piexif.dump(exif_dict), quality=100)
        crop_2.save(image_path_crop2, driver, exif=piexif.dump(exif_dict), quality=100)
    else:        
        crop_1.save(image_path_crop1, driver, quality=100)
        crop_2.save(image_path_crop2, driver, quality=100)

    image.close()

    return [image_path_crop1, image_path_crop2]    

def build_file_path(file_path, output_folder, width, height, suffix=''):
    """
    Construye la ruta al archivo de salida con el nuevo tamaño de la imagen.

    :param file_path: Ruta del archivo de la imagen
    :param output_folder: Rutal del directorio de salida
    :param width: Ancho de la imagen
    :param height: Altura de la imagen
    :param suffix: Sufix para el nombre del archivo
    :return: Ruta con el nombre del archivo de salida
    """
    image_name = os.path.basename(file_path)
    image_name_without_extension = os.path.splitext(image_name)[0]
    image_extension = os.path.splitext(file_path)[1]
    new_image_path = f'{output_folder}/{image_name_without_extension}_{width}x{height}{suffix}{image_extension}'
    return new_image_path

# image_path = '../dataset/interim/zona-escolar-1/2024_09_11_14_18_50_849_-0400_2312x1734.jpg'
# image_path = 'output/2024_09_08_17_18_09_167_-0400_2312x1734.jpg'
# crop_image_region(image_path, './output', 1280, 1280)

In [5]:
from PIL import Image, ExifTags
import piexif

def rotate_image_if_upside_down(image_path):
    """
    Rotar la imagen 180 grados si está al revés.
    :param image_path: Rutal al archivo de la imagen
    :return: Rutal al archivo de la imagen rotada
    """

    image = Image.open(image_path)
    
    exif_dict = piexif.load(image.info['exif'])

    if check_image_upside_down(image_path):
        exif_dict['0th'][piexif.ImageIFD.Orientation] = 1        
        exif_dict['Exif'][41729] = b'1'
        
        flipped_img = image.rotate(180, expand=True)        

        if exif_dict is not None:
            flipped_img.save(image_path, "JPEG", quality=100, exif=piexif.dump(exif_dict))
        else:
            flipped_img.save(image_path, "JPEG", quality=100)

        print(f"The image was flipped and saved as {image_path}")
    else:
        print(f"The image was not upside down. Saved as {image_path}")

    image.close()

    return image_path

In [6]:
# copy EXIF metadata from one image to another
    
import shutil
import os

from PIL import Image

import piexif

def check_image_upside_down(image_path):
    exif_data = piexif.load(image_path)
    
    orientation = exif_data['0th'].get(piexif.ImageIFD.Orientation)
    
    if orientation is None:
        return False
    
    return orientation == 3


def copy_image_to_directory(input_file, output_directory):
    try:
        if not os.path.exists(output_directory):
            os.makedirs(output_directory)
        
        file_name = os.path.basename(input_file)        
        output_path = os.path.join(output_directory, file_name)

        return shutil.copy(input_file, output_path)
    
    except Exception as e:
        print(f"Ocurrió un error al copiar el fichero: {e}")
        return None

In [7]:
def process_folder(input_folder, output_folder, width, height):
    """
    Procesa un directorio de imágenes para redimensionarlas y recortarlas.

    :param input_folder: Ruta del directorio de las imágenes
    :param output_folder: Ruta del directorio de salida
    :param width: Ancho de la región a recortar
    :param height: Altura de la región a recortar
    :return: Ruta del directorio de salida 
    """

    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)
    os.makedirs(output_folder)

    for root, dirs, files in os.walk(input_folder):
        for file in files:
            print(f"Procesando {file}")
            input_file = copy_image_to_directory(os.path.join(root, file), output_folder)
            input_file = rotate_image_if_upside_down(input_file)
            resized_image = resize_image_to_half(input_file, output_folder)
            cropped_files = crop_image_region_v2(resized_image, output_folder, width, height)
    
    return output_folder

In [None]:
!pip install mapillary_tools
def descarga_imagenes():
    """
    Descarga las imágenes de la cámara de Mapillary.
    """
    !mapillary_tools download --import_path ../dataset/raw --user_name azambrana --advanced

In [8]:
# Configuración de la dimensión de las imágenes para el dataset que se desea obtener de las imágenes
preferred_height = 1280
preferred_width = 1280

# process_folder('./input', './output', 1280, 1280) 
# process_folder('../dataset/raw/pare', '../dataset/interim/pare', 1280, 1280)
process_folder('../dataset/raw/zona-escolar-1', '../dataset/interim/zona-escolar-1', preferred_width, preferred_height)

Processing 2024_09_11_14_18_50_849_-0400.jpg
The image was not upside down. Saved as ../dataset/interim/zona-escolar-1\2024_09_11_14_18_50_849_-0400.jpg
../dataset/interim/zona-escolar-1\2024_09_11_14_18_50_849_-0400.jpg (4624x3468) --> ../dataset/interim/zona-escolar-1/2024_09_11_14_18_50_849_-0400_2312x1734.jpg (2312x1734)
Processing ../dataset/interim/zona-escolar-1/2024_09_11_14_18_50_849_-0400_2312x1734.jpg
Output folder ../dataset/interim/zona-escolar-1 1280 1280
Processing 2024_09_11_14_18_51_814_-0400.jpg
The image was not upside down. Saved as ../dataset/interim/zona-escolar-1\2024_09_11_14_18_51_814_-0400.jpg
../dataset/interim/zona-escolar-1\2024_09_11_14_18_51_814_-0400.jpg (4624x3468) --> ../dataset/interim/zona-escolar-1/2024_09_11_14_18_51_814_-0400_2312x1734.jpg (2312x1734)
Processing ../dataset/interim/zona-escolar-1/2024_09_11_14_18_51_814_-0400_2312x1734.jpg
Output folder ../dataset/interim/zona-escolar-1 1280 1280
Processing 2024_09_11_14_18_53_147_-0400.jpg
The ima

'../dataset/interim/zona-escolar-1'