In [320]:
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import sys
import os
import matplotlib.pyplot as plt
import librosa
from hmmlearn import hmm
import random
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

In [2]:
base_dir = "../.."
sys.path.insert(0, base_dir)
from models.gmm.gmm import GMM

# KDE

In [95]:
class KDE:
    """
    Kernel Density Estimation (KDE) class
    
    Parameters
    ----------
    kernel : str
        Kernel to be used for KDE. Default is 'box'. Available kernels are 'box', 'gaussian', 'triangular'
    bandwidth : float
        Bandwidth for KDE. Default is 0.5
    """
    def __init__(self, kernel='box', bandwidth=0.5):
        self.kernel = kernel
        self.bandwidth = bandwidth
        match self.kernel:
            case 'box':
                self.kernel_func = self.box_kernel
            case 'gaussian':
                self.kernel_func = self.gaussian_kernel
            case 'triangular':
                self.kernel_func = self.triangular_kernel
            case _:
                raise ValueError('Invalid kernel')
    
    def box_kernel(self, x, d):
        return np.all(np.abs(x) < 0.5, axis=1)
    
    def gaussian_kernel(self, x, d):
        return (2*np.pi)**(-d/2) * np.exp(-0.5*np.sum(x**2, axis=1))
    
    def triangular_kernel(self, x, d):
        return np.maximum(1 - np.sum(np.abs(x), axis=1), 0)
    
    def fit(self, X):
        """
        Fit the KDE model
        
        Parameters
        ----------
        X : numpy.ndarray
            Data to be fitted (N x d) where N is the number of samples and d is the number of features
        """
        self.X = X
        
    def predict(self, x):
        """
        Predict the KDE model
        
        Parameters
        ----------
        x : numpy.ndarray
            Density to be predicted for one sample (d,)
        """
        N = self.X.shape[0]
        d = self.X.shape[1]
        p = np.sum(self.kernel_func((x-self.X) / self.bandwidth, d)) / (N*self.bandwidth**d)
        
        return p
    
    def plot(self, title='', return_fig=False):
        """
        Plot the KDE model
        
        Parameters
        ----------
        title : str
            Title of the plot
        """
        assert (self.X.shape[1] == 2) , 'Only 2D data is supported'
        X = self.X[:,0]
        Y = self.X[:,1]
        Z = np.array([self.predict(np.array([x, y])) for x, y in zip(X, Y)])
        plt = go.Scatter(x=X, y=Y, mode='markers', marker=dict(size=5, color=Z, colorscale='Viridis', showscale=True))
        if return_fig:
            return plt
        fig = go.Figure()
        fig.add_trace(plt)
        fig.update_layout(title=title, width=600, height=600)
        fig.show()

In [96]:
def generate_random_circle(center, radius, num_points):
    r = radius * np.sqrt(np.random.rand(num_points))
    theta = np.random.rand(num_points) * 2 * np.pi
    x = r * np.cos(theta) + center[0] + np.random.normal(0, 0.1, num_points)
    y = r * np.sin(theta) + center[1] + np.random.normal(0, 0.1, num_points)
    return np.column_stack((x, y))

# Generat the data and plot
p1 = generate_random_circle((0,0), 2.2, 3000)
p2 = generate_random_circle((1,1), 0.3, 500)
p = np.vstack((p1, p2))
fig = px.scatter(x=p[:,0], y=p[:,1], title='Original Data')
fig.update_traces(marker=dict(size=3))
fig.update_layout(width=600, height=600)
fig.show()

In [112]:
# kde model on data
for kernel in ['box', 'gaussian', 'triangular']:
    kde = KDE(kernel=kernel, bandwidth=0.5)
    kde.fit(p)
    kde.plot(title=kernel.capitalize() + ' Kernel')

In [113]:
# gmm model on data
gmm = GMM(k=2)
gmm.fit(p)
probs = (gmm.getMembership(p)*255).astype(np.int32)
viridis = plt.get_cmap("viridis")
colors = np.array(['rgb({},{},{})'.format(*viridis(r)[:3]*255) for r, _ in probs])
fig = px.scatter(x=p[:,0], y=p[:,1], title='GMM Results')
fig.update_traces(marker=dict(size=5, color=colors))
fig.update_layout(width=600, height=600)
fig.show()

Finding clusters: 76it [00:00, 792.49it/s]


# HMM

In [136]:
def get_mfccs(file_path):
    y, sr = librosa.load(file_path)
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    return mfccs.T

