In [None]:
pip install pillow numpy

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate
from tensorflow.keras.layers import TimeDistributed, Bidirectional, LSTM, ConvLSTM2D

def conv_block(input_tensor, num_filters):
    x = TimeDistributed(Conv2D(num_filters, (3, 3), padding='same', activation='relu'))(input_tensor)
    x = TimeDistributed(Conv2D(num_filters, (3, 3), padding='same', activation='relu'))(x)
    return x

def conv_lstm_block(input_tensor, num_filters):
    x = Bidirectional(ConvLSTM2D(num_filters, (3, 3), padding='same', return_sequences=True))(input_tensor)
    return x

def unet_lstm(input_size=(256, 256, 3), n_sequences=10, n_filters=64, n_classes=4):
    inputs = Input((n_sequences, *input_size))

    # Downsample
    c1 = conv_block(inputs, n_filters)
    l1 = conv_lstm_block(c1, n_filters)
    p1 = TimeDistributed(MaxPooling2D((2, 2)))(l1)

    c2 = conv_block(p1, n_filters*2)
    l2 = conv_lstm_block(c2, n_filters*2)
    p2 = TimeDistributed(MaxPooling2D((2, 2)))(l2)

    # Bottleneck
    c3 = conv_block(p2, n_filters*4)

    # Upsample
    u1 = TimeDistributed(UpSampling2D((2, 2)))(c3)
    concat1 = concatenate([u1, l2], axis=4)
    c4 = conv_block(concat1, n_filters*2)

    u2 = TimeDistributed(UpSampling2D((2, 2)))(c4)
    concat2 = concatenate([u2, l1], axis=4)
    c5 = conv_block(concat2, n_filters)

    # Output layer
    outputs = TimeDistributed(Conv2D(n_classes, (1, 1), activation='softmax'))(c5)

    model = Model(inputs=[inputs], outputs=[outputs])
    return model

# Create the model
model = unet_lstm()
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Summary of the model
model.summary()


In [None]:
import numpy as np
from random import shuffle
import logging
from glob import glob
from PIL import Image

from random import shuffle
from glob import glob

class SequenceDataProvider:
    def __init__(self, data_dir, sequence_length, image_suffix, label_suffix, input_size, n_classes, shuffle_data=True, max_sequences=None):
        self._data_dir = data_dir
        self._sequence_length = sequence_length
        self._image_suffix = image_suffix
        self._label_suffix = label_suffix
        self._input_size = input_size
        self._n_classes = n_classes
        self._shuffle_data = shuffle_data
        self._max_sequences = max_sequences
        self._sequences = self._load_sequences()

    def dataset_length(self):
        # Devuelve el número total de secuencias cargadas
        return len(self._sequences)

    def _load_sequences(self):
        image_paths = sorted(glob(f"{self._data_dir}/*{self._image_suffix}"))
        label_paths = sorted(glob(f"{self._data_dir}/*{self._label_suffix}"))

        sequences = [
            list(zip(image_paths[i:i+self._sequence_length], label_paths[i:i+self._sequence_length]))
            for i in range(0, len(image_paths) - self._sequence_length + 1, self._sequence_length)
        ]

        if self._max_sequences:
            sequences = sequences[:self._max_sequences]

        if self._shuffle_data:
            shuffle(sequences)
        print(f"Loaded {len(sequences)} sequences.")
        return sequences

    def _process_image(self, image_path):
        image = Image.open(image_path)
        image = image.resize(self._input_size[:2])  # Assuming input_size is (height, width, channels)
        return np.array(image)

    def _process_label(self, label_path):
        label = Image.open(label_path)
        label = label.resize(self._input_size[:2])
        label = np.array(label, dtype=np.int32)  # Asegúrate de que sea entero para usar como índices

        # Convertir la etiqueta a formato one-hot
        one_hot_label = np.eye(self._n_classes)[label]
        return one_hot_label

    def next_batch(self, batch_size=1):
        batch_images = []
        batch_labels = []
        for _ in range(batch_size):
            if len(self._sequences) == 0:
                self._sequences = self._load_sequences()

            sequence = self._sequences.pop(0)
            sequence_images = []
            sequence_labels = []

            for image_path, label_path in sequence:
                image = self._process_image(image_path)
                label = self._process_label(label_path)
                sequence_images.append(image)
                sequence_labels.append(label)

            batch_images.append(sequence_images)
            batch_labels.append(sequence_labels)
        print(np.array(batch_labels).shape)

        batch_images = np.array(batch_images).reshape(batch_size, self._sequence_length, *self._input_size)
        # Asegúrate de ajustar el reshape de batch_labels para que coincida con las dimensiones esperadas por tu modelo
        batch_labels = np.array(batch_labels).reshape(batch_size, self._sequence_length, *self._input_size[:2], self._n_classes)

        return batch_images, batch_labels

# Asegúrate de definir input_size y n_classes acorde a tu modelo
input_size = (256, 256, 3)  # Ejemplo de tamaño de entrada
n_classes = 4  # Ejemplo de número de clases

data_provider = SequenceDataProvider(
    data_dir="/content/drive/MyDrive/Glottis/dataset/dataset/train",
    sequence_length=10,
    image_suffix="_rgb.png",
    label_suffix="_mask.png",
    input_size=(256, 256, 3),
    n_classes=4,
    shuffle_data=True,
    max_sequences=10
)



In [None]:
epochs = 10  # Número de épocas para el entrenamiento
batch_size = 2  # Tamaño del lote
def data_generator(batch_size):
    while True:
        batch_images, batch_labels = data_provider.next_batch(batch_size)
        yield batch_images, batch_labels

# Crear el generador
train_generator = data_generator(batch_size)

# Calcular el número de pasos por época
steps_per_epoch = data_provider.dataset_length() // batch_size

# Entrenar el modelo
model.fit(x=train_generator,
          steps_per_epoch=steps_per_epoch,
          epochs=epochs)

