<a href="https://colab.research.google.com/github/YuTaNCCU/201902_ANN_Metaheuristic/blob/master/ES/ES_ANN_0501.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Load Package

In [1]:
#https://github.com/YuTaNCCU/201902_ANN_Metaheuristic/tree/master/ES
import random
from string import ascii_lowercase
from copy import deepcopy
from abc import ABCMeta, abstractmethod
from copy import deepcopy
from collections import deque
from numpy import argmax
from keras import backend as K
from keras.models import Sequential 
import numpy as np
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.metrics import log_loss

Using TensorFlow backend.


# Define ES class

In [0]:
class ES:
    """
    Conducts tabu search
    """
    __metaclass__ = ABCMeta

    #default hyper parameters
    InitialSigma = None
    ParentsSize = None
    ChildSize = None
    tao = None
    
    #for input/output
    KerasModels = None
    WeightsStrucure = None   
    weights = None
    
    #for record
    cur_steps = 1
    best_weight = None
    best_score = None
    
    UseOLSReg=None
    X_train=None
    y_train=None
    
    def __init__(self, KerasModels, X_train, y_train, UseOLSReg=False, InitialSigma = 0.1, ParentsSize = 15, ChildSize = 100, tao = 0.5):
        """
        :param KerasModels: a Keras model, like keras.engine.sequential.Sequential
        :param weights: initial weights, should be a Keras model weight
        :param max_steps: maximum number of steps to run algorithm for
        :param UseOLSReg: If True, than use "OLS Regression" for the last layer
        
        """
        self.KerasModels = KerasModels
        
        self.UseOLSReg = UseOLSReg
        
        self.X_train=X_train
        self.y_train=y_train
 
        if all(isinstance(x, float) for x in [InitialSigma, tao]) and all(x > 0 for x in [InitialSigma, tao]):
            self.InitialSigma = InitialSigma
            self.tao = tao
        else:
            raise TypeError('InitialSigma & tao must be a positive float')
            
        if all(isinstance(x, int) for x in [ParentsSize, ChildSize]) and all(x > 0 for x in [ParentsSize, ChildSize]):
            self.ParentsSize = ParentsSize
            self.ChildSize = ChildSize
        else:
            raise TypeError('ParentsSize, ChildSize & max_steps must be a positive integer')

    def __str__(self): 
        return ('ES STEPS: %d ' +
                'BEST SCORE: %.4f ') % \
               (self.cur_steps, self.best_score)

    def __repr__(self):
        return self.__str__() 
    
    def _FlattenWeights(self, weights):
        """
        flatten weights
        
        param weights: keras神經網路的權重格式:nparray包在list中
        return WeightsStrucure : 神經網路各層的權重shape包在list中，unflatten時會用到
        return FlattenedWeights : 一維list包含所有的權重
        """
        WeightsStrucure = []
        FlattenedWeights = []
        for i_layer in weights:
            WeightsStrucure.append(i_layer.shape)
            if len(i_layer.shape) == 1 :# 該層權重的shape為一維 e.g. (15,)      
                FlattenedWeights.extend(i_layer)
            else :# 該層權重的shape為二維 e.g. (30, 15)  
                for i_links in i_layer:
                    FlattenedWeights.extend(i_links)
        return WeightsStrucure, FlattenedWeights

    def _UnflattenWeights(self, WeightsStrucure, ModifiedWeights):
        """
        Unflatten(回復成原本的結構) weights  
        
        param WeightsStrucure : 神經網路各層的權重shape包在list中
        param ModifiedWeights : 一維list包含所有meteHeuristic修改過的權重
        return: keras神經網路的權重格式:nparray包在list中
        """
        UnflattenWeights = []
        i_index = 0 
        for i_layer in WeightsStrucure:
            if len(i_layer) == 1 : # 該層權重的shape為一維 e.g. (15,)      
                TempList = ModifiedWeights[i_index:(i_index + i_layer[0])]
                TempList = np.asarray(TempList)
                i_index = i_index + i_layer[0]
            else : # 該層權重的shape為二維 e.g. (30, 15)  
                TempList = ModifiedWeights[i_index:(i_index + (i_layer[0]*i_layer[1]))]
                TempList = np.reshape(TempList, i_layer )
                i_index = i_index + (i_layer[0]*i_layer[1])
            UnflattenWeights.append(TempList)
        return UnflattenWeights   
    
    def _best(self, Population_Child_score):
        """
        Finds the best member of a neighborhood
        :param Population_Child_score: a np array
        :return: the indtex of N best member, N = ParentsSize
        """
        return np.array( Population_Child_score ).argsort()[::-1][:self.ParentsSize]
    
    def _Recombination(self, Population_Parents_Weights, Population_Parents_Sigma, rows): #GenerateParents
        """
        Generate New Parents Polulation
        """
        Population_Weights_Recombination = np.zeros(shape = (rows, Population_Parents_Weights.shape[1]))
        Population_Sigma_Recombination = np.zeros(shape = (rows, Population_Parents_Weights.shape[1]))
        for index_row, _ in enumerate( Population_Weights_Recombination ):
            """
            可能可以平行計算
            """
            TwoRowschoiced = np.random.choice(Population_Parents_Weights.shape[0], size=2, replace=False,)
            Parent1Mask = np.random.randint(2, size=Population_Parents_Weights.shape[1])
            Parent2Mask = np.full(shape = Population_Parents_Weights.shape[1], fill_value = 1 )  - Parent1Mask
            
            Population_Weights_Recombination[index_row,:] = (Population_Parents_Weights[TwoRowschoiced] * [Parent1Mask, Parent2Mask]).sum(axis=0)
            Population_Sigma_Recombination[index_row,:] = Population_Parents_Sigma[TwoRowschoiced].mean(axis=0)
        return Population_Weights_Recombination, Population_Sigma_Recombination

    def _score(self, ModifiedWeights):
        
        """
        Returns objective function value of a state

        :param state: a state
        :return: objective function value of state
        """
        UnflattenedWeights = self._UnflattenWeights(WeightsStrucure = self.WeightsStrucure, ModifiedWeights = ModifiedWeights)
        self.KerasModels.set_weights(UnflattenedWeights)
        test_on_batch = self.KerasModels.test_on_batch(self.X_train, self.y_train, sample_weight=None) # return ['loss', 'acc']
        return test_on_batch[1]
    #==================
        #==================
          #==================
            #==================
    def _OLSReg(self, ModifiedWeights):
        
        """
        :param : 
        :return: Keras Models, objective function value of state
        """
        UnflattenedWeights = self._UnflattenWeights(WeightsStrucure = self.WeightsStrucure, ModifiedWeights = ModifiedWeights)
        
        #%% OLS Regression
        #obtain the output of an intermediate layer
        #https://keras.io/getting-started/faq/?fbclid=IwAR3Zv35V-vmEy85anudOrlxCExXYwyG6cRL1UR0AaLPU6sZEoBjsbX-8LXQ#how-can-i-obtain-the-output-of-an-intermediate-layer
        self.KerasModels.set_weights(UnflattenedWeights)
        layer_name = 'IntermediateLayer'
        intermediate_layer_model = keras_models_Model(inputs=self.KerasModels.input,
                                         outputs=self.KerasModels.get_layer(layer_name).output)
        intermediate_output = intermediate_layer_model.predict(self.X_train)

        #fit LM
        lm =  LogisticRegression(random_state=0, solver='liblinear').fit(intermediate_output, self.y_train)
        
        #lm =  LinearRegression().fit(intermediate_output, self.y_train)
        # 印出係數, 截距 print(lm.coef_, lm.intercept_)
        
        #score
        #score = log_loss(y_pred = lm.predict(intermediate_output), y_true= self.y_train)
        
        #get OutLayerWeights
        OutLayerWeights = [np.array(lm.coef_).reshape(self.WeightsStrucure[-2]),
                           np.array(lm.intercept_).reshape(self.WeightsStrucure[-1])]

        #update ES-optimized weights
        UnflattenedWeights[-2:] = OutLayerWeights        
        
        #self.KerasModels.set_weights(UnflattenedWeights)
        #test_on_batch = self.KerasModels.test_on_batch(self.X_train, self.y_train, sample_weight=None) # return ['loss', 'acc']
        
        #print( 'score',score, 'test_on_batch',test_on_batch)
        _, OLS_Optimized_Weight = self._FlattenWeights(UnflattenedWeights)
        return OLS_Optimized_Weight 

    def run(self, weights, max_steps=5, verbose=10, useOLSReg = False):
        """
        Conducts ES
        :param weights: 
        :param max_steps: 
        :param verbose: int which indicates how many iter to show score
        :return: Keras Models, best state and objective function value of best state
        """
        
        if isinstance(weights, list)  :
          
            self.WeightsStrucure, self.weights = self._FlattenWeights(weights)
            self.best_weight = self.weights
            self.best_score = self._score(self.best_weight)
        else:
            raise TypeError('initial_state must be a list') 
            
        self.max_steps = max_steps
        
        #Step1 initial             
        Population_Parents_Weights = np.array([self.weights, self.weights])         
        Population_Parents_Sigma = np.full(shape = (self.ParentsSize, len(self.weights)), fill_value = self.InitialSigma ) 
        Population_Parents_Weights, _ = self._Recombination(Population_Parents_Weights, Population_Parents_Sigma, rows = self.ParentsSize )
        self.cur_steps = 1
        while True:   
            #Step2 Child
            ##Discrete Recombination
            Population_Child_Weights, Population_Child_Sigma = self._Recombination(Population_Parents_Weights, Population_Parents_Sigma, rows = self.ChildSize )
            ##mutation1
            RamdonNormalValue = np.random.normal(0, 1, 1)
            RamdonNormalValueDifferent = np.random.normal(0, 1, Population_Child_Sigma.shape)
            Population_Child_Sigma = np.exp( (1-self.tao)*RamdonNormalValue + self.tao*RamdonNormalValueDifferent )
            ##mutation2
            Population_Child_Weights = Population_Child_Weights + np.random.normal(0, Population_Child_Sigma, Population_Child_Sigma.shape)
            
            
            # OLS Regression
            if useOLSReg == True:
              for i, i_Child in enumerate(Population_Child_Weights) :
                  OLS_Optimized_Weight = self._OLSReg(i_Child)
                  #print(OLS_Optimized_Weight,'i:\n', i, Population_Child_Weights[i])
                  Population_Child_Weights[i] = OLS_Optimized_Weight
            
            
            #step3 Evaluation
            Population_Child_score = []
            for i_Child in Population_Child_Weights :
                """
                可能可以平行計算
                """
                Population_Child_score.append( self._score(i_Child) )
                 
            BestNIndex = self._best(Population_Child_score)
            Population_Parents_Weights = Population_Child_Weights[BestNIndex,:]
            Population_Parents_Sigma = Population_Child_Sigma[BestNIndex,:]
            
            #更新best
            best_weight_This_Iter =  Population_Child_Weights[BestNIndex,:][0]
            best_score_This_Iter = self._score(Population_Child_Weights[BestNIndex,:][0])
            if best_score_This_Iter > self.best_score:
                self.best_weight =  Population_Child_Weights[BestNIndex,:][0]
                self.best_score = self._score(Population_Child_Weights[BestNIndex,:][0])
        
            #print process 
            if ((self.cur_steps ) % verbose == 0) and verbose:
               print(self)
                
            self.cur_steps = self.cur_steps + 1
            #step4 check stop criteria
            if self.cur_steps > max_steps:
                print( 'Stop: Reach max_steps' )
                break
        return self._UnflattenWeights(WeightsStrucure = self.WeightsStrucure, ModifiedWeights = self.best_weight), self.best_score 


