-
Notifications
You must be signed in to change notification settings - Fork 0
/
GA_data3.py
317 lines (250 loc) · 8.56 KB
/
GA_data3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
#!/usr/bin/python
#
# author: Alexander Collins
# note:
# This file is almost exactly the same as binary_GA.
# The re-used functions are duplicated here (rather than shared) so that
# the other data GA files don't get a mixed-up Solution() class
# returned.
#
# =======
# imports
# =======
import sys
import time
import random
import GA_csv
import GA_data1
# =======
# globals
# =======
# output options
OUT_DATA = True  # when True, write_data() prints per-generation statistics
CSV_NAME = None  # CSV target for per-generation statistics; None disables CSV output
# data set variables
data_file = "data_sets/data3.txt"
data_R_count = 2000 # number of rows read from data_file per fitness evaluation
data_c_size = 5 # number of leading fields of a data row compared against a rule's condition
# genetic algorithm variables
c_size = data_c_size # number of [low, high] ranges in a Rule's condition
R_count = 300 # number of rules per individual
generation_limit = 50 # generation number at which termination_criteria() reports True
P_size = 20 # size of population (of Solutions)
G_size = (c_size*2 + 1) * R_count # size of Solution's genome: two bounds per range, + 1 for output
C_rate = 0.9 # crossover rate (0.0 to 1.0) # NOTE: "typically 0.6 to 0.9"
M_rate = 0.07 # mutation rate (0.0 to 1.0) # NOTE: alternatives noted by the author: 1 / P_size or 1 / G_size
# =======
# classes
# =======
class Rule:
    """A single rule: a list of [low, high] ranges plus an output value."""

    def __init__(self):
        # Both attributes start as placeholders and are filled in by the
        # code that builds the rule.
        self.output = -1
        self.condition = []

    def __str__(self):
        """Return the condition and the output separated by a tab."""
        return "\t".join((str(self.condition), str(self.output)))

    def condition_matches(self, target):
        """Return True when `target` falls inside the rule's ranges.

        Every range in `self.condition` is checked (inclusive bounds)
        against the value at the same position in `target`, converted
        with float().  The number of satisfied ranges must equal the
        module-level `data_c_size` for the rule to match.
        """
        # Deliberately not short-circuited: every position is converted
        # and tested, exactly as a plain counting loop would do.
        hits = sum(
            1
            for position, bounds in enumerate(self.condition)
            if bounds[0] <= float(target[position]) <= bounds[1]
        )
        return hits == data_c_size
class Solution:
    """An individual of the population: a flat genome and its decoded rules."""

    def __init__(self, genome):
        self.genome = genome
        self.fitness = -1  # placeholder until a fitness value is assigned
        self.rules = []

    def __str__(self):
        """Return the genome and the fitness joined by ' = '."""
        return ' = '.join((str(self.genome), str(self.fitness)))

    def build_rules(self, R_count, c_size):
        """Decode the genome into `R_count` Rule objects stored in self.rules.

        Each rule consumes `c_size` pairs of genes (ordered into
        [low, high] ranges) followed by one gene that is rounded to
        give the rule's output.
        """
        self.rules = []
        cursor = 0  # index of the next unread gene
        for _rule_no in range(R_count):
            rule = Rule()
            for _cond_no in range(c_size):
                first = self.genome[cursor]
                second = self.genome[cursor + 1]
                # store the pair as [smaller, larger]
                bounds = [first, second] if first < second else [second, first]
                rule.condition.append(bounds)
                cursor += 2
            rule.output = round(self.genome[cursor])
            cursor += 1
            self.rules.append(rule)
# =========
# functions
# =========
def fitness(individual):
    """Score `individual` against the data set and return the score.

    The individual's rules are rebuilt from its genome, then the first
    `data_R_count` lines of `data_file` are read.  For each line the
    first rule whose condition matches the leading `data_c_size` fields
    decides the row: the score goes up by one when that rule's output
    equals the integer in field `data_c_size + 1`.  The score is stored
    in `individual.fitness` and also returned.
    """
    individual.fitness = 0
    individual.build_rules(R_count, c_size)
    # The file is opened on every evaluation; the context manager makes
    # sure the handle is released each time instead of being leaked.
    with open(data_file) as data:
        # loop through each row in data_file
        for _ in range(data_R_count):
            data_rule = data.readline().rstrip('\n').split(" ")
            # loop through & compare each rule in individual.rules
            for rule in individual.rules:
                if rule.condition_matches(data_rule[:data_c_size]):
                    if rule.output == int(data_rule[data_c_size + 1]):
                        individual.fitness += 1
                    # NOTE(review): only the first matching rule is
                    # consulted for a row - confirm this is intended.
                    break
    return individual.fitness
