# Dataset_handling


In [1]:
import os
import pickle
from pathlib import Path
import matplotlib.pyplot as plt
from torchvision import datasets
from torchvision.datasets import ImageFolder
from torchvision import transforms
import torch

def get_cifar10_data():
    """
    Function to get CIFAR-10 train and test data.
    """
    train_data = datasets.CIFAR10(
        root="data",
        train=True,
        download=True,
    )

    test_data = datasets.CIFAR10(
        root="data",
        train=False,
        download=True,
    )
    return train_data, test_data

def load_batches(file_path):
    """
    Function to load batches from CIFAR-10 data files.
    """
    with open(file_path, 'rb') as fo:
        batch = pickle.load(fo, encoding='bytes')
    return batch

def create_classification_directory(input_dir, output_dir, subdirectory):
    """
    Function to create a classification directory structure for CIFAR-10.
    """
    os.makedirs(output_dir, exist_ok=True)
    for label_id, class_name in enumerate(subdirectory):
        class_name = class_name.decode('utf-8')
        class_dir = os.path.join(output_dir, class_name)
        os.makedirs(class_dir, exist_ok=True)

def Copy_train_Images(input_dir, output_dir):
    """
    Function to copy and save train images to the specified directory.
    """
    for batch_id in range(1, 6):
        batch_data = load_batches(os.path.join(input_dir, f'data_batch_{batch_id}'))
        for i, (image, label) in enumerate(zip(batch_data[b'data'], batch_data[b'labels'])):
            class_name = meta[b'label_names'][label].decode('utf-8')
            image = image.reshape(3, 32, 32).transpose(1, 2, 0)
            image_filename = f'{batch_id}_{i + 1}.png'
            output_path = os.path.join(output_dir, class_name, image_filename)
            plt.imsave(output_path, image)

def Images_in_directory(input_dir):
    """
    Function to get the number of items in a directory.
    """
    from pathlib import Path
    dir = Path(input_dir)
    num_items = len(list(dir.glob('*/*.png')))
    print(f"Number of items in {dir} directory: {num_items}")

def Copy_test_Images(input_dir, output_dir):
    """
    Function to copy and save test images to the specified directory.
    """
    batch_data = load_batches(os.path.join(input_dir, 'test_batch'))
    for i, (image, label) in enumerate(zip(batch_data[b'data'], batch_data[b'labels'])):
        class_name = meta[b'label_names'][label].decode('utf-8')
        image = image.reshape(3, 32, 32).transpose(1, 2, 0)
        image_filename = f'{i + 1}.png'
        output_path = os.path.join(output_dir, class_name, image_filename)
        plt.imsave(output_path, image)

def simple_dataset(input_dir):
    """
    Function to create a simple ImageFolder dataset.
    """
    from torchvision import datasets
    data = datasets.ImageFolder(root=input_dir)
    return data

def get_subdirectories(parent_dir):
    """
    Function to get subdirectories in a parent directory.
    """
    import os
    from pathlib import Path
    for subdirectory in os.listdir(parent_dir):
        subdirectory_path = os.path.join(parent_dir, subdirectory)
        if os.path.isdir(subdirectory_path):
            print("Subdirectory:", subdirectory_path)
    parent_dir = Path(parent_dir)
    subdirectories = [subdir for subdir in parent_dir.iterdir() if subdir.is_dir()]
    return subdirectories

def l_u_split(seed, source, l, u, ratio):
    """
    Function to split data into labeled and unlabeled based on a given ratio.
    """
    import numpy as np
    np.random.seed(seed)
    torch.manual_seed(seed)

    import os
    import random
    from pathlib import Path

    source_dir = Path(source)
    labeled_dir = Path(l)
    unlabeled_dir = Path(u)

    labeled_dir.mkdir(parents=True, exist_ok=True)
    unlabeled_dir.mkdir(parents=True, exist_ok=True)

    split_ratio = ratio

    for class_name in os.listdir(source_dir):
        class_dir = source_dir / class_name

        labeled_class_dir = labeled_dir / class_name
        labeled_class_dir.mkdir(parents=True, exist_ok=True)

        unlabeled_class_dir = unlabeled_dir / class_name
        unlabeled_class_dir.mkdir(parents=True, exist_ok=True)

        image_files = list(class_dir.glob("*.png"))

        random.shuffle(image_files)

        num_labeled = int(len(image_files) * split_ratio)
        num_unlabeled = len(image_files) - num_labeled

        for i in range(num_labeled):
            src_path = image_files[i]
            dest_path = labeled_class_dir / src_path.name
            src_path.rename(dest_path)

        for i in range(num_labeled, num_labeled + num_unlabeled):
            src_path = image_files[i]
            dest_path = unlabeled_class_dir / src_path.name
            src_path.rename(dest_path)


