In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import soundfile as sf
import pandas as pd
import numpy as np
import os
import librosa
import torch
from tqdm import tqdm
import pickle
from IPython.display import FileLink
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score
import torch.utils.data as data_utils
import torch.optim as optim
import random
from IPython.display import Audio
import torchaudio.transforms as T

In [None]:
import numpy as np
import scipy.signal as sg
import matplotlib.pyplot as plt

# Filter Function

In [None]:
def get_energy(signal, sr, window_size=1024, hop_size=512, freq_range=(300, 3000)):
    """
    Per-frame spectral energy of a signal inside a frequency band.

    The signal is cut into Hann-windowed frames of ``window_size`` samples whose
    start offsets are ``0, hop_size, 2 * hop_size, ...`` (strictly below
    ``len(signal) - window_size``). For every frame the magnitudes of the rFFT
    bins whose frequency lies inside ``freq_range`` (inclusive) are summed.

    Parameters:
    - signal: 1-D sequence of audio samples
    - sr: sampling rate (Hz)
    - window_size: frame length (samples)
    - hop_size: step between frame starts (samples)
    - freq_range: (low, high) band limits (Hz)

    Returns:
    - 1-D array with one energy value per frame; empty when the signal is too
      short to produce a frame.
    """
    frame_starts = range(0, len(signal) - window_size, hop_size)
    if len(frame_starts) == 0:
        # An empty frame list becomes a 1-D array that can neither be multiplied
        # by the window nor transformed along axis=1, so bail out early.
        return np.zeros(0)

    frames = sg.windows.hann(window_size) * np.array(
        [signal[i : i + window_size] for i in frame_starts]
    )

    fft_frames = np.fft.rfft(frames, axis=1)
    freqs = np.fft.rfftfreq(window_size, 1/sr)

    amplitude_spectrum = np.abs(fft_frames)

    # Keep only the bins that fall inside the requested band.
    freq_mask = (freqs >= freq_range[0]) & (freqs <= freq_range[1])

    energy = np.sum(amplitude_spectrum[:, freq_mask], axis=1)
    return energy

In [None]:
def threshold_filter(signal, sr, window_size=1024, hop_size=512, freq_range=(300, 3000), threshold=1.0, draw=False, threshold_is_mean=False):
    """
    Threshold filter based on spectral energy inside a given frequency range.

    Parameters:
    - signal: 1-D sequence of audio samples
    - sr: sampling rate (Hz)
    - window_size: frame length (samples)
    - hop_size: step between frame starts (samples)
    - freq_range: (low, high) band limits (Hz), inclusive
    - threshold: minimum band energy for a frame to be kept
    - draw: if True, plot the per-frame energy together with the threshold
    - threshold_is_mean: if True, ``threshold`` is replaced by the mean frame energy

    Returns:
    - integer array with one entry per frame: 1 where energy >= threshold, else 0.
      Empty when the signal is too short to produce a frame.
    """
    hann_window = sg.windows.hann(window_size)
    frames = np.array(
        [
            signal[i:i + window_size] * hann_window
            for i in range(0, len(signal) - window_size, hop_size)
        ]
    )
    if frames.size == 0:
        # No complete frame: there is nothing to transform, threshold or plot.
        return np.zeros(0, dtype=int)

    fft_frames = np.fft.rfft(frames, axis=1)
    freqs = np.fft.rfftfreq(window_size, 1/sr)

    amplitude_spectrum = np.abs(fft_frames)

    # Keep only the bins that fall inside the requested band.
    freq_mask = (freqs >= freq_range[0]) & (freqs <= freq_range[1])

    energy = np.sum(amplitude_spectrum[:, freq_mask], axis=1)

    if threshold_is_mean:
        threshold = np.mean(energy)

    if draw:
        plt.figure(figsize=(10, 5))
        plt.plot(energy, linestyle="-", color="b")
        plt.axhline(y=threshold, color='r', linestyle='--', label="Порог")
        plt.xlabel("Окно")
        plt.ylabel("Энергия")
        plt.title("График энергии аудио")
        plt.grid(True)
        plt.legend()
        plt.show()

    mask = (energy >= threshold).astype(int)

    return mask


def apply_mask(signal, sr, mask, window_size=1024, hop_size=512, remove_silence=True):
    """
    Rebuild a signal from the frames selected by ``mask`` using windowed overlap-add.

    Frame ``k`` starts at ``k * hop_size``. Kept frames are multiplied by a Hann
    window and accumulated; the sum is then divided by the accumulated window
    weights. Samples that are not covered by any kept frame stay at zero, so the
    result always has the same length as ``signal``.

    Parameters:
    - signal: 1-D array of audio samples
    - sr: sampling rate (Hz); not used by the computation
    - mask: sequence of 0/1 flags, one per frame (1 = keep the frame)
    - window_size: frame length (samples)
    - hop_size: step between frame starts (samples)
    - remove_silence: only selects the output dtype. True always yields float64;
      False keeps the signal's floating dtype. Neither setting shortens the output.

    Returns:
    - filtered signal with ``len(signal)`` samples
    """
    signal = np.asarray(signal)
    hann_window = sg.windows.hann(window_size)
    num_samples = len(signal)

    # Integer input needs a float accumulator: adding windowed (float) samples
    # in place to an integer buffer raises a casting error.
    if remove_silence or not np.issubdtype(signal.dtype, np.floating):
        out_dtype = np.float64
    else:
        out_dtype = signal.dtype

    output = np.zeros(num_samples, dtype=out_dtype)
    norm = np.zeros(num_samples, dtype=out_dtype)

    for frame_idx, keep in enumerate(mask):
        start = frame_idx * hop_size
        stop = start + window_size
        if stop > num_samples:
            break
        if keep == 1:
            output[start:stop] += signal[start:stop] * hann_window
            norm[start:stop] += hann_window

    # Avoid division by zero where no kept frame contributed any weight.
    norm[norm == 0] = 1
    return output / norm

