In [2]:
# dann_with_svm_keras_loader.py

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from functions import ReverseLayerF

from datasets import load_dataset
from keras.datasets import mnist
from keras.utils import to_categorical
from PIL import Image
import tensorflow as tf

# Load MNIST-M dataset
print("Loading MNIST-M from HuggingFace")
ds = load_dataset("Mike0307/MNIST-M")
train_data = ds['train']
test_data = ds['test']

X_train_m = np.array([np.array(image.resize((28, 28))) for image in train_data['image']])
y_train_m = np.array(train_data['label'])
X_test_m = np.array([np.array(image.resize((28, 28))) for image in test_data['image']])
y_test_m = np.array(test_data['label'])

from PIL import Image
import numpy as np
from skimage.util import random_noise
from tensorflow.keras.utils import to_categorical

# --- Augmentation Functions ---
def generate_structured_background(size=(28, 28, 3)):
    """Generate more realistic background patterns"""
    bg = np.random.randn(*size) * 0.3
    bg = np.cumsum(bg, axis=0)
    bg = np.clip(bg, 0, 1)
    color_mask = np.random.choice([0.7, 0.8, 0.9], size=size)
    return np.clip(bg * color_mask, 0, 1)

def adaptive_blend(img, alpha_range=(0.5, 0.8)):
    """Blend with adaptive transparency and structured background"""
    if img.ndim == 2:
        img = np.stack([img]*3, axis=-1)
    elif img.shape[-1] == 1:
        img = np.repeat(img, 3, axis=-1)
    alpha = np.random.uniform(*alpha_range)
    bg = generate_structured_background()
    mask = np.random.rand(28, 28) > 0.2
    blended = np.where(mask[..., None], alpha*img + (1-alpha)*bg, bg)
    blended = random_noise(blended, mode='gaussian', var=0.001)
    return np.clip(blended, 0, 1)

def augment_mnist_with_blending(images, apply_prob=0.8):
    """Enhanced augmentation pipeline"""
    augmented = []
    for img in images:
        img = np.squeeze(img)
        if np.random.rand() < apply_prob:
            blended_img = adaptive_blend(img)
        else:
            if img.ndim == 2:
                blended_img = np.stack([img]*3, axis=-1)
            else:
                blended_img = img
        pil_img = Image.fromarray((blended_img * 255).astype(np.uint8))
        augmented.append(np.array(pil_img) / 255.0)
    return np.array(augmented, dtype=np.float32)

# --- Load & Process MNIST ---
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()

# Normalize and apply augmentation
X_train = X_train.astype(np.float32) / 255.0
X_test = X_test.astype(np.float32) / 255.0
X_train = augment_mnist_with_blending(X_train)
X_test = augment_mnist_with_blending(X_test)

# One-hot encoding
y_train_oh = to_categorical(y_train)
y_test_oh = to_categorical(y_test)


X_train_m = X_train_m.astype(np.float32) / 255
X_test_m = X_test_m.astype(np.float32) / 255
y_train_m_oh = to_categorical(y_train_m)
y_test_m_oh = to_categorical(y_test_m)

Loading MNIST-M from HuggingFace


In [3]:


print('MNIST:', X_train.shape, y_train_oh.shape)
print('MNIST-M:', X_train_m.shape, y_train_m_oh.shape)

# Convert to PyTorch tensors
def to_tensor_dataset(x, y):
    x = torch.tensor(x.transpose((0, 3, 1, 2)), dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.long)
    return TensorDataset(x, y.argmax(dim=1))

dataset_source = to_tensor_dataset(X_train, y_train_oh)
dataset_target = to_tensor_dataset(X_train_m, y_train_m_oh)
dataloader_source = DataLoader(dataset_source, batch_size=128, shuffle=True)
dataloader_target = DataLoader(dataset_target, batch_size=128, shuffle=True)

# CNN model with domain adaptation
class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.feature = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=5),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2),
            nn.ReLU(True),
            nn.Conv2d(64, 50, kernel_size=5),
            nn.BatchNorm2d(50),
            nn.Dropout2d(),
            nn.MaxPool2d(2),
            nn.ReLU(True)
        )

        self.class_classifier = nn.Sequential(
            nn.Linear(50 * 4 * 4, 100),
            nn.BatchNorm1d(100),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(100, 100),
            nn.BatchNorm1d(100),
            nn.ReLU(True),
            nn.Linear(100, 10),
            nn.LogSoftmax(dim=1)
        )

        self.domain_classifier = nn.Sequential(
            nn.Linear(50 * 4 * 4, 100),
            nn.BatchNorm1d(100),
            nn.ReLU(True),
            nn.Linear(100, 2),
            nn.LogSoftmax(dim=1)
        )

    def forward(self, input_data, alpha):
        feature = self.feature(input_data)
        feature = feature.view(-1, 50 * 4 * 4)
        reverse_feature = ReverseLayerF.apply(feature, alpha)
        class_output = self.class_classifier(feature)
        domain_output = self.domain_classifier(reverse_feature)
        return class_output, domain_output

    def extract_features(self, input_data):
        with torch.no_grad():
            f = self.feature(input_data)
            return f.view(-1, 50 * 4 * 4)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Train DANN
