In [89]:
import numpy as np
import src.load_data as load_data
import src.feature_extraction as feature_extraction

# Load data

In [None]:
# Cough recordings, batch 1: load every positive segment found under the folder.
positive_sample_folder_batch1 = "data/raw/positive/coughing"
positive_segments_batch1 = load_data.load_positive_data(
    positive_sample_folder_batch1
)

# Inspect the first segment. Per the original author's note this is the first
# cough in mix2_cough_train: 13363 samples, lasting 0.835 s.
print(positive_segments_batch1[0])
print(f"Number of positive samples from coughing folder is {len(positive_segments_batch1)}")

# Summary statistics for the whole batch (left as the cell's last expression
# so any return value is still displayed).
load_data.get_segment_statistics(positive_segments_batch1)

In [None]:
# Cough recordings, batch 2: loaded the same way as batch 1.
positive_sample_folder_batch2 = "data/raw/positive/coughing_batch_2"
positive_segments_batch2 = load_data.load_positive_data(positive_sample_folder_batch2)

# Inspect the first segment of this batch.
# NOTE(review): the comment previously attached to this line ("first cough in
# mix2_cough_train, 13363 samples, 0.835s") was copied from the batch-1 cell and
# has not been confirmed for this folder.
print(positive_segments_batch2[0])

# Name the actual folder so this count cannot be mistaken for the batch-1 count.
print(f"Number of positive samples from coughing_batch_2 folder is {len(positive_segments_batch2)}")
load_data.get_segment_statistics(positive_segments_batch2)

In [None]:
# Negative segments taken from the batch-1 *cough* recordings folder.
# NOTE(review): presumably load_negative_data samples the non-cough regions of
# those recordings -- confirm against src/load_data.
# These segments are only inspected here; the combined `negative_segments`
# list built later in this notebook does not include them.
negative_segments_batch1 = load_data.load_negative_data(
    positive_sample_folder_batch1
)

print(negative_segments_batch1[0])
print(f"Number of negative samples from coughing folder batch1 is {len(negative_segments_batch1)}")

load_data.get_segment_statistics(negative_segments_batch1)

In [None]:
# Negative segments taken from the batch-2 *cough* recordings folder, with an
# explicit per-file segment count and seed.
# NOTE(review): presumably load_negative_data samples the non-cough regions of
# those recordings -- confirm against src/load_data.
# These segments are only inspected here; the combined `negative_segments`
# list built later in this notebook does not include them.
negative_segments_batch2 = load_data.load_negative_data(
    positive_sample_folder_batch2, segments_per_file=2, seed=99
)
print(negative_segments_batch2[0])

# The message used to say "positive samples from coughing folder" although the
# list holds negative segments; it now mirrors the batch-1 wording.
print(f"Number of negative samples from coughing folder batch2 is {len(negative_segments_batch2)}")
load_data.get_segment_statistics(negative_segments_batch2)

In [None]:
# Negative class, source 1: laughter recordings.
negative_sample_folder_laugh = "data/raw/negative/laugh"

# The seed is fixed so that re-running the cell requests the same sampling.
negative_segments_laugh = load_data.load_negative_data(
    negative_sample_folder_laugh,
    segments_per_file=45,
    seed=42,
)

print(f"Number of negative samples from laugh folder is {len(negative_segments_laugh)}")
load_data.get_segment_statistics(negative_segments_laugh)

In [None]:
# Negative class, source 2: microphone-tapping recordings (studio subfolder).
negative_sample_folder_mic_tap = "data/raw/negative/mic_tapping/studio"

# Uses its own per-file segment count and seed, distinct from the other sources.
negative_segments_mic_tap = load_data.load_negative_data(
    negative_sample_folder_mic_tap,
    segments_per_file=40,
    seed=123,
)

print(f"Number of negative samples from mic tapping folder is {len(negative_segments_mic_tap)}")
load_data.get_segment_statistics(negative_segments_mic_tap)

In [None]:
# Negative class, source 3: recordings of people talking.
negative_sample_folder_people_talk = "data/raw/negative/people_talking"

# Uses its own per-file segment count and seed, distinct from the other sources.
negative_segments_people_talk = load_data.load_negative_data(
    negative_sample_folder_people_talk,
    segments_per_file=35,
    seed=1,
)

print(f"Number of negative samples from people talk folder is {len(negative_segments_people_talk)}")
load_data.get_segment_statistics(negative_segments_people_talk)

In [None]:
# Final training pools.
# Positives: both cough batches.
# Negatives: laugh + mic tapping + people talking. The negative segments drawn
# from the cough recordings (negative_segments_batch1 / _batch2) are not included.
# NOTE(review): `+` is used as concatenation, which assumes the loaders return
# Python lists -- confirm against src/load_data.
positive_segments = positive_segments_batch1 + positive_segments_batch2
negative_segments = negative_segments_laugh + negative_segments_mic_tap + negative_segments_people_talk

