1. Import Dependencies

In [None]:
!pip install gym
!pip install stable_baselines3

In [None]:
import gym
from gym import Env
from gym.spaces import Discrete, Dict, MultiBinary, MultiDiscrete 
import numpy as np
import random
import os
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.env_checker import check_env

# Building an Environment

In [None]:
class ScenarioManager:
    """Spaces, state construction and transition rules for the strike scenario.

    A state is a dict with three entries:

    * ``"missles"`` -- remaining step/missile budget (the spelling is kept
      because it is the observation-space key);
    * ``"expectedShipDamage"`` -- damage level wanted for each ship;
    * ``"currentShipDamage"`` -- damage level reached so far for each ship.

    Damage levels are 0 (untouched), 1 (disabled) and 2 (destroyed).
    """

    # Number of ships tracked in every state.
    NUMBER_OF_TARGETS = 6
    # Number of distinct damage levels per ship (0, 1, 2).
    DAMAGE_LEVELS = 3
    # Chance, in percent, that a launch hits and disables the ship.
    HIT_PERCENT = 60
    # Chance, in percent, that a hit goes on to destroy the ship.
    DESTROY_PERCENT = 50

    def getActionSpace(self):
        """Return the action space: ``[0 = do nothing / 1 = attack, ship index]``."""
        return MultiDiscrete([2, self.NUMBER_OF_TARGETS])

    def getObservationSpace(self):
        """Return the Dict observation space matching the state layout."""
        perShipLevels = [self.DAMAGE_LEVELS] * self.NUMBER_OF_TARGETS
        return Dict({
            "missles": Discrete(100),
            "expectedShipDamage": MultiDiscrete(perShipLevels),
            "currentShipDamage": MultiDiscrete(perShipLevels),
        })

    def getRandomizedState(self):
        """Return a fresh state with a random budget and random expected damage.

        The budget is drawn from 1..99 and each expected damage from 0..2;
        current damage starts at 0 for every ship.
        """
        return {
            "missles": random.randint(1, 99),
            "expectedShipDamage": np.array(
                [random.randint(0, 2) for _ in range(self.NUMBER_OF_TARGETS)]
            ),
            "currentShipDamage": np.array([0] * self.NUMBER_OF_TARGETS),
        }

    def getState(self, numberOfMissles, tD1, tD2, tD3, tD4, tD5, tD6):
        """Return a fresh state for a fixed scenario.

        ``numberOfMissles`` is the starting budget and ``tD1``..``tD6`` are the
        expected damage levels of the six ships; current damage starts at 0.
        """
        return {
            "missles": numberOfMissles,
            "expectedShipDamage": np.array([tD1, tD2, tD3, tD4, tD5, tD6]),
            "currentShipDamage": np.array([0] * self.NUMBER_OF_TARGETS),
        }

    def step(self, state, action):
        """Apply ``action`` to ``state`` in place and return the gym step tuple.

        ``action`` is ``[flag, shipIndex]``; ``flag == 1`` launches at
        ``shipIndex``, any other flag does nothing.

        Rewards:
        * -10 for doing nothing;
        * -50 for attacking a ship that already meets its expected damage,
          +10 for attacking one that does not;
        * 100 (overriding the above) once every ship meets its expected
          damage, which also ends the episode.

        Every step, including "do nothing", consumes one unit of the
        ``"missles"`` budget; the episode also ends when it reaches 0.

        Returns ``(state, reward, done, info)`` where ``state`` is the same
        (mutated) dict that was passed in and ``info`` is an empty dict.
        """
        current = state["currentShipDamage"]
        expected = state["expectedShipDamage"]

        if action[0] == 1:
            shipIndex = action[1]

            # Penalize over-hitting a target that needs no further damage.
            if current[shipIndex] >= expected[shipIndex]:
                reward = -50
            else:
                reward = 10

            # randint is inclusive on both ends, so roll 1..100 to get exact
            # percentages (0..100 would yield 61/101 and 51/101).
            if random.randint(1, 100) <= self.HIT_PERCENT:
                inflicted = 1
                if random.randint(1, 100) <= self.DESTROY_PERCENT:
                    inflicted = 2
                # Damage is based on the ship's own current damage and never
                # decreases; the expected damage must not leak into the result.
                current[shipIndex] = max(inflicted, current[shipIndex])
        else:
            # Doing nothing is penalized.
            reward = -10

        # Success: every ship has reached at least its expected damage.
        done = all(have >= want for have, want in zip(current, expected))
        if done:
            reward = 100

        # Spend one unit of the budget for this step.
        state["missles"] -= 1

        # Out of budget ends the episode as well.
        if state["missles"] <= 0:
            done = True

        # Placeholder for info
        info = {}

        return state, reward, done, info

