In [1]:
import librosa
import numpy as np
import torch
from torch.utils.data import Subset

# Utils

In [14]:
import os
import shutil

# COPY WAV FILES

def copy_wav(diretorio_origem: str, diretorio_destino: str, num_copies: int):
    for arquivo in os.listdir(diretorio_origem):
        caminho_completo_origem = os.path.join(diretorio_origem, arquivo)
        
        # Verifica se é um arquivo (e não um diretório)
        if os.path.isfile(caminho_completo_origem):
            partes = arquivo.split('.')
            
            # Verifica se há ao menos duas partes (nome + algo depois do primeiro ponto)
            if len(partes) > 1:
                # Checa se a segunda parte (partes[1]) começa com '1'
                if partes[1].startswith('1'):
                    caminho_completo_destino = os.path.join(diretorio_destino, arquivo)
                    # Copia o arquivo para o destino
                    shutil.copy2(caminho_completo_origem, caminho_completo_destino)
                    print(f"Copiado: {arquivo} -> {diretorio_destino}")


In [15]:
origem = 'data/clarinet'
destino = 'data/clarinet_test'
copy_wav(origem, destino, 20)

Copiado: 036.1.wav -> data/clarinet_test
Copiado: 037.1.wav -> data/clarinet_test
Copiado: 033.1.wav -> data/clarinet_test
Copiado: 029.1.wav -> data/clarinet_test
Copiado: 051.1.wav -> data/clarinet_test
Copiado: 021.1.wav -> data/clarinet_test
Copiado: 032.1.wav -> data/clarinet_test
Copiado: 026.1.wav -> data/clarinet_test
Copiado: 024.1.wav -> data/clarinet_test
Copiado: 010.1.wav -> data/clarinet_test
Copiado: 006.1.wav -> data/clarinet_test
Copiado: 012.1.wav -> data/clarinet_test
Copiado: 028.1.wav -> data/clarinet_test
Copiado: 049.1.wav -> data/clarinet_test
Copiado: 039.1.wav -> data/clarinet_test
Copiado: 004.1.wav -> data/clarinet_test
Copiado: 020.1.wav -> data/clarinet_test
Copiado: 017.1.wav -> data/clarinet_test
Copiado: 023.1.wav -> data/clarinet_test
Copiado: 018.1.wav -> data/clarinet_test
Copiado: 009.1.wav -> data/clarinet_test
Copiado: 041.1.wav -> data/clarinet_test
Copiado: 050.1.wav -> data/clarinet_test
Copiado: 030.1.wav -> data/clarinet_test
Copiado: 052.1.w

# Macros

In [3]:
PIANO_PATH = "data/piano"
CLARINET_PATH = "data/clarinet"
MODEL_SAVEPATH = "models/model.pth"
PIANO_AUDIO_FILE = "data/piano/002.5.wav"
CLARINET_AUDIO_FILE = "data/clarinet/001.1.wav"
sr_target = 22050   # se quiser forçar uma taxa de amostragem
n_fft = 2048
hop_length = 512
n_mels = 128

In [4]:
def wav_to_melspectogram(wav_path: str) :
    """
    Converts a wav from wav_path to melspectogram.
    """
    

# Wav to melspectogram

In [5]:
y, sr = librosa.load(PIANO_AUDIO_FILE, sr=sr_target)

mel_spec = librosa.feature.melspectrogram(
    y=y, 
    sr=sr,
    n_fft=n_fft,
    hop_length=hop_length,
    n_mels=n_mels
)

In [6]:
mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
spec_torch = torch.from_numpy(mel_spec_db).float()
spec_torch = spec_torch.unsqueeze(0).unsqueeze(0)  # agora fica [1, 1, n_mels, T]
print("Shape final do tensor:", spec_torch.shape)

Shape final do tensor: torch.Size([1, 1, 128, 21056])


In [7]:
import torch
import torch.nn.functional as F

# Suponha que spec_torch seja o tensor já carregado: [1, 1, 128, 21056]
# Dimensões: [Batch=1, Channel=1, Mel_Bins=128, Time=21056]

chunk_size = 256  # Quantidade de frames em cada janela
overlap = 0       # Se quiser overlap, coloque um valor > 0

