In [1]:
#%pip install tensorflow
import tensorflow as tf
from environment import Environment
import random
import numpy as np



#Actions
* Move (up, right, down, left)
* Rotation
* Jump

#Observations
> lidar with following legend
* 0 => background
* 1 => goal
* -1 => obstacles
* -0.5 => walls

#Reward
* hit goal -> +10
* hit obstacle -> -1
* time's out -> -10

#Episodes
>Start => will Spawn agent and start timer
>End => due to time's out or hit goal

In [2]:
# Hyperparameters
camera_shape = (3, 3, 3)
input_shape = camera_shape[0] * camera_shape[1]* camera_shape[2]
num_actions = 4 + 2 + 1 + 1 # 4 directions, 2 rotate, 1 jump, 1 wait

# Model
learning_rate = 0.001

In [3]:
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(8, (3, 3), activation='relu'),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(8, activation='relu', input_shape=(input_shape,)), # Input layer
    tf.keras.layers.Dense(8, activation='relu'), # Hidden layer
    tf.keras.layers.Dense(num_actions) # Output layer with num_actions neurons
])

In [4]:
def custom_loss(y_true, y_pred):
    return tf.reduce_mean(tf.square(y_true - y_pred))

In [5]:
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate) # Adjust the learning rate as needed
model.compile(optimizer=optimizer, loss=custom_loss)

In [6]:
class ExperienceBuffer:
    def __init__(self, max_size): # 5
        self.buffer = []
        self.max_size = max_size

    def add(self, experience):
        if len(self.buffer) >= self.max_size:
            self.buffer.pop(0)
        self.buffer.append(experience)

    def sample_mini_batch(self, batch_size): # 2
        return random.sample(self.buffer, batch_size)

In [7]:
target_model = tf.keras.models.clone_model(model)
target_model.set_weights(model.get_weights())

ValueError: You must provide an `input_shape` argument.

In [None]:
def update_target_network(target_model, model, tau): # tau = 0.5
    target_weights = target_model.get_weights()
    model_weights = model.get_weights()

    for i in range(len(target_weights)):
        target_weights[i] = tau * model_weights[i] + (1 - tau) * target_weights[i]

    target_model.set_weights(target_weights)

In [None]:
# exploration stategy
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995

def get_action(state):
    if random.random() < epsilon:
        return random.randint(0, num_actions - 1)
    else:
        return tf.argmax(model.predict(state)[0]).numpy()

In [None]:
# Define hyperparameters
epsilon = 0.1  # Exploration factor (adjust as needed)
gamma = 0.99   # Discount factor
batch_size = 32  # Mini-batch size
max_buffer_size = 10000  # Maximum buffer size (adjust as needed)
target_update_frequency = 100  # Frequency to update target network
num_episodes = 1000  # Number of episodes to train

# Initialize your experience replay buffer
experience_buffer = ExperienceBuffer(max_buffer_size)

# Initialize the environment
env = Environment()

In [None]:
print(model.summary())
model.predict(np.zeros((1, input_shape)))

In [None]:
env.start()

# Training loop
for episode in range(num_episodes):
    state = env.reset()  # Reset the environment to start a new episode
    episode_reward = 0
    done = False

    while not done:
        # Choose an action using epsilon-greedy strategy
        if random.uniform(0, 1) < epsilon:
            action = random.randint(0, num_actions - 1)  # Random action
        else:
            # Use the Q-network to select the action with the highest Q-value
            q_values = model.predict(state.reshape(1, -1))[0]
            action = np.argmax(q_values)

        # Execute the selected action in the environment
        next_state, reward, done, _ = env.set_action(action)

        # Store the experience in the replay buffer
        experience_buffer.add((state, action, reward, next_state, done))

        # Sample a mini-batch of experiences from the buffer
        batch = experience_buffer.sample_mini_batch(batch_size)

        # Compute target Q-values using the target network and Bellman equation
        target_q_values = []
        for sample in batch:
            s, a, r, s_next, d = sample
            if d:
                target_q_values.append(r)  # If the episode is done, Q-value is the immediate reward
            else:
                target_q = r + gamma * np.max(target_model.predict(s_next.reshape(1, -1))[0])
                target_q_values.append(target_q)

        # Compute the loss and update the Q-network using backpropagation
        states, actions, _, _, _ = zip(*batch)
        target_q_values = np.array(target_q_values)
        loss = model.train_on_batch(np.array(states), target_q_values)

        episode_reward += reward
        state = next_state

        # Update the target network weights periodically
        if episode % target_update_frequency == 0:
            update_target_network(target_model, model, tau=0.5)  # Adjust tau as needed

    print(f"Episode: {episode}, Reward: {episode_reward}")
    env.stop()
