## Basic Data Augmentation for MNIST Example

In [2]:
# imports
import cv2
import csv
import numpy
import math

# for showing images in the notebook
from IPython.display import Image
from IPython.display import display

#### Assign global constants

In [3]:
# debug mode - displays the number of new images specified by DEBUG_CUTOFF
DEBUG = False             # True displays pictures of new images; WILL OVER-WRITE OLD FILES
DEBUG_CUTOFF = 20         # number of new images to display

# train mode
TRAIN = True              # adds labels to first column; True for training data; False for test
NORMALIZE_ONLY = True     # does not add distorted records; only ever used for training data

# io/file locations
DIR = '/Users/yataolu/Desktop/DR'
FILE_IN = 'train.csv'
FILE_OUT = 'train_augmented.csv'

# probably never change below this line
################################################################################

# global magic numbers for images ...

INPUT_SIZE = (28, 28)    # size of input image, 2-tuple
OUT_SIZE = (27, 27)      # size of output image, 2-tuple

NORM_SIZE = (21, 21)     # final bounding box size for normalized images, 2-tuple, < OUT_SIZE
# padding added on every side so that NORM_SIZE + 2 * padding == OUT_SIZE;
# derived from OUT_SIZE so the two cannot drift apart
NORM_EXPAND_SIZE = (OUT_SIZE[0] - NORM_SIZE[0]) // 2

LARGE_SIZE = (25, 25)    # final bounding box size for enlarged images, 2-tuple, < OUT_SIZE
# padding added on every side so that LARGE_SIZE + 2 * padding == OUT_SIZE
LARGE_EXPAND_SIZE = (OUT_SIZE[0] - LARGE_SIZE[0]) // 2

RAND_PERCENT = 0.15      # noise injection, fraction of output pixels, < 1.0
# number of pixel positions drawn per image for noise injection
RAND_THRESHOLD = int(OUT_SIZE[0] * OUT_SIZE[1] * RAND_PERCENT)

DEGREE = 15              # rotation degree

### more complicated ...

# turning a 1, 7 or other skinny number into a square during normalization is dumb
# to avoid doing so, don't resize numbers whose left-most pixel is located
# at an index >= SKINNY_THRESHOLD
SKINNY_THRESHOLD = 10

# pixel values below this are set to 0 (black) before the bounding box is computed;
# difficult to see (and therefore test) the bounding box without this, 0-255
TO_BLACK_THRESHOLD = 50

#### Helper functions for image augmentation

In [4]:
def write_image_to_record(src, out_csv, row_label=None):

    """ Writes an OpenCV image array to a single csv record.

    The image is flattened into one row of pixel values; when a label is
    given it is inserted as the first value of the row.

    :param src: OpenCV image array.
    :param out_csv: csv writer object (anything with a ``writerow`` method)
                    to which the record is written.
    :param row_label: Image label for training data; None for unlabeled data.

    """

    out = numpy.array(src).flatten()

    # identity test: `!= None` on a numpy value is an elementwise comparison
    if row_label is not None:
        out = numpy.insert(out, 0, row_label)

    out_csv.writerow(out)


In [5]:
def normalize_scale(src, out_size=OUT_SIZE, norm_size=NORM_SIZE, norm_expand_size=NORM_EXPAND_SIZE, 
                    skinny_threshold=SKINNY_THRESHOLD, to_black_threshold=TO_BLACK_THRESHOLD):

    """ Normalizes OpenCV MNIST image arrays.

    Pixels below the black threshold are zeroed, then the bounding box of the
    remaining non-zero pixels is resized to norm_size and padded with a black
    border of norm_expand_size on every side. Skinny digits, and images with
    no non-zero pixels left after thresholding, are not cropped: the whole
    thresholded image is resized to out_size instead.

    The input array is not modified; thresholding is applied to a copy.

    :param src: OpenCV image array.
    :param out_size: Size of output image, 2-tuple.
    :param norm_size: Final bounding box size for normalized images, 2-tuple, < out_size.
    :param norm_expand_size: Amount of padding to leave outside normalized images.
    :param skinny_threshold: Don't resize numbers whose left most pixel is located
                             at an index >= skinny threshold.
    :param to_black_threshold: Pixel values below this are set to 0;
                               difficult to see the bounding box without this, 0-255.
    :return: Normalized OpenCV MNIST image array.

    """

    # threshold a copy so the caller's image is left untouched
    img = numpy.array(src)
    img[img < to_black_threshold] = 0

    rows, cols = numpy.nonzero(img)

    # nothing left after thresholding: there is no bounding box to crop to
    if rows.size == 0:
        return cv2.resize(img, out_size)

    left = cols.min()

    # skinny digit: stretching its bounding box to a square would distort it
    if left >= skinny_threshold:
        return cv2.resize(img, out_size)

    top, bottom = rows.min(), rows.max()
    right = cols.max()
    bounding_box = img[top:bottom + 1, left:right + 1]

    norm = cv2.resize(bounding_box, norm_size)
    return cv2.copyMakeBorder(norm, norm_expand_size, norm_expand_size,
                              norm_expand_size, norm_expand_size, cv2.BORDER_CONSTANT)


