In [57]:
from pydub import AudioSegment
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
import random
from sklearn.metrics import confusion_matrix

In [42]:
import torch
import torchaudio
from torch.utils.data import DataLoader, Dataset
from torch import nn
import torch.optim as optim
import torch.nn.functional as F
from pydub import AudioSegment
from sklearn.model_selection import train_test_split

In [2]:
import os
result = os.environ['PATH']
os.environ['PATH'] += os.pathsep + 'C:\\Program Files\\ffmpeg-2024-09-22-git-a577d313b2-full_build\\bin'

In [3]:
# load audio files 
path_to_jayesh_drinking = "DrinkingAudio\\JayeshDrinking\\"
path_to_jayesh_not_drinking = "DrinkingAudio\\JayeshDrinking\\SilentRecording\\"
path_to_elvis_drinking = "DrinkingAudio\\ElvisDrinking\\"
path_to_elvis_not_drinking = "DrinkingAudio\\ElvisDrinking\\SilentRecording\\"

In [4]:
# load jayesh m4a files 
jayesh_drinking_files = [f for f in os.listdir(path_to_jayesh_drinking) if f.endswith('.m4a')]
jayesh_not_drinking_files = [f for f in os.listdir(path_to_jayesh_not_drinking) if f.endswith('.m4a')]

# load elvis mp4 files
elvis_drinking_files = [f for f in os.listdir(path_to_elvis_drinking) if f.endswith('.mp4')]
elvis_not_drinking_files = [f for f in os.listdir(path_to_elvis_not_drinking) if f.endswith('.mp4')]

In [19]:
# create function to load audio files using pydub
def load_audio_file(file_path):
    audio = AudioSegment.from_file(file_path)
    audio = audio.set_frame_rate(16000)
    audio = audio.set_channels(1)
    return audio

# load audio files
jayesh_drinking_waveforms = [load_audio_file(path_to_jayesh_drinking + f)[0] for f in jayesh_drinking_files]
jayesh_not_drinking_waveforms = [load_audio_file(path_to_jayesh_not_drinking + f)[0] for f in jayesh_not_drinking_files]
elvis_drinking_waveforms = [load_audio_file(path_to_elvis_drinking + f)[0] for f in elvis_drinking_files]
elvis_not_drinking_waveforms = [load_audio_file(path_to_elvis_not_drinking + f)[0] for f in elvis_not_drinking_files]

In [12]:
# count number of samples\
print("Jayesh Drinking: ", len(jayesh_drinking_waveforms))
print("Jayesh Not Drinking: ", len(jayesh_not_drinking_waveforms))
print("Elvis Drinking: ", len(elvis_drinking_waveforms))
print("Elvis Not Drinking: ", len(elvis_not_drinking_waveforms))

Jayesh Drinking:  11
Jayesh Not Drinking:  26
Elvis Drinking:  16
Elvis Not Drinking:  75


In [13]:
# get total ratio of drinking to not drinking
total_drinking = len(jayesh_drinking_waveforms) + len(elvis_drinking_waveforms)
total_not_drinking = len(jayesh_not_drinking_waveforms) + len(elvis_not_drinking_waveforms)
total_ratio = total_drinking / total_not_drinking
print("Total Drinking to Not Drinking Ratio: ", total_ratio)

Total Drinking to Not Drinking Ratio:  0.26732673267326734


In [18]:
# check channels length of audio files
jayesh_drinking_waveforms[0].channels

1

In [16]:
elvis_drinking_waveforms[0].channels

2

In [35]:
# Data augmentation function
def augment_audio(audio, augment_type="pitch_shift"):
    samples = np.array(audio.get_array_of_samples())
    
    if augment_type == "pitch_shift":
        # Pitch shifting
        n_steps = random.randint(-2, 2)  # Shift pitch by up to 2 steps
        shifted = librosa.effects.pitch_shift(samples.astype(float), sr=audio.frame_rate, n_steps=n_steps)
        return AudioSegment(
            shifted.astype(np.int16).tobytes(), 
            frame_rate=audio.frame_rate, 
            sample_width=audio.sample_width,
            channels=audio.channels
        )
    elif augment_type == "add_noise":
        # Adding noise
        noise = np.random.normal(0, 0.02, len(samples))  # Adjust the noise level as needed
        noisy = samples + noise
        return AudioSegment(
            noisy.astype(np.int16).tobytes(), 
            frame_rate=audio.frame_rate, 
            sample_width=audio.sample_width,
            channels=audio.channels
        )
    elif augment_type == "volume_adjust":
        # Adjusting volume
        volume_change = random.uniform(-5, 5)  # Change volume by -5 to +5 dB
        return audio + volume_change  # Adjust volume
    else:
        return audio  # No augmentation

