In [None]:
#Librerias basicas
import torch
import torch.nn.functional as F
import numpy as np
from torch.utils.data import DataLoader
from torchmetrics import  R2Score, MeanAbsoluteError
from tqdm import tqdm

SEGMENTATION

In [None]:
def conv3x3(c_in, c_out):
    return torch.nn.Sequential(
        torch.nn.Conv2d(c_in, c_out, 3, padding=1),
        torch.nn.BatchNorm2d(c_out),
        torch.nn.ReLU(inplace=True)
    )
def encoder(c_in, c_out):
  return torch.nn.Sequential(
        torch.nn.MaxPool2d(2),
        conv3x3(c_in, c_out),
        conv3x3(c_out, c_out),
    )

class decoder(torch.nn.Module):
    def __init__(self, c_in, c_out):
        super(decoder, self).__init__()
        self.upsample = torch.nn.ConvTranspose2d(c_in, c_out, 2, stride=2)
        self.conv1 = conv3x3(c_in, c_out)
        self.conv2 = conv3x3(c_out, c_out)
    def forward(self, x1, x2):
        x1 = self.upsample(x1)
        diff_X = x2.size()[2] - x1.size()[2]
        diff_Y = x2.size()[3] - x1.size()[3]
        x1 = F.pad(x1, (diff_X, 0, diff_Y, 0))
        x = torch.cat([x2, x1], dim=1)
        x = self.conv1(x)
        x = self.conv2(x)
        return x

class UNet(torch.nn.Module):
    def __init__(self, n_classes=1, in_ch=1, c = 16):
        super().__init__()
        self.encoder1 = torch.nn.Sequential(
          conv3x3(in_ch, c),
          conv3x3(c, c),
        )
        self.encoder2 = encoder(c, 2*c)
        self.encoder3 = encoder(2*c, 4*c)
        self.encoder4 = encoder(4*c, 8*c)
        self.decoder1 = decoder(8*c,4*c)
        self.decoder2 = decoder(4*c,2*c)
        self.decoder3 = decoder(2*c,c)
        self.out = torch.nn.Conv2d(c, n_classes, 1)

    def forward(self, x):
        x1 = self.encoder1(x)
        x2 = self.encoder2(x1)
        x3 = self.encoder3(x2)
        x = self.encoder4(x3)
        x = self.decoder1(x, x3)
        x = self.decoder2(x, x2)
        x = self.decoder3(x, x1)
        x = self.out(x)
        return x

In [None]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [None]:
model = UNet(c = 16)
num_params = count_parameters(model)
print(num_params)
output = model(torch.randn((50,1,112,112)))
output.shape

In [None]:

class Dataset(torch.utils.data.Dataset):
    def __init__(self,X, y):
        N,W,H = X.shape
        self.img=torch.tensor(X.reshape(N,1,W,H).astype(np.float32))
        self.mask=torch.tensor(y.reshape(N,1,W,H).astype(np.float32))
    def __len__(self):
        return len(self.img)
    def __getitem__(self, idx):
        return self.img[idx],self.mask[idx]
    

In [None]:
dataset_images = np.load("your_images_data.npy",allow_pickle=True)
dataset_mask = np.load("your_masks_data.npy",allow_pickle=True)

In [None]:
N,W,H = dataset_images.shape
N_training = int(0.6*N)
N_validation = int(0.2*N) + N_training
dataset = {
    'train': Dataset(dataset_images[:N_training], dataset_mask[:N_training]), 
    'valid': Dataset(dataset_images[N_training:N_validation], dataset_mask[N_training:N_validation]),
    'test': Dataset(dataset_images[N_validation:], dataset_mask[N_validation:])
}

dataloader = {
    'train': DataLoader(dataset['train'], batch_size=50, shuffle=True, pin_memory=True),
    'valid': DataLoader(dataset['valid'], batch_size=100, pin_memory=True),
    'test': DataLoader(dataset['test'], batch_size=100, pin_memory=True)
}


len(dataset['train']), len(dataset['valid']), len(dataset['test'])

