In [5]:
import math
import os
import random

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
from IPython.display import Audio
from torch.nn import init
from torch.utils.data import DataLoader, Dataset, random_split
from torchaudio import transforms

# Cap the number of CPU threads PyTorch uses for intra-op parallelism at 4.
torch.set_num_threads(4)

class AudioUtil():
    """Stateless audio pre-processing helpers.

    Most helpers take and return an "audio" tuple ``(sig, sr)`` where ``sig``
    is a 2-D tensor laid out as ``(channels, samples)`` and ``sr`` is the
    sample rate in Hz.
    """

    # ----------------------------
    # Load an audio file and return the signal tensor with its sample rate
    # ----------------------------
    @staticmethod
    def open(audio_file):
        """Load ``audio_file`` with torchaudio and return ``(sig, sr)``."""
        sig, sr = torchaudio.load(audio_file)
        return (sig, sr)

    # ----------------------------
    # Convert the signal to a common sample rate
    # ----------------------------
    @staticmethod
    def resample(aud, newsr):
        """Return ``aud`` resampled to ``newsr`` Hz.

        The input tuple is returned untouched when it already has the
        requested rate.
        """
        sig, sr = aud

        if sr == newsr:
            # Nothing to do
            return aud

        # Resample operates on the last (time) dimension, so every channel is
        # converted in a single call.
        resig = torchaudio.transforms.Resample(sr, newsr)(sig)
        return (resig, newsr)

    # ----------------------------
    # Pad (or truncate) the signal to a fixed length 'max_ms' in milliseconds
    # ----------------------------
    @staticmethod
    def pad_trunc(aud, max_ms):
        """Return ``aud`` with exactly ``sr * max_ms / 1000`` samples per channel.

        Longer signals are cut at the end. Shorter signals are zero-padded,
        with the padding split at a random point between the beginning and
        the end of the signal.
        """
        sig, sr = aud
        num_rows, sig_len = sig.shape
        # Multiply before dividing: ``sr // 1000`` would truncate 44100 Hz to
        # 44 samples per millisecond (and to 0 for rates below 1 kHz).
        max_len = int(sr * max_ms // 1000)

        if sig_len > max_len:
            # Truncate the signal to the given length
            sig = sig[:, :max_len]

        elif sig_len < max_len:
            # Length of padding to add at the beginning and end of the signal
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len

            # Pad with 0s
            pad_begin = torch.zeros((num_rows, pad_begin_len))
            pad_end = torch.zeros((num_rows, pad_end_len))

            sig = torch.cat((pad_begin, sig, pad_end), 1)

        return (sig, sr)

    # ----------------------------
    # Shift the signal in time by a random amount, wrapping around the end
    # ----------------------------
    @staticmethod
    def time_shift(aud, shift_limit):
        """Circularly shift the signal by up to ``shift_limit`` of its length.

        ``shift_limit`` is a fraction of the signal length; the actual shift
        is drawn uniformly from ``[0, shift_limit * sig_len)``.
        """
        sig, sr = aud
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        # Roll along the time axis only. Without ``dims`` the tensor is
        # flattened first, which moves samples from one channel into another.
        return (sig.roll(shift_amt, dims=-1), sr)

    # ----------------------------
    # Generate a Spectrogram
    # ----------------------------
    @staticmethod
    def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
        """Return the Mel spectrogram of ``aud`` in decibels.

        ``hop_len=None`` is passed through so torchaudio applies its own
        default hop length.
        """
        sig, sr = aud
        top_db = 80

        # spec has shape [channel, n_mels, time], where channel is mono, stereo etc
        spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)

        # Convert to decibels
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        return spec

    # ----------------------------
    # Augment the Spectrogram by masking out some sections of it in both the frequency
    # dimension (ie. horizontal bars) and the time dimension (vertical bars) to prevent
    # overfitting and to help the model generalise better. The masked sections are
    # replaced with the mean value.
    # ----------------------------
    @staticmethod
    def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
        """Apply frequency and time masking to a ``[channel, n_mels, time]`` spectrogram.

        Each mask covers at most ``max_mask_pct`` of the corresponding axis and
        is filled with the mean of ``spec``.
        """
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec

        freq_mask_param = max_mask_pct * n_mels
        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

        return aug_spec


