## Импорт

In [None]:
%matplotlib inline
from collections import defaultdict
import time

import timit_utils as tu
import timit_utils.audio_utils as au
import timit_utils.drawing_utils as du
#import os
import librosa
import librosa.display

import numpy as np
import pandas as pd

from tqdm import tqdm

import torch
import torch.nn as nn
from torch.optim import Adam
import torch.nn.functional as F
from torchsummary import summary

import matplotlib.pyplot as plt

from sklearn.metrics import accuracy_score, precision_score, recall_score

import IPython

## Загрузка

In [None]:
# Pick the compute device: the chosen CUDA card when available, otherwise the CPU.
device_num = 0
if torch.cuda.is_available():
    device = f"cuda:{device_num}"
else:
    device = "cpu"
device

In [None]:
# Show the name of CUDA device 0; on CPU-only machines show a placeholder
# instead of raising, matching the CPU fallback used when choosing `device`.
torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CUDA is not available"

In [None]:
# Show the properties of CUDA device 0; evaluates to None (no output) on
# CPU-only machines instead of raising.
torch.cuda.get_device_properties(0) if torch.cuda.is_available() else None

In [None]:
# path to the root folder of the TIMIT corpus
_TIMIT_PATH = 'data/lisa/data/timit/raw/TIMIT'
# in speech processing, the recommended value is 512, corresponding to 23 milliseconds at a sample rate of 22050 Hz
# (the same 23 ms at 16000 Hz would be n_fft = 372)
n_fft = 512  # 512 samples at 16000 Hz = 32 ms
sr = 16000

In [None]:
# load one example recording (resampled to `sr`, mono) and show its length in samples
data, sr = librosa.load('data/lisa/data/timit/raw/TIMIT/TRAIN/DR1/MCPM0/SA2.WAV', sr=sr, mono=True)
data.shape[0]

In [None]:
# audio player widget for the loaded example
IPython.display.Audio(data, rate=sr)

In [None]:
# plot the raw waveform of the example
plt.figure(figsize=(15,5))
plt.plot(data)
plt.grid()

## Ручная проверка

#### Преобразование Фурье и Mel-спектрограмма

