> ### Blind kilde separasjon
#### Jorge Hernandez   
#### Februar 2019






Det følgende prosjektet handler om å prosessere og hente ut informasjon fra datamengder. Da tar man som utgangspunkt det såkalte "cocktailselskap problemet" hvor flere personer snakker samtidig i et åpent rom sammen andre lydkilder i tillegg. Lydsignalene er da registrert og lagret med noen mikrofoner spredt rundt romet. Resultatet av signalene er da en miks fra flere kilder.

For å separere lydsignalene kan man først tenke seg, matematisk sett, at det registrerte signalet er en lineær blanding av d kilder $s_1(t),...,s_d(t)$ og man registrer $x_i(t)$ ved mirkofon nr $i$  $x_i(t)$ = $a_{i1}s_1(t)+...+a_{id}s_d(t)$  $i$ = $1,...n$ . Så man har altså d ukjente kilder $s_1(t),...,s_d(t)$ og en ukjent mikse matrise __A__ = $((a_{ij}))$


For Enkelhets skyld tar vi i dette prosjektet n = d, hvor d = 3 

For å løse problemet trenger man først et par antagelser: 

1- Kildene kan ses på som statistisk uavhengige av hverandre (dette må oppfylles for at analysen skal fungere) 

2- Signalene ikker er Gaussiske(normalfordelte). Ved hjelp av av sentralgrenseteoremet kan vi konkludere med at de registrerte signalene $x_i$ er "mer gaussiske" enn kildene $s_j$

Ideen er da å finne den kombinasjonen __y__: 

$y(t)$ = $w_1x_1(t)$ + $w_2x_2(t)$ + $w_dx_d(t)$ 
som gjør at $y(t)$ mest mulig "ikke gaussisk". $x_i$ er gitt, så det er $w_i$ vi må maksimere med hensyn på. Det resulterende signalet $y(t)$ tas som aproximasjon til et av kildesignalene $s_j$.


Her  brukes to alternativer som et mål for ikke-gaussiskhet: kurtose $G_1$ og negentropy $G_2$.

$G_1(u)$ = $4u^3$,    

$G_2(u)$ = $ue^{-u^2/2}$


Algoritmen stortsett består av stegene:

1- Preprosessering : 
a) Her kreves det at signalet oscillerer mellom positive og negative verdier omkring 0 (froventningsverdien er 0). Dette gjøres ved trekke fra gjennomsnittverdien til hvert av de registrerte signalene. Dette gjøres med hver av de d radene i x (matrisen av de registrerte signalene) og man får da en såkalt sentrert versjone av dataene.

b) Whitening: Man antar at kildene er uavhengige, dermed ukorrelerte, men det kan man ikke gjør med de miksede signalene. Men det er mulig å transformere dem slik at de blir ukorrelerte med varians 1. Denne forenkler prossesen videre. Fra __x__ kan man utlede en  symmetrisk d x d matrise "C" kalt Kovariansmatrisen. Deretter ønskes det å finne den invers kvadraroten av C.

2- Maksimering

Detter er det siste steget og det handler om maksimering av et mål for ikke-gaussiskhet. En iterasjonsmetode blir brukt for å maksimere et slikt mål. Målet for ikke-gaussiskhet er uttrykt ved en generell funksjon G(u) (Kurtose eller Negentropy) og dens deriverte. Hvergang det itereres, regnes det ut en aviksmengde mellom forrige og nye iterasjon og når aviksmengden er mindre enn en gitt toleranse (grense) stoppes iterasjonen. 

Algoritmen ble også testen både med Kurtose og Negentropy. I tillegg,  har  rene egne lydsignaler vært mikset og separert med metoden. Her ble, i tillegg, benyttet to nye funskjoner som mikser de rene lydsignalene.














         
         





       







In [110]:
"""I denne cella laster vi opp tre miksede lydklipp som vi skal bruke å teste ut algoritmen på"""
import numpy as np
from wav_file_loader import read_wavefiles



