In [1]:
# Base
import os

import torch
import librosa
import os
import h5py
from torchinfo import summary
from torch.utils.data import DataLoader
import sounddevice as sd
import numpy as np
import matplotlib.pyplot as plt

# Preprocessing, Metrics
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score

# CUDA: https://pytorch.org/get-started/locally/
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")

ModuleNotFoundError: No module named 'torchinfo'

In [4]:
# Parameters
fs = 8000 # Sampling frequency
min_rec_len = 8032 # Minimum samples of recording - STFT output dimensions should be divisible by factor 32 (first conv layer no. of filters)

In [5]:
# Datase loading
def read_dataset(dataset):

    data = []
    data_prop = { "fs": [], "len": []}

    for path, _, files in os.walk(dataset):
        for name in files:

            # Skip transcriptions
            if "txt" not in name.lower(): 

                # Load file (sig-signal; sr-sampling rate) and downsample to 8 kHz
                y, sr = librosa.load(os.path.join(path, name), sr=fs)

                data_prop["fs"].append(sr)
                data_prop["len"].append(len(y) / sr)

                # Use whole signal!

                # Check length
                # Signals can be shorter than min requested
                if len(y) < min_rec_len:
                    print(f"Recording {name} is too short ({len(y)}) - padding with zeros ({min_rec_len - len(y)})")
                    y = np.append(y, np.zeros((min_rec_len - len(y), )))

                # Creating UrbanSound8K frames of min_rec_len
                for i in range (0, y.shape[0] - min_rec_len, min_rec_len):
                    data.append(y[i:i + min_rec_len])

    print(dataset)
    print(np.shape(data))
    print(f"Sampling freq: {np.mean(data_prop['fs'])} [{np.min(data_prop['fs'])} - {np.max(data_prop['fs'])}] Hz")
    print(f"Length: {np.mean(data_prop['len']) :.2f} [{np.min(data_prop['len']) :.2f} - {np.max(data_prop['len']) :.2f}] s")

    return data

In [8]:
# Load all recordings from LibriSpeech-dev
# Large-scale (1000 hours) corpus of read English speech
# http://openslr.elda.org/resources/12/dev-clean.tar.gz
data_libri = read_dataset("./librispeech/")

# Save to h5 file
hf = h5py.File('dataset_raw.h5', 'w')
hf.create_dataset('data_libri', data=data_libri)
hf.close()

./librispeech/


NameError: name 'np' is not defined

In [None]:
# Load dataset from h5 file
hf = h5py.File('dataset_raw.h5', 'r')

data_libri = hf.get('data_libri')
data_libri = np.array(data_libri)

# Samples, Length
print('data_libri size:', np.shape(data_libri))

hf.close()

In [9]:
# Add "noise" to speech
data_original = [] # Original speech
data_noisy = [] # Speech with added "noise" 

# STFT
# Output dimensions should be divisible by factor 32
n_fft = 1022
hop = 63

samples = 5000 # Number of samples

for i in range(0, samples):
    
    # Get random speech
    signal_index = np.random.randint(0, data_libri.shape[0])
    signal_original = data_libri[signal_index]

    # Signal power
    signal_original_power = np.mean(signal_original ** 2)

    # Calculate SNR
    snr_db = 30
    snr_linear = 10 ** (snr_db / 10)
    
    # Calculate noise power
    noise_power = signal_original_power / snr_linear
    
    # Generate Gaussian noise
    noise = np.random.normal(0, np.sqrt(noise_power), np.shape(signal_original))

    # Add noise to signal
    signal_noisy = signal_original + noise

    # STFT for original signal
    st_ft = librosa.amplitude_to_db(np.abs(librosa.stft(signal_original, n_fft=n_fft, hop_length=hop)), ref=np.max)
    data_original.append(st_ft)

    # STFT for noisy signal
    st_ft = librosa.amplitude_to_db(np.abs(librosa.stft(signal_noisy, n_fft=n_fft, hop_length=hop)), ref=np.max)
    data_noisy.append(st_ft)

# Samples, (STFT)
print('data_origi size:', np.shape(data_original))
print('data_noisy size:', np.shape(data_noisy))

# Save to h5 file
hf = h5py.File('dataset.h5', 'w')
hf.create_dataset('data_original', data=np.float32(data_original))
hf.create_dataset('data_noisy', data=np.float32(data_noisy))
hf.close()

NameError: name 'np' is not defined

