In [1]:
import numpy as np
import pandas as pd
import torch
import torchaudio
import random
from torch.utils.data import Dataset, DataLoader, Subset

# The evaluation set is shipped as two .npz archives.
path1 = "/kaggle/input/dcase2025-evaluation-dataset/Data_part1.npz"
path2 = "/kaggle/input/dcase2025-evaluation-dataset/Data_part2.npz"

# Device configuration: first CUDA device when one is available, CPU otherwise.
if torch.cuda.is_available():
    device = 'cuda:0'
else:
    device = 'cpu'
print(f"Device: {device}")

def get_class_name(klass='unknown',getall=False):
    """Translate between acoustic-scene names and their integer ids.

    Args:
        klass: Either a scene name (str) or a class id (int).
        getall: When truthy, ``klass`` is ignored and the complete
            name -> id dictionary is returned.

    Returns:
        The id for a known scene name, the scene name for a known id,
        ``None`` when ``klass`` matches neither (this includes the default
        ``'unknown'``), or the name -> id dict when ``getall`` is truthy.
    """
    # Position in this tuple is the class id.
    ordered_names = (
        'airport', 'shopping_mall', 'metro_station', 'street_pedestrian',
        'public_square', 'street_traffic', 'tram', 'bus', 'metro', 'park',
    )
    name_to_id = {name: idx for idx, name in enumerate(ordered_names)}
    if getall:
        return name_to_id

    # Names and ids never collide, so try the forward table first and fall
    # back to the reverse one.
    if klass in name_to_id:
        return name_to_id[klass]
    id_to_name = dict(enumerate(ordered_names))
    return id_to_name.get(klass)

def get_device_name(dev='unknown',getall=False):
    """Map a recording-device label to its integer id.

    Args:
        dev: Device label such as ``'a'`` or ``'s2'``.
        getall: When truthy, ``dev`` is ignored and the complete
            label -> id dictionary is returned.

    Returns:
        The integer id of ``dev``; any label that is not in the table maps
        to the id of ``'unknown'`` (6). With ``getall`` truthy, the
        label -> id dict itself.
    """
    # Device mappings
    devices_names = {'a' : 0,'b' : 1,'c' : 2,'s1': 3,'s2': 4,'s3': 5,'unknown': 6}
    if getall :
        return devices_names
    # One lookup with an explicit default instead of `get(...) != None`.
    return devices_names.get(dev, devices_names['unknown'])

class MelSpecsDataset(Dataset):
    """Mel-spectrogram samples recorded with one specific device.

    Args:
        data: Mapping (e.g. a loaded ``.npz`` archive or a plain dict) that
            provides the arrays ``'mel'``, ``'name'`` and ``'device'``,
            aligned along their first axis.
        rec_device: Device label to keep. Only samples whose ``'device'``
            entry equals this string are retained.

    Each item is a tuple ``(mel, dev, name)``: the spectrogram as a float32
    tensor with a leading singleton dimension added, the encoded device id
    as a tensor, and the sample name.
    """
    def __init__(self, data , rec_device='unknown'):
        # Read the label array once: on a lazily loaded .npz archive every
        # `data[...]` access reads the array from disk again.
        device_labels = data['device']
        mask = device_labels == rec_device  # boolean mask for rec_device

        self.data = data['mel'][mask]  # for version 1 and 2 of the dataset
        self.names = data['name'][mask]
        self.devs = self.encode2int_dev(device_labels)[mask]

    def __getitem__(self, item):
        log_mel_tensor = torch.tensor(self.data[item], dtype=torch.float32)
        dev = torch.tensor(self.devs[item])
        name = self.names[item]  # Include the sample name in the output
        return log_mel_tensor.unsqueeze(0), dev, name

    def __len__(self):
        return len(self.devs)

    @staticmethod
    def encode2int_dev(values):
        """Encode an iterable of device labels as a numpy array of ids."""
        return np.array([get_device_name(dev=v) for v in values])

