In [1]:
from abc import ABC
import random
import numpy as np
import tensorflow as tf




In [2]:
MEMORY_LENGTH = 30

# Game structure design

In [3]:
class Strategy(ABC):
    def next_play(recent_plays: np.array) -> int:
        raise NotImplementedError
        
class AllC(Strategy):
    def next_play(recent_plays: np.array = np.array([])) -> int:
        return 1
    
class AllD(Strategy):
    def next_play(recent_plays: np.array = np.array([])) -> int:
        return -1
        
class RandomStrategy(Strategy):
    def next_play(recent_plays: np.array = np.array([])) -> int:
        return 1 if random.random()>=0.5 else -1
    
class TitForTatStrategy(Strategy):
    def __init__(self, player):
        if player==1:
            self.player = 0
            self.opponent = 1
        elif player==2:
            self.player = 1
            self.opponent = 0
        else:
            return NotImplementedError
        
    def next_play(self, recent_plays: np.array = np.array([])) -> int:
        if recent_plays[self.opponent,0] in [-1,1]:
            return int(recent_plays[self.opponent,0])
        else:
            return 1
        

In [4]:
class Axelrod:
    
    def __init__(self, memory: int = 10):
        
        self._memory_length = memory
        self._games = np.zeros(shape=(2,self._memory_length))
        self._score = np.array([0.0, 0.0])
        self._game_duration = 0
        
    def play(self, strategy_1: Strategy, strategy_2: Strategy):
        next_play_1 = strategy_1.next_play(self._games)
        next_play_2 = strategy_2.next_play(self._games)
        self._games = np.roll(self._games, 1, axis=1)
        self._games[0,0] = next_play_1
        self._games[1,0] = next_play_2
        self._score += self.get_last_rewards()
        self._game_duration += 1
        #print(self._games, next_play_1, next_play_2, self.get_last_rewards(), self._score/self._game_duration)
        
    def get_last_rewards(self) -> np.array:
        last_play = self._games[:,0]
        if (last_play==np.array([1,1])).all():
            return np.array([0.6,0.6])
        elif (last_play==np.array([-1,-1])).all():
            return np.array([0.2,0.2])
        elif (last_play==np.array([-1,1])).all():
            return np.array([1,0])
        elif (last_play==np.array([1,-1])).all():
            return np.array([0,1])
        else:
            raise ValueError("Unsupported play combination")
    
    def get_average_game_rewards(self) -> np.array:
        return self._score/self._game_duration
        

In [5]:
game = Axelrod(memory=MEMORY_LENGTH)
for i in range(5):
    game.play(RandomStrategy, AllC)

In [6]:
game = Axelrod(memory=MEMORY_LENGTH)
for i in range(5):
    game.play(RandomStrategy, AllD)

In [7]:
game = Axelrod(memory=MEMORY_LENGTH)
for i in range(5):
    game.play(AllC, AllD)

In [8]:
game = Axelrod(memory=MEMORY_LENGTH)
for i in range(5):
    game.play(RandomStrategy, RandomStrategy)

In [9]:
game.get_last_rewards()

array([0, 1])

In [10]:
game = Axelrod(memory=MEMORY_LENGTH)
for i in range(9):
    game.play(RandomStrategy, TitForTatStrategy(player=2))

# TF model design

https://arminnorouzi.github.io/posts/2023/05/blog-post-13/

In [11]:
kernel_regularizer = tf.keras.regularizers.L2(1e-4)

In [12]:
def positional_encoding(length, depth):
    """
    Generates a matrix of position encodings for an input sequence.

    Args:
      length: An integer representing the length of the input sequence.
      depth: An integer representing the dimensionality of the encoding.

    Returns:
      A `tf.Tensor` of shape `(length, depth)` representing the position encoding matrix.
    """
    depth = depth/2

    positions = np.arange(length)[:, np.newaxis]     # (seq, 1)
    depths = np.arange(depth)[np.newaxis, :]/depth   # (1, depth)

    angle_rates = 1 / (10000**depths)         # (1, depth)
    angle_rads = positions * angle_rates      # (pos, depth)

    pos_encoding = np.concatenate(
        [np.sin(angle_rads), np.cos(angle_rads)],
        axis=-1
    )

    return tf.cast(pos_encoding, dtype=tf.float32)

