<a href="https://colab.research.google.com/github/MarshallPotts/CSE450-Machine-Learning/blob/main/Vehicle_CNN_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch_directml
import win32com.client
print(torch.__version__)

def check_directml_gpu_with_pywin32():
    """
    Report whether DirectML is usable and list the video adapters known to WMI.

    When ``torch_directml.is_available()`` is true the function:

    1. prints the default DirectML device,
    2. enumerates ``Win32_VideoController`` instances through pywin32 and prints
       name, adapter RAM, driver version and PNP device id for each one,
    3. runs a small matrix multiplication on the DirectML device and prints the
       device the result lives on.

    Everything is reported via ``print``; the function returns ``None``.
    WMI failures are reported and do not prevent the matmul smoke test.
    """
    if not torch_directml.is_available():
        print("DirectML is not available on this system.")
        return

    print("DirectML is available.")
    dml_device = torch_directml.device()
    print(f"DirectML device: {dml_device}")

    try:
        wmi = win32com.client.GetObject("winmgmts:")
        gpu_info_objects = wmi.InstancesOf("Win32_VideoController")

        if gpu_info_objects:
            print("\nGPU Information:")
            for i, gpu in enumerate(gpu_info_objects):
                # AdapterRAM is documented as an unsigned 32-bit value, but it can
                # arrive through COM as a signed integer and then print as a
                # negative number. Masking restores the unsigned byte count.
                # A 32-bit field cannot hold 4 GiB or more, so treat the value
                # as a lower bound on large cards.
                adapter_ram = gpu.AdapterRAM
                if adapter_ram is not None:
                    adapter_ram &= 0xFFFFFFFF

                print(f"--- GPU {i+1} ---")
                print(f"Name: {gpu.Name}")
                print(f"Adapter RAM: {adapter_ram}")
                print(f"Driver Version: {gpu.DriverVersion}")
                print(f"PNP Device ID: {gpu.PNPDeviceID}")
        else:
            print("No GPU information found.")

    except Exception as e:
        # Best effort: the WMI listing is informational only, so keep going.
        print(f"Error accessing WMI: {e}")

    # The WMI listing does not show which adapter DirectML actually uses;
    # the smoke test below checks the device a real operation runs on.
    print("\nTo further verify DirectML usage, you can run a simple PyTorch operation on the DirectML device.")
    x = torch.randn(10, 10).to(dml_device)
    y = torch.randn(10, 10).to(dml_device)
    z = x @ y
    print(f"Result tensor device: {z.device}")
    print("\n\n\n")
    print(z)
    print("\n\n\n")
    # DirectML tensors report the device type 'privateuseone'.
    if z.device.type == 'privateuseone':
        print("PyTorch operation was performed on the DirectML device, indicating it's active.")
    else:
        print("PyTorch operation was NOT performed on the DirectML device.")

# Run the DirectML/GPU check when this cell (or the code as a script) is executed.
if __name__ == "__main__":
    check_directml_gpu_with_pywin32()


2.4.1+cpu
DirectML is available.
DirectML device: privateuseone:0

GPU Information:
--- GPU 1 ---
Name: Intel(R) HD Graphics 530
Adapter RAM: 1073741824
Driver Version: 31.0.101.2111
PNP Device ID: PCI\VEN_8086&DEV_1912&SUBSYS_805D103C&REV_06\3&11583659&0&10
--- GPU 2 ---
Name: AMD Radeon RX 580 2048SP
Adapter RAM: -1048576
Driver Version: 31.0.21921.1000
PNP Device ID: PCI\VEN_1002&DEV_6FDF&SUBSYS_0B311002&REV_EF\4&9702874&0&0008

To further verify DirectML usage, you can run a simple PyTorch operation on the DirectML device.
Result tensor device: privateuseone:0




