<a href="https://colab.research.google.com/github/SkanderGasmi/contRL-donia/blob/main/ContRLMinigrid.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import Dependencies :

In [None]:
#from sklearn.metrics import mean_squared_error
import random
from collections import deque

import gymnasium
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import torch
import torch.nn as nn
from gym.wrappers import FlattenObservation
from scipy.stats import norm
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from tensorflow.keras.callbacks import ModelCheckpoint, LambdaCallback
from tensorflow.keras.layers import Dense, Flatten, Input, Reshape, Conv2D, MaxPooling2D, UpSampling2D, Cropping2D, Conv2DTranspose
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.optimizers import Adam
from tqdm import tqdm

# Star import kept last so the names it binds shadow exactly as before
from minigrid.wrappers import *


pygame 2.1.0 (SDL 2.0.16, Python 3.9.18)
Hello from the pygame community. https://www.pygame.org/contribute.html


# Helper Functions :

## create_environment :

In [None]:
def create_environment(env_name):
    """
    Build an environment whose observations are images only.

    Parameters:
        env_name (str): Registered id of the environment to instantiate.

    Returns:
        The environment created by ``gymnasium.make`` in ``rgb_array`` render
        mode and wrapped in ``ImgObsWrapper``.
    """
    # Create the base environment, then strip everything but the image
    # from its observations in a single expression.
    return ImgObsWrapper(gymnasium.make(env_name, render_mode="rgb_array"))

## select_environment :

In [None]:
def select_environment(environments):
    """
    Pick one environment name at random and instantiate it.

    Parameters:
        environments (list): Names of the environments that may be chosen.

    Returns:
        tuple: ``(env, env_name)`` where ``env`` is the environment built by
        ``create_environment`` and ``env_name`` is the name that was drawn.
    """
    chosen_name = random.choice(environments)
    chosen_env = create_environment(chosen_name)
    return chosen_env, chosen_name

## normalize_images :

In [None]:
def normalize_images(images):
    """
    Scales image observations by 1/255 and returns them as float32.

    Parameters:
        images (array-like): One image or a batch of images. Anything that
            ``np.asarray`` accepts (NumPy arrays, nested lists, ...) works.

    Returns:
        np.ndarray: A new float32 array with every value divided by 255.
        Inputs in [0, 255] therefore end up in [0, 1].
    """
    # np.asarray lets plain Python sequences be passed as well as arrays
    return np.asarray(images, dtype=np.float32) / 255.0

## visualize_reconstruction :

In [None]:
def visualize_reconstruction(original, reconstructed, n=10, title="Reconstruction"):
    """
    Visualizes original and reconstructed images in two rows of one figure.

    Parameters:
        original (np.array): The original input images to be displayed.
        reconstructed (np.array): The reconstructed images generated by the autoencoder.
        n (int): Maximum number of images to display. Default is 10. If fewer
            images are available, only the available ones are shown.
        title (str): Title for the plot. Default is "Reconstruction".

    Returns:
        None: This function does not return anything. It displays a plot of the images.

    """
    # Never index past the end of either batch
    n = max(0, min(n, len(original), len(reconstructed)))

    plt.figure(figsize=(20, 4))
    plt.suptitle(title)
    # NOTE(review): the images are multiplied by 255 but stay floating point
    # when the inputs are floats; matplotlib clips float RGB data to [0, 1].
    # Confirm this rendering is the intended one.
    for i in range(n):
        # Original images (top row)
        plt.subplot(2, n, i + 1)
        plt.imshow(original[i]*255)
        plt.title("Original")
        plt.axis('off')

        # Reconstructed images (bottom row)
        plt.subplot(2, n, i + 1 + n)
        plt.imshow(reconstructed[i]*255)
        plt.title("Reconstructed")
        plt.axis('off')
    plt.show()

## calculate_reconstruction_error :

