In [None]:
# Import necessary libraries
import pandas as pd
import numpy as np
import os
from scipy.signal import welch
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
import read_data as rd  # Importing the read_data.py module
from scipy.fft import fft
from scipy.stats import skew, kurtosis

In [None]:
# Function to normalize the data using min-max scaling
def normalize_data(data):
    min_val = np.min(data, axis=0)
    max_val = np.max(data, axis=0)
    normalized = (data - min_val) / (max_val - min_val)
    return normalized

def extract_time_domain_features(signal):
    features = []
    for i in range(signal.shape[1]):
        sig = signal[:, i]
        features.extend([
            np.mean(sig),               # Mean
            np.std(sig),                # Standard Deviation
            skew(sig),                  # Skewness
            kurtosis(sig),              # Kurtosis
            np.max(sig),                # Maximum
            np.min(sig),                # Minimum
            np.ptp(sig),                # Peak-to-Peak
            np.sqrt(np.mean(sig**2)),   # RMS
            np.sum(np.abs(np.diff(np.sign(sig)))) / 2,  # Zero Crossing Rate
            np.sum(sig**2)              # Energy
        ])
    return features

def extract_fft_features(signal, fs, top_n=50):
    N = len(signal)
    T = 1.0 / fs
    yf = fft(signal)
    xf = np.fft.fftfreq(N, T)[:N//2]
    yf = 2.0/N * np.abs(yf[:N//2])
    
    # Get indices of the top_n highest frequencies
    top_indices = np.argsort(yf)[-top_n:]
    
    # Extract the top_n highest FFT features
    fft_features = yf[top_indices]
    return fft_features

def extract_frequency_domain_features(signal, fs):
    features = []
    for i in range(signal.shape[1]):
        sig = signal[:, i]
        freqs, psd = welch(sig, fs)
        
        features.extend([
            np.mean(psd),                   # Mean Power Spectral Density
            np.sum(psd),                    # Total Power
            np.argmax(psd),                 # Peak Frequency
            np.mean(freqs * psd) / np.mean(psd),  # Spectral Centroid
            np.sqrt(np.mean((freqs - np.mean(freqs))**2 * psd)) / np.mean(psd),  # Spectral Bandwidth
            np.percentile(psd, 75) - np.percentile(psd, 25),  # Spectral Contrast
            np.max(freqs[np.cumsum(psd) / np.sum(psd) <= 0.85])  # Spectral Roll-off
        ])
        
    return features

In [None]:
# Load data
dataset_dir = '/home/ecappiell/datasets/full'
data_arrays, labels, class_ids = rd.process_mafaulda_data(dataset_dir)

In [None]:
# Original sampling rate (in Hz)
original_sampling_rate = 50 * 10**3  # 50 kHz

# Target sampling rate (in Hz)
target_sampling_rate = 2 * 10**3  # 2 kHz

# Downsample the data
downsampled_data = rd.downsample_data(data_arrays, original_sampling_rate, target_sampling_rate)

In [None]:
# Normalize the downsampled d ata
normalized_data = np.array([normalize_data(signal) for signal in downsampled_data])

In [None]:
# Extract features for each signal
X = []
for signal in normalized_data:
    time_features = extract_time_domain_features(signal)
    fft_features = [extract_fft_features(signal[:, i], target_sampling_rate) for i in range(signal.shape[1])]
    freq_features = extract_frequency_domain_features(signal, target_sampling_rate)
    signal_features = np.concatenate((time_features, np.hstack(fft_features), freq_features))
    X.append(signal_features)
X = np.array(X)

In [None]:
# Assuming labels and class_ids are prepared for classification
y = np.array(class_ids)

In [None]:
# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

In [None]:
# Normalize the feature matrix using StandardScaler
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.metrics import classification_report

classifiers = [
    KNeighborsClassifier(3),
    SVC(kernel="linear", C=0.025,probability=True),
    SVC(gamma=2, C=1, probability=True),
    GaussianProcessClassifier(),
    DecisionTreeClassifier(max_depth=5),
    RandomForestClassifier(max_depth=5, n_estimators=10, max_features=1),
    AdaBoostClassifier(),
    GaussianNB(),
    QuadraticDiscriminantAnalysis()]

classifier_names = [
    "Nearest Neighbors",
    "Linear SVM",
    "RBF SVM",
    "Gaussian Process",
    "Decision Tree",
    "Random Forest",
    "AdaBoost",
    "Naive Bayes",
    "QDA"]

# Train and evaluate each classifier
for name, clf in zip(classifier_names, classifiers):
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    print(f"Classifier: {name}")
    print(classification_report(y_test, y_pred))
    print("Confusion Matrix:")
    print(confusion_matrix(y_test, y_pred))
    print("="*50)

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout

# Train each classifier separately
predictions = []
for clf in classifiers:
    clf.fit(X_train, y_train)
    y_pred = clf.predict_proba(X_test)[:, 1]  # Use probabilities as predictions
    predictions.append(y_pred)

# Convert predictions to numpy array
predictions = np.array(predictions)

# Convert labels to one-hot encoding
num_classes = len(np.unique(y_train))
y_train_one_hot = tf.keras.utils.to_categorical(y_train, num_classes)
y_test_one_hot = tf.keras.utils.to_categorical(y_test, num_classes)

# Create a neural network for ensemble learning
ensemble_model = Sequential([
    Dense(64, activation='relu', input_shape=(len(classifiers),)),
    Dropout(0.5),
    Dense(64, activation='relu'),
    Dropout(0.5),
    Dense(num_classes, activation='softmax')  # Use softmax for multiclass classification
])

# Compile the model
ensemble_model.compile(optimizer='adam',
                       loss='categorical_crossentropy',
                       metrics=['accuracy'])

# Train the model on the combined predictions
ensemble_model.fit(predictions.T, y_train_one_hot, epochs=10, batch_size=32, validation_split=0.2)

# Evaluate the ensemble model
ensemble_predictions = ensemble_model.predict(predictions.T)
ensemble_predictions = np.argmax(ensemble_predictions, axis=1)
print("Ensemble Model Performance:")
print(classification_report(y_test, ensemble_predictions))
print("Confusion Matrix:")
print(confusion_matrix(y_test, ensemble_predictions))
