In [1]:
# Import Modules
import os
import pandas as pd
import numpy as np
import librosa
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm

In [3]:
# Get device information
# Prefer the GPU when CUDA is usable; otherwise run everything on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cuda


In [4]:
# Functions for extracting audio data and features

# Normalize Audio data
def Amplitude_Normalization(y):
    """Min-max scale a signal into the range [0, 1].

    Args:
        y: Sequence or array of numeric samples.

    Returns:
        np.ndarray in which every sample is ``(sample - min) / (max - min)``.
        An empty input yields an empty array, and a constant signal yields
        all zeros instead of dividing by zero.
    """
    samples = np.asarray(y)
    if samples.size == 0:
        return np.array([])

    low = samples.min()
    span = samples.max() - low
    if span == 0:
        # Constant signal: (sample - low) is already 0 everywhere, so any
        # non-zero divisor gives the intended all-zero result without 0/0.
        span = 1

    # Vectorized: one NumPy pass instead of a Python-level loop per sample.
    return (samples - low) / span

# Raw data
def _Extract_audio(wav_dir, file_df):
    audio_data = []
    for filename in file_df['filename']:
        path = os.path.join(wav_dir, filename)
        y, sr = librosa.load(path, mono=False)
        audio_data.append(y[0].T)

    return audio_data

# Get Pitches from audio
def _audio_pitch(wav_dir, file_df):
    pitches = []
    for filename in file_df['filename']:
        path = os.path.join(wav_dir, filename)
        y, sr = librosa.load(path, mono=False)
        try:
            pitch, _ = librosa.piptrack(y=y[0], sr=sr)
        except:
            pitch, _ = librosa.piptrack(y=y, sr=sr)
        pitch = pitch.mean(axis=0)
        pitches.append(pitch)
        
    return pitches

# Get Chroma Features from audio
def _audio_chroma(wav_dir, file_df):
    chromas = []
    for filename in file_df['filename']:
        path = os.path.join(wav_dir, filename)
        y, sr = librosa.load(path, mono=False)
        chroma = librosa.feature.chroma_stft(y=y[0], sr=sr)
        chromas.append(chroma.T)

    return chromas

# Get Mel-Frequency Cepstral Coefficient from audio
def _audio_MFCC(wav_dir, file_df):
    audio_data = []
    for filename in file_df['filename']:
        path = os.path.join(wav_dir, filename)
        y, sr = librosa.load(path, mono=False)
        mfcc = librosa.feature.mfcc(y=y[0], sr=sr, n_mfcc=40)
        audio_data.append(mfcc.T)  

    return audio_data

# Get Zero Crossing Rate from audio
def _audio_ZCR(wav_dir, file_df):
    zcrs = []
    for filename in file_df['filename']:
        path = os.path.join(wav_dir, filename)
        y, sr = librosa.load(path, mono=False)
        zcr = librosa.feature.zero_crossing_rate(y[0])
        zcrs.append(zcr.T)

    return zcrs

# Get RMS Energy from audio
def _audio_energy(wav_dir, file_df):
    energies = []
    for filename in file_df['filename']:
        path = os.path.join(wav_dir, filename)
        y, sr = librosa.load(path, mono=False)
        energy = librosa.feature.rms(y=y[0])
        energies.append(energy.T)

    return energies

# Normalize labels
def normalize_CQ(df):
    """Min-max scale label values into the range [0, 1].

    Args:
        df: Iterable of numeric label values (for example a pandas Series).

    Returns:
        list of ``(value - min) / (max - min)`` in input order. An empty
        input returns ``[]``; if every value is identical the result is all
        ``0.0`` instead of a division by zero.
    """
    values = list(df)
    if not values:
        return []

    # Computed once up front; the original recomputed both on every iteration.
    y_min, y_max = min(values), max(values)
    span = y_max - y_min
    if span == 0:
        return [0.0 for _ in values]

    return [(y - y_min) / span for y in values]