# Data Transformation

In [2]:
import torchvision.transforms as transforms

def get_transform():
    """
    Function to get data transformation for training images.
    """
    transform = transforms.Compose([
        transforms.AutoAugment(),
        transforms.RandAugment(),
        transforms.Resize((32, 32)),
        # transforms.RandomAutocontrast(p= 0.5),
        # transforms.ColorJitter(brightness=0.1, contrast=0.2),
        # transforms.RandomResizedCrop(size=(32,32)),
        # transforms.RandomEqualize(p= 0.5),
        # transforms.RandomInvert(p=0.5),
        # transforms.RandomAdjustSharpness(sharpness_factor=0.5),
        # transforms.RandomAffine(degrees=12.5,translate=[0.5,0.5],shear=[0.5,0.5]),
        # transforms.RandomRotation(degrees=12.5),
        # transforms.RandomPosterize(bits=8,p= 0.5),
        # transforms.RandomSolarize(threshold = 0.5,p= 0.5),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    return transform

def simple_transform():
    """
    Function to get a simple data transformation for images.
    """
    transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    return transform


# Data Loading

In [3]:
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

def get_loaders(BATCH_SIZE, labeled_dir, unlabeled_dir, test_dir, transform, simple_transform):
    """
    Function to get DataLoader instances for labeled, unlabeled, and test datasets.
    """
    labeled_data = ImageFolder(root=labeled_dir, transform=get_transform())
    unlabeled_data = ImageFolder(root=unlabeled_dir, transform=simple_transform())
    unlabeled_data_augmented = ImageFolder(root=unlabeled_dir, transform=get_transform())
    test_data = ImageFolder(root=test_dir, transform=get_transform())

    labeled_train_loader = DataLoader(labeled_data, batch_size=BATCH_SIZE, shuffle=True)
    unlabeled_train_loader = DataLoader(unlabeled_data, batch_size=BATCH_SIZE, shuffle=True)
    unlabeled_train_loader_augmented = DataLoader(unlabeled_data_augmented, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=True)

    print(f"Labeled: {len(labeled_train_loader)} | Unlabeled_Augmented: {len(unlabeled_train_loader_augmented)} | Unlabeled: {len(unlabeled_train_loader)} | Test: {len(test_loader)}")

    return labeled_train_loader, unlabeled_train_loader_augmented, unlabeled_train_loader, test_loader


# Model Initialization

In [4]:
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.nn.functional as F
from torch.autograd import Variable

import sys
import numpy as np

def conv3x3(in_planes, out_planes, stride=1):
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=True)

def conv_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        init.xavier_uniform_(m.weight, gain=np.sqrt(2))
        init.constant_(m.bias, 0)
    elif classname.find('BatchNorm') != -1:
        init.constant_(m.weight, 1)
        init.constant_(m.bias, 0)

class wide_basic(nn.Module):
    def __init__(self, in_planes, planes, dropout_rate, stride=1):
        super(wide_basic, self).__init__()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, padding=1, bias=True)
        self.dropout = nn.Dropout(p=dropout_rate)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=True)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, planes, kernel_size=1, stride=stride, bias=True),
            )

    def forward(self, x):
        out = self.dropout(self.conv1(F.relu(self.bn1(x))))
        out = self.conv2(F.relu(self.bn2(out)))
        out += self.shortcut(x)

        return out

class Wide_ResNet(nn.Module):
    def __init__(self, depth, widen_factor, dropout_rate, num_classes):
        super(Wide_ResNet, self).__init__()
        self.in_planes = 16

        assert ((depth-4)%6 ==0), 'Wide-resnet depth should be 6n+4'
        n = (depth-4)/6
        k = widen_factor

        print('| Wide-Resnet %dx%d' %(depth, k))
        nStages = [16, 16*k, 32*k, 64*k]

        self.conv1 = conv3x3(3,nStages[0])
        self.layer1 = self._wide_layer(wide_basic, nStages[1], n, dropout_rate, stride=1)
        self.layer2 = self._wide_layer(wide_basic, nStages[2], n, dropout_rate, stride=2)
        self.layer3 = self._wide_layer(wide_basic, nStages[3], n, dropout_rate, stride=2)
        self.bn1 = nn.BatchNorm2d(nStages[3], momentum=0.9)
        self.linear = nn.Linear(nStages[3], num_classes)

    def _wide_layer(self, block, planes, num_blocks, dropout_rate, stride):
        strides = [stride] + [1]*(int(num_blocks)-1)
        layers = []

        for stride in strides:
            layers.append(block(self.in_planes, planes, dropout_rate, stride))
            self.in_planes = planes

        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = F.relu(self.bn1(out))
        out = F.avg_pool2d(out, 8)
        out = out.view(out.size(0), -1)
        out = self.linear(out)

        return out