tensor([[ 3.5392,  5.7784,  3.0537,  1.8715,  0.7075, -0.8008, -2.3382, -1.0061,
         -1.9024, -0.6926],
        [-4.9454, -3.4857,  3.1900, -3.9166,  0.1210,  4.3468,  4.7186,  2.1090,
         -2.0395,  2.6898],
        [-6.7957, -1.2868, -1.0887,  1.1106, -1.0543,  5.6852,  3.1421,  3.0151,
         -2.1551,  0.5518],
        [-1.8517,  2.4194,  0.6405, -3.7791, -1.8334,  2.0752,  1.3527,  0.0706,
          1.7833,

In [None]:
import numpy as np
import seaborn as sns
import pandas as pd
import matplotlib as plt
import librosa
import os
import pandas as pd
import torchaudio
from torch.utils.data import Dataset, DataLoader


In [None]:
from imblearn.over_sampling import SMOTE
import torchaudio.transforms as T

class CustomAudioDataset(Dataset):
    """
    In-memory dataset of spectrogram features built from a folder of audio clips.

    ``root_dir`` must contain one sub-directory per recording group, each holding
    ``.wav`` / ``.mp3`` / ``.ogg`` files. On construction every clip is

    1. labelled from its path relative to ``root_dir`` (see ``_update_labels``),
    2. down-mixed to mono and brought to a common sample rate,
    3. optionally augmented (``_augment_audio``),
    4. converted to a spectrogram and zero-padded along time to a common length,
    5. optionally class-balanced with SMOTE (``_oversample_data``).

    All preprocessing runs on the CPU: the features are handed to NumPy/SMOTE,
    and ``T.Spectrogram()`` is created on the CPU. Move batches to the compute
    device in the training loop instead.

    Args:
        root_dir (str): Directory with one sub-directory per group of clips.
        transform (callable, optional): Called as ``transform(audio, sample_rate)``
            on the spectrogram tensor in ``__getitem__``.
        target_transform (callable, optional): Called with the string label in
            ``__getitem__``.
        augment (bool): Add the augmented variants of every clip (default True).
            Pass False for evaluation data.
        oversample (bool): Balance the classes with SMOTE (default True).
            Pass False for evaluation data.

    Attributes:
        audio_files (list): One flattened spectrogram (1-D NumPy array) per sample.
        labels (list): String label of every sample, aligned with ``audio_files``.
        audio_labels (pandas.DataFrame): Single ``'label'`` column, same order.
        sample_rate (int | None): Sample rate shared by all clips after loading.
        feature_shape (tuple | None): ``(1, n_freq, n_frames)`` of one spectrogram.
    """

    AUDIO_EXTENSIONS = ('.wav', '.mp3', '.ogg')

    # Ordered: the first substring found in a path wins.
    SUBSTRING_TO_LABEL = {
        "car": "car",
        "street": "car",
        "plane": "plane",
        "air": "plane",
        "707": "plane",
        "bus": "bus",
        "train": "train",
        "truck": "truck",
        "cycl": "bicycle",
        "mixkit_bike": "bicycle",
        "helicopter": "helicopter",
        "motorbike": "motorcycle",
        "superbike": "motorcycle",
        "corvette": "car",
    }

    # Fixed vocabulary so that training and test datasets share the same
    # label -> index mapping even if one of them lacks a class.
    classes = tuple(sorted(set(SUBSTRING_TO_LABEL.values()) | {"unknown"}))
    class_to_idx = {name: index for index, name in enumerate(classes)}

    def __init__(self, root_dir, transform=None, target_transform=None,
                 augment=True, oversample=True):
        self.audio_files = []
        self.labels = []
        self.root_dir = root_dir
        self.transform = transform
        self.target_transform = target_transform
        self.augment = augment
        self.oversample = oversample
        self.sample_rate = None
        self.feature_shape = None
        self.audio_labels = pd.DataFrame({'label': []})

        self._load_data()  # Load the data when the object is created.

    def _load_data(self):
        """Discover the clips, label them, and build the feature matrix."""
        # 1. Discover audio files (sorted so the sample order is reproducible).
        for label in sorted(os.listdir(self.root_dir)):
            label_dir = os.path.join(self.root_dir, label)
            if not os.path.isdir(label_dir):
                continue
            for audio_file in sorted(os.listdir(label_dir)):
                if audio_file.lower().endswith(self.AUDIO_EXTENSIONS):
                    self.audio_files.append(os.path.join(label_dir, audio_file))
                    self.labels.append(label)

        # 2. Derive the class labels while ``audio_files`` still holds paths.
        self.audio_labels = pd.DataFrame({'label': self.labels})
        self._update_labels()
        clip_labels = self.audio_labels['label'].tolist()

        # 3. Load, augment and convert every clip to a spectrogram.
        to_spectrogram = T.Spectrogram()
        spectrograms = []
        sample_labels = []
        with torch.no_grad():
            for audio_path, label in zip(self.audio_files, clip_labels):
                audio, sample_rate = torchaudio.load(audio_path)

                # Mono + common sample rate, so every spectrogram has the same
                # channel count and frequency axis.
                audio = audio.mean(dim=0, keepdim=True)
                if self.sample_rate is None:
                    self.sample_rate = sample_rate
                elif sample_rate != self.sample_rate:
                    audio = T.Resample(orig_freq=sample_rate,
                                       new_freq=self.sample_rate)(audio)

                if self.augment:
                    variants = self._augment_audio(audio, self.sample_rate)
                else:
                    variants = [audio]

                for variant in variants:
                    spectrograms.append(to_spectrogram(variant))
                    sample_labels.append(label)

        # 4. Replace (not extend) the path list with the feature vectors.
        self.audio_files = self._to_feature_vectors(spectrograms)
        self.labels = sample_labels
        self.audio_labels = pd.DataFrame({'label': sample_labels})

        # 5. Oversample the data to balance the classes.
        if self.oversample:
            self._oversample_data()

    def _to_feature_vectors(self, spectrograms):
        """Zero-pad spectrograms along time to one length and flatten each one."""
        if not spectrograms:
            self.feature_shape = None
            return []

        max_frames = max(spec.shape[-1] for spec in spectrograms)
        padded = [
            torch.nn.functional.pad(spec, (0, max_frames - spec.shape[-1]))
            for spec in spectrograms
        ]
        self.feature_shape = tuple(padded[0].shape)
        return [spec.flatten().numpy() for spec in padded]

    def _augment_audio(self, audio, sample_rate):
        """
        Apply a series of transformations to augment the audio data.

        Returns a list of waveform tensors: the original clip, a noisy copy,
        a pitch-shifted copy and a resampled copy.
        """
        augmented_audio = [audio]  # Include the original audio

        # Add random noise
        noise = torch.randn_like(audio) * 0.005
        augmented_audio.append(audio + noise)

        # TODO: time stretching has to be applied in the frequency domain
        # (on a complex spectrogram) and is not implemented yet.

        # Pitch shifting: shift pitch by 2 semitones
        pitch_shift = T.PitchShift(sample_rate, n_steps=2)
        augmented_audio.append(pitch_shift(audio))

        # NOTE(review): orig_freq == new_freq, so this variant appears to be an
        # unchanged copy of the clip - confirm the intended target rate.
        resample = T.Resample(orig_freq=sample_rate, new_freq=sample_rate)
        augmented_audio.append(resample(audio))

        return augmented_audio

    def _update_labels(self):
        """
        Relabel every clip from substrings found in its path.

        Only the path *relative to* ``root_dir`` is inspected, so the names of
        parent directories (for example a ``training`` folder, which contains
        ``"train"``) cannot influence the label. Matching is case-insensitive;
        clips matching no substring are labelled ``"unknown"``.
        Expects ``self.audio_files`` to hold file paths.
        """
        updated_labels = []
        for file_path in self.audio_files:
            relative_path = os.path.relpath(file_path, self.root_dir).lower()
            for substring, label in self.SUBSTRING_TO_LABEL.items():
                if substring in relative_path:
                    updated_labels.append(label)
                    break
            else:
                updated_labels.append("unknown")  # No substring matched

        # Update the labels in the DataFrame
        self.audio_labels['label'] = updated_labels

    def _oversample_data(self):
        """
        Balance the classes with SMOTE.

        Does nothing when the dataset is empty or has a single class. The
        number of neighbours is reduced for very small classes, because SMOTE
        needs more samples in a class than ``k_neighbors``.
        """
        if len(self.audio_files) == 0:
            return

        class_counts = self.audio_labels['label'].value_counts()
        if len(class_counts) < 2:
            return  # Nothing to balance against

        smallest_class = int(class_counts.min())
        if smallest_class < 2:
            import warnings
            warnings.warn(
                "Skipping SMOTE: at least one class has a single sample, "
                "so no synthetic samples can be interpolated."
            )
            return

        features = np.stack([np.asarray(vector) for vector in self.audio_files])
        labels = self.audio_labels['label'].to_numpy()

        # 5 is the SMOTE default; shrink it only when a class is too small.
        smote = SMOTE(random_state=42, k_neighbors=min(5, smallest_class - 1))
        features_resampled, labels_resampled = smote.fit_resample(features, labels)

        # Update the dataset with the resampled data. ``list(array)`` keeps
        # rows as NumPy arrays rather than nested Python lists.
        self.audio_files = list(features_resampled)
        self.labels = labels_resampled.tolist()
        self.audio_labels = pd.DataFrame({'label': labels_resampled})

    @staticmethod
    def min_max_scale(tensor, min_val=0.0, max_val=1.0):
        """
        Scales a tensor to the range [min_val, max_val].

        A constant tensor (max == min) is mapped to ``min_val`` everywhere
        instead of producing NaNs from a division by zero.
        """
        tensor_min = tensor.min()
        value_range = tensor.max() - tensor_min
        if value_range == 0:
            return torch.full_like(tensor, min_val)
        scaled_tensor = (tensor - tensor_min) / value_range       # Scale to [0, 1]
        return scaled_tensor * (max_val - min_val) + min_val      # Scale to [min_val, max_val]

    def __len__(self):
        """Number of samples, including augmented and synthetic ones."""
        return len(self.audio_files)

    def __getitem__(self, idx):
        """
        Return ``(audio, label)`` for sample ``idx``.

        ``audio`` is a float32 CPU tensor scaled to [0, 1], reshaped to
        ``feature_shape`` when that is known. ``label`` is a tensor; string
        labels are converted to their index in ``class_to_idx``.
        """
        audio = torch.tensor(self.audio_files[idx], dtype=torch.float32)
        if self.feature_shape is not None:
            audio = audio.reshape(self.feature_shape)

        label = self.audio_labels.iloc[idx, 0]

        if self.transform:
            audio = self.transform(audio, self.sample_rate)

        if self.target_transform:
            label = self.target_transform(label)

        # Apply min-max scaling to normalize the spectrogram
        audio = self.min_max_scale(audio, min_val=0.0, max_val=1.0)

        if isinstance(label, str):
            label = self.class_to_idx[label]
        label = torch.as_tensor(label)

        return audio, label


In [None]:
import os
import random
import shutil
from collections import defaultdict

def stratified_split_data(root_dir, train_ratio=0.8, seed=None, expected_total=9405):
    """
    Splits audio data into stratified training and testing sets.

    Every label sub-directory of ``root_dir`` is split independently, so each
    label keeps (approximately) the same train/test proportion. Files are
    *moved*, not copied: after a successful run the label folders are empty,
    and running the function again finds nothing left to split.

    Args:
        root_dir (str): The root directory containing the audio subdirectories (one per label).
        train_ratio (float): The ratio of data to use for training (e.g., 0.8 for 80%).
        seed (int, optional): Seed for a reproducible split. With ``None`` the
            global ``random`` state is used.
        expected_total (int, optional): Number of files expected in total; a
            warning is printed on mismatch. Pass ``None`` to disable the check.

    Raises:
        ValueError: If ``train_ratio`` is not within [0, 1].

    Creates:
        - A "training" folder with stratified data.
        - A "test" folder with stratified data.
    """
    if not 0.0 <= train_ratio <= 1.0:
        raise ValueError(f"train_ratio must be between 0 and 1, got {train_ratio}")

    # A private generator keeps a seeded split from disturbing global state.
    rng = random.Random(seed) if seed is not None else random

    train_dir = os.path.join(root_dir, "training")
    test_dir = os.path.join(root_dir, "test")

    # Create train and test directories if they don't exist
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)

    total_files = 0

    # os.listdir order is arbitrary; sorting makes a seeded shuffle repeatable.
    for label in sorted(os.listdir(root_dir)):
        label_dir = os.path.join(root_dir, label)
        if not os.path.isdir(label_dir) or label in ("training", "test"):
            continue  # Skip plain files and the output directories

        files = sorted(
            f for f in os.listdir(label_dir)
            if os.path.isfile(os.path.join(label_dir, f))
        )
        rng.shuffle(files)  # Shuffle files to ensure randomness

        split_index = int(len(files) * train_ratio)
        destinations = (
            (os.path.join(train_dir, label), files[:split_index]),
            (os.path.join(test_dir, label), files[split_index:]),
        )

        for target_dir, subset in destinations:
            # Create the label sub-directory in the training / test folder
            os.makedirs(target_dir, exist_ok=True)
            for file in subset:
                shutil.move(os.path.join(label_dir, file), os.path.join(target_dir, file))

        # Update total file count
        total_files += len(files)

    print(f"Data splitting completed. Total files: {total_files}")
    if expected_total is not None and total_files != expected_total:
        print(f"Warning: Total file count ({total_files}) does not match the expected count ({expected_total}).")


def display_file_distribution(root_dir):
    """
    Print how many files each label folder holds in the split and unsplit areas.

    Three sections are printed: the label folders under ``training``, those
    under ``test``, and every other sub-directory found directly in ``root_dir``.

    Args:
        root_dir (str): The root directory containing the 'training' and 'test' subdirectories.
    """
    split_folders = ("training", "test")

    def _print_counts(parent_dir, excluded=()):
        # Print one line per sub-directory of parent_dir and return the sum.
        running_total = 0
        for entry in os.listdir(parent_dir):
            entry_path = os.path.join(parent_dir, entry)
            if entry in excluded or not os.path.isdir(entry_path):
                continue
            entry_count = len(os.listdir(entry_path))
            running_total += entry_count
            print(f"  {entry}: {entry_count} files")
        return running_total

    print("File Distribution:")
    print("===================")

    # Training and test set distributions
    for heading, folder in zip(("Training", "Test"), split_folders):
        print(f"\n{heading} Set:")
        split_total = _print_counts(os.path.join(root_dir, folder))
        print(f"Total {heading.lower()} files: {split_total}")

    # Whatever is still sitting in the original label folders
    print("\nOther Subdirectories:")
    leftover_total = _print_counts(root_dir, excluded=split_folders)
    print(f"Total files in other subdirectories: {leftover_total}")



In [None]:
root_dir = "D:/Vehicle_CNN_Workspace/Audio_Files"  # Your original directory
# NOTE: stratified_split_data moves the files out of the label folders and skips
# the "training"/"test" folders, so re-running this cell after a successful
# split finds nothing left to move and reports "Total files: 0".
stratified_split_data(root_dir, train_ratio=0.8)  # Split data into training and testing sets
# display_file_distribution(root_dir)  # Display file distribution between training and test sets

Data splitting completed. Total files: 0


: 

In [None]:
# Define the paths to the training and test directories
train_dir = os.path.join(root_dir, "training")
test_dir = os.path.join(root_dir, "test")

# Initialize the CustomAudioDataset for training and test data
train_dataset = CustomAudioDataset(root_dir=train_dir)
test_dataset = CustomAudioDataset(root_dir=test_dir)

# Create DataLoaders for training and test datasets.
# - num_workers=0: on Windows, DataLoader workers are spawned as new processes
#   that must import the dataset class; a class defined inside a notebook cell
#   is not importable there, so worker processes cannot be used.
# - pin_memory=False: pinned memory only speeds up copies to CUDA devices, and
#   this environment runs a CPU build of torch with DirectML.
# - The test loader is not shuffled, so evaluation order is deterministic.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0, pin_memory=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0, pin_memory=False)