In [5]:
TRAIN_DIR = 'data/dev_data/dev_data/slider/train'
TEST_DIR = 'data/dev_data/dev_data/slider/test'
SAMPLE_RATE = 16000
DOWNSAMPLE_FACTOR = 10
RANDOM_STATE = 42

In [6]:
import os
import pandas as pd
from scipy.io import wavfile
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from scipy.spatial.distance import cosine, euclidean
from sklearn.metrics.pairwise import cosine_similarity
import plotly.graph_objs as go

In [7]:
XTRdic = dict()
for file_name in os.listdir(TRAIN_DIR):
    file_path = os.path.join(TRAIN_DIR, file_name)
    _, audio_data = wavfile.read(file_path)
    id = file_name.split('_')[2]
    if id in XTRdic:
        XTRdic[id].append(np.array(audio_data[::DOWNSAMPLE_FACTOR]))
    else:
        XTRdic[id] = [audio_data[::DOWNSAMPLE_FACTOR]]

In [8]:
XTEdic = dict()
yTEdic = dict()
for file_name in os.listdir(TEST_DIR):
    file_path = os.path.join(TEST_DIR, file_name)
    _, audio_data = wavfile.read(file_path)
    parts = file_name.split('_')
    id = parts[2]
    label = 1 if parts[0] == 'normal' else 0
    if id in XTEdic:
        XTEdic[id].append(np.array(audio_data[::DOWNSAMPLE_FACTOR]))
        yTEdic[id].append(label)
    else:
        XTEdic[id] = [audio_data[::DOWNSAMPLE_FACTOR]]
        yTEdic[id] = [label]

In [9]:
ids = sorted(XTRdic.keys())
for id in ids:
    XTRdic[id] = np.vstack(XTRdic[id])
    XTEdic[id] = np.vstack(XTEdic[id])
    yTEdic[id] = np.array(yTEdic[id])

In [10]:
import librosa

def estimate_pitch(audio_data):
    """
    Estimate pitch from audio data.

    Parameters:
    - audio_data: numpy array containing audio samples.

    Returns:
    - pitches: numpy array containing estimated pitch values.
    """
    # Estimate pitch using librosa
    pitches, magnitudes = librosa.piptrack(y=audio_data, sr=SAMPLE_RATE/DOWNSAMPLE_FACTOR)
    
    return pitches

def pitch_similarity(pitches1, pitches2):
    """
    Compute similarity based on pitch values using Euclidean distance.

    Parameters:
    - pitches1: numpy array containing pitch values of the first audio signal.
    - pitches2: numpy array containing pitch values of the second audio signal.

    Returns:
    - similarity: similarity score based on pitch values.
    """
    # Compute Euclidean distance between pitch values
    distance = np.linalg.norm(pitches1 - pitches2)
    
    # Convert distance to similarity score (e.g., inverse)
    similarity = 1 / (1 + distance)
    
    return similarity

In [11]:
def phase_correlation(signal1, signal2):
    # Compute the complex conjugate of signal1
    conj_signal1 = np.conj(signal1)
    
    # Compute the cross-power spectrum
    cross_power_spectrum = signal2 * conj_signal1
    
    # Compute the phase correlation
    phase_corr = np.abs(np.fft.ifft(cross_power_spectrum))
    
    # Compute the mean or sum of the phase correlation matrix
    similarity = np.mean(phase_corr)  # or np.sum(phase_corr)
    
    return similarity

def cross_correlation(signal1, signal2):
    # Compute the complex conjugate of signal2
    conj_signal2 = np.conj(signal2)
    
    # Compute the cross-correlation
    cross_corr = np.fft.ifft(np.fft.fft(signal1) * conj_signal2)
    
    return np.mean(cross_corr)

In [12]:
import librosa
import numpy as np

def compute_mfcc(audio_data, n_mfcc=13):
    """
    Compute Mel-Frequency Cepstral Coefficients (MFCCs) from audio data.

    Parameters:
    - audio_data: numpy array containing audio samples.
    - n_mfcc: number of MFCC coefficients to compute (default: 13).

    Returns:
    - mfccs: numpy array containing MFCCs.
    """
    # Compute MFCCs
    mfccs = librosa.feature.mfcc(y=audio_data.astype(float), sr=SAMPLE_RATE/DOWNSAMPLE_FACTOR, n_mfcc=n_mfcc)
    
    return mfccs

def mfcc_distance(mfcc1, mfcc2):
    """
    Compute the Euclidean distance between two sets of MFCCs.

    Parameters:
    - mfcc1: numpy array containing MFCCs of the first audio signal.
    - mfcc2: numpy array containing MFCCs of the second audio signal.

    Returns:
    - distance: Euclidean distance between the two sets of MFCCs.
    """
    distance = euclidean(mfcc1.flatten(), mfcc2.flatten())
    return distance