In [None]:
# short-time Fourier transform with a Blackman window and a hop of a quarter window
data_stft = librosa.stft(y=data, n_fft=n_fft, window='blackman', hop_length=n_fft//4)
# keep magnitudes only and draw the spectrogram in dB
data_stft = np.abs(data_stft)
fig, ax = plt.subplots(figsize=(20,5))
# specshow must be given the hop actually used by the STFT, otherwise it
# falls back to its own default and the time axis is scaled incorrectly
img = librosa.display.specshow(librosa.amplitude_to_db(data_stft, ref=np.max),
                               x_axis='s', y_axis='log', sr=sr,
                               hop_length=n_fft//4, ax=ax)
ax.set_title('Power spectrogram')
fig.colorbar(img, ax=ax, format="%+2.0f dB")
data_stft.shape, type(data_stft[0][0]), data_stft[0].shape

In [None]:
# mel spectrogram of the example recording
# NOTE(review): the STFT below is computed with librosa's default window size,
# not with `n_fft`, although `n_fft` is then passed to melspectrogram —
# confirm which window size is intended.
D = np.abs(librosa.stft(data))**2
S = librosa.feature.melspectrogram(S=D, sr=sr, n_fft=n_fft)
fig, ax = plt.subplots(figsize=(20,5))
# convert power to dB relative to the maximum value
S_dB = librosa.power_to_db(S, ref=np.max)
img = librosa.display.specshow(S_dB, x_axis='time', y_axis='mel', sr=sr, ax=ax)
ax.set_title('Mel-frequency spectrogram')
fig.colorbar(img, ax=ax, format="%+2.0f dB")

#### Работа с корпусом TIMIT

In [None]:
# read the TIMIT corpus through timit_utils
corpus = tu.Corpus(_TIMIT_PATH)
# take the first person of the first region of the train part
p = corpus.train.region_by_index(0).person_by_index(0)
p.name, p.gender

In [None]:
# each person recorded several sentences; count them for the first person
len(corpus.train.region_by_index(0).person_by_index(0).sentences)

In [None]:
# first five speaker names of the train part
corpus.train.people_names[:5]

In [None]:
# list the sentences available for this person
p.sentences

In [None]:
# read the first sentence: raw samples, its sample rate and the word table
data = p.sentence_by_index(0).raw_audio 
sr = p.sentence_by_index(0).sample_rate
words = p.sentence_by_index(0).words_df

In [None]:
# visualisation example based on the timit-utils project from GitHub
gained_padded_audio = au.audio_gained(au.audio_zero_padded(512, data, 8000), 1.0)
audio_features = au.audio_features(gained_padded_audio, sr)
sampled_audio = au.resampled_audio(data, sample_rate = sr, pad = 8000, to_sample_rate = 16000)
print(gained_padded_audio.shape, sampled_audio.shape, audio_features.shape)
# only the waveform and word panels are drawn; the other panels are disabled
du.DrawVerticalPanels([du.AudioPanel(data, show_x_axis=True), 
                       du.WordsPanel(words, data.shape[0], show_x_axis=True),
                       #du.PhonesPanel(s0.phones_df, s0.raw_audio.shape[0]),
                       #du.AudioPanel(sampled_audio, show_x_axis=True),
                       ##du.WordsPanel(sentence_words_input, sampled_audio.shape[0], show_x_axis=True),
                       ##du.PhonesPanel(sentence_phones_input, sampled_audio.shape[0]),
                       #du.SignalsPanel(audio_features)
                      ])

## Подготовка данных

#### Объявление функций

In [None]:
def get_mel(data, sr=16000, n_fft=1024, sec=2.04, draw=False, fill_zero=False):
    """Return the dB-scaled mel spectrogram of the first ``sec`` seconds of ``data``.

    Args:
        data: 1-D array of audio samples.
        sr: sample rate in Hz.
        n_fft: STFT window size; the hop length is ``n_fft // 4``.
        sec: length in seconds of the excerpt that is analysed.
        draw: when true, also plot the spectrogram.
        fill_zero: how to extend a signal shorter than ``sec`` seconds:
            zeros appended at the end (True) or symmetric reflection on both
            sides (False).

    Returns:
        The 2-D array produced by ``librosa.power_to_db``.

    Raises:
        ValueError: if ``data`` is empty and reflection padding is requested.
    """
    target_len = int(sr * sec)
    hop_length = n_fft // 4

    # extend signals that are shorter than the requested excerpt
    if data.shape[0] < target_len:
        if fill_zero:
            data = np.concatenate((data, np.zeros(target_len - data.shape[0])), axis=0)
        else:
            if data.shape[0] == 0:
                raise ValueError("cannot reflect-pad an empty signal")
            # One symmetric reflection triples the length, which may still be
            # too short; repeat until the excerpt is fully covered so the
            # output always has the same number of frames.
            while data.shape[0] < target_len:
                data = np.pad(data, data.shape[0], 'reflect')

    # analyse exactly the first `sec` seconds
    D = np.abs(librosa.stft(data[:target_len], n_fft=n_fft, hop_length=hop_length))**2
    S = librosa.feature.melspectrogram(S=D, sr=sr, n_fft=n_fft, hop_length=hop_length)
    S_dB = librosa.power_to_db(S, ref=np.max)

    if draw:
        fig, ax = plt.subplots(figsize=(20,5))
        # pass the real hop length so the time axis is labelled correctly
        img = librosa.display.specshow(S_dB, x_axis='time', y_axis='mel', sr=sr,
                                       hop_length=hop_length, ax=ax)
        ax.set_title('Mel-frequency spectrogram')
        fig.colorbar(img, ax=ax, format="%+2.0f dB")

    return S_dB

In [None]:
# sanity check of get_mel on the current recording (with the plot enabled)
S2 = get_mel(data, sr=sr, n_fft=1024,sec=2.04, draw=True)
type(S2), S2.shape

In [None]:
def mel_norm(S_dB):
    """Standardize a spectrogram and rescale it to the range [0, 1].

    Args:
        S_dB: numpy array holding the spectrogram.

    Returns:
        Array of the same shape, standardized (zero mean, unit variance) and
        then min-max scaled. A constant input (for example pure silence) has
        no dynamic range and yields an all-zero array instead of NaNs.
    """
    standardized = (S_dB - S_dB.mean()) / (S_dB.std() + 0.000001)
    lowest = standardized.min()
    value_range = standardized.max() - lowest
    if value_range == 0:
        # avoid 0/0 when every element is identical
        return np.zeros_like(standardized)
    return (standardized - lowest) / value_range

In [None]:
# speaker metadata table (semicolon-separated)
df = pd.read_csv('SPKRinfo.csv', sep=';')

In [None]:
# the table holds the speaker name, sex, age and other additional features
df.head(5)

In [None]:
# column types and non-null counts
df.info()

In [None]:
# mean, minimum and maximum speaker age
df['age'].mean(), df['age'].min(), df['age'].max()

In [None]:
# age distribution in the train ('TRN') and test ('TST') parts
df[df['Use']=='TRN']['age'].plot.hist(bins=20),df[df['Use']=='TST']['age'].plot.hist(bins=20) 

In [None]:
def count_TIMIT_sent(corpus_path, subcorp='train'):
    """Count the sentences in the train or test part of the TIMIT corpus.

    Args:
        corpus_path: path to the TIMIT root folder.
        subcorp: ``'train'`` or ``'test'``.

    Returns:
        Total number of sentences over all regions and people of the part.

    Raises:
        ValueError: if ``subcorp`` is neither ``'train'`` nor ``'test'``.
    """
    # validate before the corpus is opened
    if subcorp not in ('train', 'test'):
        raise ValueError(f"subcorp must be 'train' or 'test', got {subcorp!r}")

    corpus = tu.Corpus(corpus_path)
    subset = corpus.train if subcorp == 'train' else corpus.test

    count = 0
    # regions and people are addressed by index through the timit_utils accessors
    for region_idx in range(len(subset.regions)):
        region = subset.region_by_index(region_idx)
        for person_idx in range(len(region.people)):
            count += len(region.person_by_index(person_idx).sentences)

    return count

In [None]:
# number of sentences in train and test, and the share of test in the total
train_sent_count = count_TIMIT_sent(_TIMIT_PATH)
test_sent_count = count_TIMIT_sent(_TIMIT_PATH, subcorp='test')
train_sent_count, test_sent_count, test_sent_count/(train_sent_count + test_sent_count)

In [None]:
# shape of the metadata rows marked as train
df[df['Use']=='TRN'].shape

In [None]:
def mel_TIMIT(corpus_path, dataset, subcorp='train', sr=16000, n_fft=1024, sec=2.04):
    """Build normalized mel spectrograms and targets for one part of TIMIT.

    For every speaker ID listed in ``dataset`` for the requested part, each
    of the speaker's sentences is converted with ``get_mel`` + ``mel_norm``.

    Args:
        corpus_path: path to the TIMIT root folder.
        dataset: DataFrame with the columns 'Use', 'ID', 'age' and 'AgeClass'.
        subcorp: ``'train'`` selects rows with Use == "TRN"; any other value
            selects Use == "TST" and the test part of the corpus.
        sr, n_fft, sec: forwarded to ``get_mel``.

    Returns:
        Tuple ``(mel_set, gender_set, age_set, age_class)`` of float64 arrays
        with one entry per sentence; gender is encoded as 0 for "M" and 1
        otherwise.
    """
    corpus = tu.Corpus(corpus_path)
    # choose the metadata tag and the corpus part once
    if subcorp == 'train':
        use_tag, subset = "TRN", corpus.train
    else:
        use_tag, subset = "TST", corpus.test

    dataset = dataset[dataset['Use'] == use_tag]

    # Collect per-sentence results in lists so that the number of sentences
    # per speaker and the spectrogram shape do not have to be assumed.
    mels, genders, ages, age_classes = [], [], [], []

    rows = zip(dataset['ID'], dataset['age'], dataset['AgeClass'])
    for speaker_id, age, age_cls in tqdm(rows, total=len(dataset)):
        person = subset.person_by_name(speaker_id)
        # encode the gender as 0/1
        gender = 0 if person.gender == "M" else 1
        for sent in person.sentences:
            sentence = person.sentence_by_name(sent)
            mels.append(mel_norm(get_mel(sentence.raw_audio, sr=sr, n_fft=n_fft, sec=sec)))
            genders.append(gender)
            ages.append(age)
            age_classes.append(age_cls)

    if mels:
        mel_set = np.array(mels, dtype=np.float64)
    else:
        # nothing matched: keep the historical empty layout
        mel_set = np.zeros([0, int(n_fft / 8), 128])
    gender_set = np.array(genders, dtype=np.float64)
    age_set = np.array(ages, dtype=np.float64)
    age_class = np.array(age_classes, dtype=np.float64)

    return mel_set, gender_set, age_set, age_class

#### Преобразование данных

In [None]:
# train part: spectrograms, gender, age and age class
X_train, y_train1, y_train2, y_train3 = mel_TIMIT(_TIMIT_PATH, df)

In [None]:
# add an axis at position 1 to the inputs and targets before feeding the network
X_train1 = np.expand_dims(X_train, axis=1)
y_train1 = np.expand_dims(y_train1, axis=1)
y_train2 = np.expand_dims(y_train2, axis=1)
y_train3 = np.expand_dims(y_train3, axis=1)
# show the input shape next to the target shape
X_train1.shape, y_train1.shape

In [None]:
# make sure the conversion worked: draw one normalized spectrogram
import seaborn as sns
fig, ax = plt.subplots(figsize=(20,5))
# draw explicitly on the axes that is flipped below
sns.heatmap(X_train[10], ax=ax)
ax.invert_yaxis()
X_train[10].shape

In [None]:
# test part: spectrograms, gender, age and age class
X_test, y_test1, y_test2, y_test3 = mel_TIMIT(_TIMIT_PATH, df, subcorp='test')

In [None]:
# add an axis at position 1, as done for the train arrays
X_test1=np.expand_dims(X_test, axis=1)
y_test1=np.expand_dims(y_test1, axis=1)
y_test2=np.expand_dims(y_test2, axis=1)
y_test3=np.expand_dims(y_test3, axis=1)

In [None]:
# shapes of the prepared inputs and targets
X_train1.shape, X_test1.shape, y_train1.shape, y_test1.shape,y_train3.shape, y_test3.shape

## Работа с нейросетью (определение пола)

#### Создание тензора

In [None]:
class PeopleDataset(torch.utils.data.Dataset):
  """Dataset that holds the inputs and targets as in-memory tensors.

  Inputs are stored as float32. Targets are stored as float32 when
  ``target`` equals False and as long otherwise.
  """

  def __init__(self, X, y, target = False):
    # the `== False` comparison is kept so every caller-supplied flag value
    # selects the same dtype as before
    label_dtype = torch.float32 if target == False else torch.long
    self.x_data = torch.tensor(X, dtype = torch.float32)
    self.y_data = torch.tensor(y, dtype = label_dtype)

  def __len__(self):
    # number of samples
    return len(self.x_data)

  def __getitem__(self, idx):
    # tensor indices are converted to plain Python values first
    index = idx.tolist() if torch.is_tensor(idx) else idx
    return (self.x_data[index], self.y_data[index])

In [None]:
# datasets for the gender task (targets y_train1 / y_test1)
train_dataset = PeopleDataset(X_train1,y_train1)
test_dataset = PeopleDataset(X_test1,y_test1)

# fix the random seeds of torch and numpy
torch.manual_seed(42)
np.random.seed(42)

# train_dataset, val_dataset = torch.utils.data.random_split(train_dataset, [3696, 924],generator=torch.Generator().manual_seed(42))

In [None]:
# inspect the first (input, target) pair
train_dataset[0]

#### Генератор батчей

In [None]:
# create the batch generators
batch_size = 32

train_batch_gen = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# note: the validation loader is built from test_dataset
val_batch_gen = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
# test_batch_gen = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# number of batches per epoch, then the shapes of one batch
print(len(train_batch_gen))
train_features, train_labels = next(iter(train_batch_gen))
# print the full feature shape, consistent with the label line below
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")

#### Объявление функций

In [None]:
def _binary_confusion(y_true, y_pred):
    """Return (TP, FP, FN) for 0/1 arrays of equal shape; class 1 is positive."""
    actual_pos = y_true == 1
    predicted_pos = y_pred == 1
    tp = int(np.sum(actual_pos & predicted_pos))
    fp = int(np.sum(~actual_pos & predicted_pos))
    fn = int(np.sum(actual_pos & ~predicted_pos))
    return tp, fp, fn


def _run_binary_epoch(model, criterion, batch_gen, optimizer=None):
    """Run one pass over ``batch_gen``; train when an optimizer is given.

    Returns ``(loss, accuracy, precision, recall)``. The loss is the mean of
    the per-batch losses; the other metrics are computed from counts
    accumulated over the whole pass. An empty loader yields zeros.
    """
    training = optimizer is not None
    # switches dropout / batch-norm between training and evaluation behaviour
    model.train(training)

    loss_sum = 0.0
    n_batches = 0
    n_samples = 0
    n_correct = 0
    tp = fp = fn = 0

    for X_batch, y_batch in batch_gen:
        X_batch = X_batch.to(device)
        y_batch = y_batch.float().to(device)

        with torch.set_grad_enabled(training):
            probs = model(X_batch)
            loss = criterion(probs, y_batch)

        if training:
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        loss_sum += loss.item()
        n_batches += 1

        y_true = y_batch.detach().cpu().numpy()
        # the model outputs probabilities; rounding gives the 0/1 decision
        y_pred = probs.detach().round().cpu().numpy()
        n_correct += int(np.sum(y_true == y_pred))
        n_samples += y_true.size
        batch_tp, batch_fp, batch_fn = _binary_confusion(y_true, y_pred)
        tp += batch_tp
        fp += batch_fp
        fn += batch_fn

    mean_loss = loss_sum / n_batches if n_batches else 0.0
    accuracy = n_correct / n_samples if n_samples else 0.0
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return mean_loss, accuracy, precision, recall


def train(model,criterion,optimizer,train_batch_gen,val_batch_gen,num_epochs=50):
    '''
    Train the model and report loss and metrics after every epoch.
    :param model: model to train
    :param criterion: loss function
    :param optimizer: optimization method
    :param train_batch_gen: batch generator for training
    :param val_batch_gen: batch generator for validation
    :param num_epochs: number of epochs
    :return: the trained model
    :return: (dict) loss, accuracy, precision and recall on train and
        validation for every epoch (the training "history")
    '''
    history = defaultdict(lambda: defaultdict(list))
    metric_names = ('loss', 'acc', 'prec', 'rec')

    for epoch in range(num_epochs):
        start_time = time.time()

        # full pass over the training data, then over the validation data
        train_metrics = _run_binary_epoch(model, criterion, train_batch_gen, optimizer)
        val_metrics = _run_binary_epoch(model, criterion, val_batch_gen)

        for name, train_value, val_value in zip(metric_names, train_metrics, val_metrics):
            history[name]['train'].append(train_value)
            history[name]['val'].append(val_value)

        train_loss, train_acc, train_prec, train_rec = train_metrics
        val_loss, val_acc, val_prec, val_rec = val_metrics

        IPython.display.clear_output()

        # print the results after every epoch
        print("Epoch {} of {} took {:.3f}s".format(
            epoch + 1, num_epochs, time.time() - start_time))
        print("  training loss (in-iteration): \t{:.6f}".format(train_loss))
        print("  validation loss (in-iteration): \t{:.6f}".format(val_loss))  
        print("  training accuracy: \t\t\t{:.2f} %".format(train_acc * 100))
        print("  validation accuracy: \t\t\t{:.2f} %".format(val_acc * 100))
        print("  training precision: \t\t\t{:.2f} %".format(train_prec*100))
        print("  validation precision: \t\t{:.2f} %".format(val_prec*100))
        print("  training recall: \t\t\t{:.2f} %".format(train_rec*100))
        print("  validation recall: \t\t\t{:.2f} %".format(val_rec*100))

    plot_learning_curves(history)

    return model, history

In [None]:
def plot_learning_curves(history):
    '''
    Draw the train and validation curves of loss, accuracy, precision and
    recall on a 2x2 grid.

    :param history: (dict)
        per-metric dict with the keys 'train' and 'val', each a list with
        one value per epoch
    '''
    panels = (
        ('loss', 'Loss'),
        ('acc', 'Accuracy'),
        ('prec', 'Precision'),
        ('rec', 'Recall'),
    )

    plt.figure(figsize=(20, 12))

    for position, (metric, title) in enumerate(panels, start=1):
        plt.subplot(2, 2, position)
        plt.title(title, fontsize=15)
        for split in ('train', 'val'):
            plt.plot(history[metric][split], label=split)
        plt.ylabel(metric, fontsize=15)
        plt.xlabel('epoch', fontsize=15)
        plt.legend()

    plt.show()

#### Объявление нейросети

In [None]:
class Model(nn.Module):
    """Three parallel convolution branches followed by a dense classifier.

    Every branch convolves the input with a kernel that is 128 columns wide
    (heights 3, 5 and 7), max-pools the result and applies ReLU and dropout.
    The three branch outputs are concatenated and passed through two dense
    layers; the output is squashed with a sigmoid.

    The notebook feeds inputs of shape (batch, 1, 128, 128).
    """

    def __init__(self,num_classes=1,dropout_rate=0.2):
        super().__init__()

        self.c1 = torch.nn.Conv2d(in_channels=1, out_channels=2048, kernel_size=[3,128], stride=(1,1), padding=(1,0))
        self.bn1 = nn.BatchNorm2d(2048)
        self.maxpool1 = nn.MaxPool1d(128)
        self.act = torch.nn.ReLU()
        self.dropout1 = nn.Dropout(dropout_rate)

        self.c2 = torch.nn.Conv2d(in_channels=1, out_channels=2048, kernel_size=[5,128], stride=(1,1), padding=(2,0))
        self.bn2 = nn.BatchNorm2d(2048)
        self.maxpool2 = nn.MaxPool1d(128)
        self.dropout2 = nn.Dropout(dropout_rate)

        self.c3 = torch.nn.Conv2d(in_channels=1, out_channels=2048, kernel_size=[7,128], stride=(1,1), padding=(3,0))
        self.bn3 = nn.BatchNorm2d(2048)
        self.maxpool3 = nn.MaxPool1d(128)
        self.dropout3 = nn.Dropout(dropout_rate)

        self.flt = torch.nn.Flatten()
        # 6144 = 3 branches x 2048 channels
        self.fc1 = nn.Linear(6144,2048,bias=False)
        self.bn4 = nn.BatchNorm1d(2048)
        self.dropout4 = nn.Dropout(dropout_rate)

        self.fc2 = nn.Linear(2048,2048,bias=False)
        self.bn5 = nn.BatchNorm1d(2048)
        self.dropout5 = nn.Dropout(dropout_rate)

        self.fc_out1 = nn.Linear(2048,num_classes)
        # not used in forward; kept so the attribute stays available
        self.softmax = nn.Softmax(dim=-1)
        self.sigm = nn.Sigmoid()

    def _branch(self, x, conv, bn, pool, dropout):
        """Apply conv -> batch norm -> squeeze -> max pool -> ReLU -> dropout."""
        x = bn(conv(x))
        # drop the trailing axis left by the full-width kernel
        x = torch.squeeze(x, -1)
        x = pool(x)
        return dropout(self.act(x))

    def forward(self, x):
        x1 = self._branch(x, self.c1, self.bn1, self.maxpool1, self.dropout1)
        x2 = self._branch(x, self.c2, self.bn2, self.maxpool2, self.dropout2)
        # the third branch uses its own dropout layer
        x3 = self._branch(x, self.c3, self.bn3, self.maxpool3, self.dropout3)

        # concatenate along the channel axis and flatten per sample
        x = self.flt(torch.cat((x1, x2, x3), 1))

        x = self.dropout4(self.act(self.bn4(self.fc1(x))))
        x = self.dropout5(self.act(self.bn5(self.fc2(x))))

        return self.sigm(self.fc_out1(x))

#### Обучение нейросети

In [None]:
# gender model with binary cross-entropy on its sigmoid output, optimized by Adam
model = Model(num_classes=1).to(device)
criterion = nn.BCELoss()#CrossEntropyLoss()#BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
# input to have size [batch_size, channels,  height, width]
#                           32       1         128    128
summary(model.to(device), ( 1, 128, 128))

In [None]:
# train the gender model for 5 epochs
model, history = train(model, criterion, optimizer,train_batch_gen, val_batch_gen , num_epochs=5) #train_loader, test_loader

In [None]:
# manual check: run the model on the first training batch only
for X_batch,y_batch in train_batch_gen:
            model.train(False)
            # inference on one batch (no optimizer step is taken here)
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)          
            logits = model(X_batch)
            break

In [None]:
# targets next to the rounded predictions, side by side
logits = logits.detach().cpu().numpy()
y_batch = y_batch.detach().cpu().numpy()
np.concatenate([y_batch,logits.round()], axis=1)

In [None]:
# manual check: run the model on the first validation batch only
for X_batch,y_batch in val_batch_gen:
            model.train(False)
            # inference on one batch (no optimizer step is taken here)
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)         
            logits = model(X_batch)
            break

