-
-
Notifications
You must be signed in to change notification settings - Fork 57
/
complete_tree_exploration_for_MP_bandits.py
executable file
· 1257 lines (1085 loc) · 59.7 KB
/
complete_tree_exploration_for_MP_bandits.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#! /usr/bin/env python3
# -*- coding: utf-8; mode: python -*-
r""" Experimental code to perform complete tree exploration for Multi-Player bandits.
Algorithms:
- Support Selfish 0-greedy, UCB, and klUCB in 3 different variants.
- Support also RhoRand, RandTopM and MCTopM, even though they are *not* memory-less, by using another state representation (inlining the memory of each player, eg the ranks for RhoRand).
Features:
- For the means of each arm, :math:`\mu_1, \dots, \mu_K`, this script can use exact formal computations with sympy, or fractions with Fraction, or float number.
- The graph can contain all nodes from root to leafs, or only leafs (with summed probabilities), and possibly only the absorbing nodes are showed.
- Support export of the tree to a GraphViz dot graph, and can save it to SVG/PNG and LaTeX (with Tikz) and PDF etc.
- By default, the root is highlighted in green and the absorbing nodes are in red.
.. warning:: I still have to fix these issues:
- TODO : right now, it is not so efficient, could it be improved? I don't think I can do anything in a smarter way, in pure Python.
Requirements:
- 'sympy' module to use formal means :math:`\mu_1, \dots, \mu_K` instead of numbers,
- 'numpy' module for computations on indexes (e.g., ``np.where``),
- 'graphviz' module to generate the graph and save it,
- 'dot2tex' module to generate nice LaTeX (with Tikz) graph and save it to PDF.
.. note::
To use the 'dot2tex' module, only Python2 is supported.
However, I maintain an unpublished port of 'dot2tex' for Python3, see
[here](https://github.com/Naereen/dot2tex), that you can download, and install
manually (sudo python3 setup.py install) to have 'dot2tex' for Python3 also.
About:
- *Date:* 16/09/2017.
- *Author:* Lilian Besson, (C) 2017
- *Licence:* MIT Licence (http://lbesson.mit-license.org).
"""
from __future__ import print_function, division # Python 2 compatibility if needed
__author__ = "Lilian Besson"
__version__ = "0.7"
from collections import Counter, deque
from fractions import Fraction
from itertools import product
from os import getenv, chdir, getcwd
from os.path import join as os_path_join
from os.path import dirname, basename
from re import sub as re_sub
from textwrap import wrap
import subprocess
try:
import numpy as np
except ImportError as e:
print("Warning: the 'numpy' module was not found...\nInstall it with 'sudo pip2/pip3 install numpy' or your system packet manager (eg. 'sudo apt install python3-numpy')") # XXX
raise e
try:
import sympy
except ImportError:
print("Warning: the 'sympy' module was not found...\nSymbolic computations cannot be performed without sympy.\nInstall it with 'sudo pip2/pip3 install sympy' or your system packet manager (eg. 'sudo apt install python3-sympy')") # XXX
try:
from graphviz import Digraph
except ImportError:
print("Warning: the 'graphviz' module was not found...\nTrees cannot be saved or displayed without graphviz.\nInstall it with 'sudo pip2/pip3 install graphviz'") # XXX
try:
# if version_info.major < 3:
from dot2tex import dot2tex
except ImportError:
print("Warning: the 'dot2tex' module was not found...\nTrees cannot be saved to LaTeX and PDF formats.\nInstall it with 'sudo pip2 install dot2tex' (require Python 2)\nOr install it from https://github.com/Naereen/dot2tex for Python 3.") # XXX
oo = float('+inf') #: Shortcut for float('+inf').
from sys import version_info
if version_info.major < 3:  # Python 2 compatibility if needed
    import codecs

    # On Python 2, make ``input`` an alias of ``raw_input``.
    input = raw_input

    def open(filename, mode):
        """Open ``filename`` through :mod:`codecs`, always with the UTF-8 encoding.

        See https://docs.python.org/3/library/codecs.html#standard-encodings
        """
        return codecs.open(filename, mode=mode, encoding='utf_8')
PLOT_DIR = os_path_join("plots", "trees") #: Directory for the plots
from Arms.usenumba import jit
def tupleit1(anarray):
    """Return the items of a 1D array (or any iterable) as a hashable tuple."""
    return tuple(anarray)
def tupleit2(anarray):
    """Return a 2D array as a hashable tuple of row-tuples."""
    return tuple(map(tuple, anarray))
def prod(iterator):
    """Multiply together all the values yielded by ``iterator``.

    The accumulator starts at ``1``, which is also the result for an empty iterator.
    """
    accumulator = 1
    for factor in iterator:
        accumulator *= factor
    return accumulator
WIDTH = 200  #: Default value for the ``width`` parameter for :func:`wraptext` and :func:`wraplatex`.


def wraptext(text, width=WIDTH):
    """Break ``text`` into lines of at most ``width`` characters (``textwrap.wrap``) and join them with newlines."""
    wrapped_lines = wrap(text, width=width)
    return '\n'.join(wrapped_lines)
def mybool(s):
    """Interpret ``s`` as a boolean, with sensible handling of strings.

    This is used on values coming from ``getenv``, which are strings when the
    environment variable is set. A plain ``bool(s)`` would treat any non-empty
    string (including ``'0'`` or ``'false'``) as true.

    - For a string: the result is ``False`` if, after stripping whitespace and
      ignoring case, it is one of ``''``, ``'false'``, ``'0'``, ``'no'``, ``'off'``;
      ``True`` otherwise.
    - For any other object: the result is ``bool(s)``.
    """
    if isinstance(s, str):
        return s.strip().lower() not in ('', 'false', '0', 'no', 'off')
    return bool(s)
# Each option below is first set to a hard-coded default, then overridden by the
# environment variable of the same name when it is defined. The boolean options
# are parsed with mybool(); FORMAT is used as the raw string returned by getenv().
ONLYLEAFS = True  #: By default, aim at the most concise graph representation by only showing the leafs.
ONLYLEAFS = mybool(getenv('ONLYLEAFS', ONLYLEAFS))
ONLYABSORBING = False  #: By default, don't aim at the most concise graph representation by only showing the absorbing leafs.
ONLYABSORBING = mybool(getenv('ONLYABSORBING', ONLYABSORBING))
CONCISE = True  #: By default, only show :math:`\tilde{S}` and :math:`N` in the graph representations, not all the 4 vectors.
CONCISE = mybool(getenv('CONCISE', CONCISE))
FULLHASH = not CONCISE  #: Whether states are hashed using all of S, Stilde, N, Ntilde (plus t and depth) instead of only Stilde and N. Defaults to the opposite of CONCISE.
FULLHASH = mybool(getenv('FULLHASH', FULLHASH))
# FORMAT = "pdf"  #: Format used to save the graphs.
FORMAT = "svg"  #: Format used to save the graphs.
FORMAT = getenv("FORMAT", FORMAT)
# --- Implement the bandit algorithms in a purely functional and memory-less flavor
@jit
def FixedArm(j, state):
    """Fake player: player ``j`` always targets arm ``j`` (``state`` is ignored)."""
    only_choice = [j]
    return only_choice
@jit
def UniformExploration(j, state):
    """Fake player: every arm ``0, ..., state.K - 1`` is a possible choice (``j`` is ignored)."""
    every_arm = np.arange(state.K)
    return list(every_arm)
@jit
def ConstantRank(j, state, decision, collision):
    """Memory update that never changes anything: the only possible new memory is the current one.

    ``decision`` and ``collision`` are accepted for signature compatibility but are not used.
    """
    unchanged_memory = state.memories[j]
    return [unchanged_memory]
@jit
def choices_from_indexes(indexes):
    """Return the positions where ``indexes`` attains its maximum.

    For deterministic index policies this is a single position, or several positions in case of ties.
    """
    largest = np.max(indexes)
    is_largest = indexes == largest
    return np.where(is_largest)[0]
# --- Selfish 0-greedy variants
@jit
def Selfish_0Greedy_U(j, state):
    """Selfish policy + 0-Greedy index + U feedback.

    The index of each arm is ``state.S[j] / state.N[j]``; arms with ``N < 1`` get an infinite index.
    """
    pulls = state.N[j]
    empirical_means = state.S[j] / pulls
    empirical_means[pulls < 1] = +oo  # arms never tried yet are always among the best
    return choices_from_indexes(empirical_means)
