In [1]:
import import_ipynb

In [2]:
import tensorflow as tf
import tensorflow.keras as keras
from keras.losses import mean_squared_error
from keras import backend as K

import numpy as np
import random

In [3]:
from tensorflow.keras.layers import Lambda

In [4]:
import logging
logging.basicConfig(level=logging.INFO)

In [5]:
# import_ipynb is what allows the .ipynb files below to be imported like modules
# (the cell output shows bandERB.ipynb and params.ipynb being loaded).
import import_ipynb
# NOTE(review): both branches execute the very same import, so the bare `except`
# merely retries it once and hides the first error. Presumably a workaround for a
# first import that sometimes fails -- confirm, and catch a specific exception type
# if the retry is still required.
try:
    from bandERB import ERBBand, ERB_pro_matrix
except:
    from bandERB import ERBBand, ERB_pro_matrix

importing Jupyter notebook from bandERB.ipynb
importing Jupyter notebook from params.ipynb


In [6]:
# Module-level model parameters obtained from model_params('config.ini').
# Note that gain_target_cal and LocalSnrTarget.calc_ws call
# model_params('config.ini') themselves rather than reading this object.
from params import model_params
p = model_params('config.ini')

In [7]:
# import os
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [8]:
def as_complex(x):
    """Return `x` as a complex tensor.

    A tensor whose dtype is already complex64 or complex128 is handed back
    untouched. Any other tensor is combined with `tf.complex`, using index -2
    of the last axis as the real part and index -1 as the imaginary part.
    """
    already_complex = x.dtype == tf.complex64 or x.dtype == tf.complex128
    if already_complex:
        return x

    real_part = x[...,-2]
    imag_part = x[..., -1]
    return tf.complex(real_part, imag_part, name='as_complex')

def as_real(x):
    """Return `x` as a real tensor with a trailing (real, imag) axis.

    Complex64/complex128 tensors get a new last axis of size 2 holding the real
    part followed by the imaginary part. Tensors of any other dtype are
    returned unchanged.
    """
    if not (x.dtype == tf.complex64 or x.dtype == tf.complex128):
        return x

    real_part = tf.expand_dims(tf.math.real(x), axis=-1)
    imag_part = tf.expand_dims(tf.math.imag(x), axis=-1)
    return tf.concat([real_part, imag_part], axis=-1, name='as_real')