In [13]:
def fft_distance(signal1, signal2):
    """
    Compute the Euclidean distance between two sets of FFTs.

    Parameters:
    - signal1: numpy array containing FFT of the first audio signal.
    - signal2: numpy array containing FFT of the second audio signal.

    Returns:
    - distance: Euclidean distance between the two sets of FFTs.
    """
    distance = euclidean(signal1.flatten(), signal2.flatten())
    return distance

In [15]:
# for id in ids:
#     XTR = XTRdic[id]
#     XTRmean = np.mean(XTR, axis=0)
#     mfcc = compute_mfcc(XTRmean)
#     fft = np.fft.fft(XTRmean)
#     # avg = fft.mean()

#     print(XTRmean.shape)

#     scores = []
#     for i in range(len(XTR)):
#         pc = phase_correlation(XTR[i,:], XTRmean)
#         md = mfcc_distance(compute_mfcc(XTR[i,:]), mfcc)
#         thisfft = np.fft.fft(XTR[i,:])
#         fd = fft_distance(thisfft, fft)
#         # fa = avg - thisfft.mean()
#         peaks, _ = find_peaks(XTR[i,:])
#         scores.append([pc, len(peaks), fd])
#     scores = np.array(scores)
#     print(scores.shape)
    
#     # Convert scores to numpy array for easier manipulation
#     scores = np.array(scores)

#     # Separate the two dimensions
#     x = scores[:, 0]
#     y = scores[:, 1]
#     z = scores[:, 2]
    
#     # Create a trace
#     trace = go.Scatter3d(
#         x=x,
#         y=y,
#         z=z,
#         mode='markers',
#         marker=dict(
#             size=3,
#             color=z,                # Color by z value
#             colorscale='Viridis',   # Set color scale
#             opacity=0.8
#         )
#     )
#     # Create layout
#     layout = go.Layout(
#     margin=dict(l=0, r=0, b=0, t=0),
#     scene=dict(
#             xaxis=dict(title='X'),
#             yaxis=dict(title='Y'),
#             zaxis=dict(title='Z'),
#         )
#     )
#     # Create figure
#     fig = go.Figure(data=[trace], layout=layout)

#     # Show plot
#     fig.show()


#     # Create scatter plot
#     # plt.scatter(score1, score2, c=colors)
#     # plt.xlabel('Score 1')
#     # plt.ylabel('Score 2')
#     # plt.title('Scatter Plot of Scores')
#     # plt.show()

In [16]:
# for id in ids:
#     XTR = np.mean(XTRdic[id], axis=0)
#     mfcc = compute_mfcc(XTR)
#     fft = np.fft.fft(XTR)
#     avg = fft.mean()
#     XTE = XTEdic[id]
#     yTE = yTEdic[id]

#     print(XTR.shape)
#     print(XTE.shape)

#     scores = []
#     for i in range(len(XTE)):
#         # thisfft = np.fft.fft(XTE[i,:])
#         # dist0 = thisfft.T @ thisfft
#         # dist0 = dist0*np.conj(dist0)
#         # fa = thisfft.mean()
#         pc = phase_correlation(XTE[i,:], XTR)
#         # md = mfcc_distance(compute_mfcc(XTE[i,:]), mfcc)
#         thisfft = np.fft.fft(XTE[i,:])
#         fd = fft_distance(thisfft, fft)
#         fa = avg - thisfft.mean()
        
#         peaks, _ = find_peaks(np.abs(np.clip(thisfft-fft, 0, None)), height=40000)
#         scores.append([pc, len(peaks), fd])
#     scores = np.array(scores)
#     print(scores.shape, yTE.shape)
    
#     # Convert scores to numpy array for easier manipulation
#     scores = np.array(scores)

#     # Separate the two dimensions
#     x = scores[:, 0]
#     y = scores[:, 1]
#     z = scores[:, 2]
    
#     # Filter scores based on class
#     scores_red = scores[yTE == 0]
#     scores_blue = scores[yTE == 1]
    
#     # Define colors for the classes (assuming yTE is defined)
#     colors = np.where(yTE == 0, 'red', 'blue')
    
#     # Create a trace
#     trace = go.Scatter3d(
#         x=x,
#         y=y,
#         z=z,
#         mode='markers',
#         marker=dict(
#             size=3,
#             color=colors,                # Color by z value
#             colorscale='Viridis',   # Set color scale
#             opacity=0.8
#         )
#     )
#     # Create layout
#     layout = go.Layout(
#     margin=dict(l=0, r=0, b=0, t=0),
#     scene=dict(
#             xaxis=dict(title='X'),
#             yaxis=dict(title='Y'),
#             zaxis=dict(title='Z'),
#         )
#     )
#     # Create figure
#     fig = go.Figure(data=[trace], layout=layout)