In [None]:
def calculate_reconstruction_error(observations, reconstructed_data):
    """
    Calculates the reconstruction error between the original observations and the reconstructed data.

    Parameters:
    observations (array-like): The original data.
    reconstructed_data (array-like): The data reconstructed by the autoencoder.

    Returns:
    float: The mean squared error between the observations and the reconstructed data.

    Raises:
    ValueError: If the two inputs do not contain the same number of elements.
    """
    # Work on flat float64 copies so integer inputs (e.g. uint8 images)
    # cannot wrap around when subtracted
    observations_flat = np.asarray(observations, dtype=np.float64).ravel()
    reconstructed_data_flat = np.asarray(reconstructed_data, dtype=np.float64).ravel()

    if observations_flat.size != reconstructed_data_flat.size:
        raise ValueError(
            "observations and reconstructed_data must contain the same number of elements"
        )

    # Mean squared error computed with NumPy; the scikit-learn helper is
    # not imported in this file
    return float(np.mean((observations_flat - reconstructed_data_flat) ** 2))

## recognize_anomaly :

In [None]:
def recognize_anomaly(observations, current_ae, threshold):
    """
    Monitors reconstruction error with the current AE used and recognizes a change in the environment.

    Besides returning the decision, the function plots the first five
    original/reconstructed pairs and prints the threshold and the error.

    Parameters:
        observations (np.array): The input observations to be reconstructed.
        current_ae (Autoencoder): The current autoencoder model, or None when
            no autoencoder has been selected yet.
        threshold (float): The threshold value for reconstruction error used to detect a change.

    Returns:
        bool: True if there is no current autoencoder or the reconstruction
        error is above threshold, False otherwise.
    """
    # Happens during first ever encounter with an Environment
    if current_ae is None:
        return True

    # Reconstruct the observations
    reconstructed_data = current_ae.predict(observations)

    # Mean squared error over all pixels, computed with NumPy (the
    # scikit-learn helper is not imported in this file)
    difference = np.asarray(observations, dtype=np.float64) - np.asarray(reconstructed_data, dtype=np.float64)
    error = float(np.mean(difference ** 2))

    # Visualize original vs reconstructed images for the current autoencoder
    visualize_reconstruction(observations, reconstructed_data, n=5, title="Reconstruction with current Autoencoder")
    print(f"Threshold: {threshold}")
    print(f"Reconstruction error: {error}")

    # Check if the reconstruction error is above the threshold
    if error > threshold:
        print("Change in environment detected.")
        return True

    print("No significant change in environment detected.")
    return False

## Novelty_recognition :

In [None]:
def Novelty_recognition(observations, autoencoders, thresholds):
    """
    Detects Novelty and recognizes if the observations are from a known environment.

    Every autoencoder reconstructs the observations; an autoencoder is a
    candidate match only when its reconstruction error is below its own
    threshold. Among the candidates, the one with the lowest error wins.
    Reconstructions are plotted and progress is printed along the way.

    Parameters:
    observations (numpy array): Array of observations to be reconstructed.
    autoencoders (list): List of trained autoencoders.
    thresholds (list of float): List of thresholds for reconstruction errors, each corresponding to an autoencoder.

    Returns:
    tuple: ``(is_new, best_autoencoder, lowest_error, best_index)``.
    ``is_new`` is True when there are no autoencoders or all errors were above
    their thresholds; the other three entries are then None. Otherwise
    ``is_new`` is False and the remaining entries describe the best match.
    """
    # First ever encounter with an environment
    if len(autoencoders) == 0:
        print("First ever encounter with an environment.")
        return True, None, None, None

    # Errors stay at infinity unless an autoencoder beats its threshold
    reconstruction_errors = np.full(len(autoencoders), np.inf)
    observations_f64 = np.asarray(observations, dtype=np.float64)

    # Loop through each autoencoder
    for idx, (autoencoder, threshold) in enumerate(zip(autoencoders, thresholds)):
        print(f"Evaluating Autoencoder {idx + 1}/{len(autoencoders)}")

        # Reconstruct the observations with the autoencoder under evaluation
        reconstructed_data = autoencoder.predict(observations)

        # Mean squared error over all pixels, computed with NumPy (the
        # scikit-learn helper is not imported in this file)
        difference = observations_f64 - np.asarray(reconstructed_data, dtype=np.float64)
        error = float(np.mean(difference ** 2))

        # Visualize original vs reconstructed images for the current autoencoder
        visualize_reconstruction(observations, reconstructed_data, n=5, title=f"Reconstructions using Autoencoder {idx + 1}/{len(autoencoders)}")
        print(f"Threshold for Autoencoder {idx + 1}/{len(autoencoders)} : {threshold}")
        print(f"Reconstruction error for Autoencoder {idx + 1}/{len(autoencoders)} : {error}")

        # Compare the error to the corresponding threshold
        if error < threshold:
            reconstruction_errors[idx] = error
            print("This is a possible match.")
        else:
            print("As the reconstruction error is above this Autoencoder's threshold, the observations do not correspond to its environment.")

    # No autoencoder matched: the environment is new
    if np.all(np.isinf(reconstruction_errors)):
        print("Environment recognized as new.")
        return True, None, None, None

    # Find the smallest reconstruction error and its corresponding index
    best_index = int(np.argmin(reconstruction_errors))
    lowest_error = float(reconstruction_errors[best_index])
    best_autoencoder = autoencoders[best_index]

    print(f"Environment recognized as known. It's number {best_index+1} in the list")
    return False, best_autoencoder, lowest_error, best_index