model = CNNModel().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
loss_class = nn.NLLLoss()
loss_domain = nn.NLLLoss()

for epoch in range(5):
    data_source_iter = iter(dataloader_source)
    data_target_iter = iter(dataloader_target)
    len_dataloader = min(len(data_source_iter), len(data_target_iter))

    for i in range(len_dataloader):
        p = float(i + epoch * len_dataloader) / (5 * len_dataloader)
        alpha = 2. / (1. + np.exp(-10 * p)) - 1

        s_img, s_label = next(data_source_iter)
        t_img, _ = next(data_target_iter)
        s_img, s_label = s_img.to(device), s_label.to(device)
        t_img = t_img.to(device)

        model.zero_grad()
        batch_size = s_img.size(0)
        domain_label = torch.zeros(batch_size).long().to(device)
        class_output, domain_output = model(s_img, alpha)
        loss_s_label = loss_class(class_output, s_label)
        loss_s_domain = loss_domain(domain_output, domain_label)

        batch_size = t_img.size(0)
        domain_label = torch.ones(batch_size).long().to(device)
        _, domain_output = model(t_img, alpha)
        loss_t_domain = loss_domain(domain_output, domain_label)

        loss = loss_s_label + loss_s_domain + loss_t_domain
        loss.backward()
        optimizer.step()

        print(f"Epoch {epoch} | Step {i} | Loss: {loss.item():.4f}")

from torch.utils.data import TensorDataset, DataLoader

# Convert test data to tensors
X_test_tensor = torch.tensor(X_test_m.transpose((0, 3, 1, 2)), dtype=torch.float32)
y_test_tensor = torch.tensor(np.argmax(y_test_m_oh, axis=1), dtype=torch.long)

test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)


from sklearn.metrics import classification_report

all_preds = []
all_labels = []

model.eval()
for imgs, labels in test_loader:
    imgs = imgs.to(device)
    with torch.no_grad():
        class_output, _ = model(imgs, alpha=0)
    preds = class_output.argmax(dim=1).cpu().numpy()
    all_preds.extend(preds)
    all_labels.extend(labels.numpy())

# Print classification report
print("Classification Report on MNIST-M:")
print(classification_report(all_labels, all_preds, digits=4))


MNIST: (60000, 28, 28, 3) (60000, 10)
MNIST-M: (59001, 28, 28, 3) (59001, 10)
Epoch 0 | Step 0 | Loss: 3.7957
Epoch 0 | Step 1 | Loss: 3.6643
Epoch 0 | Step 2 | Loss: 3.6606
Epoch 0 | Step 3 | Loss: 3.6007
Epoch 0 | Step 4 | Loss: 3.5246
Epoch 0 | Step 5 | Loss: 3.5410
Epoch 0 | Step 6 | Loss: 3.5329
Epoch 0 | Step 7 | Loss: 3.4640
Epoch 0 | Step 8 | Loss: 3.4094
Epoch 0 | Step 9 | Loss: 3.3682
Epoch 0 | Step 10 | Loss: 3.3615
Epoch 0 | Step 11 | Loss: 3.3101
Epoch 0 | Step 12 | Loss: 3.2859
Epoch 0 | Step 13 | Loss: 3.2923
Epoch 0 | Step 14 | Loss: 3.2523
Epoch 0 | Step 15 | Loss: 3.2159
Epoch 0 | Step 16 | Loss: 3.1243
Epoch 0 | Step 17 | Loss: 3.1627
Epoch 0 | Step 18 | Loss: 3.0428
Epoch 0 | Step 19 | Loss: 3.0464
Epoch 0 | Step 20 | Loss: 3.0571
Epoch 0 | Step 21 | Loss: 3.0789
Epoch 0 | Step 22 | Loss: 2.9257
Epoch 0 | Step 23 | Loss: 2.9299
Epoch 0 | Step 24 | Loss: 2.8339
Epoch 0 | Step 25 | Loss: 2.8935
Epoch 0 | Step 26 | Loss: 2.8841
Epoch 0 | Step 27 | Loss: 2.8049
Epoch 0 