In [1]:
# Colab only: mount Google Drive at /content/drive so the model and data paths
# used further down (under /content/drive/MyDrive/...) are reachable.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install pydub

Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1


In [3]:
import subprocess, os, csv
from torch import nn
import torch, torchaudio, statistics
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
import pandas as pd
import warnings

In [4]:
# suppress all warnings
warnings.filterwarnings("ignore")
# suppress only the UserWarning with message "At least one mel filterbank has all zero values."
warnings.filterwarnings("ignore")

In [5]:
# Model flow
class define_model(nn.Module):
    """Emotion classifier combining two CNN branches with a Transformer encoder.

    The same input is fed to two independent (identically shaped) convolutional
    stacks and, after max-pooling along the last axis, to a 4-layer Transformer
    encoder. The three embeddings are concatenated and classified by a two-layer
    fully connected head.

    Args:
        num_emotions: number of output classes of the final linear layer.
    """

    @staticmethod
    def _build_conv_branch():
        """Create one Conv -> BatchNorm -> ReLU -> MaxPool -> Dropout stack (3 stages)."""
        # (in_channels, out_channels, pooling factor) for each stage
        stage_specs = ((1, 16, 2), (16, 32, 4), (32, 64, 4))
        layers = []
        for channels_in, channels_out, pool in stage_specs:
            layers.extend([
                nn.Conv2d(
                    in_channels=channels_in,
                    out_channels=channels_out,
                    kernel_size=3,
                    stride=1,
                    padding=1,
                ),
                nn.BatchNorm2d(channels_out),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=pool, stride=pool),
                nn.Dropout(p=0.3),
            ])
        return nn.Sequential(*layers)

    def __init__(self, num_emotions):
        super().__init__()

        # Transformer branch: pool the last axis by 4, then encode with d_model=40.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=40,
            nhead=4,
            dim_feedforward=512,
            dropout=0.4,
            activation='relu',
        )
        self.transformer_maxpool = nn.MaxPool2d(kernel_size=[1, 4], stride=[1, 4])
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=4)

        # The stack stored as conv2Dblock2 is created first and registered second,
        # which keeps parameter initialisation and registration order unchanged.
        second_branch = self._build_conv_branch()
        self.conv2Dblock1 = self._build_conv_branch()
        self.conv2Dblock2 = second_branch

        # Head: 960 features per CNN branch + 40 from the Transformer branch.
        self.fc1_layer = nn.Linear(960*2+40, 980)
        self.act1 = nn.ReLU()
        self.fc2_layer = nn.Linear(980, num_emotions)
        self.softmax_out = nn.Softmax(dim=1)

    def forward(self, x):
        """Run the three branches and classify.

        Args:
            x: input batch; the layers require a single channel in dim 1 and size 40
                in dim 2 (e.g. the (n, 1, 40, 500) batches built by
                preprocess_real_data.resize). The last-axis length must make each
                CNN branch flatten to 960 features.

        Returns:
            Tuple ``(output_logits, output_softmax)``, both of shape (batch, num_emotions).
        """
        cnn_features_1 = self.conv2Dblock1(x).flatten(start_dim=1)
        cnn_features_2 = self.conv2Dblock2(x).flatten(start_dim=1)

        # Drop the channel axis and move the pooled last axis to the front so the
        # encoder sees (sequence, batch, 40); then average over the sequence.
        pooled = self.transformer_maxpool(x).squeeze(1)
        sequence = pooled.permute(2, 0, 1)
        transformer_features = self.transformer_encoder(sequence).mean(dim=0)

        combined = torch.cat([cnn_features_1, cnn_features_2, transformer_features], dim=1)
        hidden = self.act1(self.fc1_layer(combined))
        output_logits = self.fc2_layer(hidden)
        output_softmax = self.softmax_out(output_logits)
        return output_logits, output_softmax

