In [None]:
# -----------------------------------------------------
# Citing Relevant Assets
# This project utilizes several open-source libraries and assets:
# 1. Pre-trained models are sourced from Huggingface and Stable Baselines3, 
#    specifically the Proximal Policy Optimization (PPO) models: "sb3/ppo-CartPole-v1" and "sb3/ppo-LunarLander-v2"
#    and "sb3/ppo-LunarLander-v2" (https://huggingface.co/sb3).
# 2. The PySR library is employed for symbolic regression: 
#    Cranmer, Miles, et al., "PySR: Fast & Scalable Symbolic Regression in Python" (2023), 
#    available at https://github.com/MilesCranmer/PySR.
# 3. For Reinforcement Learning environments, the Gym library from OpenAI is used: 
#    Brockman, G., et al., "OpenAI Gym", available at https://github.com/openai/gym.
# 4. The project is conducted in the context of the lecture Advanced Topics in Reinforcement Learning 
#    at Gottfried Wilhelm Leibniz Universität Hannover, under Prof. Dr. Marius Lindauer and Theresa Eimer (2024).
# 
# Ensure to properly cite these resources in any external use or publication derived from this work.
# -----------------------------------------------------

In [None]:
!pip install cython==3.0.10
!pip install swig==4.2.1
!pip install cmake==3.27.9
!pip install numpy==1.24.2
!pip install tqdm==4.66.4
!pip install torch==2.2.2
!pip install stable-baselines3[extra]==2.3.2
!pip install huggingface_sb3==3.0
!pip install pysr==0.19.4
!pip install carl-bench==1.1.0A
!pip install pygame==2.6.0
!pip install ffmpeg==1.4
!pip install gymnasium[box2d]>=0.28.1

In [None]:
# Standard Library Imports
import os
import random
import warnings
import pickle

# Third-Party Library Imports
import numpy as np
import torch
import gym
from tqdm import tqdm, trange
import matplotlib.pyplot as plt
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from pysr import PySRRegressor
import sympy as sp
from huggingface_sb3 import load_from_hub
from google.colab import drive

# Suppress specific warnings
warnings.filterwarnings("ignore", message=".*progress bar will be turned off.*")

# Determine computation device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device == "cuda":
    torch.cuda.synchronize()
print(f"Using device: {device}")

def load_pretrained_model(filename, repo_id, repo_filename):
    """
    Loads a pre-trained PPO model from Hugging Face or local storage.

    Parameters:
    filename (str): Local filename to check or save the model.
    repo_id (str): Hugging Face repo ID for downloading the model.
    repo_filename (str): Name of the model file in the repo.

    Returns:
    model: Loaded PPO model.
    """
    if not os.path.isfile(filename):
        filename_ = load_from_hub(repo_id=repo_id, filename=repo_filename).replace(".zip", "")
        print(f"Model retrieved from Hugging Face: {filename_}")
        model = PPO.load(filename_)
        model.save(filename)
    else:
        model = PPO.load(filename)

    print(f"Model exists locally: {os.path.isfile(filename)}")
    return model

def customize_environment(env):
    """
    Customize the environment parameters as needed.

    Parameters:
    env: Gym environment to be customized.

    Returns:
    Customized environment (if applicable).
    """
    # Modify environment settings if needed (e.g., turbulence in other environments)
    return env

def sample_observations(env, n_samples):
    """
    Samples random observations from the environment's observation space.

    Parameters:
    env: Gym environment.
    n_samples (int): Number of samples to collect.

    Returns:
    List of sampled observations.
    """
    return [env.observation_space.sample() for _ in range(n_samples)]

def get_action_values(model, observations, num_actions):
    """
    Computes action values for the provided observations using the model.

    Parameters:
    model: Trained model to compute action values.
    observations: List of observations to evaluate.
    num_actions (int): Number of available actions in the environment.

    Returns:
    List of computed action values.
    """
    action_samples = []
    with tqdm(total=len(observations), desc="Sampling action values") as pbar:
        for obs in observations:
            pbar.update(1)
            action_logits = model.policy.forward(torch.tensor(obs, dtype=torch.float32).to(device).view(1, -1))[0]
            action_samples.append(torch.nn.functional.one_hot(action_logits, num_actions).cpu().detach().numpy().flatten())

    return action_samples