#     # Show plot
#     fig.show()


#     # Create scatter plot
#     # plt.scatter(score1, score2, c=colors)
#     # plt.xlabel('Score 1')
#     # plt.ylabel('Score 2')
#     # plt.title('Scatter Plot of Scores')
#     # plt.show()

In [17]:
XTRfeatures = dict()
XTRmeans = dict()

for id in ids:
    print(f'Processing {id}')
    XTR = XTRdic[id]
    XTRmean = np.mean(XTR, axis=0)
    mfcc = compute_mfcc(XTRmean)
    fft = np.fft.fft(XTRmean)
    avg = fft.mean()

    features = []
    for i in range(len(XTR)):
        pc = phase_correlation(XTR[i,:], XTRmean)
        md = mfcc_distance(compute_mfcc(XTR[i,:]), mfcc)
        thisfft = np.fft.fft(XTR[i,:])
        fd = fft_distance(thisfft, fft)
        fa = avg - thisfft.mean()
        peaks, _ = find_peaks(XTR[i,:])
        features.append([pc, md, fd, fa.real, fa.imag, len(peaks)])
    
    XTRfeatures[id] = np.array(features)
    XTRmeans[id] = {
        'mean': XTRmean,
        'mfcc': mfcc,
        'fft': fft,
        'avgfft-real': avg.real,
        'avgfft-imag': avg.imag
    }

Processing 00
Processing 02
Processing 04


In [18]:
# from sklearn.preprocessing import StandardScaler
# from scipy.stats import multivariate_normal

# scalers = dict()
# gaussians = dict()
# thresholds = dict()

# for id in ids:
#     print(f'Processing {id}')
#     XTR = XTRfeatures[id]
    
#     scaler = StandardScaler()
#     XTRscaled = scaler.fit_transform(XTR)
#     scalers[id] = scaler

#     # Estimate mean and covariance matrix from normalized data
#     mean = np.mean(XTRscaled, axis=0)
#     covariance_matrix = np.cov(XTRscaled, rowvar=False)

#     gaussian_distribution = multivariate_normal(mean=mean, cov=covariance_matrix)
#     gaussians[id] = gaussian_distribution

#     log_likelihoods = gaussian_distribution.logpdf(XTRscaled)
#     log_likelihoods.sort()
#     threshold = log_likelihoods[int(0.1*len(log_likelihoods))]
#     thresholds[id] = threshold

In [19]:
# loglikelihoods = dict()

# for id in ids:
#     print(f'Processing {id}')
#     XTE = XTEdic[id]
#     yTE = yTEdic[id]
    
#     features = []
#     for i in range(len(XTE)):
#         thisfft = np.fft.fft(XTE[i,:])
#         pc = phase_correlation(XTE[i,:], XTRmeans[id]['mean'])
#         md = mfcc_distance(compute_mfcc(XTE[i,:]), XTRmeans[id]['mfcc'])
#         fd = fft_distance(thisfft, XTRmeans[id]['fft'])
#         fa = XTRmeans[id]['avgfft-real'] - thisfft.mean()
#         peaks, _ = find_peaks(XTE[i,:])
#         features.append([pc, md, fd, fa.real, fa.imag, len(peaks)])
    
#     XTEfeatures = np.array(features)

#     # Normalize test data using the same scaler
#     XTEscaled = scalers[id].transform(XTEfeatures)

#     # Compute log-likelihood of the test data
#     log_likelihood = gaussians[id].logpdf(XTEscaled)
#     loglikelihoods[id] = log_likelihood

In [20]:
# from sklearn.metrics import roc_auc_score, accuracy_score, confusion_matrix

# for id in ids:
#     threshold = thresholds[id]
#     yTE = yTEdic[id]
#     colors = np.where(yTE == 0, 'red', 'blue')
#     print(f'Processing {id}')
#     log_likelihood = loglikelihoods[id]
#     # Create a scatter plot
#     plt.scatter(range(len(log_likelihood)), log_likelihood, c=colors)

#     # Add labels and title
#     plt.xlabel('Sample')
#     plt.ylabel('Log-Likelihood')
#     plt.title('Scatter Plot of Log-Likelihoods')

#     # Show the plot
#     plt.show()

