# Env

#### Import Gym

In [None]:
#%pip install gymnasium

In [None]:
%pip install stable_baselines3

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np

# Reward-shaping parameters read by BattleshipEnv.step and BattleshipEnv._checkDistance.
PERSISTENCE_PENALTY = 0  # base reward for a shot on an unrevealed cell that misses
HIT_REWARD = 1  # base reward for a shot on an unrevealed cell that hits a ship
REPEATED_PENALTY = -0.2  # reward returned when the chosen cell was already revealed
RADIUS = 2  # Manhattan-distance threshold used by the proximity bonus
PROXIMAL_REWARD = 0.2  # added once for every recorded hit within RADIUS of the shot
SCORE_REWARD = 10  # bonus added on the step where every ship cell has been hit

In [None]:
class BattleshipEnv(gym.Env):
    """Single-agent Battleship: fire at grid cells until every ship cell is hit.

    Observation: a ``(board_size, board_size)`` int8 array where -1 is an
    unrevealed cell, 0 a miss and 1 a hit.
    Action: an integer in ``[0, board_size**2)``; it is decoded row-major into
    ``(row, col)`` with ``divmod(action, board_size)``.

    Ships are placed once, in ``__init__``. ``reset`` clears the shot board but
    keeps the same ship layout for the lifetime of the instance.
    """

    def __init__(self, board_size=10, ship_sizes=(5, 4, 3, 3, 2)):
        """Create the spaces, place the ships and start the first episode.

        Args:
            board_size: Side length of the square board.
            ship_sizes: Lengths of the ships to place (any iterable of ints).

        Raises:
            ValueError: If a ship is longer than the board, in which case
                random placement could never succeed.
        """
        super().__init__()
        self.board_size = board_size
        # Store a private copy so a caller-owned (or default) sequence is never shared.
        self.ship_sizes = list(ship_sizes)
        if any(size > board_size for size in self.ship_sizes):
            raise ValueError(
                f"ship sizes {self.ship_sizes} do not all fit on a board of size {board_size}"
            )
        self.observation_space = spaces.Box(low=-1, high=1, shape=(board_size, board_size), dtype=np.int8)
        self.action_space = spaces.Discrete(board_size * board_size)  # Attack grid positions
        self.seed_value = None

        self.current_step = 0
        self.hits = []
        # One entry is appended per reset() call: the step count of the episode that just ended.
        self.steps_taken = []

        # Initialize board and randomly place ships with dtype=int8 to match observation_space
        self.ship_board = np.zeros((self.board_size, self.board_size), dtype=np.int8)  # 0: water, 1: ship
        self._place_ships()
        self.reset()

    def reset(self, seed=None, options=None):
        """Start a new episode on the existing ship layout.

        Args:
            seed: Optional seed; forwarded to ``gym.Env.reset`` and to NumPy's
                global RNG.
            options: Unused.

        Returns:
            ``(observation, info)`` with an all ``-1`` board and an empty dict.
        """
        # Let the base class seed self.np_random, as the Gymnasium API expects.
        super().reset(seed=seed)
        self.seed_value = seed
        if seed is not None:
            np.random.seed(seed)
        # NOTE(review): ships were already placed in __init__, so this seed does
        # not influence the ship layout.

        self.board = np.full((self.board_size, self.board_size), -1, dtype=np.int8)  # -1: unknown, 0: miss, 1: hit
        # Record the length of the episode that just finished before clearing the counter.
        self.steps_taken.append(self.current_step)

        self.current_step = 0
        self.hits = []
        return self.board, {}

    def _place_ships(self):
        """Place every ship at a random free position, retrying until it fits."""
        for ship_size in self.ship_sizes:
            placed = False
            while not placed:
                row, col = np.random.randint(0, self.board_size), np.random.randint(0, self.board_size)
                orientation = np.random.choice(['horizontal', 'vertical'])
                if self._can_place_ship(row, col, ship_size, orientation):
                    self._place_ship(row, col, ship_size, orientation)
                    placed = True

    def _can_place_ship(self, row, col, ship_size, orientation):
        """Return whether a ship starting at (row, col) stays on the board and overlaps no ship."""
        if orientation == 'horizontal':
            if col + ship_size > self.board_size:
                return False
            return np.all(self.ship_board[row, col:col+ship_size] == 0)
        else:  # vertical
            if row + ship_size > self.board_size:
                return False
            return np.all(self.ship_board[row:row+ship_size, col] == 0)

    def _place_ship(self, row, col, ship_size, orientation):
        """Mark the ship's cells as 1 on ``ship_board`` (no validity check)."""
        if orientation == 'horizontal':
            self.ship_board[row, col:col+ship_size] = 1
        else:
            self.ship_board[row:row+ship_size, col] = 1

    def step(self, action):
        """Fire at the cell encoded by ``action``.

        Args:
            action: Flat cell index; may be a Python int or a NumPy scalar.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` is True once the number of recorded hits equals the
            total ship length; ``truncated`` is True once more than
            ``2 * board_size**2`` steps have been taken in the episode.
        """
        # Normalise NumPy scalars / 0-d arrays so row and col are plain ints.
        row, col = divmod(int(action), self.board_size)

        # Count every call, including repeated shots, so the truncation limit
        # below can actually be reached.
        self.current_step += 1
        truncated = self.current_step > 2 * self.board_size ** 2  # End if the game takes too long

        # Cell already revealed (miss or hit): penalise and leave the board untouched.
        if self.board[row, col] in (0, 1):
            terminated = len(self.hits) == sum(self.ship_sizes)  # Game over when all ships are hit
            return self.board, REPEATED_PENALTY, terminated, truncated, {}

        if self.ship_board[row, col] == 1:
            self.board[row, col] = 1  # Mark as hit
            reward = HIT_REWARD
            self.hits.append((row, col))
        else:
            self.board[row, col] = 0  # Mark as miss
            reward = PERSISTENCE_PENALTY

        # Proximity bonus. On a hit the shot's own cell is already in self.hits
        # (distance 0), so it is counted as well.
        reward += self._checkDistance((row, col), self.hits)

        terminated = len(self.hits) == sum(self.ship_sizes)  # Game over when all ships are hit
        if terminated:
            reward += SCORE_REWARD

        return self.board, reward, terminated, truncated, {}

    def render(self, mode='human'):
        """Print the current shot board to stdout."""
        print("Board:\n", self.board)

    def _checkDistance(self, p1, hits):
        """Return PROXIMAL_REWARD summed once per hit within RADIUS of ``p1``.

        Distance is the Manhattan distance between ``p1`` and each ``(row, col)``
        entry of ``hits``.
        """
        rewards = 0
        x1, y1 = p1
        for x2, y2 in hits:
            if abs(x2 - x1) + abs(y2 - y1) <= RADIUS:
                rewards += PROXIMAL_REWARD

        return rewards




