# Star Wars Galaxy Explorers (Phase 1)

<div style="
    text-align: center;
    font-family: Arial, sans-serif;
    color: yellow;
    background: black;
    padding: 20px;
">
    <p style="font-size: 12px; margin: 10px 0;">You're a neutral planet explorer organisation from Coruscant in Star Wars,</p>
    <p style="font-size: 16px; margin: 10px 0;">looking to discover planets to work with in the relatively unexplored Outer Rim.</p>
    <p style="font-size: 20px; margin: 10px 0;">You send out drones connected to a hive mind,</p>
    <p style="font-size: 24px; margin: 10px 0;">which learn from their experiences and failures and report them to the hive mind.</p>
    <p style="font-size: 28px; margin: 10px 0;">The hive mind becomes more intelligent and better at exploring</p>
    <p style="font-size: 32px; margin: 10px 0;">and finding planets in the vastness of space,</p>
    <p style="font-size: 36px; margin: 10px 0;">allowing newer drones to find them quicker and quicker.</p>
</div>

In [1]:
# Import Gymnasium-related dependencies
import gymnasium as gym
from gymnasium import Env
from gymnasium.spaces import Discrete, Box, Dict, Tuple, MultiBinary, MultiDiscrete
from stable_baselines3.common.vec_env import VecNormalize, DummyVecEnv

# Import Stable Baselines3-related dependencies
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy

# Import pygame-related dependencies
import pygame

# Import helper dependencies
import numpy as np
import random
import os

## Build Outer Rim POMDP (Partially Observable Markov Decision Process) RL Environment

### What are the MDP components of the Outer Rim POMDP RL Environment?

**1. (Reality) State Space**
- all actual states of the Outer Rim POMDP RL Environment

| **Component**                       | **Type / Description**                             | **Purpose**                                                                         |
| ----------------------------------- | -------------------------------------------------- | ----------------------------------------------------------------------------------- |
| `map`                               | 2D grid consisting of different objects, denoted by characters (`' '`, `'#'`, `'.'`, `'S'`), where ' ' is empty space, '#' is a planet, '.' is a visited planet, 'S' is the starting position that the agent can see and remember | Represents environment layout and object locations (empty, planets, visited planets, start positions) |
| `state`                             | Tuple `(row, col)`                                 | Current agent location                                                              |
| `vision_radius`                     | Integer (`2` in the code below)                    | Sets the half-width of the square window of cells the agent can see at a time (radius 2 gives a 5x5 window), serves as the agent's 'vision' |
| `seen_map`                          | 2D binary matrix                                   | Tracks all cells the agent has ever seen, serves as the agent's 'memory' and is used to reward newly seen areas |
| `mission_time_before_self_destruct` | Integer countdown                                  | Terminates mission/episode when time runs out                                               |

<br>

**2. (Agent's) Observation Space**
- all observable states by the agent in the Outer Rim POMDP RL Environment

| **Component**                    | **Type / Description**                                 | **Purpose**                                                                         |
| -------------------------------- | ------------------------------------------------ | ----------------------------------------------------------------------------------- |
| `vision`                            | 2D integer matrix (5x5, values 0 to 3)             | The cells the agent can currently see around itself, encoded as integers, serves as the agent's 'vision' |
| `seen_memory`                       | 2D binary matrix (5x5)                             | For each cell in the vision window, whether the agent has seen it before, serves as the agent's 'memory' |
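
As a rough illustration of how such a local window can be cut out of the full grid, the sketch below uses plain Python lists and a hypothetical helper `local_window` (it is not part of the environment class). Cells outside the map are filled with 0, which is an assumption chosen to match the zero padding the environment uses.

```python
def local_window(grid, row, col, radius, fill=0):
    """Return the (2*radius+1) square window of `grid` centred on (row, col).

    Cells that fall outside the grid are set to `fill`.
    """
    num_rows, num_cols = len(grid), len(grid[0])
    window = []
    for dr in range(-radius, radius + 1):
        window_row = []
        for dc in range(-radius, radius + 1):
            rr, cc = row + dr, col + dc
            inside = 0 <= rr < num_rows and 0 <= cc < num_cols
            window_row.append(grid[rr][cc] if inside else fill)
        window.append(window_row)
    return window


# Hypothetical 4x4 grid: 1 marks a planet, 3 marks the start cell.
grid = [
    [0, 0, 0, 0],
    [0, 1, 0, 0],
    [0, 0, 0, 0],
    [0, 0, 0, 3],
]
window = local_window(grid, row=3, col=3, radius=1)
for line in window:
    print(line)
```

Because the agent sits in the bottom-right corner here, the last row and last column of the window lie outside the map and come back as the fill value.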

<br>

**3. Action Space**
- denoted by the type: Discrete(4)

| **Index** | **Action Name** | **Meaning**         |
| --------- | --------------- | ------------------- |
| 0         | Forward         | Forward by 1 pixel  |
| 1         | Backward        | Backward by 1 pixel |
| 2         | Leftward        | Leftward by 1 pixel |
| 3         | Rightward       | Rightward by 1 pixel |
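
The four actions can be read as row/column offsets on the grid. The sketch below is a minimal standalone version of that mapping; `ACTION_DELTAS` and `next_position` are hypothetical names used only for illustration, and the rule that a move off the map leaves the agent in place mirrors the validity check in the environment's `step()` function.

```python
# Row/column offsets for each action index. Row 0 is the top of the map,
# so "Forward" decreases the row index.
ACTION_DELTAS = {
    0: (-1, 0),  # Forward
    1: (1, 0),   # Backward
    2: (0, -1),  # Leftward
    3: (0, 1),   # Rightward
}


def next_position(position, action, num_rows, num_cols):
    """Deterministic transition: move one cell, or stay put at the map edge."""
    row, col = position
    d_row, d_col = ACTION_DELTAS[action]
    new_row, new_col = row + d_row, col + d_col
    if 0 <= new_row < num_rows and 0 <= new_col < num_cols:
        return (new_row, new_col)
    return position


start = (39, 21)  # start cell used by the environment's 40x40 map
print(next_position(start, 0, 40, 40))  # (38, 21)
print(next_position(start, 1, 40, 40))  # (39, 21): blocked by the bottom edge
```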

<br>

**4. Transition Probability**  
- Deterministic

<br>

**5. Reward Function**  
- see the 'Calculate Reward with Reward Function' section in the 'step()' function below
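
To get a feel for the size of the exploration term, here is a small worked example. It assumes the values used in `step()` (vision radius 2, +0.3 per newly seen cell, 40x40 map) and uses a hypothetical helper `reveal` that stores seen cells in a set instead of a NumPy array.

```python
def reveal(seen, row, col, radius, num_rows, num_cols):
    """Mark every in-bounds cell in the vision window as seen.

    `seen` is a set of (row, col) pairs. Returns how many cells were new.
    """
    new_cells = 0
    for rr in range(row - radius, row + radius + 1):
        for cc in range(col - radius, col + radius + 1):
            if 0 <= rr < num_rows and 0 <= cc < num_cols and (rr, cc) not in seen:
                seen.add((rr, cc))
                new_cells += 1
    return new_cells


REWARD_PER_NEW_CELL = 0.3  # value used in step()

seen = set()
# First step "Forward" from the start cell (39, 21) lands on (38, 21).
first = reveal(seen, 38, 21, radius=2, num_rows=40, num_cols=40)
# A second "Forward" step lands on (37, 21).
second = reveal(seen, 37, 21, radius=2, num_rows=40, num_cols=40)
print(first, f"{first * REWARD_PER_NEW_CELL:.1f}")
print(second, f"{second * REWARD_PER_NEW_CELL:.1f}")
```

The first step reveals 20 cells (one row of the 5x5 window is off the map), while the second reveals only the 5 cells of the newly uncovered row, so moving in a straight line through unexplored space earns about 1.5 exploration reward per step.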

In [None]:
class OuterRimEnv(Env):
    def __init__(self):
        # --- related to world's state spaces --------------------------------------------------------
        self.map = self.generate_map()
        self.num_rows, self.num_cols = self.map.shape
        self.start_position = tuple(np.argwhere(self.map == 'S')[0])
        self.state = self.start_position        # Initialising the initial state of the RL Environment
        

        # --- related to RL agent, observation spaces and action spaces ----------------------------------------------------
        self.action_space = Discrete(4)

        # Giving the RL agent, 'vision' around it
        # Vision radius: 2 → 5x5 grid (center + 2 squares in each direction)
        self.vision_radius = 2
        obs_height = obs_width = 2 * self.vision_radius + 1

        # Each cell is one of: empty space (' '), planet ('#'), visited planet ('.'), start ('S')
        # Map characters as integers → you can define a vocab for it
        self.char_to_int = {' ': 0, '#': 1, '.': 2, 'S': 3}

        # Observation: local vision grid (5x5) with integer values, plus a 5x5 binary 'seen' memory of the same cells
        self.observation_space = Dict({
            "vision": Box(low=0, high=3, shape=(obs_height, obs_width), dtype=np.uint8),
            "seen_memory": Box(low=0, high=1, shape=(obs_height, obs_width), dtype=np.uint8)
        })
        

        #  --- mission state -----------------------------------------------
        self.mission_time_before_self_destruct = 1000

        # To be used in reward function to give the RL agent a reward for seeing an unexplored pixel
        # for the first time. Creating a copy of the map to mark regions that are seen or not.
        self.seen_map = np.zeros((self.num_rows, self.num_cols), dtype=bool)


        #  --- pygame -----------------------------------------------
        pygame.init()
        self.cell_size = 20  # reduce cell size to fit 40x40 on screen
        self.screen = pygame.display.set_mode(
            (self.num_cols * self.cell_size, self.num_rows * self.cell_size)
        )
        pygame.display.set_caption("Star Wars Galaxy Explorer (Phase 1)")


    ####################
    # Helper functions #
    ####################
    def generate_map(self, rows=40, cols=40, num_planets=20):
        # Named `grid` rather than `map` to avoid shadowing Python's built-in map()
        grid = np.full((rows, cols), " ", dtype='<U1')
        start = (39, 21)   # Fixed start cell; assumes the default 40x40 map

        # Randomly choose N planet positions (excluding start), keeping the number of planets in each
        # episode constant
        available_positions = [(i, j) for i in range(rows) for j in range(cols) if (i, j) != start]
        planet_positions = random.sample(available_positions, num_planets)

        for i, j in planet_positions:
            grid[i, j] = '#'

        grid[start] = 'S'
        return grid

    def get_RL_agent_local_observation(self):
        r, c = self.state
        v = self.vision_radius
        size = 2*v + 1

        obs = np.zeros((size, size), dtype=np.uint8)
        mem = np.zeros_like(obs, dtype=np.uint8)

        for dr in range(-v, v + 1):
            for dc in range(-v, v + 1):
                    rr, cc = r + dr, c + dc
                    if 0 <= rr < self.num_rows and 0 <= cc < self.num_cols:
                        cell_char = self.map[rr, cc]
                        obs[dr + v, dc + v] = self.char_to_int.get(cell_char, 0)
                        mem[dr + v, dc + v] = int(self.seen_map[rr, cc])
                    else:
                        obs[dr + v, dc + v] = 0
                        mem[dr + v, dc + v] = 0

        return {
            "vision": obs,          # Current visual snapshot (local terrain)
            "seen_memory": mem,     # Agent's remembered "explored" map
        }

    def check_valid_position(self, position):
        row, col = position

        # If RL agent goes out of the map
        if row < 0 or col < 0 or row >= self.num_rows or col >= self.num_cols:
            return False
        
        return True


    ########################################################
    # Gymnasium and Stable Baselines3's required functions #
    ########################################################
    def step(self, action):
        # --- Decrease 'mission_time_before_self_destruct' time -------------------------------------
        self.mission_time_before_self_destruct -= 1

        # --- Apply RL agent action -----------------------------------------------------------------
        new_pos = np.array(self.state)
        if action == 0:     # Forward
            new_pos[0] -= 1
        elif action == 1:   # Backward
            new_pos[0] += 1
        elif action == 2:   # Leftward
            new_pos[1] -= 1 
        elif action == 3:   # Rightward
            new_pos[1] += 1

        # Check if RL agent is in a valid position
        if self.check_valid_position(new_pos):
            self.state = tuple(new_pos)

        #########################################
        # Calculate Reward with Reward Function #
        #########################################
        reward = 0

        r, c = self.state
        v = self.vision_radius

        exploration_reward = 0

        # --- Reward points for every planet visited; penalise revisiting a visited planet ---------------
        if self.map[r, c] == '#':       # If a new planet is visited
            reward += 50
            self.map[r, c] = '.'        # Mark planet as visited, so the RL agent doesn't stay there indefinitely and is forced to find other planets
            self.visited_planets += 1
        elif self.map[r, c] == '.':     # Revisiting an already visited planet (elif, so the first visit is not penalised)
            reward -= 1.0  # or some stronger penalty

        # --- Penalise points if agent steps into a previously seen region or at the starting position --------------------
        if self.seen_map[r, c]:  # already seen
            reward -= 0.3

        # Penalty for stepping back onto the starting position
        if (r, c) == self.start_position:
            reward -= 0.3

        # --- Reward points for newly explored (first-time seen) cells in vision --------------------------
        for dr in range(-v, v + 1):
            for dc in range(-v, v + 1):
                rr, cc = r + dr, c + dc
                if 0 <= rr < self.num_rows and 0 <= cc < self.num_cols:
                    if not self.seen_map[rr, cc]:
                        self.seen_map[rr, cc] = True
                        exploration_reward += 0.3  # reward per new cell seen
        
        reward += exploration_reward

        # --- Penalise points for camping near a corner (2x2 area) -------------------------------------
        if (r <= 1 and c <= 1) or \
        (r <= 1 and c >= self.num_cols - 2) or \
        (r >= self.num_rows - 2 and c <= 1) or \
        (r >= self.num_rows - 2 and c >= self.num_cols - 2):
            reward -= 0.1


        # The episode ends when the mission timer runs out
        done = self.mission_time_before_self_destruct <= 0

        truncated = False
        info = {
            "visited_planets": self.visited_planets,
            "total_planets": self.total_planets
        }

        return self.get_RL_agent_local_observation(), reward, done, truncated, info


    def render(self):
        # Clear the screen
        self.screen.fill((255, 255, 255))  

        agent_r, agent_c = self.state
        v = self.vision_radius

        # Draw env elements one cell at a time
        for row in range(self.num_rows):
            for col in range(self.num_cols):
                cell_left = col * self.cell_size
                cell_top = row * self.cell_size

                # If seen_map is True and it's just empty space (i.e. not a planet or visited)
                if self.seen_map[row, col] and self.map[row, col] == ' ':
                    pygame.draw.rect(self.screen, (255, 200, 200), (cell_left, cell_top, self.cell_size, self.cell_size))

                # Draw the vision radius in yellow (as a background highlight)
                if abs(row - agent_r) <= v and abs(col - agent_c) <= v:
                    pygame.draw.rect(self.screen, (255, 255, 0), (cell_left, cell_top, self.cell_size, self.cell_size))

                if self.map[row, col] == '#':  # Draw non-visited planet in Blue
                    pygame.draw.rect(self.screen, (0, 0, 255), (cell_left, cell_top, self.cell_size, self.cell_size))
                elif self.map[row, col] == '.':  # Draw visited planet in Green
                    pygame.draw.rect(self.screen, (0, 255, 0), (cell_left, cell_top, self.cell_size, self.cell_size))
                elif self.map[row, col] == 'S':  # Draw starting position in Black
                    pygame.draw.rect(self.screen, (0, 0, 0), (cell_left, cell_top, self.cell_size, self.cell_size))

                if (row, col) == self.state:  # Draw RL agent position in Gray
                    pygame.draw.rect(self.screen, (125, 125, 125), (cell_left, cell_top, self.cell_size, self.cell_size))

        pygame.event.pump()      # Let pygame process OS window events so the window stays responsive
        pygame.display.update()  # Update the display
        # pygame.time.delay(50)   # Slow down the rendering

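    def reset(self, *, seed=None, options=None):
        # Seed Gymnasium's internal RNG (self.np_random); generate_map() uses the `random` module, so it is unaffected
        super().reset(seed=seed)

        # --- Generate a new map ------------------------------------------------------------------------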
        self.map = self.generate_map(rows=40, cols=40, num_planets=20)

        # --- Reinitialize dependent properties ---------------------------------------------------------
        self.num_rows, self.num_cols = self.map.shape
        self.seen_map = np.zeros((self.num_rows, self.num_cols), dtype=bool)
        self.start_position = tuple(np.argwhere(self.map == 'S')[0])
        self.state = self.start_position
        self.mission_time_before_self_destruct = 1000

        self.total_planets = np.sum(self.map == '#')
        self.visited_planets = 0

        # --- Update Pygame screen if dimensions changed ------------------------------------------------
        self.screen = pygame.display.set_mode(
            (self.num_cols * self.cell_size, self.num_rows * self.cell_size)
        )

        info = {}
        return self.get_RL_agent_local_observation(), info

In [3]:
# Understanding the observation and action spaces used in the Outer Rim RL Environment
env = OuterRimEnv()
print(env.observation_space)
print(env.action_space)
print(env.reset())

Dict('seen_memory': Box(0, 1, (5, 5), uint8), 'vision': Box(0, 3, (5, 5), uint8))
Discrete(4)
({'vision': array([[0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 3, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0]], dtype=uint8), 'seen_memory': array([[0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0]], dtype=uint8)}, {})
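
To make the printed `vision` grid easier to read, the integer codes can be mapped back to map characters. This is a minimal sketch using plain lists; `decode_vision` is a hypothetical helper, and the mapping is copied from `char_to_int` in the class.

```python
CHAR_TO_INT = {' ': 0, '#': 1, '.': 2, 'S': 3}  # same mapping as char_to_int in the class
INT_TO_CHAR = {value: key for key, value in CHAR_TO_INT.items()}


def decode_vision(vision):
    """Turn a grid of integer codes back into rows of map characters."""
    return ["".join(INT_TO_CHAR[int(code)] for code in row) for row in vision]


# The 'vision' grid printed above, written as nested lists.
vision = [
    [0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0],
    [0, 0, 3, 0, 0],
    [0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0],
]
decoded = decode_vision(vision)
for line in decoded:
    print(repr(line))
```

The start cell is on the bottom row of the map, so the two lowest rows of this window are outside the grid. They are encoded as 0, the same code as empty space, which means the agent cannot tell a map edge from open space using this observation alone.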


### Testing that the Outer Rim POMDP RL Environment works with a baseline algorithm that takes random actions

In [None]:
env = OuterRimEnv()

episodes = 1
for episode in range(1, episodes+1):
    # Reset the RL Environment to get the RL agent's initial observation, set done to False, and set the starting
    # episode score to 0
    obs, _ = env.reset()
    # print(f"Initial State: {obs}")
    done = False
    episode_score = 0

    # During an episode:
    while not done:
        env.render()
        # RL agent determines action to take
        # - In this case, we are randomly sampling an action to take by our RL agent in the RL Environment (this line of
        #   code defines that baseline algorithm that takes random actions (instead of an RL algorithm))
        action = env.action_space.sample()
        # RL Environment generates the next state and reward gained upon taking the action in the current state
        obs, reward, done, truncated, info = env.step(action)
        # Add the reward gained upon taking the action in the current state to the cumulative episode score
        episode_score += reward

    print(f"Episode: {episode} Score: {episode_score} Planets found: {info['visited_planets']}/{info['total_planets']}")


env.close()

Episode: 1 Score: -148.39999999999966 Planets found: 1/20


## Train a PPO DRL algorithm in an RL Environment

### Vectorising and Normalising rewards 
(I did not use normalisation for this RL Environment. I tested it and, strangely, it doesn't work very well.)

In [5]:
# env = DummyVecEnv([lambda: OuterRimEnv()])
# env = VecNormalize(
#     env,
#     norm_obs=True,        # optional — helps if your vision or time values vary wildly
#     norm_reward=True,     # KEY TO STABILISE PLANET VS EXPLORATION reward scale
#     clip_reward=10.0      # Clip extreme spikes (planet +50 → ~+1.5 after norm)
# )
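
For intuition about what reward normalisation does to the +50 planet reward relative to the small exploration rewards, here is a simplified single-environment sketch of the general idea: divide each reward by a running standard deviation of the discounted return, then clip. It is an illustration under stated assumptions, not Stable Baselines3's implementation; `RunningVariance`, `normalise_rewards`, the `gamma` value and the sample reward stream are all hypothetical.

```python
import math


class RunningVariance:
    """Welford's online algorithm for the variance of a stream of numbers."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # sum of squared deviations from the running mean

    def update(self, value):
        self.count += 1
        delta = value - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (value - self.mean)

    @property
    def variance(self):
        # Fall back to 1.0 until there are at least two samples.
        return self.m2 / self.count if self.count > 1 else 1.0


def normalise_rewards(rewards, gamma=0.99, clip=10.0, epsilon=1e-8):
    """Scale each reward by the running std of the discounted return, then clip."""
    stats = RunningVariance()
    discounted_return = 0.0
    scaled = []
    for reward in rewards:
        discounted_return = discounted_return * gamma + reward
        stats.update(discounted_return)
        scale = math.sqrt(stats.variance + epsilon)
        scaled.append(max(-clip, min(clip, reward / scale)))
    return scaled


# Hypothetical reward stream: small exploration rewards with one +50 planet.
rewards = [0.3, -0.3, 1.5, 50.0, -0.3]
for raw, scaled in zip(rewards, normalise_rewards(rewards)):
    print(f"{raw:6.1f} -> {scaled:7.3f}")
```

The scale factor changes as statistics accumulate, so the same raw reward can map to different normalised values early and late in training.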

### For logging purposes of the training process of the PPO DRL algorithm

In [6]:
# Path where the training log files are stored, in the local folder './Training_Star_Wars_Galaxy_Phase_1/logs'
log_path = os.path.join('Training_Star_Wars_Galaxy_Phase_1', 'logs')
print(log_path)

Training_Star_Wars_Galaxy_Phase_1\logs


### Creating the PPO DRL algorithm in the RL Environment

In [7]:
# What does each of the parameters in the 'PPO' DRL algorithm class mean?
# - 'policy'                    - refers to the network architecture used as the policy of the RL algorithm.
#                                 'MultiInputPolicy' is used here because the observation space is a Dict
#                                 ('MlpPolicy' and 'CnnPolicy' are for single vector/image observations)
# - 'env'                       - refers to the RL environment to train the RL algorithm in
# - 'verbose'                   - controls how much information is printed to the console/log during training
#                                 -> 'verbose=0' means 'Silent', no output at all
#                                 -> 'verbose=1' means 'Info', shows key training events: episode rewards, updates, losses, etc.
#                                 -> 'verbose=2' means 'Debug', shows more detailed info like hyperparameters, rollout steps, and internal logs
# - 'tensorboard_log'           - the directory where the TensorBoard training logs are written
PPO_DRL_model = PPO(
    'MultiInputPolicy', 
    env, 
    verbose=1, 
    tensorboard_log=log_path,
    batch_size=1024,
    n_steps=4096,       
    learning_rate=3e-4,    
    n_epochs=15,          
    clip_range=0.2,        
    ent_coef=0.01,      
    gae_lambda=0.95,    
    vf_coef=0.5, 
)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
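
The rollout and batch settings above imply a fixed amount of optimisation per rollout. The arithmetic below is a quick sanity check, assuming a single environment and that training proceeds in whole rollouts; the `total_timesteps` value is the one passed to `learn` in the next cell, and the episode length is the 1000-step mission timer.

```python
import math

# Hyperparameters copied from the PPO(...) call and the learn(...) call.
n_steps = 4096
batch_size = 1024
n_epochs = 15
total_timesteps = 1_000_000
n_envs = 1             # a single environment wrapped in a DummyVecEnv
episode_length = 1000  # mission_time_before_self_destruct

rollout_size = n_steps * n_envs
minibatches_per_epoch = rollout_size // batch_size
gradient_steps_per_rollout = minibatches_per_epoch * n_epochs
rollouts = math.ceil(total_timesteps / rollout_size)
timesteps_collected = rollouts * rollout_size
episodes_per_rollout = rollout_size / episode_length

print(f"minibatches per epoch:      {minibatches_per_epoch}")
print(f"gradient steps per rollout: {gradient_steps_per_rollout}")
print(f"rollouts needed:            {rollouts}")
print(f"timesteps collected:        {timesteps_collected}")
print(f"episodes per rollout:       {episodes_per_rollout:.2f}")
```

Each rollout therefore covers only about four full episodes, which is worth keeping in mind when reading `ep_rew_mean` in the early training logs.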


### Training the PPO DRL algorithm in the RL Environment to become a PPO DRL model

In [8]:
PPO_DRL_model.learn(total_timesteps=1000000)

Logging to Training_Star_Wars_Galaxy_Phase_1\logs\PPO_9


---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1e+03    |
|    ep_rew_mean     | 17       |
| time/              |          |
|    fps             | 1330     |
|    iterations      | 1        |
|    time_elapsed    | 3        |
|    total_timesteps | 4096     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 1e+03       |
|    ep_rew_mean          | 33.3        |
| time/                   |             |
|    fps                  | 1121        |
|    iterations           | 2           |
|    time_elapsed         | 7           |
|    total_timesteps      | 8192        |
| train/                  |             |
|    approx_kl            | 0.012610096 |
|    clip_fraction        | 0.0793      |
|    clip_range           | 0.2         |
|    entropy_loss         | -1.38       |
|    explained_variance   | 0.000479    |
|    learning_rate        | 0.

<stable_baselines3.ppo.ppo.PPO at 0x216dd5adfd0>

## Save PPO DRL model

In [9]:
PPO_Model_Custom = os.path.join('Training_Star_Wars_Galaxy_Phase_1', 'Saved RL Models', 'PPO_Model_Star_Wars_Galaxy_1M')
PPO_DRL_model.save(PPO_Model_Custom)

## Reload PPO DRL model

In [10]:
PPO_Model_Custom = os.path.join('Training_Star_Wars_Galaxy_Phase_1', 'Saved RL Models', 'PPO_Model_Star_Wars_Galaxy_1M')
reloaded_PPO_DRL_model = PPO.load(PPO_Model_Custom, env=env)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


## Test the PPO DRL model in an RL Environment

In [12]:
env = OuterRimEnv()

episodes = 5
total_planets_found = 0

for episode in range(1, episodes+1):
    # Reset the RL Environment to get the RL agent's initial observation, set done to False, and set the starting
    # episode score to 0
    obs, _ = env.reset()
    print(f"Initial State: {obs}")
    done = False
    episode_score = 0

    # During an episode:
    while not done:
        env.render()
        # RL agent determines action to take
        # - Now, we are no longer randomly sampling an action for our RL agent to take in the RL Environment.
        #   Instead, we use the PPO DRL model to predict the action at each time step in an episode, based
        #   on the current observation from the RL Environment
        action, _ = reloaded_PPO_DRL_model.predict(obs)
        # RL Environment generates the next state and reward gained upon taking the action in the current state
        obs, reward, done, truncated, info = env.step(action)
        # Add the reward gained upon taking the action in the current state to the cumulative episode score
        episode_score += reward

    total_planets_found += info['visited_planets']
    print(f"Episode: {episode} Score: {episode_score} Planets found: {info['visited_planets']}/{info['total_planets']}")

env.close()

average_planets_found = total_planets_found / episodes
print(f"\nAverage planets found over {episodes} episodes: {average_planets_found:.2f}")

Initial State: {'vision': array([[0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 3, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0]], dtype=uint8), 'seen_memory': array([[0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0]], dtype=uint8)}
Episode: 1 Score: 763.0000000000235 Planets found: 14/20
Initial State: {'vision': array([[0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 3, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0]], dtype=uint8), 'seen_memory': array([[0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0]], dtype=uint8)}
Episode: 2 Score: 676.500000000029 Planets found: 13/20
Initial State: {'vision': array([[0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 3, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0]], dtype=uint8), 'seen_memory': array([[0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0],
       [0, 0, 0, 