def apply_mask_last(signal, sr, mask, window_size=1024, hop_size=512, remove_silence=True):
    """
    Apply a frame mask to an audio signal, either dropping the masked-out frames
    or zeroing them in place.

    Parameters:
    - signal: 1-D array, the source audio signal
    - sr: sampling rate (Hz); not used by the computation
    - mask: sequence of 0 and 1, where 1 keeps the frame and 0 removes it
    - window_size: frame length (samples)
    - hop_size: step between frame starts (samples)
    - remove_silence: if True, the Hann-windowed kept frames are concatenated back
      to back (overlapping frames are not merged, so shared samples appear in each);
      if False, the kept frames are overlap-added into a signal of the original length

    Returns:
    - filtered_signal: the signal after filtering (empty array if no frame is kept
      and remove_silence is True)
    """
    hann_window = sg.windows.hann(window_size)
    num_samples = len(signal)

    # Start offsets of the frames that are kept and fit entirely inside the signal.
    kept_starts = [
        frame_idx * hop_size
        for frame_idx, keep in enumerate(mask)
        if keep == 1 and frame_idx * hop_size + window_size <= num_samples
    ]

    if remove_silence:
        if not kept_starts:
            return np.array([])
        return np.concatenate(
            [signal[start : start + window_size] * hann_window for start in kept_starts]
        )

    signal = np.asarray(signal)
    # Integer input needs a float accumulator: adding windowed (float) samples
    # in place to an integer buffer raises a casting error.
    out_dtype = signal.dtype if np.issubdtype(signal.dtype, np.floating) else np.float64
    filtered_signal = np.zeros(signal.shape, dtype=out_dtype)
    window_sum = np.zeros(signal.shape, dtype=out_dtype)

    for start in kept_starts:
        filtered_signal[start : start + window_size] += signal[start : start + window_size] * hann_window
        window_sum[start : start + window_size] += hann_window

    # Avoid division by zero where no kept frame contributed any weight.
    window_sum[window_sum == 0] = 1
    filtered_signal /= window_sum

    return filtered_signal

# Data

In [None]:
# Training metadata table; later cells read its 'fname' and 'label' columns.
df = pd.read_csv('/kaggle/input/lw1-acc/train.csv')

In [None]:
# Directory prefix that is prepended to every file name to form a full audio path.
way = '/kaggle/input/lw1-acc/audio_train/train/'

In [None]:
# Full path of each clip: directory prefix + file name.
df['way'] = way + df['fname']

In [None]:
# Integer class id per row, taken from the codes returned by pd.factorize.
df['label_id'] = pd.factorize(df['label'])[0]

In [None]:
# Number of distinct class ids.
len(df['label_id'].unique())

In [None]:
# Distinct label names as a plain list.
# presumably list index i corresponds to label_id i (both follow first-appearance order) — verify
unique_labels = df['label'].unique().tolist()
unique_labels

## Get threshold (ineffective)

In [None]:
def get_threshold(path):
    """
    Mean band energy of the audio file at ``path``.

    The file is read with soundfile and passed to ``get_energy`` with its
    default framing and band settings; the mean over all frames is returned.
    """
    audio, rate = sf.read(path)
    frame_energy = get_energy(audio, rate)
    return np.mean(frame_energy)

In [None]:
# df['threshold'] = df['way'].apply(get_threshold)

# Get features

In [None]:
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification
from transformers import AutoProcessor, ASTModel, ASTFeatureExtractor, AutoFeatureExtractor

In [None]:
# Hold out 1% of the rows, stratified by label; the fixed seed keeps the split reproducible.
train, test = train_test_split(df, test_size=0.01, random_state=42, stratify=df['label'])

## Simple solution (ineffective)

In [None]:
def get_features(data):
    """
    Compute hand-crafted spectral features for every row of ``data``.

    Each row must provide 'way' (audio path) and 'label_id'. Per clip the
    function stores the rFFT magnitude of at most the first 2048 samples, a
    64-band mel spectrogram and 13 MFCCs.

    Returns:
    - dict with the lists 'spectr', 'mel', 'mfcc' and 'labels', one entry per row
    """
    features = {'spectr':[], 'mel':[], 'mfcc':[], 'labels':[]}
    for row_idx in tqdm(range(len(data))):
        info = dict(data.iloc[row_idx])
        audio, rate = sf.read(info['way'])

        # The FFT size never exceeds the clip length, so short clips are not zero-padded.
        fft_size = min(2048, len(audio))

        magnitude = np.abs(np.fft.rfft(audio, n=fft_size))
        mel = librosa.feature.melspectrogram(y=audio, sr=rate, n_mels=64, n_fft=fft_size)
        mfcc = librosa.feature.mfcc(y=audio, sr=rate, n_mfcc=13)

        features['spectr'].append(magnitude)
        features['mel'].append(mel)
        features['mfcc'].append(mfcc)
        features['labels'].append(info['label_id'])
    return features

## Add augmentation (ineffective)

In [None]:
def time_stretch(curr_audio, rate=0.5):
    """Return ``curr_audio`` time-stretched by ``rate`` via librosa.effects.time_stretch."""
    return librosa.effects.time_stretch(curr_audio, rate=rate)

def get_features2(data):
    """
    Variant of the simple feature extractor that first filters each clip with an
    energy threshold mask (and, inside the retry loop, time-stretches it).

    Each row must provide 'way', 'threshold' and 'label_id'.
    Returns a dict with the lists 'spectr', 'mel', 'mfcc' and 'labels'.
    """
    features = {'spectr':[], 'mel':[], 'mfcc':[], 'labels':[]}
    for i in tqdm(range(len(data))):
        info = dict(data.iloc[i])
        first_audio, file_sr = sf.read(info['way'])
        # NOTE(review): the only visible cell that would create df['threshold'] is
        # commented out, so this lookup fails unless the column is added elsewhere.
        threshold = info['threshold']
        mask = threshold_filter(first_audio, file_sr, threshold=threshold)
        curr_audio = apply_mask(first_audio, file_sr, mask)
        # NOTE(review): apply_mask returns an array as long as its input, so this
        # loop only runs for an empty recording. The apply_mask_last / time_stretch
        # calls look like they were meant to sit outside the loop — confirm intent.
        while len(curr_audio) == 0:
            # Lower the threshold and retry.
            threshold -= 2
            mask = threshold_filter(first_audio, file_sr, threshold=threshold)
            # The apply_mask result is immediately overwritten by apply_mask_last.
            curr_audio = apply_mask(first_audio, file_sr, mask)
            curr_audio = apply_mask_last(first_audio, file_sr, mask)
            curr_audio = time_stretch(curr_audio)
        # The FFT size never exceeds the clip length.
        n_fft = min(2048, len(curr_audio))
        # The trailing comma below only wraps the call's None result in a discarded tuple.
        features['spectr'].append(np.abs(np.fft.rfft(curr_audio, n=n_fft))),
        features['mel'].append(librosa.feature.melspectrogram(y=curr_audio, sr=file_sr, n_mels=16, n_fft=n_fft))
        features['mfcc'].append(librosa.feature.mfcc(y=curr_audio, sr=file_sr, n_mfcc=13))
        features['labels'].append(info['label_id'])
    return features

## Add pretrain output feature extraction (ineffective)