def _load_npz_arrays(path, keys=('mel', 'name', 'device')):
    """Read the requested arrays of a .npz archive into a plain dict.

    `np.load` on a .npz file returns a lazy archive object that reads an
    array from disk on every `archive[key]` access and keeps a file handle
    open. Materializing the arrays once lets the per-device datasets below
    reuse them, and the context manager closes the archive.
    """
    with np.load(path) as archive:
        return {key: archive[key] for key in keys}


data1 = _load_npz_arrays(path1)  # Load the .npz file
data2 = _load_npz_arrays(path2)  # Load the .npz file
devs = ['a','b','c','s1','s2','s3','unknown']
test_dataset = {}  # device label -> concatenated dataset (part 1 + part 2)
test_loader = {}   # device label -> DataLoader over that dataset
for dev in devs:
    print("Data Generator declared")
    test_dataset_part1 = MelSpecsDataset(data=data1, rec_device=dev)
    print("Dataset part 1 defined")
    test_dataset_part2 = MelSpecsDataset(data=data2, rec_device=dev)
    print("Dataset part 2 defined")
    test_dataset[dev] = torch.utils.data.ConcatDataset([test_dataset_part1, test_dataset_part2])
    print(f"Test Dataset defined - Number of samples: {len(test_dataset[dev])}")
    del test_dataset_part1, test_dataset_part2
    # Evaluation only: keep the sample order and keep the last partial batch.
    test_loader[dev] = DataLoader(dataset=test_dataset[dev], batch_size=1024, num_workers=4, shuffle=False, drop_last=False)
    print(f"Test Loader defined - Number of batches: {len(test_loader[dev])}")

# The datasets hold their own filtered arrays, so the full arrays can go.
del data1, data2


Device: cuda:0
Data Generator declared
Dataset part 1 defined
Dataset part 2 defined
Test Dataset defined - Number of samples: 1
Test Loader defined - Number of batches: 28
Data Generator declared
Dataset part 1 defined
Dataset part 2 defined
Test Dataset defined - Number of samples: 2
Test Loader defined - Number of batches: 19
Data Generator declared
Dataset part 1 defined
Dataset part 2 defined
Test Dataset defined - Number of samples: 3
Test Loader defined - Number of batches: 19
Data Generator declared
Dataset part 1 defined
Dataset part 2 defined
Test Dataset defined - Number of samples: 4
Test Loader defined - Number of batches: 19
Data Generator declared
Dataset part 1 defined
Dataset part 2 defined
Test Dataset defined - Number of samples: 5
Test Loader defined - Number of batches: 19
Data Generator declared
Dataset part 1 defined
Dataset part 2 defined
Test Dataset defined - Number of samples: 6
Test Loader defined - Number of batches: 19
Data Generator declared
Dataset part 

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class RepConv2d(nn.Module):
    """Re-parameterizable convolution built from four parallel branches.

    The branches are bias-free convolutions with kernel sizes 3x3, 1x3, 3x1
    and 1x1 that share stride and groups; their paddings are chosen so all
    four produce outputs of the same size. ``forward`` returns the sum of
    the four branch outputs. ``merge_convs`` folds the branches into one
    equivalent 3x3 convolution.

    Args:
        input_channel: Number of input channels.
        output_channel: Number of output channels.
        stride: Stride shared by all branches.
        groups: Number of groups shared by all branches.
    """
    def __init__(self, input_channel, output_channel, stride=(1, 1), groups=1):
        super().__init__()
        self.input_channel = input_channel
        self.output_channel = output_channel
        self.stride = stride

        # Initial convolutions
        self.conv1 = nn.Conv2d(input_channel, output_channel, kernel_size=(3, 3),
                               stride=stride, padding=(1, 1), bias=False,groups=groups)
        self.conv2 = nn.Conv2d(input_channel, output_channel, kernel_size=(1, 3),
                               stride=stride, padding=(0, 1), bias=False,groups=groups)
        self.conv3 = nn.Conv2d(input_channel, output_channel, kernel_size=(3, 1),
                               stride=stride, padding=(1, 0), bias=False,groups=groups)
        self.conv4 = nn.Conv2d(input_channel, output_channel, kernel_size=(1, 1),
                               stride=stride, padding=(0, 0), bias=False,groups=groups)

    def forward(self, x):
        """Return the sum of the four branch outputs for ``x``."""
        return self.conv1(x) + self.conv2(x) + self.conv3(x) + self.conv4(x)

    def merge_convs(self):
        """Build a single 3x3 ``nn.Conv2d`` equivalent to the four branches.

        The smaller kernels are added into the matching centre row, centre
        column or centre element of a copy of the 3x3 kernel. The returned
        layer lives on the same device and uses the same dtype as this
        module's weights.
        """
        with torch.no_grad():
            # Start from a copy of the 3x3 kernel so device and dtype are
            # inherited from the trained weights.
            merged_weight = self.conv1.weight.detach().clone()
            merged_weight[:, :, 1:2, :] += self.conv2.weight    # 1x3 -> centre row
            merged_weight[:, :, :, 1:2] += self.conv3.weight    # 3x1 -> centre column
            merged_weight[:, :, 1:2, 1:2] += self.conv4.weight  # 1x1 -> centre element

        # Create the fused layer with the geometry of the 3x3 branch.
        conv2d = nn.Conv2d(
            in_channels=self.conv1.in_channels,
            out_channels=self.conv1.out_channels,
            kernel_size=self.conv1.kernel_size,
            stride=self.conv1.stride,
            padding=self.conv1.padding,
            bias=False,
            groups=self.conv1.groups
        ).to(device=merged_weight.device, dtype=merged_weight.dtype)

        with torch.no_grad():
            conv2d.weight.copy_(merged_weight)

        return conv2d

    def get_reparametrized_layer(self):
        """Return the fused convolution wrapped in an ``nn.Sequential``.

        Because of the wrapper, the fused weight is registered under the
        key ``0.weight`` of the returned module.
        """
        conv2d = self.merge_convs()
        return nn.Sequential(conv2d)


