## Install and Import Libraries

In [1]:
!pip install librosa
!pip install torch



In [2]:
import os
import pandas as pd
import s3fs
import zipfile
import torch
from torch.utils.data import Dataset
import numpy as np
import matplotlib.pyplot as plt
import librosa
from scipy import signal
from tqdm import tqdm
import numpy as np

from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error,mean_absolute_error
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KNeighborsRegressor
from torch.utils.data import DataLoader

In [3]:
# Root folder of the preprocessed recordings; the per-subject sub-folders
# (Human1, Human2) are appended to this path when the arrays are loaded.
DATASET_PATH = 'data/LivingRoom_preprocessed_hack'

## Read Data

In [4]:
# Load, for each subject, the target coordinates (centroid.npy) and the
# multi-channel recordings (deconvoled_trim.npy) from its own sub-folder.
centroids_h1 = np.load("/".join([DATASET_PATH, "Human1", "centroid.npy"]))
deconvoled_trim_h1 = np.load("/".join([DATASET_PATH, "Human1", "deconvoled_trim.npy"]))

centroids_h2 = np.load("/".join([DATASET_PATH, "Human2", "centroid.npy"]))
deconvoled_trim_h2 = np.load("/".join([DATASET_PATH, "Human2", "deconvoled_trim.npy"]))

In [5]:
# Sanity-check the loaded arrays: one labelled shape line per array, then the
# bare centroid shapes again, and finally the Human 1 centroids themselves.
shape_report = (
    ("Deconvoled Trim Human 1 Shape: ", deconvoled_trim_h1),
    ("Deconvoled Trim Human 2 Shape: ", deconvoled_trim_h2),
    ("Centroids Huma 1 Shape: ", centroids_h1),
    ("Centroids Human 2 Shape: ", centroids_h2),
)
for caption, loaded_array in shape_report:
    print(caption, loaded_array.shape)

for centroid_array in (centroids_h1, centroids_h2):
    print(centroid_array.shape)

print(centroids_h1)

Deconvoled Trim Human 1 Shape:  (1000, 4, 667200)
Deconvoled Trim Human 2 Shape:  (104, 4, 667200)
Centroids Huma 1 Shape:  (1000, 2)
Centroids Human 2 Shape:  (104, 2)
(1000, 2)
(104, 2)
[[-3231.3293467  -1127.87771457]
 [-3198.54107875  -744.5100656 ]
 [-3192.9776274   -248.26678827]
 ...
 [-1717.89923578 -3166.59648491]
 [-1808.60337549 -2779.13038427]
 [   44.43741322   106.48353609]]


## Preprocess Data - Human 1

In [8]:
# Feature extraction for Human 1.
#
# For every recording (axis 0) and every channel (axis 1) of deconvoled_trim_h1
# three features are computed from the raw channel signal:
#   * MFCC              -> librosa.feature.mfcc(..., n_mfcc=13)
#   * RMS               -> one scalar per channel: sqrt(mean(signal ** 2))
#   * zero-crossing rate -> librosa.feature.zero_crossing_rate(...)
#
# An earlier version of this cell also median-filtered and peak-normalised each
# channel, but none of the features ever read those results, so the (expensive)
# filtering only cost run time. It has been removed; the extracted features are
# unchanged.
sampling_rate = 44100  # only referenced by the (disabled) resampling step
preprocessed_data_mfcc = []
preprocessed_data_rms = []
preprocessed_data_zcr = []

