In [None]:
! pip install resampy

In [None]:
import torch
import torchaudio
import torchvision

print(torch.__version__)
print(torchaudio.__version__)
print(torchvision.__version__)

In [None]:
import librosa
import pathlib
import os

from torchaudio.utils import download_asset
from torch.utils.data import Dataset, DataLoader
from torchaudio import transforms
from torchinfo import summary

In [None]:
# Make function to find classes in target directory
def find_classes(directory: str) -> tuple[list[str], dict[str, int]]:
    """Finds the emotion class names encoded in the sub-folder names of a directory.

    Every sub-folder is expected to be named ``<prefix>_<emotion>`` (for example
    ``OAF_angry`` or ``YAF_pleasant_surprised``). The class name is the
    lower-cased text after the last underscore. The spelling variant
    ``"surprised"`` is folded into ``"surprise"`` so both map to a single class.

    Args:
        directory (str): target directory to load classnames from.

    Returns:
        Tuple[List[str], Dict[str, int]]: (list_of_class_names, dict(class_name: idx...)).
        The list is sorted alphabetically, so the indices are identical on every run.

    Raises:
        FileNotFoundError: if ``directory`` does not exist or holds no class folders.

    Example:
        find_classes("TESS")
        >>> (["angry", "sad", "surprise"], {"angry": 0, "sad": 1, "surprise": 2})
    """
    # Spelling variants that must collapse into one canonical class name.
    aliases = {"surprised": "surprise"}

    # 1. Collect class folders only: stray files and hidden folders
    #    (e.g. ".ipynb_checkpoints") must not turn into bogus classes.
    with os.scandir(directory) as entries:
        folder_names = [
            entry.name
            for entry in entries
            if entry.is_dir() and not entry.name.startswith(".")
        ]

    # 2. Derive the canonical class names; sort AFTER de-duplicating so that
    #    the class -> index mapping does not depend on set iteration order.
    raw_labels = (name.split("_")[-1].lower() for name in folder_names)
    classes = sorted({aliases.get(label, label) for label in raw_labels})

    # 3. Raise an error if class names not found
    if not classes:
        raise FileNotFoundError(f"Couldn't find any classes in {directory}.")

    # 4. Create a dictionary of index labels (computers prefer numerical rather than string labels)
    class_to_idx = {cls_name: i for i, cls_name in enumerate(classes)}
    return classes, class_to_idx

In [None]:
# Write a custom dataset class (inherits from torch.utils.data.Dataset)

