In [92]:
import pandas as pd
import numpy as np
from jupyter_utils import jupyter_setup, load_tracker
import ipywidgets as widgets
jupyter_setup()
# from torchvision.models import resnet18, resnet50, ResNet18_Weights, ResNet50_Weights
from models import get_resnet18
import torch
from torch import nn
import torch.nn.functional as F


D:\Archiv\Studium\Master\6.-Semester\Masters_Thesis\Git\acoustic_covid_detection\python


In [25]:
# Build the backbone with the project-local `get_resnet18` helper (imported from `models`).
# NOTE(review): architecture and weights are defined in `models`, not shown here.
resnet = get_resnet18()

In [14]:
def get_bag_statistics(y, batch_size, bag_size, eps=1e-8):
    """Summarise the instance-level predictions of every bag with seven statistics.

    Args:
        y (torch.Tensor): instance predictions holding exactly
            ``batch_size * bag_size`` elements, laid out bag after bag
            (the tensor is reshaped row-major to ``(batch_size, bag_size)``).
        batch_size (int): number of bags.
        bag_size (int): number of instances per bag.
        eps (float): lower bound for the standard deviation when it is used as
            a divisor. Without it a bag whose instances all share the same
            value yields ``0 / 0 = NaN`` for skewness and kurtosis.

    Returns:
        torch.Tensor: shape ``(batch_size, 7)`` with the columns
        ``[mean, median, std, min, max, skewness, kurtosis]``. ``std`` is the
        population standard deviation (divides by ``bag_size``) and is
        returned unclamped; kurtosis is the plain fourth standardised moment
        (no ``-3`` correction).
    """
    # reshape instead of view: identical for contiguous input, but it also
    # accepts non-contiguous tensors where view would raise.
    y = y.reshape(batch_size, bag_size)

    mu = y.mean(dim=1)
    # Transposed to (bag_size, batch_size) so the per-bag mean broadcasts.
    diff = y.t() - mu
    sigma = torch.pow(torch.mean(torch.pow(diff, 2.0), dim=0), 0.5)
    # Only the divisor is clamped; the reported sigma stays exact.
    z_scores = diff / sigma.clamp_min(eps)
    skew = torch.mean(torch.pow(z_scores, 3), dim=0)
    kurtoses = torch.mean(torch.pow(z_scores, 4), dim=0)
    median, _ = y.median(dim=1)
    minimum, _ = y.min(dim=1)
    maximum, _ = y.max(dim=1)

    # Stack to (7, batch_size) and transpose to one row per bag.
    bag_statistics = torch.stack([mu, median, sigma, minimum, maximum, skew, kurtoses]).t()
    return bag_statistics

In [69]:
class PredictionLevelMIL_SingleGatedLayer(nn.Module):
    """Prediction-level MIL head with a single gated layer.

    Every instance embedding is first mapped to a sigmoid score. The scores of
    one bag are condensed into seven statistics by ``get_bag_statistics`` and a
    gated (tanh * sigmoid) layer followed by dropout and a linear unit turns
    those statistics into one raw output value per bag.

    Args:
        n_neurons (int): width of the two gate branches.
        dropout (float): dropout probability applied before the final linear unit.
    """

    def __init__(self, n_neurons, dropout=0.25):
        super().__init__()
        self.n_bag_statistics = 7
        self.n_hidden_attention = n_neurons
        self.dropout = dropout
        self.resnet_out_features = 512

        stats_dim = self.n_bag_statistics
        hidden_dim = self.n_hidden_attention

        # Instance-level classifier: 512-d embedding -> score in (0, 1).
        self.binary_classification_layer = nn.Sequential(
            nn.Linear(self.resnet_out_features, 1), nn.Sigmoid()
        )

        # Two parallel branches over the bag statistics; their product is the gate.
        self.attention_V = nn.Sequential(nn.Linear(stats_dim, hidden_dim), nn.Tanh())
        self.attention_U = nn.Sequential(nn.Linear(stats_dim, hidden_dim), nn.Sigmoid())

        # Regularised projection of the gated features to a single value.
        self.attention_out = nn.Sequential(
            nn.Dropout(p=self.dropout), nn.Linear(hidden_dim, 1)
        )

    def forward(self, y, batch_size, bag_size):
        """Return one raw score per bag, shape ``(batch_size, 1)``.

        ``y`` carries the instance embeddings; all singleton dimensions are
        squeezed away before the instance classifier is applied.
        """
        instance_scores = self.binary_classification_layer(y.squeeze())
        stats = get_bag_statistics(instance_scores, batch_size, bag_size)

        tanh_branch = self.attention_V(stats)
        sigmoid_gate = self.attention_U(stats)
        gated = tanh_branch * sigmoid_gate  # element-wise gating

        return self.attention_out(gated)

