Cutout Data Augmentation.

This code is implemented following the official implementation (https://github.com/uoguelph-mlrg/Cutout)


##**Import all necessary packages**

In [None]:
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torch.backends.cudnn as cudnn
from torch.optim.lr_scheduler import MultiStepLR

from torchvision import datasets, transforms

from tqdm.notebook import tqdm as tqdm

##**Model - Define ResNet Model**


In [None]:
'''ResNet18/34/50/101/152 in Pytorch.'''

def conv3x3(in_planes, out_planes, stride=1):
    """Return a bias-free 3x3 convolution with padding 1.

    Args:
        in_planes: number of input channels.
        out_planes: number of output channels.
        stride: convolution stride (default 1).
    """
    conv_options = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv2d(in_planes, out_planes, **conv_options)


class BasicBlock(nn.Module):
    """Two-convolution residual block (3x3 -> 3x3) with channel expansion 1.

    Args:
        in_planes: channels of the incoming feature map.
        planes: channels produced by both convolutions.
        stride: stride of the first convolution (default 1).
    """
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = conv3x3(in_planes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes)

        # The skip path is an identity unless the block changes the spatial
        # size or the channel count; then a 1x1 conv + BN projects the input.
        if stride == 1 and in_planes == out_planes:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Apply conv-bn-relu, conv-bn, add the shortcut of ``x``, then ReLU."""
        out = self.conv1(x)
        out = F.relu(self.bn1(out))
        out = self.conv2(out)
        out = self.bn2(out)
        out += self.shortcut(x)
        return F.relu(out)


class Bottleneck(nn.Module):
    """Three-convolution residual block (1x1 -> 3x3 -> 1x1) with expansion 4.

    Args:
        in_planes: channels of the incoming feature map.
        planes: channels of the two inner convolutions; the block outputs
            ``expansion * planes`` channels.
        stride: stride of the 3x3 convolution (default 1).
    """
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        # Identity skip unless the spatial size or channel count changes, in
        # which case the input is projected with a 1x1 conv + BN.
        if stride == 1 and in_planes == out_planes:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Run the three conv-bn stages, add the shortcut of ``x``, then ReLU."""
        out = self.conv1(x)
        out = F.relu(self.bn1(out))
        out = self.conv2(out)
        out = F.relu(self.bn2(out))
        out = self.conv3(out)
        out = self.bn3(out)
        out += self.shortcut(x)
        return F.relu(out)


class ResNet(nn.Module):
    """ResNet with a 3x3 stem convolution, four residual stages and a linear head.

    Args:
        block: residual block class. It is called as
            ``block(in_planes, planes, stride)`` and must expose an integer
            ``expansion`` class attribute.
        num_blocks: sequence of four ints, the number of blocks per stage.
        num_classes: size of the output layer (default 10).
    """

    def __init__(self, block, num_blocks, num_classes=10):
        super().__init__()
        # Running channel count fed to the next block; updated by _make_layer.
        self.in_planes = 64

        # Stem: bias-free 3x3 convolution, stride 1, padding 1 (3 -> 64 channels).
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Stack ``num_blocks`` blocks; only the first one uses ``stride``."""
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        # A separate loop name keeps the ``stride`` argument intact.
        for block_stride in strides:
            layers.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes)."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # Global average pooling. For 32x32 inputs the final map is 4x4, so
        # this equals the former fixed 4x4 average pool, but it also yields a
        # 1x1 map for other input sizes so the linear layer's input width
        # (512 * expansion) always matches.
        out = F.adaptive_avg_pool2d(out, 1)
        out = out.flatten(1)
        out = self.linear(out)
        return out


def ResNet18(num_classes=10):
    """Build a ResNet-18: BasicBlock stages of depth 2, 2, 2, 2."""
    stage_depths = [2, 2, 2, 2]
    return ResNet(BasicBlock, stage_depths, num_classes)

def ResNet34(num_classes=10):
    """Build a ResNet-34: BasicBlock stages of depth 3, 4, 6, 3."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(BasicBlock, stage_depths, num_classes)

def ResNet50(num_classes=10):
    """Build a ResNet-50: Bottleneck stages of depth 3, 4, 6, 3."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(Bottleneck, stage_depths, num_classes)

def ResNet101(num_classes=10):
    """Build a ResNet-101: Bottleneck stages of depth 3, 4, 23, 3."""
    stage_depths = [3, 4, 23, 3]
    return ResNet(Bottleneck, stage_depths, num_classes)

def ResNet152(num_classes=10):
    """Build a ResNet-152: Bottleneck stages of depth 3, 8, 36, 3."""
    stage_depths = [3, 8, 36, 3]
    return ResNet(Bottleneck, stage_depths, num_classes)

def test_resnet():
    """Smoke-test ResNet50 on one random 3x32x32 input and print the output size."""
    net = ResNet50()
    # Tensors can be passed to modules directly, so the deprecated Variable
    # wrapper is not needed; no_grad avoids building an autograd graph for a
    # forward pass whose gradients are never used.
    with torch.no_grad():
        y = net(torch.randn(1, 3, 32, 32))
    print(y.size())

# test_resnet()

##**Utils**

In [None]:
class AverageMeter(object):
    r"""Track the latest value and the weighted running mean of a metric.

    Args:
        name: label used when the meter is printed.
        fmt: format spec (including the leading ``:``) applied to both the
            current value and the average in ``__str__``.
    """
    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        """Zero the current value, average, weighted sum and sample count."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as observed over ``n`` samples and refresh the mean."""
        self.val = val
        self.count += n
        self.sum += val * n
        self.avg = self.sum / self.count

    def __str__(self):
        # Splice the format spec into the template first, then fill the
        # template from the instance attributes.
        template = '{{name}} {{val{0}}} ({{avg{0}}})'.format(self.fmt)
        return template.format(**vars(self))


class ProgressMeter(object):
    """Print a tab-separated progress line: prefix, batch counter, then meters.

    Args:
        num_batches: total number of batches, used for the ``[i/N]`` counter.
        *meters: objects whose ``str()`` is appended to each printed line.
        prefix: text placed in front of the batch counter.
    """
    def __init__(self, num_batches, *meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def print(self, batch):
        """Print the progress line for batch index ``batch``."""
        counter = self.batch_fmtstr.format(batch)
        fields = [self.prefix + counter]
        fields.extend(str(meter) for meter in self.meters)
        print('\t'.join(fields))

    def _get_batch_fmtstr(self, num_batches):
        """Return e.g. ``'[{:3d}/391]'``: the index is padded to the total's width."""
        width = len(str(num_batches // 1))
        index_spec = '{:%dd}' % width
        return '[%s/%s]' % (index_spec, index_spec.format(num_batches))


def accuracy(output, target, topk=(1,)):
    r"""Compute top-k accuracy (in percent) for each k in ``topk``.

    Args:
        output: score tensor; it is sorted along its last dimension and the
            leading ``max(topk)`` indices of each row are the predictions.
        target: tensor of class indices, one per row of ``output``.
        topk: iterable of k values to evaluate.

    Returns:
        A list with one single-element tensor per k, holding the percentage
        of rows whose target is among the k highest-scoring classes.
    """
    with torch.no_grad():
        largest_k = max(topk)
        n_samples = target.size(0)

        # A full descending sort followed by a slice is used instead of
        # Tensor.topk (ref: https://github.com/pytorch/pytorch/issues/22812).
        ranked = output.sort(descending=True)[1]
        top_preds = ranked[:, :largest_k].t()
        hits = top_preds.eq(target.view(1, -1).expand_as(top_preds))

        percent_per_hit = 100.0 / n_samples
        return [
            hits[:k].reshape(-1).float().sum(0, keepdim=True).mul_(percent_per_hit)
            for k in topk
        ]

##**Cutout: Main Code for Applying Cutout data augmentation**

In [None]:
class Cutout(object):
    """Zero out one or more randomly placed square patches of an image.

    Args:
        n_holes (int): Number of patches to cut out of each image.
        length (int): Side length (in pixels) of each square patch before it
            is clipped to the image border.
    """
    def __init__(self, n_holes, length):
        self.n_holes = n_holes
        self.length = length

    def __call__(self, img):
        """
        Args:
            img (Tensor): Tensor image of size (C, H, W).
        Returns:
            Tensor: Image with n_holes patches set to zero. Each patch is
            centred on a random pixel and clipped to the image, so it can be
            smaller than length x length near the border.
        """
        height, width = img.size(1), img.size(2)
        half = self.length // 2

        keep = np.ones((height, width), np.float32)
        for _ in range(self.n_holes):
            # Draw the centre row first, then the centre column.
            centre_y = np.random.randint(height)
            centre_x = np.random.randint(width)

            top, bottom = np.clip([centre_y - half, centre_y + half], 0, height)
            left, right = np.clip([centre_x - half, centre_x + half], 0, width)
            keep[top:bottom, left:right] = 0.

        # The single (H, W) mask is broadcast over every channel.
        keep = torch.from_numpy(keep).expand_as(img)
        return img * keep

##**Random-Shadows-Highlights**

In [None]:
import random
import numpy as np
import cv2
from PIL import Image, ImageChops
import torchvision.transforms.functional as TF

class RandomShadows(object):
    """Randomly darken a band of the image and brighten the rest.

    With probability ``p`` the image is blended from a low-brightness copy
    (weighted by a filled quadrilateral mask spanning the image width) and a
    high-brightness copy (weighted by the inverted mask).

    The ``*_ratio`` arguments are (min, max) pairs sampled uniformly:
        high_ratio / low_ratio: brightness factors of the two copies.
        left_high_ratio / right_high_ratio: y of the band's top edge at the
            left / right image border, as a fraction of the image height.
        left_low_ratio / right_low_ratio: band height at the left / right
            border, as a fraction of the image height.
    """
    def __init__(self, p=0.5, high_ratio=(1,2), low_ratio=(0.01, 0.5), left_low_ratio=(0.4,0.6), \
    left_high_ratio=(0,0.2), right_low_ratio=(0.4,0.6), right_high_ratio = (0,0.2)):
        self.p = p
        self.high_ratio = high_ratio
        self.low_ratio = low_ratio
        self.left_low_ratio = left_low_ratio
        self.left_high_ratio = left_high_ratio
        self.right_low_ratio = right_low_ratio
        self.right_high_ratio = right_high_ratio

    @staticmethod
    def process(img, high_ratio, low_ratio, left_low_ratio, left_high_ratio, \
            right_low_ratio, right_high_ratio):
        """Apply the shadow/highlight blend to ``img`` unconditionally.

        ``img`` is assumed to be a PIL image (``img.size`` is unpacked as
        (width, height) and ImageChops is applied to it) -- TODO confirm.
        Returns the result of ``ImageChops.add`` on the two masked copies.
        """

        w, h = img.size
        # Brightness factors for the highlighted and the shadowed copy.
        high_bright_factor = random.uniform(high_ratio[0], high_ratio[1])
        low_bright_factor = random.uniform(low_ratio[0], low_ratio[1])

        # Ratios are scaled by the image height, so these are pixel values.
        left_low_factor = random.uniform(left_low_ratio[0]*h, left_low_ratio[1]*h)
        left_high_factor = random.uniform(left_high_ratio[0]*h, left_high_ratio[1]*h)
        right_low_factor = random.uniform(right_low_ratio[0]*h, right_low_ratio[1]*h)
        right_high_factor = random.uniform(right_high_ratio[0]*h, right_high_ratio[1]*h)

        # Corners of the band: top edge at *_high_factor, bottom edge
        # *_low_factor pixels below it, on the left (x=0) and right (x=w).
        tl = (0, left_high_factor)
        bl = (0, left_high_factor+left_low_factor)

        tr = (w, right_high_factor)
        br = (w, right_high_factor+right_low_factor)

        # Float coordinates are truncated to int32 for cv2.fillPoly.
        contour = np.array([tl, tr, br, bl], dtype=np.int32)

        mask = np.zeros([h, w, 3],np.uint8)
        # NOTE(review): the polygon is filled with a random per-channel colour,
        # not white, so neither the mask nor its inverse is purely black/white
        # as the comment below states -- confirm whether (255, 255, 255) was
        # intended.
        cv2.fillPoly(mask,[contour],(random.randint(0,255),random.randint(0,255),random.randint(0,255)))
        inverted_mask = cv2.bitwise_not(mask)
        # we need to convert these cv2 masks to PIL images
        # img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        # we skip the above conversion because our mask is just black and white
        mask_pil = Image.fromarray(mask)
        inverted_mask_pil = Image.fromarray(inverted_mask)

        # Darkened copy weighted by the band mask, brightened copy weighted by
        # the inverted mask; the two are then summed.
        low_brightness = TF.adjust_brightness(img, low_bright_factor)
        low_brightness_masked = ImageChops.multiply(low_brightness, mask_pil)
        high_brightness = TF.adjust_brightness(img, high_bright_factor)
        high_brightness_masked = ImageChops.multiply(high_brightness, inverted_mask_pil)

        return ImageChops.add(low_brightness_masked, high_brightness_masked)

    def __call__(self, img):
        """Return ``process(img, ...)`` with probability ``p``, else ``img`` unchanged."""
        if random.uniform(0, 1) < self.p:
            img = self.process(img, self.high_ratio, self.low_ratio, \
            self.left_low_ratio, self.left_high_ratio, self.right_low_ratio, \
            self.right_high_ratio)
            return img
        else:
            return img


**Random shadow 논문에 포함된 추가적인 함수 (additional functions included in the Random Shadows paper)**




In [None]:
import operator
import numpy as np
import cv2
import random
from PIL import Image

class DiskAugmenter(object):
    """Apply ``illumination_augmenter`` to an image with a given probability.

    Args:
        local_mask: pair forwarded as the third argument of
            ``illumination_augmenter``.
        global_mask: pair forwarded as the second argument of
            ``illumination_augmenter``.
        flip_and_noise: stored on the instance; not read by ``__call__``.
        augmenting_prob: probability that a call augments the image.
    """
    def __init__(self, local_mask=(120, 160), global_mask=(40, 80),
                 flip_and_noise=False, augmenting_prob=0.67):

        self.augmenting_prob = augmenting_prob
        self.local_mask = local_mask
        self.global_mask = global_mask
        self.flip_and_noise = flip_and_noise
        # True when at least one of the four mask bounds is positive.
        mask_bounds = (*local_mask, *global_mask)
        self.augment_illumination = any(bound > 0 for bound in mask_bounds)

    def __call__(self, img):
        """Return the augmented image, or ``img`` itself when the draw fails."""
        if random.uniform(0, 1) < self.augmenting_prob:
            # illumination_augmenter is expected to be defined elsewhere.
            return illumination_augmenter(img, self.global_mask, self.local_mask)
        return img

In [None]:
import random
import torchvision.transforms.functional as TF

class RandomGamma(object):
    """Apply a random gamma correction to an image with a given probability.

    Args:
        gamma_p: probability that a call changes the image.
        gamma_ratio: (min, max) range from which gamma is drawn uniformly.
    """
    def __init__(self, gamma_p = 0.5, gamma_ratio=(0,1.5)):
        self.gamma_p = gamma_p
        self.gamma_ratio = gamma_ratio

    def __call__(self,img):
        """Return the gamma-adjusted image, or ``img`` itself when the draw fails."""
        if random.uniform(0, 1) < self.gamma_p:
            lowest, highest = self.gamma_ratio[0], self.gamma_ratio[1]
            return TF.adjust_gamma(img, random.uniform(lowest, highest), gain=1)
        return img

class RandomColorJitter(object):
    """Randomly jitter brightness, contrast, saturation and hue of an image.

    Args:
        p: probability that a call changes the image.
        brightness_ratio, contrast_ratio, saturation_ratio, hue_ratio:
            (min, max) ranges from which each factor is drawn uniformly.
    """
    def __init__(self, p = 0.5, brightness_ratio=(0,2), contrast_ratio=(0,2), \
                saturation_ratio=(0,2), hue_ratio=(-0.5,0.5)):
        self.p = p
        self.brightness_ratio = brightness_ratio
        self.contrast_ratio = contrast_ratio
        self.saturation_ratio = saturation_ratio
        self.hue_ratio = hue_ratio

    @staticmethod
    def process(img, brightness_ratio, contrast_ratio, saturation_ratio, hue_ratio):
        """Draw the four factors, then apply the four adjustments in order."""
        # All factors are drawn before any adjustment is applied, in the
        # order brightness, contrast, saturation, hue.
        ranges = (brightness_ratio, contrast_ratio, saturation_ratio, hue_ratio)
        factors = [random.uniform(bounds[0], bounds[1]) for bounds in ranges]

        adjustments = (TF.adjust_brightness, TF.adjust_contrast,
                       TF.adjust_saturation, TF.adjust_hue)
        for adjust, factor in zip(adjustments, factors):
            img = adjust(img, factor)
        return img

    def __call__(self,img):
        """Return the jittered image, or ``img`` itself when the draw fails."""
        if random.uniform(0, 1) < self.p:
            return self.process(img, self.brightness_ratio, self.contrast_ratio,
                                self.saturation_ratio, self.hue_ratio)
        return img

##**Parameter Settings**

In [None]:
# Imported here so the seeding below does not depend on another cell's imports.
import random

dataset = 'cifar100' # cifar10 or cifar100
model = 'resnet34' # resnet18, resnet34, resnet50, resnet101
batch_size = 128  # Input batch size for training (default: 128)
epochs = 150 # Number of epochs to train (default: 200)
learning_rate = 0.1 # Learning rate
data_augmentation = True # Traditional data augmentation such as flipping and cropping?
cutout = True # Apply Cutout?
n_holes = 1 # Number of holes to cut out from image
length = 16 # Length of the holes
seed = 0 # Random seed (default: 0)
print_freq = 30 # Print training progress every `print_freq` batches
cuda = torch.cuda.is_available()
cudnn.benchmark = True  # Should make training go faster for large models

# What we need for our data augmentation
randomshadows = True

# Seed every generator the pipeline draws from: torch (weight init, shuffling),
# NumPy (Cutout picks hole centres with np.random) and Python's `random`
# (RandomShadows and the other PIL augmentations). Previously only torch was
# seeded, so the augmentations were not covered by `seed`.
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
if cuda:
    torch.cuda.manual_seed(seed)

# Identifier of this run: "<dataset>_<model>".
test_id = dataset + '_' + model

##**Load and preprocess data**

In [None]:
# Image Preprocessing
# Per-channel mean / std, given on a 0-255 scale and divided by 255 to match
# the [0, 1] tensors produced by ToTensor.
normalize = transforms.Normalize(mean=[x / 255.0 for x in [125.3, 123.0, 113.9]],
                                     std=[x / 255.0 for x in [63.0, 62.1, 66.7]])

# The training pipeline is assembled step by step from the flags set in the
# parameter cell.
train_transform = transforms.Compose([])
if data_augmentation:
    train_transform.transforms.append(transforms.RandomCrop(32, padding=4))
    train_transform.transforms.append(transforms.RandomHorizontalFlip())
# NOTE(review): this branch sweeps p over 0.0, 0.1, ..., 1.0 and rebuilds
# `data_transforms` on every iteration, but nothing in this cell reads the
# dict afterwards and RandomShadows is never appended to `train_transform`.
# As written, `randomshadows = True` only prints the p values and leaves the
# training data unaffected -- confirm whether RandomShadows should be added
# to `train_transform` (before ToTensor) with a chosen p.
if randomshadows:
    p = np.round(np.arange(0, 1.1, 0.1), 2)
    for i_p in p:
        print('RSH p value: ', i_p)
        data_transforms = {
            'train': transforms.Compose([
                # For CIFAR-10 and CIFAR100, either change the model or resize images to 64x64 (uncomment the transform below)
                # transforms.Resize(64),
                # augmenting_prob=0 means this step never changes the image.
                DiskAugmenter(local_mask=(120, 160), global_mask=(40, 80), augmenting_prob=0),
                RandomShadows(p=i_p, high_ratio=(1,2), low_ratio=(0,1), \
                left_low_ratio=(0.4,0.8), left_high_ratio=(0,0.3), right_low_ratio=(0.4,0.8),
                right_high_ratio = (0,0.3)), ## high means from top of image, low means from top to bottom low
                #RandomGamma(gamma_p = 0, gamma_ratio=(0, 1.5)),
                #RandomColorJitter(p = 0, brightness_ratio=(0,2), contrast_ratio=(0,2), \
                #           saturation_ratio=(0,2), hue_ratio=(-0.5,0.5)),

                #transforms.ToTensor(),
                #transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
                # transforms.RandomErasing(p=i_p)
            ]),
            'val': transforms.Compose([
                # For CIFAR-10 and CIFAR100, either change the model or resize images to 64x64 (uncomment the transform below)
                # transforms.Resize(64),
                RandomShadows(p=1, high_ratio=(1,2), low_ratio=(0,1), \
                left_low_ratio=(0.4,0.8), left_high_ratio=(0,0.3), right_low_ratio=(0.4,0.8),
                right_high_ratio = (0,0.3)), ## high means from top of image, low means from top to bottom low
                #transforms.ToTensor(),
                #transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
            ]),
            'test': transforms.Compose([
                #transforms.ToTensor(),
            ])
        }


# Tensor conversion and normalisation come after the optional crop / flip;
# Cutout runs last because it masks a (C, H, W) tensor.
train_transform.transforms.append(transforms.ToTensor())
train_transform.transforms.append(normalize)
if cutout:
    train_transform.transforms.append(Cutout(n_holes=n_holes, length=length))


# Test images are only converted to tensors and normalised.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    normalize])

# Supported datasets: name -> (torchvision dataset class, number of classes).
_DATASETS = {
    'cifar10': (datasets.CIFAR10, 10),
    'cifar100': (datasets.CIFAR100, 100),
}

# Fail immediately on an unsupported name; previously such a value fell
# through both branches and only surfaced later as a NameError on
# `train_dataset` / `num_classes`.
if dataset not in _DATASETS:
    raise ValueError(
        "Unsupported dataset {!r}; expected one of {}".format(dataset, sorted(_DATASETS)))

_dataset_cls, num_classes = _DATASETS[dataset]

# Both splits are stored under data/ and downloaded on first use.
train_dataset = _dataset_cls(root='data/',
                             train=True,
                             transform=train_transform,
                             download=True)

test_dataset = _dataset_cls(root='data/',
                            train=False,
                            transform=test_transform,
                            download=True)


# Data Loader (Input Pipeline)
# Options shared by both loaders; only the dataset and shuffling differ.
_loader_options = dict(batch_size=batch_size, pin_memory=True, num_workers=2)

# Training batches are reshuffled every epoch.
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           shuffle=True,
                                           **_loader_options)

# Test batches keep the dataset order.
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          shuffle=False,
                                          **_loader_options)

RSH p value:  0.0
RSH p value:  0.1
RSH p value:  0.2
RSH p value:  0.3
RSH p value:  0.4
RSH p value:  0.5
RSH p value:  0.6
RSH p value:  0.7
RSH p value:  0.8
RSH p value:  0.9
RSH p value:  1.0
Downloading https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz to data/cifar-100-python.tar.gz


HBox(children=(FloatProgress(value=0.0, max=169001437.0), HTML(value='')))


Extracting data/cifar-100-python.tar.gz to data/
Files already downloaded and verified


##**Main Training**

In [None]:
def train(train_loader, epoch, model, optimizer, criterion):
    """Train ``model`` for one epoch and return the epoch's mean top-1 accuracy.

    Args:
        train_loader: iterable yielding ``(input, target)`` batches.
        epoch: epoch index, used only in the progress prefix.
        model: network to optimise; it is switched to train mode.
        optimizer: optimiser stepped once per batch.
        criterion: loss called as ``criterion(output, target)``.

    Returns:
        The sample-weighted average top-1 accuracy (percent) over the epoch.

    Progress is printed every ``print_freq`` batches (module-level setting).
    """
    batch_time = AverageMeter('Time', ':6.3f')
    losses = AverageMeter('Loss', ':.4e')
    top1 = AverageMeter('Acc@1', ':6.2f')
    top5 = AverageMeter('Acc@5', ':6.2f')
    progress = ProgressMeter(len(train_loader), batch_time, losses,
                             top1, top5, prefix="Epoch: [{}]".format(epoch))
    # switch to train mode
    model.train()

    # Put each batch on the device the model lives on instead of calling
    # .cuda() unconditionally, so the loop also runs on CPU-only machines.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    end = time.time()
    for i, (images, target) in enumerate(train_loader):
        # move the batch to the model's device
        images = images.to(device)
        target = target.to(device)

        # compute output
        output = model(images)
        loss = criterion(output, target)

        # measure accuracy and record loss, accuracy
        acc1, acc5 = accuracy(output, target, topk=(1, 5))
        n_samples = images.size(0)
        losses.update(loss.item(), n_samples)
        top1.update(acc1[0].item(), n_samples)
        top5.update(acc5[0].item(), n_samples)

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # measure elapsed time for the whole iteration (data loading included)
        batch_time.update(time.time() - end)
        end = time.time()

        if i % print_freq == 0:
            progress.print(i)

    print('==> Train Accuracy: Acc@1 {top1.avg:.3f} || Acc@5 {top5.avg:.3f}'.format(top1=top1, top5=top5))
    return top1.avg

def test(test_loader,epoch, model):
    """Evaluate ``model`` on ``test_loader`` and return the mean top-1 accuracy.

    Args:
        test_loader: iterable yielding ``(input, target)`` batches.
        epoch: unused; kept so existing callers keep working.
        model: network to evaluate; it is switched to eval mode.

    Returns:
        The sample-weighted average top-1 accuracy (percent).
    """
    top1 = AverageMeter('Acc@1', ':6.2f')
    top5 = AverageMeter('Acc@5', ':6.2f')
    model.eval()

    # Evaluate on whatever device the model lives on (works without CUDA too).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    # No gradients are needed for evaluation; no_grad avoids building the
    # autograd graph for every batch.
    with torch.no_grad():
        for images, target in test_loader:
            images = images.to(device)
            target = target.to(device)

            output = model(images)
            acc1, acc5 = accuracy(output, target, topk=(1, 5))
            top1.update(acc1[0].item(), images.size(0))
            top5.update(acc5[0].item(), images.size(0))
    print('==> Test Accuracy:  Acc@1 {top1.avg:.3f} || Acc@5 {top5.avg:.3f}'.format(top1=top1, top5=top5))
    return top1.avg

# Honour the `cuda` flag from the parameter cell instead of calling .cuda()
# unconditionally, which fails on machines without a GPU.
device = torch.device('cuda' if cuda else 'cpu')

# NOTE(review): the architecture is hard-coded to ResNet34 regardless of the
# `model` setting, and this assignment rebinds `model` from the configuration
# string to the network object -- confirm whether the setting should select
# the architecture.
model = ResNet34(num_classes=num_classes).to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate,momentum=0.9, nesterov=True, weight_decay=5e-4)

# Multiply the learning rate by 0.2 at epochs 60, 90 and 120.
scheduler = MultiStepLR(optimizer, milestones=[60, 90, 120], gamma=0.2)

criterion = torch.nn.CrossEntropyLoss().to(device)
###########################################################
best_acc = 0
for epoch in range(epochs):
    print("\n----- epoch: {}, lr: {} -----".format(
        epoch, optimizer.param_groups[0]["lr"]))

    # train for one epoch, then evaluate on the test set
    start_time = time.time()
    train(train_loader, epoch, model, optimizer, criterion)
    test_acc = test(test_loader,epoch,model)

    # the reported time covers both training and evaluation
    elapsed_time = time.time() - start_time
    print('==> {:.2f} seconds to train this epoch\n'.format(elapsed_time))
    # learning rate scheduling
    scheduler.step()

    # Save model for best accuracy
    if best_acc < test_acc:
        best_acc = test_acc
        torch.save(model.state_dict(), 'model_best.pt')

# Always keep the weights of the final epoch as well.
torch.save(model.state_dict(),'model_latest.pt')
print(f"Best Top-1 Accuracy: {best_acc}")


----- epoch: 0, lr: 0.1 -----
Epoch: [0][  0/391]	Time  1.281 ( 1.281)	Loss 4.7337e+00 (4.7337e+00)	Acc@1   0.00 (  0.00)	Acc@5   5.47 (  5.47)
Epoch: [0][ 30/391]	Time  0.162 ( 0.193)	Loss 4.6855e+00 (5.2570e+00)	Acc@1   1.56 (  1.18)	Acc@5   8.59 (  6.00)
Epoch: [0][ 60/391]	Time  0.162 ( 0.178)	Loss 4.5426e+00 (4.9527e+00)	Acc@1   2.34 (  1.52)	Acc@5   5.47 (  6.86)
Epoch: [0][ 90/391]	Time  0.164 ( 0.173)	Loss 4.4225e+00 (4.7956e+00)	Acc@1   2.34 (  1.85)	Acc@5  10.16 (  8.16)
Epoch: [0][120/391]	Time  0.164 ( 0.170)	Loss 4.2529e+00 (4.6857e+00)	Acc@1   5.47 (  2.19)	Acc@5  13.28 (  9.63)
Epoch: [0][150/391]	Time  0.164 ( 0.169)	Loss 4.3695e+00 (4.6075e+00)	Acc@1   4.69 (  2.47)	Acc@5   9.38 ( 10.90)
Epoch: [0][180/391]	Time  0.166 ( 0.169)	Loss 4.2494e+00 (4.5465e+00)	Acc@1   4.69 (  2.90)	Acc@5  14.06 ( 12.08)
Epoch: [0][210/391]	Time  0.166 ( 0.168)	Loss 4.1728e+00 (4.4956e+00)	Acc@1   4.69 (  3.24)	Acc@5  17.97 ( 13.13)
Epoch: [0][240/391]	Time  0.170 ( 0.168)	Loss 4.2309e+00 