# Introduction

This notebook illustrates how to calculate the *minimal sample distance* used in *On Interaction Between Augmentations and Corruptions in Natural Corruption Robustness*.  As an example calculation it will produce a plot similar to the left plot in Figure 2b.

The notebook requires the CIFAR-10 dataset and GPU-enabled pytorch, as well as the packages listed in `notebook_requirements.txt`.  Over the course of the notebook, it will train one ResNet-40-2s for 100 epochs, which takes around an hour on a single V100 GPU.  The CIFAR-10 dataset should be in the default format used by torchvision.  Please set ```CIFAR_path``` to the proper directory.  If you would like torchvision to automatically download the dataset to this location, set ```download=True```.

In [None]:
CIFAR_path = "path/to/dataset"
download = False

If you would like to control the GPU device the notebook runs on, set it below.  Otherwise, it will be chosen automatically.  You may also set the number of workers used by the pytorch dataloader for training.

In [None]:
gpu_device = None
dataloader_num_workers = 4

# Setup

This section contains basic set-up code for the corruptions and dataset.  

## Inclusions

In [None]:
import torch
import torchvision as tv
import numpy as np
import matplotlib.pyplot as plt
from notebook_utils.wideresnet import WideResNet
from notebook_utils.training_loop import train_model

# For sample CIFAR-10-C corruptions
from wand.api import library as wandlibrary
from wand.image import Image as WandImage
from io import BytesIO
import skimage as sk
import ctypes
from PIL import Image
import cv2

## Corruption Transforms

*Impulse noise* and *motion blur* will be used as example corruption transforms.  Here we reproduce the motion blur and impulse noise corruption functions for CIFAR-10-C used in *Benchmarking Neural Network Robustness to Common Corruptions and Perturbations*.  We separate out the random parameter generation from the transform function so that the same transform may be sampled and applied to multiple images.

In [None]:
def impulse_noise(x, seed, severity=1):
    im_size = 32
    c = [.01, .02, .03, .05, .07][int(severity) - 1]

    x = sk.util.random_noise(np.array(x) / 255., mode='s&p', amount=c, seed=seed)
    return (np.clip(x, 0, 1) * 255).astype(np.float32)

def sample_impulse_noise():
    return {'seed': np.random.randint(low=0, high=2**32)}

In [None]:
wandlibrary.MagickMotionBlurImage.argtypes = (ctypes.c_void_p,  # wand
                                              ctypes.c_double,  # radius
                                              ctypes.c_double,  # sigma
                                              ctypes.c_double)  # angle
class MotionImage(WandImage):
    def motion_blur(self, radius=0.0, sigma=0.0, angle=0.0):
        wandlibrary.MagickMotionBlurImage(self.wand, radius, sigma, angle)

def motion_blur(x, angle, severity=1):
    im_size = 32
    c = [(6,1), (6,1.5), (6,2), (8,2), (9,2.5)][int(severity) - 1]

    output = BytesIO()
    x.save(output, format='PNG')
    x = MotionImage(blob=output.getvalue())

    x.motion_blur(radius=c[0], sigma=c[1], angle=angle)

    x = cv2.imdecode(np.fromstring(x.make_blob(), np.uint8),
                     cv2.IMREAD_UNCHANGED)

    if x.shape != (im_size, im_size):
        return np.clip(x[..., [2, 1, 0]], 0, 255).astype(np.float32)  # BGR to RGB
    else:  # greyscale to RGB
        return np.clip(np.array([x, x, x]).transpose((1, 2, 0)), 0, 255).astype(np.float32)
    
def sample_motion_blur():
    return {'angle' : np.random.uniform(low=-45, high=45)}

## Datasets

In [None]:
CIFAR_MEAN = [125.3/255, 123.0/255, 113.9/255]
CIFAR_STD = [63.0/255, 62.1/255, 66.7/255]

