In [1]:
import numpy as np
import matplotlib.pyplot as plt
import gym
from gym import error, spaces, utils
from gym.utils import seeding
from enum import Enum

class Plant:
    def __init__(self, species, maturity=110):
        self.species = species
        self.maturity = maturity         # consider 'days_to_maturity'
        self.age = 0
        
    def __repr__(self):
        return "{}".format(self.species)
    

class Field(gym.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self, size=5, sow_limit=200, season=120, calendar=0):
        # parameters for overall field character
        self.size = size
        self.sow_limit = sow_limit
        self.season = season
        self.calendar = calendar
        
        # constants for computing end-of-season reward---distances represent meters
        self.crowding_dist = .02
        self.maize_maize_dist = .1
        self.bean_support_dist = .1
        self.crowding_penalty = .1
        self.maize_maize_penalty = .9
        self.bean_support_bonus = .6
        
        # OpenAI action and observation space specifications
        self.action_space = spaces.Discrete(4)
        ### self.observation_space = spaces.???
        
        # field is initialized by calling reset()
        self.field = None
        
    def step(self, action):
        # sow plants (or wait) depending on actions chosen
        # action is an array of n choices; value of n specified in agent code sow_limit
        # could be cleaned up with plants as an enumeration?
        
                   
             ## this part of the code is a work in progress!!   
             ##------------------START OF WIP------------------------------------   
                    
             #declare a new variable that will be the result of the computer figuring out where we want to plant   
             #developed_coord = curr   
             #curr is the result from rollout   
           
             #planttypeTuple = ("Maize", "Bean", "Squash")   
                
             #coordTuple = curr.strip().split()    
                
             #self.field = np.append(self.field, [[self.size*(coordTuple[0]), self.size*(coordTuple[1]), self.size*(coordTuple[2]), Plant(planttypeTuple[0]), Plant(planttypeTuple[1], Plant(planttypeTuple[2]))])   ,
                
            
            ###Experiment: each choice should be represented as an array with 3 elements:
            ### plant choice, y coordinate, x coordinate (in that order).
            ### i.e. action should look like: [[choice1, y1, x1], [choice2, y2, x2]...]
               
        for choice in action:
            if choice[0] == 0:
                self.field = np.append(self.field, [[self.size * choice[1], 
                                                 self.size * choice[2], 
                                                 Plant('Maize')]], axis=0)
            elif choice[0] == 1:
                self.field = np.append(self.field, [[self.size * choice[1], 
                                                 self.size * choice[2], 
                                                 Plant('Bean')]], axis=0)
            elif choice[0] == 2:
                self.field = np.append(self.field, [[self.size * choice[1], 
                                                 self.size * choice[2], 
                                                 Plant('Squash')]], axis=0)
            # when choice == 3, nothing is done (agent waits)   

             ##--------------------------END OF WIP----------------------------------   
                                                    
     #         for choice in action:   
     #             if choice == 0:   
     #                 self.field = np.append(self.field, [[self.size * coordTuple,    
     #                                              self.size * coordTuple,    
     #                                              Plant('Maize')]], axis=0)   
        
     #             elif choice == 1:   
     #                 self.field = np.append(self.field, [[self.size * input(),    
     #                                              self.size * input(),    
     #                                              Plant('Bean')]], axis=0)   
     #             elif choice == 2:   ,
     #                 self.field = np.append(self.field, [[self.size * input(),    
     #                                              self.size * input(),    
     #                                              Plant('Squash')]], axis=0)   
            
        
        # increment timekeeping
        self.calendar +=1
        for plant in self.field:
            plant[2].age += 1
            
        done = self.calendar == self.season
            
        if not done:
            reward = 0
        else:
            reward = self.get_reward()
            
        return self.field, reward, done, {}
    
    def reset(self):
        # field is initialized with one random corn plant in order to make sowing (by np.append) work
        self.field = np.array([[self.size * np.random.random(), 
                                self.size * np.random.random(), 
                                Plant('Maize')]])
        # timekeeping is reset
        self.calendar = 0
        
    def render(self, mode='human'):
        # initialize plant type arrays so that pyplot won't break if any is empty
        maize = np.array([[None, None]])
        bean = np.array([[None, None]])
        squash = np.array([[None, None]])
        maize_imm = np.array([[None, None]])
        bean_imm = np.array([[None, None]])
        squash_imm = np.array([[None, None]])
        
        # replace initial arrays with coordinates for each plant type; imm are plants that haven't matured
        maize = np.array([row for row in self.field 
                             if row[2].__repr__() == 'Maize' and row[2].age >= row[2].maturity])
        bean = np.array([row for row in self.field 
                            if row[2].__repr__() == 'Bean' and row[2].age >= row[2].maturity])
        squash = np.array([row for row in self.field 
                              if row[2].__repr__() == 'Squash' and row[2].age >= row[2].maturity])
        maize_imm = np.array([row for row in self.field 
                             if row[2].__repr__() == 'Maize' and row[2].age < row[2].maturity])
        bean_imm = np.array([row for row in self.field 
                             if row[2].__repr__() == 'Bean' and row[2].age < row[2].maturity])
        squash_imm = np.array([row for row in self.field 
                             if row[2].__repr__() == 'Squash' and row[2].age < row[2].maturity])
        
        # plot the field---currently breaks if any plant type is absent
        plt.figure(figsize=(10, 10))
        plt.scatter(maize[:,0], maize[:,1], c='green', s=200, marker = 'o', alpha=.5, edgecolor='#303030')
        plt.scatter(bean[:,0], bean[:,1], c='brown', s=150, marker = 'o', alpha=.5, edgecolor='#303030')
        plt.scatter(squash[:,0], squash[:,1], c='orange', s=400, marker = 'o', alpha=.5, edgecolor='#303030')
        plt.scatter(maize_imm[:,0], maize_imm[:,1], c='green', s=200, marker = 'o', alpha=.1, edgecolor='#303030')
        plt.scatter(bean_imm[:,0], bean_imm[:,1], c='brown', s=200, marker = 'o', alpha=.1, edgecolor='#303030')
        plt.scatter(squash_imm[:,0], squash_imm[:,1], c='orange', s=200, marker = 'o', alpha=.1, edgecolor='#303030')

        plt.show()
        
        print("Total yield in Calories is {}.\n---\n".format(round(reward, 1)))
    
    def close(self):
        # unneeded right now? AFAICT this is only used to shut down realtime movie visualizations
        pass
    
    def get_reward(self):
        # array of plant coordinates for computing distances
        xy_array = np.array([[row[0], row[1]] for row in self.field])

        # distances[m,n] is distance from mth to nth plant in field
        distances = np.linalg.norm(xy_array - xy_array[:,None], axis=-1)
        
        reward = 0
        i = 0
        while i < len(self.field):
            if self.field[i,2].age < self.field[i,2].maturity:
                reward += 0
            elif self.field[i,2].__repr__() == 'Maize':
                cal = 1
                j = 0
                while j < len(distances[0]):
                    if (self.field[j,2].__repr__() == 'Bean' 
                            and distances[i,j] < self.bean_support_dist):
                        cal += self.bean_support_bonus
                    if (self.field[j,2].__repr__() == 'Maize' 
                            and i !=j 
                            and distances[i,j] < self.maize_maize_dist):
                        cal *= self.maize_maize_penalty
                    if 0 < distances[i,j] < self.crowding_dist:
                        cal *= self.crowding_penalty
                    j += 1
                reward += cal
            elif self.field[i,2].__repr__() == 'Bean':
                reward += .25
            elif self.field[i,2].__repr__() == 'Squash':
                reward += 3
            i += 1        
        return reward


