### О данном ноутбуке

В данном ноутбуке я займусь тем, что реализую свою нейроночку, написав ее самостоятельно с поддержкой Grok))), для обучения декодировать код Морзе.

### Imports

In [1]:
import os
import numpy as np
import pandas as pd 
import librosa 
import torch
import torch.nn as nn 
import torch.optim as optim 
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import GradScaler, autocast
import torch.nn.functional as F
from Levenshtein import distance as levenshtein_distance
from collections import defaultdict
import uuid
import glob

### Преднастройки

In [2]:
# Параметры
SAMPLE_RATE = 8000  # Частота дискретизации, как в основном коде
DATA_DIR = 'data/morse_dataset/morse_dataset'  # Путь к папке с файлами *.opus

def find_max_duration():
    """Находит максимальную длительность аудиофайлов в папке."""
    durations = []
    
    # Получаем список всех *.opus файлов в директории
    opus_files = glob.glob(os.path.join(DATA_DIR, '*.opus'))
    
    if not opus_files:
        print(f"В папке {DATA_DIR} не найдено *.opus файлов")
        return 0
    
    for file_path in opus_files:
        try:
            # Загружаем аудиофайл без ограничения длительности
            audio, _ = librosa.load(file_path, sr=SAMPLE_RATE)
            # Вычисляем длительность в секундах
            duration = len(audio) / SAMPLE_RATE
            durations.append(duration)
        except Exception as e:
            print(f"Ошибка при обработке файла {file_path}: {e}")
    
    if durations:
        max_duration = max(durations)
        print(f"Максимальная длительность: {max_duration:.2f} секунд")
        print(f"Количество обработанных файлов: {len(durations)}")
        return max_duration
    else:
        print("Не удалось обработать ни один файл")
        return 0

if __name__ == '__main__':
    max_duration = find_max_duration()

KeyboardInterrupt: 

МАКСИМАЛЬНАЯ ДЛИТЕЛЬНОСТЬ = 48 СЕКУНД


In [None]:
# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Parameters
SAMPLE_RATE = 4000 # Downsample to reduce memory usage
DURATION = 20 # Я поставлю в 2 раза меньше, для начала)
N_MELS = 32
MAX_SEQ_LEN = 50
BATCH_SIZE = 8
EPOCHS = 30
LEARNING_RATE = 1e-3
DATA_DIR = 'data/morse_dataset/morse_dataset'
DEBUG_TRAIN_SIZE = 100  # Number of training samples for debug
DEBUG_TEST_SIZE = 20   # Number of test samples for debug

# Russian alphabet + digits + special characters
CHAR_MAP = {
    'А': '.-', 'Б': '-...', 'В': '.--', 'Г': '--.', 'Д': '-..', 'Е': '.', 'Ё': '.', 
    'Ж': '...-', 'З': '--..', 'И': '..', 'Й': '.---', 'К': '-.-', 'Л': '.-..', 
    'М': '--', 'Н': '-.', 'О': '---', 'П': '.--.', 'Р': '.-.', 'С': '...', 
    'Т': '-', 'У': '..-', 'Ф': '..-.', 'Х': '....', 'Ц': '-.-.', 'Ч': '---.', 
    'Ш': '----', 'Щ': '--.-', 'Ъ': '-..-', 'Ы': '-.--', 'Ь': '-..-', 'Э': '..-..', 
    'Ю': '..--', 'Я': '.-.-', '0': '-----', '1': '.----', '2': '..---', 
    '3': '...--', '4': '....-', '5': '.....', '6': '-....', '7': '--...', 
    '8': '---..', '9': '----.', ' ': ' ', '#': '#'
}
CHAR_TO_INT = {char: idx + 1 for idx, char in enumerate(CHAR_MAP.keys())}  # 1-based indexing
CHAR_TO_INT[''] = 0  # Blank for CTC
INT_TO_CHAR = {v: k for k, v in CHAR_TO_INT.items()}
NUM_CLASSES = len(CHAR_TO_INT)

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
# Кастомный класс для создания датасетам
class MorseDataset(Dataset):
    """Dataset for Morse code audio and transcripts."""
    def __init__(self, df, data_dir, is_train=True):
        self.df = df
        self.data_dir = data_dir
        self.is_train = is_train

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        file_path = os.path.join(self.data_dir, self.df.iloc[idx]['id'])
        audio, sr = librosa.load(file_path, sr=SAMPLE_RATE)
        audio, _ = librosa.effects.trim(audio, top_db=20)  # Remove silence
        mel = librosa.feature.melspectrogram(y=audio, sr=SAMPLE_RATE, n_mels=N_MELS)
        mel_db = librosa.power_to_db(mel, ref=np.max)
        mel_db = (mel_db - np.mean(mel_db)) / np.std(mel_db)
        spectrogram = torch.FloatTensor(mel_db).unsqueeze(0)  # Add channel dimension
        if self.is_train:
            transcript = self.df.iloc[idx]['message']
            label = []
            for c in transcript:
                if c not in CHAR_TO_INT:
                    print(f"Warning: Symbol '{c}' in transcript '{transcript}' (id: {self.df.iloc[idx]['id']}) not in CHAR_MAP")
                else:
                    label.append(CHAR_TO_INT[c])
            label = torch.LongTensor(label)
            return spectrogram, label, len(label), mel_db.shape[1]
        return spectrogram, self.df.iloc[idx]['id'], mel_db.shape[1]

