In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.transforms import Compose, ToTensor
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
import numpy as np
from scipy.io import wavfile
import matplotlib.pyplot as plt
import librosa
import os
import sys

from shutil import copyfile
from typing import Union, Optional
from random import sample, seed, choices, shuffle
from math import isclose


In [9]:
main_path = '/Users/jaewone/developer/tensorflow/baby-cry-classification'
data_path = os.path.join(main_path, 'data')

work_path = os.path.join(main_path, 'model', 'coAtNet')
AUDIO_DIR = os.path.join(work_path, 'audios2')
MODEL_PATH = os.path.join(work_path, 'model.pt')

sys.path.append(main_path)

In [2]:
# main_path = '/content/drive/MyDrive/baby_cry'
# data_path = os.path.join(main_path, 'data2')

# AUDIO_DIR = data_path
# MODEL_PATH = os.path.join(main_path, 'model.pt')

In [None]:
def __format_extension(extension: Optional[Union[str, list[str]]]) -> Optional[list[str]]:
    if extension:
        if type(extension) == str:
            extension = [extension]
        extension = [ex.replace('.', '') for ex in extension]
    return extension


def __file_itorator(path: str,
                    include: Optional[list[str]] = None,
                    exclude: Optional[list[str]] = None):
    for (parent_path, dirs, files) in os.walk(path):
        for dir in dirs:
            __file_itorator(os.path.join(path, dir), include)

        for file in files:
            s = file.rsplit('.', 1)
            if len(s) == 1:
                continue
            if include and s[1] not in include:
                continue
            if exclude and s[1] in exclude:
                continue

            yield [parent_path, file]


# Itorate files with including and excluding files from path
def file_itorator(path: str,
                  include: Optional[Union[str, list[str]]] = None,
                  exclude: Optional[Union[str, list[str]]] = None):

    include = __format_extension(include)
    exclude = __format_extension(exclude)

    return __file_itorator(path, include, exclude)


def get_state_file_list(data_path: str,
                        state_list: Optional[list[str]] = None,
                        include: Optional[Union[str, list[str]]] = None,
                        exclude: Optional[Union[str, list[str]]] = None):
    """
    각각의 state에 존재하는 모든 파일들의 경로를 반환한다.

    Parameters:

        * data_path : 파일의 경로

        * state_list=None : state 리스트를 받을 경우 state_list가 포함하는 state 폴더의 파일들만 이름을 변경한다.

    Returns: 파일의 경로 리스트
    """
    if not os.path.exists(data_path):
        raise OSError(f'path {data_path} not exist.')

    state_list = ['sleepy', 'uncomfortable', 'diaper', 'awake', 'sad', 'hug', 'hungry']

    file_list = []
    for state in state_list:
        file_list.extend([os.path.join(path, file) for path, file in file_itorator(
            os.path.join(data_path, state),
            include, exclude
        )])

    return file_list

In [None]:
# from trans_data import extract_state_sample, get_state_file_list

# if not os.path.exists(AUDIO_DIR):
#     extract_state_sample(data_path, AUDIO_DIR, 10, with_dir=True)

In [4]:
torch.cuda.is_available()

False

In [18]:
from trans_data import get_state_file_list

file_list = get_state_file_list(data_path)

train_file_list, val_file_list = train_test_split(file_list, test_size=0.3, stratify=[file.rsplit('/', 2)[1] for file in file_list])
val_file_list, test_file_list = train_test_split(val_file_list, test_size=0.33, stratify=[file.rsplit('/', 2)[1] for file in val_file_list])

print(len(train_file_list))
print(len(val_file_list))
print(len(test_file_list))


with open(os.path.join(work_path, "test_file_list.txt"), "w") as txt_file:
    # txt_file.write(str([file.rsplit('/', 1)[1] for file in test_file_list]))
    txt_file.write(str(test_file_list))

7658
2199
1084


In [11]:
class CoAtNet(nn.Module):
    def __init__(self, num_classes=7):
        super(CoAtNet, self).__init__()

        # Convolutional part
        self.conv_layers = nn.Sequential(
            # in_channels = 1 : Number of channels in the input image
            # out_channels = 32 : Number of channels produced by the convolution
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # Transformer part
        encoder_layer = nn.TransformerEncoderLayer(d_model=32, nhead=8)
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer, num_layers=2)

        # Linear classifier
        self.fc = nn.Linear(32, num_classes)

    def forward(self, x):
        x = self.conv_layers(x)

        # Flattening
        x = x.view(x.size(0), -1, x.size(1))

        # Transformer encoder
        x = self.transformer_encoder(x)

        # Max pooling over time
        x, _ = torch.max(x, dim=1)

        # Classifier
        x = self.fc(x)
        return x


In [12]:
class ToMelSpectrogram:
    def __call__(self, samples):
        # print(samples.shape) # (16000,)
        mel_spectrogram = librosa.feature.melspectrogram(y=samples, sr=16000, n_mels=64, hop_length=225)
        log_mel_spectrogram = librosa.power_to_db(mel_spectrogram)
        return log_mel_spectrogram