In [2]:
class Path:

    def __init__(self, path, score):
        self.path = path
        self.score = score

    def get_score(self):
        return self.score
    
    def get_path(self):
        return self.path


In [3]:
import numpy as np
class GA_Agent:

    def __init__(self, generations, field, crossover, mutation, pop_size):
        self.generations = generations
        self.field = field
        self.cross = crossover
        self.mutation  = mutation
        self.population = self.generate_new_population(pop_size)

    ##Called with 16 paths
    def run_tournament(self, paths):
        winners = []
        size = len(paths)
        if size == 2:
            return paths
        group_a = paths[0:int(size/2)]
        group_b = paths[int(size/2):size]

        i  = 0

        while i < int(size/2):
            if group_a[i].score > group_a[i + 1].score:
                winners.append(group_a[i])
            else:
                winners.append(group_a[i+1])
            if group_b[i].score > group_b[i+1].score:
                winners.append(group_b[i])
            else:
                winners.append(group_b[i+1])
            i+=2
        return self.run_tournament(winners)
    
    ##Currently does cross = 0.5
    def crossover(self, parents):
        parent_a = parents[0]
        parent_b = parents[1]
        children = []
        new_child = np.empty((0,3))
        new_child_b = np.empty((0,3))

        for i in range(int(parent_a.path.shape[0])/2):
            new_child = np.append(new_child, parent_a.path[i])
            new_child_b = np.append(new_child, parent_b.path[i])

        for i in range(int(parent_a.path.shape[0])/2, int(parent_a.path.shape[0])):
            new_child = np.append(new_child, parent_b.path[i])
            new_child_b = np.append(new_child, parent_a.path[i])
            
        self.field.reset()
        new_child = np.reshape(new_child, (int(len(path)/3), 3))
        new_child_b = np.reshape(new_child_b, (int(len(path)/3), 3))

        for j in range(int(new_child.shape[0])):
            observation, reward, done, _ = render_field.step(population[j][i:i+10])
        to_append = Path(new_child, reward)

        for j in range(int(new_child.shape[0])):
            observation, reward, done, _ = render_field.step(population[j][i:i+10])
        to_append = Path(new_child, reward)
            

    def run_generation(self, num):
        for i in range(num):
            new_pop = []
            while not(len(new_pop) == len(self.population)):
                tournament_paths = []
                for j in range(16):
                    index = np.random.randint(100)
                    tournament_paths.append(self.population[index])
                best_parents = self.run_tournament(tournament_paths)
                new_children = self.crossover(best_parents)
                new_pop.append(new_children[:])
            self.population = new_pop
            
    def run_generations(self):
        self.run_generation(self.generations)

    def generate_new_population(self, size):
        pop = []
        for i in range(size):
            self.field.reset()
            path = np.empty((0,3))
            done = False
            while not(done):
                step = np.ones((10,3))

                for i in range(10):
                    step[i][0] = np.random.randint(4)
                    step[i][1] = np.random.randint(10)
                    step[i][2] = np.random.randint(10)
                observation, reward, done, _ = self.field.step(step)
                for i in range(10):
                    path = np.append(path, step[i])
                path = np.reshape(path, (int(len(path)/3), 3))
            ret = Path(path, reward)
            pop.append(ret)
        return pop

    def get_population(self):
        return self.population

In [50]:
##Population generation. 100 initial population


test_field = Field()
population = [] #List of numpy arrays (steps to take)



for i in range(100):
    test_field.reset()
    path = np.empty((0,3))
    done = False
    while not(done):
        step = np.ones((10,3))

        for i in range(10):
            step[i][0] = np.random.randint(4)
            step[i][1] = np.random.randint(10)
            step[i][2] = np.random.randint(10)
        observation, reward, done, _ = test_field.step(step)
        for i in range(10):
            path = np.append(path, step[i])
        path = np.reshape(path, (int(len(path)/3), 3))
    ret = Path(path, reward)
    population.append(ret)
print(len(population))


    


100


In [49]:
render_field = Field()
render_field.reset()

total_yield = 0

for j in range(100):
    render_field.reset()
    for i in range(int(population[0].shape[0]/10)):
        observation, reward, done, _ = render_field.step(population[j][i:i+10])
        total_yield += reward
    if j % 10 == 0:
        print("Processed",j)

print(total_yield/100)





Processed 0
Processed 10
Processed 20
Processed 30
Processed 40
Processed 50
Processed 60
Processed 70
Processed 80
Processed 90
145.1010359597972


In [21]:
observation, reward, done, _ = test_field.step(population[0])
print(done)

False
