# Setup Kaggle Environment
Install the required packages, seed the random number generators, detect CUDA, and define the hyperparameters and dataset/model paths used by the rest of the notebook.

In [None]:
# Install necessary packages
!pip install torch torchvision

# Import necessary libraries
import torch
import torch.backends.cudnn as cudnn
import torch.optim as optim
import torch.utils.data
from torchvision import datasets, transforms

# Set random seed for reproducibility
import random
import numpy as np

# Draw a fresh seed for this run and report it: without the printed value a
# run cannot be repeated later.
manual_seed = random.randint(1, 10000)
print(f'Random seed: {manual_seed}')
random.seed(manual_seed)
# NumPy keeps its own global RNG, separate from Python's and PyTorch's.
np.random.seed(manual_seed)
torch.manual_seed(manual_seed)

# Check if CUDA is available and set the device
cuda = torch.cuda.is_available()
if cuda:
    # NOTE(review): cudnn.benchmark lets cuDNN pick convolution algorithms at
    # run time, which can make GPU results vary between runs -- confirm this
    # trade-off is wanted alongside the fixed seed.
    cudnn.benchmark = True

# Define constants
# lr is handed to the Adam optimizer, batch_size sizes the two training
# DataLoaders, image_size is the Resize target of the image transforms, and
# n_epoch is the number of epochs passed to train().
lr = 1e-3
batch_size = 32
image_size = 28
n_epoch = 100

# Define paths for datasets and models
# The two dataset names are also the only values test() accepts.
# NOTE(review): the MNIST dataset is loaded further down from
# '/kaggle/input/mnist' (lower case) rather than from source_image_root --
# confirm which directory name matches the attached Kaggle dataset.
source_dataset_name = 'MNIST'
target_dataset_name = 'mnist_m'
source_image_root = f'/kaggle/input/{source_dataset_name}'
target_image_root = f'/kaggle/input/{target_dataset_name}'
# Directory into which train() writes its checkpoints.
model_root = '/kaggle/working/models'

# Create directories if they don't exist
import os
os.makedirs(model_root, exist_ok=True)

# Import Required Libraries
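Import the remaining libraries (PIL, `os`, `sys`) and the project modules `data_loader`, `model`, `test`, and `functions`. The project-module imports require those files to be on the import path; the same classes and functions are also defined inline in the cells below.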

In [None]:
# Import additional necessary libraries
from PIL import Image
import os
import sys

# Import custom modules
from data_loader import GetLoader
from model import CNNModel
from test import test
from functions import ReverseLayerF

# Define CNN Model
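Define the CNN model: a shared convolutional feature extractor followed by a 10-way class classifier and a 2-way domain classifier, the latter fed through the gradient reversal layer.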

In [None]:
# Define CNN Model
import torch.nn as nn
from functions import ReverseLayerF

class CNNModel(nn.Module):
    """Convolutional network with a class head and a domain head.

    ``feature`` is a two-stage conv/batch-norm/pool stack whose output is
    flattened to ``50 * 4 * 4`` values per sample. ``class_classifier`` maps
    those features to 10 log-probabilities; ``domain_classifier`` maps them,
    after they pass through ``ReverseLayerF``, to 2 log-probabilities.
    """

    @staticmethod
    def _named_stack(named_layers):
        """Return an ``nn.Sequential`` holding ``named_layers`` in order."""
        stack = nn.Sequential()
        for layer_name, layer in named_layers:
            stack.add_module(layer_name, layer)
        return stack

    def __init__(self):
        super().__init__()

        # Shared feature extractor.
        self.feature = self._named_stack([
            ('f_conv1', nn.Conv2d(3, 64, kernel_size=5)),
            ('f_bn1', nn.BatchNorm2d(64)),
            ('f_pool1', nn.MaxPool2d(2)),
            ('f_relu1', nn.ReLU(True)),
            ('f_conv2', nn.Conv2d(64, 50, kernel_size=5)),
            ('f_bn2', nn.BatchNorm2d(50)),
            ('f_drop1', nn.Dropout2d()),
            ('f_pool2', nn.MaxPool2d(2)),
            ('f_relu2', nn.ReLU(True)),
        ])

        # Label head: 10 output classes.
        self.class_classifier = self._named_stack([
            ('c_fc1', nn.Linear(50 * 4 * 4, 100)),
            ('c_bn1', nn.BatchNorm1d(100)),
            ('c_relu1', nn.ReLU(True)),
            ('c_drop1', nn.Dropout()),
            ('c_fc2', nn.Linear(100, 100)),
            ('c_bn2', nn.BatchNorm1d(100)),
            ('c_relu2', nn.ReLU(True)),
            ('c_fc3', nn.Linear(100, 10)),
            ('c_softmax', nn.LogSoftmax(dim=1)),
        ])

        # Domain head: 2 output classes.
        self.domain_classifier = self._named_stack([
            ('d_fc1', nn.Linear(50 * 4 * 4, 100)),
            ('d_bn1', nn.BatchNorm1d(100)),
            ('d_relu1', nn.ReLU(True)),
            ('d_fc2', nn.Linear(100, 2)),
            ('d_softmax', nn.LogSoftmax(dim=1)),
        ])

    def forward(self, input_data, alpha):
        """Return ``(class_output, domain_output)`` for a batch of images.

        ``input_data`` is expanded to ``(batch, 3, 28, 28)`` before the
        convolutions, so a size-1 channel dimension is broadcast to three
        channels. ``alpha`` is forwarded to ``ReverseLayerF``.
        """
        n_samples = input_data.shape[0]
        rgb_input = input_data.expand(n_samples, 3, 28, 28)
        flat_feature = self.feature(rgb_input).view(-1, 50 * 4 * 4)

        reversed_feature = ReverseLayerF.apply(flat_feature, alpha)
        class_output = self.class_classifier(flat_feature)
        domain_output = self.domain_classifier(reversed_feature)
        return class_output, domain_output

# Define Reverse Layer Function
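Define `ReverseLayerF`, an autograd function that passes its input through unchanged in the forward pass and multiplies the incoming gradient by `-alpha` in the backward pass.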

In [None]:
# Define Reverse Layer Function
from torch.autograd import Function

class ReverseLayerF(Function):
    """Autograd function: identity in forward, gradient times ``-alpha`` in backward."""

    @staticmethod
    def forward(ctx, x, alpha):
        """Remember ``alpha`` on ``ctx`` and return a view of ``x``."""
        ctx.alpha = alpha
        return x.view_as(x)

    @staticmethod
    def backward(ctx, grad_output):
        """Return the negated, ``alpha``-scaled gradient for ``x`` and ``None`` for ``alpha``."""
        reversed_grad = -(grad_output * ctx.alpha)
        return reversed_grad, None

# Define Data Loader
Define the data loader class to load the datasets.

In [None]:
# Define Data Loader
class GetLoader(torch.utils.data.Dataset):
    """Image dataset described by a text file of ``<relative path> <label>`` lines.

    Args:
        data_root: Directory that the relative image paths are joined onto.
        data_list: Path of the label file. Each non-blank line holds an image
            path and an integer label separated by whitespace.
        transform: Optional callable applied to the loaded RGB PIL image.

    Raises:
        ValueError: If a non-blank line has no label field or the label is
            not an integer.
    """

    def __init__(self, data_root, data_list, transform=None):
        self.root = data_root
        self.transform = transform
        self.img_paths = []
        self.img_labels = []

        with open(data_list, 'r') as f:
            for line_no, line in enumerate(f, start=1):
                entry = line.strip()
                if not entry:
                    # Tolerate blank lines, e.g. a trailing newline at EOF.
                    continue
                # Split on the last run of whitespace rather than slicing at
                # fixed offsets, so a final line without '\n' and labels
                # longer than one character are parsed correctly.
                fields = entry.rsplit(None, 1)
                if len(fields) != 2:
                    raise ValueError(
                        f'{data_list}:{line_no}: expected "<path> <label>", got {entry!r}'
                    )
                img_path, label = fields
                self.img_paths.append(img_path)
                # Convert once here so __getitem__ always returns an int,
                # with or without a transform.
                self.img_labels.append(int(label))

        self.n_data = len(self.img_paths)

    def __getitem__(self, item):
        """Return ``(image, label)`` for index ``item``; the image is RGB."""
        img_path, label = self.img_paths[item], self.img_labels[item]
        img = Image.open(os.path.join(self.root, img_path)).convert('RGB')

        if self.transform is not None:
            img = self.transform(img)

        return img, label

    def __len__(self):
        """Return the number of samples listed in the label file."""
        return self.n_data

# Define Training and Testing Functions
Define the functions for training and testing the model.

In [None]:
# Define Training and Testing Functions

def train(model, dataloader_source, dataloader_target, optimizer, loss_class, loss_domain, n_epoch, cuda):
    """Train ``model`` on labelled source batches and unlabelled target batches.

    Every step draws one batch from each loader, computes the class loss on
    the source batch and the domain loss on both batches (source labelled 0,
    target labelled 1), sums the three losses and takes one optimizer step.
    After each epoch the whole model is saved under the module-level
    ``model_root`` and evaluated with ``test`` on ``source_dataset_name`` and
    ``target_dataset_name``; whenever the target accuracy improves, the model
    is also saved as the "best" checkpoint.

    Args:
        model: Network called as ``model(input_data=..., alpha=...)`` and
            returning ``(class_output, domain_output)``.
        dataloader_source: Loader yielding ``(images, labels)`` batches.
        dataloader_target: Loader yielding ``(images, _)`` batches; the
            labels are ignored.
        optimizer: Optimizer over the model's parameters.
        loss_class: Loss applied to the class output of source batches.
        loss_domain: Loss applied to the domain output of both batches.
        n_epoch: Number of epochs to run.
        cuda: When true, batches and labels are moved to the GPU.
    """
    # Initialise both so the summary below cannot hit an unbound name when
    # no epoch runs or the target accuracy never rises above zero.
    best_accu_s = 0.0
    best_accu_t = 0.0
    for epoch in range(n_epoch):
        # One step consumes a batch from each loader, so stop at the shorter.
        len_dataloader = min(len(dataloader_source), len(dataloader_target))
        data_source_iter = iter(dataloader_source)
        data_target_iter = iter(dataloader_target)

        for i in range(len_dataloader):
            # p is the fraction of all training steps completed so far;
            # alpha rises from 0 towards 1 as p goes from 0 to 1.
            p = float(i + epoch * len_dataloader) / n_epoch / len_dataloader
            alpha = 2. / (1. + np.exp(-10 * p)) - 1

            # Training model using source data
            # The builtin next() is used because iterators have no .next()
            # method on Python 3.
            s_img, s_label = next(data_source_iter)

            model.zero_grad()
            domain_label = torch.zeros(len(s_label)).long()

            if cuda:
                s_img = s_img.cuda()
                s_label = s_label.cuda()
                domain_label = domain_label.cuda()

            class_output, domain_output = model(input_data=s_img, alpha=alpha)
            err_s_label = loss_class(class_output, s_label)
            err_s_domain = loss_domain(domain_output, domain_label)

            # Training model using target data
            t_img, _ = next(data_target_iter)

            domain_label = torch.ones(len(t_img)).long()

            if cuda:
                t_img = t_img.cuda()
                domain_label = domain_label.cuda()

            _, domain_output = model(input_data=t_img, alpha=alpha)
            err_t_domain = loss_domain(domain_output, domain_label)
            err = err_t_domain + err_s_domain + err_s_label
            err.backward()
            optimizer.step()

            sys.stdout.write('\r epoch: %d, [iter: %d / all %d], err_s_label: %f, err_s_domain: %f, err_t_domain: %f' \
                  % (epoch, i + 1, len_dataloader, err_s_label.item(),
                     err_s_domain.item(), err_t_domain.item()))
            sys.stdout.flush()

        # Save model
        torch.save(model, f'{model_root}/mnist_mnistm_model_epoch_current.pth')

        # Test model (test() reloads the checkpoint written just above)
        accu_s = test(source_dataset_name)
        accu_t = test(target_dataset_name)
        if accu_t > best_accu_t:
            best_accu_s = accu_s
            best_accu_t = accu_t
            torch.save(model, f'{model_root}/mnist_mnistm_model_epoch_best.pth')

    print('============ Summary ============= \n')
    print('Accuracy of the %s dataset: %f' % ('mnist', best_accu_s))
    print('Accuracy of the %s dataset: %f' % ('mnist_m', best_accu_t))
    print('Corresponding model was saved in ' + model_root + '/mnist_mnistm_model_epoch_best.pth')

def test(dataset_name):
    """Evaluate the most recent checkpoint on one test set and return its accuracy.

    Loads ``mnist_mnistm_model_epoch_current.pth`` from the checkpoint
    directory, runs it in eval mode over the MNIST or mnist_m test split and
    returns the fraction of correctly classified samples.

    Args:
        dataset_name: ``'MNIST'`` or ``'mnist_m'``.

    Returns:
        Accuracy as a float in ``[0, 1]``; ``0.0`` if the test set is empty.
    """
    assert dataset_name in ['MNIST', 'mnist_m']
    # Same locations the rest of the notebook uses: checkpoints are written
    # to /kaggle/working/models and datasets are read from /kaggle/input.
    model_root = '/kaggle/working/models'
    image_root = f'/kaggle/input/{dataset_name}'
    batch_size = 128
    image_size = 28
    alpha = 0

    # Load data
    if dataset_name == 'mnist_m':
        img_transform = transforms.Compose([
            transforms.Resize(image_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
        ])
        dataset = GetLoader(
            data_root=os.path.join(image_root, 'mnist_m_test'),
            data_list=os.path.join(image_root, 'mnist_m_test_labels.txt'),
            transform=img_transform
        )
    else:
        img_transform = transforms.Compose([
            transforms.Resize(image_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.1307,), std=(0.3081,))
        ])
        dataset = datasets.MNIST(
            root='/kaggle/input/mnist',
            train=False,
            transform=img_transform,
        )
    dataloader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=8
    )

    # Test model
    # The checkpoint is a whole pickled module written by this notebook, so
    # full unpickling is required; recent PyTorch defaults to weights_only=True,
    # which rejects it. Do not point this at checkpoints from untrusted sources.
    model = torch.load(
        os.path.join(model_root, 'mnist_mnistm_model_epoch_current.pth'),
        weights_only=False
    )
    model = model.eval()
    if cuda:
        model = model.cuda()

    n_total = 0
    n_correct = 0
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        for t_img, t_label in dataloader:
            if cuda:
                t_img = t_img.cuda()
                t_label = t_label.cuda()
            class_output, _ = model(input_data=t_img, alpha=alpha)
            pred = class_output.argmax(dim=1)
            n_correct += (pred == t_label).sum().item()
            n_total += len(t_label)

    return n_correct / n_total if n_total else 0.0

# Load Data
Load the MNIST and mnist_m datasets and prepare data loaders.

In [None]:
# Load Data

# Define image transformations for source and target datasets
# Source (MNIST) images are normalised with single-channel statistics.
img_transform_source = transforms.Compose(
    [
        transforms.Resize(image_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.1307,), std=(0.3081,)),
    ]
)

# Target (mnist_m) images are normalised per RGB channel.
img_transform_target = transforms.Compose(
    [
        transforms.Resize(image_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

# Datasets: torchvision's MNIST training split as the source domain, and the
# mnist_m training images listed in the label file as the target domain.
dataset_source = datasets.MNIST(
    root='/kaggle/input/mnist', train=True, transform=img_transform_source, download=True
)

train_list = os.path.join(target_image_root, 'mnist_m_train_labels.txt')
dataset_target = GetLoader(
    data_root=os.path.join(target_image_root, 'mnist_m_train'),
    data_list=train_list,
    transform=img_transform_target,
)

# Both training loaders share the same batching options.
_train_loader_options = dict(batch_size=batch_size, shuffle=True, num_workers=8)
dataloader_source = torch.utils.data.DataLoader(dataset=dataset_source, **_train_loader_options)
dataloader_target = torch.utils.data.DataLoader(dataset=dataset_target, **_train_loader_options)

# Initialize Model, Optimizer, and Loss Functions
Initialize the CNN model, optimizer, and loss functions.

In [None]:
# Initialize Model, Optimizer, and Loss Functions

# Initialize the CNN model
model = CNNModel()

# Adam over every parameter of the network, using the notebook-level lr.
optimizer = optim.Adam(model.parameters(), lr=lr)

# Both heads of the model end in LogSoftmax, so each output is scored with
# negative log-likelihood.
loss_class, loss_domain = torch.nn.NLLLoss(), torch.nn.NLLLoss()

# Put the network and the loss modules on the GPU when one is available.
if cuda:
    model, loss_class, loss_domain = model.cuda(), loss_class.cuda(), loss_domain.cuda()

# Make sure no parameter is frozen.
for p in model.parameters():
    p.requires_grad = True

# Train the Model
Train the model using the training data.

In [None]:
# Train the Model

# Train the model using the training data
# train() runs n_epoch epochs; after each epoch it saves a checkpoint under
# model_root and evaluates it through whichever test() is bound at that time.
train(model, dataloader_source, dataloader_target, optimizer, loss_class, loss_domain, n_epoch, cuda)

# Test the Model
Test the model using the test data and print the accuracy.

In [None]:
# Test the Model

# Define the test function
def test(dataset_name):
    """Evaluate the most recent checkpoint on one test set and return its accuracy.

    Loads ``mnist_mnistm_model_epoch_current.pth`` from the checkpoint
    directory, runs it in eval mode over the MNIST or mnist_m test split and
    returns the fraction of correctly classified samples.

    Args:
        dataset_name: ``'MNIST'`` or ``'mnist_m'``.

    Returns:
        Accuracy as a float in ``[0, 1]``; ``0.0`` if the test set is empty.
    """
    assert dataset_name in ['MNIST', 'mnist_m']
    model_root = '/kaggle/working/models'
    image_root = f'/kaggle/input/{dataset_name}'
    batch_size = 128
    image_size = 28
    alpha = 0

    # Load data
    if dataset_name == 'mnist_m':
        img_transform = transforms.Compose([
            transforms.Resize(image_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
        ])
        dataset = GetLoader(
            data_root=os.path.join(image_root, 'mnist_m_test'),
            data_list=os.path.join(image_root, 'mnist_m_test_labels.txt'),
            transform=img_transform
        )
    else:
        img_transform = transforms.Compose([
            transforms.Resize(image_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.1307,), std=(0.3081,))
        ])
        dataset = datasets.MNIST(
            root='/kaggle/input/mnist',
            train=False,
            transform=img_transform,
        )
    dataloader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=8
    )

    # Test model
    # The checkpoint is a whole pickled module written by this notebook, so
    # full unpickling is required; recent PyTorch defaults to weights_only=True,
    # which rejects it. Do not point this at checkpoints from untrusted sources.
    model = torch.load(
        os.path.join(model_root, 'mnist_mnistm_model_epoch_current.pth'),
        weights_only=False
    )
    model = model.eval()
    if cuda:
        model = model.cuda()

    n_total = 0
    n_correct = 0
    # Inference only: skip autograd bookkeeping. Iterating the loader directly
    # also avoids the iterator .next() method, which Python 3 does not have.
    with torch.no_grad():
        for t_img, t_label in dataloader:
            if cuda:
                t_img = t_img.cuda()
                t_label = t_label.cuda()
            class_output, _ = model(input_data=t_img, alpha=alpha)
            pred = class_output.argmax(dim=1)
            n_correct += (pred == t_label).sum().item()
            n_total += len(t_label)

    return n_correct / n_total if n_total else 0.0

# Evaluate the latest checkpoint on the source test set first, then on the
# target test set, and report both accuracies as percentages.
accuracy_source, accuracy_target = test('MNIST'), test('mnist_m')
print(f'Accuracy on source dataset (MNIST): {accuracy_source * 100:.2f}%')
print(f'Accuracy on target dataset (mnist_m): {accuracy_target * 100:.2f}%')