In [None]:
def collate_fn(batch):
    if len(batch[0]) == 4:  # Train
        spectrograms, labels, label_lengths, spec_lengths = zip(*batch)
        max_width = max(spec_lengths)
        padded_specs = [F.pad(spec, (0, max_width - spec.size(2))) for spec in spectrograms]
        spectrograms = torch.stack(padded_specs)
        labels = nn.utils.rnn.pad_sequence(labels, batch_first=True, padding_value=0)
        label_lengths = torch.LongTensor(label_lengths)
        spec_lengths = torch.LongTensor(spec_lengths)
        return spectrograms, labels, label_lengths, spec_lengths
    else:  # Test
        spectrograms, file_ids, spec_lengths = zip(*batch)
        max_width = max(spec_lengths)
        padded_specs = [F.pad(spec, (0, max_width - spec.size(2))) for spec in spectrograms]
        spectrograms = torch.stack(padded_specs)
        spec_lengths = torch.LongTensor(spec_lengths)
        # print(f"Test batch: len(spectrograms)={len(spectrograms)}, len(file_ids)={len(file_ids)}")
        return spectrograms, file_ids, spec_lengths

In [7]:
# Модель
class MorseModel(nn.Module):
    """CNN-LSTM model for Morse code decoding."""
    def __init__(self, num_classes):
        super(MorseModel, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.lstm = nn.LSTM(32 * (N_MELS // 4), 64, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(128, num_classes)  # 128 = 64 * 2 (bidirectional)

    def forward(self, x):
        x = self.conv(x)
        batch, channels, height, width = x.size()
        x = x.permute(0, 3, 1, 2).contiguous().view(batch, width, channels * height)
        x, _ = self.lstm(x)
        x = self.fc(x)
        return x

In [None]:
def decode_predictions(preds):
    decoded = []
    print(f"decode_predictions: preds shape={preds.shape}")
    preds = preds.permute(1, 0, 2)  # [T, N, C] -> [N, T, C]
    for pred in preds:
        pred = pred.argmax(dim=-1).cpu().numpy()
        text = []
        last = None
        for p in pred:
            if p != last and p != 0:
                text.append(INT_TO_CHAR[p])
            last = p
        decoded.append(''.join(text))
    # print(f"decode_predictions: len(decoded)={len(decoded)}")
    return decoded

In [9]:
def levenshtein_mean(y_true, y_pred):
    """Compute mean Levenshtein distance."""
    distances = []
    for true, pred in zip(y_true, y_pred):
        dist = levenshtein_distance(true, pred)
        distances.append(dist / max(len(true), 1))
    return np.mean(distances)

In [None]:
def train_epoch(model, dataloader, criterion, optimizer, scaler):
    model.train()
    total_loss = 0
    for spectrograms, labels, label_lengths, spec_lengths in dataloader:
        spectrograms, labels = spectrograms.to(device), labels.to(device)
        spec_lengths = spec_lengths.to(device)
        
        optimizer.zero_grad()
        with autocast():
            outputs = model(spectrograms).log_softmax(2)
            input_lengths = spec_lengths // 4  # Account for two MaxPool2d(2) layers
            # отладка
            # for batch_labels in labels:
            #     for i in batch_labels:
            #         if i.item() not in INT_TO_CHAR and i != 0:
            #             print(f"Invalid label index: {i.item()}")

            loss = criterion(outputs.permute(1, 0, 2), labels, input_lengths, label_lengths)
        
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()


        total_loss += loss.item()
    return total_loss / len(dataloader)

In [11]:
def evaluate(model, dataloader):
    model.eval()
    preds, trues = [], []
    with torch.no_grad():
        for spectrograms, labels, label_lengths, spec_lengths in dataloader:
            spectrograms = spectrograms.to(device)
            with autocast():
                outputs = model(spectrograms).log_softmax(2)
            pred_texts = decode_predictions(outputs.permute(1, 0, 2))
            true_texts = [''.join(INT_TO_CHAR[i.item()] for i in label if i != 0 and i.item() in INT_TO_CHAR) for label in labels]
            preds.extend(pred_texts)
            trues.extend(true_texts)
    return levenshtein_mean(trues, preds)

In [None]:
def main():
    # Load data
    train_df = pd.read_csv('data/train.csv')
    test_df = pd.read_csv('data/test.csv')
    
    # Limit to debug size
    # train_df = train_df.head(DEBUG_TRAIN_SIZE)
    # test_df = test_df.head(DEBUG_TEST_SIZE)
    
    # Create datasets
    train_dataset = MorseDataset(train_df, DATA_DIR, is_train=True)
    test_dataset = MorseDataset(test_df, DATA_DIR, is_train=False)
    
    # Split train into train and validation
    val_size = int(0.1 * len(train_dataset))  # 10 samples
    train_size = len(train_dataset) - val_size  # 90 samples
    train_dataset, val_dataset = torch.utils.data.random_split(train_dataset, [train_size, val_size])
    
    # Create dataloaders
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)
    
    # Initialize model, loss, and optimizer
    model = MorseModel(NUM_CLASSES).to(device)
    criterion = nn.CTCLoss(blank=0, zero_infinity=True)
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=3, factor=0.5)
    scaler = GradScaler()
    
    # Training loop
    best_val_lev = float('inf')
    for epoch in range(EPOCHS):
        train_loss = train_epoch(model, train_loader, criterion, optimizer, scaler)
        val_lev = evaluate(model, val_loader)
        scheduler.step(val_lev)
        
        print(f"Epoch {epoch+1}/{EPOCHS}, Train Loss: {train_loss:.4f}, Val Levenshtein Mean: {val_lev:.4f}")
        
        if val_lev < best_val_lev:
            best_val_lev = val_lev
            torch.save(model.state_dict(), 'best_model_debug.pt')
    
    # Load best model
    model.load_state_dict(torch.load('best_model_debug.pt'))
    
    # Predict on test set
    model.eval()
    test_preds = []
    test_file_paths = []
    with torch.no_grad():
        for spectrograms, file_ids, spec_lengths in test_loader:
            spectrograms = spectrograms.to(device)
            with autocast():
                outputs = model(spectrograms).log_softmax(2)
            pred_texts = decode_predictions(outputs.permute(1, 0, 2))
            # print(f"Batch: len(file_ids)={len(file_ids)}, len(pred_texts)={len(pred_texts)}")
            test_preds.extend(pred_texts)
            test_file_paths.extend(file_ids)

    # Debug lengths
    # print(f"Total: len(test_file_paths)={len(test_file_paths)}, len(test_preds)={len(test_preds)}")

    # Create submission
    submission = pd.DataFrame({
        'id': test_file_paths,
        'message': test_preds
    })
    # Save submission
    submission.to_csv('submission.csv', encoding='utf-8', index=False)

In [13]:
if __name__ == '__main__':
    main()

  from .autonotebook import tqdm as notebook_tqdm


decode_predictions: preds shape=torch.Size([15, 8, 46])
decode_predictions: len(decoded)=8
decode_predictions: preds shape=torch.Size([15, 8, 46])
decode_predictions: len(decoded)=8
decode_predictions: preds shape=torch.Size([15, 8, 46])
decode_predictions: len(decoded)=8
decode_predictions: preds shape=torch.Size([15, 8, 46])
decode_predictions: len(decoded)=8
decode_predictions: preds shape=torch.Size([15, 8, 46])
decode_predictions: len(decoded)=8
decode_predictions: preds shape=torch.Size([15, 8, 46])
decode_predictions: len(decoded)=8
decode_predictions: preds shape=torch.Size([15, 8, 46])
decode_predictions: len(decoded)=8
decode_predictions: preds shape=torch.Size([15, 8, 46])
decode_predictions: len(decoded)=8
decode_predictions: preds shape=torch.Size([15, 8, 46])
decode_predictions: len(decoded)=8
decode_predictions: preds shape=torch.Size([15, 8, 46])
decode_predictions: len(decoded)=8
decode_predictions: preds shape=torch.Size([15, 8, 46])
decode_predictions: len(decoded)=8