# Reinforcement learning and the Attacker-Defender

Antoine Habis & Geert-Jan Huizing

## Imports

In [None]:
import os
import random
import time
from random import choice, randint
from tkinter import *

import gym
import matplotlib.pyplot as plt
import numpy as np
from gym import error, spaces, utils
from gym.utils import seeding
from pulp import LpInteger, LpMinimize, LpProblem, LpVariable
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
from stable_baselines import DQN, PPO2
from stable_baselines.bench import Monitor
from stable_baselines.common.policies import FeedForwardPolicy as FFP_common
from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines.deepq.policies import FeedForwardPolicy as FFP_DQ
from stable_baselines.results_plotter import load_results, ts2xy

## Helper functions

In [None]:
class MLP_DQN(FFP_DQ):
    def __init__(self, *args, **kwargs):
        super(MLP_DQN, self).__init__(*args, **kwargs,
                                      layers=[300, 300],
                                      layer_norm=False,
                                      feature_extraction="mlp")

In [None]:
def movingAverage(values, window):
    weights = np.repeat(1.0, window) / window
    return np.convolve(values, weights, 'valid')

def plot_results(log_folder, title='Learning Curve'):
    x, y = ts2xy(load_results(log_folder), 'timesteps')
    y = movingAverage(y, window=50)
    x = x[len(x) - len(y):] # Truncate x
    plt.plot(x, y)
    plt.xlabel('Number of Timesteps')
    plt.ylabel('Rewards')
    plt.title(title + " Smoothed")
    plt.show()

In [None]:
def plot_multi_results(log_folders, legends, title='Learning Curve'):
    for log_folder in log_folders:
        x, y = ts2xy(load_results(log_folder), 'timesteps')
        y = movingAverage(y, window=100)
        x = x[len(x) - len(y):] # Truncate x
        plt.plot(x, y)
    plt.legend(legends)
    plt.xlabel('Number of Timesteps')
    plt.ylabel('Rewards')
    plt.title(title + " Smoothed")
    plt.show()

## Training a defender

In [None]:
class Defender(gym.Env):


    def __init__(self, K, initial_potential, save_states=False):
        self.state = None
        self.save_states = save_states
        self.states_saved = []
        self.actions_saved = []
        self.game_state = None
        self.K = K
        self.initial_potential = initial_potential
        self.weights = np.power(2.0, [-(self.K - i) for i in range(self.K + 1)])
        self.done = 0
        self.reward = 0
        self.action_space = spaces.Discrete(2)
        self.observation_space= spaces.MultiDiscrete([10]* (2*K+2))
        

    def potential(self, A):
        return np.sum(A*self.weights)

    def split(self, A):
        B = self.game_state - A
        return A, B

    def erase(self, A):
        self.game_state -= A
        self.game_state = np.insert(self.game_state[:-1], 0, 0)


    def optimal_split(self, ratio = 0.5):
        if (sum(self.game_state) == 1):
            if (randint(1,100)<=50):
                return self.game_state, [0]*(self.K+1)
            else:
                return [0]*(self.K+1), self.game_state
            
        else:
            prob = LpProblem("Optimal split",LpMinimize)
            A = []
            for i in range(self.K + 1):
                A += LpVariable(str(i), 0, self.game_state[i], LpInteger)
            prob += sum([2**(-(self.K - i)) * c for c, i in zip(A, range(self.K + 1))]) - ratio * self.potential(self.game_state), "Objective function"
            prob += sum([2**(-(self.K - i)) * c for c, i in zip(A, range(self.K + 1))]) >= ratio * self.potential(self.game_state), "Constraint"
            #prob.writeLP("test.lp")
            prob.solve()
            Abis = [0]*(self.K+1)
            for v in prob.variables():
                Abis[int(v.name)] = round(v.varValue)
            B = [z - a for z, a in zip(self.game_state, Abis)]
            return Abis, B


    def _seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]


    def attacker_play(self):
        prob = 90 
        if(randint(1,100)<=prob):
            return self.optimal_split()
        else:
            ratios = [0.1, 0.2, 0.3, 0.4, 0.6, 0.7, 0.8, 0.9]
            return self.optimal_split(ratio=choice(ratios))


    def check(self):
        """Function to chek if the game is over or not.
        Returns:
            int -- If the game is not over returns 0, otherwise returns 1 if the defender won or -1 if the attacker won.
        """

        if (sum(self.game_state) == 0):
            return 1
        elif (self.game_state[-1] >=1 ):
            return -1
        else:
            return 0


    def step(self, target):
        if self.save_states:
            self.states_saved.append(self.state)
            self.actions_saved.append(target)
        
        A = self.state[: self.K + 1]
        B = self.state[self.K + 1 :]
        if (target == 0):
            self.erase(A)
        else:
            self.erase(B)
        win = self.check()
        if(win):
            self.done = 1
            self.reward = win

        if self.done != 1:
            A, B = self.attacker_play()
            self.state = np.concatenate([A,B])

        return self.state, self.reward, self.done, {}


    def reset(self):
        self.game_state = self.random_start()
        self.done = 0
        self.reward = 0
        A, B = self.attacker_play()
        self.state = np.concatenate([A,B])
        return self.state

    def random_start(self):
        self.game_state = [0] * (self.K + 1)
        potential = 0
        stop = False
        while (potential < self.initial_potential and not stop):
            possible = self.initial_potential - potential
            upper = self.K - 1 #upper is K-1 because K represents the top of the matrix which means end of the game
            while (2**(-(self.K-upper)) > possible):
                upper -=1
            if(upper < 0):
                stop = True
            else:
                self.game_state[randint(0,upper)]+=1
                potential = self.potential(self.game_state)
        return self.game_state


    def render(self):
        for j in range(self.K + 1):
            print(self.game_state[j], end = " ")
        print("")

