In [None]:
import tensorflow as tf
from tensorflow.keras import layers

def conv1d_block(inp, cweight, bweight, activation=tf.nn.relu):
    """Apply a stride-1, SAME-padded 1D convolution, add a bias, then activate.

    Takes the kernel and bias as explicit arguments instead of reading them
    from a layer, so callers can pass in whichever tensors they want to use.

    Args:
        inp: Input passed to ``tf.nn.conv1d``.
        cweight: Convolution filters passed to ``tf.nn.conv1d``.
        bweight: Bias added to the convolution output.
        activation: Callable applied to the biased output (ReLU by default).

    Returns:
        The result of ``activation(conv1d(inp, cweight) + bweight)``.
    """
    convolved = tf.nn.conv1d(input=inp, filters=cweight, stride=1, padding='SAME')
    biased = convolved + bweight
    return activation(biased)

class CustomConv1D(tf.keras.layers.Layer):
    """1D convolution layer whose kernel and bias are exposed as attributes.

    The kernel (``cweight``) and bias (``bweight``) are created in ``build``
    with the input-channel count taken from the incoming tensor's last
    dimension. The previous implementation hard-coded one input channel,
    which does not match the ``embedding_size``-channel tensor that
    ``MAMLCNNClassifier`` feeds to this layer.

    Args:
        filter_size: Width of the convolution window.
        num_filters: Number of output channels.
        in_channels: Optional number of input channels. When given, the
            weights are created immediately in the constructor; otherwise
            they are created on the first call.
    """

    def __init__(self, filter_size, num_filters, in_channels=None):
        super(CustomConv1D, self).__init__()
        self.num_filters = num_filters
        self.filter_size = filter_size

        if in_channels is not None:
            # Eager path for callers that need cweight/bweight right after construction.
            self.build((None, None, in_channels))

    def build(self, input_shape):
        """Create ``cweight`` and ``bweight`` for the given input shape.

        Raises:
            ValueError: If the last dimension of ``input_shape`` is unknown.
        """
        in_channels = input_shape[-1]
        if in_channels is None:
            raise ValueError('CustomConv1D requires a known size for the last input dimension.')

        self.cweight = self.add_weight(
            name='cweight',
            shape=[self.filter_size, int(in_channels), self.num_filters],
            initializer=tf.keras.initializers.GlorotUniform(),
            trainable=True,
        )
        self.bweight = self.add_weight(
            name='bweight',
            shape=[self.num_filters],
            initializer='zeros',
            trainable=True,
        )
        super(CustomConv1D, self).build(input_shape)

    def call(self, inp):
        """Run ``conv1d_block`` on ``inp`` with this layer's kernel and bias."""
        return conv1d_block(inp, self.cweight, self.bweight)

class MAMLCNNClassifier(tf.keras.Model):
    """Text classifier: embedding, three parallel conv branches, max-pool, dense.

    Keyword Args:
        model_name: Name given to the Keras model.
        vocab_size: ``input_dim`` of the embedding layer.
        embedding_size: ``output_dim`` of the embedding layer.
        num_filters: Number of filters in each ``CustomConv1D`` branch.
        dropout_rate: Rate of the dropout applied to the embedded input.
        output_dimension: Number of units in the final sigmoid layer.
    """

    def __init__(self, **kargs):
        super(MAMLCNNClassifier, self).__init__(name=kargs['model_name'])
        self.embedding = layers.Embedding(input_dim=kargs['vocab_size'], output_dim=kargs['embedding_size'])

        # One custom convolution branch per window width (3, 4 and 5).
        branches = []
        for width in [3, 4, 5]:
            branches.append(CustomConv1D(width, kargs['num_filters']))
        self.conv_list = branches

        self.pooling = layers.GlobalMaxPooling1D()
        self.dropout = layers.Dropout(kargs['dropout_rate'])
        self.fc = layers.Dense(units=kargs['output_dimension'], activation='sigmoid')

    def call(self, x):
        """Embed ``x``, apply dropout, pool every conv branch, and classify."""
        embedded = self.dropout(self.embedding(x))

        # Every branch sees the same embedded tensor; pooled outputs are joined on axis 1.
        pooled = []
        for conv in self.conv_list:
            pooled.append(self.pooling(conv(embedded)))

        return self.fc(tf.concat(pooled, axis=1))