# Wrap the outer loop with tqdm for progress visualization
for instance_index in tqdm(range(deconvoled_trim_h1.shape[0])):
    instance_data_mfcc = []
    instance_data_zcr = []
    instance_data_rms = []
    for channel_index in range(deconvoled_trim_h1.shape[1]):
        # Slice the channel once instead of re-indexing for every feature.
        channel_signal = deconvoled_trim_h1[instance_index, channel_index, :]

        # MFCC features
        # NOTE(review): sr=16000 is passed although the signal is never
        # resampled and `sampling_rate` above is 44100 -- confirm which rate
        # the recordings actually use before relying on these coefficients.
        mfcc_features = librosa.feature.mfcc(y=channel_signal, sr=16000, n_mfcc=13)

        # RMS features: a single energy value for the whole channel
        rms_features = np.sqrt(np.mean(channel_signal**2))

        # Zero-Crossing Rate
        zero_crossing_rate = librosa.feature.zero_crossing_rate(y=channel_signal)

        instance_data_mfcc.append(mfcc_features)
        instance_data_zcr.append(zero_crossing_rate)
        instance_data_rms.append(rms_features)

    preprocessed_data_mfcc.append(instance_data_mfcc)
    preprocessed_data_rms.append(instance_data_rms)
    preprocessed_data_zcr.append(instance_data_zcr)

# Stack the per-recording lists into arrays indexed by [recording, channel, ...].
preprocessed_data_mfcc_h1 = np.array(preprocessed_data_mfcc)
preprocessed_data_rms_h1 = np.array(preprocessed_data_rms)
preprocessed_data_zcr_h1 = np.array(preprocessed_data_zcr)


100%|██████████| 1000/1000 [13:09<00:00,  1.27it/s]


## Preprocess Data - Human 2

In [9]:
# Feature extraction for Human 2.
#
# For every recording (axis 0) and every channel (axis 1) of deconvoled_trim_h2
# three features are computed from the raw channel signal:
#   * MFCC              -> librosa.feature.mfcc(..., n_mfcc=13)
#   * RMS               -> one scalar per channel: sqrt(mean(signal ** 2))
#   * zero-crossing rate -> librosa.feature.zero_crossing_rate(...)
#
# An earlier version of this cell also median-filtered and peak-normalised each
# channel, but none of the features ever read those results, so the (expensive)
# filtering only cost run time. It has been removed; the extracted features are
# unchanged.
sampling_rate = 44100  # only referenced by the (disabled) resampling step
preprocessed_data_mfcc = []
preprocessed_data_rms = []
preprocessed_data_zcr = []

# Wrap the outer loop with tqdm for progress visualization
for instance_index in tqdm(range(deconvoled_trim_h2.shape[0])):
    instance_data_mfcc = []
    instance_data_zcr = []
    instance_data_rms = []
    for channel_index in range(deconvoled_trim_h2.shape[1]):
        # Slice the channel once instead of re-indexing for every feature.
        channel_signal = deconvoled_trim_h2[instance_index, channel_index, :]

        # MFCC features
        # NOTE(review): sr=16000 is passed although the signal is never
        # resampled and `sampling_rate` above is 44100 -- confirm which rate
        # the recordings actually use before relying on these coefficients.
        mfcc_features = librosa.feature.mfcc(y=channel_signal, sr=16000, n_mfcc=13)

        # RMS features: a single energy value for the whole channel
        rms_features = np.sqrt(np.mean(channel_signal**2))

        # Zero-Crossing Rate
        zero_crossing_rate = librosa.feature.zero_crossing_rate(y=channel_signal)

        instance_data_mfcc.append(mfcc_features)
        instance_data_zcr.append(zero_crossing_rate)
        instance_data_rms.append(rms_features)

    preprocessed_data_mfcc.append(instance_data_mfcc)
    preprocessed_data_rms.append(instance_data_rms)
    preprocessed_data_zcr.append(instance_data_zcr)

# Stack the per-recording lists into arrays indexed by [recording, channel, ...].
preprocessed_data_mfcc_h2 = np.array(preprocessed_data_mfcc)
preprocessed_data_rms_h2 = np.array(preprocessed_data_rms)
preprocessed_data_zcr_h2 = np.array(preprocessed_data_zcr)


100%|██████████| 104/104 [01:13<00:00,  1.41it/s]


## Model

