In [None]:
    def calculate_reward(self):
        """Compute the shaped reward for the agent's current grid position.

        Components, as implemented below:

        * Standing on an outpost that is not yet in ``outposts_visited`` earns
          ``new_outpost_reward``, records the visit and clears ``recent_path``.
          When that visit makes ``outposts_visited`` as large as
          ``outpost_coords``, ``completion_reward`` is added and
          ``early_stop`` is set.
        * Otherwise the Manhattan distance to the nearest unvisited outpost is
          compared with ``previous_min_distance``: a smaller value adds
          ``closer_to_outpost_reward``, a larger one adds
          ``farther_from_outpost_penalty``. Re-entering a position stored in
          ``recent_path`` adds ``circular_behavior_penalty``.
        * ``penalty`` is added on every call.

        Penalties are *added* to the total, so they are presumably configured
        as negative numbers -- confirm where they are defined.

        Returns:
            The summed reward for this step.
        """
        logger.info("Calculating reward...")
        agent = self.agent_controler.agent
        agent_pos = (agent.grid_x, agent.grid_y)
        reward = 0

        # Check if agent reached a new outpost
        if agent_pos in self.outpost_coords and agent_pos not in self.outposts_visited:
            reward += self.new_outpost_reward
            self.outposts_visited.add(agent_pos)
            logger.info(f"Agent reached new outpost. Reward: {self.new_outpost_reward}")

            self.recent_path.clear()  # Clear the path memory when reaching a new outpost
            if len(self.outposts_visited) == len(self.outpost_coords):
                reward += self.completion_reward
                logger.info(f"All outposts visited. Additional reward: {self.completion_reward}")
                self.early_stop = True
            else:
                # The stored baseline was measured against a target set that
                # still contained the outpost just reached. Re-base it on the
                # remaining outposts; otherwise the next step is compared with
                # a stale, too-small distance and is scored as "moved away"
                # regardless of the direction actually taken.
                remaining_outposts = [
                    outpost for outpost in self.outpost_coords
                    if outpost not in self.outposts_visited
                ]
                if remaining_outposts:
                    self.previous_min_distance = min(
                        manhattan_distance(agent_pos, outpost) for outpost in remaining_outposts
                    )
        else:
            unvisited_outposts = [
                outpost for outpost in self.outpost_coords
                if outpost not in self.outposts_visited
            ]
            if unvisited_outposts:
                current_min_distance = min(
                    manhattan_distance(agent_pos, outpost) for outpost in unvisited_outposts
                )

                if current_min_distance < self.previous_min_distance:
                    reward += self.closer_to_outpost_reward
                    logger.info(f"Agent moved closer to an outpost. Reward: {self.closer_to_outpost_reward}")
                elif current_min_distance > self.previous_min_distance:
                    reward += self.farther_from_outpost_penalty
                    logger.info(f"Agent moved away from outposts. Penalty: {self.farther_from_outpost_penalty}")

                # Baseline for the comparison made on the next call.
                self.previous_min_distance = current_min_distance
            else:
                logger.warning("No unvisited outposts left. The agent should have stopped already.")

            # Check for circular behavior
            if agent_pos in self.recent_path:
                reward += self.circular_behavior_penalty
                logger.info(f"Agent repeated a path. Penalty: {self.circular_behavior_penalty}")

            # Update recent path memory
            self.recent_path.append(agent_pos)

        # Applied on every call, whichever branch ran above.
        reward += self.penalty
        return reward

In [None]:
class CustomEnv(gym.Env):
    """Gym environment backed by a single ``GameManager`` (excerpt; the rest is elided)."""

    def __init__(self, game_manager_args, simulation_manager_args, model_args):
        """Build the environment.

        Args:
            game_manager_args: mapping expanded as keyword arguments into
                ``GameManager``.
            simulation_manager_args: not used in the visible part of this
                initializer.
            model_args: not used in the visible part of this initializer.
        """
        # Initialise the gym.Env base class, as the other CustomEnv variants
        # in this file do.
        super().__init__()
        self.game_manager = GameManager(**game_manager_args)
        # ... rest of the initialization ...

