# Code  
---
This code is taken from: https://github.com/rafikg/CEAL
## Import all the required stuff

In [1]:
# imported from run_ceal/ceal_learning_algorithm.py
from utils import Normalize, RandomCrop, SquarifyImage, \
    ToTensor
from utils import get_uncertain_samples, get_high_confidence_samples, \
    update_threshold
from torch.utils.data import DataLoader
from torchvision import transforms
from torch.utils.data.sampler import SubsetRandomSampler

import numpy as np
import torch
import logging


# imported from model/alexnet.py
from typing import Optional, Callable

from torchvision.models import alexnet
from torch.utils.data import DataLoader
from torch.nn.functional import softmax
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch
import torch.optim as Optimizer
import logging


# imported from utils/dataset.py
from typing import Optional, Callable, Union
from torch.utils.data import Dataset

import torch
import os
import glob
import numpy as np
import warnings
import cv2

#imported by me
from sklearn.metrics import precision_recall_fscore_support as prfs

# Configure the root logger to print "LEVEL:logger-name: message" at INFO
# level and create the module-level logger used throughout the notebook.
logging.basicConfig(format="%(levelname)s:%(name)s: %(message)s",
                    level=logging.INFO)
logger = logging.getLogger(__name__)

In [2]:
# Device forwarded to AlexNet by ceal_learning_algorithm. With None, AlexNet
# selects "cuda:0" when CUDA is available and "cpu" otherwise.
device = None

## Model

In [3]:
# Same logging configuration as in the imports cell. logging.basicConfig does
# nothing once the root logger already has handlers, so this only takes
# effect if this cell is the first one to configure logging.
logging.basicConfig(format="%(levelname)s:%(name)s: %(message)s",
                    level=logging.INFO)
logger = logging.getLogger(__name__)