class ResidualNormalization(nn.Module):
    """Instance normalization with a learnable, per-channel scaled shortcut.

    Computes ``lambda_param * x + InstanceNorm2d(x)``, where
    ``lambda_param`` holds one learnable scale per channel (initialised to
    one) and the instance norm has affine parameters.

    Args:
        num_features: Number of channels of the input.
    """
    def __init__(self, num_features):
        super().__init__()
        # Shape (C, 1, 1) so the scale broadcasts over the last two axes.
        self.lambda_param = nn.Parameter(torch.ones(num_features, 1, 1))
        self.instance_norm = nn.InstanceNorm2d(num_features, affine=True)

    def forward(self, x):
        scaled_input = self.lambda_param * x
        normalized = self.instance_norm(x)
        return scaled_input + normalized


class LearnablePooling(nn.Module):
    """Attention pooling concatenated with global average pooling.

    For an input of shape (B, C, H, W) the module returns a (B, 2*C)
    tensor: the first C columns are the attention-weighted spatial sums of
    the input, the last C columns are its spatial means.

    Args:
        in_channels: Number of channels C of the input.
        hidden_dim: Width of the attention bottleneck; falls back to
            ``in_channels // 2`` when not given (or falsy).
    """
    def __init__(self, in_channels, hidden_dim=None):
        super().__init__()
        if not hidden_dim:
            hidden_dim = in_channels // 2

        # Normalization applied to the input of the attention branch only.
        self.bn_input = ResidualNormalization(in_channels)

        # Attention branch: 1x1 bottleneck -> norm -> LeakyReLU -> 1x1 scores.
        self.attn_conv = nn.Conv2d(in_channels, hidden_dim, kernel_size=1, bias=False)
        self.bn_attn = ResidualNormalization(hidden_dim)
        self.attn_score = nn.Conv2d(hidden_dim, in_channels, kernel_size=1, bias=False)
        self.activation = nn.LeakyReLU(0.1, inplace=True)

        # Plain spatial average used for the second half of the output.
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        # One attention score per channel and spatial position.
        hidden = self.attn_conv(self.bn_input(x))
        hidden = self.activation(self.bn_attn(hidden))
        logits = self.attn_score(hidden)

        # Normalize the scores over all H*W positions of each channel.
        batch, channels, height, width = x.size()
        flat_weights = F.softmax(logits.view(batch, channels, -1), dim=-1)
        weights = flat_weights.view(batch, channels, height, width)

        # The weights multiply the raw input, not the normalized one.
        attended = (x * weights).sum(dim=[2, 3])
        averaged = self.global_avg_pool(x).squeeze(-1).squeeze(-1)

        return torch.cat([attended, averaged], dim=1)


