# Import Libraries

In [1]:
import torch
import torch.nn as nn
import torchvision.transforms as T
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchvision.models import resnet50
import numpy as np
import pandas as pd
import cv2
import os
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from PIL import Image
import matplotlib.pyplot as plt
from sentence_transformers import SentenceTransformer
from torchsummary import summary
from tensorboard.plugins import projector
from scipy.optimize import linear_sum_assignment
from utils.utils import *
from utils.assignment import *
from utils.latent_loss import *

In [2]:
def format_pytorch_version(version):
    """Return the part of a version string that precedes the first ``'+'``.

    A string without ``'+'`` is returned unchanged, e.g.
    ``'1.10.0+cu113'`` -> ``'1.10.0'`` and ``'2.0.1'`` -> ``'2.0.1'``.
    """
    release, _, _ = version.partition('+')
    return release

# Installed PyTorch version: the raw string, and the same string with any
# '+...' suffix removed by format_pytorch_version.
TORCH_version = torch.__version__
TORCH = format_pytorch_version(TORCH_version)

def format_cuda_version(version):
    """Turn a CUDA version string into a compact tag.

    Args:
        version: CUDA version such as ``'11.3'``, or ``None`` (which is what
            ``torch.version.cuda`` holds on a PyTorch build without CUDA).

    Returns:
        ``'cu'`` followed by the version with the dots removed
        (``'11.3'`` -> ``'cu113'``), or ``'cpu'`` when ``version`` is ``None``.
    """
    # A CPU-only build has no CUDA version; previously this raised
    # AttributeError on None.replace and broke the whole setup cell.
    if version is None:
        return 'cpu'
    return 'cu' + version.replace('.', '')

# CUDA version reported by PyTorch and its compact tag.
# NOTE(review): torch.version.cuda may be None on a CPU-only build — confirm
# format_cuda_version tolerates that before running without a CUDA build.
CUDA_version = torch.version.cuda
CUDA = format_cuda_version(CUDA_version)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

# Dataset & Dataloader

In [3]:
class XRayDataset(Dataset):
    """Dataset yielding ``(image, report_length)`` pairs.

    ``data`` is indexed positionally with ``.iloc``: column 0 is the image
    file name (joined onto ``img_dir``) and column 2 is the value handed to
    ``tokenize_report`` (presumably the report text — confirm against the CSV).
    """

    def __init__(self, data, img_dir, transform):
        self.data, self.img_dir, self.transform = data, img_dir, transform

    def __len__(self):
        # One sample per row of the table.
        return len(self.data)

    def __getitem__(self, idx):
        """Load row ``idx``: the transformed image and the report's length."""
        file_name = self.data.iloc[idx, 0]
        # Every image is forced to 256x256 before the transform is applied.
        picture = Image.open(os.path.join(self.img_dir, file_name)).resize((256, 256))
        report = self.data.iloc[idx, 2]

        transformed = transform_img(picture, self.transform)
        # Only the second value returned by tokenize_report is used; its
        # length is the regression target.
        _, tokens = tokenize_report(report)
        # transform_img's result is indexed at 0 — presumably dropping a
        # leading batch axis (the demo cell prints a [1, 3, H, W] shape).
        return transformed[0], len(tokens)

In [4]:
# Train / test tables; XRayDataset reads the image file name from column 0
# and the report from column 2.
df_train = pd.read_csv('data/training_set.csv')
df_test = pd.read_csv('data/testing_set.csv')

img_path = 'data/images'

# Preprocessing applied to every image. The mean/std values are the usual
# ImageNet statistics that torchvision's pretrained models are trained with.
transform = T.Compose([
    T.Resize(256),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225])
])

train_data = XRayDataset(df_train, img_path, transform)
test_data = XRayDataset(df_test, img_path, transform)
train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
# Evaluation must not be shuffled: the reported metric is an average of
# per-batch means, so reshuffling changes which samples land in the final
# (smaller) batch and makes the score vary from one pass to the next.
test_loader = DataLoader(test_data, batch_size=128, shuffle=False)