# Module-level alias so code that calls the bare function name keeps working.
spectro_augment = AudioUtil.spectro_augment

In [6]:


# ----------------------------
# Sound Dataset
# ----------------------------
class SoundDS(Dataset):
    """Dataset that turns rows of a file/label table into Mel spectrograms.

    Args:
        df: DataFrame with a ``classID`` column and a file-path column named
            either ``relative_path`` or ``slice_file_name``.
        data_path: directory prefix that is string-concatenated with the
            per-row path, so it must end with a path separator.

    Raises:
        KeyError: if ``df`` has none of the supported file-path columns.
    """

    # Accepted names for the file-path column, in order of preference.
    PATH_COLUMNS = ('relative_path', 'slice_file_name')

    def __init__(self, df, data_path):
        self.df = df
        self.data_path = str(data_path)
        self.duration = 4000      # clip length in milliseconds
        self.sr = 44100           # target sample rate in Hz
        self.channel = 2          # NOTE(review): not used by __getitem__
        self.shift_pct = 0.4      # maximum time shift as a fraction of the clip

        self.path_col = next((col for col in self.PATH_COLUMNS if col in df.columns), None)
        if self.path_col is None:
            raise KeyError(f"df must contain one of the columns {self.PATH_COLUMNS}")

    # Number of items in the dataset
    def __len__(self):
        return len(self.df)

    # ----------------------------
    # Get i'th item in dataset
    # ----------------------------
    def __getitem__(self, idx):
        """Return ``(spectrogram, class_id)`` for the row at position ``idx``."""
        # Samplers hand out positions in ``range(len(self))``, so index by
        # position rather than by DataFrame label.
        audio_file = self.data_path + self.df[self.path_col].iloc[idx]
        # Class label of this clip
        class_id = self.df['classID'].iloc[idx]

        aud = AudioUtil.open(audio_file)
        reaud = AudioUtil.resample(aud, self.sr)

        dur_aud = AudioUtil.pad_trunc(reaud, self.duration)

        # Augmentation: random circular time shift of up to ``shift_pct``
        shi_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)

        sgram = AudioUtil.spectro_gram(shi_aud, n_mels=64, n_fft=1024, hop_len=None)

        return sgram, class_id

In [7]:
# Root of the training data; each entry inside it is treated as one class folder.
mp = './hackaton_ds/train'

In [8]:
# Show what the training directory contains, one entry per line
for entry in os.listdir(mp):
    print(entry)

down
go
left
no
off
on
right
stop
up
yes


In [18]:
audio_path='./hackaton_ds/train/'

# One class per sub-folder. Sorting makes the folder -> classID mapping
# independent of the order in which the filesystem lists entries, and
# non-directory entries (stray files) are skipped.
class_folders = sorted(
    name for name in os.listdir(audio_path)
    if os.path.isdir(os.path.join(audio_path, name))
)

# 'relative_path' is the column SoundDS reads; 'slice_file_name' carries the
# same value and is kept for code that already uses it.
data = {'slice_file_name': [], 'relative_path': [], 'classID': [], 'fold':[]}
for ix, i in enumerate(class_folders):
    audio_path_i = os.path.join(audio_path, i)
    for file_name in sorted(os.listdir(audio_path_i)):
        relative_path = i+'/'+file_name
        data['slice_file_name'].append(relative_path)
        data['relative_path'].append(relative_path)
        data['classID'].append(ix)
        data['fold'].append(i)

df=pd.DataFrame(data)

In [19]:
# Persist the file/label table next to the notebook. With pandas' defaults the
# DataFrame index is written as the first column.
df.to_csv("dataset.csv")

In [17]:
# Set your own dataset root here
data_path = audio_path
myds = SoundDS(df, data_path)