@jit
def Selfish_0Greedy_Utilde(j, state):
    """Selfish policy + 0-Greedy index + Utilde feedback.

    The index of each arm is ``state.Stilde[j] / state.N[j]``; arms with ``N < 1`` get an infinite index.
    """
    pulls = state.N[j]
    empirical_means = state.Stilde[j] / pulls
    empirical_means[pulls < 1] = +oo  # arms never tried yet are always among the best
    return choices_from_indexes(empirical_means)
@jit
def Selfish_0Greedy_Ubar(j, state):
    """Selfish policy + 0-Greedy index + Ubar feedback.

    The index of each arm is ``(Ntilde[j] / N[j]) * (S[j] / N[j])``; arms with ``N < 1`` get an infinite index.
    """
    pulls = state.N[j]
    scaled_means = (state.Ntilde[j] / pulls) * (state.S[j] / pulls)
    scaled_means[pulls < 1] = +oo  # arms never tried yet are always among the best
    return choices_from_indexes(scaled_means)


default_policy = Selfish_0Greedy_Ubar
# --- Selfish UCB variants
alpha = 0.5  # factor of log(t) inside the square-root exploration bonus of the UCB indexes


@jit
def Selfish_UCB_U(j, state):
    """Selfish policy + UCB_0.5 index + U feedback.

    Index: ``S[j] / N[j] + sqrt(alpha * log(t) / N[j])``; arms with ``N < 1`` get an infinite index.
    """
    pulls = state.N[j]
    exploration_bonus = np.sqrt(alpha * np.log(state.t) / pulls)
    ucb = (state.S[j] / pulls) + exploration_bonus
    ucb[pulls < 1] = +oo  # arms never tried yet are always among the best
    return choices_from_indexes(ucb)
@jit
def Selfish_UCB(j, state):
    """Selfish policy + UCB_0.5 index + Utilde feedback.

    Index: ``Stilde[j] / N[j] + sqrt(alpha * log(t) / N[j])``; arms with ``N < 1`` get an infinite index.
    """
    pulls = state.N[j]
    exploration_bonus = np.sqrt(alpha * np.log(state.t) / pulls)
    ucb = (state.Stilde[j] / pulls) + exploration_bonus
    ucb[pulls < 1] = +oo  # arms never tried yet are always among the best
    return choices_from_indexes(ucb)


Selfish_UCB_Utilde = Selfish_UCB
@jit
def Selfish_UCB_Ubar(j, state):
    """Selfish policy + UCB_0.5 index + Ubar feedback.

    Index: ``(Ntilde[j] / N[j]) * (S[j] / N[j]) + sqrt(alpha * log(t) / N[j])``;
    arms with ``N < 1`` get an infinite index.
    """
    pulls = state.N[j]
    exploration_bonus = np.sqrt(alpha * np.log(state.t) / pulls)
    ucb = (state.Ntilde[j] / pulls) * (state.S[j] / pulls) + exploration_bonus
    ucb[pulls < 1] = +oo  # arms never tried yet are always among the best
    return choices_from_indexes(ucb)
# default_policy = Selfish_UCB_Ubar
# --- Selfish kl UCB variants
from Policies import klucbBern
tolerance = 1e-6  # passed as the third positional argument of every klucb(...) call in this file
klucb = np.vectorize(klucbBern)  # np.vectorize wrapper, so klucbBern can be called on whole arrays of arms
c = 1  # factor of log(t) in the second argument of every klucb(...) call in this file
@jit
def Selfish_KLUCB_U(j, state):
    """Selfish policy + Bernoulli KL-UCB index + U feedback.

    Index: ``klucb(S[j] / N[j], c * log(t) / N[j], tolerance)``; arms with ``N < 1`` get an infinite index.
    """
    pulls = state.N[j]
    exploration_level = c * np.log(state.t) / pulls
    kl_indexes = klucb(state.S[j] / pulls, exploration_level, tolerance)
    kl_indexes[pulls < 1] = +oo  # arms never tried yet are always among the best
    return choices_from_indexes(kl_indexes)
@jit
def Selfish_KLUCB(j, state):
    """Selfish policy + Bernoulli KL-UCB index + Utilde feedback.

    Index: ``klucb(Stilde[j] / N[j], c * log(t) / N[j], tolerance)``; arms with ``N < 1`` get an infinite index.
    """
    pulls = state.N[j]
    exploration_level = c * np.log(state.t) / pulls
    kl_indexes = klucb(state.Stilde[j] / pulls, exploration_level, tolerance)
    kl_indexes[pulls < 1] = +oo  # arms never tried yet are always among the best
    return choices_from_indexes(kl_indexes)


Selfish_KLUCB_Utilde = Selfish_KLUCB
@jit
def Selfish_KLUCB_Ubar(j, state):
    """Selfish policy + Bernoulli KL-UCB index + Ubar feedback.

    Index: ``klucb((Ntilde[j] / N[j]) * (S[j] / N[j]), c * log(t) / N[j], tolerance)``;
    arms with ``N < 1`` get an infinite index.
    """
    pulls = state.N[j]
    scaled_means = (state.Ntilde[j] / pulls) * (state.S[j] / pulls)
    exploration_level = c * np.log(state.t) / pulls
    kl_indexes = klucb(scaled_means, exploration_level, tolerance)
    kl_indexes[pulls < 1] = +oo  # arms never tried yet are always among the best
    return choices_from_indexes(kl_indexes)
# default_policy = Selfish_KLUCB_Ubar
# --- RhoRand UCB variants
@jit
def choices_from_indexes_with_rank(indexes, rank=1):
    """Return the positions whose index equals the ``rank``-th largest value of ``indexes``.

    For deterministic index policies this is a single position, or several positions in case of ties.
    """
    ascending = np.sort(indexes)
    target_value = ascending[-rank]  # rank=1 selects the largest value
    return np.where(indexes == target_value)[0]
alpha = 0.5  # factor of log(t) inside the square-root exploration bonus of the UCB indexes


@jit
def RhoRand_UCB_U(j, state):
    """RhoRand policy + UCB_0.5 index + U feedback.

    Player ``j`` targets the arm(s) whose UCB index has rank ``state.memories[j]``.
    """
    target_rank = state.memories[j]
    pulls = state.N[j]
    ucb = (state.S[j] / pulls) + np.sqrt(alpha * np.log(state.t) / pulls)
    ucb[pulls < 1] = +oo  # arms never tried yet get an infinite index
    return choices_from_indexes_with_rank(ucb, rank=target_rank)
@jit
def RhoRand_UCB_Utilde(j, state):
    """RhoRand policy + UCB_0.5 index + Utilde feedback.

    Player ``j`` targets the arm(s) whose UCB index has rank ``state.memories[j]``.
    """
    target_rank = state.memories[j]
    pulls = state.N[j]
    ucb = (state.Stilde[j] / pulls) + np.sqrt(alpha * np.log(state.t) / pulls)
    ucb[pulls < 1] = +oo  # arms never tried yet get an infinite index
    return choices_from_indexes_with_rank(ucb, rank=target_rank)
@jit
def RhoRand_UCB_Ubar(j, state):
    """RhoRand policy + UCB_0.5 index + Ubar feedback.

    Player ``j`` targets the arm(s) whose UCB index has rank ``state.memories[j]``.
    """
    target_rank = state.memories[j]
    pulls = state.N[j]
    ucb = (state.Ntilde[j] / pulls) * (state.S[j] / pulls) + np.sqrt(alpha * np.log(state.t) / pulls)
    ucb[pulls < 1] = +oo  # arms never tried yet get an infinite index
    return choices_from_indexes_with_rank(ucb, rank=target_rank)
@jit
def RhoRand_KLUCB_U(j, state):
    """RhoRand policy + Bernoulli KL-UCB index + U feedback.

    Player ``j`` targets the arm(s) whose KL-UCB index has rank ``state.memories[j]``.
    """
    target_rank = state.memories[j]
    pulls = state.N[j]
    kl_indexes = klucb(state.S[j] / pulls, c * np.log(state.t) / pulls, tolerance)
    kl_indexes[pulls < 1] = +oo  # arms never tried yet get an infinite index
    return choices_from_indexes_with_rank(kl_indexes, rank=target_rank)