class DSFlexiNetBlock(nn.Module):
    """Residual block: 1x1 conv followed by a depthwise RepConv, plus a 1x1 shortcut.

    Main path: BatchNorm -> 1x1 conv (in -> out channels) -> LeakyReLU ->
    BatchNorm -> depthwise ``RepConv2d`` -> LeakyReLU. The shortcut is a
    bias-free 1x1 convolution of the block input, added to the main path.

    Args:
        in_channels: Channels of the block input.
        out_channels: Channels of the block output.
        stride: Stride of the depthwise ``RepConv2d``.
        expansion_factor: Accepted but not used by the current layers.
    """
    def __init__(self, in_channels, out_channels, stride, expansion_factor=6):
        super().__init__()
        self.stride = stride
        self.use_skip = True

        # ---- Input normalization and shortcut projection ----
        self.input_norm = nn.BatchNorm2d(in_channels)
        if self.use_skip:
            self.input_scaling = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias = False)
        else:
            self.input_scaling = None

        # ---- Pointwise convolution ----
        self.expand_conv = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias = False)
        # NOTE(review): expand_norm is registered but never called in forward;
        # presumably kept so parameter names match saved checkpoints - confirm.
        self.expand_norm = nn.BatchNorm2d( out_channels )
        self.expand_activation = nn.LeakyReLU()

        # ---- Depthwise spatial convolution ----
        self.spatial_conv = RepConv2d( out_channels , out_channels, stride=stride, groups = out_channels)
        self.spatial_norm = nn.BatchNorm2d( out_channels )
        self.spatial_activation = nn.LeakyReLU()

    def forward(self, x):
        # Shortcut is computed from the un-normalized block input.
        shortcut = self.input_scaling(x) if self.use_skip else None

        # Pointwise stage on the normalized input.
        features = self.expand_conv(self.input_norm(x))
        features = self.expand_activation(features)

        # Spatial stage: the norm is applied before the convolution.
        features = self.spatial_conv(self.spatial_norm(features))
        features = self.spatial_activation(features)

        if not self.use_skip:
            return features
        return features + shortcut


