In [None]:
import numpy as np
import pandas as pd
import quantecon as qe
from ast import literal_eval
from sklearn.ensemble import RandomForestRegressor
from matplotlib import pyplot as plt
import sys
sys.path.append(r'D:\OneDrive - Universitaet Bern\Documents\Others model\Jack Model\1_AgIrPdPtRu\1_AgIrPdPtRu\3_grid_search')
import grid_search

In [None]:
#Making Class
class pso:
    def __init__ (self, data, comparison, model_type, initial_magnitude_limit = 0.20, kicking_multiplier = 0.7, target = None, grid_distance = 0.07, boundaries = [0, 1], decimals = 3):
        self.datalog = data
        self.lower_boundary = boundaries[0]
        self.upper_boundary = boundaries[1]
        self.decimals = decimals
        self.comparison = comparison
        self.model_type = model_type
        self.kicking_multiplier = kicking_multiplier
        self.grid_distance = grid_distance
        
        #Reading "Elements" columns from string to list
        self.datalog['Elements'] = self.datalog["Elements"].apply(lambda x: literal_eval(x))
        
        #Select latest generation
        self.generation = self.datalog['Generation'].max()
        
        #Creating np.array of "Position" column and dropping the string type "Position" column
        self.position = []
        for i in range(len(self.datalog)):
            self.position.append(list(np.fromstring(self.datalog['Position'][i][1:-1], dtype=float, sep=' ')))
        self.position = np.array(self.position)
        #Checking whether the positions are out of boundaries or not
        for i in range(len(self.datalog)):    
            #Bouncing process if the position in the outside of the spaces
            if self.position[i].max() > self.upper_boundary or self.position[i].min() < self.lower_boundary:
                #Correcting the position
                self.position[i] = self.correct_position(self.position[i])
        #Putting the array into the dataframe
        self.datalog = self.datalog.drop(columns=['Position'])
        self.datalog = pd.concat([self.datalog, pd.DataFrame(([[i] for i in self.position]), columns = ['Position'])], axis = 1)
        
        #Creating target vector
        if target == None:
            target = []
            for i in range(len(self.datalog['Elements'][0])):
                target.append(1/len(self.datalog['Elements'][0]))
            target = np.array(target)
            target = np.around(target, decimals = self.decimals)
        
        #Creating "Velocity" column for the "0" generation
        if self.generation == 0:
            self.velocity = np.around((target - self.position), decimals = self.decimals)
            #Turning velocity into unit vector with certain magnitude
            self.velocity = self.initial_unit_vector_velocity(initial_magnitude_limit)
            self.datalog = pd.concat([self.datalog, pd.DataFrame(([[i] for i in self.velocity]), columns = ['Velocity'])], axis = 1)      
        
        #Creating blank parameter columns
        parameter_columns = ['F_damp', 'F_b_best', 'F_h_best', 'F_g_best', 'F_b_worst', 'F_h_worst', 'F_g_worst', 'mut_prob', 'flip_prob', 'mut_rate', 'v_max']
        for i in parameter_columns:
            self.datalog = pd.concat([self.datalog, pd.DataFrame(columns = [i], index = np.arange(len(self.datalog)))], axis = 1)

        
        #Creating blank "Activity" column
        self.datalog = pd.concat([self.datalog, pd.DataFrame(columns = ['Activity'], index = np.arange(len(self.datalog)))], axis = 1)      

               
        #Filling the "Activity" column with RFR
        self.f_activity(self.datalog)
        
        #Create "Distance" column
        self.distance = []
        for i in range(len(self.datalog)):
            self.distance.append(np.sqrt(sum((self.datalog.at[i, 'Position'] - self.comparison)**2)))
        self.distance = np.array(self.distance)
        self.distance = np.around(self.distance, decimals = decimals)
        self.datalog = self.datalog.assign(Distance = self.distance)
        
        #Creating dataframe of the latest generation
        self.working_generation = self.datalog.loc[self.datalog['Generation']==self.generation]
        
        #Create Possible Optima dataframe
        self.optima = self.datalog.loc[np.argmax(self.datalog['Activity']):np.argmax(self.datalog['Activity'])]
    
    def initial_unit_vector_velocity(self, initial_magnitude_limit):
        self.unit_vector_velocity = []
        for i in range(len(self.datalog)):
            self.unit_vector_velocity.append(list(self.velocity[i]/np.sqrt(sum(self.velocity[i]**2))))
        self.unit_vector_velocity = np.array(self.unit_vector_velocity)* initial_magnitude_limit
        self.unit_vector_velocity = np.around(self.unit_vector_velocity, decimals = self.decimals)
        return self.unit_vector_velocity
    
    
    #Adjust activity with the model
    def f_activity(self, dataframe):
        for i in range(len(dataframe)):
            #Correcting the possible rounding problem
            if sum(dataframe.at[i, 'Position']) != 1:
                dataframe.at[i, 'Position'][dataframe.at[i, 'Position'].argmax()] = dataframe.at[i, 'Position'].max() + (1 - sum(dataframe.at[i, 'Position']))
            #Determining the activity
            dataframe.at[i, 'Activity'] = float(grid_search.calculate_activity(metals = dataframe.at[i, 'Elements'], composition = dataframe.at[i, 'Position']))
        return
    
    
    def input_initial_parameter(self, parameter, value, ID, behavior):
        if behavior == 'uniform':
            for i in range(len(self.datalog)):
                self.datalog.at[i, parameter] = value
        else:
            self.datalog.at[ID, parameter] = value
        
        #Creating dataframe of the latest generation
        self.working_generation = self.datalog.loc[self.datalog['Generation']==self.generation] 
        return

    
    def update_parameter(self, ID, parameter, value):
        self.working_generation.at[ID, parameter] = value
        return
    
    
    def create_new_velocity(self):
        #Creating new velocity
        for i in range(len(self.working_generation)):
            damp_velocity = self.working_generation.at[i, 'F_damp'] * self.working_generation.at[i, 'Velocity']
            b_best_velocity = self.working_generation.at[i, 'F_b_best'] * self.delta_individual_record(i, 'Position', 'best')
            h_best_velocity = self.working_generation.at[i, 'F_h_best'] * self.delta_generation(i, 'Position', 'best') 
            g_best_velocity = self.working_generation.at[i, 'F_g_best'] * self.delta_global_record(i, 'Position', 'best')
            b_worst_velocity = self.working_generation.at[i, 'F_b_worst'] * self.delta_individual_record(i, 'Position', 'worst') 
            h_worst_velocity = self.working_generation.at[i, 'F_h_worst'] * self.delta_generation(i, 'Position', 'worst')    
            g_worst_velocity = self.working_generation.at[i, 'F_g_worst'] * self.delta_global_record(i, 'Position', 'worst')
            
            #Summation
            new_velocity = (damp_velocity + b_best_velocity + h_best_velocity + g_best_velocity - b_worst_velocity - h_worst_velocity - g_worst_velocity)    
                    
           
            #Mutation process
            self.working_generation.at[i,'Velocity'] = self.mutate(new_velocity,
                                                                   mutation_rate = self.working_generation.at[i, 'mut_rate'], 
                                                                   mutation_prob = self.working_generation.at[i, 'mut_prob'],
                                                                   flip_prob = self.working_generation.at[i, 'flip_prob'])
            
        #Normalization
        self.normalize_velocity()
        #Limitting velocity
        self.limitting_velocity()
        return
     
    
    def change_position(self):
        #Changing position
        for i in range(len(self.working_generation)):    
            new_position = self.working_generation.at[i, 'Position'] + self.working_generation.at[i, 'Velocity']
            #Bouncing process if the position in the outside of the spaces
            if new_position.max() > self.upper_boundary or new_position.min() < self.lower_boundary:
                #Bouncing the velocity
                self.working_generation.at[i, 'Velocity'] = self.correct_velocity(new_position, self.working_generation.at[i, 'Velocity'])
                #Correcting the position
                new_position = self.correct_position(new_position)
            self.working_generation.at[i,'Position'] = np.around(new_position, decimals = self.decimals)
        return
        

    def correct_velocity(self, new_position, velocity):
        #Changing the sign of velocity which belongs to position outside the spaces  
        bounced_velocity_0 = np.where(new_position < self.lower_boundary, velocity * -1, velocity)
        bounced_velocity_1 = np.where(new_position > self.upper_boundary, bounced_velocity_0 * -1, bounced_velocity_0)
        #Normalizing the velocity
        normalized_velocity = bounced_velocity_1 - (sum(bounced_velocity_1)/len(self.working_generation['Elements'][0]))
        #Preserving the magnitude of velocity
        unit_velocity = normalized_velocity/np.sqrt(sum(normalized_velocity**2))
        corrected_velocity = unit_velocity * np.sqrt(sum(bounced_velocity_1**2))
        corrected_velocity = np.around(corrected_velocity, decimals = self.decimals)
        return corrected_velocity


    def correct_position(self, new_position):
        while new_position.max() > self.upper_boundary or new_position.min() < self.lower_boundary:
            correction = []
            for i in range(len(self.datalog['Elements'][0])):
                if new_position[i] > self.upper_boundary:
                    #Fill the previous column of correction by 1/(n-1) of the correction
                    for x in range(i):
                        correction.append(-2*(self.upper_boundary - new_position[i])/(len(self.datalog['Elements'][0])-1))
                    #Fill column of correction
                    correction.append(2*(self.upper_boundary - new_position[i]))
                    #Fill the next column of correction by 1/(n-1) of the correction
                    for x in range(len(self.datalog['Elements'][0])-1-i):
                        correction.append(-2*(self.upper_boundary - new_position[i])/(len(self.datalog['Elements'][0])-1))

                if new_position[i] < self.lower_boundary:
                    #Fill the previous column of correction by 1/(n-1) of the correction
                    for x in range(i):
                        correction.append(-2*(self.lower_boundary - new_position[i])/(len(self.datalog['Elements'][0])-1))
                    #Fill column of correction
                    correction.append(2*(self.lower_boundary - new_position[i]))
                    #Fill the next column of correction by 1/(n-1) of the correction
                    for x in range(len(self.datalog['Elements'][0])-1-i):
                        correction.append(-2*(self.lower_boundary - new_position[i])/(len(self.datalog['Elements'][0])-1))

            correction = np.array(correction)
            correction = np.reshape(correction, (int(len(correction)/len(self.datalog['Elements'][0])), len(self.datalog['Elements'][0])))

            #Add the correction to the old position 
            for i in range(len(correction)):
                 new_position = new_position + correction[i]
        return new_position
        
    def update_distance(self, comparison):
        for i in range(len(self.working_generation)):
            self.working_generation.at[i, 'Distance'] = np.sqrt(sum((self.working_generation.at[i, 'Position']-comparison)**2))
            self.working_generation.at[i, 'Distance'] = np.around(self.working_generation.at[i, 'Distance'], decimals = 3)
        return
    
    def move(self):
        #Updating generation
        self.generation += 1
        self.working_generation['Generation'] += 1
        
        #Updating velocity
        self.create_new_velocity()
        
        #Check velocity
        self.check_velocity()
        
        #Updating position
        self.change_position()
        
        #Filling the "Activity" column
        self.f_activity(self.working_generation)
        
        #Updating "Distance" column
        self.update_distance(self.comparison)
        
        #Logging "Possible Optima"
        self.possible_optima_logging()
        
        #Concating the tables
        self.datalog = pd.concat([self.datalog, self.working_generation])
        self.datalog = self.datalog.reset_index(drop=True)
        self.store_datalog()
        return 

    
    def normalize_velocity(self):
        for i in range(len(self.working_generation)):
            self.working_generation.at[i, 'Velocity'] = self.working_generation.at[i, 'Velocity'] - (sum(self.working_generation.at[i, 'Velocity'])/len(self.working_generation['Elements'][0]))
            self.working_generation.at[i, 'Velocity'] = np.around(self.working_generation.at[i, 'Velocity'], decimals= self.decimals)
        return

    
    def limitting_velocity(self):
        for i in range(len(self.working_generation)):
            if np.sqrt(sum(self.working_generation.at[i, 'Velocity']**2)) > self.working_generation.at[i, 'v_max']:
                self.working_generation.at[i, 'Velocity'] = self.working_generation.at[i, 'Velocity'] / np.sqrt(sum(self.working_generation.at[i, 'Velocity']**2)) * self.working_generation.at[i, 'v_max']
                self.working_generation.at[i, 'Velocity'] = np.around(self.working_generation.at[i, 'Velocity'], decimals= self.decimals)
        return
    
    
    def check_velocity(self):
        for i in range(len(self.working_generation)):
            if np.sqrt(sum(self.datalog.loc[self.datalog['Generation'] == (self.generation-1)].reset_index().at[i, 'Velocity']**2)) < self.working_generation.at[i, 'v_min']:          
                self.working_generation.at[i, 'Velocity'] = self.working_generation.at[i, 'Velocity'] / np.sqrt(sum(self.working_generation.at[i, 'Velocity']**2)) * self.kicking_multiplier
                self.working_generation.at[i, 'Velocity'] = np.around(self.working_generation.at[i, 'Velocity'], decimals= self.decimals)
        return
    
    
    def mutate(self, new_velocity, mutation_rate, mutation_prob, flip_prob):
        mutated_velocity = []
        for i in range(len(new_velocity)):
            #Randoming number to decide whether mutation happen or not
            mutation = np.random.rand()
            step_size_i = 1
            
            #If mutation happen
            if mutation < mutation_prob:
                #Randoming the number for the step size
                step_size_i = np.random.uniform((1-mutation_rate), (1+mutation_rate))
                
                #Mutation and Flip
                if mutation < flip_prob * mutation_prob:
                    step_size_i = -1 * step_size_i
                    
                #Only mutation            
                else:
                    step_size_i = step_size_i
            mutated_velocity.append(new_velocity[i] * step_size_i)
        mutated_velocity = np.array(mutated_velocity)
        
        #Normalizing velocity
        normal_velocity = mutated_velocity - (sum(mutated_velocity)/len(self.working_generation['Elements'][0]))
       
        #Preserving the magnitude of velocity before mutation with checking the normal velocity first to avoid the error
        if np.sqrt(sum(normal_velocity**2)) == 0:
            unit_velocity = np.zeros(len(self.working_generation['Elements'][0]))
        else:
            unit_velocity = normal_velocity / np.sqrt(sum(normal_velocity**2)) * np.sqrt(sum(new_velocity**2))
        return unit_velocity
    
    
    def possible_optima_logging(self):
        for i in range(len(self.working_generation)):
            if np.sqrt(sum(self.working_generation.at[i, 'Velocity']**2)) < 1.5 * self.grid_distance:
                self.optima = pd.concat([self.optima, self.working_generation.loc[i:i]])
        self.optima.to_csv('../../raw_data/composition_vs_activity/model2/PSO/log/optima_gen_' + str(self.generation) + '.txt', sep='\t', index=False, mode='w')
    
    def g_best(self):
        return self.datalog.loc[np.argmax(self.datalog['Activity'])]
    
    def g_closest(self):
        return self.datalog.loc[np.argmin(self.datalog['Distance'])]
    
    def minimum_generation_activity(self, limit):
        return self.datalog[self.datalog['Activity']>=limit]['Generation'].min()
    
    def minimum_generation_distance(self, limit):
        return self.datalog[self.datalog['Distance']<=limit]['Generation'].min()
    
    def global_record_comparison(self, category):
        if category == 'best':
            individual = self.datalog.loc[np.argmax(self.datalog['Activity'])]
        elif category == 'worst':
            individual = self.datalog.loc[np.argmin(self.datalog['Activity'])]
        return individual 
    
    def generational_comparison(self, category):
        if category == 'best':
            individual = self.working_generation.loc[np.argmax(self.working_generation['Activity'])]
        elif category == 'worst':
            individual = self.working_generation.loc[np.argmin(self.working_generation['Activity'])]
        return individual
           
    def individual_record_comparison(self, ID, category):
        self.individual_data = self.datalog[self.datalog['ID']==ID].reset_index(drop=True)
        if category == 'best':
            individual = self.individual_data.loc[np.argmax(self.individual_data['Activity'])]
        elif category == 'worst':
            individual = self.individual_data.loc[np.argmin(self.individual_data['Activity'])]
        return individual
    
    def delta_global_record(self, ID, parameter, category):
        return self.global_record_comparison(category)[parameter] - self.working_generation[self.working_generation['ID']==ID][parameter].reset_index(drop=True)[0]
    
    def delta_generation(self, ID, parameter, category):
        return self.generational_comparison(category)[parameter] - self.working_generation[self.working_generation['ID']==ID][parameter].reset_index(drop=True)[0]
    
    def delta_individual_record(self, ID, parameter, category):
        return self.individual_record_comparison(ID, category)[parameter] - self.working_generation[self.working_generation['ID']==ID][parameter].reset_index(drop=True)[0]
    
    def individual_log(self, ID):
        return self.datalog[self.datalog['ID']==ID].reset_index(drop=True)
    
    def store_datalog(self):
        self.datalog.to_csv('../../raw_data/composition_vs_activity/model2/PSO/log/log_gen_' + str(self.generation) + '.txt', sep='\t', index=False, mode='w')
        return