In [None]:
train_transform = tv.transforms.Compose([
    tv.transforms.RandomHorizontalFlip(),
    tv.transforms.RandomCrop(32, padding=4),
    tv.transforms.ToTensor(),
    tv.transforms.Normalize(CIFAR_MEAN, CIFAR_STD)
    ])
cifar10_default = tv.datasets.CIFAR10(
    root=CIFAR_path,
    train=True,
    transform=train_transform,
    download=download
    )

postprocess = tv.transforms.Compose([
    tv.transforms.ToTensor(),
    tv.transforms.Normalize(CIFAR_MEAN, CIFAR_STD)
    ])
cifar10_no_aug = tv.datasets.CIFAR10(
    root=CIFAR_path,
    train=True,
    transform=None,
    download=download
    )

# Defining Minimal Sample Distance

For given augmentation and corruption distibutions $p_a$ and $p_c$, the minimal sample distance is defined as $$d_{\mathrm{MSD}}(p_a, p_c) = \min_{a \in \mathbb{A} \sim p_a} || f(a)  - \mathbb{E}_{c \sim p_c} [ f(c) ] || \, ,$$ where $f(t)$ encodes an image transform $t$ into a feature space.  First we train this encoder, then we will use it to calculate $d_{\mathrm{MSD}}$.

## Transform Encoder $f(t)$

To obtain an encoding $f(t)$ of a transform $t$, we start with an encoding $\hat{f}(x)$ of an image $x$.  The transform encoder is then calculated by applying a transform $t$ to a fixed set of datapoints $\mathbb{D}_S$ in the dataset of interest $$f(t) = \mathbb{E}_{x \in \mathbb{D}_S}  [ \hat{f} (t(x))) ] \, .$$

The encoder for images, $\hat{f}(x)$, is obtained by training a ResNet-40-2 on CIFAR-10 and taking the last hidden layer as the feature space.  Note that the model is trained using the default CIFAR-10 augmentation of random crop and flip.

In [None]:
image_encoder = WideResNet(depth=40, num_classes=10, widen_factor=2).cuda(device=gpu_device)

In [None]:
train_model(image_encoder, cifar10_default, num_workers=dataloader_num_workers, gpu_device=gpu_device)

An example of feature extraction:

In [None]:
example_image = torch.stack([postprocess(cifar10_no_aug[0][0]).cuda(device=gpu_device)], axis=0)
with torch.no_grad():
    _ = image_encoder(example_image)
    example_features = image_encoder.features.detach().cpu().numpy()
print(example_features.shape)

To encode a transform $t$, we sample a fixed set of images $\mathbb{D}_S$ from the CIFAR-10 training set, without data augmentation.  In experiments we use $|\mathbb{D}_S|=100$.

In [None]:
image_indices = np.random.choice(np.arange(len(cifar10_no_aug)), size=100, replace=False)
def transform_encoder(transform):
    images = [postprocess(transform(cifar10_no_aug[i][0])) for i in image_indices] # Transform each image
    images = torch.stack(images, dim=0).cuda(device=gpu_device) # Prepare a batch to evaluate
    with torch.no_grad():
        _ = image_encoder(images)
        image_features = image_encoder.features # Calculate image features
        transform_features = torch.mean(image_features, dim=0) # Average over images to get transform features
    return transform_features.detach().cpu().numpy()

An example of an encoding for a single transform $t$:

In [None]:
transform_params = sample_impulse_noise()
transform = lambda x: impulse_noise(x, **transform_params, severity=3)
transform_features = transform_encoder(transform)
print(transform_features.shape)

## Minimal Sample Distance

As a simple example of how to compute the minimal sample distance, take the target corruption to be *impulse noise* and the augmentation to be a mixture of *impulse noise* and *motion blur*.  This will produce a plot similar to the left plot of Figure 2b.  First define this augmentation function: 

In [None]:
def augment(x, severity, aug_type, **kwargs):
    if aug_type=='motion_blur':
        return motion_blur(x, severity=severity, **kwargs)
    elif aug_type=='impulse_noise':
        return impulse_noise(x, severity=severity, **kwargs)
    else:
        raise ValueError
        