# Train and eval

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim

def accuracy_fn(y_true, y_pred):
    correct = torch.eq(y_true.to(device), y_pred.to(device)).sum().item()
    acc = (correct / len(y_pred)) * 100
    return acc

def train_model(model, labeled_train_loader, unlabeled_train_loader_augmented, optimizer, criterion, device):
    SEED = 2
    EPOCHS = 100
    BETA_ZERO = 8
    a = torch.tensor(8,dtype=torch.float32)
    i = torch.tensor(1,dtype=torch.float32)

    torch.manual_seed(SEED)
    labeled_batches = iter(labeled_train_loader)
    torch.manual_seed(SEED)
    unlabeled_batches = iter(unlabeled_train_loader)
    torch.manual_seed(SEED)
    unlabeled_batches_augmented = iter(unlabeled_train_loader_augmented)

    r_seed = 2
    print(device)
    for epoch_num in range(EPOCHS):
        acc = 0
        print("\n\n\n\n\nEPOCH NUMBER", epoch_num + 1, "\n\n\n\n")
        print("Loss_1 , Loss_old, Loss_new, Loss_partial, Loss_UDA, Loss_MPL\n")
        model.train()
        loss_per_epoch_1 = 0

        for epoch in range(len(unlabeled_train_loader)):
            try:
                labeled_images, labels = next(labeled_batches)
            except StopIteration:
                r_seed += 1
                torch.manual_seed(r_seed)
                labeled_batches = iter(labeled_train_loader)
                labeled_images, labels = next(labeled_batches)

            try:
                unlabeled_images, uy = next(unlabeled_batches)
            except StopIteration:
                torch.manual_seed(r_seed)
                unlabeled_batches = iter(unlabeled_train_loader)
                unlabeled_images, uy = next(unlabeled_batches)

            try:
                unlabeled_images_augmented, uya = next(unlabeled_batches_augmented)
            except StopIteration:
                torch.manual_seed(r_seed)
                unlabeled_batches_augmented = iter(unlabeled_train_loader_augmented)
                unlabeled_images_augmented, uya = next(unlabeled_batches_augmented)

            with torch.no_grad():
                pseudo_labels = torch.argmax(torch.softmax(model(unlabeled_images.to(device)), dim=1), dim=1).to(device)

            optimizer.zero_grad()

            unlabeled_logits_augmented = model(unlabeled_images_augmented.to(device)).to(device)
            unlabeled_logits = model(unlabeled_images.to(device)).to(device)
            labeled_logits = model(labeled_images.to(device)).to(device)

            loss_1 = nn.CrossEntropyLoss()(unlabeled_logits_augmented, pseudo_labels)
            loss_old = nn.CrossEntropyLoss()(labeled_logits.to(device), labels.to(device)).to(device)

            (loss_1).backward()
            optimizer.step()

            new_labeled_logits = model(labeled_images.to(device)).to(device)
            loss_ce_y = nn.CrossEntropyLoss()(new_labeled_logits, labels.to(device)).to(device)

            new_unlabeled_logits = model(unlabeled_images.to(device)).to(device)
            new_unlabeled_logits_augmented = model(unlabeled_images_augmented.to(device)).to(device)

            with torch.no_grad():
                pseudo_labels_unlabeled = torch.softmax(model(unlabeled_images.to(device)).to(device), dim=1)

            BETA_K = BETA_ZERO * torch.min(i, ((epoch + 1) / a))

            mask, ____ = torch.max(torch.softmax(new_unlabeled_logits_augmented, dim=1), dim=1)
            mask = mask.ge(0.95).float()

            loss_partial = BETA_K * torch.mean(
                torch.sum(-(torch.softmax(new_unlabeled_logits, dim=1)) * torch.log(
                    torch.softmax(new_unlabeled_logits_augmented, dim=1)), dim=-1) * mask)
            loss_uda = loss_ce_y + loss_partial

            with torch.no_grad():
                new_pseudo_labels = torch.argmax(torch.softmax(new_unlabeled_logits, dim=1), dim=1).to(device)

            ce_new = loss_ce_y.item()
            ce_old = loss_old.item()
            change = ce_new - ce_old
            loss_mpl = ((change) * (nn.CrossEntropyLoss()(new_unlabeled_logits, new_pseudo_labels)))

            print(loss_1.item(), loss_old.item(), loss_ce_y.clone().item(), loss_partial.item(),
                  loss_uda.item(), loss_mpl.item())

            loss_2 = loss_uda + loss_mpl
            (loss_2).backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 8)
            optimizer.step()

            loss_per_epoch_1 += loss_ce_y.item()

        model.eval()

        for _ in range(len(test_loader)):
            try:
                test_images, test_labels = next(test_batches)
            except StopIteration:
                test_batches = iter(test_loader)
                test_images, test_labels = next(test_batches)

            test_logits = model(test_images.to(device))
            pred_labels = torch.argmax(torch.softmax(test_logits, dim=1).to(device), dim=1).to(device)
            acc += accuracy_fn(test_labels, pred_labels)

        print("Accuracy per epoch :", acc / len(test_loader))
        print(f"Loss_1:{loss_per_epoch_1/len(unlabeled_train_loader)} ")