# Sanity check on one arbitrary element (index 10) of each pool.
print(f"type of a positive segment: {type(positive_segments[10])}, and its shape is {positive_segments[10].shape}")
# Fixed the "negetive" typo in the printed message.
print(f"type of a negative segment: {type(negative_segments[10])}, and its shape is {negative_segments[10].shape}")


# Feature extraction

In [98]:
# Per-segment features for the batch-1 cough segments; in this notebook they
# are only used by the plotting cells. sr=16000 is passed to both extractors.
positive_features_mel = [
    feature_extraction.extract_mel_spectrogram(cough_segment, sr=16000)
    for cough_segment in positive_segments_batch1
]

positive_features_mfcc = [
    feature_extraction.extract_mfcc(cough_segment, sr=16000)
    for cough_segment in positive_segments_batch1
]

In [None]:
# Per-segment features for the laugh segments; in this notebook they are only
# used by the plotting cells. sr=16000 is passed to both extractors.
negative_features_mel = [
    feature_extraction.extract_mel_spectrogram(laugh_segment, sr=16000)
    for laugh_segment in negative_segments_laugh
]

negative_features_mfcc = [
    feature_extraction.extract_mfcc(laugh_segment, sr=16000)
    for laugh_segment in negative_segments_laugh
]

In [None]:
# Visualise one cough example (the second batch-1 segment): spectrogram plot
# first, then the MFCC plot of the same segment.
positive_example_index = 1
feature_extraction.plot_log_spectrogram(positive_features_mel[positive_example_index])
feature_extraction.plot_mfccs(positive_features_mfcc[positive_example_index])

In [None]:
# Visualise one laugh example (index 10) for comparison with the cough plots:
# spectrogram plot first, then the MFCC plot of the same segment.
negative_example_index = 10
feature_extraction.plot_log_spectrogram(negative_features_mel[negative_example_index])
feature_extraction.plot_mfccs(negative_features_mfcc[negative_example_index])

# Train, val, test splitting

In [None]:
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler

# Build the design matrix: one aggregated-MFCC feature row per segment.
X_positive = feature_extraction.extract_agg_mfcc(positive_segments)
X_negative = feature_extraction.extract_agg_mfcc(negative_segments)

# Create labels: 1 for cough, 0 for not cough
y_positive = np.ones(len(X_positive))
y_negative = np.zeros(len(X_negative))

# Combine the data (positives first, then negatives, matching the label order)
X = np.vstack((X_positive, X_negative))
y = np.concatenate((y_positive, y_negative))

# Split the *unscaled* data into Training(70%), Validation(15%), and Test(15%).
# Splitting happens before scaling so that the scaler never sees held-out rows.
# The row assignment is the same as before: the inputs have the same length,
# labels and random_state as when the scaled matrix was split.
X_train_raw, X_temp_raw, y_train, y_temp = train_test_split(
    X, y, test_size=0.3, stratify=y, random_state=42
)

X_val_raw, X_test_raw, y_val, y_test = train_test_split(
    X_temp_raw, y_temp, test_size=0.5, stratify=y_temp, random_state=42
)

# Feature Scaling: fit on the training split only, then apply the same
# transform everywhere. Previously the scaler was fitted on all rows, leaking
# validation/test statistics into training and into the saved scaler.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_val = scaler.transform(X_val_raw)
X_test = scaler.transform(X_test_raw)

# Names defined by the earlier version of this cell, kept so that any other
# cell referring to them keeps working; both use the train-fitted scaler.
X_temp = scaler.transform(X_temp_raw)
X_scaled = scaler.transform(X)

print(f"Training set size: {X_train.shape[0]}")
print(f"Validation set size: {X_val.shape[0]}")
print(f"Test set size: {X_test.shape[0]}")

# Train, evaluate and save model

In [None]:
from sklearn.svm import SVC

# Train SVM with GridSearch.
# The grid is a list of per-kernel sub-grids: `gamma` has no effect on the
# linear kernel, so crossing it with 'linear' only repeated identical fits
# (12 candidates before, 9 now). When the linear kernel wins, `best_params_`
# has no 'gamma' entry and the estimator keeps SVC's default gamma.
param_grid = [
    {'kernel': ['linear'], 'C': [0.1, 1, 10]},
    {'kernel': ['rbf'], 'C': [0.1, 1, 10], 'gamma': ['scale', 'auto']},
]

# probability=True so the selected model exposes predict_proba, which the
# inference cells use for the confidence column.
grid_search = GridSearchCV(
    SVC(probability=True, random_state=42),
    param_grid,
    cv=5,            # 5-fold cross-validation on the training split
    scoring='f1',    # F1 of the positive (cough = 1) class
    n_jobs=-1        # use all available cores
)

