In [1]:
# import packages
import sklearn
from sklearn import preprocessing
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
import os
# from typing import List
# from numpy.typing import list
import matplotlib.pyplot as plt
import torchaudio
# import audiotools

from pathlib import PurePath
import pickle
# from pydub import AudioSegment

In [None]:
# flac to wav test
# origindir=DATASET_AUDIO_PATH
# convertdir=root_path+"train_wav"
# originpath=DATASET_AUDIO_PATH+"/spk001/spk001_002.flac"
# convertpath=root_path+"train_wav/"+originpath.split('/')[-2]+"/"+originpath.split('/')[-1].split('.')[0]+".wav"

# print(originpath)
# print("/".join(convertpath.split('/')[0:-1]))
# if os.path.exists("/".join(convertpath.split('/')[0:-1])) is False:
#     os.makedirs("/".join(convertpath.split('/')[0:-1]))
# os.system('ffmpeg -i %s %s' % (originpath,convertpath))

# test load flac file
# torchaudio.load(DATASET_AUDIO_PATH+"spk001/spk001_002.wav")

In [2]:
# parameters
VALID_SPLIT = 0.1
SHUFFLE_SEED = 43
SAMPLING_RATE = 16000
SCALE = 0.5
BATCH_SIZE = 4
EPOCHS = 100

LENGTH = 8*16000

AUDIO_IDX=0

In [None]:
# generate dataset
# root_path="/mnt/f/workspace/AAI_project/LibriSpeech-SI/";
# train="train_wav";
# noise="noise_wav";
# test="test_wav";
# test_noise="test_noise_wav";

# DATASET_AUDIO_PATH = os.path.join(root_path,train)
# DATASET_NOISE_PATH = os.path.join(root_path,noise)

# # get all audio data path and label
# # get class_names firts, then add audio data to data_X, class_names to label_Y
# class_names = os.listdir(DATASET_AUDIO_PATH)
# X=[];Y=[];
# def listdir_addXY(path, labels, X, Y):
#     for label in labels:
#         for file in os.listdir(path+'/'+label):
#             file_path = os.path.join(path+'/'+label, file)
#             if os.path.isdir(file_path):
#                 listdir_addXY(file_path, X, Y)
#             else:
#                 X.append(file_path)
#                 Y.append(label)
# listdir_addXY(DATASET_AUDIO_PATH,class_names,X,Y)
# Y = torch.as_tensor(preprocessing.LabelEncoder().fit_transform(Y))
# print(class_names)

In [3]:
# load processed X,Y
# with open("X.pkl",mode="wb") as f:
#     pickle.dump(X, f)
# with open("Y.pkl",mode="wb") as f:
#     pickle.dump(Y, f)

if os.path.exists('X.pkl'):
    X=pickle.load(open('X.pkl','rb'))

if os.path.exists('Y.pkl'):
    Y=pickle.load(open('Y.pkl','rb'))

In [4]:
def fixed_length(audio):
    if audio.size()[0] >= LENGTH:
        return audio[0:LENGTH]
    else:
        audio = torch.cat((audio,audio),0)
        return fixed_length(audio) 

# def paths_and_labels_to_dataset(audio_paths, labels):
#     """Constructs a dataset of audios and labels."""

def path_to_audio(path):
    """Reads and decodes an audio file. data size should have same length"""
    audio,sample_rate=torchaudio.load(path)
    if sample_rate != SAMPLING_RATE:
        print("error samplerate")
    else:
        audio = audio.squeeze()
        return fixed_length(audio)
# audio=fixed_length(path_to_audio(X[0]))

# def add_noise(audio, noises=None, scale=0.5):
#     """add noise to data"""