class GameManager:
    """Container for the game configuration (remaining setup is elided in this excerpt)."""

    def __init__(self, num_tiles, screen_size, vision_range, kg_completeness):
        """Store the four configuration values on the instance, unchanged.

        Args:
            num_tiles: kept as ``self.num_tiles``.
            screen_size: kept as ``self.screen_size``.
            vision_range: kept as ``self.vision_range``.
            kg_completeness: kept as ``self.kg_completeness``.
        """
        self.num_tiles, self.screen_size = num_tiles, screen_size
        self.vision_range, self.kg_completeness = vision_range, kg_completeness
        # ... rest of the initialization ...

In [None]:
import os
import torch
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.evaluation import evaluate_policy

import cProfile
import pstats
import logging
import traceback
import time
import warnings
import json

from custom_env import CustomEnv
from agent_model import AgentModel

class Logger:
    """Small facade over a ``logging`` logger that writes to a file and the console.

    Every instance wraps the same underlying logger
    (``logging.getLogger(__name__)``), so handlers are attached only when an
    equivalent one is not already present. Without that check each additional
    ``Logger(...)`` would add another pair of handlers and every record would
    be emitted once per instance created.
    """

    _FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

    def __init__(self, log_file='training.log'):
        """Configure the shared logger.

        Args:
            log_file: path of the file that receives the log records.
        """
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)

        formatter = logging.Formatter(self._FORMAT)
        # FileHandler stores its target as an absolute path in baseFilename,
        # so compare against the absolute form of the requested file.
        target_path = os.path.abspath(log_file)

        has_file_handler = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == target_path
            for handler in self.logger.handlers
        )
        if not has_file_handler:
            file_handler = logging.FileHandler(log_file)
            file_handler.setLevel(logging.INFO)
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

        # FileHandler derives from StreamHandler, so exclude it explicitly
        # when looking for an existing console handler.
        has_console_handler = any(
            isinstance(handler, logging.StreamHandler)
            and not isinstance(handler, logging.FileHandler)
            for handler in self.logger.handlers
        )
        if not has_console_handler:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_handler.setFormatter(formatter)
            self.logger.addHandler(console_handler)

        # Global side effect kept from the original: show every warning,
        # including repeated ones.
        warnings.filterwarnings("always")

    def info(self, message):
        """Log ``message`` at INFO level."""
        self.logger.info(message)

    def warning(self, message):
        """Log ``message`` at WARNING level."""
        self.logger.warning(message)

    def error(self, message):
        """Log ``message`` at ERROR level."""
        self.logger.error(message)

class EnvironmentManager:
    """Keeps the three argument bundles and builds monitored environments from them."""

    def __init__(self, game_manager_args, simulation_manager_args, model_args):
        """Remember the argument bundles later passed to ``CustomEnv``."""
        (
            self.game_manager_args,
            self.simulation_manager_args,
            self.model_args,
        ) = (game_manager_args, simulation_manager_args, model_args)

    def make_env(self):
        """Return a new ``CustomEnv`` wrapped in ``Monitor``."""
        raw_env = CustomEnv(
            self.game_manager_args,
            self.simulation_manager_args,
            self.model_args,
        )
        return Monitor(raw_env)

    def set_kg_completeness(self, env, completeness):
        """Delegate to ``env.set_kg_completeness(completeness)``; returns nothing."""
        env.set_kg_completeness(completeness)

# ... [Rest of the classes remain the same] ...

# Script entry point: build the base configuration, create the logger and run
# the ablation study. AblationStudy is not defined in the visible part of
# this file.
if __name__ == '__main__':
    # NOTE(review): presumably read by pygame's AVX2 detection -- confirm that
    # setting it here (after the imports above) is early enough.
    os.environ['PYGAME_DETECT_AVX2'] = '1'

    # Settings handed to AblationStudy as a single dict; how each entry is
    # consumed is not visible here.
    base_config = {
        'model_args': {'num_actions': 11},
        'simulation_manager_args': {'number_of_environments': 10, 'number_of_curricula': 3},
        'game_manager_args': {'num_tiles': 8, 'screen_size': 200, 'vision_range': 1, 'kg_completeness': 1.0},  # kg_completeness travels with the game-manager arguments
        # NOTE(review): these look like PPO hyper-parameters -- confirm against AblationStudy.
        'model_config': {
            'n_steps': 2048,
            'batch_size': 64,
            'learning_rate': 3e-4,
            'gamma': 0.99
        },
        'total_timesteps': 1000000
    }

    # KG completeness levels passed to the study.
    kg_completeness_values = [0.25, 0.5, 0.75, 1.0]

    logger = Logger('ablation_study.log')  # log file that receives the study's records
    ablation_study = AblationStudy(base_config, kg_completeness_values, logger)

    try:
        ablation_study.run()
    except Exception as e:
        # Top-level boundary: record the message and the full traceback
        # instead of letting the exception propagate.
        logger.error(f"An error occurred during the ablation study: {str(e)}")
        logger.error(traceback.format_exc())

