In [149]:
import librosa
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import sklearn
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import KFold
import torch.nn.functional as F
from torchvision.transforms import Compose
import random
from datetime import datetime
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Setup

Important note: Space and Enter keys are denoted as the following by the model:
- Space -> -
- Enter -> +

In [150]:
# waveform function for me to not bang my keyboard
def disp_waveform(signal, sr=None, color='blue'):
    plt.figure(figsize=(7,2))
    return librosa.display.waveshow(signal, sr=sr, color=color)

In [151]:
def isolator(signal, sample_rate, n_fft, hop_length, before, after, threshold, show=False):
    strokes = []
    # -- signal'
    if show:
        disp_waveform(signal, sr=sample_rate)
    fft = librosa.stft(signal, n_fft=n_fft, hop_length=hop_length)
    energy = np.abs(np.sum(fft, axis=0)).astype(float)
    # norm = np.linalg.norm(energy)
    # energy = energy/norm
    # -- energy'
    if show:
        disp_waveform(energy)
    threshed = energy > threshold
    # -- peaks'
    if show:
        disp_waveform(threshed.astype(float))
    peaks = np.where(threshed == True)[0]
    peak_count = len(peaks)
    prev_end = sample_rate*0.1*(-1)
    # '-- isolating keystrokes'
    for i in range(peak_count):
        this_peak = peaks[i]
        timestamp = (this_peak*hop_length) + n_fft//2
        if timestamp > prev_end + (0.1*sample_rate):
            keystroke = signal[timestamp-before:timestamp+after]
            # strokes.append(torch.tensor(keystroke)[None, :])
            # keystroke = transform(keystroke)
            strokes.append(keystroke)
            if show:
                disp_waveform(keystroke, sr=sample_rate)
            prev_end = timestamp+after
    return strokes

In [175]:
# Constants we actually need for the task
#MBP_AUDIO_DIR = '../Dataset-for-Binary/base-audio/'
MBP_AUDIO_DIR = '../Dataset-custom-audio/base-audio/' #for custom audio
#keys_s = '1234567890QWERTYUIOPASDFGHJKLZXCVBNM'
keys_s = '1234567890ABCDEFGHIJ+-' #for custom audio
labels = list(keys_s)
#keys = ['audio_' + k + '.wav' for k in labels]
keys = [ k + '.wav' for k in labels] #for custom audio
data_dict = {'Key':[], 'File':[]}
device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"

In [153]:
def create_dataset(n_fft, hop_length, before, after, keys, custom_audio=False):
    for i, File in enumerate(keys):
        loc = MBP_AUDIO_DIR + File
        samples, sr = librosa.load(loc)
        prom = 0.06
        step = 0.005
        strokes = isolator(samples, sr, n_fft, hop_length, before, after, prom, False )
        print(f'File {File} length: {len(strokes)}')
        label = [labels[i]]*len(strokes)
        #works fine here
        data_dict['Key'] += label
        data_dict['File'] += strokes

    df = pd.DataFrame(data_dict)
    mapper = {}
    counter = 0
    for l in df['Key']:
        if not l in mapper:
            mapper[l] = counter
            counter += 1
        print(mapper)
    df.replace({'Key': mapper}, inplace = True)

    return df

In [176]:
for key in keys_s:
    #sample, sr = librosa.load(f'../Dataset-for-Binary/base-audio/audio_{key}.wav')
    sample, sr = librosa.load(f'../Dataset-custom-audio/base-audio/{key}.wav') #for custom audio
    print(sr)
    print(len(isolator(sample, sr, 1024, 225, 2400, 12000, 0.06)), end=' ')
    

22050
63 22050
63 22050
63 22050
63 22050
63 22050
62 22050
63 22050
63 22050
63 22050
63 22050
63 22050
63 22050
63 22050
63 22050
63 22050
63 22050
63 22050
63 22050
63 22050
63 22050
63 22050
63 

In [217]:
# Working parameters for different audio sets
# Dataset-for-Binary: n_fft = 1024, hop_length = 225, before = 2400, after = 12000
#n_fft = 1024 
#hop_length = 225 
#before = 2400 
#after = 12000 
# Normalized only audio: n_fft = 7, hop_length = 4450, before = 2400, after = 12000
n_fft = 25000
hop_length = 22500
before = 2400 
after = 12000 
# Normalized and denoised audio: n_fft = 9, hop_length = 500, before = 2400, after = 12000
#n_fft = 10
#hop_length = 10
#before = 2400 
#after = 12000 

