In [2]:
!pip install pydub

Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1


In [9]:
import os
import time
import torch
import torch.nn as nn
torch.backends.cudnn.enabled = False
import torchaudio
import numpy as np
from pydub import AudioSegment
from torch.utils.data import DataLoader, TensorDataset


def normalize_waveform(waveform):
    return waveform / waveform.abs().max()


def load_dataset_from_folder(dataset_root):
    clean_dir = os.path.join(dataset_root, "clean")
    effect_dir = os.path.join(dataset_root, "distortion")

    clean_data_list = []
    effect_data_list = []

    for file in sorted(os.listdir(clean_dir)):
        if file.endswith(".wav") and os.path.exists(os.path.join(effect_dir, file)):
            clean_path = os.path.join(clean_dir, file)
            effect_path = os.path.join(effect_dir, file)

            clean_waveform, _ = torchaudio.load(clean_path)
            effect_waveform, _ = torchaudio.load(effect_path)

            if clean_waveform.size(0) > 1:
                clean_waveform = clean_waveform.mean(dim=0, keepdim=True)
            if effect_waveform.size(0) > 1:
                effect_waveform = effect_waveform.mean(dim=0, keepdim=True)

            clean_waveform = normalize_waveform(clean_waveform)
            effect_waveform = normalize_waveform(effect_waveform)

            min_len = min(clean_waveform.shape[1], effect_waveform.shape[1])
            clean_waveform = clean_waveform[:, :min_len]
            effect_waveform = effect_waveform[:, :min_len]

            clean_tensor = clean_waveform.T.unsqueeze(-1)
            effect_tensor = (effect_waveform - clean_waveform).T.unsqueeze(-1)

            clean_data_list.append(clean_tensor)
            effect_data_list.append(effect_tensor)

    clean_data = torch.cat(clean_data_list, dim=0)
    effect_data = torch.cat(effect_data_list, dim=0)
    return clean_data, effect_data


class EffectumHybrid(nn.Module):
    def __init__(self):
        super(EffectumHybrid, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv1d(1, 16, 3, padding=1),
            nn.BatchNorm1d(16),
            nn.LeakyReLU(0.01),
            nn.Conv1d(16, 32, 3, padding=1),
            nn.BatchNorm1d(32),
            nn.LeakyReLU(0.01),
            nn.Conv1d(32, 64, kernel_size=3, padding=2, dilation=2),
            nn.BatchNorm1d(64),
            nn.LeakyReLU(0.01)
        )

        self.rnn = nn.GRU(input_size=64, hidden_size=64, num_layers=1, batch_first=True, bidirectional=True)

        self.fc = nn.Sequential(
            nn.Linear(128, 32),
            nn.LeakyReLU(0.01),
            nn.Linear(32, 1),
            nn.Tanh()
        )

    def forward(self, x):
        if x.dim() == 4:
            x = x.squeeze(-1)
        x = x.permute(0, 2, 1)
        x = self.cnn(x)
        x = x.permute(0, 2, 1)
        x = x.contiguous()  # ✅ fix for CUDNN error
        out, _ = self.rnn(x)
        out = self.fc(out)
        return out


def train_model(model, clean_data, effect_data, device, epochs=8, batch_size=1024):
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    mse = nn.MSELoss()

    dataset = TensorDataset(clean_data, effect_data)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=2)

    model.train()
    for epoch in range(epochs):
        total_loss = 0
        start_time = time.time()
        last_log_time = start_time
        num_batches = len(dataloader)
        print(f"Epoch {epoch + 1}/{epochs}")

        for i, (batch_clean, batch_effect) in enumerate(dataloader):
            batch_clean = batch_clean.to(device)
            batch_effect = batch_effect.to(device)

            optimizer.zero_grad()
            output = model(batch_clean)
            loss_mse = mse(output, batch_effect)

            fft_pred = torch.fft.rfft(output.squeeze(-1), dim=1)
            fft_true = torch.fft.rfft(batch_effect.squeeze(-1), dim=1)
            loss_spec = torch.mean(torch.abs(fft_pred - fft_true))

            loss = loss_mse + 0.1 * loss_spec

            loss.backward()
            optimizer.step()
            total_loss += loss.item()

            current_time = time.time()
            if current_time - last_log_time >= 1.0:
                percent = int((i + 1) / num_batches * 100)
                print(f"  Training progress: {percent}%")
                last_log_time = current_time

        print(f"  Loss: {total_loss:.4f}")

    torch.save(model.state_dict(), "effect_model.pth")