# Pay attention to initial points, comparison, boundaries, data_type

In [None]:
metals = ['Ag', 'Ir', 'Pd', 'Pt', 'Ru']
initial_magnitude_limit = 0.2
comparison = np.array([0.15, 0., 0.85, 0., 0.])
model_type = 'model2'
boundaries = (0, 1)

In [None]:
def create_PSO(parameters, name):
    for i in range (5):
        population = pso(pd.read_csv('initial_points.txt', sep='\t'),
                         comparison = comparison,
                         grid_distance = 0.106,
                         initial_magnitude_limit = 0.2,
                         model_type = model_type)
        population.input_initial_parameter(parameter = 'F_damp', value = parameters[0], ID = 0, behavior = 'uniform')
        population.input_initial_parameter(parameter = 'F_b_best', value = parameters[1], ID = 0, behavior = 'uniform')
        population.input_initial_parameter(parameter = 'F_h_best', value = parameters[2], ID = 0, behavior = 'uniform')
        population.input_initial_parameter(parameter = 'F_g_best', value = parameters[3], ID = 0, behavior = 'uniform')
        population.input_initial_parameter(parameter = 'F_b_worst', value = parameters[4], ID = 0, behavior = 'uniform')
        population.input_initial_parameter(parameter = 'F_h_worst', value = parameters[5], ID = 0, behavior = 'uniform')
        population.input_initial_parameter(parameter = 'F_g_worst', value = parameters[6], ID = 0, behavior = 'uniform')
        population.input_initial_parameter(parameter = 'mut_prob', value = parameters[7], ID = 0, behavior = 'uniform')
        population.input_initial_parameter(parameter = 'flip_prob', value = parameters[8], ID = 0, behavior = 'uniform')
        population.input_initial_parameter(parameter = 'mut_rate', value = parameters[9], ID = 0, behavior = 'uniform')
        population.input_initial_parameter(parameter = 'v_max', value = parameters[10], ID = 0, behavior = 'uniform')
        population.input_initial_parameter(parameter = 'v_min', value = parameters[11], ID = 0, behavior = 'uniform')
        
        for x in range(30):
            population.move()
        population.datalog.to_csv(name + str(i) + '.txt', sep='\t', index=False, mode='w')
    return

In [None]:
parameter_model1 = [0.67, 1.95, 1.4 , 0.12, 0.07, 0.31, 0.04, 0.32, 0.17, 0.43, 0.2, 0.071]
parameter_model2 = [0.43, 0.02, 0.07, 1.18, 1.97, 1.09, 0.22, 0.11, 0.13, 0.04, 0.2, 0.071]
parameter_model3 = [0.63, 1.11, 1.  , 1.76, 0.08, 0.11, 0.02, 0.13, 0.54, 0.19, 0.2, 0.071]
parameter_model4 = [0.74, 1.67, 1.76, 0.42, 0.02, 0.05, 0.01, 0.11, 0.82, 0.37, 0.2, 0.071]

In [None]:
create_PSO(parameter_model1, '../../raw_data/composition_vs_activity/model2/PSO/PSO_0_')
create_PSO(parameter_model2, '../../raw_data/composition_vs_activity/model2/PSO/PSO_1_')
create_PSO(parameter_model3, '../../raw_data/composition_vs_activity/model2/PSO/PSO_2_')
create_PSO(parameter_model4, '../../raw_data/composition_vs_activity/model2/PSO/PSO_3_')