# CNN Text

In [5]:
class CNN_Feat(nn.Module):
    """Frozen ResNet-50 feature extractor.

    Wraps every child of a pretrained ``resnet50`` except the last two, with
    all parameters excluded from gradient updates.

    Args:
        hidden_dim: Accepted for call-site compatibility but not used here.
            NOTE(review): the default (348) differs from the 384 used by the
            other modules in this notebook — looks like a typo, left as is
            because the value has no effect.
    """

    def __init__(self, hidden_dim=348):
        super().__init__()
        self.backbone = nn.Sequential(*list(resnet50(pretrained=True).children())[:-2])

        for param in self.backbone.parameters():
            param.requires_grad = False
        # A frozen backbone must also keep its BatchNorm layers in inference
        # mode, see train() below.
        self.backbone.eval()

    def train(self, mode=True):
        """Set the module's mode while keeping the frozen backbone in eval mode.

        ``requires_grad = False`` only stops weight updates; in training mode
        BatchNorm would still normalise with batch statistics and overwrite
        its running mean/variance, silently changing the "frozen" features
        whenever a parent model calls ``.train()``.
        """
        super().train(mode)
        self.backbone.eval()
        return self

    def forward(self, X):
        """Return the backbone feature map for the image batch ``X``."""
        return self.backbone(X)