In [6]:
class preprocess_real_data():
    """Turn a raw recording into model-ready MFCC batches.

    The full pipeline (see ``complete_preprocessing``) is:
    ffmpeg conversion to WAV -> removal of silent stretches -> MFCC tensor saved
    as ``.pt`` -> zero-padded batch of shape (n, 1, 40, 500).
    Intermediate files are written next to the input, with the directory names
    ``raw_datas`` -> ``wav_datas`` -> ``denoised_datas`` substituted in the path.
    """

    def __init__(self, file):
        # Only stored; every method receives the path it works on explicitly.
        self.file = file

    def mp4_to_wav(self, file):
        """Convert ``file`` to WAV with ffmpeg and return the WAV path.

        The output path is ``file`` with 'raw_datas' replaced by 'wav_datas' and
        the final extension replaced by '.wav'.
        """
        output_file = file.replace('raw_datas', 'wav_datas')
        # splitext strips only the last extension, so dots in directory names or
        # a leading './' no longer truncate the path (split('.')[0] did).
        output_file = os.path.splitext(output_file)[0] + ".wav"
        # Nothing to convert when the input already is the target WAV file.
        if output_file != file:
            # NOTE(review): the ffmpeg exit status is deliberately left unchecked,
            # as before; callers in this notebook pass already-converted files.
            subprocess.call(['ffmpeg', '-i', file, output_file])
        return output_file

    def remove_noise(self, file):
        """Drop silent stretches from a WAV file and return the new file's path.

        Non-silent ranges are detected with pydub (``min_silence_len=400``,
        threshold = 0.65 x the clip's dBFS); ranges separated by less than 100
        are merged before the audio is re-assembled. If nothing non-silent is
        found the audio is exported unchanged.
        """
        sound_file = AudioSegment.from_wav(file)
        non_sil_times = detect_nonsilent(sound_file, min_silence_len=400, silence_thresh=sound_file.dBFS * 0.65)

        if non_sil_times:
            # Merge neighbouring ranges whose gap is shorter than 100.
            merged = [non_sil_times[0]]
            for span in non_sil_times[1:]:
                if span[0] - merged[-1][1] < 100:
                    merged[-1] = (merged[-1][0], span[1])
                else:
                    merged.append(span)
            new_audio = sound_file[merged[0][0]:merged[0][1]]
            for span in merged[1:]:
                new_audio += sound_file[span[0]:span[1]]
        else:
            new_audio = sound_file

        # Export next to the input: wav_datas -> denoised_datas, '<name>_denoised.wav'.
        file_name = file.replace('wav_datas', 'denoised_datas')
        file_name = file_name[:-4]+'_denoised.wav'
        new_audio.export(file_name, format="wav")
        return file_name

    def resize(self, file):
        """Load a saved MFCC tensor and return a batch of shape (n, 1, 40, 500).

        Only the first channel (``tensor[0]``) is used. It is zero-padded along
        the last axis to ``n * 500`` frames with ``n = frames // 500 + 1``.
        """
        tensor = torch.load(file)
        first_channel = tensor[0,:,:]
        n = first_channel.shape[-1]//500 + 1
        padded = torch.zeros((40,n * 500))
        padded[:,:first_channel.shape[-1]] = first_channel
        # reshape((40, 500, n)) puts frame i*n + j at [:, i, j], so sample j holds
        # every n-th frame starting at j (interleaved), not a contiguous window.
        # NOTE(review): for n > 1 this decimates time; confirm it matches training.
        batch = padded.reshape((40,500,n)).permute(2, 0, 1).reshape((n,1,40,500))
        return batch

    def save_tensor_file(self, file):
        """Compute MFCCs of an audio file, save them as '<name>.pt' and return that path."""
        waveform, sample_rate = torchaudio.load(file)
        # Default MFCC settings; resize() expects the result to have 40 coefficients.
        transform = torchaudio.transforms.MFCC(sample_rate=sample_rate)
        mfcc = transform(waveform)
        file_name = file[:-4]+'.pt'
        torch.save(mfcc,file_name)
        return file_name

    def complete_preprocessing(self, file):
        """Run the whole pipeline on ``file``.

        Returns:
            Tuple ``(path of the saved .pt MFCC file, (n, 1, 40, 500) tensor)``.
        """
        file = self.mp4_to_wav(file)
        file = self.remove_noise(file)
        file = self.save_tensor_file(file)
        tensor = self.resize(file)
        return file, tensor


In [7]:
def load_model(model_path, num_emotions=3):
    """Build a ``define_model`` and load trained weights for CPU inference.

    Args:
        model_path: path to a state dict saved with ``torch.save``.
        num_emotions: number of output classes the checkpoint was trained with
            (defaults to 3, the value previously hard-coded here).

    Returns:
        The model with the weights loaded, switched to evaluation mode.
    """
    model = define_model(num_emotions)
    # map_location='cpu' lets checkpoints saved on a GPU load on a CPU-only runtime.
    model.load_state_dict(torch.load(model_path, map_location='cpu'))
    model.eval()
    return model

In [8]:
def predict_1_sample(file_sample, model, preprocesser):
    """Predict the emotion of one recording by majority vote over its segments.

    Args:
        file_sample: path to the recording. A path ending in '.pt' is treated as
            an MFCC tensor already produced by the preprocessing pipeline and is
            only resized, instead of being converted and trimmed a second time.
        model: callable returning ``(logits, softmax)`` for a batch of segments.
        preprocesser: object providing ``complete_preprocessing(path)`` and
            ``resize(path)`` (e.g. ``preprocess_real_data``).

    Returns:
        Tuple ``(class index, class name)``; ties go to the lowest class index.
    """
    if str(file_sample).endswith('.pt'):
        tensor = preprocesser.resize(file_sample)
    else:
        file_sample, tensor = preprocesser.complete_preprocessing(file_sample)

    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        output_logit, output_softmax = model(tensor)

    # One predicted class per segment, then the most frequent class wins.
    # argmax returns the first maximum, i.e. the lowest index among tied classes.
    segment_classes = torch.argmax(output_softmax, dim=1)
    final_output = torch.bincount(segment_classes).argmax().item()

    emotion_dict = {0: 'positive',
                    1: 'neutral',
                    2: 'negative'}
    label = emotion_dict[final_output]
    return final_output, label