def collect_inference_samples(env, model, n_samples_inference, max_steps, num_actions):
    """
    Collects state-action pairs via inference on the environment using the model.

    Parameters:
    env: Gym environment.
    model: Trained model for inference.
    n_samples_inference (int): Number of inference samples to collect.
    max_steps (int): Maximum steps allowed per episode.
    num_actions (int): Number of possible actions.

    Returns:
    Numpy arrays of observations and corresponding action values.
    """
    observations, action_values = [], []
    samples_inference = 0
    with tqdm(total=n_samples_inference, desc="Collecting inference samples") as pbar:
        while samples_inference < n_samples_inference:
            obs = env.reset(seed=seed)
            for _ in range(max_steps):
                action_logits = model.policy.forward(torch.tensor(obs, dtype=torch.float32).to(device).view(1, -1))[0]
                action_logits = torch.nn.functional.one_hot(action_logits, num_actions).cpu().detach().numpy().flatten()

                if np.random.random() < sample_point_probability:
                    observations.append(obs)
                    action_values.append(action_logits)
                    samples_inference += 1
                    pbar.update(1)
                    if samples_inference >= n_samples_inference:
                        break

                action = np.argmax(action_logits)
                obs, _, done, truncated = env.step(action)
                if done or truncated:
                    break

    return np.array(observations), np.array(action_values)

def retrieve_symbolic_models(observations, action_values, num_actions, passive_alternative_action=False):
    """
    Retrieves symbolic models for each action using symbolic regression.

    Parameters:
    observations: Numpy array of environment observations.
    action_values: Numpy array of corresponding action values.
    num_actions (int): Number of actions.
    passive_alternative_action (bool): Whether to consider passive alternative actions.

    Returns:
    List of symbolic models.
    """
    symbolic_models = []
    if passive_alternative_action:
        action_values = action_values[:, :-1]
        observations = observations[:, :-1]

    for i in range(action_values.shape[1]):
        print(f"Retrieving symbolic model for action {i + 1}")
        np.random.seed(seed)
        model_simplified = PySRRegressor(
            procs=0, multithreading=False, random_state=seed,
            niterations=n_pySR_iterations,
            binary_operators=["+", "-", "*", "/", "^"],
            unary_operators=["cos", "sin", "exp", "relu", "tanh", "atanh_clip", "sqrt", "cosh", "abs", "log"],
            constraints={'^': (-1, 1)},
            extra_sympy_mappings={"Heaviside": lambda x: x > 0},
            model_selection="accuracy",
            elementwise_loss="L2DistLoss()",
            maxsize=25,
            populations=50,
            parsimony=0.0
        )
        model_simplified.fit(observations, action_values[:, i])
        symbolic_models.append(model_simplified)
        print(f"Best symbolic model for action {i}: {model_simplified.get_best()}")

    return symbolic_models

def evaluate_symbolic_expression(symbolic_expression, observation):
    """
    Evaluates a symbolic expression for a given observation.

    Parameters:
    symbolic_expression: Symbolic expression to evaluate.
    observation: Environment observation as input.

    Returns:
    Evaluation result as a float.
    """
    try:
        result = symbolic_expression.subs({f"x{j}": observation[j] for j in range(len(observation))}).evalf()
        return float(result) if result.is_real else float(result.as_real_imag()[0])
    except TypeError as e:
        print(f"Error evaluating symbolic expression: {e}")
        return 0.0

class SymbolicPolicyAgent:
    def __init__(self, symbolic_models, num_actions, model_index=0, passive_alternative_action=False):
        """
        Initializes the symbolic policy agent.

        Parameters:
        symbolic_models: List of symbolic models for actions.
        num_actions (int): Number of actions.
        model_index (int): Index for selecting which model to use.
        passive_alternative_action (bool): If true, includes a passive alternative action in decision-making.
        """
        self.symbolic_models = symbolic_models
        self.model_index = model_index
        self.symbolic = [self.symbolic_models[i].sympy(max(0, len(self.symbolic_models[i].equations_) - self.model_index - 1)) for i in range(len(self.symbolic_models))]
        self.passive_alternative_action = passive_alternative_action

    def select_action(self, observation):
        """
        Selects an action based on symbolic models and observation.

        Parameters:
        observation: Current observation from the environment.

        Returns:
        Selected action index.
        """
        action_scores = [evaluate_symbolic_expression(expr, observation) for expr in self.symbolic]
        if self.passive_alternative_action:
            action_scores.append(0.5)
        return np.argmax(action_scores)

    def set_model_index(self, model_index):
        """
        Sets a new model index for symbolic evaluation.

        Parameters:
        model_index (int): Index of the model to use.
        """
        self.model_index = model_index
        self.symbolic = [self.symbolic_models[i].sympy(len(self.symbolic_models[i].equations_) - 1 - self.model_index) for i in range(len(self.symbolic_models))]