In [None]:
import os
import torch
import numpy as np
import copy
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.evaluation import evaluate_policy

from custom_env import CustomEnv
from agent_model import AgentModel
import logging
import traceback
import time
import warnings

# ... (previous imports and setup remain the same)

class TrainingManager:
    # ... (previous methods remain the same)

    def run_ablation_study(self, kg_completeness_levels):
        """Train and evaluate one model per KG completeness level.

        A base environment is created with completeness 1.0; for every level
        a deep copy of it is made, its ``kg_completeness`` attribute is set,
        and a model is trained, saved and evaluated on that copy.

        Environments are closed in ``finally`` blocks so that a failure
        during training or evaluation does not leave them open.

        Args:
            kg_completeness_levels: iterable of completeness values.

        Returns:
            dict mapping each level to its ``episode_rewards``,
            ``episode_lengths``, ``final_mean_reward`` and
            ``final_std_reward``.
        """
        results = {}
        base_env = self.create_env(1.0)  # Create a base environment with full knowledge graph

        try:
            for kg_completeness in kg_completeness_levels:
                logger.info(f"Starting ablation study for KG completeness: {kg_completeness}")

                # Create a deep copy of the base environment
                env = copy.deepcopy(base_env)
                try:
                    # NOTE(review): only the attribute is updated here; whether
                    # the knowledge graph itself is rebuilt for the new
                    # completeness is not visible in this file -- confirm.
                    env.unwrapped.kg_completeness = kg_completeness

                    model = self.create_model(env)

                    episode_rewards, episode_lengths = self.train_model(model, env)

                    model.save(f"ppo_custom_env_kg_{kg_completeness}")
                    mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)

                    results[kg_completeness] = {
                        'episode_rewards': episode_rewards,
                        'episode_lengths': episode_lengths,
                        'final_mean_reward': mean_reward,
                        'final_std_reward': std_reward
                    }
                finally:
                    # Release the per-level copy even when this level fails.
                    env.close()

                logger.info(f"Completed ablation study for KG completeness: {kg_completeness}")
                logger.info(f"Final evaluation: Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")
        finally:
            base_env.close()  # Close the base environment

        return results

    # ... (other methods remain the same)

def main():
    """Run the KG-completeness ablation study and save its results.

    Any exception raised while running the study or saving the results is
    logged (message plus traceback) and not propagated.
    """
    # ... (setup remains the same)
    # NOTE(review): model_args, simulation_manager_args and game_manager_args
    # used below are expected to come from the elided setup above.

    training_manager = TrainingManager(model_args, simulation_manager_args, game_manager_args)
    kg_completeness_levels = [0.2, 0.5, 0.8, 1.0]  # Example levels for ablation study

    try:
        results = training_manager.run_ablation_study(kg_completeness_levels)
        training_manager.save_results(results)
        logger.info("Ablation study completed and results saved.")
    except Exception as e:
        # Top-level boundary: log the failure rather than crash the script.
        logger.error(f"An error occurred in the main function: {str(e)}")
        logger.error(traceback.format_exc())

if __name__ == '__main__':
    main()

In [None]:
import copy
import pygame

