
##Setup


You will need to make a copy of this Colab notebook in your Google Drive before you can edit the homework files. You can do so with **File &rarr; Save a copy in Drive**.

Please complete the code with **TODO** marks




---



First, we import and download the Omniglot dataset

In [8]:
import os
from google_drive_downloader import GoogleDriveDownloader as gdd

# Need to download the Omniglot dataset -- DON'T MODIFY THIS CELL
if not os.path.isdir('./omniglot_resized'):
    gdd.download_file_from_google_drive(file_id='1iaSFXIYC3AB8q9K_M-oVMa4pmB7yKMtI',
                                        dest_path='./omniglot_resized.zip',
                                        unzip=True)

assert os.path.isdir('./omniglot_resized')

In [9]:
""" Utility functions. """
## NOTE: You do not need to modify this block but you will need to use it.
import numpy as np
import os
import random
import tensorflow as tf

## Loss utilities
def cross_entropy_loss(pred, label, k_shot):
    return tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=pred, labels=tf.stop_gradient(label)) / k_shot)

def accuracy(labels, predictions):
  return tf.reduce_mean(tf.cast(tf.equal(labels, predictions), dtype=tf.float32))


Omniglot data loading

In [10]:
"""Data loading scripts"""
## NOTE: You do not need to modify this block but you will need to use it.
import numpy as np
import os
import random
import tensorflow as tf
from scipy import misc
import imageio

def get_images(paths, labels, n_samples=None, shuffle=True):
  """
  Takes a set of character folders and labels and returns paths to image files
  paired with labels.
  Args:
    paths: A list of character folders
    labels: List or numpy array of same length as paths
    n_samples: Number of images to retrieve per character
  Returns:
    List of (label, image_path) tuples
  """
  if n_samples is not None:
    sampler = lambda x: random.sample(x, n_samples)
  else:
    sampler = lambda x: x
  images_labels = [(i, os.path.join(path, image))
           for i, path in zip(labels, paths)
           for image in sampler(os.listdir(path))]
  if shuffle:
    random.shuffle(images_labels)
  return images_labels


def image_file_to_array(filename, dim_input):
  """
  Takes an image path and returns numpy array
  Args:
    filename: Image filename
    dim_input: Flattened shape of image
  Returns:
    1 channel image
  """
  image = imageio.v2.imread(filename)
  image = image.reshape([dim_input])
  image = image.astype(np.float32) / 255.0
  image = 1.0 - image
  return image


class DataGenerator(object):
  """
  Data Generator capable of generating batches of Omniglot data.
  A "class" is considered a class of omniglot digits.
  """

  def __init__(self, num_classes, num_samples_per_class, num_meta_test_classes, num_meta_test_samples_per_class, config={}):
    """
    Args:
      num_classes: Number of classes for classification (K-way)
      num_samples_per_class: num samples to generate per class in one batch
      num_meta_test_classes: Number of classes for classification (K-way) at meta-test time
      num_meta_test_samples_per_class: num samples to generate per class in one batch at meta-test time
      batch_size: size of meta batch size (e.g. number of functions)
    """
    self.num_samples_per_class = num_samples_per_class
    self.num_classes = num_classes
    self.num_meta_test_samples_per_class = num_meta_test_samples_per_class
    self.num_meta_test_classes = num_meta_test_classes

    data_folder = config.get('data_folder', './omniglot_resized')
    self.img_size = config.get('img_size', (28, 28))

    self.dim_input = np.prod(self.img_size)
    self.dim_output = self.num_classes

    character_folders = [os.path.join(data_folder, family, character)
               for family in os.listdir(data_folder)
               if os.path.isdir(os.path.join(data_folder, family))
               for character in os.listdir(os.path.join(data_folder, family))
               if os.path.isdir(os.path.join(data_folder, family, character))]

    random.seed(123)
    random.shuffle(character_folders)
    num_val = 100
    num_train = 1100
    self.metatrain_character_folders = character_folders[: num_train]
    self.metaval_character_folders = character_folders[
      num_train:num_train + num_val]
    self.metatest_character_folders = character_folders[
      num_train + num_val:]

  def sample_batch(self, batch_type, batch_size, shuffle=True, swap=False):
    """
    Samples a batch for training, validation, or testing
    Args:
      batch_type: meta_train/meta_val/meta_test
      shuffle: randomly shuffle classes or not
      swap: swap number of classes (N) and number of samples per class (K) or not
    Returns:
      A a tuple of (1) Image batch and (2) Label batch where
      image batch has shape [B, N, K, 784] and label batch has shape [B, N, K, N] if swap is False
      where B is batch size, K is number of samples per class, N is number of classes
    """
    if batch_type == "meta_train":
      folders = self.metatrain_character_folders
      num_classes = self.num_classes
      num_samples_per_class = self.num_samples_per_class
    elif batch_type == "meta_val":
      folders = self.metaval_character_folders
      num_classes = self.num_classes
      num_samples_per_class = self.num_samples_per_class
    else:
      folders = self.metatest_character_folders
      num_classes = self.num_meta_test_classes
      num_samples_per_class = self.num_meta_test_samples_per_class
    all_image_batches, all_label_batches = [], []
    for i in range(batch_size):
      sampled_character_folders = random.sample(
        folders, num_classes)
      labels_and_images = get_images(sampled_character_folders, range(
        num_classes), n_samples=num_samples_per_class, shuffle=False)
      labels = [li[0] for li in labels_and_images]
      images = [image_file_to_array(
        li[1], self.dim_input) for li in labels_and_images]
      images = np.stack(images)
      labels = np.array(labels).astype(np.int32)
      labels = np.reshape(
        labels, (num_classes, num_samples_per_class))
      labels = np.eye(num_classes, dtype=np.float32)[labels]
      images = np.reshape(
        images, (num_classes, num_samples_per_class, -1))

      batch = np.concatenate([labels, images], 2)
      if shuffle:
        for p in range(num_samples_per_class):
          np.random.shuffle(batch[:, p])

      labels = batch[:, :, :num_classes]
      images = batch[:, :, num_classes:]

      if swap:
        labels = np.swapaxes(labels, 0, 1)
        images = np.swapaxes(images, 0, 1)

      all_image_batches.append(images)
      all_label_batches.append(labels)
    all_image_batches = np.stack(all_image_batches)
    all_label_batches = np.stack(all_label_batches)
    return all_image_batches, all_label_batches

