In [1]:
# | default_exp methods


In [2]:
# | hide
# | export
from gh_pages_example.utils import *
from gh_pages_example.model_utils import *
from gh_pages_example.types import *

import collections
import functools
import math
import typing
from typing import Optional, List, Generator, Union, Callable
from warnings import warn

import fastcore.test
from nbdev.showdoc import *
import numpy as np
import nptyping
from scipy.linalg import schur, eigvals
from scipy.sparse import csr_matrix, csc_matrix

  if (ind not in allowed_inds) and (str(ind) not in allowed_inds):


In [3]:
np.set_printoptions(suppress=True)


# Methods in Evolutionary Game Theory

> A set of methods for solving Evolutionary Games (see Nowak 2006 and the references section)

## Evolutionary Dynamics in Finite Populations

We examine a finite population of players using different strategies who engage in social learning.

In the limit of small mutations, most of the time everyone plays the same strategy. States in which everyone plays the same strategy are known as **monomorphic states**. Occassionally, mutant strategies can fixate in the population, resulting in everyone adopting the same new strategy. We can use Markov Chains to analyse the relative frequencies with which each strategy is played by the population.

The steps for computing the ergodic (i.e. long-run, stationary) strategy distribution is as follows:

1. Build a transition matrix between all monomorphic states
2. Find the ergodic distribution for the markov chain defined using this transition matrix

### Fermi social learning

> A Fermi social learning rule means that individuals make pairwise comparisons between their own strategy and and another strategy in the population that they may choose to copy.

#### Derivation

Each period of the evolutionary game involves individuals being randomly selected to play against one another individual.

Letting $Z$ denote the size of the population, and $π$ denote the game's payoff matrix, we can compute the fitness of a strategy, $B$ for example, when $k$ individuals are of type $B$ as follows:

\begin{equation}
ΠB_k = πBA \frac{k-1}{Z - 1} + πBB \frac{Z-k}{Z- 1}
\end{equation}

where $πBA$ and $πBB$ are the payoffs for playing $B$ against type $A$ or $B$ respectively.

The **Fermi social learning rule** adopts strategy $B$ selected from the population over their current strategy $A$ with probability given by:

\begin{equation}
Pr(adopt \, B | k) = \frac{1}{(1 + \exp^{-\beta (ΠB_k - ΠA_k)})}
\end{equation}

where $ΠB_k - ΠA_k$ is the relative fitness of strategy $B$ over $A$ in a population with $k$ individuals of type $B$, the rest of type $A$. Notice how the larger the relative fitness, the closer the denominator, and therefore the probability, is to $1$.

Using the Fermi social learning rule above, we can write the probability of increasing the number of type $B$ individuals as

\begin{equation}
T^+_B(k) = \frac{Z-k}{Z} \frac{k}{Z} Pr(adopt \, B | k) 
\end{equation}
Z
as an individual of type $A$ needs to randomly be chosen to compare their strategy against someone of type $B$.

and the probability of decreasing the number of type $B$ individuals as

\begin{equation}
T^-_B(k) = \frac{k}{Z} \frac{Z-k}{Z} Pr(adopt \, A | k) 
\end{equation}

as an individual of type $B$ needs to randomly be chosen to compare their strategy against someone of type $A$.

We will often employ their ratio, which is: 

\begin{equation}
\frac{T^-_B(k)}{T^+_B(k)} = \frac{Pr(adopt \, A | k) }{Pr(adopt \, B | k)} = \frac{1 + \exp^{-\beta (ΠB_k - ΠA_k)}}{1 + \exp^{-\beta (ΠA_k - ΠB_k)}}
\end{equation}

Notice that $\frac{1 + \exp^x}{1 + \exp^{-x}} = \exp^{x}$

So, this ratio simplifies to $\frac{T^-_B(k)}{T^+_B(k)} =  \exp^{-\beta (ΠB_k - ΠA_k)}$


#### Definition

In [4]:
# | export

def fermi_learning(fitnessA: nptyping.NDArray,  # fitness of strategy A
                   fitnessB: nptyping.NDArray,  # fitness of strategy B
                   β: nptyping.NDArray,  # learning rate
                   ) -> nptyping.NDArray:
    """Compute the likelihood that a player with strategy A adopts strategy B using the fermi function."""
    return (1 + np.exp(-β*(fitnessB - fitnessA)))**(-1)


#### Examples and Tests

When each strategy has the same fitness, then the likelihood that a player adopts strategy $B$ is 50%, no matter the value of $\beta$.

In [5]:
x = fermi_learning(np.array([5]),
                   np.array([5]),
                   np.array([1]),)
nptyping.assert_isinstance(
    x, nptyping.NDArray[nptyping.Shape["1"], typing.Any])
fastcore.test.test_eq(x, 0.5)


### Fixation rate

> The fixation rate for type B in a population of type A, $\rho$, is defined as the probability that the appearance of a mutant of type B leads to the entire population adopting type B instead of A, i.e. what is the likelihood that a mutant of type B invades population A.

#### Derivation


A derivation of the fixation rate defined below can be found in Nowak 2006 (reproduced below).

> Consider a one-dimensional stochastic process on a discrete state space, $ i \in \{0, 1, \cdots, N\}$ that represents the number of individuals in a population of $N$ individuals who are of type $B$, the rest are type $A$.
>
> In each stochastic event, the number of individuals of type $B$ can at most increase or decrease by 1.
>
> For a given number of individuals, $i$, let $a_i$, $b_i$, and $1 - a_i - b_i$ represent the chance of an increase, decrease, or no change in $i$.
> 
> This stochastic process follows the transition matrix ,$P$ (*not to be confused with the transition matrices we discuss elsewhere!*)
>
>
> \begin{equation}
P \, = \, \begin{pmatrix}
1 & 0 & 0 & \cdots & 0 & 0 & 0\\
b_1 & (1 - a_1 - b_1) & a_1 & \cdots & 0 & 0 & 0\\
\vdots & \vdots & \vdots & \ddots & \vdots & \vdots & \vdots\\
0 & 0 & 0 & \cdots & b_{n-1} & (1 - a_{n-1} - b_{n-1}) & a_{n-1}\\
0 & 0 & 0 & \cdots & 0 & 0 & 1\\
\end{pmatrix}
\end{equation}
>
> Denote by $x_i$ the probability of reaching state $N$ when starting from $i$.
>
> From transition matrix $P$ above, we can see that $x_i$ must satisfy:
>
> $x_0 = 0$
>
> $x_i = b_i x_{i-1} + (1 - a_i - b_i) x_i + a_i x_{i+1}$
>
> $x_N = 1$
>
> The fixation rate for a mutant B in a population of type A is clearly $x_1$
>
> We can solve for $x_i$ by rewriting the above as $b_i x_i - b_i  x_{i-1} = a_i x_{i+1} - a_i x_i$.
> 
> We can denote $y_i = x_i - x_{i-1}$ to simplify the above to $y_{i+1} = \frac{b_i}{a_i} y_i$
>
> Notice that $\sum_{i=1}^N{y_i} = x_N - x_0 = 1$ and that $y_1 = x_1$
>
> We can use the above to write
\begin{equation}
x_1 + {\sum_{i=2}^N{y_i}} = x_1 (1 + {\sum_{i=1}^{N-1}{\prod_{j=1}^{i} \frac{b_j}{a_j}}}) = 1
\end{equation}
>
> And so
\begin{equation}
x_1 = \frac{1}{(1 + \sum_{i=1}^{N-1}{\prod_{j=1}^{i} \frac{b_j}{a_j}})}
\end{equation}
>
> Note that $x_1$ is the fixation rate for a mutant $B$ in a population of type $A$, often denoted as $\rho$.
>
> *Also note that $1 - x_{N-1}$ is the fixation rate for a mutant $A$ in a population of type $B$. We could find expressions for all $x_i$ if we note that $x_i = x_1 (1 + \sum_{j=1}^{i-1}{\prod_{k=1}^{j} \frac{b_k}{a_k}})$ (see Nowak 2006 for further details).*

We can use our definitions above to determine when the fixation rate for a mutant $B$ in a population of type $A$ is greater than that for a mutant $A$ in a population of type $B$. 

This condition requires that $x_1 > 1 - x_{N-1}$, i.e. $\frac{1}{(1 + \sum_{i=1}^{N-1}{\prod_{j=1}^{i} \frac{b_j}{a_j}})} > \frac{\prod_{j=1}^{N-1} \frac{b_j}{a_j}}{(1 + \sum_{i=1}^{N-1}{\prod_{j=1}^{i} \frac{b_j}{a_j}})}$.

Using the fermi social learning rule and the aforementioned simplifications, we can see that this condition holds true whenever $1 > \exp^{-\beta \sum_{j=1}^{N-1}{\Pi_B(j) - \Pi_A(j)}}$ which implies $\sum_{j=1}^{N-1}{\Pi_B(j)} > \sum_{j=1}^{N-1}{\Pi_A(j)}$.

Lastly, we can make use of the equation $\sum_{j=1}^{N-1}{j}=\frac{(N-1) N}{2}$ to simplify this condition to $\pi_{BA} + \pi_{BA} > \pi_{AA} + \pi_{AB}$

