## Visualization of the label inference for TF representations
### Efthymios Tzinis

In [None]:
import os, sys, librosa, matplotlib, plotly
import numpy as np 
from pprint import pprint 
from sklearn.cluster import KMeans
from sklearn.decomposition import FastICA
from matplotlib.pyplot import subplot, hist, tight_layout
from matplotlib.pylab import title
root_dir = '../../'
sys.path.insert(0, root_dir)
import spatial_two_mics.examples.mixture_example as me
import spatial_two_mics.utils.audio_mixture_constructor as mix_constructor
import spatial_two_mics.data_generator.source_position_generator as position_generator
import spatial_two_mics.labels_inference.tf_label_estimator as label_estimator
# Build the example mixture description provided by the project's
# mixture_example module; every later cell reads from this dict.
mixture_info = me.mixture_info_example()

In [None]:
# Optional: overwrite the example's source positions with randomly drawn
# ones (the argument 2 is presumably the number of sources -- TODO confirm).
random_positioner = position_generator.RandomCirclePositioner()
positions_info = random_positioner.get_sources_locations(2)
mixture_info['positions'] = positions_info

In [None]:
# Configure the mixture constructor. n_fft / win_len / hop_len are
# presumably STFT parameters in samples and mixture_duration is presumably
# in seconds -- TODO confirm against AudioMixtureConstructor.
mixture_creator = mix_constructor.AudioMixtureConstructor(
        n_fft=1024, win_len=400, hop_len=200, mixture_duration=2.0,
        force_all_signals_one_sample_delay=True)

# Dict of representations; later cells read the keys 'sources_tf',
# 'delayed_sources_tf', 'm1_tf', 'm2_tf', 'm1_raw' and 'm2_raw' from it.
tf_representations = mixture_creator.construct_mixture(mixture_info)

In [None]:
# Inspect the mixture description (including the positions set above).
pprint(mixture_info)

In [None]:
# Inspect everything the mixture constructor returned.
pprint(tf_representations)

In [None]:
# Plotly Functions 
import plotly
import plotly.tools as tls
import plotly.plotly as py
import plotly.figure_factory as ff
import plotly.graph_objs as go
# Set up plotly's offline mode for this notebook; the iplot calls below
# rely on it.
plotly.offline.init_notebook_mode()

def plot_tf_representation(tf, for_title = '', fs=16000, duration=2.0, log_scale=False):
    """Render a 2-D time-frequency array as an interactive plotly heatmap.

    Args:
        tf: Real-valued 2-D array. Rows are mapped to the frequency axis
            and columns to the time axis.
        for_title: Text appended to 'Spectrogram ' in the figure title;
            also passed to plotly as the plot filename.
        fs: Sampling rate used to scale the frequency axis.
        duration: Total duration used to scale the time axis.
        log_scale: If True, plot 10*log10(tf) instead of tf. Bins that
            are not strictly positive have no logarithm and are plotted
            as NaN rather than raising divide/invalid runtime warnings.
    """
    n_freqs, n_frames = tf.shape
    # Column index -> time, spread evenly over `duration`.
    times = (duration * np.arange(n_frames)) / n_frames
    # Row index -> frequency, spread evenly over [0, fs / 2).
    freqs = (np.arange(n_freqs) * fs) / (2.0 * n_freqs)

    if log_scale:
        # Mask non-positive bins before the log so silent bins do not emit
        # warnings; positive bins are left untouched.
        z = 10 * np.log10(np.where(tf > 0, tf, np.nan))
    else:
        z = tf

    trace = [go.Heatmap(
        x=times,
        y=freqs,
        z=z,
        colorscale='Jet',
        )]
    layout = go.Layout(
        title = 'Spectrogram '+for_title,
        yaxis = dict(title = 'Frequency'),  # y-axis label
        xaxis = dict(title = 'Time'),  # x-axis label
        )
    fig = dict(data=trace, layout=layout)
    plotly.offline.iplot(fig, filename=for_title)

### Spectrograms of the individual sources at microphone 1 (amplitude only), followed by the mixture

