# COMP 551 final project code
paper: Unleashing the Potential of CNNs for Interpretable Few-Shot Learning

## data loading 
meta training set: 64 classes - training CNN
meta testing set: 20 classes - extract VC and validate
meta validation set: 16 classes - Not being used

On each trial (5-shot)
- Use 25 samples(5 classes) from training split to train CNN to achieve high accuracy.
- Use 25 samples(5 classes) from train subset of the testing split to extract VC
- Use 15 samples for each class(5 classes) form test subset of the testing split to classify images using VC.

10 trials:

desired output file: 

training set : (64 classes in total)
- train_x_1.csv : 5 sample each of 5 classes , train_y_1.csv : 5 sample for each of 5 classes
- test_x_1.csv : 5 sample each of 3 classes, test_y_1.csv : 5 sample each of 3 classes

- train_x_2.csv.....
- test_x_2.csv
....
....
- train_x_10.csv.....
- test_x_10.csv

testing set : (20 classes in total)
- train_x_1.csv : 5 sample each of 5 classes , train_y_1.csv : 5 sample for each of 5 classes
- test_x_1.csv : 5 sample each of 3 classes, test_y_1.csv : 5 sample each of 3 classes

- train_x_2.csv.....
- test_x_2.csv
....
....
- train_x_10.csv.....
- test_x_10.csv

the mini-imagenet structure
```
paper_reproduce_dataset/train/n01882714/images/n01882714_0.jpeg
```

use load_labels() to controll the number of class uses in a trial
then use load_data to get train_x, train_y



In [1]:
#helper function to import data
import re

def tryint(s):
    try:
        return int(s)
    except:
        return s

def alphanum_key(s):
    """ Turn a string into a list of string and number chunks.
        "z23a" -> ["z", 23, "a"]
    """
    return [ tryint(c) for c in re.split('([0-9]+)', s) ]

def sort_nicely(l):
    """ Sort the given list in the way that humans expect.
    """
    l.sort(key=alphanum_key)
    return l

#load data
import skimage
from skimage.io import ImageCollection,concatenate_images,imread

from skimage.color import gray2rgb
import numpy as np
from skimage import io
from os import listdir
from os.path import isfile, join


def print_image(data):
    from matplotlib import pyplot as plt
    plt.imshow(data, interpolation='nearest')
    plt.show()

def load_labels():
    labels = []
    with open("paper_reproduce_dataset/wnids.txt","r") as input:
        for row in input:
            labels.append(row[0:9])
    return labels

def imreadconvert(Xname):
    
    X=imread(Xname)
    if len(X.shape)==3:
        return X
    else:
        return gray2rgb(X)  
    
def load_data(label_dict,dataset):
    print ("loading",dataset, "data!")
    nsamples=10 #5 images for each of 200 labels
    file_names=[]
    labels=[] 
    
    for label in label_dict:
        #print str(label) 
        cur_dir="paper_reproduce_dataset/"+label+"/images"
        onlyfiles = [f for f in sort_nicely(listdir(cur_dir)) if isfile(join(cur_dir, f))]
        onlyfiles = random.sample(onlyfiles,nsamples)
        onlyfiles=[cur_dir+'/'+f for f in onlyfiles]
        file_names=file_names+onlyfiles    
        cur_labels=nsamples*[label]
        labels=labels+cur_labels
    image_collect = ImageCollection(file_names,load_func=imreadconvert)
    x_data = concatenate_images(image_collect)   
    print ("loaded",dataset, "data")
    y_data=np.asarray(labels)
    y_data=np.reshape(y_data,(len(y_data),))
   
    print("x_",dataset,".shape =",x_data.shape)
    print("y_",dataset,".shape =",y_data.shape)
   
    return x_data,y_data 
        

    

In [2]:
#VGG model structure
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten,Activation
from keras.layers import Conv2D, MaxPooling2D , AveragePooling2D,ZeroPadding2D


def VGG_13(num_class):
    model = Sequential()
    
    
    model.add(Conv2D(64, (3, 3), activation="relu",padding="same",input_shape=(64,64,3)))
    model.add(Conv2D(64, (3, 3), activation="relu",padding="same"))
    model.add(MaxPooling2D((2,2), strides=(2,2)))
    
    model.add(Conv2D(128, (3, 3), activation="relu",padding="same"))
    model.add(Conv2D(128, (3, 3), activation="relu",padding="same"))
    model.add(MaxPooling2D((2,2), strides=(2,2)))

    model.add(Conv2D(256, (3, 3), activation="relu",padding="same"))
    model.add(Conv2D(256, (3, 3), activation="relu",padding="same"))
    model.add(MaxPooling2D((2,2), strides=(2,2)))
    
    model.add(Conv2D(512, (3, 3), activation="relu",padding="same"))
    model.add(Conv2D(512, (3, 3), activation="relu",padding="same"))
    model.add(MaxPooling2D((2,2), strides=(2,2)))
    
    model.add(Conv2D(512, (3, 3), activation="relu",padding="same"))
    model.add(Conv2D(512, (3, 3), activation="relu",padding="same"))
    model.add(MaxPooling2D((2,2), strides=(2,2)))
    
    model.add(Flatten())
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_class, activation='softmax'))

