In [None]:
# Imports
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
import matplotlib
from pydub import AudioSegment
import python_speech_features as psf
from tensorflow.data import Dataset
from sklearn.model_selection import train_test_split
import six
import math
import logging
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, callbacks, models, callbacks
from einops.layers.tensorflow import Rearrange

%matplotlib inline
logging.basicConfig(level=logging.INFO)

In [None]:
# Constants: dataset root, audio directories and CSV manifests for the train and task (test) sets
DATA_PATH = '../new-data-without_silence'
TRAIN_DATA_PATH = f'{DATA_PATH}/train'
TASK_DATA_PATH = f'{DATA_PATH}/test'
TRAIN_FILENAME = f'{DATA_PATH}/train_gt.csv'
TASK_FILENAME = f'{DATA_PATH}/test.csv'

In [None]:
# Load the training manifest (headerless CSV: audio file name, label)
df = pd.read_csv(TRAIN_FILENAME, header=None, names=['audio', 'label'])
# Turn bare file names into paths inside the training directory
df['audio'] = TRAIN_DATA_PATH + "/" + df['audio']

x = df['audio']
y = df['label'].to_numpy().reshape(-1, 1)  # column vector, one row per sample

# Inverse-frequency class weights: each class contributes half of the total weight
n_total = len(y)
n_pos = np.sum(y)
weight_for_0 = (1 / (n_total - n_pos)) * (n_total / 2.0)
weight_for_1 = (1 / n_pos) * (n_total / 2.0)
class_weight = {0: weight_for_0, 1: weight_for_1}
print(class_weight)

In [None]:
# # Audio processing (disabled): one-off feature extraction whose result is saved to
# # train.npy and loaded back in the next cell instead of being recomputed.

# def preprocess_function(audio_path):
#     segment = AudioSegment.from_mp3(audio_path)

#     features, energy = psf.fbank(
#         np.array(segment.get_array_of_samples()), segment.frame_rate
#     )

#     return features

# features_list = []

# for audio in tqdm(x):
#     features_list.append(preprocess_function(audio))

# import tensorflow as tf
# NOTE(review): pad_sequences defaults to dtype='int32', so the float filterbank values are
# presumably truncated to integers here - confirm and consider passing dtype='float32'.
# X = tf.keras.utils.pad_sequences(features_list, padding='post', maxlen=4000)

# NOTE(review): the padded array is presumably (N, 4000, 26); reshape reinterprets the buffer
# rather than swapping axes, so a transpose may have been intended - confirm before re-running.
# X = X.reshape(-1, 1, 26, 4000)

# np.save("train", X)

In [None]:
# Load the precomputed features (recomputing them takes 15-20 minutes, so loading is preferred)
X = np.load('train.npy').astype(float)

In [None]:
# Balance the labels to ~50/50 by dropping the first surplus negative samples.
# The cell is idempotent: when negatives do not outnumber positives (e.g. on a re-run)
# nothing is removed, whereas a countdown that starts at <= 0 would never hit zero and
# would delete every negative sample.
labels = y.ravel()
n_pos = int(labels.sum())
diff = len(labels) - 2 * n_pos  # negatives minus positives

# Positions of the first `diff` negatives; empty when there is no surplus
indexes = np.flatnonzero(labels == 0)[:max(diff, 0)]

X = np.delete(X, indexes, axis=0)
y = np.delete(y, indexes, axis=0)

In [None]:
# Sanity check: feature and label array shapes after balancing
X.shape, y.shape

In [None]:
# Train/validation split (30% held out, fixed seed for reproducibility)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=13
)

# Report the share of positive labels in the full set and in each subset
print('Относительное количество положительный записей:')
for subset_name, subset_labels in (
    ('Вся выборка', y),
    ('Обучающая выборка', y_train),
    ('Валидационная выборка', y_test),
):
    print(f'{subset_name}: {sum(subset_labels) / len(subset_labels)}')

In [None]:
# Building blocks
# resource: https://github.com/ashishpatel26/Vision-Transformer-Keras-Tensorflow-Pytorch-Examples/blob/main/Vision_Transformer_with_tf2.ipynb

def gelu(x):
    """Gaussian Error Linear Unit (tanh approximation).

    A smooth alternative to ReLU, see https://arxiv.org/abs/1606.08415.

    Args:
        x: float Tensor to apply the activation to.
    Returns:
        `x` multiplied element-wise by the approximated Gaussian CDF of `x`.
    """
    # Argument of the tanh in the approximation: sqrt(2/pi) * (x + 0.044715 * x^3)
    inner = math.sqrt(2 / math.pi) * (x + 0.044715 * tf.pow(x, 3))
    return x * (0.5 * (1.0 + tf.tanh(inner)))