Prototypical neural network. Please complete the loss function at **TODO** marks

In [11]:
# models/ProtoNet
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers

class ProtoNet(tf.keras.Model):

  def __init__(self, num_filters, latent_dim):
    super(ProtoNet, self).__init__()
    self.num_filters = num_filters
    self.latent_dim = latent_dim
    num_filter_list = self.num_filters + [latent_dim]
    self.convs = []
    for i, num_filter in enumerate(num_filter_list):
      block_parts = [
        layers.Conv2D(
          filters=num_filter,
          kernel_size=3,
          padding='SAME',
          activation='linear'),
      ]

      block_parts += [layers.BatchNormalization()]
      block_parts += [layers.Activation('relu')]
      block_parts += [layers.MaxPool2D()]
      block = tf.keras.Sequential(block_parts, name='conv_block_%d' % i)
      self.__setattr__("conv%d" % i, block)
      self.convs.append(block)
    self.flatten = tf.keras.layers.Flatten()

  def call(self, inp):
    out = inp
    for conv in self.convs:
      out = conv(out)
    out = self.flatten(out)
    return out

def ProtoLoss(x_latent, q_latent, labels_onehot, num_classes, num_support, num_queries):
  """
    calculates the prototype network loss using the latent representation of x
    and the latent representation of the query set
    Args:
      x_latent: latent representation of supports with shape [N*S, D], where D is the latent dimension
      q_latent: latent representation of queries with shape [N*Q, D], where D is the latent dimension
      labels_onehot: one-hot encodings of the labels of the queries with shape [N, Q, N]
      num_classes: number of classes (N) for classification
      num_support: number of examples (S) in the support set
      num_queries: number of examples (Q) in the query set
    Returns:
      ce_loss: the cross entropy loss between the predicted labels and true labels
      acc: the accuracy of classification on the queries
  """
  #############################
  #### YOUR CODE GOES HERE ####

  # Reshaping latent representations of input to prepare for prototype calculation.
  x_latent_reshaped = tf.reshape(x_latent, [num_classes, num_support, -1])

  # Calculating the mean representation for each class (prototypes).
  prototypes = tf.reduce_mean(x_latent_reshaped, axis=1)

  # 1. Preparing prototypes for distance calculation by repeating them across the query examples.
  # Expand dimensions of prototypes tensor along the first axis
  prototypes_expanded = tf.expand_dims(prototypes, axis=0)
  # Calculate the number of times to replicate prototypes along the first axis
  # This will be the total number of classes multiplied by the number of queries
  num_replications = num_classes * num_queries
  # Tile the expanded prototypes tensor to match the desired shape
  # Tile along the first axis (axis=0) num_replications times.  Keep the other dimensions unchanged (1 and 1)
  prototypes_tiled = tf.tile(prototypes_expanded, (num_replications, 1, 1))

  # 2. Preparing query examples for distance calculation by repeating them across classes.
  q_latent_expanded = tf.expand_dims(q_latent, axis=1)
  # Calculate queries_tiled, considering the variables q_latent_expanded and num_classes
  queries_tiled = tf.tile(q_latent_expanded, (1, num_classes, 1))

  # 3. Calculating squared Euclidean distances between each class prototype (prototypes_tiled) and queries (queries_tiled).
  distance_squares = tf.square(prototypes_tiled - queries_tiled)
  distances = tf.reduce_mean(distance_squares, axis=2)

  # 4. TODO: Applying log softmax to negative distances to get log probabilities.
  log_softmax = tf.nn.log_softmax(-distances, axis=1)


  log_probs = tf.reshape(log_softmax, [num_classes, num_queries, -1])

  # 5. Calculating cross-entropy loss
  labels_onehot_multiplied = tf.multiply(labels_onehot, log_probs)
  summed_labels_onehot = tf.reduce_sum(labels_onehot_multiplied, axis=-1)
  ce_loss = -tf.reduce_mean(tf.reduce_sum(log_probs * labels_onehot, axis=[1, 2]))

  # 6. Calculating accuracy
  predicted_classes = tf.argmax(log_probs, axis=-1)
  true_classes = tf.argmax(labels_onehot, axis=-1)
  correct_predictions = tf.equal(predicted_classes, true_classes)
  accuracy = tf.reduce_mean(tf.cast(correct_predictions, tf.float32))

  #############################
  return ce_loss, accuracy

