# Magic Wand Data Augmentation

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ShawnHymel/course-embedded-ml-capstone/blob/master/02-data-augmentation/magic_wand_data_augmentation.ipynb)

Perform data augmentation on the magic wand dataset by splicing different samples together. This is an optional project. Upload your magic wand dataset (as a zip file, i.e. dataset.zip) to */content* and run the cells below to complete the data augmentation.

Author: EdgeImpulse, Inc.<br>
Date: July 28, 2022<br>
License: Apache-2.0

## Step 1: Read data from CSV files

Read each CSV, verify that the data (and header) are valid, save the data in Numpy format, and save the associated filename in a list.

In [None]:
import csv
import os
import shutil
import random
import uuid

import numpy as np

In [None]:
### Settings

# Path information
HOME_PATH = "/content"                # Location of the working directory
DATASET_ZIP = "/content/dataset.zip"  # Path to the .zip file containing your original dataset
DATASET_PATH = "/content/dataset"     # Directory the .zip is extracted to (holds the .csv samples)
OUT_PATH = "/content/out"             # Where output files go (will be deleted and recreated)
OUT_ZIP = "/content/out-augmented.zip"        # Where to store the zipped output files

# Class names (a sample's class is the part of its filename before the first '.')
CLASS_IDLE = "_idle"                  # Name of idle class (idle samples fill the gap left by a shift)
CLASS_UNKNOWN = "_unknown"            # Name of unknown class

# Shift amounts, expressed as a fraction of the sample length (0.2 = 20% of the readings)
AUGMENT_SHIFT_PERCENT = 0.2           # Amount to shift a sample for augmentation
UNKNOWN_SHIFT_PERCENT = 0.5           # Amount to shift for new unknown samples

In [None]:
### Unzip files to dataset directory
%cd {HOME_PATH}
!mkdir {DATASET_PATH}
!unzip -q -d {DATASET_PATH} {DATASET_ZIP}

/content
mkdir: cannot create directory ‘/content/dataset’: File exists


In [None]:
### Read in .csv files to construct our data in a numpy array

X_all = []
filenames = []
first_sample = True
channel_names = None
sample_shape = None

# Loop through all files in our dataset
for filename in os.listdir(DATASET_PATH):

  # Check if the path is a file
  filepath = os.path.join(DATASET_PATH, filename)
  if not os.path.isfile(filepath):
    continue

  # Read CSV file (names=True: the first row is the header and becomes the field names)
  try:
    data = np.genfromtxt(filepath,
                        dtype=float,
                        delimiter=',',
                        names=True)
  except Exception as e:
    # Really skip the file: without this `continue` we would fall through and either reuse
    # the previous file's data or fail because `data` was never assigned
    print("Could not parse", filepath, " - skipping.", "Reason:", e)
    continue

  # A file with a single data row comes back as a 0-d array; make sure we can index rows
  data = np.atleast_1d(data)

  # Get length of the sample
  num_readings = data.shape[0]

  # Take the header and shape info from the first sample we read. Every later sample must
  # match them.
  if first_sample:
    channel_names = data.dtype.names
    sample_shape = (num_readings, len(channel_names))
    first_sample = False

  # Check to make sure the new sample conforms to the first sample
  else:

    # Check header
    if data.dtype.names != channel_names:
      print("Header does not match. Skipping", filename)
      continue

    # Check shape
    if (num_readings, len(channel_names)) != sample_shape:
      print("Shape does not match. Skipping", filename)
      continue

  # Convert the structured array into a plain 2D float array (readings x channels). All
  # columns listed in the header are kept; nothing is dropped here.
  sample = np.array(data.tolist(), dtype=float).reshape(sample_shape)

  # Append to our dataset
  X_all.append(sample)

  # Append the filename to our list of filenames
  filenames.append(filename)

# Fail early with a clear message if there was nothing usable to read
if not X_all:
  raise RuntimeError("No valid .csv samples found in " + DATASET_PATH)

# Convert the dataset into a numpy array
X_all = np.array(X_all)

# Get number of samples and channels
num_samples = X_all.shape[0]
num_channels = len(channel_names)

print("Header:", channel_names)
print("Dataset shape:", X_all.shape)
print("Number of samples:", num_samples)
print("Number of files", len(filenames))