@jit
def RhoRand_KLUCB_Utilde(j, state):
    """RhoRand policy + Bernoulli KL-UCB index + Utilde feedback.

    Player ``j`` targets the arm(s) whose KL-UCB index has rank ``state.memories[j]``.
    """
    target_rank = state.memories[j]
    pulls = state.N[j]
    kl_indexes = klucb(state.Stilde[j] / pulls, c * np.log(state.t) / pulls, tolerance)
    kl_indexes[pulls < 1] = +oo  # arms never tried yet get an infinite index
    return choices_from_indexes_with_rank(kl_indexes, rank=target_rank)
@jit
def RhoRand_KLUCB_Ubar(j, state):
    """RhoRand policy + Bernoulli KL-UCB index + Ubar feedback.

    Player ``j`` targets the arm(s) whose KL-UCB index has rank ``state.memories[j]``.
    """
    target_rank = state.memories[j]
    pulls = state.N[j]
    scaled_means = (state.Ntilde[j] / pulls) * (state.S[j] / pulls)
    kl_indexes = klucb(scaled_means, c * np.log(state.t) / pulls, tolerance)
    kl_indexes[pulls < 1] = +oo  # arms never tried yet get an infinite index
    return choices_from_indexes_with_rank(kl_indexes, rank=target_rank)
# So we need two functions: one takes the decision, the other updates the rank after all the decisions are taken
@jit
def RandomNewRank(j, state, decision, collision):
    """RhoRand memory update: list the possible next ranks of player ``j``.

    - Without collision, the only possibility is the current rank ``state.memories[j]``.
    - After a collision, every rank in ``{1, ..., M}`` is possible.

    ``decision`` is accepted for signature compatibility but is not used.
    """
    if not collision:  # keep the same rank
        return [state.memories[j]]
    # new random rank
    return list(np.arange(1, 1 + state.M))


default_policy, default_update_memory = RhoRand_UCB_U, RandomNewRank
# default_policy, default_update_memory = RhoRand_KLUCB_U, RandomNewRank
# --- RandTopM, MCTopM variants
@jit
def RandTopM_UCB_U(j, state, collision=False):
    """RandTopM policy + UCB_0.5 index + U feedback.

    Returns ``[state.memories[j]]`` when there was no collision and that arm is among the
    ``state.M`` arms of largest index; otherwise returns those ``state.M`` arms.
    """
    current_arm = state.memories[j]
    pulls = state.N[j]
    ucb = (state.S[j] / pulls) + np.sqrt(alpha * np.log(state.t) / pulls)
    ucb[pulls < 1] = +oo  # arms never tried yet get an infinite index
    top_arms = np.argsort(ucb)[-state.M:]
    if not collision and current_arm in top_arms:
        return [current_arm]
    return top_arms
@jit
def RandTopM_UCB_Utilde(j, state, collision=False):
    """RandTopM policy + UCB_0.5 index + Utilde feedback.

    Returns ``[state.memories[j]]`` when there was no collision and that arm is among the
    ``state.M`` arms of largest index; otherwise returns those ``state.M`` arms.
    """
    current_arm = state.memories[j]
    pulls = state.N[j]
    ucb = (state.Stilde[j] / pulls) + np.sqrt(alpha * np.log(state.t) / pulls)
    ucb[pulls < 1] = +oo  # arms never tried yet get an infinite index
    top_arms = np.argsort(ucb)[-state.M:]
    if not collision and current_arm in top_arms:
        return [current_arm]
    return top_arms
@jit
def RandTopM_UCB_Ubar(j, state, collision=False):
    """RandTopM policy + UCB_0.5 index + Ubar feedback.

    Returns ``[state.memories[j]]`` when there was no collision and that arm is among the
    ``state.M`` arms of largest index; otherwise returns those ``state.M`` arms.
    """
    current_arm = state.memories[j]
    pulls = state.N[j]
    ucb = (state.Ntilde[j] / pulls) * (state.S[j] / pulls) + np.sqrt(alpha * np.log(state.t) / pulls)
    ucb[pulls < 1] = +oo  # arms never tried yet get an infinite index
    top_arms = np.argsort(ucb)[-state.M:]
    if not collision and current_arm in top_arms:
        return [current_arm]
    return top_arms
@jit
def RandTopM_KLUCB_U(j, state, collision=False):
    """RandTopM policy + Bernoulli KL-UCB index + U feedback.

    Returns ``[state.memories[j]]`` when there was no collision and that arm is among the
    ``state.M`` arms of largest index; otherwise returns those ``state.M`` arms.
    """
    current_arm = state.memories[j]
    pulls = state.N[j]
    kl_indexes = klucb(state.S[j] / pulls, c * np.log(state.t) / pulls, tolerance)
    kl_indexes[pulls < 1] = +oo  # arms never tried yet get an infinite index
    top_arms = np.argsort(kl_indexes)[-state.M:]
    if not collision and current_arm in top_arms:
        return [current_arm]
    return top_arms
@jit
def RandTopM_KLUCB_Utilde(j, state, collision=False):
    """RandTopM policy + Bernoulli KL-UCB index + Utilde feedback.

    Returns ``[state.memories[j]]`` when there was no collision and that arm is among the
    ``state.M`` arms of largest index; otherwise returns those ``state.M`` arms.
    """
    current_arm = state.memories[j]
    pulls = state.N[j]
    kl_indexes = klucb(state.Stilde[j] / pulls, c * np.log(state.t) / pulls, tolerance)
    kl_indexes[pulls < 1] = +oo  # arms never tried yet get an infinite index
    top_arms = np.argsort(kl_indexes)[-state.M:]
    if not collision and current_arm in top_arms:
        return [current_arm]
    return top_arms
@jit
def RandTopM_KLUCB_Ubar(j, state, collision=False):
    """RandTopM policy + Bernoulli KL-UCB index + Ubar feedback.

    Returns ``[state.memories[j]]`` when there was no collision and that arm is among the
    ``state.M`` arms of largest index; otherwise returns those ``state.M`` arms.
    """
    current_arm = state.memories[j]
    pulls = state.N[j]
    scaled_means = (state.Ntilde[j] / pulls) * (state.S[j] / pulls)
    kl_indexes = klucb(scaled_means, c * np.log(state.t) / pulls, tolerance)
    kl_indexes[pulls < 1] = +oo  # arms never tried yet get an infinite index
    top_arms = np.argsort(kl_indexes)[-state.M:]
    if not collision and current_arm in top_arms:
        return [current_arm]
    return top_arms
@jit
def RandTopM_RandomNewChosenArm(j, state, decision, collision):
    """RandTopM memory update: list the possible next chosen arms of player ``j``.

    The work is delegated to the player's own policy ``state.players[j]``. The ``collision`` flag is
    forwarded only when that policy declares default arguments (i.e. has a ``collision=False`` parameter).
    ``decision`` is accepted for signature compatibility but is not used.
    """
    policy = state.players[j]
    if policy.__defaults__:
        return policy(j, state, collision=collision)
    return policy(j, state)
# default_policy, default_update_memory = RandTopM_UCB_U, RandTopM_RandomNewChosenArm
# --- MCTopM variants
@jit
def write_to_tuple(this_tuple, index, value):
    """Return a copy of ``this_tuple`` in which position ``index`` holds ``value``.

    Tuples are immutable, so the items are copied to a list, modified, and converted back.
    """
    items = list(this_tuple)
    items[index] = value
    return tuple(items)
@jit
def MCTopM_UCB_U(j, state, collision=False):
    """MCTopM policy + UCB_0.5 index + U feedback.

    The memory of player ``j`` is a pair ``(chosen_arm, sitted)``; it is initialized to ``(-1, False)``
    in ``state.memories`` if it is not a tuple yet. Returns ``[chosen_arm]`` when there was no collision
    and that arm is among the ``state.M`` arms of largest index; otherwise returns those arms.
    """
    if not isinstance(state.memories[j], tuple):  # if no sitted information yet
        state.memories = write_to_tuple(state.memories, j, (-1, False))
    memory = state.memories[j]
    assert isinstance(memory, tuple)
    current_arm, _sitted = memory  # the sitted flag is not used to take the decision
    pulls = state.N[j]
    ucb = (state.S[j] / pulls) + np.sqrt(alpha * np.log(state.t) / pulls)
    ucb[pulls < 1] = +oo  # arms never tried yet get an infinite index
    top_arms = np.argsort(ucb)[-state.M:]
    if not collision and current_arm in top_arms:
        return [current_arm]
    return top_arms
