In [82]:
import numpy as np
import tensorflow as tf
from tqdm import tqdm
import os
os.chdir('../Python')

import TrainingsDataInterface
import Constants
import RTISI
import WaveInterface

In [74]:
LengthOfFilterInSamples = 501

def LowpassFilter(fc, r):
    assert fc < r/2, 'violation of sampling theorem'
    n = np.arange(LengthOfFilterInSamples) - np.floor(LengthOfFilterInSamples / 2)
    t = n / r
    h_LP = np.sinc(2 * fc * t)
    w = 0.5 * (1 + np.cos(np.pi * n / LengthOfFilterInSamples)) # Hann-Window
    h = w * h_LP
    h /= np.sum(h**2)
    return h

def ApplyFCenter(h, f_center, r):
    t = np.arange(h.shape[0]) / r
    t -= np.mean(t)
    return h * np.cos(2*np.pi*f_center*t)

def BandpassFilter(f_low, f_high, r):
    assert f_high > f_low, 'lower frequency must be lower than higher frequency'
    assert f_high < r/2, 'violation of sampling theorem'
    h_LP = LowpassFilter((f_high - f_low)/2, r)
    f_center = (f_low + f_high) / 2
    return 2*ApplyFCenter(h_LP, f_center, r)

def HighpassFilter(fc, r):
    assert fc < r/2, 'violation of sampling theorem'
    h_LP = LowpassFilter(r/2 - fc, r)
    f_center = r/2
    return ApplyFCenter(h_LP, f_center, r)

ATrainingsDataInterface = TrainingsDataInterface.CTrainingsDataInterface()
[x, samplerate, bits] = ATrainingsDataInterface.GetWaveOfCommandInstance(0, 0)
assert np.abs(samplerate - r) < 1e-1, 'wrong sampling rate'

h_BP = BandpassFilter(50, 7000, r)
#h = LowpassFilter(7000, r)
#h = HighpassFilter(50, r)
z = np.convolve(x, h)
T = h_BP.shape[0] // 2
print('delay due to filtering in samples: T = ', T)
z = z[T:x.shape[0]+T]
a = np.sum(x*z) / np.sum(x**2)
print('gain due to filtering = ', a)
z /= a
SNR = 10*np.log10(np.sum(x**2) / np.sum((x - z)**2))
print('SNR after the bandpass: ', SNR, ' dB')

delay due to filtering in samples: T =  250
gain due to filtering =  1.0029705994993294
SNR after the bandpass:  26.59551161636277  dB


In [115]:
def Bark2KiloHertz(b):
    return 1.96 * (b + 0.53) / (26.28 - b)

def MelFilterBank(NumberOfCenterFrequenciesPerBark, NumberOfInputFrequencyBins, SamplingRate):
    FFTLen = 2 * NumberOfInputFrequencyBins - 2
    Deltaf = SamplingRate / FFTLen
    f = np.arange(NumberOfInputFrequencyBins) * Deltaf
    CutoffFrequenciesInBark = np.arange(24*NumberOfCenterFrequenciesPerBark+1) / NumberOfCenterFrequenciesPerBark
    CutoffFrequenciesInHertz = Bark2KiloHertz(CutoffFrequenciesInBark) * 1000
    CenterFrequenciesInHertz = np.diff(CutoffFrequenciesInHertz) / 2
    CenterFrequenciesInHertz += CutoffFrequenciesInHertz[0:CutoffFrequenciesInHertz.shape[0]-1]
    T_Hertz2Bark = np.zeros((CenterFrequenciesInHertz.shape[0], NumberOfInputFrequencyBins))
    for b in range(T_Hertz2Bark.shape[0]):
        m1 = (1 - 1/np.sqrt(2)) / (CenterFrequenciesInHertz[b] - CutoffFrequenciesInHertz[b]) # first derivative of first line
        m2 = (1 - 1/np.sqrt(2)) / (CenterFrequenciesInHertz[b] - CutoffFrequenciesInHertz[b+1]) # first derivative of second line
        assert m1 > 0, 'm1 must be greater 0'
        assert m2 < 0, 'm2 must be smaller 0'
        b1 = 1 - m1 * CenterFrequenciesInHertz[b] # offset of first line
        b2 = 1 - m2 * CenterFrequenciesInHertz[b] # offset of second line
        assert b1 < 1/np.sqrt(2), 'b1 must be smaller than 1/sqrt(2)'
        assert b2 > 0, 'b2 must be greater 0'
        v1 = m1 * f + b1
        v2 = m2 * f + b2
        v3 = np.minimum(v1, v2)
        v4 = np.maximum(v3, 0.0)
        f1 = -b1 / m1 # zero crossing of the first line
        f2 = -b2 / m2 # zero crossing of the second line
        T_Hertz2Bark[b, :] = v4 / (0.5*(f2 - f1))
    assert np.amin(T_Hertz2Bark) >= 0.0, 'T_Hertz2Bark must be greater or equal zero'
    return T_Hertz2Bark, CenterFrequenciesInHertz