## estimate_threshold

In [None]:
def estimate_threshold(autoencoder, validation_observations, confidence_level=0.95):
    """
    Estimates the reconstruction error threshold for an autoencoder.

    The per-observation mean squared reconstruction errors are fitted with a
    Gaussian, and the threshold is that Gaussian's quantile at
    ``confidence_level``.

    Parameters:
    - autoencoder: The trained autoencoder model (anything with ``predict``).
    - validation_observations: A batch of validation observations, first axis
      indexing the observations.
    - confidence_level: The confidence level for the threshold (default is 0.95).

    Returns:
    - threshold: The estimated threshold for reconstruction error.

    Raises:
    - ValueError: If no validation observations are supplied.
    """
    validation_observations = np.asarray(validation_observations)
    if validation_observations.size == 0:
        raise ValueError("validation_observations must contain at least one observation")

    # One batched prediction instead of one predict() call per observation
    reconstructed = np.asarray(autoencoder.predict(validation_observations))

    # Per-observation mean squared error, computed with NumPy (the
    # scikit-learn helper is not imported in this file)
    count = validation_observations.shape[0]
    difference = (
        validation_observations.astype(np.float64).reshape(count, -1)
        - reconstructed.astype(np.float64).reshape(count, -1)
    )
    reconstruction_errors = np.mean(difference ** 2, axis=1)

    # Fit a Gaussian distribution to the reconstruction errors
    mu, std = norm.fit(reconstruction_errors)

    # Threshold = quantile of the fitted Gaussian at the confidence level
    return float(norm.ppf(confidence_level, loc=mu, scale=std))

## build_autoencoder :

