-
-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
localint.py
201 lines (162 loc) · 7.33 KB
/
localint.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import numpy as np
import numbers
from scipy import sparse
from ..util import check_random_state, rng_integers
from .random import random_pure_actions
from .normal_form_game import Player
class LocalInteraction:
"""
Class representing the local interaction model.
Parameters
----------
payoff_matrix : array_like(float, ndim=2)
The payoff matrix of the symmetric two-player game played in each
interaction.
adj_matrix : array_like(float, ndim=2)
The adjacency matrix of the network. Non constant weights and asymmetry
in interactions are allowed.
Attributes
----------
players : list(Player)
The list consisting of all players with the given payoff matrix.
adj_matrix : scipy.sparse.csr.csr_matrix(float, ndim=2)
See Parameters.
N : scalar(int)
The number of players.
num_actions : scalar(int)
The number of actions available to each player.
"""
def __init__(self, payoff_matrix, adj_matrix):
self.adj_matrix = sparse.csr_matrix(adj_matrix)
M, N = self.adj_matrix.shape
if N != M:
raise ValueError('adjacency matrix must be square')
self.N = N
A = np.asarray(payoff_matrix)
if A.ndim != 2 or A.shape[0] != A.shape[1]:
raise ValueError('payoff matrix must be square')
self.num_actions = A.shape[0]
self.players = [Player(A) for i in range(self.N)]
self.tie_breaking = 'smallest'
def _play(self, actions, player_ind, tie_breaking, tol, random_state):
actions_matrix = sparse.csr_matrix(
(np.ones(self.N, dtype=int), actions, np.arange(self.N+1)),
shape=(self.N, self.num_actions))
opponent_act_dict = (self.adj_matrix[player_ind] @
actions_matrix).toarray()
for k, i in enumerate(player_ind):
actions[i] = self.players[i].best_response(
opponent_act_dict[k, :], tie_breaking=tie_breaking,
tol=tol, random_state=random_state
)
return actions
def play(self, revision='simultaneous', actions=None,
player_ind_seq=None, num_reps=1, **options):
"""
Return a new action profile which is updated by playing the game
`num_reps` times.
Parameters
----------
revision : str, optional(default='simultaneous')
The way to revise the action profile. If `simulataneous`, all
players' actions will be updated simultaneously. If `asynchronous`,
only designated players' actions will be updated. str in
{'simultaneous', 'asynchronous'}.
actions : tuple(int) or list(int), optional(default=None)
The action profile in the first period. If None, selected randomly.
player_ind_seq : array_like(int), optional(default=None)
The index of players who take actions. If None, all players take
actions.
num_reps : scalar(int), optional(default=1)
The number of iterations.
**options : Keyword arguments passed to the best response method and
other methods.
Returns
-------
tuple(int)
The action profile after iterations.
"""
tie_breaking = options.get('tie_breaking', self.tie_breaking)
tol = options.get('tol', None)
random_state = check_random_state(options.get('random_state', None))
if actions is None:
nums_actions = tuple([self.num_actions] * self.N)
actions = random_pure_actions(nums_actions, random_state)
if revision == 'simultaneous':
player_ind_seq = [None] * num_reps
elif revision == 'asynchronous':
if player_ind_seq is None:
random_state = check_random_state(random_state)
player_ind_seq = rng_integers(random_state, self.N,
size=num_reps)
elif isinstance(player_ind_seq, numbers.Integral):
player_ind_seq = [player_ind_seq]
else:
raise ValueError(
"revision must be `simultaneous` or `asynchronous`"
)
actions = list(actions)
for t, player_ind in enumerate(player_ind_seq):
if player_ind is None:
player_ind = list(range(self.N))
elif isinstance(player_ind, numbers.Integral):
player_ind = [player_ind]
actions = self._play(actions, player_ind, tie_breaking,
tol, random_state)
return tuple(actions)
def time_series(self, ts_length, revision='simultaneous',
actions=None, player_ind_seq=None, **options):
"""
Return an array representing time series of each player's actions.
Parameters
----------
ts_length : scalar(int)
The number of iterations.
revision : {'simultaneous', 'asynchronous'}(default='simultaneous')
The way to revise the action profile. If `simulataneous`, all
players' actions will be updated simultaneously. If `asynchronous`,
only designated players' actions will be updated. str in
{'simultaneous', 'asynchronous'}.
actions : tuple(int), optional(default=None)
The action profile in the first period. If None, selected randomly.
player_ind_seq : array_like, optional(default=None)
The sequence of `player_ind`(see `play` Parameters) when the
revision is 'asynchronous'. If None, selected randomly.
**options : Keyword arguments passed to the best response method and
other methods.
Returns
-------
Array_like(int)
The array representing time series of each player's actions.
"""
tie_breaking = options.get('tie_breaking', self.tie_breaking)
tol = options.get('tol', None)
random_state = check_random_state(options.get('random_state', None))
if actions is None:
nums_actions = tuple([self.num_actions] * self.N)
actions = random_pure_actions(nums_actions, random_state)
if revision == 'simultaneous':
player_ind_seq = [None] * ts_length
elif revision == 'asynchronous':
if player_ind_seq is None:
random_state = check_random_state(random_state)
player_ind_seq = rng_integers(random_state, self.N,
size=ts_length)
else:
raise ValueError(
"revision must be `simultaneous` or `asynchronous`"
)
out = np.empty((ts_length, self.N), dtype=int)
for i in range(self.N):
out[0, i] = actions[i]
for t in range(ts_length-1):
random_state = check_random_state(random_state)
actions = self.play(revision=revision,
actions=actions,
player_ind_seq=player_ind_seq[t],
num_reps=1,
tie_breaking=tie_breaking,
tol=tol, random_state=random_state)
for i in range(self.N):
out[t+1, i] = actions[i]
return out