@jit
def MCTopM_UCB_Utilde(j, state, collision=False):
    """MCTopM policy + UCB_0.5 index + Utilde feedback.

    The memory of player ``j`` is a pair ``(chosen_arm, sitted)``; it is initialized to ``(-1, False)``
    in ``state.memories`` if it is not a tuple yet. Returns ``[chosen_arm]`` when there was no collision
    and that arm is among the ``state.M`` arms of largest index; otherwise returns those arms.
    """
    if not isinstance(state.memories[j], tuple):  # if no sitted information yet
        state.memories = write_to_tuple(state.memories, j, (-1, False))
    memory = state.memories[j]
    assert isinstance(memory, tuple)
    current_arm, _sitted = memory  # the sitted flag is not used to take the decision
    pulls = state.N[j]
    ucb = (state.Stilde[j] / pulls) + np.sqrt(alpha * np.log(state.t) / pulls)
    ucb[pulls < 1] = +oo  # arms never tried yet get an infinite index
    top_arms = np.argsort(ucb)[-state.M:]
    if not collision and current_arm in top_arms:
        return [current_arm]
    return top_arms
@jit
def MCTopM_UCB_Ubar(j, state, collision=False):
    """MCTopM policy + UCB_0.5 index + Ubar feedback.

    The memory of player ``j`` is a pair ``(chosen_arm, sitted)``; it is initialized to ``(-1, False)``
    in ``state.memories`` if it is not a tuple yet. Returns ``[chosen_arm]`` when there was no collision
    and that arm is among the ``state.M`` arms of largest index; otherwise returns those arms.
    """
    if not isinstance(state.memories[j], tuple):  # if no sitted information yet
        state.memories = write_to_tuple(state.memories, j, (-1, False))
    memory = state.memories[j]
    assert isinstance(memory, tuple)
    current_arm, _sitted = memory  # the sitted flag is not used to take the decision
    pulls = state.N[j]
    ucb = (state.Ntilde[j] / pulls) * (state.S[j] / pulls) + np.sqrt(alpha * np.log(state.t) / pulls)
    ucb[pulls < 1] = +oo  # arms never tried yet get an infinite index
    top_arms = np.argsort(ucb)[-state.M:]
    if not collision and current_arm in top_arms:
        return [current_arm]
    return top_arms
@jit
def MCTopM_KLUCB_U(j, state, collision=False):
    """MCTopM policy + Bernoulli KL-UCB index + U feedback.

    The memory of player ``j`` is a pair ``(chosen_arm, sitted)``; it is initialized to ``(-1, False)``
    in ``state.memories`` if it is not a tuple yet. Returns ``[chosen_arm]`` when there was no collision
    and that arm is among the ``state.M`` arms of largest index; otherwise returns those arms.
    """
    if not isinstance(state.memories[j], tuple):  # if no sitted information yet
        state.memories = write_to_tuple(state.memories, j, (-1, False))
    memory = state.memories[j]
    assert isinstance(memory, tuple)
    current_arm, _sitted = memory  # the sitted flag is not used to take the decision
    pulls = state.N[j]
    kl_indexes = klucb(state.S[j] / pulls, c * np.log(state.t) / pulls, tolerance)
    kl_indexes[pulls < 1] = +oo  # arms never tried yet get an infinite index
    top_arms = np.argsort(kl_indexes)[-state.M:]
    if not collision and current_arm in top_arms:
        return [current_arm]
    return top_arms
@jit
def MCTopM_KLUCB_Utilde(j, state, collision=False):
    """MCTopM policy + Bernoulli KL-UCB index + Utilde feedback.

    The memory of player ``j`` is a pair ``(chosen_arm, sitted)``; it is initialized to ``(-1, False)``
    in ``state.memories`` if it is not a tuple yet. Returns ``[chosen_arm]`` when there was no collision
    and that arm is among the ``state.M`` arms of largest index; otherwise returns those arms.
    """
    if not isinstance(state.memories[j], tuple):  # if no sitted information yet
        state.memories = write_to_tuple(state.memories, j, (-1, False))
    memory = state.memories[j]
    assert isinstance(memory, tuple)
    current_arm, _sitted = memory  # the sitted flag is not used to take the decision
    pulls = state.N[j]
    kl_indexes = klucb(state.Stilde[j] / pulls, c * np.log(state.t) / pulls, tolerance)
    kl_indexes[pulls < 1] = +oo  # arms never tried yet get an infinite index
    top_arms = np.argsort(kl_indexes)[-state.M:]
    if not collision and current_arm in top_arms:
        return [current_arm]
    return top_arms
@jit
def MCTopM_KLUCB_Ubar(j, state, collision=False):
    """MCTopM policy + Bernoulli KL-UCB index + Ubar feedback.

    The memory of player ``j`` is a pair ``(chosen_arm, sitted)``; it is initialized to ``(-1, False)``
    in ``state.memories`` if it is not a tuple yet. Returns ``[chosen_arm]`` when there was no collision
    and that arm is among the ``state.M`` arms of largest index; otherwise returns those arms.
    """
    if not isinstance(state.memories[j], tuple):  # if no sitted information yet
        state.memories = write_to_tuple(state.memories, j, (-1, False))
    memory = state.memories[j]
    assert isinstance(memory, tuple)
    current_arm, _sitted = memory  # the sitted flag is not used to take the decision
    pulls = state.N[j]
    scaled_means = (state.Ntilde[j] / pulls) * (state.S[j] / pulls)
    kl_indexes = klucb(scaled_means, c * np.log(state.t) / pulls, tolerance)
    kl_indexes[pulls < 1] = +oo  # arms never tried yet get an infinite index
    top_arms = np.argsort(kl_indexes)[-state.M:]
    if not collision and current_arm in top_arms:
        return [current_arm]
    return top_arms
@jit
def MCTopM_RandomNewChosenArm(j, state, decision, collision):
    """MCTopM memory update: list the possible next ``(chosen_arm, sitted)`` memories of player ``j``.

    - Already sitted: keep the chosen arm, and stay sitted only if ``decision`` is still that arm.
    - Not sitted, no collision: sit on the arm ``decision``.
    - Not sitted, collision: one non-sitted memory per arm returned by the player's policy
      ``state.players[j]`` (called with ``collision`` only if it declares default arguments).
    """
    policy = state.players[j]
    chosen_arm, sitted = state.memories[j]
    if sitted:
        # sitted but the chair changed ==> not sitted
        return [(chosen_arm, chosen_arm == decision)]
    if not collision:  # sitted, for now
        return [(decision, True)]
    # new arm from estimatedBestArms
    if policy.__defaults__:
        candidate_arms = policy(j, state, collision=collision)
    else:
        candidate_arms = policy(j, state)
    not_sitted = [False] * len(candidate_arms)
    return list(zip(candidate_arms, not_sitted))
# default_policy, default_update_memory = MCTopM_UCB_U, MCTopM_RandomNewChosenArm
# --- Generate vector of formal means mu_1,...,mu_K
def symbol_means(K):
    """Create the sympy variables ``mu_1, ..., mu_K`` with ``sympy.var`` and return its result.

    Better to work directly with symbols and instantiate the results *after*.
    """
    names = []
    for i in range(1, K + 1):
        names.append('mu_{}'.format(i))
    return sympy.var(names)
def random_uniform_means(K):
    """Draw ``K`` numerical means with ``np.random.rand``, i.e. uniformly at random in [0, 1)."""
    random_means = np.random.rand(K)
    return random_means
def uniform_means(nbArms=3, delta=0.1, lower=0., amplitude=1.):
    """Return a sorted list of ``nbArms`` evenly spaced means.

    The means are ``lower + amplitude * x`` for ``x`` evenly spaced (``np.linspace``)
    from ``delta`` to ``1 - delta``, so they all lie in ``[lower, lower + amplitude]``.

    Requires ``nbArms >= 1``, ``amplitude > 0`` and ``0 < delta < 1`` (checked with assertions).

    >>> [round(float(mu), 3) for mu in uniform_means(2, 0.1)]
    [0.1, 0.9]
    >>> [round(float(mu), 3) for mu in uniform_means(3, 0.1)]
    [0.1, 0.5, 0.9]
    >>> [round(float(mu), 3) for mu in uniform_means(9, 1 / (1. + 9))]
    [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
    """
    assert nbArms >= 1, "Error: 'nbArms' = {} has to be >= 1.".format(nbArms)  # DEBUG
    assert amplitude > 0, "Error: 'amplitude' = {:.3g} has to be > 0.".format(amplitude)  # DEBUG
    assert 0. < delta < 1., "Error: 'delta' = {:.3g} has to be in (0, 1).".format(delta)  # DEBUG
    unit_grid = np.linspace(delta, 1 - delta, nbArms)
    return sorted(lower + amplitude * unit_grid)