# Model Training

#### Import stable_baselines3

In [None]:
import gym
from stable_baselines3 import DQN, PPO, A2C
from stable_baselines3.common.monitor import Monitor

from stable_baselines3.common.callbacks import BaseCallback

class RewardLoggerCallback(BaseCallback):
    """Accumulate per-episode reward totals while a model trains.

    Only index 0 of the ``rewards`` and ``dones`` entries in ``self.locals``
    is read. Completed totals are kept in ``episode_rewards`` and printed
    when training ends.
    """

    def __init__(self, verbose=0):
        super().__init__(verbose)
        self.episode_reward = 0  # running total for the episode in progress
        self.episode_rewards = []  # one total per finished episode

    def _on_step(self):
        """Add this step's reward; on episode end, store the total and restart."""
        step_reward = self.locals["rewards"][0]
        self.episode_reward += step_reward

        episode_over = self.locals["dones"][0]
        if not episode_over:
            return True

        self.episode_rewards.append(self.episode_reward)
        self.episode_reward = 0
        return True

    def _on_training_end(self):
        """Print every episode total collected during training."""
        print("Training finished.")
        print("Rewards per episode:", self.episode_rewards)

class StepLoggerCallback(BaseCallback):
    """Track the reward total and the step count of every training episode.

    Only index 0 of the ``rewards`` and ``dones`` entries in ``self.locals``
    is read. Results are kept in ``episode_rewards`` / ``episode_steps`` and
    printed when training ends.
    """

    def __init__(self, verbose=0):
        super().__init__(verbose)
        # Running values for the episode in progress.
        self.episode_reward = 0
        self.current_steps = 0
        # One entry per finished episode.
        self.episode_rewards = []
        self.episode_steps = []

    def _on_step(self):
        """Update the running totals; on episode end, store them and restart."""
        step_reward = self.locals["rewards"][0]
        self.episode_reward += step_reward
        self.current_steps += 1

        episode_over = self.locals["dones"][0]
        if not episode_over:
            return True

        self.episode_rewards.append(self.episode_reward)
        self.episode_steps.append(self.current_steps)
        self.episode_reward = 0
        self.current_steps = 0
        return True

    def _on_training_end(self):
        """Print the collected rewards and step counts."""
        print("Training finished.")
        print("Rewards per episode:", self.episode_rewards)
        print("Steps per episode:", self.episode_steps)