# Modified SimulationManager class
class SimulationManager:
    """Builds base games once and derives per-KG-completeness copies of them."""

    def __init__(self, game_manager_args, number_of_environments=1, number_of_curricula=1, plot=False):
        """Create the base games and the curriculum.

        Args:
            game_manager_args: mapping with 'num_tiles', 'screen_size' and
                'vision_range'.
            number_of_environments: how many base games to attempt to create.
            number_of_curricula: forwarded to ``get_curriculum``.
            plot: forwarded to each ``GameManager``.
        """
        self.base_game_managers = []
        self.curriculum_indices = []
        self.create_base_games(number_of_environments, game_manager_args, plot)
        curriculum = self.get_curriculum(number_of_curricula)
        self.curriculum_indices, self.step_size = curriculum
        # Filled by create_kg_completeness_environments: level -> list of games.
        self.kg_completeness_environments = {}

    def create_base_games(self, number_of_games, game_manager_args, plot):
        """Create up to ``number_of_games`` base games with completeness 1.0.

        Games whose environment has fewer than three outpost locations are
        discarded, so fewer games than requested may be kept.
        """
        num_tiles, screen_size, vision_range = (
            game_manager_args[key] for key in ('num_tiles', 'screen_size', 'vision_range')
        )
        base_kg_completeness = 1.0  # Create base environments with full knowledge

        for _ in range(number_of_games):
            # NOTE(review): positional order here is (num_tiles, screen_size,
            # kg_completeness, vision_range, plot) -- confirm it matches the
            # GameManager signature in use.
            candidate = GameManager(num_tiles, screen_size, base_kg_completeness, vision_range, plot)
            if len(candidate.environment.outpost_locations) < 3:
                continue
            self.base_game_managers.append(candidate)

    def create_kg_completeness_environments(self, kg_completeness_levels):
        """For each level, store deep copies of every base game set to that level.

        Each copy gets ``kg_completeness`` assigned and
        ``init_knowledge_graph()`` called; the base games are left untouched.
        """
        for level in kg_completeness_levels:
            variants = []
            self.kg_completeness_environments[level] = variants
            for template in self.base_game_managers:
                clone = copy.deepcopy(template)
                clone.kg_completeness = level
                clone.init_knowledge_graph()  # Reinitialize KG with new completeness
                variants.append(clone)

    # ... (other methods remain the same)

# Modified CustomEnv class
class CustomEnv(gym.Env):
    """Gym environment cycling through the games prepared for one KG completeness level."""

    def __init__(self, simulation_manager, kg_completeness, model_args):
        """Bind the environment to the games stored for ``kg_completeness``.

        Args:
            simulation_manager: object whose ``kg_completeness_environments``
                maps a level to a list of game managers.
            kg_completeness: level used as the key into that mapping.
            model_args: not used in the visible part of this initializer.
        """
        super().__init__()
        self.simulation_manager = simulation_manager
        self.kg_completeness = kg_completeness
        self.game_managers = simulation_manager.kg_completeness_environments[kg_completeness]
        # Starts at -1 so the first advance below selects index 0.
        self.current_game_index = -1
        self.set_current_game_manager()

        # ... (rest of the initialization remains the same)

    def set_current_game_manager(self):
        """Advance (with wrap-around) to the next game and rebind per-game state."""
        next_index = (self.current_game_index + 1) % len(self.game_managers)
        self.current_game_index = next_index
        game = self.game_managers[next_index]
        self.current_gm = game
        # Initialize pygame and set up the game
        pygame.init()
        game.initialise_rendering()
        self.environment = game.environment
        self.agent_controler = game.agent_controler
        self.kg = game.kg_class
        self.outpost_coords = self.environment.outpost_locations
        self.best_route_energy = 0

    def reset(self, seed=None, options=None):
        """Switch to the next game (the remaining reset logic is elided here)."""
        # ... (existing reset logic)
        self.set_current_game_manager()
        # ... (rest of the reset method)

    # ... (other methods remain the same)