# Load audio files and apply augmentation
def load_and_augment_audio_file(file_path):
    audio = AudioSegment.from_file(file_path)
    audio = audio.set_frame_rate(16000)
    audio = audio.set_channels(1)

    # Apply augmentation (you can use multiple augmentations)
    augmented_audios = [audio]
    for _ in range(2):  # Create two augmented versions for each original
        aug_type = random.choice(["pitch_shift", "add_noise", "volume_adjust"])  # Randomly choose an augmentation type
        augmented_audio = augment_audio(audio, augment_type=aug_type)
        augmented_audios.append(augmented_audio)

    return augmented_audios

# Load and augment audio files
jayesh_drinking_waveforms = [load_and_augment_audio_file(path_to_jayesh_drinking + f) for f in jayesh_drinking_files]
jayesh_not_drinking_waveforms = [load_and_augment_audio_file(path_to_jayesh_not_drinking + f) for f in jayesh_not_drinking_files]
elvis_drinking_waveforms = [load_and_augment_audio_file(path_to_elvis_drinking + f) for f in elvis_drinking_files]
elvis_not_drinking_waveforms = [load_and_augment_audio_file(path_to_elvis_not_drinking + f) for f in elvis_not_drinking_files]

# Flatten the list of augmented waveforms
jayesh_drinking_waveforms = [item for sublist in jayesh_drinking_waveforms for item in sublist]
jayesh_not_drinking_waveforms = [item for sublist in jayesh_not_drinking_waveforms for item in sublist]
elvis_drinking_waveforms = [item for sublist in elvis_drinking_waveforms for item in sublist]
elvis_not_drinking_waveforms = [item for sublist in elvis_not_drinking_waveforms for item in sublist]

# Now you can proceed to extract MFCCs and continue with the rest of the workflow


In [36]:
def extract_mfcc(audio, max_duration=40, sr=16000, n_mfcc=13):
    # Convert pydub AudioSegment to numpy array of samples
    samples = np.array(audio.get_array_of_samples()).astype(np.float32) / 32768.0  # Normalize to [-1, 1]
    
    # If audio is too long, truncate to 40 seconds
    max_samples = sr * max_duration
    if len(samples) > max_samples:
        samples = samples[:max_samples]
    
    # If audio is too short, pad with zeros
    if len(samples) < max_samples:
        padding = np.zeros(max_samples - len(samples))
        samples = np.concatenate([samples, padding])
    
    # Extract MFCC features
    mfcc = librosa.feature.mfcc(y=samples, sr=sr, n_mfcc=n_mfcc)
    return mfcc.T  # Transpose so that we have (time, n_mfcc)

# Extract MFCCs for each set of waveforms
jayesh_drinking_mfccs = [extract_mfcc(audio) for audio in jayesh_drinking_waveforms]
jayesh_not_drinking_mfccs = [extract_mfcc(audio) for audio in jayesh_not_drinking_waveforms]
elvis_drinking_mfccs = [extract_mfcc(audio) for audio in elvis_drinking_waveforms]
elvis_not_drinking_mfccs = [extract_mfcc(audio) for audio in elvis_not_drinking_waveforms]

# Step 4: Combine the data into one dataset (with labels: 1 for drinking, 0 for not drinking)
# Drinking = 1, Not drinking = 0
all_mfccs = jayesh_drinking_mfccs + jayesh_not_drinking_mfccs + elvis_drinking_mfccs + elvis_not_drinking_mfccs
all_labels = [1] * len(jayesh_drinking_mfccs) + [0] * len(jayesh_not_drinking_mfccs) + \
             [1] * len(elvis_drinking_mfccs) + [0] * len(elvis_not_drinking_mfccs)

In [46]:
X_train, X_test, y_train, y_test = train_test_split(all_mfccs, all_labels, test_size=0.25, random_state=42, stratify=all_labels)