In [None]:
# targets next to the rounded predictions, side by side
logits = logits.detach().cpu().numpy()
y_batch = y_batch.detach().cpu().numpy()
np.concatenate([y_batch,logits.round()], axis=1)

## Работа с нейросетью (возраст)

#### Создание тензора

In [None]:
# Age is trained as a regression with a float loss, so the targets are kept
# as float32; the long (integer) target mode would truncate fractional ages.
train_dataset2 = PeopleDataset(X_train1,y_train2)
test_dataset2 = PeopleDataset(X_test1,y_test2)

# fix the random seeds of torch and numpy
torch.manual_seed(42)
np.random.seed(42)

# train_dataset2, val_dataset2 = torch.utils.data.random_split(train_dataset2, [3696, 924],generator=torch.Generator().manual_seed(42))

#### Генератор батчей

In [None]:
# create the batch generators for the age task
batch_size = 32

train_batch_gen2 = torch.utils.data.DataLoader(train_dataset2, batch_size=batch_size, shuffle=True)
# note: the validation loader is built from test_dataset2
val_batch_gen2 = torch.utils.data.DataLoader(test_dataset2, batch_size=batch_size, shuffle=True)
# test_batch_gen2 = torch.utils.data.DataLoader(test_dataset2, batch_size=batch_size, shuffle=False)