In [10]:
class CustomDataset(Dataset):
    """Pairs one family of per-recording audio features with target coordinates.

    The feature family returned by ``__getitem__`` is selected by ``_type``:

    * ``"rms"``  -- ``preprocessed_data_rms[idx]`` as a float32 tensor.
    * ``"mfcc"`` -- a list holding one float32 tensor per channel of
      ``preprocessed_data_mfcc[idx]``.
    * ``"zcr"``  -- ``preprocessed_data_zcr[idx]`` as a float32 tensor, or
      ``None`` when no ZCR data was supplied.

    Args:
        coordinates: Indexable collection of targets; its length defines the
            dataset length.
        preprocessed_data_mfcc: MFCC features indexed by recording, then channel.
        _type: One of ``"rms"``, ``"mfcc"`` or ``"zcr"``.
        preprocessed_data_rms: RMS features indexed by recording.
        preprocessed_data_zcr: Zero-crossing-rate features indexed by recording.
    """

    def __init__(self, coordinates, preprocessed_data_mfcc=None, _type="rms", preprocessed_data_rms=None, preprocessed_data_zcr=None):
        # None sentinels replace the former mutable ``[]`` defaults; an omitted
        # feature set is still stored as an empty list, exactly as before.
        self.preprocessed_data_mfcc = [] if preprocessed_data_mfcc is None else preprocessed_data_mfcc
        self.preprocessed_data_rms = [] if preprocessed_data_rms is None else preprocessed_data_rms
        self.preprocessed_data_zcr = [] if preprocessed_data_zcr is None else preprocessed_data_zcr
        self.coordinates = coordinates
        self.type = _type

    def __len__(self):
        """Return the number of samples, i.e. the number of coordinate rows."""
        return len(self.coordinates)

    def __getitem__(self, idx):
        """Return ``(features, coordinates)`` for sample ``idx``.

        For an unrecognised ``_type`` a message is printed and ``(None, None)``
        is returned.
        """
        coordinates = torch.tensor(self.coordinates[idx], dtype=torch.float32)
        if self.type == "rms":
            rms = torch.tensor(self.preprocessed_data_rms[idx], dtype=torch.float32)
            return rms, coordinates
        elif self.type == "mfcc":
            # Iterate over the channels actually present instead of assuming
            # exactly four microphones and tuple-indexable storage.
            mfcc = [
                torch.tensor(channel_features, dtype=torch.float32)
                for channel_features in self.preprocessed_data_mfcc[idx]
            ]
            return mfcc, coordinates
        elif self.type == "zcr":
            # Use len() rather than truthiness: ``if array`` raises ValueError
            # for a NumPy array holding more than one element.
            if len(self.preprocessed_data_zcr) > 0:
                zcr = torch.tensor(self.preprocessed_data_zcr[idx], dtype=torch.float32)
            else:
                zcr = None
            return zcr, coordinates
        else:
            print("Error type")
            return None, None

In [11]:
# Wrap the RMS features of each subject together with its centroids
# (the dataset's default feature type is "rms").
dataset_h1 = CustomDataset(coordinates=centroids_h1, preprocessed_data_rms=preprocessed_data_rms_h1)
dataset_h2 = CustomDataset(coordinates=centroids_h2, preprocessed_data_rms=preprocessed_data_rms_h2)

# Inspect the second Human 1 sample: the number of feature entries, the entry
# of each of the four microphones, and the target coordinates.
features_list, coordinates = dataset_h1[1]
print("Features List Length:", len(features_list))
for mic_number in range(1, 5):
    print("Features Shape (Microphone {}):".format(mic_number), features_list[mic_number - 1])
print("Coordinates:", coordinates)

Features List Length: 4
Features Shape (Microphone 1): tensor(0.0005)
Features Shape (Microphone 2): tensor(0.0005)
Features Shape (Microphone 3): tensor(0.0007)
Features Shape (Microphone 4): tensor(0.0004)
Coordinates: tensor([-3198.5410,  -744.5101])


## KNN RMS

In [12]:
# Mini-batch loader over the Human 1 dataset; shuffle=True reshuffles the
# samples at every epoch.
batch_size = 64
train_loader = DataLoader(dataset_h1, batch_size=batch_size, shuffle=True)

