In [17]:
import torch
torch.cuda.is_available()

True

In [18]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
import sys
import random

import matplotlib.pyplot as plt
import IPython.display as ipd

from tqdm import tqdm

In [19]:
# Prefer the GPU when PyTorch can see one; otherwise run everything on the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(device)

cuda


In [20]:
# dataset 나누기

import os
import random
random.seed(10)
import torchaudio
import torch
from torch.utils.data import Dataset

class SpeechCommandsDataset(Dataset):
    """Speech Commands split with keyword, 'unknown' and background-noise items.

    Directories named in ``keywords`` keep their own label. Directories named
    in ``unknown_label`` are collapsed into the single label ``'unknown'`` and
    randomly subsampled so that they make up about ``sample_ratio`` of the
    non-noise items. ``num_noise_samples`` random slices, each ``sample_rate``
    samples long, are cut from the files in the ``noise_label`` directory and
    served after the audio files.

    Every item is the tuple
    ``(waveform, sample_rate, label_index, speaker_id, utterance_number)``;
    ``label_index`` is the position of the label in ``self.keywords_to_index``
    and the last two entries are ``None`` for noise items.

    Args:
        dataset_path: Root folder holding the class directories and the
            ``validation_list.txt`` / ``testing_list.txt`` files.
        keywords: List of directory names used as target classes.
        subset: One of ``'train'``, ``'validation'`` or ``'test'``.
        unknown_label: List of directory names mapped to ``'unknown'``.
        sample_ratio: Desired share of ``'unknown'`` items, in ``[0, 1)``.
        noise_label: Name of the directory holding the background noise files.
        num_noise_samples: Number of random noise slices to generate.

    Raises:
        ValueError: If ``subset`` or ``sample_ratio`` is invalid.
        FileNotFoundError: If noise slices are requested but no noise file exists.
    """

    def __init__(self, dataset_path, keywords, subset, unknown_label, sample_ratio=0.2, noise_label='_background_noise_', num_noise_samples=100):
        if subset not in ('train', 'validation', 'test'):
            raise ValueError("Subset must be one of ['train', 'validation', 'test']")
        if not 0 <= sample_ratio < 1:
            # sample_ratio == 1 would divide by zero in the unknown-count formula below.
            raise ValueError("sample_ratio must satisfy 0 <= sample_ratio < 1")

        self.dataset_path = dataset_path
        self.keywords = keywords
        self.unknown_label = unknown_label
        self.noise_label = noise_label
        self.num_noise_samples = num_noise_samples
        self.all_classes = self.unknown_label + self.keywords + [self.noise_label]
        self.subset = subset
        self.sample_ratio = sample_ratio

        # Position in this list is the integer class index returned by __getitem__.
        self.keywords_to_index = ['unknown'] + self.keywords + [self.noise_label]

        # Relative paths (forward slashes) that belong to the validation / test splits.
        self.validation_list = self._load_file_list(os.path.join(dataset_path, 'validation_list.txt'))
        self.testing_list = self._load_file_list(os.path.join(dataset_path, 'testing_list.txt'))

        self.audio_files = []
        self.labels = []
        self.background_noises = []
        unknown_files = []

        known_dirs = set(self.all_classes)
        keyword_dirs = set(self.keywords)

        # Walk in sorted order so that, for a fixed `random` seed, the sampled
        # unknown files and noise slices do not depend on filesystem ordering.
        for root, dirs, files in os.walk(dataset_path):
            dirs.sort()
            dir_label = os.path.basename(root)
            if dir_label not in known_dirs:
                continue
            for file in sorted(files):
                if not file.endswith('.wav'):
                    continue
                file_path = os.path.join(root, file)
                if dir_label == noise_label:
                    # Noise files are shared by every subset.
                    self.background_noises.append(file_path)
                elif not self._is_in_subset(file_path):
                    continue
                elif dir_label in keyword_dirs:
                    self.audio_files.append(file_path)
                    self.labels.append(dir_label)
                else:
                    # Non-keyword words are collapsed into the 'unknown' class.
                    unknown_files.append((file_path, 'unknown'))

        # Subsample the unknown class so it forms ~sample_ratio of the non-noise items.
        if unknown_files:
            total_desired_unknowns = int((len(self.audio_files) / (1 - self.sample_ratio)) * self.sample_ratio)
            if total_desired_unknowns < len(unknown_files):
                unknown_files = random.sample(unknown_files, total_desired_unknowns)

        for file_path, label in unknown_files:
            self.audio_files.append(file_path)
            self.labels.append(label)

        # Pre-compute the random background-noise slices.
        self.noise_samples = []
        self.noise_sample_rates = []
        if num_noise_samples > 0 and not self.background_noises:
            raise FileNotFoundError(
                "No '.wav' files found in the '{}' directory under '{}'".format(noise_label, dataset_path)
            )
        loaded_noise = {}  # path -> (waveform, sample_rate); each noise file is read only once
        for _ in range(num_noise_samples):
            noise_path = random.choice(self.background_noises)
            if noise_path not in loaded_noise:
                loaded_noise[noise_path] = torchaudio.load(noise_path)
            waveform, sample_rate = loaded_noise[noise_path]
            # Clamp so that clips shorter than `sample_rate` samples do not break randint.
            max_offset = max(0, waveform.size(1) - sample_rate)
            offset = random.randint(0, max_offset)
            # clone(): keep only the slice instead of a view that pins the whole file in memory.
            noise_slice = waveform[:, offset:offset + sample_rate].clone()
            noise_slice = self._pad_waveform(noise_slice, sample_rate)
            self.noise_samples.append(noise_slice)
            self.noise_sample_rates.append(sample_rate)

    def _load_file_list(self, file_path):
        """Return the lines of the text file at ``file_path`` as a set."""
        with open(file_path, 'r') as f:
            file_list = f.read().splitlines()
        return set(file_list)

    def _is_in_subset(self, file_path):
        """Tell whether ``file_path`` belongs to ``self.subset``.

        The path is made relative to ``self.dataset_path`` and normalised to
        forward slashes before it is looked up in the split lists. Files that
        appear in neither list belong to the training split.
        """
        relative_path = os.path.relpath(file_path, self.dataset_path)
        relative_path = relative_path.replace('\\', '/')
        if self.subset == 'train':
            return relative_path not in self.validation_list and relative_path not in self.testing_list
        elif self.subset == 'validation':
            return relative_path in self.validation_list
        elif self.subset == 'test':
            return relative_path in self.testing_list
        else:
            raise ValueError("Subset must be one of ['train', 'validation', 'test']")

    def _pad_waveform(self, waveform, target_length=16000):
        """Zero-pad ``waveform`` (channels, samples) on the right up to ``target_length``.

        Longer waveforms are returned unchanged (no truncation).
        """
        current_length = waveform.shape[1]
        if current_length < target_length:
            pad_amount = target_length - current_length
            # new_zeros keeps the dtype and device of the waveform being padded.
            padding = waveform.new_zeros((waveform.shape[0], pad_amount))
            waveform = torch.cat((waveform, padding), dim=1)
        return waveform

    def __len__(self):
        return len(self.audio_files) + self.num_noise_samples

    def __getitem__(self, idx):
        """Return ``(waveform, sample_rate, label_index, speaker_id, utterance_number)``.

        Indices ``[0, len(audio_files))`` address audio files; the remaining
        indices address the pre-computed noise slices.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``. This is
                also what terminates a plain ``for item in dataset`` loop.
        """
        total = len(self)
        if idx < 0:
            idx += total
        if not 0 <= idx < total:
            raise IndexError("index {} is out of range for a dataset of size {}".format(idx, total))

        num_files = len(self.audio_files)
        if idx < num_files:
            file_path = self.audio_files[idx]
            label = self.labels[idx]
            label_index = self.keywords_to_index.index(label)  # label -> class index
            waveform, sample_rate = torchaudio.load(file_path)
            waveform = self._pad_waveform(waveform)
            # File names look like '<speaker>_nohash_<n>.wav'.
            filename = os.path.basename(file_path)
            speaker_id, utterance_number = tuple(filename.split('_nohash_'))
            utterance_number = utterance_number.split('.')[0]
        else:
            noise_idx = idx - num_files
            waveform = self.noise_samples[noise_idx]
            sample_rate = self.noise_sample_rates[noise_idx]
            label_index = self.keywords_to_index.index(self.noise_label)
            speaker_id = None
            utterance_number = None

        return waveform, sample_rate, label_index, speaker_id, utterance_number