Main run code for the prototypical network. Please complete the code with **TODO** marks

In [12]:
# run_ProtoNet
from PIL import Image
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
import os
import glob
import matplotlib.pyplot as plt

def proto_net_train_step(model, optim, x, q, labels_ph):
  num_classes, num_support, im_height, im_width, channels = x.shape
  num_queries = q.shape[1]
  x = tf.reshape(x, [-1, im_height, im_width, channels])
  q = tf.reshape(q, [-1, im_height, im_width, channels])

  with tf.GradientTape() as tape:
    x_latent = model(x)
    q_latent = model(q)
    ce_loss, acc = ProtoLoss(x_latent, q_latent, labels_ph, num_classes, num_support, num_queries)

  gradients = tape.gradient(ce_loss, model.trainable_variables)
  optim.apply_gradients(zip(gradients, model.trainable_variables))
  return ce_loss, acc

def proto_net_eval(model, x, q, labels_ph):
  num_classes, num_support, im_height, im_width, channels = x.shape
  num_queries = q.shape[1]
  x = tf.reshape(x, [-1, im_height, im_width, channels])
  q = tf.reshape(q, [-1, im_height, im_width, channels])

  x_latent = model(x)
  q_latent = model(q)
  ce_loss, acc = ProtoLoss(x_latent, q_latent, labels_ph, num_classes, num_support, num_queries)

  return ce_loss, acc