class DSFlexiNet(nn.Module):
    """Scene classifier built from RepConv stems and DSFlexiNet blocks.

    Pipeline: BatchNorm on the single input channel, two stride-2
    ``RepConv2d`` stems (1 -> 16 -> 32 channels), two stages of three
    32-channel blocks each with an outer skip connection, one 32 -> 64
    block, then attention/average pooling, BatchNorm1d, dropout and a
    linear classifier.

    Args:
        num_classes: Size of the output logit vector.
    """
    def __init__(self, num_classes=10):
        super().__init__()

        # ---- Stem ----
        self.input_norm = nn.BatchNorm2d(1)

        # First downsampling convolution
        self.conv1 = RepConv2d(1, 16, stride = ( 2 , 2))
        self.norm1 = nn.BatchNorm2d(16)
        self.activation1 = nn.ReLU()
        # Second downsampling convolution
        self.conv2 = RepConv2d( 16 , 32 , stride = ( 2 , 2) )
        self.activation2 = nn.ReLU()

        # ---- Residual stages ----
        # Stage 1: three 32-channel blocks
        self.stage1 = nn.Sequential(
            *(DSFlexiNetBlock(32, 32, stride=(1,1)) for _ in range(3))
        )
        self.stage1_norm = ResidualNormalization(32)

        # Stage 2: three 32-channel blocks
        self.stage2 = nn.Sequential(
            *(DSFlexiNetBlock(32, 32, stride=(1,1)) for _ in range(3))
        )
        self.stage2_norm = ResidualNormalization(32)

        # Stage 3: single block widening to 64 channels
        self.stage3 = nn.Sequential(
            DSFlexiNetBlock(32 , 64 , stride=(1,1)),
        )
        self.stage3_norm = ResidualNormalization(64)

        # ---- Classification head ----
        # The pooling layer doubles the channel count (attention + average).
        self.pooling = LearnablePooling(64)
        self.head_norm = nn.BatchNorm1d(64 * 2)
        self.dropout = nn.Dropout(0.2)
        self.classifier = nn.Linear(64 * 2, num_classes)

    def forward(self, x , device = None):
        """Return class logits for a batch of spectrograms.

        Args:
            x: 4-D input with one channel, or a 3-D input that gets a
                channel axis inserted at position 1.
            device: Not used by this method.
        """
        if x.dim() == 3:
            x = x.unsqueeze(1)  # Add channel dimension

        # Stem: note that norm1 comes after the first activation.
        stem = self.conv1(self.input_norm(x))
        stem = self.norm1(self.activation1(stem))
        stem = self.activation2(self.conv2(stem))

        # Stages 1 and 2 add their own input back before normalizing.
        feats = self.stage1_norm(self.stage1(stem) + stem)
        feats = self.stage2_norm(self.stage2(feats) + feats)

        # Stage 3 changes the channel count, so it has no outer skip.
        feats = self.stage3_norm(self.stage3(feats))

        # Classification head
        pooled = self.head_norm(self.pooling(feats))
        return self.classifier(self.dropout(pooled))

def ReParametrize(module,device):
    """Swap every ``RepConv2d`` below ``module`` for its fused layer, in place.

    Walks the module tree depth-first. A direct child that is a
    ``RepConv2d`` is replaced by the result of its
    ``get_reparametrized_layer()`` moved to ``device``; any other child is
    searched recursively.

    Args:
        module: Root module to modify.
        device: Target device for the replacement layers.
    """
    # Snapshot the children first because the loop reassigns attributes.
    for child_name, submodule in list(module.named_children()):
        if not isinstance(submodule, RepConv2d):
            ReParametrize(submodule, device)
            continue
        fused_layer = submodule.get_reparametrized_layer().to(device)
        setattr(module, child_name, fused_layer)



# Device configuration
if torch.cuda.is_available():
    device = 'cuda:0'
else:
    device = 'cpu'
print(f"Device: {device}")

models_dir = "/kaggle/input/dcase2025-task1-models/pytorch/final_student_models/2/"
submission = 1 # 2 , 4
# Index 0 is the device-agnostic model; 1-6 are the per-device models.
model_names = [
    "GLOBAL.pth",
    f"V{submission}-A.pth",
    f"V{submission}-B.pth",
    f"V{submission}-C.pth",
    f"V{submission}-S1.pth",
    f"V{submission}-S2.pth",
    f"V{submission}-S3.pth",
]

# One freshly built network per checkpoint file.
models = [DSFlexiNet().to(device) for _ in model_names]
for net, checkpoint_name in zip(models, model_names):
    # Fuse the RepConv branches before loading the weights.
    # NOTE(review): presumably the checkpoints store the fused layout - confirm.
    ReParametrize(net, device=device)
    checkpoint = torch.load(models_dir + checkpoint_name, map_location=device)
    net.load_state_dict(checkpoint['model_state_dict'])
    print(f"Model {checkpoint_name} Loaded")

Device: cuda:0
Model GLOBAL.pth Loaded
Model V1-A.pth Loaded
Model V1-B.pth Loaded
Model V1-C.pth Loaded
Model V1-S1.pth Loaded
Model V1-S2.pth Loaded
Model V1-S3.pth Loaded


In [8]:
import torch
import pandas as pd
import torch.nn.functional as F

# Define the device types and their corresponding model indices
device_types = ['a', 'b' , 'c', 's1', 's2', 's3', 'unknown']
# Device label -> position of the matching checkpoint in the `models` list
# (0 is GLOBAL.pth, 1-6 are the A, B, C, S1, S2 and S3 checkpoints).
model_mapping = dict(a=1, b=2, c=3, s1=4, s2=5, s3=6, unknown=0)