#     if weights_path:
#         model.load_weights(weights_path)

    return model



Using TensorFlow backend.


In [3]:
#data split and train model function
import random

from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.preprocessing import LabelBinarizer

from keras import optimizers

def save_model(model_to_save, json_name,h5_name):
#     serialize model to JSON
    model_json = model_to_save.to_json()
    with open(json_name, "w") as json_file:
        json_file.write(model_json)
    # serialize weights to HDF5
    model_to_save.save_weights(h5_name)
    print("Saved model to disk")

    
from keras.models import model_from_json  

def load_model(json_name,h5_name):
    # load json and create model
    json_file = open(json_name, 'r')
    loaded_model_json = json_file.read()
    json_file.close()
    loaded_model = model_from_json(loaded_model_json)
    # load weights into new model
    loaded_model.load_weights(h5_name)
    print("Loaded model from disk")
    return loaded_model


def split_metadata():
    label_dict=load_labels()
    train_split_label = label_dict[0:64]
    test_split_label = label_dict[64:80]
    
    #spliting dataset to 64 training class with 10 samples per class
    #and 16 testing class with 10 samples per class
    x_train,y_train = load_data(train_split_label,"train")
    print()
    x_test,y_test = load_data(test_split_label,"test")
    return x_train,y_train,x_test,y_test

def run_CNN_trial(train_trial):
    #set hyper param
    epochs = 30
    learning_rate = 0.01
    
    train_trial = list(zip(*train_trial))
    x_tr = np.asarray(train_trial[0])
    y_tr = np.asarray(train_trial[1])
    
    #preprocessing training data
    lb = preprocessing.LabelBinarizer()
    onehot_y_tr = lb.fit_transform(y_tr)
    num_class = onehot_y_tr.shape[1]
    x_train_tr,x_test_tr,y_train_tr,y_test_tr = train_test_split(x_tr, onehot_y_tr, test_size=0.5, random_state=42)
    
    cnn_model = VGG_13(num_class)
    print("start training CNN model.")
    
    optimizer = optimizers.SGD(lr=learning_rate)
    cnn_model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer=optimizer,
                  metrics=['accuracy'])

    from keras.callbacks import ModelCheckpoint

    checkpoint = ModelCheckpoint("best_model.hdf5", monitor='acc', verbose=1, save_best_only=True, mode='max')
    callbacks_list = [checkpoint]

    
    cnn_model.fit(x_train_tr, y_train_tr,
              epochs=epochs,
              callbacks=callbacks_list,    
              verbose=1,
              validation_data=(x_test_tr, y_test_tr))

    score = cnn_model.evaluate(x_test_tr, y_test_tr, verbose=0)
    
#     print('Test loss:', score[0])
#     print('Test accuracy:', score[1])
    
#     print("finish training CNN model.")
    
    return loaded_model

def train_model():
    
    x_train,y_train,x_test,y_test = split_metadata()
   
    train_data = list(zip(x_train,y_train))
    test_data = list(zip(x_test,y_test))
    #random choose 50 samples from training split to train CNN for one trail
    train_trial = random.sample(train_data,50)
    
#     cur_image = "paper_reproduce_dataset/"+"n01443537/images/"+"n01443537_0.JPEG"
#     print_image(imreadconvert(cur_image))
    
#     print_image(train_trial[0][0])
#     print(train_trial[0][1])
#     print_image(train_trial[1][0])
#     print(train_trial[1][1])
#     print_image(train_trial[2][0])
#     print(train_trial[2][1])
    
    cnn_model = run_CNN_trial(train_trial)
    return x_test,y_test,cnn_model 


In [15]:
# K-mean clustering visual concept
from sklearn.cluster import KMeans

from keras import backend as K

#get intermidiate layer output from model
#3rd maxpooling output
def get_int_layer(model,input):
    get_3rd_layer_output = K.function([model.layers[0].input],
                                      [model.layers[8].output])
    layer_output = get_3rd_layer_output([input])[0]
    return layer_output

def extract_VC(layer_output):
    num_sample = layer_output.shape[0] * layer_output.shape[1] * layer_output.shape[2]
    input_kmean = layer_output.reshape(num_sample,layer_output.shape[3])
    kmeans = KMeans(random_state=0).fit(input_kmean)
#     print(kmeans.labels_)
    visual_concepts = kmeans.cluster_centers_
    return visual_concepts

