本次作业包含game.py、strategy.py，main.ipynb三个文件。

game.py中实现了收益矩阵Payoff类和玩家基类Player类

strategy.py中实现了课上提到的四种策略（AlwaysCooperatePlayer，AlwaysDefectPlayer，DowningPlayer，RandomPlayer）

请在main.ipynb中完成作业，并基于结尾部分的代码进行验证。

In [68]:
import random

import pandas as pd
from game import Payoff, Player
from strategy import (
    AlwaysCooperatePlayer,
    AlwaysDefectPlayer,
    DowningPlayer,
    RandomPlayer,
)

# 第一题（60分）
请实现以下几种策略：
1. TitForTatPlayer(5分)
2. TitForTwoTatsPlayer(5分)
3. DavisPlayer(5分)
4. GrudgerPlayer（5分）
5. ShubikPlayer（20分）
6. FeldPlayer（20分）

### 1. TitForTatPlayer
以牙还牙，对手上次选择什么，我就选择什么。在第一轮选择合作

In [180]:
class TitForTatPlayer(Player):
    def move(self, opponent: Player):
        return opponent.history[-1] if opponent.history else "C"

### 2.TitForTwoTatsPlayer
前两轮选择合作，之后若对手连续两次背叛，则选择背叛，否则选择合作。

In [181]:
class TitForTwoTatsPlayer(Player):
    def move(self, opponent: Player):
        if len(opponent.history) < 2:
            return "C"
        return "D" if opponent.history[-2:] == ["D", "D"] else "C"

### 3.GrudgerPlayer
若对手从未背叛过，就一直合作；一旦对手背叛一次，就一直选择背叛。

In [182]:
class GrudgerPlayer(Player):
    def move(self, opponent: Player):
        return "D" if "D" in opponent.history else "C"

### 4.DavisPlayer
前10轮一定合作，之后如果发现对方背叛过，则一直背叛，否则一直合作。

In [183]:
class DavisPlayer(Player):
    def move(self, opponent: Player):
        if len(self.history) < 10:
            return "C"
        return "D" if "D" in opponent.history else "C"

### 5. ShubikPlayer
1. 开局示好，第一回合选择合作
2. 以直报怨，如果对方在上一回合背叛（出“D”），而自己上一回合是合作的（出“C”），策略会立刻触发报复。
3. 逐步升级，每次触发报复时：
    
    立刻反击：当前回合直接背叛（D）。

    延长惩罚：记录这次背叛，并让未来的惩罚时间比上一次多1回合。

    例如：第一次被背叛时，惩罚1回合；第二次被背叛时，惩罚2回合，依此类推。

4. 有限惩罚后和解，在设定的惩罚回合数结束后，策略会自动恢复合作。比如：如果当前需要惩罚2回合，那么连续出两次“D”后，第三回合主动变回“C”。

In [184]:
class ShubikPlayer(Player):
    def __init__(self, name: str):
        super().__init__(name)
        self.rage: int = 0
        self.betray_cnt: int = 0

    def move(self, opponent: Player):
        if len(self.history) == 0:
            return "C"

        if self.rage > 0:
            self.rage -= 1
            return "D"

        if opponent.history[-1] == "D" and self.history[-1] == "C":
            self.betray_cnt += 1
            self.rage = self.betray_cnt
            return "D"
        
        return "C"
    
    def reset(self):
        super().reset()
        self.rage = 0
        self.betray_cnt = 0

### 6.费尔德策略FeldPlayer

费尔德策略是一种**合作概率随时间衰减的随机策略**：
1. 开局阶段倾向于合作 (`C`)。
2. 若对手上一回合背叛 (`D`)，则本回合必然背叛。
3. 随着回合数增加，合作概率逐渐降低至预设阈值。在对手上一轮合作的情况下，按当前合作概率随机选择合作或背叛。

#### 合作概率计算
定义当前回合的合作概率为：
$$
p_{\text{coop}} = \max\left( p_{\text{start}} + \left( \frac{p_{\text{end}} - p_{\text{start}}}{T_{\text{decay}}} \right) \cdot t, \ p_{\text{end}} \right)
$$

- **符号说明**：
  - $p_{\text{start}}$: 初始合作概率 (`start_coop_prob`)，设为1.0
  - $p_{\text{end}}$: 最终合作概率 (`end_coop_prob`)，设为0.5
  - $T_{\text{decay}}$: 合作概率衰减周期 (`rounds_of_decay`)，设为200
  - $t$: 当前已进行回合数 (`len(self.history)`)


In [185]:
class FeldPlayer(Player):
    def __init__(self, name: str):
        super().__init__(name)
        self.start_coop_prob: float = 1.0
        self.end_coop_prob: float = 0.5
        self.rounds_of_decay: int = 200

    def move(self, opponent: Player):
        coop_prob: float = max(
            self.end_coop_prob,
            self.start_coop_prob
            + (self.end_coop_prob - self.start_coop_prob)
            / self.rounds_of_decay
            * len(self.history),
        )
        return "C" if random.random() <= coop_prob else "D"

