In [None]:
import os
import numpy as np
from collections import defaultdict

import numpy as np
from scipy.interpolate import interp1d

import numpy as np

def pad_or_truncate(arr: np.ndarray, target_len: int) -> np.ndarray:

    T, D = arr.shape
    if T == target_len:
        return arr
    elif T < target_len:
        pad_len = target_len - T
        pad = np.zeros((pad_len, D), dtype=arr.dtype)
        return np.concatenate([arr, pad], axis=0)
    else:  # T > target_len
        return arr[:target_len]



X = []
y = []
uids = []
for i in ['dev_clean', 'train_clean_100', 'dev_other', 'test_clean', 'test_other']:
    folder = f"librispeech_ppg_raw/{i}"
    for filename in os.listdir(folder):
        if not filename.endswith(".npy"):
            continue

        name_without_ext = filename[:-4]  # "FAEM0_1"

        path = os.path.join(folder, filename)
        arr = np.load(path)
        target = np.load(f'/home/main/Desktop/librispeech_mcc/{name_without_ext[4:]}-feat.npy')


        #i_vec = np.tile(result[uid], (arr.shape[0], 1))  # (5, 3)
        #arr = np.hstack([arr, i_vec])
        #arr = np.concat([arr, i_vec], axis=1)
        arr = pad_or_truncate(arr, len(target))

        X.append(arr)
        y.append(target)

        uids.append(name_without_ext[4:])



In [19]:
import torch
X = [torch.tensor(arr, dtype=torch.float32) for arr in X]
y = [torch.tensor(arr, dtype=torch.float32) for arr in y]

# X_test = [torch.tensor(arr, dtype=torch.float32) for arr in X_test]
# y_test = [torch.tensor(arr, dtype=torch.float32) for arr in y_test]

In [None]:
# import torch
# import random

# # Fix seed
# random.seed(42)

# # Random selection out of 10% in all indexs
# num_samples = len(X)
# test_size = int(num_samples * 0.1)
# test_indices = random.sample(range(num_samples), test_size)

# # Structure test set
# X_test = [X[i] for i in test_indices]
# y_test = [y[i] for i in test_indices]


In [21]:
# train_indices = list(set(range(num_samples)) - set(test_indices))
# X = [X[i] for i in train_indices]
# y = [y[i] for i in train_indices]


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from torch.nn.utils.rnn import pad_sequence


# hyper parameters
input_dim = 144
hidden_dim = 256
output_dim = 40
num_layers = 2
bidirectional = True
batch_size = 32
learning_rate = 0.001
num_epochs = 20
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1. Custom Dataset
class SequenceDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


# collate_fn definition
def collate_fn_pad(batch):
    # batch: list of tuples (x_i, y_i), where each x_i: [Ti, input_dim], y_i: [Ti, output_dim]
    x_list, y_list = zip(*batch)  # unzip
    lengths = [x.shape[0] for x in x_list]

    # padding
    x_padded = pad_sequence(x_list, batch_first=True)  # [batch, max_len, input_dim]
    y_padded = pad_sequence(y_list, batch_first=True)  # [batch, max_len, output_dim]

    lengths = torch.tensor(lengths, dtype=torch.long)  # [batch]

    return x_padded, y_padded, lengths


# 2. Model definition
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

class ManyToManyDBLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, bidirectional):
        super(ManyToManyDBLSTM, self).__init__()
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers=num_layers,
            bidirectional=bidirectional,
            batch_first=True
        )
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)

    def forward(self, x, lengths):
        # lengths: [batch] (CPU Tensor)
        packed = pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_out, _ = self.lstm(packed)
        unpacked, _ = pad_packed_sequence(packed_out, batch_first=True)  # [batch, max_len, hidden_dim*2]
        output = self.fc(unpacked)  # [batch, max_len, output_dim]
        return output


# 3. train/val split
X_train, X_val, y_train, y_val, uids_train, uids_val= train_test_split(X, y,uids, test_size=0.1, random_state=42)

train_dataset = SequenceDataset(X_train, y_train)
val_dataset = SequenceDataset(X_val, y_val)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                          drop_last=False, collate_fn=collate_fn_pad)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False,
                        drop_last=False, collate_fn=collate_fn_pad)


# 4. Prepare Train
model = ManyToManyDBLSTM(input_dim, hidden_dim, output_dim, num_layers, bidirectional).to(device)
criterion = nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

# 5. Train Loop
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    for xb, yb, lengths in train_loader:
        xb, yb, lengths = xb.to(device), yb.to(device), lengths.to(device)

        optimizer.zero_grad()
        outputs = model(xb, lengths)
        loss = criterion(outputs, yb)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()


    # Validation Loop
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for xb, yb, lengths in val_loader:
            xb, yb, lengths = xb.to(device), yb.to(device), lengths.to(device)

            outputs = model(xb, lengths)
            loss = criterion(outputs, yb)
            val_loss += loss.item()

    print(f"[Epoch {epoch+1}/{num_epochs}] Train Loss: {train_loss/len(train_loader):.4f} | Val Loss: {val_loss/len(val_loader):.4f}")


