# Inference
 Model

In [1]:
import os
import numpy as np
import pandas as pd
import librosa
from tqdm.auto import tqdm
from preprocessing_sig2feat import *
from ysp_func import *

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split

from einops import rearrange
from torchvision.models import resnet18
from torchvision import transforms

import lightning as L
from lightning.pytorch.callbacks import EarlyStopping

import warnings
warnings.filterwarnings(action='ignore') 

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


### Dataset

In [3]:
class CustomDataset(Dataset):
    def __init__(self, root_path, dir_name):
        super(CustomDataset, self).__init__()

        # fs = 16000
        self.fframe = 2**10      # 1024
        self.delay = int(self.fframe/4)    # 256

        self.root_path = root_path
        self.dir_name = dir_name
        self.df_datainfo = pd.read_csv(f'{self.root_path}/{self.dir_name}.csv')        
        
    def __getitem__(self, index):
        filename = self.df_datainfo.loc[index]['path']
        file_path = f'{self.root_path}/{filename}'        
                
        sig, fs = librosa.load(file_path, sr=None, mono=False)
        sig = sig / np.max(np.abs(sig))
        
        feat1 = feature_spectrogram_tensor(sig, fs, self.fframe, device)
        self.x1_data = standardization_tensor(feat1)

        feat2 = feature_sthd_tensor(sig, fs, self.fframe, self.delay, device)
        self.x2_data = standardization_tensor(feat2)
        
        label = self.df_datainfo.loc[index][['car_left', 'car_right', 'cv_left', 'cv_right']]
        y = torch.FloatTensor(label)
        self.y_data = y

        return self.x1_data, self.x2_data, self.y_data

    def __len__(self):
        return len(self.df_datainfo)
    
    def get_filename(self, index):        
        return self.df_datainfo.loc[index]['path']

### Model

In [4]:
batch_size = 16
num_workers = 0
loss_func = nn.MSELoss()

''' dual_ResNet_GRU Model class (Lightning) '''
class dual_ResNet_GRU(L.LightningModule):
    def __init__(self, hidden_dimension, output_dimension, dropout):
        super().__init__()
        self.validation_step_outputs = []
        self.test_step_outputs = []                

        resnet_temp = resnet18()        # pretrained=True
        resnet_temp.conv1 = nn.Conv2d(4, 64, kernel_size=7, stride=2, padding=3, bias=False) # in : 4ch, out : 64ch
        resnet_temp.avgpool = nn.AdaptiveAvgPool2d((59, 1))
        resnet_temp = nn.Sequential(*list(resnet_temp.children())[:-1])
        self.resnet_spectrogram = resnet_temp

        resnet_temp = resnet18()        # pretrained=True
        resnet_temp.conv1 = nn.Conv2d(6, 64, kernel_size=7, stride=2, padding=3, bias=False) # in : 6ch, out : 64ch
        resnet_temp.avgpool = nn.AdaptiveAvgPool2d((59, 1))
        resnet_temp = nn.Sequential(*list(resnet_temp.children())[:-1])
        self.resnet_sthd = resnet_temp
        
        input_dimension = 1024
        self.gru_layer = nn.GRU(input_dimension, 
                           hidden_dimension, 
                           num_layers=2, 
                           bidirectional=False,     # bidirectional=True, # Not Bi-GRU
                           batch_first=True,
                           dropout=dropout)
        
        self.fc_layer = nn.Linear(hidden_dimension, output_dimension)
 
    def forward(self, x1, x2):
        '''input: (batch_size, feat_bins, time_steps, channels)'''        
        x1 = rearrange(x1, "batch feat time ch -> batch ch time feat")
        x1 = self.resnet_spectrogram(x1)        
        x2 = rearrange(x2, "batch feat time ch -> batch ch time feat")
        x2 = self.resnet_sthd(x2)

        x = torch.cat((x1, x2), dim=1)
        x = rearrange(x, "batch ch time feat -> batch time (ch feat)")

        x, hidden = self.gru_layer(x)        
        
        x = self.fc_layer(hidden[-1])  # hidden state # x[:, -1, :] # Take the output of the last time step
        
        return x

    def training_step(self, batch, batch_num):
        train_x1, train_x2, train_y = batch
        y_pred = self(train_x1, train_x2)        
        training_loss = loss_func(y_pred, train_y)
        
        self.log('train_loss', training_loss, on_epoch=True, prog_bar=True)
        return training_loss

    def validation_step(self, batch, batch_num):
        val_x1, val_x2, val_y = batch
        y_pred = self(val_x1, val_x2)        
        val_loss = loss_func(y_pred, val_y)
        self.validation_step_outputs.append(val_loss)
        
        self.log('val_loss', val_loss, on_step=True, on_epoch=True, prog_bar=True)
        return val_loss

    def on_validation_epoch_end(self):
        avg_loss = torch.stack(self.validation_step_outputs).mean()
        self.validation_step_outputs.clear()        
        return avg_loss

    def test_step(self, batch, batch_num):        
        test_x1, test_x2, test_y = batch
        y_pred = self(test_x1, test_x2)        
        test_loss = loss_func(y_pred, test_y)
        self.test_step_outputs.append(test_loss)
        
        self.log('test_loss', test_loss, on_step=True, on_epoch=True, prog_bar=True)
        return test_loss

    def on_test_epoch_end(self):
        avg_loss = torch.stack(self.test_step_outputs).mean()
        self.test_step_outputs.clear()
        return avg_loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-4)     # 1e-3

    # def train_dataloader(self):
    #     return DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)

    # def val_dataloader(self):
    #     return DataLoader(dataset=valid_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    # def test_dataloader(self):
    #     return DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