In [None]:
# Collect (title, spectrogram) pairs for every true source at mic 1, then
# the mic 1 mixture, and plot their magnitudes in that order.
mic1_panels = [('At mic 1, true source ' + str(idx), spec)
               for idx, spec in enumerate(tf_representations['sources_tf'])]
mic1_panels.append(('Mic 1 mixture', tf_representations['m1_tf']))
for panel_title, spec in mic1_panels:
    plot_tf_representation(abs(spec), for_title=panel_title)

### Different sources spectrograms for microphone 2 (log scale for better visualization) 

In [None]:
# Collect (title, spectrogram) pairs for every delayed source at mic 2,
# then the mic 2 mixture, and plot their magnitudes on a log scale.
mic2_panels = [('At mic 2, true source ' + str(idx), spec)
               for idx, spec in enumerate(tf_representations['delayed_sources_tf'])]
mic2_panels.append(('Mic 2 mixture', tf_representations['m2_tf']))
for panel_title, spec in mic2_panels:
    plot_tf_representation(abs(spec), for_title=panel_title, log_scale=True)

### Compare the difference of the spectrograms for 2 sources

In [None]:
# For each of the two microphones, list the sources from nearest to
# farthest. Distances are looked up under keys of the form 'm<mic>s<source>'
# with 1-based ids. Plain `range` keeps the printed ids as Python ints
# instead of NumPy scalars.
mic_source_distances = mixture_info['positions']['distances']
n_listed_sources = len(mixture_info['sources_ids'])
for mic_id in range(1, 3):
    distances_from_mic = [
        (source_id, mic_source_distances['m{}s{}'.format(mic_id, source_id)])
        for source_id in range(1, n_listed_sources + 1)]
    s_distances = sorted(distances_from_mic, key=lambda x: x[1])
    print('Closer to mic: {} are the following sources with incremental order:\n{}'.format(mic_id, s_distances))

First, the difference in amplitude between the two recorded mixtures

In [None]:
# Per-bin magnitude of each microphone's mixture, then their signed
# difference (mic 1 minus mic 2).
m1_magnitude = abs(tf_representations['m1_tf'])
m2_magnitude = abs(tf_representations['m2_tf'])
amp_dif = m1_magnitude - m2_magnitude
plot_tf_representation(amp_dif,
                       for_title='difference in amplitudes of m1 - m2',
                       log_scale=False)

Then, difference in terms of phase 

In [None]:
# Inter-microphone phase difference per time-frequency bin.
# Subtracting two principal angles gives values in (-2*pi, 2*pi), so a
# small negative difference can show up as a large positive one (and vice
# versa). Taking the angle of m1 * conj(m2) yields the same difference
# wrapped into (-pi, pi], which keeps its sign meaningful.
phase_dif = np.angle(tf_representations['m1_tf'] * np.conj(tf_representations['m2_tf']))
plot_tf_representation(phase_dif, for_title = ' phase difference of m1 - m2', log_scale=False)

Infer the ground truth for TF in the mixtures

In [None]:
# Ask the project's TF mask estimator for the ground-truth labels of the
# mixture and plot them. The same estimator object is reused in the ICA
# cell at the end of the notebook.
ground_truth_estimator = label_estimator.TFMaskEstimator(
                             inference_method='Ground_truth')
gt_labels = ground_truth_estimator.infer_mixture_labels(tf_representations)
plot_tf_representation(gt_labels, for_title = 'Ground Truth Mask', log_scale=False)

Simple label estimation from the sign of the phase difference (>= 0 vs. < 0)

In [None]:
# Binary mask from the sign of the phase difference: 1.0 where it is
# non-negative, 0.0 everywhere else.
non_negative_phase = phase_dif >= 0.0
simple_phase_dif_labels = non_negative_phase.astype(float)
plot_tf_representation(
    simple_phase_dif_labels,
    for_title='Simple phase difference estimation of labels >< 0',
    log_scale=False)

Perform k-means with 2 clusters in order to infer the labels from the phase dif (DUET with only d as a feature)