#### Объявление функций

In [None]:
def _run_regression_epoch(model, criterion, batch_gen, optimizer=None):
    """Run one pass over ``batch_gen``; train when an optimizer is given.

    Returns ``(loss, mae)``, both averaged over the batches. The MAE is
    computed on predictions rounded to whole numbers. An empty loader
    yields zeros.
    """
    training = optimizer is not None
    # switches dropout / batch-norm between training and evaluation behaviour
    model.train(training)

    loss_sum = 0.0
    mae_sum = 0.0
    n_batches = 0

    for X_batch, y_batch in batch_gen:
        X_batch = X_batch.to(device)
        y_batch = y_batch.float().to(device)

        with torch.set_grad_enabled(training):
            preds = model(X_batch)
            loss = criterion(preds, y_batch)

        if training:
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        loss_sum += loss.item()
        y_pred = preds.detach().round().cpu().numpy()
        y_true = y_batch.detach().cpu().numpy()
        mae_sum += np.mean(np.abs(y_true - y_pred))
        n_batches += 1

    if n_batches == 0:
        return 0.0, 0.0
    return loss_sum / n_batches, mae_sum / n_batches


def train2(model,criterion,optimizer,train_batch_gen,val_batch_gen,num_epochs=50):
    '''
    Train a regression model and report loss and MAE after every epoch.
    :param model: model to train
    :param criterion: loss function
    :param optimizer: optimization method
    :param train_batch_gen: batch generator for training
    :param val_batch_gen: batch generator for validation
    :param num_epochs: number of epochs
    :return: the trained model
    :return: (dict) training "history". The MAE is stored under 'mae' and,
        for compatibility with plot_learning_curves, also under 'acc';
        'prec' and 'rec' are filled with zeros.
    '''
    history = defaultdict(lambda: defaultdict(list))

    for epoch in range(num_epochs):
        start_time = time.time()

        # full pass over the training data, then over the validation data
        train_loss, train_MAE = _run_regression_epoch(model, criterion, train_batch_gen, optimizer)
        val_loss, val_MAE = _run_regression_epoch(model, criterion, val_batch_gen)

        for split, loss_value, mae_value in (('train', train_loss, train_MAE),
                                             ('val', val_loss, val_MAE)):
            history['loss'][split].append(loss_value)
            history['acc'][split].append(mae_value)
            history['mae'][split].append(mae_value)
            # placeholders so the precision / recall panels still get a curve
            history['prec'][split].append(0)
            history['rec'][split].append(0)

        IPython.display.clear_output()

        # print the results after every epoch
        print("Epoch {} of {} took {:.3f}s".format(
            epoch + 1, num_epochs, time.time() - start_time))
        print("  training loss (in-iteration): \t{:.6f}".format(train_loss))
        print("  validation loss (in-iteration): \t{:.6f}".format(val_loss))  
        print("  training MAE: \t\t\t{:.2f}".format(train_MAE))
        print("  validation MAE: \t\t\t{:.2f}".format(val_MAE))

    plot_learning_curves(history)

    return model, history