# Random split of 80:20 between training and validation
num_items = len(myds)
num_train = round(num_items * 0.8)
num_val = num_items - num_train
# Seed the split so that the same clips land in train/validation on every run.
SPLIT_SEED = 42
train_ds, val_ds = random_split(
    myds,
    [num_train, num_val],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Create training and validation data loaders
train_dl = DataLoader(train_ds, batch_size=16, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=16, shuffle=False)

In [18]:
# Combining all the pieces together
class PytorchBasedGenericGradientBoost():
    """Gradient boosting driven by PyTorch-computed gradients.

    The class relies on helpers that are not defined in this notebook and must
    be in scope before it is used: ``LossFunctionMinimizer``,
    ``ResidualsCalculator``, ``fit_regression_tree_classifier_to_residuals``,
    ``FloatTensor`` and the ``USE_CUDA`` flag.
    """

    def __init__(self, type, n_trees, max_depth, GRADIENT_BOOST_LEARNING_RATE = 0.1, MINIMIZER_LEARNING_RATE = 0.001, MINIMIZER_TRAINING_EPOCHS = 5000):
        '''
        type : "regressor" or "classifier"
        n_trees : number of boosting rounds (one tree per round)
        max_depth : depth limit forwarded to the tree fitter
        GRADIENT_BOOST_LEARNING_RATE : shrinkage applied to every leaf value
        MINIMIZER_LEARNING_RATE : Adam learning rate of the loss minimizer
        MINIMIZER_TRAINING_EPOCHS : optimizer steps per minimization
        '''
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.type = type
        self.gradient_boost_learning_rate = GRADIENT_BOOST_LEARNING_RATE
        self.minimizer_learning_rate = MINIMIZER_LEARNING_RATE
        self.minimizer_training_epochs = MINIMIZER_TRAINING_EPOCHS
        # Variables to hold output of algorithm
        self.initial_prediction = None
        self.regression_trees = []
        # Get an instance of a minimizer
        self.minimizer = LossFunctionMinimizer(self.type)
        if USE_CUDA:
            self.minimizer.cuda()
        self.minimizer_optimizer = torch.optim.Adam(self.minimizer.parameters(), lr=self.minimizer_learning_rate)

    def minimize_loss_function(self, targets, previous_predictions):
        """Run the minimizer and return the first element of its first parameter."""
        self.minimizer.reinitialize_variable()
        # The targets do not change between epochs, so convert them once.
        targets_leaf_tensor = FloatTensor(targets)
        for training_epoch in range(self.minimizer_training_epochs):
            # NOTE(review): ``loss_classifier`` is used for every ``type``,
            # including "regressor" - confirm this is intended.
            loss = self.minimizer.loss_classifier(previous_predictions, targets_leaf_tensor)
            self.minimizer.zero_grad()
            loss.backward()
            self.minimizer_optimizer.step()
        return [el for el in self.minimizer.parameters()][0].cpu().detach().numpy()[0]

    def compute_residuals(self, targets, predicted_values):
        """Return the gradient of the loss with respect to ``predicted_values``."""
        model = ResidualsCalculator(predicted_values, self.type)
        if USE_CUDA:
            model.cuda()
        loss = model.loss(targets)
        model.zero_grad()
        loss.backward()
        # NOTE(review): this is the raw gradient; whether its sign matches the
        # usual "negative gradient" pseudo-residual depends on
        # ResidualsCalculator.loss - confirm.
        residuals = model.predicted_values.grad.clone() # deep copy of the input/gradients
        return residuals

    def fit(self, X, y):
        """Fit ``n_trees`` boosting rounds on ``X`` / ``y``.

        Presumably ``X`` and ``y`` are numpy arrays (they are copied with
        ``.copy()`` and indexed with boolean masks) - TODO confirm.
        """
        X_values = X.copy()
        y_values = y.copy()
        # Start from a clean slate so that a second fit() does not keep the
        # trees of the previous one.
        self.regression_trees = []
        # Initialization phase
        initial_values = torch.zeros(y_values.shape)
        if USE_CUDA:
            # Same shape as on the CPU path; torch.zeros(shape_tuple, 1) is
            # not a valid call.
            initial_values = initial_values.cuda()
        self.initial_prediction = self.minimize_loss_function(y_values, initial_values)
        prediction_values = np.ones(y_values.shape) * self.initial_prediction

        for classifier_index in range(self.n_trees):
            self.regression_trees.append({"tree_index": classifier_index})
            residuals = self.compute_residuals(FloatTensor(y_values), FloatTensor(prediction_values))
            leaf_buckets, unique_clusters, tree_regressor = fit_regression_tree_classifier_to_residuals(X_values, residuals.cpu(), self.max_depth)
            self.regression_trees[-1]["tree_regressor"] = tree_regressor

            X_values_temp = np.array([])
            y_values_temp = np.array([])
            prediction_values_temp = np.array([])

            for unique_cluster in unique_clusters:
                # Boolean mask of the samples that fall into this leaf,
                # computed once and reused for X, y and the predictions.
                leaf_mask = np.array([1 if el == unique_cluster else 0 for el in leaf_buckets]) == 1
                y_leaf = y_values[leaf_mask]
                X_leaf = X_values[leaf_mask]
                predictions_leaf = prediction_values[leaf_mask]
                prediction_for_leaf = self.minimize_loss_function(FloatTensor(np.array(y_leaf)), FloatTensor(predictions_leaf))
                predictions_for_leaf_array = np.ones(y_leaf.shape) * self.gradient_boost_learning_rate * prediction_for_leaf + predictions_leaf
                # The leaf value is stored under the string form of the leaf id,
                # which is how predict() looks it up.
                self.regression_trees[-1][str(unique_cluster)] = prediction_for_leaf
                X_values_temp = X_leaf if X_values_temp.shape == (0, ) else np.append(X_values_temp, X_leaf, axis=0)
                y_values_temp = np.append(y_values_temp, y_leaf)
                prediction_values_temp = np.append(prediction_values_temp, predictions_for_leaf_array)
            # Samples are now grouped leaf by leaf; X, y and the running
            # predictions are reordered consistently.
            y_values = y_values_temp
            X_values = X_values_temp
            prediction_values = prediction_values_temp

    def predict(self, X):
        """Predict one value per row of ``X``.

        Returns a list of raw scores for "regressor" and sigmoid
        probabilities (numpy array) for "classifier".
        """
        predictions = []
        for index in range(X.shape[0]):
            prediction = self.initial_prediction
            for tree_index in range(self.n_trees):
                tree = self.regression_trees[tree_index]
                leaf_key = str(tuple(tree["tree_regressor"].decision_path(X[index, :].reshape(1,-1)).todok().keys()))
                # Build a new value instead of using ``+=``: if the initial
                # prediction is an ndarray, ``+=`` would modify it in place and
                # corrupt every later prediction.
                prediction = prediction + self.gradient_boost_learning_rate * tree[leaf_key]
            predictions.append(prediction)
        if self.type == "regressor":
            return predictions
        elif self.type == "classifier":
            return torch.sigmoid(torch.tensor(predictions)).numpy()
        else:
            raise Exception("Not supported")

In [19]:


# ----------------------------
# Audio Classification Model
# ----------------------------
class AudioClassifier (nn.Module):
    """Small CNN that classifies spectrograms.

    Three stride-2 convolution blocks (Conv -> ReLU -> BatchNorm) are followed
    by global average pooling and a linear layer.

    Args:
        in_channels: number of spectrogram channels fed to the first
            convolution (defaults to 1, i.e. mono audio).
        num_classes: number of output logits (defaults to 10).
    """

    # ----------------------------
    # Build the model architecture
    # ----------------------------
    def __init__(self, in_channels=1, num_classes=10):
        super().__init__()

        # The attribute names (conv1/conv3/conv4, ...) are part of the
        # state_dict keys and are therefore kept as they are.

        # First Convolution Block with Relu and Batch Norm. Use Kaiming Initialization
        self.conv1, self.relu1, self.bn1 = self._conv_block(in_channels, 16, kernel_size=5, padding=2)

        # Second Convolution Block
        self.conv3, self.relu3, self.bn3 = self._conv_block(16, 32, kernel_size=3, padding=1)

        # Third Convolution Block
        self.conv4, self.relu4, self.bn4 = self._conv_block(32, 64, kernel_size=3, padding=1)

        # Linear Classifier
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=64, out_features=num_classes)

        # Wrap the Convolutional Blocks
        self.conv = nn.Sequential(
            self.conv1, self.relu1, self.bn1,
            self.conv3, self.relu3, self.bn3,
            self.conv4, self.relu4, self.bn4,
        )

    @staticmethod
    def _conv_block(in_ch, out_ch, kernel_size, padding):
        """Create one stride-2 Conv/ReLU/BatchNorm triple with Kaiming-initialised weights."""
        conv = nn.Conv2d(in_ch, out_ch, kernel_size=(kernel_size, kernel_size),
                         stride=(2, 2), padding=(padding, padding))
        relu = nn.ReLU()
        bn = nn.BatchNorm2d(out_ch)
        init.kaiming_normal_(conv.weight, a=0.1)
        conv.bias.data.zero_()
        return conv, relu, bn

    # ----------------------------
    # Forward pass computations
    # ----------------------------
    def forward(self, x):
        """Map a ``(batch, in_channels, n_mels, time)`` batch to ``(batch, num_classes)`` logits."""
        # Run the convolutional blocks
        x = self.conv(x)

        # Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        # Linear layer
        x = self.lin(x)

        # Final output
        return x