class AlexNet(object):
    """
    Encapsulate the pretrained alexnet model for multi-label fine-tuning.

    All pretrained layers are frozen and the last classifier layer is
    replaced by a fresh ``nn.Linear(4096, n_classes)``, which is therefore
    the only trainable layer.

    Parameters
    ----------
    n_classes : int, default(4)
        the new number of classes (size of the replaced output layer)
    device: Optional[str] 'cuda' or 'cpu', default(None)
            if None: cuda will be used if it is available
    """

    def __init__(self, n_classes: int = 4, device: Optional[str] = None):

        self.n_classes = n_classes
        self.model = alexnet(pretrained=True, progress=True)

        self.__freeze_all_layers()
        self.__change_last_layer()
        # Always set `self.device`; previously it was only assigned when
        # `device` was None, so passing 'cpu'/'cuda' raised AttributeError.
        self.device = self._resolve_device(device)
        logger.info('The code is running on {} '.format(self.device))

    @staticmethod
    def _resolve_device(device=None) -> torch.device:
        """
        Turn the user supplied device into a ``torch.device``
        Parameters
        ----------
        device : Optional[str or torch.device]
            None selects "cuda:0" when CUDA is available, otherwise "cpu"

        Returns
        -------
        torch.device
        """
        if device is None:
            return torch.device(
                "cuda:0" if torch.cuda.is_available() else "cpu")
        if isinstance(device, torch.device):
            return device
        return torch.device(device)

    def __freeze_all_layers(self) -> None:
        """
        freeze all layers in alexnet
        Returns
        -------
        None
        """

        for param in self.model.parameters():
            param.requires_grad = False

    def __change_last_layer(self) -> None:
        """
        change last layer to accept n_classes instead of 1000 classes
        Returns
        -------
        None
        """
        self.model.classifier[6] = nn.Linear(4096, self.n_classes)

    def __add_softmax_layer(self) -> None:
        """
        Add softmax layer to alexnet model
        (wraps the model in a Sequential followed by LogSoftmax over dim 1;
        not called anywhere inside this class)
        Returns
        -------
        None
        """
        # add softmax layer
        self.model = nn.Sequential(self.model, nn.LogSoftmax(dim=1))

    def __train_one_epoch(self, train_loader: DataLoader,
                          optimizer: optim.Optimizer,
                          criterion: Callable,
                          valid_loader: DataLoader = None,
                          epoch: int = 0,
                          each_batch_idx: int = 300) -> None:
        """
        Train alexnet for one epoch
        Parameters
        ----------
        train_loader : DataLoader
            yields dicts with an 'image' and a 'label' entry
        optimizer : Optimizer (torch.optim)
        criterion :  Callable
            loss called as criterion(model_output, float_label)
        valid_loader : DataLoader, Optional
            if given, precision/recall/f-score on it are printed at the
            end of the epoch
        epoch : int
        each_batch_idx : int
            print training stats after each_batch_idx

        Returns
        -------
        None
        """
        train_loss = 0.0
        data_size = 0
        n_batches = len(train_loader)
        # len(sampler) works for any sized sampler, not only for the ones
        # exposing an `indices` attribute.
        n_samples = len(train_loader.sampler)

        for batch_idx, sample_batched in enumerate(train_loader):
            # load data and label
            data, label = sample_batched['image'], sample_batched['label']

            # convert data and label to be compatible with the device
            data = data.to(self.device).float()
            label = label.to(self.device).float()

            # zero the parameter gradients
            optimizer.zero_grad()

            # run forward
            pred_prob = self.model(data)

            # calculate loss
            loss = criterion(pred_prob, label)

            # calculate gradient (backprop)
            loss.backward()

            # total train loss; the criterion returns a batch mean, so it is
            # weighted by the batch size to get a true per-sample average.
            batch_size = label.size(0)
            train_loss += loss.item() * batch_size
            data_size += batch_size

            # update weights
            optimizer.step()

            if batch_idx % each_batch_idx == 0:
                # progress is measured in batches (batch_idx / n_batches)
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data),
                    n_samples,
                    100. * batch_idx / n_batches,
                    loss.item()))
        if valid_loader:
            p, r, f = self.evaluate(test_loader=valid_loader)
            print('Precision on the valid dataset {}'.format(p))
            print('Recall on the valid dataset {}'.format(r))
            print('F-score on the valid dataset {}'.format(f))

        # guard against an empty loader instead of dividing by zero
        average_loss = train_loss / data_size if data_size else float('nan')
        print('====> Epoch: {} Average loss: {:.4f}'.
              format(epoch, average_loss))

    def train(self, epochs: int, train_loader: DataLoader,
              valid_loader: DataLoader = None) -> None:
        """
        Train alexnet for several epochs
        (SGD with lr=0.001 and momentum=0.9 on the trainable parameters,
        using BCEWithLogitsLoss)
        Parameters
        ----------
        epochs : int
            number of epochs
        train_loader:  DataLoader
            training set
        valid_loader : DataLoader, Optional

        Returns
        -------
        None
        """
        self.model.to(self.device)
        self.model.train()
        optimizer = optim.SGD(
            filter(lambda p: p.requires_grad, self.model.parameters()),
            lr=0.001, momentum=0.9)

        criterion = nn.BCEWithLogitsLoss()
        for epoch in range(epochs):
            self.__train_one_epoch(train_loader=train_loader,
                                   optimizer=optimizer,
                                   criterion=criterion,
                                   valid_loader=valid_loader,
                                   epoch=epoch
                                   )

    def evaluate(self, test_loader: DataLoader) -> tuple:
        """
        Calculate per-class precision, recall and f-score on test data.

        The model output is passed through a sigmoid and thresholded at 0.5
        independently for every class. Metrics are computed once over the
        whole loader (not summed batch by batch).

        Parameters
        ----------
        test_loader: DataLoader
            yields dicts with an 'image' and a 'label' entry

        Returns
        -------
        precisions, recalls, fscores : tuple of three lists
            each list has one entry per class (n_classes entries)
        """
        precisions = [0] * self.n_classes
        recalls = [0] * self.n_classes
        fscores = [0] * self.n_classes

        true_batches = []
        pred_batches = []

        # Evaluate in eval mode (dropout off) and restore the previous mode
        # afterwards, because this method is also called between training
        # epochs.
        was_training = self.model.training
        self.model.to(self.device)
        self.model.eval()
        try:
            with torch.no_grad():
                for sample_batched in test_loader:
                    data = sample_batched['image'].to(self.device).float()
                    labels = sample_batched['label']
                    outputs = torch.sigmoid(self.model(data))

                    true_batches.append(labels.cpu().numpy())
                    pred_batches.append(outputs.cpu().numpy() > 0.5)
        finally:
            self.model.train(was_training)

        if not true_batches:
            # empty loader: nothing to score
            return precisions, recalls, fscores

        y_true = np.concatenate(true_batches)
        y_pred = np.concatenate(pred_batches)
        for i in range(self.n_classes):
            p, r, f, _ = prfs(y_true[:, i], y_pred[:, i], average='binary')
            precisions[i] = p
            recalls[i] = r
            fscores[i] = f

        return precisions, recalls, fscores

    def predict(self, test_loader):
        """
        Run the inference pipeline on the test_loader data
        Parameters
        ----------
        test_loader: DataLoader
            test data

        Returns
        -------
        predict_results : np.ndarray of shape (n_samples, n_classes)
            softmax of the model output, one row per sample in the order
            the loader produced them
        """
        self.model.eval()
        self.model.to(self.device)
        # seed with an empty float64 array so the result keeps its dtype and
        # an empty loader still yields shape (0, n_classes)
        batches = [np.empty(shape=(0, self.n_classes))]
        with torch.no_grad():
            for sample_batched in test_loader:
                data = sample_batched['image'].to(self.device).float()
                outputs = self.model(data)
                # NOTE(review): training uses BCEWithLogitsLoss and evaluate
                # uses a per-class sigmoid, while this applies a softmax
                # across classes - confirm this is what the sample-selection
                # helpers expect.
                outputs = softmax(outputs, dim=1)
                batches.append(outputs.cpu().numpy())
        return np.concatenate(batches)