# --- Transform probabilities to float, expr, str
def proba2float(proba, values=None, K=None, names=None):
    """Replace each mu_k by a numerical value and evaluate the formula.

    - If ``proba`` has an ``evalf`` method (e.g. a sympy expression), return
      ``proba.evalf(subs=...)`` where ``names`` are mapped to ``values``:

      + ``values`` defaults to ``uniform_means(nbArms=K)``, where ``K`` is taken
        from ``len(names)`` when it is not given;
      + ``names`` defaults to ``symbol_means(len(values))``.

    - If ``proba`` is a ``Fraction``, return it converted to ``float``.
    - Anything else is returned unchanged.

    :raises TypeError: if ``proba`` has ``evalf`` but none of ``values``, ``K`` and
        ``names`` is given, so the number of arms cannot be determined.
    """
    if hasattr(proba, "evalf"):
        if values is None:
            if K is None and names is not None:
                K = len(names)  # one value is needed per symbol
            if K is None:
                raise TypeError("proba2float() needs at least one of 'values', 'K' or 'names' to evaluate a symbolic probability.")
            values = uniform_means(nbArms=K)
        if names is None:
            names = symbol_means(len(values))
        return proba.evalf(subs=dict(zip(names, values)))
    elif isinstance(proba, Fraction):
        return float(proba)
    else:  # already a plain number (or anything else): nothing to do
        return proba
def simplify(proba):
    """Simplify then factor ``proba`` if it has a ``simplify`` method; otherwise return it unchanged."""
    if not hasattr(proba, "simplify"):
        return proba
    simplified = proba.simplify()
    return simplified.factor()
def proba2str(proba, latex=False, html_in_var_names=False):
    r"""Pretty print a proba, either a number, a Fraction, or a sympy expression.

    - A ``float`` is formatted with ``{:.3g}``, a ``Fraction`` with ``str``, and
      anything else is passed through :func:`simplify` before ``str``.
    - Powers ``**n`` become ``^{n}`` (``latex``), ``<SUP>n</SUP>`` (``html_in_var_names``)
      or ``^n``; the remaining ``*`` are removed.
    - Factors such as ``(mu_k - 1)`` are rewritten in the form ``(1-mu_k)``.
    - Finally ``mu_k`` is rendered as ``\mu_{k}`` between ``$`` with doubled backslashes
      (``latex``), ``mu<SUB>k</SUB>`` (``html_in_var_names``), ``mk`` (Python 2) or
      with the micro sign U+00B5 followed by ``k`` (default on Python 3).

    ``latex`` takes precedence over ``html_in_var_names`` when both are true.
    """
    if isinstance(proba, float):
        str_proba = "{:.3g}".format(proba)
    elif isinstance(proba, Fraction):
        str_proba = str(proba)
    else:  # a bit of str rewriting
        str_proba = str(simplify(proba))
    # Exponents
    if latex:
        str_proba = re_sub(r'\*\*([0-9]+)', r'^{\1}', str_proba)
    elif html_in_var_names:
        str_proba = re_sub(r'\*\*([0-9]+)', r'<SUP>\1</SUP>', str_proba)
    else:
        str_proba = str_proba.replace('**', '^')
    # Implicit products
    str_proba = str_proba.replace('*', '')
    # Prefer the form (1-mu_k) to (mu_k - 1) or -mu_k + 1
    str_proba = re_sub(r'\(mu_([0-9]+) - 1\)\(mu_([0-9]+) - 1\)', r'(1-mu_\1)(1-mu_\2)', str_proba)
    str_proba = re_sub(r'-mu_([0-9]+) \+ 1', r'1-mu_\1', str_proba)
    str_proba = re_sub(r'-(.*)\(mu_([0-9]+) - 1\)', r'\1(1-mu_\2)', str_proba)
    str_proba = re_sub(r'-\(mu_([0-9]+) - 1\)', r'(1-mu_\1)', str_proba)
    # Variable names
    if latex:  # replace mu_12 by mu_{12}
        str_proba = re_sub(r'_([0-9]+)', r'_{\1}', str_proba)
        # Plain str.replace on purpose: as a re.sub replacement template, r'\mu_' is
        # rejected on Python >= 3.7 ("bad escape \m").
        str_proba = str_proba.replace('mu_', '\\mu_')
        str_proba = '$' + str_proba + '$'
        str_proba = str_proba.replace('\\', '\\\\')
    elif html_in_var_names:  # replace mu_12 by mu<sub>12</sub>
        str_proba = re_sub(r'_([0-9]+)', r'<SUB>\1</SUB>', str_proba)
    elif version_info.major < 3:  # to avoid dealing with unicode for Python2...
        str_proba = str_proba.replace('mu_', 'm')
    else:
        # MICRO SIGN, written as an escape so the source file encoding cannot corrupt it
        str_proba = str_proba.replace('mu_', u'\u00b5')
    return str_proba
# --- Transform .tex to .pdf
def tex2pdf(filename):
    """Naive call to command line pdflatex, twice.

    The compilation runs from the directory of ``filename``; the previous working
    directory is always restored afterwards, even if an error occurs.

    - The first pass runs silently. If it succeeds (exit code 0), a second silent
      pass is run and the ``.log``, ``.synctex.gz`` and ``.aux`` files are moved to ``/tmp/``.
    - If the first pass fails, pdflatex is run again with its output visible.
    """
    from os import devnull  # local import: only the null device path is needed here

    dir1 = getcwd()
    dir2, base = dirname(filename), basename(filename)
    print("Now compiling it to PDF with 'pdflatex {} && pdflatex {}' ...".format(base, base))
    log, gz, aux = base.replace('.tex', '.log'), base.replace('.tex', '.synctex.gz'), base.replace('.tex', '.aux')
    command = ["pdflatex", "-halt-on-error", base]
    if dir2:  # dirname() is '' for a bare file name, and chdir('') would fail
        chdir(dir2)  # go in the plots/trees/ subdir
    try:
        # One handle on the null device, shared by both silent passes and closed afterwards
        with open(devnull, 'w') as null_output:
            first_pass_ok = subprocess.call(command, stdout=null_output) == 0
            if first_pass_ok:
                subprocess.call(command, stdout=null_output)
        if first_pass_ok:
            subprocess.call(["mv", "-f", log, gz, aux, "/tmp/"])
        else:
            # Compilation failed: run it again, this time showing the output of pdflatex
            subprocess.call(command)
    finally:
        chdir(dir1)  # go back
# --- Data representation
class State(object):
"""Not space-efficient representation of a state in the system we model.
- S, Stilde, N, Ntilde: are arrays of size (M, K),
- depth, t, M, K: integers, to avoid recomputing them,
- mus: the problem parameters (only for Bernoulli arms),
- players: is a list of algorithms,
- probas: list of transition probabilities,
- children: list of all possible next states (transitions).
"""
def __init__(self, S, Stilde, N, Ntilde, mus, players, depth=0):
"""Create a new state. Arrays S, Stilde, N, Ntilde are *copied* to avoid modify previous values!"""
self.S = np.copy(S) #: sensing feedback
self.Stilde = np.copy(Stilde) #: number of sensing trials
self.N = np.copy(N) #: number of succesful transmissions
self.Ntilde = np.copy(Ntilde) #: number of trials without collisions
self.mus = mus # XXX OK memory efficient: only a pointer to the (never modified) list
self.players = players # XXX OK memory efficient: only a pointer to the (never modified) list
# New arguments
self.depth = depth #: current depth of the exploration tree
self.t = np.sum(N[0]) #: current time step. Simply = sum(N[0]) = sum(N[i]) for all player i, but easier to compute it once and store it
assert np.shape(S) == np.shape(Stilde) == np.shape(N) == np.shape(Ntilde), "Error: difference in shapes of S, Stilde, N, Ntilde."
self.M = min(np.shape(S)) #: number of players
assert len(players) == self.M, "Error: 'players' list is not of size M ..." # DEBUG
self.K = max(np.shape(S)) #: number of arms (channels)
assert len(mus) == self.K, "Error: 'mus' list is not of size K ..." # DEBUG
self.children = [] #: list of next state, representing all the possible transitions
self.probas = [] #: probabilities of transitions
# --- Utility
def __str__(self, concise=CONCISE):
    """Multi-line description of the state: M, K, t and depth, then the arrays.

    Only Stilde and N are printed when ``concise`` is true, otherwise S, Stilde, N and Ntilde.
    """
    if not concise:
        return " State : M = {}, K = {} and t = {}, depth = {}.\n{} =: S\n{} =: Stilde\n{} =: N\n{} =: Ntilde\n".format(self.M, self.K, self.t, self.depth, self.S, self.Stilde, self.N, self.Ntilde)
    return " State : M = {}, K = {} and t = {}, depth = {}.\n{} =: Stilde\n{} =: N\n".format(self.M, self.K, self.t, self.depth, self.Stilde, self.N)