class CTransformSpectralEnvelope(object):
    
    def __init__(self):
        self.__NumberOfBandsPerBark = -1.0
        self.__XShape0 = 0
    
    def __EvaluateTransformMatrix(self, SamplingRate):
        T_Hertz2Bark, CenterFrequenciesInHertz = MelFilterBank(NumberOfCenterFrequenciesPerBark = self.__NumberOfBandsPerBark, NumberOfInputFrequencyBins = self.__XShape0, SamplingRate = SamplingRate)
        D = np.eye(T_Hertz2Bark.shape[0])
        for d in range(D.shape[0]):
            #D[d, d] = 1/np.sum(T_Hertz2Bark[d, :]**2)
            D[d, d] = 1 / (np.sqrt(np.sum(T_Hertz2Bark[d, :]**2))+1e-16)
        #self.__T = np.matmul(np.transpose(T_Hertz2Bark), np.matmul(D, T_Hertz2Bark))
        self.__T = np.matmul(D, T_Hertz2Bark)
        #self.__T = T_Hertz2Bark
        assert np.amin(D) >= 0.0, 'D must be greater or equal zero'
        assert np.amin(T_Hertz2Bark) >= 0.0, 'T_Hertz2Bark must be greater or equal zero'
        assert np.amin(self.__T) >= 0.0, 'self.__T must be greater or equal zero'
    
    def TransformSpectralEnvelope(self, X, SamplingRate, NumberOfBandsPerBark):
        assert NumberOfBandsPerBark > 0, 'only positive number of bands per Bark are reasonable'
        SomethingChanged = np.abs(NumberOfBandsPerBark - self.__NumberOfBandsPerBark) > 1e-3
        SomethingChanged = SomethingChanged or (np.abs(self.__XShape0 - X.shape[0]) > 1e-3)
        if SomethingChanged:
            self.__NumberOfBandsPerBark = NumberOfBandsPerBark
            self.__XShape0 = X.shape[0]
            self.__EvaluateTransformMatrix(SamplingRate)
        return np.matmul(self.__T, X)  

    def InverseTransformSpectralEnvelope(self, X):
        return np.matmul(np.transpose(self.__T), X)

ATransformSpectralEnvelope = CTransformSpectralEnvelope()

NumberOfBandsPerBark = 27.0
HopSizeInMilliseconds = 10
OverlappingInPercent = 50
hs = int(Fs * HopSizeInMilliseconds / 1000)
ws = int(hs / (1 - OverlappingInPercent/100))
w_Rectangular = np.ones((ws))
w_Hann = 0.5 - 0.5 * np.cos(2*np.pi*(np.arange(ws)+0.5)/ws)