def evaluate_symbolic_agent(env, agent, n_episodes):
    """
    Evaluates the symbolic policy agent over multiple episodes.

    Parameters:
    env: Gym environment.
    agent: Symbolic policy agent to evaluate.
    n_episodes (int): Number of episodes to evaluate.

    Returns:
    Mean and standard deviation of rewards, and list of rewards.
    """
    rewards = []
    for _ in trange(n_episodes, desc="Evaluating agent"):
        obs = env.reset()
        done, episode_reward = False, 0
        while not done:
            action = agent.select_action(obs)
            obs, reward, done, truncated = env.step(action)
            done = done or truncated
            episode_reward += reward
        rewards.append(episode_reward)

    return np.mean(rewards), np.std(rewards), rewards

def evaluate_and_plot_models(env, symbolic_models, num_actions, evaluated_episodes):
    """
    Evaluates and plots performance of symbolic models.

    Parameters:
    env: Gym environment.
    symbolic_models: List of symbolic models to evaluate.
    num_actions (int): Number of actions in the environment.
    evaluated_episodes (int): Number of episodes to evaluate per model.

    Returns:
    List of mean rewards, standard deviations, and rewards for each model.
    """
    results, all_rewards = [], []
    for k in range(len(symbolic_models[0].equations_)):
        try:
            symbolic_agent = SymbolicPolicyAgent(symbolic_models, num_actions, model_index=k)
            mean_reward, std_reward, rewards = evaluate_symbolic_agent(env, symbolic_agent, evaluated_episodes)
            results.append((mean_reward, std_reward))
            all_rewards.append(rewards)

            # Plotting results
            k_indices = np.arange(len(results))
            mean_rewards = [result[0] for result in results]
            std_rewards = [result[1] for result in results]

            plt.figure(figsize=(10, 6))
            plt.plot(k_indices, mean_rewards, label="Mean Reward", color='royalblue')
            plt.fill_between(k_indices, np.array(mean_rewards) - np.array(std_rewards), np.array(mean_rewards) + np.array(std_rewards), color='royalblue', alpha=0.3)
            plt.xlabel("k-best model index")
            plt.ylabel("Reward")
            plt.title(f"Mean Reward and Confidence Interval for k-best Models (seed: {seed})")
            plt.legend()
            plt.xticks(k_indices)
            plt.show()

        except Exception as e:
            print(f"No entries for {k+1}th best model: {e}")

    return results, all_rewards


In [None]:
# Main Execution

# Connect to Google Drive if storing results in Drive is desired
use_drive = True
directory = "ATRL/"
if use_drive:
    drive.mount('/content/drive', force_remount=True)
    storage_path = '/content/drive/MyDrive/' + directory
else:
    storage_path = directory

# Ensure storage directory exists
os.makedirs(storage_path) if not os.path.exists(storage_path) else print(storage_path + " already exists")

# Set seed for reproducibility
seed = 1000

# Configuration Parameters
sample_point_probability = 0.1  # Probability of sampling data points (higher = more diverse samples, but more time)
evaluated_episodes = 25         # Number of episodes to evaluate the model's performance (higher = more precise results)
n_pySR_iterations = 100         # Number of iterations for symbolic regression (increase for better results)
n_samples_inference = 10_000    # Number of inference samples (upper limit for pySR framework)
designation = f"cartpole_s{seed}_both_actions_represented"

# Load pre-trained model from Hugging Face or local storage
trained_policy_filename = "ppo-CartPole-v1.zip"
repo_id = "sb3/ppo-CartPole-v1"
repo_filename = "ppo-CartPole-v1.zip"
model = load_pretrained_model(trained_policy_filename, repo_id, repo_filename)

# Load and customize the environment
env = gym.make("CartPole-v1")
env = customize_environment(env)
num_actions = 2  # CartPole has two actions: left (0) and right (1)
max_steps = 500  # Maximum steps per episode in CartPole

# Set seeds for reproducibility across libraries
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
env.reset(seed=seed)

# Collect inference samples from the environment
observations, action_values = collect_inference_samples(env, model, n_samples_inference, max_steps, num_actions)

# Retrieve symbolic models using the collected data
symbolic_models = retrieve_symbolic_models(observations, action_values, num_actions)

# Store symbolic models to disk
symbolic_model_filename = storage_path + f'pysr_models_{designation}.pkl'
with open(symbolic_model_filename, 'wb') as f:
    pickle.dump(symbolic_models, f)

# Evaluate the symbolic models and plot performance
results, all_rewards = evaluate_and_plot_models(env, symbolic_models, num_actions, evaluated_episodes)

# Store individual episode rewards to disk
rewards_filename = storage_path + f'episode_rewards_{designation}.pkl'
with open(rewards_filename, 'wb') as f:
    pickle.dump(all_rewards, f)

# Close the environment to free resources
env.close()