In [None]:
# Load dataset from h5 file
hf = h5py.File('dataset.h5', 'r')

data_original = hf.get('data_original')
data_original = np.array(data_original)

data_noisy = hf.get('data_noisy')
data_noisy = np.array(data_noisy)

# Samples, (STFT)
print('data_original size:', np.shape(data_original))

# Samples, (STFT)
print('data_noisy size:', np.shape(data_noisy))

In [None]:
# Plot original
plt.figure(figsize=(16, 4))
plt.subplot(1,3,1)
plt.pcolormesh(data_original[0], shading='auto')
plt.title('Original')
plt.xlabel('Window')
plt.ylabel('FFT bin')
plt.colorbar()

# Plot noisy
plt.subplot(1,3,2)
plt.pcolormesh(data_noisy[0], shading='auto')
plt.title('Noisy')
plt.xlabel('Window')
plt.colorbar()
plt.show()

In [None]:
# Scale
scaler = MinMaxScaler()
data_input_sc = scaler.fit_transform(data_noisy.reshape(-1, data_noisy.shape[-1])).reshape(data_noisy.shape)
data_output_sc = scaler.fit_transform(data_original.reshape(-1, data_original.shape[-1])).reshape(data_original.shape)

# Split into test and train
X_train, X_test, y_train, y_test = train_test_split(data_input_sc, data_output_sc, test_size=0.2)

# Split into train and valid
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25)

# Sizes
print('Dims')
X_train = np.expand_dims(X_train, 1)
y_train = np.expand_dims(y_train, 1)
X_test = np.expand_dims(X_test, 1)
y_test = np.expand_dims(y_test, 1)
X_val = np.expand_dims(X_val, 1)
y_val = np.expand_dims(y_val, 1)

print('Train X:', np.shape(X_train))
print('Train Y:', np.shape(y_train))
print('Test X:', np.shape(X_test))
print('Test Y:', np.shape(y_test))
print('Val X:', np.shape(X_val))
print('Val Y:', np.shape(y_val))

