<h1>Plant Placement RL (Deep Q Learning Example)</h1>

This is an example of using Tianshou's RL library with Deep Q-Learning, an off-policy RL technique, to train a very simplistic version of a plant placement model. There are 3 different plant types, and no plant characteristics have been encoded.

Install Necessary Libraries (please use Python 3.11)

1. gymnasium
2. numpy
3. torch
4. tianshou (latest version). Note: a plain `pip install tianshou` will not give you the latest version, so install it from the GitHub repository as shown below.

In [None]:
!pip install gymnasium numpy torch

In [None]:
pip install git+https://github.com/thu-ml/tianshou.git@master --upgrade

<h3>1. Import Necessary Libraries</h3>

In [None]:
import gymnasium as gym
import numpy as np
import torch
from tianshou.policy import DQNPolicy
from tianshou.utils.net.common import Net
from tianshou.data import Collector, VectorReplayBuffer, Batch
from tianshou.trainer import OffpolicyTrainer

<h3>2. Define Plant Env Class</h3>

In [None]:
class PlantEnvironment(gym.Env):
    """Toy plant-placement environment on a fixed 5x5 grid.

    Each cell of the grid holds 0 (empty) or a plant type from
    ``possible_plants``. An action selects both a cell and a plant type and
    is encoded as a single integer::

        action = (row * n_cols + col) * n_plant_types + (plant_type - 1)

    Observations are the grid flattened row by row into a 1-D integer array,
    which is what ``observation_space`` describes. An episode ends after
    ``max_steps`` calls to :meth:`step`.
    """

    def __init__(self):
        super().__init__()
        self.grid_size = (5, 5)  # (rows, cols)
        self.possible_plants = [1, 2, 3]  # different types of plants
        n_cells = self.grid_size[0] * self.grid_size[1]
        self.num_actions = n_cells * len(self.possible_plants)  # total action count
        self.max_steps = 10  # Maximum steps per episode
        self.current_step = 0  # Track the number of steps taken

        # Create the board up front so step()/render() do not fail with an
        # AttributeError when they are called before the first reset().
        self.state = np.zeros(self.grid_size, dtype=int)

        # reset() and step() return the grid flattened, so the declared
        # observation shape must be (rows * cols,) rather than (rows, cols).
        self.observation_space = gym.spaces.Box(
            low=0,
            high=max(self.possible_plants),
            shape=(n_cells,),
            dtype=int,
        )
        self.action_space = gym.spaces.Discrete(self.num_actions)

    def _get_obs(self):
        """Return a flattened copy of the grid (row-major order)."""
        return self.state.flatten()

    def reset(self, *, seed=None, options=None, **kwargs):
        """Clear the board and start a new episode.

        Args:
            seed: Optional seed, forwarded to ``gym.Env.reset`` so that
                ``self.np_random`` is seeded as the Gymnasium API expects.
            options: Accepted for Gymnasium API compatibility; unused.
            **kwargs: Any further keyword arguments are accepted and ignored.

        Returns:
            A ``(observation, info)`` tuple: the flattened empty grid and an
            empty info dict.
        """
        super().reset(seed=seed)
        self.state = np.zeros(self.grid_size, dtype=int)
        self.current_step = 0
        return self._get_obs(), {}

    def step(self, action):
        """Place one plant as described by ``action`` and score the move.

        Args:
            action: Integer action using the encoding described in the class
                docstring.

        Returns:
            ``(observation, reward, terminated, truncated, info)``. The
            reward is +1 plus the neighbor bonus/penalty from
            :meth:`calculate_spacing_reward` for a placement on an empty
            cell, or -1 if the cell is already occupied (the board is then
            left unchanged). ``terminated`` becomes True once ``max_steps``
            steps have been taken; ``truncated`` is always False.
        """
        # Decode the flat action into a cell (row x, column y) and a plant type.
        cell, plant_offset = divmod(action, len(self.possible_plants))
        x, y = divmod(cell, self.grid_size[1])
        plant_type = plant_offset + 1  # Plant type starts at 1

        reward = 0.0
        if self.state[x, y] == 0:  # empty cell: valid placement
            self.state[x, y] = plant_type
            reward += 1
            # Additional reward or penalty based on the neighboring plants
            reward += self.calculate_spacing_reward(x, y, plant_type)
        else:
            reward -= 1  # occupied cell: invalid placement

        self.current_step += 1
        done = self.current_step >= self.max_steps  # End episode after max_steps

        return self._get_obs(), reward, done, False, {}

    def calculate_spacing_reward(self, x, y, plant_type):
        """Score a placement at row ``x``, column ``y`` against its neighbors.

        Only the four orthogonally adjacent cells that lie inside the grid
        are considered. Each neighbor holding the same plant type costs 0.5;
        each neighbor holding a different plant adds 0.2; empty neighbors
        contribute nothing.

        Returns:
            The summed bonus/penalty.
        """
        reward = 0.0
        neighbors = [
            (x - 1, y), (x + 1, y),  # rows above and below
            (x, y - 1), (x, y + 1),  # columns to the left and right
        ]
        for nx, ny in neighbors:
            # Skip neighbors that fall outside the grid
            if not (0 <= nx < self.grid_size[0] and 0 <= ny < self.grid_size[1]):
                continue
            neighbor = self.state[nx, ny]
            if neighbor == plant_type:
                reward -= 0.5  # Penalize placing similar plants adjacent
            elif neighbor != 0:
                reward += 0.2  # Reward diversity in neighboring plants
        return reward

    def render(self):
        """Print the current grid so the placement can be inspected."""
        print(self.state)