data_dict = {'Key':[], 'File':[]}
mbp_dataset = create_dataset(n_fft, hop_length, before, after, keys)
mbp_dataset

File 1.wav length: 40
File 2.wav length: 40
File 3.wav length: 40
File 4.wav length: 38
File 5.wav length: 40
File 6.wav length: 40
File 7.wav length: 40
File 8.wav length: 40
File 9.wav length: 39
File 0.wav length: 41
File A.wav length: 40
File B.wav length: 40
File C.wav length: 40
File D.wav length: 40
File E.wav length: 40
File F.wav length: 39
File G.wav length: 40
File H.wav length: 41
File I.wav length: 40
File J.wav length: 41
File +.wav length: 40
File -.wav length: 40
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0}
{'1': 0, '2': 1}
{'1': 0, '2': 1}
{'1': 0, '2': 1}
{'1': 0, '2': 1}
{'1': 0, '2': 1}
{'1': 0, '2': 1}
{'1': 0, '2': 1}
{'1': 0, '2': 1}
{'1': 0, '2': 1}
{'1

Unnamed: 0,Key,File
0,0,"[0.02292344, -0.0026099365, 0.005319747, 0.028..."
1,0,"[0.04376667, 0.032344945, 0.019199431, 0.02930..."
2,0,"[-0.037698664, -0.041183047, -0.043770947, -0...."
3,0,"[-0.11052307, -0.10500163, -0.107069805, -0.12..."
4,0,"[0.05923058, 0.052709818, 0.05137258, 0.048006..."
...,...,...
874,21,"[-0.09909846, -0.1066711, -0.09214234, -0.0978..."
875,21,"[0.07839166, 0.07631026, 0.082578294, 0.087819..."
876,21,"[0.010268506, 0.004127588, 0.0046186973, 4.378..."
877,21,"[-0.048078485, -0.049883205, -0.05327087, -0.0..."


In [218]:
audio_samples = mbp_dataset['File'].values.tolist()
labels = mbp_dataset['Key'].values.tolist()

audioDataset = np.array(audio_samples, dtype = object)
print(audio_samples[0].shape)
mfcc = librosa.feature.mfcc(y=audio_samples[0], sr=44100) # shape: (n_mfcc, t)
print(mfcc.shape)
# labels = np.array(labels)

(14400,)
(20, 29)


In [219]:
audio_samples[0]


array([ 0.02292344, -0.00260994,  0.00531975, ...,  0.00035781,
        0.0001915 , -0.00352737], dtype=float32)

In [220]:
class TimeShifting():
    def __call__(self, samples):
#       samples_shape = samples.shape
        samples = samples.flatten()
        
        shift = int(len(samples) * 0.4) #Max shift (0.4)
        random_shift = random.randint(0, shift) #Random number between 0 and 0.4*len(samples)
        data_roll = np.roll(samples, random_shift)
        return data_roll

In [159]:
def time_shift(samples):
    samples = samples.flatten()
    shift = int(len(samples) * 0.4) #Max shift (0.4)
    random_shift = random.randint(0, shift) #Random number between 0 and 0.4*len(samples)
    data_roll = np.roll(samples, random_shift)
    return data_roll

In [160]:
from skimage.transform import resize


class ToMelSpectrogram:
    def __init__(self, audio_length=14400):
        self.audio_length = audio_length

    def __call__(self, samples):
        if len(samples) > self.audio_length:
            samples = samples[:self.audio_length]
        elif len(samples) < self.audio_length:
            samples = np.pad(samples, (0, self.audio_length - len(samples)), mode='constant')

        mel_spec = librosa.feature.melspectrogram(y=samples, sr=44100, n_mels=64, n_fft=1024, hop_length=225)
        mel_spec_resized = resize(mel_spec, (64, 64), anti_aliasing=True)
        mel_spec_resized = np.expand_dims(mel_spec_resized, axis=0)
        return torch.tensor(mel_spec_resized)


class ToMelSpectrogramMfcc:
    def __init__(self, audio_length=14400):
        self.audio_length = audio_length

    def __call__(self, samples):
        if len(samples) > self.audio_length:
            samples = samples[:self.audio_length]
        elif len(samples) < self.audio_length:
            samples = np.pad(samples, (0, self.audio_length - len(samples)), mode='constant')

        mel_spec = librosa.feature.melspectrogram(y=samples, sr=44100, n_mels=64, n_fft=n_fft, hop_length=hop_length)
        mel_spec = librosa.feature.mfcc(S=librosa.power_to_db(mel_spec))
        mel_spec_resized = resize(mel_spec, (64, 64), anti_aliasing=True)
        mel_spec_resized = np.expand_dims(mel_spec_resized, axis=0)

        return torch.tensor(mel_spec_resized)


class ToMfcc:
    def __init__(self, audio_length=14400):
        self.audio_length = audio_length

    def __call__(self, samples):
        if len(samples) > self.audio_length:
            samples = samples[:self.audio_length]
        elif len(samples) < self.audio_length:
            samples = np.pad(samples, (0, self.audio_length - len(samples)), mode='constant')
        
        mfcc_spec = librosa.feature.mfcc(y=samples, sr=44100)
        mfcc_spec = np.transpose(mfcc_spec)
        return torch.tensor(mfcc_spec)


In [221]:
transform = Compose([ToMelSpectrogram()])
transform_mfcc = Compose([ToMfcc()])

In [222]:
audio_samples_new = audio_samples.copy() # audio samples CNN

for i, sample in enumerate(audio_samples):
    audio_samples_new.append(time_shift(sample))
    labels.append(labels[i])
    
# convert labels to a numpy array
labels = np.array(labels)
print(len(audio_samples_new))
print(len(labels))

1758
1758


In [223]:
audioDatasetFin, audioDatasetMfcc = [], []

for i in range(len(audio_samples_new)):
    transformed_sample = transform(audio_samples_new[i])
    transformed_mfcc = transform_mfcc(audio_samples_new[i])
    audioDatasetFin.append((transformed_sample, labels[i]))
    audioDatasetMfcc.append((transformed_sample, transformed_mfcc, labels[i]))

In [164]:
len(audioDatasetFin)

2216

In [224]:
audioDatasetMfcc[0][0].shape

torch.Size([1, 64, 64])

In [166]:
import time

class MfccLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, dropout=0.2, num_classes=36):
        super(MfccLSTM, self).__init__()
        
        self.conv = nn.Sequential(
            nn.Conv2d(1, 32, 3, 1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, 3, 1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Flatten(),
            nn.LazyLinear(512),
            nn.ReLU(),
            nn.Linear(512, num_classes)
        )
        
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.lstm2 = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.fc1 = nn.LazyLinear(64)
        self.fc2 = nn.Linear(64, 16)
    
        self.fc3 = nn.LazyLinear(128)
        self.final_lstm = nn.LSTM(1, 64, batch_first=True)
        
        self.fc = nn.LazyLinear(num_classes)
    
    def forward(self, image_input, sequence_input):
        # must return shape (batch_size, num_classes) 
        # batch_size: right now is 16
        # num_classes: right now is 36
        x1 = self.conv(image_input)
        out1, _ = self.lstm(sequence_input)
        out1_dp = self.dropout(out1)
        # print(f'output of first lstm: {out1_dp.shape[1:]}')
        out2, _ = self.lstm2(out1_dp[:, -1, :])
        out2_dp = self.dropout(out2)
        # print(f'output of second lstm: {out2_dp.shape[1:]}')
        x2 = self.fc2(self.fc1(out2_dp))
        x3 = torch.cat((x1, x2), 1)
        # print(f'output of concatenation: {x3.shape[1:]}')
        # x = self.fc(final_out[:, -1, :])
        x = self.fc(x3)
        return x
    

In [167]:
# Model architecture
class CNN(nn.Module):
    def __init__(self, num_classes=36):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.LazyLinear(512)
        self.fc2 = nn.Linear(512, num_classes)
    
    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 64 * 14 * 14)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [168]:
import time

def train_with_cross_validation(dataset, num_epochs, model_name, patience=15, random_state=42, n_splits=10):
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    fold_results = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(dataset)):
        print(f'Fold {fold+1}/{n_splits}')
        
        # Split the dataset into training and validation sets
        train_set = Subset(dataset, train_idx)
        val_set = Subset(dataset, val_idx)
        train_loader = DataLoader(train_set, batch_size=16, shuffle=True)
        val_loader = DataLoader(val_set, batch_size=16, shuffle=True)
        
        # Initialize model, optimizer, and loss function
        model = MfccLSTM(input_size=20, hidden_size=32, num_classes=36, output_size=64)
        model = model.to(device)
        optimizer = optim.Adam(model.parameters(), lr=5e-4)
        criterion = nn.CrossEntropyLoss()
        
        best_val_acc, epochs_no_imp = 0, 0
        train_accuracies, val_accuracies = [], []

        for epoch in range(num_epochs):
            model.train()
            epoch_train_loss = 0.0
            correct_train = 0
            total_train = 0
            tic = time.perf_counter()
            
            for images, sequences, labels in train_loader:
                images = images.to(device)
                sequences = sequences.to(device)
                labels = labels.to(device)
                
                optimizer.zero_grad()

                #converting labels to Long to avoid error "not implemented for Int"
                labels = labels.long()
                
                # Forward pass
                outputs = model(images, sequences)
                loss = criterion(outputs, labels)
                epoch_train_loss += loss.item() * images.size(0)

                _, predicted_train = torch.max(outputs.data, 1)
                total_train += labels.size(0)
                correct_train += (predicted_train == labels).sum().item()
                
                # Backward pass
                loss.backward()
                optimizer.step()
            
            toc = time.perf_counter()
            time_taken = toc - tic
            
            epoch_train_loss /= len(train_loader.dataset)
            train_accuracy = correct_train / total_train
            train_accuracies.append(train_accuracy)
            
            # Evaluation of the model
            model.eval()
            total, correct = 0, 0
            for images, sequences, labels in val_loader:
                images = images.to(device)
                sequences = sequences.to(device)
                labels = labels.to(device)

                outputs = model(images, sequences)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
            
            val_accuracy = correct / total
            val_accuracies.append(val_accuracy)
            if (epoch + 1) % 5 == 0:
                print(f"Epoch [{epoch + 1}/{num_epochs}], Train Loss: {epoch_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Val Accuracy: {val_accuracy:.4f}, Iter Time: {time_taken:.2f}s")
                
            if val_accuracy > best_val_acc:
                best_val_acc = val_accuracy
                epochs_no_imp = 0
                best_model_state = model.state_dict()  # Save the best model
            else:
                epochs_no_imp += 1
            if epochs_no_imp >= patience:
                print(f'Early stopping after {epoch+1} epochs')
                model.load_state_dict(best_model_state)  # Load the best model
                break
        
        fold_results.append((epoch+1, best_val_acc))
        print(f'Fold {fold+1} Best Validation Accuracy: {best_val_acc:.4f}')
    torch.save(model.state_dict(), model_name)

    return fold_results

