In [None]:
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow_datasets as tfds
import numpy as np
import matplotlib.pyplot as plt
import random
import shutil # To unpack the dataset
import glob # To get the list of directories'
from PIL import Image, ImageChops

In [None]:
# Unzipping the dataset
# zip_path = "/content/drive/MyDrive/IISc/sgv-real-localize-siamese.zip"
# shutil.unpack_archive(zip_path,".")

In [None]:
from google.colab import drive

# Mount Google Drive at the location the dataset / checkpoint paths below expect.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
def read_data(data_dir, img_size=150.0):
    """Load per-object marker / non-marker image stacks and target positions.

    Every entry matched by ``data_dir + '/*'`` is treated as one object folder
    containing ``marker/*.jpg`` and ``nonmarker/*.jpg``.  Each image is
    converted to RGB and the images of one folder are concatenated along a new
    leading axis (so all images of a folder must share the same size).  The
    position of a non-marker image is parsed from the last two
    ``_``-separated fields of its path (``..._<x>_<y>.jpg``) and divided by
    ``img_size``.

    Args:
        data_dir: Directory whose children are the per-object folders.
        img_size: Divisor applied to the parsed integer x / y values.  The
            default (150.0) reproduces the previously hard-coded scaling.

    Returns:
        ``(markers, nomarkers, nomarker_poss)`` - three parallel lists with one
        entry per object folder: the stacked marker images, the stacked
        non-marker images, and an ``(n, 2)`` tensor of ``[x, y]`` rows.

    Raises:
        ValueError: If a non-marker path does not end in two integer fields.
    """
    def _load_rgb(path):
        # The context manager closes the file handle as soon as the pixels
        # have been copied into the NumPy array.
        with Image.open(path) as img:
            pixels = np.array(img.convert('RGB'))
        return tf.expand_dims(tf.convert_to_tensor(pixels), 0)

    markers = []
    nomarkers = []
    nomarker_poss = []

    # glob returns entries in arbitrary (filesystem) order.  Sorting makes the
    # object order and the i-th marker / i-th non-marker pairing used by the
    # samplers reproducible.
    # NOTE(review): pairing by index assumes both folders sort the same way
    # and hold the same number of files - confirm against the dataset layout.
    for d in sorted(glob.glob(data_dir + '/*')):
        marker_files = sorted(glob.glob(d + '/marker/*.jpg'))
        nomarker_files = sorted(glob.glob(d + '/nonmarker/*.jpg'))

        if not marker_files or not nomarker_files:
            # tf.concat raises on an empty list; stray files or incomplete
            # folders are skipped instead of aborting the whole load.
            print("read_data: skipping {} (no marker/nonmarker .jpg images)".format(d))
            continue

        nomarker_pos = []
        for fn in nomarker_files:
            xstr, ystr = fn.split('_')[-2:]
            # ystr still carries the 4-character '.jpg' suffix.
            nomarker_pos.append([int(xstr)/img_size, int(ystr[:-4])/img_size])

        markers.append(tf.concat([_load_rgb(fn) for fn in marker_files], 0))
        nomarkers.append(tf.concat([_load_rgb(fn) for fn in nomarker_files], 0))
        nomarker_poss.append(tf.convert_to_tensor(np.array(nomarker_pos)))

    return markers, nomarkers, nomarker_poss

In [None]:
# Train / test split directories of the real-image dataset (under the Drive mount).
train_dir, test_dir = (
    "/content/drive/MyDrive/content/sgv-real-localize-siamese-4de85e91ea4f/real_data/train",
    "/content/drive/MyDrive/content/sgv-real-localize-siamese-4de85e91ea4f/real_data/test",
)

In [None]:
# Number of index keys generated per object folder; together with the number
# of folders this fixes how many samples the key dataset yields per pass.
SAMPLES_PER_OBJECT = 16

train_adap_xs, train_xs, train_ys = read_data(train_dir)
key_dataset = tf.data.Dataset.from_tensor_slices(np.arange(len(train_adap_xs)*SAMPLES_PER_OBJECT))

# read_data returns plain Python lists (one tensor per object folder) and a
# tf.data.Dataset has no `.shape`, so report the list length, the per-object
# tensor shapes and the dataset's element spec instead.
print(len(train_adap_xs))
print([t.shape for t in train_adap_xs])
print(key_dataset.element_spec)
print(key_dataset)

AttributeError: ignored