Header: ('timestamp', 'accX', 'accY', 'accZ', 'gyrX', 'gyrY', 'gyrZ')
Dataset shape: (648, 150, 7)
Number of samples: 648
Number of files 648


In [None]:
### Build the list of labels (one per sample, in the same order as the samples)

# A sample's label is the part of its filename that comes before the first '.'
labels = [name.split('.')[0] for name in filenames]

# The classes are the distinct labels, in sorted order
classes = sorted(set(labels))

# Show the classes
print(classes)

['_idle', '_unknown', 'alpha', 'beta', 'gamma']


## Step 2: Helper functions

Helper functions to shift a sample and append/prepend data from another sample, and to write new samples to .csv files.

In [None]:
def shift_and_add(sample_orig, sample_add, num_shift, keep_first_col=False):
  """
  Slide a sample along its first axis and fill the vacated rows from a second sample.

  A positive shift moves the readings toward the end and fills the start with the last
  readings of sample_add. A negative shift moves the readings toward the start and fills
  the end with the first readings of sample_add. The result has the shape of sample_orig.

  :param sample_orig: Original sample you wish to shift
  :param sample_add: Sample that supplies the readings used to fill the gap
  :param num_shift: Number of readings to shift by. Positive shifts right, negative shifts left
  :param keep_first_col: True to restore the first column of sample_orig in the result. False to
                         let the first column be shifted like every other column.
  :raise ValueError: if abs(num_shift) is greater than the length of either sample_orig or sample_add
  :return: new Numpy sample
  """
  len_orig = sample_orig.shape[0]
  len_add = sample_add.shape[0]
  magnitude = abs(num_shift)

  # Refuse shifts that are longer than either of the two samples
  if magnitude > len_orig or magnitude > len_add:
    raise ValueError("Shift amount is greater than one of the sample lengths")

  # Start from an all-zero array and fill in the two regions
  shifted = np.zeros(sample_orig.shape)

  if num_shift >= 0:

    # Right shift: tail of the filler sample first, then the start of the original
    shifted[:num_shift, :] = sample_add[(len_add - num_shift):, :]
    shifted[num_shift:, :] = sample_orig[0:(len_orig - num_shift), :]

  else:

    # Left shift: end of the original first, then the head of the filler sample
    num_kept = len_orig - magnitude
    shifted[0:num_kept, :] = sample_orig[magnitude:, :]
    shifted[num_kept:, :] = sample_add[0:magnitude, :]

  # Put the original first column back if the caller asked for it
  if keep_first_col:
    shifted[:, 0] = sample_orig[:, 0]

  return shifted

In [None]:
def write_sample_csv(sample, header, label, path, debug=False):
  """
  Write a given sample to a new, uniquely named CSV file.

  The file is named "<label>.<12 characters of a random UUID>.csv". It contains the header
  row followed by one row per reading in the sample.

  :param sample: Numpy array of the sample to write (one row per reading)
  :param header: Names of the different channels in the sample (written as the first row)
  :param label: String containing the label of the sample
  :param path: Location of directory where to create the CSV file
  :param debug: True to print the path of the file that was written
  """
  # Generate unique filename (last 12 characters from uuid4 method) and
  # make sure it does not conflict with any existing filenames
  while True:
    uid = str(uuid.uuid4())[-12:]
    file_path = os.path.join(path, label + "." + uid + ".csv")
    if not os.path.exists(file_path):
      break

  # Save new augmented sample to file. newline='' is what the csv module asks for: it lets
  # the writer control line endings, which avoids extra blank lines on some platforms.
  with open(file_path, 'w', newline='') as f:
    csv_writer = csv.writer(f, delimiter=',')
    csv_writer.writerow(header)
    csv_writer.writerows(sample)

  # Print out path to file
  if debug:
    print("Wrote:", file_path)

## Step 3: Do the augmentation

Shift samples over by some amount and append/prepend data from another sample to keep the same size.

In [None]:
### Delete output directory (if it exists) and recreate it
# Removing the whole tree first means the directory is empty when the cells below write to it
if os.path.exists(OUT_PATH):
  shutil.rmtree(OUT_PATH)
os.makedirs(OUT_PATH)