def get_VC_dict(cnn_model,x_test,y_test):
    test_data_dict_x=dict()
    test_data_dict_y=dict()
    vc_dict=dict()
    for row in range(x_test.shape[0]):
        if y_test[row] not in test_data_dict_x:
            test_data_dict_x[y_test[row]]=[x_test[row]]
            test_data_dict_y[y_test[row]]=[y_test[row]]
        else:
            test_data_dict_x[y_test[row]].append(x_test[row])
            test_data_dict_y[y_test[row]].append(y_test[row])
    #for each class use half data to extract VC, half to validate
    test_data_dict_train_x={key:0 for key in test_data_dict_x.keys()}
    test_data_dict_train_y={key:0 for key in test_data_dict_y.keys()}

    test_data_dict_valid_x={key:0 for key in test_data_dict_x.keys()}
    test_data_dict_valid_y={key:0 for key in test_data_dict_y.keys()}
    
    for key in test_data_dict_x.keys():
        half_len = int(len(test_data_dict_x[key])/2)
        test_data_dict_train_x[key]=test_data_dict_x[key][0:half_len]
        test_data_dict_train_y[key]=test_data_dict_y[key][0:half_len]
        test_data_dict_valid_x[key]=test_data_dict_x[key][half_len::]
        test_data_dict_valid_y[key]=test_data_dict_y[key][half_len::]
    
    vc_dict={key:0 for key in test_data_dict_x.keys()}

    for key in test_data_dict_x.keys():
        layer_output = get_int_layer(cnn_model,test_data_dict_train_x[key])
        vc_dict[key]=extract_VC(layer_output)
        
    return vc_dict,test_data_dict_valid_x,test_data_dict_valid_y
# for el in list(test_data_dict_valid_y.values()):
#     print(el)

In [10]:
#distance function for each pixel p in intermediate layer and one Visual Concept
#input shape : f_p:(256,) f_vc:(256,)
from numpy import linalg as LA
def d_p_v(f_p,f_vc):
    fp_fvc = np.dot(f_p,f_vc)
    norm_fp_fvc = LA.norm(f_p)*LA.norm(f_vc)
    distance = 1- fp_fvc/norm_fp_fvc
    return distance

# print(list(vc_dict.values())[0][0])
# print(distance_vc(a,list(vc_dict.values())[0][0]))

#VC-encoding
def b_p_v(f_p,f_vc):
    threshold = 1
    
    distance_vc = d_p_v(f_p,f_vc)
    if distance_vc<threshold:
        return 1
    else:
        return 0

#Nearest Neighbor similarity
def get_neighbors(input_p):
    return 0

def similarity(input_image,input_vc_list):
#     sum_b_vc = 0
#     sum_b_dot_vc = 0
#     sum_b_vc_max_neighb_b_dot = 0
#     sum_b_dot_max_neighb_b_vc = 0
    for pixel in input_image:
        for vc in input_vc_list:
            b_vc = b_p_v(pixel,vc)
            sum_b_vc += b_vc
            
#             neighbor_list = get_neighbors(pixel)
#             b_vc_max_neighb_b_dot = b_vc * max(for n in neighbor_list 
#                 sum_b_vc_max_neighb_b_dot = 
    
#     K_b_b = 1/2 * (sum_b_vc_max_neighb_b_dot/sum_b_vc+sum_b_dot_max_neighb_b_vc/sum_b_dot_vc) 
    return 0

def nearest_neighbor(cnn_model,input_image_x,vc_dict):
    image_classes = list(vc_dict.keys())
    int_layer_output = get_int_layer(cnn_model,input_image_x)
    max_similarity = -9999
    match_class = None
    for image_class in image_classes:
        similarity = similarity(input_image_x,vc_dict[image_class])
        if  similarity >= max_similarity:
            max_similarity = similarity
            match_class = image_class
        
    return match_class

def total_accuracy(vc_dict,test_data_dict_valid_x,test_data_dict_valid_y):
    image_classes = list(vc_dict.keys())
    true_positive = 0
    total = 0
    for image_class in image_classes:
        image_list = test_data_dict_valid_x[image_class]
        label_list = test_data_dict_valid_y[image_class]
        for image_id in range(len(image_list)):
            pred_class = nearest_neighbor(image_list[image_id],vc_dict)
            if pred_class == label_list[image_id]:
                true_positive += 1
            total += 1
    return true_positive/total

In [7]:
#train model
# x_test,y_test,cnn_model = train_model()  
# save_model(cnn_model, "VC_CNN.json", "VC_CNN.hdf5")

loaded_model = load_model("VC_CNN.json","best_model.hdf5")
x_train,y_train,x_test,y_test = split_metadata()


Loaded model from disk
loading train data!
loaded train data
x_ train .shape = (640, 64, 64, 3)
y_ train .shape = (640,)

loading test data!
loaded test data
x_ test .shape = (160, 64, 64, 3)
y_ test .shape = (160,)


In [29]:
#Obtain VC dictionary and test dictionary
vc_dict,test_data_dict_valid_x,test_data_dict_valid_y=get_VC_dict(loaded_model,x_test,y_test)

In [28]:
#print result accuracy for this trial
print(total_accuracy(vc_dict,test_data_dict_valid_x,test_data_dict_valid_y))

(64, 64, 3)