# Load Data

In [3]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.datasets import load_breast_cancer

#資料集是以dictionary的形式存在
cancer = load_breast_cancer()
df_feat = pd.DataFrame(cancer['data'],columns=cancer['feature_names'])

X = df_feat.iloc[:, ].values
y = cancer['target']

# Encoding categorical data
from sklearn.preprocessing import LabelEncoder
labelencoder_X_1 = LabelEncoder()
y = labelencoder_X_1.fit_transform(y)

# Splitting the dataset into the Training set and Test set
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 0)

#Feature Scaling
"""from sklearn.preprocessing import StandardScaler
sc = StandardScaler()
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)"""

#X_train.shape,X_test.shape,y_train.shape,y_test.shape

'from sklearn.preprocessing import StandardScaler\nsc = StandardScaler()\nX_train = sc.fit_transform(X_train)\nX_test = sc.transform(X_test)'

# Model Compile

In [9]:
from keras import backend as K
from keras.layers import Dense
from keras.models import Sequential, Model as keras_models_Model

model = Sequential()
model.add(Dense(10, activation='relu', input_shape=(30,)))
#model.add(Dense(3, activation='relu'))
model.add(Dense(3, activation='relu', name = 'IntermediateLayer'))
model.add(Dense(1, activation='sigmoid'))

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_5 (Dense)              (None, 10)                310       
_________________________________________________________________
IntermediateLayer (Dense)    (None, 3)                 33        
_________________________________________________________________
dense_6 (Dense)              (None, 1)                 4         
Total params: 347
Trainable params: 347
Non-trainable params: 0
_________________________________________________________________