In [137]:
# plot mfccs heatmap
fig = make_subplots(rows=5, cols=2, subplot_titles=[f'Digit {i}' for i in range(10)])
for i in range(5):
    for j in range(2):
        mfccs = get_mfccs(f"../../data/external/recordings/{i*2+j}_george_0.wav")
        fig.add_trace(
            go.Heatmap(z=mfccs, zmax=300, zmin=-700, showscale=(i == 0 and j == 0)),
            row=i + 1,
            col=j + 1,
        )
        fig.update_xaxes(title_text='MFCC', row=i + 1, col=j + 1)
        fig.update_yaxes(title_text='Time', row=i + 1, col=j + 1)
        
fig.update_layout(width=800, height=1200, title="MFCC for all digits (george_0)")
fig.show()

In [144]:
# store all the data
all_data = []
for file in os.listdir("../../data/external/recordings"):
    if file.endswith(".wav"):
        label = int(file.split("_")[0])
        mfccs = get_mfccs(f"../../data/external/recordings/{file}")
        all_data.append((label,mfccs))

In [153]:
# create train and test data as list of labels and mfccs
train_data = {}
test_data = {}

for i in range(10):
    train_data[i] = []
    test_data[i] = []

random.shuffle(all_data)
for i, (label, mfccs) in enumerate(all_data):
    if i < 0.8*len(all_data):
        train_data[label].append(mfccs)
    else:
        test_data[label].append(mfccs)

In [163]:
class HMM:
    """
    Hidden Markov Model (HMM) class
    """
    def __init__(self, num_digits=10):
        self.models = {}
        self.num_digits = num_digits
    
    def fit(self, train_data):
        for i in range(self.num_digits):
            X = np.concatenate(train_data[i])
            model = hmm.GaussianHMM(n_components=9, covariance_type='diag', n_iter=100)
            model.fit(X)
            self.models[i] = model
            
    def predict(self, test_data):
        predictions = []
        for mfccs in test_data:
            scores = [model.score(mfccs) for model in self.models.values()]
            predictions.append(np.argmax(scores))
        return predictions
        

In [164]:
h = HMM()
h.fit(train_data)

mfccs_test = []
labels_test = []
for i in range(10):
    mfccs_test.extend(test_data[i])
    labels_test.extend([i]*len(test_data[i]))
    
predictions = h.predict(mfccs_test)
accuracy = np.mean(np.array(predictions) == np.array(labels_test))
print(f'Accuracy: {accuracy*100}%')

Accuracy: 92.33333333333333%


# RNN

In [261]:
# generate randon binary dataset
def make_binary_dataset(num_samples, min_length, max_length):
    df = pd.DataFrame(columns=['binary', 'label'])
    for i in range(num_samples):
        length = np.random.randint(min_length, max_length)
        x = np.random.randint(0, 2, length)
        x_str = ''.join([str(i) for i in x])
        df.loc[i] = [x_str, x_str.count('1')]
        
    return df
        
binary_df = make_binary_dataset(100000, 1, 16)
binary_df

Unnamed: 0,binary,label
0,0000000000000111,3
1,0000000010000001,2
2,0001001000011000,4
3,0000000000000111,3
4,0000000000000010,1
...,...,...
99995,0000010100101110,6
99996,0000011010111010,7
99997,0000000000000000,0
99998,0000000000000110,2


In [263]:
binary_df.to_csv('../../data/processed/binary_dataset.csv', index=False)

In [327]:
binary_df = pd.read_csv('../../data/processed/binary_dataset.csv', dtype={'binary': str, 'label': int})
binary_lists = binary_df['binary'].apply(lambda x: [int(bit) for bit in x]).tolist()
max_len = max(len(seq) for seq in binary_lists)
padded_binary_lists = np.array([([0] * (max_len - len(seq))) + seq for seq in binary_lists], dtype=np.float32)
labels = binary_df['label'].values

In [328]:
class BinaryDataset(Dataset):
    def __init__(self, binary, label):
        self.binary = binary
        self.label = label
        
    def __len__(self):
        return len(self.binary)
    
    def __getitem__(self, idx):
        return self.binary[idx], self.label[idx]

train_dataset = BinaryDataset(padded_binary_lists[:80000], labels[:80000])
val_dataset = BinaryDataset(padded_binary_lists[80000:90000], labels[80000:90000])
test_dataset = BinaryDataset(padded_binary_lists[90000:], labels[90000:])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)

