In [8]:
import os
import numpy as np
import torch
from torch.autograd import Variable
import itertools
from sklearn.metrics import confusion_matrix
import torch.nn as nn
from torch.nn import functional as F
import torch.optim as optim
from python_speech_features import *
import os
import itertools

In [9]:
config = {
    'num_classes': 2,
    'dropout': 0.3,
    'rnn_layers': 2,
    'audio_embed_size': 256,
    'text_embed_size': 1024,
    'batch_size': 2,
    'epochs': 150,
    'learning_rate': 8e-6,
    'audio_hidden_dims': 256,
    'text_hidden_dims': 128,
    'cuda': False,
    'lambda': 1e-5,
}

def standard_confusion_matrix(y_test, y_test_pred):
    """
    Make confusion matrix with format:
                  -----------
                  | TP | FP |
                  -----------
                  | FN | TN |
                  -----------
    Parameters
    ----------
    y_true : ndarray - 1D
    y_pred : ndarray - 1D

    Returns
    -------
    ndarray - 2D
    """
    [[tn, fp], [fn, tp]] = confusion_matrix(y_test, y_test_pred)
    return np.array([[tp, fp], [fn, tn]])

def model_performance(y_test, y_test_pred_proba):
    """
    Evaluation metrics for network performance.
    """
    # y_test_pred = y_test_pred_proba.data.max(1, keepdim=True)[1]
    y_test_pred = y_test_pred_proba

    # Computing confusion matrix for test dataset
    conf_matrix = standard_confusion_matrix(y_test, y_test_pred)
    print("Confusion Matrix:")
    print(conf_matrix)

    return y_test_pred, conf_matrix

In [10]:
class fusion_net(nn.Module):
    def __init__(self, text_embed_size, text_hidden_dims, rnn_layers, dropout, num_classes, \
         audio_hidden_dims, audio_embed_size):
        super(fusion_net, self).__init__()
        self.text_embed_size = text_embed_size
        self.audio_embed_size = audio_embed_size
        self.text_hidden_dims = text_hidden_dims
        self.audio_hidden_dims = audio_hidden_dims
        self.rnn_layers = rnn_layers
        self.dropout = dropout
        self.num_classes = num_classes
        
        # ============================= TextBiLSTM =================================
        
        # attention layer
        self.attention_layer = nn.Sequential(
            nn.Linear(self.text_hidden_dims, self.text_hidden_dims),
            nn.ReLU(inplace=True)
        )

        # lstm
        self.lstm_net = nn.LSTM(self.text_embed_size, self.text_hidden_dims,
                                num_layers=self.rnn_layers, dropout=self.dropout,
                                bidirectional=True)
        # FC
        self.fc_out = nn.Sequential(
            nn.Dropout(self.dropout),
            nn.Linear(self.text_hidden_dims, self.text_hidden_dims),
            nn.ReLU(),
            nn.Dropout(self.dropout)
        )
        
        # ============================= TextBiLSTM =================================

        # ============================= AudioBiLSTM =============================

        self.lstm_net_audio = nn.GRU(self.audio_embed_size,
                                self.audio_hidden_dims,
                                num_layers=self.rnn_layers,
                                dropout=self.dropout,
                                bidirectional=False,
                                batch_first=True)

        self.fc_audio = nn.Sequential(
            nn.Dropout(self.dropout),
            nn.Linear(self.audio_hidden_dims, self.audio_hidden_dims),
            nn.ReLU(),
            nn.Dropout(self.dropout)
        )

        self.ln = nn.LayerNorm(self.audio_embed_size)
        
        # ============================= AudioBiLSTM =============================

        # ============================= last fc layer =============================
        # self.bn = nn.BatchNorm1d(self.text_hidden_dims + self.audio_hidden_dims)
        # modal attention
        self.modal_attn = nn.Linear(self.text_hidden_dims + self.audio_hidden_dims, self.text_hidden_dims + self.audio_hidden_dims, bias=False)
        self.fc_final = nn.Sequential(
            nn.Linear(self.text_hidden_dims + self.audio_hidden_dims, self.num_classes, bias=False),
            # nn.ReLU(),
            nn.Softmax(dim=1),
            # nn.Sigmoid()
        )
        
    def attention_net_with_w(self, lstm_out, lstm_hidden):
        '''
        :param lstm_out:    [batch_size, len_seq, n_hidden * 2]
        :param lstm_hidden: [batch_size, num_layers * num_directions, n_hidden]
        :return: [batch_size, n_hidden]
        '''
        lstm_tmp_out = torch.chunk(lstm_out, 2, -1)
        # h [batch_size, time_step, hidden_dims]
        h = lstm_tmp_out[0] + lstm_tmp_out[1]
        # [batch_size, num_layers * num_directions, n_hidden]
        lstm_hidden = torch.sum(lstm_hidden, dim=1)
        # [batch_size, 1, n_hidden]
        lstm_hidden = lstm_hidden.unsqueeze(1)
        # atten_w [batch_size, 1, hidden_dims]
        atten_w = self.attention_layer(lstm_hidden)
        # m [batch_size, time_step, hidden_dims]
        m = nn.Tanh()(h)
        # atten_context [batch_size, 1, time_step]
        atten_context = torch.bmm(atten_w, m.transpose(1, 2))
        # softmax_w [batch_size, 1, time_step]
        softmax_w = F.softmax(atten_context, dim=-1)
        # context [batch_size, 1, hidden_dims]
        context = torch.bmm(softmax_w, h)
        result = context.squeeze(1)
        return result
    
    def pretrained_feature(self, x):
        with torch.no_grad():
            x_text = []
            x_audio = []
            for ele in x:
                x_text.append(ele[1])
                x_audio.append(ele[0])
            x_text, x_audio = Variable(torch.tensor(x_text).type(torch.FloatTensor), requires_grad=False), Variable(torch.tensor(x_audio).type(torch.FloatTensor), requires_grad=False)
            # ============================= TextBiLSTM =================================
            # x : [len_seq, batch_size, embedding_dim]
            x_text = x_text.permute(1, 0, 2)
            output, (final_hidden_state, _) = self.lstm_net(x_text)
            # output : [batch_size, len_seq, n_hidden * 2]
            output = output.permute(1, 0, 2)
            # final_hidden_state : [batch_size, num_layers * num_directions, n_hidden]
            final_hidden_state = final_hidden_state.permute(1, 0, 2)
            # final_hidden_state = torch.mean(final_hidden_state, dim=0, keepdim=True)
            # atten_out = self.attention_net(output, final_hidden_state)
            atten_out = self.attention_net_with_w(output, final_hidden_state)
            text_feature = self.fc_out(atten_out)

            # ============================= TextBiLSTM =================================

            # ============================= AudioBiLSTM =============================
            x_audio = self.ln(x_audio)
            x_audio, _ = self.lstm_net_audio(x_audio)
            x_audio = x_audio.sum(dim=1)
            audio_feature = self.fc_audio(x_audio)

        # ============================= AudioBiLSTM =============================
        return (text_feature, audio_feature)
        
    def forward(self, x): 
        output = self.fc_final(x)
        return output