paths = ['audio/mix_1.wav', 'audio/mix_2.wav', 'audio/mix_3.wav']
data, sampling_rate = read_wavefiles(paths)
num_signals = data.shape[0]

"""I denne cellen fins en funksjon som normaliserer lydsignalenes volum, slik at de får omtrent samme lydstyrke"""
def normalize_audio(data):
    """Scale amplitude s.t. max(data[i]) == 1."""
    abs_data = np.absolute(data)
    maximums = np.amax(abs_data,1)
    # Divide each row by a different vector element:
    data = data / maximums.reshape((3,1))
    return data

data = normalize_audio(data)

"""Her kan man spille av de tre opplastede lydklippene"""
import IPython.display as ipd

ipd.display(ipd.Audio(data[0,:], rate=sampling_rate))
ipd.display(ipd.Audio(data[1,:], rate=sampling_rate))
ipd.display(ipd.Audio(data[2,:], rate=sampling_rate))






In [111]:
def center_rows(Z):
           
        # Med denne funskjonen blir det enklere formler dersom det kreves at forventningsverdien er 0
    
    
    
    #Input: Data, Z, fra originallydfiler
    
    
    # Gjennomsnitt av hver rad
    mean = np.mean(Z,axis=1)
    
    # Trekk fra hver rad gjennomsnittet av den raden 
    Zc = Z-mean.reshape((3,1))
    
    
    #Output: returnerer en  dxN-matrix,Zc, hvor hver rad har gjennomsnittet null:
    return Zc



In [112]:
def whiten_rows(Z):
   
    #Input: Zc, dxN Sentrert matrise 
    
    # Kovarians matrisen til Z:
    C = np.cov(Z)
        
    
    
    E, D, _ = np.linalg.svd(C, full_matrices=False)  #regner ut egenvektoren E og en diagonal matrise D til Covarians matrisen,
    #diagonal matrisen inneholder egenvektorene til C 
    
    
    C_kvadrat_invers  = E @ np.diag(1 / np.sqrt(D)) @ E.T #Invers kvadrat av covarians matrisen.
    
    #ukorrelert verson av: 
    Zcw = C_kvadrat_invers@Z
    
    #Output: Zcw, dxN den urkorrelert matrisen til Zc
    return Zcw



#Lydkildene antas å være uavhengig og ukorrelerte, men vi kan ikke anta at de miksede signalene er ukorrelerte. 

#Poenget med "whiten_rows" er å ukorrelere den miksede matrisen.

#Det gjøres ved å tranformere miks matrisen ved å ta den invers kvadrat av covarians matrisen og gange det med miksmatrisen.


In [113]:
def normalize_rownorms(W):
    
    
    
    # Input: W, Tilfeldig 3X3 matrix 
    
    # Escaling each row of W by the Euclidean norm of each row
    W[0] = W[0]/np.linalg.norm(W[0])
    W[1] = W[1]/np.linalg.norm(W[1])
    W[2] = W[2]/np.linalg.norm(W[2])
    
    #Output: W, 3x3 matrise med normalisert rad
    return W


# Denne funksjonen tar inn en tilfeldig generert matrise og gjør slik at lengden av hver rad i matrisen normaliseres til 1,
#for videre orthogonalisering av matrisen. 



In [114]:
def decorrelate_weights(W):
    
    #Input: dxd matrise W
    
    # W*W^T:
    WW_t = W@W.T
    
    #Egenmatrisen E til W*W^t og en diagonal matrisen D med egenverdien til W*W^T i diaginalen: 
    E, D, _ = np.linalg.svd(WW_t,full_matrices=False)
    
    #Invers kvadrat av W*W^t:
    WW_t_i= E@np.diag(1/np.sqrt(D))@E.T
    
    #Orthogonalisert W
    wd= WW_t_i@W
    
    #output: Wd, dxd projisert W-matrise:   
    return wd

#Tar inn en dxd matrise og projiserer matrisen på en orthogonal matrise (decorrelerer), 
    #ved transformasjonen  Wd = (WW^T)^{-1/2}.