In [6]:
def inject_noise(src, out_size=OUT_SIZE, rand_threshold=RAND_THRESHOLD):

    """ Performs noise injection on an OpenCV image array.

    Draws rand_threshold random (row, column) positions, with replacement,
    and sets those pixels to 0 in a copy of the image. Because positions may
    repeat, fewer than rand_threshold distinct pixels may be changed.

    :param src: OpenCV image array; expected to be at least as large as out_size.
    :param out_size: Size of output image, 2-tuple in OpenCV (width, height) order.
    :param rand_threshold: Number of random pixel positions to set to 0.
    :return: A copy of the image with random noise injection.

    """

    noise = numpy.copy(src) # deep copy

    # randint's upper bound is exclusive, so pass the full extent to be able
    # to reach the last row and column as well
    rows = numpy.random.randint(out_size[1], size=rand_threshold)
    cols = numpy.random.randint(out_size[0], size=rand_threshold)
    noise[rows, cols] = 0

    return noise


In [7]:
def enlarge(src, skinny_threshold=SKINNY_THRESHOLD, input_size=INPUT_SIZE, out_size=OUT_SIZE, 
            large_size=LARGE_SIZE, large_expand_size=LARGE_EXPAND_SIZE):

    """ Enlarges OpenCV image arrays.

    The bounding box of the non-zero pixels is resized to large_size and
    padded with a black border of large_expand_size on every side. For skinny
    digits only the rows are cropped (with one row of margin above and below)
    and the result is resized to out_size. An image with no non-zero pixels
    has no bounding box and is simply resized to out_size.

    :param src: OpenCV image array.
    :param skinny_threshold: Don't resize numbers whose left most pixel is located
                             at an index >= skinny threshold.
    :param input_size: Size of input image, 2-tuple.
    :param out_size: Size of output image, 2-tuple.
    :param large_size: Bounding box for enlarged image.
    :param large_expand_size: Amount of padding to leave outside enlarged images.
    :return: Enlarged OpenCV MNIST image array.

    """

    rows, cols = numpy.nonzero(src)

    # blank image: nothing to crop to
    if rows.size == 0:
        return cv2.resize(src, out_size)

    top, bottom = rows.min(), rows.max()
    left, right = cols.min(), cols.max()

    if left >= skinny_threshold:
        # clamp the upper margin at row 0: a start index of -1 would wrap
        # around to the last row and produce a wrong (possibly empty) slice
        tall_bounding_box = src[max(top - 1, 0):bottom + 2, 0:input_size[0] - 1]
        return cv2.resize(tall_bounding_box, out_size)

    bounding_box = src[top:bottom + 1, left:right + 1]
    large = cv2.resize(bounding_box, large_size)
    return cv2.copyMakeBorder(large, large_expand_size, large_expand_size,
                              large_expand_size, large_expand_size, cv2.BORDER_CONSTANT)


In [8]:
def rotate_about_center(src, angle, scale=1.0):

    """ Rotates OpenCV image arrays.

    The output canvas is sized to the bounding box of the rotated (and
    scaled) image, and the transform is shifted so the source image is
    centered on that canvas.

    :param src: OpenCV image array.
    :param angle: Rotation degree.
    :param scale: Factor by which to scale rotated images.
    :return: Rotated OpenCV MNIST image array.

    """

    height, width = src.shape[0], src.shape[1]

    theta = numpy.deg2rad(angle) # angle in rads
    sin_t = numpy.sin(theta)
    cos_t = numpy.cos(theta)

    # dimensions of the canvas that holds the rotated image
    new_width = (abs(sin_t*height) + abs(cos_t*width))*scale
    new_height = (abs(cos_t*height) + abs(sin_t*width))*scale

    # rotation about the center of the new canvas
    matrix = cv2.getRotationMatrix2D((new_width*0.5, new_height*0.5), angle, scale)

    # offset between old and new centers, passed through the rotation
    center_offset = numpy.array([(new_width - width) * 0.5, (new_height - height) * 0.5, 0])
    shift = numpy.dot(matrix, center_offset)

    # fold the offset into the translation column of the matrix
    matrix[0,2] += shift[0]
    matrix[1,2] += shift[1]

    canvas_size = (int(math.ceil(new_width)), int(math.ceil(new_height)))
    return cv2.warpAffine(src, matrix, canvas_size, flags=cv2.INTER_LANCZOS4)


#### Display Transformed Images or Write Transformed Images to File

In [9]:
def _debug_show(title, stem, img):

    """ In DEBUG mode, prints a title, saves the image as a jpg in DIR and displays it.

    :param title: Text printed above the image.
    :param stem: File name without directory or extension.
    :param img: OpenCV image array to save and display.

    """

    if not DEBUG:
        return

    print(title)
    path = DIR + '/' + stem + '.jpg'
    cv2.imwrite(path, img)
    display(Image(path))


