## Lbrerías

In [99]:
import os
import random
from collections import Counter
from PIL import Image

# Pytorch
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import datasets, transforms

# Pytorch Lightning
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks.early_stopping import EarlyStopping

# Hydra
import hydra
from hydra.core.global_hydra import GlobalHydra
from omegaconf import DictConfig
from omegaconf import OmegaConf


## Configuración de librerías

In [100]:
random.seed(42)

## Globales

In [101]:
GlobalHydra.instance().clear()
hydra.initialize(config_path=".", version_base=None)
cfg = hydra.compose(config_name="config")
print(OmegaConf.to_yaml(cfg))

model:
  learning_rate: 0.001
  batch_size: 32
  epochs: 10
  train_dataset_dir: data/train
  validation_dataset_dir: data/valid
  test_dataset_dir: data/test
data:
  labeled_percentage: 0.3



In [102]:
class CustomLoader(pl.LightningDataModule):
    def __init__(self, dataset, batch_size=32, labeled_percentage=0.3):
        super().__init__()
        self.dataset = dataset
        self.batch_size = batch_size
        self.labeled_percentage = labeled_percentage

    def split(self):
        total_size = len(self.dataset)
        labeled_size = int(total_size * self.labeled_percentage)
        unlabeled_size = total_size - labeled_size
        self.labeled_dataset, self.unlabeled_dataset = random_split(self.dataset, [labeled_size, unlabeled_size])

    def get_dataloader(self):
        return DataLoader(self.labeled_data, batch_size=self.batch_size, shuffle=True)

In [103]:
# (U Net Autoencoder)
class Autoencoder(pl.LightningModule):
    def __init__(self):
        super(Autoencoder, self).__init__()
        # Define las capas del encoder y del decoder con skip connections
        self.encoder = nn.Sequential
        (
            nn.Conv2d(1, 64, 3, padding=1),
            nn.ReLU(),
            # Agrega más capas de encoder
        )
        self.decoder = nn.Sequential
        (
            nn.ConvTranspose2d(64, 1, 3, padding=1),
            nn.ReLU(),
            # Agrega más capas de decoder
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

    def training_step(self, batch, batch_idx):
        x, _ = batch
        reconstruction = self(x)
        loss = nn.MSELoss()(reconstruction, x)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)


In [104]:
class Classifier_A(pl.LightningModule):
    def __init__(self):
        super(Classifier_A, self).__init__()
        self.model = nn.Sequential
        (
            nn.Flatten(),
            nn.Linear(512, 128),  # Ajusta según sea necesario
            nn.ReLU(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = nn.CrossEntropyLoss()(y_hat, y)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)


class Classifier_B(pl.LightningModule):
    def __init__(self, encoder, fine_tune=False):
        super(Classifier_B, self).__init__()
        self.encoder = encoder

        if not fine_tune:
            for param in self.encoder.parameters():
                param.requires_grad = False  # Congelar el encoder si es necesario
        
        self.classifier = nn.Sequential
        (
            nn.Flatten(),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
        x = self.encoder(x)
        return self.classifier(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = nn.CrossEntropyLoss()(y_hat, y)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)


In [105]:
early_stop_callback = EarlyStopping \
(
    monitor='val_loss',
    patience=3,
    verbose=True,
    mode='min'
)

## Hydra

## Samples

In [106]:
# El dataset no redimensiona las imágenes de forma inmediata al ser cargadas en memoria. Las imágenes se cargan solo cuando accedes a ellas.
# Las transformaciones se aplican cada vez que accedes a una imagen, no al momento de la creación del dataset. Esto significa que la imagen será
# redimensionada solo cuando el dataset realmente cargue la imagen durante el entrenamiento o la evaluación (cuando haces una iteración sobre el dataset).
train_full_dataset = datasets.ImageFolder(root=cfg.model.train_dataset_dir)
test_full_dataset = datasets.ImageFolder(root=cfg.model.test_dataset_dir)

train_species_counts = Counter([sample[1] for sample in train_full_dataset.samples])
top_20_species = [species for species, count in train_species_counts.most_common(20)]

train_samples_from_top_20_species_by_image_count = [sample for sample in train_full_dataset.samples if sample[1] in top_20_species]
test_samples_from_top_20_species_by_image_count = [sample for sample in test_full_dataset.samples if sample[1] in top_20_species]

train_samples = []
test_samples = test_samples_from_top_20_species_by_image_count

for specie in top_20_species:
    species_samples = [sample for sample in train_samples_from_top_20_species_by_image_count if sample[1] == specie] # if sample tag = tag
    random.shuffle(species_samples)
    train_samples.extend(species_samples[:-20])
    test_samples.extend(species_samples[-20:])

print(f'Training samples: {len(train_samples)}')
print(f'Testing samples: {len(test_samples)}')

Training samples: 2546
Testing samples: 500


## Datasets

In [107]:
transform = transforms.Compose
(
    [
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

train_dataset = datasets.ImageFolder(root=cfg.model.train_dataset_dir, transform=transform)
train_dataset.samples = train_samples

test_dataset = datasets.ImageFolder(root=cfg.model.test_dataset_dir, transform=transform)
test_dataset.samples = test_samples

## Experimento 1

Tome el set de datos de entrenamiento y simule que una parte de cada clase no contiene
labels, denominado set de datos sin labels, el otro restante será denominado set de datos
con labels.

In [None]:
cl = CustomLoader(dataset=train_dataset, batch_size=cfg.model.batch_size, labeled_percentage=cfg.data.labeled_percentage)
cl.setup()

TypeError: __init__() missing 1 required positional argument: 'dataset'

In [None]:
trainer = Trainer(callbacks=[early_stop_callback], max_epochs=10)
trainer.fit(model, datamodule)


In [None]:
@hydra.main(config_path=".", config_name="config")
def main(cfg: DictConfig):
    # Aquí usas los valores de cfg para ajustar los hiperparámetros
    print(cfg.model.lr)