# 第二题（40分）
请补全Tournament类：

该类的属性包括：

| 属性名             | 类型                                                   | 含义与功能说明                                     |
|--------------------|--------------------------------------------------------|----------------------------------------------------|
| `players`          | `List[Player]`                                         | 所有参赛选手（策略）列表                          |
| `payoff`           | `Payoff`                                               | 收益矩阵，决定行动得分规则                        |
| `rounds`           | `int`                                                  | 每场比赛的轮数                                     |
| `_results`         | `Dict[str, int]`  | 存储每个玩家的累计总得分（按名称索引），即`Dict[玩家名, 累计得分]`，得分初始为0 |
| `_match_results`   | `Dict[Tuple[str, str], int]`| 每一组玩家对战中，player1 的得分，即`Dict[（player1名称, player2名称）, player1得分]` ，得分初始为0|
| `_round_history`   | `Dict[Tuple[str, str], List[Tuple[str, str]]]`| 存储每一组玩家在所有回合中的行为（如：`('C','D')`），即`Dict[（player1名称,player2名称）, List[（player1行动, player2行动），（player1行动, player2行动）......]`，list初始为空     |


该类的方法包括：
1. `_match(p1, p2)`：模拟两个玩家p1,p2之间的多轮对战；每轮获取双方动作，更新得分，并记录每一轮行为

    首先初始化双方得分和行为记录。
    然后，循环执行指定轮数的对战。
    在每一轮中，调用p1的move方法获取p1本轮应对p2的动作，然后调用p2的move方法获取p2本轮应对p1的动作。基于收益矩阵获取本轮双方的得分，加入总分中，并记录双方的行动到各自的行动记录中。
    最终，更新_round_history，返回双方各自的总得分。

2. `run()`：循环锦标赛的主控制流程；每两位玩家进行一次 _match()；累加得分并记录比赛结果到 _results 和 _match_results

    遍历每个玩家对（不包括自己与自己对战），对于每个玩家组合p1、p2，对玩家重置后，调用 _match 方法进行对战，将玩家p1的得分计入其总得分（更新_results），将p1与p2的对战结果记录在self._match_results中。   

3. ` __call__()`：让 Tournament 实例可以像函数一样调用；内部自动调用 run() 和 print_rankings()
4. `print_rankings()`：打印总排行榜（按平均得分降序）
5. `get_round_history(p1_name, p2_name)`：返回指定两位玩家对战的每一回合行为,用于复盘分析或可视化展示
6. `get_match_results()`：返回一个对战得分矩阵，行列为玩家名，元素为 player1 的得分，每行计算“平均得分”列供分析排名

以下代码已实现部分，请补充以下部分：
- 属性：`_results` ，`_match_results`，`_round_history`
- 方法：`_match`，`run`



In [186]:
from itertools import product


class Tournament:
    def __init__(self, players: list[Player], payoff: Payoff, rounds: int = 5):
        self.payoff = payoff
        self.players = players
        self.rounds = rounds

        self._results = {p.name: 0 for p in players}
        self._match_results = {
            (p1.name, p2.name): 0
            for p1, p2 in product(players, repeat=2)
            if p1.name != p2.name
        }
        self._round_history = {
            (p1.name, p2.name): []
            for p1, p2 in product(players, repeat=2)
            if p1.name != p2.name
        }

    def _match(self, p1: Player, p2: Player) -> tuple[int, int]:
        results = {p1.name: 0, p2.name: 0}
        for _ in range(self.rounds):
            move1 = p1.move(p2)
            move2 = p2.move(p1)

            p1.record_moves(move1)
            p2.record_moves(move2)
            
            score1, score2 = self.payoff.matrix[(move1, move2)]

            results[p1.name] += score1
            results[p2.name] += score2

            self._round_history[(p1.name, p2.name)].append((move1, move2))

        return results[p1.name], results[p2.name]

    def run(self) -> None:
        for p1, p2 in product(self.players, self.players):
            if p1.name == p2.name:
                continue

            p1.reset()
            p2.reset()

            score1, score2 = self._match(p1, p2)

            self._results[p1.name] += score1
            self._match_results[(p1.name, p2.name)] = score1

    def __repr__(self) -> str:
        return f"Tournament(players={len(self.players)}, rounds={self.rounds})"

    @property
    def rankings(self) -> list[tuple[str, int]]:
        """返回根据得分降序排序后的排行榜"""
        return sorted(self._results.items(), key=lambda x: x[1], reverse=True)

    def print_rankings(self) -> None:
        title = "Axelrod Tournament Results"
        records = [
            f"{rank:>3}. {name:<24} {score / len(self.players):7.0f} points (on average)"
            for rank, (name, score) in enumerate(self.rankings, start=1)
        ]
        max_length = max(len(record) for record in records)
        blank_length = (max_length - len(title)) // 2
        print("=" * blank_length, title, "=" * blank_length)
        for record in records:
            print(record)

    def __call__(self) -> None:
        """使 Tournament 实例可直接调用以运行并输出"""
        self.run()
        self.print_rankings()

    def get_round_history(self, p1_name: str, p2_name: str) -> pd.DataFrame:
        """
        返回指定玩家对战的回合记录，格式为pandas.DataFrame，包含2行N列，分别为player 1和player 2的行动。
        """
        history = self._round_history.get((p1_name, p2_name), None)
        if history is None:
            raise ValueError(f"No round history found for {p1_name} vs {p2_name}")
        return pd.DataFrame.from_records(
            history,
            columns=[f"P1: {p1_name}", f"P2: {p2_name}"],
            index=range(1, self.rounds + 1),
        )

    def get_match_results(self) -> pd.DataFrame:
        """
        以矩阵形式返回每对玩家的对战结果，以pandas.DataFrame格式返回。矩阵的行和列分别为玩家的名称。
        其中，第i行j列表示第i个玩家作为player 1且第j个玩家作为player 2时，player 1的得分和player 2的得分。
        该矩阵为对称矩阵，即第i行j列和第j行i列的值相同。
        """
        player_names = [p for p, s in self.rankings]
        result_matrix = pd.DataFrame(
            index=player_names, columns=player_names, dtype=object
        )

        for (p1_name, p2_name), score1 in self._match_results.items():
            result_matrix.at[p1_name, p2_name] = score1

        result_matrix["Average Score"] = result_matrix.mean(axis=1).transform(
            lambda x: int(round(x))
        )

        return result_matrix