In [None]:
def get_random_data(adap_x, x, p):
  """Draw one random (marker image, non-marker image, position) triple.

  An object index is drawn uniformly over ``adap_x``; an image index is then
  drawn uniformly over that object's entries in ``adap_x`` and reused to index
  ``x`` and ``p``.

  Args:
    adap_x: Sequence of per-object marker image collections.
    x: Sequence of per-object non-marker image collections.
    p: Sequence of per-object position collections.

  Returns:
    ``(adap_x[i][j], x[i][j], p[i][j])`` for the drawn indices ``i`` and ``j``.
  """
  # randrange(n) draws from the same range, with the same RNG consumption,
  # as randint(0, n - 1).
  obj_idx = random.randrange(len(adap_x))
  img_idx = random.randrange(len(adap_x[obj_idx]))
  return adap_x[obj_idx][img_idx], x[obj_idx][img_idx], p[obj_idx][img_idx]

# Smoke test: draw one random training triple (trailing ';' hides the cell output).
get_random_data(adap_x=train_adap_xs, x=train_xs, p=train_ys);

In [None]:
class SiameseBlock(tf.keras.models.Model):
  """Truncated Keras VGG16 used as the feature extractor.

  VGG16 is instantiated without its dense top for the given ``input_shape``
  and cut at ``layers[-6]``; ``call`` simply applies that sub-model.
  """

  def __init__(self, input_shape):
    super().__init__()
    backbone = tf.keras.applications.VGG16(include_top=False, input_shape=input_shape)
    # NOTE(review): in the stock Keras VGG16, layers[-6] appears to be
    # `block4_conv3` (the last conv before the 4th pooling) - confirm with
    # backbone.summary() before changing the cut point.
    self.siamese_block = tf.keras.Model(backbone.input, backbone.layers[-6].output)

  def call(self, x):
    """Return the truncated-VGG16 feature map for ``x``."""
    return self.siamese_block(x)

In [None]:
class Model(tf.keras.models.Model):
  """Siamese attention network that predicts a 2-D position via soft-argmax.

  Both inputs go through the same ``SiameseBlock``.  A spatial attention map
  computed from the first input pools its features into one vector per
  sample, that vector is multiplied channel-wise with the second input's
  feature map, and two further convolutions turn the product into a score
  map.  The predicted position is the softmax-weighted mean of the (1-based)
  row / column indices of that score map.

  Args:
    input_shape: Shape passed to the ``SiameseBlock``.  The default
      ``(150, 150, 3)`` is the previously hard-coded value.
  """

  def __init__(self, input_shape=(150,150,3)):
    super(Model, self).__init__()
    self.siamese = SiameseBlock(input_shape=input_shape)

    self.attention_layer_1 = tf.keras.layers.Conv2D(100, 3, activation="relu", padding="same")
    self.attention_layer_2 = tf.keras.layers.Conv2D(1, 1, activation="relu")

    self.post_attention_conv1 = tf.keras.layers.Conv2D(100, 3, activation="relu", padding="same")
    self.post_attention_conv2 = tf.keras.layers.Conv2D(1, 1, activation="relu")

  def call(self, xoc, xo, debug=False):
    """Run the network on a batch of image pairs.

    Args:
      xoc: Batch fed to the attention branch (first twin).
      xo: Batch fed to the correlation branch (second twin).
      debug: When true, print the shape of every intermediate tensor.

    Returns:
      ``(p, score_map_weights, attention_weights)`` where ``p`` has shape
      ``(batch, 2)`` holding ``[x, y]`` in score-map grid units (indices start
      at 1), ``score_map_weights`` is the ``(batch, H, W)`` softmax score map
      and ``attention_weights`` is the ``(batch, H, W, 1)`` softmax attention
      map.
    """
    twin1 = self.siamese(xoc)
    twin2 = self.siamese(xo)
    if debug: print("#1", twin1.shape)

    attention_conv = self.attention_layer_1(twin1)
    attention_conv = self.attention_layer_2(attention_conv)
    if debug: print("#attention", attention_conv.shape)
    # Use the dynamic shape so the reshapes also work when the batch size is
    # not statically known (e.g. inside tf.function / a tf.data pipeline).
    pre_shape = tf.shape(attention_conv)
    attention_conv = tf.reshape(attention_conv, shape=(pre_shape[0], -1))
    if debug: print("#flattened_attention", attention_conv.shape)
    # Softmax over all spatial positions of one sample.
    attention_weights = tf.nn.softmax(attention_conv, axis=1)
    if debug: print("#normalized_flatten_attention", attention_weights.shape)
    attention_weights = tf.reshape(attention_weights, shape=pre_shape)
    if debug: print("#reshaped_back_to_original_attention", attention_weights.shape)

    # Weight the features by the attention map, then sum over both spatial
    # axes to get one feature vector per sample.
    twin1 = twin1*attention_weights
    if debug: print("#after_twin1_attentionmap", twin1.shape)
    twin1_reduced = tf.reduce_sum(twin1, axis=1, keepdims=True)
    twin1_reduced = tf.reduce_sum(twin1_reduced, axis=2, keepdims=True)
    if debug: print("#after_twin1_attentionmap_reducedto vector", twin1_reduced.shape)

    # Broadcast the pooled vector over every position of the second twin.
    cross_correlation = twin1_reduced*twin2
    if debug: print("#combination of twin2 with attention map", cross_correlation.shape)

    cc2 = self.post_attention_conv1(cross_correlation)
    if debug: print("#op of 1st bottlneck layer", cc2.shape)
    score_map = self.post_attention_conv2(cc2)
    if debug: print("#Scoremap: op of 2nd bottlneck layer", score_map.shape)
    score_map_shape = tf.shape(score_map)
    score_map = tf.reshape(score_map, (score_map_shape[0], -1))
    score_map_weights = tf.nn.softmax(score_map, axis=1)
    score_map_weights = tf.reshape(score_map_weights, score_map_shape)
    score_map_weights = tf.squeeze(score_map_weights, -1)

    # Soft-argmax: expected 1-based row (y) and column (x) index under the
    # score-map distribution.  The index vectors are derived from the actual
    # score-map size instead of a hard-coded 18x18 grid, and created in the
    # score map's dtype so no float64 NumPy array is mixed in.
    dtype = score_map_weights.dtype
    row_idx = tf.cast(tf.range(1, score_map_shape[1] + 1), dtype)
    col_idx = tf.cast(tf.range(1, score_map_shape[2] + 1), dtype)
    p_y = tf.reduce_sum(score_map_weights * row_idx[tf.newaxis, :, tf.newaxis], axis=[1, 2])
    p_x = tf.reduce_sum(score_map_weights * col_idx[tf.newaxis, tf.newaxis, :], axis=[1, 2])
    p = tf.stack([p_x, p_y], axis=-1)

    return p, score_map_weights, attention_weights

