In [1]:
import sys
import os
# Tell the Intel OpenMP runtime to tolerate more than one copy of itself being
# loaded into the process (the usual workaround for the "OMP: Error #15" abort).
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
# Show the working directory: the relative outputs written later
# (output.txt, logs/analysis, pygame_recording.mp4) are created under it.
print(os.getcwd())

C:\Users\sleepybear\TerritoryGame_Maze\Territory_complex\Territory_complex_random


In [2]:
# Make the project root importable so the local policy module can be found.
sys.path.append('C:/Users/sleepybear/TerritoryGame_Maze')
from TerritoryGame_MaPolicy import PPO  # PPO class from the local TerritoryGame_MaPolicy module on the path added above

In [3]:
# Make the folder that holds the random-environment module importable.
sys.path.append('C:/Users/sleepybear/TerritoryGame_Maze/Territory_complex/Territory_complex_random')
from TerritoryGame_New_Random_Env import TerritoryBattleEnvCoopRandom  # environment class used by main()

In [4]:
import logging
import pygame
import moviepy.editor as mpy

In [5]:
import tensorflow as tf
import numpy as np

In [6]:
# NOTE: this path was already appended to sys.path in an earlier cell; the
# repeated append only adds a duplicate entry and is otherwise harmless.
sys.path.append('C:/Users/sleepybear/TerritoryGame_Maze')
from visualization import Visualization  # Import visualization class

In [7]:
import scipy.spatial

In [8]:
import time

In [9]:
def compute_policy_similarity(policy1, policy2):
    """Return the cosine similarity between two policy outputs.

    Both inputs are flattened to 1-D before comparison, so they only need to
    contain the same number of elements.

    Args:
        policy1: Array-like policy output of the first agent.
        policy2: Array-like policy output of the second agent.

    Returns:
        A float where 1.0 means the two vectors point in the same direction
        and 0.0 means they are orthogonal.
    """
    first = np.asarray(policy1).ravel()
    second = np.asarray(policy2).ravel()
    # scipy's `cosine` is a *distance* (1 - cosine similarity); convert it so
    # the returned value matches this function's name.
    return 1.0 - scipy.spatial.distance.cosine(first, second)
def detect_policy_switch(previous_policy, current_policy, threshold=0.1):
    """Report whether a policy output moved by more than ``threshold``.

    The movement is measured as the norm (``np.linalg.norm`` with default
    arguments) of the element-wise difference between the two arrays.

    Args:
        previous_policy: Policy output recorded on the earlier step.
        current_policy: Policy output recorded on the current step.
        threshold: Strict lower bound on the norm for a change to count.

    Returns:
        True when the norm of the difference is strictly greater than
        ``threshold``, otherwise False.
    """
    delta = previous_policy - current_policy
    shift = np.linalg.norm(delta)
    return shift > threshold

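#After the last episode, this cell encodes every captured pygame frame into pygame_recording.mp4; the notebook can be unresponsive for about 5 minutes while the video is written.
#If you do not need the video, remove the `frames` list, the frame-capture lines, and the video-saving code in main().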

In [12]:
def _team_reward(rewards, agents, team):
    """Sum the entries of ``rewards`` whose agent (same index) is on ``team``."""
    return np.sum([reward if agent['team'] == team else 0 for reward, agent in zip(rewards, agents)])


def main():
    """Run the saved PPO agents in the random territory environment and record the results.

    For every episode the function:
      * steps the environment with the actions chosen by each agent's policy,
      * accumulates the red and blue team rewards,
      * records pairwise policy-similarity values and per-agent policy switches,
      * captures one pygame frame per step for the final video,
      * appends one CSV row to ``output.txt`` and prints it.

    Every ``batch_size`` episodes, per-agent reward, win rate and policy-switch
    scalars plus the mean policy similarity of that episode are written to
    TensorBoard under ``logs/analysis/run_<timestamp>``.

    After all episodes the captured frames are encoded to
    ``pygame_recording.mp4`` and the visualization is shown.
    """
    env = TerritoryBattleEnvCoopRandom(grid_size=12, max_steps=150)
    num_agents = env.num_agents
    state_size = env.observation_space.shape[0] * env.observation_space.shape[1]
    action_size = env.action_space.n

    ppo = PPO(state_size=state_size, action_size=action_size, num_agents=num_agents)
    ppo.load("C:/Users/sleepybear/TerritoryGame_Maze/Territory_complex/model/ppo_Territory_complex_model")

    num_episodes = 100
    batch_size = 32  # TensorBoard scalars are written once every `batch_size` episodes

    vis = Visualization(env)

    # Use a fresh, timestamped log directory for every run.
    log_dir = os.path.join("logs", "analysis", f"run_{int(time.time())}")
    summary_writer = tf.summary.create_file_writer(log_dir)

    # Frames captured during play; encoded into a video after the last episode.
    frames = []

    win_counts = np.zeros(num_agents)
    # NOTE(review): the previous policies are not reset between episodes, so the
    # first step of an episode is compared with the last step of the one before.
    previous_policies = [None] * num_agents
    policy_switches = np.zeros(num_agents)

    try:
        with open('output.txt', 'w') as file:
            file.write("Episode,Total_Red_Reward,Total_Blue_Reward,Red_Territories,Blue_Territories\n")

            for episode in range(num_episodes):
                state = env.reset().flatten()
                done = False
                total_red_reward = 0
                total_blue_reward = 0
                step_count = 0
                policy_similarities = []

                while not done and step_count < env.max_steps:
                    step_count += 1

                    actions = []
                    policies = []
                    for agent_index in range(num_agents):
                        action, policy = ppo.get_action(state, agent_index)
                        actions.append(action)
                        policies.append(policy)

                    next_state, rewards, done, info = env.step(actions)

                    total_red_reward += _team_reward(rewards, env.agents, 'red')
                    total_blue_reward += _team_reward(rewards, env.agents, 'blue')

                    # Policy similarity for every unordered pair of agents.
                    for i in range(num_agents):
                        for j in range(i + 1, num_agents):
                            policy_similarities.append(
                                compute_policy_similarity(policies[i], policies[j])
                            )

                    # Count a switch when an agent's policy output moved past the threshold.
                    for agent_index in range(num_agents):
                        previous = previous_policies[agent_index]
                        if previous is not None and detect_policy_switch(previous, policies[agent_index]):
                            policy_switches[agent_index] += 1
                        previous_policies[agent_index] = policies[agent_index]

                    state = next_state.flatten()

                    vis.update(episode)
                    env.handle_events()

                    # Capture the current frame; array3d is indexed (x, y, channel),
                    # so swap the first two axes to get (row, column, channel).
                    frame = pygame.surfarray.array3d(pygame.display.get_surface())
                    frames.append(frame.transpose([1, 0, 2]))

                # Decide the winner once per episode from the final territory counts,
                # so that win_counts / episodes is a real rate in [0, 1].
                red_team_wins = info['red_count'] > info['blue_count']
                blue_team_wins = info['blue_count'] > info['red_count']
                for i, agent in enumerate(env.agents):
                    if agent['team'] == 'red' and red_team_wins:
                        win_counts[i] += 1
                    elif agent['team'] == 'blue' and blue_team_wins:
                        win_counts[i] += 1

                if (episode + 1) % batch_size == 0:
                    with summary_writer.as_default():
                        for agent_index in range(num_agents):
                            on_red_team = env.agents[agent_index]['team'] == 'red'
                            tf.summary.scalar(f"Agent_{agent_index}_Reward", total_red_reward if on_red_team else total_blue_reward, step=episode)
                            tf.summary.scalar(f"Agent_{agent_index}_Win_Rate", win_counts[agent_index] / (episode + 1), step=episode)
                            tf.summary.scalar(f"Agent_{agent_index}_Policy_Switches", policy_switches[agent_index], step=episode)
                        tf.summary.scalar("Average_Policy_Similarity", np.mean(policy_similarities), step=episode)

                output = (
                    f"{episode + 1},{total_red_reward},{total_blue_reward},"
                    f"{info['red_count']},{info['blue_count']}\n"
                )
                print(output)
                # NOTE: no logging configuration is visible in this notebook; with the
                # root logger's default WARNING level these INFO records are not shown.
                logging.info(f"Episode {episode + 1} completed: {output.strip()}")
                file.write(output)

        # output.txt is closed at this point, so the results are on disk before
        # the slow video encoding starts.
        clip = mpy.ImageSequenceClip(frames, fps=30)
        clip.write_videofile("pygame_recording.mp4", codec="libx264")

        vis.show()
        logging.info("Training completed.")
    finally:
        # Close the TensorBoard writer even if an episode or the video export fails.
        summary_writer.close()

# Run the evaluation when this cell/script is executed directly.
if __name__ == "__main__":
    try:
        main()
    except SystemExit:
        # Swallow SystemExit so a sys.exit() raised somewhere inside main()
        # (presumably when the pygame window is closed via env.handle_events();
        # TODO confirm) does not surface as an error in the notebook.
        pass


Loading model from: C:/Users/sleepybear/TerritoryGame_Maze/Territory_complex/model/ppo_Territory_complex_model_policy_0.h5
Loading model from: C:/Users/sleepybear/TerritoryGame_Maze/Territory_complex/model/ppo_Territory_complex_model_policy_1.h5
Loading model from: C:/Users/sleepybear/TerritoryGame_Maze/Territory_complex/model/ppo_Territory_complex_model_policy_2.h5
Loading model from: C:/Users/sleepybear/TerritoryGame_Maze/Territory_complex/model/ppo_Territory_complex_model_policy_3.h5
Loading model from: C:/Users/sleepybear/TerritoryGame_Maze/Territory_complex/model/ppo_Territory_complex_model_value_0.h5
Loading model from: C:/Users/sleepybear/TerritoryGame_Maze/Territory_complex/model/ppo_Territory_complex_model_value_1.h5
Loading model from: C:/Users/sleepybear/TerritoryGame_Maze/Territory_complex/model/ppo_Territory_complex_model_value_2.h5
Loading model from: C:/Users/sleepybear/TerritoryGame_Maze/Territory_complex/model/ppo_Territory_complex_model_value_3.h5
Attacker 0 (Team red

                                                                                                                       

Moviepy - Done !
Moviepy - video ready pygame_recording.mp4


#TensorBoard shows every run it finds under logs/analysis. To see only the curves of the latest run, delete the older run folders in logs/analysis first.
#If you do not clear them, the curves of several runs (each with its own timeline) are displayed together.
#You also need to end any previous TensorBoard process (e.g. in Task Manager); otherwise the server keeps showing the results of the previous run.

In [13]:
# Launch TensorBoard
!tensorboard --logdir logs/analysis --port=6010
#http://localhost:6010

2024-09-03 11:52:38.360368: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-09-03 11:52:42.708920: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
Serving TensorBoard on localhost; to expose to the network, use a proxy or pass --bind_all
TensorBoard 2.17.0 at http://localhost:6010/ (Press CTRL+C to quit)