In [None]:
class Defense:
    def __init__(self, method, K=5, P=0.95, log_dir="/tmp/gym/", save_states=False):
        self.method = method

        self.K = K
        self.state_size = 2 * (self.K + 1)
        self.action_size = 2
        self.reward = []
        self.save_states = save_states

        self.log_dir = log_dir
        os.makedirs(self.log_dir, exist_ok=True)

        env = Defender(K, P, save_states=self.save_states)
        env = Monitor(env, self.log_dir, allow_early_resets=True)
        self.envs = DummyVecEnv([lambda: env])

        self.model = DQN(MLP_DQN, self.envs, verbose=0)

        self.best_mean_reward, self.n_steps = -np.inf, 0

    def callback(self, _locals, _globals):
        """
        Callback called at each step (for DQN an others) or after n steps (see ACER or PPO2)
        :param _locals: (dict)
        :param _globals: (dict)
        """

        # Print stats every 1000 calls
        if (self.n_steps + 1) % 1000 == 0:
            # Evaluate policy performance
            x, y = ts2xy(load_results(self.log_dir), 'timesteps')
            if len(x) > 0:
                mean_reward = np.mean(y[-100:])
                print(x[-1], 'timesteps')
                print("Best mean reward: {:.2f} - Last mean reward per episode: {:.2f}".format(
                    self.best_mean_reward, mean_reward))

                # New best model, you could save the agent here
                if mean_reward > self.best_mean_reward:
                    self.best_mean_reward = mean_reward

        self.n_steps += 1
        return True

    def learn(self, timesteps=10000):
        self.model.learn(total_timesteps=timesteps, callback=self.callback)
        print("======\nLEARNING DONE\n======")

    def save(self, filename):
        self.model.save(filename)
        print("Model saved !\n Filename:", filename)

    def load(self, filename):
        self.model = DQN.load(filename, policy=MLP_DQN)

    def run(self, nb_episodes=1000):
        self.nb_episodes = nb_episodes

        for index_episode in range(nb_episodes):
            state = self.envs.reset()
            state = np.reshape(np.array(state), [1, self.state_size])
            done = False
            steps = 0
            while not done:
                action, _states = self.model.predict(state)
                next_state, reward, done, _ = self.envs.step(action)
                next_state = np.reshape(
                    np.array(next_state), [1, self.state_size])
                state = next_state
                steps += 1
            if index_episode % 100 == 0:
                print("Episode {}#; \t Nb of steps: {}; \t Reward: {}.".format(
                    index_episode, steps + 1, reward))
            if index_episode > 0:
                self.reward += [((self.reward[-1] * len(self.reward)
                                  ) + reward) / (len(self.reward) + 1)]
            else:
                self.reward += [reward]