def sample_augment(prob_motion_blur):
    if np.random.uniform() < prob_motion_blur:
        augment_params = {'aug_type': 'motion_blur'}
        augment_params.update(sample_motion_blur())
    else:
        augment_params = {'aug_type': 'impulse_noise'}
        augment_params.update(sample_impulse_noise())
    return augment_params

While we take $|\mathbb{A}|=100\mathrm{k}$ in the paper, here we will use $|\mathbb{A}|=1\mathrm{k}$ for efficiency.  To be concrete, consider an augmentation distribution with a mixing probability of 0.5.  Then calculate the feature space for 1k randomly sampled augmentations. 

In [None]:
def calc_aug_features(motion_blur_probability):
    aug_features = []
    for i in range(1000):
        transform_params = sample_augment(motion_blur_probability)
        transform = lambda x: augment(x, severity=3, **transform_params) 
        feature = transform_encoder(transform)
        aug_features.append(feature)
        if (i+1) % 100 == 0:
            print("Completed transform {}.".format(i+1))
    return np.stack(aug_features, axis=0)

In [None]:
aug_features = calc_aug_features(0.5)

We sample 100 corruptions to approximate averaging over $p_c$ and calculate $\mathbb{E}_{c \sim p_c} [ f(c) ]$.

In [None]:
def calc_corr_avg():
    corr_features = []
    for i in range(100):
        transform_params = sample_impulse_noise()
        transform = lambda x: impulse_noise(x, severity=3, **transform_params) 
        feature = transform_encoder(transform)
        corr_features.append(feature)
    return np.mean(np.stack(corr_features, axis=0), axis=0)

In [None]:
corr_avg = calc_corr_avg()

To obtain $d_{\mathrm{MSD}}(p_a, p_c) = \min_{a \in \mathbb{A} \sim p_a} || f(a)  - \mathbb{E}_{c \sim p_c} [ f(c) ] || $, we only need to calculate the L-2 distance to each augmentation feature and take the minimum.

In [None]:
dists = np.linalg.norm(aug_features-corr_avg.reshape(1,-1), axis=1)
msd = np.min(dists, axis=0)
print("MSD between impulse noise corruption and 50/50 impulse-noise/motion-blur augmentation: {}".format(msd))

To generate a plot like the left part of Figure 2b, we need to repeat this for multiple mixing fractions.  For comparison, we may also calculate the distance between distribution centers, which here we call MMD.  

In [None]:
def dists_for_mixture(motion_blur_prob):
    print("Starting calculation for mixing fraction {}.".format(motion_blur_prob))
    aug_features = calc_aug_features(motion_blur_prob)
    corr_avg = calc_corr_avg()
    msd_dists = np.linalg.norm(aug_features-corr_avg.reshape(1,-1), axis=1)
    msd = np.min(msd_dists, axis=0)
    aug_avg = np.mean(aug_features, axis=0)
    mmd = np.linalg.norm(aug_avg - corr_avg)
    return (msd, mmd)

In [None]:
msd_dists = {i : dists_for_mixture(i) for i in [0.0, 0.5, 0.9, 0.99, 1.0]}

Plotting MSD and MMD yields the result from Fire 2b.  

In [None]:
plt.figure(figsize=(7,7))
plt.plot(list(msd_dists.keys()), [i[1] for i in msd_dists.values()], label="MMD")
plt.plot(list(msd_dists.keys()), [i[0] for i in msd_dists.values()], label="MSD")
plt.legend()
plt.xlabel("Motion Blur Fraction in Augmentation")
plt.ylabel("Distance to Impulse Noise")
plt.show()

# License

Copyright (c) Facebook, Inc. and its affiliates.

This source code is licensed under the MIT license found in the
LICENSE file in the root directory of this source tree.

The `impulse noise` and `motion_blur` functions are adapted from code licensed under the license at `third_party/imagenetc_license`, accessible from the root directory of this source tree. The original source is available at `https://github.com/hendrycks/robustness`.