#Den decorrelerte matrisen har enda større "ikke gausiskhet" og er da enda nærmere likt den orthogonale A^{-1}.
#slik at den orthogonale matrisen A^-1*Zcw = S



In [115]:
from math import e
def update_W(W1, Zcw):
  
    
    #Input 1: W1, dxd matrise, et forslag til orthogonalt A^-1. 
    #Input 2: Zcw, dxN matrise, urkorrelert matrisen til miksmatrisen.
    
    
    # Den sentrert-"whitened" matrisen gange den normaliserte matrisen W2
    sk = W1@Zcw
    
    # Negentropy og dens derivert
    #G = sk*(np.power(e, -sk*sk/2))          
    #G_der = np.power(e, -sk*sk/2)-sk*sk*np.power(e, -sk*sk/2)        #G'= 12*u^2
    
    # Kurtose og dens derivert
    G = 4*(np.power(sk, 3))
    G_der = 12*(np.power(sk, 2))
    
    #Antall elementer i G_der:
    N = np.size(G_der,1)
    
    #Gjennomsnitt av hver rad i G_der:
    Mean = np.mean(G_der, axis = 1)
    
    # Regner W_pluss : En matrise med større ikke-gaussiskhet enn W1
    Zcw_T = Zcw.T    #Zcw transponert
    W_pluss = ((G@Zcw_T)/N) - (np.diag(Mean)@W1)    #W_pluss=(1/N)*G*Zcw^T - diag(E[G'])*W1
    
    #Normaliserer radene til W_pluss:
    W_pluss = normalize_rownorms(W_pluss)
    
    # Regner W_2 fra W_pluss ved å kalle decorrelate_weights funksjonen
    W2 = decorrelate_weights(W_pluss)
    
    #Output: W2, dxd matrise, den har et større "ikke gaussiskhet" enn W1 og er dermed bedre tilnærmet orthogonalt A^-1
    return W2 



#Tar inn to matriser, en matrise W1 som er den aproksimerte matrisen til den orthogonale A invers matrisen.

#Den andre matrisen er den ukorrelerte miksmatrisen Zcw, slik at de oprinnelig lydsignalen s er tilnærmet W1*Zcw.

#Funksjonen bruker W1 og regner ut en ny matrise W_pluss som har da enda større "ikke gausiskehet" enn W1 
#ved å bruke G=4u^3 og dens derivert. Kan også bruke negentropy.

#Også blir W_pluss orthogonalisert ved å kalle på funksjonen "decorrelate_weights(W_pluss)",
#vi får da en ny matrise W2, den skal da være enda mer lik den orthogonale A^{-1} matrisen. 


In [116]:
def measure_of_convergence(W1, W2):
   
    #Input: W1, the previous iterate, W2 the one just computed
    
    #Output: en avviksmengde mellom W1 og W2:
    return np.amax(1-np.abs(np.sum(W1*W2,axis=1))) #delta = max{1=<i<=d}(1-|sum{j=1,j=d}(W_k_{ij}W_k-1_{ij})|)

#regner ut en aviksmengde anslag av W1 og W2



In [117]:
import warnings