In [169]:
def predict_mfcc(dataset, model_path, device_external, custom=False):
    images_test_set = [t[0] for t in dataset]
    sequences_test_set = [t[1] for t in dataset]
    
    images = torch.stack(images_test_set)
    sequences = torch.stack(sequences_test_set)
    device = torch.device(device_external) #default to mps
    images = images.to(device)
    sequences = sequences.to(device)
    model = MfccLSTM(input_size=20, hidden_size=32, num_classes=36, output_size=64)
    model = model.to(device)
    model.load_state_dict(torch.load(model_path,map_location=device))
    model.eval()
    
    with torch.no_grad():
        outputs = model(images, sequences)
        _, predicted = torch.max(outputs.data, 1)

    pred = []
    keyss = '1234567890QWERTYUIOPASDFGHJKLZXCVBNM'
    if custom: #for using custom audio in project
        keyss = '1234567890ABCDEFGHIJ+-'
    phrase = predicted.tolist()
    for i in range(len(phrase)):
        pred.append(keyss[phrase[i]])

    pred_df = pd.DataFrame(pred)
    return pred_df

In [170]:
def save_csv(model_name, num_epochs, description, accuracy, precision, recall, f1_score):
    csv_file_path = 'model_comparison.csv'
    
    # Read the existing CSV file into a DataFrame
    try:
        df = pd.read_csv(csv_file_path)
    except FileNotFoundError:
        # If the file does not exist, create an empty DataFrame with the correct columns
        df = pd.DataFrame(columns=['Datetime', 'Name', 'Epochs', 'Description', 'Accuracy', 'Precision', 'Recall', 'F1'])
        
    # Data to append
    current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Remove newline characters from the description
    description = description.replace('\n', ' ').replace('\r', ' ')
    
    # Create a new column with the relevant information
    new_data = {
        'Datetime': [current_datetime],
        'Name': [model_name],
        'Epochs': [num_epochs],
        'Description': [description],
        'Accuracy': [accuracy],
        'Precision': [precision],
        'Recall': [recall],
        'F1': [f1_score],
    }
    
    new_df = pd.DataFrame(new_data)
    
    df = pd.concat([df, new_df], ignore_index=True)
    
    # Save the updated DataFrame back to the CSV file
    df.to_csv(csv_file_path, index=False)

# Running with audio from Dataset-for-Binary

In [225]:
# current random state to split the dataset
random_state = 42