def EvalLogMelSpectrogram(x, Fs):    
    FFTLen = 2*int(2**np.ceil(np.log2(ws)))
    NumberOfBlocks = int((x.shape[0] - ws) / hs)
    X = None
    for BlockNumber in range(NumberOfBlocks):
        idx1 = BlockNumber * hs
        idx2 = idx1 + ws
        BlockAnalysis = x[idx1:idx2] * w_Hann
        SpectrumAnalysis = np.abs(np.fft.rfft(BlockAnalysis, n = FFTLen))
    
        SmoothedEnvelopeSpectrum = ATransformSpectralEnvelope.TransformSpectralEnvelope(SpectrumAnalysis,
                                                                                        SamplingRate = Fs,
                                                                                        NumberOfBandsPerBark = NumberOfBandsPerBark)
        assert np.amin(SmoothedEnvelopeSpectrum) >= 0, 'SmoothedEnvelopeSpectrum must be greater or equal zero'
        if X is None: X = np.zeros((SmoothedEnvelopeSpectrum.shape[0], NumberOfBlocks))
        X[:, BlockNumber] = np.log(SmoothedEnvelopeSpectrum + 1e-16)
    return X

ARTISI = RTISI.CRTISI(hs, w_Rectangular, w_Hann)
def EvalTimeDomainSignal(X):
    x = np.zeros(((X.shape[1] - 1) * hs + ws))
    for BlockNumber in range(X.shape[1]):
        idx1 = BlockNumber * hs
        idx2 = idx1 + ws
        SpectrumSynthesis = ATransformSpectralEnvelope.InverseTransformSpectralEnvelope(np.exp(X[:, BlockNumber]))
        BlockSynthesis = ARTISI.ProcessNewColumnOfSpectrogram(SpectrumSynthesis)
        x[idx1:idx2] += BlockSynthesis
    return x

In [104]:
TimeMemoryOfInput = 5
TrainingsCounter = 0
Input = np.zeros((100000, X_WB.shape[0], TimeMemoryOfInput))
Output = np.zeros((Input.shape[0], X_WB.shape[0]))

for CommandIndex in tqdm(range(10)):#range(ATrainingsDataInterface.GetNumberOfCommands()):
    for InstanceIndex in range(10):#range(ATrainingsDataInterface.GetNumberOfCommandInstances(CommandIndex)):
        x, Fs, bits = ATrainingsDataInterface.GetWaveOfCommandInstance(CommandIndex, InstanceIndex)
        x_WB = np.convolve(x, h_WB)
        x_NB = np.convolve(x, h_NB)
        
        X_WB = EvalLogMelSpectrogram(x_WB, Fs)
        X_NB = EvalLogMelSpectrogram(x_NB, Fs)
        assert X_WB.shape == X_NB.shape, 'wideband and narrowband must have same shape'
        
        idx1 = 0
        idx2 = Input.shape[2]
        while idx2 < X_WB.shape[1]:
            Input[TrainingsCounter, :, :] = X_NB[:, idx1:idx2]
            Output[TrainingsCounter, :] = X_WB[:, idx2]
            idx1 += 1
            idx2 += 1
            TrainingsCounter += 1

# partitioning in training, validation, test
PercentageTraining = 0.8
PercentageValidation = 0.1
PercentageTest = 1.0 - PercentageTraining - PercentageValidation
assert PercentageTest > 0.0, 'wrong partitioning between training, validation and test'
LastIndexTraining = int(TrainingsCounter * PercentageTraining)
LastIndexValidation = int(TrainingsCounter * (PercentageTraining + PercentageValidation))
Input_Training = Input[:LastIndexTraining, ...]
Output_Training = Output[:LastIndexTraining, ...]
Input_Validation = Input[LastIndexTraining:LastIndexValidation, ...]
Output_Validation = Output[LastIndexTraining:LastIndexValidation, ...]
Input_Test = Input[LastIndexValidation:TrainingsCounter, ...]
Output_Test = Output[LastIndexValidation:TrainingsCounter, ...]

print(Input_Training.shape)
print(Output_Training.shape)

(12760, 648, 5)
(12760, 648)