def fast_ICA(Z, signals_to_find, tol=1e-10, max_iter=100):
    #This is the function that organises all the work.
    
    #Input: Z is the unprocessed data,
           # signals_to_find: in our case, always d the number of sources,
           # tol is the tolerance, default value 1.0e-10,
           # max_iter abort after max_iter iterations if not converged, (to avoid infinite loop)
    

    # center the rows of Z
    Zc = center_rows(Z)
    
    # whiten the centered rows
    Zcw = whiten_rows(Zc)
    
    
    
    # Initializing some variables to prepare for the while-loop: 
    delta = 1
    max_iter = 100
    number_of_iterations = 0
    
    # Giving W1 a first random 3x3 matrix:
    W1 = np.random.uniform(0, 10, 9).reshape((3,3))
    
    #Normalizing our first matrix:
    W1 = normalize_rownorms(W1)
    
    #Orthogonalizing our first matrix after it is normalized:
    W1= decorrelate_weights(W1) #this is the first matrix that aproximates the orthogonal A^-1
    
    
    while delta>tol and number_of_iterations < max_iter:
    #      do an iteration to get a new W-iterate
            W2 = update_W(W1, Zcw)
    #      Compute the error estimate to update delta
            delta = measure_of_convergence(W1, W2)
    #      Changing our first estimate matrix to the new computed matrix
            W1 = W2
    #      counting the iteration number
            number_of_iterations +=1
            if number_of_iterations == max_iter:
                print("Convergence is not meet!!!")
            
    
    # After the conditions are meet in the while loop, the W2 should be the best aproximate of the orthogonal A^-1, 
    # given those conditions.
    
    #Output the seperated signals (a dxN matrix aproximating the sources):
    print("number of iterations: ",number_of_iterations)
    return W2@Zcw

Sound_machine = fast_ICA(data, 3, tol=1e-10, max_iter=100)


import IPython.display as ipd

ipd.display(ipd.Audio(Sound_machine[0,:], rate=sampling_rate))
ipd.display(ipd.Audio(Sound_machine[1,:], rate=sampling_rate))
ipd.display(ipd.Audio(Sound_machine[2,:], rate=sampling_rate))
    

number of iterations:  7


### Resultater og Konklusjon


Før den sammensetning av hele prosjektet ble hver funskjon testet separatvis og skrevet dens resultat (verdier og antall verdier) ut til skjermen og deretter sammenlignet med de forventet verdiene. 

Denne algoritmen ble kjørt 10 ganger.Resultatene ble oppnåddet da man kunne lytte til de separerte signalene tydelig, både med den oppgitte lydfiler og egen lydfiler, dermed ble hovedmålet av prosjektet fullført. Denne impliserer at algoritmen nådde den oppgitt konvergensen (innom en toleranse) ved et visst antall (gjennomsnittlig 8) iterasjoner, dermed viser algoritmen å være effektivt. På hver test ble det dannet en ny Matrise W_0, som fungerte for alle tilfeller som ble testet.
 

Konklusjon: Hovedmålet fullført, Algoritmens iterasjoner varierer i hver testing og dette skyldes av tilfeldige matrisen som oppstår mens algoritmen kjøres . Resultatene med Kurtose og Negentropy visste ingen forskjell , i de antall testene som ble kjørt,  til tross for at det finnes teoretiske forskjeller mellom de to. Den algoritmen som ble brukt til å mikse rene lydsignalene fungerte som den skulle, og lydene ble separert med god kvalitett. 













In [118]:
#Ekstra oppgave: fjern komentar tegn for å kjøre med egen lyd fil. 


#paths = ['audio/Machinegun_solo.wav', 'audio/Master_solo.wav', 'audio/Trump_solo.wav']
#data, sampling_rate = read_wavefiles(paths)
#num_signals = data.shape[0]







#def normalize_rowsums(A):
    #"""Divide each row in A by its sum.
    
    #The sum of each row in the result is 1.0."""
 #   the_sum = np.sum(A,1)
   # A = A / the_sum.reshape((3,1))
    #return A



#def random_mixing_matrix(signals, observations):
 #   """ Creates a random matrix
    
  #  Each element is a small positive number, not too close to 0.
   # (1/11, 5/7).
    #"""
    #A = 0.25 + np.random.rand(observations, signals)
    #return normalize_rowsums(A)

#A = random_mixing_matrix(3, 3)
#data_mixed = normalize_audio(A @ data)



#import IPython.display as ipd


#ipd.display(ipd.Audio(data_mixed[0,:], rate=sampling_rate))
#ipd.display(ipd.Audio(data_mixed[1,:], rate=sampling_rate))
#ipd.display(ipd.Audio(data_mixed[2,:], rate=sampling_rate))