In [None]:
# extractor = AutoFeatureExtractor.from_pretrained("bookbot/distil-ast-audioset")
# model_extractor = AutoModelForAudioClassification.from_pretrained("bookbot/distil-ast-audioset") 
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model_extractor.to(device)
# def get_features3(data):
#     features = {'spectr':[], 'mel':[], 'mfcc':[], 'labels':[]}
#     for i in tqdm(range(len(data))):
#         info = dict(data.iloc[i])
#         first_audio, file_sr = sf.read(info['way'])
#         inputs = extractor(first_audio, sampling_rate=file_sr, return_tensors="pt")
#         with torch.no_grad():
#             emb = model_extractor.audio_spectrogram_transformer(**inputs.to(device))['pooler_output'][0].to('cpu')
#         features['spectr'].append(emb)
#         features['labels'].append(info['label_id'])
#     return features

## Add distill pretrain input feature extraction (less effective)

In [None]:
# extractor = AutoFeatureExtractor.from_pretrained("bookbot/distil-ast-audioset")
# def get_features4(data):
#     features = {'spectr':[], 'mel':[], 'mfcc':[], 'labels':[]}
#     for i in tqdm(range(len(data))):
#         info = dict(data.iloc[i])
#         first_audio, file_sr = sf.read(info['way'])
#         inputs = extractor(first_audio, sampling_rate=file_sr, return_tensors="pt")
#         features['spectr'].append(inputs)
#         features['labels'].append(info['label_id'])
#     return features

## Add large pretrain input extraction (most effective)

In [None]:
# Feature extractor matching the pretrained AST checkpoint.
extractor = ASTFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")
def get_features5(data):
    """
    Turn every clip in ``data`` into AST input features.

    Each row must provide 'way' (audio path) and 'label_id'. Clips are loaded at
    16 kHz, trimmed with librosa.effects.trim and passed through ``extractor``.

    Returns:
    - dict with 'spectr' (one "input_values" tensor per clip) and 'labels';
      the 'mel' and 'mfcc' lists are left empty.
    """
    features = {'spectr':[], 'mel':[], 'mfcc':[], 'labels':[]}
    for row_idx in tqdm(range(len(data))):
        info = dict(data.iloc[row_idx])
        waveform = librosa.load(info['way'], sr=16000)[0]
        trimmed, _ = librosa.effects.trim(waveform)
        encoded = extractor(trimmed, sampling_rate=16000, return_tensors="pt")
        features['spectr'].append(encoded["input_values"])
        features['labels'].append(info['label_id'])
    return features

## Add large pretrain input extraction with augmentation (too large for Kaggle)

In [None]:
def add_noise(x, noise_level=0.01):
    """
    Add standard-normal noise scaled by ``noise_level`` to ``x``.

    The noise is drawn from NumPy's global random state, one value per element of ``x``.
    """
    return x + noise_level * np.random.randn(len(x))

def pad_or_trim(x, length=16000):
    """
    Force ``x`` to exactly ``length`` samples.

    Longer inputs are cut to their first ``length`` samples; shorter (or equal)
    inputs are zero-padded at the end.
    """
    shortfall = length - len(x)
    if shortfall < 0:
        return x[:length]
    return np.pad(x, (0, shortfall))


In [None]:
# Feature extractor matching the pretrained AST checkpoint.
extractor = ASTFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")
def get_features5(data, p_time_mask=0.5, p_freq_mask=0.5, p_noise=0.5):
    """
    Turn every clip in ``data`` into AST input features plus random augmentations.

    Each row must provide 'way' (audio path) and 'label_id'. The clip is loaded at
    16 kHz and trimmed; its features are always stored. Independently, with the
    given probabilities, up to three augmented copies are stored as well:
    a time-masked copy, a frequency-masked copy, and a copy computed from the
    waveform with additive noise.

    Parameters:
    - data: DataFrame of clips
    - p_time_mask: probability of adding a time-masked copy
    - p_freq_mask: probability of adding a frequency-masked copy
    - p_noise: probability of adding a noisy copy

    Returns:
    - dict with 'spectr' (feature tensors) and 'labels' (one id per stored tensor);
      'mel' and 'mfcc' are left empty.

    Randomness comes from the global ``random`` and ``np.random`` states, so seed
    both for reproducible output.
    NOTE(review): with the default probabilities, evaluation data passed through
    this function is augmented too — pass 0 for all three if that is not wanted.
    """
    time_mask = T.TimeMasking(time_mask_param=80)
    freq_mask = T.FrequencyMasking(freq_mask_param=16)

    def _mask(masker, spec):
        # torchaudio's maskers act on (..., freq, time), while the features here are
        # laid out (batch, time, freq) (the classifier reshapes them to 1024 x 128).
        # Swap the last two axes around the call so each mask hits its intended axis.
        return masker(spec.clone().transpose(-1, -2)).transpose(-1, -2).contiguous()

    features = {'spectr': [], 'mel': [], 'mfcc': [], 'labels': []}

    for i in tqdm(range(len(data))):
        info = dict(data.iloc[i])
        x = librosa.load(info['way'], sr=16000)[0]
        xx, _ = librosa.effects.trim(x)

        x_orig = extractor(xx, sampling_rate=16000, return_tensors="pt")["input_values"]
        features['spectr'].append(x_orig)
        features['labels'].append(info['label_id'])

        if random.random() < p_time_mask:
            features['spectr'].append(_mask(time_mask, x_orig))
            features['labels'].append(info['label_id'])

        if random.random() < p_freq_mask:
            features['spectr'].append(_mask(freq_mask, x_orig))
            features['labels'].append(info['label_id'])

        if random.random() < p_noise:
            # Noise is added to the waveform, so the features are recomputed from scratch.
            x_noisy = add_noise(xx, noise_level=0.01)
            x_aug = extractor(x_noisy, sampling_rate=16000, return_tensors="pt")["input_values"]
            features['spectr'].append(x_aug)
            features['labels'].append(info['label_id'])

    return features

## Generation

In [None]:
# NOTE(review): features are built from the full `df`, which still contains the rows
# that train_test_split put into `test`, so the held-out clips are part of this set.
# If `test` is meant to be unseen data this should presumably use `train` — confirm.
train_features = get_features5(df)

In [None]:
# NOTE(review): if the augmenting definition of get_features5 is the active one, its
# default probabilities also add augmented copies to this evaluation set — confirm.
test_features = get_features5(test)

In [None]:
# Persist both feature dictionaries so the extraction step does not have to be repeated.
for out_path, payload in (('train.pkl', train_features), ('test.pkl', test_features)):
    with open(out_path, 'wb') as fh:
        pickle.dump(payload, fh)

In [None]:
def pad_sequence(seq, max_len):
    """
    Flatten ``seq`` and force it to exactly ``max_len`` values.

    Shorter inputs are zero-padded at the end; inputs of ``max_len`` or more
    values are cut to their first ``max_len`` values.
    """
    flat = seq.flatten()  # collapse to 1-D; the input must provide .flatten()
    missing = max_len - len(flat)
    if missing > 0:
        return np.pad(flat, (0, missing), mode='constant')
    return flat[:max_len]

## Pooling

### Own data pooling