# main(用于验证)

In [187]:
payoff = Payoff(R=3, P=1, S=0, T=10)
payoff

Payoff Matrix:
                   player 2 
                   C       D
player 1  C     (3, 3)  (0, 10)
          D     (10, 0)  (1, 1)

In [188]:
players = [
    AlwaysCooperatePlayer("AlwaysCooperate"),
    AlwaysDefectPlayer("AlwaysDefect"),
    TitForTatPlayer("TitForTat"),
    TitForTwoTatsPlayer("TitForTwoTats"),
    DavisPlayer("Davis"),
    DowningPlayer("Downing", payoff),
    FeldPlayer("Feld"),
    GrudgerPlayer("Grudger"),
    RandomPlayer("Random"),
    ShubikPlayer("Shubik"),
]

In [189]:
random.seed(2025)
tournament = Tournament(players, payoff, rounds=200)
tournament()  # 直接调用 __call__ 方法

  1. Downing                      740 points (on average)
  2. AlwaysDefect                 634 points (on average)
  3. Grudger                      597 points (on average)
  4. Davis                        577 points (on average)
  5. Shubik                       568 points (on average)
  6. TitForTat                    514 points (on average)
  7. Random                       468 points (on average)
  8. TitForTwoTats                456 points (on average)
  9. Feld                         384 points (on average)
 10. AlwaysCooperate              381 points (on average)


In [190]:
tmp_df = tournament.get_round_history("Feld", "Shubik")
display(tmp_df)

Unnamed: 0,P1: Feld,P2: Shubik
1,C,C
2,C,C
3,C,C
4,C,C
5,C,C
...,...,...
196,D,D
197,D,D
198,D,D
199,D,C


In [191]:
tournament.get_match_results()

Unnamed: 0,Downing,AlwaysDefect,Grudger,Davis,Shubik,TitForTat,Random,TitForTwoTats,Feld,AlwaysCooperate,Average Score
Downing,,200.0,206.0,288.0,602.0,602.0,1181.0,722.0,1595.0,2000.0,822
AlwaysDefect,200.0,,209.0,290.0,362.0,209.0,1226.0,218.0,1622.0,2000.0,704
Grudger,226.0,199.0,,600.0,600.0,600.0,1072.0,600.0,1477.0,600.0,664
Davis,208.0,190.0,600.0,,600.0,600.0,1051.0,600.0,1317.0,600.0,641
Shubik,602.0,182.0,600.0,600.0,,600.0,810.0,600.0,1084.0,600.0,631
TitForTat,602.0,199.0,600.0,600.0,600.0,,659.0,600.0,677.0,600.0,571
Random,82.0,92.0,101.0,146.0,317.0,713.0,,985.0,1021.0,1223.0,520
TitForTwoTats,342.0,198.0,600.0,600.0,600.0,600.0,497.0,,528.0,600.0,507
Feld,50.0,38.0,155.0,141.0,417.0,714.0,538.0,789.0,,999.0,427
AlwaysCooperate,0.0,0.0,600.0,600.0,600.0,600.0,333.0,600.0,474.0,,423
