In [None]:
!tar -xvf /content/drive/MyDrive/L/genres.tar

In [None]:
import json
import os
import math
import librosa

DATASET_PATH = "/content/genres/"
JSON_PATH = "data_10.json"
SAMPLE_RATE = 22050
TRACK_DURATION = 30 # measured in seconds
SAMPLES_PER_TRACK = SAMPLE_RATE * TRACK_DURATION


def save_mfcc(dataset_path, json_path, num_mfcc=13, n_fft=2048, hop_length=512, num_segments=5):
    """Extracts MFCCs from music dataset and saves them into a json file along witgh genre labels.
        :param dataset_path (str): Path to dataset
        :param json_path (str): Path to json file used to save MFCCs
        :param num_mfcc (int): Number of coefficients to extract
        :param n_fft (int): Interval we consider to apply FFT. Measured in # of samples
        :param hop_length (int): Sliding window for FFT. Measured in # of samples
        :param: num_segments (int): Number of segments we want to divide sample tracks into
        :return:
        """

    # dictionary to store mapping, labels, and MFCCs
    data = {
        "mapping": [],
        "labels": [],
        "mfcc": []
    }

    samples_per_segment = int(SAMPLES_PER_TRACK / num_segments)
    num_mfcc_vectors_per_segment = math.ceil(samples_per_segment / hop_length)
    print
    # loop through all genre sub-folder
    print([key[0] for key in os.walk(dataset_path)])
    for i, (dirpath, dirnames, filenames) in enumerate(os.walk(dataset_path)):

        # ensure we're processing a genre sub-folder level
        if dirpath is not dataset_path:

            # save genre label (i.e., sub-folder name) in the mapping
            semantic_label = dirpath.split("/")[-1]
            data["mapping"].append(semantic_label)
            print("\nProcessing: {}".format(semantic_label))

            # process all audio files in genre sub-dir
            for f in filenames:

		# load audio file
                file_path = os.path.join(dirpath, f)
                signal, sample_rate = librosa.load(file_path, sr=SAMPLE_RATE)

                # process all segments of audio file
                for d in range(num_segments):

                    # calculate start and finish sample for current segment
                    start = samples_per_segment * d
                    finish = start + samples_per_segment

                    # extract mfcc
                    mfcc = librosa.feature.mfcc(signal[start:finish], sample_rate, n_mfcc=num_mfcc, n_fft=n_fft, hop_length=hop_length)
                    mfcc = mfcc.T

                    # store only mfcc feature with expected number of vectors
                    if len(mfcc) == num_mfcc_vectors_per_segment:
                        data["mfcc"].append(mfcc.tolist())
                        data["labels"].append(i-1)

    # save MFCCs to json file
    with open(json_path, "w") as fp:
        json.dump(data, fp, indent=4)
        
        

# save_mfcc(DATASET_PATH, JSON_PATH, num_segments=10)


In [None]:
import json
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow.keras as keras
import matplotlib.pyplot as plt
import os
import shutil
from tqdm.notebook import tqdm
from IPython.display import Audio 
from IPython.core.display import display
import librosa
import gzip
import shutil
import torch.nn.functional as F
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset

import torchvision.transforms as transforms

from sklearn.model_selection import train_test_split

from IPython.display import clear_output
import seaborn as sns
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
%matplotlib inline
import torch.nn as nn
import numpy as np
import scipy.spatial.distance as ds
minicosdistance = 0
miniindex = 0




DATA_PATH = "/content/data_10.json"


def load_data(data_path):
    """Loads training dataset from json file.
        :param data_path (str): Path to json file containing data
        :return X (ndarray): Inputs
        :return y (ndarray): Targets
    """

    with open(data_path, "r") as fp:
        data = json.load(fp)

    X = np.array(data["mfcc"])
    y = np.array(data["labels"])
    print(np.unique(y))
    return X, y