class AudioDataset(torch.utils.data.Dataset):
    def __init__(self, file_list, transform=None):
        self.file_list = file_list
        self.transform = transform
        self.class_map = {'sleepy': 0, 'uncomfortable': 1, 'diaper': 2, 'awake': 3, 'sad': 4, 'hug': 5, 'hungry': 6}
        self.label_list = np.array([self.class_mapping(file.rsplit('/', 2)[1]) for file in file_list])

    def class_mapping(self, class_name:str) -> int:
        return self.class_map[class_name]

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        waveform, _ = librosa.load(self.file_list[idx],
                                   sr=None,
                                   duration=2.0,
                                   mono=True)
        label = self.label_list[idx]

        if self.transform:
            waveform = self.transform(waveform)

        return waveform, label



In [None]:
def train():
    # We will use the transformation to convert the audio into Mel spectrogram
    transform = Compose([ToMelSpectrogram(), ToTensor()])

    train_set = AudioDataset(train_file_list, transform=transform)
    val_set = AudioDataset(val_file_list, transform=transform)

    train_loader = DataLoader(dataset=train_set, batch_size=16, shuffle=True)
    val_loader = DataLoader(dataset=val_set, batch_size=16, shuffle=True)

    # Assuming we have this class implemented following the paper or using a library
    model = CoAtNet()
    model = model.cuda()
    optimizer = optim.Adam(model.parameters(), lr=5e-4)
    criterion = nn.CrossEntropyLoss().cuda()

    num_epochs = 500
    top_val_acc = 0.0

    for epoch in range(num_epochs):
        model.train()
        for data in train_loader:
            inputs, labels = data
            inputs, labels = inputs.cuda(), labels.cuda()  # Move both tensors to GPU


            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward and optimize
            loss.backward()
            optimizer.step()

        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item()}")

        # Validation
        if (epoch + 1) % 5 == 0:
            model.eval()
            with torch.no_grad():
                correct = 0
                total = 0
                for inputs, labels in val_loader:
                    inputs = inputs.cuda()
                    labels = labels.cuda()
                    outputs = model(inputs)
                    _, predicted = torch.max(outputs.data, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

                val_acc = correct/total
                print(f"Validation Accuracy: {val_acc}")

        if top_val_acc < val_acc:
            top_val_acc = val_acc
            torch.save(model.state_dict(), MODEL_PATH)

train()

Epoch [1/500], Loss: 1.920064926147461
Epoch [2/500], Loss: 1.3267863988876343
Epoch [3/500], Loss: 1.8584970235824585
Epoch [4/500], Loss: 2.1657590866088867
Epoch [5/500], Loss: 1.7441984415054321
Validation Accuracy: 0.3005911778080946
Epoch [6/500], Loss: 1.6334692239761353
Epoch [7/500], Loss: 1.7668952941894531
Epoch [8/500], Loss: 1.3941328525543213
Epoch [9/500], Loss: 1.3565025329589844
Epoch [10/500], Loss: 2.045431613922119
Validation Accuracy: 0.31832651205093226
Epoch [11/500], Loss: 1.0445263385772705
Epoch [12/500], Loss: 1.7678775787353516
Epoch [13/500], Loss: 1.7707273960113525
Epoch [14/500], Loss: 1.4342319965362549
Epoch [15/500], Loss: 1.1951143741607666
Validation Accuracy: 0.3483401546157344
Epoch [16/500], Loss: 1.7901904582977295
Epoch [17/500], Loss: 1.7093932628631592
Epoch [18/500], Loss: 1.2657653093338013
Epoch [19/500], Loss: 1.573033094406128
Epoch [20/500], Loss: 1.8999027013778687
Validation Accuracy: 0.36880400181900863
Epoch [21/500], Loss: 1.145245

  waveform, _ = librosa.load(self.file_list[idx],
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


FileNotFoundError: ignored

In [20]:
import ast

with open(os.path.join(work_path, 'test_file_list.txt'), 'r', encoding='utf-8') as file:
    content = file.readlines()

test_file_list = ast.literal_eval(content[0])
print(test_file_list[0])
len(test_file_list)

/Users/jaewone/developer/tensorflow/baby-cry-classification/data/diaper/diaper_1340.wav


1084

In [None]:
class PredictDataset(torch.utils.data.Dataset):
    def __init__(self, file_list, transform=None):
        self.file_list = file_list
        self.transform = transform

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        waveform, _ = librosa.load(self.file_list[idx],
                                   sr=None,
                                   duration=2.0,
                                   mono=True)

        if self.transform:
            waveform = self.transform(waveform)

        return waveform


def load_model(path):
    model = CoAtNet()  # should match the architecture of the trained model
    model.load_state_dict(torch.load(path))
    model.eval()
    return model


def predict(test_file_list):
    model = load_model(MODEL_PATH)

    transform = Compose([ToMelSpectrogram(), ToTensor()])
    test_set = PredictDataset(test_file_list, transform=transform)
    test_loader = DataLoader(dataset=test_set, batch_size=1, shuffle=False)

    predictions = []

    for batch in test_loader:
        batch = batch.cuda()
        outputs = model(batch)
        # change if multi-label classification
        _, predicted = torch.max(outputs.data, 1)
        predictions.append(predicted.item())

    return predictions


def main():
    predictions = predict(test_file_list)
    print(predictions)


if __name__ == "__main__":
    main()