_, _, n_mels, total_frames = spec_torch.shape

chunks = []
start = 0

while start < total_frames:
    end = start + chunk_size
    # Recorte do espectrograma ao longo do eixo do tempo (dim=-1)
    chunk = spec_torch[..., start:end]  # shape será [1, 1, 128, chunk_length]

    chunk_length = chunk.shape[-1]
    if chunk_length < chunk_size:
        # Se chegar no final e faltar frames, faça zero-padding para manter chunk_size
        pad_amount = chunk_size - chunk_length
        # Pad no lado direito do eixo do tempo => (left, right) = (0, pad_amount)
        chunk = F.pad(chunk, (0, pad_amount))

    # Armazena este chunk
    chunks.append(chunk)

    # Atualiza 'start' para o próximo segmento
    # Se overlap=0, simplesmente pula para o final do chunk atual
    start += chunk_size - overlap

# Agora "chunks" é uma lista de tensores, cada um com shape [1, 1, 128, chunk_size].
# Se quiser transformá-los em um único tensor com shape [N, 1, 128, chunk_size], faça:
chunks_tensor = torch.cat(chunks, dim=0)  # empilha ao longo do batch
print(chunks_tensor.shape)
# Exemplo: [82, 1, 128, 256], se tivermos 82 "fatias"


torch.Size([83, 1, 128, 256])


## Dataset

In [16]:
import os
import glob

import librosa
import numpy as np
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

