<a href="https://colab.research.google.com/github/JohEder/bachelor_thesis_audio_ml/blob/master/transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install torchaudio
!pip install pytorch-model-summary

Collecting torchaudio
[?25l  Downloading https://files.pythonhosted.org/packages/a8/20/eab40caad8f4b97f5e91a5de8ba5ec29115e08fa4c9a808725490b7b4844/torchaudio-0.9.0-cp37-cp37m-manylinux1_x86_64.whl (1.9MB)
[K     |████████████████████████████████| 1.9MB 6.5MB/s 
Installing collected packages: torchaudio
Successfully installed torchaudio-0.9.0
Collecting pytorch-model-summary
  Downloading https://files.pythonhosted.org/packages/fe/45/01d67be55fe3683a9221ac956ba46d1ca32da7bf96029b8d1c7667b8a55c/pytorch_model_summary-0.1.2-py3-none-any.whl
Installing collected packages: pytorch-model-summary
Successfully installed pytorch-model-summary-0.1.2


In [4]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch.utils.data as data
from torch.autograd import Variable
import numpy as np
import torch.nn.functional as F
from torch.utils.data import Dataset 
import torchaudio
import pandas as pd
import os
from torch.nn import TransformerEncoder, TransformerEncoderLayer
import math

In [5]:
#just copied the official import script for the dataset, custom preprocessing happens afterwards
""" Import script for IDMT-Traffic dataset
Ref:
    J. Abeßer, S. Gourishetti, A. Kátai, T. Clauß, P. Sharma, J. Liebetrau: IDMT-Traffic: An Open Benchmark
    Dataset for Acoustic Traffic Monitoring Research, EUSIPCO, 2021
"""

import os
import glob
import pandas as pd

__author__ = 'Jakob Abeßer (jakob.abesser@idmt.fraunhofer.de)'


def import_idmt_traffic_dataset(fn_txt: str = "idmt_traffic_all") -> pd.DataFrame:
    """ Import IDMT-Traffic dataset
    Args:
        fn_txt (str): Text file with all WAV files
    Returns:
        df_dataset (pd.Dataframe): File-wise metadata
            Columns:
                'file': WAV filename,
                'is_background': True if recording contains background noise (no vehicle), False else
                'date_time': Recording time (YYYY-MM-DD-HH-mm)
                'location': Recording location
                'speed_kmh': Speed limit at recording site (km/h), UNK if unknown,
                'sample_pos': Sample position (centered) within the original audio recording,
                'daytime': M(orning) or (A)fternoon,
                'weather': (D)ry or (W)et road condition,
                'vehicle': (B)us, (C)ar, (M)otorcycle, or (T)ruck,
                'source_direction': Source direction of passing vehicle: from (L)eft or from (R)ight,
                'microphone': (SE)= (high-quality) sE8 microphones, (ME) = (low-quality) MEMS microphones (ICS-43434),
                'channel': Original stereo pair channel (12) or (34)
    """
    # load file list
    df_files = pd.read_csv(fn_txt, names=('file',))
    fn_file_list = df_files['file'].to_list()

    # load metadata from file names
    df_dataset = []

    for f, fn in enumerate(fn_file_list):
        fn = fn.replace('.wav', '')
        parts = fn.split('_')

        # background noise files
        if '-BG' in fn:
            date_time, location, speed_kmh, sample_pos, mic, channel = parts
            vehicle, source_direction, weather, daytime = 'None', 'None', 'None', 'None'
            is_background = True

        # files with vehicle passings
        else:
            date_time, location, speed_kmh, sample_pos, daytime, weather, vehicle_direction, mic, channel = parts
            vehicle, source_direction = vehicle_direction
            is_background = False

        channel = channel.replace('-BG', '')
        speed_kmh = speed_kmh.replace('unknownKmh', 'UNK')
        speed_kmh = speed_kmh.replace('Kmh', '')

        df_dataset.append({'file': fn,
                           'is_background': is_background,
                           'date_time': date_time,
                           'location': location,
                           'speed_kmh': speed_kmh,
                           'sample_pos': sample_pos,
                           'daytime': daytime,
                           'weather': weather,
                           'vehicle': vehicle,
                           'source_direction': source_direction,
                           'microphone': mic,
                           'channel': channel})

    df_dataset = pd.DataFrame(df_dataset, columns=('file', 'is_background', 'date_time', 'location', 'speed_kmh', 'sample_pos', 'daytime', 'weather', 'vehicle',
                                                   'source_direction', 'microphone', 'channel'))

    return df_dataset

In [6]:
"""
Anomalous Sound Transformer Model for my Bachelor Thesis
"""

__author__ = 'Johannes Eder (Jo.Eder@campus.lmu.de)'

In [9]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
CLASSES = ['None','C','T', 'M', 'B'] #Background Noise, Car, Truck, Motorcycle, Bus
NORMAL_CLASSES = ['C', 'None']
ANOMALOUS_CLASSES = ['C','T', 'M', 'B']

SAMPLE_RATE = 22500
N_FFT=2048 #is also window size
HOP_LENGTH=1024
N_MELS=128
NUMBER_OF_FRAMES = 4
melspectogram = torchaudio.transforms.MelSpectrogram(
        sample_rate=SAMPLE_RATE,
        n_fft=N_FFT, # Frame Size
        hop_length=HOP_LENGTH, #here half the frame size
        n_mels=N_MELS
    )

transforms = transforms.Compose([
    transforms.ToPILImage(mode='L'),
    #transforms.Grayscale(num_output_channels=3),
    #transforms.Resize([224, 224]),
    #transforms.RandomCrop(size=[N_MELS, NUMBER_OF_FRAMES]), #only train on random slice of the spectogram
    transforms.ToTensor(),
])

AUDIO_DIR = "/content/drive/My Drive/datasets/IDMT_Traffic/audio"
train_annotations = "/content/drive/My Drive/datasets/IDMT_Traffic/annotation/eusipco_2021_train.csv"
test_annotatons = "/content/drive/My Drive/datasets/IDMT_Traffic/annotation/eusipco_2021_test.csv"
all_annotations_txt = "/content/drive/My Drive/datasets/IDMT_Traffic/annotation/idmt_traffic_all.txt"

LEARNING_RATE = 0.001
EPOCHS = 10
BATCH_SIZE = 16
BATCH_SIZE_VAL = 1

In [6]:
class IdmtTrafficDataSet(Dataset):

    def __init__(self, annotations_file, audio_dir, audio_transformation, transformation, target_sample_rate, normal_classes):
        self.annotations =  annotations_file if isinstance(annotations_file, pd.DataFrame) else pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        self.audio_transformation = audio_transformation
        self.transformation = transformation
        self.target_sample_rate = target_sample_rate
        #self.classes = ['None','C','T', 'M', 'B']
        self.normal_classes = normal_classes


    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, index):
        audio_sample_path = self._get_audio_sample_path(index)
        label = self._get_audio_sample_label(index)
        signal, sr = torchaudio.load(audio_sample_path)
        signal = self._resample(signal, sr) #adjust sample rates
        # signal -> (num_channels, samples) i.e. (2, 16000)
        signal  = self._mix_down(signal) #stereo to mono
        signal = self.audio_transformation(signal) #(1, 16000) -> torch.Size([1, 64, 63])
        signal = self.transformation(signal)
        #label = self.normal_classes.index(label)
        label = 0 if label in self.normal_classes else 1 #1 for normal 0 for anomalous
        return signal, label

    def _resample(self, signal, sr):
        if sr != self.target_sample_rate:
            resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate)
            signal = resampler(signal)
        return signal
    
    def _mix_down(self, signal):
        if signal.shape[0] > 1: #(2, 16000)
            #mean operation: aggregating multiple channels
            signal = torch.mean(signal, 0, True)
        return signal

    def _get_audio_sample_path(self, index):
        path = os.path.join(self.audio_dir, self.annotations.iloc[index, 0])
        return path + '.wav'

    def _get_audio_sample_label(self, index):
        return self.annotations.iloc[index, 9]