In [None]:
# Phase difference between the two microphones, used as the only
# clustering feature.
# NOTE(review): the 1e-7 offset is kept from the original; its purpose is
# not evident from this notebook -- TODO confirm.
# NOTE(review): this is a raw difference of two principal angles, so it is
# not wrapped to [-pi, pi] -- confirm that this is intended.
phase_dif = np.angle(tf_representations['m1_tf']) - (1e-7 + np.angle(tf_representations['m2_tf']))
n_sources = len(tf_representations['sources_tf'])
# One row per time-frequency bin, a single feature column. reshape(-1, 1)
# replaces np.product(shape), which no longer exists in NumPy 2.0.
d_feature = np.reshape(phase_dif, (-1, 1))

# Cluster the bins into n_sources groups and fold the labels back into the
# time-frequency grid to obtain a mask.
kmeans = KMeans(n_clusters=n_sources, random_state=0).fit(d_feature)
d_labels = kmeans.labels_
d_feature_mask = np.reshape(d_labels, phase_dif.shape)
plot_tf_representation(d_feature_mask, for_title = ' Phase Diff only 2-means', log_scale=False)

In [None]:
import plotly.figure_factory as ff 
def plot_1d_histogram(hs, title='', group_labels=None, bin_size=0.1):
    """Plot one or more 1-D samples as a plotly distribution plot.

    Args:
        hs: Sequence of 1-D data sets, one per group.
        title: Figure title.
        group_labels: One label per data set in ``hs``. When omitted,
            'Group 1', 'Group 2', ... are generated, because
            ``create_distplot`` needs a label for every data set.
        bin_size: Histogram bin width forwarded to ``create_distplot``.
    """
    if group_labels is None:
        group_labels = ['Group {}'.format(idx + 1) for idx in range(len(hs))]
    fig = ff.create_distplot(hs, group_labels, bin_size=bin_size)
    fig['layout'].update(title=title)
    plotly.offline.iplot(fig, filename='')
    
def get_normalized_1d(matrix):
    """Flatten ``matrix`` and standardize it to zero mean and unit std.

    Args:
        matrix: Array-like of any shape.

    Returns:
        1-D array holding every element of ``matrix`` with the overall mean
        subtracted and divided by the overall standard deviation. If all
        elements are equal (standard deviation 0) the centered, all-zero
        values are returned instead of dividing 0 by 0.
    """
    # reshape(-1) flattens without np.product, which is gone in NumPy 2.0.
    one_dim = np.reshape(matrix, -1)
    centered = one_dim - one_dim.mean()
    spread = one_dim.std()
    if spread == 0:
        return centered
    return centered / spread

In [None]:
# Try normalizing the phase difference by a linear-in-frequency phase
# increment first, then compare the two standardized distributions.
max_freq, max_time = phase_dif.shape
freqs_vec = np.linspace(1e-5, np.pi, max_freq)
# One column of -freqs_vec per time frame: shape (max_freq, max_time).
freq_normalizer = np.repeat(-freqs_vec[:, np.newaxis], max_time, axis=1)
freq_normalizer = np.flip(freq_normalizer, axis=1)

norm_phase_dif = phase_dif / freq_normalizer
group_labels = ['Phase Difference', 'Phase Diff Normalized by Freq']
hs = [get_normalized_1d(feature) for feature in (phase_dif, norm_phase_dif)]

# Histogram both distributions over the same 200 bin edges in [-pi, pi],
# one subplot per distribution.
hist_bins = np.linspace(-np.pi, np.pi, 200)
for row, (values, label) in enumerate(zip(hs, group_labels), start=1):
    subplot(2, 1, row)
    hist(values, hist_bins)
    title(label)
tight_layout()

In [None]:
# One row per time-frequency bin, a single feature column. reshape(-1, 1)
# replaces np.product(shape), which no longer exists in NumPy 2.0.
# NOTE(review): despite the name and the plot title, this feature is built
# from phase_dif, not from norm_phase_dif computed in the previous cell --
# confirm which one is intended.
d_normalized_feature = np.reshape(phase_dif, (-1, 1))

# Cluster the bins into n_sources groups and fold the labels back into the
# time-frequency grid to obtain a mask.
kmeans = KMeans(n_clusters=n_sources, random_state=0).fit(d_normalized_feature)
d_labels = kmeans.labels_
d_feature_mask = np.reshape(d_labels, phase_dif.shape)
plot_tf_representation(d_feature_mask, for_title = ' Phase Diff only 2-means Normalized', log_scale=False)