In [329]:
class Binary_RNN(nn.Module):
    def __init__(self, input_size=1, hidden_size=16, num_layers=1, output_size=1, dropout=0, batch_norm=False):
        super(Binary_RNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.batch_norm = batch_norm
        
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)
        self.batch_norm = nn.BatchNorm1d(hidden_size) if batch_norm else None
        self.fc = nn.Linear(hidden_size, output_size)
        
        # determine device
        if torch.backends.mps.is_available():
            device = torch.device("mps")
        elif torch.cuda.is_available():
            device = torch.device("cuda")
        else:
            device = torch.device("cpu")

        print(f"Using device: {device}")
        self.device = device
        self.to(device)
    
    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size, device=self.device)
        out, _ = self.rnn(x, h0)
        if self.batch_norm:
            out = self.batch_norm(out[:,-1,:])
        out = self.fc(out[:,-1,:])
        
        return out
    
    def fit(self, train_loader, val_loader):
        criterion = nn.L1Loss()
        optimizer = torch.optim.Adam(self.parameters(), lr=0.001)
        
        for epoch in range(10):
            self.train()
            train_loss = 0
            for x, y in tqdm(train_loader):
                x, y = x.to(self.device), y.to(self.device)
                optimizer.zero_grad()
                outputs = self(x.unsqueeze(-1))
                loss = criterion(outputs, y.unsqueeze(-1))
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
                
            self.eval()
            val_loss = 0
            with torch.no_grad():
                for i, (x, y) in enumerate(val_loader):
                    x, y = x.to(self.device), y.to(self.device)
                    outputs = self(x.unsqueeze(-1))
                    loss = criterion(outputs, y.unsqueeze(-1))
                    val_loss += loss.item()
                    
            print(f'Epoch {epoch+1}/{10}, Train Loss: {train_loss/len(train_loader)}, Val Loss: {val_loss/len(val_loader)}')
    
    def evaluate(self, loader, type):
        criterion = nn.L1Loss()
        self.eval()
        loss = 0
        with torch.no_grad():
            for i, (x, y) in enumerate(loader):
                x, y = x.to(self.device), y.to(self.device)
                outputs = self(x.unsqueeze(-1))
                loss += criterion(outputs, y.unsqueeze(-1))
                
        print(f'{type} Loss: {loss/len(loader)}')

In [330]:
bianry_rnn = Binary_RNN()
bianry_rnn.fit(train_loader, val_loader)

Using device: mps


100%|██████████| 2500/2500 [00:28<00:00, 88.67it/s]


Epoch 1/10, Train Loss: 0.8489185865163803, Val Loss: 0.07308759158268904


100%|██████████| 2500/2500 [00:30<00:00, 82.96it/s]


Epoch 2/10, Train Loss: 0.08072543599307537, Val Loss: 0.08356147756972633


100%|██████████| 2500/2500 [00:31<00:00, 78.85it/s]


Epoch 3/10, Train Loss: 0.06918298643827438, Val Loss: 0.07321435522538024


100%|██████████| 2500/2500 [00:31<00:00, 80.63it/s]


Epoch 4/10, Train Loss: 0.06715841525569557, Val Loss: 0.053390347062589265


100%|██████████| 2500/2500 [00:29<00:00, 84.32it/s]


Epoch 5/10, Train Loss: 0.06364148841649293, Val Loss: 0.16266391433465976


100%|██████████| 2500/2500 [00:29<00:00, 84.23it/s]


Epoch 6/10, Train Loss: 0.06173703053370118, Val Loss: 0.14611228148396405


100%|██████████| 2500/2500 [00:28<00:00, 87.39it/s]


Epoch 7/10, Train Loss: 0.06274266601353884, Val Loss: 0.11733604474856069


100%|██████████| 2500/2500 [00:30<00:00, 82.85it/s]


Epoch 8/10, Train Loss: 0.061161940727382895, Val Loss: 0.02850213535011005


100%|██████████| 2500/2500 [00:29<00:00, 84.23it/s]


Epoch 9/10, Train Loss: 0.05716923424974084, Val Loss: 0.026638884168749037


100%|██████████| 2500/2500 [00:30<00:00, 81.77it/s]


Epoch 10/10, Train Loss: 0.05483570819944143, Val Loss: 0.028075146777466083


In [194]:
trainX = torch.tensor(trainX)
trainX

tensor([[         101111],
        [      101001001],
        [   100011010111],
        ...,
        [        1101110],
        [        1100000],
        [111000110111101]])

In [185]:
rnn = nn.RNN(10, 20, 2)
input = torch.randn(5, 3, 10)
h0 = torch.randn(2, 3, 20)
output, hn = rnn(input, h0)

In [190]:
output.shape, hn.shape

(torch.Size([5, 3, 20]), torch.Size([2, 3, 20]))