pe = positional_encoding(length=MEMORY_LENGTH, depth=3)

In [13]:
class SelfAttentionLayer(tf.keras.layers.Layer):
    
    def __init__(self, **kwargs):
        super().__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(**kwargs, kernel_regularizer=kernel_regularizer)
        self.add = tf.keras.layers.Add()
        self.layer_norm = tf.keras.layers.LayerNormalization()
        
    def call(self, x):
        attention_output = self.mha(query=x, key=x, value=x)
        x = self.add([x, attention_output])
        x = self.layer_norm(x)
        return x

In [14]:
class AttentionTower(tf.keras.layers.Layer):
    
    def __init__(self, n_layers=5, **kwargs):
        super().__init__()
        self.n_layers = n_layers
        self.attention_layers = [SelfAttentionLayer(num_heads=8, key_dim=64) for _ in range(self.n_layers)]
    
    def __call__(self, x):
        for layer in self.attention_layers:
            x = layer(x)
            
        return x

In [15]:
at = AttentionTower(num_heads=8, key_dim=64)




In [16]:
class QTower(tf.keras.layers.Layer):
    
    def __init__(self, n_layers = 5, n_neurons=10, output_size=2, dropout_rate=0.3):
        super().__init__()
        self.n_layers = n_layers
        self.ff = tf.keras.Sequential()
        self.ff.add( tf.keras.layers.Flatten() )
        for _ in range(self.n_layers):
            self.ff.add( tf.keras.layers.Dense(n_neurons, activation=tf.nn.leaky_relu) )
        self.ff.add( tf.keras.layers.Dropout(dropout_rate) )
        self.ff.add( tf.keras.layers.Dense(output_size, activation=tf.keras.activations.sigmoid, kernel_regularizer=kernel_regularizer, name='policy') )
    
    def call(self, x):
        return self.ff(x)

In [17]:
class EmbeddingLayer(tf.keras.layers.Layer):
    
    def __init__(self):
        super().__init__()
        
    def call(self, x):
        x = tf.cast(tf.one_hot(tf.cast(x + 1, tf.int32), 3), tf.float32)
        return x
        

In [18]:
EmbeddingLayer()(game._games)