In [None]:
def iou(outputs, labels):
    outputs, labels = torch.sigmoid(outputs) > 0.5, labels > 0.5
    SMOOTH = 1e-6
    ious = []
    intersection = (outputs & labels).float().sum((1, 2))  
    union = (outputs | labels).float().sum((1, 2))         
    iou = (intersection + SMOOTH) / (union + SMOOTH)  
    ious.append(iou.mean().item())
    return np.mean(ious) 

def dice_coeff(outputs, labels, smooth=1e-6):
    outputs = (torch.sigmoid(outputs) > 0.5).float()
    labels = (labels > 0.5).float()
    intersection = (outputs * labels).sum((1, 2))
    union = outputs.sum((1, 2)) + labels.sum((1, 2))
    dice = (2 * intersection + smooth) / (union + smooth)
    return dice.mean().item()

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
torch.cuda.empty_cache()
torch.backends.cudnn.benchmark = True
torch.cuda.max_split_size_mb = 1024

def fit(model, dataloader, epochs=100, lr=3e-4, sub_batch_size=5):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = torch.nn.BCEWithLogitsLoss()
    model.to(device)
    hist = {'loss': [], 'iou': [], 'dice': [], 'val_loss': [], 'val_iou': [], 'val_dice': []}

    for epoch in range(1, epochs+1):
        bar = tqdm(dataloader['train'])
        train_loss, train_iou, train_dice = [], [], []
        model.train()

        for imgs, masks in bar:
            imgs, masks = imgs.to(device), masks.to(device)
            num_sub_batches = len(imgs) // sub_batch_size
            optimizer.zero_grad()

            for j in range(num_sub_batches):
                start = j * sub_batch_size
                end = start + sub_batch_size
                y_hat = model(imgs[start:end])
                loss = criterion(y_hat, masks[start:end])
                loss.backward()
                ious = iou(y_hat, masks[start:end])
                dices = dice_coeff(y_hat, masks[start:end])
                train_loss.append(loss.item())
                train_iou.append(ious)
                train_dice.append(dices)

            optimizer.step()
            bar.set_description(f"loss {np.mean(train_loss):.5f} iou {np.mean(train_iou):.5f} dice {np.mean(train_dice):.5f}")

        hist['loss'].append(np.mean(train_loss))
        hist['iou'].append(np.mean(train_iou))
        hist['dice'].append(np.mean(train_dice))
        bar = tqdm(dataloader['valid'])
        val_loss, val_iou, val_dice = [], [], []
        model.eval()

        with torch.no_grad():
            for imgs, masks in bar:
                imgs, masks = imgs.to(device), masks.to(device)
                y_hat = model(imgs)
                loss = criterion(y_hat, masks)
                ious = iou(y_hat, masks)
                dices = dice_coeff(y_hat, masks)
                val_loss.append(loss.item())
                val_iou.append(ious)
                val_dice.append(dices)

        hist['val_loss'].append(np.mean(val_loss))
        hist['val_iou'].append(np.mean(val_iou))
        hist['val_dice'].append(np.mean(val_dice))

        print(f"\nEpoch {epoch}/{epochs} loss {np.mean(train_loss):.5f} iou {np.mean(train_iou):.5f} dice {np.mean(train_dice):.5f} val_loss {np.mean(val_loss):.5f} val_iou {np.mean(val_iou):.5f} val_dice {np.mean(val_dice):.5f}")

    return hist


In [None]:
model = UNet(c=16)
hist = fit(model, dataloader, epochs=9, sub_batch_size=25)

In [None]:
bar = tqdm(dataloader['test'])
hist_test = {'test_loss': [], 'test_iou': [], 'test_dice': []}
test_loss, test_iou, test_dice = [], [], []
criterion = torch.nn.BCEWithLogitsLoss()
model.eval()

with torch.no_grad():
    for imgs, masks in bar:
        imgs, masks = imgs.to(device), masks.to(device)
        y_hat = model(imgs)
        loss = criterion(y_hat, masks)
        ious = iou(y_hat, masks)
        dices = dice_coeff(y_hat, masks)
        test_loss.append(loss.item())
        test_iou.append(ious)
        test_dice.append(dices)

hist_test['test_loss'].append(np.mean(test_loss))
hist_test['test_iou'].append(np.mean(test_iou))
hist_test['test_dice'].append(np.mean(test_dice))