def to_node(self, concise=CONCISE):
    """Print the state as a small string to be attached to a GraphViz node.

    One bracketed group per row of the arrays, one comma-separated cell per column:
    ``Stilde/N`` when ``concise`` is true, ``S:Stilde/N:Ntilde`` otherwise.
    """
    rows = []
    if concise:
        for st_row, n_row in zip(self.Stilde, self.N):
            cells = ["{:.3g}/{}".format(st, n) for st, n in zip(st_row, n_row)]
            rows.append(",".join(cells))
    else:
        for s_row, st_row, n_row, nt_row in zip(self.S, self.Stilde, self.N, self.Ntilde):
            cells = ["{:.3g}:{:.3g}/{}:{}".format(s, st, n, nt) for s, st, n, nt in zip(s_row, st_row, n_row, nt_row)]
            rows.append(",".join(cells))
    return "[[" + "], [".join(rows) + "]]"
def to_dot(self,
           title="", name="", comment="",
           latex=False, html_in_var_names=False, ext=FORMAT,
           onlyleafs=ONLYLEAFS, onlyabsorbing=ONLYABSORBING, concise=CONCISE):
    r"""Convert the state to a .dot graph, using GraphViz. See http://graphviz.readthedocs.io/ for more details.

    - title: if not empty, used (wrapped with :func:`wraptext`) as the label of the graph.
    - name, comment, ext: given to ``Digraph`` as ``name``, ``comment`` and ``format``.
    - onlyleafs: only print the root and the leafs, to see a concise representation of the tree.
    - onlyabsorbing: only print the absorbing leafs, to see a really concise representation of the tree.
    - concise: whether to use the short representation of states (using :math:`\tilde{S}` and :math:`N`) or the long one (using the 4 variables).
    - html_in_var_names: experimental use of ``<SUB>..</SUB>`` and ``<SUP>..</SUP>`` in the label for the tree.
    - latex: experimental use of ``_{..}`` and ``^{..}`` in the label for the tree, to use with dot2tex.

    Returns the ``Digraph`` object. Raises ``ValueError`` if the tree has more than 256 unique leafs
    (with ``onlyleafs``) or if more than 1024 nodes are explored (without ``onlyleafs``).
    """
    dot = Digraph(name=name, comment=comment, format=ext)
    print("\nCreating a dot graph from the tree...")
    dot.attr(overlap="false")
    if title: dot.attr(label=wraptext(title))
    node_number = 0  # last identifier given to a node; the root is always "0"
    if onlyleafs:
        # Star-shaped graph: the root (in green), directly linked to each unique leaf
        root_name, root = "0", self
        dot.node(root_name, root.to_node(concise=concise), color="green")
        complete_probas, leafs = root.get_unique_leafs()
        if len(leafs) > 256:
            raise ValueError("Useless to save a tree with more than 256 leafs, the resulting image will be too large to be viewed.")  # DEBUG
        for proba, leaf in zip(complete_probas, leafs):
            # add a UNIQUE identifier for each node: easy, just do a breadth-first search, and use numbers from 0 to big-integer-that-is-computed on the fly
            node_number += 1
            leaf_name = str(node_number)
            if leaf.is_absorbing():
                # absorbing leafs are always shown, in red
                dot.node(leaf_name, leaf.to_node(concise=concise), color="red")
                dot.edge(root_name, leaf_name, label=proba2str(proba, latex=latex, html_in_var_names=html_in_var_names), color="red" if root.is_absorbing() else "black")
            elif not onlyabsorbing:
                # non-absorbing leafs are skipped entirely when onlyabsorbing is set
                dot.node(leaf_name, leaf.to_node(concise=concise))
                dot.edge(root_name, leaf_name, label=proba2str(proba, latex=latex, html_in_var_names=html_in_var_names), color="red" if root.is_absorbing() else "black")
    else:
        to_explore = deque([("0", self)])  # BFS using a deque, DFS using a list/recursive call
        nb_node = 1
        # convert each state to a node and a list of edge
        while len(to_explore) > 0:
            nb_node += 1
            root_name, root = to_explore.popleft()
            # The node itself: green for the root of the tree, red for absorbing states
            if root_name == "0":
                dot.node(root_name, root.to_node(concise=concise), color="green")
            elif root.is_absorbing():
                dot.node(root_name, root.to_node(concise=concise), color="red")
            elif onlyabsorbing:
                if root.has_absorbing_child_whole_subtree():
                    dot.node(root_name, root.to_node(concise=concise))
            else:
                dot.node(root_name, root.to_node(concise=concise))
            # Its outgoing edges; the children get their node when they are explored in turn
            for proba, child in zip(root.probas, root.children):
                # add a UNIQUE identifier for each node: easy, just do a breadth-first search, and use numbers from 0 to big-integer-that-is-computed on the fly
                node_number += 1
                child_name = str(node_number)
                # here, if onlyabsorbing, I should only print the *paths* leading to absorbing leafs!
                if onlyabsorbing:
                    if child.has_absorbing_child_whole_subtree():
                        dot.edge(root_name, child_name, label=proba2str(proba, latex=latex, html_in_var_names=html_in_var_names), color="red" if root.is_absorbing() else "black")
                else:
                    dot.edge(root_name, child_name, label=proba2str(proba, latex=latex, html_in_var_names=html_in_var_names), color="red" if root.is_absorbing() else "black")
                to_explore.append((child_name, child))
            if nb_node > 1024:
                raise ValueError("Useless to save a tree with more than 1024 nodes, the resulting image will be too large to be viewed.")  # DEBUG
    return dot
def saveto(self, filename, view=True,
           title="", name="", comment="",
           latex=False, html_in_var_names=False, ext=FORMAT,
           onlyleafs=ONLYLEAFS, onlyabsorbing=ONLYABSORBING, concise=CONCISE):
    """Convert the tree to a dot graph with :meth:`to_dot`, and save it to disk.

    - If ``latex`` is false, the graph is rendered with ``dot.render(filename, view=view)``.
    - If ``latex`` is true, the dot source is converted to TikZ with ``dot2tex`` and written twice:
      first as a complete document (``'.gv'`` replaced by ``'.gv.tex'`` in ``filename``), which is then given to ``tex2pdf``,
      then as a figure-only file (``'.gv.tex'`` replaced by ``'__onlyfig.gv.tex'``).
    - Underscores in ``title``, ``name`` and ``comment`` are replaced by spaces.
    - ``html_in_var_names``, ``ext``, ``onlyleafs``, ``onlyabsorbing`` and ``concise`` are given as is to :meth:`to_dot`.
    """
    # Hack to fix the LaTeX output: no underscore in any of the labels
    title, name, comment = (label.replace('_', ' ') for label in (title, name, comment))
    dot = self.to_dot(title=title, name=name, comment=comment,
                      html_in_var_names=html_in_var_names, latex=latex, ext=ext,
                      onlyleafs=onlyleafs, onlyabsorbing=onlyabsorbing, concise=concise)

    if not latex:
        print("Saving the dot graph to '{}.{}'...".format(filename, ext))
        dot.render(filename, view=view)
        return

    source = dot.source
    if version_info.major < 3:
        source = unicode(source, 'utf_8')
    # Options shared by the two conversions to TikZ
    tikz_options = dict(format='tikz', crop=True, texmode='raw')

    # 1. a complete LaTeX document, compiled to PDF
    filename = filename.replace('.gv', '.gv.tex')
    print("Saving the dot graph to '{}'...".format(filename))
    with open(filename, 'w') as f:
        f.write(dot2tex(source, figonly=False, **tikz_options))
    tex2pdf(filename)

    # 2. only the figure, to be included in another document
    filename = filename.replace('.gv.tex', '__onlyfig.gv.tex')
    print("Saving the dot graph to '{}'...".format(filename))
    with open(filename, 'w') as f:
        f.write(dot2tex(source, figonly=True, **tikz_options))