# Modify __getitem__ in AudioDataset class to reshape the MFCCs
class AudioDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        mfcc = self.X[idx]
        label = self.y[idx]
        
        # Add a channel dimension (for CNN input)
        mfcc = np.expand_dims(mfcc, axis=0)  # This adds a 1-channel dimension
        
        return torch.tensor(mfcc, dtype=torch.float32), torch.tensor(label, dtype=torch.long)

batch_size = 32

train_dataset = AudioDataset(X_train, y_train)
test_dataset = AudioDataset(X_test, y_test)

# Step 5: Create DataLoader instances for training and testing
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Print the sizes of the training and testing datasets
print(f'Training set size: {len(train_dataset)}')
print(f'Testing set size: {len(test_dataset)}')


Training set size: 288
Testing set size: 96


In [53]:
# Define the CNN model
class AudioClassifierCNN(nn.Module):
    def __init__(self):
        super(AudioClassifierCNN, self).__init__()
        # Convolutional layers
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(32)

        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(64)

        # Initialize the fully connected layer size dynamically
        self.fc1 = nn.Linear(self._get_conv_output_size(), 128)
        self.fc2 = nn.Linear(128, 2)  # Assuming binary classification

    def _get_conv_output_size(self):
        # Create a dummy input to calculate the size after convolution layers
        dummy_input = torch.zeros(1, 1, 1251, 13)  # Sample input shape
        x = self.pool(F.relu(self.bn1(self.conv1(dummy_input))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        return int(torch.prod(torch.tensor(x.size()[1:])))

    def forward(self, x):
        # Apply convolutional layers with ReLU activations and max pooling
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))

        # Flatten the input dynamically
        x = x.view(x.size(0), -1)

        # Apply fully connected layers
        x = F.relu(self.fc1(x))
        x = self.fc2(x)  # No activation for output (CrossEntropyLoss will be applied later)

        return x


# Create the model instance
model = AudioClassifierCNN()


In [54]:
# Define loss function (Cross-Entropy Loss for classification)
criterion = nn.CrossEntropyLoss()

# Define optimizer (Adam optimizer)
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [55]:
# Number of epochs and batch size
num_epochs = 10

# Move the model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Zero the parameter gradients
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        # Statistics
        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    # Print epoch stats
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader)}, Accuracy: {100 * correct / total}%")


Epoch [1/10], Loss: 1.9337808622254267, Accuracy: 61.111111111111114%
Epoch [2/10], Loss: 0.36923138466146255, Accuracy: 79.51388888888889%
Epoch [3/10], Loss: 0.2893830719921324, Accuracy: 83.68055555555556%
Epoch [4/10], Loss: 0.2287169810798433, Accuracy: 88.19444444444444%
Epoch [5/10], Loss: 0.18745451834466723, Accuracy: 91.31944444444444%
Epoch [6/10], Loss: 0.16153829875919554, Accuracy: 91.66666666666667%
Epoch [7/10], Loss: 0.11218020733859804, Accuracy: 96.52777777777777%
Epoch [8/10], Loss: 0.08734425115916464, Accuracy: 96.875%
Epoch [9/10], Loss: 0.07068691526850064, Accuracy: 98.26388888888889%
Epoch [10/10], Loss: 0.05250347860985332, Accuracy: 98.95833333333333%


In [58]:
# Initialize empty lists to store true and predicted labels
all_labels = []
all_predictions = []

model.eval()  # Set model to evaluation mode (no gradients)
correct = 0
total = 0

with torch.no_grad():  # No need to compute gradients
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)

        # Accumulate total correct predictions
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Store predictions and true labels for confusion matrix
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

# Compute test accuracy
accuracy = 100 * correct / total
print(f"Test Accuracy: {accuracy}%")

# Compute confusion matrix
conf_matrix = confusion_matrix(all_labels, all_predictions)

# Print the confusion matrix
print("Confusion Matrix:")
print(conf_matrix)


Test Accuracy: 95.83333333333333%
Confusion Matrix:
[[72  4]
 [ 0 20]]


In [59]:
# show the 4 false positives
false_positives = []
for i in range(len(all_labels)):
    if all_labels[i] == 0 and all_predictions[i] == 1:
        false_positives.append(i)

In [63]:
print("False Positives: ", false_positives)

False Positives:  [32, 55, 56, 79]