In [None]:
import random
import numpy as np
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

class TextDataGenerator:
    """Samples tokenized, padded text batches grouped by category.

    Args:
        texts: Sequence of raw text strings; also used to fit the tokenizer.
        labels: Sequence of labels, paired position-wise with ``texts``.
        categories: Sequence of category keys. It is zipped position-wise with
            ``texts`` and ``labels`` when building the category map, and it is
            also the population that ``sample_batch`` draws categories from.
            NOTE(review): the original inline comment called this "a list of
            unique categories", which does not match the position-wise zip;
            confirm the intended contract with the caller.
        num_samples_per_class: Number of categories drawn per batch element
            in ``sample_batch``.
        max_len: Length every sequence is padded/truncated to.
        num_words: ``num_words`` limit passed to the Keras ``Tokenizer``.
    """

    def __init__(self, texts, labels, categories, num_samples_per_class, max_len=100, num_words=10000):
        self.texts = texts
        self.labels = labels
        self.categories = categories
        self.num_samples_per_class = num_samples_per_class
        self.max_len = max_len
        self.tokenizer = Tokenizer(num_words=num_words)
        self._create_tokenizer()
        self._create_category_map()

    @property
    def vocab_size(self):
        """Number of distinct token ids the tokenizer can emit, including id 0.

        Word indices start at 1 (0 is used for padding by ``sample_batch``),
        and the tokenizer drops indices at or above ``num_words`` when that
        limit is set, so this is the size an embedding table needs.
        """
        full_size = len(self.tokenizer.word_index) + 1
        limit = self.tokenizer.num_words
        if not limit:
            return full_size
        return min(limit, full_size)

    def _create_tokenizer(self):
        """Fit the tokenizer vocabulary on all texts."""
        self.tokenizer.fit_on_texts(self.texts)

    def _create_category_map(self):
        """Build ``category_map``: category -> list of ``(text, label)`` pairs.

        Every entry of ``categories`` gets a key (possibly with an empty
        list); the pairs are filled in a single pass over the zipped data
        instead of re-scanning the whole dataset once per category.
        """
        self.category_map = {category: [] for category in self.categories}
        for text, label, category in zip(self.texts, self.labels, self.categories):
            self.category_map[category].append((text, label))

    def sample_batch(self, batch_size, shuffle=True):
        """Draw a batch of padded token sequences and their labels.

        For each of ``batch_size`` rounds, ``num_samples_per_class``
        categories are drawn from ``categories`` and one ``(text, label)``
        pair is drawn from each.

        Args:
            batch_size: Number of sampling rounds.
            shuffle: Whether to shuffle the examples of the batch together.

        Returns:
            Tuple ``(padded_sequences, batch_labels)`` of NumPy arrays with
            matching first dimension.

        Raises:
            ValueError: From ``random.sample`` when a drawn category has no
                examples or more categories are requested than are available.
        """
        batch_texts, batch_labels = [], []
        for _ in range(batch_size):
            sampled_categories = random.sample(self.categories, self.num_samples_per_class)
            for category in sampled_categories:
                for text, label in random.sample(self.category_map[category], 1):
                    batch_texts.append(text)
                    batch_labels.append(label)

        # Tokenize and pad text sequences
        sequences = self.tokenizer.texts_to_sequences(batch_texts)
        padded_sequences = np.asarray(pad_sequences(sequences, maxlen=self.max_len, padding='post'))
        batch_labels = np.asarray(batch_labels)

        if shuffle and len(batch_labels) > 1:
            # Shuffle through one shared index permutation so sequences and labels
            # stay aligned; unlike zip/unzip this also cannot fail on an empty batch.
            order = list(range(len(batch_labels)))
            random.shuffle(order)
            padded_sequences = padded_sequences[order]
            batch_labels = batch_labels[order]

        return padded_sequences, batch_labels



In [None]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
from functools import partial