In [5]:
# Custom dataset -> [batch, seq_length, input_size]
class wavDataset(Dataset):
    """Dataset pairing one audio feature sequence with its ``CQ`` label.

    Args:
        wav_dir: Directory that contains the audio files.
        csv_path: CSV file with at least ``'filename'`` and ``'CQ'`` columns.
        feature: Which representation to extract; one of ``FEATURES``.
        subset: ``0`` = rows ``[:TRAIN_END]``, ``1`` = rows
            ``[TRAIN_END:VAL_END]``, ``2`` = rows ``[VAL_END:]``; any other
            value (including ``None``) keeps every row.

    Raises:
        ValueError: If ``feature`` is not one of ``FEATURES``.
    """

    # Names accepted for ``feature``.
    FEATURES = ('Raw', 'Pitch', 'MFCC', 'Chroma', 'ZCR', 'Energy')
    # Row boundaries of the train / validation / test partitions.
    TRAIN_END = 193
    VAL_END = 215

    def __init__(self, wav_dir, csv_path, feature='Raw',subset=None):
        # Fail fast on a typo; previously an unknown name left `self.audio`
        # unset and surfaced later as an AttributeError.
        if feature not in self.FEATURES:
            raise ValueError(
                f"Unknown feature {feature!r}; expected one of {self.FEATURES}")

        # Load dataframe
        df = pd.read_csv(csv_path)

        # Extract Audio data
        if feature == 'Raw':
            self.audio = _Extract_audio(wav_dir, df)
        elif feature == 'Pitch':
            self.audio = _audio_pitch(wav_dir, df)
        elif feature == 'MFCC':
            self.audio = _audio_MFCC(wav_dir, df)
        elif feature == 'Chroma':
            self.audio = _audio_chroma(wav_dir, df)
        elif feature == 'ZCR':
            self.audio = _audio_ZCR(wav_dir, df)
        else:  # 'Energy'
            self.audio = _audio_energy(wav_dir, df)

        # Target data
        self.CQ = df['CQ']

        # Split dataset for train/validation/test
        if subset == 0: # Return Train dataset
            self.data = self.audio[:self.TRAIN_END]
            self.labels = self.CQ.iloc[:self.TRAIN_END].reset_index(drop=True)
        elif subset == 1: # Return Validation dataset
            self.data = self.audio[self.TRAIN_END:self.VAL_END]
            self.labels = self.CQ.iloc[self.TRAIN_END:self.VAL_END].reset_index(drop=True)
        elif subset == 2: # Return Test dataset
            self.data = self.audio[self.VAL_END:]
            self.labels = self.CQ.iloc[self.VAL_END:].reset_index(drop=True)
        else: # Return Complete dataset
            self.data = self.audio
            self.labels = self.CQ

    def __getitem__(self, index):
        """Return ``(audio_tensor, label_tensor)`` for the sample at ``index``.

        The feature array is converted to float32 and gets a new axis at
        position 1, so a ``(T,)`` array becomes ``(T, 1)`` and a ``(T, F)``
        array becomes ``(T, 1, F)``.
        """
        audio_tensor = torch.from_numpy(self.data[index]).float()
        # NOTE(review): train_transformer_model squeezes this axis again
        # (dim 2 after batching) — keep the two in sync.
        audio_tensor = audio_tensor.unsqueeze(1)
        # Positional lookup: works regardless of the Series' index labels.
        label_tensor = torch.tensor(self.labels.iloc[index]).float()

        return audio_tensor, label_tensor

    def __len__(self):
        """Number of samples in the selected subset."""
        return len(self.data)

In [6]:
# Function for padding sequence
def my_collate_fn(batch):
    """Collate ``(sequence, label)`` pairs into one zero-padded batch.

    Args:
        batch: Iterable of ``(sequence_tensor, label_tensor)`` pairs.

    Returns:
        tuple ``(padded_sequences, labels)``: the sequences padded with 0 to
        the longest one with the batch axis first, and the labels stacked
        along a new leading axis.
    """
    sequences, labels = zip(*batch)

    # Shorter sequences are right-padded with zeros.
    sequence_batch = pad_sequence(sequences, batch_first=True, padding_value=0)
    label_batch = torch.stack(labels)

    return sequence_batch, label_batch