class ScenarioEnv(Env):
    """Gym environment that always starts from one fixed scenario.

    The starting budget and the six expected damage levels are supplied at
    construction time and re-used on every ``reset``. Spaces and transition
    rules come from ``ScenarioManager``.
    """

    def __init__(self, numberOfMissles, tD1, tD2, tD3, tD4, tD5, tD6):
        """Store the scenario parameters and build the initial state.

        ``numberOfMissles`` is the starting budget; ``tD1``..``tD6`` are the
        expected damage levels of the six ships.
        """
        manager = ScenarioManager()
        self.manager = manager
        # Actions we can take: 0 - Do Nothing, 1 - Launch
        self.action_space = manager.getActionSpace()
        # Target Damage state array: 0 - Untouched, 1 - Disabled, 2 - Destroyed
        self.observation_space = manager.getObservationSpace()
        # Remember the scenario so reset() can rebuild the same start state
        self.numberOfMissles = numberOfMissles
        self.tD1 = tD1
        self.tD2 = tD2
        self.tD3 = tD3
        self.tD4 = tD4
        self.tD5 = tD5
        self.tD6 = tD6
        # Set start state
        self.state = self._initialState()

    def _initialState(self):
        """Build a fresh start state from the stored scenario parameters."""
        return self.manager.getState(
            self.numberOfMissles,
            self.tD1, self.tD2, self.tD3, self.tD4, self.tD5, self.tD6,
        )

    def step(self, action):
        """Advance the scenario by one action.

        Returns the manager's ``(state, reward, done, info)`` tuple for the
        current ``self.state``.
        """
        return self.manager.step(self.state, action)

    def render(self, mode="human"):
        """No visualization yet.

        ``mode`` is accepted (and ignored) so callers that invoke
        ``render(mode=...)`` do not fail with a TypeError.
        """
        pass

    def reset(self):
        """Restore the fixed start state and return it as the first observation."""
        self.state = self._initialState()
        return self.state

class TrainingScenarioEnv(Env):
    """Gym environment that starts every episode from a randomized scenario.

    Spaces, random state generation and transition rules come from
    ``ScenarioManager``.
    """

    def __init__(self):
        """Create the manager, expose its spaces and draw an initial state."""
        manager = ScenarioManager()
        self.manager = manager
        # Actions we can take: 0 - Do Nothing, 1 - Launch
        self.action_space = manager.getActionSpace()
        # Target Damage state array: 0 - Untouched, 1 - Disabled, 2 - Destroyed
        self.observation_space = manager.getObservationSpace()
        # Set start state
        self.state = manager.getRandomizedState()

    def step(self, action):
        """Advance the scenario by one action.

        Returns the manager's ``(state, reward, done, info)`` tuple for the
        current ``self.state``.
        """
        return self.manager.step(self.state, action)

    def render(self, mode="human"):
        """No visualization yet.

        ``mode`` is accepted (and ignored) so callers that invoke
        ``render(mode=...)`` do not fail with a TypeError.
        """
        pass

    def reset(self):
        """Draw a new randomized state and return it as the first observation."""
        self.state = self.manager.getRandomizedState()
        return self.state


Check Training Environment

In [None]:
# Build the randomized training environment and let Stable-Baselines3
# validate it, printing warnings for anything questionable.
trainingEnv = TrainingScenarioEnv()
check_env(
    trainingEnv,
    warn=True,
)

Check Scenario Environment

In [None]:
# Fixed evaluation scenario: a budget of 14 and expected damage
# [2, 1, 1, 1, 0, 0] for the six ships.
scenarioEnv = ScenarioEnv(
    numberOfMissles=14,
    tD1=2,
    tD2=1,
    tD3=1,
    tD4=1,
    tD5=0,
    tD6=0,
)
check_env(
    scenarioEnv,
    warn=True,
)

# Train Model

In [None]:
# Directory (relative to the working directory) passed to PPO as its
# TensorBoard log location.
log_path = os.path.join('Training', 'Logs')

In [None]:
# Start from a fresh randomized scenario, then create a PPO agent.
# "MultiInputPolicy" is used because the observation space is a Dict.
trainingEnv.reset()
model = PPO(
    "MultiInputPolicy",
    trainingEnv,
    verbose=1,
    tensorboard_log=log_path,
)

In [None]:
# Train the agent for 200k environment steps.
model.learn(
    total_timesteps=200_000,
)

# Save Model

In [None]:
# Persist the trained agent under the name 'PPO'.
model.save('PPO')

# Evaluate Model

In [None]:
# Evaluate the agent over 1000 episodes on randomized scenarios.
# The call is left as the cell's final expression so its result is displayed.
trainingEnv.reset()
evaluate_policy(
    model,
    trainingEnv,
    n_eval_episodes=1000,
    render=False,
)

In [None]:
# Play a single episode on a newly randomized scenario and log every step.
randomSampleEnv = TrainingScenarioEnv()
randomSampleEnvObs = randomSampleEnv.reset()
score = 0
done = False

while not done:
    # Ask the trained policy for the next action given the latest observation.
    action, _ = model.predict(randomSampleEnvObs)
    randomSampleEnvObs, reward, done, info = randomSampleEnv.step(action)
    # Accumulate the episode return.
    score = score + reward
    print(
        'Score:{} Action:{} State:{}'.format(
            score,
            action,
            randomSampleEnvObs,
        )
    )

randomSampleEnv.close()

In [None]:
# Evaluate the agent over 10000 episodes on the fixed scenario.
# The call is left as the cell's final expression so its result is displayed.
scenarioEnv.reset()
evaluate_policy(
    model,
    scenarioEnv,
    n_eval_episodes=10000,
    render=False,
)

In [None]:
# Play a single episode on the fixed scenario and log every step.
sampleEnvObs = scenarioEnv.reset()
score = 0
done = False

while not done:
    # Ask the trained policy for the next action given the latest observation.
    action, _ = model.predict(sampleEnvObs)
    sampleEnvObs, reward, done, info = scenarioEnv.step(action)
    # Accumulate the episode return.
    score = score + reward
    print(
        'Score:{} Action:{} State:{}'.format(
            score,
            action,
            sampleEnvObs,
        )
    )

scenarioEnv.close()