The mean squared error is typically used as a loss function if a target function should be approximated. The mean squared error is defined as follows:

$L=\frac{1}{2}\sum_j \left(z_j-o_j\right)^2$

with $o_j$ corresponding to the correct output and $z_j$ corresponding to the current output of the neural network during training.

In [123]:
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(Input_Training.shape[1], Input_Training.shape[2])),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dense(500, activation='relu'),
    tf.keras.layers.Dense(200, activation='relu'),
    tf.keras.layers.Dense(50, activation='relu'),
    tf.keras.layers.Dense(Output_Training.shape[1], activation=None)
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.MeanSquaredError())

In [135]:
try:
    os.mkdir("NeuralNetworks/ArtificialWidebandExtension")
except:
    pass
checkpoint_path = "NeuralNetworks/ArtificialWidebandExtension/cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
cbCheckpoints = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 save_weights_only=True,
                                                 verbose=1)
cbEarlyStopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)

try:
    model.load_weights(checkpoint_path)
except:
    print('problem loading old weights, starting with scratch new network')

In [136]:
history = model.fit(Input_Training, Output_Training, epochs=1000,
                    validation_data=(Input_Validation, Output_Validation),
                    callbacks=[cbEarlyStopping, cbCheckpoints], verbose = 1)
print('training finished after ', len(history.history['loss']), ' epochs')

Epoch 1/1000
Epoch 1: saving model to NeuralNetworks/ArtificialWidebandExtension\cp.ckpt
Epoch 2/1000

KeyboardInterrupt: 

In [126]:
SNR = 0.0
y = model.predict(Input_Test)
for SampleIndex in range(Input_Test.shape[0]):
    x = Output_Test[SampleIndex, :]
    SNR += 10*np.log10(np.sum(x**2) / np.sum((x-y[SampleIndex, :])**2))
SNR /= Input_Test.shape[0]
print('mean SNR of prediction = ', SNR, ' dB')

mean SNR of prediction =  27.7673857263767  dB


In [129]:
def EstimateArtificialWidebandExtension(X):
    Input = np.zeros((X.shape[1] - TimeMemoryOfInput, X.shape[0], TimeMemoryOfInput))
    for n in range(TimeMemoryOfInput, X.shape[1]):
        Input[n-TimeMemoryOfInput, :, :] = X[:, n-TimeMemoryOfInput:n]
    Output = model.predict(Input, verbose = 0)
    Y = np.zeros(X.shape)
    for n in range(TimeMemoryOfInput, X.shape[1]):
        Y[:, n] = Output[n-TimeMemoryOfInput, :]
    return Y
    
#x, Fs, bits = WaveInterface.ReadWave('../Audio/P501_D_EN_fm_SWB_48k.wav')
x, Fs, bits = WaveInterface.ReadWave('../Audio/Malmsheimer48kHz.wav')
#x, Fs, bits = ATrainingsDataInterface.GetWaveOfCommandInstance(0, 0)
if x.shape[0] > 4*Fs: x = x[0:4*Fs]
x_WB = np.convolve(x, h_WB)
x_NB = np.convolve(x, h_NB)

X_WB = EvalLogMelSpectrogram(x_WB, Fs)
X_NB = EvalLogMelSpectrogram(x_NB, Fs)
X_EB = EstimateArtificialWidebandExtension(X_NB)

y_WB = EvalTimeDomainSignal(X_WB)
y_NB = EvalTimeDomainSignal(X_NB)
y_EB = EvalTimeDomainSignal(X_EB)
WaveInterface.WriteWave(y_WB * 0.9 / np.amax(np.abs(y_WB)), Fs, bits, 'output_WB.wav')
WaveInterface.WriteWave(y_NB * 0.9 / np.amax(np.abs(y_WB)), Fs, bits, 'output_NB.wav')
WaveInterface.WriteWave(y_EB * 0.9 / np.amax(np.abs(y_EB)), Fs, bits, 'estimation_WB.wav')