# OLS Regression TEST

In [0]:

def _FlattenWeights(weights):
    """
    flatten weights

    param weights: keras神經網路的權重格式:nparray包在list中
    return WeightsStrucure : 神經網路各層的權重shape包在list中，unflatten時會用到
    return FlattenedWeights : 一維list包含所有的權重
    """
    WeightsStrucure = []
    FlattenedWeights = []
    for i_layer in weights:
        WeightsStrucure.append(i_layer.shape)
        if len(i_layer.shape) == 1 :# 該層權重的shape為一維 e.g. (15,)      
            FlattenedWeights.extend(i_layer)
        else :# 該層權重的shape為二維 e.g. (30, 15)  
            for i_links in i_layer:
                FlattenedWeights.extend(i_links)
    return WeightsStrucure, FlattenedWeights
a,b=_FlattenWeights(weights)
a[-2]

(3, 1)

In [0]:
#weights[-2:] = weights[-2:]*0
display(
    'WeightsStrucure==========='
    , WeightsStrucure
    , 'WeightsStrucure[-2:]==========='
    , WeightsStrucure[-2:] #最後一層的weights 及 bias
    , 'weights==========='
    , weights
    , 'weights[-2:]==========='
    , weights[-2:]
)

In [0]:
#%% OLS Regression
#obtain the output of an intermediate layer
#https://keras.io/getting-started/faq/?fbclid=IwAR3Zv35V-vmEy85anudOrlxCExXYwyG6cRL1UR0AaLPU6sZEoBjsbX-8LXQ#how-can-i-obtain-the-output-of-an-intermediate-layer
layer_name = 'IntermediateLayer'
intermediate_layer_model = keras_models_Model(inputs=model.input,
                                 outputs=model.get_layer(layer_name).output)