In [7]:
def get_train_and_val_idmt(audio_dir, train_annotations, test_annotations):
    #this method is for the classifciation task

    train_data = IdmtTrafficDataSet(train_annotations, audio_dir,melspectogram, transforms, SAMPLE_RATE)
    test_data = IdmtTrafficDataSet(test_annotatons, audio_dir,melspectogram, transforms, SAMPLE_RATE)
    return train_data, test_data

def get_normal_and_anomalous_data(normal_classes, anomalous_classes, audio_dir, annotations, batch_size):
    #this method is for the AD Task

    all_data = import_idmt_traffic_dataset(annotations)

    normal_data = all_data[all_data.vehicle.isin(normal_classes)]
    anomalous_data = all_data[all_data.vehicle.isin(anomalous_classes)]

    train_data = normal_data.iloc[:6000, :] #test data needs to have some amount of normal data as well
    train_data = adjust_sample_number_to_batch_size(train_data, batch_size)

    normal_test_data = normal_data.iloc[6001:6101, :] #later more all test samples need to be used, but for now it is faster
    number_of_normal_test_sampels = len(normal_test_data)
    print(f"testing with {number_of_normal_test_sampels} normal samples")

    #sample same number of anomalous data to test
    anomalous_data = anomalous_data.sample(number_of_normal_test_sampels)

    frames = [anomalous_data, normal_test_data]
    concatenated_test_data = pd.concat(frames)
    concatenated_test_data.reset_index(drop=True, inplace=True)
    concatenated_test_data = adjust_sample_number_to_batch_size(concatenated_test_data, batch_size)

    normal_train_data = IdmtTrafficDataSet(train_data, audio_dir, melspectogram, transforms_training, SAMPLE_RATE, normal_classes)
    test_data = IdmtTrafficDataSet(concatenated_test_data, audio_dir, melspectogram, transforms_test_data, SAMPLE_RATE, normal_classes)

    return normal_train_data, test_data