<tf.Tensor: shape=(2, 30, 3), dtype=float32, numpy=
array([[[0., 0., 1.],
        [0., 0., 1.],
        [1., 0., 0.],
        [0., 0., 1.],
        [0., 0., 1.],
        [0., 0., 1.],
        [0., 0., 1.],
        [1., 0., 0.],
        [1., 0., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.]],

       [[0., 0., 1.],
        [1., 0., 0.],
        [0., 0., 1.],
        [0., 0., 1.],
        [0., 0., 1.],
        [0., 0., 1.],
        [1., 0., 0.],
        [1., 0., 0.],
        [0., 0., 1.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],


In [19]:
class AxelrodModel(tf.keras.Model):
    
    def __init__(self):
        super().__init__()
        
        self.embed = EmbeddingLayer()
        self.at = AttentionTower(num_heads=8, key_dim=64)
        self.qt = QTower()
    
    def call(self, x):
        
        x = self.embed(x)
        x = self.at(x)
        q = self.qt(x)
        
        return q


In [20]:
model = AxelrodModel()

In [21]:
model(game._games[np.newaxis,...])




<tf.Tensor: shape=(1, 2), dtype=float32, numpy=array([[0.5589354 , 0.46187583]], dtype=float32)>

In [22]:
class AIStrategy(Strategy):
    def __init__(self, model = AxelrodModel()):
        self.model = model
    def next_play(self, recent_plays: np.array) -> int:
        x = self.model(recent_plays[np.newaxis,...])
        return 2*np.argmax(x)-1

In [23]:
game._games

array([[ 1.,  1., -1.,  1.,  1.,  1.,  1., -1., -1.,  0.,  0.,  0.,  0.,
         0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
         0.,  0.,  0.,  0.],
       [ 1., -1.,  1.,  1.,  1.,  1., -1., -1.,  1.,  0.,  0.,  0.,  0.,
         0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
         0.,  0.,  0.,  0.]])

In [24]:
AIStrategy().next_play(game._games)

1

In [25]:
model = AxelrodModel()

In [26]:
model(game._games[np.newaxis,...])

<tf.Tensor: shape=(1, 2), dtype=float32, numpy=array([[0.47946987, 0.48785147]], dtype=float32)>

In [27]:
game = Axelrod(memory=MEMORY_LENGTH)
ai = AIStrategy(model)
for i in range(5):
    game.play(ai, RandomStrategy)
    ai.next_play(game._games)

# Training with GradientTape

In [29]:
opponent_strategies = [RandomStrategy, AllC, AllD, TitForTatStrategy(player=2)]

In [31]:
n_games_per_strategy = 10
game_length = 50

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
gamma = 0.5

ai = AIStrategy()

for _ in range(n_games_per_strategy):
    dataset = []
    game = Axelrod(memory=MEMORY_LENGTH)
    for strategy in opponent_strategies:
        for _ in range(game_length):
            current_state = game._games
            game.play(ai, strategy)
            dataset.append( (current_state, game.get_last_rewards()[0], tf.math.reduce_max(ai.model(game._games[np.newaxis,...])) ) )

    states, rewards, max_q_primes = list(zip(*dataset))
    states = tf.concat([b[np.newaxis, :] for b in states], axis=0)
    rewards = tf.cast(tf.stack(rewards), dtype=tf.float32)
    max_q_primes = tf.stack(max_q_primes)

    with tf.GradientTape() as tape:
        q_0s = ai.model(states)
        actions = tf.argmax(q_0s, axis=1)
        action_selector = tf.cast(tf.stack([1-actions,actions], axis=1), dtype=tf.float32)
        loss = q_0s + tf.transpose((rewards+gamma*max_q_primes)[np.newaxis,...]) * action_selector

    grad = tape.gradient(loss, ai.model.trainable_weights)
    optimizer.apply_gradients(zip(grad, ai.model.trainable_weights))


In [32]:
game = Axelrod(memory=MEMORY_LENGTH)
for _ in range(50):
    game.play(ai, RandomStrategy)
    #print(_, game._games, game.get_last_rewards())
game.get_average_game_rewards()

array([0.52, 0.34])

In [33]:
game = Axelrod(memory=MEMORY_LENGTH)
for _ in range(50):
    game.play(ai, AllD)
    #print(_, game._games, game.get_last_rewards())
game.get_average_game_rewards()

array([0.2, 0.2])

In [34]:
game = Axelrod(memory=MEMORY_LENGTH)
for _ in range(50):
    game.play(ai, AllC)
    #print(_, game._games, game.get_last_rewards())
game.get_average_game_rewards()

array([0.848, 0.228])

In [35]:
game = Axelrod(memory=MEMORY_LENGTH)
for _ in range(50):
    game.play(ai, TitForTatStrategy(player=2))
    #print(_, game._games, game.get_last_rewards())
game.get_average_game_rewards()

array([0.412, 0.392])

In [36]:
game = Axelrod(memory=MEMORY_LENGTH)
for _ in range(50):
    game.play(ai, TitForTatStrategy(player=2))
    print(_, game._games, game.get_last_rewards())
game.get_average_game_rewards()

0 [[-1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [ 1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]] [1 0]
1 [[-1. -1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [-1.  1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]] [0.2 0.2]
2 [[-1. -1. -1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [-1. -1.  1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]] [0.2 0.2]
3 [[-1. -1. -1. -1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [-1. -1. -1.  1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0. 

32 [[ 1.  1.  1.  1.  1.  1.  1.  1.  1. -1.  1.  1.  1.  1.  1. -1. -1. -1.
  -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]
 [ 1.  1.  1.  1.  1.  1.  1.  1. -1.  1.  1.  1.  1.  1. -1. -1. -1. -1.
  -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]] [0.6 0.6]
33 [[ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1. -1.  1.  1.  1.  1.  1. -1. -1.
  -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]
 [ 1.  1.  1.  1.  1.  1.  1.  1.  1. -1.  1.  1.  1.  1.  1. -1. -1. -1.
  -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]] [0.6 0.6]
34 [[ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1. -1.  1.  1.  1.  1.  1. -1.
  -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]
 [ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1. -1.  1.  1.  1.  1.  1. -1. -1.
  -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]] [0.6 0.6]
35 [[ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1. -1.  1.  1.  1.  1.  1.
  -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]
 [ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1. -1.  1.  1.  1.  1.  1. -1.
  -1. 

array([0.412, 0.392])

# Build training data

In [28]:
def map_0_to_minus_1(input_tensor: tf.Tensor):

    condition = tf.equal(input_tensor, 0)

    modified_tensor = tf.where(condition, tf.constant(-1, dtype=tf.int64), input_tensor)
    return modified_tensor


In [729]:
opponent_strategies = [RandomStrategy, AllC, AllD, TitForTatStrategy(player=2)]

In [730]:
n_games_per_strategy = 10
game_length = 30

ai = AIStrategy()

dataset = []
for _ in range(n_games_per_strategy):
    game = Axelrod(memory=MEMORY_LENGTH)
    for strategy in opponent_strategies:
        for _ in range(game_length):
            current_state = game._games
            game.play(ai, strategy)
            dataset.append( (current_state, ai.model(current_state[np.newaxis,...]), game.get_last_rewards()[0], tf.math.reduce_max(ai.model(game._games[np.newaxis,...])) ) )
        

In [562]:
#dataset

In [766]:
states, q_0s, rewards, max_q_primes = list(zip(*dataset))

In [767]:
states = tf.concat([b[np.newaxis, :] for b in states], axis=0)

In [768]:
q_0s = tf.concat( q_0s, axis=0)

In [769]:
rewards = tf.cast(tf.stack(rewards), dtype=tf.float32)

In [770]:
max_q_primes = tf.stack(max_q_primes)

In [771]:
alpha0 = tf.convert_to_tensor([0.05,-0.05])

In [772]:
alpha = tf.repeat( alpha0[np.newaxis,:], len(dataset), axis=0)

In [773]:
alpha = tf.transpose( tf.cast(map_0_to_minus_1( tf.argmax(q_0s, axis=1)[np.newaxis,...] ), dtype=tf.float32) )*alpha

In [775]:
new_q = (1-alpha)*q_0s + alpha*tf.transpose( (rewards+gamma*max_q_primes)[tf.newaxis,...])

In [776]:
def scheduler(epoch, lr):
    if epoch < 10:
        return lr
    else:
        return max(lr * tf.math.exp(-0.01 * (epoch - 10)), 1e-4)

In [777]:
ai.model.compile(
    # Optimizer
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.1),
    # Loss function to minimize
    loss=[tf.keras.losses.MeanSquaredError()],
    # List of metrics to monitor
    # metrics=[tf.keras.metrics.MeanSquaredError()],
    run_eagerly=False,
)

In [778]:
ai.model.fit(
    x=states,
    y=new_q,
    epochs=100,
    batch_size=50,
    callbacks=[
        tf.keras.callbacks.EarlyStopping(
            monitor="val_loss", patience=10, restore_best_weights=True
        ),
        tf.keras.callbacks.LearningRateScheduler(scheduler),
    ],
    validation_split=0.1,
    use_multiprocessing=True,
)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100


<keras.src.callbacks.History at 0x1ef340e1850>

In [779]:
game = Axelrod(memory=MEMORY_LENGTH)
for _ in range(50):
    game.play(ai, AllD)
    print(_, game._games, game.get_last_rewards())
    

0 [[ 1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [-1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]] [0 1]
1 [[ 1.  1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [-1. -1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]] [0 1]
2 [[ 1.  1.  1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [-1. -1. -1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]] [0 1]
3 [[ 1.  1.  1.  1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [-1. -1. -1. -1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
   0.  0.  0.  0.  0. 

33 [[ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.
   1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.]
 [-1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.
  -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]] [0 1]
34 [[ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.
   1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.]
 [-1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.
  -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]] [0 1]
35 [[ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.
   1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.]
 [-1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.
  -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.]] [0 1]
36 [[ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.
   1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.]
 [-1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.
  -1. -1. -1. -1. 

In [780]:
ai.model(game._games[np.newaxis,...])

<tf.Tensor: shape=(1, 2), dtype=float32, numpy=array([[0.44734684, 0.4533372 ]], dtype=float32)>