# Imports

In [1]:
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random

import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam

from rl.agents import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

# Environment Data

In [2]:
#Imported data taken from the robot's readings of the entire environment.

data = [[94, 103, 128, 469, 443, 420, 420, 437, 641, 471, 447, 446, 464, 540, 540, 138, 137, 156, 191, 137, 103, 99], [101, 107, 130, 483, 483, 433, 429, 448, 649, 464, 448, 439, 456, 498, 813, 130, 126, 138, 166, 198, 109, 107], [103, 109, 143, 490, 446, 429, 429, 446, 645, 460, 446, 443, 455, 500, 964, 130, 130, 143, 198, 204, 109, 103], [103, 112, 145, 504, 446, 428, 428, 452, 649, 456, 441, 441, 455, 511, 158, 121, 121, 143, 205, 138, 114, 112], [103, 112, 139, 496, 443, 433, 433, 454, 649, 460, 439, 444, 460, 496, 157, 126, 126, 149, 191, 191, 109, 103], [121, 121, 157, 203, 139, 120, 120, 143, 500, 452, 427, 420, 446, 661, 464, 439, 439, 456, 483, 510, 126, 121], [120, 118, 162, 199, 139, 118, 120, 149, 500, 447, 435, 435, 454, 661, 460, 439, 439, 455, 500, 1585, 118, 118], [120, 120, 157, 199, 143, 122, 121, 166, 487, 448, 428, 428, 452, 668, 462, 437, 437, 456, 515, 154, 122, 120], [118, 118, 156, 195, 138, 122, 121, 149, 515, 447, 435, 435, 454, 661, 455, 437, 437, 447, 490, 149, 118, 118], [121, 121, 164, 199, 143, 121, 121, 158, 481, 448, 427, 427, 443, 662, 462, 437, 437, 456, 479, 506, 126, 121], [418, 427, 446, 483, 492, 114, 114, 139, 154, 149, 128, 128, 149, 166, 464, 447, 443, 456, 634, 639, 427, 420], [425, 428, 469, 491, 128, 113, 113, 154, 191, 145, 122, 122, 149, 933, 464, 443, 443, 462, 649, 651, 428, 427], [424, 428, 464, 483, 128, 113, 113, 149, 197, 143, 122, 121, 158, 500, 462, 439, 439, 462, 643, 471, 427, 429], [428, 437, 481, 483, 154, 121, 121, 149, 197, 204, 113, 113, 128, 156, 474, 435, 433, 443, 483, 644, 443, 428], [427, 429, 456, 490, 138, 122, 122, 156, 190, 827, 122, 121, 147, 865, 456, 435, 435, 447, 639, 645, 428, 427], [427, 435, 464, 643, 454, 437, 433, 460, 488, 149, 120, 120, 139, 843, 853, 128, 126, 145, 500, 469, 437, 427], [433, 435, 653, 649, 462, 443, 439, 464, 500, 143, 118, 118, 139, 158, 154, 120, 120, 139, 491, 479, 435, 428], [448, 454, 481, 654, 454, 439, 435, 454, 483, 913, 103, 103, 122, 204, 917, 126, 121, 143, 515, 491, 452, 447], [446, 447, 488, 654, 447, 428, 437, 462, 496, 121, 103, 103, 121, 228, 879, 128, 128, 147, 518, 479, 447, 448], [447, 452, 483, 649, 454, 428, 435, 475, 498, 498, 103, 103, 121, 138, 138, 130, 128, 147, 525, 481, 452, 445]]

Zone_1 = [data[0], data[1], data[2], data[3], data[4]]
Zone_2 = [data[5], data[6], data[7], data[8], data[9]]
Zone_3 = [data[10], data[11], data[12], data[13], data[14]]
Zone_4 = [data[15], data[16], data[17], data[18], data[19]]

# Training Environment