In [None]:
log_dir_def = "/tmp/gym/10-05/"
defense = Defense('DQN', K=10, P=0.95, log_dir=log_dir_def)
defense.learn(4000)
plot_results(log_dir_def)
plt.show()

## Training an attacker

In [None]:
class Attacker(gym.Env):

    def __init__(self, K, initial_potential):
        self.state = None
        self.K = K
        self.initial_potential = initial_potential
        self.weights = 2.0**np.arange(-self.K, 1)
        self.done = 0
        self.reward = 0
        self.action_space = spaces.Discrete(self.K+1)
        self.observation_space = spaces.MultiDiscrete(10*np.ones(K+1))

    def potential(self, A):
        return np.sum(A*self.weights)

    def split(self, A):
        B = self.state - A
        return A, B

    def erase(self, A):
        self.state -= A
        self.state = np.insert(self.state[:-1], 0, 0)

    def _seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def defense_play(self, A, B):
        if self.potential(A) >= self.potential(B):
            self.erase(A)
        else:
            self.erase(B)

    def check(self):
        if sum(self.state) == 0:
            return -1  # defender won
        elif self.state[-1] >= 1:
            return 1  # attacker won
        else:
            return 0  # game not over

    def step(self, target):
        A = np.zeros(self.K + 1)
        A[:target] = self.state[:target]

        B = np.zeros(self.K + 1)
        B[target+1:] = self.state[target+1:]

        for _ in range(int(self.state[target])):
            if self.potential(A) > self.potential(B):
                B[target] += 1
            else:
                A[target] += 1

        self.defense_play(A, B)

        self.reward = self.check()
        self.done = np.abs(self.reward)  # 1 if someone won

        return self.state, self.reward, self.done, {}

    def reset(self):
        self.state = self.random_start()
        self.done = 0
        self.reward = 0
        return self.state

    def random_start(self):
        self.state = np.zeros(self.K+1)
        potential = 0
        upper = self.K-1
        while potential < self.initial_potential and upper >= 0:
            possible = self.initial_potential - potential
            upper = self.K-1
            while 2**(upper-self.K) > possible:
                upper -= 1
            if upper >= 0:
                self.state[randint(0, upper)] += 1
                potential = self.potential(self.state)
        return self.state

    def render(self):
        for j in range(self.K + 1):
            print(self.state[j], end=" ")
        print("")

In [None]:
class Attack:
    def __init__(self, method, K=5, P=0.95, log_dir="/tmp/gym_attack/"):
        self.method = method

        self.K = K
        self.state_size = 2 * (self.K + 1)
        self.action_size = 2
        self.reward = []

        self.log_dir = log_dir
        os.makedirs(self.log_dir, exist_ok=True)

        env = Attacker(K, P)
        env = Monitor(env, self.log_dir, allow_early_resets=True)
        self.envs = DummyVecEnv([lambda: env])

        self.model = DQN(MLP_DQN, self.envs, verbose=0)

        self.best_mean_reward, self.n_steps = -np.inf, 0

    def callback(self, _locals, _globals):
        # Print stats every 1000 calls
        if (self.n_steps + 1) % 1000 == 0:
            # Evaluate policy performance
            x, y = ts2xy(load_results(self.log_dir), 'timesteps')
            if len(x) > 0:
                mean_reward = np.mean(y[-100:])
                print(x[-1], 'timesteps')
                print("Best mean reward: {:.2f} - Last mean reward per episode: {:.2f}".format(
                    self.best_mean_reward, mean_reward))

                # New best model, you could save the agent here
                if mean_reward > self.best_mean_reward:
                    self.best_mean_reward = mean_reward

        self.n_steps += 1
        return True

    def learn(self, timesteps=10000):
        self.model.learn(total_timesteps=timesteps, callback=self.callback)

    def save(self, filename):
        self.model.save(filename)

    def load(self, filename):
        self.model = DQN.load(filename, policy=MLP_DQN)

    def run(self, nb_episodes=1000):
        self.nb_episodes = nb_episodes

        for index_episode in range(nb_episodes):
            state = self.envs.reset()
            done = False
            steps = 0
            while not done:
                action, _states = self.model.predict(state)
                next_state, reward, done, _ = self.envs.step(action)
                next_state = np.array(next_state)
                state = next_state
                steps += 1
            if index_episode % 100 == 0:
                print("Episode {}#; \t Nb of steps: {}; \t Reward: {}.".format(
                    index_episode, steps + 1, reward))
            if index_episode > 0:
                self.reward += [((self.reward[-1] * len(self.reward)
                                  ) + reward) / (len(self.reward) + 1)]
            else:
                self.reward += [reward]

