In [1]:
import torch
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset, ConcatDataset
from torchvision.datasets import ImageFolder
import torch.nn as nn
import torch.optim as optim
from torchvision.models import mobilenet_v2

In [2]:
seed = 42
torch.manual_seed(seed)

<torch._C.Generator at 0x113ae8b10>

In [3]:
seed = 42
torch.manual_seed(seed)

<torch._C.Generator at 0x113ae8b10>

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")


In [5]:
from torch.utils.data import Dataset
from PIL import Image
import os

class UnlabeledDataset(Dataset):
    def __init__(self, root, transform=None):
        self.root = root
        self.image_paths = [os.path.join(root, f) for f in os.listdir(root) if f.endswith(('.jpg'))]
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image  # No label

In [6]:


transform_test = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),  # âœ… Ensure tensor conversion before normalization
    transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
])

target_transform = transforms.Lambda(lambda y: torch.tensor(y, dtype=torch.long))

# Load datasets
synthetic_dataset = ImageFolder(root="data/synthetic/cifar10", 
                                transform=transform_test,
                                target_transform=target_transform)
unlabeled_dataset = UnlabeledDataset("data/real/unlabelled", transform=transform_test)
test_dataset = ImageFolder(root="data/real/animal_data", transform=transform_test)

batch_size = 32
synthetic_loader = DataLoader(synthetic_dataset, batch_size=batch_size, shuffle=True)
unlabeled_loader = DataLoader(unlabeled_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [7]:
class FeatureExtractor(nn.Module):
    def __init__(self, num_classes):
        super(FeatureExtractor, self).__init__()
        mobilenet = mobilenet_v2(pretrained=True)

        # Freeze pre-trained layers
        for param in mobilenet.parameters():
            param.requires_grad = False
        
        # Extract feature layers
        self.features = mobilenet.features
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        
        # Custom classifier
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(1280, num_classes)  # MobileNetV2's last conv layer has 1280 channels
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

In [8]:
# Load the saved model
num_classes = 3

model = FeatureExtractor(num_classes=num_classes).to(device)
model.eval()




FeatureExtractor(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU6(inplace=True)
    )
    (1): InvertedResidual(
      (conv): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
          (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU6(inplace=True)
        )
        (1): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (2): InvertedResidual(
      (conv): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 96, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(96,

In [9]:
def generate_pseudo_labels(model, dataloader, threshold=0.7):
    model.eval()
    pseudo_data = []

    with torch.no_grad():
        for images in dataloader:
            images = images.to(device)
            outputs = model(images)
            probabilities = torch.softmax(outputs, dim=1)
            confidence, pseudo_labels = torch.max(probabilities, dim=1)

            for i in range(len(images)):
                if confidence[i] > threshold:
                    pseudo_data.append((images[i].cpu(), pseudo_labels[i].cpu()))  # Store as detached tensor
    
    return pseudo_data

class PseudoLabeledDataset(Dataset):
    def __init__(self, pseudo_data, transform=None):
        self.pseudo_data = pseudo_data
        self.transform = transform

    def __len__(self):
        return len(self.pseudo_data)

    def __getitem__(self, idx):
        image, label = self.pseudo_data[idx]
        
        # Ensure image is a tensor
        if self.transform:
            image = self.transform(image)
        
        # Ensure label is a tensor
        label = torch.tensor(label, dtype=torch.long)
        
        return image, label

In [10]:
pseudo_data = generate_pseudo_labels(model, unlabeled_loader, threshold=0.51)
pseudo_dataset = PseudoLabeledDataset(pseudo_data, )  # Apply train transforms if needed
combined_dataset = ConcatDataset([synthetic_dataset, pseudo_dataset])
combined_loader = DataLoader(combined_dataset, batch_size=batch_size, shuffle=True)
# Retrain Model
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.classifier.parameters(), lr=0.01)
num_epochs = 10


In [11]:
print("\nRetraining model with domain adaptation...")

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for images, labels in combined_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        total_loss += loss.item()

    accuracy = 100 * correct / total
    avg_loss = total_loss / len(combined_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%")

# Evaluate on test dataset
model.eval()
correct = 0
total = 0
test_loss = 0.0

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)

        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        test_loss += loss.item()

test_accuracy = 100 * correct / total
test_loss /= len(test_loader)

print(f"\nFinal Test Accuracy: {test_accuracy:.2f}% | Test Loss: {test_loss:.4f}")

# Save the improved model
torch.save(model.state_dict(), "domain_adapted_model.pth")
print("\nImproved model saved as 'domain_adapted_model.pth'.")


Retraining model with domain adaptation...


  label = torch.tensor(label, dtype=torch.long)


Epoch [1/10], Loss: 0.1162, Accuracy: 95.59%
Epoch [2/10], Loss: 0.0191, Accuracy: 99.35%
Epoch [3/10], Loss: 0.0190, Accuracy: 99.44%
Epoch [4/10], Loss: 0.0187, Accuracy: 99.35%
Epoch [5/10], Loss: 0.0347, Accuracy: 99.05%
Epoch [6/10], Loss: 0.0170, Accuracy: 99.47%
Epoch [7/10], Loss: 0.0098, Accuracy: 99.75%
Epoch [8/10], Loss: 0.0614, Accuracy: 98.74%
Epoch [9/10], Loss: 0.0514, Accuracy: 98.79%
Epoch [10/10], Loss: 0.0206, Accuracy: 99.55%

Final Test Accuracy: 91.62% | Test Loss: 0.6746

Improved model saved as 'domain_adapted_model.pth'.


In [12]:
len(pseudo_data)

10