### 모델 평가
다른 데이터

In [None]:
# 평가할 데이터
loc_dict = {1: 'loc1', 2: 'loc2', 3: 'loc3', 4: 'loc4', 5: 'loc5', 6: 'loc6'}
loc_nn = 1
# path0 = "C:/Users/" + os.getenv('USERNAME') + f"/Desktop/DCASE2024-Task10-Dataset/{loc_dict[loc_nn]}"
# test_name = 'val'
path0 = "C:/Users/" + os.getenv('USERNAME') + f"/Desktop/DCASE2024-Task10-Evaluation_Dataset/{loc_dict[loc_nn]}"
test_name = 'test'      
test_dataset = CustomDataset(path0, test_name)

In [6]:
# 모델 불러오기
logs_name = f'logs_gen3_{loc_dict[loc_nn]}'
ver_num = 3
ckpt_path = f"{logs_name}/lightning_logs/version_{ver_num}/checkpoints"
ckpt_name = [file for file in os.listdir(ckpt_path) if file.endswith('.ckpt')][0]

model = dual_ResNet_GRU.load_from_checkpoint(f"{ckpt_path}/{ckpt_name}", hidden_dimension=256, output_dimension=4, dropout=0.05)
model = model.to(device)
model = model.eval()

In [None]:
# 테스트 데이터 확인
test_csv_filename = f'{path0}/{test_name}_{logs_name}_{ver_num}.csv'

columns = ['path', 'car_left', 'car_right', 'cv_left', 'cv_right']
df = pd.DataFrame(columns=columns)

for idx in tqdm(range(test_dataset.__len__())):
    test_data = test_dataset.__getitem__(idx)
    test_x1, test_x2, test_y = test_data
    
    y_pred = model(test_x1.unsqueeze(0).to(device), test_x2.unsqueeze(0).to(device))
    y_pred = y_pred.squeeze().to('cpu')         # y_pred = y_pred.squeeze().round().to('cpu')
    
    data_filename = test_dataset.get_filename(idx)
    
    y_pred[y_pred < 0] = 0
    df.loc[idx] = [data_filename] + y_pred.tolist()

df[['car_left', 'car_right', 'cv_left', 'cv_right']] = df[['car_left', 'car_right', 'cv_left', 'cv_right']]     #.astype(int)
df.to_csv(test_csv_filename, index=False)

# Evaluation
- ktau_corr
- rmse

In [98]:
import os
import numpy as np
import pandas as pd
from scipy.stats import kendalltau

In [99]:
gt_csv_name = f'{path0}/{test_name}.csv'
pred_csv_name = f'{path0}/{test_name}_{logs_name}_{ver_num}.csv'

TARGET_CLASSES = ["car_left", "car_right", "cv_left", "cv_right"]
METRICS = ["Kendall's Tau Corr", "RMSE"]

output_path = f'{pred_csv_name}_metrics.csv'

In [100]:
# Load predictions and ground truth
df_gt = pd.read_csv(gt_csv_name)
df_pred = pd.read_csv(pred_csv_name)

if len(df_pred) != len(df_gt):
    raise ValueError("Predictions and ground truth must contain the same number of samples.")

len_df = len(df_pred)
df_gt.sort_values("path", inplace=True)
df_pred.sort_values("path", inplace=True)

ktau_corr_dict, rmse_dict = {}, {}
for label in TARGET_CLASSES:
    gt_scores = df_gt[label].values
    pred_scores = df_pred[label].values

    # RMSE score
    rmse = np.sqrt(1.0 / len(gt_scores) * np.sum((gt_scores - pred_scores) ** 2.0))

    # correlation-based metrics
    ktau_corr = kendalltau(gt_scores, pred_scores).correlation

    # output results
    ktau_corr_dict[label] = round(ktau_corr, 3)
    rmse_dict[label] = round(rmse, 3)

results = pd.DataFrame([ktau_corr_dict, rmse_dict], index=METRICS)
results.fillna(0, inplace=True)
results.to_csv(output_path, index=True, index_label="Metric")
results

Unnamed: 0,car_left,car_right,cv_left,cv_right
Kendall's Tau Corr,0.773,0.664,0.716,0.645
RMSE,1.875,2.013,0.58,0.528


### make test.csv

In [14]:
import csv

loc_dict = {1: 'loc1', 2: 'loc2', 3: 'loc3', 4: 'loc4', 5: 'loc5', 6: 'loc6'}
loc_nn = 6
path0 = "C:/Users/" + os.getenv('USERNAME') + f"/Desktop/DCASE2024-Task10-Evaluation_Dataset/{loc_dict[loc_nn]}"
test_name = 'test'

csv_name = f'{path0}/{test_name}.csv'
f = open(csv_name, "w", newline='')
writer = csv.writer(f)
writer.writerow(['path', 'car_left', 'car_right', 'cv_left', 'cv_right']) ## 여기 주목!

list_filepath = list_all_file_path(f'{path0}/{test_name}')
for filepath in list_filepath:
    filepath = filepath.replace('\\','/')
    filename = filepath.split(f'{loc_dict[loc_nn]}/')[-1]
    
    if '.flac' in filename:
        # print(filename)
        writer.writerow([filename])

f.close()
