In [0]:

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
cd "/content/drive/My Drive/Activation Map Based Ensembles"

/content/drive/My Drive/Activation Map Based Ensembles


#Work Starts From Here

In [0]:
%tensorflow_version 2.x

TensorFlow 2.x selected.


In [0]:
%matplotlib inline
import matplotlib.pyplot as plt
import tensorflow as tf
import datasets
from tensorflow.keras.layers import Conv2D, Activation, GlobalAveragePooling2D, Flatten, Dense
import numpy as np 
import pandas as pd 
from sklearn.metrics import pairwise_distances
from sklearn.metrics.pairwise import pairwise_kernels
from scipy import spatial
from scipy.stats import norm
from tqdm import tqdm

class ModelX(tf.keras.Model):
  
  def __init__(self, dataset_name, x_train=None, y_train=None):
  	# call the parent constructor
    super(ModelX, self).__init__()
    if type(x_train)== type(None):
    
      self.mean_activation_map = None
      self.classes = None
      (self.x_train, self.y_train), (self.x_test, self.y_test) = datasets.Datasets().load_data(dataset_name)
      self.NumClasses = 10 # Genralize this
      self.x_train = self.x_train.astype('float32')
      self.x_test = self.x_test.astype('float32')
      self.x_train /= 255                             
      self.x_test /= 255  
      self.y_train_actual = self.y_train.copy()
      self.y_test_actual = self.y_test.copy()
      self.y_train = tf.keras.utils.to_categorical(self.y_train, 10)
      self.y_test = tf.keras.utils.to_categorical(self.y_test, 10)
    else:
      self.mean_activation_map = None
      self.classes = None
      (self.x_train, self.y_train), (self.x_test, self.y_test) = datasets.Datasets().load_data(dataset_name)
      self.x_train = x_train
      self.y_train = y_train
      self.NumClasses = 10 # Genralize this
      self.x_train = self.x_train.astype('float32')
      self.x_test = self.x_test.astype('float32')
      self.x_train /= 255                             
      self.x_test /= 255  
      self.y_train_actual = self.y_train.copy()
      self.y_test_actual = self.y_test.copy()
      self.y_train = y_train
      self.y_test = tf.keras.utils.to_categorical(self.y_test, 10)
  	
    # layer set
    self.conv1 = Conv2D(32, (3, 3), padding="same", activation="relu")
    self.conv2 = Conv2D(32, (3, 3), padding="same", activation="relu")
    
    #Flattening
    self.flatten = Flatten()
    
    #Fully connected like part
    self.dense1 = Dense(512, activation="relu")
    self.dense2 = Dense(self.NumClasses, activation="softmax") 

  def call(self, inputs):
    # forward pass
    x = self.conv1(inputs)
    x = self.conv2(x)
    x = self.flatten(x)
    x = self.dense1(x)
    x = self.dense2(x)
    return x
  
  def train(self):
    model.compile(loss='categorical_crossentropy',
              optimizer="adam",
              metrics=['accuracy'])
    history = model.fit(self.x_train, self.y_train, verbose=1, validation_data=(self.x_test, self.y_test),epochs=5, batch_size=32)
    loss_and_metrics = model.evaluate(self.x_test, self.y_test, batch_size=128)
    loss2_and_metrics= model.evaluate(self.x_train,self.y_train,batch_size=128)
    print('Test loss:', loss_and_metrics[0])
    print('Test accuracy:', loss_and_metrics[1])
    print("  train accuracy : ",loss2_and_metrics[1])
    return history, loss_and_metrics[0], loss_and_metrics[1]

  def train_partial(self, x_train, y_train):
    history = model.fit(x_train, y_train, verbose=1, validation_data=(self.x_test, self.y_test),epochs=5, batch_size=32)
    loss_and_metrics = model.evaluate(self.x_test, self.y_test, batch_size=128)
    loss2_and_metrics= model.evaluate(self.x_train,self.y_train,batch_size=128)
    print('Test loss:', loss_and_metrics[0])
    print('Test accuracy:', loss_and_metrics[1])
    print("  train accuracy : ",loss2_and_metrics[1])
    return history, loss_and_metrics[0], loss_and_metrics[1]
  
  def evaluate_test(self):
    loss_and_metrics = model.evaluate(self.x_test, self.y_test, batch_size=128)
    loss2_and_metrics= model.evaluate(self.x_train,self.y_train,batch_size=128)
    print('Test loss:', loss_and_metrics[0])
    print('Test accuracy:', loss_and_metrics[1])
    return

  def seperate_classes(self):              # this function sperates the data according to classes and returns a list of X_train and y_train sperated
    self.classes = np.unique(self.y_train_actual)  # on the basis of class 
    samples_per_class = []
    for c in self.classes:
      arr = self.y_train_actual==c
      arr = np.reshape(arr,(arr.shape[0]))
      samples = self.x_train[arr,:,:,:]
      labels = self.y_train[arr,:]
      #print(samples.shape)
      #print(labels.shape)
      samples_per_class.append((samples,labels))
    # print(samples_per_class)
    return samples_per_class

  def mean_activation(self, inputs):
    x = self.conv1(inputs)
    mean_act1 = x.numpy()
    mean_act1 = np.mean(mean_act1.reshape(mean_act1.shape[0], mean_act1.shape[1] * mean_act1.shape[2] * mean_act1.shape[3]), axis=0)
    #print(mean_act1.shape)

    x = self.conv2(x)
    mean_act2 = x.numpy()
    mean_act2 = np.mean(mean_act2.reshape(mean_act2.shape[0], mean_act2.shape[1] * mean_act2.shape[2] * mean_act2.shape[3]), axis=0)
    #print(mean_act2.shape)

    x = self.flatten(x)
    x = self.dense1(x)
    mean_act3 = x.numpy()
    mean_act3 = np.mean(mean_act3, axis=0)
    #print(mean_act3.shape)

    x = self.dense2(x)
    mean_act4 = x.numpy()
    mean_act4 = np.mean(mean_act4, axis=0)
    #print(mean_act4.shape)
    return np.concatenate(( mean_act3, mean_act4), axis=None) #> check if this folder work

  def get_distance(self, sample_act, mean_act, distance_name="correlation"):
    if (distance_name=="cosine"):
      distance = spatial.distance.cosine(sample_act, mean_act)
    
    elif(distance_name=="correlation"):
      distance = spatial.distance.correlation(sample_act, mean_act)
    
    elif(distance_name=="linear"):
      distance = spatial.distance.linear(sample_act, mean_act)
    
    elif(distance_name=="euclidean"):
      distance = spatial.distance.euclidean(sample_act, mean_act)
    
    else:
      print("\nSorry distance not present ") # code should stop execution here insert an assert here
    return distance
  def fit_normal_return_index(self,distance):
      mean, std = norm.fit(distance)
      print("Mean", mean)
      print("Std", std)
      print("Upper Limit", (mean+(std*2)))
      print("Lower Limit", (mean-(std*2)))
      index = np.where(distance>(mean+(std*2)))
      index2 = np.where(distance<(mean-(std*2)))
      return np.concatenate((index,index2), axis=None) # galaat howa samran ki galti

  def get_activation_per_class(self):
    # print("in get_activation_per_class")
    samples_per_class = self.seperate_classes()
    # print("classes seperated")
    mean_activation_map = {}
    for idx, single_class in enumerate(samples_per_class):
      single_class_samples = single_class[0]
      single_class_labels  = single_class[1] 
      # print("taking mean activation for single class")
      mean_activation_map[self.classes[idx]] = self.mean_activation(single_class_samples) 
      # print("Test")
      print(self.classes[idx])
      print(single_class_labels[0])
      # print("{} class done".format(idx))
    self.mean_activation_map = mean_activation_map
    return  mean_activation_map

  def get_bad_samples(self):
    all_bad_samples = []
    samples_per_class = self.seperate_classes()
    for idx, single_class in tqdm(enumerate(samples_per_class)): #loop per class
      single_class_samples = single_class[0]
      single_class_labels  = single_class[1] 
      distance_from_mean = []
      for sample in single_class_samples: #loop per sample
        # print("single class sample",sample.shape)
        sample_mean_activation = self.mean_activation(sample[np.newaxis,...])
        # print("sample_mean_activation",sample_mean_activation.shape)
        # print("class Mean activation", self.mean_activation_map[idx].shape )
        distance_from_mean.append(self.get_distance(sample_mean_activation, self.mean_activation_map[idx]))
      # Fit gaussain and get indexes
      print("Finding bad index")
      bad_sample_index = self.fit_normal_return_index(distance_from_mean)
      bad_samples = single_class_samples[bad_sample_index,:,:,:]  
      bad_samples_labels = single_class_labels[bad_sample_index,:]
      all_bad_samples.append((bad_samples, bad_samples_labels))
    return all_bad_samples

      
        