class MAML(tf.keras.Model):
    """Model-agnostic meta-learning around ``MAMLCNNClassifier``.

    The variables of ``self.cnn_model`` are the meta-parameters. For each task
    the inner loop starts from those values, takes ``num_inner_updates``
    gradient steps on the task's training split, and evaluates the adapted
    ("fast") weights on the task's test split. Fast weights are ordinary
    tensors derived from the variables, so the model's own variables are never
    overwritten and the test losses stay differentiable with respect to the
    meta-parameters.

    Args:
        vocab_size, embedding_size, num_filters, output_dimension,
            dropout_rate: Forwarded to ``MAMLCNNClassifier``.
        num_inner_updates: Number of per-step learning rates created per layer
            when ``learn_inner_update_lr`` is true.
        inner_update_lr: Inner-loop step size (initial value when learned).
        learn_inner_update_lr: If true, keep one trainable step size per layer
            and inner step instead of a single fixed one.
    """

    def __init__(self, vocab_size, embedding_size, num_filters, output_dimension, dropout_rate, num_inner_updates=5, inner_update_lr=0.4, learn_inner_update_lr=False):
        super(MAML, self).__init__()
        self.inner_update_lr = inner_update_lr
        self.loss_func = tf.keras.losses.BinaryCrossentropy()  # or CategoricalCrossentropy for multi-class
        self.learn_inner_update_lr = learn_inner_update_lr

        # Custom CNN for text data
        self.cnn_model = MAMLCNNClassifier(model_name='maml_cnn', vocab_size=vocab_size, embedding_size=embedding_size, num_filters=num_filters, output_dimension=output_dimension, dropout_rate=dropout_rate)

        if self.learn_inner_update_lr:
            # Keyed by layer name; entry j is the step size used at inner step j.
            self.inner_update_lr_dict = {}
            for layer in self.cnn_model.layers:
                self.inner_update_lr_dict[layer.name] = [tf.Variable(self.inner_update_lr, name=f'inner_update_lr_{layer.name}_{j}') for j in range(num_inner_updates)]

    def _meta_weights(self):
        """Return the current meta-parameters as ``{layer name: [tensors]}``.

        ``tf.identity`` turns each variable into a plain tensor so the first
        inner step and all later steps handle the weights the same way.
        """
        net = self.cnn_model
        weights = {net.embedding.name: [tf.identity(net.embedding.embeddings)]}
        for conv in net.conv_list:
            weights[conv.name] = [tf.identity(conv.cweight), tf.identity(conv.bweight)]
        weights[net.fc.name] = [tf.identity(net.fc.kernel), tf.identity(net.fc.bias)]
        return weights

    def _forward(self, tokens, weights, training=None):
        """Run the classifier's forward pass using the given weight tensors.

        Mirrors ``MAMLCNNClassifier.call`` (embedding, dropout, conv branches
        with global max pooling, dense output) but reads every trainable
        weight from ``weights`` instead of from the layers' variables.
        """
        net = self.cnn_model
        (embedding_table,) = weights[net.embedding.name]
        embedded = tf.nn.embedding_lookup(embedding_table, tf.cast(tokens, tf.int32))
        embedded = net.dropout(embedded, training=training)

        pooled = []
        for conv in net.conv_list:
            cweight, bweight = weights[conv.name]
            pooled.append(net.pooling(conv1d_block(embedded, cweight, bweight)))
        features = tf.concat(pooled, axis=1)

        kernel, bias = weights[net.fc.name]
        return net.fc.activation(tf.matmul(features, kernel) + bias)

    def _task_inner_loop(self, inp, num_inner_updates=1, training=None):
        """Adapt to one task and evaluate after every inner step.

        Args:
            inp: Tuple ``(input_tr, input_ts, label_tr, label_ts)`` for a
                single task.
            num_inner_updates: Number of inner gradient steps (at least 1).
            training: Training flag forwarded to the dropout layer.

        Returns:
            List ``[output_tr_pre, outputs_ts, loss_tr_pre, losses_ts,
            accuracy_tr_pre, accuracies_ts]`` where the ``*_pre`` entries come
            from the un-adapted weights and the ``*_ts`` lists have one entry
            per inner step.
        """
        input_tr, input_ts, label_tr, label_ts = inp
        # Cast once so the labels can be compared with the float predictions.
        label_tr = tf.cast(label_tr, tf.float32)
        label_ts = tf.cast(label_ts, tf.float32)

        # weights corresponds to the initial weights in MAML (i.e. the meta-parameters)
        weights = self._meta_weights()

        task_output_tr_pre, task_loss_tr_pre = None, None
        task_outputs_ts, task_losses_ts = [], []

        for i in range(num_inner_updates):
            with tf.GradientTape() as inner_tape:
                # Fast weights are tensors, so they must be watched explicitly.
                inner_tape.watch(tf.nest.flatten(weights))
                task_output_tr = self._forward(input_tr, weights, training=training)
                task_loss_tr = self.loss_func(label_tr, task_output_tr)

            if i == 0:
                # Only the first pass uses the un-adapted meta-parameters.
                task_output_tr_pre, task_loss_tr_pre = task_output_tr, task_loss_tr

            grads = inner_tape.gradient(task_loss_tr, weights)

            updated_weights = {}
            for layer_name, layer_weights in weights.items():
                if self.learn_inner_update_lr:
                    step_size = self.inner_update_lr_dict[layer_name][i]
                else:
                    step_size = self.inner_update_lr
                updated_weights[layer_name] = [
                    # The embedding gradient arrives as IndexedSlices; densify before the update.
                    weight if grad is None else weight - step_size * tf.convert_to_tensor(grad)
                    for weight, grad in zip(layer_weights, grads[layer_name])
                ]
            weights = updated_weights

            # Testing on the test set
            task_output_ts = self._forward(input_ts, weights, training=training)
            task_outputs_ts.append(task_output_ts)
            task_losses_ts.append(self.loss_func(label_ts, task_output_ts))

        # Compute accuracies. The outputs are already sigmoid probabilities,
        # so they go to binary_accuracy directly (no softmax).
        task_accuracy_tr_pre = tf.keras.metrics.binary_accuracy(label_tr, task_output_tr_pre)  # or categorical_accuracy
        task_accuracies_ts = [tf.keras.metrics.binary_accuracy(label_ts, output) for output in task_outputs_ts]

        return [task_output_tr_pre, task_outputs_ts, task_loss_tr_pre, task_losses_ts, task_accuracy_tr_pre, task_accuracies_ts]

    def call(self, inp, meta_batch_size=25, num_inner_updates=1, training=None):
        """Run the inner loop for every task in the meta-batch.

        Args:
            inp: Tuple ``(input_tr, input_ts, label_tr, label_ts)``; each
                element is mapped over its first dimension, one slice per task.
            meta_batch_size: Passed to ``tf.map_fn`` as ``parallel_iterations``.
            num_inner_updates: Number of inner gradient steps per task.
            training: Training flag forwarded to the dropout layer.

        Returns:
            The per-task results of ``_task_inner_loop`` stacked along a new
            leading task dimension, in the same six-element list structure.

        Raises:
            ValueError: If ``num_inner_updates`` is below 1, or exceeds the
                number of learned step sizes created in the constructor.
        """
        input_tr, input_ts, label_tr, label_ts = inp

        if num_inner_updates < 1:
            raise ValueError('num_inner_updates must be at least 1.')
        if self.learn_inner_update_lr:
            available_steps = min((len(lrs) for lrs in self.inner_update_lr_dict.values()), default=0)
            if num_inner_updates > available_steps:
                raise ValueError(f'num_inner_updates={num_inner_updates} exceeds the {available_steps} learned inner learning rates created at construction.')

        if not self.cnn_model.built:
            # One throwaway forward pass creates the layer variables the
            # functional forward pass reads.
            self.cnn_model(input_tr[0])

        # Parallel processing for each task in the meta-batch
        out_dtype = [tf.float32, [tf.float32] * num_inner_updates, tf.float32, [tf.float32] * num_inner_updates, tf.float32, [tf.float32] * num_inner_updates]
        task_inner_loop_partial = partial(self._task_inner_loop, num_inner_updates=num_inner_updates, training=training)

        result = tf.map_fn(task_inner_loop_partial, elems=(input_tr, input_ts, label_tr, label_ts), fn_output_signature=out_dtype, parallel_iterations=meta_batch_size)

        return result

