In [44]:
import torch
import torch.nn as nn

class CustomLinear(nn.Module):
    """Fully connected layer built from raw tensors: ``y = x W^T + b``.

    Parameters
    ----------
    in_features : int
        Size of the last dimension of the input.
    out_features : int
        Size of the last dimension of the output.
    """

    def __init__(self, in_features, out_features):
        super().__init__()

        # Small Gaussian starting weights (std 0.01), laid out as
        # (out_features, in_features).
        initial_weight = torch.randn(out_features, in_features).mul(0.01)
        self.weight = nn.Parameter(initial_weight)

        # One zero-initialised bias per output feature: (out_features,).
        initial_bias = torch.zeros(out_features)
        self.bias = nn.Parameter(initial_bias)

    def forward(self, x):
        """Map ``x`` of shape (batch_size, in_features) to (batch_size, out_features)."""
        projected = torch.matmul(x, self.weight.t())
        return projected + self.bias


In [45]:
# Smoke test: push a random batch of 4 samples with 10 features through a
# 10 -> 3 CustomLinear layer and check the shape of the result.
x = torch.randn(4, 10)
fc = CustomLinear(10, 3)
y = fc(x)

print(y.shape)
# Expected output: torch.Size([4, 3])



torch.Size([4, 3])


In [46]:
class CustomReLU(nn.Module):
    """Element-wise rectifier: every value below zero is replaced by zero."""

    def forward(self, x):
        """Return ``x`` with a lower bound of 0.0 applied; the shape is unchanged."""
        return x.clamp(min=0.0)



In [47]:
# Sanity check for CustomReLU: the two negative entries should become 0 and
# the positive entry should pass through unchanged.
x = torch.tensor([[-1.0, 2.0, -0.5]])
relu = CustomReLU()
print(relu(x))
# Expected output: tensor([[0., 2., 0.]])


tensor([[0., 2., 0.]])


In [48]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CustomMaxPool2D(nn.Module):
    """2-D max pooling implemented with ``F.unfold`` (no padding, no dilation).

    Parameters
    ----------
    kernel_size : int or (int, int)
        Pooling window size. An int means a square window; a pair is
        interpreted as (height, width).
    stride : int or (int, int), optional
        Step between windows. Defaults to ``kernel_size`` (non-overlapping
        windows).
    """

    def __init__(self, kernel_size, stride=None):
        super().__init__()
        self.kernel_size = kernel_size
        self.stride = stride if stride is not None else kernel_size

    @staticmethod
    def _as_pair(value):
        """Return ``value`` as an (int, int) pair, duplicating a scalar."""
        try:
            first, second = value
        except TypeError:
            # Not iterable: a scalar applies to both spatial dimensions.
            first = second = value
        return int(first), int(second)

    def forward(self, x):
        """
        x shape: (B, C, H, W)
        returns: (B, C, H_out, W_out) with
                 H_out = (H - k_h) // s_h + 1 and W_out = (W - k_w) // s_w + 1
        """
        B, C, H, W = x.shape
        k_h, k_w = self._as_pair(self.kernel_size)
        s_h, s_w = self._as_pair(self.stride)

        # Unfold: (B, C * k_h * k_w, L), one column per window position.
        x_unfold = F.unfold(
            x,
            kernel_size=(k_h, k_w),
            stride=(s_h, s_w)
        )

        # Separate the window elements of each channel: (B, C, k_h * k_w, L).
        x_unfold = x_unfold.view(B, C, k_h * k_w, -1)

        # Max over the window elements: (B, C, L).
        out, _ = torch.max(x_unfold, dim=2)

        # Output spatial size, needed to fold L back into a 2-D grid.
        H_out = (H - k_h) // s_h + 1
        W_out = (W - k_w) // s_w + 1

        # Reshape back to image format.
        return out.view(B, C, H_out, W_out)


In [49]:
# Worked example for CustomMaxPool2D: a single 4x4 image holding 1..16.
# With a 2x2 window (stride defaults to the kernel size) each output entry
# is the largest value of one non-overlapping 2x2 quadrant.
x = torch.tensor([[[[1., 2., 3., 4.],
                    [5., 6., 7., 8.],
                    [9.,10.,11.,12.],
                    [13.,14.,15.,16.]]]])

pool = CustomMaxPool2D(kernel_size=2)
y = pool(x)

# Expected output: [[6, 8], [14, 16]] with shape (1, 1, 2, 2).
print(y)