intermediate_output = intermediate_layer_model.predict(X_train)


#fit LM
lm = LinearRegression().fit(intermediate_output, y_train)
# 印出係數, 截距 print(lm.coef_, lm.intercept_)

#score
score = log_loss(y_pred = lm.predict(intermediate_output), y_true= y_train)



OutLayerWeights = [np.array(lm.coef_).reshape((3,1)), np.array(lm.intercept_).reshape((1))]

#update ES-optimized weights
weights[-2:] = OutLayerWeights        

model.set_weights(weights)
test_on_batch = model.test_on_batch(X_train, y_train, sample_weight=None) # return ['loss', 'acc']

print( 'score',score, 'test_on_batch[1]',test_on_batch[1])

score nan test_on_batch[1] 0.6505495


  loss = -(transformed_labels * np.log(y_pred)).sum(axis=1)
  loss = -(transformed_labels * np.log(y_pred)).sum(axis=1)


In [0]:
weights[-2:] = OutLayerWeights
display(
    'WeightsStrucure==========='
    , WeightsStrucure
    , 'WeightsStrucure[-2:]==========='
    , WeightsStrucure[-2:] #最後一層的weights 及 bias
    , 'weights==========='
    , weights
    , 'weights[-2:]==========='
    , weights[-2:]
)