## Make custom dataset for the images

In [4]:
# Suppress every Python warning from this point on (not only the ones raised
# by this cell).
warnings.filterwarnings("ignore")


class GameImageDataset(Dataset):
    """
    In-memory torch.utils.data.Dataset of game images with multi-label
    targets encoded in the file names.

    Every regular file directly inside ``root_dir`` is read with OpenCV,
    converted from BGR to RGB channel order and scaled by 1/255. The label
    is taken from the last 4 characters of the file name (extension
    removed), one digit per class, e.g. ``frame_0101.png`` -> [0, 1, 0, 1].

    Parameters
    ----------
    root_dir : str
        Path to the dataset directory.

    transform : Callable,
        A transform function that takes the sample dict
        ({'image': ..., 'label': ...}) and returns a transformed version.

    Attributes
    ----------
    data : list
        list of images (arrays scaled by 1/255)
    labels : list
        list of labels, each a list of integers (one per class)
    """

    def __init__(self, root_dir: str = "data",
                 transform: Optional[Callable] = None):

        self.root_dir = root_dir
        self.transform = transform
        self.data = []
        self.labels = []
        self._classes = 4

        # load data and labels; sorted so that sample indices (and any
        # seeded split built on them) are reproducible across runs.
        for item in sorted(os.listdir(root_dir)):
            filepath = os.path.join(root_dir, item)
            if os.path.isdir(filepath):
                continue

            # cv2.imread returns None (no exception) for files it cannot
            # decode; skip those instead of crashing on the slicing below.
            img = cv2.imread(filepath)
            if img is None:
                logging.getLogger(__name__).warning(
                    'Skipping unreadable image file: %s', filepath)
                continue

            # read labels from filename
            label = self._parse_label(item, self._classes)

            # reverse the channel axis (BGR -> RGB) and scale
            img = img[:, :, ::-1]
            self.data.append(self.img_normalize(img))
            self.labels.append(label)

    @staticmethod
    def _parse_label(filename: str, n_classes: int = 4) -> list:
        """
        Extract the label digits from a file name

        Parameters
        ----------
        filename : str
            file name; only the final extension is stripped, so names
            containing additional dots are handled
        n_classes : int
            number of trailing digits that make up the label

        Returns
        -------
        label: list of int

        Raises
        ------
        ValueError
            if the name does not end with n_classes digits
        """
        stem = os.path.splitext(filename)[0]
        digits = stem[-n_classes:]
        if len(digits) != n_classes or not digits.isdecimal():
            raise ValueError(
                'Cannot read a {}-digit label from file name {!r}'.format(
                    n_classes, filename))
        return [int(ch) for ch in digits]

    def __getitem__(self, idx: int) -> dict:
        """
        Get the idx element

        Parameters
        ----------
        idx : int
           the index of the element


        Returns
        -------
        sample: dict[str, Any]
            {'image': ..., 'label': ...}, passed through `transform`
            when one was given
        """

        sample = {'image': self.data[idx], 'label': self.labels[idx]}

        if self.transform:
            sample = self.transform(sample)

        return sample

    def __len__(self):
        """Number of images that were loaded."""

        return len(self.data)

    @staticmethod
    def img_normalize(img):
        """Scale pixel values by 1/255."""
        img = (img / 255.0)

        return img