#### Объявление нейросети

In [None]:
class Model2(nn.Module):
    """Dense network applied to the flattened input.

    The convolution, batch-norm, pooling and first three dropout layers are
    created but not applied in ``forward``; only the flatten + dense stack is
    used. ``fc1`` expects 16384 flattened features per sample.
    """

    def __init__(self,num_classes=1,dropout_rate=0.2):
        super().__init__()

        # layers of the (currently unused) convolution branches
        self.c1 = nn.Conv2d(1, 2048, kernel_size=(3, 128), stride=1, padding=(1, 0))
        self.bn1 = nn.BatchNorm2d(2048)
        self.maxpool1 = nn.MaxPool1d(128)
        self.act = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout_rate)

        self.c2 = nn.Conv2d(1, 2048, kernel_size=(5, 128), stride=1, padding=(2, 0))
        self.bn2 = nn.BatchNorm2d(2048)
        self.maxpool2 = nn.MaxPool1d(128)
        self.dropout2 = nn.Dropout(dropout_rate)

        self.c3 = nn.Conv2d(1, 2048, kernel_size=(7, 128), stride=1, padding=(3, 0))
        self.bn3 = nn.BatchNorm2d(2048)
        self.maxpool3 = nn.MaxPool1d(128)
        self.dropout3 = nn.Dropout(dropout_rate)

        # dense stack that forward() actually runs
        self.flt = nn.Flatten()
        self.fc1 = nn.Linear(16384, 4096, bias=False)
        self.bn4 = nn.BatchNorm1d(4096)
        self.dropout4 = nn.Dropout(dropout_rate)

        self.fc2 = nn.Linear(4096, 2048, bias=False)
        self.bn5 = nn.BatchNorm1d(2048)
        self.dropout5 = nn.Dropout(dropout_rate)

        self.fc_out1 = nn.Linear(2048, num_classes)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        # flatten each sample, then two dense blocks (linear -> bn -> ReLU -> dropout)
        hidden = self.flt(x)
        hidden = self.dropout4(self.act(self.bn4(self.fc1(hidden))))
        hidden = self.dropout5(self.act(self.bn5(self.fc2(hidden))))
        # raw output, no final activation
        return self.fc_out1(hidden)

