# Solution: Image Data Augmentation

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/edgeimpulse/workshop-arduino-tinyml-roshambo/blob/main/02-data-augmentation/ei-image-augmentation.ipynb)

This is a script for creating an augmented dataset for images. It transforms input images to create a series of augmented samples that are then uploaded to your Edge Impulse project.

Upload your dataset as *dataset.zip* to */content/*. All of your samples should have the filename format `<label>.<unique-id>.png` (.jpg is also acceptable).

Create a new project on [Edge Impulse](https://edgeimpulse.com/). Go to the dashboard of that project and copy your API key. Paste that key into the string value for `EI_API_KEY`. The final cell of this script will automatically upload the augmented dataset to your Edge Impulse project.

Press **shift + enter** to execute each cell.

The original images along with their transforms will be saved in the output directory. Each output file will be the original filename appended with "_{num}" where {num} is some incrementing value based on the total number of transforms performed per image.

For example, if you have a file named `alpha.0.png`, it will become `alpha.0_0.png`. The first transform will be `alpha.0_1.png`, the second transform will be `alpha.0_2.png` and so on.

Author: EdgeImpulse, Inc.<br>
Date: January 5, 2023<br>
License: [Apache-2.0](https://www.apache.org/licenses/LICENSE-2.0)<br>

In [None]:
### Switch Node.js to a pinned version (16.18.1) using the `n` version manager
# Force-clean the npm cache before installing anything globally
!npm cache clean -f
# Install the `n` Node.js version manager globally
!npm install -g n
# Select the exact Node.js version to use for the rest of the notebook
!n 16.18.1

In [None]:
### Install required packages and tools
# Globally install the Edge Impulse CLI (presumably what provides the
# `edge-impulse-uploader` command used in the upload cells -- confirm)
!npm install -g --unsafe-perm edge-impulse-cli

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
import os
import shutil
import PIL

import skimage.transform
import skimage.util

In [None]:
### Settings

# Copy your API key here: Edge Impulse > your_project > Dashboard > Keys
EI_API_KEY = "ei_261c..." 

# Path information
HOME_PATH = "/content"                  # Location of the working directory
DATASET_ZIP = "/content/dataset.zip"    # Path of the .zip file containing your original dataset
DATASET_PATH = "/content/dataset"       # Directory the original image samples are unzipped into
OUT_PATH = "/content/out"               # Where output files go (will be deleted and recreated)
OUT_ZIP = "/content/out-augmented.zip"  # Where to store the zipped output files

# How to split the dataset
TEST_RATIO = 0.2      # 20% reserved for test set, rest is for training

# File format to use for new dataset
IMG_EXT = ".png"

# Max batch size for uploading to Edge Impulse
MAX_UPLOAD_BATCH_SIZE = 100

# You are welcome to change the seed to get different augmentation effects
# (the seed is applied to Python's `random` module and passed to the noise transform)
SEED = 42
random.seed(SEED)

In [None]:
### Use this to delete the unzipped dataset folder (uncomment the line below to start fresh)
# !rm -rf /content/dataset

In [None]:
### Unzip files to dataset directory
%cd {HOME_PATH}
!mkdir {DATASET_PATH}
!unzip -q -d {DATASET_PATH} {DATASET_ZIP}

## Transform Functions

In [None]:
### Example: Function to create 3 new flipped images of the input
def create_flipped(img):
  """Return three flipped copies of `img` as a list.

  The list holds, in order: the left-right flip, the up-down flip, and the
  image flipped along both axes.
  """

  mirrored_lr = np.fliplr(img)
  mirrored_ud = np.flipud(img)
  mirrored_both = np.flipud(mirrored_lr)

  return [mirrored_lr, mirrored_ud, mirrored_both]

In [None]:
### Function to create new rotated images of the input
def create_rotated(img, rotations):
  """Return one rotated copy of `img` per angle in `rotations`.

  Each result is cast to uint8. Pixels that fall outside the source image are
  filled using the 'edge' mode of skimage.transform.rotate.
  """

  def _rotate_once(angle):
    # preserve_range=True asks skimage to keep the input's value range
    # (the original intent: keep 8-bit values) before the uint8 cast
    turned = skimage.transform.rotate(img, angle=angle, mode='edge', preserve_range=True)
    return turned.astype(np.uint8)

  return [_rotate_once(angle) for angle in rotations]

In [None]:
### Function to create random scale/crop (zoom) images
def create_random_zooms(img, scale_factor, num_crops):
  """Create randomly positioned "zoomed" crops of an image.

  The image is rescaled by `scale_factor`, then `num_crops` windows with the
  original height and width are cut out at random positions (driven by the
  `random` module, so results follow `random.seed`).

  Args:
    img: Image array of shape (height, width) or (height, width, channels).
    scale_factor: Scale passed to skimage.transform.rescale. Must not make the
      image smaller than the original, otherwise no full-size crop exists.
    num_crops: Number of crops to generate.

  Returns:
    List of `num_crops` uint8 arrays with the same height/width as `img`.

  Raises:
    ValueError: If the rescaled image is smaller than the original image.
  """

  # Get height and width of original image
  height = img.shape[0]
  width = img.shape[1]

  # Only treat the last axis as channels when the image actually has one
  channel_axis = -1 if img.ndim == 3 else None

  # Create scaled image (e.g. make the image bigger) and keep 8-bit values
  rescale_kwargs = dict(scale=scale_factor, anti_aliasing=True, preserve_range=True)
  try:
    # `channel_axis` replaces the `multichannel` flag in newer scikit-image releases
    img_scaled = skimage.transform.rescale(img, channel_axis=channel_axis, **rescale_kwargs)
  except TypeError:
    # Fall back for older scikit-image releases that only know `multichannel`
    img_scaled = skimage.transform.rescale(img,
                                           multichannel=(channel_axis is not None),
                                           **rescale_kwargs)
  img_scaled = img_scaled.astype(np.uint8)

  # Get height and width of scaled image
  s_h = img_scaled.shape[0]
  s_w = img_scaled.shape[1]

  # A crop of the original size must fit inside the scaled image
  if s_h < height or s_w < width:
    raise ValueError(
        f"scale_factor={scale_factor!r} shrinks the image to {s_h}x{s_w}, "
        f"which is smaller than the {height}x{width} crop size")

  # Create list of random zooms
  zooms = []
  for _ in range(num_crops):

    # Randomly choose start of crop point (y first, then x, to keep the
    # sequence of random draws stable)
    crop_y = round(random.random() * (s_h - height))
    crop_x = round(random.random() * (s_w - width))

    # Crop scaled image; any trailing channel axis is kept in full
    zooms.append(img_scaled[crop_y:(crop_y + height), crop_x:(crop_x + width)])

  return zooms

In [None]:
### Function to create a random set of translated images (no more than 1/4 of width or height away)
def create_random_translations(img, num_translations):
  """Create randomly shifted copies of an image.

  Each copy is shifted by at most 1/4 of the image height vertically and at
  most 1/4 of the image width horizontally, in either direction. Offsets come
  from the `random` module, so results follow `random.seed`. Areas uncovered
  by the shift are filled using the 'edge' mode of skimage.transform.warp.

  Args:
    img: Image array whose first two axes are (height, width).
    num_translations: Number of shifted copies to generate.

  Returns:
    List of `num_translations` uint8 arrays.
  """

  # Get height and width of original image
  height = img.shape[0]
  width = img.shape[1]

  # Create list of random translations
  translations = []
  for _ in range(num_translations):

    # Choose random amount to translate (up to 1/4 image height, width) in either direction
    tr_y = round((0.5 - random.random()) * (height / 2))
    tr_x = round((0.5 - random.random()) * (width / 2))

    # AffineTransform takes the translation as (x, y): the width-derived
    # offset must come first, otherwise non-square images are shifted by the
    # wrong amounts
    shift = skimage.transform.AffineTransform(translation=(tr_x, tr_y))
    img_tr = skimage.transform.warp(img, shift, mode='edge', preserve_range=True)

    # Append translated image (back to 8-bit values) to list
    translations.append(img_tr.astype(np.uint8))

  return translations

In [None]:
### Function to add random noise to images
def create_noisy(img, types, seed=None):
  """Return one noisy copy of `img` per noise mode listed in `types`.

  `seed` is forwarded unchanged to skimage.util.random_noise for every mode.
  Each result is cast to uint8.
  """

  def _apply_noise(noise_mode):
    noisy = skimage.util.random_noise(img, mode=noise_mode, seed=seed)
    # Scale back up to 8-bit values (assumes random_noise returns values in [0, 1])
    return (noisy * 255).astype(np.uint8)

  return [_apply_noise(noise_mode) for noise_mode in types]

## Perform Transforms

In [None]:
### Start from an empty output directory: remove any previous one, then create it
stale_output_present = os.path.exists(OUT_PATH)
if stale_output_present:
  # Remove everything left over from an earlier run
  shutil.rmtree(OUT_PATH)
os.makedirs(OUT_PATH)

In [None]:
### Function to open image and create a list of new transforms
def create_transforms(file_path):
  """Load an image and return it together with all of its augmented versions.

  Args:
    file_path: Path of the image file to open.

  Returns:
    Flat list of Numpy arrays. The first element is the original image; the
    rest are the flipped, rotated, zoomed, translated and noisy versions, in
    that order.
  """

  # `import PIL` alone does not guarantee that the Image submodule is loaded
  import PIL.Image

  # Open the image and convert it to a Numpy array (keep all color channels).
  # The context manager closes the file handle once the pixels are copied.
  with PIL.Image.open(file_path) as img:
    img_array = np.asarray(img)

  # Add original image to front of list
  img_tfs = [[img_array]]

  # Perform transforms (call your functions). Each transform is applied once:
  # repeating a deterministic transform would only add identical duplicate samples.
  img_tfs.append(create_flipped(img_array))
  img_tfs.append(create_rotated(img_array, [45, 90, 135]))
  img_tfs.append(create_random_zooms(img_array, 1.3, 2))
  img_tfs.append(create_random_translations(img_array, 2))
  img_tfs.append(create_noisy(img_array, ['gaussian', 's&p'], SEED))

  # Flatten list of lists (to create one long list of images)
  return [img for img_list in img_tfs for img in img_list]

In [None]:
### Load all images, create transforms, and save in output directory

# Go through each file in the unzipped directory. The listing is sorted so the
# seeded random transforms are applied to the same files in the same order on
# every run (os.listdir returns entries in arbitrary order).
for filename in sorted(os.listdir(DATASET_PATH)):

  # Skip the Jupyter Notebook checkpoints folder that sometimes gets added
  if filename == ".ipynb_checkpoints":
    continue

  # Skip any other sub-directory: only regular files can be opened as images
  file_path = os.path.join(DATASET_PATH, filename)
  if not os.path.isfile(file_path):
    continue

  # Parse the filename (<label>.<unique-id>.<ext>) into label and unique ID
  file_root = os.path.splitext(filename)[0]
  label = file_root.split('.')[0]
  uid = '.'.join(file_root.split('.')[1:])

  # Do all transforms for that one image
  img_tfs = create_transforms(file_path)

  # Save images to new files in output directory
  for i, img in enumerate(img_tfs):

    # Construct filename (<original>_<transform_num>.<EXT>)
    out_file_path = os.path.join(OUT_PATH, label + "." + uid + "_" + str(i) + IMG_EXT)

    # Convert Numpy array to a Pillow image (once) and save it as a file
    img_pil = PIL.Image.fromarray(img)
    img_pil.save(out_file_path)

In [None]:
### Zip our new dataset (use '!' to call Linux commands)
# Work from inside the output directory so the `*` glob picks up the augmented files
%cd {OUT_PATH}
# Write the archive to OUT_ZIP
!zip -FS -r -q {OUT_ZIP} *
# Return to the working directory for the following cells
%cd {HOME_PATH}

## Upload Dataset to Edge Impulse

In [None]:
### Shuffle/split the filenames and create lists of the full paths

# Create list of full paths to all augmented files. Sort first: os.listdir
# returns entries in arbitrary order, and the seeded shuffle below is only
# reproducible when it starts from a deterministic order.
paths = [os.path.join(OUT_PATH, filename) for filename in sorted(os.listdir(OUT_PATH))]

# Shuffle and divide into test and training sets
random.shuffle(paths)
num_test_samples = int(TEST_RATIO * len(paths))
test_paths = paths[:num_test_samples]
train_paths = paths[num_test_samples:]

In [None]:
### Upload test set to Edge Impulse in mini batches
for first in range(0, len(test_paths), MAX_UPLOAD_BATCH_SIZE):

  # Construct one long string with all the paths of the mini batch
  test_mini_batch = test_paths[first:(first + MAX_UPLOAD_BATCH_SIZE)]
  print(f"Uploading {len(test_mini_batch)} files. Number {first} to " \
        f"{first + MAX_UPLOAD_BATCH_SIZE} out of a total of {len(test_paths)}.")
  test_mini_batch = ['"' + s + '"' for s in test_mini_batch]
  test_mini_batch = ' '.join(test_mini_batch)

  # Upload to Edge Impulse
  !edge-impulse-uploader \
  --category testing \
  --api-key {EI_API_KEY} \
  --silent \
  {test_mini_batch}

In [None]:
### Upload training set to Edge Impulse in mini batches
for first in range(0, len(train_paths), MAX_UPLOAD_BATCH_SIZE):

  # Construct one long string with all the paths of the mini batch
  train_mini_batch = train_paths[first:(first + MAX_UPLOAD_BATCH_SIZE)]
  print(f"Uploading {len(train_mini_batch)} files. Number {first} to " \
        f"{first + MAX_UPLOAD_BATCH_SIZE} out of a total of {len(train_paths)}.")
  train_mini_batch = ['"' + s + '"' for s in train_mini_batch]
  train_mini_batch = ' '.join(train_mini_batch)

  # Upload to Edge Impulse
  !edge-impulse-uploader \
  --category training \
  --api-key {EI_API_KEY} \
  --silent \
  {train_mini_batch}