# Edge Impulse Image Data Augmentation Workshop

[![Open In Colab <](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/edgeimpulse/workshop-image-data-augmentation/blob/main/image-data-augmentation.ipynb)

This notebook is part of the Edge Impulse image data augmentation workshop. It downloads a simple image dataset (electronic components), transforms the images to create a series of augmented samples, and then uploads the augmented to your Edge Impulse project.

Create a new project on [Edge Impulse](https://edgeimpulse.com/). Go to the dashboard of that project and copy your API key. Paste that key into the string value for `EI_API_KEY`. The final cell of this script will automatically upload the augmented dataset to your Edge Impulse project.

Press **shift + enter** to execute each cell.

Look for `***YOUR CODE HERE***` to write your own code for the challenges.

Each output file will be the original filename appended with "_{num}" where {num} is some incrementing value based on the total number of transforms performed per image. For example, if you have a file named `alpha.0.png`, it will become `alpha.0_0.png`. The first transform will be `alpha.0_1.png`, the second transform will be `alpha.0_2.png` and so on.

Author: EdgeImpulse, Inc.<br>
Date: May 17, 2023<br>
License: [Apache-2.0](apache.org/licenses/LICENSE-2.0)<br>

## Install dependencies, define settings, download original dataset

In [None]:
# Update Node.js to the latest stable version (use '!' to call Linux commands)
!npm cache clean -f
!npm install -g n
!n 16.18.1

In [None]:
# Install the Edge Impulsen CLI tool
!npm install -g --unsafe-perm edge-impulse-cli

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
import os
import shutil
import PIL

import skimage.transform
import skimage.util

In [None]:
# ***YOUR CODE HERE***

# Part 1
# ---
# Create a new Edge Impules project
# Go to Edge Impulse > your_project > Dashboard > Keys
# Copy API Key here
EI_API_KEY = "ei_706f..." 

# ***END CODE***

# Path information
DATASET_NAME = "electronic-components-png-original"
DATASET_URL = "https://github.com/edgeimpulse/workshop-image-data-augmentation/raw/main/" + DATASET_NAME + ".zip"
HOME_PATH = "/content"                  # Location of the working directory
DATASET_ZIP = os.path.join(HOME_PATH, DATASET_NAME + ".zip")
DATASET_PATH = os.path.join(HOME_PATH, DATASET_NAME)
OUT_NAME = "augmented-dataset"          # Name of output dataset folder and .zip file
OUT_PATH = os.path.join(HOME_PATH, OUT_NAME)
OUT_ZIP = OUT_PATH + ".zip"

# How to split the dataset
TEST_RATIO = 0.2      # 20% reserved for test set, rest is for training

# Max batch size for uploading to Edge Impulse
MAX_UPLOAD_BATCH_SIZE = 100

# You are welcome to change the seed to get different augmentation effects
SEED = 42
random.seed(SEED)

In [None]:
# Download and unzip the original dataset
!wget {DATASET_URL}
!mkdir {DATASET_PATH}
!unzip -q -d {DATASET_PATH} {DATASET_ZIP}

## Transformation functions

In [None]:
def create_flipped(img):
  """
  Generate 3 flipped versions of the original image: left-right flip, up-down
  flip, and left-right and up-down flip.
  
  Args:
      img: Original image as a Numpy array

  Returns:
      List of images as Numpy arrays
  """

  # Create a list of flipped images
  flipped = []
  flipped.append(np.fliplr(img))
  flipped.append(np.flipud(img))
  flipped.append(np.flipud(np.fliplr(img)))

  return flipped

In [None]:
def create_rotated(img, rotations):
  """
  Generates a number of rotated images as specified by the `rotations` argument.
  
  Args:
      img: Original image as a Numpy array
      rotations: List of rotations to perform (between 0 and 360 degrees)

  Returns:
      List of images as Numpy arrays
  """

  # Create list of rotated images (keep 8-bit values)
  rotated = []
  for rot in rotations:
    img_rot = skimage.transform.rotate(img, angle=rot, mode='edge', preserve_range=True)
    img_rot = img_rot.astype(np.uint8)
    rotated.append(img_rot)

  return rotated

In [None]:
def create_random_zooms(img, scale_factor, num_crops):
  """
  Zoom by scaling the image by `scale_factor` amount. Perform a number of random
  crops on that scaled image given be `num_crops`.

  Args:
      img: Original image as a Numpy array
      scale_factor: Amount to zoom the original image (1.0 keeps the image size)
      num_crops: Number of random crops to perform on the scaled image
  Returns:
      List of images as Numpy arrays
  """

  # Only support zoom-in for now
  if scale_factor < 1.0:
    raise NotImplementedError("Scale factor must be >=1.0. Only zoom-in supported.")

  # Get height and width of original image
  height = img.shape[0]
  width = img.shape[1]

  # Create scaled images (e.g. make the image bigger) and keep 8-bit values
  if len(img.shape) == 2:    # Grayscale
    channel_axis = None
  elif len(img.shape) == 3:  # RGB (assume WHC)
    channel_axis = 2
  else:
      raise IndexError("Only arrays with 2 and 3 dimensions are supported")
  img_scaled = skimage.transform.rescale(img, 
                                        scale=scale_factor, 
                                        anti_aliasing=True, 
                                        channel_axis=channel_axis,
                                        preserve_range=True)
  img_scaled = img_scaled.astype(np.uint8)

  # Get height and width of scaled image
  s_h = img_scaled.shape[0]
  s_w = img_scaled.shape[1]

  # Create list of random zooms
  zooms = []
  for i in range(num_crops):
    
    # Randomly choose start of crop point
    crop_y = round(random.random() * (s_h - height))
    crop_x = round(random.random() * (s_w - width))

    # Crop scaled image
    if len(img_scaled.shape) == 2:    # Grayscale
      zoom = img_scaled[crop_y:(crop_y + height), crop_x:(crop_x + width)]
    elif len(img_scaled.shape) == 3:  # RGB
      zoom = img_scaled[crop_y:(crop_y + height), crop_x:(crop_x + width), :]
    else:
      raise IndexError("Only arrays with 2 and 3 dimensions are supported")

    # Append zoomed image to list
    zooms.append(zoom)

  return zooms

In [None]:
def create_random_translations(img, num_translations):
  """
  Generate `num_translations` number of random translations of the original
  image. Each translations is no more than 1/4 of the width or height of the 
  original image. New pixels are created by copying the pixels on the edge of
  the original image.

  Args:
      img: Original image as a Numpy array
      num_translations: Number of translations to perform (integer)
  Returns:
      List of images as Numpy arrays
  """

  # Get height and width of original image
  height = img.shape[0]
  width = img.shape[1]

  # Create list of random translations
  translations = []
  for i in range(num_translations):
  
    # Choose random amount to translate (up to 1/4 image width, height) in either direction
    tr_y = round((0.5 - random.random()) * (height / 2))
    tr_x = round((0.5 - random.random()) * (width / 2))

    # Perform translation to create new image
    translation = skimage.transform.AffineTransform(translation=(tr_y, tr_x))
    img_tr = skimage.transform.warp(img, translation, mode='edge', preserve_range=True)
    img_tr = img_tr.astype(np.uint8)

    # Append translated image to list
    translations.append(img_tr)

  return translations

In [None]:
def create_noisy(img, types):
  """
  Generate images with noise added to the original. The types of supported noise
  can be found here: 
  https://scikit-image.org/docs/stable/api/skimage.util.html#random-noise

  Args:
      img: Original image as a Numpy array
      types: List of strings with desired types of noise to add
  Returns:
      List of images as Numpy arrays
  """

  # Add noise of different types
  noisy_imgs = []
  for t in types:
    noise = skimage.util.random_noise(img, mode=t, seed=None)
    noise = (noise * 255).astype(np.uint8)
    noisy_imgs.append(noise)

  return noisy_imgs

## Demonstrate transformations

In [None]:
# Choose an original image for the demonstration
original_path = "/content/electronic-components-png-original/capacitor.00bff939.png"

# Open the image and convert it to a Numpy array
img = PIL.Image.open(original_path)
img_array = np.asarray(img)

In [None]:
def show_images(imgs, cmap=None, figsize=(12, 12)):
  """
  Use Matplotlib to show several images.

  Args:
      imgs: list of Numpy arrays to plot
      cmap
      figsize (optional): size of the indivisual plots
  """
  figs, axs = plt.subplots(1, len(imgs), figsize=figsize)
  for i in range(len(imgs)):
    axs[i]. imshow(imgs[i], cmap=cmap)

In [None]:
# Generate flipped versions of the image
aug_imgs = create_flipped(img_array)
show_images([img_array] + aug_imgs, cmap='gray')

In [None]:
# Generate flipped versions of the image
aug_imgs = create_rotated(img_array, [45, 90, 135])
show_images([img_array] + aug_imgs, cmap='gray')

In [None]:
# Generate zoomed/cropped versions of the image
aug_imgs = create_random_zooms(img_array, 1.5, 3)
show_images([img_array] + aug_imgs, cmap='gray')

In [None]:
# Generate random translations
aug_imgs = create_random_translations(img_array, 3)
show_images([img_array] + aug_imgs, cmap='gray')

In [None]:
# Generate images with random noise added
aug_imgs = create_noisy(img_array, ('gaussian', 'poisson', 's&p'))
show_images([img_array] + aug_imgs, cmap='gray')

## Perform transformations

In [None]:
# Delete output directory (if it exists) and recreate it
if os.path.exists(OUT_PATH):
  shutil.rmtree(OUT_PATH)
os.makedirs(OUT_PATH)

In [None]:
def create_transforms(file_path):
  """
  Open image given by `file_path` and perform augmentation on that one image.

  Args:
      file_path: path to image

  Returns:
      List of images as Numpy arrays
  """

  # Open the image
  img = PIL.Image.open(file_path)

  # Convert the image to a Numpy array (keep all color channels)
  img_array = np.asarray(img)

  # Add original image to front of list
  aug_imgs = []
  aug_imgs.append([img_array])

  # Perform transforms
  aug_imgs.append(create_flipped(img_array))
  aug_imgs.append(create_flipped(img_array))
  aug_imgs.append(create_rotated(img_array, [45, 90, 135]))
  aug_imgs.append(create_random_zooms(img_array, 1.3, 2))

  # ***YOUR CODE HERE***
  
  # Part 2
  # ---
  # Call the `create_random_translations()` and `create_noisy()` functions with
  # the original image and append the results to `aug_imgs` as shown in the 
  # example transforms above. Create 3 random translation and 3 noisy augmented
  # images.
  


  # ***END CODE***

  # ***YOUR CODE HERE***

  # Part 3 (Optional challenge)
  # ---
  # Call more than one transformation function on a single image to generate
  # compound augmentations.

  

  # ***END CODE***

  # Flatten list of lists (to create one long list of images)
  aug_imgs = [img for img_list in aug_imgs for img in img_list]

  return aug_imgs

In [None]:
# Go through each original image file in the unzipped directory
for filename in os.listdir(DATASET_PATH):

  # Skip the Jupyter Notebook checkpoints folder that sometimes gets added
  if filename == ".ipynb_checkpoints":
    continue

  # Parse the filename into label and unique ID
  file_root, file_ext = os.path.splitext(filename)
  label = file_root.split('.')[0]
  uid = '.'.join(file_root.split('.')[1:])

  # Do all transforms for that one image
  file_path = os.path.join(DATASET_PATH, filename)
  img_tfs = create_transforms(file_path)

  # Save images to new files in output directory
  for i, img in enumerate(img_tfs):

    # Create a Pillow image from the Numpy array
    img_pil = PIL.Image.fromarray(img)

    # Construct filename (<original>_<transform_num>.<EXT>)
    out_file_path = os.path.join(OUT_PATH, label + "." + uid + "_" + str(i) + file_ext)

    # Convert Numpy array to image and save as a file
    img_pil = PIL.Image.fromarray(img)
    img_pil.save(out_file_path)

In [None]:
# Zip the augmented dataset
%cd {OUT_PATH}
!zip -FS -r -q {OUT_ZIP} *
%cd {HOME_PATH}

Feel free to download the *augmented-dataset.zip* file to have the dataset locally.

## Upload Dataset to Edge Impulse

In [None]:
# Create list of files for one category
paths = []
for filename in os.listdir(OUT_PATH):
  paths.append(os.path.join(OUT_PATH, filename))

# Shuffle and divide into test and training sets
random.shuffle(paths)
num_test_samples = int(TEST_RATIO * len(paths))
test_paths = paths[:num_test_samples]
train_paths = paths[num_test_samples:]

In [None]:
# Upload training set to Edge Impulse in mini batches
for first in range(0, len(train_paths), MAX_UPLOAD_BATCH_SIZE):

  # Construct one long string with all the paths of the mini batch
  train_mini_batch = train_paths[first:(first + MAX_UPLOAD_BATCH_SIZE)]
  print(f"Uploading {len(train_mini_batch)} files. Number {first} to " \
        f"{first + MAX_UPLOAD_BATCH_SIZE} out of a total of {len(train_paths)}.")
  train_mini_batch = ['"' + s + '"' for s in train_mini_batch]
  train_mini_batch = ' '.join(train_mini_batch)

  # Upload to Edge Impulse
  !edge-impulse-uploader \
  --category training \
  --api-key {EI_API_KEY} \
  --silent \
  {train_mini_batch}

In [None]:
# Upload test set to Edge Impulse in mini batches
for first in range(0, len(test_paths), MAX_UPLOAD_BATCH_SIZE):

  # Construct one long string with all the paths of the mini batch
  test_mini_batch = test_paths[first:(first + MAX_UPLOAD_BATCH_SIZE)]
  print(f"Uploading {len(test_mini_batch)} files. Number {first} to " \
        f"{first + MAX_UPLOAD_BATCH_SIZE} out of a total of {len(test_paths)}.")
  test_mini_batch = ['"' + s + '"' for s in test_mini_batch]
  test_mini_batch = ' '.join(test_mini_batch)

  # Upload to Edge Impulse
  !edge-impulse-uploader \
  --category testing \
  --api-key {EI_API_KEY} \
  --silent \
  {test_mini_batch}