In [None]:
def outer_train_step(inp, model, optimizer, meta_batch_size=25, num_inner_updates=1):
    """Run one meta-update: forward the meta-batch, then step the optimizer.

    The gradient is taken on the mean test loss of the last inner step with
    respect to ``model.trainable_variables``.

    Args:
        inp: Input forwarded to ``model``.
        model: Callable returning a six-element result whose items 2, 3 and 5
            are the pre-update training losses, the per-step test losses and
            the per-step test accuracies.
        optimizer: Optimizer whose ``apply_gradients`` is called.
        meta_batch_size: Forwarded to ``model``.
        num_inner_updates: Forwarded to ``model``.

    Returns:
        Tuple ``(mean pre-update training loss, list of mean test losses per
        inner step, list of mean test accuracies per inner step)``.
    """
    with tf.GradientTape(persistent=False) as outer_tape:
        result = model(inp, meta_batch_size=meta_batch_size, num_inner_updates=num_inner_updates)

        # Only the losses and the test accuracies are needed from the output.
        _, _, losses_tr_pre, losses_ts, _, accuracies_ts = result

        total_losses_ts = []
        for step_losses in losses_ts:
            total_losses_ts.append(tf.reduce_mean(step_losses))

    final_step_loss = total_losses_ts[-1]
    gradients = outer_tape.gradient(final_step_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Average pre-update loss and per-step post-update accuracy.
    total_loss_tr_pre = tf.reduce_mean(losses_tr_pre)
    total_accuracy_ts = []
    for step_accuracy in accuracies_ts:
        total_accuracy_ts.append(tf.reduce_mean(step_accuracy))

    return total_loss_tr_pre, total_losses_ts, total_accuracy_ts

def outer_eval_step(inp, model, meta_batch_size=25, num_inner_updates=1):
    """Evaluate the model on a meta-batch without updating any parameters.

    Args:
        inp: Input forwarded to ``model``.
        model: Callable returning a six-element result whose items 2, 3 and 5
            are the pre-update training losses, the per-step test losses and
            the per-step test accuracies.
        meta_batch_size: Forwarded to ``model``.
        num_inner_updates: Forwarded to ``model``.

    Returns:
        Tuple ``(mean pre-update training loss, list of mean test losses per
        inner step, list of mean test accuracies per inner step)``.
    """
    result = model(inp, meta_batch_size=meta_batch_size, num_inner_updates=num_inner_updates)

    # Only the losses and the test accuracies are needed from the output.
    _, _, losses_tr_pre, losses_ts, _, accuracies_ts = result

    total_loss_tr_pre = tf.reduce_mean(losses_tr_pre)

    total_losses_ts = []
    for step_losses in losses_ts:
        total_losses_ts.append(tf.reduce_mean(step_losses))

    total_accuracy_ts = []
    for step_accuracy in accuracies_ts:
        total_accuracy_ts.append(tf.reduce_mean(step_accuracy))

    return total_loss_tr_pre, total_losses_ts, total_accuracy_ts

def meta_train_fn(model, exp_string, data_generator, n_way, meta_train_iterations, meta_batch_size, log, logdir, k_shot, num_inner_updates, meta_lr):
    """Meta-train ``model`` with Adam, printing progress and saving weights.

    Progress is printed every 10 iterations and weights are saved every 100
    iterations to ``{logdir}/{exp_string}/model_{itr}``.

    Args:
        model: Model passed to ``outer_train_step``.
        exp_string: Experiment name used in the save path.
        data_generator: Object whose ``sample_batch`` yields ``(texts, labels)``.
        n_way, k_shot, log: Accepted but not used by this function.
        meta_train_iterations: Number of meta-training iterations.
        meta_batch_size: Batch size for sampling and for the train step.
        logdir: Root directory of the save path.
        num_inner_updates: Forwarded to ``outer_train_step``.
        meta_lr: Learning rate of the Adam optimizer.

    NOTE(review): the input tuple built here has two elements, while
    ``MAML.call`` in this file unpacks its input into four
    (train/test inputs and labels) - confirm the intended data format.
    """
    optimizer = tf.keras.optimizers.Adam(learning_rate=meta_lr)

    itr = 0
    while itr < meta_train_iterations:
        # Draw one shuffled meta-batch and package it for the model.
        batch_texts, batch_labels = data_generator.sample_batch(batch_size=meta_batch_size, shuffle=True)
        inp = (batch_texts, batch_labels)  # Adjust as per your data format

        step_result = outer_train_step(inp, model, optimizer, meta_batch_size=meta_batch_size, num_inner_updates=num_inner_updates)
        total_loss_tr_pre, total_losses_ts, total_accuracy_ts = step_result

        should_report = (itr % 10 == 0)
        if should_report:
            print(f"Iteration {itr}: Pre-update loss: {total_loss_tr_pre}, Post-update accuracy: {total_accuracy_ts[-1]}")

        should_checkpoint = (itr % 100 == 0)
        if should_checkpoint:
            model_save_path = f"{logdir}/{exp_string}/model_{itr}"
            model.save_weights(model_save_path)
            print(f"Saved model to {model_save_path}")

        itr += 1

def meta_test_fn(model, data_generator, n_way, meta_batch_size, k_shot, num_inner_updates, num_test_tasks=600):
    """Evaluate ``model`` on sampled batches and report the mean accuracy.

    Args:
        model: Model passed to ``outer_eval_step``.
        data_generator: Object whose ``sample_batch`` yields ``(texts, labels)``.
        n_way, k_shot: Accepted but not used by this function.
        meta_batch_size: Batch size for sampling and for the eval step.
        num_inner_updates: Forwarded to ``outer_eval_step``.
        num_test_tasks: Number of batches to evaluate (600 by default).

    Returns:
        The mean, over all evaluated batches, of the accuracy after the last
        inner update. The same value is printed.

    NOTE(review): the input tuple built here has two elements, while
    ``MAML.call`` in this file unpacks its input into four
    (train/test inputs and labels) - confirm the intended data format.
    """
    meta_test_accuracies = []

    for _ in range(num_test_tasks):
        # Sample a batch of test data
        batch_texts, batch_labels = data_generator.sample_batch(batch_size=meta_batch_size, shuffle=True)

        # Prepare the input for the model
        inp = (batch_texts, batch_labels)  # Adjust as per your data format

        # outer_eval_step returns three values: (pre-update loss, test losses, test accuracies).
        _, _, accuracies_ts = outer_eval_step(inp, model, meta_batch_size=meta_batch_size, num_inner_updates=num_inner_updates)
        meta_test_accuracies.append(accuracies_ts[-1])

    # Compute and print the average accuracy across all tasks
    mean_accuracy = np.mean(meta_test_accuracies)
    print(f"Meta-Test Accuracy: {mean_accuracy}")

    return mean_accuracy


def run_maml(data_generator, n_way, k_shot, meta_batch_size, meta_lr,
             inner_update_lr, num_filters, num_inner_updates,
             learn_inner_update_lr, meta_train_iterations, logdir):
    """Build a MAML model for the given data generator and meta-train it.

    Args:
        data_generator: Data source used for training. Its ``vocab_size``
            attribute is used when present; otherwise the vocabulary size is
            derived from its fitted ``tokenizer``.
        n_way: Output dimension of the classifier; also part of the
            experiment name.
        k_shot: Part of the experiment name; forwarded to ``meta_train_fn``.
        meta_batch_size: Meta-batch size used during training.
        meta_lr: Learning rate of the outer optimizer.
        inner_update_lr: Inner-loop learning rate.
        num_filters: Number of filters per convolution branch.
        num_inner_updates: Number of inner gradient steps.
        learn_inner_update_lr: Whether the inner learning rates are trainable.
        meta_train_iterations: Number of meta-training iterations.
        logdir: Root directory for saved weights.

    Returns:
        The trained ``MAML`` model.
    """
    # Use the generator supplied by the caller. The previous code replaced it
    # with ``TextDataGenerator()``, which cannot be constructed without arguments.
    vocab_size = getattr(data_generator, 'vocab_size', None)
    if vocab_size is None:
        tokenizer = data_generator.tokenizer
        # Word indices start at 1, so the embedding needs one extra row for id 0.
        vocab_size = len(tokenizer.word_index) + 1
        if tokenizer.num_words:
            # The tokenizer never emits ids at or above ``num_words``.
            vocab_size = min(vocab_size, tokenizer.num_words)

    # Instantiate the MAML model
    model = MAML(vocab_size=vocab_size, embedding_size=128, num_filters=num_filters, output_dimension=n_way, dropout_rate=0.5, num_inner_updates=num_inner_updates, inner_update_lr=inner_update_lr, learn_inner_update_lr=learn_inner_update_lr)

    exp_string = f"maml_sentiment_nway_{n_way}_kshot_{k_shot}_metabatch_{meta_batch_size}"
    meta_train_fn(model, exp_string, data_generator, n_way, meta_train_iterations, meta_batch_size, True, logdir, k_shot, num_inner_updates, meta_lr)

    return model