[Epoch 1/20] Train Loss: 0.0380 | Val Loss: 0.0468
[Epoch 2/20] Train Loss: 0.0273 | Val Loss: 0.0435
[Epoch 3/20] Train Loss: 0.0255 | Val Loss: 0.0418
[Epoch 4/20] Train Loss: 0.0246 | Val Loss: 0.0401
[Epoch 5/20] Train Loss: 0.0236 | Val Loss: 0.0392
[Epoch 6/20] Train Loss: 0.0226 | Val Loss: 0.0376
[Epoch 7/20] Train Loss: 0.0221 | Val Loss: 0.0367
[Epoch 8/20] Train Loss: 0.0213 | Val Loss: 0.0356
[Epoch 9/20] Train Loss: 0.0206 | Val Loss: 0.0338
[Epoch 10/20] Train Loss: 0.0201 | Val Loss: 0.0328
[Epoch 11/20] Train Loss: 0.0196 | Val Loss: 0.0330
[Epoch 12/20] Train Loss: 0.0191 | Val Loss: 0.0318
[Epoch 13/20] Train Loss: 0.0187 | Val Loss: 0.0319
[Epoch 14/20] Train Loss: 0.0182 | Val Loss: 0.0307
[Epoch 15/20] Train Loss: 0.0178 | Val Loss: 0.0304
[Epoch 16/20] Train Loss: 0.0177 | Val Loss: 0.0305
[Epoch 17/20] Train Loss: 0.0173 | Val Loss: 0.0296
[Epoch 18/20] Train Loss: 0.0172 | Val Loss: 0.0307
[Epoch 19/20] Train Loss: 0.0175 | Val Loss: 0.0293
[Epoch 20/20] Train L

In [41]:
# test_dataset = SequenceDataset(X_test, y_test)

# test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False,
#                         drop_last=False, collate_fn=collate_fn_pad)


In [42]:
model.eval()
y_pred = []

with torch.no_grad():
    for xb, yb, lengths in val_loader:
        xb, yb, lengths = xb.to(device), yb.to(device), lengths.to(device)

        outputs = model(xb, lengths)
        y_pred.append(outputs)

In [24]:
len(uids_val)

3967

In [25]:
uids_val[100]

'2428-83705-0009'

In [13]:
print(X[10].shape, y[10].shape)

torch.Size([838, 144]) torch.Size([838, 127])


In [None]:
import numpy as np
import pyworld as pw
import pysptk
import soundfile as sf
import scipy.ndimage
import torch

# diffsptk MLPG import
from diffsptk import MLPG
import diffsptk
#wav_path = "kaldi/egs/timit/s5/data/TIMIT/TEST/DR4/MLLL0/SX283.WAV"

test_index = 100

uid = uids_val[test_index]
uid_split = uid.split('-')

import os

for i in ['dev-clean', 'dev-other', 'test-clean', 'test-other', 'train-clean-100']:
    wav_path = f"/home/main/Desktop/kaldi/egs/librispeech/s5/data/{i}/LibriSpeech/{i}/{uid_split[0]}/{uid_split[1]}/{uid}.flac"

    if os.path.exists(wav_path):
        print(f"File already exists.\n{uid}\n{i}")
        break
    else:
        print("File doesn't exist.")


x, fs = sf.read(wav_path)
frame_period = 10.0

# 1. WORLD 분석
f0, timeaxis = pw.harvest(x, fs, frame_period=frame_period)
f0 = pw.stonemask(x, f0, timeaxis, fs)
sp = pw.cheaptrick(x, f0, timeaxis, fs)
ap = pw.d4c(x, f0, timeaxis, fs)

frame_period = 10.0
fs = 16000

#features = np.hstack([mcc, delta, delta_delta])
features_torch = y_pred[test_index].float().to('cpu')#.unsqueeze(0)
features_torch = torch.concat([features_torch[:,:,:40], features_torch[:,:,42:42+40], features_torch[:,:,84:84+40]], axis=2)
#features_torch = torch.from_numpy(features).float().unsqueeze(0)  # shape: (1, T, 3*D)

T = features_torch.shape[1]
mlpg = MLPG(size=T)  
smoothed = mlpg(features_torch)  # shape: (1, T, D)

smoothed_mcc = smoothed.squeeze(0).numpy().T  # shape: (D, T)


fftlen = 1024

sp_recon = np.array([
    pysptk.mc2sp(m, alpha=0.42, fftlen=fftlen)
    for m in y_pred[test_index].squeeze(0).cpu().numpy()
])


wav = pw.synthesize(f0, sp_recon.astype(np.double), ap, fs, frame_period=frame_period)

sf.write(f"reconstructed_mlpg_{test_index}.wav", wav, fs)
sf.write(f"reconstructed_mlpg_{test_index}_real.wav", x, fs)



파일이 존재합니다.
2428-83705-0009
dev-clean