# 1. Subclass torch.utils.data.Dataset
class AudioFolderCustom(Dataset):
    """Dataset of fixed-length mono waveforms read from ``targ_dir/<class_folder>/*.wav``.

    The label of a file comes from its parent folder name: the lower-cased text
    after the last underscore, with ``"surprised"`` folded into ``"surprise"``.

    Args:
        targ_dir (str): root directory that contains one sub-folder per class/speaker.
        transform (callable, optional): applied to the ``(1, num_samples)`` waveform.
        sample_rate (int): rate every clip is resampled to. Defaults to 24414.
        duration_s (float): clip length in seconds after padding/truncation. Defaults to 3.0.
    """

    # Folder-name spelling variants that map onto a canonical class name.
    _CLASS_ALIASES = {"surprised": "surprise"}

    # 2. Initialize with a targ_dir and transform (optional) parameter
    def __init__(self, targ_dir: str, transform=None,
                 sample_rate: int = 24414, duration_s: float = 3.0) -> None:
        # 3. Create class attributes
        # Get all audio paths; sorted so that index -> file is stable across
        # runs and file systems (glob order is otherwise unspecified).
        self.paths = sorted(pathlib.Path(targ_dir).glob("*/*.wav"))
        # Setup transforms
        self.transform = transform
        # Target format of every returned waveform
        self.sample_rate = sample_rate
        self.num_samples = int(round(duration_s * sample_rate))
        # Create classes and class_to_idx attributes
        self.classes, self.class_to_idx = find_classes(targ_dir)
        self.idx_to_class = {v: k for k, v in self.class_to_idx.items()}

    @staticmethod
    def _to_mono(waveform: torch.Tensor) -> torch.Tensor:
        "Averages a (channels, time) tensor down to (1, time); mono input is returned unchanged."
        if waveform.shape[0] > 1:
            return waveform.float().mean(dim=0, keepdim=True)
        return waveform

    @staticmethod
    def _pad_or_trim(waveform: torch.Tensor, num_samples: int) -> torch.Tensor:
        "Zero-pads (at the end) or truncates a (1, time) tensor to exactly (1, num_samples)."
        fitted = torch.zeros(1, num_samples)
        keep = min(num_samples, waveform.shape[-1])
        fitted[0, :keep] = waveform[0, :keep]
        return fitted

    # 4. Make function to load audio
    def load_audio(self, index: int) -> torch.Tensor:
        "Loads the clip at `index` as a mono (1, num_samples) tensor at `self.sample_rate`."
        audio_path = self.paths[index]
        # The files are local: read them directly instead of routing the path
        # through torchaudio's remote-asset downloader.
        w, source_rate = torchaudio.load(str(audio_path))
        w = self._to_mono(w)

        if source_rate != self.sample_rate:
            resampled = librosa.resample(w.squeeze(0).numpy(),
                                         orig_sr=source_rate,
                                         target_sr=self.sample_rate,
                                         res_type="kaiser_best")
            w = torch.from_numpy(resampled).unsqueeze(0)

        # Clips longer than the target duration are truncated rather than
        # raising a shape-mismatch error on assignment.
        return self._pad_or_trim(w, self.num_samples)

    # 5. Overwrite the __len__() method (optional but recommended for subclasses of torch.utils.data.Dataset)
    def __len__(self) -> int:
        "Returns the total number of samples."
        return len(self.paths)

    # 6. Overwrite the __getitem__() method (required for subclasses of torch.utils.data.Dataset)
    def __getitem__(self, index: int) -> tuple[torch.Tensor, torch.Tensor]:
        "Returns one sample of data, data and label (X, y); y is an int64 tensor of shape (1,)."
        wav = self.load_audio(index)
        # expects path in data_folder/<prefix>_<class_name>/file.wav
        class_name = self.paths[index].parent.name.split('_')[-1].lower()
        class_name = self._CLASS_ALIASES.get(class_name, class_name)
        class_idx = torch.tensor([self.class_to_idx[class_name]])

        # Transform if necessary
        if self.transform is not None:
            wav = self.transform(wav)
        return wav, class_idx # return data, label (X, y)

In [None]:
# MFCC front-end applied to every waveform the dataset returns.
# sample_rate matches the 24414 Hz that AudioFolderCustom.load_audio resamples to.
# NOTE(review): with center=False the 3 s clips presumably yield 456 frames, which
# would match the (10, 1, 50, 456) input used for the model summary — confirm.
data_transform = transforms.MFCC(
    sample_rate=24414,
    n_mfcc=50,
    melkwargs={"n_fft": 400, "hop_length": 160, "n_mels": 53, "center": False},
)

In [None]:
# Root folder of the dataset; AudioFolderCustom looks for "<dir_path>/*/*.wav".
dir_path = "path/input/TESS"

In [None]:
# Build the dataset: each item is (data_transform(waveform), label tensor).
data_custom = AudioFolderCustom(targ_dir=dir_path, 
                                transform=data_transform)

In [None]:
# Inspect the class-name -> index mapping produced by find_classes.
data_custom.class_to_idx

In [None]:
# Inspect the inverse mapping (index -> class name).
data_custom.idx_to_class

In [None]:
# Sanity check: fetch the first (features, label) pair straight from the dataset
# and print the shapes of both tensors.
wav, label = next(iter(data_custom))
print(f"Audio shape: {wav.shape}")
print(f"Label shape: {label.shape}")

In [None]:
# Number of .wav files found by the dataset; used below to size the train/test split.
total_data_size=len(data_custom)

In [None]:
# 80/20 train/test split. The generator is seeded explicitly so the partition is
# the same on every "Restart & Run All" (the global seeds are only set in later cells).
train_size = int(total_data_size * 0.8)
test_size = total_data_size - train_size
train_set, test_set = torch.utils.data.random_split(
    data_custom,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(42),
)

In [None]:
## train and test data loaders
# Both loaders use batches of 20 and load in the main process (num_workers=0).
# Only the training loader shuffles; the test loader keeps a fixed order.
train_dataloader = DataLoader(dataset=train_set, # use custom created train Dataset
                                     batch_size=20, # how many samples per batch?
                                     num_workers=0, # how many subprocesses to use for data loading? (higher = more)
                                     shuffle=True) # shuffle the data?

