# Sembradora 3000

This notebook uses the [agentpy](https://agentpy.readthedocs.io) package to simulate seed-planting tractors on a grid with obstacles, trains them with tabular Q-learning, and renders the first and last training episodes as animated GIFs.

In [None]:
# Model design
import agentpy as ap
import random
import numpy as np
from collections import namedtuple, deque
from queue import PriorityQueue
from itertools import count
import math

# Visualization
import imageio
import imageio.v2 as imageio
import matplotlib
import matplotlib.pyplot as plt 
import matplotlib.colors as mcolors
import matplotlib.image as mpimg
import matplotlib.animation as animation
import seaborn as sns
from IPython.display import HTML

# Guardar
import pickle

# Sistema
import os

## Grid

In [None]:

"""
0 is empty
1 is tractor
2 is obstacle
3 is target
4 is seeds
"""

def is_connected(grid, free_positions):
    """Check whether all free cells form a single 4-connected region.

    A breadth-first search starts from an arbitrary free cell and moves only
    up, down, left and right. The free cells are connected when the search
    reaches every one of them.

    Args:
        grid: Object exposing ``shape``. Only ``shape[0]`` is read, and it is
            used as the upper bound for both coordinates.
        free_positions: Collection of ``(x, y)`` tuples marking the free
            cells. It is copied, so the caller's collection is not modified.

    Returns:
        True if every free cell can be reached from the starting cell (an
        empty collection counts as connected), False otherwise.
    """
    # Work on a private copy: popping from the argument would silently remove
    # one cell from the caller's set.
    free_cells = set(free_positions)
    if not free_cells:
        return True

    n = grid.shape[0]
    start = next(iter(free_cells))
    visited = {start}
    queue = deque([start])

    directions = [(-1, 0), (1, 0), (0, -1), (0, 1)]

    while queue:
        x, y = queue.popleft()
        for dx, dy in directions:
            neighbour = (x + dx, y + dy)
            if (0 <= neighbour[0] < n and 0 <= neighbour[1] < n
                    and neighbour in free_cells and neighbour not in visited):
                visited.add(neighbour)
                queue.append(neighbour)

    return len(visited) == len(free_cells)

def is_adjacent(pos1, pos2):
    """Return True when two cells coincide or touch, diagonals included."""
    first_gap = abs(pos1[0] - pos2[0])
    second_gap = abs(pos1[1] - pos2[1])
    # Both gaps are at most 1 exactly when the larger of the two is.
    return max(first_gap, second_gap) <= 1

def generate_grid(model, n, obstacles_count, crops_count):
    """Build the grid, place obstacles and crops, and attach it to the model.

    Obstacles are drawn at random, skipping any cell that is adjacent to (or
    on) a position in ``model.p.seedsPositions``. The draw is repeated with a
    fresh grid until ``is_connected`` accepts the remaining free cells. Crops
    are then drawn at random among the non-obstacle cells.

    Side effects: writes 2 (obstacle) and 3 (crop) into ``model.np_grid``,
    adds one agent per obstacle to the grid, and sets ``model.grid``.

    Args:
        model: Model owning the grid; must already have ``np_grid`` and
            ``p.seedsPositions``.
        n: Side length of the square grid.
        obstacles_count: Number of obstacle cells to place.
        crops_count: Number of crop cells to place.

    Returns:
        Tuple ``(obstacle_positions, crops_positions)`` of sets of ``(x, y)``.

    Raises:
        ValueError: If ``crops_count`` exceeds the number of non-obstacle
            cells, which would otherwise make the crop loop run forever.
    """
    if crops_count > n * n - obstacles_count:
        raise ValueError(
            f"crops_count={crops_count} does not fit in the "
            f"{n * n - obstacles_count} cells left free by the obstacles"
        )

    seed_positions = model.p.seedsPositions

    while True:
        grid = ap.Grid(model, (n, n), track_empty=True)
        grid.add_field("occupied", 0)

        obstacle_positions = set()
        while len(obstacle_positions) < obstacles_count:
            pos = (random.randint(0, n-1), random.randint(0, n-1))
            # Keep the surroundings of every seed cell free of obstacles.
            if any(is_adjacent(pos, seed_pos) for seed_pos in seed_positions):
                continue
            obstacle_positions.add(pos)

        for pos in obstacle_positions:
            grid["occupied"][pos] = 1

        free_positions = {(x, y) for x in range(n) for y in range(n) if grid["occupied"][x, y] == 0}

        if is_connected(grid, free_positions):
            break

    # The accepted grid already has its "occupied" field set; only the
    # model's numeric grid still needs the obstacle markers.
    for pos in obstacle_positions:
        model.np_grid[pos] = 2

    # Crops are drawn once, after all obstacles are in place, so the returned
    # set matches exactly the cells marked 3 in model.np_grid.
    crops_positions = set()
    while len(crops_positions) < crops_count:
        pos = (random.randint(0, n-1), random.randint(0, n-1))
        if pos not in obstacle_positions:
            crops_positions.add(pos)
            model.np_grid[pos] = 3

    agentlist = ap.AgentList(model, len(obstacle_positions), agent_type=2)
    grid.add_agents(agentlist, obstacle_positions)

    model.grid = grid

    return obstacle_positions, crops_positions

## Agente

In [None]:
class CollectingTractor(ap.Agent):
    """Grid tractor that chooses its actions with epsilon-greedy Q-learning.

    The Q-table is indexed as ``q_table[x, y, action]`` and has six actions:
    0 up, 1 down, 2 left, 3 right, 4 drop seeds, 5 collect seeds.

    ``capacity`` is read by :meth:`collect` but never assigned in this class;
    it has to be set on the agent from outside before ``collect`` is used.
    """

    def setup(self):
        """Initialize agent parameters."""
        self.initialized = False
        self.reset()

    def reset(self):
        """Run first-time initialization, or restore the state on later calls."""
        if not self.initialized:
            self._initialize_agent()
        else:
            self._restore_agent()

    def _reset_state(self):
        """Assign every attribute shared by first-time setup and restore."""
        self.collected = 0
        self.destroyed = False
        self.condition = True
        self.seeds = 0
        self.type = 1
        self.pos = (0, 0)
        self.visited_positions = set()

        # Q-learning parameters: one row of 6 action values per grid cell.
        self.q_table = np.zeros((self.p.grid_size, self.p.grid_size, 6))
        self.learning_rate = self.p.learning_rate
        self.discount_factor = self.p.discount_factor
        self.epsilon = self.p.epsilon

    def _initialize_agent(self):
        """Initialize the agent's initial state."""
        self._reset_state()
        self.initialized = True  # Later reset() calls take the restore path

    def _restore_agent(self):
        """Restore the agent's state to its initial conditions."""
        # NOTE(review): this also replaces the Q-table with zeros, so calling
        # reset() between training episodes would discard what was learned.
        # Confirm that is intended before calling it from the training loop.
        self._reset_state()
        self.targetIndex = 1

    def move(self, dx=0, dy=0):
        """Move the tractor by (dx, dy) if the destination is allowed.

        The move is rejected when the destination lies outside the grid or
        holds an obstacle (value 2 in ``model.np_grid``).

        Returns:
            Tuple ``(pos, moved)`` with the position after the attempt and
            whether the tractor actually moved.
        """
        x, y = self.pos
        new_pos = (x + dx, y + dy)
        size = self.p.grid_size

        in_bounds = 0 <= new_pos[0] < size and 0 <= new_pos[1] < size
        if in_bounds and self.model.np_grid[new_pos] != 2:
            self.pos = new_pos
            return self.pos, True
        return self.pos, False

    def move_up(self):
        """Increase the second coordinate by one."""
        return self.move(dy=1)

    def move_down(self):
        """Decrease the second coordinate by one."""
        return self.move(dy=-1)

    def move_left(self):
        """Decrease the first coordinate by one."""
        return self.move(dx=-1)

    def move_right(self):
        """Increase the first coordinate by one."""
        return self.move(dx=1)

    def drop(self):
        """Drop one seed on the current cell if it is a target (value 3).

        Returns:
            Tuple ``(pos, dropped)``; on success the cell is cleared to 0.
        """
        # NOTE(review): `collected` is never incremented in this class, so it
        # stays at 0 - confirm whether a successful drop should count.
        if self.seeds > 0 and self.pos and self.model.np_grid[self.pos] == 3:
            self.seeds -= 1
            # Remove target from grid
            self.model.np_grid[self.pos] = 0
            return self.pos, True
        return self.pos, False

    def collect(self):
        """Refill seeds up to ``capacity`` when standing on a seed cell (value 4).

        Returns:
            Tuple ``(pos, code)`` where code is 1 for a refill from empty,
            2 for a refill from a partial load, and 3 when nothing happened.
        """
        if self.model.np_grid[self.pos] != 4:
            return self.pos, 3
        if self.seeds == 0:
            self.seeds = self.capacity
            return self.pos, 1
        if 0 < self.seeds < self.capacity:
            self.seeds = self.capacity
            return self.pos, 2
        return self.pos, 3

    def get_reward(self, action, agents, accionado):
        """Calculate the reward for an action and its outcome.

        Args:
            action: Action index (0-3 move, 4 drop, 5 collect).
            agents: Iterable of all tractors, used to detect shared cells.
            accionado: Outcome returned by the action: a bool for moves and
                drops, or the 1/2/3 code returned by ``collect``.

        Returns:
            The numeric reward. As a side effect the current position is
            added to ``visited_positions``.
        """
        reward = 0  # No base penalty for movement

        if action in [0, 1, 2, 3]:  # Movement actions
            # Small reward for a valid move, strong penalty for a blocked one
            reward += 1 if accionado else -10
        elif action == 4:  # Drop seeds
            reward += 25 if accionado else -1
        elif action == 5:  # Collect seeds
            if accionado == 1:
                reward += 20
            elif accionado == 2:
                reward += 10
            else:
                reward -= 1

        # Penalty for sharing a cell with any other agent
        for agent in agents:
            if agent != self and agent.pos == self.pos:
                reward -= 10

        # Penalty for standing on an obstacle cell
        if self.model.np_grid[self.pos] == 2:
            reward -= 10

        # Reward for exploring new positions
        if self.pos not in self.visited_positions:
            reward += 1
            self.visited_positions.add(self.pos)

        return reward

    def q_learning_update(self, state, action, reward, next_state):
        """Apply the Q-learning update for one transition.

        Computes ``Q += lr * (reward + gamma * max(Q[next]) - Q)`` for the
        given state and action. Nothing is updated when either state lies
        outside the grid.
        """
        x, y = state
        next_x, next_y = next_state
        size = self.p.grid_size

        if not (0 <= x < size and 0 <= y < size
                and 0 <= next_x < size and 0 <= next_y < size):
            return

        current_q = self.q_table[x, y, action]
        max_next_q = np.max(self.q_table[next_x, next_y])
        td_target = reward + self.discount_factor * max_next_q
        self.q_table[x, y, action] = current_q + self.learning_rate * (td_target - current_q)

    def acciones(self, action):
        """Run the method mapped to ``action`` and return its result."""
        action_map = {
            0: self.move_up,
            1: self.move_down,
            2: self.move_left,
            3: self.move_right,
            4: self.drop,
            5: self.collect
        }
        return action_map[action]()

    def step(self, agents):
        """Choose an action, execute it, and learn from the outcome.

        Args:
            agents: Iterable of all tractors, forwarded to ``get_reward``.

        Returns:
            The reward obtained in this step. A destroyed tractor does
            nothing and returns 0, so callers can always sum the result.
        """
        if self.destroyed:
            return 0

        state = self.pos
        x, y = state

        # Action selection using epsilon-greedy
        if np.random.rand() < self.epsilon:
            action = np.random.choice([0, 1, 2, 3, 4, 5])
        else:
            action = np.argmax(self.q_table[x, y])

        next_state, accionado = self.acciones(action)

        # Default to 0 so `reward` is always bound, even if the resulting
        # state were outside the grid and no learning update took place.
        reward = 0
        size = self.p.grid_size
        if 0 <= next_state[0] < size and 0 <= next_state[1] < size:
            reward = self.get_reward(action, agents, accionado)
            self.q_learning_update(state, action, reward, next_state)

        return reward

## Ambiente

In [None]:
class TractorModel(ap.Model):
    """Environment holding the grid, the tractors and the episode counters.

    ``np_grid`` stores one number per cell: 0 empty, 2 obstacle, 3 crop
    target, 4 seed pickup. The obstacle and crop layout is generated once and
    re-applied on every later ``reset()``.
    """

    def setup(self):
        """Initialize the environment and generate coordinates."""
        self.initialized = False
        self.reset()

    def reset(self):
        """Build the environment on the first call, restore it afterwards."""
        if not self.initialized:
            self._initialize_environment()
        else:
            self._restore_environment()

        self.steps = 0  # Reset the step counter

    def _reset_counters_and_grid(self):
        """Zero the counters and start from an all-empty numeric grid."""
        self.Collected = 0
        self.steps = 0
        self.max_steps = self.p.max_steps
        self.np_grid = np.zeros((self.p.grid_size, self.p.grid_size))

    def _reset_tractor_attributes(self):
        """Give every tractor its configured capacity and starting seeds."""
        self.agents.capacity = self.p.capacity
        self.agents.seeds = self.p.starting_seeds
        self.agents.destroyed = False

    def _place_seeds(self):
        """Mark every configured seed pickup cell with 4."""
        for seed in self.p.seedsPositions:
            self.np_grid[seed] = 4

    def _initialize_environment(self):
        """Generate the layout and create the tractors (first reset only)."""
        self._reset_counters_and_grid()

        # generate_grid writes obstacles and crops into np_grid and sets self.grid
        obstacle_pos, crop_pos = generate_grid(self, self.p.grid_size, self.p.obstacles_count, self.p.crops_count)
        self.obstacle_positions = np.copy(self.grid["occupied"])
        self.crops_pos = crop_pos

        # Create agents and assign initial properties
        self.agents = ap.AgentList(self, self.p.number_of_tractors, CollectingTractor)
        self._initialize_agents(obstacle_pos, crop_pos)

        self.grid.add_agents(self.agents, [tractor.pos for tractor in self.agents])

        # Seeds go last so they overwrite any crop drawn on a seed cell
        self._place_seeds()

        self.initialized = True  # Mark as initialized

    def _restore_environment(self):
        """Re-apply the stored obstacle, crop and seed layout."""
        self._reset_counters_and_grid()

        # Restore obstacle positions
        self.np_grid[self.obstacle_positions == 1] = 2

        # Restore crop positions
        for pos in self.crops_pos:
            self.np_grid[pos] = 3

        # NOTE(review): only capacity, seeds and destroyed are reset here.
        # Each tractor keeps its pos and visited_positions from the previous
        # episode, and the tractors are added to the grid again on every
        # restore - confirm both are intended.
        self._reset_tractor_attributes()
        self.grid.add_agents(self.agents, [tractor.pos for tractor in self.agents])

        self._place_seeds()

    def _initialize_agents(self, obstacle_pos, crop_pos):
        """Set the tractors' starting attributes and record used coordinates."""
        self._reset_tractor_attributes()

        self.coordsUsed = set()
        self.coordsUsed.update(obstacle_pos)
        self.coordsUsed.update(crop_pos)
        self.targets_by_tractor = []

    def _get_free_position(self):
        """Draw random cells until one is unoccupied, unused and not (0, 0)."""
        while True:
            x, y = random.randint(0, self.p.grid_size - 1), random.randint(0, self.p.grid_size - 1)
            if self.grid["occupied"][(x, y)] == 0 and (x, y) not in self.coordsUsed and (x, y) != (0, 0):
                return (x, y)

    def step(self):
        """Advance every tractor by one step.

        Once ``steps`` has reached ``max_steps`` all tractors are marked
        destroyed and ``end()`` is called.

        Returns:
            Sum of the rewards returned by the tractors in this step.
        """
        total_reward = 0
        for tractor in self.agents:
            reward = tractor.step(self.agents)
            # A destroyed tractor may return None; skip it instead of
            # failing on `int + None`.
            if reward is not None:
                total_reward += reward
        if self.steps >= self.max_steps:
            for tractor in self.agents:
                tractor.destroyed = True
            self.end()
        self.steps += 1
        return total_reward

    def end(self):
        """Report each tractor's ``collected`` counter as 'Total targets'."""
        self.report('Total targets', self.agents.collected)


## Animation

In [None]:
"""
0 is empty
1 is tractor
2 is obstacle
3 is target
4 is seeds
"""

# Read the sprite for each kind of cell once, in a fixed order that matches
# the names on the left-hand side.
tractor_img, obstacle_img, target_img, seeds_img, empty_img = [
    mpimg.imread(sprite_file)
    for sprite_file in (
        'tractor.png',
        'obstacle.png',
        'target.png',
        'seeds.png',
        'empty.png',
    )
]

def save_frame(model, filename, total_reward):
    """Render the current model state to an image file.

    Every cell of ``model.np_grid`` is drawn with the sprite for its value
    (2 obstacle, 3 target, 4 seeds, anything else empty). Each tractor that
    is not destroyed is drawn on top with its seed count. The first grid
    index is plotted on the vertical axis and the second on the horizontal.

    Args:
        model: Model exposing ``np_grid``, ``agents`` and ``steps``.
        filename: Path of the image to write.
        total_reward: Value shown in the plot title.
    """
    cell_images = {2: obstacle_img, 3: target_img, 4: seeds_img}

    fig, ax = plt.subplots()
    try:
        # Plot the grid using images
        for (x, y), value in np.ndenumerate(model.np_grid):
            ax.imshow(cell_images.get(value, empty_img),
                      extent=[y, y+1, x, x+1], aspect='auto')

        # Draw each live tractor with the number of seeds it carries
        for agent in model.agents:
            if agent.destroyed:
                continue
            row, col = agent.pos[0], agent.pos[1]
            ax.imshow(tractor_img, extent=[col, col+1, row, row+1], aspect='auto')
            ax.text(col + 0.5, row + 0.5, str(agent.seeds),
                    color='black', fontsize=12, ha='center', va='center', weight='bold')

        # imshow changes the limits on every call, so fix them afterwards
        ax.set_xlim([0, model.np_grid.shape[1]])
        ax.set_ylim([0, model.np_grid.shape[0]])

        # Set aspect ratio to 'equal' to prevent image stretching
        ax.set_aspect('equal')

        ax.set_title(f"Tractor model \n Time-step: {model.steps} - Total reward: {total_reward}")

        fig.savefig(filename)
    finally:
        # Close this figure even when drawing or saving fails, so repeated
        # calls do not accumulate open figures.
        plt.close(fig)

def create_gif(filenames, gif_filename):
    """Assemble the given image files, in order, into one animated GIF."""
    gif_writer = imageio.get_writer(gif_filename, mode='I', duration=0.1)
    with gif_writer as writer:
        for frame_path in filenames:
            # Each frame is read from disk and appended straight away.
            writer.append_data(imageio.imread(frame_path))
    
def delete_temp_frames(prefix):
    """Delete the files in the current directory whose name starts with ``prefix``.

    Only regular files are removed; directories are left alone. Files that
    merely contain ``prefix`` somewhere in the middle of their name are kept.

    Args:
        prefix: Non-empty start of the file names to delete.

    Raises:
        ValueError: If ``prefix`` is empty, which would match every file.
    """
    if not prefix:
        raise ValueError("prefix must be a non-empty string")

    for filename in os.listdir():
        if filename.startswith(prefix) and os.path.isfile(filename):
            os.remove(filename)

## Parameters

In [None]:
def calculate_crops_count(grid_size, obstacles_count):
    """Return how many cells of a square grid are not taken by obstacles."""
    total_cells = grid_size ** 2
    return total_cells - obstacles_count

# Simulation and learning settings passed to the model.
tractorParameters = {
    'grid_size': 10,
    'obstacles_count': 10,
    'number_of_tractors': 2,
    'max_steps': 100,
    'seedsPositions': [(5, 5)],
    'capacity': 2,
    'starting_seeds': 2,
    'learning_rate': 0.9,
    'discount_factor': 0.99,
    'epsilon': 1.0,
}
# Every cell that is not an obstacle becomes a crop; the count is derived
# from the entries above so the two cannot drift apart.
tractorParameters['crops_count'] = calculate_crops_count(
    tractorParameters['grid_size'],
    tractorParameters['obstacles_count'],
)

## Machine Learning

In [None]:
# Training bookkeeping: the training loop appends one entry per episode.
rewards_per_episode = []
epsilon_values = []

# Build the model and run its setup explicitly; the episodes below drive it
# by calling reset() and step() directly.
model = TractorModel(tractorParameters)
model.setup()

# Number of training episodes; run_episode also reads it for its epsilon schedule.
num_episodes = 1000

def run_episode(episode, save_gif=False):
    """Run one training episode on the global ``model``.

    Epsilon decays linearly from 0.9 to 0.01 over the first half of
    ``num_episodes`` and stays at 0.01 afterwards. The same value is assigned
    to every tractor before the episode starts.

    Args:
        episode: Zero-based episode index, used for the epsilon schedule and
            to choose the GIF file name.
        save_gif: When True, a frame is saved after every step and the frames
            are combined into a GIF at the end of the episode.

    Returns:
        Tuple ``(total_reward, epsilon)`` with the reward summed over all
        steps of the episode and the epsilon used.
    """
    model.reset()
    total_reward = 0

    # Epsilon schedule based on training progress
    epsilon_start = 0.9   # Initial epsilon
    epsilon_end = 0.01    # Final epsilon
    epsilon_decay_rate = epsilon_start - epsilon_end
    epsilon = max(epsilon_end, epsilon_start - (epsilon_decay_rate * episode / (num_episodes / 2)))

    for tractor in model.agents:
        tractor.epsilon = epsilon

    filenames = []
    while model.steps < model.max_steps:
        # Accumulate: model.step() returns only the reward of that one step.
        total_reward += model.step()

        if all(tractor.destroyed for tractor in model.agents):
            break

        if save_gif:
            filename = f'frame_{model.steps}.png'
            save_frame(model, filename, total_reward)
            filenames.append(filename)

    if save_gif:
        gif_filename = '1rstSimulacionTractores.gif' if episode == 0 else 'lastSimulacionTractores.gif'
        create_gif(filenames, gif_filename)
        delete_temp_frames('frame')

    return total_reward, epsilon

for episode in range(num_episodes):
    is_last_episode = episode == num_episodes - 1

    # Only the first and the last episode are rendered to a GIF.
    save_gif = episode == 0 or is_last_episode
    total_reward, epsilon = run_episode(episode, save_gif)

    rewards_per_episode.append(total_reward)
    epsilon_values.append(epsilon)

    # Report progress every 10 episodes and once more at the very end.
    if is_last_episode or episode % 10 == 0:
        print(f'Episodio {episode + 1}/{num_episodes}, Recompensa total: {total_reward:.2f}, Epsilon: {epsilon:.2f}')


In [None]:
# Save a Q-table at the end of training.
# NOTE: each tractor allocates its own Q-table, so this stores only the table
# learned by the first tractor, not a table shared by all of them.
q_table = model.agents[0].q_table
with open('q_table.pkl', 'wb') as f:
    pickle.dump(q_table, f)

# Plot training progress: reward on the left, exploration rate on the right.
plt.figure(figsize=(14, 5))

plt.subplot(1, 2, 1)
plt.plot(rewards_per_episode, label='Total Reward per Episode')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Reward per Episode')
plt.legend()

# The figure is laid out as 1x2; the second panel shows the epsilon values
# recorded during training, which were collected but never displayed.
plt.subplot(1, 2, 2)
plt.plot(epsilon_values, label='Epsilon per Episode')
plt.xlabel('Episode')
plt.ylabel('Epsilon')
plt.title('Epsilon per Episode')
plt.legend()

plt.tight_layout()
plt.show()

## Final execution

In [None]:
'''
# Asignar la Q-table cargada a los agentes
model = TractorModel(tractorParameters)
model.setup()

for agent in model.agents:
    agent.q_table = q_table  # Asignar la Q-table cargada a cada agente

# Ahora puedes continuar entrenando o simular el comportamiento
model.reset()  # Resetear el ambiente si es necesario
model.step()  # Ejecutar pasos del modelo utilizando la Q-table cargada
'''