In [7]:
def load_data(ft, batch,
              train_dir='/home/hyeonbin/hbb_work/EGG_data/train',
              test_dir='/home/hyeonbin/hbb_work/EGG_data/test',
              csv_train='/home/hyeonbin/hbb_work/wavtxt/wav_CQ_train.csv',
              csv_test='/home/hyeonbin/hbb_work/wavtxt/wav_CQ_test.csv'):
    """Build the train and test ``DataLoader`` objects for one feature type.

    Args:
        ft: Feature name forwarded to ``wavDataset`` (e.g. ``'Raw'``).
        batch: Batch size used by both loaders.
        train_dir: Directory with the training audio files.
        test_dir: Directory with the test audio files.
        csv_train: CSV describing the training files and labels.
        csv_test: CSV describing the test files and labels.

    Returns:
        tuple ``(train_loader, test_loader)``.
    """
    train_dataset = wavDataset(wav_dir=train_dir,
                               csv_path=csv_train,
                               feature=ft) # Train data

    test_dataset = wavDataset(wav_dir=test_dir,
                              csv_path=csv_test,
                              feature=ft) # Test data

    # Sequences differ in length, so they must be padded per batch; the
    # default collate function cannot stack them when batch > 1.
    train_loader = DataLoader(train_dataset,
                              batch_size=batch,
                              shuffle=True,
                              collate_fn=my_collate_fn)

    # Evaluation does not depend on sample order, so keep it deterministic.
    test_loader = DataLoader(test_dataset,
                             batch_size=batch,
                             shuffle=False,
                             collate_fn=my_collate_fn)

    return train_loader, test_loader

In [8]:
def count_parameters(model):
    """Return how many scalar parameters of ``model`` require gradients."""
    trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable += param.numel()
    return trainable

def mean_absolute_error(outputs, targets):
    """Return the mean of ``|outputs - targets|`` over all elements."""
    residual = outputs - targets
    return residual.abs().mean()

def root_mean_squared_error(outputs, targets):
    """Return the square root of the mean squared difference."""
    squared_error = (outputs - targets) ** 2
    return squared_error.mean().sqrt()

def MAPELoss(outputs, targets):
    """Return the mean absolute relative error, scaled by 10.

    Computes ``mean(|(outputs - targets) / targets| * 10)``.
    NOTE(review): the factor is 10, not the 100 of a percentage error —
    confirm this scaling is intended. A zero target divides by zero.
    """
    relative_error = (outputs - targets) / (targets)
    scaled = torch.abs(relative_error) * 10

    return torch.mean(scaled)


In [None]:
def plot_performance(y_true, y_pred, title):
    """Scatter predicted against exact values with a ``y = x`` reference line.

    Args:
        y_true: Exact (ground-truth) values; any array-like of numbers.
        y_pred: Predicted values, paired element-wise with ``y_true``.
        title: Title drawn above the plot.

    Raises:
        ValueError: If either input is empty.
    """
    # Accept lists as well as arrays; the original called .min()/.max() on
    # the raw inputs and on a plain Python list of limits.
    exact = np.asarray(y_true, dtype=float).ravel()
    predicted = np.asarray(y_pred, dtype=float).ravel()
    if exact.size == 0 or predicted.size == 0:
        raise ValueError("plot_performance needs at least one (exact, predicted) pair")

    def _font(path):
        # Fall back to matplotlib's default font when the TTF file is absent,
        # so the plot still renders on machines without these fonts.
        if os.path.exists(path):
            return fm.FontProperties(fname=path)
        return fm.FontProperties()

    TimesNewRoman = _font("/home/hyeonbin/anaconda3/fonts/Times_New_Roman.ttf")
    TimesNewRomanBold = _font("/home/hyeonbin/anaconda3/fonts/Times_New_Roman_Bold.ttf")

    # Shared axis range covering both the exact and the predicted values.
    low = min(exact.min(), predicted.min())
    high = max(exact.max(), predicted.max())
    if low == high:
        # Avoid a zero-width axis when every value is identical.
        low, high = low - 0.5, high + 0.5

    titledict = {'fontsize': 20,
                 'style': 'normal', # 'oblique' 'italic'
                 'fontweight': 'normal'} # 'bold', 'heavy', 'light', 'ultrabold', 'ultralight'

    labeldict = {'fontsize': 15,
                 'style': 'normal', # 'oblique' 'italic'
                 'fontweight': 'normal'} # 'bold', 'heavy', 'light', 'ultrabold', 'ultralight'

    x_label = "Exact data"; y_label = "Predict data"
    # One figure only; the original also opened a stray one via plt.figure((5, 5)).
    plt.figure(figsize=(5, 5))
    plt.scatter(exact, predicted, s=10, c='black')
    plt.plot([low, high], [low, high], linestyle='-.', color='red')  # y = x
    plt.xlim(low, high)
    plt.ylim(low, high)
    plt.title(title, fontproperties=TimesNewRomanBold, **titledict)
    plt.xlabel(f'{x_label:>60}', fontproperties=TimesNewRoman, **labeldict)
    plt.ylabel(f'{y_label:>60}', fontproperties=TimesNewRoman, **labeldict)
    plt.show()