def eval(population):
    """Evaluate every individual and return the best fitness found.

    fitness() is called once per individual (which also updates the
    individual's `fitness` attribute).  The result is never below 0,
    and is 0 for an empty population.
    """
    # NOTE: this name shadows the builtin eval() inside this module.
    scores = [fitness(member) for member in population]
    return max([0] + scores)
def termination_criteria(generation, population):
    """Evaluate the population, report it, and say whether to stop.

    Returns True when `generation` equals the module-level
    `generation_limit`, otherwise False.  Statistics are written via
    write_data() when stdout output or CSV output is enabled.
    """
    # evaluate first so every individual carries a fitness value
    eval(population)
    terminate = bool(generation == generation_limit)
    # write data
    wants_output = OUT_DATA is True or CSV_NAME is not None
    if wants_output:
        write_data(generation, population, OUT_DATA, CSV_NAME)
    return terminate
def write_data(generation, population, out_data, csv_name):
    """Report the best, average and worst fitness of `population`.

    generation -- generation number shown in the report
    population -- individuals exposing a `fitness` attribute
    out_data   -- the report is printed only when this is exactly True
    csv_name   -- when not None, a row is appended through GA_csv.write()
    """
    # collect the fitness values and derive the statistics
    scores = [member.fitness for member in population]
    fittest = max(scores)
    unfittest = min(scores)
    average = int(sum(scores) / len(population))
    # print population stats
    if out_data is True:
        report = (
            "GENERATION " + str(generation),
            "Fittest:\t" + str(fittest),
            "Average:\t" + str(average),
            "Unfittest:\t" + str(unfittest),
            "-------------------------",
        )
        for line in report:
            print(line)
    # write population fitness data to csv
    if csv_name is not None:
        GA_csv.write(csv_name, [generation, fittest, average, unfittest])
def run(generation_limit, P_size, G_size, C_rate, M_rate):
    """Run the genetic algorithm until termination_criteria() reports True.

    P_size -- number of individuals in the initial population
    G_size -- genome length, also the upper bound for crossover points
    C_rate -- crossover rate handed to single_crossover()
    M_rate -- mutation rate handed to mutate()

    NOTE(review): the `generation_limit` parameter is not read in this
    function; termination_criteria() consults the module-level value.
    """
    # initialisation
    random.seed(time.time())
    population = init(P_size, G_size)
    generation = 0
    # main loop: select -> crossover -> mutate -> elitism
    while True:
        if termination_criteria(generation, population) is not False:
            break
        selected = tournament_selection(population, 2)
        children = mutate(single_crossover(selected, C_rate, G_size), M_rate)
        # NOTE(review): the children have not been evaluated at this
        # point, so elitism() sees their initial fitness value.
        population = elitism(population, children)
        generation += 1
def init(population_size, genome_size):
    """Return `population_size` Solutions with random genomes.

    Every genome holds `genome_size` floats drawn with random.random().
    """
    return [
        Solution([random.random() for _gene in range(genome_size)])
        for _member in range(population_size)
    ]
def tournament_selection(population, p_count):
    """Pick one parent per population member by tournament.

    For each slot, `p_count` candidates are drawn at random (with
    replacement) and the one with the highest fitness wins; on a tie
    the candidate drawn first is kept.
    """
    winners = []
    for _slot in population:
        # select candidates
        entrants = [random.choice(population) for _draw in range(p_count)]
        # tournament: the first entrant holds the title until beaten
        champion = entrants[0]
        for challenger in entrants[1:]:
            if challenger.fitness > champion.fitness:
                champion = challenger
        winners.append(champion)
    return winners