In [70]:
class PredictionLevelMIL_DoubleDenseLayer(nn.Module):
    """Prediction-level MIL head: a small MLP over per-bag prediction statistics.

    Every instance embedding is mapped to a sigmoid score, the scores of one
    bag are condensed into seven statistics by ``get_bag_statistics``, and a
    two-hidden-layer MLP turns those statistics into one raw value per bag.

    The hidden layers are followed by ReLU. Without a non-linearity the three
    stacked ``nn.Linear`` layers collapse into a single affine map (dropout is
    the identity in eval mode), so the extra layers add no capacity. The
    ``nn.Linear`` modules keep their positions 0 / 2 / 4 inside ``mil_net``,
    so parameter names in ``state_dict`` are unchanged; weights trained with
    the activation-free variant will load but should be retrained.

    Args:
        n_neurons (int): width of both hidden layers.
        dropout (float): dropout probability applied after each hidden layer.
    """

    def __init__(self, n_neurons, dropout=0.25):
        super().__init__()
        self.n_bag_statistics = 7
        self.n_hidden_attention = n_neurons
        self.dropout = dropout
        self.resnet_out_features = 512

        # Instance-level classifier: 512-d embedding -> score in (0, 1).
        self.binary_classification_layer = nn.Sequential(
            nn.Linear(self.resnet_out_features, 1),
            nn.Sigmoid()
        )
        # Activation and dropout share one slot so the Linear indices stay 0, 2, 4.
        self.mil_net = nn.Sequential(
            nn.Linear(self.n_bag_statistics, self.n_hidden_attention),
            nn.Sequential(nn.ReLU(), nn.Dropout(p=self.dropout)),
            nn.Linear(self.n_hidden_attention, self.n_hidden_attention),
            nn.Sequential(nn.ReLU(), nn.Dropout(p=self.dropout)),
            nn.Linear(self.n_hidden_attention, 1)
        )

    def forward(self, y, batch_size, bag_size):
        """Return one raw score per bag, shape ``(batch_size, 1)``.

        Args:
            y (torch.Tensor): instance embeddings with
                ``batch_size * bag_size * 512`` elements, e.g.
                ``(batch_size * bag_size, 512, 1, 1)``.
            batch_size (int): number of bags.
            bag_size (int): number of instances per bag.
        """
        # Explicit reshape instead of squeeze(): squeeze would also drop the
        # instance dimension when batch_size * bag_size == 1.
        y = y.reshape(batch_size * bag_size, self.resnet_out_features)
        y = self.binary_classification_layer(y)
        bag_statistics = get_bag_statistics(y, batch_size, bag_size)
        y_pred = self.mil_net(bag_statistics)
        return y_pred

In [93]:
class FeatureLevelMIL(nn.Module):
    """Feature-level MIL head with gated attention pooling.

    Each 512-d instance embedding receives an attention logit from a gated
    (tanh * sigmoid) layer. The logits are softmax-normalised within every bag
    and the bag embedding is the attention-weighted sum of its instance
    embeddings. A linear unit maps that bag embedding to one raw output value.

    Args:
        n_neurons (int): width of the two attention branches.
        dropout (float): dropout probability applied before the attention logit.
    """

    def __init__(self, n_neurons, dropout=0.25):
        super().__init__()
        self.n_hidden_attention = n_neurons
        self.dropout = dropout
        self.resnet_out_features = 512

        self.attention_V = nn.Sequential(
            nn.Linear(self.resnet_out_features, self.n_hidden_attention),
            nn.Tanh(),
        )
        self.attention_U = nn.Sequential(
            nn.Linear(self.resnet_out_features, self.n_hidden_attention),
            nn.Sigmoid(),
        )
        self.attention_out = nn.Sequential(
            nn.Dropout(p=self.dropout),
            nn.Linear(self.n_hidden_attention, 1)
        )

        self.output_layer = nn.Sequential(
            nn.Linear(in_features=self.resnet_out_features, out_features=1)
        )

    def forward(self, y, batch_size, bag_size):
        """Return one raw score per bag, shape ``(batch_size, 1)``.

        Args:
            y (torch.Tensor): instance embeddings with
                ``batch_size * bag_size * 512`` elements, e.g.
                ``(batch_size * bag_size, 512, 1, 1)``.
            batch_size (int): number of bags.
            bag_size (int): number of instances per bag.
        """
        # Explicit reshape instead of squeeze(): squeeze would also drop the
        # instance dimension when batch_size * bag_size == 1.
        y = y.reshape(batch_size * bag_size, self.resnet_out_features)
        # [batch_size * bag_size, 512]

        A_V = self.attention_V(y)
        A_U = self.attention_U(y)
        attention_logits = self.attention_out(A_V * A_U)  # element-wise gating
        attention_coef = F.softmax(attention_logits.reshape(batch_size, bag_size, 1), dim=1)
        # [batch_size, bag_size, 1], sums to 1 over the bag dimension

        bag_features = y.reshape(batch_size, bag_size, self.resnet_out_features)
        # The weights already sum to 1, so the pooled embedding is the weighted
        # SUM. Taking the mean here would additionally divide by bag_size and
        # make the output scale depend on the bag size.
        x_combined_bag = (bag_features * attention_coef).sum(dim=1)
        # [batch_size, 512]
        y_pred = self.output_layer(x_combined_bag)
        # [batch_size, 1]
        return y_pred