# Modified TrainingManager class
class TrainingManager:
    """Runs the KG-completeness ablation study on one shared ``SimulationManager``."""

    def __init__(self, model_args, simulation_manager_args, game_manager_args):
        """Create the shared simulation manager and pick the torch device.

        Args:
            model_args: stored and later passed to ``CustomEnv``.
            simulation_manager_args: mapping providing
                'number_of_environments' and 'number_of_curricula'.
            game_manager_args: passed to ``SimulationManager``.
        """
        self.model_args = model_args
        self.simulation_manager = SimulationManager(
            game_manager_args,
            simulation_manager_args['number_of_environments'],
            simulation_manager_args['number_of_curricula']
        )
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"Using device: {self.device}")

    def create_env(self, kg_completeness):
        """Return a ``Monitor``-wrapped ``CustomEnv`` for the given completeness level."""
        env = CustomEnv(self.simulation_manager, kg_completeness, self.model_args)
        return Monitor(env)

    def run_ablation_study(self, kg_completeness_levels):
        """Train, save and evaluate one model per KG completeness level.

        The per-level environment is closed in a ``finally`` block so that a
        failure during training or evaluation does not leave it open.

        Args:
            kg_completeness_levels: iterable of completeness values.

        Returns:
            dict mapping each level to its ``episode_rewards``,
            ``episode_lengths``, ``final_mean_reward`` and
            ``final_std_reward``.
        """
        # Create environments for all KG completeness levels at the start
        self.simulation_manager.create_kg_completeness_environments(kg_completeness_levels)

        results = {}
        for kg_completeness in kg_completeness_levels:
            logger.info(f"Starting ablation study for KG completeness: {kg_completeness}")
            env = self.create_env(kg_completeness)
            try:
                model = self.create_model(env)

                episode_rewards, episode_lengths = self.train_model(model, env)

                model.save(f"ppo_custom_env_kg_{kg_completeness}")
                mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)

                results[kg_completeness] = {
                    'episode_rewards': episode_rewards,
                    'episode_lengths': episode_lengths,
                    'final_mean_reward': mean_reward,
                    'final_std_reward': std_reward
                }
            finally:
                # Release the environment even when this level fails.
                env.close()

            logger.info(f"Completed ablation study for KG completeness: {kg_completeness}")
            logger.info(f"Final evaluation: Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")

        return results

# Main function
def main():
    """Configure a ``TrainingManager``, run the ablation study and save the results.

    Exceptions raised while running or saving are logged with their
    traceback and not re-raised.
    """
    manager = TrainingManager(
        {'num_actions': 11},
        {'number_of_environments': 100, 'number_of_curricula': 20},
        {'num_tiles': 16, 'screen_size': 200, 'vision_range': 2},
    )
    # Example levels for ablation study
    levels = [0.2, 0.5, 0.8, 1.0]

    try:
        study_results = manager.run_ablation_study(levels)
        manager.save_results(study_results)
        logger.info("Ablation study completed and results saved.")
    except Exception as exc:
        logger.error(f"An error occurred in the main function: {str(exc)}")
        logger.error(traceback.format_exc())


if __name__ == '__main__':
    main()

In [None]:
# Modified SimulationManager class
class SimulationManager:
    """Owns the game managers and the KG completeness currently in effect."""

    def __init__(self, game_manager_args, number_of_environments=1, number_of_curricula=1, plot=False):
        """Create the games and the curriculum, and record the KG completeness.

        Args:
            game_manager_args: mapping of game settings. 'kg_completeness' is
                optional and defaults to 1.0, the value this file uses
                elsewhere for a full knowledge graph; callers that omit the
                key previously got a ``KeyError``.
            number_of_environments: forwarded to ``create_games``.
            number_of_curricula: forwarded to ``get_curriculum``.
            plot: forwarded to ``create_games``.
        """
        self.game_managers = []
        self.curriculum_indices = []
        self.create_games(number_of_environments, game_manager_args, plot)
        self.curriculum_indices, self.step_size = self.get_curriculum(number_of_curricula)
        self.kg_completeness = game_manager_args.get('kg_completeness', 1.0)

    # ... (other methods remain the same)

    def set_kg_completeness(self, kg_completeness):
        """Replace the stored KG completeness with ``kg_completeness``."""
        self.kg_completeness = kg_completeness

