In [1]:
import numpy as np
import librosa 
import glob
import matplotlib.pyplot as plt
import pandas as pd
import tensorflow as tf
from scipy.stats import multivariate_normal

In [8]:
def readAudio(filename,Fs):
    x, sr = librosa.load(filename, sr=Fs)
    return x, sr

#calculate spectrogram
def calc_spec(x):
    n_fft = 1024
    hop_length = 512
    win_length = 1024
    X = np.abs(librosa.stft(x, n_fft = n_fft, hop_length = hop_length, win_length = win_length, window='hann', dtype = np.complex256))
    X = librosa.power_to_db(X**2,ref=np.max)
    return X

def saveSpectrogram(X, outfilename):
    assert outfilename[-4:]=='.npy'  #'outfilename extension should be .npy'
    np.save(outfilename, X)
    return

def readSpectrogram(infilename):
    X = np.load(infilename)
    return X


In [3]:
def read_and_combine_speech_music_spec():
  p=0
  for i in ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '14', '15', '16', '17', '18']:
    name = 'music_samples'+'/' + i + '.wav'
    print(name)
    audio, sr = readAudio(name,16000)
    if p==0:
      music_data = audio
      p=1
    elif p==1:
      music_data = np.hstack((music_data,audio))
  p=0
  for i in ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30']:
    name = 'speech_samples'+'/' + i + '.wav'
    print(name)
    audio, sr = readAudio(name,16000)
    if p==0:
      speech_data = audio
      p=1
    elif p==1:
      speech_data = np.hstack((speech_data,audio))
  speech_spec=calc_spec(speech_data)
  label_speech = np.zeros((speech_spec.shape[1],))
  music_spec=calc_spec(music_data)
  label_music = np.ones((music_spec.shape[1],))
  label = np.hstack([label_speech,label_music])
  combine = np.hstack([speech_spec,music_spec])
  return speech_spec, music_spec, label, combine

In [6]:
def PCA(spec):
    spec2 = spec - np.average(spec,axis=0)
    cov_mat =np.cov(spec2, rowvar = False) #each column represents a variable
    eigen_values, eigen_vectors = np.linalg.eig(cov_mat)
    sorted_index = np.argsort(eigen_values)[::-1]
    sorted_eigenvalue = eigen_values[sorted_index]
    sorted_eigenvectors = eigen_vectors[:,sorted_index]
    n_com = 16
    eigenvector_subset = sorted_eigenvectors[:,0:n_com]
    spec_reduced = np.dot(eigenvector_subset.transpose(),spec2.transpose()).transpose()
    return spec_reduced.T

In [4]:
class GMM_music:
    def __init__(self,k,max_iter=5):
        self.k = k
        self.max_iter = int(max_iter)
    
    def initialize(self, X):
        self.shape = X.shape
        self.n, self.m = self.shape
        
        self.pi = np.full(shape=self.k, fill_value=1/self.k)
        self.gamma = np.full(shape =self.shape, fill_value=1/self.k) #P(zi = j/x,theta)probability that xi comes from cluster j

        random_row = np.random.randint(low=0, high=self.n, size=self.k)
        self.mu = [  X[row_index,:] for row_index in random_row ]
        self.sigma = [ np.cov(X.T) for _ in range(self.k) ]
       # print(X.shape)
       # print(self.pi.shape,self.gamma.shape,len(self.mu),len(self.sigma))

    def e_step(self, X):
        self.gamma = self.predict_proba(X)
        self.pi = self.gamma.mean(axis=0)
        
    def m_step(self, X):
        for i in range(self.k):
            gamma = self.gamma[:, [i]]
            total_gamma = gamma.sum()
            self.mu[i] = (X * gamma).sum(axis=0)/total_gamma
            self.sigma[i] = np.cov(X.T, 
                aweights=(gamma/total_gamma).flatten(), 
                bias=True)
        
    def fit(self, X):
        self.initialize(X)
        for iteration in range(self.max_iter):
            self.e_step(X)
            self.m_step(X)
        self.saveweights(self.k,self.mu,self.sigma,self.pi, 'music_k.npy', 'music_mu.npy', 'music_sigma.npy', 'music_pi.npy')
    
    def saveweights(self,k, mu , sigma, pi,kn,mun,sigman,pin):
      assert kn[-4:]=='.npy'
      assert mun[-4:]=='.npy' 
      assert pin[-4:]=='.npy'
      assert sigman[-4:]=='.npy' 
      np.save(kn, k)
      np.save(mun, mu)
      np.save(sigman, sigma)
      np.save(pin, pi)
      return

    def readweights(self,kn, mun,sigman,pin):
      self.k = np.load(kn) 
      self.mu = np.load(mun)
      self.sigma = np.load(sigman)
      self.pi = np.load(pin)
      return self.k, self.mu, self.sigma, self.pi

    
        
    
    def predict_proba(self, X):
        likelihood = np.zeros( (X.shape[0], self.k) )
        for i in range(self.k):
            #print(self.mu[i])
            distribution = multivariate_normal(
                mean=self.mu[i], 
                cov=self.sigma[i])
            likelihood[:,i] = distribution.pdf(X)
        #print(likelihood)
        numerator = likelihood * self.pi
      #  print('h',numerator.shape,likelihood.shape,numerator[0,0])
        denominator = numerator.sum(axis=1)[:, np.newaxis]
       # print(denominator)
        gamma = numerator / denominator
        return gamma
        
    def predict(self, X):
        likelihood = np.zeros( (X.shape[0], self.k) )
        for i in range(self.k):
            distribution = multivariate_normal(
                mean=self.mu[i], 
                cov=self.sigma[i])
            likelihood[:,i] = distribution.pdf(X)
        numerator = likelihood * self.pi
        sum_numerator=numerator.sum(axis=1)[:, np.newaxis]
        return sum_numerator