This is exactly the risk dominance condition implied by 2 by 2 payoff matrices. The risk dominance condition has been used in the literature to offer a reason to motivate selecting one monomorphic equilibria over another in such games. In such games there is a precise connection between risk dominance and the monomorphic equilibria selected for by social learning. This connection disappears in games with larger payoff matrices (which is why theorists tends to consider the concept of stochastic stability instead, perhaps using Young's method (Young 2003)).

Even in games with more than 2 players (or populations), we can make use of this condition to tell us in which direction the fixation rate is stronger between two strategies. At times, this is enough to gain an intuition for the gradient of selection present in polymorphic states where multiple strategies coexist in one or more populations.

#### Definition

In [6]:
# | export

T_type = list[nptyping.NDArray[nptyping.Shape["N_models"], typing.Any]]


def fixation_rate(Tplus: T_type,  # A list of NDarrays, one array (of size n_models) for each possible number of mutants in the population; the probability of gaining one mutant
                  # A list of NDarrays, one array (of size n_models) for each possible number of mutants in the population; the probability of losing one mutant
                  Tneg: T_type,
                  ) -> nptyping.NDArray[nptyping.Shape["N_models"], typing.Any]:  # Fixation rates for the given strategy in each model
    """Calculate the likelihood that a mutant invades the population."""
    Z = len(Tplus) + 1
    ρ = (np.sum([np.prod([Tneg[j-1]/Tplus[j-1]
                         for j in range(1, i+1)],
                         axis=0,
                         keepdims=False)
                 for i in range(1, Z)],
                axis=0,
                keepdims=False)
         + 1)**-1
    # The fixation rate may be very close to 0. Innacuracies with floats
    # may mean that we run into issues later on. We assume the fixation rate
    # never drops below 1e-10.
    ρ = np.maximum(ρ, 1e-10)
    return ρ


In [7]:
show_doc(fixation_rate)


---

[source](https://github.com/PaoloBova/gh-pages-example/blob/main/gh_pages_example/methods.py#L45){target="_blank" style="float:right; font-size:smaller"}

### fixation_rate

>      fixation_rate (Tplus:list[nptyping.base_meta_classes.NDArray],
>                     Tneg:list[nptyping.base_meta_classes.NDArray])

Calculate the likelihood that a mutant invades the population.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| Tplus | list | A list of NDarrays, one array (of size n_models) for each possible number of mutants in the population; the probability of gaining one mutant |
| Tneg | list | A list of NDarrays, one array (of size n_models) for each possible number of mutants in the population; the probability of losing one mutant |
| **Returns** | **NDArray** | **Fixation rates for the given strategy in each model** |

#### Examples and Tests

When the chance of gaining a mutant always equals the chance of losing a mutant, then the fixation rate will be $\frac{1}{Z}$

Note that because we have to sample the population for a mutant and the player of the type being invaded, the chance of gaining or losing a mutant can be no greater than $\frac{k}{Z} \frac{Z-k}{Z}$

In [8]:
Z = 2  # With Z=2, we only need to evaluate Tplus and Tneg for when k=1
Tplus_example = [np.array([1/8])]
Tneg_example = [np.array([1/8])]


In [9]:
# |  hide
# validate test inputs
assert len(Tplus_example) == len(Tneg_example)
for tplus, tneg in zip(Tplus_example, Tneg_example):
    assert tplus.shape == tneg.shape


In [10]:
fixation_rate_result = fixation_rate(Tplus_example, Tneg_example)


In [11]:
# | hide
nptyping.assert_isinstance(fixation_rate_result,
                           nptyping.NDArray[nptyping.Shape["1"], typing.Any])


True

In [12]:
fastcore.test.test_eq(fixation_rate_result, np.array([0.5]))


When the chance of gaining a mutant is half the chance of losing a mutant, then the fixation rate will be

\begin{equation}
\rho = \frac{1}{(1 + \sum_{j=1}^{Z-1}{2^j})}
\end{equation}

When $Z=2$, we have $\rho = \frac{1}{3}$

In [13]:
Z = 2  # With Z=2, we only need to evaluate Tplus and Tneg for when k=1
Tplus_example = [np.array([0.1])]
Tneg_example = [np.array([0.2])]


In [14]:
# |  hide
# validate test inputs
assert len(Tplus_example) == len(Tneg_example)
for i, tplus in enumerate(Tplus_example):
    assert tplus.shape == Tneg_example[i].shape


In [15]:
fixation_rate_result = fixation_rate(Tplus_example, Tneg_example)


In [16]:
fastcore.test.test_eq(fixation_rate_result, np.array([1/3]))


In [17]:
# | hide
nptyping.assert_isinstance(fixation_rate_result,
                           nptyping.NDArray[nptyping.Shape["1"], typing.Any])


True

We could instead consider an example where we have a mutant Defector (D) who appears in a population of Cooperators (C) playing a standard Prisoner's Dilemma.

We will consider an example of such a scenario where chance of gaining/losing a D player be given by $\frac{1}{1 + e^{\pm \beta \frac{Z+1}{Z-1}}}$.

The fixation rate will be given by the following expression:

\begin{equation}
\rho = \frac{1}{1 + \sum_{j=1}^{Z-1}{(\frac{1 + e^{- \beta \frac{Z+1}{Z-1}}}{1 + e^{\beta \frac{Z+1}{Z-1}}})^j}}
\end{equation}

For this example, we will let $\beta=1$ and $Z=10$, so $\beta \frac{Z+1}{Z-1} = \frac{11}{9}$.

In [18]:
β = 1
Z = 10
ρ_CD = 1 / (1 + sum((1 + np.exp(- β * (Z + 1) / (Z-1)))**j
                    / (1 + np.exp(β * (Z + 1) / (Z-1)))**j
                    for j in range(1, Z)))
Tplus_example = [np.array([1 / (1 + np.exp(- β * (Z + 1) / (Z-1)))])
                 for _ in range(Z-1)]
Tneg_example = [np.array([1 / (1 + np.exp(β * (Z + 1) / (Z-1)))])
                for _ in range(Z-1)]


In [19]:
# |  hide
# validate test inputs
assert len(Tplus_example) == len(Tneg_example)
for i, tplus in enumerate(Tplus_example):
    assert tplus.shape == Tneg_example[i].shape


In [20]:
# | hide
nptyping.assert_isinstance(fixation_rate(Tplus_example, Tneg_example),
                           nptyping.NDArray[nptyping.Shape["1"], typing.Any])


True

In [21]:
fastcore.test.is_close(fixation_rate(Tplus_example, Tneg_example), ρ_CD)


True

Finally, it is useful to know how the fixation rate behaves when any elements of Tplus are zero (as the fixation rate divides by those elements). Even though the Fermi learning rule we use theoretically gives a number between 0 and 1 exclusive, in practise the number may underflow to a 0 if low enough. This will cause unexpected behaviour if we allow it in our alogorithm for computing the transition matrix.

We can avoid this issue by using a slightly altered method for calculating the fixation rate, taking advantage of our choice to use the `fermi_learning` rule.

In the above fixation rate calculations we used the `fermi_learning` function to calculate the probability of a player with strategy $D$ adopting strategy $C$ (and likewise for the probability of a player with $C$ adopting $D$). Their ratio takes the form, $\frac{1 + e^x}{1 + e^{-x}}$. It is not too hard to verify that $\frac{1 + e^x}{1 + e^{-x}} = e^x$.

Moreover, we can avoid taking the product of the ratios at all, since the product of exponentials (with the same base) is just the exponential of the sum of their exponents.

By using the above substitution and algebraic manipulation, we can substantially mitigate the numerical stability issues. For this reason, we will not use `fermi_learning` nor `fixation_rate` in our algorithm at all (although in most cases we would expect these methods to yield the same answers). Instead, we will use `fixation_rate_stable`.

In [22]:
# | export

@multi
def fixation_rate_stable(ΠA: list,  # Average payoffs for the strategy A they consider adopting for each number of mutants following A
                         ΠB: list,  # Average payoffs for the strategy B that the player currently follows for each number of mutants following A
                         β: Array1D,  # learning rate
                         method="cheap", # method to dispatch on
                         ):
       return method

@method(fixation_rate_stable)
def fixation_rate_stable(ΠA: list,  # Average payoffs for the strategy A they consider adopting for each number of mutants following A
                         ΠB: list,  # Average payoffs for the strategy B that the player currently follows for each number of mutants following A
                         β: Array1D,  # learning rate
                         method=None, # method to dispatch on
                         ):
    """Calculate the likelihood that a mutant B invades population A
    using a numerically stable method."""
    fastcore.test.test_eq(len(ΠA), len(ΠB))
    Z = len(ΠA) + 1
    ρ = (np.sum([np.exp(np.clip(np.sum([-β*(ΠB[j-1] - ΠA[j-1])
                                        for j in range(1, i+1)],
                                       axis=0,
                                       keepdims=False),
                                -500,
                                500))  # avoid underflow/overflow warnings
                 for i in range(1, Z)],
                axis=0,
                keepdims=False)
         + 1)**-1
    # The fixation rate may be very close to 0. Innacuracies with floats
    # may mean that we run into issues later on. We assume the fixation rate
    # never drops below 1e-10.
#     ρ = np.maximum(ρ, 1e-15)
    return ρ

@method(fixation_rate_stable, "cheap")
def fixation_rate_stable(ΠA: list,  # Average payoffs for the strategy A they consider adopting for each number of mutants following A
                         ΠB: list,  # Average payoffs for the strategy B that the player currently follows for each number of mutants following A
                         β: Array1D,  # learning rate
                         method=None, # method to dispatch on
                         ):
    """Calculate the likelihood that a mutant B invades population A
    using a numerically stable method."""
    fastcore.test.test_eq(len(ΠA), len(ΠB))
    Z = len(ΠA) + 1
    # avoid underflow/overflow warnings
    ρ = (np.sum(np.exp(np.clip(np.cumsum([-β*(ΠB[j-1] - ΠA[j-1])
                                          for j in range(1, Z)],
                                         axis=0),
                               -500,
                               500)),  
                axis=0,
                keepdims=False)
         + 1)**-1
    # The fixation rate may be very close to 0. Innacuracies with floats
    # may mean that we run into issues later on. We assume the fixation rate
    # never drops below 1e-10.
#     ρ = np.maximum(ρ, 1e-15)
    return ρ

@method(fixation_rate_stable, "cheap2")
def fixation_rate_stable(ΠA: list,  # Average payoffs for the strategy A they consider adopting for each number of mutants following A
                         ΠB: list,  # Average payoffs for the strategy B that the player currently follows for each number of mutants following A
                         β: Array1D,  # learning rate
                         method=None, # method to dispatch on
                         ):
    """Calculate the likelihood that a mutant B invades population A
    using a numerically stable method."""
    fastcore.test.test_eq(len(ΠA), len(ΠB))
    Z = len(ΠA) + 1
    # avoid underflow/overflow warnings
    ρ = (np.sum(2**(np.clip(np.cumsum([-β*(ΠB[j-1] - ΠA[j-1])
                                          for j in range(1, Z)],
                                         axis=0),
                               -500,
                               500)),  
                axis=0,
                keepdims=False)
         + 1)**-1
    # The fixation rate may be very close to 0. Innacuracies with floats
    # may mean that we run into issues later on. We assume the fixation rate
    # never drops below 1e-10.
#     ρ = np.maximum(ρ, 1e-15)
    return ρ


We can see in the examples which follow that both methods usually give the same answers.

To match an earlier example where `Tplus` and `Tneg` were both equal to $\frac{1}{8}$ (as $Z=2$ we only need to consider one value for each when $k=1$), we let $\beta=1$ and recall that $T^+_B(k) = \frac{Z-k}{Z} \frac{k}{Z} Pr(adopt \, B | k) = \frac{Z-k}{Z} \frac{k}{Z} \frac{1}{1 + \exp^{-\beta (ΠB(k) - ΠA(k))}} $

We can then say that $ΠA - ΠB = \log{(\frac{1}{\frac{4}{8}} - 1)} = \log{\frac{4}{4}} = \log{4} - \log{4}$

Notice that to achieve netural drift, the payoffs have to be equal.

In [23]:
Z = 2
β = 1
ΠA = [np.array([np.log(4)])]
ΠB = [np.array([np.log(4)])]
result = fixation_rate_stable(ΠA, ΠB, β)
fastcore.test.test_close(result, 0.5)
result = fixation_rate_stable(ΠA, ΠB, β, method=None)
fastcore.test.test_close(result, 0.5)
result = fixation_rate_stable(ΠA, ΠB, β, method="cheap")
fastcore.test.test_close(result, 0.5)


We can also consider an example from a payoff matrix I've run into in practise.

In [24]:
payoffs = np.array([[51, 0.6, 51],
                    [114.3, 57.75, 39.38],
                    [51, 0.99798, 51]])


We are interested in the fixation rate of a mutant B in a population of A

Strategy A is the strategy represented by row 3

Strategy B is the strategy represented by row 2


In [25]:
Z = 100
β = 1


We need only the average payoffs for the stable calculation.

In [26]:
ΠA = [k/(Z-1) * payoffs[2, 1] + (Z-k-1)/(Z-1) * payoffs[2, 2]
      for k in range(1, Z)]
ΠB = [(k-1)/(Z-1) * payoffs[1, 1] + (Z-k)/(Z-1) * payoffs[1, 2]
      for k in range(1, Z)]

result_stable = fixation_rate_stable(ΠA, ΠB, β)
# method=None may not be the default forever
result_stable_none = fixation_rate_stable(ΠA, ΠB, β, method=None)
result_stable_cheap = fixation_rate_stable(ΠA, ΠB, β, method="cheap")


We also need the adoption rates for the unstable calculation

In [27]:
Tneg = [fermi_learning(ΠB[k-1], ΠA[k-1], β)
        for k in range(1, Z)]
Tplus = [fermi_learning(ΠA[k-1], ΠB[k-1], β)
         for k in range(1, Z)]

# Naiive and unstable calculation
result_unstable = fixation_rate(Tplus, Tneg)


In [28]:
fastcore.test.test_close(result_stable, 0)
fastcore.test.test_close(result_stable_none, 0)
fastcore.test.test_close(result_stable_cheap, 0)
fastcore.test.test_close(result_unstable, 0)


In terms of efficiency, the `"cheap"` method is much faster for
computing `fixation_rate_stable`.

In [29]:
Z = 2
β = 1
ΠA = [np.log(np.array([4 for _ in range(int(1e6))])) for _ in range(100)]
ΠB = [np.log(np.array([4 for _ in range(int(1e6))])) for _ in range(100)]

In [30]:
# result = fixation_rate_stable(ΠA, ΠB, β, method=None)
# fastcore.test.test_close(result[0], 1/101)

In [31]:
# result = fixation_rate_stable(ΠA, ΠB, β, method="cheap")
# fastcore.test.test_close(result[0], 1/101)

### Build transition matrix

Recall that step 1 of finding the solution to the Evolutionary Game dynamics is to build a transition matrix between all monomorphic states. 

The transition matrix captures the probability that if the population of the Evolutionary Game transitions to another state. We read an entry of the transition matrix as saying the probability of transitioning from the row state to column state.

In [32]:
# | export
class ModelTypeEGT():
    """This is the schema for an Evolutionary Game Theory model.

    Note: This schema is not enforced and is here purely for documentation
    purposes."""

    def __init__(self,
                 Z: int,  # the size of the population
                 strategy_set: list[str],  # the set of strategies in the model
                 β: Array1D,  # the learning rate
                 payoffs: Array3D,  # the payoffs of the game
                 transition_matrix: Array3D = None,  # the model's transition matrix
                 ergodic: Array2D = None,  # ergodic distribution of the model's markov chain
                 ):
        pass


In [33]:
show_doc(ModelTypeEGT)


---

[source](https://github.com/PaoloBova/gh-pages-example/blob/main/gh_pages_example/methods.py#L153){target="_blank" style="float:right; font-size:smaller"}

### ModelTypeEGT

>      ModelTypeEGT (Z:int, strategy_set:list[str],
>                    β:gh_pages_example.types.Array1D,
>                    payoffs:gh_pages_example.types.Array3D,
>                    transition_matrix:gh_pages_example.types.Array3D=None,
>                    ergodic:gh_pages_example.types.Array2D=None)

This is the schema for an Evolutionary Game Theory model.

Note: This schema is not enforced and is here purely for documentation
purposes.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| Z | int |  | the size of the population |
| strategy_set | list |  | the set of strategies in the model |
| β | Array1D |  | the learning rate |
| payoffs | Array3D |  | the payoffs of the game |
| transition_matrix | Array3D | None | the model's transition matrix |
| ergodic | Array2D | None | ergodic distribution of the model's markov chain |

In [34]:
# | export
# | hide
@multi
def build_transition_matrix(models: dict  # A dictionary that contains the parameters in `ModelTypeEGT`
                            ):
    """Build a transition matrix between all monomorphic states using the
    fermi social learning rule."""
    return models.get('dispatch-type')


@method(build_transition_matrix)
def build_transition_matrix(models: dict  # A dictionary that contains the parameters in `ModelTypeEGT`
                            ):
    """Build a transition matrix between all monomorphic states
    using the fermi social learning rule for each model.    
    """

    Z, S, β = [models[k] for k in ['Z', 'strategy_set', 'β']]
    π = models['payoffs']
    n_models = π.shape[0]
    M = np.zeros((n_models, len(S), len(S)))
    for row_ind, s in enumerate(S):
        for col_ind, sₒ in enumerate(S):
            if row_ind == col_ind:
                M[:, row_ind, row_ind] += 1
                # We calibrate these entries later so rows add up to 1
                continue
            πAA = π[:, row_ind, row_ind]
            πAB = π[:, row_ind, col_ind]
            πBA = π[:, col_ind, row_ind]
            πBB = π[:, col_ind, col_ind]
            ΠA = [πAA*(Z-k-1)/(Z-1) + πAB*k/(Z-1)
                  for k in range(1, Z)]
            ΠB = [πBA*(Z-k)/(Z-1) + πBB*(k-1)/(Z-1)
                  for k in range(1, Z)]
            # We use a numerically stable method to find the fixation rate, ρ.
            # ρ is the probability that mutant B successfully invades A
            ρ = fixation_rate_stable(ΠA, ΠB, β)
            M[:, row_ind, col_ind] = ρ / max(1, len(S)-1)
            M[:, row_ind, row_ind] -= ρ / max(1, len(S)-1)
    return {**models, "transition_matrix": M}


In [35]:
# | export
# | hide
@method(build_transition_matrix, 'unstable')
def build_transition_matrix(models: dict  # A dictionary that contains the parameters in `ModelTypeEGT`
                            ):
    """Build a transition matrix using a numerically unstable method."""

    Z, S, β = [models[k] for k in ['Z', 'strategy_set', 'β']]
    π = models['payoffs']
    n_models = π.shape[0]
    M = np.zeros((n_models, len(S), len(S)))
    for row_ind, s in enumerate(S):
        for col_ind, sₒ in enumerate(S):
            if row_ind == col_ind:
                M[:, row_ind, row_ind] += 1
                # We calibrate these entries later so rows add up to 1
                continue
            πAA = π[:, row_ind, row_ind]
            πAB = π[:, row_ind, col_ind]
            πBA = π[:, col_ind, row_ind]
            πBB = π[:, col_ind, col_ind]
            ΠA = [πAA*(Z-k-1)/(Z-1) + πAB*k/(Z-1)
                  for k in range(1, Z)]
            ΠB = [πBA*(Z-k)/(Z-1) + πBB*(k-1)/(Z-1)
                  for k in range(1, Z)]
            Tneg = [fermi_learning(ΠB[k-1], ΠA[k-1], β)
                    for k in range(1, Z)]
            Tplus = [fermi_learning(ΠA[k-1], ΠB[k-1], β)
                     for k in range(1, Z)]
            ρ = fixation_rate(Tplus, Tneg)
            M[:, row_ind, col_ind] = ρ / max(1, len(S)-1)
            M[:, row_ind, row_ind] -= ρ / max(1, len(S)-1)
    return {**models, "transition_matrix": M}


In [36]:
show_doc(build_transition_matrix.__dispatch_fn__)


---

[source](https://github.com/PaoloBova/gh-pages-example/blob/main/gh_pages_example/methods.py#L1612){target="_blank" style="float:right; font-size:smaller"}

### build_transition_matrix

>      build_transition_matrix (models:dict)

Build a transition matrix between all monomorphic states using the
fermi social learning rule.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| models | dict | A dictionary that contains the parameters in `ModelTypeEGT` |

#### Examples and Tests

Consider the following two examples.

**Example 1**

Let all payoffs be equal in the game's payoff matrix. All expected payoffs will be equal too.

So, Fermi learning will say that each individual has a 50% chance of adopting the behaviour of the one they observe.

We therefore have an equal chance during each epoch of gaining or losing an individual of the given type, in this example we denote the type as $s \in \{A, B\}$, although this probability depends on population size $Z$ and the current number of individuals of that type, $k$, $T^+_s(k) = T^-_s(k) = \frac{Z-k}{Z} \frac{k}{Z} \frac{1}{2}$.

Recall that we calculate the fixation rate, $\rho$ as follows:
\begin{equation}
\rho = \frac{1}{1 + \sum_{j=1}^{N-1}{\prod_{k=1}^{j} \frac{b_k}{a_k}}}
\end{equation}
where $N=Z$, $b_k = T^-_s(k)$ and $a_k = T^+_s(k)$

In this example, for each strategy $s$, $T^-_s(k) = T^+_s(k), \, \forall k$, so $\rho = \frac{1}{Z}$.

We only have $2$ strategies, and $Z=10$, so the final transition matrix will look like

\begin{equation}
M \, = \, \begin{pmatrix}
1 - \frac{\rho}{2 - 1} & \frac{\rho}{2 - 1} &\\
\frac{\rho}{2 - 1} & 1 - \frac{\rho}{2 - 1}\\
\end{pmatrix}
= \begin{pmatrix}
0.9 & 0.1 &\\
0.1 & 0.9\\
\end{pmatrix}
\end{equation}

Note that the above example describes neutral drift, the idea that even if there is no advantage to be gained from any particular strategy, social learning can still result in the spread of that behaviour. Neutral drift also occurs if we set the Fermi learning rate $\beta = 0$, no matter what payoff matrix describes the game.

In [37]:
payoffs = np.array([[[2, 2],
                     [2, 2]]
                    ])
Z = 10
β = 1
models = {"payoffs": payoffs,
          "Z": Z,
          "β": β,
          "strategy_set": ["A", "B"],
          }
result = build_transition_matrix(models)


In [38]:
fastcore.test.test_close(result['transition_matrix'],
                         np.array([[0.9, 0.1],
                                   [0.1, 0.9]]))


**Example 2**

Let the payoff matrix be akin to a Prisoner's Dilemma with two strategies, $C$ or $D$ (Cooperate or Defect respectively):

\begin{pmatrix}
2 & 0\\
3 & 1\\
\end{pmatrix}

Again, for this simple example, the relative average success of strategy $C$ is independent of the number of $C$ players, $k$. This is rarely the case in practise but permits a more legible example.

$C$'s relative success over $D$ will be $\frac{2 (k-1)}{Z-1} - \frac{3 k + (Z - k - 1)}{Z-1} = - \frac{Z + 1}{Z-1}$.

Fermi learning means the probability of a $D$ player adopting what they see $C$ do is:

\begin{equation}
\frac{1}{1 + e^{- \beta (\Pi_C(k) - \Pi_D(k))}} = \frac{1}{1 + e^{\beta \frac{Z + 1}{Z-1}}}
\end{equation}

The fixation rate for mutant $C$ in a population of $D$ players, $\rho_{DC}$, can be computed as

\begin{equation}
\rho_{DC} = \frac{1}{1 + \sum_{j=1}^{Z-1}{(\frac{1 + e^{\beta \frac{Z + 1}{Z-1}}}{1 + e^{-\beta \frac{Z + 1}{Z-1}}})^j}}
\end{equation}

Similarly, the fixation rate for mutant $D$ in a population of $C$ players, $\rho_{CD}$, can be computed as 

\begin{equation}
\rho_{CD} = \frac{1}{1 + \sum_{j=1}^{Z-1}{(\frac{1 + e^{-\beta \frac{Z + 1}{Z-1}}}{1 + e^{\beta \frac{Z + 1}{Z-1}}})^j}}
\end{equation}

For $Z=10$ and $\beta = 1$, the above yields the following transition matrix,

\begin{equation}
M \, = \, \begin{pmatrix}
1 - \frac{\rho_{CD}}{2 - 1} & \frac{\rho_{CD}}{2 - 1} &\\
\frac{\rho_{DC}}{2 - 1} & 1 - \frac{\rho_{DC}}{2 - 1}\\
\end{pmatrix}
\approx \begin{pmatrix}
0.295 & 0.705 &\\
0.000 & 1.000\\
\end{pmatrix}
\end{equation}



Note how in the above fixation rate calculations how we used the `fermi_learning` function to calculate the probability of a player with strategy $D$ adopting strategy $C$ (and likewise for the probability of a player with $C$ adopting $D$). This function has special properties which aid us in calculating the fixation rate.

Notice how the ratio of the two adoption rates takes the form, $\frac{1 + e^x}{1 + e^{-x}}$. It is not too hard to verify that $\frac{1 + e^x}{1 + e^{-x}} = e^x$.

We utilities this property to considerably improve the numerical stability of our algorithm for building a transition matrix. For this reason, we do not use `fermi_learning` in our algorithm at all.

We can similarly note that $\frac{1}{1 + e^{-x}} = 1 - \frac{1}{1 + e^{x}}$, i.e. the two adoption rates are complementary probabilities.


In [39]:
payoffs = np.array([[[2, 0],
                     [3, 1]],
                    ])
Z = 10
β = 1
models = {"payoffs": payoffs,
          "Z": Z,
          "β": β,
          "strategy_set": ["C", "D"],
          }
result = build_transition_matrix(models)


In [40]:
ρ_CD = 1 / (1 + sum((1 + np.exp(- β * (Z + 1) / (Z-1)))**j
                    / (1 + np.exp(β * (Z + 1) / (Z-1)))**j
                    for j in range(1, Z)))
ρ_DC = 1 / (1 + sum((1 + np.exp(β * (Z + 1) / (Z-1)))**j
                    / (1 + np.exp(- β * (Z + 1) / (Z-1)))**j
                    for j in range(1, Z)))


In [41]:
ρ_CD_alt = 1 / (1 + sum(np.exp(- j * β * (Z + 1) / (Z-1))
                        for j in range(1, Z)))
ρ_DC_alt = 1 / (1 + sum(np.exp(j * β * (Z + 1) / (Z-1))
                        for j in range(1, Z)))


In [42]:
fastcore.test.test_close(ρ_CD, ρ_CD_alt)
fastcore.test.test_close(ρ_DC, ρ_DC_alt)


In [43]:
fastcore.test.test_close(result['transition_matrix'],
                         np.array([[1 - ρ_CD, ρ_CD],
                                   [ρ_DC, 1 - ρ_DC]]))


#### Example 3

Here is an additional example for the 3 by 3 matrix we discussed when testing other functions.

This time, we make sure we get the correct probabilities for each transition.

In [44]:
payoffs = np.array([[[51, 0.6, 51],
                     [114.3, 57.75, 39.38],
                     [51, 0.99798, 51]],
                    ])


In [45]:
expected = np.array([[[0.495, 0.5, 0.005],
                     [0, 1, 0],
                     [0.005, 0, 0.995]],
                     ])


In [46]:
Z = 100
β = 1
models = {"payoffs": payoffs,
          "Z": Z,
          "β": β,
          "strategy_set": ["AS", "AU", "PS"],
          }
result = build_transition_matrix(models)
fastcore.test.test_close(result['transition_matrix'], expected)


### Find ergodic strategy distribution

Step 2 is to find the ergodic distribution for the Evolutionary Game using the transition matrix we constructed in step 1.

Let $M$ denote the transition matrix, and $\omega_t$ be the column vector describing the proportions with which each strategy is played in the population.

We can describe the evolution of this system with $\omega_{t+1} = M^T \omega_t$, i.e. the proportion of players that use a given strategy in the next round will be equal to the sum of the proportions of players for each strategy who adopted that strategy in the current round. Equivalently, we can also consider $\omega_t$ as describing the probabilities that the system at time t is in each of the monomorphic states.

As each of the monomporphic states described in the transition matrix is reachable from any other with some probability and since the transition probabilities only depend on the current state, what we have is a markov chain which is irreducible.

The ergodicity theorem guarantees that such irreducible and aperiodic markov chains have an ergodic distribution that the system converges to, no matter where it starts. An ergodic distribution (also called a stationary distribution),  $\omega^*$ satisfies  $\omega^* = M^T \omega^*$ [[1]](https://gregorygundersen.com/blog/2019/10/28/ergodic-markov-chains/) [[2]](http://www.stat.columbia.edu/~liam/teaching/neurostat-spr11/papers/mcmc/Ergodicity_Theorem.pdf) [[3]](https://textbooks.math.gatech.edu/ila/1553/stochastic-matrices.html).

Our ergodic distribution, $\omega^*$, is therefore defined as the normalised right-hand eigenvector with eigenvalue 1 of the transposed transition matrix, $M^T$ (or equivalently, if we defined $\omega$ as a row vector instead, $\omega^*$ would be the left-hand eigenvector with eigenvalue 1 of transition matrix, $M$; numerical computing packages usually return the right-hand eigenvectors more directly, which is why I used the other formalism).

We use standard linear algebra methods from the [numpy](https://numpy.org/) package to find this eigenvector. These numerical methods will usually not return an eigenvector which is normalised to sum to 1, so we must normalise the eigenvector we are given. See their documentation to learn more about these numerical methods.

In [47]:
# | export
def find_ergodic_distribution(models: dict  # A dictionary that contains the parameters in `ModelTypeEGT`
                              ):
    """Find the ergodic distribution of a markov chain with the
    given transition matrix."""
    M = models["transition_matrix"]
    # find unit eigenvector of markov chain
    Λ, V = np.linalg.eig(M.transpose(0, 2, 1))
    V = np.real_if_close(V)
    x = np.isclose(Λ, 1)
    # if multiple unit eigenvalues then choose the first
    y = np.zeros_like(x, dtype=bool)
    idx = np.arange(len(x)), x.argmax(axis=1)
    y[idx] = x[idx]
    ergodic = np.array(V.transpose(0, 2, 1)[y], dtype=float)
    # ensure ergodic frequencies are positive and sum to 1
    ergodic = np.abs(ergodic) / np.sum(np.abs(ergodic), axis=1)[:, None]
    return {**models, 'ergodic': ergodic}


#### An exploration of EGT Tools Approach

In [48]:
#| export
@multi
def calculate_stationary_distribution(transition_matrix: Union[np.ndarray, csr_matrix, csc_matrix],
                                      method=None):
    """A multimethod for calculating the stationary distribution of different
    types of matrices."""
    return method

@method(calculate_stationary_distribution)
def calculate_stationary_distribution(transition_matrix: Union[np.ndarray, csr_matrix, csc_matrix], # A single 2D transition matrix or a 3D array containing a stack of transition matrices
                                      method=None # The method to use to find the statonary distribution, the default approach relies on using `numpy.linalg.eig` which is not recommended for non-hermitian matrices. Use "shcur" if matrix is non-hermitian.
                                      ) -> np.ndarray:
    """
    Calculates stationary distribution from a transition matrix of Markov chain.

    The use of this function is not recommended if the matrix is non-Hermitian. Please use
    calculate_stationary_distribution_non_hermitian instead in this case.

    The stationary distribution is the normalized eigenvector associated with the eigenvalue 1

    Parameters
    ----------
    transition_matrix : Union[numpy.ndarray, scipy.sparse.csr_matrix, scipy.sparse.csc_matrix]
        A 2 dimensional transition matrix

    Returns
    -------
    numpy.ndarray
        A 1-dimensional vector containing the stationary distribution

    See Also
    --------
    egttools.utils.calculate_stationary_distribution_non_hermitian

    """
    if (type(transition_matrix) == csr_matrix) or (type(transition_matrix) == csc_matrix):
        tmp = transition_matrix.toarray()
    else:
        tmp = transition_matrix
        
    if np.ndim(transition_matrix)==2:
        tmp = transition_matrix[None, ...]
    
    # Check if there is any transition with value 1 - this would mean that the game is degenerate
    if np.isclose(tmp, 1., atol=1e-11).any():
        warn(
            "Some of the entries in the transition matrix are close to 1 (with a tolerance of 1e-11). "
            "This could result in more than one eigenvalue of magnitute 1 "
            "(the Markov Chain is degenerate), so please be careful when analysing the results.", RuntimeWarning)
        
    # `numpy.linalg.eig` returns the right-handed eigenvectors so we need to tranpose our transition matrices first.
    tmp = tmp.transpose(0, 2, 1)

    # calculate stationary distributions using eigenvalues and eigenvectors
    eigenvalues, eigenvectors = np.linalg.eig(tmp)
    
    # look for the first element closest to 1 in the list of eigenvalues
    index_stationary = (np.arange(len(eigenvalues)),
                        np.argmin(np.abs(eigenvalues - 1.0), axis=-1))
    mask_stationary = np.zeros_like(eigenvalues, dtype=bool)
    mask_stationary[index_stationary] = True
    sd = np.abs(eigenvectors.transpose(0, 2, 1)[mask_stationary].real)
    return sd / np.sum(sd, axis=-1)[:, None]  # normalize


In [49]:
#| export
@method(calculate_stationary_distribution, "schur")
def calculate_stationary_distribution(transition_matrix: Union[np.ndarray, csr_matrix, csc_matrix], # A single 2D transition matrix or a 3D array containing a stack of transition matrices
                                      method=None # The method to use to find the statonary distribution, the default approach relies on using `numpy.linalg.eig` which is not recommended for non-hermitian matrices. Use "shcur" if matrix is non-hermitian.
                                      ) -> np.ndarray:
    """
    Calculates stationary distribution from a transition matrix of Markov chain.

    The stationary distribution is the normalized eigenvector associated with the eigenvalue 1

    Parameters
    ----------
    transition_matrix : Union[numpy.ndarray, scipy.sparse.csr_matrix, scipy.sparse.csc_matrix]
        A 2 dimensional transition matrix

    Returns
    -------
    numpy.ndarray
        A 1-dimensional vector containing the stationary distribution

    See Also
    --------
    egttools.utils.calculate_stationary_distribution_non_hermitian

    """
    if (type(transition_matrix) == csr_matrix) or (type(transition_matrix) == csc_matrix):
        tmp = transition_matrix.toarray()
    else:
        tmp = transition_matrix
        
    if np.ndim(transition_matrix)==2:
        tmp = transition_matrix[None, ...]
    
    # Check if there is any transition with value 1 - this would mean that the game is degenerate
    if np.isclose(tmp, 1., atol=1e-11).any():
        warn(
            "Some of the entries in the transition matrix are close to 1 (with a tolerance of 1e-11). "
            "This could result in more than one eigenvalue of magnitute 1 "
            "(the Markov Chain is degenerate), so please be careful when analysing the results.", RuntimeWarning)

    # calculate stationary distributions using eigenvalues and eigenvectors
    schur_results = [schur(m) for m in tmp]
    eigenvectors = np.array([r[1] for r in schur_results]).real
    eigenvalues = np.array([eigvals(r[0]) for r in schur_results]).real
    # look for the first element closest to 1 in the list of eigenvalues
    index_stationary = (np.arange(len(eigenvalues)),
                        np.argmin(np.abs(eigenvalues - 1.0), axis=-1))
    mask_stationary = np.zeros_like(eigenvalues, dtype=bool)
    mask_stationary[index_stationary] = True
    sd = np.abs(eigenvectors[mask_stationary].real)
    return sd / np.sum(sd, axis=-1)[:, None]  # normalize


In [50]:
#| export
def gth_solve(A, overwrite=False):
    r"""
    This routine computes the stationary distribution of an irreducible
    Markov transition matrix (stochastic matrix) or transition rate
    matrix (generator matrix) `A`.
    More generally, given a Metzler matrix (square matrix whose
    off-diagonal entries are all nonnegative) `A`, this routine solves
    for a nonzero solution `x` to `x (A - D) = 0`, where `D` is the
    diagonal matrix for which the rows of `A - D` sum to zero (i.e.,
    :math:`D_{ii} = \sum_j A_{ij}` for all :math:`i`). One (and only
    one, up to normalization) nonzero solution exists corresponding to
    each reccurent class of `A`, and in particular, if `A` is
    irreducible, there is a unique solution; when there are more than
    one solution, the routine returns the solution that contains in its
    support the first index `i` such that no path connects `i` to any
    index larger than `i`. The solution is normalized so that its 1-norm
    equals one. This routine implements the Grassmann-Taksar-Heyman
    (GTH) algorithm [1]_, a numerically stable variant of Gaussian
    elimination, where only the off-diagonal entries of `A` are used as
    the input data. For a nice exposition of the algorithm, see Stewart
    [2]_, Chapter 10.
    Parameters
    ----------
    A : array_like(float, ndim=2)
        Stochastic matrix or generator matrix. Must be of shape n x n.
    Returns
    -------
    x : numpy.ndarray(float, ndim=1)
        Stationary distribution of `A`.
    overwrite : bool, optional(default=False)
        Whether to overwrite `A`.
    References
    ----------
    .. [1] W. K. Grassmann, M. I. Taksar and D. P. Heyman, "Regenerative
       Analysis and Steady State Distributions for Markov Chains,"
       Operations Research (1985), 1107-1116.
    .. [2] W. J. Stewart, Probability, Markov Chains, Queues, and
       Simulation, Princeton University Press, 2009.
    """
    A1 = np.array(A, dtype=float, copy=not overwrite, order='C')
    # `order='C'` is for use with Numba <= 0.18.2
    # See issue github.com/numba/numba/issues/1103

    if len(A1.shape) != 2 or A1.shape[0] != A1.shape[1]:
        raise ValueError('matrix must be square')

    n = A1.shape[0]
    x = np.zeros(n)

    # === Reduction === #
    for k in range(n-1):
        scale = np.sum(A1[k, k+1:n])
        if scale <= 0:
            # There is one (and only one) recurrent class contained in
            # {0, ..., k};
            # compute the solution associated with that recurrent class.
            n = k+1
            break
        A1[k+1:n, k] /= scale

        A1[k+1:n, k+1:n] += np.dot(A1[k+1:n, k:k+1], A1[k:k+1, k+1:n])

    # === Backward substitution === #
    x[n-1] = 1
    for k in range(n-2, -1, -1):
        x[k] = np.dot(x[k+1:n], A1[k+1:n, k])

    # === Normalization === #
    x /= np.sum(x)

    return x

In [51]:
#| export
@method(calculate_stationary_distribution, "quantecon")
def calculate_stationary_distribution(transition_matrix: Union[np.ndarray, csr_matrix, csc_matrix], # A single 2D transition matrix or a 3D array containing a stack of transition matrices
                                      method=None # The method to use to find the statonary distribution, the default approach relies on using `numpy.linalg.eig` which is not recommended for non-hermitian matrices. Use "shcur" if matrix is non-hermitian.
                                      ) -> np.ndarray:
    """
    Calculates stationary distribution from a transition matrix of Markov chain.

    The stationary distribution is the normalized eigenvector associated with the eigenvalue 1

    Parameters
    ----------
    transition_matrix : Union[numpy.ndarray, scipy.sparse.csr_matrix, scipy.sparse.csc_matrix]
        A 2 dimensional transition matrix

    Returns
    -------
    numpy.ndarray
        A 1-dimensional vector containing the stationary distribution

    See Also
    --------
    egttools.utils.calculate_stationary_distribution_non_hermitian

    """
    if (type(transition_matrix) == csr_matrix) or (type(transition_matrix) == csc_matrix):
        tmp = transition_matrix.toarray()
    else:
        tmp = transition_matrix
        
    if np.ndim(transition_matrix)==2:
        tmp = transition_matrix[None, ...]
    
    # Check if there is any transition with value 1 - this would mean that the game is degenerate
    if np.isclose(tmp, 1., atol=1e-11).any():
        warn(
            "Some of the entries in the transition matrix are close to 1 (with a tolerance of 1e-11). "
            "This could result in more than one eigenvalue of magnitute 1 "
            "(the Markov Chain is degenerate), so please be careful when analysing the results.", RuntimeWarning)
    return np.array([gth_solve(p) for p in tmp])


In [52]:
#| export
def calculate_sd_helper(models):
    P =  models['transition_matrix']
    sd = calculate_stationary_distribution(P, method=models.get('sd-method'))
    return {**models, "ergodic": sd }

#### A brief note on the quantecon method


Quantecon.py have a class MarkovChain which checks for the number of recurrence
classes in the matrix before looking for stationary distributions for each one
using the GTH algorithm. I noticed that finding the recurrence classes is much more
expensive the finding the stationary distributions (at least for smaller matrices).

Moreover, the transition matrices that we build for Moran and similar evolutionary
processes are technically irreducible by construction (in other words, they only
have one recurrence class). I therefore skip the search for recurence classes and
simply call the `gth_solve` function (lightly edited from Quantecon.py) to find
the stationary distribution.

Additional note: there are numerical difficulties to ensuring our transition matrices
are actually irreducible. We deal with these when computing the fixation rates in
`fixation_rate_stable`. As it is plausible that this method could be used for more general
transition matrices (e.g. when computing the payoffs of a stochastic game), we leave in a
warning when the matrix is close to being degenerate (i.e. reducible).

#### Examples and Tests

Let our transition matrix, $M$ be

\begin{equation}
M = \begin{pmatrix}
\frac{3}{4} & \frac{1}{4} \\
\frac{1}{4} & \frac{3}{4} \\
\end{pmatrix}
\end{equation}

Note that $M^T$ is a stochastic matrix because each column of the transposed matrix would sum to $1$ (in general the rows of the transposed matrix are unlikely to sum to 1, but choosing an example like the above makes it easy to compute the eigenvectors).

It's not too hard to verify that the characteristic polynomial of $M^T$ can be factored into $(\lambda - 1)(\lambda - \frac{1}{2})$, so we have two eigenvalues, $1$ and $\frac{1}{2}$.

It's not too hard to verify that column vector $[1, 1]$ is the eigenvector of $M^T$ with eigenvalue $1$
.

Now that we know the weights placed on each strategy, we can compute the strategy distribution by normalising our eigenvector.

The ergodic distribution i $\omega^* = [\frac{1}{2}, \frac{1}{2}]$.

In [53]:
M = np.array([[[3/4, 1/4],
               [1/4, 3/4]],
             [[3/4, 1/4],
               [1/4, 3/4]],
              ])
models = {"transition_matrix": M}
result = find_ergodic_distribution(models)


In [54]:
fastcore.test.test_eq(result['ergodic'],
                      np.array([[1/2, 1/2]]))


In [55]:
fastcore.test.test_eq(calculate_stationary_distribution(M[0]),
                      np.array([1/2, 1/2]))
fastcore.test.test_eq(calculate_stationary_distribution(M),
                      np.array([[1/2, 1/2]]))
fastcore.test.test_eq(calculate_stationary_distribution(M,
                                                        method="schur"),
                      np.array([[1/2, 1/2]]))
fastcore.test.test_eq(calculate_stationary_distribution(M,
                                                        method="quantecon"),
                      np.array([[1/2, 1/2]]))

In [56]:
# #| hide
# # Here is some code which illustrates how one could use sympy to find the relevant eigenvectors
# # using symbolic methods (but please note that even sympy must resort to numerical methods if
# # the matrices are bigger than 5 by 5 in size, due to the fundamental lack of exact solutions to
# # polynomial equations with order greater than 5)
# import sympy
# for m in M:
#     # Sympy needs integers or expressions to work
#     # Integers is usually safer
#     m = np.array(1000 * m, dtype=int)
#     M_symbolic = sympy.Matrix(m)
#     for result in M_symbolic.eigenvects():
#         lamda, multiplicity, evs = result

#         # print("lambda: " , lamda,
#         #           "multiplicity: ", multiplicity,
#         #           "eigenvectors: ", evs)


Here is another quick illustrative example.

Let our transition matrix, $M$ be

\begin{equation}
M = \begin{pmatrix}
\frac{3}{4} & \frac{1}{4} \\
\frac{3}{4} & \frac{1}{4} \\
\end{pmatrix}
\end{equation}

$M^T$ is a stochastic matrix. It is easy to verify that $[\frac{3}{4}, \frac{1}{4}]$ is the normalised eigenvector with eigenvalue 1.


In [57]:
M = np.array([[[3/4, 1/4],
               [3/4, 1/4]],
              ])
models = {"transition_matrix": M}
result = find_ergodic_distribution(models)

In [58]:
fastcore.test.test_eq(result['ergodic'],
                      np.array([[3/4, 1/4]]))

In [59]:
fastcore.test.test_eq(calculate_stationary_distribution(M[0]),
                      np.array([3/4, 1/4]))
fastcore.test.test_eq(calculate_stationary_distribution(M),
                      np.array([[3/4, 1/4]]))
fastcore.test.test_eq(calculate_stationary_distribution(M.transpose(0, 2, 1), method="schur"),
                      np.array([[3/4, 1/4]]))
fastcore.test.test_eq(calculate_stationary_distribution(M, method="quantecon"),
                      np.array([[3/4, 1/4]]))

Notice that `find_ergodic_distribution` and the default method of `calculate_stationary_distribution` are equivalent and similar in speed. Unfortunately, they
are inappropriate for non-hermitian matrices. The `"schur"` method is more appropriate if
the matrices are normal, but this code cannot be easily vectorized as the other numpy-based
solutions can. For the most accurate results, it makes sense to use the eigen library.

In [60]:
M = np.array([[[3/4, 1/4],
               [3/4, 1/4]]
              for _ in range(10000)
              ])
models = {"transition_matrix": M}


In [61]:
result = find_ergodic_distribution(models)
fastcore.test.test_eq(result['ergodic'],
                      np.array([[3/4, 1/4]]))

In [62]:
fastcore.test.test_eq(calculate_stationary_distribution(M),
                      np.array([[3/4, 1/4]]))

In [63]:
fastcore.test.test_eq(calculate_stationary_distribution(M.transpose(0, 2, 1),
                                                        method="schur"),
                      np.array([[3/4, 1/4]]))

In [64]:
fastcore.test.test_eq(np.array([gth_solve(m) for m in M]),
                      np.array([[3/4, 1/4]]))
fastcore.test.test_eq(calculate_stationary_distribution(M,
                                                        method="quantecon"),
                      np.array([[3/4, 1/4]]))
fastcore.test.test_eq(calculate_sd_helper({**models,
                                           "sd-method": "quantecon"})['ergodic'],
                      np.array([[3/4, 1/4]]))

On a technical level, we know that the matrix is irreducible since all the
transition probabilities should be strictly positive by design. 

Unfortunately, on a numerical level, these matrices are nearly degenerate. It
is unclear how to deal with this best. 

Options:
- bound all transition rates below by 1e-10 or a similar factor.
- round all transition rates below 1e-10 to zero and find stationary distribution of all recurrent classes.

Whichever option we go with, only the quantecon method is appropriate for finding the
stationary distribution(s).

I think we should go for the first option as this allows us to exploit that there
is only one recurrent class in our transition matrices and therefore only one
stationary distribution that is relevant to our results. We can also skip the
expensive quantecon routines needed to find the recurrent classes of the matrix that
would ultimately be very difficult to speed up.

We should of course check whether gth_solve used by quantecon is even affected by the floating point issues and near-degeneracy of the transition matrices that it is given.

In [65]:
show_doc(find_ergodic_distribution)

---

[source](https://github.com/PaoloBova/gh-pages-example/blob/main/gh_pages_example/methods.py#L247){target="_blank" style="float:right; font-size:smaller"}

### find_ergodic_distribution

>      find_ergodic_distribution (models:dict)

Find the ergodic distribution of a markov chain with the
given transition matrix.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| models | dict | A dictionary that contains the parameters in `ModelTypeEGT` |

### Run full markov chain algorithm

Finally, here is a helper function to both build the transition matrix for the model and find its ergodic distribution.

In [66]:
# | export
def markov_chain(models: dict  # A dictionary that contains the parameters in `ModelTypeEGT`
                 ):
    """Find the ergodic distribution of the evolutionary
    game given by each model in models."""
    return thread_macro(models,
                        build_transition_matrix,
                        find_ergodic_distribution)


In [67]:
show_doc(markov_chain)


---

[source](https://github.com/PaoloBova/gh-pages-example/blob/main/gh_pages_example/methods.py#L506){target="_blank" style="float:right; font-size:smaller"}

### markov_chain

>      markov_chain (models:dict)

Find the ergodic distribution of the evolutionary
game given by each model in models.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| models | dict | A dictionary that contains the parameters in `ModelTypeEGT` |

## Multiple Populations

### Building blocks

I will now describe the building blocks for an algorithm for building the transition matrix for a evolutionary game with multiple populations.

When we have multiple populations, it is very easy to have many possible transitions. For this reason, it is important that we have a way to programatically handle them. 

My algorithm allows one to build these transition matrices using only the following information (in addition to other parameters needed in the single population case):
- (i) Payoffs
- (ii) Each sector's strategies
- (iii) Allowed sectors for each player
- (iv) Sampling rule (optional)
- (v) profile filters (optional)

(iii) in particular allows for a great deal of flexibility in setting up a game. It is possible to capture games where the number of players may vary or where the interaction contains players who may be sampled from one of several poulations. This flexibility will allow us to study a wide range of models from the literature. By default, (iv) and (v) are already quite general, though one can provide their own sampling rules and profile filters should they want even greater flexibility.

#### Payoffs

`payoffs` is a nested dictionary which is 2 levels deep. On the first level, each key is a string of the form "...n3-n2-n1". n1 is an integer which encodes the strategy that player 1 uses in the interaction. n2 and n3 are the same. We have as many integers as there are possible players in the interaction. This key therefore captures the strategy profile employed by the players.

At the second level, we have keys for each player, "P1", "P2", "P3", ... 

Each value is a 1D numpy array containing the player's payoffs (relevant to the specified strategy profile) for each model we are solving the game for.

There are number of important hints to follow when writing your payoffs for use in my multiple populations algorithm. 
- Players are allowed to be from any sector. Note that I assume strategies are coded for each sector in order. So, Sector 1's strategies are coded from 1 to num_s1_strategies, Sector 2's are coded from num_s1_strategies + 1 to num_s1_strategies + num_s2 strategies, and so on. In this way the strategy code tells us which sector the strategy is from and which strategy they follow.
- I use a 0 to indicate that the player is not involved in the current interaction. Intuitively you may prefer to think of the player as doing nothing and being from Sector 0. This allows us to flexibly allow the possibility of an uncertain number of players in each interaction.
- In some games, it is possible that there are a large number of possible strategy profiles (e.g. a game with 3 sectors, 3 strategies, and up to 4 players in each interaction would have 10**4 possible strategy profiles). However, very few of those strategy profiles will be relevant to building the transition matrix, especially if the order of the players does not matter, and if some players must belong to certain sectors. The number of strategy profiles will often be much smaller than the number of parameter combinations (what I refer to as models) we wish to solve for.


#### Sector strategies

A dictionary with keys for each sector and values as lists of integers which encode the sector's strategies. Recall that sector 1's strategies start from 1, sector 2's strategies start from num_s1_strategies + 1 and so on.

We will use the sector strategies to generate the set of recurrent states, the states that the system visits in the limit of rare mutations. Such states have every member of a sector (also often reffered to as a population) using the same strategy, i.e. they are monomorphic. These are the states we need to the build the transition matrix for.

#### Allowed_sectors

`allowed_sectors` specifies which sectors each player can be from and therefore specifies all possible interactions in the game.

It is a dictionary where the keys are players, e.g. "P1" and the values are lists of sectors, e.g. ["S1", "S2"]. This tells the algorithm which interactions are possible in the game.

To specify that the player may not be present in an interaction, you can specify "S0" in the list.

Note: `sectors` are perhaps more commonly reffered to as `populations` or `subpopulations` in the literature. For the sake of brevity, I use `sectors` instead when naming variables.

#### Sampling rule

A sampling rule tells us the likelihood that a strategy profile will be selected given the current state of the system and the number of mutants (of the specified type). The sampling also needs to know which player represents the agent who is comparing the two strategies under consideration as this player does not need to be sampled. 

However, if this agent could have been one of several players (they would be playing the same strategy in the strategy profile), then the sampling rule should also multiply the likelihood by the probability that the agent would have been chosen as the current player.

In [68]:
# | export

@multi
def sample_profile(models):
    return models.get('sample_profile_key')


@method(sample_profile)
def sample_profile(models):
    """We sample players from their allowed sectors as per the sector weights."""
    sector_strategies = models['sector_strategies']
    allowed_sectors = models['allowed_sectors']
    profile = models['profile']
    chosen_player = models['chosen_player']
    chosen_strategy = int(models['chosen_strategy'])
    current_strategy = int(models['current_strategy'])
    mutant_strategy = int(models['mutant_strategy'])
    affected_sector = models['affected_sector']
    n_mutants = models['n_mutants']
    Z = models['Z']
    sector_weights = models.get('sector_weights', {})
    assert n_mutants >= 1
    assert n_mutants <= Z[affected_sector] - 1
    assert current_strategy != mutant_strategy
    assert chosen_strategy in [current_strategy, mutant_strategy]
    assert affected_sector in sector_strategies.keys()
    assert chosen_player in allowed_sectors.keys()

    profile_tuple = list(map(int, profile.split("-")))
    assert chosen_strategy in profile_tuple

    # TODO: does it make sense for chosen_player_likelihood to take into account
    # any possible position our chosen_player could have been in, no matter
    # which strategies each player actually plays in the profile?
    above = sector_weights.get(chosen_player, {}).get(affected_sector, 1)
    below = 0
    for player, sectors in allowed_sectors.items():
        if affected_sector in sectors:
            below += sector_weights.get(player, {}).get(affected_sector, 1)
    if below == 0:
        raise ValueError("""affected_sector is never allowed in the game, 
                         double check allowed_sectors""")
    # print("chosen_player_likelihood: ", above / below)
    chosen_player_likelihood = above / below

    likelihood = chosen_player_likelihood

    n_sampled_from_affected_sector = 1
    n_mutants_sampled = 1 if chosen_strategy == mutant_strategy else 0
    for i, strategy in enumerate(profile_tuple[::-1]):
        valid_strategy = False
        for sector in allowed_sectors[f"P{i+1}"]:
            if strategy in map(int, sector_strategies[sector]):
                valid_strategy = True
        if not valid_strategy:
            raise ValueError(f"""Profile, {profile}, implies a player plays a
                             strategy from a sector they do not belong to.""")
        if f"P{i+1}" == chosen_player:
            continue
        if strategy in map(int, sector_strategies[affected_sector]):
            if strategy == current_strategy:
                likelihood *= ((Z[affected_sector]
                                - (n_sampled_from_affected_sector
                                   - n_mutants_sampled)
                                - n_mutants)
                               / (Z[affected_sector]
                                  - n_sampled_from_affected_sector))
                # print("current-lk: ",
                #       (Z[affected_sector]
                #        - (n_sampled_from_affected_sector - n_mutants_sampled)
                #        - n_mutants),
                #       "/",
                #       (Z[affected_sector] - n_sampled_from_affected_sector))
                n_sampled_from_affected_sector += 1
            elif strategy == mutant_strategy:
                likelihood *= ((n_mutants
                                - n_mutants_sampled)
                               / (Z[affected_sector]
                                  - n_sampled_from_affected_sector))
                # print("mutant-lk: ",
                #       (n_mutants - n_mutants_sampled),
                #       "/",
                #       (Z[affected_sector] - n_sampled_from_affected_sector))
                n_mutants_sampled += 1
                n_sampled_from_affected_sector += 1
            else:
                raise ValueError("""At least one profile implies the copresence
                                 of 3 strategies for one sector. This default
                                 profile likelihood method is not meant to be
                                 used for such situations. Make sure you use
                                 profile filters to prevent passing such
                                 strategy profiles to this sampling rule.""")
        relevant_sector = [sector
                           for sector in sector_strategies.keys()
                           if strategy in map(int, sector_strategies[sector])]
        if len(relevant_sector) > 1:
            raise ValueError("Each sector must have unique strategy codes")
        elif len(relevant_sector) == 0:
            raise ValueError("Strategy does not belong to any sector")
        else:
            relevant_sector = relevant_sector[0]
        above = sector_weights.get("P{i+1}", {}).get(relevant_sector, 1)
        below = sum(sector_weights.get("P{i+1}", {}).get(sector, 1)
                    for sector in allowed_sectors[f"P{i+1}"])
        likelihood *= above / below
        # print("relevant_sector_likelihood: ", above / below)
    return likelihood


##### Tests for `sample_profile`

The `sample_profile` default method is very general. It will calculate the
likelihood of the profile by assuming that all allowed sectors for each
player are uniformly sampled from (unless sector weights are provided) and
consider the likelihood of sampling a mutant from the sector which has a mutant.

##### Test 1

I conduct a number of tests below on a simple game where every player in an interaction is
fixed to a particular sector. In such games, the likelihood of each profile being chosen is
is always 1, no matter how many mutants there are, as no individual plays against a player
from the same sector.

In [69]:
# | export
Z = {"S3": 10, "S2": 10, "S1": 10}
sector_strategies = {"S3": [5, 6],
                     "S2": [3, 4],
                     "S1": [1, 2]}
allowed_sectors = {"P3": ["S3"],
                   "P2": ["S2"],
                   "P1": ["S1"]}
n_players = len(allowed_sectors.keys())
n_strategies = [len(strategies) for strategies in sector_strategies]

sector_weights = {}

models = {"Z": Z,
          "sector_strategies": sector_strategies,
          "allowed_sectors": allowed_sectors,
          "profile": "5-3-1",
          "chosen_player": "P1",
          "chosen_strategy": 1,
          "current_strategy": 1,
          "mutant_strategy": 2,
          "affected_sector": "S1",
          "n_mutants": 2,
          #   "sector_weights": sector_weights,
          }
result1 = sample_profile(models)
result2 = sample_profile({**models, "n_mutants": 8})
result3 = sample_profile({**models, "n_mutants": 9})
result4 = sample_profile({**models,
                          "affected_sector": "S2",
                          "chosen_player": "P2",
                          "current_strategy": 3,
                          "mutant_strategy": 4,
                          "chosen_strategy": 3})
result5 = sample_profile({**models,
                          "profile": "5-3-2",
                          "current_strategy": 1,
                          "mutant_strategy": 2,
                          "chosen_strategy": 2})
result6 = sample_profile({**models,
                          "profile": "6-3-1",
                          "affected_sector": "S3",
                          "chosen_player": "P3",
                          "current_strategy": 5,
                          "mutant_strategy": 6,
                          "chosen_strategy": 6})
expected = (1  # "P1" is from "S1": it must be our chosen_player with the chosen_strategy
            * 1  # "P2" can only be from "S2", but all players form that sector play 3
            * 1  # "P3" can only be from "S3", but all players form that sector play 5
            )
fastcore.test.test_eq(result1, expected)
fastcore.test.test_eq(result2, expected)
fastcore.test.test_eq(result3, expected)
fastcore.test.test_eq(result4, expected)
fastcore.test.test_eq(result5, expected)
fastcore.test.test_eq(result6, expected)


##### Test 2

There are several errors a user may encounter if they give invalid values.

In [70]:
# | export
Z = {"S3": 10, "S2": 10, "S1": 10}
sector_strategies = {"S3": [5, 6],
                     "S2": [3, 4],
                     "S1": [1, 2]}
allowed_sectors = {"P3": ["S3"],
                   "P2": ["S2"],
                   "P1": ["S1"]}
n_players = len(allowed_sectors.keys())
n_strategies = [len(strategies) for strategies in sector_strategies]

sector_weights = {}

models = {"Z": Z,
          "sector_strategies": sector_strategies,
          "allowed_sectors": allowed_sectors,
          "profile": "5-3-1",
          "chosen_player": "P1",
          "chosen_strategy": 1,
          "current_strategy": 1,
          "mutant_strategy": 2,
          "affected_sector": "S1",
          "n_mutants": 2,
          #   "sector_weights": sector_weights,
          }

with fastcore.test.ExceptionExpected(ex=AssertionError):
    sample_profile({**models, "n_mutants": 1000})
with fastcore.test.ExceptionExpected(ex=AssertionError):
    sample_profile({**models, "chosen_strategy": 2})
with fastcore.test.ExceptionExpected(ex=AssertionError):
    sample_profile({**models, "profile": "5-3-2"})
with fastcore.test.ExceptionExpected(ex=AssertionError):
    sample_profile({**models,
                    "mutant_strategy": 5,
                    "current_strategy": 6})
with fastcore.test.ExceptionExpected(ex=AssertionError):
    sample_profile({**models,
                    "mutant_strategy": 1,
                    "current_strategy": 1})
with fastcore.test.ExceptionExpected(ex=ValueError):
    sample_profile({**models,
                    "sector_strategies": {**sector_strategies,
                                          "S3": [3, 4]}})
with fastcore.test.ExceptionExpected(ex=ValueError):
    sample_profile({**models, "profile": "10-6-1"})
with fastcore.test.ExceptionExpected(ex=AssertionError):
    sample_profile({**models,
                    "sector_strategies": {"S3": [5, 6],
                                          "S2": [3, 4]}})
with fastcore.test.ExceptionExpected(ex=ValueError):
    sample_profile({**models,
                    "allowed_sectors": {"P1": ["S2"],
                                        "P2": ["S2"],
                                        "P3": ["S3"]}})
with fastcore.test.ExceptionExpected(ex=ValueError):
    sample_profile({**models,
                    "sector_strategies": {"S1": [1, 2, 3]},
                    "allowed_sectors": {"P1": ["S1"],
                                        "P2": ["S1"],
                                        "P3": ["S1"]},
                    "profile": "3-2-1"})
with fastcore.test.ExceptionExpected(ex=ValueError):
    sample_profile({**models,
                    "profile": "3-2-1"})
with fastcore.test.ExceptionExpected(ex=KeyError):
    sample_profile({**models,
                    "sector_strategies": {"S1": [1, 2, 3]},
                    "profile": "3-2-1"})


##### Test 3

Now, consider a more complicated example where we have 2 players that belong to the same sector and one player who is fixed to another sector. Now the profile likelihoods depend on
the number of mutants.



In [71]:
# | export
Z = {"S2": 10, "S1": 10}
sector_strategies = {"S2": [3, 4],
                     "S1": [1, 2]}
allowed_sectors = {"P3": ["S2"],
                   "P2": ["S1"],
                   "P1": ["S1"]}
n_players = len(allowed_sectors.keys())
n_strategies = [len(strategies) for strategies in sector_strategies]

sector_weights = {}

models = {"Z": Z,
          "sector_strategies": sector_strategies,
          "allowed_sectors": allowed_sectors,
          "profile": "3-2-1",
          "chosen_player": "P1",
          "chosen_strategy": 1,
          "current_strategy": 1,
          "mutant_strategy": 2,
          "affected_sector": "S1",
          "n_mutants": 2,
          #   "sector_weights": sector_weights,
          }


result1 = sample_profile(models)
result2 = sample_profile({**models, "n_mutants": 8})
result3 = sample_profile({**models,
                          "chosen_strategy": 2,
                          "chosen_player": "P2"})
result4 = sample_profile({**models,
                          "current_strategy": 2,
                          "mutant_strategy": 1,
                          "chosen_strategy": 2,
                          "chosen_player": "P2"})
result5 = sample_profile({**models,
                          "profile": "3-2-2",
                          "current_strategy": 1,
                          "mutant_strategy": 2,
                          "chosen_strategy": 2})
result6 = sample_profile({**models,
                          "profile": "3-1-2",
                          "current_strategy": 1,
                          "mutant_strategy": 2,
                          "chosen_strategy": 2})
expected1 = (  # "P1" is our chosen_player. The likelihood of choosing our
    # chosen player given the affected_sector is just the likelihood of
    # choosing that player instead of any other player. chosen_player
    # is the first player we sample, and they must be present, so only
    # the liklelihood of each player sampling from the affected sector
    # matters. In this case, 2 of the players have the same same of
    # sampling from "S1", and the other has a 0% change. So, the
    # likelihood of them being "P1" is 50%.
    0.5
    # "P2", like our chosen player, "P1" is from "S1". We have 2
    # mutants in the affected sector, and "P2" is using the mutant
    # strategy. The probability of this happening is
    * 2 / (10 - 1)
    * 1  # "P3" can only be from "S3" and all "S3" members play 3
)
expected2 = (0.5 * 8 / 9 * 1)
expected3 = (8 / 9 * 0.5 * 1)
expected4 = (2 / 9 * 0.5 * 1)
expected5 = (0.5 * 1 / 9 * 1)
expected6 = (0.5 * 8 / 9 * 1)
fastcore.test.test_eq(result1, expected1)
fastcore.test.test_eq(result2, expected2)
fastcore.test.test_eq(result3, expected3)
fastcore.test.test_eq(result4, expected4)
fastcore.test.test_eq(result5, expected5)
fastcore.test.test_eq(result6, expected6)


##### Test 4

I also test it for a game where all 3 players can be from the same two sectors.

In [72]:
# | export
Z = {"S2": 10, "S1": 10}
sector_strategies = {"S2": [3, 4],
                     "S1": [1, 2]}
allowed_sectors = {"P3": ["S1", "S2"],
                   "P2": ["S1", "S2"],
                   "P1": ["S1", "S2"]}
n_players = len(allowed_sectors.keys())
n_strategies = [len(strategies) for strategies in sector_strategies]

sector_weights = {}

models = {"Z": Z,
          "sector_strategies": sector_strategies,
          "allowed_sectors": allowed_sectors,
          "profile": "2-2-1",
          "chosen_player": "P1",
          "chosen_strategy": 1,
          "current_strategy": 1,
          "mutant_strategy": 2,
          "affected_sector": "S1",
          "n_mutants": 2,
          #   "sector_weights": sector_weights,
          }


result1 = sample_profile(models)
result2 = sample_profile({**models, "n_mutants": 8})
result3 = sample_profile({**models,
                          "chosen_strategy": 2,
                          "chosen_player": "P2"})
result4 = sample_profile({**models,
                          "current_strategy": 2,
                          "mutant_strategy": 1,
                          "chosen_strategy": 2,
                          "chosen_player": "P2"})
result5 = sample_profile({**models,
                          "profile": "3-2-2",
                          "current_strategy": 1,
                          "mutant_strategy": 2,
                          "chosen_strategy": 2})
result6 = sample_profile({**models,
                          "profile": "3-1-2",
                          "current_strategy": 1,
                          "mutant_strategy": 2,
                          "chosen_strategy": 2})
expected1 = (  # "P1" is our chosen_player. We could have chosen any player
    # since all players can sample from the affected sector. Moreover,
    # they all do so with equal likelihood, so the likelihood of
    # choosing to follow "P-1" is 1/3
    1 / 3
    # "P2", like our chosen player, "P1" is from "S1". We have 2
    # mutants in the affected sector, and "P2" is using the mutant
    # strategy. We need the likelihood that an "S1" member was sampled
    # instead of an "S2" member as well as the likelihood the "S1"
    # member was a mutant
    * 2 / (10 - 1) * 0.5
    # "P3" is also from "S1" and plays the mutant strategy too.
    # We sample without replacement, so the probability is
    * 1 / (10 - 2) * 0.5
)
expected2 = (1 / 3
             * 8 / 9 / 2
             * 7 / 8 / 2)
expected3 = (8 / 9 / 2
             * 1 / 3
             * 1 / 8 / 2)
expected4 = (2 / 9 / 2
             * 1 / 3
             * 7 / 8 / 2)
expected5 = (1 / 3
             * 1 / 9 / 2
             * 1 / 2)
expected6 = (1 / 3
             * 8 / 9 / 2
             * 1 / 2)
fastcore.test.test_eq(result1, expected1)
fastcore.test.test_eq(result2, expected2)
fastcore.test.test_eq(result3, expected3)
fastcore.test.test_eq(result4, expected4)
fastcore.test.test_eq(result5, expected5)
fastcore.test.test_eq(result6, expected6)


#### Profile filters

Profile filters work by filtering a list of profiles for only those profiles
which meet the required conditions. Se `create_profiles` to read up how we
create a list of profiles, and `profile_filter` for different profile filters.

We can also use the `apply_profile_filters` function which by default filters
our profiles so that we only keep those profiles which are relevant to the
transition and are consistent with the given `allowed_sectors`.

### Create all recurrent states

In [73]:
# | export
def create_recurrent_states(models):
    """Create all recurrent-states for the set of models."""
    sector_strategies = models['sector_strategies']
    n_states = np.prod([len(S) for S in sector_strategies.values()])
    sorted_keys = sorted(sector_strategies, reverse=True)
    strategy_axes = [sector_strategies[k] for k in sorted_keys]
    grid = build_grid_from_axes(strategy_axes)
    states = ["-".join(map(str, row)) for row in grid]
    fastcore.test.test_eq(len(states), n_states)
    return states


Here is a quick test for `create_recurrent_states`

In [74]:
result = create_recurrent_states({"sector_strategies": {"S1": [1, 2],
                                                        "S2": [3, 4],
                                                        "S3": [5, 6]}})
expected = ['5-3-1',
            '5-3-2',
            '5-4-1',
            '5-4-2',
            '6-3-1',
            '6-3-2',
            '6-4-1',
            '6-4-2']
fastcore.test.test_eq(result, expected)


### Check transition is valid

Here is a method for checking that a given transition is valid.

In [75]:
# | export
def valid_transition(ind1: str,  # The index of the current state, expressed in the form "{strategy_code}-{strategy_code}-{strategy_code}"
                     ind2: str,  # The index of the next state, expressed in the same form as `ind1`
                     ) -> bool:  # True if the transition is valid, false otherwise
    """Check if the transition from ind1->ind2 is valid
    i.e. that only one population undergoes a change in strategy."""
    ind1_tuple = list(map(int, ind1.split("-")))
    ind2_tuple = list(map(int, ind2.split("-")))
    differ = [i1 != i2 for i1, i2 in zip(ind1_tuple, ind2_tuple)]
    valid = sum(differ) == 1
    return valid


**Tests for `valid_transition`**

In [76]:
# | export
fastcore.test.test_eq(valid_transition("1-1-1", "2-1-1"), True)
fastcore.test.test_eq(valid_transition("1-1-1", "2-1-2"), False)
fastcore.test.test_eq(valid_transition("1-1-1", "0-0-0"), False)
fastcore.test.test_eq(valid_transition("1-1-1", "22-1-3"), False)
# Even though possible, self transitions are marked as false since we never compute them directly
fastcore.test.test_eq(valid_transition("1-1-1", "1-1-1"), False)


### A multimethod for computing the likelihoods of different strategy profiles

In [77]:
# |export
@multi
def compute_profile_dist(models):
    """Compute the probability distribution of the relevant profiles."""
    return models.get('profile_dist_rule')


@method(compute_profile_dist)
def compute_profile_dist(models):
    """Compute the probability distribution of the relevant profiles - default."""
    chosen_strategy = models['chosen_strategy']
    profiles = models['profiles_filtered']
    profile_distribution = {}
    for profile in profiles:
        profile_tuple = list(map(int, profile.split("-")))
        possible_players = [f"P{i+1}"
                            for i, strategy in enumerate(profile_tuple[::-1])
                            if strategy == chosen_strategy]
        profile_distribution[profile] = {}
        for chosen_player in possible_players:
            likelihood = sample_profile({**models,
                                         "profile": profile,
                                         "chosen_player": chosen_player})
            profile_distribution[profile][chosen_player] = likelihood
    return profile_distribution


In [78]:
# | export
@method(compute_profile_dist, 'multi-player-symmetric')
def compute_profile_dist(models):
    """Compute the probability distribution of the relevant profiles - we have
    one profile per combination of players and only compute the likelihood for
    the relevant player type."""
    chosen_strategy = models['chosen_strategy']
    profiles = models['profiles_filtered']
    profile_distribution = {}
    counter = collections.Counter()
    visited = collections.defaultdict()
    for profile in profiles:
        profile_tuple = list(map(int, profile.split("-")))
        unique, counts = np.unique(profile_tuple, return_counts=True)
        counter_key = "-".join([f"{u}:{c}" for u, c in zip(unique, counts)])
        counter[counter_key] += 1
        visited[counter_key] = False
    for profile in profiles:
        profile_tuple = list(map(int, profile.split("-")))
        unique, counts = np.unique(profile_tuple, return_counts=True)
        counter_key = "-".join([f"{u}:{c}" for u, c in zip(unique, counts)])
        if visited[counter_key]:
            # If we have seen a profile with the same strategy counts, skip it
            continue
        else:
            visited[counter_key] = True
            possible_players = [f"P{i+1}"
                                for i, strategy in enumerate(profile_tuple[::-1])
                                if strategy == chosen_strategy]
            profile_distribution[profile] = {}
            if len(possible_players) > 0:
                # Player order does not matter
                chosen_player = possible_players[0]
                likelihood = sample_profile({**models,
                                             "profile": profile,
                                             "chosen_player": chosen_player})
                # We must multiply the above likelihood by the number of ways
                # this combination of players can be permuted.
                likelihood *= counter[counter_key] * len(possible_players)
                profile_distribution[profile][chosen_player] = likelihood
    return profile_distribution


#### Tests for `compute_profile_dist`

##### Test 1

I consider a model with 2 sectors, who each have 2 strategies and 10 members,
and play a game with 2 players who can each be from either sector.

We therefore specify the sectors sizes, `Z`, the `sector_strategies` and the `allowed_sectors`

`profiles_filtered` will be all profiles in such a game relevant to a transition
between Sector 1 playing their first strategy and Sector 1 playing their
second strategy. The members of Sector 2 play their first strategy.

I encode these recurrent states as "3-1" and "3-2" respectively.

I also mark "S1" as the `affected_sector`, as well as the `mutant_strategy`
and `current_strategy`. 

`profiles_filtered` then includes: all possible "x-y" where x,y in {1,2,3},
since there will be no "S2" players playing strategy 4.

We also have to specify the `chosen_strategy` to indicate whether we are
interested in the profile likelihoods from the perspective of a mutant player
or a current player.

The tests consider different values of `n_mutants`.

In [79]:
# | export
Z = {"S2": 10, "S1": 10}
sector_strategies = {"S2": [3, 4],
                     "S1": [1, 2]}
allowed_sectors = {"P2": ["S1", "S2"],
                   "P1": ["S1", "S2"]}
n_players = len(allowed_sectors.keys())
n_strategies = [len(strategies) for strategies in sector_strategies]

sector_weights = {}

models = {"Z": Z,
          "sector_strategies": sector_strategies,
          "allowed_sectors": allowed_sectors,
          "n_players": n_players,
          "n_strategies": n_strategies,
          "chosen_strategy": 1,
          "current_strategy": 1,
          "mutant_strategy": 2,
          "affected_sector": "S1",
          "n_mutants": 2,
          #   "sector_weights": sector_weights,
          }

models = thread_macro(models,
                      create_profiles,
                      (assoc, "transition_indices", ["3-1", "3-2"]),
                      apply_profile_filters)
profiles_filtered = ['1-1', '1-2', '1-3',
                     '2-1', '2-2', '2-3',
                     '3-1', '3-2', '3-3']
fastcore.test.test_eq(models["profiles_filtered"], profiles_filtered)

result1 = compute_profile_dist(models)
expected1 = {'1-1': {"P1": 7 / 9 / 2 / 2, "P2": 7 / 9 / 2 / 2},
             '1-2': {"P2": 2 / 9 / 2 / 2},
             '1-3': {"P2": 1 / 2 / 2},
             '2-1': {"P1": 2 / 9 / 2 / 2},
             '2-2': {},
             '2-3': {},
             '3-1': {"P1": 1 / 2 / 2},
             '3-2': {},
             '3-3': {}}
for profile in profiles_filtered:
    fastcore.test.test_eq(result1[profile], expected1[profile])


result1_sum = 0
for likelihoods_by_player in result1.values():
    for likelihood in likelihoods_by_player.values():
        result1_sum += likelihood
fastcore.test.test_eq(result1_sum, 1)


result2 = compute_profile_dist({**models, "chosen_strategy": 2})
expected2 = {'1-1': {},
             '1-2': {"P1": 8 / 9 / 2 / 2},
             '1-3': {},
             '2-1': {"P2": 8 / 9 / 2 / 2},
             '2-2': {"P1": 1 / 9 / 2 / 2, "P2": 1 / 9 / 2 / 2},
             '2-3': {"P2": 1 / 2 / 2},
             '3-1': {},
             '3-2': {"P1": 1 / 2 / 2},
             '3-3': {}}
for profile in profiles_filtered:
    fastcore.test.test_eq(result2[profile], expected2[profile])

result2_sum = 0
for likelihoods_by_player in result1.values():
    for likelihood in likelihoods_by_player.values():
        result2_sum += likelihood
fastcore.test.test_eq(result2_sum, 1)

result3 = compute_profile_dist({**models, "n_mutants": 5})
expected3 = {'1-1': {"P1": 4 / 9 / 2 / 2, "P2": 4 / 9 / 2 / 2},
             '1-2': {"P2": 5 / 9 / 2 / 2},
             '1-3': {"P2": 1 / 2 / 2},
             '2-1': {"P1": 5 / 9 / 2 / 2},
             '2-2': {},
             '2-3': {},
             '3-1': {"P1": 1 / 2 / 2},
             '3-2': {},
             '3-3': {}}
for profile in profiles_filtered:
    fastcore.test.test_eq(result3[profile], expected3[profile])

result3_sum = 0
for likelihoods_by_player in result1.values():
    for likelihood in likelihoods_by_player.values():
        result3_sum += likelihood
fastcore.test.test_eq(result2_sum, 1)


##### Test 2

I next consider the same model but this time each interaction has 5 players.

In such cases, it is desirable to use the multiplayer-symmetric method for
computing the likelihood of the relevant profiles.

As before, we specify the sectors sizes, `Z`, the `sector_strategies` and the `allowed_sectors`

`profiles_filtered` will be all profiles in such a game relevant to a transition
between Sector 1 playing their first strategy and Sector 1 playing their
second strategy. The members of Sector 2 play their first strategy.

I encode these recurrent states as "3-1" and "3-2" respectively.

I also mark "S1" as the `affected_sector`, as well as the `mutant_strategy`
and `current_strategy`. 

`profiles_filtered` then includes: all possible "x-y" where x,y in {1,2,3},
since there will be no "S2" players playing strategy 4.

While we need all of these profiles for this new method, the profiles we
only compute the likelihoods for a subset. There is only one relevant profile
per unique strategy count. The profile chosen is the first such profile when
iterating through `profiles_filtered` (care must be taken to ensure that the
payoffs are computed in a similar way - we will have a method to ensure this).

We also have to specify the `chosen_strategy` to indicate whether we are
interested in the profile likelihoods from the perspective of a mutant player
or a current player.

The tests verifies that the result of this method is the same as if aggregated
the likelihoods using the default method.

In [80]:
# | export
Z = {"S2": 10, "S1": 10}
sector_strategies = {"S2": [3, 4],
                     "S1": [1, 2]}
allowed_sectors = {"P5": ["S1", "S2"],
                   "P4": ["S1", "S2"],
                   "P3": ["S1", "S2"],
                   "P2": ["S1", "S2"],
                   "P1": ["S1", "S2"]}
n_players = len(allowed_sectors.keys())
n_strategies = [len(strategies) for strategies in sector_strategies]

sector_weights = {}

models = {"Z": Z,
          "sector_strategies": sector_strategies,
          "allowed_sectors": allowed_sectors,
          "n_players": n_players,
          "n_strategies": n_strategies,
          "chosen_strategy": 1,
          "current_strategy": 1,
          "mutant_strategy": 2,
          "affected_sector": "S1",
          "n_mutants": 2,
          #   "sector_weights": sector_weights,
          }

models = thread_macro(models,
                      create_profiles,
                      (assoc, "transition_indices", ["3-1", "3-2"]),
                      apply_profile_filters)
profiles_filtered = ['1-1-1-1-1',
                     '1-1-1-1-2',
                     '1-1-1-1-3',
                     '1-1-1-2-2',
                     '1-1-1-2-3',
                     '1-1-1-3-3',
                     '1-1-2-2-2',
                     '1-1-2-2-3',
                     '1-1-2-3-3',
                     '1-1-3-3-3',
                     '1-2-2-2-2',
                     '1-2-2-2-3',
                     '1-2-2-3-3',
                     '1-2-3-3-3',
                     '1-3-3-3-3',
                     '2-2-2-2-2',
                     '2-2-2-2-3',
                     '2-2-2-3-3',
                     '2-2-3-3-3',
                     '2-3-3-3-3',
                     '3-3-3-3-3', ]
# fastcore.test.test_eq(models["profiles_filtered"], profiles_filtered)

result1 = compute_profile_dist({**models,
                                "profile_dist_rule": "multi-player-symmetric"})
result2 = compute_profile_dist(models)

# The expected results take the probability of attaining a given profile when
# sampling all other players from the availabe sectors and player types (top few
# lines), divided by the chance your agent was sampled to be in of the player
# positions (in this case this is always 1/5), multiplied by the number of
# different positions the player could be in for the given profile, and then
# multiplied by the number of permutations of the given (read these numbers
# from left to right on the last line).
expected1 = {'1-1-1-1-1': {'P1': (math.comb(7, 4) / math.comb(9, 4) / 2**4
                                  / 5 * 5 * 1)},
             '1-1-1-1-2': {'P2': (math.comb(7, 3) / math.comb(9, 3) / 2**3
                                  * math.comb(2, 1) / math.comb(6, 1) / 2
                                  / 5 * 4 * 5)},
             '1-1-1-1-3': {'P2': (math.comb(7, 3) / math.comb(9, 3) / 2**3
                                  * (1 / 2)
                                  / 5 * 4 * 5)},
             '1-1-1-2-2': {'P3': (math.comb(7, 2) / math.comb(9, 2) / 2**2
                                  * math.comb(2, 2) / math.comb(7, 2) / 2**2
                                  / 5 * 3 * 10)},
             '1-1-1-2-3': {'P3': (math.comb(7, 2) / math.comb(9, 2) / 2**2
                                  * math.comb(2, 1) / math.comb(7, 1) / 2
                                  * (1 / 2)
                                  / 5 * 3 * 20)},
             '1-1-1-3-3': {'P3': (math.comb(7, 2) / math.comb(9, 2) / 2**2
                                  * (1 / 2)**2
                                  / 5 * 3 * 10)},
             '1-1-2-2-2': {'P4': 0.0},  # Too many mutants so impossible
             '1-1-2-2-3': {'P4': (math.comb(7, 1) / math.comb(9, 1) / 2
                                  * math.comb(2, 2) / math.comb(8, 2) / 2**2
                                  * (1 / 2)
                                  / 5 * 2 * 30)},
             '1-1-2-3-3': {'P4': (math.comb(7, 1) / math.comb(9, 1) / 2
                                  * math.comb(2, 1) / math.comb(8, 1) / 2
                                  * (1 / 2)**2
                                  / 5 * 2 * 30)},
             '1-1-3-3-3': {'P4': (math.comb(7, 1) / math.comb(9, 1) / 2
                                  * (1 / 2)**3
                                  / 5 * 2 * 10)},
             '1-2-2-2-2': {'P5': 0.0},  # Too many mutants so impossible
             '1-2-2-2-3': {'P5': 0.0},  # Too many mutants so impossible
             '1-2-2-3-3': {'P5': (math.comb(2, 2) / math.comb(9, 2) / 2**2
                                  * (1 / 2)**2
                                  / 5 * 1 * 30)},
             '1-2-3-3-3': {'P5': (math.comb(2, 1) / math.comb(9, 1) / 2
                                  * (1 / 2)**3
                                  / 5 * 1 * 20)},
             '1-3-3-3-3': {'P5': ((1 / 2)**4
                                  / 5 * 1 * 5)},
             '2-2-2-2-2': {},  # Chosen strategy not present so impossible
             '2-2-2-2-3': {},  # Chosen strategy not present so impossible
             '2-2-2-3-3': {},  # Chosen strategy not present so impossible
             '2-2-3-3-3': {},  # Chosen strategy not present so impossible
             '2-3-3-3-3': {},  # Chosen strategy not present so impossible
             '3-3-3-3-3': {}}  # Chosen strategy not present so impossible
for profile in profiles_filtered:
    for player in expected1[profile].keys():
        fastcore.test.test_close(result1[profile][player],
                                 expected1[profile][player])


result1_sum = 0
for likelihoods_by_player in result1.values():
    for likelihood in likelihoods_by_player.values():
        result1_sum += likelihood
fastcore.test.test_close(result1_sum, 1)

# The expected results take the probability of attaining a given profile when
# sampling all other players from the availabe sectors and player types (top few
# lines), divided by the chance your agent was sampled to be in of the player
# positions (in this case this is always 1/5).
expected2 = {'1-1-1-1-1': {'P1': (math.comb(7, 4) / math.comb(9, 4) / 2**4
                                  / 5)},
             '1-1-1-1-2': {'P2': (math.comb(7, 3) / math.comb(9, 3) / 2**3
                                  * math.comb(2, 1) / math.comb(6, 1) / 2
                                  / 5)},
             '1-1-1-1-3': {'P2': (math.comb(7, 3) / math.comb(9, 3) / 2**3
                                  * (1 / 2)
                                  / 5)},
             '1-1-1-2-2': {'P3': (math.comb(7, 2) / math.comb(9, 2) / 2**2
                                  * math.comb(2, 2) / math.comb(7, 2) / 2**2
                                  / 5)},
             '1-1-1-2-3': {'P3': (math.comb(7, 2) / math.comb(9, 2) / 2**2
                                  * math.comb(2, 1) / math.comb(7, 1) / 2
                                  * (1 / 2)
                                  / 5)},
             '1-1-1-3-3': {'P3': (math.comb(7, 2) / math.comb(9, 2) / 2**2
                                  * (1 / 2)**2
                                  / 5)},
             '1-1-2-2-2': {'P4': 0.0},  # Too many mutants so impossible
             '1-1-2-2-3': {'P4': (math.comb(7, 1) / math.comb(9, 1) / 2
                                  * math.comb(2, 2) / math.comb(8, 2) / 2**2
                                  * (1 / 2)
                                  / 5)},
             '1-1-2-3-3': {'P4': (math.comb(7, 1) / math.comb(9, 1) / 2
                                  * math.comb(2, 1) / math.comb(8, 1) / 2
                                  * (1 / 2)**2
                                  / 5)},
             '1-1-3-3-3': {'P4': (math.comb(7, 1) / math.comb(9, 1) / 2
                                  * (1 / 2)**3
                                  / 5)},
             '1-2-2-2-2': {'P5': 0.0},  # Too many mutants so impossible
             '1-2-2-2-3': {'P5': 0.0},  # Too many mutants so impossible
             '1-2-2-3-3': {'P5': (math.comb(2, 2) / math.comb(9, 2) / 2**2
                                  * (1 / 2)**2
                                  / 5)},
             '1-2-3-3-3': {'P5': (math.comb(2, 1) / math.comb(9, 1) / 2
                                  * (1 / 2)**3
                                  / 5)},
             '1-3-3-3-3': {'P5': ((1 / 2)**4
                                  / 5)},
             '2-2-2-2-2': {},  # Chosen strategy not present so impossible
             '2-2-2-2-3': {},  # Chosen strategy not present so impossible
             '2-2-2-3-3': {},  # Chosen strategy not present so impossible
             '2-2-3-3-3': {},  # Chosen strategy not present so impossible
             '2-3-3-3-3': {},  # Chosen strategy not present so impossible
             '3-3-3-3-3': {}}  # Chosen strategy not present so impossible
for profile in profiles_filtered:
    for player in expected1[profile].keys():
        fastcore.test.test_close(result2[profile][player],
                                 expected2[profile][player])


### A multimethod for computing each strategy's success

In [81]:
# |export
@multi
def compute_success(models):
    """Compute the success of the two strategies under consideration."""
    return models.get('compute_success_rule', "cheap")


@method(compute_success)
def compute_success(models):
    """Compute the success of the two strategies under consideration for each
    number of k mutants implied by the transition."""
    models = apply_profile_filters(models)
    ind1, ind2 = models['transition_indices']
    sector_strategies = models['sector_strategies']
    Z = models['Z']
    payoffs = models['payoffs']

    ind1_tuple = list(map(int, ind1.split("-")))
    ind2_tuple = list(map(int, ind2.split("-")))
    differ = [i1 != i2 for i1, i2 in zip(ind1_tuple, ind2_tuple)]
    affected_sector = f"S{np.argmax(differ[::-1]) + 1}"
    current_strategy = ind1_tuple[np.argmax(differ)]
    mutant_strategy = ind2_tuple[np.argmax(differ)]

    ΠA = []
    ΠB = []
    for n_mutants in range(1, Z[affected_sector]):
        dist1 = compute_profile_dist({**models,
                                      'chosen_strategy': current_strategy,
                                      'current_strategy': current_strategy,
                                      'mutant_strategy': mutant_strategy,
                                      'affected_sector': affected_sector,
                                      'n_mutants': n_mutants})
        dist2 = compute_profile_dist({**models,
                                      'chosen_strategy': mutant_strategy,
                                      'current_strategy': current_strategy,
                                      'mutant_strategy': mutant_strategy,
                                      'affected_sector': affected_sector,
                                      'n_mutants': n_mutants})
        success_A = 0
        for profile, player_map in dist1.items():
            for player, likelihood in player_map.items():
                success_A += payoffs[profile][player] * likelihood
        ΠA.append(success_A)
        success_B = 0
        for profile, player_map in dist2.items():
            for player, likelihood in player_map.items():
                success_B += payoffs[profile][player] * likelihood
        ΠB.append(success_B)
    return ΠA, ΠB

@method(compute_success, "cheap")
def compute_success(models):
    """Compute the success of the two strategies under consideration for each
    number of k mutants implied by the transition."""
    models = apply_profile_filters(models)
    ind1, ind2 = models['transition_indices']
    sector_strategies = models['sector_strategies']
    Z = models['Z']
    payoffs = models['payoffs']

    ind1_tuple = list(map(int, ind1.split("-")))
    ind2_tuple = list(map(int, ind2.split("-")))
    differ = [i1 != i2 for i1, i2 in zip(ind1_tuple, ind2_tuple)]
    affected_sector = f"S{np.argmax(differ[::-1]) + 1}"
    current_strategy = ind1_tuple[np.argmax(differ)]
    mutant_strategy = ind2_tuple[np.argmax(differ)]

    ΠA = []
    ΠB = []
    for n_mutants in range(1, Z[affected_sector]):
        dist1 = compute_profile_dist({**models,
                                      'chosen_strategy': current_strategy,
                                      'current_strategy': current_strategy,
                                      'mutant_strategy': mutant_strategy,
                                      'affected_sector': affected_sector,
                                      'n_mutants': n_mutants})
        dist2 = compute_profile_dist({**models,
                                      'chosen_strategy': mutant_strategy,
                                      'current_strategy': current_strategy,
                                      'mutant_strategy': mutant_strategy,
                                      'affected_sector': affected_sector,
                                      'n_mutants': n_mutants})
        payoffsA = []
        likelihoodsA = []
        for profile, player_map in dist1.items():
            for player, likelihood in player_map.items():
                payoffsA.append(payoffs[profile][player])
                likelihoodsA.append(likelihood)
        ΠA.append(np.dot(np.array(payoffsA).T, likelihoodsA))
        payoffsB = []
        likelihoodsB = []
        for profile, player_map in dist2.items():
            for player, likelihood in player_map.items():
                payoffsB.append(payoffs[profile][player]) 
                likelihoodsB.append(likelihood)
        ΠB.append(np.dot(np.array(payoffsB).T, likelihoodsB))
    return ΠA, ΠB


@method(compute_success, "functional")
def compute_success(models):
    """Compute the success of the two strategies under consideration for each
    number of k mutants implied by the transition."""
    models = apply_profile_filters(models)
    ind1, ind2 = models['transition_indices']
    sector_strategies = models['sector_strategies']
    Z = models['Z']
    payoffs_function = models['payoffs_function']

    ind1_tuple = list(map(int, ind1.split("-")))
    ind2_tuple = list(map(int, ind2.split("-")))
    differ = [i1 != i2 for i1, i2 in zip(ind1_tuple, ind2_tuple)]
    affected_sector = f"S{np.argmax(differ[::-1]) + 1}"
    current_strategy = ind1_tuple[np.argmax(differ)]
    mutant_strategy = ind2_tuple[np.argmax(differ)]

    ΠA = []
    ΠB = []
    for n_mutants in range(1, Z[affected_sector]):
        dist1 = compute_profile_dist({**models,
                                      'chosen_strategy': current_strategy,
                                      'current_strategy': current_strategy,
                                      'mutant_strategy': mutant_strategy,
                                      'affected_sector': affected_sector,
                                      'n_mutants': n_mutants})
        dist2 = compute_profile_dist({**models,
                                      'chosen_strategy': mutant_strategy,
                                      'current_strategy': current_strategy,
                                      'mutant_strategy': mutant_strategy,
                                      'affected_sector': affected_sector,
                                      'n_mutants': n_mutants})
        # Record the strategy counts in the population implied by the number of
        # mutants so that our payoff function can make use of this information
        strategy_counts = {str(strategy): Z[sector]
                           for strategy in ind1_tuple
                           for sector in sector_strategies.keys()
                           if strategy in sector_strategies[sector]}
        strategy_counts = {**strategy_counts,
                           str(current_strategy): Z[affected_sector] - n_mutants,
                           str(mutant_strategy): n_mutants}
        population_state = models.get("population_state", {})
        population_state = {**population_state,
                            "strategy_counts": strategy_counts}
        # Compute the payoffs for each strategy in each possible profile
        # and the likelihood of that profile occuring.
        payoffsA = []
        likelihoodsA = []
        for profile, player_map in dist1.items():
            for player, likelihood in player_map.items():
                payoffsA.append(payoffs_function({**models,
                                                  "population_state": population_state,
                                                  "profile": profile,
                                                  "player": player}))
                likelihoodsA.append(likelihood)
        ΠA.append(np.dot(np.array(payoffsA).T, likelihoodsA))
        payoffsB = []
        likelihoodsB = []
        for profile, player_map in dist2.items():
            for player, likelihood in player_map.items():
                payoffsB.append(payoffs_function({**models,
                                                  "population_state": population_state,
                                                  "profile": profile,
                                                  "player": player}))
                likelihoodsB.append(likelihood)
        ΠB.append(np.dot(np.array(payoffsB).T, likelihoodsB))
    return ΠA, ΠB


#### Tests for `compute_success`

##### Test 1

For simplicity, I assume payoffs are always 1. Naturally, the success of
each strategy will also be 1.

I consider a model with 2 sectors, who each have 2 strategies and 10 members,
and play a game with 2 players who can each be from either sector.

We therefore specify the sectors sizes, `Z`, the `sector_strategies` and the `allowed_sectors`

`profiles_filtered` will be all profiles in such a game relevant to a transition
between Sector 1 playing their first strategy and Sector 1 playing their
second strategy. The members of Sector 2 play their first strategy.

I encode these recurrent states as "3-1" and "3-2" respectively.

I also mark "S1" as the `affected_sector`, as well as the `mutant_strategy`
and `current_strategy`. 

`profiles_filtered` then includes: all possible "x-y" where x,y in {1,2,3},
since there will be no "S2" players playing strategy 4.

We also have to specify the `chosen_strategy` to indicate whether we are
interested in the profile likelihoods from the perspective of a mutant player
or a current player.

The tests consider different values of `n_mutants`.

In [82]:
# | export
Z = {"S2": 10, "S1": 10}
sector_strategies = {"S2": [3, 4],
                     "S1": [1, 2]}
allowed_sectors = {"P2": ["S1", "S2"],
                   "P1": ["S1", "S2"]}
n_players = len(allowed_sectors.keys())
n_strategies = [len(strategies) for strategies in sector_strategies]

sector_weights = {}

models = {"Z": Z,
          "sector_strategies": sector_strategies,
          "allowed_sectors": allowed_sectors,
          "n_players": n_players,
          "n_strategies": n_strategies,
          #   "sector_weights": sector_weights,
          }

models = thread_macro(models,
                      create_profiles,
                      (assoc, "transition_indices", ["3-1", "3-2"]),
                      apply_profile_filters)
profiles_filtered = ['1-1', '1-2', '1-3',
                     '2-1', '2-2', '2-3',
                     '3-1', '3-2', '3-3']
fastcore.test.test_eq(models["profiles_filtered"], profiles_filtered)

payoffs = {}
for profile in profiles_filtered:
    payoffs[profile] = {}
    for player in allowed_sectors.keys():
        payoffs[profile][player] = 1
models = {**models, "payoffs": payoffs}

result1 = compute_success(models)
expected1 = [[1 for _ in range(9)],
             [1 for _ in range(9)], ]
for result, expected in zip(result1[0], expected1[0]):
    fastcore.test.test_close(result, expected)
for result, expected in zip(result1[1], expected1[1]):
    fastcore.test.test_close(result, expected)


##### Test 2

I also consider more general payoffs where many entries are unique. However,
the order of strategies does not matter. 

When the order does not matter, it is very easy to calulate
an expression for what the success of each strategy should be for the 2 player
interactions considered in this example.

In [83]:
# | export
Z = {"S2": 10, "S1": 10}
sector_strategies = {"S2": [3, 4],
                     "S1": [1, 2]}
allowed_sectors = {"P2": ["S1", "S2"],
                   "P1": ["S1", "S2"]}
n_players = len(allowed_sectors.keys())
n_strategies = [len(strategies) for strategies in sector_strategies]

sector_weights = {}

models = {"Z": Z,
          "sector_strategies": sector_strategies,
          "allowed_sectors": allowed_sectors,
          "n_players": n_players,
          "n_strategies": n_strategies,
          #   "sector_weights": sector_weights,
          }

models = thread_macro(models,
                      create_profiles,
                      (assoc, "transition_indices", ["3-1", "3-2"]),
                      apply_profile_filters)
profiles_filtered = ['1-1', '1-2', '1-3',
                     '2-1', '2-2', '2-3',
                     '3-1', '3-2', '3-3']
fastcore.test.test_eq(models["profiles_filtered"], profiles_filtered)

payoffs = {
    '1-1': {'P1': 2, 'P2': 2},
    '1-2': {'P1': 4, 'P2': 0},
    '1-3': {'P1': 3, 'P2': 3},
    '1-4': {'P1': 6, 'P2': 0},
    '2-1': {'P1': 0, 'P2': 4},
    '2-2': {'P1': 1, 'P2': 1},
    '2-3': {'P1': 0, 'P2': 6},
    '2-4': {'P1': 2, 'P2': 1},
    '3-1': {'P1': 3, 'P2': 3},
    '3-2': {'P1': 6, 'P2': 0},
    '3-3': {'P1': 4, 'P2': 4},
    '3-4': {'P1': 8, 'P2': 0},
    '4-1': {'P1': 0, 'P2': 6},
    '4-2': {'P1': 1, 'P2': 2},
    '4-3': {'P1': 0, 'P2': 8},
    '4-4': {'P1': 2, 'P2': 2},
}

result2 = compute_success({**models, "payoffs": payoffs})
# 50% chance of facing strategy 3 no matter if we look at player 1 or 2.
# Otherwise, we have a 0.5 * (n_mutants / z_s1) chance of facing strategy 2
# and a 0.5 * ((z_s1 - n_mutants) / z_s1) chance of facing strategy 1.
expected2 = [[((0.5
                * k / (Z["S1"] - 1)
                * payoffs['2-1']['P1'])
               + (0.5
                  * (Z["S1"] - 1 - k) / (Z["S1"] - 1)
                  * payoffs['1-1']['P1'])
               + 0.5 * payoffs['3-1']['P1'])
              for k in range(1, 10)],
             [((0.5
                * (k - 1) / (Z["S1"] - 1)
                * payoffs['2-2']['P1'])
               + (0.5
                  * (Z["S1"] - k) / (Z["S1"] - 1)
                  * payoffs['1-2']['P1'])
               + 0.5 * payoffs['3-2']['P1'])
              for k in range(1, 10)], ]
for result, expected in zip(result2[0], expected2[0]):
    fastcore.test.test_close(result, expected)
for result, expected in zip(result2[1], expected2[1]):
    fastcore.test.test_close(result, expected)


##### Test 3

We can find a similar expression for when the order does matter.

In [84]:
# | export
Z = {"S2": 10, "S1": 10}
sector_strategies = {"S2": [3, 4],
                     "S1": [1, 2]}
allowed_sectors = {"P2": ["S1", "S2"],
                   "P1": ["S1", "S2"]}
n_players = len(allowed_sectors.keys())
n_strategies = [len(strategies) for strategies in sector_strategies]

sector_weights = {}

models = {"Z": Z,
          "sector_strategies": sector_strategies,
          "allowed_sectors": allowed_sectors,
          "n_players": n_players,
          "n_strategies": n_strategies,
          #   "sector_weights": sector_weights,
          }

models = thread_macro(models,
                      create_profiles,
                      (assoc, "transition_indices", ["3-1", "3-2"]),
                      apply_profile_filters)
profiles_filtered = ['1-1', '1-2', '1-3',
                     '2-1', '2-2', '2-3',
                     '3-1', '3-2', '3-3']
fastcore.test.test_eq(models["profiles_filtered"], profiles_filtered)

payoffs = {}
for profile in profiles_filtered:
    payoffs[profile] = {}
    for player in allowed_sectors.keys():
        payoffs[profile][player] = np.random.beta(1, 1)
models = {**models, "payoffs": payoffs}

result2 = compute_success({**models, "payoffs": payoffs})
# 50% chance of facing strategy 3 no matter if we look at player 1 or 2.
# Otherwise, we have a 0.5 * (n_mutants / z_s1) chance of facing strategy 2
# and a 0.5 * ((z_s1 - n_mutants) / z_s1) chance of facing strategy 1.
# For each of these there is a 50% of being player 1 or player 2.
expected2 = [[((0.5
                * k / (Z["S1"] - 1)
                * (payoffs['2-1']['P1']
                   + payoffs['1-2']['P2']) / 2
                )
               + (0.5
                  * (Z["S1"] - 1 - k) / (Z["S1"] - 1)
                  * (payoffs['1-1']['P1']
                     + payoffs['1-1']['P2']) / 2
                  )
               + (0.5
                  * (payoffs['3-1']['P1']
                     + payoffs['1-3']['P2']) / 2
                  )
               )
              for k in range(1, 10)],
             [((0.5
                * (k - 1) / (Z["S1"] - 1)
                * (payoffs['2-2']['P1']
                   + payoffs['2-2']['P2']) / 2
                )
               + (0.5
                  * (Z["S1"] - k) / (Z["S1"] - 1)
                  * (payoffs['1-2']['P1']
                     + payoffs['2-1']['P2']) / 2
                  )
               + (0.5
                  * (payoffs['3-2']['P1']
                     + payoffs['2-3']['P2']) / 2
                  )
               )
              for k in range(1, 10)], ]
for result, expected in zip(result2[0], expected2[0]):
    fastcore.test.test_close(result, expected)
for result, expected in zip(result2[1], expected2[1]):
    fastcore.test.test_close(result, expected)


### Infer number of models

In [85]:
# | export
def vals(d: dict):
    "Return the values of a dictionary."
    return d.values()


In [86]:
# | export
def infer_n_models(models):
    "Infer the number of models from the model payoffs."
    try:
        payoffs = models.get('payoffs')
        payoff_vector = thread_macro(payoffs,
                                     vals,
                                     iter,
                                     next,
                                     vals,
                                     iter,
                                     next)
        n_models = (1
                    if isinstance(payoff_vector, (float, int))
                    else len(payoff_vector))
    except:
        raise ValueError("""Unable to infer `n_models`.
                         `payoffs` is structured incorrectly""")
    return n_models


#### Tests for `infer_n_models`

In [87]:
profiles_filtered = ['1-1', '1-2', '1-3',
                     '2-1', '2-2', '2-3',
                     '3-1', '3-2', '3-3']
payoffs = {}
for profile in profiles_filtered:
    payoffs[profile] = {}
    for player in allowed_sectors.keys():
        payoffs[profile][player] = np.repeat(np.random.beta(1, 1), 5)
infer_n_models({'payoffs': payoffs})


5

### An algorithm for building the transition matrix

We now have the methods we need for building the transition matrix for a game with an arbitrary number of sectors and various interactions between those sectors.

The algorithm goes as follows:

For each possible transition
  - Check if the transition is valid
    - If self-transition, assign the value 1
    - If not, skip
  - Filter profiles down to only those which are relevant
  - Compute average payoffs using the payoffs and those profiles
  - Compute the fixation rate
  - Compute transition probabilities

In [88]:
# |export
# | hide
@method(build_transition_matrix, 'multiple-populations')
def build_transition_matrix(models: dict  # A dictionary that contains the parameters in `ModelTypeEGTMultiple`
                            ):
    """Build a transition matrix between all monomorphic states
    when there are multiple populations.    
    """
    β = models['β']
    n_models = models.get('n_models',
                          infer_n_models(models))
    S = models.get('recurrent_states',
                   create_recurrent_states(models))
    M = np.zeros((n_models, len(S), len(S)))
    for row_ind in range(M.shape[-1]):
        M[:, row_ind, row_ind] += 1
    transition_inds = [(i, j)
                       for i in range(len(S))
                       for j in range(len(S))]
    for row_ind, col_ind in transition_inds:
        current_state, new_state = S[row_ind], S[col_ind]
        if current_state == new_state:
            continue
        if not valid_transition(current_state, new_state):
            continue
        ΠA, ΠB = compute_success(assoc(models,
                                       "transition_indices",
                                       [current_state, new_state]))
        # TODO: Clean up assymetric beta code below
        ind1_tuple = list(map(int, current_state.split("-")))
        ind2_tuple = list(map(int, new_state.split("-")))
        differ = [i1 != i2 for i1, i2 in zip(ind1_tuple, ind2_tuple)]
        affected_sector = f"S{np.argmax(differ[::-1]) + 1}"
        if isinstance(β, dict):
            ρ = fixation_rate_stable(ΠA, ΠB, β[affected_sector])
        else:
            ρ = fixation_rate_stable(ΠA, ΠB, β)
        n_mutations = sum(valid_transition(current_state, s_alt)
                          for s_alt in S)
        M[:, row_ind, col_ind] = ρ / n_mutations
        M[:, row_ind, row_ind] -= ρ / n_mutations
    return {**models, 'transition_matrix': M,
            'recurrent_states': S,
            'n_models': n_models}


In [89]:
# | echo:false
# Note: I can't directly call show doc on the expression `build_transition_matrix.__multi__['multiple-populations']`
# as it throws an error when I generate documentation using nbdev_preview.
# I can fix it with:
# x = build_transition_matrix.__multi__['multiple-populations']
# show_doc(x)
# or using my Clojure-style threading macro:
thread_macro(build_transition_matrix.__multi__['multiple-populations'],
             show_doc)


---

[source](https://github.com/PaoloBova/gh-pages-example/blob/main/gh_pages_example/methods.py#L1612){target="_blank" style="float:right; font-size:smaller"}

### build_transition_matrix

>      build_transition_matrix (models:dict)

Build a transition matrix between all monomorphic states
when there are multiple populations.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| models | dict | A dictionary that contains the parameters in `ModelTypeEGTMultiple` |

#### Tests and examples for `build_transition_matrix` for multiple populations

##### Test 1

I first consider a random payoff matrix and check that the
resulting transition matrices have rows which sum to 1.

As we are working with multiple populations, our `models` variable needs to declare this with the `dispatch-type` key.

In [90]:
# | export
β = 1
Z = {"S1": 50, "S2": 50, "S3": 50}
allowed_sectors = {"P3": ["S3"],
                   "P2": ["S2"],
                   "P1": ["S1"]}
sector_strategies = {"S3": [4, 5],
                     "S2": [2, 3],
                     "S1": [0, 1]}

models = {"dispatch-type": "multiple-populations",
          "β": β,
          "Z": Z,
          "allowed_sectors": allowed_sectors,
          "sector_strategies": sector_strategies,
          }

models = thread_macro(models,
                      create_profiles,
                      apply_profile_filters)

payoffs = {}
for profile in models['profiles_filtered']:
    payoffs[profile] = {}
    for player in models['allowed_sectors'].keys():
        payoffs[profile][player] = np.array([np.random.beta(1, 1)
                                             for _ in range(2)])
models = {**models, "payoffs": payoffs}

result = build_transition_matrix(models)['transition_matrix']
result_sums = np.sum(result, axis=-1)
fastcore.test.test_close(result_sums, 1)


##### Test 2

Here is an example of how to build a transition matrix when we have 2 sectors, playing 2 strategies each, and
engage in 2 player interactions with players being from either sector.

In the limit of small mutation rates, the system spends almost all its time in states where each population plays one strategy. Moreover, only a mutant for one population has the opportunity to fixate in that population. This means we only need to consider transitions where the strategy played by one population has changed. Transitions where both populations would have to change strategy occur with probability 0.

In an earlier test, we showed that one could compute the success of each
strategy in a pairwise contest analytically, even in the presence of other
sectors with fixed populations. 

Once we have the successes for each `n_mutant` value, we can rely on our
well-tested `fixation_rate` function to help compute our expected transition
probabilities.

In [91]:
# | export
# | hide
@multi
def compute_success_analytical(models):
    return models.get('success_analytical_derivation')


@method(compute_success_analytical, '2sector2strategy2player')
def compute_success_analytical(models):
    """Compute the success of each strategy involved in a transition for
    a game with two players who can each be from one of two sectors. Each
    sector uses two strategies."""

    # 50% chance of facing fixed strategy no matter if we look at mutant or
    # current strategy
    # Otherwise, we have a 0.5 * (n_mutants / z_s1) chance of facing strategy 2
    # and a 0.5 * ((z_s1 - n_mutants) / z_s1) chance of facing strategy 1.
    # For each of theses there is a 50% of being player 1 or player 2.
    transition_indices = models['transition_indices']
    Z = models['Z']
    payoffs = models['payoffs']
    ind1, ind2 = transition_indices
    ind1_tuple = list(map(int, ind1.split("-")))
    ind2_tuple = list(map(int, ind2.split("-")))
    differ = [i1 != i2 for i1, i2 in zip(ind1_tuple, ind2_tuple)]
    affected_sector = f"S{np.argmax(differ[::-1]) + 1}"
    current_strategy = ind1_tuple[np.argmax(differ)]
    mutant_strategy = ind2_tuple[np.argmax(differ)]
    # We only support two sectors here, so we know the other value must be the
    # fixed sector.
    fs = ind1_tuple[np.argmin(differ)]
    cs = current_strategy
    ms = mutant_strategy
    SA, SB = [], []
    z = Z[affected_sector]
    for k in range(1, z):
        successA = ((0.5
                    * k / (z - 1)
                    * (payoffs[f"{ms}-{cs}"]['P1']
                        + payoffs[f"{cs}-{ms}"]['P2']) / 2
                     )
                    + (0.5
                        * (z - 1 - k) / (z - 1)
                        * (payoffs[f"{cs}-{cs}"]['P1']
                           + payoffs[f"{cs}-{cs}"]['P2']) / 2
                       )
                    + (0.5
                        * (payoffs[f"{fs}-{cs}"]['P1']
                           + payoffs[f"{cs}-{fs}"]['P2']) / 2
                       )
                    )
        SA.append(successA)
        successB = ((0.5
                     * (k - 1) / (z - 1)
                     * (payoffs[f"{ms}-{ms}"]['P1']
                         + payoffs[f"{ms}-{ms}"]['P2']) / 2
                     )
                    + (0.5
                       * (z - k) / (z - 1)
                       * (payoffs[f"{cs}-{ms}"]['P1']
                           + payoffs[f"{ms}-{cs}"]['P2']) / 2
                       )
                    + (0.5
                       * (payoffs[f"{fs}-{ms}"]['P1']
                          + payoffs[f"{ms}-{fs}"]['P2']) / 2
                       )
                    )
        SB.append(successB)
    return SA, SB


In [92]:
# | export
Z = {"S2": 10, "S1": 10}
β = 1
sector_strategies = {"S2": [3, 4],
                     "S1": [1, 2]}
allowed_sectors = {"P2": ["S1", "S2"],
                   "P1": ["S1", "S2"]}
sector_weights = {}

models = {"dispatch-type": "multiple-populations",
          "β": β,
          "Z": Z,
          "allowed_sectors": allowed_sectors,
          "sector_strategies": sector_strategies,
          #   "sector_weights": sector_weights,
          }

models = thread_macro(models,
                      create_profiles,
                      apply_profile_filters)

payoffs = {}
for profile in models['profiles_filtered']:
    payoffs[profile] = {}
    for player in allowed_sectors.keys():
        payoffs[profile][player] = np.array([np.random.beta(1, 1)
                                             for _ in range(2)])
models = {**models, "payoffs": payoffs}

result = thread_macro({**models, "payoffs": payoffs},
                      build_transition_matrix,
                      (get, "transition_matrix"))

# Generate expected results
S = ['3-1', '3-2', '4-1', '4-2']
matrix_inds = [(i, j)
               for i in range(len(S))
               for j in range(len(S))]
n_models = infer_n_models(models)
M = np.zeros((n_models, len(S), len(S)))
for i in range(M.shape[-1]):
    M[:, i, i] += 1
for i, j in matrix_inds:
    transition_indices = [S[i], S[j]]
    current_state, new_state = transition_indices
    if current_state == new_state:
        continue
    if not valid_transition(current_state, new_state):
        continue
    transition_indices = [current_state, new_state]
    expected_model = {"transition_indices": transition_indices,
                      "Z": Z,
                      "payoffs": payoffs,
                      "success_analytical_derivation": "2sector2strategy2player"}
    PA, PB = compute_success_analytical(expected_model)
    rho = fixation_rate_stable(PA, PB, β)
    M[:, i, j] += rho / 2
    M[:, i, i] -= rho / 2

# Test that expected and actual results are close
for row_ind in range(M.shape[2]):
    for col_ind in range(M.shape[1]):
        for model_ind in range(M.shape[0]):
            if not fastcore.test.is_close(M[model_ind, col_ind, row_ind],
                                          result[model_ind, col_ind, row_ind]):
                print("indices: ", model_ind, col_ind, row_ind)
            fastcore.test.test_close(M[model_ind, col_ind, row_ind],
                                     result[model_ind, col_ind, row_ind])


##### Test 3

I also test that we can match closely the transition matrix values for a
game with 2 sectors, 2 strategies, and 3 players, where the first 2
players are from one sector and the third player is always from the remaining
sector.

In [93]:
#| export
@method(compute_success_analytical, '2sector2strategy3player')
def compute_success_analytical(models):
    """Compute the success of each strategy involved in a transition for
    a game with three players where two players are from one sector and the
    third is from another. Each sector uses two strategies."""

    # 100% chance of third player playing either their chosen strategy if they
    # are the chosen agent, or otherwise 100% chance of third player playing
    # their current strategy.
    # If chosen agent in first sector, we have a (n_mutants / z_s1) chance of
    # facing strategy 2 and a ((z_s1 - n_mutants) / z_s1) chance of facing
    # strategy 1.
    # For each of these, there is a 50% chance of being player 1 or player 2.
    # If chosen player is the third player, then they have a 100% of facing
    # against the fixed strategy played by the other two players.
    transition_indices = models['transition_indices']
    Z = models['Z']
    payoffs = models['payoffs']
    allowed_sectors = models['allowed_sectors']
    ind1, ind2 = transition_indices
    ind1_tuple = list(map(int, ind1.split("-")))
    ind2_tuple = list(map(int, ind2.split("-")))
    differ = [i1 != i2 for i1, i2 in zip(ind1_tuple, ind2_tuple)]
    affected_sector = f"S{np.argmax(differ[::-1]) + 1}"
    current_strategy = ind1_tuple[np.argmax(differ)]
    mutant_strategy = ind2_tuple[np.argmax(differ)]
    # We only support two sectors here, so we know the other value must be the
    # fixed sector.
    fs = ind1_tuple[np.argmin(differ)]
    cs = current_strategy
    ms = mutant_strategy
    SA, SB = [], []
    z = Z[affected_sector]
    relevant_players = [p
                        for p in allowed_sectors.keys()
                        if affected_sector in allowed_sectors[p]]
    if len(relevant_players) == 1:
        for k in range(1, z):
            SA.append(payoffs[f"{cs}-{fs}-{fs}"]['P3'])
            SB.append(payoffs[f"{ms}-{fs}-{fs}"]['P3'])
    elif len(relevant_players) == 2:
        for k in range(1, z):
            successA = ((k / (z - 1)
                        * (payoffs[f"{fs}-{ms}-{cs}"]['P1']
                            + payoffs[f"{fs}-{cs}-{ms}"]['P2']) / 2
                         )
                        + ((z - 1 - k) / (z - 1)
                            * (payoffs[f"{fs}-{cs}-{cs}"]['P1']
                               + payoffs[f"{fs}-{cs}-{cs}"]['P2']) / 2
                           )
                        )
            SA.append(successA)
            successB = (((k - 1) / (z - 1)
                        * (payoffs[f"{fs}-{ms}-{ms}"]['P1']
                            + payoffs[f"{fs}-{ms}-{ms}"]['P2']) / 2
                         )
                        + ((z - k) / (z - 1)
                        * (payoffs[f"{fs}-{cs}-{ms}"]['P1']
                            + payoffs[f"{fs}-{ms}-{cs}"]['P2']) / 2
                           )
                        )
            SB.append(successB)
    return SA, SB


In [94]:
# | export
Z = {"S2": 10, "S1": 10}
β = 1
sector_strategies = {"S2": [3, 4],
                     "S1": [1, 2]}
allowed_sectors = {"P3": ["S2"],
                   "P2": ["S1"],
                   "P1": ["S1"]}
sector_weights = {}

models = {"dispatch-type": "multiple-populations",
          "β": β,
          "Z": Z,
          "allowed_sectors": allowed_sectors,
          "sector_strategies": sector_strategies,
          }

models = thread_macro(models,
                      create_profiles,
                      apply_profile_filters)

payoffs = {}
for profile in models['profiles_filtered']:
    payoffs[profile] = {}
    for player in allowed_sectors.keys():
        payoffs[profile][player] = np.array([np.random.beta(1, 1)
                                             for _ in range(2)])
models = {**models, "payoffs": payoffs}

result = thread_macro({**models, "payoffs": payoffs},
                      build_transition_matrix,
                      (get, "transition_matrix"))

# Generate expected results
S = ['3-1', '3-2', '4-1', '4-2']
matrix_inds = [(i, j)
               for i in range(len(S))
               for j in range(len(S))]
n_models = infer_n_models(models)
M = np.zeros((n_models, len(S), len(S)))
for i in range(M.shape[-1]):
    M[:, i, i] += 1
for i, j in matrix_inds:
    transition_indices = [S[i], S[j]]
    current_state, new_state = transition_indices
    if current_state == new_state:
        continue
    if not valid_transition(current_state, new_state):
        continue
    transition_indices = [current_state, new_state]
    expected_model = {"transition_indices": transition_indices,
                      "Z": Z,
                      "payoffs": payoffs,
                      "allowed_sectors": allowed_sectors,
                      "success_analytical_derivation": "2sector2strategy3player"}
    PA, PB = compute_success_analytical(expected_model)
    rho = fixation_rate_stable(PA, PB, β)
    M[:, i, j] += rho / 2
    M[:, i, i] -= rho / 2

# Test that expected and actual results are close
for row_ind in range(M.shape[2]):
    for col_ind in range(M.shape[1]):
        for model_ind in range(M.shape[0]):
            if not fastcore.test.is_close(M[model_ind, col_ind, row_ind],
                                          result[model_ind, col_ind, row_ind]):
                print("indices: ", model_ind, col_ind, row_ind)
            fastcore.test.test_close(M[model_ind, col_ind, row_ind],
                                     result[model_ind, col_ind, row_ind])

In [95]:
# | export
Z = {"S2": 10, "S1": 10}
β = 1
sector_strategies = {"S2": [4, 5],
                     "S1": [1, 2]}
allowed_sectors = {"P3": ["S2"],
                   "P2": ["S1"],
                   "P1": ["S1"]}
sector_weights = {}

models = {"dispatch-type": "multiple-populations",
          "β": β,
          "Z": Z,
          "allowed_sectors": allowed_sectors,
          "sector_strategies": sector_strategies,
          }

models = thread_macro(models,
                      create_profiles,
                      apply_profile_filters)

payoffs = {}
for profile in models['profiles_filtered']:
    payoffs[profile] = {}
    for player in allowed_sectors.keys():
        payoffs[profile][player] = np.array([np.random.beta(1, 1)
                                             for _ in range(2)])
models = {**models, "payoffs": payoffs}

result = thread_macro({**models, "payoffs": payoffs},
                      build_transition_matrix,
                      (get, "transition_matrix"))

# Generate expected results
S = ['4-1', '4-2', '5-1', '5-2']
matrix_inds = [(i, j)
               for i in range(len(S))
               for j in range(len(S))]
n_models = infer_n_models(models)
M = np.zeros((n_models, len(S), len(S)))
for i in range(M.shape[-1]):
    M[:, i, i] += 1
for i, j in matrix_inds:
    transition_indices = [S[i], S[j]]
    current_state, new_state = transition_indices
    if current_state == new_state:
        continue
    if not valid_transition(current_state, new_state):
        continue
    transition_indices = [current_state, new_state]
    expected_model = {"transition_indices": transition_indices,
                      "Z": Z,
                      "payoffs": payoffs,
                      "allowed_sectors": allowed_sectors,
                      "success_analytical_derivation": "2sector2strategy3player"}
    PA, PB = compute_success_analytical(expected_model)
    rho = fixation_rate_stable(PA, PB, β)
    M[:, i, j] += rho / 2
    M[:, i, i] -= rho / 2

# Test that expected and actual results are close
for row_ind in range(M.shape[2]):
    for col_ind in range(M.shape[1]):
        for model_ind in range(M.shape[0]):
            if not fastcore.test.is_close(M[model_ind, col_ind, row_ind],
                                          result[model_ind, col_ind, row_ind]):
                print("indices: ", model_ind, col_ind, row_ind)
            fastcore.test.test_close(M[model_ind, col_ind, row_ind],
                                     result[model_ind, col_ind, row_ind])

#### Test 4

This example comes from a paper by Encarnacao et al. 2016.

They have a 3 sector model and report fixation probabilities for a particular scenario. Can we replicate it?

In [96]:
# | export
def payoffs_encanacao_2016(models):
    names = ['b_r', 'b_s', 'c_s', 'c_t', 'σ']
    b_r, b_s, c_s, c_t, σ = [models[k] for k in names]
    payoffs = {}
    n_players = 3
    n_sectors = 3
    n_strategies_per_sector = [2, 2, 2]
    n_strategies_total = 6
    # All players are from the first sector, playing that sector's first strategy
    index_min = "0-0-0"
    # All players are from the third sector, playing that sector's second strategy
    index_max = "5-5-5"
    # Note: The seperator makes it easy to represent games where n_strategies_total >= 10.

    # It is also trivial to define a vector which maps these indexes to strategy profiles
    # As sector order is fixed we could neglect to mention suscripts for each sector
    strategy_names = ["D", "C", "D", "C", "D", "C"]

    zero = np.zeros(b_r.shape[0])
    # As in the main text
    payoffs["C-C-C"] = {"P3": b_r-2*c_s,
                        "P2": σ+b_s-c_t,
                        "P1": σ+b_s}
    payoffs["C-C-D"] = {"P3": -c_s,
                        "P2": b_s-c_t,
                        "P1": zero}
    payoffs["C-D-C"] = {"P3": b_r-c_s,
                        "P2": zero,
                        "P1": b_s}
    payoffs["C-D-D"] = {"P3": zero,
                        "P2": σ,
                        "P1": σ}
    payoffs["D-C-C"] = {"P3": zero,
                        "P2": σ-c_t,
                        "P1": σ}
    payoffs["D-C-D"] = {"P3": zero,
                        "P2": -c_t,
                        "P1": zero}
    payoffs["D-D-C"] = {"P3": zero,
                        "P2": zero,
                        "P1": zero}
    payoffs["D-D-D"] = {"P3": zero,
                        "P2": σ,
                        "P1": σ}

    # The following indexes capture all strategy profiles where each player is fixed to a unique sector
    # (and player order does not matter, so we need only consider one ordering of sectors).
    payoffs["4-2-0"] = payoffs["D-D-D"]
    payoffs["4-2-1"] = payoffs["D-D-C"]
    payoffs["4-3-0"] = payoffs["D-C-D"]
    payoffs["4-3-1"] = payoffs["D-C-C"]
    payoffs["5-2-0"] = payoffs["C-D-D"]
    payoffs["5-2-1"] = payoffs["C-D-C"]
    payoffs["5-3-0"] = payoffs["C-C-D"]
    payoffs["5-3-1"] = payoffs["C-C-C"]
    return {**models, "payoffs": payoffs}


In [97]:
# | export
Z = {"S3": 50, "S2": 50, "S1": 50}
β = 0.08
sector_strategies = {"S3": [4, 5],
                     "S2": [2, 3],
                     "S1": [0, 1], }
allowed_sectors = {"P3": ["S3"],
                   "P2": ["S2"],
                   "P1": ["S1"], }
sector_weights = {}

models = {"dispatch-type": "multiple-populations",
          "β": β,
          "Z": Z,
          "allowed_sectors": allowed_sectors,
          "sector_strategies": sector_strategies,
          #   "sector_weights": sector_weights,
          'b_r': np.array([0.8]),
          'b_s': np.array([0.4]),
          'c_s': np.array([0.15]),
          'c_t': np.array([0.15]),
          'σ': np.array([0.2]),
          }

models = thread_macro(models,
                      create_profiles,
                      apply_profile_filters,
                      payoffs_encanacao_2016,
                      build_transition_matrix,
                      find_ergodic_distribution,
                      )
models['transition_matrix']
models['ergodic'], np.sum(models['ergodic'][0, 4:])


  ergodic = np.array(V.transpose(0, 2, 1)[y], dtype=float)


(array([[0.07429517, 0.02588305, 0.03549838, 0.06857849, 0.05535651,
         0.11993023, 0.04737337, 0.57308479]]),
 0.7957449065894959)

In [98]:
models['transition_matrix'] * 50 * 2


array([[[98.58484409,  0.43868208,  0.30980716,  0.        ,
          0.66666667,  0.        ,  0.        ,  0.        ],
        [ 0.96080835, 96.4805528 ,  0.        ,  0.73408493,
          0.        ,  1.82455392,  0.        ,  0.        ],
        [ 1.22164065,  0.        , 97.32807231,  0.96080835,
          0.        ,  0.        ,  0.48947868,  0.        ],
        [ 0.        ,  0.60342679,  0.43868208, 97.44630095,
          0.        ,  0.        ,  0.        ,  1.51159018],
        [ 0.66666667,  0.        ,  0.        ,  0.        ,
         97.63844005,  0.96080835,  0.73408493,  0.        ],
        [ 0.        ,  0.14274942,  0.        ,  0.        ,
          0.43868208, 98.00649791,  0.        ,  1.41207058],
        [ 0.        ,  0.        ,  0.88124961,  0.        ,
          0.60342679,  0.        , 96.79725701,  1.71806658],
        [ 0.        ,  0.        ,  0.        ,  0.21292021,
          0.        ,  0.24196967,  0.1635232 , 99.38158692]]])

The ergodic distribution of states looks remarkably similar to the results reported in the paper.

Unfortunately, there are no exact results available to compare against. However, the direction of
change and relative sizes of the bars in the bar chat are very similar. The sum of the last 4 very
closely matches the bar for total public cooperators which sits level to 0.8 on the chart.

# References

https://datagy.io/python-defaultdict/

In [99]:
# | hide
import nbdev
nbdev.nbdev_export()