In [104]:
class FeatureLevelMIL_ExtraFeatureLayer(nn.Module):
    """Feature-level MIL head with a learned projection before attention pooling.

    Each 512-d instance embedding is first projected to ``n_features``
    dimensions (with dropout). A gated (tanh * sigmoid) layer assigns every
    projected instance an attention logit, the logits are softmax-normalised
    within every bag, and the bag embedding is the attention-weighted sum of
    the projected instances. A linear unit maps it to one raw output value.

    Args:
        n_features (int): width of the projected instance features.
        n_neurons (int): width of the two attention branches.
        dropout (float): dropout probability used after the projection and
            before the attention logit.
    """

    def __init__(self, n_features, n_neurons, dropout=0.25):
        super().__init__()
        self.n_hidden_attention = n_neurons
        self.dropout = dropout
        self.resnet_out_features = 512
        self.n_features = n_features

        self.feature_layer = nn.Sequential(
            nn.Linear(self.resnet_out_features, self.n_features),
            nn.Dropout(p=self.dropout)
        )

        self.attention_V = nn.Sequential(
            nn.Linear(self.n_features, self.n_hidden_attention),
            nn.Tanh(),
        )
        self.attention_U = nn.Sequential(
            nn.Linear(self.n_features, self.n_hidden_attention),
            nn.Sigmoid(),
        )
        self.attention_out = nn.Sequential(
            nn.Dropout(p=self.dropout),
            nn.Linear(self.n_hidden_attention, 1)
        )

        self.output_layer = nn.Sequential(
            nn.Linear(self.n_features, 1)
        )

    def forward(self, y, batch_size, bag_size):
        """Return one raw score per bag, shape ``(batch_size, 1)``.

        Args:
            y (torch.Tensor): instance embeddings with
                ``batch_size * bag_size * 512`` elements, e.g.
                ``(batch_size * bag_size, 512, 1, 1)``.
            batch_size (int): number of bags.
            bag_size (int): number of instances per bag.
        """
        # Explicit reshape instead of squeeze(): squeeze would also drop the
        # instance dimension when batch_size * bag_size == 1.
        y = y.reshape(batch_size * bag_size, self.resnet_out_features)
        y = self.feature_layer(y)
        # [batch_size * bag_size, n_features]

        A_V = self.attention_V(y)
        A_U = self.attention_U(y)
        attention_logits = self.attention_out(A_V * A_U)  # element-wise gating
        attention_coef = F.softmax(attention_logits.reshape(batch_size, bag_size, 1), dim=1)
        # [batch_size, bag_size, 1], sums to 1 over the bag dimension

        bag_features = y.reshape(batch_size, bag_size, self.n_features)
        # The weights already sum to 1, so the pooled embedding is the weighted
        # SUM. Taking the mean here would additionally divide by bag_size and
        # make the output scale depend on the bag size.
        x_combined_bag = (bag_features * attention_coef).sum(dim=1)
        # [batch_size, n_features]
        y_pred = self.output_layer(x_combined_bag)
        # [batch_size, 1]
        return y_pred

In [86]:
# Feature extractor: chain every child module of `resnet` except the last one.
# Relies on `resnet` from the earlier cell being defined in the kernel.
newmodel = torch.nn.Sequential(*(list(resnet.children())[:-1]))

In [105]:
# Pick the MIL head to try; the commented lines are the alternatives defined above.
# mil = PredictionLevelMIL_SingleGatedLayer(32)
# mil = PredictionLevelMIL_DoubleDenseLayer(32)
# mil = FeatureLevelMIL(32)
mil = FeatureLevelMIL_ExtraFeatureLayer(64, 32)

In [100]:
# Random stand-in input. The next cell reads dim 0 as batch_size (16) and
# dim 1 as bag_size (8); the remaining dims (1, 224, 112) are the per-instance shape.
# NOTE(review): no seed is set, so values differ between runs.
x = torch.rand(16, 8, 1, 224, 112)

In [101]:
# Fold the bag dimension into the batch dimension:
# (batch_size, bag_size, *feature_size) -> (batch_size * bag_size, *feature_size).
# NOTE(review): `x` is overwritten, so re-running this cell without re-creating `x`
# derives batch_size / bag_size from the already-flattened tensor.
batch_size, bag_size = x.shape[0], x.shape[1]
feature_size = x.shape[2:]
x = x.view(batch_size * bag_size, *feature_size)

In [102]:
# Run every instance through the truncated network; the trailing expression
# displays the resulting shape.
y = newmodel(x)
y.shape

torch.Size([128, 512, 1, 1])

In [106]:
# Apply the selected MIL head to the instance features to get bag-level outputs.
pred = mil(y, batch_size, bag_size)

In [107]:
# Display the bag-level outputs (one row per bag).
pred

tensor([[0.0581],
        [0.0845],
        [0.1133],
        [0.1208],
        [0.0835],
        [0.0988],
        [0.1136],
        [0.0795],
        [0.0815],
        [0.1122],
        [0.0626],
        [0.0787],
        [0.0939],
        [0.0944],
        [0.0860],
        [0.0855]], grad_fn=<AddmmBackward0>)