# In [None]:
# model and env
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import gymnasium as gym
from collections import deque
import time

# Compute device shared by the agents: CUDA when torch reports it available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class FootballEnv(gym.Env):
    """Grid football environment that is stepped one player at a time.

    The pitch is a ``grid_rows`` x ``grid_cols`` character layout. Cells marked
    ``"O"`` are walls that end the acting player's episode when entered; cells
    marked ``"G"`` (last column) and ``"g"`` (first column) are goals. A ball
    standing on ``"G"`` counts as a goal for team 0, on ``"g"`` as a goal for
    team 1. The remaining markers (``"."``, ``"C"``, ``"D"``, ``"d"``, ``"M"``)
    are not read by any method of this class.

    Unlike the standard Gymnasium signatures, ``reset``, ``step`` and
    ``render`` take the player objects explicitly.
    """

    # (row delta, column delta) for movement actions 0..7; actions >= 8 do not move.
    _MOVE_DELTAS = (
        (0, -1), (1, 0), (0, 1), (-1, 0),
        (-1, -1), (-1, 1), (1, -1), (1, 1),
    )

    def __init__(self, grid_rows=10, grid_cols=10, max_episode_steps=3000):
        """Build the pitch layout.

        Args:
            grid_rows: Number of rows of the grid.
            grid_cols: Number of columns of the grid.
            max_episode_steps: Number of ``step`` calls after which ``step``
                reports ``truncated=True``.
        """
        super().__init__()
        self.grid_rows = grid_rows
        self.grid_cols = grid_cols
        self.max_episode_steps = max_episode_steps

        self.action_space = gym.spaces.Discrete(10)
        # NOTE(review): _get_state emits negative values (own-team coordinates,
        # team_has_ball == -1) and its length grows with the number of players,
        # so these bounds/shape only describe the two-player case loosely.
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(8,), dtype=np.float32)

        mid_row = grid_rows // 2
        mid_col = grid_cols // 2
        # Clamp the upper edge of each band at row 0: a negative slice start
        # would wrap around and select rows from the bottom of the grid.
        zone_top = max(0, mid_row - 8)
        goal_top = max(0, mid_row - 4)

        self.layout = np.zeros((grid_rows, grid_cols), dtype=str)
        self.layout[:, :] = "."
        self.layout[mid_row, mid_col] = "C"
        self.layout[zone_top:mid_row + 9, -6:-1] = "D"
        self.layout[zone_top:mid_row + 9, 0:5] = "d"
        self.layout[:, mid_col] = "M"
        # Border walls are written first; the goal mouths then overwrite part
        # of the first and last columns.
        self.layout[:, -1] = "O"
        self.layout[:, 0] = "O"
        self.layout[0, :] = "O"
        self.layout[-1, :] = "O"
        self.layout[goal_top:mid_row + 5, -1] = "G"
        self.layout[goal_top:mid_row + 5, 0] = "g"

        self.ball_pos = (mid_row, mid_col)
        self.episode_steps = 0
        self.goal_team = None
        self.team_has_ball = -1

    def _get_state(self, player, players):
        """Return the observation vector for ``player``.

        Layout of the vector: ``position / grid_size - 1`` (row, col) for every
        player on ``player``'s team (including ``player`` itself), the negation
        of the same quantity for every other player, ``player.has_ball``,
        ``self.team_has_ball``, and the (row, col) of the goal as encoded below.

        Args:
            player: The player the observation is built for.
            players: All players in the match.

        Returns:
            A 1-D ``np.float32`` array of length ``2 * len(players) + 4``.
        """
        same_team = []
        other_team = []
        for other in players:
            coords = [other.position[0] / self.grid_rows - 1,
                      other.position[1] / self.grid_cols - 1]
            if player.team == other.team:
                same_team.extend(coords)
            else:
                # Opponent coordinates are stored with the opposite sign.
                other_team.extend(-1 * value for value in coords)

        goal_x = self.grid_rows // 2 / (self.grid_rows - 1)
        # Team 0's goal coordinate is the last column (normalised to 1), every other team's is 0.
        goal_y = (self.grid_cols - 1) / (self.grid_cols - 1) if player.team == 0 else 0

        return np.array([
            *same_team,
            *other_team,
            float(player.has_ball),
            float(self.team_has_ball),
            goal_x,
            goal_y,
        ], dtype=np.float32)

    def reset(self, players, seed=None, options=None):
        """Start a new episode with random, non-overlapping placements.

        Team 0 is placed in the left quarter of the pitch and every other team
        in the right quarter, within three rows of the middle row. The ball is
        placed on a free cell in the middle half of the columns. Sampling uses
        ``self.np_random`` so that ``seed`` actually makes placements
        reproducible.

        Args:
            players: Players to place; ``has_ball`` and ``prev_position`` are
                cleared on each of them.
            seed: Optional seed forwarded to ``gym.Env.reset``.
            options: Unused.

        Returns:
            A pair of empty dicts.
        """
        super().reset(seed=seed)
        rng = self.np_random
        mid_row = self.grid_rows // 2
        occupied_positions = set()

        for player in players:
            player.has_ball = False
            player.prev_position = None
            if player.team == 0:
                col_low, col_high = 1, self.grid_cols // 4
            else:
                col_low, col_high = 3 * self.grid_cols // 4, self.grid_cols - 2

            # Rejection-sample until a free cell is found (upper bounds are exclusive).
            while True:
                cell = (int(rng.integers(mid_row - 3, mid_row + 4)),
                        int(rng.integers(col_low, col_high + 1)))
                if cell not in occupied_positions:
                    player.position = cell
                    occupied_positions.add(cell)
                    break

        while True:
            ball_cell = (int(rng.integers(1, self.grid_rows - 1)),
                         int(rng.integers(self.grid_cols // 4, 3 * self.grid_cols // 4 + 1)))
            if ball_cell not in occupied_positions:
                self.ball_pos = ball_cell
                break

        self.episode_steps = 0
        self.goal_team = None
        # Nobody owns the ball at kick-off; without this the previous
        # episode's possession would leak into the new observations/rewards.
        self.team_has_ball = -1
        return {}, {}

    def step(self, action, player, players):
        """Apply ``action`` for a single ``player`` and compute its reward.

        Args:
            action: Integer action. Values 0..7 move one cell (see
                ``_MOVE_DELTAS``); larger values leave the player in place.
            player: The acting player; its position and possession attributes
                are updated in place.
            players: All players in the match (used to build the observation).

        Returns:
            ``(state, reward, done, truncated, info)``. Moving off the grid or
            into a wall returns immediately with ``done=True``,
            ``truncated=True``, an extra ``-20`` reward and an empty ``info``.
            Otherwise ``info`` is ``{'goal_team': ...}`` holding the scoring
            team recorded for this episode (``None`` if no goal yet), and
            ``done`` is True when the ball currently stands on a goal cell.
        """
        self.episode_steps += 1
        reward = player.step_penalty
        done = False

        # Possession before moving decides whether the ball travels along.
        had_ball_before_move = player.has_ball

        if action < 8:
            dx, dy = self._MOVE_DELTAS[action]
            new_pos = (player.position[0] + dx, player.position[1] + dy)
            inside_grid = (0 <= new_pos[0] < self.grid_rows
                           and 0 <= new_pos[1] < self.grid_cols)

            if not inside_grid or self.layout[new_pos[0], new_pos[1]] == "O":
                reward -= 20
                return self._get_state(player, players), reward, True, True, {}

            player.prev_position = player.position
            player.position = new_pos
            if had_ball_before_move:
                self.ball_pos = new_pos

        # Possession is re-evaluated only after the move has been applied.
        # (Attribute name kept as-is, including its spelling, for compatibility.)
        player.prev_ball_statues = player.has_ball
        player.has_ball = player.position == self.ball_pos

        if player.has_ball:
            self.team_has_ball = player.team
        elif self.team_has_ball == player.team and had_ball_before_move:
            # This player was the one holding the ball for its team and lost it.
            self.team_has_ball = -1

        if player.has_ball:
            reward += player.ball_possession_bonus + 0.001

        if not player.has_ball and self.team_has_ball != player.team:
            reward += player.no_possession

        dist_to_ball = np.sqrt((player.position[0] - self.ball_pos[0]) ** 2 +
                               (player.position[1] - self.ball_pos[1]) ** 2)
        if dist_to_ball < 5 and not player.has_ball:
            reward += player.near_ball_bonus

        if player.has_ball:
            # Team 0 scores in the last column ("G"); every other team scores
            # in the first column ("g"), so measure towards the matching side.
            if player.team == 0:
                dist_to_goal = self.grid_cols - 1 - player.position[1]
            else:
                dist_to_goal = player.position[1]
            if dist_to_goal < 10:
                reward += player.near_goal_bonus * (1 - dist_to_goal / 10.0)

        current_ball_cell = self.layout[self.ball_pos[0], self.ball_pos[1]]
        goal_team = None
        if current_ball_cell == 'G':
            goal_team = 0
        elif current_ball_cell == 'g':
            goal_team = 1

        if goal_team is not None:
            # Only the acting player's reward is returned from this call.
            if player.team == goal_team:
                reward += player.goal_reward
            else:
                reward += player.opp_scoring
            done = True
            self.goal_team = goal_team

        truncated = self.episode_steps >= self.max_episode_steps
        return self._get_state(player, players), reward, done, truncated, {'goal_team': self.goal_team}

    def render(self, players):
        """Print an ASCII picture of the pitch.

        Walls are drawn as ``#``, goal cells as ``|``, the ball as ``B``, team 0
        players as ``P`` and all other players as ``Q``. Players are drawn last
        and therefore hide the ball when they share its cell.

        Args:
            players: Players to draw.
        """
        grid = np.full((self.grid_rows, self.grid_cols), '-')
        grid[self.layout == 'O'] = '#'
        grid[(self.layout == 'G') | (self.layout == 'g')] = '|'

        grid[self.ball_pos[0], self.ball_pos[1]] = 'B'
        for player in players:
            grid[player.position[0], player.position[1]] = 'P' if player.team == 0 else 'Q'

        border = '-' * (self.grid_cols + 2)
        print(border)
        for row in grid:
            print('|' + ''.join(row) + '|')
        print(border)

class Player:
    """State and reward coefficients of a single footballer.

    Args:
        role: Free-form role label stored on the instance.
        team: Team identifier; team 0 starts in the left quarter of the pitch,
            any other value in the right quarter.
        env: Object exposing ``grid_rows`` and ``grid_cols``.
    """

    def __init__(self, role, team, env):
        self.role, self.team = role, team

        # Initial placement: middle row, quarter-line column on the team's side.
        if team == 0:
            start_col = env.grid_cols // 4
        else:
            start_col = 3 * env.grid_cols // 4
        self.position = (env.grid_rows // 2, start_col)
        self.prev_position = None
        self.has_ball = False

        # Terminal rewards.
        self.goal_reward = 52
        self.opp_scoring = -50

        # Per-step shaping terms.
        self.step_penalty = -0.003
        self.no_possession = -0.0005
        self.ball_possession_bonus = 0.008
        self.near_ball_bonus = 0.00001
        self.near_goal_bonus = 0.00002

        # Layout symbols associated with this team and with the opposing one.
        self.goal_sym, self.opp_goal_sym = ('G', 'g') if team == 0 else ('g', 'G')

class DQN(nn.Module):
    """Fully connected Q-network: input -> 128 -> 128 -> 64 -> one value per action.

    Every linear layer uses Xavier-uniform weights and zero biases.
    """

    def __init__(self, input_dim, action_size):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, action_size)

        # Re-initialise after all layers exist, visiting them in registration order.
        for linear in self.children():
            nn.init.xavier_uniform_(linear.weight)
            nn.init.zeros_(linear.bias)

    def forward(self, x):
        """Return the Q-values for ``x``; ReLU follows each hidden layer, the output is linear."""
        hidden = x
        for linear in (self.fc1, self.fc2, self.fc3):
            hidden = F.relu(linear(hidden))
        return self.fc4(hidden)

class DQNAgent:
    """DQN learner with epsilon-greedy action selection, replay memory and a target network."""

    def __init__(self, state_size, action_size):
        """Create the online/target networks, the optimizer and the replay memory.

        Args:
            state_size: Length of the observation vector fed to the network.
            action_size: Number of discrete actions the network scores.
        """
        self.state_size = state_size
        self.action_size = action_size

        # Discount factor and exploration schedule.
        self.gamma = 0.99
        self.epsilon, self.epsilon_min, self.epsilon_decay = 1.0, 0.05, 0.99995

        # Optimisation settings.
        self.learning_rate = 0.0003
        self.memory = deque(maxlen=100000)
        self.batch_size = 256
        self.target_update_freq = 5

        self.device = device
        self.model = DQN(state_size, action_size).to(self.device)
        self.target_model = DQN(state_size, action_size).to(self.device)
        # The target network starts as an exact copy of the online network.
        self.target_model.load_state_dict(self.model.state_dict())
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)

        self.rewards_history = []
        self.episode_count = 0

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def act(self, state, evaluate=False):
        """Choose an action for ``state``.

        With ``evaluate=False`` a uniformly random action is returned with
        probability ``epsilon``; otherwise the action with the highest
        predicted Q-value is returned.
        """
        explore = (not evaluate) and np.random.rand() <= self.epsilon
        if explore:
            return random.randrange(self.action_size)

        batch_of_one = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            return torch.argmax(self.model(batch_of_one)).item()

    def replay(self):
        """Run one gradient step on a random minibatch.

        Returns:
            The MSE loss as a float, or ``0`` when the memory holds fewer than
            ``batch_size`` transitions.
        """
        if len(self.memory) < self.batch_size:
            return 0

        sampled = random.sample(self.memory, self.batch_size)
        state_col, action_col, reward_col, next_col, done_col = zip(*sampled)

        states = torch.FloatTensor(np.array(state_col)).to(self.device)
        actions = torch.LongTensor(list(action_col)).to(self.device)
        rewards = torch.FloatTensor(list(reward_col)).to(self.device)
        next_states = torch.FloatTensor(np.array(next_col)).to(self.device)
        dones = torch.FloatTensor(list(done_col)).to(self.device)

        # Q(s, a) of the actions actually taken.
        predicted = self.model(states).gather(1, actions.unsqueeze(1)).squeeze()

        # Bootstrapped target from the target network; no gradient flows through it.
        with torch.no_grad():
            best_next = self.target_model(next_states).max(1)[0]
        target = rewards + (1 - dones) * self.gamma * best_next

        loss = F.mse_loss(predicted, target)
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
        self.optimizer.step()
        return loss.item()

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def decay_epsilon(self):
        """Multiply ``epsilon`` by ``epsilon_decay`` while it is above ``epsilon_min``."""
        if self.epsilon <= self.epsilon_min:
            return
        self.epsilon *= self.epsilon_decay


# In [None]:
def train_multi_agent(players, episodes, max_steps):
    """Train one independent DQN agent per player in a shared ``FootballEnv``.

    In every round each agent picks an action from its own latest observation,
    then the players are stepped in list order. The episode ends as soon as
    any player's step reports ``done`` or ``truncated``.

    Args:
        players: Player objects, one per agent. An empty list yields ``[]``.
        episodes: Number of training episodes.
        max_steps: Maximum number of rounds per episode.

    Returns:
        The list of trained ``DQNAgent`` objects, aligned with ``players``.
    """
    if not players:
        return []

    env = FootballEnv()
    # Size the networks from a real observation instead of assuming the
    # two-player layout; for two players this is 8, as before.
    env.reset(players)
    state_size = len(env._get_state(players[0], players))
    # The agents only use the 8 movement actions of the environment.
    agents = [DQNAgent(state_size, 8) for _ in players]

    for episode in range(episodes):
        env.reset(players)
        states = [env._get_state(p, players) for p in players]
        total_rewards = [0.0] * len(players)
        episode_over = False

        for _ in range(max_steps):
            if episode_over:
                break

            actions = [agent.act(state) for agent, state in zip(agents, states)]
            experiences = []
            rewarded_in_step = []
            goal_team = None

            for i, player in enumerate(players):
                next_state, reward, done, truncated, info = env.step(actions[i], player, players)

                # env.step already credits the goal outcome to a player whose
                # own step reports a scoring team in its info dict.
                step_goal = info.get('goal_team')
                rewarded_in_step.append(step_goal is not None)
                if step_goal is not None:
                    goal_team = step_goal

                # Accumulate instead of overwriting, so a termination caused by
                # an earlier player is not lost when a later player steps.
                episode_over = episode_over or done or truncated

                experiences.append([states[i], actions[i], reward, next_state, done or truncated])
                total_rewards[i] += reward
                states[i] = next_state

            for i, player in enumerate(players):
                experience = experiences[i]
                if goal_team is not None and not rewarded_in_step[i]:
                    # This player's step did not report the goal (it acted
                    # before the goal, or its step returned early), so it has
                    # not received the team outcome yet.
                    if player.team == goal_team:
                        team_reward = player.goal_reward
                    else:
                        team_reward = player.opp_scoring
                    experience[2] += team_reward
                    total_rewards[i] += team_reward

                # The episode ends for everyone, so every transition of the
                # final round is stored as terminal.
                experience[4] = experience[4] or episode_over

                agents[i].remember(*experience)
                # replay() is a no-op until the memory holds a full batch.
                agents[i].replay()

        for agent, episode_reward in zip(agents, total_rewards):
            agent.episode_count += 1
            if agent.episode_count % agent.target_update_freq == 0:
                agent.update_target_model()
            agent.decay_epsilon()
            agent.rewards_history.append(episode_reward)

        # Lightweight progress indicator.
        if episode % 1000 == 0:
            print(episode)

    return agents

# Create the environment and one 'F' player for each of teams 0 and 1, then train their agents.
env = FootballEnv()
players = [Player('F', team, env) for team in (0, 1)]
trained_agents = train_multi_agent(players, episodes=10000, max_steps=1000)

# In [None]:
# Evaluating the agents
def evaluate_multi_agent(agents, env, players, episodes=1, max_steps=1000, render=True):
    """Run greedy (exploration-free) episodes and report the mean reward per player.

    Each round, observations are computed for all players and the players then
    act in a freshly shuffled order. An episode stops at the first step that
    reports ``done`` or ``truncated``, or after ``max_steps`` rounds.

    Args:
        agents: Agents aligned with ``players``; each must provide
            ``act(state, evaluate=True)``.
        env: Environment providing ``reset``, ``_get_state``, ``step`` and ``render``.
        players: Player objects passed through to the environment.
        episodes: Number of evaluation episodes.
        max_steps: Maximum number of rounds per episode.
        render: When True, draw the pitch and pause 0.1 s before every round.

    Returns:
        Array with the per-player reward averaged over all episodes.
    """
    per_episode_totals = []

    for _ in range(episodes):
        env.reset(players)
        totals = [0.0] * len(players)
        finished = False
        rounds_played = 0

        while rounds_played < max_steps and not finished:
            rounds_played += 1

            if render:
                env.render(players)
                time.sleep(0.1)

            # Snapshot every player's observation before anyone moves.
            observations = [env._get_state(p, players) for p in players]

            # Players act in a random order each round.
            for k in np.random.permutation(len(players)):
                chosen = agents[k].act(observations[k], evaluate=True)
                _, gained, done, truncated, _ = env.step(chosen, players[k], players)
                totals[k] += gained

                if done or truncated:
                    finished = True
                    break

        per_episode_totals.append(totals)
        print(f"Total Rewards: {totals}")

    return np.mean(per_episode_totals, axis=0)


# Play one rendered evaluation episode with the trained agents and report the mean rewards.
avg_rewards = evaluate_multi_agent(trained_agents, env, players, episodes=1, render=True)
print(f"Average rewards across evaluation episodes: {avg_rewards}")