In [6]:
import os
import torch
import torchaudio
import torchaudio.transforms as T
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torchaudio.transforms as transforms
import torch.nn as nn

class BirdSongDataset(Dataset):
    def __init__(self, df, audio_dir, class_info, transform=None):
        self.df = df
        self.audio_dir = audio_dir
        self.class_info = class_info
        self.transform = transform

    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        filename = self.df.iloc[idx, 0]
        audio_path = os.path.join(self.audio_dir, filename)
        waveform, sample_rate = torchaudio.load(audio_path)
        
        # Getting the labels
        labels = self.df[self.df['filename'] == filename]
        target = torch.zeros(len(self.class_info))
        for _, label in labels.iterrows():
            class_name = label['class']
            target[self.class_info.index(class_name)] = 1.0

        if self.transform:
            waveform = self.transform(waveform)
        
        return waveform, target

# Load csv files
train_csv = pd.read_csv('data/train.csv')
class_info_csv = pd.read_csv('data/class_info.csv')
class_names = class_info_csv['class name'].tolist()

# Split data into train and validation sets
train_df, valid_df = train_test_split(train_csv, test_size=0.1, random_state=42)

# Chain transformations using nn.Sequential
transform = nn.Sequential(
    transforms.MelSpectrogram(sample_rate=44100, n_fft=1024, hop_length=512, n_mels=64),
    transforms.FrequencyMasking(freq_mask_param=15),
    transforms.TimeMasking(time_mask_param=35)
)

train_dataset = BirdSongDataset(train_df, 'data/train/', class_names, transform=transform)
valid_dataset = BirdSongDataset(valid_df, 'data/train/', class_names, transform=transform)


# Define DataLoader objects
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)


In [8]:
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader

# Calculate the global max length of waveforms in the dataset
global_max_len = max(max(wf.shape[2] for wf, _ in dataset) for dataset in [train_dataset, valid_dataset])

def collate_fn(batch):
    # A batch is a list of (waveform, target) pairs
    waveforms, targets = zip(*batch)

    # Pad the waveforms to the global max length
    waveforms = [torch.cat([wf, torch.zeros((1, wf.shape[1], global_max_len - wf.shape[2]))], dim=2) for wf in waveforms]

    waveforms = torch.stack(waveforms)
    targets = torch.stack(targets)

    return waveforms, targets

# Update DataLoader objects with the new collate_fn
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn, num_workers=2)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn, num_workers=2)


In [3]:
import torch
import torch.nn as nn
import torch.optim as optim

class SimpleCNN(nn.Module):
    def __init__(self, num_classes):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(32 * 16 * 108, 128)
        self.fc2 = nn.Linear(128, num_classes)
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = x.view(x.size(0), 32 * 16 * 108)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        x = self.sigmoid(x)
        return x
    
class ImprovedCNN(nn.Module):
    def __init__(self, num_classes):
        super(ImprovedCNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(128 * 8 * 54, 1024),  # Adjust the size based on the output from features
            nn.ReLU(),
            nn.Linear(1024, num_classes),
            nn.Sigmoid()
        )
        
    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x



In [4]:
# Set up the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize the model
# model = SimpleCNN(num_classes=len(class_names)).to(device)
model = ImprovedCNN(num_classes=len(class_names)).to(device)


In [5]:
import numpy as np


def find_size(model):
    dummy_x = torch.randn(1, 1, 64, 432).to(device)  # Assuming you have 1080 time frames
    dummy_out = model.pool(model.conv2(model.pool(model.conv1(dummy_x))))
    return int(np.prod(dummy_out.size()))

# Before training
feature_size = find_size(model)
print(feature_size)

55296


In [6]:


# Loss and Optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

class SmoothBCELoss(nn.Module):
    def __init__(self, smoothing=0.1):
        super(SmoothBCELoss, self).__init__()
        self.smoothing = smoothing
    
    def forward(self, pred, target):
        target = target * (1.0 - self.smoothing) + 0.5 * self.smoothing
        loss = nn.BCELoss()(pred, target)
        return loss


criterion = SmoothBCELoss(smoothing=0.1)


num_epochs = 12
for epoch in range(num_epochs):
    # Training
    model.train()
    running_train_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()
        
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_train_loss += loss.item()
    
    train_loss = running_train_loss / len(train_loader)
    
    # Validation
    model.eval()
    running_val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in valid_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            running_val_loss += loss.item()
        
    val_loss = running_val_loss / len(valid_loader)
    
    print(f"Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

print('Finished Training')

In [None]:
from sklearn.metrics import f1_score

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in valid_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        preds = outputs.round()  # Convert to binary: 0 or 1
        
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

f1_macro = f1_score(all_labels, all_preds, average='samples')
print(f"F1 Score (Samples): {f1_macro}")


F1 Score (Macro): 0.5344448031212603


  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))


In [None]:
def test_collate_fn(batch):
    # A batch is a list of (waveform, filename) pairs for the test dataset
    waveforms, filenames = zip(*batch)

    # Pad the waveforms to the global max length
    waveforms = [torch.cat([wf, torch.zeros((1, wf.shape[1], global_max_len - wf.shape[2]))], dim=2) for wf in waveforms]

    waveforms = torch.stack(waveforms)

    return waveforms, filenames


# Define Test Dataset
class TestBirdSongDataset(Dataset):
    def __init__(self, df, audio_dir, transform=None):
        self.df = df
        self.audio_dir = audio_dir
        self.transform = transform

    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        filename = self.df.iloc[idx, 0]
        audio_path = os.path.join(self.audio_dir, filename)
        waveform, sample_rate = torchaudio.load(audio_path)

        if self.transform:
            waveform = self.transform(waveform)
        
        return waveform, filename  # No target labels for test set, return filename instead for submission

# Global variables to ensure consistency
SAMPLE_RATE = 44100

# Load test.csv
test_df = pd.read_csv('data/test.csv')
test_dataset = TestBirdSongDataset(test_df, 'data/test/', transform=transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=test_collate_fn, num_workers=2)

# Generate Predictions
model.eval()
predictions = {}

with torch.no_grad():
    for waveforms, filenames in test_loader:
        waveforms = waveforms.to(device)
        outputs = model(waveforms)
        preds = (outputs > 0.5).long().cpu().numpy()  # Convert to binary: 0 or 1
        
        for i, filename in enumerate(filenames):
            predictions[filename] = preds[i]

# Save Predictions in Submission Format
submission_data = []

header = ["filename"] + class_names
submission_data.append(header)

for filename in test_df['filename'].values:
    if filename in predictions:
        row = [filename] + predictions[filename].tolist()
        submission_data.append(row)
    else:
        # Handle edge case: prediction not generated for some reason
        row = [filename] + [0] * len(class_names)
        submission_data.append(row)

# Write submission_data to a csv
import csv

with open("data/sample.csv", 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(submission_data)