Perform k-means with 2 clusters in order to infer the labels from DUET using attenuation also

In [None]:
# Second DUET feature: magnitude ratio between the microphones, made
# symmetric around 0 as a - 1/a.
# NOTE(review): a bin where either magnitude is exactly 0 yields inf/NaN
# here, which KMeans rejects -- confirm such bins cannot occur.
attenuation = abs(tf_representations['m1_tf']) / abs(tf_representations['m2_tf'])
smoothed_attenuation = attenuation - (1. / attenuation)
# One row per time-frequency bin. reshape(-1, 1) replaces
# np.product(shape), which no longer exists in NumPy 2.0.
smoothed_attenuation_feature = np.reshape(smoothed_attenuation, (-1, 1))
# Two columns per bin: phase difference (d_feature from the phase-only
# k-means cell) and symmetric attenuation.
duet_features = np.concatenate((d_feature, smoothed_attenuation_feature), axis=1)

kmeans = KMeans(n_clusters=n_sources, random_state=0).fit(duet_features)
duet_labels = kmeans.labels_
duet_mask = np.reshape(duet_labels, phase_dif.shape)
# The title names both features; the earlier "Phase Diff only" title was a
# copy-paste leftover that mislabelled this plot.
plot_tf_representation(duet_mask, for_title = ' DUET (phase diff + attenuation) 2-means', log_scale=False)

In [None]:
# Try standardizing the attenuation feature first (zero mean, unit std)
# so that it is on a scale comparable with the phase feature.
# NOTE(review): d_normalized_feature is not standardized in the cell that
# defines it -- confirm whether it should be.
normalized_smoothed_attenuation_feature = (smoothed_attenuation_feature - np.mean(smoothed_attenuation_feature)) / np.std(smoothed_attenuation_feature)

# Two columns per bin: phase feature and standardized attenuation.
duet_normalized_features = np.concatenate((d_normalized_feature, normalized_smoothed_attenuation_feature), axis=1)

kmeans = KMeans(n_clusters=n_sources, random_state=0).fit(duet_normalized_features)
duet_labels = kmeans.labels_
duet_mask = np.reshape(duet_labels, phase_dif.shape)
# The title names both features; the earlier "Phase Diff only" title was a
# copy-paste leftover that mislabelled this plot.
plot_tf_representation(duet_mask, for_title = ' DUET (phase diff + attenuation) 2-means Normalized', log_scale=False)


Perform ICA in order to infer the mask

In [None]:
import copy
import scipy.io.wavfile as wavfile

# Stack the two raw microphone signals as rows; FastICA expects one row per
# observation, hence the transpose when fitting.
X = np.array([tf_representations['m1_raw'], tf_representations['m2_raw']])
# Fixed seed so the separation is reproducible, matching the KMeans calls.
ica = FastICA(n_components=n_sources, random_state=0)
S_reconstructed = ica.fit_transform(X.T)
# One estimated source signal per ICA component (column).
reconstructed_signals = [S_reconstructed[:, i] for i in range(S_reconstructed.shape[1])]

# Deep copy: the loop below overwrites entries inside the nested
# 'sources_ids' dicts, and a shallow dict.copy() would share those dicts
# with (and therefore modify) the original mixture_info.
reconstructed_mix_info = copy.deepcopy(mixture_info)
paths = ['/tmp/wavaros_'+str(i) for i in range(n_sources)]
for i, path in enumerate(paths):
    # Write each estimated source to disk and register it as a source of
    # the new mixture description.
    wavfile.write(path, 16000, reconstructed_signals[i])
    reconstructed_mix_info['sources_ids'][i]['wav'] = reconstructed_signals[i]
    reconstructed_mix_info['sources_ids'][i]['wav_path'] = path

# Rebuild the mixture from the ICA estimates and reuse the ground-truth
# estimator to turn them into a time-frequency label mask.
ica_tf_mixtures = mixture_creator.construct_mixture(reconstructed_mix_info)
ica_tf_labels = ground_truth_estimator.infer_mixture_labels(ica_tf_mixtures)

plot_tf_representation(ica_tf_labels, for_title = ' ICA Labels', log_scale=False)