# TF.image Augmentations Playground

This notebook lets you experiment with different image augmentations.  It also shows how to use tf.py_function and one way to create a custom percentage wrapper.  It is NOT a step-by-step guide; it is more of a playground.

The dataset illustrated is cats/dogs.  In real usage you would copy this notebook into your subdirectory and then change global parms as needed.  You will see a copy of this notebook in some of my folders.

To add/remove the augmentations, uncomment/comment the calls or add methods of your own.

The augmentation methods come from tf.image, plus others I have used in the past.  (Together they make a good list of what you can do to an image.)

Here is the tf.image documentation:

https://www.tensorflow.org/api_docs/python/tf/image

These are also good research links:

https://docs.scipy.org/doc/scipy/reference/generated/scipy.ndimage.shift.html

https://docs.scipy.org/doc/scipy-0.16.0/reference/generated/scipy.ndimage.interpolation.rotate.html

https://scikit-image.org/docs/dev/api/skimage.filters.html#skimage.filters.gaussian

https://www.tensorflow.org/api_docs/python/tf/py_function


Other good reads about brightness, contrast and gamma:

https://www.orpalis.com/blog/color-adjustments-brightness-contrast-and-gamma-2/

https://www.cambridgeincolour.com/tutorials/gamma-correction.htm


### Processing for using Google Drive and normal includes

The notebook uses TensorFlow 2.x.  (Eager execution is enabled by default and we use the newer versions of tf.data.)

I use notebooks both in Colab and on my local workstation, so I need to separate some logic to make it easier to run in both locations.

I was going to delete the local logic and make a Colab-only version, but that is not "real world."  You usually have multiple environments, so I'm showing how I accommodate mine.  You might need something different...
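As an alternative to commenting and uncommenting the Colab cell, the environment check could be done in code. This is only a sketch: `running_in_colab` and `add_lib_path` are hypothetical helper names, and it assumes that being able to import `google.colab` means the notebook is running in Colab.

```python
import importlib.util
import sys


def running_in_colab() -> bool:
    """Return True when the google.colab package can be found (assumed to mean Colab)."""
    try:
        return importlib.util.find_spec("google.colab") is not None
    except (ImportError, ValueError):
        # find_spec raises if the parent package "google" is not installed at all
        return False


def add_lib_path(colab_path: str, local_path: str) -> str:
    """Prepend the environment-specific lib directory to sys.path and return it."""
    lib_path = colab_path if running_in_colab() else local_path
    if lib_path not in sys.path:
        sys.path.insert(0, lib_path)
    return lib_path
```

The two paths would still need to be changed for each environment, exactly as in the cells below.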



In [0]:
#"""
# Google Colab specific stuff....
from google.colab import drive
drive.mount('/content/drive')

import os
!ls "/content/drive/My Drive"

USING_COLLAB = True
# Force to use 2.x version of Tensorflow
%tensorflow_version 2.x
#"""

In [0]:
# Setup sys.path to find MachineLearning lib directory

# Check if "USING_COLLAB" is defined, if yes, then we are using Colab, otherwise set to False
try: USING_COLLAB
except NameError: USING_COLLAB = False

%load_ext autoreload
%autoreload 2

# set path env var
import sys
if "MachineLearning" in sys.path[0]:
    pass
else:
    print(sys.path)
    if USING_COLLAB:
        sys.path.insert(0, '/content/drive/My Drive/GitHub/MachineLearning/lib')  ###### CHANGE FOR SPECIFIC ENVIRONMENT
    else:
        sys.path.insert(0, '/Users/john/Documents/GitHub/MachineLearning/lib')  ###### CHANGE FOR SPECIFIC ENVIRONMENT
    
    print(sys.path)

In [0]:
# Normal includes...

from __future__ import absolute_import, division, print_function, unicode_literals

import os, sys, random, warnings, time, copy, csv
import numpy as np 

import IPython.display as display
from PIL import Image

import matplotlib.pyplot as plt
%matplotlib inline

import tensorflow as tf
print(tf.__version__)

# This allows the runtime to decide how best to optimize CPU/GPU usage
AUTOTUNE = tf.data.experimental.AUTOTUNE

from TrainingUtils import *

#warnings.filterwarnings("ignore", category=DeprecationWarning)
#warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", "(Possibly )?corrupt EXIF data", UserWarning)

## General Setup

- Create a dictionary wrapped by a class for global values.  This is how I manage global vars in my notebooks.
- Load a couple of images that will be used to create a very simple dataset
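
For readers without the TrainingUtils library, the "dictionary wrapped by a class" idea might look roughly like the sketch below. `GlobalParmsSketch` is a hypothetical stand-in, not the real GlobalParms, and the way TRAIN_PATH is derived here is an assumption.

```python
import os


class GlobalParmsSketch:
    """Hypothetical stand-in for GlobalParms; the real class lives in TrainingUtils."""

    def __init__(self, **kwargs):
        # Every keyword argument becomes an attribute, e.g. parms_sketch.BATCH_SIZE
        self.__dict__.update(kwargs)
        # Assumption: TRAIN_PATH is ROOT_PATH joined with TRAIN_DIR
        if "ROOT_PATH" in kwargs and "TRAIN_DIR" in kwargs:
            self.TRAIN_PATH = os.path.join(kwargs["ROOT_PATH"], kwargs["TRAIN_DIR"])

    def print_contents(self):
        # Print the stored values in a stable, sorted order
        for name, value in sorted(vars(self).items()):
            print("{:>15}: {}".format(name, value))


parms_sketch = GlobalParmsSketch(ROOT_PATH="data",
                                 TRAIN_DIR="CatDogLabeledVerySmall",
                                 BATCH_SIZE=4)
parms_sketch.print_contents()
```

Keeping the values on one object means a copied notebook only needs its constructor call changed.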



In [0]:
# Set root directory path to data
if USING_COLLAB:
    ROOT_PATH = "/content/drive/My Drive/GitHub/MachineLearning/9-LibTest/Data"  ###### CHANGE FOR SPECIFIC ENVIRONMENT
else:
    ROOT_PATH = "/Users/john/Documents/GitHub/MachineLearning/9-LibTest/Data"  ###### CHANGE FOR SPECIFIC ENVIRONMENT
        
# Establish global dictionary
parms = GlobalParms(ROOT_PATH=ROOT_PATH,
                    TRAIN_DIR="CatDogLabeledVerySmall", 
                    NUM_CLASSES=2,
                    IMAGE_ROWS=256,
                    IMAGE_COLS=256,
                    IMAGE_CHANNELS=3,
                    BATCH_SIZE=4,
                    IMAGE_EXT=".jpg")

parms.print_contents()

In [0]:
# Create path list and class list using cat/dog images
# Change for your own dataset 
images_list, sub_directories = load_file_names_labeled_subdir_Util(parms.TRAIN_PATH, 
                                                                   parms.IMAGE_EXT)

# Reduce the number of images from 12 to 2, makes it easier to show augmentation
del images_list[1:7]
del images_list[2:7]

images_list_len = len(images_list)
print("Number of images: ", images_list_len)

# Set the class names.
parms.set_class_names(sub_directories)
print("Classes: {}  Labels: {}  {}".format(parms.NUM_CLASSES, len(parms.CLASS_NAMES), parms.CLASS_NAMES) )


In [0]:
# Using the path, show the images that will be used
for image_path in images_list[:2]:
    print(image_path)
    display.display(Image.open(str(image_path)))

## Build an input pipeline

In [0]:
# Simple helper method to display batches of images with labels....        
def show_batch(image_batch, label_batch, number_to_show=25, r=5, c=5, print_shape=False):
    show_number = min(number_to_show, parms.BATCH_SIZE)

    if show_number < 8: #if small number, then change row, col and figure size
        if parms.IMAGE_COLS > 64 or parms.IMAGE_ROWS > 64:
            plt.figure(figsize=(25,25)) 
        else:
            plt.figure(figsize=(10,10))  
        r = 4
        c = 2 
    else:
        plt.figure(figsize=(10,10))  

    if show_number == 1:
        image_batch = np.expand_dims(image_batch, axis=0)
        label_batch = np.expand_dims(label_batch, axis=0)

    for n in range(show_number):
        if print_shape:
            print("Image shape: {}  Max: {}  Min: {}".format(image_batch[n].shape, np.max(image_batch[n]), np.min(image_batch[n])))
        ax = plt.subplot(r,c,n+1)
        cmap="gray"
        if len(image_batch[n].shape) == 3:
            if image_batch[n].shape[2] == 3:
                cmap="viridis"
        plt.imshow(tf.keras.preprocessing.image.array_to_img(image_batch[n]), cmap=plt.get_cmap(cmap))
        plt.title(parms.CLASS_NAMES[np.argmax(label_batch[n])])
        plt.axis('off')

In [0]:
# Return a label based on the path of the image
def get_label(file_path: tf.Tensor) -> tf.Tensor:
    # convert the path to a list of path components
    parts = tf.strings.split(file_path, os.path.sep)
    # The second to last is the class-directory
    return parts[-2] == parms.CLASS_NAMES

# Decode the image, convert to float, normalize by 255 and resize
def decode_img(image: tf.Tensor) -> tf.Tensor:
    # convert the compressed string to a 3D uint8 tensor
    image = tf.image.decode_jpeg(image, channels=parms.IMAGE_CHANNELS)
    # Use `convert_image_dtype` to convert to floats in the [0,1] range.
    image = tf.image.convert_image_dtype(image, parms.IMAGE_DTYPE)
    # resize the image to the desired size.
    return tf.image.resize(image, [parms.IMAGE_ROWS, parms.IMAGE_COLS])

# method mapped to load, resize and apply any augmentations
def process_path(file_path: tf.Tensor) -> tf.Tensor:
    label = get_label(file_path)
    # load the raw data from the file as a string
    image = tf.io.read_file(file_path)
    image = decode_img(image)

    # add any augmentations
    image = image_aug(image)

    return image, label
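
The label returned by get_label is a boolean vector with one entry per class, which is why show_batch can use argmax to recover the class name. A NumPy-only sketch of the same comparison, using a hypothetical path and hypothetical class names, might look like this:

```python
import numpy as np

# Hypothetical class directory names, standing in for parms.CLASS_NAMES
CLASS_NAMES = np.array(["cat", "dog"])


def label_from_path(file_path, sep="/"):
    """Compare the parent directory name against every class name."""
    class_dir = file_path.split(sep)[-2]  # second to last component is the class directory
    return CLASS_NAMES == class_dir       # elementwise comparison gives a boolean vector


label = label_from_path("data/dog/img_001.jpg")
print(label, CLASS_NAMES[np.argmax(label)])
```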

In [0]:

# Various image rescaling methods
# These are optional, I included them because sometimes you need to rescale an image

def image_rescale_0_255(image: tf.Tensor) -> tf.Tensor:
    # takes ANY scale and converts to 0..255
    image = tf.constant(255, dtype=tf.float32)*((image - tf.math.reduce_min(image))/(tf.math.reduce_max(image) - tf.math.reduce_min(image)))
    return image

def image_rescale_0_1(image: tf.Tensor) -> tf.Tensor:
    # takes ANY scale and converts to 0..1
    image = (image - tf.math.reduce_min(image))/(tf.math.reduce_max(image) - tf.math.reduce_min(image))
    return image

def image_rescale_1_neg_1(image: tf.Tensor) -> tf.Tensor:
    # takes Any scale and converts to 1..-1
    image = (tf.constant(2., dtype=tf.float32)*(image - tf.math.reduce_min(image))/(tf.math.reduce_max(image) - tf.math.reduce_min(image)))-1
    return image

def image_rescale_0_1_alt(image: tf.Tensor) -> tf.Tensor:
    # takes 0..255 and converts to 0..1
    image = tf.cast(image, tf.float32) / 255.
    return image

def image_rescale_1_neg_1_alt(image: tf.Tensor) -> tf.Tensor:
    # takes 0..1 and converts to 1..-1
    image = tf.subtract(image, 0.5)
    image = tf.multiply(image, 2.0)
    return image
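
One thing to watch with the min/max based methods above is a constant image, where max equals min and the division is by zero. The sketch below shows one possible guard in plain NumPy; `rescale_min_max` and its `eps` parameter are hypothetical, and mapping a constant image to `new_min` is just one reasonable choice.

```python
import numpy as np


def rescale_min_max(image, new_min=0.0, new_max=1.0, eps=1e-8):
    """Linearly rescale any value range to [new_min, new_max]."""
    image = np.asarray(image, dtype=np.float32)
    lo, hi = image.min(), image.max()
    span = hi - lo
    if span < eps:
        # Constant image: avoid dividing by zero and return new_min everywhere
        return np.full_like(image, new_min)
    scaled = (image - lo) / span                    # now in 0..1
    return scaled * (new_max - new_min) + new_min   # stretch to the target range


print(rescale_min_max([0, 127.5, 255]))
print(rescale_min_max([0, 127.5, 255], -1.0, 1.0))
```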


In [0]:
# Simple test for the rescaling methods
t = tf.Variable([0, .5, 1])
print("0 to 255: ", image_rescale_0_255(t))
                           
t = tf.Variable([0, 127.5, 255])
print("0 to 1: ", image_rescale_0_1(t))
                                                                                  
t = tf.Variable([0, 127.5, 255])
print("1 to -1: ", image_rescale_1_neg_1(t))

t = tf.Variable([0, 127.5, 255])
print("0 to 1: ", image_rescale_0_1_alt(t))

t = tf.Variable([0, .5, 1])
print("1 to -1: ", image_rescale_1_neg_1_alt(t))


In [0]:
"""
These methods are from tf.image.  I mainly have the random ones, but there 
are corresponding non-random ones that will always apply augmentation.  

"""

import random
import scipy.ndimage
from skimage.filters import gaussian

def image_blur(image):
    """
    Takes an image and applies Gaussian Blur using skimage filters.
    Uses a random sigma drawn from [0, sigma_max)
    """
    sigma_max = 3.0
    # tf.random.uniform takes the output shape first; () yields a scalar
    sigma = float(tf.random.uniform((), minval=0., maxval=sigma_max))  # change range or remove if want a fixed sigma value
    image = tf.image.convert_image_dtype(image, dtype=tf.int32)
    # newer scikit-image releases replace multichannel=True with channel_axis=-1
    image = gaussian(image, sigma=sigma, multichannel=True)
    image = tf.image.convert_image_dtype(image, dtype=tf.float32)
    return image

def image_random_shift(image,
                       shift_x=60, # number of pixels, should be > 0 and less than width
                       shift_y=60 # number of pixels, should be > 0 and less than height
                       ):
    """
    Takes an image and randomly shifts it up/down and/or right/left by number of pixels
    Modify this if you want different behavior when shifting, this was most common for my usage.
    Uses ndimage.shift for pixel movement
    """

    # tf.random.uniform takes the output shape first; () yields a scalar
    shift_x = int(tf.random.uniform((), minval=-shift_x, maxval=shift_x))  # could also just hard code pos or neg values
    shift_y = int(tf.random.uniform((), minval=-shift_y, maxval=shift_y))  # could also just hard code pos or neg values

    if shift_x != 0 and shift_y != 0:
        # alternate between x and y, remove if you want both applied
        if bool(np.random.choice([0, 1], p=[0.5, 0.5])):  # change p values as needed
            shift_x = 0
        else:
            shift_y = 0

    # ndimage axes are (rows, cols, channels), so the vertical (y) shift comes first
    shift = (shift_y, shift_x, 0)
    image = scipy.ndimage.shift(image, shift, mode='constant', cval=0.0) #cval is fill value.  See scipy doc
    return image
    
    
def image_random_rotate(image, max_angle=45):
    """
    Takes an image and randomly rotates it between +/- max_angle
    Uses ndimage for rotation
    """
    
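    # tf.random.uniform takes the output shape first; () yields a scalar
    angle = float(tf.random.uniform((), minval=-max_angle, maxval=max_angle))
    # scipy.ndimage.rotate is the public name; the ndimage.interpolation namespace is deprecated
    image = scipy.ndimage.rotate(image,
                                 angle,
                                 reshape=False)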
    return image

def image_aug(image: tf.Tensor) -> tf.Tensor:
    # This is called from the process_path to augment images

    #print(type(image)) # uncomment for testing, should start and end with a tensor....

    #######################################################
    # rotate using tf.py_function
    #######################################################
    #if tf.random.uniform(()) > 0.5:
    #   im_shape = image.shape
    #   [image,] = tf.py_function(image_random_rotate, [image], [tf.float32])  #parms must be tensors
    #   image.set_shape(im_shape)
    #######################################################
        
    #######################################################
    # shift using tf.py_function
    #######################################################
    #if tf.random.uniform(()) > 0.5:
    #   im_shape = image.shape
    #   [image,] = tf.py_function(image_random_shift, [image], [tf.float32])  #parms must be tensors
    #   image.set_shape(im_shape)
    #######################################################

    #######################################################
    # Blur using tf.py_function
    #######################################################
    #if tf.random.uniform(()) > 0.5:
    #   im_shape = image.shape
    #   [image,] = tf.py_function(image_blur, [image], [tf.float32])  #parms must be tensors
    #   image.set_shape(im_shape)
    #######################################################

    #######################################################
    # These are native tf.image methods
    #######################################################
    #image = tf.image.random_flip_left_right(image)
    #image = tf.image.flip_up_down(image)
    #image = tf.image.random_flip_up_down(image)
    #image = tf.image.random_hue(image, 0.08) #  -delta - +delta
    #image = tf.image.random_saturation(image, 0.6, 1.6)  #lower, upper
    #image = tf.image.random_brightness(image, 0.05) # -delta - +delta
    #image = tf.image.adjust_contrast(image, 1.4)
    #image = tf.image.random_contrast(image, 0.7, 1.5) # lower, upper
    #image = tf.image.rot90(image, tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32)) # k is 0-3 (maxval is exclusive): 0, 90, 180 or 270 degrees
    #image = tf.image.random_jpeg_quality(image, 25, 100) #min 0-100, min<max, max 0-100
    #######################################################

    #######################################################
    # random zoom - random crop + resize which will zoom the image
    #######################################################
    #w = parms.IMAGE_COLS
    #h = parms.IMAGE_ROWS
    #p = 0.90
    #image = tf.image.resize(tf.image.random_crop(image, (int(h*p), int(w*p), 3)), (h, w))
    #######################################################

    #######################################################
    # Gamma 
    #######################################################
    #gamma = tf.math.reduce_mean(image) + 0.5
    #image = tf.image.adjust_gamma(image, gamma=gamma)
    #######################################################

    #######################################################
    # roll 
    #######################################################
    #shift = 10 # pixels to roll, can be pos/neg
    #axis = 1  # 0 or 1, u/d or r/l
    #image = tf.roll(image, shift, axis)
    #######################################################

    #######################################################
    # These next two examples show how to create a custom random value
    # Helps if you want to override the normal 50/50 and have different augmentations
    # applied.  Uses tensors to do the random behavior
    # tf.random.categorical returns shape [1, 1]; [0, 0] makes it the scalar tf.cond expects
    #######################################################
    #
    # This uses flip_up_down
    #sample = tf.random.categorical(tf.math.log([[0., 1.]]), 1)[0, 0]   # change values as needed [0., 1.] is always True
    #image = tf.cond(sample == 1, lambda: tf.image.flip_up_down(image), lambda: image)
    #
    # This uses two methods, one for True, one for False (adjust_contrast and flip_up_down)
    sample = tf.random.categorical(tf.math.log([[0.5, 0.5]]), 1)[0, 0]   # change values as needed [0., 1.] is always True
    image = tf.cond(sample == 1, lambda: tf.image.adjust_contrast(image, 0.1), lambda: tf.image.flip_up_down(image))
    #######################################################

    ######################################################
    # Rescaling tests
    #######################################################
    #image = image_rescale_0_255(image)
    #image = image_rescale_0_1(image)
    ######################################################

    #print(type(image)) # uncomment for testing, should start and end with a tensor....

    #image = tf.clip_by_value(image, 0., 1.)  # after majority of augmentations, clip back to 0, 1 before returning

    return image
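
The custom random value pattern in image_aug can also be packaged as a small reusable wrapper. The sketch below is one possible shape for it: `apply_with_probability` is a hypothetical helper, and it draws from `tf.random.uniform` rather than `tf.random.categorical` so that the predicate passed to `tf.cond` is already a scalar.

```python
import tensorflow as tf


def apply_with_probability(image, fn, p=0.5):
    """Apply fn to image with probability p, otherwise return image unchanged."""
    # tf.random.uniform(()) is a scalar in [0, 1), so p=1.0 always applies and p=0.0 never does
    should_apply = tf.random.uniform(()) < p
    return tf.cond(should_apply, lambda: fn(image), lambda: image)


# Usage: flip roughly 30% of the images
demo = tf.reshape(tf.range(12, dtype=tf.float32), (2, 2, 3))
demo_out = apply_with_probability(demo, tf.image.flip_up_down, p=0.3)
print(demo_out.shape)
```

Inside image_aug this would be called as `image = apply_with_probability(image, tf.image.flip_up_down, p=0.3)`.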


### Create dataset and normal mappings

Pipeline Flow:

create dataset -> map "process_path" -> repeat forever -> batch

This will illustrate whatever methods have been uncommented in the image_aug method.

The next 3 cells must be rerun if you change which methods are used in image_aug.  That is mainly because the mapping is set up when map is called, so an existing dataset keeps the old augmentations.  It is also best to start with a clean dataset after any augmentation changes.
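
The same flow can be tried without any image files. The sketch below is a toy version of the pipeline: it uses random in-memory tensors in place of the cat/dog paths and a hypothetical `toy_augment` in place of process_path.

```python
import tensorflow as tf


def toy_augment(image):
    # Stand-in for image_aug: a deterministic left/right flip
    return tf.image.flip_left_right(image)


# Two fake 8x8 RGB images with one-hot style boolean labels
images = tf.random.uniform((2, 8, 8, 3))
labels = tf.constant([[True, False], [False, True]])

# create dataset -> map -> repeat forever -> batch
toy_dataset = tf.data.Dataset.from_tensor_slices((images, labels))
toy_dataset = toy_dataset.map(lambda img, lbl: (toy_augment(img), lbl),
                              num_parallel_calls=tf.data.experimental.AUTOTUNE)
toy_dataset = toy_dataset.repeat()
toy_dataset = toy_dataset.batch(4)

image_batch, label_batch = next(iter(toy_dataset))
print(image_batch.shape, label_batch.shape)
```

Because the dataset repeats, a batch of 4 drawn from 2 images contains each image twice.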


In [0]:
# Create Dataset from list of images
full_dataset = tf.data.Dataset.from_tensor_slices(np.array(images_list))

# Verify image paths were loaded and save one path for later in "some_image"
for f in full_dataset.take(2):
    some_image = f.numpy().decode("utf-8")
    print(f.numpy())
    
print("Some Image: ", some_image)

In [0]:
# map training images to processing, includes any augmentation
full_dataset = full_dataset.map(process_path, num_parallel_calls=AUTOTUNE)

# Verify the mapping worked
for image, label in full_dataset.take(1):
    print("Image shape: {}  Max: {}  Min: {}".format(image.numpy().shape, np.max(image.numpy()), np.min(image.numpy())))
    print("Label: ", label.numpy())

# Repeat forever
full_dataset = full_dataset.repeat()

# set the batch size
full_dataset = full_dataset.batch(parms.BATCH_SIZE)


In [0]:
# Show the images, execute this cell multiple times to see the images
# Execute at least 4 times if random is applied

image_batch, label_batch = next(iter(full_dataset))
show_batch(image_batch.numpy(), label_batch.numpy())

### Final Thoughts.....

Play around with uncommenting different methods and using different percentages.  I've found that separating this type of research work from your training notebook helps keep clutter to a minimum.


In [0]:
############# WORKING

import numpy as np
def add_s_p(X_img):
    # Need to produce a copy as to not modify the original image
    #X_imgs_copy = X_imgs.copy()
    #row, col, _ = X_imgs_copy[0].shape
    row, col, _ = X_img.shape
    #salt_vs_pepper = 0.2
    salt_vs_pepper = 0.2
    amount = 0.004
    num_salt = np.ceil(amount * X_img.size * salt_vs_pepper)
    num_pepper = np.ceil(amount * X_img.size * (1.0 - salt_vs_pepper))
    print(num_salt, num_pepper)
    # Add Salt noise
    coords = [np.random.randint(0, i - 1, int(num_salt)) for i in X_img.shape]
    print("-255", coords)
    #X_img[coords[0], coords[1], :] = 1
    X_img[coords[0], coords[1], :] = -255

    # Add Pepper noise
    coords = [np.random.randint(0, i - 1, int(num_pepper)) for i in X_img.shape]
    print("255", coords)

    #X_img[coords[0], coords[1], :] = 0
    X_img[coords[0], coords[1], :] = 255
        
    return X_img

    ##########

def add_salt_pepper_noise(X_imgs):
    # Need to produce a copy as to not modify the original image
    X_imgs_copy = X_imgs.copy()
    row, col, _ = X_imgs_copy[0].shape
    salt_vs_pepper = 0.2
    amount = 0.004
    num_salt = np.ceil(amount * X_imgs_copy[0].size * salt_vs_pepper)
    num_pepper = np.ceil(amount * X_imgs_copy[0].size * (1.0 - salt_vs_pepper))
    
    for X_img in X_imgs_copy:
        # Add Salt noise
        coords = [np.random.randint(0, i - 1, int(num_salt)) for i in X_img.shape]
        X_img[coords[0], coords[1], :] = 1

        # Add Pepper noise
        coords = [np.random.randint(0, i - 1, int(num_pepper)) for i in X_img.shape]
        X_img[coords[0], coords[1], :] = 0
    return X_imgs_copy
    
 ########## 
def masking_noise(data, sess, v):
    """Apply masking noise to data in X.
    In other words a fraction v of elements of X
    (chosen at random) is forced to zero.
    :param data: array_like, Input data
    :param sess: unused under TF 2.x eager execution, kept so existing calls still work
    :param v: fraction of elements to distort, float
    :return: transformed data
    """
    data_noise = data.copy()
    # TF 2.x: tf.random.uniform replaces tf.random_uniform and no session is needed
    rand = tf.random.uniform(data.shape)
    # mask is True where rand < v, so roughly a fraction v of the elements is zeroed
    data_noise[(rand < v).numpy()] = 0

    return data_noise

#######
def salt_and_pepper_noise(X, v):
    """Apply salt and pepper noise to data in X.
    In other words v elements of each sample in X
    (chosen at random) are set to the maximum or minimum value of X
    according to a fair coin flip.
    :param X: array_like, Input data of shape (n_samples, n_features)
    :param v: int, number of elements to distort per sample
    :return: transformed data
    """
    X_noise = X.copy()
    n_features = X.shape[1]

    mn = X.min()
    mx = X.max()

    for i, sample in enumerate(X):
        mask = np.random.randint(0, n_features, v)

        for m in mask:

            if np.random.random() < 0.5:
                X_noise[i][m] = mn
            else:
                X_noise[i][m] = mx

    return X_noise
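
For images already scaled to 0..1, a compact NumPy version of the same idea might look like the sketch below. It is not one of the working versions above: `salt_and_pepper` and its `rng` parameter are hypothetical, salt is assumed to be 1.0 and pepper 0.0, and positions are drawn without replacement so the two kinds of noise never overlap.

```python
import numpy as np


def salt_and_pepper(image, amount=0.004, salt_vs_pepper=0.2, rng=None):
    """Return a noisy copy of an (H, W, C) image; the input is left untouched."""
    rng = np.random.default_rng() if rng is None else rng
    noisy = np.array(image, dtype=np.float32)  # np.array copies by default
    rows, cols = noisy.shape[:2]
    num_pixels = rows * cols
    num_salt = int(np.ceil(amount * num_pixels * salt_vs_pepper))
    num_pepper = int(np.ceil(amount * num_pixels * (1.0 - salt_vs_pepper)))
    # Pick distinct pixel positions, then split them between salt and pepper
    chosen = rng.choice(num_pixels, size=num_salt + num_pepper, replace=False)
    r, c = np.unravel_index(chosen, (rows, cols))
    noisy[r[:num_salt], c[:num_salt]] = 1.0   # salt: all channels set to max
    noisy[r[num_salt:], c[num_salt:]] = 0.0   # pepper: all channels set to min
    return noisy


gray = np.full((50, 50, 3), 0.5, dtype=np.float32)
noisy_gray = salt_and_pepper(gray, rng=np.random.default_rng(0))
print(int((noisy_gray != 0.5).all(axis=-1).sum()), "pixels changed")
```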



In [0]:
# EXPERIMENTAL

# https://medium.com/@dimkak_89085/thanks-for-your-article-it-is-quite-helpful-ce665dc31705

def create_random_sparse_mask(points_num, batch_dim, im_dims_list, all_dims):
    coords = [tf.random.uniform([batch_dim, points_num], minval=0, maxval=i, dtype=tf.int32) for i in im_dims_list]
    batch_indeces = tf.range(batch_dim)
    batch_indeces = tf.tile(batch_indeces, [points_num])
    coords = tf.stack(coords, axis=-1)
    coords = tf.reshape(coords, [-1, 3])
    coords = tf.concat([tf.expand_dims(batch_indeces, -1), coords], axis=-1)
    values = tf.ones([batch_dim*points_num], dtype=tf.float32)
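    # scatter_nd sums values at duplicate coordinates, so cap the mask at 1 to keep it binary
    sparse_mask = tf.minimum(tf.scatter_nd(coords, values, all_dims), 1.0)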
    return sparse_mask

def add_salt_pepper_noise(image_batch, amount = 0.004, salt_vs_pepper = 0.2):
    image_batch = tf.expand_dims(image_batch, axis=0)

    tensor_dims = tf.shape(image_batch)
    b_dim = tensor_dims[0]
    im_dims = tensor_dims[1:]
    pixel_num = tf.reduce_prod(im_dims)
    num_salt = tf.cast(tf.math.ceil(amount * tf.cast(pixel_num, tf.float32) * salt_vs_pepper), tf.int32)
    num_pepper = tf.cast(tf.math.ceil(amount * tf.cast(pixel_num, tf.float32) * (1.0 - salt_vs_pepper)), tf.int32)
    im_dims = tf.split(im_dims, num_or_size_splits=im_dims.shape[0], axis=0)
    #im_dims = tf.split(im_dims, num_or_size_splits=im_dims.shape[0].value, axis=0)
    im_dims = [tf.squeeze(i) for i in im_dims]
    # Add salt noise
    sparsePixels = create_random_sparse_mask(num_salt, b_dim, im_dims, tensor_dims)
    neg_sparsePixels = 1 - sparsePixels
    image_batch = image_batch*neg_sparsePixels + sparsePixels
    # Add pepper noise
    sparsePixels = create_random_sparse_mask(num_pepper, b_dim, im_dims, tensor_dims)
    neg_sparsePixels = 1 - sparsePixels
    image_batch = image_batch*neg_sparsePixels

    return tf.squeeze(image_batch, axis=0)

def add_gaussian_noise(image_batch, stddev = 0.1):
    noise = tf.random.truncated_normal(tf.shape(image_batch), mean=0.5, stddev=stddev, dtype=tf.float32)
    noise = tf.clip_by_value(noise, 0.0, 1.0)
    image_batch = 0.75*image_batch + 0.25*noise
    return image_batch