In [9]:
# #predict 1 sample
# model = load_model('/content/drive/MyDrive/DPL/model/model2_SGD_200e.pt')
# preprocess = preprocess_real_data('ok')
# final_output, label = predict_1_sample('file.wav', model, preprocess)
# print(label)

## Evaluate on the test dataset

In [10]:
def extract_label_file(root):
    """Preprocess every recording under ``root`` and write 'test_real_data.csv'.

    ``root`` must contain one sub-directory per emotion ('positive', 'neutral',
    'negative'), each holding the recordings of that class. Every recording is
    run through ``preprocess_real_data.complete_preprocessing``; the CSV (written
    to the current working directory) gets one row per recording with the
    columns ``sessionID`` (file name), ``labels`` (class index) and ``path``
    (path of the saved MFCC ``.pt`` file).

    Rows are ordered by class index, then by file name, so the output does not
    depend on the arbitrary order returned by ``os.listdir``.

    Raises:
        KeyError: if ``root`` contains an entry that is not a known emotion name.
    """
    header = ['sessionID','labels','path']
    emotion_dict = {
        'positive': 0,
        'neutral': 1,
        'negative': 2
    }
    processer = preprocess_real_data(0)
    records = []
    for emotion in sorted(os.listdir(root), key=lambda name: emotion_dict[name]):
        emotion_dir = os.path.join(root, emotion)
        for sample in sorted(os.listdir(emotion_dir)):
            path_sample, _ = processer.complete_preprocessing(os.path.join(emotion_dir, sample))
            records.append({'sessionID': sample, 'labels': emotion_dict[emotion], 'path': path_sample})
    # Build the frame once instead of concatenating row by row; the index column
    # is still written, as before.
    pd.DataFrame(records, columns=header).to_csv('test_real_data.csv')

In [19]:
def test_real_data(model_file):
  df = pd.read_csv('/content/drive/MyDrive/DPL/test_data/test_real_data.csv')
  model = load_model(model_file)
  preprocesser = preprocess_real_data('ok')
  y = df.labels
  y = torch.as_tensor(y)
  y_hat = torch.zeros(len(y))
  ind_sample = 0
  for sample in df.path:
    final_output, label = predict_1_sample(sample, model, preprocesser)
    y_hat[ind_sample] = final_output
    ind_sample += 1
  return y, y_hat

In [78]:
# root = '/content/drive/MyDrive/DPL/test_data/raw_datas'
# extract_label_file(root)
# model_file = '/content/drive/MyDrive/DPL/model/model2_SGD_200e_2.pt'
# y, y_hat = test_real_data(model_file)
# print(accuracy)

In [76]:
def metric(y=None, y_hat=None):
    """Print per-class precision, per-class F1, mean F1 and accuracy.

    Args:
        y: true class indices (0 = positive, 1 = neutral, 2 = negative).
        y_hat: predicted class indices, same length as ``y``.
            When an argument is omitted, the notebook-level variable of the same
            name is used, which keeps the old ``metric()`` call working.

    Counts are computed with boolean masks, so the rows do not have to be
    grouped by label and the label CSV is no longer read. A ratio with a zero
    denominator is reported as 0.0.

    Raises:
        NameError: if an argument is omitted and no such notebook variable exists.
    """
    scope = globals()
    if y is None:
        if 'y' not in scope:
            raise NameError("name 'y' is not defined; pass y or run test_real_data first")
        y = scope['y']
    if y_hat is None:
        if 'y_hat' not in scope:
            raise NameError("name 'y_hat' is not defined; pass y_hat or run test_real_data first")
        y_hat = scope['y_hat']
    y = torch.as_tensor(y)
    y_hat = torch.as_tensor(y_hat)

    labels = {'positive':0,'neutral':0,'negative':0}
    for i, label in enumerate(labels):
        is_class = y == i
        predicted_class = y_hat == i
        TP = int((is_class & predicted_class).sum())
        FP = int((~is_class & predicted_class).sum())
        FN = int((is_class & ~predicted_class).sum())
        precision = TP / (TP + FP) if TP + FP else 0.0
        print('Precision of',label, ': ', precision)
        f1_denominator = 2*TP + FP + FN
        F1 = 2*TP / f1_denominator if f1_denominator else 0.0
        labels[label] = round(F1,2)
    accuracy = int((y == y_hat).sum()) / len(y) if len(y) else 0.0
    print('F1_score of every label: ', labels)
    print('Average F1: ',round(statistics.mean(labels.values()),2))
    print('Accuracy: ', accuracy)

In [79]:
# metric()