In [1]:
import math
import os
from methods import Segmenter, TransformerModel, LSTM, CRNN, UNet,CCRNN, UNet2, PatchTST
from matplotlib import pyplot as plt
import torch
from utilities import printc, seed
from utils_loader import get_dataloaders
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm
from utils_metrics import mean_iou
from torch.nn import functional as F

# Experiment settings read by the cells below.
config = dict(
    batch_size=128,
    epochs=200,
    fsl=False,
    model='unet',
    seed=73054772,
    dataset='physiq',
    window_size=50,
    step_size=50,
)

# Hand the configured seed to the project's `seed` helper before any
# stochastic work happens (presumably seeds the RNGs -- see `utilities`).
seed(config['seed'])

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [2]:
from sklearn.model_selection import train_test_split
from utilities import sliding_windows
from utils_dataset import ClassificationDataset, NormalDataset
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import SubsetRandomSampler
from utils_loader import _load



def sec_sliding(samples, labels, step=1, test=False, window_size=50):
    """Cut padded segments into short, overlapping windows.

    Every segment is padded along its time axis by ``window_size - 1`` frames
    in total (zeros for the samples, reflection for the labels), so that with
    ``step=1`` one window is produced per original frame.

    Args:
        samples: Tensor indexed as (segment, time, channel); dim 1 is the
            axis that gets padded and windowed.
        labels: Tensor indexed as (segment, time).
        step: Stride between consecutive windows.
        test: When True, return the windowed tensors without flattening the
            segment and window axes together.
        window_size: Length of each window. Defaults to 50, which yields the
            original (25, 24) padding.

    Returns:
        ``(samples, labels)``. With ``test=False`` the samples are reshaped to
        ``(-1, window_size, channels)`` and the labels to ``(-1, window_size)``.
        With ``test=True`` the samples are whatever the project's
        ``sliding_windows`` callable returns and the labels have shape
        ``(segment, n_windows, window_size)``.
    """
    # Split the window_size - 1 padding frames between both sides; for an even
    # window the extra frame goes to the left (50 -> 25 left, 24 right).
    left_pad = window_size // 2
    right_pad = window_size - 1 - left_pad
    # Remember the channel count before windowing instead of hard-coding it.
    num_channels = samples.shape[-1]

    window_fn = sliding_windows(window_size, step)
    # F.pad works on the last axis, so move time last, pad, and move it back.
    samples = torch.nn.functional.pad(
        samples.permute(0, 2, 1), (left_pad, right_pad), "constant", 0
    ).permute(0, 2, 1)
    # squeeze(1) removes only the temporary axis added for padding. A bare
    # squeeze() would also drop the segment axis when there is a single
    # segment, and the unfold below would then fail.
    labels = torch.nn.functional.pad(
        labels.unsqueeze(1), (left_pad, right_pad), "reflect"
    ).squeeze(1)
    samples, _ = window_fn(samples, None)
    labels = labels.unfold(1, window_size, step)
    if test:
        return samples, labels
    samples = samples.reshape(-1, window_size, num_channels)
    labels = labels.reshape(-1, window_size)
    return samples, labels