def adjust_sample_number_to_batch_size(data, batch_size):
  if len(data) % batch_size == 0:
    return data
  else:
    remainder = len(data) % batch_size
    return data.iloc[remainder + 1:,:]

In [10]:
train_data, test_data = get_normal_and_anomalous_data(NORMAL_CLASSES, ANOMALOUS_CLASSES, audio_dir=AUDIO_DIR, annotations=all_annotations_txt, batch_size=BATCH_SIZE)
first_sample, first_label = train_data[0]
input_dim = NUMBER_OF_FRAMES #first_sample.shape[1] * first_sample.shape[2]
print(f"Train Data Shape: {first_sample.shape}") #Train Data Shape: torch.Size([1, 128, 44]), Frame Size 2: Train Data Shape: torch.Size([1, 128, 2])
print(f"Input dimension: {input_dim}")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

testing with 100 normal samples
Train Data Shape: torch.Size([1, 128, 4])
Input dimension: 4


In [60]:
class TransformerModel(nn.Module):
  def __init__(self, d_model, n_inputs, n_heads, dim_feedforward, n_encoder_layers, dropout=0.5):
    super(TransformerModel, self).__init__()
    self.model_type = 'Transformer'
    self.pos_encoder = PositionalEncoding(n_inputs, dropout)
    encoder_layers = TransformerEncoderLayer(d_model=n_inputs, nhead=n_heads, dim_feedforward=dim_feedforward, dropout=dropout)
    self.transformer_encoder = TransformerEncoder(encoder_layers, n_encoder_layers)
    self.patch_embedding = PatchEmbedding(n_inputs, d_model)
    self.n_inputs = n_inputs
    #self.decoder = not needed?

  def init_weights(self):
    initrange = 0.1
    self.encoder.weight.data.uniform_(-initrange, initrange)
  
  def forward(self, input):
    embedded = self.patch_embedding(input)  #scaling necessary?* math.sqrt(self.n_inputs)
    pos_encoded_embedded = self.pos_encoder(embedded)
    output = self.transformer_encoder(pos_encoded_embedded)
    return output

In [11]:
class PositionalEncoding(nn.Module):
  def __init__(self, n_inputs, dropout=0.1, max_len=5000):
    super(PositionalEncoding, self).__init__()
    self.dropout = nn.Dropout(p=dropout)

    pe = torch.zeros(max_len, n_inputs)
    #print(f"Shape: {pe.shape}")
    position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
    #print(f"Position shape: {position.shape}")
    div_term = torch.exp(torch.arange(0, n_inputs, 2).float() * (-math.log(10000.0) / n_inputs))
    pe[:, 0::2] = torch.sin(position * div_term)
    pe[:, 1::2] = torch.cos(position * div_term)
    pe = pe.unsqueeze(0).transpose(0, 1)
    self.register_buffer('pe', pe)
  
  def forward(self, x):
    x = x + self.pe[:x.size(0), :]
    return self.dropout(x)

In [7]:
class PatchEmbedding(nn.Module):
  def __init__(self, input_dim, embedding_dimension):
    super().__init__()
    self.embedding_layer = nn.Linear(input_dim, embedding_dimension)
  
  def forward(self, input_data):
    embedding = self.embedding_layer(input_data)
    return embedding

In [23]:
NUMBER_OF_FRAMES = 2
EMBEDDING_SIZE = 512
sample_input = torch.rand(8, 1, 128, 44) #batch_size, mel_filters, frames
#flattened = sample_input.view(8, -1)
#print(flattened.shape)
unfold = nn.Unfold(kernel_size=(128, NUMBER_OF_FRAMES), stride=NUMBER_OF_FRAMES) #patching the spectogram
unfolded = unfold(sample_input) #(batch_size, features, number_of_patches)
unfolded = unfolded.transpose(1, 2) #(batch_size, number_of_patches, features)
print(unfolded.shape)
patch_embedding_module = PatchEmbedding(unfolded.shape[2], EMBEDDING_SIZE)
embedded = patch_embedding_module(unfolded)
print(embedded.shape) #(batch_size: 8, num_patches: 22, features: 512)

torch.Size([8, 22, 256])
torch.Size([8, 22, 512])


In [12]:
positional_encoder = PositionalEncoding(EMBEDDING_SIZE)
pos_encoded = positional_encoder(embedded)
print(pos_encoded.shape) #torch.Size([1024, 1024, 512]) pos_enc, ,embed_size


torch.Size([8, 8, 512])


In [13]:
N_HEADS = 2
N_ENCODER_LAYERS = 2
DROPOUT = 0.2
DIM_FEED_FORWARD = 512
transformer = TransformerModel(NUMBER_OF_FRAMES, EMBEDDING_SIZE, N_HEADS, DIM_FEED_FORWARD, N_ENCODER_LAYERS, DROPOUT)
output = transformer(flattened)
#encoder_layer = nn.TransformerEncoderLayer(d_model=EMBEDDING_SIZE,nhead= batch_first=True)
src = torch.rand(32, 10, 512)
out = encoder_layer(src)
print(out.shape) #(batch, sequence, feature)

NameError: ignored