grid_search.fit(X_train, y_train)

print(f"Best parameters: {grid_search.best_params_}")
print(f"Best cross-validation score: {grid_search.best_score_}")

# Use the Best Estimator (refitted by GridSearchCV on the full training split)
best_svm = grid_search.best_estimator_

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

# Validation split: per-class precision / recall / F1.
y_val_pred = best_svm.predict(X_val)
print(
    "Validation Classification Report:",
    classification_report(y_val, y_val_pred),
    sep="\n",
)

# Test split: the same report, followed by the confusion matrix.
y_test_pred = best_svm.predict(X_test)
print(
    "Test Classification Report:",
    classification_report(y_test, y_test_pred),
    sep="\n",
)
print(
    "Confusion Matrix:",
    confusion_matrix(y_test, y_test_pred),
    sep="\n",
)

In [None]:
from pathlib import Path

import joblib

# Paths to save model and scaler
SCALER_PATH = 'model/svm/scaler.pkl'
MODEL_PATH = 'model/svm/svm_model.pkl'

# joblib.dump does not create missing directories, so on a fresh checkout the
# dump would fail; create the parent folders first.
for artifact_path in (SCALER_PATH, MODEL_PATH):
    Path(artifact_path).parent.mkdir(parents=True, exist_ok=True)

# Persist the fitted scaler together with the model: inference must apply the
# same scaling that the model was trained with.
joblib.dump(scaler, SCALER_PATH)
joblib.dump(best_svm, MODEL_PATH)

# Inference

In [109]:
# Load the trained Model and Scaler
# This cell redeclares its paths so the inference section can start from here;
# it therefore imports joblib itself instead of relying on the save cell
# having been run in the same kernel session.
import joblib

# Paths to saved model and scaler
SCALER_PATH = 'model/svm/scaler.pkl'
MODEL_PATH = 'model/svm/svm_model.pkl'

# Load the trained SVM model and scaler.
# NOTE: joblib.load unpickles arbitrary Python objects; only load artifacts
# produced by this notebook or another trusted source.
svm_model = joblib.load(MODEL_PATH)
scaler = joblib.load(SCALER_PATH)


In [None]:
# Cut the new recording into fixed-duration segments (segment_duration=0.1).
cough_file1_path = "data/raw/test/mix2_cough_train/data.wav"
cough_file1_segments = load_data.load_and_segment_wav(
    cough_file1_path,
    segment_duration=0.1,
)

# Quick check of what the loader produced.
print(f"Total Segments: {len(cough_file1_segments)}")
print(f"Shape of first segment: {cough_file1_segments[0].shape}")

# Same aggregated-MFCC features as in training, one row per segment.
# Note: X and X_scaled now hold the inference data, replacing the training
# matrices of the same name.
X = feature_extraction.extract_agg_mfcc(cough_file1_segments)
print(f"Feature Matrix Shape: {X.shape}")

# Apply the scaler loaded alongside the model (transform only, no refit).
X_scaled = scaler.transform(X)

# Hard labels per segment (1 = cough, 0 = not cough).
y_pred = svm_model.predict(X_scaled)

# Class probabilities; column 1 is the probability of 'Cough' (label 1).
# For an SVC trained with probability=True these can occasionally disagree
# with predict() -- see the scikit-learn SVC documentation.
class_probabilities = svm_model.predict_proba(X_scaled)
y_pred_probs = class_probabilities[:, 1]


In [111]:
import pandas as pd

# Timeline parameters.
# NOTE(review): sr and segment_duration are repeated here by hand; they are
# assumed to match the recording's sample rate and the segment_duration used
# when the file was segmented -- confirm.
sr = 16000
segment_duration = 0.1
hop_length = segment_duration * sr  # samples between consecutive segment starts

# Map predictions back to the audio timeline: segment i starts at
# i * hop_length / sr seconds, and every row reports the nominal duration.
segments = cough_file1_segments
num_segments = len(segments)
segment_times = [
    (segment_index * hop_length / sr, segment_duration)
    for segment_index in range(num_segments)
]

# One row per segment: start time, length, predicted label and confidence.
results_df = pd.DataFrame(
    segment_times,
    columns=['Time(Seconds)', 'Length(Seconds)'],
)
results_df['Label(string)'] = np.where(y_pred == 1, "cough", "not cough")
results_df['Confidence(double)'] = y_pred_probs

# Only the segments labelled as cough are written to the label file.
is_cough = results_df['Label(string)'] == 'cough'
cough_df = results_df[is_cough]
cough_df.to_csv("data/raw/test/mix2_cough_train/testlabel.csv", index=False)