def non_fsl_dataloaders_with_sec_sliding(config, subsample_step=100):
    """Build the train, val and test dataloaders for the non-FSL setting.

    The loaded recordings are cut into segments with ``sliding_windows(300, 50)``
    and split 80/20 into train+val and test, then train+val is split 80/20
    again. Train and val segments are further cut into short windows by
    ``sec_sliding``; the test segments are kept whole.

    Args:
        config: Experiment configuration. It is passed to ``_load`` and its
            ``'batch_size'`` entry sets the batch size of all three loaders.
        subsample_step: Only every ``subsample_step``-th train / val window is
            made available to the samplers. Defaults to 100.

    Returns:
        ``(train_loader, val_loader, test_loader)``. The train and val loaders
        draw their subsampled indices in random order; the test loader
        iterates the test set sequentially.
    """
    inputs, labels = _load(config)
    segmenter = sliding_windows(300, 50)
    segmented_samples, segmented_labels = segmenter(torch.tensor(inputs), torch.tensor(labels))

    # NOTE(review): if sliding_windows(300, 50) means window=300 / step=50,
    # neighbouring segments overlap and the random splits below can put
    # overlapping data into different splits -- confirm this is intended.
    train_samples, test_samples, train_labels, test_labels = train_test_split(
        segmented_samples, segmented_labels, test_size=0.2, random_state=42
    )
    train_samples, val_samples, train_labels, val_labels = train_test_split(
        train_samples, train_labels, test_size=0.2, random_state=42
    )

    train_samples, train_labels = sec_sliding(train_samples, train_labels)
    val_samples, val_labels = sec_sliding(val_samples, val_labels)

    train_set = ClassificationDataset(train_samples, train_labels)
    val_set = ClassificationDataset(val_samples, val_labels)
    test_set = NormalDataset(test_samples, test_labels)

    batch_size = config['batch_size']
    # Number of batches a loader over the full train set would have; computed
    # by ceil-division instead of building a throwaway DataLoader.
    full_train_batches = (len(train_set) + batch_size - 1) // batch_size
    printc("Train batch size if no sampler: ", full_train_batches)

    # Subsample the windowed datasets: keep every `subsample_step`-th index.
    train_sampler = SubsetRandomSampler(list(range(0, len(train_set), subsample_step)))
    val_sampler = SubsetRandomSampler(list(range(0, len(val_set), subsample_step)))

    # Settings shared by all three loaders.
    loader_kwargs = dict(batch_size=batch_size, num_workers=0, pin_memory=True)
    train_loader = DataLoader(train_set, sampler=train_sampler, **loader_kwargs)
    val_loader = DataLoader(val_set, sampler=val_sampler, **loader_kwargs)
    test_loader = DataLoader(test_set, **loader_kwargs)
    return train_loader, val_loader, test_loader

In [3]:
# Build the three loaders once, then show how many batches each one yields.
loaders = non_fsl_dataloaders_with_sec_sliding(config)
train_loader, val_loader, test_loader = loaders
tuple(len(loader) for loader in loaders)


[91mTrain batch size if no sampler:  6495[0m


(65, 17, 7)

In [4]:
from methods.unet import UNet_encoder
from train import get_model

# Encoder-only model: 6 input channels, 5 output classes.
model = UNet_encoder(in_channels=6, out_channels=5, cnn_embed_dims=[64, 128, 356]).float().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
printc('Number of trainable parameters:', sum(p.numel() for p in model.parameters() if p.requires_grad))

criterion = torch.nn.CrossEntropyLoss()

# Where the best checkpoint is written; create the directory up front so the
# first torch.save cannot fail on a missing folder.
checkpoint_path = f'./saved_model/opportunity_{config["model"]}_sec_sliding.pth'
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

best_loss = math.inf
pbar = tqdm(range(config['epochs']), postfix={'loss': 0.0, 'acc': 0.0})
for epoch in pbar:
    # ---- training pass ----
    model.train()
    for images, labels in train_loader:
        images = images.float().to(device)
        labels = labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels.long())

        optimizer.zero_grad()
        loss.backward()
        # Clip the global L2 gradient norm to 1.0 before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1., norm_type=2)
        optimizer.step()

    # ---- validation pass ----
    model.eval()
    val_loss_total = 0.0
    val_correct = 0
    val_count = 0
    # No gradients are needed for validation; skipping autograd saves memory
    # and time.
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.float().to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels.long())
            predicted = model.forward_pred(images)

            # Weight by batch size so a short final batch does not count as
            # much as a full one (the criterion returns a per-batch mean).
            n_batch = predicted.shape[0]
            val_loss_total += loss.item() * n_batch
            val_correct += (predicted == labels).sum().item()
            val_count += n_batch
    val_loss = val_loss_total / val_count
    val_acc = val_correct / val_count
    pbar.set_postfix({'loss': val_loss, 'acc': val_acc})

    # Keep only the weights with the lowest validation loss seen so far.
    if val_loss < best_loss:
        best_loss = val_loss
        torch.save(model.state_dict(), checkpoint_path)