# Modified CustomEnv class
class CustomEnv(gym.Env):
    """Gym environment that takes its games from a ``SimulationManager``."""

    def __init__(self, game_manager_args, simulation_manager_args, model_args, plot=False,
                 simulation_manager=None):
        """Build the environment.

        Args:
            game_manager_args: passed to ``SimulationManager`` when one has to
                be created.
            simulation_manager_args: mapping providing
                'number_of_environments' and 'number_of_curricula'.
            model_args: not used in the visible part of this initializer.
            plot: passed to ``SimulationManager`` when one has to be created.
            simulation_manager: optional existing manager to reuse. When
                omitted a new one is created, which is the previous
                behaviour. Passing one avoids building a second set of games
                and makes the first game use the shared manager's
                ``kg_completeness``.
        """
        super(CustomEnv, self).__init__()
        # ... (other initializations remain the same)

        if simulation_manager is None:
            simulation_manager = SimulationManager(
                game_manager_args,
                simulation_manager_args['number_of_environments'],
                simulation_manager_args['number_of_curricula'],
                plot=plot
            )
        self.simulation_manager = simulation_manager

        self.current_game_index = -1 # set to -1 so reset increments to 0
        self.set_current_game_manager()

        # ... (rest of the initialization remains the same)

    def set_current_game_manager(self):
        """Select the game at ``current_game_index`` and rebind per-game state.

        The index is wrapped to 0 once it runs past the end of the list. It is
        not advanced here; NOTE(review): presumably the elided reset logic
        increments it -- confirm. With the initial value of -1 the last game
        in the list is selected.
        """
        if self.current_game_index >= len(self.simulation_manager.game_managers):
            self.current_game_index = 0
        self.current_gm = self.simulation_manager.game_managers[self.current_game_index]
        # Initialize pygame and set up the game with the current kg_completeness
        self.current_gm.start_ablation_game(self.simulation_manager.kg_completeness)
        self.environment = self.current_gm.environment
        self.agent_controler = self.current_gm.agent_controler
        self.kg = self.current_gm.kg_class
        self.outpost_coords = self.environment.outpost_locations
        self.best_route_energy = 0

    def reset(self, seed=None, options=None):
        """Re-select the current game (the remaining reset logic is elided here)."""
        # ... (existing reset logic)
        self.set_current_game_manager()  # This will call start_ablation_game
        # ... (rest of the reset method)

    # ... (other methods remain the same)

# Modified TrainingManager class
class TrainingManager:
    """Runs the KG-completeness ablation study on one shared ``SimulationManager``."""

    def __init__(self, model_args, simulation_manager_args, game_manager_args):
        """Store the argument bundles, pick the torch device and build the shared manager."""
        self.model_args = model_args
        self.simulation_manager_args = simulation_manager_args
        self.game_manager_args = game_manager_args
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"Using device: {self.device}")

        # Create a single SimulationManager to be used across all ablation steps
        self.simulation_manager = SimulationManager(
            game_manager_args,
            simulation_manager_args['number_of_environments'],
            simulation_manager_args['number_of_curricula']
        )

    def create_env(self, kg_completeness):
        """Return a ``Monitor``-wrapped ``CustomEnv`` bound to the shared manager.

        The shared manager is handed to ``CustomEnv`` at construction time.
        Previously the environment built its own manager first and had it
        swapped afterwards, which created a full extra set of games per call
        and left the environment bound to a game of the discarded manager
        until the next reset.
        """
        # Set the new KG completeness in the SimulationManager
        self.simulation_manager.set_kg_completeness(kg_completeness)

        env = CustomEnv(
            self.game_manager_args,
            self.simulation_manager_args,
            self.model_args,
            simulation_manager=self.simulation_manager,  # Use the existing SimulationManager
        )
        return Monitor(env)

    # ... (other methods remain the same)

    def run_ablation_study(self, kg_completeness_levels):
        """Train, save and evaluate one model per KG completeness level.

        The per-level environment is closed in a ``finally`` block so that a
        failure during training or evaluation does not leave it open.

        Args:
            kg_completeness_levels: iterable of completeness values.

        Returns:
            dict mapping each level to its ``episode_rewards``,
            ``episode_lengths``, ``final_mean_reward`` and
            ``final_std_reward``.
        """
        results = {}
        for kg_completeness in kg_completeness_levels:
            logger.info(f"Starting ablation study for KG completeness: {kg_completeness}")
            env = self.create_env(kg_completeness)
            try:
                model = self.create_model(env)

                episode_rewards, episode_lengths = self.train_model(model, env)

                model.save(f"ppo_custom_env_kg_{kg_completeness}")
                mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)

                results[kg_completeness] = {
                    'episode_rewards': episode_rewards,
                    'episode_lengths': episode_lengths,
                    'final_mean_reward': mean_reward,
                    'final_std_reward': std_reward
                }
            finally:
                # Release the environment even when this level fails.
                env.close()

            logger.info(f"Completed ablation study for KG completeness: {kg_completeness}")
            logger.info(f"Final evaluation: Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")

        return results

# Main function remains the same