In [1]:
import sys
import time
import random
import numpy as np
import tensorflow as tf

import matplotlib.pyplot as plt
import tensorflow.keras as keras
from tensorflow.keras import Model

In [2]:
# Reproducibility: seed every Python-level RNG this notebook draws from.
# NumPy drives the task sampling (amplitude, phase, x points); the stdlib
# `random` module drives the per-epoch task shuffling via random.sample.
# NOTE(review): TensorFlow's own RNG is not seeded here; if weight
# initialisation must also be repeatable, seed it as well -- TODO confirm.
SEED = 333
random.seed(SEED)
np.random.seed(SEED)

# Toy dataset

In [3]:
class SinusoidGenerator():
    """A single sinusoid regression task: y = amplitude * sin(x - phase).

    Args:
        K: number of x points sampled for this task.
        amplitude: amplitude of the sine wave; drawn uniformly from
            [0.1, 5.0) when None.
        phase: phase shift of the sine wave; drawn uniformly from
            [0, pi) when None.

    Attributes:
        x: the K points sampled uniformly from [-5, 5) at construction time.
    """
    def __init__(self, K=10, amplitude=None, phase=None):
        self.K = K
        self.sampled_points = None
        # Test against None (not truthiness) so explicit zeros are honoured,
        # and test each argument on its own: `phase` must not depend on
        # whether `amplitude` was supplied.
        self.amplitude = amplitude if amplitude is not None else np.random.uniform(0.1, 5.0)
        self.phase = phase if phase is not None else np.random.uniform(0, np.pi)
        self.x = self._sample_x()

    def _sample_x(self):
        """Draw K points uniformly from [-5, 5)."""
        return np.random.uniform(-5, 5, self.K)

    def f(self, x):
        """Evaluate this task's sine wave at x."""
        return self.amplitude * np.sin(x - self.phase)

    def batch(self, x=None, force_new=False):
        """Return (x, y) as float32 column vectors of shape (n, 1).

        Args:
            x: points to evaluate; when None the stored sample `self.x` is
                used, or a fresh sample if `force_new` is True.
            force_new: draw new x points instead of reusing `self.x`
                (ignored when `x` is given).
        """
        if x is None:
            x = self._sample_x() if force_new else self.x
        # Accept any array-like (e.g. a plain list) for explicit x values.
        x = np.asarray(x)
        y = self.f(x).astype(np.float32)
        x = x.astype(np.float32)
        return x[:, None], y[:, None]

    def equally_spaced_samples(self, K=None):
        """Return a batch evaluated on K evenly spaced points over [-5, 5].

        K defaults to the task's own K.
        """
        if K is None:
            K = self.K
        return self.batch(x=np.linspace(-5, 5, K))


In [4]:
def generate_dataset(K, train_size=20000, test_size=10):
    """Create the train and test task lists.

    Each task is a freshly constructed SinusoidGenerator with K points.
    The training tasks are built first, then the test tasks.

    Returns:
        (train_tasks, test_tasks): two lists of SinusoidGenerator objects
        with `train_size` and `test_size` elements respectively.
    """
    train_tasks = [SinusoidGenerator(K=K) for _ in range(train_size)]
    test_tasks = [SinusoidGenerator(K=K) for _ in range(test_size)]
    return train_tasks, test_tasks

In [5]:
class SineModel(keras.Model):
    """Small fully connected regressor: two 40-unit ReLU hidden layers
    followed by a single linear output unit.

    The forward computation is exposed both as `forward` (used directly by
    the rest of this notebook) and through the standard Keras `call` hook,
    so the model can also be invoked as `model(x)`.
    """
    def __init__(self):
        super().__init__()
        self.hidden1 = keras.layers.Dense(40, input_shape=(1,))
        self.hidden2 = keras.layers.Dense(40)
        self.out = keras.layers.Dense(1)

    def forward(self, x):
        """Run x through both ReLU hidden layers and the linear output layer."""
        x = keras.activations.relu(self.hidden1(x))
        x = keras.activations.relu(self.hidden2(x))
        x = self.out(x)
        return x

    def call(self, inputs):
        """Keras entry point; delegates to `forward` so both paths agree."""
        return self.forward(inputs)

# Toy MAML

In [6]:
def copy_model(model, x):
    """Return a new SineModel carrying the same weights as `model`.

    Args:
        model: the model whose weights are copied.
        x: an input batch, used only to run one forward pass on the new
            model so that its layers create their variables.
    """
    clone = SineModel()

    # The layers have no weights until they have seen an input; without
    # this pass there is nothing for set_weights to fill in and gradients
    # would not be computed.
    clone.forward(tf.convert_to_tensor(x))

    source_weights = model.get_weights()
    clone.set_weights(source_weights)
    return clone

In [7]:
def loss_function(pred_y, y):
    """Mean squared error between `pred_y` and `y`, reduced to a scalar."""
    squared_error = keras.losses.mean_squared_error(y, pred_y)
    return tf.reduce_mean(squared_error)

def np_to_tensor(list_of_numpy_objs):
    """Lazily convert each object in the iterable to a TensorFlow tensor.

    Returns a generator, so the result can be unpacked or iterated once.
    """
    return (
        tf.convert_to_tensor(array)
        for array in list_of_numpy_objs
    )
    