In [None]:
def load_data(key):
  """Return one random, normalised training triple as float32 NumPy arrays.

  ``key`` is not used to pick the sample: a random (marker, non-marker,
  position) triple is drawn from the module-level ``train_adap_xs``,
  ``train_xs`` and ``train_ys``.

  Returns:
    ``(marker, nonmarker, pos)`` - both images mapped through
    ``(v - 127.5) / 127.5`` and the position, all as ``np.float32``.
  """
  def _scale(img):
    return (np.array(img, dtype=np.float32)-127.5)/127.5

  x_marker, x_nonmarker, pos = get_random_data(train_adap_xs, train_xs, train_ys)
  return _scale(x_marker), _scale(x_nonmarker), np.array(pos, dtype=np.float32)

def _random_train_example(key):
  """Wrap ``load_data`` as a TF op returning three float32 tensors."""
  return tf.numpy_function(load_data, [key], [tf.float32, tf.float32, tf.float32])

train_dataset = key_dataset.map(_random_train_example, num_parallel_calls=tf.data.experimental.AUTOTUNE)


In [None]:
model = Model()
# Building the model: push a single 2-sample batch through it (debug=True
# prints every intermediate shape) before asking for the summaries.
first_batch = train_dataset.batch(batch_size=2).take(1)
for xoc, xo, p in first_batch:
  p_hat, score, att = model(xoc, xo, debug=True)
model.summary()
# Summary of the first sub-layer of the model's first layer
# (presumably the truncated VGG16 inside the Siamese block).
model.layers[0].layers[0].summary()

In [None]:
# from tensorflow.keras.utils import plot_model 
# plot_model(model, 'model.png', show_shapes = True)

In [None]:
IMG_SIZE = 150     # side length (pixels) used to express positions in pixel units
GRID_SIZE = 18     # score-map grid side that the model's soft-argmax output is expressed in
BATCH_SIZE = 20
NUM_EPOCHS = 100
LOG_EVERY = 20     # print the batch loss every LOG_EVERY batches