# Pick the GPU when one is visible, otherwise stay on the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Build the network and move it to the chosen device in one step
myModel = AudioClassifier().to(device)
# Cell output: the device on which the model weights actually live
next(myModel.parameters()).device

device(type='cpu')

In [None]:
# ----------------------------
# Training
# ----------------------------
def training(model, train_dl, num_epochs):
    """Train ``model`` on ``train_dl`` for ``num_epochs`` epochs.

    Uses cross-entropy loss, Adam and a linear one-cycle learning-rate
    schedule, and prints the average loss and accuracy after every epoch.

    Args:
        model: network mapping a batch of inputs to class logits.
        train_dl: sized iterable of ``(inputs, labels)`` batches.
        num_epochs: number of passes over ``train_dl``.
    """
    # Run on whichever device the model already lives on.
    device = next(model.parameters()).device
    # Make sure layers such as BatchNorm are in training mode.
    model.train()

    # Loss Function, Optimizer and Scheduler
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                    steps_per_epoch=int(len(train_dl)),
                                                    epochs=num_epochs,
                                                    anneal_strategy='linear')

    # Repeat for each epoch
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_prediction = 0
        total_prediction = 0

        # Repeat for each batch in the training set
        for i, data in enumerate(train_dl):
            # Get the input features and target labels, and put them on the model's device
            inputs, labels = data[0].to(device), data[1].to(device)

            # Normalize the inputs with the statistics of the current batch
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s

            # Zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # The one-cycle schedule advances once per batch, not per epoch
            scheduler.step()

            # Keep stats for Loss and Accuracy
            running_loss += loss.item()

            # Get the predicted class with the highest score
            _, prediction = torch.max(outputs, 1)
            # Count of predictions that matched the target label
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]

        # Print stats at the end of the epoch
        num_batches = len(train_dl)
        avg_loss = running_loss / num_batches
        acc = correct_prediction/total_prediction
        print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

    # Reported once, after the last epoch
    print('Finished Training')
  