In [None]:
# Initialize the environment and wrap it in Monitor; the plotting cells below
# read the per-episode rewards and lengths back from this wrapper.
env = BattleshipEnv(board_size=10)
env = Monitor(env)

# Initialize the PPO model
model = PPO('MlpPolicy', env, verbose=0)  # 'MlpPolicy' uses a fully connected neural network by default

# NOTE: reward_callback is created but never passed to learn(); only
# step_callback is active during training.
reward_callback = RewardLoggerCallback(verbose=1)
step_callback = StepLoggerCallback(verbose=1)

# Train the model
model.learn(total_timesteps=10_000, callback=step_callback)  # You can adjust the number of timesteps

import matplotlib.pyplot as plt

# Plot the steps per episode as counted by StepLoggerCallback
plt.plot(step_callback.episode_steps)
plt.xlabel('Episode')
plt.ylabel('Steps Taken')
plt.title('Steps Taken per Episode During Training')
plt.show()

In [None]:
# Plot the rewards per episode recorded by the Monitor wrapper (PPO run)
plt.plot(env.get_episode_rewards())
plt.xlabel('Episode')
plt.ylabel('Rewards')
plt.title('Rewards Taken per Episode During Training')
plt.show()

In [None]:
# Plot the steps per episode recorded by the Monitor wrapper (PPO run)
plt.plot(env.get_episode_lengths())
plt.xlabel('Episode')
plt.ylabel('Steps Taken')
plt.title('Steps Taken per Episode During Training')
plt.show()

In [None]:

# Initialize a second, independent environment (its own random ship layout)
# and wrap it in Monitor so episode rewards/lengths can be plotted afterwards.
env2 = BattleshipEnv(board_size=10)
env2 = Monitor(env2)

# Initialize the DQN model
model2 = DQN('MlpPolicy', env2, verbose=0)  # 'MlpPolicy' uses a fully connected neural network by default

# Callbacks are disabled for this run; statistics come from Monitor instead.
# reward_callback = RewardLoggerCallback(verbose=0)
# step_callback = StepLoggerCallback(verbose=0)

# Train the model
model2.learn(total_timesteps=100_000)  # You can adjust the number of timesteps

In [None]:
# Plot the rewards per episode recorded by the Monitor wrapper (DQN run)
plt.plot(env2.get_episode_rewards())
plt.xlabel('Episode')
plt.ylabel('Rewards')
plt.title('Rewards Taken per Episode During Training')
plt.show()

In [None]:
# Plot the steps per episode recorded by the Monitor wrapper (DQN run)
plt.plot(env2.get_episode_lengths())
plt.xlabel('Episode')
plt.ylabel('Steps Taken')
plt.title('Steps Taken per Episode During Training')
plt.show()

In [None]:

# Initialize a third, independent environment (its own random ship layout)
# and wrap it in Monitor so episode rewards/lengths can be plotted afterwards.
env3 = BattleshipEnv(board_size=10)
env3 = Monitor(env3)

# Initialize the A2C model
model3 = A2C('MlpPolicy', env3, verbose=1)  # 'MlpPolicy' uses a fully connected neural network by default

# Callbacks are disabled for this run; statistics come from Monitor instead.
# reward_callback = RewardLoggerCallback(verbose=0)
# step_callback = StepLoggerCallback(verbose=0)

# Train the model
model3.learn(total_timesteps=100_000)  # You can adjust the number of timesteps

In [None]:
# Plot the rewards per episode recorded by the Monitor wrapper (A2C run)
plt.plot(env3.get_episode_rewards())
plt.xlabel('Episode')
plt.ylabel('Rewards')
plt.title('Rewards Taken per Episode During Training')
plt.show()

In [None]:
# Plot the steps per episode recorded by the Monitor wrapper (A2C run)
plt.plot(env3.get_episode_lengths())
plt.xlabel('Episode')
plt.ylabel('Steps Taken')
plt.title('Steps Taken per Episode During Training')
plt.show()

In [None]:
import matplotlib.pyplot as plt

def ShowGraphs(steps, rewards):
    """Draw two side-by-side line plots: steps per episode and rewards per episode.

    Args:
        steps: Sequence of per-episode step counts (left panel).
        rewards: Sequence of per-episode reward totals (right panel).
    """
    # One row, two columns; axes[0] is the left panel, axes[1] the right one.
    _figure, axes = plt.subplots(1, 2, figsize=(12, 5))

    panels = (
        (axes[0], steps, 'Steps Taken', 'Steps Taken per Episode During Evaluation'),
        (axes[1], rewards, 'Rewards', 'Rewards per Episode During Evaluation'),
    )
    for axis, series, y_label, heading in panels:
        axis.plot(series)
        axis.set_xlabel('Episode')
        axis.set_ylabel(y_label)
        axis.set_title(heading)

    # Keep the two panels from overlapping, then display them.
    plt.tight_layout()
    plt.show()


In [None]:
def evaluateModel(model, num_episodes, env, max_steps=None):
    """Run ``model`` on ``env`` for several episodes and plot the results.

    Args:
        model: Object exposing ``predict(obs)`` that returns ``(action, state)``.
        num_episodes: Number of episodes to play.
        env: Environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step(action)`` returns ``(obs, reward, terminated, truncated, info)``.
        max_steps: Optional cap on steps per episode. ``None`` (the default)
            means the episode only ends when the environment reports
            ``terminated`` or ``truncated``.

    Returns:
        ``(steps_ep, reward_ep)``: per-episode step counts and reward totals.
    """
    steps_ep = []
    reward_ep = []
    for _ in range(num_episodes):
        steps = 0
        t_reward = 0

        obs, _info = env.reset()
        while True:
            action, _states = model.predict(obs)
            obs, reward, terminated, truncated, _info = env.step(action)
            t_reward += reward
            # Count the step before testing for the end of the episode so the
            # final step is included in the total.
            steps += 1

            if terminated or truncated:
                break
            if max_steps is not None and steps >= max_steps:
                break

        steps_ep.append(steps)
        reward_ep.append(t_reward)

    ShowGraphs(steps_ep, reward_ep)
    return steps_ep, reward_ep


In [None]:
# Evaluate the PPO model for 100 episodes on a freshly constructed environment;
# the new instance places its own random ships, independent of the training env.
_, _ = evaluateModel(model, 100, BattleshipEnv(board_size=10))

In [None]:
# Evaluate the DQN model for 100 episodes on a freshly constructed environment;
# the new instance places its own random ships, independent of the training env.
_, _ = evaluateModel(model2, 100, BattleshipEnv(board_size=10))

In [None]:
# Evaluate the A2C model for 100 episodes on a freshly constructed environment;
# the new instance places its own random ships, independent of the training env.
_, _ = evaluateModel(model3, 100, BattleshipEnv(board_size=10))