In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

In [None]:
# Define the Q-Network model
def create_q_network(input_shape, action_space):
    """Build and compile a small fully connected Q-network.

    The network is two hidden ``Dense(32, relu)`` layers followed by a linear
    output layer with one unit per action, compiled with MSE loss and Adam
    (learning rate 0.001).

    Args:
        input_shape: Shape of one network input without the batch axis.
            Either a sequence such as ``(4,)`` or a bare integer such as ``4``,
            which is treated as the 1-D shape ``(4,)``.
        action_space: Number of discrete actions (size of the output layer).

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    # Callers in this notebook pass a bare int; hand Keras an explicit tuple
    # so the Input layer never has to interpret a scalar as a shape.
    if isinstance(input_shape, (int, np.integer)):
        input_shape = (int(input_shape),)
    else:
        input_shape = tuple(input_shape)

    model = Sequential(
        [
            tf.keras.Input(shape=input_shape),
            Dense(32, activation='relu'),
            Dense(32, activation='relu'),
            Dense(action_space, activation='linear')
        ]
    )
    model.compile(loss='mse', optimizer=Adam(learning_rate=0.001))
    return model

In [None]:
class ENV():
    """Grid world with a random start and a random goal for every episode.

    Positions are ``(x, y)`` pairs on an ``env_col`` x ``env_row`` grid
    (11 x 11). An episode ends when the agent reaches the goal (+100),
    leaves the grid (-100), steps onto an obstacle (-100) or uses up
    ``max_steps`` moves (-100). Every other move costs -1.

    Args:
        max_steps: Number of moves after which an episode is cut off.
    """

    # Position change (dx, dy) for every action id.
    _MOVES = {
        0: (0, 1),    # up
        1: (0, -1),   # down
        2: (-1, 0),   # left
        3: (1, 0),    # right
    }

    def __init__(self, max_steps=50):
        self.steps = 0
        self.env_col = 11
        self.env_row = 11
        self.state = (5, 5)
        # Default goal so step() is usable before the first reset();
        # reset() draws a fresh goal for every episode.
        self.goal = (1, 1)
        self.action_space = 4
        self.reward = 0
        self.done = False
        self.obs = [(2,1),(3,1),(4,1),(5,1),(6,1)]
        self.max_steps = max_steps

    def reset(self):
        """Start a new episode with a freshly drawn goal and start position.

        Both coordinates of goal and start are drawn uniformly from
        ``[0, min(env_col, env_row))``. Nothing prevents the two from
        coinciding or from landing on an obstacle cell.

        Returns:
            ``(state, goal)`` as two integer NumPy arrays of length 2.
        """
        self.goal = np.random.randint(0, min(self.env_col, self.env_row), size=2)
        self.state = np.random.randint(0, min(self.env_col, self.env_row), size=2)
        self.reward = 0
        self.done = False
        self.steps = 0
        return self.state, self.goal

    def step(self, action):
        """Apply one move and report the outcome.

        Args:
            action: 0 (up, y+1), 1 (down, y-1), 2 (left, x-1) or 3 (right, x+1).

        Returns:
            ``(state, reward, done)`` where ``state`` is the new ``(x, y)`` tuple.

        Raises:
            ValueError: If ``action`` is not one of the four known ids. The
                environment is left untouched in that case.
        """
        # Validate before mutating anything so a bad action does not cost a step.
        if action not in tuple(self._MOVES):
            raise ValueError("Invalid action")
        dx, dy = self._MOVES[int(action)]

        self.steps += 1
        x = int(self.state[0]) + dx
        y = int(self.state[1]) + dy
        self.state = (x, y)

        inside_grid = 0 <= x < self.env_col and 0 <= y < self.env_row
        if self.state == tuple(self.goal):
            self.reward = 100
            self.done = True
        elif not inside_grid:
            self.reward = -100
            self.done = True
        elif self.state in self.obs:
            # Hitting an obstacle ends the episode with the same penalty as
            # leaving the grid, instead of terminating the whole process.
            self.reward = -100
            self.done = True
        elif self.steps >= self.max_steps:
            self.reward = -100
            self.done = True
        else:
            self.reward = -1
        return self.state, self.reward, self.done


In [None]:
#Define the DQN agent
class DQNAgent:
    """Epsilon-greedy DQN agent for goal-conditioned observations.

    The Q-network is fed a flat observation vector (in this notebook the
    agent position concatenated with the goal position). Transitions are
    kept in ``self.memory`` and replayed in random minibatches.

    Args:
        state_shape: Shape of one network input, forwarded to
            ``create_q_network``.
        action_space: Number of discrete actions.
    """

    def __init__(self, state_shape, action_space):
        self.state_shape = state_shape
        self.action_space = action_space
        self.memory = []
        self.gamma = 0.95  # Discount factor
        self.epsilon = 0.9  # Exploration rate
        self.epsilon_decay = 0.995  # Decay rate for exploration rate
        self.epsilon_min = 0.01  # Minimum exploration rate
        self.model = create_q_network(state_shape, action_space)

    @staticmethod
    def _as_row(values):
        """Return ``values`` flattened into a 1-D float32 array."""
        return np.asarray(values, dtype=np.float32).reshape(-1)

    def remember(self, state, input, goal, action, reward, next_state, done):
        """Store one transition.

        Args:
            state: Position before the move (stored, not used by ``replay``).
            input: Network input that was used to choose ``action``.
            goal: Goal position of the episode.
            action: Action id that was taken.
            reward: Reward received for the move.
            next_state: Position after the move.
            done: Whether the move ended the episode.
        """
        self.memory.append((state, input, goal, action, reward, next_state, done))

    def act(self, input):
        """Choose an action for the observation ``input``.

        With probability ``epsilon`` a uniformly random action is returned,
        otherwise the action with the highest predicted Q-value.

        Returns:
            The action id as a Python ``int``.
        """
        if np.random.rand() < self.epsilon:
            return int(np.random.choice(self.action_space))
        # Keras expects a leading batch axis, so predict on a batch of one.
        observation = self._as_row(input)[np.newaxis, :]
        q_values = self.model.predict(observation, verbose=0)
        return int(np.argmax(q_values[0]))

    def replay(self, batch_size):
        """Train the network on a random minibatch of stored transitions.

        Does nothing until at least ``batch_size`` transitions are stored.
        Otherwise the Q-targets ``reward + gamma * max_a Q(next, a)`` (just
        ``reward`` for terminal transitions) are computed for the whole
        minibatch with two batched predictions and applied in a single
        ``fit`` call, after which ``epsilon`` is decayed once.

        Args:
            batch_size: Number of transitions to sample without replacement.
        """
        if batch_size <= 0 or len(self.memory) < batch_size:
            return
        picks = np.random.choice(len(self.memory), batch_size, replace=False)
        batch = [self.memory[i] for i in picks]
        _, inputs, goals, actions, rewards, next_states, dones = zip(*batch)

        current_inputs = np.stack([self._as_row(row) for row in inputs])
        # The next observation is the next position followed by the goal,
        # mirroring how the stored network input is laid out.
        next_inputs = np.stack([
            np.concatenate([self._as_row(nxt), self._as_row(goal)])
            for nxt, goal in zip(next_states, goals)
        ])
        actions = np.asarray(actions, dtype=np.intp)
        rewards = np.asarray(rewards, dtype=np.float32)
        not_done = ~np.asarray(dones, dtype=bool)

        next_q = np.asarray(self.model.predict(next_inputs, verbose=0))
        # Terminal transitions get no bootstrap term.
        targets = rewards + self.gamma * next_q.max(axis=1) * not_done

        # Only the Q-value of the action actually taken is moved toward its
        # target; the other outputs keep their current predictions.
        target_f = np.array(self.model.predict(current_inputs, verbose=0), dtype=np.float32)
        target_f[np.arange(len(batch)), actions] = targets

        self.model.fit(current_inputs, target_f, batch_size=len(batch), epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay


In [None]:
#Define the DQN agent for TESTING
class DQNAgent_test:
    """Epsilon-greedy DQN agent that stores plain ``(state, ...)`` transitions.

    Unlike ``DQNAgent`` the stored ``state`` and ``next_state`` are fed to
    the network directly, after being reshaped to ``state_shape``.

    Args:
        state_shape: Shape of one network input, forwarded to
            ``create_q_network`` and used to reshape states.
        action_space: Number of discrete actions.
    """

    def __init__(self, state_shape, action_space):
        self.state_shape = state_shape
        self.action_space = action_space
        self.memory = []
        self.gamma = 0.95  # Discount factor
        self.epsilon = 0.9  # Exploration rate
        self.epsilon_decay = 0.995  # Decay rate for exploration rate
        self.epsilon_min = 0.01  # Minimum exploration rate
        self.model = create_q_network(state_shape, action_space)

    def _as_input(self, state):
        """Return ``state`` as a float32 array reshaped to ``state_shape``."""
        return np.reshape(np.asarray(state, dtype=np.float32), self.state_shape)

    def remember(self, state, action, reward, next_state, done):
        """Store one ``(state, action, reward, next_state, done)`` transition."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action for ``state``.

        With probability ``epsilon`` a uniformly random action is returned,
        otherwise the action with the highest predicted Q-value.

        Returns:
            The action id as a Python ``int``.
        """
        if np.random.rand() < self.epsilon:
            return int(np.random.choice(self.action_space))
        # Keras expects a leading batch axis, so predict on a batch of one.
        observation = self._as_input(state)[np.newaxis, ...]
        q_values = self.model.predict(observation, verbose=0)
        return int(np.argmax(q_values[0]))

    def replay(self, batch_size):
        """Train the network on a random minibatch of stored transitions.

        Does nothing until at least ``batch_size`` transitions are stored.
        Otherwise the Q-targets ``reward + gamma * max_a Q(next_state, a)``
        (just ``reward`` for terminal transitions) are computed for the whole
        minibatch with two batched predictions and applied in a single
        ``fit`` call, after which ``epsilon`` is decayed once.

        Args:
            batch_size: Number of transitions to sample without replacement.
        """
        if batch_size <= 0 or len(self.memory) < batch_size:
            return
        picks = np.random.choice(len(self.memory), batch_size, replace=False)
        batch = [self.memory[i] for i in picks]
        states, actions, rewards, next_states, dones = zip(*batch)

        current_inputs = np.stack([self._as_input(s) for s in states])
        next_inputs = np.stack([self._as_input(s) for s in next_states])
        actions = np.asarray(actions, dtype=np.intp)
        rewards = np.asarray(rewards, dtype=np.float32)
        not_done = ~np.asarray(dones, dtype=bool)

        next_q = np.asarray(self.model.predict(next_inputs, verbose=0))
        # Terminal transitions get no bootstrap term.
        targets = rewards + self.gamma * next_q.max(axis=1) * not_done

        # Only the Q-value of the action actually taken is moved toward its
        # target; the other outputs keep their current predictions.
        target_f = np.array(self.model.predict(current_inputs, verbose=0), dtype=np.float32)
        target_f[np.arange(len(batch)), actions] = targets

        self.model.fit(current_inputs, target_f, batch_size=len(batch), epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay


In [None]:
# Initialize the environment and agent
state_shape = (2,)  # Shape of a single grid position: x and y coordinates
input_shape = (4,)  # Network input: agent position (x, y) followed by goal position (x, y)
action_space = 4  # 4 actions: up, down, left, right
# The training loop stores 7-field transitions (state, input, goal, ...),
# which is the DQNAgent.remember signature; DQNAgent_test.remember only
# accepts 5 fields and would raise TypeError on the first step.
agent = DQNAgent(input_shape, action_space)
env = ENV()

In [None]:
# Training loop
num_episodes = 50  # Set the number of training episodes
batch_size = 32  # Set the batch size for replay
total_reward = []  # Sum of rewards collected in each episode
for episode in range(num_episodes):
    print("Episode: ", episode)
    # Every episode starts from a freshly drawn start position and goal.
    state, goal = env.reset()
    state = np.reshape(state, state_shape)
    goal = np.reshape(goal, state_shape)
    # Network input: current position followed by the goal position.
    observation = np.concatenate([state, goal])
    done = False
    episode_reward = 0
    # Checkpoint every 10th episode (skipping episode 0).
    if episode % 10 == 0 and episode != 0:
        agent.model.save("model_dqn_diff_start_diff_goal_{}.h5".format(episode))
    while not done:
        action = agent.act(observation)
        next_state, reward, done = env.step(action)
        next_state = np.reshape(next_state, state_shape)
        agent.remember(state, observation, goal, action, reward, next_state, done)
        state = next_state
        # Rebuild the network input from the new position; without this the
        # agent would keep acting on (and storing) the episode's first observation.
        observation = np.concatenate([state, goal])
        episode_reward += reward
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)
    total_reward.append(episode_reward)

In [None]:
def act(self, input):
    """Pick an action with a fixed 90% exploration rate.

    Module-level function that takes ``self`` explicitly; it only needs
    ``self.model`` to offer a ``predict`` method.

    With probability 0.9 a random action id in ``0..3`` is returned.
    Otherwise ``input`` is reshaped to 4 values, given a batch axis and
    passed to ``self.model.predict``; the index of the largest predicted
    value is returned.
    """
    explore = np.random.rand() < 0.9
    if explore:
        return np.random.choice(4)

    flat = np.reshape(input, 4)
    # Add the leading batch axis the model expects.
    batch = tf.expand_dims(tf.convert_to_tensor(flat), 0)

    print("Predicting")
    predictions = self.model.predict(batch)
    best = np.argmax(predictions[0])
    return best

In [None]:
# Use the trained network to control the agent in the environment
state, goal = env.reset()
state = np.reshape(state, state_shape)
goal = np.reshape(goal, state_shape)
print("goal: ", goal)
print("state: ", state)
# compile=False: the checkpoint is only used for prediction here, so the
# training configuration (loss/optimizer) does not need to be restored.
model = load_model('model_dqn_diff_start_diff_goal_30.h5', compile=False)
done = False

while not done:
    # Network input: current position followed by the goal, as a batch of one.
    observation = np.concatenate([state, goal]).astype(np.float32)[np.newaxis, :]
    # Act greedily with the loaded checkpoint; agent.act() would still take
    # random exploratory actions and would ignore the loaded model.
    q_values = model.predict(observation, verbose=0)
    action = int(np.argmax(q_values[0]))
    next_state, _, done = env.step(action)
    state = np.reshape(next_state, state_shape)
    print(state)

In [None]:
# Save the agent's current network to a .h5 file without an episode suffix
# (the training loop writes its periodic checkpoints with a "_<episode>" suffix).
agent.model.save("model_dqn_diff_start_diff_goal.h5")

In [None]:
import matplotlib.pyplot as plt
# Plot the summed reward of every training episode, in episode order.
fig, ax = plt.subplots()
ax.plot(total_reward)
ax.set_xlabel("Episode")
ax.set_ylabel("Reward")
plt.show()