#### Обучение нейросети

In [None]:
# free cached GPU memory before building the second model
torch.cuda.empty_cache()

In [None]:
# age model: single output trained with mean squared error, optimized by Adam
model2 = Model2(num_classes=1).to(device)
criterion2 = nn.MSELoss()#BCEWithLogitsLoss()CrossEntropyLoss()
optimizer2 = torch.optim.Adam(model2.parameters(),lr=0.001)
# input to have size [batch_size, channels,  height, width]
#                           32       1         128    128
summary(model2.to(device), ( 1, 128, 128))

In [None]:
# train the age model for 5 epochs
model2, history = train2(model2, criterion2, optimizer2, train_batch_gen2, val_batch_gen2 , num_epochs=5)

In [None]:
# manual check: the loop runs over every training batch (the break is
# disabled), so `logits` and `y_batch` hold the last batch afterwards
for X_batch,y_batch in train_batch_gen2:
            model2.train(False)
            # inference on the batch (no optimizer step is taken here)
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            logits = model2(X_batch)
            y_pred = logits.max(1)[1]
            #break

In [None]:
# targets next to the predictions; -1 lets numpy infer the batch size instead
# of hard-coding the size of one particular batch
logits = logits.detach().cpu().numpy().reshape(-1,1)
y_batch = y_batch.detach().cpu().numpy().reshape(-1,1)
np.concatenate([y_batch,logits], axis=1)