## CEAL Algorithm

In [5]:
def ceal_learning_algorithm(du: DataLoader,
                            dl: DataLoader,
                            dtest: DataLoader,
                            k: int = 5,
                            delta_0: float = 0.005,
                            dr: float = 0.00033,
                            t: int = 1,
                            epochs: int = 10,
                            criteria: str = 'cl',
                            max_iter: int = 10):
    """
    Algorithm1 : Learning algorithm of CEAL.
    For simplicity, I used the same notation in the paper.

    `du` and `dl` are modified in place: their `sampler.indices` lists are
    extended/shrunk and `dl.dataset.labels` is overwritten with pseudo
    labels for the high confidence samples.

    Parameters
    ----------
    du: DataLoader
        Unlabeled samples (its sampler must expose a mutable `indices` list)
    dl : DataLoader
        labeled samples (its sampler must expose a mutable `indices` list)
    dtest : DataLoader
        test data
    k: int, (default = 5)
        uncertain samples selection
    delta_0: float
        hight confidence samples selection threshold
    dr: float
        threshold decay
    t: int
        fine-tuning interval
    epochs: int
        number of epochs for the initial training and each fine-tuning
    criteria: str
        selection criterion forwarded to `get_uncertain_samples`
    max_iter: int
        maximum iteration number.

    Returns
    -------
    None
    """
    logger.info('Initial configuration: len(du): {}, len(dl): {} '.format(
        len(du.sampler.indices),
        len(dl.sampler.indices)))

    # Create the model
    model = AlexNet(n_classes=4, device=device)

    # Initialize the model
    logger.info('Intialize training the model on `dl` and test on `dtest`')

    model.train(epochs=epochs, train_loader=dl, valid_loader=None)

    # Evaluate model on dtest
    p, r, f = model.evaluate(test_loader=dtest)

    print('====> Initial precision: {} '.format(p))
    print('====> Initial recall: {} '.format(r))
    print('====> Initial f-score: {} '.format(f))

    for iteration in range(max_iter):

        logger.info('Iteration: {}: run prediction on unlabeled data '
                    '`du` '.format(iteration))

        pred_prob = model.predict(test_loader=du)

        # NOTE(review): the row -> sample mapping below assumes `pred_prob`
        # rows follow the order of `du.sampler.indices`. A
        # SubsetRandomSampler yields its indices in a random permutation, so
        # confirm this holds for the sampler actually used.

        # get k uncertain samples
        uncert_samp_idx, _ = get_uncertain_samples(pred_prob=pred_prob, k=k,
                                                   criteria=criteria)

        # get original indices
        uncert_samp_idx = [du.sampler.indices[idx] for idx in uncert_samp_idx]

        # add the uncertain samples selected from `du` to the labeled samples
        #  set `dl`
        dl.sampler.indices.extend(uncert_samp_idx)

        logger.info(
            'Update size of `dl`  and `du` by adding uncertain {} samples'
            ' in `dl`'
            ' len(dl): {}, len(du) {}'.
            format(len(uncert_samp_idx), len(dl.sampler.indices),
                   len(du.sampler.indices)))

        # get high confidence samples `dh`
        hcs_idx, hcs_labels = get_high_confidence_samples(pred_prob=pred_prob,
                                                          delta=delta_0)

        # Map to the original indices and drop the samples already selected
        # as uncertain. Index and pseudo label are filtered *together* so
        # each remaining sample keeps its own label (filtering only the
        # indices shifted the labels onto the wrong samples).
        uncertain_set = set(uncert_samp_idx)
        hcs_pairs = []
        for position, pseudo_label in zip(hcs_idx, hcs_labels):
            original_idx = du.sampler.indices[position]
            if original_idx not in uncertain_set:
                hcs_pairs.append((original_idx, pseudo_label))
        hcs_idx = [original_idx for original_idx, _ in hcs_pairs]

        # add high confidence samples to the labeled set 'dl'

        # (1) update the indices
        dl.sampler.indices.extend(hcs_idx)
        # (2) update the original labels with the pseudo labels.
        # NOTE(review): the pseudo labels permanently replace the stored
        # labels and the samples stay in both `dl` and `du` - confirm this
        # is intended.
        for original_idx, pseudo_label in hcs_pairs:
            dl.dataset.labels[original_idx] = pseudo_label
        logger.info(
            'Update size of `dl`  and `du` by adding {} hcs samples in `dl`'
            ' len(dl): {}, len(du) {}'.
            format(len(hcs_idx), len(dl.sampler.indices),
                   len(du.sampler.indices)))

        if iteration % t == 0:
            logger.info('Iteration: {} fine-tune the model on dh U dl'.
                        format(iteration))
            model.train(epochs=epochs, train_loader=dl)

            # update delta_0
            delta_0 = update_threshold(delta=delta_0, dr=dr, t=iteration)

        # remove the uncertain samples from the original `du`
        logger.info('remove {} uncertain samples from du'.
                    format(len(uncert_samp_idx)))
        for val in uncert_samp_idx:
            du.sampler.indices.remove(val)

        p, r, f = model.evaluate(test_loader=dtest)
        print(
            "Iteration: {}, len(dl): {}, len(du): {},"
            " len(dh) {}, p: {} r: {} f: {} ".format(
                iteration, len(dl.sampler.indices),
                len(du.sampler.indices), len(hcs_idx), p, r, f))

