# Multi-Agent Soccer Game with Deep Reinforcement Learning
## Google Colab統合版

このノートブックは深層強化学習を利用したマルチエージェントサッカーゲームの完全統合版です。

### 実装内容
- **環境**: 2v2サッカーゲーム（PettingZoo互換）
- **物理エンジン**: リアルタイムな衝突検出とボール物理
- **エージェント**: Random, DQN, MADDPG
- **観測空間**: 28次元（プレイヤー位置、ボール状態、ゲーム情報）
- **行動空間**: 5次元連続行動（移動2次元 + キック強度1次元 + キック方向2次元）
- **報酬システム**: 多目的報酬関数（ゴール、ボールコントロール、チームワーク）

### 特徴
1. 完全な物理シミュレーション
2. 複数の学習アルゴリズム実装
3. 創発的行動の分析
4. チームワーク評価メトリクス

## 1. 必要なライブラリのインストール

In [None]:
# Install the third-party packages this notebook depends on.
# Each `!pip` line is a notebook shell command; `-q` keeps pip's output quiet.
!pip install -q gymnasium
!pip install -q pettingzoo
!pip install -q pygame
!pip install -q torch torchvision
!pip install -q matplotlib seaborn
!pip install -q numpy

print("All dependencies installed successfully!")

## 2. 基本インポートとセットアップ

In [None]:
# 基本ライブラリのインポート
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple, Optional, Any, Union
import json
from collections import defaultdict, deque
import random
from dataclasses import dataclass
from abc import ABC, abstractmethod
import time
import os

# Gymnasium and PettingZoo imports
import gymnasium as gym
from gymnasium import spaces
from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector, wrappers

# Pygame for rendering (optional)
try:
    import pygame
    PYGAME_AVAILABLE = True
except ImportError:
    PYGAME_AVAILABLE = False
    print("Pygame not available. Rendering disabled.")

# Set random seeds for reproducibility.
# Python's built-in `random` module is seeded as well: it is imported by this
# cell, and leaving it unseeded would make any code that draws from it
# non-reproducible even though NumPy and PyTorch are fixed.
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)

# Set style for plots
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

# Report the runtime environment so logs show which backend was used
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"Device: {torch.device('cuda' if torch.cuda.is_available() else 'cpu')}")

## 3. 設定クラス（Configuration）

In [None]:
"""
Configuration file for Multi-Agent Soccer Game
"""

@dataclass
class SoccerEnvironmentConfig:
    """Geometry, team and physics constants for the soccer environment.

    All lengths share the same field units as FIELD_SIZE. Field order is part
    of the generated constructor signature, so do not reorder fields.
    """
    # (width, height) of the playing field
    FIELD_SIZE: Tuple[int, int] = (800, 600)
    # Index 1 is the goal-mouth extent along the y axis (centred vertically by
    # the ball physics); index 0 is presumably the goal depth — TODO confirm
    GOAL_SIZE: Tuple[int, int] = (20, 200)
    # Radius of the ball
    BALL_RADIUS: int = 10
    # Radius of a player body
    PLAYER_RADIUS: int = 20
    # Presumably the episode step limit — the consumer is outside this section
    MAX_STEPS: int = 1000

    # Team setup
    NUM_PLAYERS_PER_TEAM: int = 2
    TEAM_COLORS: Tuple[str, str] = ('blue', 'red')
    # Field units moved per step for a movement action component of 1.0
    PLAYER_SPEED: float = 5.0
    # Factor applied to kick power when a kick is added to the ball velocity
    BALL_SPEED_MULTIPLIER: float = 1.5

    # Factor applied to a player's velocity after each move
    FRICTION: float = 0.95
    # Factor applied to the ball's velocity on every ball update
    BALL_DECAY: float = 0.98
    # NOTE(review): not referenced by the physics code in this section —
    # confirm where (or whether) it is used
    COLLISION_THRESHOLD: float = 30.0