# Main

In [19]:
# Initialize
weights = model.get_weights() 
MyES = ES(model, X_train, y_train, InitialSigma = 0.1, ParentsSize = 15, ChildSize = 100, tao = 0.5)   
weights, ES_Optimized_ObjVal  = MyES.run(weights, useOLSReg =True, max_steps=3, verbose = 1)

# Optimize
GlobalBestAccuracy = 0
NoImproveTimes = 0
while True:
  # Gradient-based Optimize
  model.set_weights(weights)
  model.fit(X_train, y_train, epochs=3, batch_size=32)
  weights = model.get_weights() 

  # ES
  weights, ES_Optimized_ObjVal  = MyES.run(weights, max_steps=5, verbose = 1)
  
  # Stop Criteria
  if ES_Optimized_ObjVal > GlobalBestAccuracy:
    GlobalBestAccuracy = ES_Optimized_ObjVal
    NoImproveTimes = 0
  else: 
    NoImproveTimes = NoImproveTimes + 1
    if NoImproveTimes == 5:
      break
      

ES STEPS: 1 BEST SCORE: 0.9363 
ES STEPS: 2 BEST SCORE: 0.9363 
ES STEPS: 3 BEST SCORE: 0.9363 
Stop: Reach max_steps
Epoch 1/3
Epoch 2/3
Epoch 3/3
ES STEPS: 1 BEST SCORE: 0.8967 
ES STEPS: 2 BEST SCORE: 0.9187 
ES STEPS: 3 BEST SCORE: 0.9231 
ES STEPS: 4 BEST SCORE: 0.9231 
ES STEPS: 5 BEST SCORE: 0.9231 
Stop: Reach max_steps
Epoch 1/3
Epoch 2/3
Epoch 3/3
ES STEPS: 1 BEST SCORE: 0.9275 
ES STEPS: 2 BEST SCORE: 0.9275 
ES STEPS: 3 BEST SCORE: 0.9275 
ES STEPS: 4 BEST SCORE: 0.9275 
ES STEPS: 5 BEST SCORE: 0.9275 
Stop: Reach max_steps
Epoch 1/3
Epoch 2/3
Epoch 3/3
ES STEPS: 1 BEST SCORE: 0.9319 
ES STEPS: 2 BEST SCORE: 0.9319 
ES STEPS: 3 BEST SCORE: 0.9319 
ES STEPS: 4 BEST SCORE: 0.9319 
ES STEPS: 5 BEST SCORE: 0.9319 
Stop: Reach max_steps
Epoch 1/3
Epoch 2/3
Epoch 3/3
ES STEPS: 1 BEST SCORE: 0.9319 
ES STEPS: 2 BEST SCORE: 0.9319 
ES STEPS: 3 BEST SCORE: 0.9319 
ES STEPS: 4 BEST SCORE: 0.9319 
ES STEPS: 5 BEST SCORE: 0.9319 
Stop: Reach max_steps
Epoch 1/3
Epoch 2/3
Epoch 3/3
ES S