In [None]:
def train_model(model, num_epochs, criterion, optimizer, batch, feature, tag):
    """Train ``model``, log per-epoch losses and plot the final predictions.

    For every epoch the sample-weighted mean training loss and test loss are
    printed and appended to ``{tag}_train.txt`` / ``{tag}_test.txt`` under
    the ``Model_eval/{feature}`` directory. After the last epoch, predictions
    for both loaders are plotted with ``plot_performance``.

    Args:
        model: Network to optimise; called as ``model(inputs)``.
        num_epochs: Number of passes over the training loader.
        criterion: Callable ``criterion(outputs, targets)`` returning the loss.
        optimizer: Optimizer that updates ``model``'s parameters.
        batch: Batch size forwarded to ``load_data``.
        feature: Feature name forwarded to ``load_data``; also part of the log path.
        tag: Prefix of the log file names.
    """
    train_log = f"/home/hyeonbin/hbb_work/Model_eval/{feature}/{tag}_train.txt"
    test_log = f"/home/hyeonbin/hbb_work/Model_eval/{feature}/{tag}_test.txt"

    train_loader, test_loader = load_data(feature, batch)

    def _forward(inputs, targets):
        # Move a batch to the device and run the model on it.
        inputs = inputs.to(device)
        targets = targets.to(device)
        outputs = model(inputs)
        # A (B, 1) output against (B,) targets would silently broadcast to
        # (B, B) inside the loss; align the shapes when the sizes agree.
        if outputs.shape != targets.shape and outputs.numel() == targets.numel():
            outputs = outputs.reshape(targets.shape)
        return outputs, targets

    def _mean_loss(loader):
        # Sample-weighted mean loss over a loader, without gradients.
        total_loss = 0
        total_samples = 0
        with torch.no_grad():
            for inputs, targets in loader:
                outputs, targets = _forward(inputs, targets)
                loss = criterion(outputs, targets)
                total_loss += loss.item() * inputs.size(0)
                total_samples += inputs.size(0)
        return total_loss / total_samples

    def _predict(loader):
        # Collect (exact, predicted) values over a loader as Python floats.
        exact, predicted = [], []
        with torch.no_grad():
            for inputs, targets in loader:
                outputs, targets = _forward(inputs, targets)
                exact.extend(targets.cpu().reshape(-1).tolist())
                predicted.extend(outputs.cpu().reshape(-1).tolist())
        return exact, predicted

    # `with` guarantees both logs are closed (the original wrote `f.close`
    # without parentheses, which never closed them).
    with open(train_log, "w+") as f_train, open(test_log, "w+") as f_test:
        for epoch in range(num_epochs):
            model.train() # Training Mode
            total_train_loss = 0
            total_train_samples = 0

            for inputs, targets in train_loader:
                outputs, targets = _forward(inputs, targets)
                loss = criterion(outputs, targets)
                total_train_loss += loss.item() * inputs.size(0)
                total_train_samples += inputs.size(0)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            average_train_loss = total_train_loss / total_train_samples
            f_train.write(f"{average_train_loss}\n")
            print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {average_train_loss:.6f}')

            model.eval() # Inference Mode
            average_test_loss = _mean_loss(test_loader)
            f_test.write(f"{average_test_loss}\n")
            print(f'Epoch [{epoch+1}/{num_epochs}], Test Loss: {average_test_loss:.6f}')

    # Final evaluation; train and test predictions are kept in separate lists
    # (the original appended the test results to the train lists).
    model.eval() # Inference Mode
    y_exact_train, y_pred_train = _predict(train_loader)
    y_exact_test, y_pred_test = _predict(test_loader)

    # plot_performance expects (y_true, y_pred, title).
    plot_performance(y_exact_train, y_pred_train, 'Train')
    plot_performance(y_exact_test, y_pred_test, 'Test')

