In [1]:
from sklearn import datasets
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.metrics import accuracy_score

from scipy.stats import entropy


# import some data to play with
iris = datasets.load_iris()
X = iris.data
y = iris.target

X_train, X_test, y_train, y_test = train_test_split(
     X, y, test_size=0.33, random_state=42)


# normalize the data
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
X_train=scaler.fit_transform(X_train)
X_test=scaler.transform(X_test)
# def change_weights(X_train,y_train,X_test,y_test,weights):

In [2]:
# Check if the entropy functions work

In [3]:
pk = np.array([1/5, 2/5, 2/5])  # fair coin
H = entropy(pk)
print(H)

1.0549201679861442


In [4]:
H=-0.2*(np.log(0.2))-0.4*(np.log(0.4))-0.4*(np.log(0.4))
print(H)

1.0549201679861442


In [5]:

def calculate_probabilities(list_labels, uniq_labels):
    '''
    Author: Sara Nassar 
    this function calculates the probabilities of each label in the list of labels
    it is calculated by number of labels in class A/all labels
    number of labels in class B/all labels
    and so on
    '''
    
    # A dictionary to store the probabilities
    probabilities = dict.fromkeys(uniq_labels, 0)
    
    # Total number of labels
    total_labels = len(list_labels)
    
    for label in uniq_labels:
        # Counting the number of times the label occurs in the list
        count = list_labels.count(label)
        
        # Calculating the probability of the label
        probability = count / total_labels
        
        # Storing the calculated probability in the dictionary
        probabilities[label] = probability
        
    return probabilities    
    
    
# test your function
list_labels=[1,2,0,1,2,0]
uniq_labels=[0,1,2]
print(calculate_probabilities(list_labels,uniq_labels))
# this should print somehting like 0.33,0.33,0.33


{0: 0.3333333333333333, 1: 0.3333333333333333, 2: 0.3333333333333333}


In [6]:

def calc_entropy_from_probabilities(list_probas):
    '''
    Author: Sara Nassar 
    list_probas is the list of probabiities
    the formula for entropy is
    sum(-proba*log(proba))
    
    '''
    
    entropy_value = 0

    for proba in list_probas:
        # If the probability is not zero
        if proba != 0:
            entropy_value += -proba * np.log(proba)
     
    return entropy_value


# test your function
list_probas=[1/5, 2/5, 2/5]
print(calc_entropy_from_probabilities(list_probas))
# above should print 1.054...

1.0549201679861442


In [7]:
def information_gain(old_entropy,new_entropies,count_items):
    '''
    Author: Sara Nassar 
    from the list of new entropies, calculate the overall new entropy
    
    formula is something like:
    overall_new_entropy = entropy1*proportion1 + entropy2*proportion2+ entropy3*proportion3 ...
    
    igain=old_entropy-overall_new_entropy
    '''
    
    overall_new_entropy = 0
    
    # Calculating the total number of items
    total_items = sum(count_items)
    
    for i in range(len(new_entropies)):
        # Calculating the proportion of items in the current partition
        proportion = count_items[i] / total_items
        
        # Adding the entropy of the current partition weighted by its proportion to the overall new entropy
        overall_new_entropy += new_entropies[i] * proportion
        
    # Calculating the information gain
    information_gain = old_entropy - overall_new_entropy
    
    return information_gain

#test your function
old_entropy=1
new_entropies=[0,0.65]
count_items=[4,6]
print(information_gain(old_entropy,new_entropies,count_items))
# above should print 0.61
    
    
    

0.61


In [8]:
# iris

In [9]:

def initialize_weights(number_features):
    '''
    the first set of weights corresponding to the features
    For now, it defaults to 2
    '''
    
    weights=np.array([np.random.uniform() for i in range(number_features)])
    return weights
    

In [10]:
num_feats=X_train.shape[1]
print(initialize_weights(num_feats))

[0.38415502 0.27579153 0.71347403 0.10322215]


### Task 3: PSO optimization to find best weights at any iteration

In [11]:
def get_entropy_from_groups(new_entropies,count_items):
    overall_new_entropy = 0
    
    # Calculating the total number of items
    total_items = sum(count_items)
    
    for i in range(len(new_entropies)):
        # Calculating the proportion of items in the current partition
        proportion = count_items[i] / total_items
        
        # Adding the entropy of the current partition weighted by its proportion to the overall new entropy
        overall_new_entropy += new_entropies[i] * proportion
        
    return overall_new_entropy    