def run_protonet(data_path='./omniglot_resized', n_way=20, k_shot=1, n_query=5, n_meta_test_way=20, k_meta_test_shot=5, n_meta_test_query=5):
  n_epochs = 20
  n_episodes = 100

  im_width, im_height, channels = 28, 28, 1
  num_filters = 32
  latent_dim = 16
  num_conv_layers = 3
  n_meta_test_episodes = 1000

  model = ProtoNet([num_filters]*num_conv_layers, latent_dim)
  optimizer = tf.keras.optimizers.Adam()

    # call DataGenerator with k_shot+n_query samples per class
  data_generator = DataGenerator(n_way, k_shot+n_query, n_meta_test_way, k_meta_test_shot+n_meta_test_query)
  for ep in range(n_epochs):
    for epi in range(n_episodes):
      #############################
      #### YOUR CODE GOES HERE ####

      # sample a batch of validation data and partition it into
      # support and query sets
      image_batches, label_batches = data_generator.sample_batch('meta_val', 1, shuffle=False)
      support = image_batches[:, :, :k_shot, :].reshape(n_way, k_shot, im_height, im_width, channels)
      query = image_batches[:, :, k_shot:, :].reshape(n_way, n_query, im_height, im_width, channels)
      labels = label_batches[:, :, k_shot:, :].reshape(n_way, n_query, n_way)

      #############################
      ls, ac = proto_net_train_step(model, optimizer, x=support, q=query, labels_ph=labels)
      if (epi+1) % 50 == 0:
        #############################
        #### YOUR CODE GOES HERE ####

        # sample a batch of validation data and partition it into
        # support and query sets

        image_batches, label_batches = data_generator.sample_batch('meta_val', 1, shuffle=False)
        support = image_batches[:, :, :k_shot, :].reshape(n_way, k_shot, im_height, im_width, channels)
        # TODO: Calculate query and labels (similarly to support set)
        query = image_batches[:, :, k_shot:, :].reshape(n_way, n_query, im_height, im_width, channels)
        labels = label_batches[:, :, k_shot:, :].reshape(n_way, n_query, n_way)


        #############################
        val_ls, val_ac = proto_net_eval(model, x=support, q=query, labels_ph=labels)
        print('[epoch {}/{}, episode {}/{}] => meta-training loss: {:.5f}, meta-training acc: {:.5f}, meta-val loss: {:.5f}, meta-val acc: {:.5f}'.format(ep+1,
                                                                    n_epochs,
                                                                    epi+1,
                                                                    n_episodes,
                                                                    ls,
                                                                    ac,
                                                                    val_ls,
                                                                    val_ac))

  print('Testing...')
  meta_test_accuracies = []
  for epi in range(n_meta_test_episodes):
    #############################
    #### YOUR CODE GOES HERE ####

    # sample a batch of test data and partition it into
    # support and query sets

    image_batches, label_batches = data_generator.sample_batch('meta_test', 1, shuffle=False)
    support = image_batches[:, :, :k_meta_test_shot, :].reshape(n_meta_test_way, k_meta_test_shot, im_height, im_width, channels)
    # TODO: Calculate query and labels (similarly to support set)
    query = image_batches[:, :, k_meta_test_shot:, :].reshape(n_meta_test_way, n_meta_test_query, im_height, im_width, channels)
    labels = label_batches[:, :, k_meta_test_shot:, :].reshape(n_meta_test_way, n_meta_test_query, n_meta_test_way)


    #############################
    ls, ac = proto_net_eval(model, x=support, q=query, labels_ph=labels)
    meta_test_accuracies.append(ac)
    if (epi+1) % 50 == 0:
      print('[meta-test episode {}/{}] => loss: {:.5f}, acc: {:.5f}'.format(epi+1, n_meta_test_episodes, ls, ac))
  avg_acc = np.mean(meta_test_accuracies)
  stds = np.std(meta_test_accuracies)
  print('Average Meta-Test Accuracy: {:.5f}, Meta-Test Accuracy Std: {:.5f}'.format(avg_acc, stds))

If the code is correct, a meta-val acc=0.20 is obtained during the first epochs. This accuracy should increase after some training epochs.

In [None]:
run_protonet('./omniglot_resized/', n_way=5, k_shot=1, n_query=5, n_meta_test_way=5, k_meta_test_shot=4, n_meta_test_query=4)

[epoch 1/20, episode 50/100] => meta-training loss: 8.04719, meta-training acc: 0.20000, meta-val loss: 8.04719, meta-val acc: 0.20000
[epoch 1/20, episode 100/100] => meta-training loss: 8.04719, meta-training acc: 0.20000, meta-val loss: 8.04719, meta-val acc: 0.20000
[epoch 2/20, episode 50/100] => meta-training loss: 8.04719, meta-training acc: 0.20000, meta-val loss: 8.04719, meta-val acc: 0.64000
[epoch 2/20, episode 100/100] => meta-training loss: 8.03967, meta-training acc: 0.52000, meta-val loss: 8.04575, meta-val acc: 0.44000
[epoch 3/20, episode 50/100] => meta-training loss: 4.88564, meta-training acc: 0.72000, meta-val loss: 4.90744, meta-val acc: 0.64000
[epoch 3/20, episode 100/100] => meta-training loss: 4.24983, meta-training acc: 0.68000, meta-val loss: 6.57624, meta-val acc: 0.40000
[epoch 4/20, episode 50/100] => meta-training loss: 4.08805, meta-training acc: 0.72000, meta-val loss: 4.95958, meta-val acc: 0.52000
[epoch 4/20, episode 100/100] => meta-training loss: