In [2]:
import collections
import csv
import os
import random

import imageio
import librosa
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import torchaudio.transforms as T
import torchvision
import torchvision.transforms as transforms
from torch.utils import data

# Project-local imports come last so that the project's `transforms` module
# (ToTensor1D, RandomScale, RandomPadding, FrequencyMask, TimeMask, scale) is
# the final binding of the name `transforms` instead of being shadowed by
# torchvision.transforms.
import config
#from utils import transforms
from utils_dir import transforms


# CUDA for PyTorch: use the GPU when one is available, otherwise fall back to CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")





ModuleNotFoundError: No module named 'utils_dir'

In [None]:


class ContrastiveESCDataset(torch.utils.data.Dataset):
    """Dataset yielding (anchor, positive, negative) mel-spectrogram triplets.

    The anchor and the positive are two independently processed views of the
    same audio file; the negative is a processed view of a different file
    drawn at random from the same split.

    Args:
        train: When True, keep the files whose fold is in
            ``config.train_folds`` and use the augmenting transform
            pipelines. Otherwise keep the files whose fold is in
            ``config.test_fold`` and use the non-augmenting pipelines.
        root: Directory containing the audio files. The integer before the
            first ``-`` of each file name is treated as the file's fold.
        config: Object exposing ``train_folds`` and ``test_fold`` and, for
            training, ``freq_masks_width``, ``freq_masks``,
            ``time_masks_width`` and ``time_masks``. When None, the
            ``config`` module is imported and used instead.
    """

    def __init__(self, train=True, root='./data/ESC50/ESC-50-master/audio/', config=None):
        if config is None:
            # The parameter shadows the module-level ``config``; fall back to
            # that module so the dataset can be built without passing a
            # configuration object explicitly.
            import config as default_config
            config = default_config

        self.root = root
        self.train = train

        # Keep only the entries whose leading fold number belongs to the
        # requested split. Entries without an integer prefix are skipped
        # instead of crashing the constructor.
        wanted_folds = config.train_folds if train else config.test_fold
        self.file_names = []
        for name in sorted(os.listdir(self.root)):
            fold = self._fold_of(name)
            if fold is not None and fold in wanted_folds:
                self.file_names.append(name)

        self.mel_transform = T.MelSpectrogram(sample_rate=44100, n_mels=128, n_fft=1024, hop_length=512)
        # Built once here rather than on every generate_spectrogram call.
        self.db_transform = T.AmplitudeToDB()

        out_len = 220500
        if self.train:
            self.wave_transforms = transforms.Compose([transforms.ToTensor1D(),
                                                       transforms.RandomScale(max_scale=1.25),
                                                       transforms.RandomPadding(out_len=out_len),
                                                       transforms.RandomCrop(out_len=out_len)])

            self.spec_transforms = transforms.Compose([transforms.ToTensor(),
                                                       transforms.FrequencyMask(max_width=config.freq_masks_width, numbers=config.freq_masks),
                                                       transforms.TimeMask(max_width=config.time_masks_width, numbers=config.time_masks)])
        else:
            self.wave_transforms = transforms.Compose([transforms.ToTensor1D(),
                                                       transforms.RandomPadding(out_len=out_len),
                                                       transforms.RandomCrop(out_len=out_len)])

            self.spec_transforms = transforms.Compose([transforms.ToTensor()])

    @staticmethod
    def _fold_of(file_name):
        """Return the integer before the first ``-`` of ``file_name``, or None."""
        try:
            return int(file_name.split('-')[0])
        except ValueError:
            return None

    def __len__(self):
        """Return the number of files in the selected split."""
        return len(self.file_names)

    def __getitem__(self, index):
        """Return ``(spec1, spec2, neg_spec)`` for the file at ``index``.

        ``spec1`` and ``spec2`` come from the same file, ``neg_spec`` from a
        different, randomly chosen file.
        """
        file_name = self.file_names[index]

        # Create a positive pair and a negative sample
        aug_wave1, aug_wave2, neg_wave = self.load_and_augment(file_name, index)

        # Create spectrograms
        spec1 = self.generate_spectrogram(aug_wave1)
        spec2 = self.generate_spectrogram(aug_wave2)
        neg_spec = self.generate_spectrogram(neg_wave)

        return (spec1, spec2, neg_spec)

    def _load_wave(self, file_name):
        """Load ``file_name`` from ``self.root`` and drop a size-1 leading axis.

        The sample rate returned by torchaudio is discarded.
        """
        path = os.path.join(self.root, file_name)
        # NOTE(review): only the first 44100 frames are read, while the wave
        # transforms pad/crop to 220500 samples -- confirm this is intended.
        wave, _ = torchaudio.load(path, num_frames=44100)
        return wave.squeeze(0)

    def _sample_negative_index(self, index):
        """Draw a uniformly random dataset index different from ``index``.

        Raises:
            IndexError: If the split holds fewer than two files, so no other
                file exists to serve as the negative.
        """
        count = len(self.file_names)
        if count < 2:
            raise IndexError("need at least two files to draw a negative sample")
        # Draw from the count - 1 remaining slots and step over ``index``;
        # this avoids building a list of all other indices for every item.
        candidate = random.randrange(count - 1)
        return candidate + 1 if candidate >= index else candidate

    def load_and_augment(self, file_name, index):
        """Return two processed views of ``file_name`` and one negative wave.

        Args:
            file_name: Name of the anchor file inside ``self.root``.
            index: Position of ``file_name`` in ``self.file_names``; the
                negative is drawn from every other position.

        Returns:
            Tuple ``(aug_wave1, aug_wave2, neg_wave)``.
        """
        wave = self._load_wave(file_name)
        aug_wave1 = self.process_wave(wave)
        aug_wave2 = self.process_wave(wave)

        # Fetch a negative sample
        neg_file_name = self.file_names[self._sample_negative_index(index)]
        neg_wave = self.process_wave(self._load_wave(neg_file_name))

        return aug_wave1, aug_wave2, neg_wave

    def process_wave(self, wave):
        """Normalize, trim surrounding silence and apply the wave transforms.

        Steps: add a trailing axis to 1-D input, rescale to [-1, 1] through
        ``transforms.scale`` when the absolute value of the maximum exceeds
        1.0, transpose and multiply by 32768.0, cut leading and trailing
        zero samples along axis 1, then run ``self.wave_transforms`` on a
        numpy copy and squeeze axis 0 of the result in place.

        Args:
            wave: Waveform as a torch tensor or numpy array.

        Returns:
            The output of ``self.wave_transforms`` with axis 0 squeezed.
        """
        if wave.ndim == 1:
            wave = wave[:, np.newaxis]

        if abs(float(wave.max())) > 1.0:
            wave = transforms.scale(wave, wave.min(), wave.max(), -1.0, 1.0)
        wave = wave.T * 32768.0

        # Column indices of the non-zero samples. np.nonzero returns one
        # index array per axis for tensors and arrays alike, whereas
        # Tensor.nonzero() returns an (N, ndim) coordinate matrix whose row 1
        # is merely the second non-zero element.
        nonzero_cols = np.nonzero(np.asarray(wave))[1]
        if nonzero_cols.size > 0:
            start = int(nonzero_cols.min())
            end = int(nonzero_cols.max())
            wave = wave[:, start:end + 1]
        # An all-zero clip has nothing to trim and is passed on unchanged.

        wave_copy = np.copy(wave)
        wave_copy = self.wave_transforms(wave_copy)
        wave_copy.squeeze_(0)

        return wave_copy

    def generate_spectrogram(self, wave):
        """Turn a waveform into a spectrogram repeated three times on axis 0.

        Applies the mel-spectrogram transform, converts to decibels, runs
        ``self.spec_transforms`` and concatenates three copies along dim 0.
        """
        # Generating mel-spectrogram and apply spec transforms
        s = self.mel_transform(wave)
        log_s = self.db_transform(s)
        log_s = self.spec_transforms(log_s)

        spec = torch.cat((log_s, log_s, log_s), dim=0)
        return spec

        