def get_entropy(threshold,res,y_test):

    # make two groups
    group1=[]
    group2=[]

    for i in range(res.shape[0]):
        if res[i]<threshold:
            group1.append(y_test[i])
        else:
            group2.append(y_test[i])




    proba_gr1=calculate_probabilities(group1,np.unique(group1).tolist())
    proba_gr1=list(proba_gr1.values()) 
    entropy_group1=calc_entropy_from_probabilities(proba_gr1)
    count_group1=len(proba_gr1)

    proba_gr2=calculate_probabilities(group2,np.unique(group2).tolist())
    proba_gr2=list(proba_gr2.values()) 
    entropy_group2=calc_entropy_from_probabilities(proba_gr2)
    count_group2=len(proba_gr2)

    new_entropies=[entropy_group1,entropy_group2]
    count_items=[count_group1,count_group2]
    overall_new_entropy=get_entropy_from_groups(new_entropies,count_items)
    return overall_new_entropy


In [12]:
# # initialization
# # Author: Bilal 

# # step 1 calculate the probabilities of 0, 1 and 2 in the y_test array
# proba_init = calculate_probabilities(y_test.tolist(),np.unique(y_test).tolist())
# print("Initial proba=",proba_init)

# # step 2 calculate the initial entropy of y_test, using the probability values
# # you might have to convert the dictionary to a list
# # get only the probability values
# list_probas=list(proba_init.values())
# print(list_probas)
# entropy_init=calc_entropy_from_probabilities(list_probas)
# print("Initial entropy = ",entropy_init)




# wt_init=initialize_weights(num_feats)
# # right now the initialize_weights function only returns 2,2,2 
# print(wt_init)

# # multiply the weights with each feature and calculate the sum
# res=np.sum(X_test * wt_init, axis=1)
# print(res)

# best_threshold=-1
# best_entropy=np.inf
# # try diffrent thresholds
# for threshold in np.unique(np.sort(res)):
#     print(threshold)
#     new_entropy=get_entropy(threshold,res,y_test)
#     print(threshold,new_entropy)
#     if new_entropy<best_entropy:
#         best_entropy=new_entropy
#         best_threshold=threshold

        
# print("Best is ",best_entropy,"at thresh",best_threshold)        

In [13]:
def objective_fn(param1,param2,X,y):
    '''
    param1 and param2 are the parameters that we want to optimize
    say param1 is the weight vector and  param2 is the threshold
    '''
    # multiply the weights with each feature and calculate the sum
    res=np.sum(X * param1, axis=1)  
#     print(res)
    entropy=get_entropy(param2,res,y)
    return entropy
    
    
def objective_fn_vector(params1,params2,X,y):
    '''
    params1 is an array of weight vectors
    params2 is an array of thresholds
    '''
    results=[]
    for i in range(params1.shape[0]):
        param1=params1[i]
        param2=params2[i]
        res=objective_fn(param1,param2,X,y)
#         print(param2,res)
        results.append(res)
    
    return np.array(results)
    

In [14]:
params1=[initialize_weights(X_train.shape[1]) for i in range(100)]
params2=[np.random.uniform() for i in range(100)]
# we have a list of 100 weight vectors (params1) and 100 thresholds (params2)
params1=np.array(params1)
params2=np.array(params2)

In [15]:
z = objective_fn_vector(params1, params2, X_train, y_train)
# Find the global minimum
param1_min = params1[z.argmin()]
param2_min = params2[z.argmin()]

In [16]:

 
# Hyper-parameter of the algorithm
c1 = c2 = 0.1
w1 = np.array([np.random.uniform() for i in range(X_train.shape[1])])
w2 = 0.8 
# Create particles
n_particles = 20
np.random.seed(100)
params1=[initialize_weights(X_train.shape[1]) for i in range(n_particles)]
params2=[np.random.uniform() for i in range(n_particles)]
params1=np.array(params1)
params2=np.array(params2)


V_param1 = [initialize_weights(X_train.shape[1])*0.1 for i in range(n_particles)]
V_param2 = np.array([np.random.uniform()*0.1 for i in range(n_particles)])

# Initialize data
pbest = (params1,params2)
pbest_obj = objective_fn_vector(params1, params2, X_train, y_train)
gbest=(params1[pbest_obj.argmin()],params2[pbest_obj.argmin()])
gbest_obj = pbest_obj.min()
 


In [17]:
V_param2.shape

(20,)

In [18]:
pbest_obj