In [None]:
log_dir_att = "/tmp/gym_attack/10-101/"
attack = Attack('DQN', K=10, P=1.01, log_dir=log_dir_att)
attack.learn(4000)
plot_results(log_dir_att)

## Supervised

In [None]:
log_dir_def = "/tmp/gym/def/"
n = 4000
clf = MLPClassifier(hidden_layer_sizes=(300, 300))
proportion_correct = []
for K in [5, 8, 10]:
    defender = Defender(K, .95)
    
    defense = Defense('DQN', K=K, P=0.95, log_dir=log_dir_def, save_states=True)
    defense.learn(n//3)
    X_test = np.array(defense.envs.get_attr('states_saved')[0])
    
    y_test = np.zeros(n//3)
    for i in range(n//3):
        A, B = X_test[i,:K+1], X_test[i,K+1:]
        if defender.potential(A) < defender.potential(B):
            y_test[i] = 1
    
    defense = Defense('DQN', K=K, P=0.95, log_dir=log_dir_def, save_states=True)
    defense.learn(n)
    X_train = np.array(defense.envs.get_attr('states_saved')[0])
    y_rl_train = defense.model.predict(X_train)[0]
    y_rl_test = defense.model.predict(X_test)[0]
    
    y_train = np.zeros(n)
    for i in range(n):
        A, B = X_train[i,:K+1], X_train[i,K+1:]
        if defender.potential(A) < defender.potential(B):
            y_train[i] = 1
    
    clf.fit(X_train, y_train)
    y_sup_test = clf.predict(X_test)
    
    proportion_correct.append((1 - np.sum(np.abs(y_test - y_rl_test))/len(y_test), 1 - np.sum(np.abs(y_test - y_sup_test))/len(y_test)))
    print("proportion correct", proportion_correct[-1])

In [None]:
plt.plot([5, 8, 10], proportion_correct, marker='o')
plt.legend(['RL', 'Supervised'])
plt.title('Proportion of moves correct on test set of RL and Supervised Learning for varying K')
plt.xlabel('K')
plt.ylabel('Proportion correct')
plt.show()

## Creating a graphical interface

In [None]:
K = 20  # number of stages
N = 8  # number of points

nb_iter = 5  # number of attack and defense

field_width = 4
size_window = 300

In [None]:
def state_graphic(state, window, Field):
    field_width = np.max(state)
    K = len(state)  # number of stages
    size_window = 300
    l, c = np.empty(0),np.empty(0)
    for i in range(K):
        l  = np.concatenate((l,np.ones(int(state[i]))*(i+1)))
    for unique in (np.unique(l)):
        c = np.concatenate((c, np.arange(list(l).count(unique))))
    l = K-l
    N = len(c)
    v = min(scale_x, scale_y)
    c1 = (c+1/2)*scale_x - (1/4)*v
    c2 = c1 + (1/2)*v
    l1 = (l+1/2)*scale_y - (1/4)*v
    l2 = l1 + (1/2)*v
    # dictionnary of the ovals
    dic = {}
    # display the points in the grid
    for i in range(N):
        rond = Field.create_oval(c1[i], l1[i], c2[i], l2[i], fill='black')
        dic[str(i)] = rond
        Field.update()
    Coord = Label(window)
    Coord.pack(pady='10px')
    return dic, N

**Random Play**

In [None]:
# coord of the different points
c = np.random.randint(low=0, high=field_width, size=N)
l = np.random.randint(low=1, high=K, size=N)

# construction of the window and the canvas
window = Tk()
window.resizable(width=False, height=False)
window.title("Erdos-Selfridge-Spencer-Game")
Field = Canvas(window, height=size_window, width=size_window)
Field.pack()

scale_x = size_window//field_width
scale_y = size_window//K

carreau = [[Field.create_rectangle(i*scale_x, j*scale_y, (scale_x+1)*60, (scale_y+1)*60, fill="#FFFFFF")
            for i in range(field_width)] for j in range(K)]

v = min(scale_x, scale_y)
c1 = (c+1/2) * scale_x - (1/4)*v
c2 = c1 + (1/2) * v
l1 = (l + 1/2) * scale_y - (1/4)*v
l2 = l1 + (1/2)*v
# dictionnary of the ovals
dic = {}

# display the points in the grid
for i in range(N):
    rond = Field.create_oval(c1[i], l1[i], c2[i], l2[i], fill='black')
    dic[str(i)] = rond
Coord = Label(window)
Coord.pack(pady='10px')


points = np.arange(N)
for i in range(nb_iter):
    if len(points) > 1:
        random.shuffle(points) ### here is the decision of A (how to do the partition)
        cut = np.random.randint(1, len(points))
        A = points[:cut]
        B = points[cut:]
        # Here we have to code the decision of the defender which partition to choose
        # j'ai pris une decision random pour tester
        u = np.random.randint(2)
        if u == 1:
            D, C = A, B  # C represent the points to keep and D the ones to delete
        else:
            D, C = B, A
    else:
        D = points
        C = []
    # Here is the code of the animation
    for j in points:
        if j in D:
            Field.delete(window, dic[str(j)])
            Field.update()
        else:
            for m in range(30):
                Field.move(dic[str(j)], 0, -scale_y/30)
                time.sleep(0.01)
                Field.update()
                
    # stopping conditions of the game:
    time.sleep(1)
    points = C
    if (np.prod(l[C] - (i+1)) == 0):  # stop if a point reached K
        T = Text(window, height=2, width=30)
        T.pack()
        T.insert(END, "The attacker won")
        mainloop()
        break

    elif (len(C) == 0):  # stop if ther is no more points
        T = Text(window, height=2, width=30)
        T.pack()
        T.insert(END, "The defender won")
        mainloop()
        break

**Using a trained attacker**

In [None]:
potential = 1.01

attacker = Attacker(10, potential)
#attack = trained model
state = attacker.random_start()

print("Initial state")
print(state)

field_width = int(np.max(state))
size_window = 300
l, c = np.empty(0), np.empty(0)
K = len(state)


############## Creation of the background of the window (THE GRID)#############
window = Tk()
window.resizable(width=False, height=False)
window.title("Erdos-Selfridge-Spencer-Game")
Field = Canvas(window, height=size_window, width=size_window)
Field.pack()

scale_x = size_window//field_width
scale_y = size_window//K

carreau = [[Field.create_rectangle(i*scale_x, j*scale_y, (scale_x+1)*60, (scale_y+1)*60, fill="#FFFFFF")
            for i in range(field_width)] for j in range(K)]
######################## Creation of the initial dots ######################################
dic, N = state_graphic(state, window, Field)
T = Text(window, height=2, width=30)
T.configure(font=('Ebrima', 20, "bold"))
T.tag_configure("center", justify='center')
T.pack()
T.insert(END, "Let the game begin")
window.update()
time.sleep(3)
T.delete('1.0', END)
window.update()
##################################### The game begins: #####################################
done = False
steps = 0

while not done:
    action, _states = attack.model.predict(state)
    print("Action :", action)

######################################TURN OF THE ATTACKER########################################
    next_state, reward, done, _ = attacker.step(action)
    T.insert(END, "The attacker chooses partitions")
    window.update()
    time.sleep(2)
    T.delete('1.0', END)
    window.update()

#####################################TURN OF THE DEFENDER#########################################
    T.insert(END, "Turn of the defender")
    window.update()
    time.sleep(2)
    T.delete('1.0', END)
    window.update()

    for j in range(N):
        Field.delete(window, dic[str(j)])
        Field.update()
    next_state = np.array(next_state)
    dic, N = state_graphic(next_state, window, Field)
    Field.update()
    window.update()
    state = next_state
    steps += 1
if reward == 1:
    T.insert(END, "The attacker won")
    window.update()
    print("Attacker won in", steps, "step(s)")
else:
    T.insert(END, "The defender won")
    window.update()
    print("Attacker lost in", steps, "step(s)")
mainloop()