<h3>3. Define Neural Network and RL Policy</h3>

In [None]:
# Problem dimensions: a 5x5 grid (flattened) with 3 plant types per cell.
grid_cells = 5 * 5
state_shape = (grid_cells,)  # one input per grid cell
num_actions = grid_cells * 3  # one output per (cell, plant type) pair

# Network that estimates a value for every action: two hidden layers of
# 64 units, run on the CPU.
net = Net(
    state_shape=state_shape,
    action_shape=num_actions,
    hidden_sizes=[64, 64],
    device='cpu',
)
optim = torch.optim.Adam(net.parameters(), lr=1e-3)

# DQN policy built on top of the network above.
discrete_actions = gym.spaces.Discrete(num_actions)
policy = DQNPolicy(
    model=net,
    optim=optim,
    action_space=discrete_actions,
    discount_factor=0.9,
    estimation_step=1,
    target_update_freq=100,
)

<h3>4. Create Environment and Buffers</h3>

In [None]:
# Separate environment instances for gathering training data and for evaluation
train_env, test_env = PlantEnvironment(), PlantEnvironment()

# Replay memory: room for 10000 transitions in a single sub-buffer
train_buffer = VectorReplayBuffer(total_size=10000, buffer_num=1)

# Collectors run the policy in an environment; only the training collector
# is given a buffer to store the experience it gathers.
train_collector = Collector(policy=policy, env=train_env, buffer=train_buffer)
test_collector = Collector(policy=policy, env=test_env)

<h3>5. Define Trainer and Training</h3>

In [None]:
# Hyperparameters of the off-policy training loop
trainer_settings = dict(
    max_epoch=10,          # number of training epochs
    step_per_epoch=1000,   # environment steps per epoch
    step_per_collect=10,   # environment steps gathered between updates
    episode_per_test=10,   # episodes played in each evaluation run
    batch_size=64,         # transitions sampled from the buffer per update
    update_per_step=0.1,   # updates per collected step (10 * 0.1 = 1 per collect)
)

trainer = OffpolicyTrainer(
    policy=policy,
    train_collector=train_collector,
    test_collector=test_collector,
    **trainer_settings,
)

# Run the full training loop and report the best evaluation reward
result = trainer.run()
print(f"Training finished! Best reward: {result.best_reward}")


<h3>6. Test</h3>

In [None]:
# Roll out one episode with the trained policy, printing the grid after each step.
state, _ = test_env.reset()
done = False

while not done:
    # The policy expects a batch, so wrap the single observation in a
    # Batch of size 1 (with an empty info dict) and take the first action.
    action = policy.forward(Batch(obs=[state], info={})).act[0]
    state, reward, terminated, truncated, _ = test_env.step(action)
    # A Gymnasium episode is over when it is either terminated or truncated;
    # checking only the first flag would loop forever on a truncated episode.
    done = terminated or truncated
    test_env.render()  # visualize grid