In [3]:
class TrainZoneEnv(Env):
    def __init__(self):
        # Actions we can take: Guess 1 of 4 zones
        self.action_space = Discrete(4)
        #  array
        self.observation_space = Box(low=np.array([0]), high=np.array([3]))
        # Set initial Zone Guess
        self.state = 0
        # Import the real Zone's sensor data
        Zone = random.randint(0,3)
        Num = 5*(Zone)
        sample = data[Num]
        self.sensor_data = sample
        # Set amount of guesses per run
        self.guess_length = 10
        
    def step(self, action):
        # Difine the sensor values for each Zone
        sensor_zone = [Zone_1, Zone_2, Zone_3, Zone_4]
        #apply the action
        self.state = action
        # Reduce guess length by 1
        self.guess_length -= 1 
        
        # Calculate reward
        # if the sensor data is in the chosen zone class, reward = 1
        if self.sensor_data in sensor_zone[self.state]: 
            reward = 1
            correct = True
        else: 
            reward = -1
            correct = False
        
        # Check if guessing is done
        if self.guess_length <= 0 or correct == True: 
            done = True
        else:
            done = False
        
        # Set placeholder for info
        info = {}
        
        # Return step information
        return self.state, reward, done, info

    def render(self):
        # Implement viz
        pass
    
    def reset(self):
        # Reset Initial Zone Guess
        self.state = 0
        # Import a new Zone's sensor data
        Zone = random.randint(0,3)
        Num = 5*(Zone)
        sample = data[Num]
        self.sensor_data = sample
        # Reset Guess amount
        self.guess_length = 10
        return self.state

In [4]:
env = TrainZoneEnv()

episodes = 10
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0
    
    while not done:
        #env.render()
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        score+=reward
    print('Episode:{} Score:{} Zone guess:{}'.format(episode, score, n_state+1))

Episode:1 Score:-2 zone guess 3
Episode:2 Score:-1 zone guess 1
Episode:3 Score:-3 zone guess 3
Episode:4 Score:-8 zone guess 4
Episode:5 Score:1 zone guess 2
Episode:6 Score:0 zone guess 1
Episode:7 Score:1 zone guess 4
Episode:8 Score:1 zone guess 2
Episode:9 Score:1 zone guess 2
Episode:10 Score:1 zone guess 4


  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


# Model Building

In [5]:
states = env.observation_space.shape
actions = env.action_space.n

def build_model(states, actions):
    model = Sequential()    
    model.add(Dense(24, activation='relu', input_shape=states))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(actions, activation='linear'))
    return model

In [6]:
del model

NameError: name 'model' is not defined

In [7]:
model = build_model(states, actions)
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 24)                48        
                                                                 
 dense_1 (Dense)             (None, 24)                600       
                                                                 
 dense_2 (Dense)             (None, 4)                 100       
                                                                 
Total params: 748
Trainable params: 748
Non-trainable params: 0
_________________________________________________________________


In [8]:
def build_agent(model, actions):
    policy = BoltzmannQPolicy()
    memory = SequentialMemory(limit=50000, window_length=1)
    dqn = DQNAgent(model=model, memory=memory, policy=policy, 
                  nb_actions=actions, nb_steps_warmup=10, target_model_update=1e-2)
    return dqn

# Model Training

In [9]:
dqn = build_agent(model, actions)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])
dqn.fit(env, nb_steps=50000, visualize=False, verbose=1)

  super().__init__(name, **kwargs)