In [None]:
# max_len = max([spectr.flatten().shape for spectr in train_features['spectr']])[0] 
# train_features['spectr'] = np.array([pad_sequence(m, max_len) for m in train_features['spectr']], dtype=np.float32)
# test_features['spectr'] = np.array([pad_sequence(m, max_len) for m in test_features['spectr']], dtype=np.float32)

# max_len = max([mfcc.flatten().shape for mfcc in train_features['mfcc']])[0] 
# train_features['mfcc'] = np.array([pad_sequence(m, max_len) for m in train_features['mfcc']], dtype=np.float32)
# test_features['mfcc'] = np.array([pad_sequence(m, max_len) for m in test_features['mfcc']], dtype=np.float32)

# max_len = max([mel.flatten().shape for mel in train_features['mel']])[0] 
# train_features['mel'] = np.array([pad_sequence(m, max_len) for m in train_features['mel']], dtype=np.float32)
# test_features['mel'] = np.array([pad_sequence(m, max_len) for m in test_features['mel']], dtype=np.float32)

### Pretrain 3.3 - 3.4 pooling

In [None]:
# train_features['spectr'] = np.array([m['input_values'] for m in train_features['spectr']], dtype=np.float32)
# test_features['spectr'] = np.array([m['input_values'] for m in test_features['spectr']], dtype=np.float32)

### Large pretrain pooling

In [None]:
# Take element 0 of every stored feature (m[0]) and stack the results into one float32 array.
# assumes every m[0] has the same shape so np.array can stack them — TODO confirm
train_features['spectr'] = np.array([m[0] for m in train_features['spectr']], dtype=np.float32)
test_features['spectr'] = np.array([m[0] for m in test_features['spectr']], dtype=np.float32)

### Check pooling

In [None]:
# len(train_features['mfcc']), len(train_features['mel']), len(train_features['spectr']),
# len(test_features['mfcc']), len(test_features['mel']), len(test_features['spectr'])

# Training

## Preparation

In [None]:
import math

def approximate_factors(n):
    """
    Split a positive integer into the factor pair that is closest to a square.

    Parameters:
    - n: positive integer

    Returns:
    - (a, b) with a * b == n and a <= b, where a is the largest divisor of n
      that does not exceed sqrt(n)

    Raises:
    - ValueError: if n is smaller than 1 or has no integer factor pair
      (instead of silently returning None, which callers cannot unpack)
    """
    if n < 1:
        raise ValueError(f"n must be a positive integer, got {n!r}")

    # Search downwards from sqrt(n): the first divisor found gives the most
    # balanced pair. For an integer n the loop always stops at 1 at the latest.
    for candidate in range(int(math.sqrt(n)), 0, -1):
        if n % candidate == 0:
            return (candidate, n // candidate)

    raise ValueError(f"n must be a positive integer, got {n!r}")

In [None]:
class model_register():
    """
    Training / evaluation harness around the notebook's current ``Classifier``.

    Holds the hyper-parameters (batch size, loss, learning rate, device), builds
    DataLoaders from feature dictionaries, creates the model together with its
    optimizer and scheduler, and runs the training and macro-F1 evaluation loops.
    """
    def __init__(self, ):
        self.batch_size = 10 #128
        self.loss_function = nn.CrossEntropyLoss()
        self.lr = 1e-5
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.results = {}
        self.epoch = 0

    def gen_datasets(self, train, test, mode='spectr'):
        """
        Build ``self.trainset`` / ``self.testset`` DataLoaders.

        Parameters:
        - train, test: dicts holding the features under ``mode`` and the class
          ids under 'labels'
        - mode: key of the feature array to use

        Every sample is flattened to a vector; its length is stored as the plain
        integer ``self.input_dim``.
        """
        inputs_train = torch.tensor(train[mode], dtype=torch.float32)
        targets_train = torch.tensor(list(train['labels']), dtype=torch.long)
        inputs_test = torch.tensor(test[mode], dtype=torch.float32)
        targets_test = torch.tensor(list(test['labels']), dtype=torch.long)

        # Flattened per-sample size, taken from this object's own data rather than
        # from any global model instance.
        self.input_dim = inputs_train.shape[1:].numel()
        inputs_train = inputs_train.view(inputs_train.shape[0], self.input_dim)
        inputs_test = inputs_test.view(inputs_test.shape[0], self.input_dim)

        # The full tensors are moved to the device up front, so batches arrive on-device.
        train_dataset = data_utils.TensorDataset(inputs_train.to(self.device), targets_train.to(self.device))
        test_dataset = data_utils.TensorDataset(inputs_test.to(self.device), targets_test.to(self.device))
        self.trainset = torch.utils.data.DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        self.testset = torch.utils.data.DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)

    def get_model(self):
        """Create the classifier, its Adam optimizer and a StepLR scheduler (x0.1 every 5 epochs)."""
        self.model = Classifier(self.input_dim).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)
        self.scheduler = optim.lr_scheduler.StepLR(self.optimizer, step_size=5, gamma=0.1)

    def train(self, epochs=10):
        """Run ``epochs`` passes over ``self.trainset``, stepping the scheduler once per epoch."""
        # test() leaves the model in eval mode; switch dropout / batch-norm back to training.
        self.model.train()
        for epoch in range(epochs):
            with tqdm(self.trainset, desc=f"Epoch {epoch+1}/{epochs}", leave=True) as pbar:
                for X, y in pbar:
                    self.optimizer.zero_grad()
                    out = self.model(X.to(self.device))
                    # Collapse any leading dimensions so the loss sees (N, num_classes).
                    out = out.view(-1, out.shape[-1])
                    loss = self.loss_function(out, y.to(self.device))
                    loss.backward()
                    self.optimizer.step()
                    pbar.set_postfix(loss=loss.item())
            self.scheduler.step()

    def test(self, to_print=True):
        """
        Evaluate on ``self.testset`` and return the macro-averaged F1 score.

        Puts the model into eval mode. If ``to_print`` is True the score is also printed.
        """
        self.model.eval()
        predictions = []
        targets = []

        with torch.no_grad():
            with tqdm(self.testset, desc="Testing", leave=True) as pbar:
                for X, y in pbar:
                    X, y = X.to(self.device), y.cpu()

                    output = self.model(X)

                    preds = torch.argmax(output, dim=-1).cpu().numpy()

                    targets.extend(y.numpy())
                    predictions.extend(preds)

        f1 = f1_score(targets, predictions, average="macro")

        if to_print:
            print(f"F1-score (macro): {f1:.4f}")
        return f1

## Classifier Transformer Encoder