def audio_to_fft(audio):
    """do fft"""
    # Since tf.signal.fft applies FFT on the innermost dimension,
    # we need to squeeze the dimensions and then expand them again
    # after FFT
    # print(audio.size())
    fft = torch.fft.fft(audio)[0:len(audio)//2]
    # plt.plot(fft)
    global AUDIO_IDX
    AUDIO_IDX=AUDIO_IDX+1
    return torch.abs(fft)

class dataset(Dataset):
    def __init__(self, audio_paths, labels):
        self.audio_paths = audio_paths
        self.labels = labels

    def __getitem__(self,idx):
        audio_path = self.audio_paths[idx]
        audio = path_to_audio(audio_path)
        fft = audio_to_fft(audio)
        fft=fft.unsqueeze(dim=-2)
        label = torch.tensor(self.labels[idx]) 
        return fft,label

    def __len__(self):
        return len(self.labels)

In [5]:
# model definition
# use residential block

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# residential block  todo group, conv use groups
class Bottleneck(nn.Module):
    # output channels = expansion * input channels
    expansion = 4
    def __init__(self, in_channel, out_channel, stride=1, downsample=None, conv_nums=3, down_sample=None):
        super(Bottleneck, self).__init__()
        self.expansion = 4
        self.conv1 = nn.Conv1d(in_channels=in_channel, out_channels=out_channel, kernel_size=1, stride=1)
        self.relu1 = nn.ReLU(inplace=True)

        self.conv2 = nn.Conv1d(in_channels=out_channel, out_channels=out_channel, kernel_size=3, stride=stride, padding=1)
        self.relu2 = nn.ReLU(inplace=True)

        self.conv3 = nn.Conv1d(in_channels=out_channel, out_channels=out_channel*self.expansion, kernel_size=3, stride=1, padding=1)
        self.relu3 = nn.ReLU(inplace=True)

        self.downsample = downsample
    
    def forward(self,x):
        identity = x
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.relu1(out)

        out = self.conv2(out)
        out = self.relu2(out)
        
        out = self.conv3(out)

        out += identity
        out = self.relu3(out)

        return out


# whole model for acoustic signal feature
class ResNet(nn.Module):
    def __init__(self, block=Bottleneck, blocks_num=[3, 4, 6, 3, 3], num_classes=250, include_top=True):
        super(ResNet, self).__init__()
        self.include_top = include_top
        self.in_channel = 32

        self.conv1 = nn.Conv1d(1,self.in_channel, kernel_size=1,padding=0)
        self.relu1 = nn.ReLU(inplace=True)
        self.maxpool1 = nn.MaxPool1d(kernel_size=3,stride=2,padding=1) # stride=2 padding=1 size/2
        self.layer1 = self._make_layer(block, 32, block_num=blocks_num[0], stride=2)
        self.layer2 = self._make_layer(block, 64, block_num=blocks_num[1], stride=2)
        self.layer3 = self._make_layer(block, 128, block_num=blocks_num[2], stride=2)
        self.layer4 = self._make_layer(block, 256, block_num=blocks_num[3], stride=2)
        self.layer5 = self._make_layer(block, 512, block_num=blocks_num[4], stride=2)

        if self.include_top:
            self.avgpool = nn.AdaptiveAvgPool1d(1)
            self.fc = nn.Linear(in_features=512*block.expansion,out_features=num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv1d): # normal distribution initialize the weights
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

        

    def _make_layer(self, block, channel, block_num=3, stride=1):
        downsample = None
        if stride != 1 or self.in_channel != channel * block.expansion:
            downsample = nn.Sequential(
                nn.Conv1d(self.in_channel, channel * block.expansion, kernel_size=1, stride=stride, padding=0)
                # nn.BatchNorm1d(channel * block.expansion)
            )

        layers = []
        layers.append(
            block(
                self.in_channel,
                channel,
                downsample=downsample,
                stride=stride,
            )
        )
        self.in_channel = channel * block.expansion

        for _ in range(1, block_num):
            layers.append(
                block(
                    self.in_channel,
                    channel
                )
            )
        
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)

        if self.include_top:
            x = self.avgpool(x)
            x = torch.flatten(x, 1)
            x = self.fc(x)

        return x


model = ResNet().to(device)
print(model)

Using cuda device
ResNet(
  (conv1): Conv1d(1, 32, kernel_size=(1,), stride=(1,))
  (relu1): ReLU(inplace=True)
  (maxpool1): MaxPool1d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv1d(32, 32, kernel_size=(1,), stride=(1,))
      (relu1): ReLU(inplace=True)
      (conv2): Conv1d(32, 32, kernel_size=(3,), stride=(2,), padding=(1,))
      (relu2): ReLU(inplace=True)
      (conv3): Conv1d(32, 128, kernel_size=(3,), stride=(1,), padding=(1,))
      (relu3): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv1d(32, 128, kernel_size=(1,), stride=(2,))
      )
    )
    (1): Bottleneck(
      (conv1): Conv1d(128, 32, kernel_size=(1,), stride=(1,))
      (relu1): ReLU(inplace=True)
      (conv2): Conv1d(32, 32, kernel_size=(3,), stride=(1,), padding=(1,))
      (relu2): ReLU(inplace=True)
      (conv3): Conv1d(32, 128, kernel_size=(3,), stride=(1,), padding=(1,))
      (relu3): ReLU(inplace=True

In [28]:
# train
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # print(batch)
        # try:
        #     X, y = X.to(device), y.to(device)
        # except:
        #     print(batch,X,y)
        X, y = X.to(device), y.to(device)
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]",flush=True)


# test
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            # try:
            #     X, y = X.to(device), y.to(device)
            # except:
            #     print(X,y)
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")


In [22]:
# loss function and optimizer

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

data_set = dataset(X,Y)
data_sizes=len(data_set)
train_len=int(data_sizes*0.8)
test_len=data_sizes-train_len
train_dataset,test_dataset=torch.utils.data.random_split(data_set,[train_len,test_len],generator=torch.Generator().manual_seed(42))
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

# for X, y in test_dataloader:
#     print(f"Shape of X [N, C, H, W]: {X.shape}")
#     print(f"Shape of y: {y.shape} {y.dtype}")
#     break


Shape of X [N, C, H, W]: torch.Size([4, 1, 64000])
Shape of y: torch.Size([4]) torch.int64


  label = torch.tensor(self.labels[idx])


In [29]:
epochs = 2
for t in range(epochs):
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)

  label = torch.tensor(self.labels[idx])


Test Error: 
 Accuracy: 0.8%, Avg loss: 5.340520 



In [None]:
# print(torch.cuda.is_available())

# print(torch.__version__,torchaudio.__version__)

# torch.cuda.device_count()
# torch.cuda.get_device_name(0)

# m = nn.Conv1d(1, 16, 3, stride=1, padding=1)
# pool = nn.MaxPool1d(3,2,1)
# avgpool = nn.AdaptiveAvgPool1d(1)
# input = torch.randn(1, 16, 50)
# output = avgpool(input)
# output = torch.flatten(output, 1)
# output.size()

In [None]:
# path=["path"]
# for i in path:
#     print(i)

# audio.shape[0]
# # audio = audio[1:10*16000]
# # a= audio.squeeze()

# fft = audio_to_fft(audio)

# train_data = dataset(X,Y)
# data = train_set.__getitem__(1)

# plt.plot(data[0])
# audio=path_to_audio(X[2])
# fft = audio_to_fft(audio)
# fft=fft.unsqueeze(dim=-2)

# audio,sample_rate=torchaudio.load(X[2])
# audio = audio.squeeze()
# # audio=fixed_length(audio)
# audio = torch.cat((audio,audio),0)
# audio = audio[0:LENGTH]