In [21]:
from torchaudio.datasets import SPEECHCOMMANDS

# Fetch the Speech Commands data under the current directory (download=True).
# NOTE(review): the `dataset` object itself is not referenced by the cells
# visible below, which read the files from disk through SpeechCommandsDataset.
dataset = SPEECHCOMMANDS(root="./", download=True)

In [6]:
# Usage
# Built with os.path.join so the path also resolves on systems that do not use
# backslashes as the path separator.
dataset_path = os.path.join('SpeechCommands', 'speech_commands_v0.02')
unknown_label='backward, bed, bird, cat, dog, eight, five, follow, forward, four, happy, house, learn, marvin, nine, one, seven, sheila, six, three, tree, two, visual, wow, zero'.split(', ')
keywords = 'yes, no, up, down, left, right, on, off, stop, go'.split(', ')
num_noise_samples = 2000

# The unknown words and the keywords together must cover all 35 word classes.
# Fail loudly here; otherwise the datasets are never built and the next
# statement dies with an unrelated NameError.
if len(unknown_label) + len(keywords) != 35:
    raise ValueError(
        "Expected 35 word classes in total, got {}".format(len(unknown_label) + len(keywords))
    )

# Train dataset
train_set = SpeechCommandsDataset(dataset_path, keywords, unknown_label=unknown_label, subset='train', num_noise_samples=num_noise_samples)