In [None]:
class Classifier(nn.Module):
    """
    Linear projection -> Transformer encoder -> linear classification head.

    Input is a (batch, input_dim) tensor; output is (batch, num_classes) logits.
    """
    def __init__(self, input_dim, hidden_dim=512, num_layers=4, nheads=8, num_classes=41, dropout_rate=0.3):
        super().__init__()

        # Stored for reference only; none of the layers below use it.
        self.hidden_dim = 8 * hidden_dim

        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=hidden_dim,
                nhead=nheads,
                dim_feedforward=4 * hidden_dim,
                dropout=dropout_rate,
            ),
            num_layers=num_layers,
        )
        self.fc_out = nn.Linear(hidden_dim, num_classes)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        projected = F.relu(self.fc1(x))
        # A leading axis of size 1 is added for the encoder and removed afterwards.
        encoded = self.transformer(projected.unsqueeze(0)).squeeze(0)
        return self.fc_out(self.dropout(encoded))

## Classifier Inception+Residual 

In [None]:
class InceptionBlock(nn.Module):
    """
    Four parallel convolution branches (1x1, 5x5, 3x3, pooled 1x1) of 16 channels
    each, concatenated along the channel axis into 64 output channels.
    Spatial size is preserved.
    """
    def __init__(self, in_channels):
        super().__init__()
        self.branch1x1 = nn.Conv2d(in_channels, 16, kernel_size=1)

        # 1x1 reduction followed by the 5x5 convolution.
        self.branch5x5_1 = nn.Conv2d(in_channels, 16, kernel_size=1)
        self.branch5x5_2 = nn.Conv2d(16, 16, kernel_size=5, padding=2)

        # 1x1 reduction followed by the 3x3 convolution.
        self.branch3x3_1 = nn.Conv2d(in_channels, 16, kernel_size=1)
        self.branch3x3_2 = nn.Conv2d(16, 16, kernel_size=3, padding=1)

        self.branch_pool = nn.Conv2d(in_channels, 16, kernel_size=1)

    def forward(self, x):
        pooled = F.avg_pool2d(x, kernel_size=3, stride=1, padding=1)
        branches = (
            self.branch1x1(x),
            self.branch5x5_2(self.branch5x5_1(x)),
            self.branch3x3_2(self.branch3x3_1(x)),
            self.branch_pool(pooled),
        )
        return torch.cat(branches, 1)

class ResidualBlock(nn.Module):
    """
    Channel-growing block: the input is concatenated with conv -> batch-norm -> ReLU
    features so that the result has ``out_channels`` channels in total.

    A 2-D (batch, features) input is first reshaped to a single-channel image
    whose sides come from ``approximate_factors``.
    """
    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Only the channels that are added on top of the input need to be computed.
        new_channels = out_channels - in_channels
        self.conv = nn.Conv2d(in_channels, new_channels, kernel_size=3, padding=1)
        self.relu = nn.ReLU()
        self.batch_norm = nn.BatchNorm2d(new_channels)

    def forward(self, x):
        if x.dim() < 3:
            rows, cols = approximate_factors(x.shape[-1])
            x = x.unsqueeze(1).view(x.shape[0], 1, rows, cols)
        grown = self.relu(self.batch_norm(self.conv(x)))
        return torch.cat((x, grown), dim=1)

class Classifier(nn.Module):
    """
    Residual stack -> Inception block -> linear projection -> Transformer encoder
    -> linear classification head.

    Input is a (batch, input_dim) tensor that the first residual block reshapes
    into a single-channel image; output is (batch, num_classes) logits.

    Parameters:
    - input_dim: number of features per sample (equals the number of spatial
      positions after reshaping)
    - batch_size: kept for backward compatibility; no longer used to size any layer
    - hidden_dim, num_layers, nheads, dropout_rate: Transformer encoder settings
    - num_classes: number of output logits
    """
    def __init__(self, input_dim, batch_size=64, hidden_dim=512, num_layers=4, nheads=8, num_classes=41, dropout_rate=0.3):
        super(Classifier, self).__init__()
        # Stored for reference only; none of the layers below use it.
        self.hidden_dim = hidden_dim * 8

        self.res_block1 = ResidualBlock(1, 32)
        self.res_block2 = ResidualBlock(32, 64)
        self.res_block3 = ResidualBlock(64, 128)
        self.res_block4 = ResidualBlock(128, 256)
        self.res_block5 = ResidualBlock(256, 512)

        # forward() applies the Inception block after res_block5, whose output has
        # 512 channels, so the block must accept 512 input channels.
        self.inception = InceptionBlock(512)

        # InceptionBlock concatenates four 16-channel branches; spatial size is preserved.
        inception_channels = 64
        self.fc1 = nn.Linear(input_dim * inception_channels, hidden_dim)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim, nhead=nheads, dim_feedforward=hidden_dim * 4, dropout=dropout_rate
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.fc_out = nn.Linear(hidden_dim, num_classes)

        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        x = self.res_block1(x)
        x = self.res_block2(x)
        x = self.res_block3(x)
        x = self.res_block4(x)
        x = self.res_block5(x)

        x = self.inception(x)

        # Flatten channels and spatial positions into one feature vector per sample.
        x = x.view(x.size(0), -1)

        x = F.relu(self.fc1(x))
        # A leading axis of size 1 is added for the encoder and removed afterwards.
        x = x.unsqueeze(0)

        x = self.transformer(x)
        x = x.squeeze(0)

        x = self.dropout(x)

        out = self.fc_out(x)
        return out