def apply_effect_gain_only(model, input_waveform, device, gain=0.5):
    model.eval()
    input_waveform = normalize_waveform(input_waveform)
    input_data = input_waveform.T.unsqueeze(0).unsqueeze(-1).to(device)

    with torch.no_grad():
        delta = model(input_data).squeeze().T.cpu()

    delta = delta[:input_waveform.shape[-1]]
    output = input_waveform.squeeze(0) + delta * gain
    output = output / output.abs().max()
    return output.unsqueeze(0)


def apply_effect_to_new_file(model, input_file, output_file, sr, device, gain=0.5):
    if not os.path.exists(input_file):
        raise FileNotFoundError(f"{input_file} не найден")

    input_waveform, sr_actual = torchaudio.load(input_file)
    if input_waveform.size(0) > 1:
        input_waveform = input_waveform.mean(dim=0, keepdim=True)

    effected = apply_effect_gain_only(model, input_waveform, device, gain=gain)
    torchaudio.save(output_file, effected, sr_actual)


def main():
    if torch.cuda.is_available():
        print('cuda')
    else:
        print('cpu')

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    dataset_root = "dataset"
    clean_data, effect_data = load_dataset_from_folder(dataset_root)
    print("Датасет загружен:", clean_data.shape, effect_data.shape)

    model = EffectumHybrid()

    model_path = "effect_model.pth"
    if os.path.exists(model_path):
        print(f"Загружается существующая модель из {model_path}")
        model.load_state_dict(torch.load(model_path, map_location=device))
        model.to(device)
    else:
        print("Модель не найдена, начинается обучение...")
        train_model(model, clean_data, effect_data, device, epochs=10)

    new_input_file = "clean2.wav"
    new_output_file = "new_output.wav"
    _, sr = torchaudio.load(new_input_file)
    apply_effect_to_new_file(model, new_input_file, new_output_file, sr, device, gain=0.5)
    print(f"Эффект наложен на новый файл, результат сохранён в {new_output_file}")


if __name__ == "__main__":
    main()


cuda
Датасет загружен: torch.Size([6123857, 1, 1]) torch.Size([6123857, 1, 1])
Загружается существующая модель из effect_model.pth
Эффект наложен на новый файл, результат сохранён в new_output.wav


  delta = model(input_data).squeeze().T.cpu()


In [None]:
torch.backends.cudnn.enabled = Falseimport os
import time
import torch
import torch.nn as nn
import torchaudio
import numpy as np
from pydub import AudioSegment
from torch.utils.data import DataLoader, TensorDataset


def normalize_waveform(waveform):
    return waveform / waveform.abs().max()


def load_dataset_from_folder(dataset_root):
    clean_dir = os.path.join(dataset_root, "clean")
    effect_dir = os.path.join(dataset_root, "distortion")

    clean_data_list = []
    effect_data_list = []

    for file in sorted(os.listdir(clean_dir)):
        if file.endswith(".wav") and os.path.exists(os.path.join(effect_dir, file)):
            clean_path = os.path.join(clean_dir, file)
            effect_path = os.path.join(effect_dir, file)

            clean_waveform, _ = torchaudio.load(clean_path)
            effect_waveform, _ = torchaudio.load(effect_path)

            if clean_waveform.size(0) > 1:
                clean_waveform = clean_waveform.mean(dim=0, keepdim=True)
            if effect_waveform.size(0) > 1:
                effect_waveform = effect_waveform.mean(dim=0, keepdim=True)

            clean_waveform = normalize_waveform(clean_waveform)

            min_len = min(clean_waveform.shape[1], effect_waveform.shape[1])
            clean_waveform = clean_waveform[:, :min_len]
            effect_waveform = effect_waveform[:, :min_len]

            clean_tensor = clean_waveform.T.unsqueeze(-1)
            effect_tensor = (effect_waveform - clean_waveform).T.unsqueeze(-1)

            clean_data_list.append(clean_tensor)
            effect_data_list.append(effect_tensor)

    clean_data = torch.cat(clean_data_list, dim=0)
    effect_data = torch.cat(effect_data_list, dim=0)
    return clean_data, effect_data


class EffectumHybrid(nn.Module):
    def __init__(self):
        super(EffectumHybrid, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv1d(1, 16, 3, padding=1),
            nn.BatchNorm1d(16),
            nn.LeakyReLU(0.01),
            nn.Conv1d(16, 32, 3, padding=1),
            nn.BatchNorm1d(32),
            nn.LeakyReLU(0.01),
            nn.Conv1d(32, 64, kernel_size=3, padding=2, dilation=2),
            nn.BatchNorm1d(64),
            nn.LeakyReLU(0.01)
        )

        self.rnn = nn.GRU(input_size=64, hidden_size=64, num_layers=1, batch_first=True, bidirectional=True)

        self.fc = nn.Sequential(
            nn.Linear(128, 32),
            nn.LeakyReLU(0.01),
            nn.Linear(32, 1),
            nn.Tanh()
        )

    def forward(self, x):
        if x.dim() == 4:
            x = x.squeeze(-1)
        x = x.permute(0, 2, 1)
        x = self.cnn(x)
        x = x.permute(0, 2, 1)
        out, _ = self.rnn(x)
        out = self.fc(out)
        return out