def plot_history(history):
    """Plots accuracy/loss for training/validation set as a function of the epochs
        :param history: Training history of model
        :return:
    """

    fig, axs = plt.subplots(2)

    # create accuracy sublpot
    axs[0].plot(history.history["accuracy"], label="train accuracy")
    axs[0].plot(history.history["val_accuracy"], label="test accuracy")
    axs[0].set_ylabel("Accuracy")
    axs[0].legend(loc="lower right")
    axs[0].set_title("Accuracy eval")

    # create error sublpot
    axs[1].plot(history.history["loss"], label="train error")
    axs[1].plot(history.history["val_loss"], label="test error")
    axs[1].set_ylabel("Error")
    axs[1].set_xlabel("Epoch")
    axs[1].legend(loc="upper right")
    axs[1].set_title("Error eval")

    plt.show()


def prepare_datasets(test_size, validation_size):
    """Loads data and splits it into train, validation and test sets.
    :param test_size (float): Value in [0, 1] indicating percentage of data set to allocate to test split
    :param validation_size (float): Value in [0, 1] indicating percentage of train set to allocate to validation split
    :return X_train (ndarray): Input training set
    :return X_validation (ndarray): Input validation set
    :return X_test (ndarray): Input test set
    :return y_train (ndarray): Target training set
    :return y_validation (ndarray): Target validation set
    :return y_test (ndarray): Target test set
    """

    # load data
    X, y = load_data(DATA_PATH)
    
    # create train, validation and test split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size)
    X_train, X_validation, y_train, y_validation = train_test_split(X_train, y_train, test_size=validation_size)

    return X_train, X_validation, X_test, y_train, y_validation, y_test

# class Net(nn.Module):
#   def __init__(self):
#       super(Net, self).__init__()
#       self.k = nn.LSTM(13, 64, num_layers=2, batch_first=True)
#       self.r = nn.Linear(128, 64)
#       self.r2 = nn.Linear(64, 10)
#       self.dropout = nn.Dropout(0.3)


#   def forward(self, x):
#       x = self.k(x)[1][0]
#       x = torch.cat((x[0], x[1]), 1)
#       x = F.relu(self.r(x))
#       x = self.dropout(x)
#       x = self.r2(x)
#       return x

class Net(nn.Module):
  def __init__(self):
      super(Net, self).__init__()
      self.features = nn.Sequential(
            nn.Conv1d(13, 64, kernel_size=5, stride=4, padding=2),
            nn.ReLU(inplace=True),
            # nn.MaxPool1d(kernel_size=3, stride=2),
            nn.Conv1d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            # nn.MaxPool1d(kernel_size=3, stride=2),
            nn.Conv1d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv1d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv1d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=3, stride=2),
            )
      self.avgpool = nn.AdaptiveAvgPool1d((6, 6))
      self.classifier = nn.Sequential(
          nn.Dropout(0.5),
          nn.Linear(256, 256*3),
          nn.ReLU(inplace=True),
          nn.Dropout(),
          nn.Linear(256*3, 256*3),
          nn.ReLU(inplace=True),
          nn.Linear(256*3, 10),
      )


  def forward(self, x):
        x = x.transpose(2, 1)
        # print(x.shape)
        x = self.features(x)
        # print(x.shape)
        x = torch.mean(x, dim=2)
        # print(x.shape)
        # x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x


def build_model(input_shape):
    """Generates RNN-LSTM model
    :param input_shape (tuple): Shape of input set
    :return model: RNN-LSTM model
    """



    # build network topology
    h = Net()
    
    return h



    

# get train, validation, test splits
X_train, X_validation, X_test, y_train, y_validation, y_test = prepare_datasets(0.25, 0.2)
# X_train = np.resize(X_train, (1689, 1, 5997))
# print(X_train.shape)
# X_train = transform(X_train)
# print(y_train)
y_train = np.array(y_train)
# print(type(y_train))
# print(y_train.shape, X_train.shape)
# print(y_train, X_train)
# y_train = np.concatenate(y_train, 0)
# y_train = np.resize(y_train, (5997, -1))
# y_train = transform(y_train)
# y_train = transform(y_train)
# y_train = np.resize(y_train, (1689, 5997, 1))
# print(y_train)
# y_train = transform(y_train)
dts = TensorDataset(torch.Tensor(X_train), torch.Tensor(y_train))
dtl = DataLoader(dts, batch_size=32, shuffle=True)
# create network
input_shape = (X_train.shape[1], X_train.shape[2]) # 130, 13

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
model = build_model(input_shape).to(DEVICE)
# функция ошибки -- бинарная кросс-энтропия
criterion = nn.CrossEntropyLoss().to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=0.001)   


for epoch in range(30):  # loop over the dataset multiple times
  accu = 0
  count = 0
  allelems = 0
  running_loss = 0.0
  for i, data in enumerate(dtl):
      # get the inputs; data is a list of [inputs, labels]
      inputs, labels = data
      inputs = inputs.to(DEVICE)
      labels = labels.to(DEVICE)
      # zero the parameter gradients
      optimizer.zero_grad()
      # forward + backward + optimize
      outputs = model(inputs)
      outputs = outputs.to(DEVICE)
      labels = labels.long()
      loss = criterion(outputs, labels)
      loss.backward()
      optimizer.step()
      # print statistics
      running_loss += loss.item()
      outputs = outputs.tolist()
      labels = labels.tolist()
      for i in range(len(outputs)):
        allelems += 1
        f = outputs[i].index(max(outputs[i])) 
        if f == labels[i]:
          count += 1
      if i % 999 == 0:    # print every 2000 mini-batches

          count = 0

          for i in range(len(outputs)):
            f = outputs[i].index(max(outputs[i])) 
            if f == labels[i]:
              count += 1
          print(count/30*100)
          print('[%d, %5d] loss: %.3f' %
                (epoch + 1, i + 1, running_loss / 2000))
          running_loss = 0.0
  
  print(count / allelems * 100)

print('Finished Training')



In [None]:
torch.save(model, '/content/drive/MyDrive/L/model')

In [None]:
model = torch.load('/content/drive/MyDrive/L/mode1')
model.train(False)

Net(
  (features): Sequential(
    (0): Conv1d(13, 64, kernel_size=(5,), stride=(4,), padding=(2,))
    (1): ReLU(inplace=True)
    (2): Conv1d(64, 192, kernel_size=(5,), stride=(1,), padding=(2,))
    (3): ReLU(inplace=True)
    (4): Conv1d(192, 384, kernel_size=(3,), stride=(1,), padding=(1,))
    (5): ReLU(inplace=True)
    (6): Conv1d(384, 256, kernel_size=(3,), stride=(1,), padding=(1,))
    (7): ReLU(inplace=True)
    (8): Conv1d(256, 256, kernel_size=(3,), stride=(1,), padding=(1,))
    (9): ReLU(inplace=True)
    (10): MaxPool1d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (avgpool): AdaptiveAvgPool1d(output_size=(6, 6))
  (classifier): Sequential(
    (0): Dropout(p=0.5, inplace=False)
    (1): Linear(in_features=256, out_features=768, bias=True)
    (2): ReLU(inplace=True)
    (3): Dropout(p=0.5, inplace=False)
    (4): Linear(in_features=768, out_features=768, bias=True)
    (5): ReLU(inplace=True)
    (6): Linear(in_features=768, out_features=10, b

In [None]:
count = 0
allelems = 0
x = torch.Tensor(X_test).to(DEVICE)
x = model(x)
x = x.tolist()
y = y_test.tolist()
for i in range(len(x)):
        allelems += 1
        f = x[i].index(max(x[i])) 
        if f == y[i]:
          count += 1
print(count/allelems * 100)

In [None]:
!pip install telebot

In [None]:
# accuracy test
count = 0
allelems = 0
x = torch.Tensor(X_test).to(DEVICE)
x = model(x)
x = x.tolist()
y = y_test.tolist()
for i in range(len(x)):
        allelems += 1
        f = x[i].index(max(x[i])) 
        if f == y[i]:
          count += 1
print(count/allelems * 100)