In [9]:
def gain_target_cal(clean_spec, noisy_spec, eps: float = 1e-12):
    """Compute the per-band gain target from a clean and a noisy spectrum.

    Both spectra are indexed as rank-4 tensors whose last axis holds the real
    part at index 0 and the imaginary part at index 1. The squared magnitude is
    projected onto the ERB bands by a matrix product, and the target is
    sqrt(clean_power / (noisy_power + eps)) clipped to [0, 1].

    The band layout (`nb_erb`, `sr`, `fft_size`) is read from 'config.ini' on
    every call.
    """
    cfg = model_params('config.ini')

    erb_bands = ERBBand(N=cfg.nb_erb, high_lim=cfg.sr//2, NFFT=cfg.fft_size)
    # ERB conversion matrix, applied on the right of the power spectrum.
    erb_matrix = tf.convert_to_tensor(
        ERB_pro_matrix(erb_bands, NFFT=cfg.fft_size, mode=0), dtype=tf.float32)

    def _band_power(spec):
        # re^2 + im^2 per bin, then summed into ERB bands.
        return (spec[:,:,:,0]**2 + spec[:,:,:,1]**2) @ erb_matrix

    clean_power = _band_power(clean_spec)
    noisy_power = _band_power(noisy_spec)

    gain = tf.sqrt(clean_power/(noisy_power+eps))
    return tf.clip_by_value(gain, 0.0, 1.0)

In [10]:
# # Construct your custom loss as a tensor
def MaskLoss(inputs, clean, noisy, factor, r=0.6, f_under=2.0, eps: float = 1e-12):
    """Compressed gain-mask loss.

    The target gain is derived from `clean` and `noisy` with `gain_target_cal`.
    Predicted and target gains are raised to the power `r`, and the loss is
    10 * mean(diff**4) + mean(diff**2), scaled by `factor`.

    Args:
        inputs: predicted gain mask (the original comment gives [B, T, F]).
        clean: clean spectrum passed to `gain_target_cal`.
        noisy: noisy spectrum passed to `gain_target_cal`.
        factor: scalar weight applied to the final loss.
        r: compression exponent applied to both gains.
        f_under: currently unused; the under-estimation weighting is disabled.
        eps: lower bound applied to both gains before the power when `r != 1`.

    Returns:
        Scalar loss tensor.
    """
    g_t = gain_target_cal(clean, noisy)
    g_p = inputs

    if r != 1:
        # The derivative of x**r is unbounded at x == 0 for r < 1, so floor the
        # gains first (same guard SpectralLoss applies before tf.pow).
        g_p = tf.maximum(g_p, eps)
        g_t = tf.maximum(g_t, eps)

    tmp = (g_p** r) - (g_t** r)

    loss =  10*K.mean(K.square(K.square(tmp))) + K.mean(K.square(tmp))

    return loss * factor

In [11]:
# # Construct your custom loss as a tensor
# def MaskLoss(inputs, target, factor, r=0.6, f_under=2.0):
#     # Input mask shape: [B, T, F]

#     g_t = target
#     g_p = inputs 

#     tmp = tf.pow(g_p, r) - tf.pow(g_t, r)
    
# #     if f_under != 1: tmp *= tf.where(g_p < g_t, f_under, 1.0)

# #     loss =  K.mean(10*tf.pow(tmp,4)) + K.mean(tf.pow(tmp,2))
#     loss =  K.mean(tf.pow(tmp,2))
#     return loss 

In [12]:
class LocalSnrTarget():
    """Computes the local SNR target, optionally clipped to a fixed range.

    Window lengths are given in milliseconds and converted to a number of STFT
    frames by `calc_ws`.
    """
    def __init__(
        self, ws: int = 20, db: bool = True, ws_ns= None, target_snr_range=None, eps: float = 1e-12):
        """
        Args:
            ws: speech window length in ms.
            db: forwarded to `local_snr` as its `db` flag.
            ws_ns: noise window length in ms; when None, twice the speech
                window (in frames) is used.
            target_snr_range: optional (low, high) pair used to clip the
                output; None disables clipping.
            eps: stabiliser forwarded to `local_snr`.
        """
        super().__init__()
        self.ws = self.calc_ws(ws)
        self.ws_ns = self.ws * 2 if ws_ns is None else self.calc_ws(ws_ns)
        self.db = db
        self.range = target_snr_range
        self.eps = eps

    def calc_ws(self, ws_ms: int) -> int:
        """Convert a window size in ms into a window size in STFT frames (>= 1)."""
        p = model_params('config.ini')
        ws = ws_ms - p.fft_size / p.sr * 1000  # length ms of an fft_window
        ws = 1 + ws / (p.hop_size / p.sr * 1000)  # consider hop_size
        return max(int(round(ws)), 1)

    def forward(self, clean, noise, max_bin = None):
        """Return the local SNR of `clean` against `noise`.

        Both inputs are converted with `as_complex`; `local_snr` then requires
        their real view to be rank 4. When `max_bin` is given, only the first
        `max_bin` entries of the last complex axis are used.

        NOTE(review): the original comments gave clean as [B, 1, T, F] and the
        output as [B, T'] -- confirm against the callers.
        """
        clean = as_complex(clean)
        noise = as_complex(noise)

        if max_bin is not None:
            clean = clean[..., :max_bin]
            noise = noise[..., :max_bin]

        lsnr = local_snr(clean, noise, window_size=self.ws, db=self.db,
                         window_size_ns=self.ws_ns, eps=self.eps)[0]
        if self.range is None:
            # No range configured: return the unclipped value instead of
            # failing on `None[0]`.
            return lsnr
        return tf.clip_by_value(lsnr, self.range[0], self.range[1])

In [13]:
def _local_energy(x, ws: int):
    """Windowed local energy of `x` along its second axis.

    `x` is squared and summed over its two trailing axes, then every position
    along the remaining second axis is replaced by the mean of its `ws`
    neighbours weighted by a Hann window. An even `ws` is bumped to the next
    odd value.

    Assumes `x` is rank 4 (local_snr asserts this before calling), so that the
    reduced tensor is rank 2 and becomes rank 4 again after the two
    expand_dims calls, as tf.image.extract_patches expects.
    """
    if (ws % 2) == 0:
        ws += 1

    # Energy per position: sum of squares over the two trailing axes.
    frame_energy = tf.reduce_sum(tf.reduce_sum(x**2, -1), -1)
    # Append two singleton axes to obtain the 4-D layout used by extract_patches.
    frame_energy = tf.expand_dims(frame_energy, -1)
    frame_energy = tf.expand_dims(frame_energy, -1)

    if ws == 3:
        # Hard-coded 3-tap window (the values of a periodic Hann window of length 3).
        win = tf.constant([[0.0, 0.75, 0.75]])
    else:
        win = tf.signal.hann_window(ws)
    win = tf.reshape(win, (1, 1, ws))

    # Sliding windows of length ws with stride 1; 'SAME' padding keeps the
    # number of positions unchanged.
    patches = tf.squeeze(
        tf.image.extract_patches(frame_energy, sizes=[1,ws,1,1], strides=[1,1,1,1],
                                 rates=[1,1,1,1], padding='SAME'),
        axis=-2)

    weighted = tf.multiply(patches, win)
    return tf.reduce_mean(weighted, -1)

In [14]:
def local_snr(clean, noise,
    window_size: int, db: bool = False,
    window_size_ns = None,
    eps: float = 1e-12,):
    """Local signal-to-noise ratio of `clean` against `noise`.

    Both inputs are converted with `as_real` and must then be rank 4. The
    speech energy uses `window_size`; the noise energy uses `window_size_ns`,
    which falls back to `window_size` when None.

    Returns:
        (snr, E_speech, E_noise). `snr` is E_speech / (E_noise + eps); with
        `db=True` it is 10 * log10(snr + eps) computed in float32.
    """
    clean = as_real(clean)
    noise = as_real(noise)

    assert len(clean.get_shape()) == 4

    if window_size_ns is None:
        window_size_ns = window_size

    E_speech = _local_energy(clean, window_size)
    E_noise = _local_energy(noise, window_size_ns)

    snr = tf.divide(E_speech, (E_noise + eps))
    if not db:
        return snr, E_speech, E_noise

    # The dB conversion is wrapped in a Keras Lambda layer, as in the original.
    to_db = Lambda(lambda v: 10 * tf.experimental.numpy.log10(
                                        tf.cast(v, dtype=tf.float32)))
    return to_db(snr+eps), E_speech, E_noise

In [86]:
def lsnr_mapping(lsnr, lsnr_thresh: float, lsnr_min = None):
    """Linearly map an LSNR value to a weight in [0, 1].

    `lsnr_min` maps to 1 and `lsnr_thresh` maps to 0; values outside that
    interval are clipped. `lsnr_min` defaults to -10.0 when None.
    """
    if lsnr_min is None:
        lsnr_min = float(-10.0)

    # ramp = slope * lsnr + offset, equal to 0 at lsnr_min and 1 at lsnr_thresh
    slope = 1 / (lsnr_thresh - lsnr_min)
    offset = -slope * lsnr_min
    ramp = tf.clip_by_value(slope * lsnr + offset, 0.0, 1.0)
    return 1 - ramp
    
# Construct your custom loss as a tensor
def DfAlphaLoss(pred_alpha, target_lsnr, factor, lsnr_thresh=-7.5, lsnr_min=-10.0):
    """Add a penalty to use DF for very noisy segments.

    Starting from lsnr_thresh, the penalty is increased and has its maximum at
    lsnr_min. A second, smaller term penalises a low alpha where the LSNR is
    high.

    Args:
        pred_alpha: predicted alpha (the original comment gives [B, T, 1]).
        target_lsnr: target LSNR (the original comment gives [B, T]); it is
            reshaped to the shape of `pred_alpha`.
        factor: scalar weight applied to the summed loss.
        lsnr_thresh: LSNR at which the "off" weight reaches 0.
        lsnr_min: LSNR at which the "off" weight reaches 1.

    Returns:
        Scalar loss tensor.
    """
    # Use the runtime shape so that an unknown (None) static time axis does not
    # break the reshape.
    alpha_shape = tf.shape(pred_alpha)

    # "off" term: weight 1 at/below lsnr_min, fading to 0 at lsnr_thresh
    # -> penalise alpha (DF usage) on very noisy frames.
    w_off = tf.reshape(lsnr_mapping(target_lsnr, lsnr_thresh, lsnr_min), alpha_shape)
    l_off = K.mean(tf.pow(pred_alpha * w_off,2))

    # "on" term: weight 1 at/above 0 dB, fading to 0 at lsnr_thresh + 2.5
    # -> penalise (1 - alpha) on frames with high LSNR.
    w_on = tf.reshape(lsnr_mapping(target_lsnr, lsnr_thresh + 2.5, 0.0), alpha_shape)
    l_on = 0.1 * K.mean(tf.abs((1 - pred_alpha) * w_on))

    return (l_off + l_on) * factor

In [16]:
# Construct your custom loss as a tensor
def SpectralLoss(inputs, target, gamma=0.6, factor_mag=1.0, factor_img=1.0, eps: float = 1e-12):
    """Magnitude plus complex spectral loss, with optional magnitude compression.

    Both spectra are converted with `as_complex`. When `gamma != 1` the
    magnitudes are floored at `eps` and raised to the power `gamma`.

    * Magnitude term: mean squared difference of the magnitudes, weighted by
      `factor_mag`.
    * Complex term (only when `factor_img > 0`): mean squared difference of the
      real/imaginary parts, weighted by `factor_img`. With `gamma != 1` each
      spectrum is first rebuilt from its compressed magnitude and the phase of
      `spectrum + eps`.
    """
    inputs = as_complex(inputs)
    target = as_complex(target)
    compress = gamma != 1

    input_abs = tf.abs(inputs)
    target_abs = tf.abs(target)
    if compress:
        input_abs = tf.where(input_abs<eps, eps, input_abs)
        target_abs = tf.where(target_abs<eps, eps, target_abs)
        input_abs = tf.pow(input_abs,gamma)
        target_abs = tf.pow(target_abs,gamma)

    loss = K.mean(tf.pow(target_abs-input_abs,2)) * factor_mag
    if not (factor_img>0):
        return loss

    def _with_magnitude(magnitude, spectrum):
        # magnitude * exp(j * angle(spectrum + eps))
        phase = tf.math.angle(spectrum + eps)
        return tf.complex(magnitude, 0.0) * tf.math.exp(tf.complex(0.0,phase))

    if compress:
        inputs = _with_magnitude(input_abs, inputs)
        target = _with_magnitude(target_abs, target)

    loss_c = K.mean(tf.pow(as_real(target)-as_real(inputs),2)) * factor_img
    loss += loss_c
    return loss

In [17]:
# def SISNR_Loss(x, s, eps=1e-8, remove_dc=True):
#     """
#     Compute SI-SNR
#     Arguments:
#         x: vector, enhanced/separated signal
#         s: vector, reference signal(ground truth)
#     """
    
#     # zero mean, seems do not hurt results
#     x_zm = x - K.mean(x, -1, keepdims=True)
#     s_zm  = s - K.mean(s, -1, keepdims=True)
        
#     s_target = K.sum(x_zm * s_zm, axis=1, keepdims=True)  # [1, B]
#     s_zm_norm = K.sum(s_zm ** 2, axis=1, keepdims=True) + eps  # [1, B]
    
#     proj = s_target * s_zm / s_zm_norm  # [T, B]
    
#     # e_noise = s' - s_zm
#     e_noise = x_zm - proj  # [T, B]
    
#     # SI-SNR = 10 * log_10(||s_zm||^2 / ||e_noise||^2)
#     si_snr_beforelog = K.sum(proj ** 2, axis=1) / (K.sum(e_noise ** 2, axis=1) + eps)
#     si_snr = 10 * tf.experimental.numpy.log10(si_snr_beforelog + eps)  # [B]

#     return -K.mean(si_snr)

In [18]:
def SISNR_Loss(input, target, eps=1e-12):
    """Negative scale-invariant SNR (in dB), averaged over the batch.

    `input` and `target` are [B, T]. `input` is projected onto `target`; the
    ratio between the energy of the projection and the energy of the residual
    is converted to dB per sample, averaged, and negated.
    """
    # Batched dot products via einsum, kept as [B, 1] for broadcasting.
    target_energy = tf.expand_dims(tf.einsum("bi,bi->b", target, target), -1)
    cross = tf.expand_dims(tf.einsum("bi,bi->b", target, input), -1)
    scale = cross / (target_energy+eps)

    projection = scale * target
    residual = input - projection

    # Reduce over each sample only; the batch mean is taken at the very end.
    signal_power = tf.reduce_sum(projection **2, -1)
    noise_power = tf.reduce_sum(residual **2, -1)

    snr_db = 10 * tf.experimental.numpy.log10(signal_power/(noise_power+eps)+eps)
    return -K.mean(snr_db)