: 

In [None]:
def train_transformer_model(model, num_epochs, criterion, optimizer, batch, feature, tag):
    """Train a transformer-style ``model``, log losses and plot predictions.

    Same procedure as ``train_model`` except that axis 2 of every input batch
    is squeezed before the forward pass. Per-epoch sample-weighted mean
    losses are printed and written to ``{tag}_train.txt`` /
    ``{tag}_test.txt`` under the ``Model_eval/{feature}`` directory.

    Args:
        model: Network to optimise; called as ``model(inputs)``.
        num_epochs: Number of passes over the training loader.
        criterion: Callable ``criterion(outputs, targets)`` returning the loss.
        optimizer: Optimizer that updates ``model``'s parameters.
        batch: Batch size forwarded to ``load_data``.
        feature: Feature name forwarded to ``load_data``; also part of the log path.
        tag: Prefix of the log file names.
    """
    train_log = f"/home/hyeonbin/hbb_work/Model_eval/{feature}/{tag}_train.txt"
    test_log = f"/home/hyeonbin/hbb_work/Model_eval/{feature}/{tag}_test.txt"

    train_loader, test_loader = load_data(feature, batch)

    def _forward(inputs, targets):
        # Drop axis 2 (if it has size 1), move the batch to the device and
        # run the model on it.
        inputs = inputs.squeeze(2).to(device)
        targets = targets.to(device)
        outputs = model(inputs)
        # A (B, 1) output against (B,) targets would silently broadcast to
        # (B, B) inside the loss; align the shapes when the sizes agree.
        if outputs.shape != targets.shape and outputs.numel() == targets.numel():
            outputs = outputs.reshape(targets.shape)
        return outputs, targets

    def _mean_loss(loader):
        # Sample-weighted mean loss over a loader, without gradients.
        total_loss = 0
        total_samples = 0
        with torch.no_grad():
            for inputs, targets in loader:
                outputs, targets = _forward(inputs, targets)
                loss = criterion(outputs, targets)
                total_loss += loss.item() * inputs.size(0)
                total_samples += inputs.size(0)
        return total_loss / total_samples

    def _predict(loader):
        # Collect (exact, predicted) values over a loader as Python floats.
        exact, predicted = [], []
        with torch.no_grad():
            for inputs, targets in loader:
                outputs, targets = _forward(inputs, targets)
                exact.extend(targets.cpu().reshape(-1).tolist())
                predicted.extend(outputs.cpu().reshape(-1).tolist())
        return exact, predicted

    # `with` guarantees both logs are closed (the original wrote `f.close`
    # without parentheses, which never closed them).
    with open(train_log, "w+") as f_train, open(test_log, "w+") as f_test:
        for epoch in range(num_epochs):
            model.train() # Training Mode
            total_train_loss = 0
            total_train_samples = 0

            for inputs, targets in train_loader:
                outputs, targets = _forward(inputs, targets)
                loss = criterion(outputs, targets)
                total_train_loss += loss.item() * inputs.size(0)
                total_train_samples += inputs.size(0)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            average_train_loss = total_train_loss / total_train_samples
            f_train.write(f"{average_train_loss}\n")
            print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {average_train_loss:.6f}')

            model.eval() # Inference Mode
            average_test_loss = _mean_loss(test_loader)
            f_test.write(f"{average_test_loss}\n")
            print(f'Epoch [{epoch+1}/{num_epochs}], Test Loss: {average_test_loss:.6f}')

    # Final evaluation on both splits.
    model.eval() # Inference Mode
    y_exact_train, y_pred_train = _predict(train_loader)
    y_exact_test, y_pred_test = _predict(test_loader)

    # plot_performance expects (y_true, y_pred, title).
    plot_performance(y_exact_train, y_pred_train, 'Train')
    plot_performance(y_exact_test, y_pred_test, 'Test')

