In [1]:
import numpy as np
import tensorflow as tf

a_range = [0.1, 5]
b_range = [0, 2*np.pi]

n_task = 100

a = np.random.uniform(low=a_range[0], high=a_range[1], size=n_task)
b = np.random.uniform(low=b_range[0], high=b_range[1], size=n_task)

x_range = [-5, 5]
n_sample = 10

x_data = []
y_data = []
for i in range(n_task):
  x = np.random.uniform(low=x_range[0], high=x_range[1], size=n_sample)
  y = a[i] * np.sin(x + b[i])
  x = tf.reshape(x, [10, 1])
  x_data.append(x)
  y_data.append(y)

In [2]:
# Build the regression model
model = tf.keras.Sequential([tf.keras.Input(shape=(1, )),
                             tf.keras.layers.Dense(40, activation='relu'),
                             tf.keras.layers.Dense(40, activation='relu'),
                             tf.keras.layers.Dense(1)
])


model_optimizer = tf.optimizers.Adam(0.01)

In [3]:
# Get training task and testing task
n_train_task = 70
n_test_task = 30

x_train = x_data[:n_train_task]
y_train = y_data[:n_train_task]

x_test = x_data[n_train_task:]
y_test = y_data[n_train_task:]

In [33]:
# Use first order MAML to train the model
epochs = 100

for i in range(epochs):
  total_loss = 0
  for j in range(n_train_task):

    # Make a deep copy for the initial variables
    meta_variables = model.trainable_variables.copy()
    for k in range(len(meta_variables)):
      meta_variables[k] = tf.constant(meta_variables[k])
    #print(meta_variables[0])
  
    inner_optimizer = tf.optimizers.Adam(0.01)

    # Update 2 times for the model to calculate gradients
    for m in range(2):
      with tf.GradientTape() as tape:
        y_pred = model(x_train[j])
        loss = tf.keras.losses.mean_squared_error(y_train[j], y_pred)

      gradients = tape.gradient(loss, model.trainable_variables)
      inner_optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    
    y_pred = model(x_train[j])
    total_loss += tf.reduce_mean(tf.keras.losses.mean_squared_error(y_train[j], y_pred))
    # Normalize graidents
    n_parameters = len(meta_variables)
    for k in range(n_parameters):
      model.trainable_variables[k] = meta_variables[k] - gradients[k] * 10

    #model_optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  print('Currrent Epoch is '+str(i) + '; Total loss is ' + str(total_loss.numpy()/70))

Currrent Epoch is 0; Total loss is 4.694234793526785
Currrent Epoch is 1; Total loss is 4.692397635323661
Currrent Epoch is 2; Total loss is 4.692112077985491
Currrent Epoch is 3; Total loss is 4.692111642020089
Currrent Epoch is 4; Total loss is 4.692111642020089
Currrent Epoch is 5; Total loss is 4.692111642020089
Currrent Epoch is 6; Total loss is 4.692111642020089
Currrent Epoch is 7; Total loss is 4.692111642020089


KeyboardInterrupt: ignored

In [34]:
# Calculate test error
test_epoch = 10
total_test_loss = 0

meta_variables = model.trainable_variables.copy()
for k in range(len(meta_variables)):
  meta_variables[k] = tf.constant(meta_variables[k])

for j in range(n_test_task):
  inner_optimizer = tf.optimizers.Adam(0.01)

  # Update 20 times for the model to calculate test loss
  for m in range(test_epoch):
    with tf.GradientTape() as tape:
      y_pred = model(x_test[j])
      loss = tf.keras.losses.mean_squared_error(y_test[j], y_pred)

    gradients = tape.gradient(loss, model.trainable_variables)
    inner_optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  
  y_pred = model(x_test[j])
  total_test_loss += tf.reduce_mean(tf.keras.losses.mean_squared_error(y_test[j], y_pred))

  n_parameters = len(meta_variables)
  for k in range(n_parameters):
    # gradients[k] = tf.math.l2_normalize(gradients[k])
    model.trainable_variables[k] = meta_variables[k]

print(total_test_loss/30)


tf.Tensor(3.9517038, shape=(), dtype=float32)


In [35]:
# Build the regression model
model_2 = tf.keras.Sequential([tf.keras.Input(shape=(1, )),
                             tf.keras.layers.Dense(40, activation='relu'),
                             tf.keras.layers.Dense(40, activation='relu'),
                             tf.keras.layers.Dense(1)
])

model_2_optimizer = tf.optimizers.Adam(0.01)
# Calculate test error
test_epoch = 10
total_test_loss = 0

meta_variables = model_2.trainable_variables.copy()
for k in range(len(meta_variables)):
  meta_variables[k] = tf.constant(meta_variables[k])

for j in range(n_test_task):
  
  inner_optimizer = tf.optimizers.Adam(0.01)

  # Update 20 times for the model to calculate test loss
  for m in range(test_epoch):
    with tf.GradientTape() as tape:
      y_pred = model_2(x_test[j])
      loss = tf.keras.losses.mean_squared_error(y_test[j], y_pred)

    gradients = tape.gradient(loss, model_2.trainable_variables)
    inner_optimizer.apply_gradients(zip(gradients, model_2.trainable_variables))
  
  y_pred = model_2(x_test[j])
  total_test_loss += tf.reduce_mean(tf.keras.losses.mean_squared_error(y_test[j], y_pred))

  n_parameters = len(meta_variables)
  for k in range(n_parameters):
    # gradients[k] = tf.math.l2_normalize(gradients[k])
    model_2.trainable_variables[k] = meta_variables[k]

print(total_test_loss/30)


tf.Tensor(3.811247, shape=(), dtype=float32)