# Validation dataset (currently disabled)
#val_set = SpeechCommandsDataset(dataset_path, keywords, unknown_label=unknown_label, subset='validation', num_noise_samples=num_noise_samples)

# Test dataset
test_set = SpeechCommandsDataset(dataset_path, keywords, unknown_label=unknown_label, subset='test', num_noise_samples=num_noise_samples)

# Keep one example around for the inspection cells that follow.
waveform, sample_rate, label, speaker_id, utterance_number = train_set[0]

In [None]:
# Report how many items each split exposes (audio files plus noise slices).
n_train, n_test = len(train_set), len(test_set)
print("train_set 개수 : {:}, test_set 개수 : {:}".format(n_train, n_test))

In [None]:
from collections import Counter

# Tally how many test items carry each label index (third field of every item).
label_list = [item[2] for item in test_set]
label_counts = Counter(label_list)

# Print every label with its count, in ascending label order.
for label in sorted(label_counts):
    print(f"Label: {label}, Count: {label_counts[label]}")

In [None]:
# Inspect the example clip: tensor shape, sample rate and the raw signal.
wave_shape = waveform.shape
print("Shape of waveform: {}".format(wave_shape))
print("Sample rate of waveform: {}".format(sample_rate))

# Transposed to (samples, channels) so each channel is drawn as one line.
samples = waveform.numpy().T
plt.plot(samples);

In [None]:
# Log-mel spectrogram conversion.
# MelSpectrogram modules are cached per setting so they are built once rather
# than on every call (this function runs for every training / evaluation batch).
_MEL_TRANSFORM_CACHE = {}


def sp_transform(waveform, sample_rate=16000, n_fft=480, win_length=480, hop_length=160, n_mels=40, epsilon=1e-6):
    """Convert a waveform tensor to a log2 mel spectrogram placed on ``device``.

    The waveform is moved to the CPU, passed through
    ``torchaudio.transforms.MelSpectrogram``, offset by ``epsilon`` so that the
    logarithm never sees an exact zero, and the result is moved to the global
    ``device``.

    Args:
        waveform: Tensor whose last dimension is time.
        sample_rate, n_fft, win_length, hop_length, n_mels: Forwarded to
            ``MelSpectrogram``; the defaults are the values this notebook
            has always used.
        epsilon: Constant added before taking ``log2``.

    Returns:
        The log2 mel spectrogram, on ``device``.
    """
    key = (sample_rate, n_fft, win_length, hop_length, n_mels)
    mel_transform = _MEL_TRANSFORM_CACHE.get(key)
    if mel_transform is None:
        mel_transform = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate,
            win_length=win_length,
            n_fft=n_fft,
            hop_length=hop_length,
            n_mels=n_mels,
        )
        _MEL_TRANSFORM_CACHE[key] = mel_transform

    spectrogram = mel_transform(waveform.cpu())
    spectrogram = (spectrogram + epsilon).log2()
    return spectrogram.to(device)