def get_activation(identifier):
    """Resolve an activation identifier to a callable.

    String identifiers are lower-cased and looked up among the custom activations
    defined in this notebook first; everything else (other names, callables, None)
    is delegated to `tf.keras.activations.get`.

    Args:
        identifier: String name of the activation function or a callable.
    Returns:
        Whatever `tf.keras.activations.get` returns for the resolved identifier.
    """
    if not isinstance(identifier, six.string_types):
        return tf.keras.activations.get(identifier)

    custom_activations = {"gelu": gelu}
    key = str(identifier).lower()
    # Unknown names fall through to Keras in their lower-cased form
    return tf.keras.activations.get(custom_activations.get(key, key))


class Residual(tf.keras.Model):
    """Skip connection: adds the input back onto the output of the wrapped callable."""

    def __init__(self, fn):
        """Store `fn`, the callable whose output is added to its own input."""
        super().__init__()
        self.fn = fn

    def call(self, x):
        """Return `fn(x) + x`."""
        branch = self.fn(x)
        return branch + x


class PreNorm(tf.keras.Model):
    """Applies layer normalization to the input before passing it to the wrapped callable."""

    def __init__(self, dim, fn):
        """Create the normalization layer and store `fn`.

        `dim` is accepted for signature parity with the other blocks but is not used.
        """
        super().__init__()
        self.norm = layers.LayerNormalization(epsilon=1e-5)
        self.fn = fn

    def call(self, x):
        """Return `fn(norm(x))`."""
        normalized = self.norm(x)
        return self.fn(normalized)


class FeedForward(tf.keras.Model):
    """Two-layer MLP: Dense(hidden_dim) with GELU followed by Dense(dim)."""

    def __init__(self, dim, hidden_dim):
        """Build the MLP.

        Args:
            dim: Number of units of the output Dense layer.
            hidden_dim: Number of units of the hidden, GELU-activated Dense layer.
        """
        super().__init__()
        expand = layers.Dense(hidden_dim, activation=get_activation('gelu'))
        project = layers.Dense(dim)
        self.net = models.Sequential([expand, project])

    def call(self, x):
        """Run `x` through both Dense layers."""
        return self.net(x)

class Attention(tf.keras.Model):
    """Multi-head scaled dot-product self-attention over a (batch, tokens, dim) tensor.

    Args:
        dim: Embedding size of each token; the output keeps this size.
        heads: Number of attention heads; must divide `dim` evenly.

    Raises:
        ValueError: If `dim` is not divisible by `heads`.
    """

    def __init__(self, dim, heads = 8):
        super().__init__()
        if dim % heads != 0:
            raise ValueError(f'dim ({dim}) must be divisible by heads ({heads})')
        self.heads = heads
        # Scale scores by the per-head width (dim / heads): the dot products are taken
        # over that many features, not over the full embedding size.
        self.scale = (dim // heads) ** -0.5

        # A single bias-free projection produces queries, keys and values at once
        self.to_qkv = layers.Dense(dim * 3, use_bias=False)
        self.to_out = layers.Dense(dim)

        self.rearrange_qkv = Rearrange('b n (qkv h d) -> qkv b h n d', qkv = 3, h = self.heads)
        self.rearrange_out = Rearrange('b h n d -> b n (h d)')

    def call(self, x):
        """Attend every token to every token and project the merged heads back to `dim`."""
        qkv = self.to_qkv(x)
        qkv = self.rearrange_qkv(qkv)
        q = qkv[0]
        k = qkv[1]
        v = qkv[2]

        # Pairwise query/key similarities per head, normalized over the key axis
        dots = tf.einsum('bhid,bhjd->bhij', q, k) * self.scale
        attn = tf.nn.softmax(dots, axis=-1)

        # Weighted sum of values, then merge the heads back into one feature axis
        out = tf.einsum('bhij,bhjd->bhid', attn, v)
        out = self.rearrange_out(out)
        out = self.to_out(out)
        return out

class Transformer(tf.keras.Model):
    """Stack of `depth` pre-norm residual attention and feed-forward blocks."""

    def __init__(self, dim, depth, heads, mlp_dim):
        """Build the stack.

        Args:
            dim: Token embedding size passed to every block.
            depth: Number of (attention, feed-forward) pairs.
            heads: Number of attention heads in each attention block.
            mlp_dim: Hidden width of each feed-forward block.
        """
        super().__init__()
        # Named `blocks` so the imported `layers` module is not shadowed
        blocks = []
        for _ in range(depth):
            blocks.append(Residual(PreNorm(dim, Attention(dim, heads = heads))))
            blocks.append(Residual(PreNorm(dim, FeedForward(dim, mlp_dim))))
        self.net = tf.keras.Sequential(blocks)

    def call(self, x):
        """Run `x` through all blocks in order."""
        return self.net(x)

In [None]:
# Model class
class ViTButForAIIJC(tf.keras.Model):
    """Vision Transformer classifier for non-square, channels-first inputs."""

    def __init__(self, *, image_size: tuple[int, int], patch_size: tuple[int, int], num_classes: int,
                 dim: int, depth: int, heads: int, mlp_dim: int, channels: int):
        """Visual Transformer model for non-square images

        Args:
            image_size (tuple[int, int]): input sizes along the two patched axes
            patch_size (tuple[int, int]): patch sizes along the same axes; each must divide
                the matching `image_size` entry
            num_classes (int): number of output units; 1 gives a sigmoid output,
                any other value a softmax output
            dim (int): embedding dims
            depth (int): depth of transformer itself
            heads (int): number of transformer heads
            mlp_dim (int): hidden width of the transformer feed-forward blocks and of the
                classification head
            channels (int): number of channels in input; accepted for API compatibility,
                the patch projection infers its input size on first call

        Raises:
            AssertionError: if `image_size` is not divisible by `patch_size`.
        """
        super().__init__()
        assert image_size[0] % patch_size[0] == 0 and image_size[1] % patch_size[1] == 0, 'image dimensions must be divisible by the patch size'
        num_patches = (image_size[0] // patch_size[0]) * (image_size[1] // patch_size[1])

        self.patch_size = patch_size
        self.dim = dim
        # One learned position vector per patch plus one for the class token
        self.pos_embedding = self.add_weight(name="position_embeddings",
                                             shape=[num_patches + 1, dim],
                                             initializer=tf.keras.initializers.RandomNormal(),
                                             dtype=tf.float32)
        self.patch_to_embedding = layers.Dense(dim)
        # Learned token prepended to every sequence; its final state feeds the classifier
        self.cls_token = self.add_weight(name="cls_token",
                                         shape=[1,
                                                1,
                                                dim],
                                         initializer=tf.keras.initializers.RandomNormal(),
                                         dtype=tf.float32)

        # Cut the input into non-overlapping patches and flatten each patch into a vector
        self.rearrange = Rearrange('b c (w p1) (l p2) -> b (w l) (p1 p2 c)', p1=patch_size[0], p2=patch_size[1])

        self.transformer = Transformer(dim, depth, heads, mlp_dim)

        self.to_cls_token = tf.identity

        # The name is passed to the constructor: assigning `.name` afterwards fails on
        # Keras versions where it is a read-only property.
        self.mlp_head = models.Sequential([
            layers.Dense(mlp_dim, activation=get_activation('gelu')),
            layers.Dense(num_classes, activation=('sigmoid' if num_classes == 1 else 'softmax'))
        ], name='classification_head')

    @tf.function
    def call(self, fbank):
        """Classify a batch laid out as (batch, channels, image_size[0], image_size[1]).

        Returns the output of the classification head, one row of `num_classes`
        values per sample.
        """
        shapes = tf.shape(fbank)

        x = self.rearrange(fbank)

        x = self.patch_to_embedding(x)

        # Repeat the single learned class token for every sample in the batch
        cls_tokens = tf.broadcast_to(self.cls_token,(shapes[0],1,self.dim))

        x = tf.concat((cls_tokens, x), axis=1)
        x += self.pos_embedding
        x = self.transformer(x)

        # Only the class token (position 0) is used for classification
        x = self.to_cls_token(x[:, 0])
        return self.mlp_head(x)

In [None]:
# Shape check (read this): the per-sample shape has to line up with the model config below
X[0].shape # (channels, image_size[0], image_size[1])
# | | |
# V V V

In [None]:
# Model config
model = ViTButForAIIJC(
    # Better left unchanged: these have to agree with the shape of the loaded features
    image_size=(26, 4000),
    patch_size=(2, 100),
    num_classes=1,
    channels=1,

    # Free to tune
    dim=256,
    depth=4,
    heads=4,
    mlp_dim=1024,
)

In [None]:
# Training config
epochs = 50
batch_size=8
optim = 'adam' # adam/sgd (any value other than 'adam' selects SGD)
lr = 2e-5
momentum = 0.8 # sgd only
# early stopping: minimum val_loss improvement and epochs to wait for it
es_delta = 1e-3
es_patience = 20
# learning-rate reduction on plateau: multiplier and epochs to wait before applying it
red_lr_factor = 0.5
red_lr_patience = 10

In [None]:
# Apply the training config: loss, optimizer, callbacks and metrics
loss_fn = tf.keras.losses.BinaryCrossentropy()

# Anything other than 'adam' falls back to SGD with momentum
optimizer = (
    tf.keras.optimizers.Adam(learning_rate=lr)
    if optim == 'adam'
    else tf.keras.optimizers.SGD(learning_rate=lr, momentum=momentum)
)

# Stop when val_loss stalls and roll back to the best weights seen so far
early_stopping = callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=es_delta,
    patience=es_patience,
    verbose=1,
    mode='auto',
    baseline=None,
    restore_best_weights=True,
)
# Keep only the weights with the best validation F1 on disk
best_f1_checkpoint = callbacks.ModelCheckpoint(
    filepath='./saves/rnn_mfcc.weights.h5',
    monitor='val_f1_score',
    mode='max',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)
# Shrink the learning rate when val_loss plateaus
reduce_lr = callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=red_lr_factor,
    patience=red_lr_patience,
    verbose=1,
)
custom_callbacks = [early_stopping, best_f1_checkpoint, reduce_lr]