# values for current run
train_final_set, test_set = train_test_split(audioDatasetMfcc, test_size=0.2, random_state=random_state)
num_epochs = 100
main_architecture = "CNN_LSTM"
currday = datetime.today().strftime('%Y-%m-%d')
model_name = f"model_multiclass_{num_epochs}_{main_architecture}_{currday}.pth"
description = "2 layer CNN (32 and 64 output channels) with final 2 Dense Layers (512 and num_classes) result concatenated with \n 2 LSTMs (hidden_size=32),  from mfcc with 2 Dense Layers (64 and 16) with a final Lazy Linear layer output of num_classes. Using normalized custom audio: 0-9, A-J, Space and Enter keys. n_fft="+str(n_fft)+", hop_length="+str(hop_length)

# Training part
fold_stats = train_with_cross_validation(train_final_set, num_epochs, model_name, random_state=random_state)
max_val = 0
real_num_epochs = 0
for fold_stat in fold_stats: #using folds instead of LOO
    if fold_stat[1] > max_val:
        max_val = fold_stat[1]
        real_num_epochs = fold_stat[0]


Fold 1/10




Epoch [5/100], Train Loss: 1.7137, Train Accuracy: 0.4087, Val Accuracy: 0.3688, Iter Time: 0.81s
Epoch [10/100], Train Loss: 0.9680, Train Accuracy: 0.6640, Val Accuracy: 0.5319, Iter Time: 0.74s
Epoch [15/100], Train Loss: 0.5845, Train Accuracy: 0.7984, Val Accuracy: 0.5816, Iter Time: 0.76s
Epoch [20/100], Train Loss: 0.3240, Train Accuracy: 0.9028, Val Accuracy: 0.5603, Iter Time: 0.76s
Epoch [25/100], Train Loss: 0.1228, Train Accuracy: 0.9723, Val Accuracy: 0.5887, Iter Time: 0.78s
Epoch [30/100], Train Loss: 0.1996, Train Accuracy: 0.9621, Val Accuracy: 0.5319, Iter Time: 0.76s
Epoch [35/100], Train Loss: 0.0661, Train Accuracy: 0.9913, Val Accuracy: 0.6028, Iter Time: 0.76s
Epoch [40/100], Train Loss: 0.0167, Train Accuracy: 0.9953, Val Accuracy: 0.6241, Iter Time: 0.87s
Early stopping after 41 epochs
Fold 1 Best Validation Accuracy: 0.6383
Fold 2/10
Epoch [5/100], Train Loss: 1.6149, Train Accuracy: 0.4451, Val Accuracy: 0.2979, Iter Time: 0.79s
Epoch [10/100], Train Loss: 0.

In [172]:
# Prediction part
prediction = predict_mfcc(test_set, model_name, device, custom=True) #mark "custom=True" for using custom audio in project
labels_set = [t[2] for t in test_set]
final_labels_set = [keys_s[ind] for ind in labels_set]

# Metrics calculation
accuracy = accuracy_score(final_labels_set, prediction[0])
precision = precision_score(final_labels_set, prediction[0], average='macro')
recall = recall_score(final_labels_set, prediction[0], average='macro')
f1 = sklearn.metrics.f1_score(final_labels_set, prediction[0], average='macro')

# Save in csv file
save_csv(model_name, real_num_epochs, description, accuracy, precision, recall, f1)

# Print results
print("Final Results!")
print(f"Model: {model_name}")
print(description)
print(f"Epochs: {real_num_epochs}")
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

Final Results!
Model: model_multiclass_100_CNN_LSTM_2024-08-28.pth
2 layer CNN (32 and 64 output channels) with final 2 Dense Layers (512 and num_classes) result concatenated with 
 2 LSTMs (hidden_size=32),  from mfcc with 2 Dense Layers (64 and 16) with a final Lazy Linear layer output of num_classes. Using normalized and normalized custom audio: 0-9, A-J, Space and Enter keys. n_fft=10, hop_length=10
Epochs: 50
Accuracy: 0.6216216216216216
Precision: 0.6466321179958767
Recall: 0.6465371362088314
F1 Score: 0.6367186973067964


In [173]:
import csv

def empty_file(csv_file_path):
    # Read the header (first row) of the CSV file
    with open(csv_file_path, 'r') as file:
        reader = csv.reader(file)
        header = next(reader)  # Read the first row (header)
    
    # Write only the header back to the CSV file
    with open(csv_file_path, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(header)  # Wr`ite the header back to the file


In [174]:
# empty_file('model_comparison.csv')