In [5]:
class GMM_speech:
    def __init__(self,k,max_iter=5):
        self.k = k
        self.max_iter = int(max_iter)
    
    def initialize(self, X):
        self.shape = X.shape
        self.n, self.m = self.shape
        
        self.pi = np.full(shape=self.k, fill_value=1/self.k)
        self.gamma = np.full(shape =self.shape, fill_value=1/self.k) #P(zi = j/x,theta)probability that xi comes from cluster j

        random_row = np.random.randint(low=0, high=self.n, size=self.k)
        self.mu = [  X[row_index,:] for row_index in random_row ]
        self.sigma = [ np.cov(X.T) for _ in range(self.k) ]
      #  print(X.shape)
      #  print(self.pi.shape,self.gamma.shape,len(self.mu),len(self.sigma))

    def e_step(self, X):
        self.gamma = self.predict_proba(X)
        self.pi = self.gamma.mean(axis=0)
        
    def m_step(self, X):
        for i in range(self.k):
            gamma = self.gamma[:, [i]]
            total_gamma = gamma.sum()
            self.mu[i] = (X * gamma).sum(axis=0)/total_gamma
            self.sigma[i] = np.cov(X.T, 
                aweights=(gamma/total_gamma).flatten(), 
                bias=True)
        
    def fit(self, X):
        self.initialize(X)
        for iteration in range(self.max_iter):
            self.e_step(X)
            self.m_step(X)
        self.saveweights(self.k,self.mu,self.sigma,self.pi, 'speech_k.npy', 'speech_mu.npy', 'speech_sigma.npy', 'speech_pi.npy')
    
    def saveweights(self,k, mu , sigma, pi,kn,mun,sigman,pin):
      assert kn[-4:]=='.npy'
      assert mun[-4:]=='.npy' 
      assert pin[-4:]=='.npy'
      assert sigman[-4:]=='.npy' 
      np.save(kn, k)
      np.save(mun, mu)
      np.save(sigman, sigma)
      np.save(pin, pi)
      return

    def readweights(self,kn, mun,sigman,pin):
      self.k = np.load(kn) 
      self.mu = np.load(mun)
      self.sigma = np.load(sigman)
      self.pi = np.load(pin)
      return self.k, self.mu, self.sigma, self.pi
        
    
    def predict_proba(self, X):
        likelihood = np.zeros( (X.shape[0], self.k) )
        for i in range(self.k):
            #print(self.mu[i])
            distribution = multivariate_normal(
                mean=self.mu[i], 
                cov=self.sigma[i])
            likelihood[:,i] = distribution.pdf(X)
       # print(likelihood)
        numerator = likelihood * self.pi
        print('h',numerator.shape,likelihood.shape,numerator[0,0])
        denominator = numerator.sum(axis=1)[:, np.newaxis]
       # print(denominator)
        gamma = numerator / denominator
        return gamma
        
    def predict(self, X):
        likelihood = np.zeros( (X.shape[0], self.k) )
        for i in range(self.k):
            distribution = multivariate_normal(
                mean=self.mu[i], 
                cov=self.sigma[i])
            likelihood[:,i] = distribution.pdf(X)
        numerator = likelihood * self.pi
        sum_numerator=numerator.sum(axis=1)[:, np.newaxis]
        return sum_numerator

In [9]:
if __name__=="__main__":
    speech_spec, music_spec, label, combine = read_and_combine_speech_music_spec()
    music_spec=np.float64(music_spec)
    reduced_music_spec = PCA(music_spec.T)
    speech_spec=np.float64(speech_spec)
    reduced_speech_spec = PCA(speech_spec.T)
    np.random.seed(42)
    gmm_speech = GMM_speech(k=1, max_iter=20)
#gmm_speech.readweights('speech_k.npy', 'speech_mu.npy', 'speech_sigma.npy', 'speech_pi.npy') ################################
    gmm_speech.fit(reduced_speech_spec.T)
    np.random.seed(42)
    gmm_music = GMM_music(k=1, max_iter=20)
  #gmm_music.readweights('music_k.npy', 'music_mu.npy', 'music_sigma.npy', 'music_pi.npy') ##############################
    gmm_music.fit(reduced_music_spec.T)

music_samples/1.wav
music_samples/2.wav
music_samples/3.wav
music_samples/4.wav
music_samples/5.wav
music_samples/6.wav
music_samples/7.wav
music_samples/8.wav
music_samples/9.wav
music_samples/10.wav
music_samples/11.wav
music_samples/12.wav
music_samples/14.wav
music_samples/15.wav
music_samples/16.wav
music_samples/17.wav
music_samples/18.wav
speech_samples/1.wav
speech_samples/2.wav
speech_samples/3.wav
speech_samples/4.wav
speech_samples/5.wav
speech_samples/6.wav
speech_samples/7.wav
speech_samples/8.wav
speech_samples/9.wav
speech_samples/10.wav
speech_samples/11.wav
speech_samples/12.wav
speech_samples/14.wav
speech_samples/15.wav
speech_samples/16.wav
speech_samples/17.wav
speech_samples/18.wav
speech_samples/19.wav
speech_samples/20.wav
speech_samples/21.wav
speech_samples/22.wav
speech_samples/23.wav
speech_samples/24.wav
speech_samples/25.wav
speech_samples/26.wav
speech_samples/27.wav
speech_samples/28.wav
speech_samples/29.wav
speech_samples/30.wav
(28253, 16)
(1,) (28253