<a href="https://colab.research.google.com/github/Salva13s/HybridModel/blob/main/Unet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab-only: mount Google Drive at /content/drive so the dataset and output
# folders referenced later in this notebook are reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import os
import cv2
from sklearn.metrics import r2_score
from torchvision.utils import save_image

In [None]:
def set_seed(seed):
    """Seed the Python, NumPy and PyTorch random generators with ``seed``.

    When CUDA is available the CUDA generators are seeded as well and cuDNN
    is switched to deterministic mode with benchmarking turned off.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    # Nothing GPU-related to configure on a CPU-only runtime.
    if not torch.cuda.is_available():
        return

    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Fix the seed (30) up front so the cells below run with seeded
# random / NumPy / PyTorch generators.
set_seed(30)

In [None]:
class FruitDataset(Dataset):
    """Dataset pairing RGB images with their grayscale density maps.

    Images and density maps are matched purely by their position in the
    alphabetically sorted ``.jpg`` listings of the two directories, so both
    directories must contain the same number of ``.jpg`` files in matching
    order.

    Args:
        image_dir: Directory holding the input ``.jpg`` images.
        density_map_dir: Directory holding the ``.jpg`` density maps.
        transform: Optional callable applied to the PIL image only; the
            density map is never passed through it.

    Raises:
        ValueError: If the two directories contain a different number of
            ``.jpg`` files (positional pairing would be wrong or would fail
            later with an ``IndexError``).
    """

    def __init__(self, image_dir, density_map_dir, transform=None):
        self.image_dir = image_dir
        self.density_map_dir = density_map_dir
        self.transform = transform
        self.image_files = sorted(
            f for f in os.listdir(image_dir) if f.endswith('.jpg')
        )
        self.density_map_files = sorted(
            f for f in os.listdir(density_map_dir) if f.endswith('.jpg')
        )

        # Fail fast: pairing is positional, so unequal counts mean that at
        # least one image would be matched with the wrong (or no) map.
        if len(self.image_files) != len(self.density_map_files):
            raise ValueError(
                f'Found {len(self.image_files)} images in {image_dir!r} but '
                f'{len(self.density_map_files)} density maps in '
                f'{density_map_dir!r}; the counts must match.'
            )

    def __len__(self):
        """Return the number of image / density-map pairs."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load the ``idx``-th pair.

        Returns:
            tuple: ``(image, density_map)`` where ``image`` is the RGB image
            after ``transform`` (or the raw PIL image when no transform was
            given) and ``density_map`` is a float32 tensor of shape
            ``(1, H, W)``, Gaussian-blurred and divided by its own maximum
            when that maximum is positive.
        """
        image_path = os.path.join(self.image_dir, self.image_files[idx])
        density_map_path = os.path.join(self.density_map_dir, self.density_map_files[idx])

        # ``convert`` returns a fully loaded copy, so the source files can be
        # closed as soon as the conversion is done.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
        with Image.open(density_map_path) as raw_map:
            density_map_np = np.array(raw_map.convert('L'))

        if self.transform:
            image = self.transform(image)

        density_map_np = cv2.GaussianBlur(density_map_np, (5, 5), sigmaX=1.0)

        # Per-map normalisation; an all-zero map is left untouched to avoid
        # dividing by zero.
        peak = np.max(density_map_np)
        if peak > 0:
            density_map_np = density_map_np / peak

        density_map = torch.tensor(density_map_np, dtype=torch.float32).unsqueeze(0)

        return image, density_map

In [None]:
class UNet(nn.Module):
    """Five-level U-Net built from 3x3 conv blocks and 2x2 transposed convs.

    The encoder widens 64 -> 128 -> 256 -> 512 -> 1024 channels with 2x2 max
    pooling between levels; the decoder mirrors it, concatenating each
    upsampled map with the encoder output of the same level. A final 1x1
    convolution maps the 64 decoder channels to ``out_channels``. No
    activation is applied to the output.

    Args:
        in_channels: Number of channels of the input images.
        out_channels: Number of channels of the predicted map.
    """

    def __init__(self, in_channels=3, out_channels=1):
        super().__init__()

        def conv_block(in_channels, out_channels):
            # Two same-padded 3x3 convolutions, each followed by ReLU.
            return nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(inplace=True)
            )

        def upconv_block(in_channels, out_channels):
            # Doubles the spatial resolution.
            return nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)

        # NOTE: attribute names and creation order are kept stable so saved
        # state dicts and seeded weight initialisation stay the same.
        self.encoder1 = conv_block(in_channels, 64)
        self.encoder2 = conv_block(64, 128)
        self.encoder3 = conv_block(128, 256)
        self.encoder4 = conv_block(256, 512)
        self.encoder5 = conv_block(512, 1024)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Each decoder block receives upsampled + skip channels (2x width).
        self.upconv4 = upconv_block(1024, 512)
        self.decoder4 = conv_block(1024, 512)
        self.upconv3 = upconv_block(512, 256)
        self.decoder3 = conv_block(512, 256)
        self.upconv2 = upconv_block(256, 128)
        self.decoder2 = conv_block(256, 128)
        self.upconv1 = upconv_block(128, 64)
        self.decoder1 = conv_block(128, 64)

        self.conv_final = nn.Conv2d(64, out_channels, kernel_size=1)

    @staticmethod
    def _merge_skip(upsampled, skip):
        """Concatenate ``upsampled`` with ``skip`` along the channel axis.

        Max pooling floors odd sizes, so after upsampling the decoder map can
        be one pixel smaller than the encoder map. In that case the decoder
        map is zero-padded to the skip size; when the sizes already match
        nothing is changed.
        """
        diff_h = skip.shape[-2] - upsampled.shape[-2]
        diff_w = skip.shape[-1] - upsampled.shape[-1]
        if diff_h or diff_w:
            upsampled = nn.functional.pad(
                upsampled,
                (diff_w // 2, diff_w - diff_w // 2,
                 diff_h // 2, diff_h - diff_h // 2),
            )
        return torch.cat((upsampled, skip), dim=1)

    def forward(self, x):
        """Run the network on a batch ``x`` of shape ``(N, in_channels, H, W)``.

        Returns:
            Tensor of shape ``(N, out_channels, H, W)``. ``H`` and ``W`` must
            be at least 16 but need not be multiples of 16.
        """
        # Contracting path.
        enc1 = self.encoder1(x)
        enc2 = self.encoder2(self.pool(enc1))
        enc3 = self.encoder3(self.pool(enc2))
        enc4 = self.encoder4(self.pool(enc3))
        enc5 = self.encoder5(self.pool(enc4))

        # Expanding path with skip connections.
        dec4 = self.decoder4(self._merge_skip(self.upconv4(enc5), enc4))
        dec3 = self.decoder3(self._merge_skip(self.upconv3(dec4), enc3))
        dec2 = self.decoder2(self._merge_skip(self.upconv2(dec3), enc2))
        dec1 = self.decoder1(self._merge_skip(self.upconv1(dec2), enc1))

        return self.conv_final(dec1)

In [None]:
def train_model(unet, train_loader, val_loader, num_epochs=50):
    """Train ``unet`` with MSE loss and Adam (lr=1e-4) for ``num_epochs`` epochs.

    The model is moved to CUDA when available, otherwise it stays on the CPU.
    After every epoch the sample-weighted mean training loss is printed and
    ``validate_model`` is run on ``val_loader``.

    Args:
        unet: Model to optimise (modified in place).
        train_loader: Loader yielding ``(inputs, density_maps)`` batches.
        val_loader: Loader handed to ``validate_model`` after each epoch.
        num_epochs: Number of passes over ``train_loader``.
    """
    criterion = nn.MSELoss()
    optimizer = optim.Adam(unet.parameters(), lr=1e-4)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    unet.to(device)
    last_epoch = num_epochs - 1

    for epoch in range(num_epochs):
        unet.train()
        loss_sum = 0.0

        for images, targets in train_loader:
            images, targets = images.to(device), targets.to(device)

            predictions = unet(images)
            batch_loss = criterion(predictions, targets)

            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

            # Weight by batch size so a smaller last batch does not skew the mean.
            loss_sum += batch_loss.item() * images.size(0)

        epoch_loss = loss_sum / len(train_loader.dataset)
        print(f'Epoch {epoch}/{last_epoch}, Training Loss: {epoch_loss:.4f}')

        validate_model(unet, val_loader)

# Function to validate the model
def validate_model(unet, val_loader):
    """Evaluate ``unet`` on ``val_loader`` and print the mean MSE loss.

    Batches are moved to the device the model's parameters live on, so the
    function works regardless of where the caller placed the model. A model
    without parameters falls back to CUDA when available, else CPU.

    Args:
        unet: Model to evaluate; it is switched to eval mode.
        val_loader: Loader yielding ``(inputs, density_maps)`` batches.

    Returns:
        float: The sample-weighted mean MSE over ``val_loader.dataset``.
    """
    unet.eval()
    running_loss = 0.0
    criterion = nn.MSELoss()

    # Follow the model instead of re-detecting the device: the model is not
    # moved here, so a global guess can disagree with where it actually is.
    first_param = next(unet.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    with torch.no_grad():
        for inputs, density_maps in val_loader:
            inputs = inputs.to(device)
            density_maps = density_maps.to(device)

            outputs = unet(inputs)
            loss = criterion(outputs, density_maps)

            # Weight by batch size so a smaller last batch does not skew the mean.
            running_loss += loss.item() * inputs.size(0)

    val_loss = running_loss / len(val_loader.dataset)
    print(f'Validation Loss: {val_loss:.4f}')
    return val_loss

In [None]:
# Load the data: one dataset + loader per split (train / val / test).
# Only the training loader shuffles; every loader uses batches of 6.
train_dataset = FruitDataset(
    image_dir='/content/drive/MyDrive/1 Tesis/Fish/DataDividido/train/reales_resized',
    density_map_dir='/content/drive/MyDrive/1 Tesis/Fish/DataDividido/train/generadas_resized',
    transform=transforms.ToTensor(),
)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=6)

val_dataset = FruitDataset(
    image_dir='/content/drive/MyDrive/1 Tesis/Fish/DataDividido/val/reales_resized',
    density_map_dir='/content/drive/MyDrive/1 Tesis/Fish/DataDividido/val/generadas_resized',
    transform=transforms.ToTensor(),
)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=6)

test_dataset = FruitDataset(
    image_dir='/content/drive/MyDrive/1 Tesis/Fish/DataDividido/test/reales_resized',
    density_map_dir='/content/drive/MyDrive/1 Tesis/Fish/DataDividido/test/generadas_resized',
    transform=transforms.ToTensor(),
)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=6)

In [None]:
# Build the U-Net with its default 3-channel input / 1-channel output and
# train it for 50 epochs, validating after each one.
unet = UNet()
train_model(unet, train_loader, val_loader, num_epochs=50)

In [None]:
from torchvision.utils import save_image

def evaluate_and_save_predictions(unet, test_loader, save_dir):
    """Run ``unet`` over ``test_loader`` and save every predicted map as an image.

    Each prediction is written to ``save_dir`` under the file name of the
    input image it was computed from, taken from
    ``test_loader.dataset.image_files``. File names are matched to
    predictions by position, so the loader must iterate the dataset in order
    (no shuffling).

    Args:
        unet: Trained model; it is switched to eval mode.
        test_loader: Loader yielding ``(inputs, density_maps)`` batches whose
            dataset exposes an ``image_files`` list.
        save_dir: Output directory; created if it does not exist.
    """
    unet.eval()

    # Follow the model instead of re-detecting the device: the model is not
    # moved here, so a global guess can disagree with where it actually is.
    first_param = next(unet.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    os.makedirs(save_dir, exist_ok=True)

    filenames = test_loader.dataset.image_files
    # Running index into ``filenames``; unlike ``batch_idx * batch_size`` it
    # also works when the loader has no fixed ``batch_size``.
    offset = 0

    with torch.no_grad():
        # The ground-truth maps are not needed to produce predictions.
        for inputs, _ in test_loader:
            predicted_density_maps = unet(inputs.to(device)).cpu()

            batch_filenames = filenames[offset:offset + len(predicted_density_maps)]
            offset += len(predicted_density_maps)

            for pred_map, filename in zip(predicted_density_maps, batch_filenames):
                # Drop the leading channel axis before writing the map.
                save_image(pred_map.squeeze(0), os.path.join(save_dir, filename))

    print(f'Predictions saved to {save_dir}')

# Write the test-set predictions of the model trained above to Drive.
save_directory = '/content/drive/MyDrive/1 Tesis/Fish/Unet/fishPred50'
evaluate_and_save_predictions(unet, test_loader, save_directory)
