In [1]:
import torch
from torchvision.io import read_image
import torchaudio
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam, SGD
from torch.utils.data import DataLoader
from torch.autograd import Variable
from torchvision.transforms import ToTensor, Lambda, Compose

import os
import pandas as pd
import numpy as np
import matplotlib as plt
import time
from rich.progress import Progress, TextColumn, BarColumn, TimeRemainingColumn, MofNCompleteColumn


# Prefer the GPU when PyTorch can see one; otherwise run everything on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


In [2]:
# wav_directory = "E:\\UAV_DISTASIO_DATA\\X\\ESCAPE_FORMAT_ONECHANNEL"

# label_spreadsheet = "E:\\UAV_DISTASIO_DATA\\y\\escape_singleUAV_scenarios_cleaned.xlsx"

# # Read the existing label spreadsheet
# df_labels = pd.read_excel(label_spreadsheet, header=None, names=["filename", "type", "motion"])

# # Extract the unique identifiers (sA1r01) from the label filenames
# df_labels["identifier"] = df_labels["filename"].str.extract(r"(sA\d+r\d+)")

# wav_files = [file for file in os.listdir(wav_directory) if file.endswith(".wav")]

# # Create a new DataFrame to store the entries for each .wav file
# df_entries = pd.DataFrame(columns=["filename", "type", "motion"])

# # Iterate over each .wav file
# for wav_file in wav_files:
#     # Extract the identifier (sA1r01) from the .wav filename
#     identifier = wav_file.split("-")[0]
    
#     try:
#         # Find the corresponding label in the label DataFrame
#         label_row = df_labels[df_labels["identifier"] == identifier].iloc[0]
        
#         # Create a new DataFrame for the current entry
#         entry_df = pd.DataFrame({
#             "filename": [wav_file],
#             "type": [label_row["type"]],
#             "motion": [label_row["motion"]]
#         })
        
#         # Concatenate the new entry DataFrame with the existing DataFrame
#         df_entries = pd.concat([df_entries, entry_df], ignore_index=True)
        
#     except IndexError:
#         print(f"No corresponding label found for file: {wav_file}")
#         continue

# # Save the new DataFrame to a new Excel spreadsheet
# output_spreadsheet = "E:\\UAV_DISTASIO_DATA\\y\\UAV_chunk_labels.xlsx"
# df_entries.to_excel(output_spreadsheet, index=False)