def train_model(model, clean_data, effect_data, epochs=10, batch_size=1024):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    mse = nn.MSELoss()

    dataset = TensorDataset(clean_data, effect_data)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    model.train()
    for epoch in range(epochs):
        total_loss = 0
        start_time = time.time()
        last_log_time = start_time
        num_batches = len(dataloader)
        print(f"Epoch {epoch + 1}/{epochs}")

        for i, (batch_clean, batch_effect) in enumerate(dataloader):
            optimizer.zero_grad()
            output = model(batch_clean)
            loss_mse = mse(output, batch_effect)

            fft_pred = torch.fft.rfft(output.squeeze(-1), dim=1)
            fft_true = torch.fft.rfft(batch_effect.squeeze(-1), dim=1)
            loss_spec = torch.mean(torch.abs(fft_pred - fft_true))

            loss = loss_mse + 0.1 * loss_spec

            loss.backward()
            optimizer.step()
            total_loss += loss.item()

            current_time = time.time()
            if current_time - last_log_time >= 1.0:
                percent = int((i + 1) / num_batches * 100)
                print(f"  Training progress: {percent}%")
                last_log_time = current_time

        print(f"  Loss: {total_loss:.4f}")

    torch.save(model.state_dict(), "effect_model.pth")


def apply_effect_gain_only(model, input_waveform, gain=0.5):
    model.eval()
    input_waveform = normalize_waveform(input_waveform)
    input_data = input_waveform.T.unsqueeze(0).unsqueeze(-1)

    with torch.no_grad():
        delta = model(input_data).squeeze().T

    delta = delta[:input_waveform.shape[-1]]
    output = input_waveform.squeeze(0) + delta * gain
    output = output / output.abs().max()
    return output.unsqueeze(0)


def apply_effect_to_new_file(model, input_file, output_file, sr, gain=0.5):
    if not os.path.exists(input_file):
        raise FileNotFoundError(f"{input_file} не найден")

    input_waveform, sr_actual = torchaudio.load(input_file)
    if input_waveform.size(0) > 1:
        input_waveform = input_waveform.mean(dim=0, keepdim=True)

    effected = apply_effect_gain_only(model, input_waveform, gain=gain)
    torchaudio.save(output_file, effected, sr_actual)


def main():
    dataset_root = "dataset"
    clean_data, effect_data = load_dataset_from_folder(dataset_root)
    print("Датасет загружен:", clean_data.shape, effect_data.shape)

    model = EffectumHybrid()
    train_model(model, clean_data, effect_data, epochs=10)

    new_input_file = "clean.wav"
    new_output_file = "new_output.wav"
    _, sr = torchaudio.load(new_input_file)
    apply_effect_to_new_file(model, new_input_file, new_output_file, sr, gain=0.5)
    print(f"Эффект наложен на новый файл, результат сохранён в {new_output_file}")


if __name__ == "__main__":
    main()


In [5]:
clean_file = "clean.wav"
effect_file = "effect.wav"
output_file = "output_audio.wav"
model = EffectumHybrid()

if os.path.exists("effect_model.pth"):
    model.load_state_dict(torch.load("effect_model.pth"))
    print("Загружена ранее обученная модель.")
else:
    clean_data, effect_data, sr = prepare_data(clean_file, effect_file)
    train_model(model, clean_data, effect_data, epochs=10)
    print("Модель обучена и сохранена.")

# Получаем sr из файла clean_file (гарантированно существует)
_, sr = torchaudio.load(clean_file)

device = torch.device("cpu")

apply_effect_to_new_file(model, clean_file, output_file, device, sr, gain=1)
print(f"Эффект наложен, результат сохранён в {output_file}")

song = AudioSegment.from_wav(output_file)
(song * 3).export("test.wav", format="wav")

new_input_file = "clean2.wav"
new_output_file = "new_output.wav"
apply_effect_to_new_file(model, new_input_file, new_output_file, sr, gain=0.1)
print(f"Эффект наложен на новый файл, результат сохранён в {new_output_file}")


Загружена ранее обученная модель.


RuntimeError: Input type (torch.cuda.FloatTensor) and weight type (torch.FloatTensor) should be the same