def compute_loss(model, x, y, loss_fn=loss_function):
    """Run `model.forward` on x and score the result against y.

    `loss_fn` is invoked as `loss_fn(y, predictions)`. Note that the default
    `loss_function` names its parameters `(pred_y, y)`, i.e. the arguments
    arrive in the opposite order to those names.
    NOTE(review): this looks harmless for the default mean-squared-error
    loss; confirm the intended order before passing an asymmetric loss.

    Returns:
        (loss, predictions)
    """
    predictions = model.forward(x)
    loss = loss_fn(y, predictions)
    return loss, predictions

def compute_gradients(model, x, y, loss_fn=loss_function):
    """Compute the loss on (x, y) and its gradients w.r.t. the model.

    Returns:
        (gradients, loss) where `gradients` lines up with
        `model.trainable_variables`.
    """
    with tf.GradientTape() as tape:
        # Only the loss is needed here; the predictions are discarded.
        loss = compute_loss(model, x, y, loss_fn)[0]
    grads = tape.gradient(loss, model.trainable_variables)
    return grads, loss

def apply_gradients(optimizer, gradients, variables):
    """Pair each gradient with its variable and hand them to the optimizer."""
    grads_and_vars = zip(gradients, variables)
    optimizer.apply_gradients(grads_and_vars)
    
def train_batch(x, y, model, optimizer):
    """Perform one optimisation step of `model` on the batch (x, y).

    Converts the inputs to tensors, computes gradients of the default loss,
    applies them with `optimizer`, and returns the loss for this batch.
    """
    x_tensor, y_tensor = np_to_tensor((x, y))
    grads, batch_loss = compute_gradients(model, x_tensor, y_tensor)
    apply_gradients(optimizer, grads, model.trainable_variables)
    return batch_loss

In [8]:
# Build the train/test task lists with 10 points per task; the list sizes
# come from generate_dataset's defaults (train_size=20000, test_size=10).
train_ds, test_ds = generate_dataset(K=10)

In [9]:
def train_maml(model, dataset, batch_size=1, epochs=20, lr_inner=0.01, log_every=1000):
    """Meta-train `model` in place with a MAML-style loop and learned
    per-layer inner learning rates.

    For every task, one inner gradient step is taken from the current
    weights into a throw-away copy of the model; the loss of that adapted
    copy is then differentiated w.r.t. the original weights and the
    learning rates, and Adam applies the resulting meta-update.

    Args:
        model: the model to train; its layers are each assumed to expose
            exactly a `kernel` and a `bias`, in that order in
            `model.trainable_variables`.
        dataset: list of tasks, each providing `batch()` -> (x, y).
        batch_size: currently unused; tasks are processed one at a time.
        epochs: number of passes over `dataset`.
        lr_inner: initial value of every per-layer inner learning rate.
        log_every: print the running loss and learning rates every this
            many tasks (must be a positive integer).

    Side effects:
        Updates `model`'s variables, prints progress, and plots the running
        mean loss once per epoch. Returns None.
    """
    optimizer = keras.optimizers.Adam()

    # Trainable inner-loop learning rates: one scalar per layer.
    llr = np.tile(lr_inner, (len(model.layers))).astype(np.float32)
    llr = tf.Variable(llr, name='learned_lr')

    for _ in range(epochs):
        total_loss = 0
        losses = []

        # Visit every task once per epoch, in a freshly shuffled order.
        for i, t in enumerate(random.sample(dataset, len(dataset))):
            x, y = np_to_tensor(t.batch())
            model.forward(x)  # run forward pass to initialize weights

            # The outer tape must also record the inner gradient computation
            # so the meta-gradient can flow back through the inner step.
            with tf.GradientTape() as test_tape:
                with tf.GradientTape() as train_tape:
                    train_loss, _ = compute_loss(model, x, y)

                gradients = train_tape.gradient(train_loss, model.trainable_variables)
                model_copy = copy_model(model, x)

                # TASK LEARNING: one gradient step per layer, written into the
                # copy as plain tensors so it stays differentiable w.r.t. both
                # the original weights and the learned learning rates.
                for j in range(len(model_copy.layers)):
                    lr = llr[j]
                    model_copy.layers[j].kernel = tf.subtract(
                        model.layers[j].kernel, tf.multiply(lr, gradients[2 * j]))
                    model_copy.layers[j].bias = tf.subtract(
                        model.layers[j].bias, tf.multiply(lr, gradients[2 * j + 1]))

                # Loss for Meta Learning.
                # NOTE(review): evaluated on the same (x, y) as the inner step,
                # not on a separate batch from the task -- confirm intended.
                test_loss, logits = compute_loss(model_copy, x, y)

            theta_meta = model.trainable_variables + [llr]
            gradients = test_tape.gradient(test_loss, theta_meta)
            optimizer.apply_gradients(zip(gradients, theta_meta))

            # Running mean of the meta-loss over the tasks seen this epoch.
            total_loss += test_loss
            loss = total_loss / (i+1.0)
            losses.append(loss)

            if i % log_every == 0:
                print('Step {}: loss = {}'.format(i, loss))
                print(llr.numpy())
        plt.plot(losses)
        plt.show()

In [10]:
# Create a fresh SineModel and meta-train it on the training tasks;
# train_maml updates the model's variables and returns nothing.
maml = SineModel()
train_maml(maml, train_ds)

Step 0: loss = 2.6287810802459717
[0.01100003 0.01100003 0.01100003]


KeyboardInterrupt: 