model.compile(
    optimizer=optimizer,
    loss=loss_fn,
    metrics=[
        'accuracy',
        tf.keras.metrics.F1Score(average='macro', threshold=0.5),
        tf.keras.metrics.Precision(),  # correct 1 / all predicted as 1
        tf.keras.metrics.Recall(),  # correct 1 / all 1
    ],
)

In [None]:
# Let's go 🥵 - train the model, validating on the held-out split after every epoch
hist = model.fit(
    X_train, y_train,
    epochs=epochs,
    callbacks=custom_callbacks,
    verbose=1,
    validation_data=(X_test, y_test),
    batch_size=batch_size
#    class_weight=class_weight  # not needed: the classes were balanced to 50/50 above
)

In [None]:
# How did training go? Plot every logged metric per epoch.
# The learning-rate column is dropped because its scale would flatten the other curves;
# it is logged as 'learning_rate' or 'lr' depending on the Keras version and may be
# absent altogether, hence errors='ignore'.
hist_df = pd.DataFrame(hist.history).drop(columns=['learning_rate', 'lr'], errors='ignore')
ax = hist_df.plot(figsize=(8,5))
ax.set(xlabel='epoch', ylabel='value', title='Training history')
plt.show()

In [None]:
# Model summary (layers and parameter counts)
model.summary()

In [None]:
# # Load the best checkpointed weights (can be left commented out if the best checkpoint is evidently degenerate, i.e. predicts mostly one class)
# model.load_weights('./saves/rnn_mfcc.weights.h5')

In [None]:
# Final evaluation. Both metric dicts are kept and printed: a notebook cell only displays
# its last expression, so the validation result would otherwise be discarded.
print('На валидационной выборке')
val_metrics = model.evaluate(X_test, y_test, return_dict=True)
print(val_metrics)

# X also contains the training samples, so these numbers are not a held-out estimate
print('На всех данных')
all_metrics = model.evaluate(X, y, return_dict=True)
print(all_metrics)

## Task

In [None]:
# task = pd.read_csv(TASK_FILENAME, header=None, names=['audio', 'label'])
# task['audio'] = TASK_DATA_PATH + "/" + task['audio']

# task_x = task['audio']

In [None]:
# task_features_list = []

# for audio in tqdm(task_x):
#     task_features_list.append(preprocess_function(audio))

In [None]:
# task_X = tf.keras.utils.pad_sequences(task_features_list, padding='post', maxlen=4000)

In [None]:
# np.save("test", task_X)

In [None]:
# Load the precomputed features of the task (test) set
task_X = np.load('test.npy').astype(float)
# The commented-out pipeline saves the test features straight from pad_sequences, without
# the reshape applied to the training features. Bring them to the training layout when
# they differ; a ValueError here means the element counts do not match.
if task_X.shape[1:] != X.shape[1:]:
    task_X = task_X.reshape((-1,) + X.shape[1:])

In [None]:
# Predict probabilities for the task set and round them to hard 0/1 labels
task_y = np.round(model.predict(task_X)).astype(int)

In [None]:
# Share of task samples predicted as positive (quick check for a degenerate model)
np.sum(task_y) / len(task_y)

In [None]:
# Write the submission: the task manifest with its label column filled by the predictions
submit = pd.read_csv(TASK_FILENAME, header=None, names=['audio', 'label'])
assert len(submit) == len(task_y), 'number of predictions must match the number of task rows'
# Predictions come out as an (N, 1) array; flatten them to a 1-D column explicitly
submit['label'] = np.ravel(task_y)
submit.to_csv('submit.csv', header=False, index=False)