In [6]:
# Settings for the split and the loaders.
random_seed = 123
validation_split = 0.1  # 10%
shuffling_dataset = True
batch_size = 16

# Both datasets use the same preprocessing chain (separate instances).
dataset_train = GameImageDataset(
    root_dir="data/train",
    transform=transforms.Compose([
        SquarifyImage(),
        RandomCrop(224),
        Normalize(),
        ToTensor(),
    ]),
)

dataset_test = GameImageDataset(
    root_dir="data/test",
    transform=transforms.Compose([
        SquarifyImage(),
        RandomCrop(224),
        Normalize(),
        ToTensor(),
    ]),
)

# Split the training indices: the first `split` (10%) seed the labeled set,
# the remainder forms the unlabeled pool.
dataset_size = len(dataset_train)
split = int(np.floor(validation_split * dataset_size))
indices = list(range(dataset_size))

if shuffling_dataset:
    np.random.seed(random_seed)
    np.random.shuffle(indices)
val_indices = indices[:split]
train_indices = indices[split:]

# Samplers over the two index lists.
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(val_indices)

# `du` (unlabeled) and `dl` (labeled) share `dataset_train` and differ only
# in their sampler; `dtest` iterates the whole test set.
du = DataLoader(dataset_train, batch_size=batch_size,
                sampler=train_sampler, num_workers=4)
dl = DataLoader(dataset_train, batch_size=batch_size,
                sampler=valid_sampler, num_workers=4)
dtest = DataLoader(dataset_test, batch_size=batch_size,
                   num_workers=4)

ceal_learning_algorithm(du=du, dl=dl, dtest=dtest)

FileNotFoundError: [Errno 2] No such file or directory: 'data/train'

# Extra processing
---
The cells below were converted to Markdown cells so that they are skipped when you run all cells. Convert them back to code cells if you need to run them.

Extracting the data zip file

import zipfile

# Unpack the whole archive into the current working directory.
with zipfile.ZipFile('annotated-20210501T142205Z-001.zip', mode='r') as archive:
    archive.extractall()

Move the files into the train and test directories

import os

# Collect the regular files that sit directly in data/ (sub-directories such
# as data/train and data/test are left alone). Sorted so the same files end
# up in the same split on every run.
dataitems = os.listdir('data')
finalitems = sorted(
    item for item in dataitems
    if not os.path.isdir(os.path.join('data', item)))

# os.rename raises FileNotFoundError when the destination directory is
# missing, so make sure both targets exist before moving anything.
os.makedirs(os.path.join('data', 'train'), exist_ok=True)
os.makedirs(os.path.join('data', 'test'), exist_ok=True)

for idx, item in enumerate(finalitems):
    # this will assign 20% images(every 5th image) to the test directory
    target_dir = 'test' if idx % 5 == 0 else 'train'
    os.rename(os.path.join('data', item),
              os.path.join('data', target_dir, item))