In [13]:
def euclidean_distance(pred_coords, true_coords):
    """Return the Euclidean (L2) distance between two coordinate vectors.

    Args:
        pred_coords: Predicted coordinates, either a NumPy array or an object
            exposing ``.numpy()`` (e.g. a torch tensor).
        true_coords: Ground-truth coordinates, same accepted types.

    Returns:
        ``sqrt(sum((pred_coords - true_coords) ** 2))`` as a NumPy scalar.
    """
    if not isinstance(pred_coords, np.ndarray):
        pred_coords = pred_coords.numpy()
    if not isinstance(true_coords, np.ndarray):
        # Convert the ground truth itself. This branch used to assign to
        # ``pred_coords``, discarding the prediction and leaving ``true_coords``
        # unconverted.
        true_coords = true_coords.numpy()
    return np.sqrt(np.sum((pred_coords - true_coords) ** 2))


def custom_scoring(estimator, X, y):
    """Scorer callable: negated mean Euclidean distance between prediction and target.

    This is a mean distance, not a mean squared error. The sign is flipped so
    that a larger score means a smaller error.

    Args:
        estimator: Fitted object exposing ``predict(X)``.
        X: Feature matrix handed to ``estimator.predict``.
        y: True coordinates, one row per sample (2-D, same shape as the
            prediction).

    Returns:
        ``-mean(per-sample Euclidean distance)``.
    """
    pred_coords = np.asarray(estimator.predict(X))
    true_coords = np.asarray(y)

    # Per-row Euclidean distance, computed for all samples at once.
    per_sample_distance = np.sqrt(np.sum((pred_coords - true_coords) ** 2, axis=1))

    return -np.mean(per_sample_distance)

In [14]:
# Candidate neighbourhood sizes k = 1 .. 39 for the KNN regressor.
param_grid = {'n_neighbors': np.arange(1, 40)}

# 5-fold cross-validated search, ranked by custom_scoring
# (the negated mean Euclidean distance).
grid_search = GridSearchCV(
    estimator=KNeighborsRegressor(),
    param_grid=param_grid,
    scoring=custom_scoring,
    cv=5,
)

# Fit on the Human 1 RMS features against the Human 1 centroids.
grid_search.fit(dataset_h1.preprocessed_data_rms, dataset_h1.coordinates)

# Report the selected k and its cross-validated mean distance; the score is
# negated back so the printed number is a positive error.
print("GridSearchCV Best Parameters:", grid_search.best_params_['n_neighbors'])
print("GridSearchCV Best Scoring:", -grid_search.best_score_)

GridSearchCV Best Parameters: 5
GridSearchCV Best Scoring: 1414.7135232311489


In [15]:
# Refit a KNN regressor on all Human 1 samples with the k chosen by the grid
# search, then predict the Human 2 coordinates.
knn_model = KNeighborsRegressor(n_neighbors=grid_search.best_params_['n_neighbors'])
knn_model.fit(dataset_h1.preprocessed_data_rms, dataset_h1.coordinates)
predicted_coords = knn_model.predict(dataset_h2.preprocessed_data_rms)

# Aggregate regression metrics against the Human 2 centroids.
mse = mean_squared_error(centroids_h2, predicted_coords)
mae = mean_absolute_error(centroids_h2, predicted_coords)
print("Mean Squared Error:", mse)
print("Mean Absolute Error:", mae)

# Per-sample localization error: Euclidean distance between each predicted
# and true position, summarised as mean (standard deviation).
errors = np.array([
    euclidean_distance(predicted, actual)
    for predicted, actual in zip(predicted_coords, dataset_h2.coordinates)
])
mean_error = errors.mean()
stdev_error = errors.std()

print("Localization Error: {:.2f} ({:.2f})".format(mean_error, stdev_error))
# Value returned by the regressor's own score() method on the Human 2 data.
print("Score: ", knn_model.score(dataset_h2.preprocessed_data_rms, dataset_h2.coordinates))

Mean Squared Error: 1086286.9795039732
Mean Absolute Error: 785.8125857797194
Localization Error: 1230.26 (811.81)
Score:  0.14771953741184252