array([0.67013703, 0.82232957, 0.64329013, 0.70573338, 0.73886477,
       0.82232957, 1.09729975, 0.5237323 , 0.77244152, 0.4620281 ,
       0.86703698, 0.81919055, 1.09729975, 0.81919055, 0.73355763,
       0.74030523, 0.82232957, 0.73805779, 1.09729975, 0.68309963])

In [19]:
def update():
    "Function to do one iteration of particle swarm optimization"
    global V_param1,V_param2, params1,params2, pbest, pbest_obj, gbest, gbest_obj
    # Update params
    r11,r12, r2 = np.random.rand(3)
    V_param1=w1*V_param1+c1*r11*(pbest[0] - params1)+ c2*r2*(gbest[0]-params1)
    V_param2=w2*V_param2+c1*r12*(pbest[1] - params2)+ c2*r2*(gbest[1]-params2)    
#     V = w * V + c1*r11*(pbest - params1) + c2*r2*(gbest.reshape(-1,1)-X)
    params1 = params1 + V_param1
    params2 = params2 + V_param2
    
    obj = objective_fn_vector(params1, params2, X_train, y_train)
    for i in range(pbest[0].shape[0]):
        if pbest_obj[i]>=obj[i]:
            pbest[0][i]=params1[i]
            pbest[1][i]=params2[i]
            pbest_obj[i]=obj[i]

            
    gbest=(params1[pbest_obj.argmin()],params2[pbest_obj.argmin()])
    gbest_obj = pbest_obj.min()
 


In [20]:
for i in range(100):
    update()
print("PSO found best solution at f({})={}".format(gbest, gbest_obj))
print("Global optimal at f({})={}".format([param1_min,param2_min], objective_fn(param1_min, param2_min, X_train, y_train)))


PSO found best solution at f((array([0.64227925, 0.27855107, 0.47590872, 0.86590257]), 0.6704879386456812))=0.4161039895073432
Global optimal at f([array([0.84095501, 0.08586732, 0.608571  , 0.92579005]), 0.6739728092759083])=0.4620281046196322


### End of task 3

In [None]:
def change_weights(weights):
    new_weights=[]
    for i in range(weights[-1].shape[0]):
        new_weights.append(np.random.uniform(0,1))
    return np.array(new_weights)

In [None]:
def apply_and_measure_accuracy(X,y,weights):    
    res=np.sum(X*weights[-1],axis=1)
    res = np.tanh(res)
    res[res>0.5]=1
    res[res<=0.5]=0
    acc=accuracy_score(y, res)
    return acc
    
def get_train_test_accuracy(X_train,y_train,X_test,y_test,weights):
    train_acc=apply_and_measure_accuracy(X_train,y_train,weights)
    test_acc=apply_and_measure_accuracy(X_test,y_test,weights)
    return train_acc,test_acc
    

In [None]:
wt_init=[initialize_weights(num_feats)]
res=np.sum(X_test*wt_init[-1],axis=1)
res = np.tanh(res)
res[res>0.5]=1
res[res<=0.5]=0
print(res.shape)
acc=accuracy_score(y_test, res)
test_accuracies=[acc]

res=np.sum(X_train*wt_init[-1],axis=1)
res = np.tanh(res)
res[res>0.5]=1
res[res<=0.5]=0
print(res.shape)
acc=accuracy_score(y_train, res)

train_accuracies=[acc]
print("Initial test acc",test_accuracies)




def train_weights(X_train,y_train,X_test,y_test,weights,train_accuracies,test_accuracies):
    print("Trial number ",len(weights))
    
    train_acc,test_acc=get_train_test_accuracy(X_train,y_train,X_test,y_test,weights)
    
    
    
    # store the accuracy in this list of accuracies
    train_accuracies.append(train_acc)
    test_accuracies.append(test_acc)
    print("train",train_acc,"test",test_acc)
    print(test_accuracies[-1],test_accuracies[-2])
    
    # exit condition
    if test_accuracies[-1]<test_accuracies[-2]:
        print("returning")
        return weights,train_accuracies,test_accuracies
    
    # change the weights according to the accuracy
    new_weights=change_weights(weights)
    weights.append(new_weights)
    return train_weights(X_train,y_train,X_test,y_test,weights,train_accuracies,test_accuracies)
    


In [None]:
weights,train_accuracies,test_accuracies=train_weights(X_train,y_train,X_test,y_test,wt_init,train_accuracies,test_accuracies)


In [None]:
### Test the weights in the pre-final iteration
res=np.sum(X_test*weights[-2],axis=1)
res = np.tanh(res)
res[res>0.5]=1
res[res<=0.5]=0
acc=accuracy_score(y_test, res)
print(acc)
print(res)