# Log-mel spectrogram of the first channel of the example clip.
sp = sp_transform(waveform)[0]
spec_image = sp.cpu().numpy()
plt.imshow(spec_image, aspect='auto')
print("size of spectrogram: {:}".format(sp.shape))

In [11]:
# Pad the items of a batch so that they all share the same length.
def pad_sequence(batch):
    """Zero-pad a list of (channels, time) tensors to a common length.

    Returns a tensor of shape (batch, channels, max_time).
    """
    # pad_sequence pads along the first dimension, so put time first ...
    time_major = [item.t() for item in batch]
    padded = torch.nn.utils.rnn.pad_sequence(time_major, batch_first=True, padding_value=0.)
    # ... and move it back to the end afterwards.
    return padded.permute(0, 2, 1)


# Collect the waveform and the (integer) label of every item into batch tensors.
def collate_fn(batch):
    """Turn a list of dataset items into ``(waveforms, targets)`` tensors.

    Each item has the form
    ``(waveform, sample_rate, label, speaker_id, utterance_number)``; only the
    waveform (field 0) and the label index (field 2) are used.
    """
    waveforms = [item[0] for item in batch]
    label_indices = [item[2] for item in batch]

    # Stack the waveforms into one zero-padded tensor, then tensorise the labels.
    tensors = pad_sequence(waveforms)
    targets = torch.tensor(label_indices)
    return tensors, targets


# Split the train / test data into mini-batches of `batch_size`.
batch_size = 128

# `device` is a torch.device, and a torch.device never compares equal to a
# plain string, so the device type has to be checked explicitly.
use_cuda = torch.device(device).type == "cuda"
pin_memory = use_cuda
# NOTE(review): worker processes are disabled on Windows, where workers are
# spawned and must re-import the dataset class and collate function; that
# generally fails for objects defined inside a notebook. Confirm before raising.
num_workers = 1 if (use_cuda and os.name != "nt") else 0