In [None]:
# manual check on the validation loader: the loop runs over every batch, so
# `logits` and `y_batch` hold the last batch afterwards
# put the age model (the one evaluated below) into evaluation mode
model2.train(False)
with torch.no_grad():
    for X_batch,y_batch in val_batch_gen2:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        logits = model2(X_batch)
        y_pred = logits.max(1)[1]

In [None]:
# targets next to the predictions; -1 lets numpy infer the batch size instead
# of hard-coding the size of one particular batch
logits = logits.detach().cpu().numpy().reshape(-1,1)
y_batch = y_batch.detach().cpu().numpy().reshape(-1,1)
np.concatenate([y_batch,logits], axis=1)

## Проверка на внешних данных

#### Объявление функций

In [None]:
def get_mel_sample(data, model_sex, model_age, sr=16000, n_fft=1024, sec=2.04, fill_zero=False):
    """Predict sex and age for one recording.

    Args:
        data: 1-D array of audio samples.
        model_sex: model whose first output is rounded to 0 ('M') or not 0 ('F').
        model_age: model whose first output is returned as the age.
        sr, n_fft, sec, fill_zero: forwarded to ``get_mel``.

    Returns:
        Tuple ``(pred_sex, pred_age)`` with ``pred_sex`` being 'M' or 'F'.
    """
    # fill_zero is forwarded so the caller's padding choice takes effect
    mel = mel_norm(get_mel(data, sr=sr, n_fft=n_fft, sec=sec, fill_zero=fill_zero))
    mel = torch.tensor(mel, dtype=torch.float32).to(device)
    # add the batch and channel axes
    mel = mel.unsqueeze(dim=0).unsqueeze(dim=0)

    # pure inference: no autograd graph is needed
    with torch.no_grad():
        sex_score = model_sex(mel)[0][0].cpu().numpy()
        age_value = model_age(mel)[0][0].cpu().numpy()

    pred_sex = 'M' if np.round(sex_score) == 0 else 'F'
    pred_age = np.sum(age_value)
    return (pred_sex, pred_age)

In [None]:
# check on a recording from the corpus
data, sr = librosa.load('/content/data/lisa/data/timit/raw/TIMIT/TRAIN/DR1/MCPM0/SA2.WAV', sr=sr, mono=True)
S3 = get_mel_sample(data, model, model2)
S3

In [None]:
# metadata sex and age of the same speaker, for comparison with the prediction
df[df['SexID']=='MCPM0'][['Sex','age']]

In [None]:
# external recording (file name indicates a male vocal)
data, sr = librosa.load('/content/haters-low-pitched-male-vocal-fx_104bpm_G_minor (1).wav', sr=sr, mono=True)

In [None]:
# predicted (sex, age) for the loaded recording
get_mel_sample(data, model, model2)

In [None]:
# external recording (file name indicates a female vocal)
data, sr = librosa.load('/content/flowing-smooth-female-vocal-singing_113bpm.wav', sr=sr, mono=True)

In [None]:
# predicted (sex, age) for the loaded recording
get_mel_sample(data, model, model2)

## Еще

In [None]:
# plt.figure(figsize=(12, 5))

# plt.subplot(1, 2, 1)
# plt.plot(train_loss_list, label='Train')
# plt.plot(test_loss_list, label='Test')
# plt.xlabel('Epoch')
# plt.title('Loss')
# plt.legend()

# plt.subplot(1, 2, 2)
# plt.plot(train_accuracy_list, label='Train')
# plt.plot(test_accuracy_list, label='Test')
# plt.xlabel('Epoch')
# plt.title('Accuracy')
# plt.legend()


# plt.show()