EF ESTIMATION 

In [None]:
class RegressionHead(torch.nn.Module):
    def __init__(self, input_dim, hidden_dim=64):
        super(RegressionHead, self).__init__()
        self.fc1 = torch.nn.Linear(input_dim, hidden_dim)
        self.fc3 = torch.nn.Linear(hidden_dim, 1) 
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc3(x)
        return x
    
class reduce_cnn(torch.nn.Module):
    def __init__(self):
        super(reduce_cnn, self).__init__()
        self.conv1 = torch.nn.Conv2d(in_channels = 1, out_channels = 16, kernel_size = 3)
        self.conv2 = torch.nn.Conv2d(in_channels = 16, out_channels = 32, kernel_size = 3)
        self.conv3 = torch.nn.Conv2d(in_channels = 32, out_channels = 64, kernel_size = 3)
        self.conv4 = torch.nn.Conv2d(in_channels = 64, out_channels = 128, kernel_size = 3)
        self.conv5 = torch.nn.Conv2d(in_channels = 128, out_channels = 256, kernel_size = 3)
        self.pool = torch.nn.MaxPool2d(2)
        
    def forward(self, x):
        x = self.pool(F.normalize(F.relu(self.conv1(x))))
        x = self.pool(F.normalize(F.relu(self.conv2(x))))
        x = self.pool(F.normalize(F.relu(self.conv3(x))))
        x = self.pool(F.normalize(F.relu(self.conv4(x))))
        x = self.pool(F.normalize(F.relu(self.conv5(x))))
        N,C,W,H = x.shape 
        return x.reshape(N,C*H*W)

class UnetToRegressor(torch.nn.Module):
    def __init__(self):
        super(UnetToRegressor, self).__init__()
        self.unet = UNet()
        #self.unet = torch.load('model_Unet.pt')
        self.reduce = reduce_cnn()
        self.regression_head = RegressionHead(768)

    def forward(self, x):
        N,C,W,H = x.shape
        x1, x2 = x[:,0,:,:].reshape(N,1,W,H), x[:,1,:,:].reshape(N,1,W,H)
        x1, x2 = self.unet(x1), self.unet(x2)
        x_diff = torch.abs(x1 - x2)
        x1,x2,x_diff = self.reduce(x1), self.reduce(x2), self.reduce(x_diff)
        x = torch.cat([x1.view(x1.size(0), -1), x2.view(x2.size(0), -1), x_diff.view(x_diff.size(0), -1)], dim=1)
        x = self.regression_head(x)
        return x.view(-1)

In [None]:
model = UnetToRegressor()
num_params = count_parameters(model)
print(num_params)
output = model(torch.randn((50,2,112,112)))
output.shape

In [None]:
class Dataset2(torch.utils.data.Dataset):
    def __init__(self,X, y):
        self.img=torch.tensor(X.astype(np.float32))
        self.y=torch.tensor(y.astype(np.float32))
    def __len__(self):
        return len(self.img)
    def __getitem__(self, idx):
        return self.img[idx],self.y[idx]

In [None]:
dataset_img_ef = np.load("your_images_ef_data.npy",allow_pickle=True)
y_ef = np.load("your_ef_data.npy",allow_pickle=True)

In [None]:
N,C,H,W = np.array(dataset_img_ef).shape
N_training = int(0.6*N)
N_validation = int(0.2*N) + N_training
indices = np.random.permutation(len(dataset_img_ef))
dataset_img_ef= dataset_img_ef[indices]
y_ef= y_ef[indices]
dataset2 = {
    'train': Dataset2(np.array(dataset_img_ef[:N_training]), y_ef[:N_training]), 
    'valid': Dataset2(np.array(dataset_img_ef[N_training:N_validation]), y_ef[N_training:N_validation]),
    'test': Dataset2(np.array(dataset_img_ef[N_validation:]), y_ef[N_validation:])
}
dataloader2 = {
    'train': DataLoader(dataset2['train'], batch_size=50, shuffle=True, pin_memory=True),
    'valid': DataLoader(dataset2['valid'], batch_size=100, pin_memory=True),
    'test': DataLoader(dataset2['test'], batch_size=100, pin_memory=True)
}
print(len(dataset2['train']), len(dataset2['valid']), len(dataset2['test']))