## Classifier InceptionResidual 

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class InceptionResidualBlock(nn.Module):
    """
    Inception-style block with a 1x1 shortcut branch.

    Five branches of ``out_channels // 5`` channels each (shortcut, 1x1, 5x5, 3x3,
    pooled 1x1) are concatenated along the channel axis and passed through ReLU.
    A 2-D (batch, features) input is first reshaped to a single-channel image
    whose sides come from ``approximate_factors``.
    """
    def __init__(self, in_channels, out_channels):
        super().__init__()
        branch_width = int(out_channels // 5)

        self.branch1x1 = nn.Conv2d(in_channels, branch_width, kernel_size=1)

        # 1x1 reduction to 16 channels followed by the 5x5 convolution.
        self.branch5x5_1 = nn.Conv2d(in_channels, 16, kernel_size=1)
        self.branch5x5_2 = nn.Conv2d(16, branch_width, kernel_size=5, padding=2)

        # 1x1 reduction to 16 channels followed by the 3x3 convolution.
        self.branch3x3_1 = nn.Conv2d(in_channels, 16, kernel_size=1)
        self.branch3x3_2 = nn.Conv2d(16, branch_width, kernel_size=3, padding=1)

        self.branch_pool = nn.Conv2d(in_channels, branch_width, kernel_size=1)

        self.conv_residual = nn.Conv2d(in_channels, branch_width, kernel_size=1)

    def forward(self, x):
        if x.dim() < 3:
            rows, cols = approximate_factors(x.shape[-1])
            x = x.unsqueeze(1).view(x.shape[0], 1, rows, cols)

        shortcut = self.conv_residual(x)
        pooled = F.avg_pool2d(x, kernel_size=3, stride=1, padding=1)

        # The shortcut comes first, followed by the four inception branches.
        stacked = torch.cat(
            (
                shortcut,
                self.branch1x1(x),
                self.branch5x5_2(self.branch5x5_1(x)),
                self.branch3x3_2(self.branch3x3_1(x)),
                self.branch_pool(pooled),
            ),
            dim=1,
        )
        return F.relu(stacked)

class Classifier(nn.Module):
    """
    Five InceptionResidual blocks -> linear projection -> Transformer encoder
    -> linear classification head.

    Input is a (batch, input_dim) tensor that the first block reshapes into a
    single-channel image; output is (batch, num_classes) logits.
    ``batch_size`` is accepted but not used.
    """
    def __init__(self, input_dim, batch_size=64, hidden_dim=512, num_layers=4, nheads=8, num_classes=41, dropout_rate=0.3):
        super().__init__()
        # Stored for reference only; none of the layers below use it.
        self.hidden_dim = 8 * hidden_dim

        self.inception_res_block1 = InceptionResidualBlock(1, 80)
        self.inception_res_block2 = InceptionResidualBlock(80, 125)
        self.inception_res_block3 = InceptionResidualBlock(125, 250)
        self.inception_res_block4 = InceptionResidualBlock(250, 500)
        self.inception_res_block5 = InceptionResidualBlock(500, 750)

        # The last block emits 750 channels for each of the input_dim positions.
        self.fc1 = nn.Linear(input_dim * 750, hidden_dim)

        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=hidden_dim,
                nhead=nheads,
                dim_feedforward=4 * hidden_dim,
                dropout=dropout_rate,
            ),
            num_layers=num_layers,
        )

        self.fc_out = nn.Linear(hidden_dim, num_classes)

        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        for block in (
            self.inception_res_block1,
            self.inception_res_block2,
            self.inception_res_block3,
            self.inception_res_block4,
            self.inception_res_block5,
        ):
            x = block(x)

        flat = x.view(x.size(0), -1)
        projected = F.relu(self.fc1(flat))

        # A leading axis of size 1 is added for the encoder and removed afterwards.
        encoded = self.transformer(projected.unsqueeze(0)).squeeze(0)

        return self.fc_out(self.dropout(encoded))

## Classifier InceptionResidual Pretrain

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class InceptionResidualBlock(nn.Module):
    """
    Inception-style block with a 1x1 shortcut branch.

    Five branches of ``out_channels // 5`` channels each (shortcut, 1x1, 5x5, 3x3,
    pooled 1x1) are concatenated along the channel axis and passed through ReLU.
    A 2-D (batch, features) input is first reshaped to a single-channel image
    whose sides come from ``approximate_factors``.
    """
    def __init__(self, in_channels, out_channels):
        super().__init__()
        branch_width = int(out_channels // 5)

        self.branch1x1 = nn.Conv2d(in_channels, branch_width, kernel_size=1)

        # 1x1 reduction to 16 channels followed by the 5x5 convolution.
        self.branch5x5_1 = nn.Conv2d(in_channels, 16, kernel_size=1)
        self.branch5x5_2 = nn.Conv2d(16, branch_width, kernel_size=5, padding=2)

        # 1x1 reduction to 16 channels followed by the 3x3 convolution.
        self.branch3x3_1 = nn.Conv2d(in_channels, 16, kernel_size=1)
        self.branch3x3_2 = nn.Conv2d(16, branch_width, kernel_size=3, padding=1)

        self.branch_pool = nn.Conv2d(in_channels, branch_width, kernel_size=1)

        self.conv_residual = nn.Conv2d(in_channels, branch_width, kernel_size=1)

    def forward(self, x):
        if x.dim() < 3:
            rows, cols = approximate_factors(x.shape[-1])
            x = x.unsqueeze(1).view(x.shape[0], 1, rows, cols)

        shortcut = self.conv_residual(x)
        pooled = F.avg_pool2d(x, kernel_size=3, stride=1, padding=1)

        # The shortcut comes first, followed by the four inception branches.
        stacked = torch.cat(
            (
                shortcut,
                self.branch1x1(x),
                self.branch5x5_2(self.branch5x5_1(x)),
                self.branch3x3_2(self.branch3x3_1(x)),
                self.branch_pool(pooled),
            ),
            dim=1,
        )
        return F.relu(stacked)

class Classifier(nn.Module):
    """
    Pretrained distil-AST audio classifier -> five InceptionResidual blocks
    -> linear projection -> Transformer encoder -> linear classification head.

    The input is reshaped to (batch, 1024, 128) before it enters the pretrained
    model, whose 'logits' output feeds the convolutional blocks.
    ``input_dim`` and ``batch_size`` are accepted but not used: the feature width
    is fixed at 527.
    """
    def __init__(self, input_dim, batch_size=64, hidden_dim=512, num_layers=4, nheads=8, num_classes=41, dropout_rate=0.3):
        super().__init__()
        self.model_extractor = AutoModelForAudioClassification.from_pretrained("bookbot/distil-ast-audioset")
        # Width of the extractor's logits as hard-coded by this model; overrides the argument.
        input_dim = 527
        # Stored for reference only; none of the layers below use it.
        self.hidden_dim = 8 * hidden_dim

        self.inception_res_block1 = InceptionResidualBlock(1, 80)
        self.inception_res_block2 = InceptionResidualBlock(80, 125)
        self.inception_res_block3 = InceptionResidualBlock(125, 250)
        self.inception_res_block4 = InceptionResidualBlock(250, 500)
        self.inception_res_block5 = InceptionResidualBlock(500, 750)

        self.fc1 = nn.Linear(527 * 750, hidden_dim)

        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=hidden_dim,
                nhead=nheads,
                dim_feedforward=4 * hidden_dim,
                dropout=dropout_rate,
            ),
            num_layers=num_layers,
        )

        self.fc_out = nn.Linear(hidden_dim, num_classes)

        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        batch = x.size(0)
        x = self.model_extractor(x.reshape(batch, 1024, 128))['logits']

        for block in (
            self.inception_res_block1,
            self.inception_res_block2,
            self.inception_res_block3,
            self.inception_res_block4,
            self.inception_res_block5,
        ):
            x = block(x)

        flat = x.view(batch, 750*527)
        projected = F.relu(self.fc1(flat))

        # A leading axis of size 1 is added for the encoder and removed afterwards.
        encoded = self.transformer(projected.unsqueeze(0)).squeeze(0)

        return self.fc_out(self.dropout(encoded))