In [None]:
### Copy the original (un-augmented) dataset files into the output directory
counter = 0
for entry in os.listdir(DATASET_PATH):

  # Sub-directories are not copied
  source = os.path.join(DATASET_PATH, entry)
  if os.path.isdir(source):
    continue

  # Keep the same filename in the output directory
  shutil.copy(source, os.path.join(OUT_PATH, entry))
  counter += 1

print("Copied", counter, "files")

Copied 648 files


In [None]:
### Counters are reset in their own cell, so re-running the augmentation cells below keeps
### adding to the totals instead of starting over
new_sample_counter = unknown_counter = 0

In [None]:
### Create new augmented samples for all but our "unknown" class

# Indices of all idle samples. Idle readings fill the gap left by a shift. The list does not
# change while we loop, so look it up once.
idle_idxs = np.where(np.array(labels) == CLASS_IDLE)[0]
if X_all.shape[0] > 0 and idle_idxs.size == 0:
  raise ValueError("No '" + CLASS_IDLE + "' samples found. They are needed to fill shifted samples.")

# Loop through all samples (labels line up with the samples in X_all)
for sample_orig, label in zip(X_all, labels):

  # Don't augment unknown samples (we'll do that later)
  if label == CLASS_UNKNOWN:
    continue

  # Choose a random idle sample to draw the filler readings from
  sample_add = X_all[np.random.choice(idle_idxs)]

  # Calculate the amount to shift by
  num_shift = int(AUGMENT_SHIFT_PERCENT * sample_orig.shape[0])

  # Shift the sample to the right and write to file
  sample_aug = shift_and_add(sample_orig, sample_add, num_shift, keep_first_col=True)
  write_sample_csv(sample_aug, channel_names, label, OUT_PATH)

  # Shift the sample to the left and write to file
  sample_aug = shift_and_add(sample_orig, sample_add, (-1 * num_shift), keep_first_col=True)
  write_sample_csv(sample_aug, channel_names, label, OUT_PATH)

  # Count the number of new files we created
  new_sample_counter += 2

# Show the number of new files we created
print("Created", new_sample_counter, "new samples")

Created 5170 new samples


In [None]:
### Create new unknown samples by putting two halves of different samples together

# Estimate the number of new augmented samples we added per class in the last cell. Every
# class except "unknown" was augmented there. max() guards against dividing by zero when
# there is only one class.
num_aug_samples_per_class = int(new_sample_counter / max(len(classes) - 1, 1))

# Loop through all samples
for idx in range(X_all.shape[0]):

  # To maintain a balanced dataset, don't add more samples than other classes
  if unknown_counter >= num_aug_samples_per_class:
    break

  # Don't augment unknown samples (we don't need more). The new unknown samples are built
  # from the other classes instead.
  if labels[idx] == CLASS_UNKNOWN:
    continue

  # Get original sample and random sample
  sample_orig = X_all[idx]
  sample_add = X_all[np.random.randint(X_all.shape[0])]

  # The spliced result no longer represents its original class, so it is labeled "unknown"
  label = CLASS_UNKNOWN

  # Calculate the amount to shift by
  num_shift = int(UNKNOWN_SHIFT_PERCENT * sample_orig.shape[0])

  # Shift the sample to the right and write to file
  sample_aug = shift_and_add(sample_orig, sample_add, num_shift, keep_first_col=True)
  write_sample_csv(sample_aug, channel_names, label, OUT_PATH)

  # Shift the sample to the left and write to file
  sample_aug = shift_and_add(sample_orig, sample_add, (-1 * num_shift), keep_first_col=True)
  write_sample_csv(sample_aug, channel_names, label, OUT_PATH)

  # Count the number of new files we created
  unknown_counter += 2

# Show the number of new files we created
print("Created", unknown_counter, "new samples")

Created 1292 new samples


In [None]:
### Zip output directory
# We change into the output directory so the archive holds the files themselves rather than
# their full /content/out/... paths. OUT_ZIP lives outside OUT_PATH, so the archive never
# includes itself.
# zip flags (per the zip manual): -FS sync the archive with the files on disk, -r recurse, -q quiet
%cd {OUT_PATH}
!zip -FS -r -q {OUT_ZIP} *
%cd {HOME_PATH}

/content/out
/content