In [3]:
class SpectrogramDataset(Dataset):
    """Audio clips served as single-channel spectrogram tensors.

    Every row of the label spreadsheet provides an audio filename (column 0,
    resolved relative to ``audio_dir``) and a label (column 1). Items are
    returned as ``(spectrogram, label_tensor)`` pairs.

    Args:
        excel_file: Path of the spreadsheet read with ``pd.read_excel``.
        audio_dir: Directory containing the audio files named in the sheet.
        transform: Optional callable applied to the spectrogram tensor.
        target_transform: Optional callable applied to the raw label.
        target_sample_rate: Clips loaded at any other rate are resampled to
            this rate before the spectrogram is computed.
    """

    # Display names for label codes; only used to build output filenames in
    # save_spectrograms_as_tensors().
    # NOTE(review): the keys are 1, 2, 3 and 5. If the spreadsheet stores these
    # codes directly they are not contiguous 0..C-1 class indices, so they would
    # need remapping (e.g. through target_transform) before being used as
    # class-index targets -- confirm against the data.
    label_map = {1: "Inspired Flight 1200", 2: "DJI Matrice 800", 3: "DJI Phantom 4 Pro v2", 5: "Phantom and Matrice"}

    def __init__(self, excel_file, audio_dir, transform=None, target_transform=None, target_sample_rate=44100):
        self.df = pd.read_excel(excel_file)
        self.audio_dir = audio_dir
        self.transform = transform
        self.target_transform = target_transform
        self.target_sample_rate = target_sample_rate

        # Build the transforms once instead of on every item fetch.
        self._spectrogram = torchaudio.transforms.Spectrogram()
        # One Resample module per distinct source rate, created on first use.
        self._resamplers = {}

    def __len__(self):
        """Return the number of rows in the label spreadsheet."""
        return len(self.df)

    def _resample(self, waveform, sample_rate):
        """Return ``waveform`` converted to ``target_sample_rate``."""
        if sample_rate == self.target_sample_rate:
            return waveform
        resampler = self._resamplers.get(sample_rate)
        if resampler is None:
            resampler = torchaudio.transforms.Resample(sample_rate, self.target_sample_rate)
            self._resamplers[sample_rate] = resampler
        return resampler(waveform)

    def __getitem__(self, idx):
        """Load one clip and return ``(spectrogram, label_tensor)``.

        The spectrogram is averaged over its leading dimension and given a new
        leading dimension of size 1, so the result always has one channel.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        audio_path = os.path.join(self.audio_dir, self.df.iloc[idx, 0])
        label = self.df.iloc[idx, 1]

        waveform, sample_rate = torchaudio.load(audio_path)
        waveform = self._resample(waveform, sample_rate)

        spectrogram = self._spectrogram(waveform)

        # Collapse the leading dimension, then restore a single-channel axis.
        grayscale_spectrogram = spectrogram.mean(dim=0).unsqueeze(0)

        if self.transform:
            grayscale_spectrogram = self.transform(grayscale_spectrogram)

        if self.target_transform:
            label = self.target_transform(label)

        # as_tensor passes an existing tensor through unchanged, so a
        # target_transform that already returns a tensor is not re-copied.
        return grayscale_spectrogram, torch.as_tensor(label)

    def save_spectrograms_as_tensors(self, output_dir):
        """Write every item's spectrogram to ``output_dir`` as a ``.pt`` file.

        Files are named ``spectrogram_<index>_<label name>.pt``; labels missing
        from ``label_map`` are named ``Unknown_<label>``. The directory is
        created if it does not exist.
        """
        os.makedirs(output_dir, exist_ok=True)

        for idx in range(len(self)):
            spectrogram, label = self[idx]

            label_value = label.item()
            label_str = self.label_map.get(label_value, f"Unknown_{label_value}")

            filename = f"spectrogram_{idx}_{label_str}.pt"
            filepath = os.path.join(output_dir, filename)

            torch.save(spectrogram, filepath)

            print(f"Saved spectrogram tensor: {filepath}")

# Locations of the per-chunk label spreadsheet and the matching audio files.
excel_file = "E:\\UAV_DISTASIO_DATA\\y\\UAV_chunk_labels.xlsx"
audio_dir = r"E:\UAV_DISTASIO_DATA\X\ESCAPE_FORMAT_ONECHANNEL"

# Optional callables applied to each spectrogram / label; None leaves them as-is.
transform = None
target_transform = None

dataset = SpectrogramDataset(excel_file, audio_dir, transform=transform, target_transform=target_transform)

In [4]:
# Fixed seed so the train/test/validation membership is the same on every run.
SPLIT_SEED = 42
BATCH_SIZE = 16
# On Windows, DataLoader workers are spawned as fresh processes that must
# re-import the dataset class; a class defined in a notebook cell cannot be
# found there, so loading runs in the main process on that platform.
NUM_WORKERS = 0 if os.name == "nt" else 2

# 80% train, 10% test, remainder validation. The three sizes are all
# non-negative and sum to len(dataset), so the subsets cannot overlap.
train_size = int(0.8 * len(dataset))
test_size = int(0.1 * len(dataset))
val_size = len(dataset) - train_size - test_size

train_dataset, test_dataset, val_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, test_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
)

test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
)

# Kept under its own name so val_size continues to hold the subset size.
val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
)

In [5]:
# Shape of one training spectrogram; the first element of an item is the tensor.
train_dataset[400][0].shape

torch.Size([1, 201, 1103])

In [6]:
class CNN(nn.Module):
    """Convolutional classifier for single-clip spectrogram tensors.

    Two conv/conv/max-pool stages are followed by a three-layer fully
    connected head. All layers live in ``self.seq``.

    Args:
        input_shape: ``(channels, height, width)`` of one input sample. The
            default is the spectrogram shape printed earlier in this notebook.
        num_classes: Number of output scores per sample.
    """

    def __init__(self, input_shape=(1, 201, 1103), num_classes=4):
        super().__init__()
        feature_layers = [
            nn.Conv2d(input_shape[0], 4, (5, 10), stride=1),
            nn.ReLU(),
            nn.Conv2d(4, 4, (5, 10), stride=1),
            nn.ReLU(),
            nn.MaxPool2d((5, 10), stride=(1, 5)),

            nn.Conv2d(4, 8, (10, 5), stride=1),
            nn.ReLU(),
            nn.Conv2d(8, 8, (10, 5), stride=1),
            nn.ReLU(),
            nn.MaxPool2d((10, 5), stride=(2, 3)),
        ]

        # Measure the flattened feature size with a dummy pass instead of
        # hard-coding it, so the first Linear layer always matches the conv
        # output. For the default input shape this is 8 * 81 * 68 = 44064.
        with torch.no_grad():
            probe = torch.zeros(1, *input_shape)
            flat_features = nn.Sequential(*feature_layers)(probe).numel()

        self.seq = nn.Sequential(
            *feature_layers,
            nn.Flatten(),
            nn.Linear(flat_features, 1024),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(1024, 200),
            nn.ReLU(),
            nn.Linear(200, num_classes),
        )

    def forward(self, x):
        """Return unnormalised class scores (logits), one row per sample.

        No softmax is applied here: ``nn.CrossEntropyLoss`` expects raw logits
        and normalises them itself. The arg-max over dim 1 is unaffected; apply
        ``F.softmax(logits, dim=1)`` on the result when probabilities are needed.
        """
        return self.seq(x)


In [7]:
def split_seconds(seconds):
    """Break a duration in seconds into clock-style components.

    Returns:
        A ``(seconds, minutes, hours, days)`` tuple where seconds and minutes
        are reduced modulo 60, hours modulo 24, and days is the remainder.
    """
    total_minutes, secs = divmod(seconds, 60)
    total_hours, mins = divmod(total_minutes, 60)
    days, hrs = divmod(total_hours, 24)
    return secs, mins, hrs, days


In [8]:
def _make_progress():
    """Build the rich progress display shared by the training and test loops."""
    return Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeRemainingColumn(),
        refresh_per_second=10,
    )


def main(train_model=True, num_epochs=8, learning_rate=1e-3, checkpoint_path="mnist_cnn.pt"):
    """Train (or load) the CNN and report its accuracy on the test loader.

    Uses the module-level ``CNN``, ``train_loader``, ``test_loader`` and
    ``split_seconds``.

    Args:
        train_model: When True, train from scratch and save the weights to
            ``checkpoint_path``; when False, load the weights from that file.
        num_epochs: Number of passes over ``train_loader``.
        learning_rate: Learning rate passed to the Adam optimizer.
        checkpoint_path: File the model ``state_dict`` is saved to / loaded from.
    """
    model = CNN()
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)
    print(f"Using device: {device}")

    if train_model:
        optimizer = Adam(model.parameters(), lr=learning_rate)
        # CrossEntropyLoss expects raw, unnormalised scores from the model.
        # NOTE(review): confirm the model's forward() does not already apply softmax.
        loss_fn = nn.CrossEntropyLoss(reduction="mean")

        model.train()
        print("Training model....")
        start = time.time()

        with _make_progress() as progress:
            epoch_task = progress.add_task("[cyan]Epochs", total=num_epochs)
            batch_task = progress.add_task("[green]Batches", total=len(train_loader), visible=False)
            for epoch in range(num_epochs):
                progress.update(batch_task, visible=True, completed=0, total=len(train_loader))
                total_loss = 0.0

                for images, labels in train_loader:
                    images, labels = images.to(device), labels.to(device)
                    optimizer.zero_grad()

                    outputs = model(images)
                    loss = loss_fn(outputs, labels)
                    loss.backward()
                    optimizer.step()

                    total_loss += loss.item()
                    progress.update(batch_task, advance=1)
                    progress.refresh()

                # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
                avg_loss = total_loss / max(len(train_loader), 1)
                progress.print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")
                progress.update(batch_task, visible=False)
                # Advance only after the epoch has actually finished.
                progress.update(epoch_task, advance=1)

        torch.save(model.state_dict(), checkpoint_path)

        end = time.time()
        seconds, minutes, hours, days = split_seconds(end - start)
        print(f"Training Runtime: {int(days)}d {int(hours)}h {int(minutes)}m {seconds:.2f}s")
    else:
        print(f"Loading model weights from {checkpoint_path}")
        state = torch.load(checkpoint_path, map_location=torch.device(device))
        model.load_state_dict(state)

    # Evaluate model on test data
    model.eval()
    print("Evaluating model....")
    start = time.time()
    num_test = 0
    num_correct = 0

    with torch.no_grad(), _make_progress() as progress:
        test_task = progress.add_task("[yellow]Testing", total=len(test_loader))
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            # The arg-max is the same whether the outputs are logits or probabilities.
            preds = outputs.argmax(dim=1)
            num_test += labels.size(0)
            num_correct += (preds == labels).sum().item()
            progress.update(test_task, advance=1)
            progress.refresh()

    if num_test:
        print(f"Test accuracy: {num_correct / num_test * 100:.2f}%")
    else:
        # An empty test loader would otherwise raise ZeroDivisionError.
        print("Test accuracy: n/a (test set is empty)")
    end = time.time()
    seconds, minutes, hours, days = split_seconds(end - start)
    print(f"Testing Runtime: {int(days)}d {int(hours)}h {int(minutes)}m {seconds:.2f}s")

In [9]:
# Run the pipeline defined in main(): train (or load) the model, then evaluate it.
main()

Output()

Using device: cuda
Training model....


In [None]:
# # Save spectrograms as tensors
# dataset.save_spectrograms_as_tensors(output_dir)