## Classifier InceptionResidual Large Pretrain

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class InceptionResidualBlock(nn.Module):
    """
    Inception-style block with a 1x1 shortcut branch.

    Five branches of ``out_channels // 5`` channels each (shortcut, 1x1, 5x5, 3x3,
    pooled 1x1) are concatenated along the channel axis and passed through ReLU.
    A 2-D (batch, features) input is first reshaped to a single-channel image
    whose sides come from ``approximate_factors``.
    """
    def __init__(self, in_channels, out_channels):
        super().__init__()
        branch_width = int(out_channels // 5)

        self.branch1x1 = nn.Conv2d(in_channels, branch_width, kernel_size=1)

        # 1x1 reduction to 16 channels followed by the 5x5 convolution.
        self.branch5x5_1 = nn.Conv2d(in_channels, 16, kernel_size=1)
        self.branch5x5_2 = nn.Conv2d(16, branch_width, kernel_size=5, padding=2)

        # 1x1 reduction to 16 channels followed by the 3x3 convolution.
        self.branch3x3_1 = nn.Conv2d(in_channels, 16, kernel_size=1)
        self.branch3x3_2 = nn.Conv2d(16, branch_width, kernel_size=3, padding=1)

        self.branch_pool = nn.Conv2d(in_channels, branch_width, kernel_size=1)

        self.conv_residual = nn.Conv2d(in_channels, branch_width, kernel_size=1)

    def forward(self, x):
        if x.dim() < 3:
            rows, cols = approximate_factors(x.shape[-1])
            x = x.unsqueeze(1).view(x.shape[0], 1, rows, cols)

        shortcut = self.conv_residual(x)
        pooled = F.avg_pool2d(x, kernel_size=3, stride=1, padding=1)

        # The shortcut comes first, followed by the four inception branches.
        stacked = torch.cat(
            (
                shortcut,
                self.branch1x1(x),
                self.branch5x5_2(self.branch5x5_1(x)),
                self.branch3x3_2(self.branch3x3_1(x)),
                self.branch_pool(pooled),
            ),
            dim=1,
        )
        return F.relu(stacked)

class Classifier(nn.Module):
    """
    Pretrained AST model -> five InceptionResidual blocks -> linear projection
    -> Transformer encoder -> linear classification head.

    The input is reshaped to (batch, 1024, 128) before it enters the pretrained
    model, whose 'pooler_output' feeds the convolutional blocks.
    The ``input_dim`` argument is overridden with 768; ``batch_size`` is not used.
    """
    def __init__(self, input_dim, batch_size=64, hidden_dim=512, num_layers=4, nheads=8, num_classes=41, dropout_rate=0.3):
        super().__init__()
        self.model_extractor = ASTModel.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")
        # Width of the pooled AST embedding as hard-coded by this model; overrides the argument.
        input_dim = 768
        # Stored for reference only; none of the layers below use it.
        self.hidden_dim = 8 * hidden_dim

        self.inception_res_block1 = InceptionResidualBlock(1, 80)
        self.inception_res_block2 = InceptionResidualBlock(80, 125)
        self.inception_res_block3 = InceptionResidualBlock(125, 250)
        self.inception_res_block4 = InceptionResidualBlock(250, 500)
        self.inception_res_block5 = InceptionResidualBlock(500, 750)

        self.fc1 = nn.Linear(input_dim * 750, hidden_dim)

        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=hidden_dim,
                nhead=nheads,
                dim_feedforward=4 * hidden_dim,
                dropout=dropout_rate,
            ),
            num_layers=num_layers,
        )

        self.fc_out = nn.Linear(hidden_dim, num_classes)

        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        batch = x.size(0)
        x = self.model_extractor(x.reshape(batch, 1024, 128))['pooler_output']

        for block in (
            self.inception_res_block1,
            self.inception_res_block2,
            self.inception_res_block3,
            self.inception_res_block4,
            self.inception_res_block5,
        ):
            x = block(x)

        flat = x.view(batch, 750*768)
        projected = F.relu(self.fc1(flat))

        # A leading axis of size 1 is added for the encoder and removed afterwards.
        encoded = self.transformer(projected.unsqueeze(0)).squeeze(0)

        return self.fc_out(self.dropout(encoded))

## Classifier InceptionResidual3 Large Pretrain

In [None]:
class SEBlock(nn.Module):
    """
    Squeeze-and-excitation gate: each channel of a 4-D input is rescaled by a
    sigmoid weight computed from its global average.
    """
    def __init__(self, channels, reduction=16):
        super().__init__()
        bottleneck = channels // reduction
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channels, bottleneck),
            nn.ReLU(inplace=True),
            nn.Linear(bottleneck, channels),
            nn.Sigmoid(),
        )

    def forward(self, x):
        batch, channels, _, _ = x.shape
        # Squeeze: one average per channel; excite: per-channel weight in (0, 1).
        squeezed = self.pool(x).view(batch, channels)
        gate = self.fc(squeezed).view(batch, channels, 1, 1)
        return x * gate.expand_as(x)