@dataclass
class MADDPGConfig:
    """Hyperparameters for the MADDPG algorithm.

    The code that consumes these values is outside this section; the field
    notes below follow the field names and should be confirmed against it.
    """
    # Per-agent observation and action sizes
    obs_dim: int = 28
    action_dim: int = 5
    # Concatenated sizes over all agents (hard-coded, not derived from the
    # per-agent sizes above — keep them in sync by hand)
    global_obs_dim: int = 112  # 28 * 4 agents
    global_action_dim: int = 20  # 5 * 4 agents
    # Presumably hidden layer widths of the networks — TODO confirm
    hidden_dims: Tuple[int, ...] = (256, 128)

    # Optimiser learning rates for the actor and critic
    actor_lr: float = 1e-4
    critic_lr: float = 1e-3
    # Presumably the reward discount factor — TODO confirm
    gamma: float = 0.95
    # Presumably the target-network soft-update rate — TODO confirm
    tau: float = 0.01
    # Presumably the minibatch size and replay buffer capacity — TODO confirm
    batch_size: int = 256
    buffer_size: int = int(1e6)
    # Presumably the initial exploration noise scale and its decay factor
    noise_scale: float = 0.1
    noise_decay: float = 0.9999

@dataclass
class TrainingConfig:
    """Training-loop settings.

    The training loop itself is outside this section; the notes below follow
    the field names and should be confirmed against it.
    """
    # Presumably the number of episodes to train and the per-episode step cap
    max_episodes: int = 10000
    max_steps_per_episode: int = 1000
    # Presumably intervals, in episodes, for checkpointing, evaluation and
    # logging — TODO confirm the unit
    save_freq: int = 1000
    eval_freq: int = 500
    log_freq: int = 100

    # Reproducibility
    random_seed: int = 42

@dataclass
class ExperimentConfig:
    """Experiment naming, output locations and the list of algorithms.

    The directory fields are plain strings; nothing in this class creates
    the directories.
    """
    experiment_name: str = "soccer_multiagent"
    # Output directory names — presumably relative to the working directory
    log_dir: str = "logs"
    save_dir: str = "saved_models"
    video_dir: str = "videos"

    # Algorithms to run
    # NOTE(review): "ppo" is listed here — confirm that a matching
    # implementation exists before relying on it
    algorithms: Tuple[str, ...] = ("random", "dqn", "ppo", "maddpg")

print("Configuration classes defined successfully!")

## 4. 物理エンジン（Physics Engine）

In [None]:
"""
Physics engine for soccer game
"""

class Ball:
    """Soccer ball modelled as a circle with a position and a per-step velocity."""

    def __init__(self, x: float, y: float, radius: float = 10):
        self.radius = radius
        self.pos = np.array((x, y), dtype=float)
        self.vel = np.zeros(2, dtype=float)

    def update(self, config: SoccerEnvironmentConfig):
        """Advance the ball by one step and resolve contact with the field edges.

        The ball moves by its velocity, the velocity is scaled by
        ``config.BALL_DECAY``, and any contact with the field edges is handled.

        Returns:
            ``"goal_left"`` or ``"goal_right"`` when the ball reaches the left
            or right edge while its centre is inside the goal mouth (the ball
            is not repositioned in that case), otherwise ``None``.
        """
        # A wall hit reverses the velocity component and keeps 80% of it.
        restitution = -0.8

        self.pos += self.vel
        self.vel *= config.BALL_DECAY

        width, height = config.FIELD_SIZE
        r = self.radius

        # Top and bottom walls: bounce and pull the ball back inside.
        lowest_y, highest_y = r, height - r
        if self.pos[1] <= lowest_y or self.pos[1] >= highest_y:
            self.vel[1] *= restitution
            self.pos[1] = max(lowest_y, min(highest_y, self.pos[1]))

        # The goal mouth is centred vertically on each side wall.
        mouth_height = config.GOAL_SIZE[1]
        mouth_top = (height - mouth_height) // 2
        mouth_bottom = mouth_top + mouth_height
        inside_mouth = mouth_top <= self.pos[1] <= mouth_bottom

        if self.pos[0] <= r:
            # Left edge: either a goal or a bounce off the wall.
            if inside_mouth:
                return "goal_left"
            self.vel[0] *= restitution
            self.pos[0] = r
        elif self.pos[0] >= width - r:
            # Right edge: either a goal or a bounce off the wall.
            if inside_mouth:
                return "goal_right"
            self.vel[0] *= restitution
            self.pos[0] = width - r

        return None