def roulette_selection(population):
    """Pick one parent per population member by fitness-proportional draw.

    Each individual's chance of being selected is its fitness divided
    by the population's total fitness, so individuals with zero fitness
    are never chosen while any other individual has a positive score.
    When the total fitness is not positive there is nothing to weight
    by, and parents are drawn uniformly at random instead.

    Fitness values are expected to be integers (random.randint is used
    for the draw).  Returns a list with len(population) parents.
    """
    # The total is the same for every spin, so compute it once.
    total = sum(member.fitness for member in population)
    parents = []
    for _ in population:
        if total <= 0:
            # empty wheel: fall back to a uniform pick
            parents.append(random.choice(population))
            continue
        # Tickets run from 1..total so that each individual owns exactly
        # `fitness` tickets; starting at 0 would hand the first
        # individual an extra ticket even when its fitness is zero.
        ticket = random.randint(1, total)
        # spin wheel: walk the cumulative fitness until the ticket is hit
        running = 0
        for member in population:
            running += member.fitness
            if running >= ticket:
                parents.append(member)
                break
    return parents
def single_crossover(population, crossover_rate, genome_size):
    """Produce two children per population member by one-point crossover.

    For every slot two parents are drawn at random from `population`.
    With probability `crossover_rate` their genomes are swapped after a
    random split point in [0, genome_size]; otherwise the children
    reuse the parents' genome lists unchanged.  The returned list holds
    twice as many Solutions as `population`.
    """
    children = []
    for _slot in population:
        # pick two random parents
        mother = random.choice(population)
        father = random.choice(population)
        if random.random() <= crossover_rate:
            cut = random.randint(0, genome_size)
            genomes = (
                mother.genome[:cut] + father.genome[cut:genome_size],
                father.genome[:cut] + mother.genome[cut:genome_size],
            )
        else:
            # no crossover: the genome lists are passed on as-is
            genomes = (mother.genome, father.genome)
        for genome in genomes:
            children.append(Solution(genome))
    return children
def mutate(population, mutation_rate):
    """Return new Solutions whose genes have been randomly perturbed.

    Each gene is, with probability `mutation_rate`, shifted by a value
    drawn uniformly from [-0.9, 0.9]; other genes are copied unchanged.
    The input individuals are not modified.
    """
    mutants = []
    for parent in population:
        # the rate test is drawn first, the offset only when it passes
        genes = [
            gene + random.uniform(-0.9, 0.9)
            if random.random() <= mutation_rate
            else gene
            for gene in parent.genome
        ]
        mutants.append(Solution(genes))
    return mutants
def elitism(old_population, new_population):
    """Build the next generation, carrying over the old champion.

    The result has len(old_population) fresh Solutions taken from the
    same positions of `new_population`, except that the least fit of
    those is replaced by the fittest of `old_population` when the
    latter has a strictly higher fitness.
    """
    slots = range(len(old_population))
    # index of the fittest in old_population (first one on ties)
    o_best = max(slots, key=lambda k: old_population[k].fitness, default=0)
    # index of the least fit in new_population (first one on ties)
    n_worst = min(slots, key=lambda k: new_population[k].fitness, default=0)
    # replace least fit in new_population with fittest in old_population
    survivors = []
    for k in slots:
        source = new_population[k]
        if k == n_worst and old_population[o_best].fitness > source.fitness:
            source = old_population[o_best]
        survivors.append(Solution(source.genome))
    return survivors
# ===========
# entry point
# ===========
if __name__ == '__main__':
    # An optional first command-line argument names the CSV file that
    # receives the per-generation statistics.
    if len(sys.argv) > 1:
        CSV_NAME = sys.argv[1]
        GA_csv.init(CSV_NAME)
    run(generation_limit, P_size, G_size, C_rate, M_rate)
def main(argv=''):
    """Run the algorithm with the module's settings.

    argv -- name of the CSV file for per-generation statistics; the
            default empty string leaves CSV_NAME untouched.
    """
    global CSV_NAME
    csv_requested = argv != ''
    if csv_requested:
        CSV_NAME = argv
        GA_csv.init(CSV_NAME)
    run(generation_limit, P_size, G_size, C_rate, M_rate)