In [6]:
def evaluate_model(model, test_loader, criterion, device):
  acc=0
  torch.manual_seed(43)
  test_batches = iter(test_loader)
  torch.manual_seed(43)
  labeled_batches = iter(labeled_train_loader)
  model.eval()
  for _ in range(len(test_loader)):
    try:
        test_images, test_labels = next(test_batches)
    except StopIteration:
        test_batches = iter(test_loader)
        test_images, test_labels = next(test_batches)

    test_logits = model(test_images.to(device))
    pred_labels = torch.argmax(torch.softmax(test_logits,dim=1).to(device),dim=1).to(device)
    acc += accuracy_fn(test_labels, pred_labels)

  print("Accuracy per epoch :", acc/len(test_loader))

# MASTER SCRIPT

In [None]:
# hyperparameters and device
BATCH_SIZE = 128
labeled_dir = 'labeled'
unlabeled_dir = 'unlabeled'
test_dir = 'test'
device = "cuda" if torch.cuda.is_available() else "cpu"
SEED = 42
EPOCHS = 100
BETA_ZERO = 8
MU = 11.5
labeled_ratio = 1/(MU+1)
depth = 28
widen_factor = 2
dropout_rate = 0.3
num_classes = 10


# Step 1: Data Handling
train_data, test_data = get_cifar10_data()
meta_path = 'data/cifar-10-batches-py/batches.meta'
meta = load_batches(meta_path)
### Train directory
create_classification_directory('data/cifar-10-batches-py', 'cifar-10-dataset', meta[b'label_names'])
### Test directory
create_classification_directory('data/cifar-10-batches-py', 'test', meta[b'label_names'])
Copy_train_Images('/content/data/cifar-10-batches-py','cifar-10-dataset')
Copy_test_Images('/content/data/cifar-10-batches-py','test')
data = simple_dataset('cifar-10-dataset')
sub_directories = get_subdirectories('cifar-10-dataset')


# Step 2: Data Transformation
transform = get_transform()

# Step 3: Split Data into Labeled and Unlabeled
l_u_split(SEED, 'cifar-10-dataset', 'labeled', 'unlabeled', labeled_ratio)

# Step 4: Data Loading
labeled_train_loader, unlabeled_train_loader_augmented, unlabeled_train_loader, test_loader = get_loaders(
    BATCH_SIZE, labeled_dir, unlabeled_dir, test_dir, transform, simple_transform
)
# Step 5: Model Definition
model = Wide_ResNet(depth, widen_factor, dropout_rate, num_classes).to(device)

# Step 6: Training
optimizer = torch.optim.SGD(model.parameters(), lr=0.05, weight_decay=1e-4, momentum=0.9)
criterion = nn.CrossEntropyLoss()

train_model(model, labeled_train_loader, unlabeled_train_loader_augmented, optimizer, criterion, device)

# Step 7: Evaluation
evaluate_model(model, test_loader, criterion, device)