def copy(self):
    """Get a new :class:`State` created from the S, Stilde, N, Ntilde, mus, players and depth of this state.

    - The ``probas`` and ``children`` of this state are not given to the new state.
    - NOTE(review): whether the matrices are duplicated or shared depends on ``State.__init__``, which is not visible here -- to be confirmed.
    """
    inherited = ("S", "Stilde", "N", "Ntilde", "mus", "players", "depth")
    return State(**dict((attr, getattr(self, attr)) for attr in inherited))
def __hash__(self, full=FULLHASH):
    """Hash the state, as the sum of the hashes of some of its matrices.

    - If ``full`` is true, use S, N, Stilde, and Ntilde extended with ``(t, depth)``.
    - Otherwise, use only Stilde and N.
    """
    if full:
        parts = (
            tupleit2(self.S),
            tupleit2(self.N),
            tupleit2(self.Stilde),
            tupleit2(self.Ntilde) + (self.t, self.depth, ),
        )
    else:
        parts = (tupleit2(self.Stilde), tupleit2(self.N))
    return sum(hash(part) for part in parts)
def is_absorbing(self):
    """Try to detect if this state is absorbing, ie only one transition is possible, and again infinitely for the only child.

    The test is the following:

    - every entry of N has to be at least 1,
    - two different players have to share the same line in all of S, Stilde, N and Ntilde,
    - and the values of this shared line of Stilde have to be all different.

    .. warning:: Still very experimental!
    """
    # FIXME still not sure about the characterization of absorbing states
    if np.min(self.N) < 1:
        return False
    matrices = [self.S, self.Stilde, self.N, self.Ntilde]
    for first in range(self.M):
        for second in range(first + 1, self.M):
            same_lines = [tupleit1(matrix[first]) == tupleit1(matrix[second]) for matrix in matrices]
            if not all(same_lines):
                continue
            # these two players have the same S, Stilde, N, Ntilde lines
            shared_line = tupleit1(self.Stilde[first])
            # and that line has only different values
            if len(shared_line) == len(set(shared_line)):
                return True
    return False
def has_absorbing_child_whole_subtree(self):
    """Return True if this state, or any state of the subtree below it, is absorbing (and False otherwise)."""
    if not self.is_absorbing():
        # depth-first search, stopped as soon as one absorbing state is found
        for child in self.children:
            if child.has_absorbing_child_whole_subtree():
                return True
        return False
    return True
# --- High level view of a depth-1 exploration
def explore_from_node_to_depth(self, depth=1):
    """Compute recursively the children of this node, then the children of its children etc, for ``depth`` levels.

    - The node is always printed first.
    - For ``depth == 0`` nothing else is done.
    - Otherwise :meth:`compute_one_depth` is called, ``self.depth`` is set to ``depth``, and if ``depth > 1`` every child is explored with ``depth - 1``.
    """
    message = "\nFor depth = {}, exploring from this node :\n{}".format(depth, self)
    print(message)  # DEBUG
    if depth == 0:
        return
    self.compute_one_depth()
    self.depth = depth
    if depth <= 1:
        return
    remaining = depth - 1
    for child in self.children:
        child.explore_from_node_to_depth(depth=remaining)
def compute_one_depth(self):
    """Use :meth:`all_deltas` to compute all the children of this state, and the probabilities of the transitions to them.

    - ``self.depth`` is increased by 1 first.
    - Children with the same hash are merged, and their probabilities are added.
    - The results are stored in ``self.probas`` (after a call to ``simplify``) and ``self.children``, in the same order.
    """
    self.depth += 1
    # hash of a child --> [child, sum of the probabilities of reaching it]
    # (hashes are used as keys instead of states, to use less memory)
    merged = dict()
    for delta, proba in self.all_deltas():
        if proba == 0: continue
        # copy the current state, apply decision of algorithms and random branching
        child = delta(self.copy())
        key = hash(child)
        entry = merged.get(key)
        if entry is None:
            assert child.depth == (self.depth - 1)
            merged[key] = [child, proba]
        else:
            entry[1] += proba
    print(" at depth {} we saw {} different unique children states...".format(self.t, len(merged)))
    self.probas = [simplify(entry[1]) for entry in merged.values()]
    self.children = [entry[0] for entry in merged.values()]
def all_absorbing_states(self, depth=1):
    """Generator that yields all the absorbing nodes of the tree, one by one, as pairs ``(proba, absorbing state)``.

    - It might not find any,
    - It does so without merging common nodes, in order to find the first absorbing node as quick as possible.
    - The absorbing states found by :meth:`absorbing_states_one_depth` on this node are yielded first, then the ones found below each child (with ``depth - 1``).
    - Each pair is yielded exactly as it was found: the probability is not multiplied by the probabilities of the path from this node.
    """
    if depth == 0:
        return
    # first, the absorbing states that are one transition away from this node
    for proba, bad_child in self.absorbing_states_one_depth():
        yield proba, bad_child
    # then compute the (merged) children, and keep searching below each of them
    self.compute_one_depth()
    self.depth = depth
    if depth <= 1:
        return
    for child in self.children:
        deeper = child.all_absorbing_states(depth=depth-1)
        for proba, bad_child in deeper:
            yield proba, bad_child
def absorbing_states_one_depth(self):
    """Use :meth:`all_deltas` to yield all the absorbing children at one transition from this state, as pairs ``(proba, child)``.

    - ``self.depth`` is increased by 1 when the generator starts.
    - Children are not merged, and transitions of probability 0 are ignored.
    """
    self.depth += 1
    for transition, weight in self.all_deltas():
        if weight == 0: continue
        # copy the current state, apply decision of algorithms and random branching
        successor = transition(self.copy())
        if not successor.is_absorbing():
            continue
        yield weight, successor
def find_N_absorbing_states(self, N=1, maxdepth=8):
    """Find N absorbing states, by searching with :meth:`all_absorbing_states` up to depth ``maxdepth``.

    - Return two lists ``(probas, absorbing states)``, as soon as N absorbing states were found.
    - Raise a ``ValueError`` if the search ends before that.
    """
    found_probas, found_states = [], []
    search = self.all_absorbing_states(depth=maxdepth)
    for count, (proba, bad_child) in enumerate(search, 1):
        assert bad_child.is_absorbing(), "Error: a node was returned by all_absorbing_states() method but was not absorbing!"
        found_probas.append(proba)
        found_states.append(bad_child)
        if count >= N:
            return found_probas, found_states
    raise ValueError("Impossible to find N = {} absorbing states from this root (max depth = {})...".format(N, maxdepth))
# --- The hard part is this all_deltas *generator*
def all_deltas(self):
    """Generator that yields pairs ``(delta, proba)``, one for each possible transition from this state.

    - ``delta`` is a function transforming a state to another state: it modifies its argument in place (``t``, ``depth``, ``S``, ``N``, ``Stilde``, ``Ntilde``) and returns it.
    - ``proba`` is the probability of this transition. Transitions of probability 0 are not yielded.
    - Every player of ``self.players`` is called as ``player(j, self)`` and gives the arms it can choose. All the joint decisions are considered, each with the same weight (1 / number of joint decisions), together with all the ``2**K`` binary outcomes of the arms, weighted by ``self.mus``.
    - It is memory efficient as it is a generator.
    - Each ``delta`` keeps its own decisions, coin flips and collisions, so it is still correct if it is called after the generator moved on to the next transitions.
    - Still, do not convert that to a list or it might use all your system memory: each returned value is a function with code and variables inside!
    """
    def make_delta(decisions, coin_flips, collisions):
        """Create the function applying one transition (arguments are bound now, not when delta is called)."""
        def delta(s):
            s.t += 1
            s.depth -= 1
            for j, Ij in enumerate(decisions):
                s.S[j, Ij] += coin_flips[Ij]  # sensing feedback
                s.N[j, Ij] += 1  # number of sensing trials
                if not collisions[Ij]:  # no collision, receive this feedback for rewards
                    s.Stilde[j, Ij] += coin_flips[Ij]  # number of successful transmissions
                    s.Ntilde[j, Ij] += 1  # number of trials without collisions
            return s
        return delta

    all_decisions = [player(j, self) for j, player in enumerate(self.players)]
    number_of_decisions = prod(len(decisions) for decisions in all_decisions)
    for decisions in product(*all_decisions):
        # an arm chosen by two players or more is in collision
        counter = Counter(decisions)
        collisions = [counter.get(k, 0) >= 2 for k in range(self.K)]
        for coin_flips in product([0, 1], repeat=self.K):
            proba_of_this_coin_flip = prod(mu if b else (1 - mu) for b, mu in zip(coin_flips, self.mus))
            # Compute the probability of this transition
            proba = proba_of_this_coin_flip / number_of_decisions
            if proba == 0: continue
            # Create a function to apply this transition
            yield (make_delta(decisions, coin_flips, collisions), proba)