tensor([[[[ 6.,  8.],
          [14., 16.]]]])


In [50]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CustomConv2D(nn.Module):
    """2-D convolution computed as unfold + matrix multiplication.

    Parameters
    ----------
    in_channels : int
        Number of channels of the input image.
    out_channels : int
        Number of filters, i.e. channels of the output.
    kernel_size : int or (int, int)
        Filter size. An int means a square filter; a pair is (height, width).
    stride : int or (int, int), default 1
        Step between filter applications.
    padding : int or (int, int), default 0
        Zero padding added to both sides of each spatial dimension.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0):
        super().__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding

        k_h, k_w = self._as_pair(kernel_size)

        # Weight: (out_channels, in_channels, k_h, k_w), small Gaussian init.
        self.weight = nn.Parameter(
            torch.randn(out_channels, in_channels, k_h, k_w) * 0.01
        )

        # Bias: (out_channels,)
        self.bias = nn.Parameter(torch.zeros(out_channels))

    @staticmethod
    def _as_pair(value):
        """Return ``value`` as an (int, int) pair, duplicating a scalar."""
        try:
            first, second = value
        except TypeError:
            # Not iterable: a scalar applies to both spatial dimensions.
            first = second = value
        return int(first), int(second)

    def forward(self, x):
        """
        x shape: (B, C_in, H, W)
        returns: (B, out_channels, H_out, W_out)
        """
        B, C, H, W = x.shape
        k_h, k_w = self._as_pair(self.kernel_size)
        s_h, s_w = self._as_pair(self.stride)
        p_h, p_w = self._as_pair(self.padding)

        # Step 1: Unfold input into patch columns.
        # Shape: (B, C_in * k_h * k_w, L)
        x_unfold = F.unfold(
            x,
            kernel_size=(k_h, k_w),
            stride=(s_h, s_w),
            padding=(p_h, p_w)
        )

        # Step 2: Flatten each filter to a row: (out_channels, C_in * k_h * k_w).
        # reshape (not view) so weights stored in a non-contiguous layout,
        # e.g. channels_last, are handled as well.
        w_flat = self.weight.reshape(self.out_channels, -1)

        # Step 3: Convolution as matrix multiplication: (B, out_channels, L)
        out = w_flat @ x_unfold
        out = out + self.bias.view(1, -1, 1)

        # Step 4: Fold the L patch positions back into an image grid.
        H_out = (H + 2 * p_h - k_h) // s_h + 1
        W_out = (W + 2 * p_w - k_w) // s_w + 1

        return out.view(B, self.out_channels, H_out, W_out)


In [51]:
# Cross-check CustomConv2D against nn.Conv2d on a random batch of two
# 3-channel 8x8 images (3 -> 5 channels, 3x3 kernel, padding 1).
x = torch.randn(2, 3, 8, 8)

conv_ref = nn.Conv2d(3, 5, 3, padding=1, bias=True)
conv_custom = CustomConv2D(3, 5, 3, padding=1)

# Copy the reference layer's weights and bias into the custom layer so that
# both layers compute with identical parameters.
conv_custom.weight.data = conv_ref.weight.data.clone()
conv_custom.bias.data = conv_ref.bias.data.clone()

y_ref = conv_ref(x)
y_custom = conv_custom(x)

# The two outputs should agree within an absolute tolerance of 1e-6.
print(torch.allclose(y_ref, y_custom, atol=1e-6))
# Expected output: True


True


In [52]:
class FeatureExtractor(nn.Module):
    """Three conv -> ReLU -> max-pool stages followed by a flatten.

    Channel progression is 1 -> 8 -> 12 -> 16. Every convolution is 3x3 with
    stride 1 and padding 1, and every pooling layer uses a 2x2 window, so
    each stage halves the height and width.
    """

    def __init__(self):
        super().__init__()

        def conv3x3(c_in, c_out):
            # All convolutions in this network share the same geometry.
            return CustomConv2D(
                in_channels=c_in,
                out_channels=c_out,
                kernel_size=3,
                stride=1,
                padding=1,
            )

        # Stage 1: 1 -> 8 channels
        self.conv1 = conv3x3(1, 8)
        self.relu1 = CustomReLU()
        self.pool1 = CustomMaxPool2D(kernel_size=2)

        # Stage 2: 8 -> 12 channels
        self.conv2 = conv3x3(8, 12)
        self.relu2 = CustomReLU()
        self.pool2 = CustomMaxPool2D(kernel_size=2)

        # Stage 3: 12 -> 16 channels
        self.conv3 = conv3x3(12, 16)
        self.relu3 = CustomReLU()
        self.pool3 = CustomMaxPool2D(kernel_size=2)

    def forward(self, x):
        """
        x: (B, 1, H, W)
        returns: (B, D) -- the output of the last stage, flattened per sample
        """
        stages = (
            (self.conv1, self.relu1, self.pool1),
            (self.conv2, self.relu2, self.pool2),
            (self.conv3, self.relu3, self.pool3),
        )
        for conv, activation, pool in stages:
            x = pool(activation(conv(x)))

        # Collapse channels and spatial dimensions into one feature axis.
        batch_size = x.size(0)
        return x.view(batch_size, -1)


In [53]:
class MultiInputCNN(nn.Module):
    """Classifier over three images that share one FeatureExtractor.

    Each image is encoded by the same feature extractor; the three feature
    vectors are concatenated and passed through a two-layer classifier
    (3 * feature_dim -> 32 -> num_classes).
    """

    def __init__(self, input_shape, num_classes=16):
        """
        input_shape: (1, H, W) - shape of ONE input image
        num_classes: number of output logits
        """
        super().__init__()

        self.feature_extractor = FeatureExtractor()

        # Probe the extractor with an all-zero image to find out how long
        # the flattened feature vector is for this input size.
        with torch.no_grad():
            probe = torch.zeros(1, *input_shape)
            probe_features = self.feature_extractor(probe)
        self.feature_dim = probe_features.shape[1]

        # Classifier head operating on the three concatenated feature vectors.
        self.fc1 = CustomLinear(3 * self.feature_dim, 32)
        self.relu = CustomReLU()
        self.fc2 = CustomLinear(32, num_classes)

    def forward(self, x1, x2, x3):
        """
        x1, x2, x3: (B, 1, H, W)
        returns: (B, num_classes) logits
        """
        features = [self.feature_extractor(image) for image in (x1, x2, x3)]
        fused = torch.cat(features, dim=1)

        hidden = self.relu(self.fc1(fused))
        return self.fc2(hidden)

In [54]:
import torch
from torch.utils.data import Dataset
from PIL import Image
import pandas as pd
import numpy as np
import os


class MultiInputImageDataset(Dataset):
    """Dataset yielding three grayscale images and one integer label per row.

    The metadata CSV must provide the columns ``input_1``, ``input_2`` and
    ``input_3`` (image paths) and ``target`` (label, converted with ``int``).
    """

    def __init__(self, csv_path, root_dir=None, transform=None, image_size=(128, 128)):
        """
        csv_path  : path to metadata CSV
        root_dir  : optional base directory prepended to every image path
        transform : torchvision-style transform (applied to ALL 3 images)
        image_size: (width, height) every image is resized to before the
                    transform / tensor conversion; defaults to (128, 128)
        """
        self.df = pd.read_csv(csv_path)
        self.root_dir = root_dir
        self.transform = transform
        self.image_size = tuple(image_size)

    def __len__(self):
        """Number of rows in the metadata CSV."""
        return len(self.df)

    def _load_image(self, path):
        """Load one image as grayscale, resize it, and transform or tensorise it.

        Without a transform the result is a float32 tensor of shape
        (1, height, width) with values scaled to [0, 1].
        """
        if self.root_dir is not None:
            path = os.path.join(self.root_dir, path)

        # Image.open is lazy; convert() reads the pixel data into a new
        # image, so the underlying file can be closed right afterwards.
        with Image.open(path) as raw:
            img = raw.convert("L")  # grayscale

        img = img.resize(self.image_size)
        if self.transform:
            img = self.transform(img)
        else:
            # fallback: tensor + normalize to [0,1], plus a channel axis
            img = torch.from_numpy(
                np.array(img, dtype="float32") / 255.0
            ).unsqueeze(0)

        return img

    def __getitem__(self, idx):
        """Return ``(img1, img2, img3, label)`` for the row at position ``idx``."""
        row = self.df.iloc[idx]
        label = int(row["target"])

        img1 = self._load_image(row["input_1"])
        img2 = self._load_image(row["input_2"])
        img3 = self._load_image(row["input_3"])

        return img1, img2, img3, label


In [55]:
import pandas as pd
from sklearn.model_selection import train_test_split
# Load the full training metadata table.
df = pd.read_csv("dataset_music/train/metadata.csv")

# Hold out 20% of the rows for validation. Stratifying on "target" keeps the
# class proportions the same in both parts, and the fixed random_state makes
# the split reproducible.
train_df, val_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
    stratify=df["target"]
)

# Persist both parts (without the index column) so the dataset class can
# read them back from disk.
train_df.to_csv("dataset_music/train/train_split.csv", index=False)
val_df.to_csv("dataset_music/train/val_split.csv", index=False)


In [56]:
# Build one dataset per split CSV. root_dir=None means the image paths in the
# CSV are used exactly as written, and no transform is passed, so images go
# through the dataset's built-in tensor conversion.
train_dataset = MultiInputImageDataset(
    csv_path="dataset_music/train/train_split.csv",
    root_dir=None,
)

val_dataset = MultiInputImageDataset(
    csv_path="dataset_music/train/val_split.csv",
    root_dir=None,
)


In [57]:
from torch.utils.data import DataLoader


# Training batches: 32 samples each, reshuffled every epoch, loaded by two
# worker processes into pinned memory.
train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=2,
    pin_memory=True
)

# Validation batches: same settings, but in fixed order (no shuffling).
val_loader = DataLoader(
    val_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=2,
    pin_memory=True
)


In [58]:
from sklearn.metrics import f1_score

def evaluate_macro_f1(model, loader, device):
    """Compute the macro-averaged F1 score of ``model`` over ``loader``.

    The model is switched to eval mode and stays in it. Each batch from
    ``loader`` must be ``(img1, img2, img3, target)``; the three images are
    moved to ``device``, while ``target`` is read on the CPU as delivered.
    """
    model.eval()
    true_labels = []
    predicted_labels = []

    with torch.no_grad():
        for first, second, third, target in loader:
            first = first.to(device)
            second = second.to(device)
            third = third.to(device)

            # The predicted class is the index of the largest logit.
            class_scores = model(first, second, third)
            batch_predictions = class_scores.argmax(dim=1)

            true_labels.extend(target.numpy().tolist())
            predicted_labels.extend(batch_predictions.cpu().numpy().tolist())

    return f1_score(true_labels, predicted_labels, average="macro")


In [59]:
# Inspect the first training sample: print the shapes of its three image
# tensors followed by its label.
img1, img2, img3, label = train_dataset[0]
print(img1.shape, img2.shape, img3.shape, label)


torch.Size([1, 128, 128]) torch.Size([1, 128, 128]) torch.Size([1, 128, 128]) 8


In [60]:
# Build the network for single-channel 128x128 inputs (the shape printed for
# the dataset sample) and report its total number of parameters.
model = MultiInputCNN(input_shape=(1, 128, 128))
print(sum(p.numel() for p in model.parameters()))


396476


In [61]:
# Train on the GPU when one is available, otherwise on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)

# Cross-entropy on the raw logits, optimised with Adam at lr = 1e-3.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

num_epochs = 10

for epoch in range(num_epochs):
    # evaluate_macro_f1 leaves the model in eval mode, so switch back to
    # train mode at the start of every epoch.
    model.train()
    running_loss = 0.0

    for img1, img2, img3, target in train_loader:
        img1 = img1.to(device)
        img2 = img2.to(device)
        img3 = img3.to(device)
        target = target.to(device)

        logits = model(img1, img2, img3)
        loss = criterion(logits, target)

        # Clear gradients from the previous step before backpropagating.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    avg_loss = running_loss / len(train_loader)
    val_f1 = evaluate_macro_f1(model, val_loader, device)

    print(
        f"Epoch [{epoch+1}/{num_epochs}] "
        f"Loss: {avg_loss:.4f} | Val Macro F1: {val_f1:.4f}"
    )


Epoch [1/10] Loss: 2.6590 | Val Macro F1: 0.0777
Epoch [2/10] Loss: 1.9775 | Val Macro F1: 0.3329
Epoch [3/10] Loss: 1.7066 | Val Macro F1: 0.3600
Epoch [4/10] Loss: 1.5821 | Val Macro F1: 0.4109
Epoch [5/10] Loss: 1.5092 | Val Macro F1: 0.4195
Epoch [6/10] Loss: 1.4581 | Val Macro F1: 0.4531


KeyboardInterrupt: 