In [6]:
class CNN_Text(nn.Module):
    """Image-to-sentence-embedding model: CNN features -> transformer.

    A frozen ``CNN_Feat`` backbone produces a 2048-channel feature map that a
    1x1 convolution projects to ``hidden_dim``. The flattened map, offset by
    learned row/column position embeddings, is encoded by a transformer
    encoder; a transformer decoder then turns 10 learned query vectors into
    10 output vectors per image.

    Args:
        hidden_dim: Model width. 384 matches the size of the SBERT sentence
            feature vectors this model is meant to produce.
        nheads: Number of attention heads in every transformer layer.
        num_encoder_layers: Depth of the encoder stack.
        num_decoder_layers: Depth of the decoder stack.
    """

    def __init__(self, hidden_dim=384, nheads=4,
                 num_encoder_layers=3, num_decoder_layers=3):
        super().__init__()

        # ResNet-50 backbone plus a 1x1 conv down to the transformer width.
        self.conv_feat = CNN_Feat(hidden_dim)
        self.conv = nn.Conv2d(2048, hidden_dim, 1)

        # Template layers for the stacks below.
        # NOTE(review): nn.TransformerEncoder/Decoder appear to copy the layer
        # they are given, so these two attributes may hold unused parameters;
        # they are kept so existing checkpoints keep the same state_dict keys.
        self.encoder = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=nheads)
        self.decoder = nn.TransformerDecoderLayer(d_model=hidden_dim, nhead=nheads)

        self.transformer_encoder = nn.TransformerEncoder(self.encoder, num_encoder_layers)
        self.transformer_decoder = nn.TransformerDecoder(self.decoder, num_decoder_layers)

        # Learned decoder queries: one vector per output sentence slot (10).
        self.sentence = nn.Parameter(torch.rand(10, hidden_dim))

        # Learned spatial position embeddings (could be replaced by sinusoidal
        # ones). Each table has 50 entries, so the feature map may be at most
        # 50 rows by 50 columns.
        self.row_embed = nn.Parameter(torch.rand(50, hidden_dim // 2))
        self.col_embed = nn.Parameter(torch.rand(50, hidden_dim // 2))

    def forward(self, X):
        """Encode an image batch and decode the sentence slots.

        Args:
            X: Image batch of shape ``(B, 3, height, width)``.

        Returns:
            ``(R, feat)`` where ``R`` has shape ``(B, 10, hidden_dim)`` and
            ``feat`` is the encoder output of shape ``(H*W, B, hidden_dim)``,
            with ``H, W`` the spatial size of the backbone feature map.
        """
        feat = self.conv(self.conv_feat(X))
        batch_size = feat.shape[0]
        H, W = feat.shape[-2:]

        # (H*W, 1, hidden_dim): column embedding concatenated with row
        # embedding for every cell; broadcasts over the batch axis below.
        pos = torch.cat([
            self.col_embed[:W].unsqueeze(0).repeat(H, 1, 1),
            self.row_embed[:H].unsqueeze(1).repeat(1, W, 1),
        ], dim=-1).flatten(0, 1).unsqueeze(1)

        feat = self.transformer_encoder(pos + 0.1 * feat.flatten(2).permute(2, 0, 1))

        # Give every image its own copy of the queries. The queries used to
        # have a fixed batch size of 1, which only matched the encoder memory
        # when a single image was passed in.
        queries = self.sentence.unsqueeze(1).repeat(1, batch_size, 1)
        R = self.transformer_decoder(queries, feat).transpose(0, 1)
        return R, feat

In [7]:
# Smoke test input: push one image from the image folder through the
# preprocessing pipeline and print the resulting tensor shape.
# NOTE(review): os.listdir gives no ordering guarantee, so filenames[0] may
# be a different file on another machine — sort if a fixed sample matters.
img_path = 'data/images'
filenames = os.listdir(img_path)
f = os.path.join(img_path, filenames[0])

img = Image.open(f)

# This rebinds the notebook-level name `transform` (Resize(800) here instead
# of Resize(256)); datasets constructed earlier keep the object they were
# given at construction time.
transform = T.Compose([
    T.Resize(800),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225])
])

t_img = transform_img(img, transform)
print(t_img.shape)

torch.Size([1, 3, 975, 800])


In [8]:
# Smoke test: run the single preprocessed image through CNN_Text and print
# the shapes of the decoded sentence slots (R) and the encoder memory (feat).
# `feat` is reused by the LSP decoder demo further down; `model` and `R` are
# rebound by later cells, so re-run this cell before relying on them.
model = CNN_Text()
R, feat = model(t_img)

print(R.shape, feat.shape)

torch.Size([1, 10, 384]) torch.Size([775, 1, 384])


# LSP Decoder

In [9]:
class LSP_Decoder(nn.Module):
    """Transformer decoder followed by a softmax over a vocabulary.

    Args:
        vocab_size: Size of the output distribution (last output axis).
        hidden_dim: Width of the decoder inputs and of the memory.
        nhead: Number of attention heads per decoder layer.
        num_layers: Number of stacked decoder layers.
    """

    def __init__(self, vocab_size, hidden_dim=384, nhead=4, num_layers=3):
        super().__init__()

        # Template layer, stacked `num_layers` times by the decoder below.
        self.decoder = nn.TransformerDecoderLayer(d_model=hidden_dim, nhead=nhead)
        self.transformer_decoder = nn.TransformerDecoder(self.decoder, num_layers)

        # Projects every decoded vector to one score per vocabulary entry.
        self.linear = nn.Linear(hidden_dim, vocab_size)

    def forward(self, tgt, memory):
        """Decode ``tgt`` against ``memory``.

        Returns probabilities (softmax over the last axis), not raw logits,
        with the same leading axes as ``tgt`` and ``vocab_size`` as last axis.
        """
        hidden = self.transformer_decoder(tgt, memory)
        logits = self.linear(hidden)
        return logits.softmax(dim=-1)

In [10]:
# Shape check for LSP_Decoder with a 2000-entry vocabulary.
decoder = LSP_Decoder(vocab_size = 2000)

# Stand-in target sequence: N embedding vectors of width c, with a batch
# axis of size 1 inserted in the middle -> (N, 1, c).
N, c = 10, 384
emb = nn.Embedding(N, c)
x = torch.arange(N)
x = emb(x).unsqueeze(1)
print(x.shape)

# `feat` is the encoder memory produced by the CNN_Text smoke-test cell.
# The output is transposed so the batch axis comes first; `y` is reused by
# the TensorBoard cell at the end of the notebook.
y = decoder(x, feat)
y = y.transpose(0, 1)
print(y.shape)

torch.Size([10, 1, 384])
torch.Size([1, 10, 2000])


# Loss

In [17]:
# Loss object used by the toy example below; MSEGCRLatentLoss is not defined
# in this notebook (presumably it comes from the utils.latent_loss star import).
loss_fn = MSEGCRLatentLoss()

In [26]:
# Toy inputs for the loss: B holds three 2-d rows and R holds four.
# dtype=float makes both tensors float64 (see the printed loss dtype).
# len_B / len_R presumably give the number of rows belonging to each sample
# (they sum to 3 and 4 respectively) — confirm against MSEGCRLatentLoss.
B = torch.tensor([[2, 0], [1, 2], [5, 3]], dtype=float, requires_grad=True)
len_B = torch.tensor([1, 2])

# Note: this rebinds `R`, which the CNN_Text smoke-test cell also assigns.
R = torch.tensor([[2, 0], [2, 0], [5, 2], [10, 2]], dtype=float, requires_grad=True)
len_R = torch.tensor([1, 3])

# Compute the loss and check that gradients can be propagated through it.
R_pi, R_i, loss = loss_fn.forward(B, len_B, R, len_R)
print(loss)
loss.backward()

tensor(0.8250, dtype=torch.float64, grad_fn=<AddBackward0>)


In [37]:
# Split R_pi into per-sample chunks of len_B rows each; the recorded output
# shows the shorter chunk padded with zero rows to a common length.
chunk_pad_by_lengths(R_pi, len_B, batch_first = True)

tensor([[[2., 0.],
         [0., 0.]],

        [[2., 0.],
         [5., 2.]]], dtype=torch.float64, grad_fn=<CopySlices>)

# MLP: Predict the length of the report

In [13]:
class MLP(nn.Module):
    """Regressor from frozen CNN features to a single non-negative value.

    The flattened ``CNN_Feat`` output goes through three 32-unit ReLU layers,
    dropout (p=0.7) and a final ReLU-activated linear unit.

    Args:
        hidden_dim: Forwarded to ``CNN_Feat``.
    """

    def __init__(self, hidden_dim=384):
        super().__init__()

        self.conv_feat = CNN_Feat(hidden_dim)
        # 131072 input features: presumably 2048 channels x 8 x 8 cells, i.e.
        # the backbone output for a 256x256 image — other sizes will not fit.
        self.linear1 = nn.Linear(131072, 32)
        self.linear2 = nn.Linear(32, 32)
        self.linear3 = nn.Linear(32, 32)
        self.output = nn.Linear(32, 1)

        self.dropout = nn.Dropout(0.7)

    def forward(self, X):
        """Return a ``(batch, 1)`` tensor of predictions for image batch ``X``."""
        hidden = torch.flatten(self.conv_feat(X), 1)
        for layer in (self.linear1, self.linear2, self.linear3):
            hidden = torch.relu(layer(hidden))
        hidden = self.dropout(hidden)
        # The last ReLU clamps the prediction at zero from below.
        return torch.relu(self.output(hidden))

In [14]:
def train(train_loader):
    """Run one optimisation pass over ``train_loader``.

    Relies on the notebook-level ``model``, ``criterion``, ``optimizer`` and
    ``device``.

    Args:
        train_loader: Iterable of ``(X, y)`` batches.

    Returns:
        The mean of the per-batch losses as a Python float.

    Raises:
        ZeroDivisionError: If ``train_loader`` yields no batches.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0
    for X, y in train_loader:  # Iterate in batches over the training dataset.
        # Clear gradients first, so anything left over from an interrupted or
        # earlier run cannot leak into this step.
        optimizer.zero_grad()

        out = torch.flatten(model(X.to(device)))
        # Targets come out of the DataLoader as integers; cast them to the
        # prediction dtype so the regression loss sees matching types.
        target = y.to(device=device, dtype=out.dtype)

        loss = criterion(out, target)  # Compute the loss.
        loss.backward()  # Derive gradients.
        optimizer.step()  # Update parameters based on gradients.

        total_loss += loss.item()
        num_batches += 1
    return total_loss / num_batches

def test(loader):
    model.eval()
    correct = 0
    c=0
    for X, y in loader:  # Iterate in batches over the training/test dataset.
        out = model(X.to(device))
    
        loss = criterion(torch.flatten(out), y.to(device)) # Compute the loss.
        correct += loss.cpu().detach().numpy()  # Check against ground-truth labels.
        c=c+1
    return correct / c  # Derive ratio of correct predictions.

In [15]:
# Length-regression model. This rebinds the notebook-level `model` that the
# train()/test() helpers operate on.
model = MLP().to(device)

# Only parameters that still require gradients are handed to the optimizer
# (the backbone inside CNN_Feat is frozen).
print("Params to learn:")
params_to_update = []
for name, param in model.named_parameters():
    if not param.requires_grad:
        continue
    params_to_update.append(param)
    print("\t",name)

optimizer = torch.optim.Adam(params_to_update, lr=1e-3)
criterion = torch.nn.L1Loss()  # mean absolute error
# Multiply the learning rate by 0.3 after 2 epochs without improvement of the
# monitored value, never going below 1e-6.
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.3, patience=2, min_lr=1e-6)

Params to learn:
	 linear1.weight
	 linear1.bias
	 linear2.weight
	 linear2.bias
	 linear3.weight
	 linear3.bias
	 output.weight
	 output.bias


In [None]:
# Per-epoch loss history. `val_loss` is declared but not filled in this cell.
train_loss = []
val_loss = []
test_loss = []
epochs = 25
# A checkpoint is written only when the test MAE drops below this value.
# NOTE(review): 1.54 is presumably the best score of an earlier run — confirm.
min_loss = 1.54
# Make sure the checkpoint folder exists up front; otherwise torch.save fails
# only after the first improving epoch has already been computed.
os.makedirs("model", exist_ok=True)
print('start train')

for epoch in range(epochs):
    # Despite the *_acc names, both values are mean absolute errors.
    train_acc = train(train_loader)
    test_acc = test(test_loader)
    train_loss.append(train_acc)
    test_loss.append(test_acc)
    # The scheduler lowers the learning rate when the test MAE stops improving.
    scheduler.step(test_acc)
    print(f'Epoch: {epoch+1:03d}, Train MAE: {train_acc:.4f}, Test MAE: {test_acc:.4f}')
    if min_loss > test_acc:
        min_loss = test_acc
        print('Minimum Loss: {}'.format(min_loss))
        torch.save(model.state_dict(), "model/length_model_best2.pt")

In [None]:
# Plot the loss history. The x axis is derived from the recorded history, not
# from `epochs`, so the cell also works after an interrupted or partial run
# (the two would otherwise have mismatching lengths and plt.plot would fail).
plt.title('loss')
plt.plot(np.arange(len(train_loss)), train_loss, label='train loss')
plt.plot(np.arange(len(test_loss)), test_loss, label='val loss')

plt.legend()
plt.show()

# Tensorboard

In [None]:
# TensorBoard demo: the scalar curves below are random numbers, not metrics
# from the training loop above.
writer = SummaryWriter()

for n_iter in range(100):
    writer.add_scalar('Loss/train', np.random.random(), n_iter)
    writer.add_scalar('Loss/test', np.random.random(), n_iter)
    writer.add_scalar('Accuracy/train', np.random.random(), n_iter)
    writer.add_scalar('Accuracy/test', np.random.random(), n_iter)


# `y` is the LSP decoder demo output; it is reshaped into rows of 4 values
# purely to have something to show in the embedding projector.
writer.add_embedding(y.reshape((-1, 4))) ##y.reshape((10,384))
writer.close()

In [None]:
# Load the TensorBoard notebook extension and open it on the `runs` folder
# (presumably where the SummaryWriter above writes by default — confirm).
%load_ext tensorboard
%tensorboard --logdir=runs