class InceptionResidualBlock(nn.Module):
    """Inception-style block with a projected residual connection and SE gating.

    Four parallel branches (1x1, 5x5 and 3x3 convolutions, and a 3x3 average
    pooling followed by a 1x1 convolution) all keep the spatial size. Their
    outputs are concatenated along the channel axis, added to a 1x1-projected
    copy of the input, re-weighted per channel by an ``SEBlock``,
    batch-normalised and passed through ReLU.

    ``out_channels`` is split as evenly as possible across the four branches;
    when it is not a multiple of 4 the first branches get one extra channel,
    so the concatenation always has exactly ``out_channels`` channels and can
    be added to the residual projection.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels of the output feature map (>= 4).

    Raises:
        ValueError: If ``out_channels`` is smaller than 4, because a branch
            would then be left without any channel.
    """

    def __init__(self, in_channels, out_channels):
        super(InceptionResidualBlock, self).__init__()
        if out_channels < 4:
            raise ValueError(
                f"out_channels must be at least 4 (one per branch), got {out_channels}"
            )

        # Even split with the remainder spread over the leading branches,
        # e.g. 10 -> [3, 3, 2, 2]; multiples of 4 give four equal widths.
        base, extra = divmod(out_channels, 4)
        w_1x1, w_5x5, w_3x3, w_pool = [
            base + (1 if idx < extra else 0) for idx in range(4)
        ]

        self.branch1x1 = nn.Sequential(
            nn.Conv2d(in_channels, w_1x1, kernel_size=1),
            nn.BatchNorm2d(w_1x1),
            nn.ReLU()
        )

        self.branch5x5 = nn.Sequential(
            nn.Conv2d(in_channels, w_5x5, kernel_size=5, padding=2),
            nn.BatchNorm2d(w_5x5),
            nn.ReLU()
        )

        self.branch3x3 = nn.Sequential(
            nn.Conv2d(in_channels, w_3x3, kernel_size=3, padding=1),
            nn.BatchNorm2d(w_3x3),
            nn.ReLU()
        )

        self.branch_pool = nn.Sequential(
            nn.AvgPool2d(kernel_size=3, stride=1, padding=1),
            nn.Conv2d(in_channels, w_pool, kernel_size=1),
            nn.BatchNorm2d(w_pool),
            nn.ReLU()
        )

        # 1x1 projection so the skip path matches the concatenated width.
        self.residual_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1),
            nn.BatchNorm2d(out_channels)
        )

        self.se = SEBlock(out_channels)
        self.final_bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply the block to a (batch, in_channels, H, W) feature map.

        Returns:
            Tensor of shape (batch, out_channels, H, W).
        """
        identity = self.residual_conv(x)

        out = torch.cat([
            self.branch1x1(x),
            self.branch5x5(x),
            self.branch3x3(x),
            self.branch_pool(x)
        ], 1)

        out += identity
        out = self.se(out)
        out = self.final_bn(out)
        return self.relu(out)


In [None]:
class Classifier(nn.Module):
    """Audio classifier built on a pretrained AST feature extractor.

    The 768-value ``pooler_output`` of the AST model is viewed as a
    single-channel 24 x 32 map, processed by four inception-residual blocks,
    reduced to 256 channels, flattened, layer-normalised, projected to 512
    features, passed through a 12-layer transformer encoder and finally mapped
    to ``num_classes`` scores.

    Args:
        num_classes: Size of the output layer.
        dropout_rate: Dropout probability used in the transformer layers and
            before the output layer.
    """

    def __init__(self, num_classes=41, dropout_rate=0.4):
        super().__init__()
        self.model_extractor = ASTModel.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")
        self.cnn_input_dim = 768  # AST feature dimension

        # Channel widths of the consecutive inception blocks: 1 -> 128 -> 256 -> 512 -> 768.
        channel_plan = (1, 128, 256, 512, 768)
        self.inception_blocks = nn.Sequential(*(
            InceptionResidualBlock(c_in, c_out)
            for c_in, c_out in zip(channel_plan[:-1], channel_plan[1:])
        ))

        # 256 channels on a 24 x 32 map -> 196608 values per sample once flattened.
        flat_features = 256 * 24 * 32
        self.feature_reduce = nn.Conv2d(768, 256, kernel_size=1)
        self.norm = nn.LayerNorm(flat_features)
        self.fc1 = nn.Linear(flat_features, 512)

        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=512, nhead=8, dim_feedforward=2048, dropout=dropout_rate
            ),
            num_layers=12,
        )

        self.dropout = nn.Dropout(dropout_rate)
        self.fc_out = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return the ``fc_out`` scores for a batch holding 1024 * 128 values per sample."""
        n = x.size(0)
        embedding = self.model_extractor(x.view(n, 1024, 128))['pooler_output']

        # Treat the 768-value embedding as a 1-channel 24 x 32 image for the CNN.
        feature_map = self.inception_blocks(embedding.view(n, 1, 24, 32))

        reduced = torch.flatten(self.feature_reduce(feature_map), start_dim=1)
        hidden = F.relu(self.fc1(self.norm(reduced)))

        # A leading axis of size 1 is added for the encoder and removed afterwards.
        encoded = self.transformer(hidden.unsqueeze(0)).squeeze(0)
        return self.fc_out(self.dropout(encoded))


# Training

In [None]:
# Create a model_register wrapper, override its batch size to 10, build the
# datasets from the 'spectr' features and construct the network.
# NOTE(review): nothing is trained here, and the next cell rebinds `model` to a
# fresh model_register() without the batch_size override — confirm this cell is
# still needed.
model = model_register()
model.batch_size = 10
model.gen_datasets(train_features, test_features, 'spectr')
model.get_model()

In [None]:
# Full run on a fresh wrapper (batch size left at the wrapper's own value):
# build the datasets from the 'spectr' features, construct the network,
# train for 15 epochs and then call model.test() (presumably the evaluation on
# the held-out split — TODO confirm).
model = model_register()
model.gen_datasets(train_features, test_features, 'spectr')
model.get_model()
model.train(epochs=15)
model.test()

In [None]:
# Serialise the network object held by the wrapper (model.model) to
# 'model.pkl' with pickle.
# NOTE(review): a pickle of the whole object can only be loaded where the same
# class definitions are available; saving model.model.state_dict() would be more
# portable — confirm what the loading side expects before changing it.
with open('model.pkl', 'wb') as f:
    pickle.dump(model.model, f)

# Submission generation

In [None]:
# Read the sample submission and attach, in a 'way' column, the full path of
# every test clip (test-audio directory + the clip's 'fname').
subm = pd.read_csv('/kaggle/input/lw1-acc/sample_submission.csv').assign(
    way=lambda frame: '/kaggle/input/lw1-acc/audio_test/audio_test/test/' + frame['fname']
)

In [None]:
# Fill a constant 0 into 'label_id' for all submission rows.
# Presumably get_features5 expects this column to exist — TODO confirm.
subm['label_id'] = 0

In [None]:
# Run the get_features5 extractor on the submission frame; the cells below read
# its 'spectr' entry.
subm_test = get_features5(subm)

In [None]:
# Keep only element 0 of every 'spectr' entry and stack the results into one
# float32 array.
# NOTE(review): this overwrites its own input, so re-running the cell strips
# another leading dimension — run it exactly once after feature extraction.
subm_test['spectr'] = np.array([m[0] for m in subm_test['spectr']], dtype=np.float32)

In [None]:
# input
# subm_test['spectr'] = np.array([[pad_sequence(m, 1025)] for m in subm_test['spectr']], dtype=np.float32)

In [None]:
# 3.3
# subm_test['spectr'] = np.array([m['input_values'] for m in subm_test['spectr']], dtype=np.float32)

In [None]:
# Predict a label for every submission clip, one forward pass per clip.

# Switch the network to evaluation mode first: in training mode Dropout makes
# single-sample predictions random and BatchNorm normalises with (and updates
# its running statistics from) the test data.
model.model.eval()

pred = []
with torch.no_grad():
    for spectr in tqdm(subm_test['spectr']):
        # Add the leading batch axis and move the sample to the model's device.
        sample = torch.as_tensor(spectr, dtype=torch.float32).unsqueeze(0).to(model.device)
        logits = model.model(sample)[0].to('cpu')
        # softmax is monotonic, so the arg-max of the raw scores already is the
        # most probable class; int() yields a plain Python index for the lookup.
        predicted_class = int(torch.argmax(logits, dim=0))
        pred.append(unique_labels[predicted_class])

In [None]:
# Save the predictions in the submission format: the clip file names next to
# their predicted labels, without the DataFrame index.
pd.DataFrame(
    dict(fname=subm['fname'].tolist(), label=pred)
).to_csv('submission.csv', index=False)