In [1]:
import numpy as np
import datetime
import random
from collections import Counter
import matplotlib.pyplot as plt

In [2]:
def l2_norm(x):
    """Scale ``x`` by the inverse of its norm.

    The norm is the one computed by ``np.linalg.norm`` with default arguments
    (the Euclidean norm for a 1-D vector). A small offset of 10e-6 (i.e. 1e-5)
    is added to it so that an all-zero input is divided by a small positive
    number instead of zero.
    """
    magnitude = np.linalg.norm(x)
    return x / (magnitude + 10e-6)

In [3]:
# load prepared data
# The 'data' entry is an object array wrapping a dict-like object (hence .item()),
# so it is stored pickled: numpy >= 1.16.3 refuses to load it unless allow_pickle=True
# is passed explicitly.
# NOTE(review): unpickling can execute arbitrary code -- only load data_dict.npz
# from a trusted source.
with np.load('data_dict.npz', allow_pickle=True) as npz_file:
    # The entry is fully read into memory here, so the archive can be closed right after.
    data_dict = npz_file['data'].item()

tr_theta_x = data_dict['tr_theta_x'] # training image features extracted from deep CNN
tr_labels = data_dict['tr_labels'] # training image labels as indices matching class embeddings and names
val_theta_x = data_dict['val_theta_x'] # validation image features extracted from deep CNN
val_labels = data_dict['val_labels'] # validation image labels as indices matching class embeddings and names
test_theta_x = data_dict['test_theta_x'] # test image features extracted from deep CNN
test_labels = data_dict['test_labels'] # test image labels as indices matching class embeddings and names

class_embeddings = data_dict['phi_y'] # class attribute vectors provided by the original dataset AWA
class_names = data_dict['class_name'] # class names in the same order as embeddings

In [None]:
# Dimensionality of one image feature vector and of one class embedding,
# each read off the first row of the corresponding array.
theta_x_dim = len(tr_theta_x[0])
phi_y_dim = len(class_embeddings[0])

In [None]:
# class split 
print "training classes:"
for idx in set(tr_labels):
    print class_names[idx], ',',
print "\nvalidation classes:"
for idx in set(val_labels):
    print class_names[idx], ',',
print "\ntest classes:"
for idx in set(test_labels):
    print class_names[idx], ',',

In [None]:
def train(X,Y,LR,T,W=None):
    """Train the joint embedding matrix W with SGD on a max-violation ranking loss.

    For every sample the input is projected with W and L2-normalised, then scored
    (dot product) against the embedding of every training class. The class that
    maximises ``delta + score - score_of_true_class`` (delta is 0 for the true
    class, 1 otherwise) is the most violating class; when it differs from the
    true class, W is moved by ``LR * outer(X[n], emb[violator] - emb[true])``.

    After each epoch the validation metrics returned by ``evaluate`` are printed
    and appended to 'train_log.txt'.

    Relies on notebook-level names: ``class_embeddings``, ``tr_labels``,
    ``theta_x_dim``, ``phi_y_dim``, ``l2_norm``, ``evaluate``, ``val_X``,
    ``val_Y`` and ``val_labels``.

    Parameters
    ----------
    X : sequence of input feature vectors.
    Y : sequence of class indices (into ``class_embeddings``), one per row of X.
    LR : learning rate (step size of each update).
    T : number of epochs (full shuffled passes over X).
    W : optional starting matrix; when None, a (theta_x_dim, phi_y_dim) matrix
        of ones is used. A passed-in W is not modified in place.

    Returns
    -------
    The trained matrix W.
    """
    # total number of training samples
    N = len(X)

    # initialize the joint embedding matrix W with ones:
    # rows = input embedding dimension, columns = output (class) embedding dimension
    if W is None:
        W = np.ones((theta_x_dim, phi_y_dim))

    # The candidate classes do not change during training, so build the set once
    # instead of once per sample.
    candidate_classes = set(tr_labels)

    Indices = list(range(0,N))

    # 'with' guarantees the log file is closed even if training raises part-way.
    with open('train_log.txt','a') as log_file:
        # SGD: T epochs with learning rate LR
        for t in range(0,T):

            ep_start = datetime.datetime.now()
            ep_loss = 0
            random.shuffle(Indices) # visit the samples in a new random order each epoch

            for n in Indices:
                true_class = Y[n]
                true_embedding = class_embeddings[true_class]

                inputP = l2_norm(np.dot(np.transpose(X[n]),W)) # project input on joint embedding space
                comp_true = np.dot(inputP,true_embedding) # compatibility with the true class

                max_loss = -1 # largest loss seen so far
                max_j = -1 # class that produced max_loss
                for j in candidate_classes: # find the most violating training class
                    delta = 0 if true_class == j else 1 # 0-1 loss
                    comp = np.dot(inputP,class_embeddings[j]) # compatibility between projection and class j
                    loss = delta + comp - comp_true
                    if loss > max_loss:
                        max_loss = loss
                        max_j = j

                ep_loss += max_loss

                # update weights only when another class outranks the true one
                if not (max_j == true_class):
                    tranX = np.array(X[n])[np.newaxis]
                    embedding_gap = np.array(class_embeddings[max_j])[np.newaxis] - np.array(true_embedding)[np.newaxis]
                    # rebinding (not in-place) keeps a caller-supplied W untouched
                    W = W - LR * (np.dot(tranX.T ,embedding_gap))

            # NOTE(review): evaluate, val_X and val_Y are not defined in the cells visible
            # here (the loaded validation arrays are named val_theta_x / val_labels) --
            # confirm they exist before running.
            val_acc, val_class_acc = evaluate(val_X, val_Y,val_labels,W)
            ep_end = datetime.datetime.now()

            # avoid dividing by zero when there are no training samples
            mean_loss = ep_loss/N if N else 0.0

            epoch_line = "Epoch {}/{}".format(t,T)
            stats_line = "{}s training loss {} validation accuracy {} class accuracy {}".format(ep_end-ep_start,mean_loss,val_acc,val_class_acc)
            # single-argument print(...) behaves the same on Python 2 and Python 3
            for line in (epoch_line, stats_line):
                print(line)
                log_file.write(line + "\n")

    return W

In [None]:
# Training hyper-parameters, named so they are easy to find and tune.
LEARNING_RATE = 0.1
NUM_EPOCHS = 20
# train() shuffles the samples with the `random` module; seeding it makes a
# re-run of this cell visit the samples in the same order.
SEED = 0
random.seed(SEED)

st = datetime.datetime.now()
W = train(tr_theta_x,tr_labels,LEARNING_RATE,NUM_EPOCHS)
end = datetime.datetime.now()
duration = end - st
# single-argument print(...) behaves the same on Python 2 and Python 3
print("training time {}".format(duration))
# NOTE(review): the file name below says 50ep while this cell trains for 20 epochs,
# and test_X / test_Y are not defined in the visible cells -- fix before uncommenting.
# print "test accuracy {}".format(evaluate(test_X,test_Y,test_labels,W))
# np.save('new_trained_w_50ep_0.1LR.npy',W)