# Two epochs keep the demo quick; raise this for a real training run.
num_epochs = 2
training(model=myModel, train_dl=train_dl, num_epochs=num_epochs)

In [None]:
# torchaudio.load('C:/Users/skienbear/Desktop/Audio Recognizer/Training_Data/human/human_07839.wav')

In [None]:
from sklearn.metrics import confusion_matrix
# ----------------------------
# Inference
# ----------------------------
def inference (model, val_dl):
    """Evaluate ``model`` on ``val_dl`` and print a confusion matrix and the accuracy.

    Args:
        model: trained network mapping a batch of inputs to class logits.
        val_dl: iterable of ``(inputs, labels)`` batches.

    Raises:
        ValueError: if ``val_dl`` yields no items.
    """
    # Run on whichever device the model already lives on.
    device = next(model.parameters()).device

    correct_prediction = 0
    total_prediction = 0
    # Labels and predictions of every batch, so that the confusion matrix
    # covers the whole set and not only the last batch.
    all_labels = []
    all_predictions = []

    # Evaluate with BatchNorm in inference mode and restore the previous mode afterwards.
    was_training = model.training
    model.eval()
    try:
        # Disable gradient updates
        with torch.no_grad():
            for data in val_dl:
                # Get the input features and target labels, and put them on the model's device
                inputs, labels = data[0].to(device), data[1].to(device)

                # Normalize the inputs with the statistics of the current batch
                inputs_m, inputs_s = inputs.mean(), inputs.std()
                inputs = (inputs - inputs_m) / inputs_s

                # Get predictions
                outputs = model(inputs)

                # Get the predicted class with the highest score
                _, prediction = torch.max(outputs, 1)

                # Count of predictions that matched the target label
                correct_prediction += (prediction == labels).sum().item()
                total_prediction += prediction.shape[0]

                # Keep CPU copies for the confusion matrix
                all_labels.append(labels.cpu())
                all_predictions.append(prediction.cpu())
    finally:
        model.train(was_training)

    if total_prediction == 0:
        raise ValueError('val_dl did not yield any items')

    acc = correct_prediction/total_prediction
    res = confusion_matrix(torch.cat(all_labels).numpy(), torch.cat(all_predictions).numpy())
    print(res)

    print(f'Accuracy: {acc:.2f}, Total items: {total_prediction}')