# --- Main functions, all explorations are depth first search (not the best, it's just easier...)
def pretty_print_result_recursively(self):
    """Print all the transitions, depth by depth (recursively).

    - An absorbing state is first announced between two lines of ``X``, and is then printed like any other state.
    - A state with ``depth > 0`` is printed, followed by each of its transitions (probability and child) and recursively the transitions of that child.
    """
    if self.is_absorbing():
        banner = "X "*87
        print("\n\n")
        print(banner)
        print("The state:\n{}\nseems to be absorbing...".format(self))
        print(banner)
    if self.depth <= 0:
        return
    print("\n\nFrom this state :\n{}".format(self))
    for index in range(min(len(self.probas), len(self.children))):
        proba, child = self.probas[index], self.children[index]
        print("\n- Probability of transition = {} to this other state:\n{}".format(proba, child))
        child.pretty_print_result_recursively()
    print("\n==> Done for the {} children of this state...\n".format(len(self.children)))
def get_all_leafs(self):
    """Recurse and get all the leafs, as two lists ``(probas, leafs)``.

    - For a state with ``depth <= 1``, it is simply ``self.probas, self.children``.
    - Otherwise, the leafs of every child are collected, and their probabilities are multiplied by the probability of the transition to that child.
    - Many different state can be present in the list of leafs, with possibly different probabilities (each correspond to a trajectory).
    """
    if self.depth <= 1:
        return self.probas, self.children
    complete_probas, leafs = [], []
    for transition_proba, child in zip(self.probas, self.children):
        child_probas, child_leafs = child.get_all_leafs()
        # one more step, multiply by the probability of going to this child
        for p in child_probas:
            complete_probas.append(transition_proba * p)
        leafs.extend(child_leafs)
    return complete_probas, leafs
def get_unique_leafs(self):
    """Compute all the leafs (deepest children) with :meth:`get_all_leafs`, and merge the ones with the same hash to compute their full probabilities.

    Return two lists in the same order: the probabilities (after a call to ``simplify``), and the unique leafs.
    """
    # hash of a leaf --> [leaf, sum of the probabilities of all the trajectories reaching it]
    merged = dict()
    complete_probas, leafs = self.get_all_leafs()
    for proba, leaf in zip(complete_probas, leafs):
        key = hash(leaf)
        entry = merged.get(key)
        if entry is None:
            merged[key] = [leaf, proba]
        else:
            entry[1] += proba
    unique_probas = [simplify(entry[1]) for entry in merged.values()]
    unique_leafs = [entry[0] for entry in merged.values()]
    return unique_probas, unique_leafs
def proba_reaching_absorbing_state(self):
    """Compute the probability of reaching a leaf that is an absorbing state.

    - The unique leafs are given by :meth:`get_unique_leafs`, and the probabilities of the absorbing ones are added.
    - The result is printed, and also evaluated numerically (with ``proba2float``) for the means given by ``uniform_means(self.K)``.
    - Return ``(number of absorbing leafs, probability)``.
    """
    probas, leafs = self.get_unique_leafs()
    absorbing_probas = [proba for proba, leaf in zip(probas, leafs) if leaf.is_absorbing()]
    nb_absorbing = len(absorbing_probas)
    bad_proba = sum(absorbing_probas)
    print("\n\nFor depth {}, {} leafs were found to be absorbing, and the probability of reaching any absorbing leaf is {}...".format(self.depth, nb_absorbing, bad_proba))  # DEBUG
    sample_values = uniform_means(self.K)
    numerical_value = proba2float(bad_proba, values=sample_values)
    print("\n==> Numerically, for means = {}, this probability is = {:.3g} ...".format(np.array(sample_values), numerical_value))  # DEBUG
    return nb_absorbing, bad_proba
class StateWithMemory(State):
"""State with a memory for each player, to represent and play with RhoRand etc."""
def __init__(self, S, Stilde, N, Ntilde, mus, players, update_memories, memories=None, depth=0):
    """Create a new state with memories.

    - ``S``, ``Stilde``, ``N``, ``Ntilde``, ``mus``, ``players`` and ``depth`` are given to the constructor of :class:`State`.
    - ``update_memories`` is stored as is in ``self.update_memories``.
    - ``memories`` is the memory of each player; if it is ``None``, every one of the M players starts with the memory 1.
    """
    super(StateWithMemory, self).__init__(S, Stilde, N, Ntilde, mus, players, depth=depth)
    self.update_memories = update_memories
    #: Personal memory for all players, can be a rank in {1,..,M} for rhoRand, or anything else.
    self.memories = tuple(1 for _ in range(self.M)) if memories is None else memories
def __str__(self, concise=False):
    """Print the state on several lines: M, K, t, depth, then the matrices and the memories of the players.

    If ``concise`` is true, only Stilde and N are printed, otherwise S, Stilde, N and Ntilde.
    """
    header = (self.M, self.K, self.t, self.depth)
    if concise:
        template = "    StateWithMemory : M = {}, K = {} and t = {}, depth = {}.\n{} =: Stilde\n{} =: N\n{} =: players memory\n"
        matrices = (self.Stilde, self.N)
    else:
        template = "    StateWithMemory : M = {}, K = {} and t = {}, depth = {}.\n{} =: S\n{} =: Stilde\n{} =: N\n{} =: Ntilde\n{} =: players memory\n"
        matrices = (self.S, self.Stilde, self.N, self.Ntilde)
    values = header + matrices + (self.memories, )
    return template.format(*values)
def to_node(self, concise=CONCISE):
    """Print the state as a small string to be attached to a GraphViz node.

    - If ``concise`` is true, each entry is ``Stilde/N`` and the memories are printed as a list after ``r=``.
    - Otherwise, each entry is ``S:Stilde/N:Ntilde`` and the memories are printed as is after ``ranks =``.
    - One line of the matrices (one player) gives one group between brackets.
    """
    if concise:
        rows = [
            ",".join("{:.3g}/{}".format(st, n) for st, n in zip(stilde_row, n_row))
            for stilde_row, n_row in zip(self.Stilde, self.N)
        ]
        suffix = " r={}".format(list(self.memories))
    else:
        rows = [
            ",".join("{:.3g}:{:.3g}/{}:{}".format(s, st, n, nt) for s, st, n, nt in zip(s_row, stilde_row, n_row, ntilde_row))
            for s_row, stilde_row, n_row, ntilde_row in zip(self.S, self.Stilde, self.N, self.Ntilde)
        ]
        suffix = " ranks = {}".format(self.memories)
    return "[[" + "], [".join(rows) + "]]" + suffix
def copy(self):
    """Get a new :class:`StateWithMemory` created from the S, Stilde, N, Ntilde, mus, players, update_memories, depth and memories of this state.

    - The ``probas`` and ``children`` of this state are not given to the new state.
    - NOTE(review): whether the matrices are duplicated or shared depends on the constructor of :class:`State`, which is not visible here -- to be confirmed.
    """
    inherited = ("S", "Stilde", "N", "Ntilde", "mus", "players", "update_memories", "depth", "memories")
    return StateWithMemory(**dict((attr, getattr(self, attr)) for attr in inherited))
def __hash__(self, full=FULLHASH):
    """Hash the state, as the sum of the hashes of some of its matrices and of the memories of the players (ie. ranks for RhoRand).

    - If ``full`` is true, use S, N, Stilde, Ntilde extended with ``(t, depth)``, and the memories.
    - Otherwise, use only Stilde, N and the memories.
    """
    if full:
        parts = (
            tupleit2(self.S),
            tupleit2(self.N),
            tupleit2(self.Stilde),
            tupleit2(self.Ntilde) + (self.t, self.depth, ),
            tupleit1(self.memories),
        )
    else:
        parts = (tupleit2(self.Stilde), tupleit2(self.N), tupleit1(self.memories))
    return sum(hash(part) for part in parts)
def is_absorbing(self):