In [0]:
model = ModelX("cifar10")

dataset class object created
loading Dataset  cifar10


In [0]:
model.train()

Train on 50000 samples, validate on 10000 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Test loss: 1.779957174873352
Test accuracy: 0.6365
  train accuracy :  0.97816


(<tensorflow.python.keras.callbacks.History at 0x7f53ae677470>,
 1.779957174873352,
 0.6365)

In [0]:
mean_activation_map = model.get_activation_per_class()

0
[1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
1
[0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
2
[0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
3
[0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]
4
[0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]
5
[0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
6
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
7
[0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]
8
[0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
9
[0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]


In [0]:
mean_activation_map[0].shape

(522,)

In [0]:
print(mean_activation_map.keys())
for key in mean_activation_map.keys():
  print(mean_activation_map[key].shape)

dict_keys([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
(522,)
(522,)
(522,)
(522,)
(522,)
(522,)
(522,)
(522,)
(522,)
(522,)


In [0]:
bad_data = model.get_bad_samples()


0it [00:00, ?it/s][A

In [0]:
x_train_bad_samples = bad_data[0][0]
y_train_bad_labels = bad_data[0][1]
for one_class in bad_data[1:]:
  print(one_class[0].shape)
  x_train_bad_samples = np.concatenate((x_train_bad_samples,one_class[0]),axis=0)
  y_train_bad_labels = np.concatenate((y_train_bad_labels,one_class[1]),axis=0)
print(x_train_bad_samples.shape)
print(y_train_bad_labels.shape)

In [0]:
model.evaluate(x=x_train_bad_samples,y=y_train_bad_labels)

In [0]:
len(bad_data)

In [0]:
# model1 = ModelX("cifar10",x_train_bad_samples,y_train_bad_labels)

In [0]:
model.train_partial(x_train_bad_samples, y_train_bad_labels)

In [0]:
model.evaluate_test()