In [13]:
class model_GRU(nn.Module):
    """GRU regressor: GRU -> last time step -> linear layer -> sigmoid.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the GRU hidden state.
        num_layers: Number of stacked GRU layers.
        output_size: Number of values produced per sequence.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(model_GRU, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Inter-layer dropout only exists between stacked layers; requesting
        # it for a single layer has no effect and makes PyTorch warn.
        self.gru = nn.GRU(input_size,
                          hidden_size,
                          num_layers,
                          batch_first=True,
                          dropout=0.5 if num_layers > 1 else 0.0)

        self.fc = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a batch-first input ``(batch, seq, input_size)`` to ``(batch, output_size)`` in (0, 1)."""
        # The initial hidden state defaults to zeros, on x's device and dtype.
        out, _ = self.gru(x)
        out = out[:, -1, :]  # output of the last time step

        out = self.fc(out)
        out = self.sigmoid(out)

        return out

# GRU 1Layer Bidirectional Model

In [36]:
class model_BiGRU(nn.Module):
    """Bidirectional GRU regressor: BiGRU -> last time step -> linear layer.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the GRU hidden state per direction.
        num_layers: Number of stacked GRU layers.
        output_size: Number of values produced per sequence.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(model_BiGRU, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Inter-layer dropout only exists between stacked layers; requesting
        # it for a single layer has no effect and makes PyTorch warn.
        self.gru = nn.GRU(input_size,
                          hidden_size,
                          num_layers,
                          batch_first=True,
                          bidirectional=True,
                          dropout=0.3 if num_layers > 1 else 0.0)

        # Forward and backward states are concatenated -> 2 * hidden_size.
        self.fc = nn.Linear(hidden_size*2, output_size)

    def forward(self, x):
        """Map a batch-first input ``(batch, seq, input_size)`` to ``(batch, output_size)``."""
        # The initial hidden state defaults to zeros, on x's device and dtype.
        out, _ = self.gru(x)
        # NOTE(review): at the last time step the backward direction has only
        # processed one element; confirm this read-out is intended.
        out = out[:, -1, :]

        out = self.fc(out)

        return out

# Conv1D + GRU Model

In [40]:
class CNN_GRU(nn.Module):
    """Two Conv1d + max-pool stages followed by a GRU and a linear read-out.

    Args:
        input_size: Number of features per time step (Conv1d input channels).
        num_channels: Output channels of the first convolution; the second
            convolution outputs ``2 * num_channels``.
        gru_hidden_size: Size of the GRU hidden state.
        gru_num_layers: Number of stacked GRU layers.
        output_size: Number of values produced per sequence.
    """

    def __init__(self, input_size, num_channels, gru_hidden_size, gru_num_layers, output_size):
        super(CNN_GRU, self).__init__()

        # Conv1D Layers
        self.conv1 = nn.Conv1d(in_channels=input_size,
                               out_channels=num_channels,
                               kernel_size=3,
                               padding=1)
        self.conv2 = nn.Conv1d(in_channels=num_channels,
                               out_channels=num_channels*2,
                               kernel_size=3,
                               padding=1)
        self.pool = nn.MaxPool1d(2)

        # GRU Layers. Inter-layer dropout only exists between stacked layers;
        # requesting it for a single layer has no effect and makes PyTorch warn.
        self.gru = nn.GRU(input_size=num_channels*2,
                          hidden_size=gru_hidden_size,
                          num_layers=gru_num_layers,
                          batch_first=True,
                          dropout=0.3 if gru_num_layers > 1 else 0.0)

        # Fully Connected Layer
        self.fc = nn.Linear(gru_hidden_size, output_size)

    def forward(self, x):
        """Map a batch-first input ``(batch, seq, input_size)`` to ``(batch, output_size)``."""
        # Conv1d expects channels before time: (batch, input_size, seq).
        x = x.transpose(1, 2)
        x = F.relu(self.conv1(x))
        x = self.pool(x)  # halves the time axis
        x = F.relu(self.conv2(x))
        x = self.pool(x)  # halves the time axis again

        # Back to (batch, seq, channels) for the batch-first GRU.
        x = x.transpose(1, 2)

        # GRU Layer
        out, _ = self.gru(x)

        # Use only the output of the last time step
        out = out[:, -1, :]

        # Fully Connected Layer
        out = self.fc(out)

        return out

# Transformer Model

In [None]:
class Transformer(nn.Module):
    """Sequence regressor built on the encoder half of ``nn.Transformer``.

    The input is projected to ``d_model``, passed through the encoder, and
    the last sequence position is mapped to one scalar. The decoder created
    by ``nn.Transformer`` is not called in ``forward``.
    """

    def __init__(self, input_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward):
        super().__init__()

        transformer_kwargs = dict(d_model=d_model,
                                  nhead=nhead,
                                  num_encoder_layers=num_encoder_layers,
                                  num_decoder_layers=num_decoder_layers,
                                  dim_feedforward=dim_feedforward,
                                  dropout=0.1)
        self.transformer = nn.Transformer(**transformer_kwargs)
        self.fc_in = nn.Linear(input_dim, d_model)
        self.fc_out = nn.Linear(d_model, 1)  # Target: Scalar

    def forward(self, src):
        """Return one value per sequence for a ``(N, S, input_dim)`` batch."""
        # Project the input features to d_model.
        embedded = self.fc_in(src)
        # Reorder to the layout the transformer expects: (S, N, E).
        seq_first = embedded.permute(1, 0, 2)
        # Run the encoder stack once.
        encoded = self.transformer.encoder(seq_first)
        # Restore the batch-first layout: (N, S, E).
        batch_first = encoded.permute(1, 0, 2)
        # Use only the output at the last sequence position.
        last_step = batch_first[:, -1, :]
        return self.fc_out(last_step)

# XGBoost

In [None]:
import xgboost as xgb

: 

In [None]:
def train_XGB(max_depth, eta, seed, rounds, feature, batch=1):
    """Fit an XGBoost regressor on flattened features and plot its predictions.

    Every sample from the loaders is flattened to one row; rows are
    zero-padded on the right to a common width so train and test share the
    same columns.

    Args:
        max_depth: Value of the ``max_depth`` booster parameter.
        eta: Value of the ``eta`` booster parameter.
        seed: Value of the ``seed`` booster parameter.
        rounds: Maximum number of boosting rounds.
        feature: Feature name forwarded to ``load_data``.
        batch: Batch size used only while reading the loaders.
    """
    train_loader, test_loader = load_data(feature, batch)

    def _collect(loader):
        # Read every batch (not just the first) as flat per-sample rows.
        rows, labels = [], []
        for inputs, targets in loader:
            flat = inputs.reshape(inputs.size(0), -1).numpy()
            rows.extend(flat)
            labels.extend(targets.reshape(-1).tolist())
        return rows, np.asarray(labels, dtype=np.float32)

    def _to_matrix(rows, width):
        # Right-pad each row with zeros to `width`; DMatrix needs a 2-D array.
        matrix = np.zeros((len(rows), width), dtype=np.float32)
        for r, row in enumerate(rows):
            matrix[r, :row.shape[0]] = row
        return matrix

    train_rows, Y_train = _collect(train_loader)
    test_rows, Y_test = _collect(test_loader)

    width = max(row.shape[0] for row in train_rows + test_rows)
    X_train = _to_matrix(train_rows, width)
    X_test = _to_matrix(test_rows, width)

    # Generate DMatrix
    dtrain = xgb.DMatrix(X_train, label=Y_train)
    dtest = xgb.DMatrix(X_test, label=Y_test)

    params = {
        'objective': 'reg:squarederror',
        'max_depth': max_depth,
        'eta': eta,
        'eval_metric': 'rmse',
        'seed': seed
    }

    evals = [(dtrain, 'train'), (dtest, 'test')]
    bst = xgb.train(params, dtrain, rounds, evals=evals, early_stopping_rounds=10)

    y_pred_train = bst.predict(dtrain)
    y_exact_train = dtrain.get_label()

    y_pred_test = bst.predict(dtest)
    y_exact_test = dtest.get_label()  # was misspelled `ger_label`

    # plot_performance expects (y_true, y_pred, title).
    plot_performance(y_exact_train, y_pred_train, 'Train')
    plot_performance(y_exact_test, y_pred_test, 'Test')