class Player:
    """A single player: a circle with a position, a velocity and a team tag."""

    def __init__(self, x: float, y: float, team: int, player_id: int, radius: float = 20):
        self.team = team
        self.player_id = player_id
        self.radius = radius
        self.has_ball = False
        self.pos = np.array((x, y), dtype=float)
        self.vel = np.zeros(2, dtype=float)

    def update(self, action: np.ndarray, config: SoccerEnvironmentConfig):
        """Move the player according to ``action`` and report the requested kick.

        ``action`` is read as ``[move_x, move_y, kick_power, kick_dir_x,
        kick_dir_y]``; the three kick entries are optional and default to 0.0
        when the action is shorter.

        Returns:
            ``(kick_power, kick_direction)`` where ``kick_direction`` is a
            2-element array that has not been normalised.
        """
        n_components = len(action)
        kick_power = action[2] if n_components > 2 else 0.0
        kick_direction = np.array([
            action[3] if n_components > 3 else 0.0,
            action[4] if n_components > 4 else 0.0,
        ])

        # The velocity is replaced, not accumulated: the player moves by the
        # commanded step, and only then is the stored velocity damped.
        self.vel = np.array([action[0], action[1]]) * config.PLAYER_SPEED
        self.pos += self.vel
        self.vel *= config.FRICTION

        # Keep the whole body inside the field on both axes.
        for axis, extent in enumerate(config.FIELD_SIZE):
            self.pos[axis] = max(self.radius, min(extent - self.radius, self.pos[axis]))

        return kick_power, kick_direction

