<a href="https://colab.research.google.com/github/MattyK-dev/MattyK-dev.github.io/blob/AI/OneShot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
import wandb
import random
import numpy as np
import matplotlib.pyplot as plt


from wandb.keras import WandbCallback
from tensorflow.keras.layers import Dense, Flatten, Conv2D, Softmax
from tensorflow.keras import Model
from tensorflow.keras import backend as K

%load_ext tensorboard
tf.random.set_seed(1)

In [None]:
# load data

# (x_train, y_train), (x_test, y_test) = mnist.load_data()
# x_train = x_train.astype('float32')
# x_test = x_test.astype('float32')
# x_train /= 255
# x_test /= 255

def initialise():
  global train, validate, test, loss_object, optimizer
  global train_loss, train_accuracy, validate_loss, validate_accuracy, test_loss, test_accuracy
  global train_summary_writer, test_summary_writer, validate_summary_writer, batch_size

  # Constants:
  batch_size = 64
  shuffle_buffer_size = 1024

  # Split the dataset into labelled, unlabelled, validate and test
  train, validate, test = tfds.load('plant_village', split=['train[:10%]', 'train[90%:95%]', 'train[95%:]'], shuffle_files=True, as_supervised=True)

  # Shuffle dataset
  # Use `tf.data` to batch and shuffle the dataset:
  train = train.shuffle(shuffle_buffer_size).batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)
  validate = validate.shuffle(shuffle_buffer_size).batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)
  test = test.shuffle(shuffle_buffer_size).batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)

  # Choose an optimizer and loss function for training: 
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) 
  optimizer = tf.keras.optimizers.Adam()

  # Select metrics to measure the loss and the accuracy of the model. These metrics accumulate the values over epochs and then print the overall result.
  train_loss = tf.keras.metrics.SparseCategoricalCrossentropy(name='train_loss')
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

  validate_loss = tf.keras.metrics.SparseCategoricalCrossentropy(name='validate_loss')
  validate_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='validate_accuracy')

  test_loss = tf.keras.metrics.SparseCategoricalCrossentropy(name='test_loss')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')
  
  # Tensorboard configurations
  current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
  
  train_log_dir = 'logs/gradient_tape/' + current_time + '/train'
  validate_log_dir = 'logs/gradient_tape/' + current_time + '/validate'
  test_log_dir = 'logs/gradient_tape/' + current_time + '/test'

  train_summary_writer = tf.summary.create_file_writer(train_log_dir)
  validate_summary_writer = tf.summary.create_file_writer(validate_log_dir)
  test_summary_writer = tf.summary.create_file_writer(test_log_dir)

In [None]:
# make pairs
def make_pairs(x, y):
    num_classes = max(y) + 1
    digit_indices = [np.where(y == i)[0] for i in range(num_classes)]

    pairs = []
    labels = []

    for idx1 in range(len(x)):
        # add a matching example
        x1 = x[idx1]
        label1 = y[idx1]
        idx2 = random.choice(digit_indices[label1])
        x2 = x[idx2]
        
        pairs += [[x1, x2]]
        labels += [1]
    
        # add a not matching example
        label2 = random.randint(0, num_classes-1)
        while label2 == label1:
            label2 = random.randint(0, num_classes-1)

        idx2 = random.choice(digit_indices[label2])
        x2 = x[idx2]
        
        pairs += [[x1, x2]]
        labels += [0]

    return np.array(pairs), np.array(labels)

pairs_train, labels_train = make_pairs(x_train, y_train)
pairs_test, labels_test = make_pairs(x_test, y_test)

In [None]:
@tf.function
def train_step(images, labels):
  # For Training
  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_loss(labels, predictions)
  train_accuracy(labels, predictions)

@tf.function
def validate_step(images, labels):
  # for Validation
  predictions = model(images, training=True)
  v_loss = loss_object(labels, predictions)

  validate_loss(labels, predictions)
  validate_accuracy(labels, predictions)

@tf.function
def test_step(images, labels):
  # training=False is only needed if there are layers with different
  # behavior during training versus inference (e.g. Dropout).
  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss(labels, predictions)
  test_accuracy(labels, predictions)

In [None]:
def euclidean_distance(vects):
    x, y = vects
    sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
    return K.sqrt(K.maximum(sum_square, K.epsilon()))

class MyModel(Model):
  def __init__(self):
    super(MyModel, self).__init__()
    self.resize = Resizing(64, 64)
    self.rescale = Rescaling(1.0/255)
    self.conv = Conv2D(8, 8, activation='relu')
    self.conv2 = Conv2D(16, 4, activation='relu')
    self.flatten = Flatten()
    self.d1 = Dense(128, activation='relu')
    self.d2 = Dense(38)
    self.softmax = Softmax()

  def call(self, x, training=False):
    x = self.resize(x)
    x = self.rescale(x)
    x = self.conv(x)
    x = self.conv2(x)
    x = self.flatten(x)
    x = self.d1(x)
    x = self.d2(x)
    if not training:
      x = self.softmax(x)
    return x

model = MyModel()

In [None]:
wandb.init(project="siamese")
model.fit([pairs_train[:,0], pairs_train[:,1]], labels_train[:], batch_size=16, epochs= 10, callbacks=[WandbCallback()])

In [None]:

model.compile(loss = "binary_crossentropy", optimizer="adam", metrics=["accuracy"])
model.summary()

In [None]:
wandb.init(project="siamese")
model.fit([pairs_train[:,0], pairs_train[:,1]], labels_train[:], batch_size=16, epochs=10, callbacks=[WandbCallback()])

In [None]:
# Old: 
# # Weights not shared

# seq1 = Sequential()
# seq1.add(Flatten(input_shape=(28,28)))
# seq1.add(Dense(128, activation='relu'))

# seq2 = Sequential()
# seq2.add(Flatten(input_shape=(28,28)))
# seq2.add(Dense(128, activation='relu'))

# merge_layer = Concatenate()([seq1.output, seq2.output])
# dense_layer = Dense(1, activation="sigmoid")(merge_layer)
# model = Model(inputs=[seq1.input, seq2.input], outputs=dense_layer)

# input = Input((28,28))
# x = Flatten()(input)
# x = Dense(128, activation='relu')(x)
# dense = Model(input, x)

# input1 = Input((28,28))
# input2 = Input((28,28))

# dense1 = dense(input1)
# dense2 = dense(input2)

# merge_layer = Concatenate()([dense1, dense2])
# dense_layer = Dense(1, activation="sigmoid")(merge_layer)
# model = Model(inputs=[input1, input2], outputs=dense_layer)