train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)
test_loader = torch.utils.data.DataLoader(
    test_set,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

In [12]:
# Copyright (c) 2023 Qualcomm Technologies, Inc.
# All Rights Reserved.

import torch.nn.functional as F
from torch import nn



class ConvBNReLU(nn.Module):
    """Bias-free Conv2d followed by an optional norm and an optional activation.

    Layer order inside ``self.block``: Conv2d, then SubSpectralNorm (``ssn``)
    or BatchNorm2d (``BN``), then SiLU (``swish``) or ReLU6 (``activation``).
    "Same"-style padding is derived from the kernel size; with
    ``use_dilation`` every kernel dimension larger than 1 is dilated by
    ``2 ** idx`` and padded accordingly.
    """

    def __init__(
        self,
        in_plane,
        out_plane,
        idx,
        kernel_size=3,
        stride=1,
        groups=1,
        use_dilation=False,
        activation=True,
        swish=False,
        BN=True,
        ssn=False,
    ):
        super().__init__()
        self.idx = idx

        def padding_and_dilation(k_size):
            """Return (padding, dilation) for one kernel dimension."""
            half = (k_size - 1) // 2
            if not (use_dilation and k_size > 1):
                return half, 1
            dilation = int(2**idx)
            return dilation * half, dilation

        # A sequence-valued kernel gets one (padding, dilation) pair per dimension.
        if isinstance(kernel_size, (list, tuple)):
            per_dim = [padding_and_dilation(k_size) for k_size in kernel_size]
            padding = [pad for pad, _ in per_dim]
            rate = [dil for _, dil in per_dim]
        else:
            padding, rate = padding_and_dilation(kernel_size)

        modules = [
            nn.Conv2d(in_plane, out_plane, kernel_size, stride, padding, rate, groups, bias=False)
        ]

        # Normalisation: sub-spectral norm takes precedence over batch norm.
        if ssn:
            modules.append(SubSpectralNorm(out_plane, 5))
        elif BN:
            modules.append(nn.BatchNorm2d(out_plane))

        # Activation: swish takes precedence over ReLU6.
        if swish:
            modules.append(nn.SiLU(True))
        elif activation:
            modules.append(nn.ReLU6(True))

        self.block = nn.Sequential(*modules)

    def forward(self, x):
        return self.block(x)


class BCResBlock(nn.Module):
    """Residual block with a 2D branch (``f2``) and a pooled 1D branch (``f1``).

    ``f2`` applies an optional 1x1 channel transition followed by a depthwise
    (3, 1) convolution. Its output is averaged over dimension 2 and fed to
    ``f1`` (dilated depthwise (1, 3) conv, 1x1 conv, dropout). The ``f1`` output is
    broadcast-added back onto the ``f2`` output; the block input is added as
    well when the channel count does not change.
    """

    def __init__(self, in_plane, out_plane, idx, stride):
        super().__init__()
        self.transition_block = in_plane != out_plane
        kernel_h, kernel_w = 3, 3

        # 2D part (f2)
        f2_layers = []
        if self.transition_block:
            # 1x1 convolution that changes the channel count.
            f2_layers.append(ConvBNReLU(in_plane, out_plane, idx, 1, 1))
            in_plane = out_plane
        depthwise_2d = ConvBNReLU(
            in_plane,
            out_plane,
            idx,
            (kernel_h, 1),
            (stride[0], 1),
            groups=in_plane,
            ssn=False,
            activation=False,
        )
        f2_layers.append(depthwise_2d)
        self.f2 = nn.Sequential(*f2_layers)
        # Collapses dimension 2 to size 1 and keeps dimension 3 untouched.
        self.avg_gpool = nn.AdaptiveAvgPool2d((1, None))

        # 1D part (f1)
        depthwise_1d = ConvBNReLU(
            out_plane,
            out_plane,
            idx,
            (1, kernel_w),
            (1, stride[1]),
            groups=out_plane,
            swish=False,
            use_dilation=True,
        )
        pointwise = nn.Conv2d(out_plane, out_plane, 1, bias=False)
        self.f1 = nn.Sequential(depthwise_1d, pointwise, nn.Dropout2d(0.1))

    def forward(self, x):
        identity = x

        # 2D part
        features_2d = self.f2(x)

        # 1D part on the pooled features
        features_1d = self.f1(self.avg_gpool(features_2d))

        out = features_1d + features_2d
        if not self.transition_block:
            out = out + identity
        return F.relu6(out, True)


def BCBlockStage(num_layers, last_channel, cur_channel, idx, use_stride):
    """Build one stage: ``num_layers`` BCResBlocks collected in an ``nn.ModuleList``.

    The first block maps ``last_channel`` to ``cur_channel`` and, when
    ``use_stride`` is set, uses stride (2, 1); every following block keeps
    ``cur_channel`` channels with stride (1, 1).
    """
    stage = nn.ModuleList()
    in_channel = last_channel
    for layer_no in range(num_layers):
        is_strided = use_stride and layer_no == 0
        stride = (2, 1) if is_strided else (1, 1)
        stage.append(BCResBlock(in_channel, cur_channel, idx, stride))
        in_channel = cur_channel
    return stage


class BCResNets(nn.Module):
    """Reduced BC-ResNet: conv head, three single-block stages, 1x1-conv classifier.

    ``base_c`` scales every channel width; ``forward`` returns logits of
    shape (batch, num_classes).
    """

    def __init__(self, base_c, num_classes=12):
        super().__init__()
        self.num_classes = num_classes
        # Number of identical blocks in each stage.
        self.n = [1, 1, 1]
        # Channel widths: head output followed by the stage outputs. The last
        # entry is kept from the original layout but is not used by the
        # classifier below, which reads c[-2].
        self.c = [
            int(base_c * 1.5),
            base_c,
            int(base_c * 1.5),
            base_c * 2,
            base_c * 4,
        ]
        # Indices of the stages whose first block uses stride (2, 1).
        self.s = [1, 2]
        self._build_network()

    def _build_network(self):
        head_channels = self.c[0]
        feature_channels = self.c[-2]

        # Head: Conv-BN-ReLU6
        self.cnn_head = nn.Sequential(
            nn.Conv2d(1, head_channels, 5, (2, 1), 2, bias=False),
            nn.BatchNorm2d(head_channels),
            nn.ReLU6(True),
        )

        # Body: one BC-ResBlock stage per entry of self.n
        self.BCBlocks = nn.ModuleList([])
        for stage_idx, depth in enumerate(self.n):
            strided = stage_idx in self.s
            stage = BCBlockStage(
                depth, self.c[stage_idx], self.c[stage_idx + 1], stage_idx, strided
            )
            self.BCBlocks.append(stage)

        # Classifier: BN -> 1x1 conv to class scores -> global average pool.
        # (A larger depthwise-conv head was previously used here and has been
        # replaced by this lighter one.)
        self.classifier = nn.Sequential(
            nn.BatchNorm2d(feature_channels),
            nn.Conv2d(feature_channels, self.num_classes, kernel_size=1),
            nn.AdaptiveAvgPool2d(1),
        )

    def forward(self, x):
        x = self.cnn_head(x)
        for stage, depth in zip(self.BCBlocks, self.n):
            for block_idx in range(depth):
                x = stage[block_idx](x)
        x = self.classifier(x)
        # (batch, num_classes, 1, 1) -> (batch, num_classes)
        return x.view(-1, x.shape[1])

class SubSpectralNorm(nn.Module):
    """Normalise ``spec_groups`` sub-bands of one axis independently.

    The sub-band axis (dimension ``dim``, 2 by default) is split into
    ``spec_groups`` equal groups that are folded into the channel dimension,
    normalised with BatchNorm2d (``batch=True``) or InstanceNorm2d, and
    unfolded again.

    ``affine="Sub"`` lets the underlying norm learn an affine transform per
    (channel, sub-band); ``affine="All"`` instead learns one weight / bias per
    original channel, applied after normalisation. Any other value disables
    the affine transform.
    """

    def __init__(self, num_features, spec_groups=16, affine="Sub", batch=True, dim=2):
        super().__init__()
        self.spec_groups = spec_groups
        self.affine_all = affine == "All"
        # Only "Sub" delegates the affine parameters to the torch norm layer.
        use_norm_affine = affine == "Sub"
        if self.affine_all:
            self.weight = nn.Parameter(torch.ones((1, num_features, 1, 1)))
            self.bias = nn.Parameter(torch.zeros((1, num_features, 1, 1)))

        folded_channels = num_features * spec_groups
        norm_cls = nn.BatchNorm2d if batch else nn.InstanceNorm2d
        self.ssnorm = norm_cls(folded_channels, affine=use_norm_affine)
        self.sub_dim = dim

    def forward(self, x):  # when dim h is frequency dimension
        # Bring the sub-band axis to position 2 when it is the last dimension.
        swap_axes = self.sub_dim in (3, -1)
        if swap_axes:
            x = x.transpose(2, 3).contiguous()

        b, c, h, w = x.size()
        assert h % self.spec_groups == 0
        folded = x.view(b, c * self.spec_groups, h // self.spec_groups, w)
        x = self.ssnorm(folded).view(b, c, h, w)

        if self.affine_all:
            x = x * self.weight + self.bias
        if swap_axes:
            x = x.transpose(2, 3).contiguous()
        return x


In [None]:
def count_parameters(model):
    """Return the total number of elements in the trainable parameters of ``model``."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Build the network and place it on the device selected earlier. Using
# `device` instead of a hard-coded "cuda" keeps the notebook runnable on
# CPU-only machines.
model = BCResNets(base_c=8)
model.to(device)

n = count_parameters(model)
print(f"Number of parameters: {n}")

In [None]:
from thop import profile
# Profile the model on one random dummy batch of shape (1, 1, 40, 101).
# Named `dummy_input` so that the builtin `input` is not shadowed for the rest
# of the notebook session.
dummy_input = torch.randn(1, 1, 40, 101).to(device)
macs, params = profile(model, inputs=(dummy_input, ))
print(macs, params)

In [None]:
print(model)

In [None]:
import numpy as np
total_epoch = 100
warmup_epoch = 5
init_lr = 1e-1
lr_lower_limit = 0

# optimizer (the learning rate is overwritten before every step by the schedule below)
optimizer = torch.optim.SGD(model.parameters(), lr=0, weight_decay=1e-3, momentum=0.9)
n_step_warmup = len(train_loader) * warmup_epoch
total_iter = len(train_loader) * total_epoch
# Length of the cosine phase; clamped so the schedule cannot divide by zero
# when warmup_epoch == total_epoch.
n_step_decay = max(1, total_iter - n_step_warmup)
iterations = 0

# Per-iteration training loss, consumed by the "training loss" plot cell.
losses = []

# train
for epoch in range(total_epoch):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for sample in tqdm(train_loader, desc="epoch %d, iters" % (epoch + 1)):
        # Learning-rate schedule: linear warm-up followed by cosine decay.
        iterations += 1
        if iterations < n_step_warmup:
            lr = init_lr * iterations / n_step_warmup
        else:
            progress = (iterations - n_step_warmup) / n_step_decay
            lr = lr_lower_limit + 0.5 * (init_lr - lr_lower_limit) * (1 + np.cos(np.pi * progress))
        for param_group in optimizer.param_groups:
            param_group["lr"] = lr

        inputs, labels = sample
        inputs = inputs.to(device)
        labels = labels.to(device)
        inputs = sp_transform(inputs)

        # Forward pass
        outputs = model(inputs)
        loss = F.cross_entropy(outputs, labels)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Track loss
        batch_loss = loss.item()
        running_loss += batch_loss
        losses.append(batch_loss)

        # Calculate accuracy
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    # Calculate and print epoch loss and accuracy
    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = 100. * correct / total
    print(f"Epoch {epoch+1}/{total_epoch} - Loss: {epoch_loss:.4f} - Accuracy: {epoch_accuracy:.2f}%")



In [None]:
def Test(dataset, loader, augment):
    """
    Tests the global ``model`` on a given dataset, printing every batch.

    The model is switched to evaluation mode and gradients are disabled for
    the duration of the test, so BatchNorm uses its running statistics and
    Dropout is inactive. The previous train/eval mode is restored afterwards.

    Parameters:
        dataset (Dataset): The dataset to test the model on (only its length is used).
        loader (DataLoader): The data loader to use for batching the data.
        augment (bool): Accepted for interface compatibility; currently unused.

    Returns:
        float: The accuracy of the model on the given dataset, in percent
        (0.0 for an empty dataset).
    """
    true_count = 0.0
    num_testdata = float(len(dataset))
    if num_testdata == 0:
        return 0.0

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, labels in loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                inputs = sp_transform(inputs)
                outputs = model(inputs)
                prediction = torch.argmax(outputs, dim=-1)
                true_count += (prediction == labels).sum().item()
                print('answer: {:} predicted: {:}'.format(labels, prediction))
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    acc = true_count / num_testdata * 100.0  # percentage
    return acc


test_acc = Test(test_set, test_loader, augment=False)  # official testset
print("test acc: %.3f" % (test_acc))
print("End.")

In [None]:
def Test(dataset, loader, augment):
    """
    Tests the global ``model`` on a given dataset (quiet: no per-batch output).

    The model is switched to evaluation mode and gradients are disabled for
    the duration of the test, so BatchNorm uses its running statistics and
    Dropout is inactive. The previous train/eval mode is restored afterwards.

    Parameters:
        dataset (Dataset): The dataset to test the model on (only its length is used).
        loader (DataLoader): The data loader to use for batching the data.
        augment (bool): Accepted for interface compatibility; currently unused.

    Returns:
        float: The accuracy of the model on the given dataset, in percent
        (0.0 for an empty dataset).
    """
    true_count = 0.0
    num_testdata = float(len(dataset))
    if num_testdata == 0:
        return 0.0

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, labels in loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                inputs = sp_transform(inputs)
                outputs = model(inputs)
                prediction = torch.argmax(outputs, dim=-1)
                true_count += (prediction == labels).sum().item()
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    acc = true_count / num_testdata * 100.0  # percentage
    return acc


test_acc = Test(dataset=test_set, loader=test_loader, augment=False)  # official testset
print("test acc: %.3f" % (test_acc))
print("End.")

In [None]:
# Training-loss curve.
# NOTE(review): expects `losses` to be a sequence of loss values that already
# exists in the kernel — confirm which cell is responsible for filling it.
plt.plot(losses);
plt.gca().set_title("training loss");

In [None]:
def predict(tensor):
    """Predict the class index of a single waveform with the global ``model``.

    The waveform is converted with ``sp_transform`` and expanded to a 4-D
    batch of one. The model runs in evaluation mode with gradients disabled;
    its previous train/eval mode is restored afterwards.

    Returns:
        Tensor: the index of the highest-scoring class, with size-1
        dimensions squeezed away.
    """
    tensor = tensor.to(device)
    tensor = sp_transform(tensor)

    # Expand 2-D or 3-D spectrograms to the 4-D layout the model expects.
    if tensor.dim() == 2:  # e.g. (height, width)
        tensor = tensor.unsqueeze(0).unsqueeze(0)  # -> (1, 1, height, width)
    elif tensor.dim() == 3:  # e.g. (channels, height, width)
        tensor = tensor.unsqueeze(0)  # -> (1, channels, height, width)

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            logits = model(tensor)
    finally:
        model.train(was_training)

    # Most likely class = arg-max over the class dimension.
    return logits.argmax(dim=-1).squeeze()


waveform, sample_rate, utterance, *_ = train_set[100]
# Display explicitly: the player would otherwise be dropped because it is not
# the last expression of the cell.
ipd.display(ipd.Audio(waveform.numpy(), rate=sample_rate))

print(f"Expected: {utterance}. Predicted: {predict(waveform).item()}.")

In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Lists collecting the ground-truth and the predicted label index of every test item.
true_labels = []
predicted_labels = []

# Run the model over the test set. Indexing over len(test_set) (instead of
# iterating the dataset object) makes the number of visited items exactly
# len(test_set), independent of where __getitem__ raises IndexError.
for sample_idx in range(len(test_set)):
    waveform, sample_rate, utterance, *_ = test_set[sample_idx]
    output = predict(waveform)
    # predict() returns a tensor (possibly on the GPU); scikit-learn needs plain ints.
    true_labels.append(int(utterance))
    predicted_labels.append(int(output))

# One entry per class index, including the 'unknown' and background-noise classes.
labels_list = list(range(len(test_set.keywords_to_index)))

# Confusion matrix (rows: true label, columns: predicted label).
cm = confusion_matrix(true_labels, predicted_labels, labels=labels_list)

# Per-label accuracy: correct predictions divided by the number of true samples.
label_accuracies = {}
for i, label in enumerate(labels_list):
    true_positive = cm[i, i]
    total_samples = cm[i, :].sum()
    accuracy = true_positive / total_samples if total_samples > 0 else 0
    label_accuracies[label] = accuracy

# Visualise the confusion matrix.
plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels_list, yticklabels=labels_list)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# # Save the confusion matrix to a text file
# output_matrix_path = '/content/confusion_matrix.txt'
# np.savetxt(output_matrix_path, cm, fmt='%d', delimiter=',', header=','.join(map(str, labels_list)), comments='')
# print(f"Confusion matrix saved to {output_matrix_path}")

# Print the accuracy of every label.
for label, accuracy in label_accuracies.items():
    print(f"Accuracy for label '{label}': {accuracy:.2%}")