class PhysicsEngine:
    """Owns the ball and the players and advances them one step at a time.

    Players are stored in ``self.players`` in a fixed order: indices 0-1 are
    team 0 (left side) and indices 2-3 are team 1 (right side). Actions are
    looked up under the keys ``"player_0"`` .. ``"player_3"`` in that order.
    The layout is fixed at two players per team;
    ``config.NUM_PLAYERS_PER_TEAM`` is not consulted.
    """

    # Kick-off layout, in player index order:
    # (team, player_id, x as a fraction of field width, y as a fraction of height)
    _START_LAYOUT = (
        (0, 0, 0.2, 0.3),
        (0, 1, 0.2, 0.7),
        (1, 0, 0.8, 0.3),
        (1, 1, 0.8, 0.7),
    )

    def __init__(self, config: "SoccerEnvironmentConfig"):
        """Create the ball at the field centre and the players at kick-off spots."""
        self.config = config
        self.ball = Ball(
            config.FIELD_SIZE[0] // 2,
            config.FIELD_SIZE[1] // 2,
            config.BALL_RADIUS
        )

        # Initialize players
        self.players = []
        self._init_players()

    def _start_positions(self):
        """Return the kick-off (x, y) of every player, in player index order."""
        field_width, field_height = self.config.FIELD_SIZE
        return [
            (field_width * fx, field_height * fy)
            for _team, _player_id, fx, fy in self._START_LAYOUT
        ]

    def _init_players(self):
        """Append one Player per layout entry, sized by ``config.PLAYER_RADIUS``."""
        for (team, player_id, _fx, _fy), (x, y) in zip(
            self._START_LAYOUT, self._start_positions()
        ):
            # Pass the configured radius explicitly so changing
            # PLAYER_RADIUS in the config actually changes the players.
            self.players.append(
                Player(x, y, team, player_id, radius=self.config.PLAYER_RADIUS)
            )

    def _clamp_to_field(self, player):
        """Pull ``player`` back so its whole body lies inside the field."""
        field_width, field_height = self.config.FIELD_SIZE
        player.pos[0] = max(player.radius, min(field_width - player.radius, player.pos[0]))
        player.pos[1] = max(player.radius, min(field_height - player.radius, player.pos[1]))

    def reset(self):
        """Put the ball at the field centre and every player at its kick-off spot."""
        self.ball.pos = np.array([
            self.config.FIELD_SIZE[0] // 2,
            self.config.FIELD_SIZE[1] // 2
        ], dtype=float)
        self.ball.vel = np.array([0.0, 0.0], dtype=float)

        for player, (x, y) in zip(self.players, self._start_positions()):
            player.pos = np.array([x, y], dtype=float)
            player.vel = np.array([0.0, 0.0], dtype=float)
            player.has_ball = False

    def step(self, actions: Dict[str, np.ndarray]) -> Tuple[Optional[str], Optional[int]]:
        """Advance the simulation by one step.

        Args:
            actions: maps ``"player_<index>"`` to that player's action array.
                Players without an entry do not move this step.

        Returns:
            ``(goal_result, ball_touched_by)``: ``goal_result`` is whatever
            the ball update reports (``"goal_left"``, ``"goal_right"`` or
            ``None``); ``ball_touched_by`` is the index of the last player
            (in index order) overlapping the ball, or ``None``.
        """
        # Move the players and remember who asked for a kick.
        kicks = {}
        for i, player in enumerate(self.players):
            agent_key = f"player_{i}"
            if agent_key in actions:
                kick_power, kick_dir = player.update(actions[agent_key], self.config)
                if kick_power > 0:
                    kicks[i] = (kick_power, kick_dir)

        # A kick only takes effect while the player overlaps the ball.
        ball_touched_by = None
        for i, player in enumerate(self.players):
            dist = np.linalg.norm(player.pos - self.ball.pos)
            if dist <= player.radius + self.ball.radius:
                ball_touched_by = i
                player.has_ball = True

                if i in kicks:
                    kick_power, kick_dir = kicks[i]
                    # Normalise; the epsilon turns a zero direction into no kick.
                    kick_dir = kick_dir / (np.linalg.norm(kick_dir) + 1e-8)
                    self.ball.vel += kick_dir * kick_power * self.config.BALL_SPEED_MULTIPLIER
            else:
                player.has_ball = False

        # Update ball
        goal_result = self.ball.update(self.config)

        # Handle player-player collisions
        self._handle_player_collisions()

        return goal_result, ball_touched_by

    def _handle_player_collisions(self):
        """Push overlapping players apart, keeping them inside the field.

        Each overlapping pair is moved apart along the line joining their
        centres, half the overlap each. Two players at exactly the same
        position are separated along the x axis, since no direction exists
        between them. Both players are then clamped to the field, so a pair
        pressed against a wall may still overlap until a later step.
        """
        for i, p1 in enumerate(self.players):
            for p2 in self.players[i + 1:]:
                offset = p1.pos - p2.pos
                dist = np.linalg.norm(offset)
                min_dist = p1.radius + p2.radius

                if dist < min_dist:
                    if dist > 1e-8:
                        direction = offset / dist
                    else:
                        # Coincident centres: pick a fixed axis so the pair
                        # is still separated instead of staying stacked.
                        direction = np.array([1.0, 0.0])
                    overlap = min_dist - dist

                    p1.pos += direction * overlap * 0.5
                    p2.pos -= direction * overlap * 0.5

                    # Separation must not push anyone through a wall.
                    self._clamp_to_field(p1)
                    self._clamp_to_field(p2)

    def get_state(self) -> Dict:
        """Return a snapshot (copies) of the ball and of every player."""
        return {
            'ball': {
                'pos': self.ball.pos.copy(),
                'vel': self.ball.vel.copy()
            },
            'players': [
                {
                    'pos': player.pos.copy(),
                    'vel': player.vel.copy(),
                    'team': player.team,
                    'player_id': player.player_id,
                    'has_ball': player.has_ball
                }
                for player in self.players
            ]
        }

print("Physics engine implemented successfully!")