test_dataloader = DataLoader(dataset=test_set, 
                                     batch_size=20,
                                     num_workers=0,
                                     shuffle=False)

In [None]:
# Sanity check: pull one batch from the training loader and print its shapes.
# Note this rebinds `wav` and `label` from the single-sample check above.
wav, label = next(iter(train_dataloader))
print(f"Audio shape: {wav.shape}")
print(f"Label shape: {label.shape}")

In [None]:
# Report sample counts. len(dataloader) is the number of *batches*, so the sizes
# are read from the underlying datasets instead of being reconstructed from a
# batch size.
print(f"Total data: {int(total_data_size)}",
      f"\nTrain Data: {len(train_dataloader.dataset)}",
      f"\nTest Data: {len(test_dataloader.dataset)}")

In [None]:

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
# Load VGG16 with its default pretrained weights and move it to the target device.
weights = torchvision.models.VGG16_Weights.DEFAULT # .DEFAULT = best available weights 
model = torchvision.models.vgg16(weights=weights).to(device)

In [None]:
# Print a torchinfo summary of the unmodified pretrained model,
# using a (32, 3, 224, 224) RGB batch as the example input.
summary(model=model, 
        input_size=(32, 3, 224, 224), # make sure this is "input_size", not "input_shape"
        # col_names=["input_size"], # uncomment for smaller output
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"]
)

In [None]:
# Freeze every parameter currently in the convolutional feature extractor.
# Layers swapped in after this cell are new modules and are not affected.
for param in model.features.parameters():
    param.requires_grad = False

In [None]:
# Set the manual seeds so the freshly created layers are initialised reproducibly
torch.manual_seed(42)
torch.cuda.manual_seed(42)

# Recreate the input layer: the MFCC features have a single channel, while the
# pretrained first convolution expects 3 (RGB). The new layer is created after
# the freeze loop, so it stays trainable.
model.features[0] = torch.nn.Conv2d(1, 64, kernel_size=(3,3), stride=(1,1), padding=(1,1)).to(device)

# One output unit per class discovered in the dataset
output_shape = len(data_custom.classes)

# Recreate the final classifier layer and send it to the target device
model.classifier[6] = torch.nn.Linear(in_features=4096,
                    out_features=output_shape, # same number of output units as our number of classes
                    bias=True).to(device)
In [None]:

# Summary of the modified model, using a single-channel (10, 1, 50, 456) batch
# as the example input.
summary(model=model, 
        input_size=(10, 1, 50, 456), ## (batch_size, color_channels, height, width)
        verbose=0,
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"]
)

In [None]:
# Defining loss and optimizer
# BCEWithLogitsLoss is fed one-hot float targets by train_step/test_step.
# NOTE(review): every clip carries exactly one label and predictions are taken
# with argmax, so torch.nn.CrossEntropyLoss may be the better fit — confirm
# before changing, since it alters the training objective.
loss_fn = torch.nn.BCEWithLogitsLoss().to(device)
# All model parameters are passed to Adam, including the ones frozen earlier.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
def train_step(model: torch.nn.Module, 
               dataloader: torch.utils.data.DataLoader, 
               loss_fn: torch.nn.Module, 
               optimizer: torch.optim.Optimizer,
               device: torch.device) -> tuple[float, float]:
    """Trains `model` for one pass over `dataloader`.

    Labels are one-hot encoded to float targets with as many classes as the
    model has output units, then passed to `loss_fn` together with the logits.

    Args:
        model: network producing logits of shape (batch, num_classes).
        dataloader: iterable of (X, y) batches; y holds integer class indices
            of shape (batch,) or (batch, 1).
        loss_fn: loss accepting (logits, one-hot float targets).
        optimizer: optimizer updating `model`'s parameters.
        device: device the batches are moved to.

    Returns:
        (train_loss, train_acc): loss and accuracy, each averaged over batches.
    """
    model.train()

    train_loss, train_acc = 0.0, 0.0

    # Loop through data loader data batches
    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        # Flatten labels to (batch,) explicitly; squeeze() would also drop the
        # batch dimension when a batch holds a single sample.
        y = y.reshape(-1)

        y_pred = model(X)

        # Size the one-hot targets from the logits instead of a fixed class count.
        targets = torch.nn.functional.one_hot(y, num_classes=y_pred.shape[-1]).float()
        loss = loss_fn(y_pred.float(), targets)
        train_loss += loss.item()

        optimizer.zero_grad()

        loss.backward()

        optimizer.step()

        # argmax over logits equals argmax over softmax(logits)
        y_pred_class = y_pred.argmax(dim=1)
        train_acc += (y_pred_class == y).sum().item() / len(y_pred)

    # Adjusting metrics to get average loss and accuracy per batch
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc

In [None]:
def test_step(model: torch.nn.Module, 
              dataloader: torch.utils.data.DataLoader, 
              loss_fn: torch.nn.Module,
              device: torch.device) -> tuple[float, float]:

    model.eval() 

    test_loss, test_acc = 0, 0

    with torch.inference_mode():
        for batch, (X, y) in enumerate(dataloader):

            X, y = X.to(device), y.to(device)

            test_pred_logits = model(X)

            loss = loss_fn(torch.squeeze(test_pred_logits).float(), torch.squeeze(torch.nn.functional.one_hot(y, num_classes=7)).float())
            test_loss += loss.item()

            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == torch.squeeze(y)).sum().item()/len(test_pred_labels))

    # Adjusting metrics to get average loss and accuracy per batch 
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

In [None]:
from tqdm import tqdm

def train(model: torch.nn.Module, 
          train_dataloader: torch.utils.data.DataLoader, 
          test_dataloader: torch.utils.data.DataLoader, 
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module,
          epochs: int,
          device: torch.device) -> dict[str, list[float]]:
    """Runs `epochs` rounds of train_step followed by test_step.

    After every epoch the four metrics are printed and appended to the history.

    Returns:
        Dict with keys "train_loss", "train_acc", "test_loss" and "test_acc",
        each mapping to a list holding one value per epoch.
    """
    metric_names = ("train_loss", "train_acc", "test_loss", "test_acc")
    history = {name: [] for name in metric_names}

    # Arguments that both the training and the evaluation step receive
    shared_kwargs = {"model": model, "loss_fn": loss_fn, "device": device}

    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(dataloader=train_dataloader,
                                           optimizer=optimizer,
                                           **shared_kwargs)
        test_loss, test_acc = test_step(dataloader=test_dataloader,
                                        **shared_kwargs)

        print(
          f"Epoch: {epoch+1} | "
          f"train_loss: {train_loss:.4f} | "
          f"train_acc: {train_acc:.4f} | "
          f"test_loss: {test_loss:.4f} | "
          f"test_acc: {test_acc:.4f}"
        )

        # Record this epoch's metrics in the same order as metric_names
        epoch_values = (train_loss, train_acc, test_loss, test_acc)
        for name, value in zip(metric_names, epoch_values):
            history[name].append(value)

    return history

In [None]:
# Set the random seeds
torch.manual_seed(42)
torch.cuda.manual_seed(42)

# Wall-clock timer around the whole training run
from timeit import default_timer as timer 
start_time = timer()

# Train for 10 epochs; `results` holds the per-epoch loss/accuracy lists
results = train(model=model,
                   train_dataloader=train_dataloader,
                   test_dataloader=test_dataloader,
                   optimizer=optimizer,
                   loss_fn=loss_fn,
                   epochs=10,
                   device=device)

end_time = timer()
print(f"[INFO] Total training time: {end_time-start_time:.3f} seconds")

In [None]:
# Import the plotting helper, downloading helper_functions.py on first use.
# SECURITY NOTE(review): this fetches Python source over the network and then
# imports (executes) it. Prefer vendoring a reviewed copy of the file or pinning
# the URL to a specific commit.
try:
    from helper_functions import plot_loss_curves
except ImportError:  # only a missing module/name triggers the download
    print("[INFO] Couldn't find helper_functions.py, downloading...")
    import importlib
    import requests

    # Fetch first and fail loudly on HTTP errors, so that a failed download
    # never leaves an empty or HTML error page behind as helper_functions.py.
    request = requests.get(
        "https://raw.githubusercontent.com/mrdbourke/pytorch-deep-learning/main/helper_functions.py",
        timeout=30,
    )
    request.raise_for_status()
    with open("helper_functions.py", "wb") as f:
        f.write(request.content)

    # The module was created after the interpreter started; refresh the import
    # system's directory caches so the new file is found.
    importlib.invalidate_caches()
    from helper_functions import plot_loss_curves

# Plot the loss curves of our model
plot_loss_curves(results)