In [1]:
import gym
import numpy as np
from gym import spaces
import random, math
import scipy
from scipy.optimize import minimize_scalar, fsolve
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import networkx as nx
from matplotlib.animation import FuncAnimation, PillowWriter
import sys
matplotlib.use('agg')
class EXP3:
	def __init__(self, num_actions, num_iterations=None):
		self.num_actions = num_actions
		self.belief = np.ones(num_actions, dtype=np.longdouble)
		self.belief /= sum(self.belief)
		self.gamma = 0
		self.action_prob = np.ones(num_actions, dtype=np.longdouble) / num_actions
		self.t = 0

	def __str__(self):
		return f"EXP3\ngamma={self.gamma}\naction_prob={self.action_prob}\n"

	def feedback(self, action, reward, state=None):
		self.t += 1

		estimatedReward = reward / self.action_prob[action]
		self.belief[action] *= math.exp(estimatedReward * self.gamma / self.num_actions)
		self.belief /= sum(self.belief)

		self.gamma = min(1, math.sqrt(self.num_actions * 2 * math.log(self.num_actions) / (self.t)))
		self.action_prob = (1.0 - self.gamma) * self.belief + self.gamma / self.num_actions

class EXP3DH:
	def __init__(self, num_actions, num_iterations=None, beta=None, b=0.2):
		self.num_actions = num_actions
		### not essential, but use higher precision just in case
		self.loss = np.zeros(num_actions, dtype=np.float128)
		self.eps = 0
		self.action_prob = np.ones(num_actions, dtype=np.float128) / num_actions
		self.t = 0
		self.beta = 2 * num_actions if not beta else beta #or 1 for second price auction
		self.b = b

	def __str__(self):
		return f"EXP3-DH\nbeta={self.beta}\nb={self.b}\naction_prob={self.action_prob}\n"

	def compute_action(self):
		return self.action_prob

	def feedback(self, action, reward, state=None):
		self.t += 1

		estimatedReward = reward / self.action_prob[action]
		discount = ( (self.t-1)/(self.t) ) ** (self.beta)
		self.eps = self.t ** (-self.b)
		self.loss *= discount
		self.loss[action] += estimatedReward

		### here the normalization through minus np.max(self.loss)
		### is critical for maintain numerical stability
		### while perserve the originial value
		exp_loss = np.exp( self.loss - np.max(self.loss) )

		self.action_prob =  (1-self.eps) * exp_loss / np.sum(exp_loss) + self.eps / self.num_actions


In [2]:
def general_render(array, name):

  fig = plt.figure()
  ax = plt.axes(xlim=(0, len(array)), ylim=(0, np.max(array)))

  line, = ax.plot([], [], lw = 1)

  def init():
    line.set_data([], [])
    return line,

  def animate(i):
    x = range(0, i)
    y = array[:i]
    line.set_data(x, y)
    return line,

  if len(array) > 10000:
    anim = FuncAnimation(fig, animate, init_func=init, frames=range(0, len(array), len(array)//100), interval=1, blit=True)
  else:
    anim = FuncAnimation(fig, animate, init_func=init, frames=range(0, len(array)), interval=1000, blit=True)

  anim.save(f'{name}.gif', writer=PillowWriter(fps=5))

  plt.close()

def generate_game(n):
  array = []
  for i in range(n):
    array.append([])
    for j in range(n):
        array[i].append((random.random()*1/3, random.random()*1/3))

  #Randomize important action points and return SSE
  c1 = random.randint(0, n-1)
  c2 = random.randint(0, n-1)
  while c2 == c1:
    c2 = random.randint(0, n-1)
  r1 = random.randint(0, n-1)
  r2 = random.randint(0, n-1)
  while r2 == r1:
    r2 = random.randint(0, n-1)
  array[r1][c1] = (1, 0)
  array[r1][c2] = (1/2, 1/2)
  array[r2][c1] = (0, 3/4)
  array[r2][c2] = (3/4, 1/4)
  return array, r1, c2

In [None]:
def Stackelberg_n_changes(game_sizes):
  iters = []
  for n in game_sizes:
    simple_SSE, r1, c2 = generate_game(n)
    agent_1 = EXP3DH(n)
    agent_2 = [EXP3DH(n) for i in range(n)]
    SSE = [r1, c2]
    cur_iters = 0
    while True:
      #Leader action choice
      action_1 = np.random.choice(list(range(n)), p=agent_1.action_prob.astype('float64'))

      #Learning follower action choice
      action_2 = np.random.choice(list(range(n)), p=agent_2[action_1].action_prob.astype('float64'))

      #Reward model based on action pair taken
      reward = simple_SSE[action_1][action_2]
      agent_1.feedback(action_1, reward[0])
      agent_2[action_1].feedback(action_2, reward[1])

      if agent_1.action_prob[SSE[0]] > .8 and agent_2[SSE[0]].action_prob[SSE[1]] > .8:
        iters.append(cur_iters)
        break

      cur_iters += 1

  return iters

game_sizes = [2, 5, 10, 20, 100, 200, 500]


iters = np.asarray(Stackelberg_n_changes(game_sizes))

for i in range(4):
  iters = np.add(iters, np.asarray(Stackelberg_n_changes(game_sizes)))

iters = iters.astype(int) * 1/5

fig, ax = plt.subplots()
ax.plot(game_sizes, iters)
ax.set_xlabel('Size of game')
ax.set_ylabel('.8 SSE prob iterations')
ax.set_title("Runtime for EXP3DH algorithm to reach SSE in several game sizes")
plt.savefig(f"EXP3DH_SSE_runtimes_3.png")

In [None]:
def learn_Stackelberg(n):
  #Original 2x2 game that subsequent games are based on
  #simple_SSE = [[(1, 0), (1/2, 1/2)], [(0, 3/4), (3/4, 1/4)]]

  simple_SSE, r1, c2 = generate_game(n)

  #Code for printing game if needed
  #for i in simple_SSE:
  #  print(i)

  #Generate agents by making leader by one EXP3 agent and follower be n EXP3 agents, learning for each leader action
  agent_1 = EXP3(n)
  agent_2 = [EXP3(n) for i in range(n)]
  SSE = [r1, c2]
  SSE_probs = []
  agent_1_SSE_probs = []
  agent_2_SSE_probs = []
  for i in range(200000):
    #Leader action choice
    action_1 = np.random.choice(list(range(n)), p=agent_1.action_prob.astype('float64'))

    #Learning follower action choice
    action_2 = np.random.choice(list(range(n)), p=agent_2[action_1].action_prob.astype('float64'))

    #Reward model based on action pair taken
    reward = simple_SSE[action_1][action_2]

    if action_1 == SSE[0] and action_2 == SSE[1] and i > 0:
      SSE_probs.append(len(SSE_probs)/i)

    agent_1.feedback(action_1, reward[0])
    agent_2[action_1].feedback(action_2, reward[1])

    agent_1_SSE_probs.append(agent_1.action_prob[SSE[0]])
    agent_2_SSE_probs.append(agent_2[SSE[0]].action_prob[SSE[1]])

  general_render(agent_1_SSE_probs, 'agent_1_SSE_prob')
  general_render(agent_2_SSE_probs, 'agent_2_SSE_prob')
  if(len(SSE_probs) > 100):
    general_render(SSE_probs, 'SSE_prob')

  print(agent_1.action_prob[SSE[0]])
  print(agent_2[SSE[0]].action_prob[SSE[1]])

learn_Stackelberg(10)