# Class names in the order required by the submission template
class_names = [
    'airport',
    'shopping_mall',
    'metro_station',
    'street_pedestrian',
    'public_square',
    'street_traffic',
    'tram',
    'bus',
    'metro',
    'park',
]

# Function to evaluate a model on a data loader and return predictions with probabilities
def evaluate_test_dataset(model, loader, device):
    """Run ``model`` over ``loader`` and collect per-sample predictions.

    The model is switched to eval mode and run without gradient tracking.

    Args:
        model: Network that maps a batch of inputs to class logits.
        loader: Iterable of ``(mels, devices, names)`` batches.
        device: Device the inputs are moved to before the forward pass.

    Returns:
        Dict of equally long lists: ``'filename'`` (sample names),
        ``'scene_label'`` (predicted class names) and one list of softmax
        probabilities per entry of ``class_names``.
    """
    model.eval()
    results = {'filename': [], 'scene_label': []}
    for label in class_names:
        results[label] = []

    with torch.no_grad():
        for mels, _devices, names in loader:
            logits = model(mels.to(device))
            # Softmax over the class axis turns logits into probabilities.
            probs = F.softmax(logits, dim=1).cpu().numpy()
            predicted_ids = torch.argmax(logits, dim=1).cpu().numpy()

            results['filename'].extend(names)
            results['scene_label'].extend(
                get_class_name(klass=class_id) for class_id in predicted_ids
            )
            # Column k of `probs` belongs to class_names[k].
            for column, label in enumerate(class_names):
                results[label].extend(probs[:, column])
    return results

# Collect predictions for all device types
# File the raw (comma-separated) predictions are written to. Kept in one
# variable so the final message always names the file actually written.
submission_path = 'temp.csv'

all_results = {
    'filename': [],
    'scene_label': [],
    **{class_name: [] for class_name in class_names}
}
for dev in device_types:
    print(f"Evaluating model for device: {dev}")
    # Each device label has its own model and its own loader.
    model = models[model_mapping[dev]]
    loader = test_loader[dev]
    # Evaluate the model
    results = evaluate_test_dataset(model, loader, device)
    # Aggregate results: both dicts share the same keys.
    for column in all_results:
        all_results[column].extend(results[column])

# Create a DataFrame for submission
submission_df = pd.DataFrame(all_results)
# Ensure filenames have '.wav' extension
submission_df['filename'] = submission_df['filename'].apply(lambda x: f"{x}.wav")
# Sort by filename to ensure consistent ordering
submission_df = submission_df.sort_values(by='filename')
# Reorder columns to match the template
submission_columns = ['filename', 'scene_label'] + class_names
submission_df = submission_df[submission_columns]
# Save to CSV
submission_df.to_csv(submission_path, index=False)
print(f"Submission file created: {submission_path}")

Evaluating model for device: a
Evaluating model for device: b
Evaluating model for device: c
Evaluating model for device: s1
Evaluating model for device: s2
Evaluating model for device: s3
Evaluating model for device: unknown
Submission file created: dcase2025_task1_submission.csv


In [10]:
import pandas as pd

# Step 1: Read the comma-separated predictions written by the evaluation cell.
# Reading 'temp.csv' rather than 'output.csv' keeps this cell re-runnable:
# its input is never overwritten by its own tab-separated output.
df = pd.read_csv('temp.csv')

# Step 2: Put the class-probability columns in alphabetical order after
# 'filename' and 'scene_label'
desired_column_order = ['filename', 'scene_label', 'airport', 'bus', 'metro', 'metro_station', 'park', 'public_square', 'shopping_mall', 'street_pedestrian', 'street_traffic', 'tram']
df = df[desired_column_order]

# Step 3: Round all numerical values to 4 decimal places
df = df.round(4)

# Step 4: Save the modified DataFrame as a tab-separated file
df.to_csv('output.csv', sep='\t', index=False)