In [None]:
# Source of the microphone-recording snippet used in the next cell:
# https://kubilaybozak.medium.com/record-audio-from-your-microphone-in-colab-colab-%C3%BCzerinden-mikrofon-ile-ses-kayd%C4%B1-alma-bfa56013624e
!pip install ffmpeg-python

In [None]:
"""
To write this piece of code I took inspiration/code from a lot of places.
It was late night, so I'm not sure how much I created or just copied o.O
Here are some of the possible references:
https://blog.addpipe.com/recording-audio-in-the-browser-using-pure-html5-and-minimal-javascript/
https://stackoverflow.com/a/18650249
https://hacks.mozilla.org/2014/06/easy-audio-capture-with-the-mediarecorder-api/
https://air.ghost.io/recording-to-an-audio-file-using-html5-and-js/
https://stackoverflow.com/a/49019356
"""
from IPython.display import HTML, Audio
from google.colab.output import eval_js
from base64 import b64decode
import numpy as np
from scipy.io.wavfile import read as wav_read
import io
import ffmpeg

# HTML/JavaScript snippet that get_audio() renders in the notebook output.
# The script adds a button, asks for microphone access via getUserMedia and
# starts a MediaRecorder once access is granted. Clicking the button stops the
# recording; the recorded blob is read as a data URL into `base64data`.
# The page-level promise `data` resolves 2000 ms after the click with
# `base64data.toString()`, which is "0" if the FileReader has not finished by then.
AUDIO_HTML = """
<script>
var my_div = document.createElement("DIV");
var my_p = document.createElement("P");
var my_btn = document.createElement("BUTTON");
var t = document.createTextNode("Press to start recording");
my_btn.appendChild(t);
//my_p.appendChild(my_btn);
my_div.appendChild(my_btn);
document.body.appendChild(my_div);
var base64data = 0;
var reader;
var recorder, gumStream;
var recordButton = my_btn;
var handleSuccess = function(stream) {
  gumStream = stream;
  var options = {
    //bitsPerSecond: 8000, //chrome seems to ignore, always 48k
    mimeType : 'audio/webm;codecs=opus'
    //mimeType : 'audio/webm;codecs=pcm'
  };            
  //recorder = new MediaRecorder(stream, options);
  recorder = new MediaRecorder(stream);
  recorder.ondataavailable = function(e) {            
    var url = URL.createObjectURL(e.data);
    var preview = document.createElement('audio');
    preview.controls = true;
    preview.src = url;
    document.body.appendChild(preview);
    reader = new FileReader();
    reader.readAsDataURL(e.data); 
    reader.onloadend = function() {
      base64data = reader.result;
      //console.log("Inside FileReader:" + base64data);
    }
  };
  recorder.start();
  };
recordButton.innerText = "Recording... press to stop";
navigator.mediaDevices.getUserMedia({audio: true}).then(handleSuccess);
function toggleRecording() {
  if (recorder && recorder.state == "recording") {
      recorder.stop();
      gumStream.getAudioTracks()[0].stop();
      recordButton.innerText = "Saving the recording... pls wait!"
  }
}
// https://stackoverflow.com/a/951057
function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}
var data = new Promise(resolve=>{
//recordButton.addEventListener("click", toggleRecording);
recordButton.onclick = ()=>{
toggleRecording()
sleep(2000).then(() => {
  // wait 2000ms for the data to be available...
  // ideally this should use something like await...
  //console.log("Inside data:" + base64data)
  resolve(base64data.toString())
});
}
});
      
</script>
"""

def _patch_riff_chunk_size(wav_bytes):
  """Return wav_bytes with header bytes 4:8 set to len(wav_bytes) - 8, little-endian."""
  # Mask to 32 bits so the value always fits the 4-byte size field.
  size_field = ((len(wav_bytes) - 8) & 0xFFFFFFFF).to_bytes(4, 'little')
  return wav_bytes[:4] + size_field + wav_bytes[8:]


def get_audio():
  """Record audio from the browser microphone and return it as samples.

  Renders AUDIO_HTML in the notebook output, waits for the JavaScript
  promise `data` (a base64 data URL of the recording), converts the
  recording to WAV by piping it through ffmpeg, and parses the result with
  scipy's WAV reader.

  Returns:
    Tuple ``(audio, sr)``: the sample array and the sample rate, as returned
    by ``scipy.io.wavfile.read``.

  Raises:
    IndexError: the string returned by the browser contains no comma, e.g.
      "0" when the recording was not ready when the promise resolved.
    ValueError: ffmpeg produced no output, or the WAV data cannot be parsed.
  """
  display(HTML(AUDIO_HTML))
  data = eval_js("data")
  # A data URL looks like "<header>,<base64 payload>"; keep the payload.
  binary = b64decode(data.split(',')[1])

  process = (ffmpeg
    .input('pipe:0')
    .output('pipe:1', format='wav')
    .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True, quiet=True, overwrite_output=True)
  )
  output, err = process.communicate(input=binary)
  if not output:
    # Surface ffmpeg's own diagnostics instead of failing later on an
    # unreadable WAV header.
    details = (err or b'').decode('utf-8', errors='replace')
    raise ValueError(f"ffmpeg produced no audio output: {details}")

  # The size field in the header written to the pipe is replaced with the
  # actual RIFF chunk size (presumably because ffmpeg cannot seek back on a
  # pipe to fill it in -- confirm).
  riff = _patch_riff_chunk_size(output)

  sr, audio = wav_read(io.BytesIO(riff))

  return audio, sr