from torch.utils import data
import random

def create_generators(num_workers=10):
    """Build the training and test data loaders for contrastive learning.

    Args:
        num_workers: Number of worker processes used by each DataLoader.

    Returns:
        Tuple ``(train_loader, test_loader)``. Both loaders use
        ``config.batch_size``, shuffle their data, keep the final partial
        batch and collate with ``contrastive_collate_fn``.
    """
    # The dataset's ``config`` parameter defaults to None and is dereferenced
    # in its constructor, so the configuration module is passed explicitly.
    train_dataset = ContrastiveESCDataset(train=True, config=config)
    test_dataset = ContrastiveESCDataset(train=False, config=config)

    loader_options = dict(batch_size=config.batch_size,
                          shuffle=True,
                          num_workers=num_workers,
                          drop_last=False,
                          collate_fn=contrastive_collate_fn)

    train_loader = data.DataLoader(train_dataset, **loader_options)
    test_loader = data.DataLoader(test_dataset, **loader_options)

    return train_loader, test_loader



In [1]:
def contrastive_collate_fn(batch):
    """Collate a batch into stacked anchor, positive and negative tensors.

    Two item formats are accepted:

    * ``(anchor, positive, negative)`` tuples of three tensors. Each position
      is stacked as-is.
    * ``(file_name, spec, class_id)`` tuples. ``spec`` is used as both the
      anchor and the positive, and the negative is the ``spec`` of a random
      item in the same batch whose ``class_id`` differs.

    Args:
        batch: List of items in one of the two formats above.

    Returns:
        Dict with keys ``"anchors"``, ``"positives"`` and ``"negatives"``,
        each a tensor stacked along a new leading batch axis.

    Raises:
        ValueError: In the ``(file_name, spec, class_id)`` format, when an
            item has no other item of a different class in the batch.
    """
    is_triplet_batch = bool(batch) and len(batch[0]) == 3 and all(
        isinstance(part, torch.Tensor) for part in batch[0]
    )

    if is_triplet_batch:
        # Items already carry their own positive and negative samples.
        anchors, positives, negatives = (list(column) for column in zip(*batch))
    else:
        anchors, positives, negatives = [], [], []
        for _, spec, class_id in batch:
            # Every spectrogram in the batch that belongs to another class.
            candidates = [other_spec
                          for _, other_spec, other_class_id in batch
                          if other_class_id != class_id]
            if not candidates:
                # Retrying random picks here would never terminate.
                raise ValueError(
                    "cannot pick a negative sample: every item in the batch "
                    "has the same class_id"
                )

            anchors.append(spec)
            # For the sake of simplicity, the anchor is duplicated as the positive sample.
            positives.append(spec)
            negatives.append(random.choice(candidates))

    return {
        "anchors": torch.stack(anchors),
        "positives": torch.stack(positives),
        "negatives": torch.stack(negatives)
    }