In [None]:
class EncoderModel(torch.nn.Module):
    def __init__(self) -> None:
        super().__init__()

        # Convolutional layers
        self.conv1 = torch.nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1)
        self.batch1 = torch.nn.BatchNorm2d(32)
        self.relu1 = torch.nn.ReLU()
        self.norm1 = torch.nn.Dropout2d(0.15)
    
        # Flatten
        self.flatten = torch.nn.Flatten()

        # Linear layers
        self.linear1 = torch.nn.Linear(#calculate#, 128)
        self.linear2 = torch.nn.Linear(128, 1)
    
    def forward(self, x):
        # Convolutional layers
        x = self.conv1(x)
        x = self.batch1(x)
        x = self.relu1(x)
        x = self.norm1(x)

        # Flatten
        x = self.flatten(x)

        # Linear layers
        x = self.linear1(x)
        x = self.linear2(x)
        
        return x

# Check model summary
model_enc = EncoderModel()
summary(model_enc, input_size=(1, 1, 512, 128))

In [None]:
class DecoderModel(torch.nn.Module):
    def __init__(self) -> None:
        super().__init__()

        # Linear layers
        self.linear1 = torch.nn.Linear(1, 128)
        self.linear2 = torch.nn.Linear(128, #calculate#)

        # Convolutional layers
        self.conv1 = torch.nn.ConvTranspose2d(32, 32, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.batch1 = torch.nn.BatchNorm2d(32)
        self.relu1 = torch.nn.ReLU()
        self.norm1 = torch.nn.Dropout2d(0.15)

        # Head convolution
        self.output = torch.nn.Conv2d(32, 1, kernel_size=1, stride=1)
    
    def forward(self, x):
        # Linear layers
        x = self.linear1(x)
        x = self.linear2(x)

        # Reshape for Convolutional layers
        x = x.reshape(x.shape[0], 128, 64, 16)

        # Convolutional layers
        x = self.conv1(x)
        x = self.batch1(x)
        x = self.relu1(x)
        x = self.norm1(x)

        # Head convolution
        x = self.output(x)

        return x

# Check model summary
model_dec = DecoderModel()
summary(model_dec, input_size=(1, 1))

In [None]:
class AutoEncoder(torch.nn.Module):
    def __init__(self):
        super(AutoEncoder, self).__init__()
        self.encoder = EncoderModel()
        self.decoder = DecoderModel()
        
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# Check model summary
model = AutoEncoder()
summary(model, input_size=(1, 1, 512, 128))

In [None]:
# Data generator for PyTorch model
class data_generator(torch.utils.data.Dataset):
    def __init__(self, inputs, outputs) -> None:
        super().__init__()

        self.inputs = inputs
        self.outputs = outputs

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, index):
        return self.inputs[index], self.outputs[index]

In [None]:
# Model
model = AutoEncoder()
model.to(device)

# Hyper parameters
num_epochs = 100
learning_rate = 1e-4
batch_size = 16

# Dataloaders
train_dataset = data_generator(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True)

valid_dataset = data_generator(X_val, y_val)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, pin_memory=True)

# For mixed precision training
scaler = torch.amp.GradScaler(device)

# Functions
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Train
for e in range(num_epochs):

    # Set to train mode
    model.train()
    train_loss = 0.0

    for i, batch in enumerate(train_loader):

        # Input
        input = batch[0].to(device)
        output = batch[1].to(device)

        optimizer.zero_grad()

        # Implement the mixed precision training
        with torch.autocast("cuda"):
            pred = model(input)
            loss = loss_fn(pred, output)

        # Izracunaj gradiente vseh tenzorjev
        scaler.scale(loss).backward()

        # Posodobi vrednosti utezi
        scaler.step(optimizer)
        scaler.update()
        
        # Train loss
        train_loss += loss.item()*input.size(0)      
        
        # Remove unnecessary cache in CUDA memory
        torch.cuda.empty_cache()
        del pred, input, output
    
    train_loss /= len(train_loader)

    # # Validate
    # model.eval()
    # valid_loss = 0.0
       
    # for i, batch in enumerate(valid_loader):

    #    # Input
    #    input = batch[0].to(device)
    #    output = batch[1].to(device)
        
    #     with torch.no_grad():
    #         pred = model(input)
    #         loss = loss_fn(pred, output)
        
    #     valid_loss += loss.item()*input.size(0) 
                
    #     torch.cuda.empty_cache()
    #     del pred, input, output
    
    # valid_loss /= len(valid_loader)
    
    # Print
    #print(f"Epoch {e+1}/{num_epochs}\tTrain loss: {train_loss:.4f}\t Validation loss: {valid_loss:.4f}\tlr: {curr_lr:.4f}")
    print(f"Epoch {e+1}/{num_epochs}\tTrain loss: {train_loss:.4f}")

torch.save(model, "model")

In [None]:
# How can we reconstruct signal from STFT?
n_fft = 1022
hop = 63
data = data_libri[50]

# Play original
sd.play(np.array(data).flatten(), fs)
sd.wait()

# STFT
# For reconstruction we need to save the phase as well
st_ft = librosa.stft(data, n_fft=n_fft, hop_length=hop)
st_ft_mag, st_ft_pha = librosa.magphase(st_ft)
st_ft_mag = librosa.amplitude_to_db(st_ft_mag, ref=np.max)

# Scale
scaler = MinMaxScaler()
st_ft_mag_scaled = scaler.fit_transform(st_ft_mag.reshape(-1, st_ft_mag.shape[-1])).reshape(st_ft_mag.shape)

# Inverse scale
st_ft_mag_scaled = scaler.inverse_transform(st_ft_mag_scaled.reshape(-1, st_ft_mag_scaled.shape[-1])).reshape(st_ft_mag_scaled.shape)

# Inverse STFT
st_ft_mag = librosa.db_to_amplitude(st_ft_mag_scaled, ref=17.0)
st_ft = st_ft_mag * st_ft_pha
reconstructed = librosa.core.istft(st_ft, hop_length=hop)
reconstructed = np.array(reconstructed).flatten()

# Play reconstructed
sd.play(np.array(reconstructed).flatten(), fs)
sd.wait()

In [None]:
# Generate test signal
# Will be combined like: ...|speech|noisy speech|...|speech|noisy speech|...

# STFT
# Output dimensions should be divisible by factor 32
n_fft = 1022
hop = 63

# Final signal
test_signal = []
test_signal_parts_mag = []
test_signal_parts_pha = []

# Append 10 signals
signals = 10
for i in range(0, signals):

    # Get random speech
    signal_index = np.random.randint(0, data_libri.shape[0])
    signal_original = data_libri[signal_index]

    # Signal power
    signal_original_power = np.mean(signal_original ** 2)

    # Calculate SNR
    snr_db = 30
    snr_linear = 10 ** (snr_db / 10)
    
    # Calculate noise power
    noise_power = signal_original_power / snr_linear
    
    # Generate Gaussian noise
    noise = np.random.normal(0, np.sqrt(noise_power), np.shape(signal_original))

    # Add noise to signal
    signal_noisy = signal_original + noise

    # Append
    test_signal.append(signal_original)
    test_signal.append(signal_noisy)

    # Calculate STFT for original signal
    # For reconstruction we need to save the phase as well
    st_ft = librosa.stft(signal_original, n_fft=n_fft, hop_length=hop)
    st_ft_mag, st_ft_pha = librosa.magphase(st_ft)
    st_ft_mag = librosa.amplitude_to_db(st_ft_mag, ref=np.max)

    test_signal_parts_mag.append(st_ft_mag)
    test_signal_parts_pha.append(st_ft_pha)

    # Calculate STFT for noisy signal
    # For reconstruction we need to save the phase as well
    st_ft = librosa.stft(signal_noisy, n_fft=n_fft, hop_length=hop)
    st_ft_mag, st_ft_pha = librosa.magphase(st_ft)
    st_ft_mag = librosa.amplitude_to_db(st_ft_mag, ref=np.max)

    test_signal_parts_mag.append(st_ft_mag)
    test_signal_parts_pha.append(st_ft_pha)

test_signal = np.array(test_signal).flatten()
test_signal_parts_mag = np.array(test_signal_parts_mag)
test_signal_parts_pha = np.array(test_signal_parts_pha)

# Samples
print('test_signal size:', np.shape(test_signal))
print('test_signal_parts_mag size:', np.shape(test_signal_parts_mag))
print('test_signal_parts_pha size:', np.shape(test_signal_parts_pha))

In [None]:
# Play what we have generated
sd.play(test_signal, fs)
sd.wait()

In [None]:
# Scale
scaler = MinMaxScaler()
test_signal_parts = scaler.fit_transform(test_signal_parts_mag.reshape(-1, test_signal_parts_mag.shape[-1])).reshape(test_signal_parts_mag.shape)

# Add dimension
test_signal_parts = np.expand_dims(test_signal_parts, 1)

print(f"test_signal_parts: {np.shape(test_signal_parts)}")

# Load model
model = torch.load("model", weights_only=False)
model.to(device)
model.eval()

# Clean signals from frames
test_signal_parts_tensor = torch.from_numpy(np.float32(test_signal_parts)).to(device)
predicted = model(test_signal_parts_tensor)

# Remove second dimension
predicted = predicted[:, 0, :, :]
predicted = predicted.detach().cpu().numpy()

# Inverse scale
predicted = scaler.inverse_transform(predicted.reshape(-1, predicted.shape[-1])).reshape(predicted.shape)

print('predicted:', np.shape(predicted))

In [None]:
# Plot original
plt.figure(figsize=(16, 4))
plt.subplot(1,4,1)
plt.pcolormesh(test_signal_parts_mag[0], shading='auto')
plt.title('Original')
plt.xlabel('Window')
plt.ylabel('FFT bin')
plt.colorbar()

# Plot noisy
plt.subplot(1,4,2)
plt.pcolormesh(test_signal_parts_mag[1], shading='auto')
plt.title('Noisy')
plt.xlabel('Window')
plt.colorbar()

# Plot predicted original
plt.subplot(1,4,3)
plt.pcolormesh(predicted[0], shading='auto')
plt.title('Predicted/Original')
plt.xlabel('Window')
plt.colorbar()

# Plot predicted noisy
plt.subplot(1,4,4)
plt.pcolormesh(predicted[1], shading='auto')
plt.title('Predicted/Noisy')
plt.xlabel('Window')
plt.colorbar(label='Power [dB]')
plt.show()

In [None]:
# Inverse STFT
st_ft_mag = librosa.db_to_amplitude(predicted, ref=17.0)
st_ft = st_ft_mag * test_signal_parts_pha
reconstructed = librosa.core.istft(st_ft, hop_length=hop)
reconstructed = np.array(reconstructed).flatten()

print('reconstructed:', np.shape(reconstructed))

In [None]:
sd.play(np.array(reconstructed).flatten(), fs)
sd.wait()

In [None]:
sd.play(test_signal, fs)
sd.wait()