In [None]:
def build_autoencoder(input_shape):
    """
    Builds a convolutional autoencoder model.

    The encoder halves the spatial size twice with 'same'-padded max pooling;
    the decoder doubles it twice with stride-2 transposed convolutions. Any
    surplus rows/columns created by the round trip are cropped from the top
    and left so the output has the same shape as the input.

    Parameters:
    - input_shape: tuple (height, width, channels), shape of the input images

    Returns:
    - autoencoder: compiled autoencoder model (Adam optimizer,
      binary cross-entropy loss). The model summary is printed as a side effect.
    """
    def _decoded_size(size):
        # Each 'same'-padded 2x2 pooling yields ceil(size / 2); the two
        # stride-2 transposed convolutions then multiply that by 4.
        pooled_once = -(-size // 2)
        pooled_twice = -(-pooled_once // 2)
        return pooled_twice * 4

    height, width, channels = input_shape[0], input_shape[1], input_shape[2]
    # Surplus pixels to remove so the decoder output matches the input size
    # (1 pixel for 7x7 inputs, 0 for sizes divisible by 4)
    crop_top = _decoded_size(height) - height
    crop_left = _decoded_size(width) - width

    # Encoder
    input_img = Input(shape=input_shape)
    x = Conv2D(16, (3, 3), activation='relu', padding='same')(input_img)
    x = MaxPooling2D((2, 2), padding='same')(x)
    x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
    encoded = MaxPooling2D((2, 2), padding='same')(x)

    # Decoder
    x = Conv2D(8, (3, 3), activation='relu', padding='same')(encoded)
    x = Conv2DTranspose(8, (3, 3), strides=(2, 2), activation='relu', padding='same')(x)
    x = Conv2D(16, (3, 3), activation='relu', padding='same')(x)
    x = Conv2DTranspose(16, (3, 3), strides=(2, 2), activation='relu', padding='same')(x)

    # Cropping to match the original size (from the top and left edges)
    if crop_top or crop_left:
        x = Cropping2D(((crop_top, 0), (crop_left, 0)))(x)

    decoded = Conv2D(channels, (3, 3), activation='sigmoid', padding='same')(x)

    # Model
    autoencoder = Model(input_img, decoded)
    autoencoder.compile(optimizer='adam', loss='binary_crossentropy')

    autoencoder.summary()
    return autoencoder

## train_autoencoder :

In [None]:
def train_autoencoder(observations, epochs=50, batch_size=128, validation_split=0.2):
    """
    Trains an autoencoder on the provided observations.

    The model is built by ``build_autoencoder`` for the shape of a single
    observation and fitted to reproduce its own input. The loss curves are
    plotted and the final losses printed.

    Parameters:
    - observations: numpy array, shape (num_samples, height, width, channels), the dataset to train on
    - epochs: int, number of training epochs (default 50)
    - batch_size: int, mini-batch size (default 128)
    - validation_split: float, fraction of the data held out for validation (default 0.2)

    Returns:
    - autoencoder: the trained autoencoder model
    """
    print(f"Collected observations shape : {observations.shape}")
    autoencoder = build_autoencoder(input_shape=observations[0].shape)

    # Train the autoencoder; input and target are the same data
    history = autoencoder.fit(
        observations,
        observations,
        epochs=epochs,
        batch_size=batch_size,
        shuffle=True,
        validation_split=validation_split,
    )

    training_loss = history.history['loss']
    validation_loss = history.history['val_loss']

    # Plot the training and validation loss
    plt.plot(training_loss, label='Training Loss')
    plt.plot(validation_loss, label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

    print(f"Final training loss: {training_loss[-1]:.4f}")
    print(f"Final validation loss: {validation_loss[-1]:.4f}")

    return autoencoder

## PPO training:

In [None]:
class TrainingLoggerCallback(BaseCallback):
    """
    Custom callback that records the reward and length of finished episodes.

    Parameters:
    - verbose: int, level of verbosity (0: no output, 1: info)

    Attributes:
    - episode_rewards: list, stores the rewards for each episode
    - episode_lengths: list, stores the lengths of each episode
    """
    def __init__(self, verbose=0):
        super().__init__(verbose)
        self.episode_rewards = []
        self.episode_lengths = []

    def _on_step(self) -> bool:
        """
        Record every episode summary found in the current step's infos.

        Returns:
            bool: Always True, so training is never interrupted by this callback.
        """
        # Inspect the info dict of every environment, not only the first one,
        # so no finished episode is missed when several envs run in parallel.
        # The 'episode' entry is presumably added by SB3's Monitor wrapper
        # when an episode ends -- TODO confirm.
        for info in self.locals.get('infos', ()):
            episode_info = info.get('episode')
            if episode_info is not None:
                self.episode_rewards.append(episode_info['r'])
                self.episode_lengths.append(episode_info['l'])
        return True

In [None]:
class MinigridFeaturesExtractor(BaseFeaturesExtractor):
    """
    Small CNN feature extractor for image observations.

    Three 2x2 convolutions (16, 32 and 64 filters, each followed by ReLU) are
    flattened and projected to ``features_dim`` by a linear layer with ReLU.

    Parameters:
    - observation_space: the observation space; its first dimension is used
      as the number of input channels (assumes a channel-first image space --
      TODO confirm against how the policy passes the space in).
    - features_dim: int, size of the feature vector produced (default 512)
    - normalized_image: bool, accepted for interface compatibility; not used
      by this extractor.
    """
    def __init__(self, observation_space: gymnasium.Space, features_dim: int = 512, normalized_image: bool = False) -> None:
        super().__init__(observation_space, features_dim)
        n_input_channels = observation_space.shape[0]
        self.cnn = nn.Sequential(
            nn.Conv2d(n_input_channels, 16, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(16, 32, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(32, 64, (2, 2)),
            nn.ReLU(),
            nn.Flatten(),
        )

        # Compute the flattened size by pushing one sampled observation
        # through the CNN (no gradients needed for a shape probe)
        with torch.no_grad():
            n_flatten = self.cnn(torch.as_tensor(observation_space.sample()[None]).float()).shape[1]

        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        """Return the features: CNN output passed through the linear head."""
        return self.linear(self.cnn(observations))

In [None]:
def moving_average(data, window_size):
    """
    Computes the simple moving average of a 1-D sequence.

    Parameters:
        data (array-like): The values to smooth.
        window_size (int): Number of consecutive values averaged together.
            If it exceeds the number of values, the window shrinks to cover
            all of them.

    Returns:
        np.ndarray: The averages of every full window, in order. Empty input
        gives an empty array.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be at least 1")

    values = np.asarray(data, dtype=float)
    if values.size == 0:
        return values

    # A window longer than the data would make np.convolve swap its operands
    # in 'valid' mode and return meaningless values, so clamp it
    window = min(int(window_size), values.size)
    return np.convolve(values, np.ones(window) / window, mode='valid')

In [None]:
# Policy configuration handed to PPO: use the MiniGrid feature extractor
# and have it produce 128-dimensional features.
policy_kwargs = {
    "features_extractor_class": MinigridFeaturesExtractor,
    "features_extractor_kwargs": {"features_dim": 128},
}

In [None]:
def train_PPO_AE(current_env, env_name, collected_observations_ae, total_timesteps=400_000, train_fraction=0.9):
    """
    Train a PPO and an Autoencoder on the given environment.

    Parameters:
        current_env (gym.Env): The environment to train the PPO on.
        env_name (str): Name of the environment, used in messages and the plot title.
        collected_observations_ae (np.array): Observations used for the
            autoencoder; the first part trains it, the rest estimates its threshold.
        total_timesteps (int): Number of environment steps PPO trains for.
            Default is 400000.
        train_fraction (float): Fraction of the observations used to train the
            autoencoder. Default is 0.9 (900 of 1000 observations).

    Returns:
        tuple: A tuple containing the trained PPO model, the trained AE model and its corresponding threshold.
    """
    # Train PPO
    ppo_model = PPO("CnnPolicy", current_env, policy_kwargs=policy_kwargs, verbose=1)
    callback = TrainingLoggerCallback()
    print("Agent beginning his training on : ", env_name)
    ppo_model.learn(int(total_timesteps), callback=callback)
    print("Agent now knows how to play in minigrid : " , env_name)

    # Visualise the training

    # Define the window size for the moving average
    window_size = 50

    # Only plot when at least one episode finished; otherwise there is
    # nothing to smooth or draw
    if callback.episode_rewards:
        # Calculate the moving average of episode rewards
        smoothed_rewards = moving_average(callback.episode_rewards, window_size)

        # Plot episode rewards with moving average
        plt.figure(figsize=(10, 5))
        # The label gives plt.legend() an entry to show
        plt.plot(range(len(smoothed_rewards)), smoothed_rewards, label=f"Moving average (window={window_size})")
        plt.xlabel('Episodes')
        plt.ylabel('Average Reward')
        titlevar= f"Training iterations for environmnet: {env_name}"
        plt.title(titlevar)
        plt.legend()
        plt.show()
    else:
        print("No completed episodes were logged; skipping the reward plot.")

    # Train autoencoder on the first part of the observations and estimate
    # its threshold on the held-out remainder
    split_index = int(len(collected_observations_ae) * train_fraction)
    ae_model = train_autoencoder(collected_observations_ae[:split_index])
    threshold = estimate_threshold(ae_model, collected_observations_ae[split_index:])

    return ppo_model, ae_model, threshold

# Main Loop

In [None]:
# Initialize sets for AE and PPO models, thresholds and environment names.
# Entries at the same index of ae_set, ppo_set and thresholds belong together.
ae_set = []
ppo_set = []
thresholds = []
environments = ["MiniGrid-LavaGapS7-v0", "MiniGrid-DoorKey-6x6-v0" , "MiniGrid-Dynamic-Obstacles-5x5-v0"]


# Models (and AE threshold) currently in use by the agent
ae_model = None
ppo_model = None
threshold = None
rewards_throughout_playing =[]

# Parameters
num_episodes = 30 # for playing
num_steps_exploring = 1000 # for exploring
num_steps_playing = 50 # for playing

# Main Loop
for episode in range(num_episodes):
    # Select and reset a random environment
    current_environment , env_name = select_environment(environments)
    print( f"Episode number: {episode +1}. Current Environement in this episode: {env_name}" )

    # Play a bit, explore (Agent takes a first look) with random actions and
    # keep the normalized observations for the autoencoders
    observations =[]
    observation, info = current_environment.reset(seed=42)
    for _ in range(num_steps_exploring):
        action = current_environment.action_space.sample() # random
        observation, reward, terminated, truncated, info = current_environment.step(action)
        observations.append(normalize_images(observation))
        if terminated or truncated:
            observation, info = current_environment.reset()
    collected_observations_ae = np.array(observations)

    # Anomaly detection (Detection of a change in environment)
    if episode == 0 :
        change_in_environment = True
    else :
        change_in_environment = recognize_anomaly(collected_observations_ae[:200], ae_model, threshold)

    if change_in_environment :
        isNew , best_autoencoder, lowest_error, best_index = Novelty_recognition(collected_observations_ae[:200], ae_set, thresholds)
        if isNew:
            # Train new PPO and AE for the new environment and estimate a threshold for the AE
            ppo_model, ae_model, threshold = train_PPO_AE(current_environment, env_name, collected_observations_ae)

            # Store AE and PPO models
            ae_set.append(ae_model)
            ppo_set.append(ppo_model)
            thresholds.append(threshold)
        else :
            # Known environment: switch to the models stored for it.
            # The threshold comes from the `thresholds` list, not from the
            # current scalar `threshold`.
            ppo_model = ppo_set[best_index]
            ae_model = ae_set[best_index]
            threshold = thresholds[best_index]

    # Play (The agent should perform well)
    done = False
    play_step = 0
    total_reward = 0
    # The policy was trained on the environment's raw observations, so it
    # is given raw (not pre-normalized) observations here as well
    current_state, _ = current_environment.reset()
    while not done and play_step < num_steps_playing:
        play_step += 1
        action, _ = ppo_model.predict(current_state)

        current_state, reward, terminated, truncated, _ = current_environment.step(action)
        # An episode is over when it terminates OR is truncated
        done = terminated or truncated
        total_reward += reward

    print(f"Episode {episode + 1}: Total Reward = {total_reward}")
    rewards_throughout_playing.append([env_name, total_reward])

    # A fresh environment is created every episode; release this one
    current_environment.close()


Episode number: 1. Current Environement in this episode: MiniGrid-LavaGapS7-v0
First ever encounter with an environment.
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
Agent beginning his training on :  MiniGrid-LavaGapS7-v0
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 61.8     |
|    ep_rew_mean     | 0.0215   |
| time/              |          |
|    fps             | 821      |
|    iterations      | 1        |
|    time_elapsed    | 2        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 70.4        |
|    ep_rew_mean          | 0.0181      |
| time/                   |             |
|    fps                  | 515         |
|    iterations           | 2           |
|    time_elapsed         | 7           |
|    total_


KeyboardInterrupt



In [None]:
# Show the [environment name, total reward] pair recorded for every episode
print(rewards_throughout_playing)

In [None]:
# rewards_throughout_playing holds [env_name, total_reward] pairs; only the
# numeric reward of each episode is plotted
episode_total_rewards = [entry[1] for entry in rewards_throughout_playing]

plt.figure(figsize=(10, 6))
plt.scatter(range(len(episode_total_rewards)), episode_total_rewards, color='r')
plt.title('Agent Performance')
# x axis is the episode index, y axis the reward obtained in that episode
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.grid(True)
plt.show()