def main():

    """ Displays transformed images for debugging purposes or writes transformed images to file for model training or scoring.

    Reads FILE_IN from DIR row by row. The first row is treated as a header
    and is copied, truncated to the output record width. Every other row is
    reshaped to INPUT_SIZE, normalized and written to FILE_OUT. When TRAIN is
    set and NORMALIZE_ONLY is not, noise-injected and rotated variants are
    written as additional records with the same label.

    """

    # i/o
    # newline='' is what the csv module expects of the files it is handed;
    # the with-block closes both files even if processing fails part-way
    with open(DIR + '/' + FILE_IN, 'rt', newline='') as file_in, \
            open(DIR + '/' + FILE_OUT, 'wt', newline='') as file_out:

        im_in_csv = csv.reader(file_in, dialect='excel')
        im_out_csv = csv.writer(file_out, dialect='excel')

        # read through input file and create new images
        for i, row in enumerate(im_in_csv):

            # row 0 is not data: write the header, with one extra column for the label
            if i == 0:
                n_columns = OUT_SIZE[0] * OUT_SIZE[0]
                if TRAIN:
                    n_columns += 1
                im_out_csv.writerow(row[0:n_columns])
                continue

            if DEBUG:
                print()
                print('Image ' + str(i))
                print('====================')
                print()

            # progress indicator
            if i % 1000 == 0:
                print('Processing image ' + str(i) + ' ...')

            # read row and store label
            row_array = numpy.asarray(row).astype(numpy.float32)
            row_label = None

            if TRAIN:
                row_label = row_array[0]
                img = numpy.reshape(row_array[1:], INPUT_SIZE)
            else:
                img = numpy.reshape(row_array, INPUT_SIZE)

            _debug_show('Original Image:', 'raw' + str(i), img)

            # normalize
            norm = normalize_scale(img)
            write_image_to_record(norm, im_out_csv, row_label)
            _debug_show('Normalized Image:', 'norm' + str(i), norm)

            if TRAIN and not NORMALIZE_ONLY:

                # inject noise
                noise = inject_noise(norm)
                write_image_to_record(noise, im_out_csv, row_label)
                _debug_show('Noise Injected Image:', 'noise' + str(i), noise)

                # enlarge; the enlarged images are only inputs to the
                # rotations below and are not written as records themselves
                large = enlarge(norm)
                _debug_show('Enlarged Image:', 'large' + str(i), large)

                large_noise = enlarge(noise)
                _debug_show('Noise Injected, Enlarged Image:', 'large_noise' + str(i), large_noise)

                # rotate + degrees
                plus = cv2.resize(rotate_about_center(large, DEGREE), OUT_SIZE)
                write_image_to_record(plus, im_out_csv, row_label)
                _debug_show('Positively Rotated Image:',
                            'rotate_p' + str(DEGREE) + '_' + str(i), plus)

                plus_noise = cv2.resize(rotate_about_center(large_noise, DEGREE), OUT_SIZE)
                write_image_to_record(plus_noise, im_out_csv, row_label)
                _debug_show('Noise Injected, Positively Rotated Image:',
                            'rotate_noise_p' + str(DEGREE) + '_' + str(i), plus_noise)

                # rotate - degrees
                minus = cv2.resize(rotate_about_center(large, -DEGREE), OUT_SIZE)
                write_image_to_record(minus, im_out_csv, row_label)
                _debug_show('Negatively Rotated Image:',
                            'rotate_m' + str(DEGREE) + '_' + str(i), minus)

                minus_noise = cv2.resize(rotate_about_center(large_noise, -DEGREE), OUT_SIZE)
                write_image_to_record(minus_noise, im_out_csv, row_label)
                _debug_show('Noise Injected, Negatively Rotated Image:',
                            'rotate_noise_m' + str(DEGREE) + '_' + str(i), minus_noise)

            # debug mode only looks at the first DEBUG_CUTOFF images
            if DEBUG and i >= DEBUG_CUTOFF:
                break

    print('Done.')

if __name__ == "__main__":
    main()
    

Processing image 1000 ...
Processing image 2000 ...
Processing image 3000 ...
Processing image 4000 ...
Processing image 5000 ...
Processing image 6000 ...
Processing image 7000 ...
Processing image 8000 ...
Processing image 9000 ...
Processing image 10000 ...
Processing image 11000 ...
Processing image 12000 ...
Processing image 13000 ...
Processing image 14000 ...
Processing image 15000 ...
Processing image 16000 ...
Processing image 17000 ...
Processing image 18000 ...
Processing image 19000 ...
Processing image 20000 ...
Processing image 21000 ...
Processing image 22000 ...
Processing image 23000 ...
Processing image 24000 ...
Processing image 25000 ...
Processing image 26000 ...
Processing image 27000 ...
Processing image 28000 ...
Processing image 29000 ...
Processing image 30000 ...
Processing image 31000 ...
Processing image 32000 ...
Processing image 33000 ...
Processing image 34000 ...
Processing image 35000 ...
Processing image 36000 ...
Processing image 37000 ...
Processing