#     ypred = np.where(log_likelihood > threshold, 1, 0)
#     accuracy = accuracy_score(yTE, ypred)
#     auc = roc_auc_score(yTE, ypred)
#     conf_mat = confusion_matrix(yTE, ypred)
#     # Extract TN, FP, FN, TP from confusion matrix
#     TN, FP, FN, TP = conf_mat.ravel()
#     # Compute False Positive Rate (FPR)
#     fpr = FP / (FP + TN)
#     # Compute False Negative Rate (FNR)
#     fnr = FN / (FN + TP)

#     print(f'AUC: {auc:.2f}')
#     print(f'Accuracy: {accuracy:.2f}')
#     print(f'FPR: {fpr:.2f}')
#     print(f'FNR: {fnr:.2f}')
#     print('Confusion Matrix:')
#     print(conf_mat)

In [22]:
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score, accuracy_score, confusion_matrix

models = dict()

for id in ids:
    print(f'Processing {id}')
    XTR = XTRfeatures[id]
    scaler = StandardScaler()
    XTRscaled = scaler.fit_transform(XTR)

    XTE = XTEdic[id]
    yTE = yTEdic[id]
    features = []

    for i in range(len(XTE)):
        thisfft = np.fft.fft(XTE[i,:])
        pc = phase_correlation(XTE[i,:], XTRmeans[id]['mean'])
        md = mfcc_distance(compute_mfcc(XTE[i,:]), XTRmeans[id]['mfcc'])
        fd = fft_distance(thisfft, XTRmeans[id]['fft'])
        fa = XTRmeans[id]['avgfft-real'] - thisfft.mean()
        peaks, _ = find_peaks(XTE[i,:])
        features.append([pc, md, fd, fa.real, fa.imag, len(peaks)])
    
    XTEfeatures = np.array(features)
    # Normalize test data using the same scaler
    XTEscaled = scaler.transform(XTEfeatures)
    
    XTR = XTRscaled
    XTE = XTEscaled
    
    best_model, best_auc, best_threshold = None, None, None

    nums_components = [1, 2, 4, 8]
    for n_components in nums_components:
        gmm = GaussianMixture(n_components=n_components, random_state=RANDOM_STATE)
        gmm.fit(XTR)
        log_likelihoods = gmm.score_samples(XTR)
        log_likelihoods.sort()
        threshold = log_likelihoods[int(0.05*len(log_likelihoods))]
        avg_ll = log_likelihoods.mean()

        test_scores = gmm.score_samples(XTE)
        ypred = test_scores > threshold

        accuracy = accuracy_score(yTE, ypred)
        auc = roc_auc_score(yTE, ypred)
        conf_mat = confusion_matrix(yTE, ypred)
        TN, FP, FN, TP = conf_mat.ravel()
        fpr = FP / (FP + TN)
        fnr = FN / (FN + TP)

        print(f'Components: {n_components}, avg_ll: {avg_ll:.2f}, AUC: {auc:.2f}, Accuracy: {accuracy:.2f}, FPR: {fpr:.2f}, FNR: {fnr:.2f}')
        
        if best_auc is None or auc > best_auc:
            best_auc = auc
            best_model = gmm
            threshold = best_threshold

Processing 00
Components: 1, avg_ll: -7.97, AUC: 0.97, Accuracy: 0.99, FPR: 0.00, FNR: 0.05
Components: 2, avg_ll: -7.60, AUC: 0.99, Accuracy: 1.00, FPR: 0.00, FNR: 0.02
Components: 4, avg_ll: -7.18, AUC: 0.97, Accuracy: 0.98, FPR: 0.00, FNR: 0.07
Components: 8, avg_ll: -6.72, AUC: 0.94, Accuracy: 0.97, FPR: 0.00, FNR: 0.13
Processing 02
Components: 1, avg_ll: -7.41, AUC: 0.72, Accuracy: 0.63, FPR: 0.48, FNR: 0.08
Components: 2, avg_ll: -6.87, AUC: 0.86, Accuracy: 0.84, FPR: 0.18, FNR: 0.10
Components: 4, avg_ll: -6.45, AUC: 0.80, Accuracy: 0.77, FPR: 0.28, FNR: 0.11
Components: 8, avg_ll: -5.94, AUC: 0.84, Accuracy: 0.82, FPR: 0.20, FNR: 0.12
Processing 04
Components: 1, avg_ll: -7.96, AUC: 0.77, Accuracy: 0.73, FPR: 0.39, FNR: 0.07
Components: 2, avg_ll: -7.39, AUC: 0.77, Accuracy: 0.72, FPR: 0.42, FNR: 0.05
Components: 4, avg_ll: -7.13, AUC: 0.76, Accuracy: 0.72, FPR: 0.38, FNR: 0.09
Components: 8, avg_ll: -6.83, AUC: 0.75, Accuracy: 0.74, FPR: 0.30, FNR: 0.20