In [11]:
prefix = os.path.abspath(os.path.join(os.getcwd(), ".."))
idxs_paths = ['train_idxs_1.npy', 'train_idxs_2.npy', 'train_idxs_3.npy']
text_model_paths = ['BiLSTM_128_0.62_1.pt', 'BiLSTM_128_0.59_2.pt', 'BiLSTM_128_0.78_3.pt']
audio_model_paths = ['BiLSTM_gru_vlad256_256_0.63_1.pt', 'BiLSTM_gru_vlad256_256_0.65_2.pt', 'BiLSTM_gru_vlad256_256_0.63_3.pt']
fuse_model_paths = ['fuse_0.67_1.pt', 'fuse_0.66_2.pt', 'fuse_0.71_3.pt']
text_features = np.load(os.path.join(prefix, 'Features/TextWhole/whole_samples_clf_avg.npz'))['arr_0']
text_targets = np.load(os.path.join(prefix, 'Features/TextWhole/whole_labels_clf_avg.npz'))['arr_0']
audio_features = np.squeeze(np.load(os.path.join(prefix, 'Features/AudioWhole/whole_samples_clf_256.npz'))['arr_0'], axis=2)
audio_targets = np.load(os.path.join(prefix, 'Features/AudioWhole/whole_labels_clf_256.npz'))['arr_0']
fuse_features = [[audio_features[i], text_features[i]] for i in range(text_features.shape[0])]
fuse_targets = text_targets
fuse_dep_idxs = np.where(text_targets == 1)[0]
fuse_non_idxs = np.where(text_targets == 0)[0]