In [None]:
r2_metric = R2Score()
mae_metric = MeanAbsoluteError()

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

def fit(model, dataloader, epochs=100, lr=3e-4, sub_batch_size=5):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = torch.nn.MSELoss()
    model.to(device)
    best_r_squared = 0.75 
    hist = {'loss': [], 'mae': [], 'val_loss': [], 'val_mae': [], 'val_r_squared': []}
    for epoch in range(1, epochs+1):
        
        bar = tqdm(dataloader['train'])
        train_loss, train_mae = [], []
        model.train()
        for imgs, masks in bar:
            imgs, masks = imgs.to(device), masks.to(device)
            num_sub_batches = len(imgs) // sub_batch_size
            optimizer.zero_grad()
            for j in range(num_sub_batches):
                start = j * sub_batch_size
                end = start + sub_batch_size
                y_hat = model(imgs[start:end])
                loss = criterion(y_hat, masks[start:end])
                loss.backward()
                optimizer.step()
                maes = mae_metric(y_hat.cpu().detach(), masks[start:end].cpu().detach())
                train_loss.append(loss.item())
                train_mae.append(maes)
            bar.set_description(f"loss {np.mean(train_loss):.5f} mae {np.mean(train_mae):.5f}")
        hist['loss'].append(np.mean(train_loss))
        hist['mae'].append(np.mean(train_mae))

        bar = tqdm(dataloader['valid'])
        val_loss, val_mae, val_r_squared = [], [], []
        model.eval()
        with torch.no_grad():
            for imgs, masks in bar:
                imgs, masks = imgs.to(device), masks.to(device)
                y_hat = model(imgs)
                loss = criterion(y_hat, masks)
                maes = mae_metric(y_hat.cpu().detach(), masks.cpu().detach())
                r_squareds = r2_metric(y_hat.cpu().detach(), masks.cpu().detach())
                val_loss.append(loss.item())
                val_mae.append(maes)
                val_r_squared.append(r_squareds)
                bar.set_description(f"val_loss {np.mean(val_loss):.5f}  val_mae {np.mean(val_mae):.5f} val_r_squared {np.mean(val_r_squared):.5f}")
        hist['val_loss'].append(np.mean(val_loss))
        hist['val_mae'].append(np.mean(val_mae))
        hist['val_r_squared'].append(np.mean(val_r_squared))
        mean_val_r_squared = np.mean(val_r_squared)
        if mean_val_r_squared > best_r_squared:
            torch.save(model, 'the_ef_estimator.pt')
            best_r_squared = mean_val_r_squared
            print(f"Model saved with test_r_squared: {mean_val_r_squared}")

        print(f"\nEpoch {epoch}/{epochs} loss {np.mean(train_loss):.5f} mae {np.mean(train_mae):.5f} val_loss {np.mean(val_loss):.5f} val_mae {np.mean(val_mae):.5f} val_r_squared {np.mean(val_r_squared):.5f}")
    return hist


In [None]:
model = UnetToRegressor()
hist = fit(model, dataloader2, epochs=50, sub_batch_size=25)

In [None]:
#model = torch.load('the_ef_estimator.pt')

In [None]:
bar = tqdm(dataloader['test'])
hist_test = {'test_loss': [], 'test_mae': [], 'test_r_squared': []}
test_loss, test_mae, test_r_squared = [], [], []
model.eval()
with torch.no_grad():
    for imgs, masks in bar:
        imgs, masks = imgs.to(device), masks.to(device)
        y_hat = model(imgs)
        loss = criterion(y_hat, masks)
        maes = mae_metric(y_hat.cpu().detach(), masks.cpu().detach())
        r_squareds = r2_metric(y_hat.cpu().detach(), masks.cpu().detach())
        test_loss.append(loss.item())
        test_mae.append(maes)
        test_r_squared.append(r_squareds)
        
hist_test['test_loss'].append(np.mean(test_loss))
hist_test['test_mae'].append(np.mean(test_mae))
hist_test['test_r_squared'].append(np.mean(test_r_squared))