# Leave the model in eval mode for the test cells (also displays its layout).
model.eval()

[91mNumber of trainable parameters: 606961[0m


100%|██████████| 200/200 [01:24<00:00,  2.38it/s, loss=1.23, acc=0.889] 


UNet_encoder(
  (final_conv): Linear(in_features=356, out_features=5, bias=True)
  (backbone): Sequential(
    (0): ConvBlock(
      (conv1): Conv1d(6, 64, kernel_size=(3,), stride=(1,), padding=(1,))
      (conv2): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,))
      (relu): ReLU(inplace=True)
    )
    (1): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (2): ConvBlock(
      (conv1): Conv1d(64, 128, kernel_size=(3,), stride=(1,), padding=(1,))
      (conv2): Conv1d(128, 128, kernel_size=(3,), stride=(1,), padding=(1,))
      (relu): ReLU(inplace=True)
    )
    (3): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): ConvBlock(
      (conv1): Conv1d(128, 356, kernel_size=(3,), stride=(1,), padding=(1,))
      (conv2): Conv1d(356, 356, kernel_size=(3,), stride=(1,), padding=(1,))
      (relu): ReLU(inplace=True)
    )
    (5): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
)

In [5]:
# Reload the best checkpoint written by the training cell. map_location lets
# a checkpoint saved on the GPU load on a CPU-only machine as well.
model.load_state_dict(
    torch.load(f'./saved_model/opportunity_{config["model"]}_sec_sliding.pth', map_location=device)
)
model.eval()

with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.float().to(device)
        labels = labels.to(device)

        # Cut every test segment into step-1 windows; slabels is only used
        # for its (segment, n_windows, ...) shape.
        simages, slabels = sec_sliding(images, labels, step=1, test=True)
        simages = simages.reshape(-1, 50, 6)
        outputs = model(simages)
        # Regroup the per-window class scores by segment, then take the
        # highest-scoring class of each window.
        outputs = outputs.reshape(slabels.shape[0], slabels.shape[1], 5)
        predicted = outputs.argmax(dim=2)
        total += labels.size(0) * labels.size(1)
        correct += (predicted == labels).sum().item()

    print('Test Accuracy of the model {} on the test images: {} %'.format(config['model'].upper(), 
                                                                                        100 * correct / total))

Test Accuracy of the model UNET on the test images: 83.91416474210932 %


In [6]:
# Baseline: full UNet evaluated on the same test loader from its own
# checkpoint. map_location lets a GPU-saved checkpoint load on CPU as well.
unet = UNet(in_channels=6, out_channels=5).float().to(device)
unet.load_state_dict(torch.load('./saved_model/opportunity_UNET.pth', map_location=device))
unet.eval()

with torch.no_grad():
    correct = 0
    total = 0
    m_ious = []
    for images, labels in test_loader:
        images = images.float().to(device)
        labels = labels.to(device)
        outputs = unet(images)
        # Move the axis that is reduced over to dim 1, then take the index of
        # the largest score there as the prediction.
        outputs = outputs.permute(0, 2, 1)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0) * labels.size(1)
        correct += (predicted == labels).sum().item()
        m_ious.append(mean_iou(unet.forward_pred(images), labels.long(), num_classes=5))

    # The model name in the message comes from config['model'], not from the
    # checkpoint that was loaded above.
    print('Test Accuracy of the model {} on the test images: {} %, Mean IoU: {}'.format(config['model'].upper(), 
                                                                                        100 * correct / total, 
                                                                                        sum(m_ious) / len(m_ious)))

Test Accuracy of the model UNET on the test images: 98.40223248652809 %, Mean IoU: 0.9672746062278748