class PairedInstrumentDataset(Dataset):
    def __init__(self, 
                 piano_dir, 
                 clarinet_dir, 
                 sr=22050, 
                 n_fft=2048, 
                 hop_length=512, 
                 n_mels=128, 
                 chunk_size=256, 
                 overlap=0,
                 size = 20):
        """
        Cria um dataset onde cada item é (chunk_piano, chunk_clarinet), usando
        a convenção de que os arquivos têm formato algo como 'piano.123.wav',
        e 'clarinet.123.wav', de forma que o inteiro após o primeiro ponto seja
        o ID da música.

        piano_dir      = caminho para a pasta com os .wav (ou .mp3 etc.) do piano
        clarinet_dir   = caminho para a pasta com os .wav do clarinete
        sr             = taxa de amostragem a ser utilizada
        n_fft, hop_length, n_mels = parâmetros para gerar Mel-spectrogram
        chunk_size     = quantos frames no eixo do tempo para cada chunk
        overlap        = quantidade de overlap (em frames) entre chunks
        """
        self.piano_dir = piano_dir
        self.clarinet_dir = clarinet_dir
        self.sr = sr
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.n_mels = n_mels
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.size = size
        # Vamos criar um dicionário de arquivos de clarinete indexados pelo ID
        clarinet_files = glob.glob(os.path.join(clarinet_dir, '*.wav'))
        clarinet_files += glob.glob(os.path.join(clarinet_dir, '*.mp3'))
        clarinet_map = {}
        count = 0
        for cfile in clarinet_files:
            if(count >= 20):
                break
            base_c = os.path.basename(cfile)
            parts_c = base_c.split('.')
            if len(parts_c) < 3:
                # Se não tiver esse formato, pula
                continue
            clar_id = parts_c[0]  # inteiro depois do primeiro ponto
            clarinet_map[clar_id] = cfile
            count+=1
        
        # Agora, percorremos os arquivos de piano, extraímos o ID e buscamos no clarinet_map
        piano_files = glob.glob(os.path.join(piano_dir, '*.wav'))
        piano_files += glob.glob(os.path.join(piano_dir, '*.mp3'))
        piano_files = sorted(piano_files)

        # Armazenaremos todos os pares (mel_chunk_piano, mel_chunk_clarinet) em uma lista
        self.pairs = []
        count = 0
        for piano_path in piano_files:
            if(count >= 20):
                break
            base_p = os.path.basename(piano_path)
            parts_p = base_p.split('.')
            piano_id = parts_p[0]

            if piano_id not in clarinet_map:
                continue

            clarinet_path = clarinet_map[piano_id]

            # 1) Gerar mel-spectrogram do piano
            mel_piano = self._load_and_mel(piano_path)
            
            # 2) Gerar mel-spectrogram do clarinete
            mel_clarinet = self._load_and_mel(clarinet_path)
            
            # Ajustar para terem mesmo tamanho no eixo do tempo
            mel_piano, mel_clarinet = self._match_time_length(mel_piano, mel_clarinet)
            
            # 3) Dividir em chunks
            chunks_piano = self._split_into_chunks(mel_piano)
            chunks_clarinet = self._split_into_chunks(mel_clarinet)
            
            # Esperamos que chunks_piano e chunks_clarinet tenham o mesmo número de janelas
            num_chunks = min(len(chunks_piano), len(chunks_clarinet))
            
            for i in range(num_chunks):
                self.pairs.append((chunks_piano[i], chunks_clarinet[i]))
            count+=1
        print(f"Total de pares gerados: {len(self.pairs)}")

    def _load_and_mel(self, audio_path):
        """Carrega o áudio e retorna um Mel-spectrogram em formato torch.Tensor 
           [1, n_mels, time_frames].
        """
        y, sr = librosa.load(audio_path, sr=self.sr)
        mel_spec = librosa.feature.melspectrogram(y=y, 
                                                  sr=sr, 
                                                  n_fft=self.n_fft,
                                                  hop_length=self.hop_length,
                                                  n_mels=self.n_mels)
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
        
        # Convertemos para tensor com shape [1, n_mels, time_frames]
        mel_tensor = torch.from_numpy(mel_spec_db).float().unsqueeze(0)
        # Fica [1, 128, T], por exemplo
        return mel_tensor

    def _match_time_length(self, mel_piano, mel_clarinet):
        """
        Faz com que os dois tensores [1, n_mels, T] tenham o mesmo T (time frames).
        Podemos cortar ou usar zero-padding para igualar.
        """
        time_piano = mel_piano.shape[-1]
        time_clarinet = mel_clarinet.shape[-1]
        
        if time_piano == time_clarinet:
            return mel_piano, mel_clarinet
        
        # Se não forem iguais, vamos usar o mínimo e cortar o excedente
        min_len = min(time_piano, time_clarinet)
        mel_piano = mel_piano[..., :min_len]
        mel_clarinet = mel_clarinet[..., :min_len]
        
        return mel_piano, mel_clarinet

    def _split_into_chunks(self, mel_tensor):
        """
        Divide o mel_tensor [1, n_mels, T] em janelas de chunk_size (com overlap, se definido).
        Retorna lista de tensores [1, n_mels, chunk_size].
        """
        _, n_mels, total_frames = mel_tensor.shape
        chunks = []
        
        start = 0
        while start < total_frames:
            end = start + self.chunk_size
            chunk = mel_tensor[..., start:end]  # shape [1, n_mels, (end-start)]
            
            chunk_len = chunk.shape[-1]
            if chunk_len < self.chunk_size:
                # Zero-pad no final
                pad_amount = self.chunk_size - chunk_len
                chunk = F.pad(chunk, (0, pad_amount))  # pad no eixo do tempo
            
            chunks.append(chunk)
            # avança para o próximo chunk considerando o overlap
            start += (self.chunk_size - self.overlap)
        
        return chunks

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        # Cada item é (chunk_piano, chunk_clarinet)
        return self.pairs[idx]


## Model def

In [19]:
import torch.nn as nn

class SimpleConditionalUNet(nn.Module):
    def __init__(self, n_mels=128):
        super().__init__()
        # Exemplo: Convolução 2D de entrada
        # e algumas camadas (muito simples)
        
        # Para condicionar, podemos inserir o chunk_piano como um canal extra,
        # ou passar via algum mecanismo de cross-attention, ou concatenar embeddings.
        
        self.net = nn.Sequential(
            nn.Conv2d(in_channels=2, out_channels=64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=1, kernel_size=3, padding=1),
        )
    
    def forward(self, x_noisy, x_cond):
        # x_noisy: [B, 1, n_mels, T]  (clarinet com ruído)
        # x_cond:  [B, 1, n_mels, T]  (piano)
        # Concatenar no canal
        inp = torch.cat([x_noisy, x_cond], dim=1)  # [B, 2, n_mels, T]
        
        out = self.net(inp)  # [B, 1, n_mels, T]
        return out
        