train_dataset_batched = train_dataset.batch(batch_size=BATCH_SIZE, drop_remainder=True)
optim = tf.keras.optimizers.Adam(learning_rate=0.00001)
loss_tracker = []  # summed batch loss of every epoch
for epoch in range(NUM_EPOCHS):
  total_loss = 0.0
  # `batch_idx` rather than `id`: a module-level `id` would shadow the builtin
  # for the rest of the notebook.
  for batch_idx, (xoc, xo, p) in enumerate(train_dataset_batched):
    with tf.GradientTape() as tape:
      p_hat, _score, _att = model(xoc, xo)
      # Bring prediction (grid units) and target (fraction of the image) to
      # pixel units, then take the squared distance averaged over the batch.
      loss = (p_hat*IMG_SIZE/GRID_SIZE - p*IMG_SIZE)**2
      loss = tf.reduce_sum(loss, axis=-1)
      loss = tf.reduce_mean(loss)
    batch_loss = loss.numpy().item()
    total_loss += batch_loss
    grad = tape.gradient(loss, model.trainable_variables)
    optim.apply_gradients(zip(grad, model.trainable_variables))
    if batch_idx % LOG_EVERY == 0:
      # Log the Python float instead of the tensor repr.
      print("Epoch {} batch {} loss {}".format(epoch, batch_idx, batch_loss))
  loss_tracker.append(total_loss)
  print("Epoch {} total_loss {}".format(epoch, total_loss))

In [None]:
# Training curve: one point per epoch (the summed batch losses of that epoch).
fig, ax = plt.subplots()
ax.plot(loss_tracker)
ax.set_xlabel("Epoch")
ax.set_ylabel("Summed batch loss")
ax.set_title("Training loss")
plt.show()

# Keep one copy of the weights next to the notebook and one on Drive.
for weights_path in ("weights.h5", "/content/drive/MyDrive/content/Attention/weights_150x150_18x18.h5"):
  model.save_weights(weights_path)

In [None]:
# model.load_weights("/content/drive/MyDrive/IISc/weights_150x150_18x18.h5")

In [None]:
# test_dir = "/content/sgv-real-localize-siamese-4de85e91ea4f/real_data/test"
# Load the test split and build its index dataset (16 keys per object folder).
test_adap_xs, test_xs, test_ys = read_data(test_dir)
num_test_keys = len(test_adap_xs)*16
test_key_dataset = tf.data.Dataset.from_tensor_slices(np.arange(num_test_keys))

In [None]:
def load_test_data(key):
  """Return one random, normalised test triple as float32 NumPy arrays.

  ``key`` is not used to pick the sample: a random (marker, non-marker,
  position) triple is drawn from the module-level ``test_adap_xs``,
  ``test_xs`` and ``test_ys``.

  Returns:
    ``(marker, nonmarker, pos)`` - both images mapped through
    ``(v - 127.5) / 127.5`` and the position, all as ``np.float32``.
  """
  def _scale(img):
    return (np.array(img, dtype=np.float32)-127.5)/127.5

  x_marker, x_nonmarker, pos = get_random_data(test_adap_xs, test_xs, test_ys)
  return _scale(x_marker), _scale(x_nonmarker), np.array(pos, dtype=np.float32)

def _random_test_example(key):
  """Wrap ``load_test_data`` as a TF op returning three float32 tensors."""
  return tf.numpy_function(load_test_data, [key], [tf.float32, tf.float32, tf.float32])

test_dataset = test_key_dataset.map(_random_test_example, num_parallel_calls=tf.data.experimental.AUTOTUNE)

In [None]:
count = 0
loss = 0
for xoc, xo, p_o in test_dataset.batch(1):
  p, scoremap, attention_map = model(xoc, xo)

  # One figure with exactly four panels.  Creating the axes up front avoids
  # the extra full-size axes that `plt.subplots(figsize=...)` followed by
  # `plt.subplot(14x)` leaves underneath the panels.
  fig, axes = plt.subplots(1, 4, figsize=(18, 8))

  # The loader scales images to [-1, 1]; map them back to [0, 1] so imshow
  # does not clip the negative half of the range.
  axes[0].imshow((xoc[0].numpy() + 1.0) / 2.0)
  axes[0].set_title("Input")

  axes[1].imshow(attention_map[0, :, :, 0].numpy(), cmap="gray")
  axes[1].set_title("Attention Map")

  axes[2].imshow(scoremap[0].numpy(), cmap="gray")
  axes[2].set_title("Score Map")

  axes[3].imshow((xo[0].numpy() + 1.0) / 2.0)
  axes[3].set_title("Output")
  # Predicted position converted from 18-cell grid units to 150-pixel image units.
  axes[3].scatter(float(p[0][0])*150/18, float(p[0][1])*150/18, c="r", marker="x")
  plt.show()

  # Accumulate the per-coordinate squared error in normalised units.
  loss += (p/18 - p_o)**2
  count += 1
  # break
# max(count, 1) keeps the summary printable when the test set is empty.
print(loss, count, loss/max(count, 1))