In [12]:
def evaluate(model, test_idxs):
    model.eval()
    pred = torch.empty(config['batch_size'], 1).type(torch.LongTensor)
    X_test = []
    Y_test = []
    for idx in test_idxs:
        X_test.append(fuse_features[idx])
        Y_test.append(fuse_targets[idx])
    global max_train_acc, max_acc,max_f1
    for i in range(0, len(X_test), config['batch_size']):
        if i + config['batch_size'] > len(X_test):
            x, y = X_test[i:], Y_test[i:]
        else:
            x, y = X_test[i:(i+config['batch_size'])], Y_test[i:(i+config['batch_size'])]
        if config['cuda']:
            x, y = Variable(torch.from_numpy(x).type(torch.FloatTensor), requires_grad=True).cuda(), Variable(torch.from_numpy(y)).cuda()
        text_feature, audio_feature = model.pretrained_feature(x)
        with torch.no_grad():
            concat_x = torch.cat((text_feature, audio_feature), dim=1)
            output = model(concat_x)
        pred = torch.cat((pred, output.data.max(1, keepdim=True)[1]))
        
    y_test_pred, conf_matrix = model_performance(Y_test, pred[config['batch_size']:])
    
    # custom evaluation metrics
    print('Calculating additional test metrics...')
    accuracy = float(conf_matrix[0][0] + conf_matrix[1][1]) / np.sum(conf_matrix)
    precision = float(conf_matrix[0][0]) / (conf_matrix[0][0] + conf_matrix[0][1])
    recall = float(conf_matrix[0][0]) / (conf_matrix[0][0] + conf_matrix[1][0])
    f1_score = 2 * (precision * recall) / (precision + recall)
    
    print("Accuracy: {}".format(accuracy))
    print("Precision: {}".format(precision))
    print("Recall: {}".format(recall))
    print("F1-Score: {}\n".format(f1_score))
    print('='*89)

    return precision, recall, f1_score

In [13]:
ps, rs, fs = [], [], []

for fold in range(3):
    train_idxs_tmp = np.load(os.path.join(prefix, 'Features/TextWhole/{}'.format(idxs_paths[fold])), allow_pickle=True)
    test_idxs_tmp = list(set(list(fuse_dep_idxs)+list(fuse_non_idxs)) - set(train_idxs_tmp))
    resample_idxs = list(range(6))
    train_idxs, test_idxs = [], []
    # depression data augmentation
    for idx in train_idxs_tmp:
        if idx in fuse_dep_idxs:
            feat = fuse_features[idx]
            audio_perm = itertools.permutations(feat[0], 3)
            text_perm = itertools.permutations(feat[1], 3)
            count = 0
            for fuse_perm in zip(audio_perm, text_perm):
                if count in resample_idxs:
                    fuse_features.append(fuse_perm)
                    fuse_targets = np.hstack((fuse_targets, 1))
                    train_idxs.append(len(fuse_features)-1)
                count += 1
        else:
            train_idxs.append(idx)

    for idx in test_idxs_tmp:
        if idx in fuse_dep_idxs:
            feat = fuse_features[idx]
            audio_perm = itertools.permutations(feat[0], 3)
            text_perm = itertools.permutations(feat[1], 3)
            count = 0
            resample_idxs = [0,1,4,5]
            for fuse_perm in zip(audio_perm, text_perm):
                if count in resample_idxs:
                    fuse_features.append(fuse_perm)
                    fuse_targets = np.hstack((fuse_targets, 1))
                    test_idxs.append(len(fuse_features)-1)
                count += 1
        else:
            test_idxs.append(idx)

    fuse_model = torch.load(os.path.join(prefix, 'Model/ClassificationWhole/Fuse/{}'.format(fuse_model_paths[fold])))

    p, r, f = evaluate(fuse_model, test_idxs)
    ps.append(p)
    rs.append(r)
    fs.append(f)

  fuse_model = torch.load(os.path.join(prefix, 'Model/ClassificationWhole/Fuse/{}'.format(fuse_model_paths[fold])))
  x_text, x_audio = Variable(torch.tensor(x_text).type(torch.FloatTensor), requires_grad=False), Variable(torch.tensor(x_audio).type(torch.FloatTensor), requires_grad=False)


Confusion Matrix:
[[32 28]
 [ 4 17]]
Calculating additional test metrics...
Accuracy: 0.6049382716049383
Precision: 0.5333333333333333
Recall: 0.8888888888888888
F1-Score: 0.6666666666666667



  fuse_model = torch.load(os.path.join(prefix, 'Model/ClassificationWhole/Fuse/{}'.format(fuse_model_paths[fold])))


Confusion Matrix:
[[30 21]
 [10 23]]
Calculating additional test metrics...
Accuracy: 0.6309523809523809
Precision: 0.5882352941176471
Recall: 0.75
F1-Score: 0.6593406593406592



  fuse_model = torch.load(os.path.join(prefix, 'Model/ClassificationWhole/Fuse/{}'.format(fuse_model_paths[fold])))


Confusion Matrix:
[[27  5]
 [17 38]]
Calculating additional test metrics...
Accuracy: 0.7471264367816092
Precision: 0.84375
Recall: 0.6136363636363636
F1-Score: 0.7105263157894737



In [14]:
print('precison: {} \n recall: {} \n f1 score: {}'.format(np.mean(ps), np.mean(rs), np.mean(fs)))

precison: 0.6551062091503268 
 recall: 0.7508417508417509 
 f1 score: 0.6788445472655998