dataset = PairedInstrumentDataset(piano_dir= PIANO_PATH, clarinet_dir=CLARINET_PATH)

Total de pares gerados: 1157


## Training

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random

# Hiperparâmetros de difusão (exemplo)
num_timesteps = 1000
device = 'cuda'
betas = torch.linspace(1e-4, 0.02, num_timesteps, device=device)  # apenas um exemplo
alphas = 1.0 - betas
alphas_cumprod = torch.cumprod(alphas, dim=0)

def forward_diffusion_sample(x0, t, device):
    # x0: [B, 1, n_mels, T]
    # t:  [B] (os timesteps escolhidos)
    # Retorna x_noisy e o ruído gerado
    
    # alphas_cumprod[t] -> shape [B], precisamos expandir para [B,1,1,1]
    # Pegamos alpha_t cumulativo
    alpha_t = alphas_cumprod[t].reshape(-1, 1, 1, 1).to(device)
    
    # ruído gaussiano
    eps = torch.randn_like(x0).to(device)
    
    x_noisy = torch.sqrt(alpha_t) * x0 + torch.sqrt(1 - alpha_t) * eps
    return x_noisy, eps

# Exemplo de loop de treinamento
model = SimpleConditionalUNet(n_mels=128).to('cuda')
optimizer = optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.MSELoss()

epochs = 10
fraction = 1
num_samples = int(len(dataset) * fraction)  # Número de amostras que queremos
print(f'Treinando com {num_samples}/{len(dataset)} musicas!')
indices = np.random.choice(len(dataset), num_samples, replace=False)

subset_dataset = Subset(dataset, indices)
dataloader = DataLoader(subset_dataset, batch_size=8, shuffle=True)

for epoch in range(epochs):
    for batch_idx, (chunk_piano, chunk_clarinet) in enumerate(dataloader):
        # chunk_piano: [B, 1, 128, T]
        # chunk_clarinet: [B, 1, 128, T]
        
        chunk_piano = chunk_piano.to('cuda')
        chunk_clarinet = chunk_clarinet.to('cuda')
        
        # 1) Escolher timesteps aleatórios para cada item no batch
        #    Por ex., t = [12, 987, 500, ...]
        t = torch.randint(0, num_timesteps, (chunk_piano.size(0),), device='cuda').long()
        
        # 2) Gerar x_noisy e eps
        x_noisy, eps = forward_diffusion_sample(chunk_clarinet, t, 'cuda')
        
        # 3) Predição do modelo: model tenta prever eps a partir de (x_noisy, chunk_piano)
        eps_pred = model(x_noisy, chunk_piano)
        
        # 4) Loss é MSE(eps_pred, eps)
        loss = criterion(eps_pred, eps)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if batch_idx % 10 == 0:
            print(f"Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}")

torch.save(model.state_dict(), MODEL_SAVEPATH)

Treinando com 1157/1157 musicas!


# Sample

In [14]:
@torch.no_grad()
def generate_full_spectrogram(model, piano_chunks, steps=1000):
    """
    Converte uma sequência de chunks de piano em chunks de clarinete usando difusão.
    
    Args:
        model: modelo de difusão treinado
        piano_chunks: lista de tensores [1, 1, 128, chunk_size]
        steps: número de passos na amostragem da difusão
        
    Retorna:
        full_spectrogram: tensor [1, 128, total_time]
    """
    clarinet_chunks = []

    # Hiperparâmetros de difusão (exemplo)
    num_timesteps = 1000
    device = 'cuda'
    betas = torch.linspace(1e-4, 0.02, num_timesteps, device=device)  # apenas um exemplo
    alphas = 1.0 - betas
    alphas_cumprod = torch.cumprod(alphas, dim=0)

    for chunk_piano in piano_chunks:
        chunk_piano = chunk_piano.to("cuda")  # Enviar para GPU se necessário

        # Começa com ruído gaussiano
        x = torch.randn_like(chunk_piano).to(chunk_piano.device)

        for i in reversed(range(steps)):
            t = torch.tensor([i], device=x.device).long()
            
            # Prediz eps (ruído) para esse timestep
            eps_pred = model(x, chunk_piano)

            # Obtém alpha e alpha_cumprod para esse timestep
            alpha = alphas[t]
            alpha_cum = alphas_cumprod[t]

            # Reverte um passo de ruído
            x = (1 / torch.sqrt(alpha[0])) * (x - (1 - alpha[0]) / torch.sqrt(1 - alpha_cum[0]) * eps_pred)

            # Adiciona ruído extra se i > 0 (DDPM)
            if i > 0:
                z = torch.randn_like(x)
                beta = betas[t]
                sigma = torch.sqrt(beta)
                x = x + sigma[0] * z

        clarinet_chunks.append(x)

    # Concatena todos os chunks no eixo do tempo
    full_spectrogram = torch.cat(clarinet_chunks, dim=-1)  # Concatena ao longo do tempo

    return full_spectrogram  # [1, 128, total_time]