Training for 50000 steps ...
Interval 1 (0 steps performed)
    1/10000 [..............................] - ETA: 11:42 - reward: -1.0000

  updates=self.state_updates,
  batch_idxs = np.random.random_integers(low, high - 1, size=size)


   40/10000 [..............................] - ETA: 2:30 - reward: -0.7500

  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=size)
  batch_idxs = np.random.random_integers(low, high - 1, size=s

2818 episodes - episode_reward: -1.624 [-10.000, 1.000] - loss: 0.738 - mae: 1.115 - mean_q: -0.865

Interval 2 (10000 steps performed)
2841 episodes - episode_reward: -1.599 [-10.000, 1.000] - loss: 0.827 - mae: 1.205 - mean_q: -0.979

Interval 3 (20000 steps performed)
2881 episodes - episode_reward: -1.534 [-10.000, 1.000] - loss: 0.852 - mae: 1.236 - mean_q: -1.020

Interval 4 (30000 steps performed)
2846 episodes - episode_reward: -1.584 [-10.000, 1.000] - loss: 0.848 - mae: 1.224 - mean_q: -1.006

Interval 5 (40000 steps performed)
done, took 323.779 seconds


<keras.callbacks.History at 0x1c5ea869460>

# Model Testing

In [13]:
class TestZoneEnv(Env):
    def __init__(self):
        # Actions we can take: Guess 1 of 4 zones
        self.action_space = Discrete(4)
        #  array
        self.observation_space = Box(low=np.array([0]), high=np.array([3]))
        # Set initial Zone Guess
        self.state = 0
        # Import the real Zone's sensor data
        sample = data[Num]
        self.sensor_data = sample
        # Set amount of guesses per run
        self.guess_length = 10
        
    def step(self, action):
        # Difine the sensor values for each Zone
        sensor_zone = [Zone_1, Zone_2, Zone_3, Zone_4]
        #apply the action
        self.state = action
        # Reduce guess length by 1
        self.guess_length -= 1 
        
        # Calculate reward
        # if the sensor data is in the chosen zone class, reward = 1
        if self.sensor_data in sensor_zone[self.state]: 
            reward = 1
            correct = True
        else: 
            reward = -1
            correct = False
        
        # Check if guessing is done
        if self.guess_length <= 0 or correct == True: 
            done = True
        else:
            done = False
        
        # Set placeholder for info
        info = {}
        
        # Return step information
        return self.state, reward, done, info

    def render(self):
        # Implement viz
        pass
    
    def reset(self):
        # Reset Initial Zone Guess
        self.state = 0
        # Import a new Zone's sensor data
        sample = data[Num]
        self.sensor_data = sample
        # Reset Guess amount
        self.guess_length = 10
        return self.state

In [36]:
# Choose the zone the Agent is in.
Zone = 4
Num = 5*(Zone -1)

In [38]:
# Test to determine if the Agent knows where it is.
env = TestZoneEnv()

scores = dqn.test(env, nb_episodes=1, visualize=False)
print(np.mean(scores.history['episode_reward']))

Testing for 1 episodes ...
Episode 1: reward: 1.000, steps: 1
1.0


# Transfer Learning

In [40]:
class TransferZoneEnv(Env):
    def __init__(self):
        # Actions we can take: Guess 1 of 4 zones
        self.action_space = Discrete(4)
        #  array
        self.observation_space = Box(low=np.array([0]), high=np.array([3]))
        # Set initial Zone Guess
        self.state = 0
        # Import the real Zone's sensor data
        self.sensor_data = sample
        # Set amount of guesses per run
        self.guess_length = 10
        
    def step(self, action):
        # Difine the sensor values for each Zone
        sensor_zone = [Zone_1, Zone_2, Zone_3, Zone_4]
        #apply the action
        self.state = action
        # Reduce guess length by 1
        self.guess_length -= 1 
        
        # Calculate reward
        # if the sensor data is in the chosen zone class, reward = 1
        if self.sensor_data in sensor_zone[self.state]: 
            reward = 1
            correct = True
        else: 
            reward = -1
            correct = False
        
        # Check if guessing is done
        if self.guess_length <= 0 or correct == True: 
            done = True
        else:
            done = False
        
        # Set placeholder for info
        info = {}
        
        # Return step information
        return self.state, reward, done, info

    def render(self):
        # Implement viz
        pass
    
    def reset(self):
        # Reset Initial Zone Guess
        self.state = 0
        # Import a new Zone's sensor data
        self.sensor_data = sample
        # Reset Guess amount
        self.guess_length = 10
        return self.state

In [46]:
# Import the sensor data from the robot

sample = data[5]

In [47]:
TrEnv = TransferZoneEnv()

scores = dqn.test(TrEnv, nb_episodes=1, visualize=False)
print(np.mean(scores.history['episode_reward']))

Testing for 1 episodes ...
Episode 1: reward: -10.000, steps: 10
-10.0