# Run inference on the trained model with the validation set
inference(myModel, val_dl)

In [None]:
# Target directory for the trained weights (trailing slash required: the file
# name is appended by string concatenation).
PATH="./Desktop/Audio Recognizer/Model/"
# torch.save does not create missing directories, so make sure the target exists.
os.makedirs(PATH, exist_ok=True)
# Only the weights (state_dict) are stored, not the model object itself.
torch.save(myModel.state_dict(), PATH+'model')

In [None]:
import torch.nn.functional as nnf
def inference (model, val_dl):
    """Print the per-class softmax probabilities of ``model`` for every batch of ``val_dl``.

    Labels in the batches are ignored. Note that this definition replaces any
    earlier function called ``inference`` in the notebook.

    Args:
        model: trained network mapping a batch of inputs to class logits.
        val_dl: iterable of batches whose first element is the input tensor.
    """
    # Run on whichever device the model already lives on.
    device = next(model.parameters()).device

    # Predict with BatchNorm in inference mode and restore the previous mode afterwards.
    was_training = model.training
    model.eval()
    try:
        # Disable gradient updates
        with torch.no_grad():
            for data in val_dl:
                # Only the input features are needed here
                inputs = data[0].to(device)

                # Normalize the inputs with the statistics of the current batch
                inputs_m, inputs_s = inputs.mean(), inputs.std()
                inputs = (inputs - inputs_m) / inputs_s

                # Turn the logits into class probabilities
                outputs = model(inputs)
                prob = torch.softmax(outputs, dim=1)
                print(prob)
    finally:
        model.train(was_training)
            
# Directory holding the unlabeled test clips. The trailing slash is required
# because SoundDS builds file names by string concatenation. Both names refer
# to the same location, so the path literal is written only once.
audio_path3="C:/Users/skienbear/Desktop/Audio Recognizer/Testing_Data/"
data_path_test = audio_path3

# Build the file table. The true labels are unknown, so classID is a dummy 0.
# Sorting fixes the file order, so printed predictions can be matched to
# df1['relative_path'] row by row.
data1 = {'relative_path': [], 'classID': []}
for file_name in sorted(os.listdir(audio_path3)):
    data1['relative_path'].append(file_name)
    data1['classID'].append(0)
df1=pd.DataFrame(data1)

myds_test = SoundDS(df1, data_path_test)

# Placeholder for collected predictions; nothing fills it yet.
result = {'relative_path': [], 'classID': []}

# shuffle=False keeps the batches in the same order as df1
test_dl = torch.utils.data.DataLoader(myds_test, batch_size=16, shuffle=False)
inference(myModel, test_dl)