# Inference

In [13]:
# Criar dataset de pares piano -> clarinete
dataset = PairedInstrumentDataset(
    piano_dir="data/piano_test",
    clarinet_dir="data/clarinet_test",
    chunk_size=256,
    overlap=0
)

# Pegamos um exemplo do dataset (um espectrograma de piano completo)
piano_full = dataset[0][0]  # [1, 128, total_time]



# Dividimos o espectrograma em chunks do mesmo jeito que fizemos no treinamento
def split_into_chunks(mel_tensor, chunk_size=256):
    """
    Divide um espectrograma completo em janelas menores para processamento por chunk.
    
    Args:
        mel_tensor: tensor [1, 128, total_time]
        chunk_size: tamanho de cada pedaço no eixo do tempo
    
    Retorna:
        Lista de tensores [1, 1, 128, chunk_size]
    """
    _, n_mels, total_frames = mel_tensor.shape
    chunks = []
    
    start = 0
    while start < total_frames:
        end = start + chunk_size
        chunk = mel_tensor[..., start:end]  # shape [1, 128, (end-start)]

        chunk_len = chunk.shape[-1]
        if chunk_len < chunk_size:
            pad_amount = chunk_size - chunk_len
            chunk = F.pad(chunk, (0, pad_amount))  # Preenche com zeros se necessário
        
        chunks.append(chunk.unsqueeze(0))  # Adiciona batch dim [1, 1, 128, chunk_size]
        start += chunk_size

    return chunks

# Criar chunks de piano
piano_chunks = split_into_chunks(piano_full)


Total de pares gerados: 181


In [15]:
# Carregar o modelo treinado
model = SimpleConditionalUNet(n_mels=128).to('cuda')
model.load_state_dict(torch.load(MODEL_SAVEPATH))
model.eval()

# Gerar espectrograma de clarinete a partir do piano
generated_spectrogram = generate_full_spectrogram(model, piano_chunks)

print(generated_spectrogram.shape)  # [1, 128, total_time]


torch.Size([1, 1, 128, 256])


In [20]:
import librosa
import librosa.display
import numpy as np
import soundfile as sf

def mel_to_audio(mel_spectrogram, sr=22050, n_fft=2048, hop_length=512):
    """
    Converte um mel-spectrograma de volta para áudio usando Griffin-Lim.
    
    Args:
        mel_spectrogram: tensor [1, 128, T]
    
    Retorna:
        Áudio reconstruído
    """
    mel_spectrogram = mel_spectrogram.squeeze(0).cpu().numpy()  # Remover batch dim
    mel_spectrogram = librosa.db_to_power(mel_spectrogram)  # Voltar para escala linear

    # Converter de mel para STFT
    stft = librosa.feature.inverse.mel_to_stft(mel_spectrogram, sr=sr, n_fft=n_fft)

    # Aplicar Griffin-Lim para estimar fase
    audio = librosa.griffinlim(stft, hop_length=hop_length)
    
    return audio

# Converter para áudio
audio_reconstructed = mel_to_audio(generated_spectrogram)
audio_reconstructed = np.array(audio_reconstructed, dtype=np.float32)
# Salvar como arquivo WAV
sf.write("tmp/output.wav", np.ravel(audio_reconstructed), 22050)

# Ouvir no Jupyter/Colab
import IPython.display as ipd
ipd.Audio(audio_reconstructed, rate=22050)
