In [None]:
!pip install PySuperTuxKart

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import ResNet50
import pystk
import matplotlib.pyplot as plt
# import pylab as pl
from IPython import display
from tqdm import tqdm
import matplotlib.animation as animation

# NOTE(review): this rebinds the attribute `pystk.LogLevel` to the integer 0; it is a
# plain assignment, not a call into pystk. If the intent is to lower pystk's log
# verbosity, confirm the correct API against the pystk documentation.
pystk.LogLevel = 0
# Minimum number of steps that must pass between two forced rescues
# (compared against `self.t - self.last_rescue` in PyTuxActionCritic.step).
RESCUE_TIMEOUT = 30
# Exploration rate; in the cells visible here it is only referenced from
# commented-out code in `controller`.
EPSILON = 0.1

In [None]:
%matplotlib inline

In [None]:
class ImageAni:
    """Small helper that draws successive image frames onto one matplotlib axes.

    `ax` is the axes to draw on; `shape` is the shape of the frames and must
    have exactly three dimensions.
    """

    def __init__(self, ax, shape):
        assert len(shape) == 3, "must be 3 channel image!"
        self.ax = ax
        # Start from an all-zero frame so the axes holds an image right away.
        blank_frame = np.zeros(shape)
        self.im = ax.imshow(blank_frame, cmap='gray')

    def update(self, y):
        """Clear the axes, draw frame `y`, and return the new image artist."""
        axes = self.ax
        axes.cla()
        artist = axes.imshow(y, cmap='gray')
        self.im = artist
        return artist

In [None]:
def controller(model, state):
  """Evaluate `model` on `state` and hand back its eight outputs unchanged.

  The model is expected to return exactly eight values; they are returned as a
  tuple in the order
  (steer, acceleration, brake, drift, nitro, fire, rescue, critic_value).
  No thresholding, sampling or exploration noise is applied here.
  """
  outputs = model(state)
  # Unpacking enforces that the model produced exactly eight outputs.
  (steer_out, accel_out, brake_out, drift_out,
   nitro_out, fire_out, rescue_out, value_out) = outputs
  return (steer_out, accel_out, brake_out, drift_out,
          nitro_out, fire_out, rescue_out, value_out)


In [None]:
class PyTuxActionCritic:
  """Single-kart SuperTuxKart environment used by the actor-critic training loop.

  The constructor calls `pystk.init`, and only one instance may be created
  (enforced through `_singleton`); presumably this is because pystk must not be
  initialised twice -- confirm against the pystk documentation.
  """
  _singleton = None

  def __init__(self, screen_width=128, screen_height=96):
    """Initialise pystk graphics with the `hd` preset at the given screen size."""
    assert PyTuxActionCritic._singleton is None, "Cannot create more than one pytux object"
    PyTuxActionCritic._singleton = self
    self.config = pystk.GraphicsConfig.hd()
    self.config.screen_width = screen_width
    self.config.screen_height = screen_height
    pystk.init(self.config)
    self.k = None          # current pystk.Race, or None when no race is running
    self.t = 0             # number of race steps taken since the last restart()
    self.state = None      # pystk.WorldState, created in restart()
    self.track = None      # pystk.Track, created in restart()
    self.last_rescue = 0   # value of self.t when the last forced rescue happened
    self.distance = 0

  @staticmethod
  def _point_on_track(distance, track, offset=0.0):
    """
    Get a point at `distance` down the `track`. Optionally applies an offset after the track segment if found.
    Returns a 3d coordinate
    """
    node_idx = np.searchsorted(track.path_distance[..., 1],
                               distance % track.path_distance[-1, 1]) % len(track.path_nodes)
    d = track.path_distance[node_idx]
    x = track.path_nodes[node_idx]
    # Linear interpolation between the two end points of the selected segment.
    t = (distance + offset - d[0]) / (d[1] - d[0])
    return x[1] * t + x[0] * (1 - t)

  @staticmethod
  def _to_image(x, proj, view):
    """Project point `x` with `proj @ view` and return its first two coordinates
    after the perspective divide (second one negated), clipped to [-1, 1]."""
    p = proj @ view @ np.array(list(x) + [1])
    return np.clip(np.array([p[0] / p[-1], -p[1] / p[-1]]), -1, 1)

  def restart(self, track):
    """Start a one-kart, one-lap race on `track` and reset the episode counters.

    If a race on the same track already exists it is restarted in place;
    otherwise any existing race is stopped and a new one is created.
    """
    self.last_rescue = 0
    self.t = 0
    self.distance = 0

    if self.k is not None and self.k.config.track == track:
      self.k.restart()
      self.k.step()
    else:
      if self.k is not None:
        self.k.stop()
        # Drop the reference but keep the attribute, so that a failure while
        # building the new race does not leave the object without `self.k`.
        self.k = None
      config = pystk.RaceConfig(num_kart=1, laps=1, track=track)
      config.players[0].controller = pystk.PlayerConfig.Controller.PLAYER_CONTROL

      self.k = pystk.Race(config)
      self.k.start()
      self.k.step()

    self.state = pystk.WorldState()
    self.track = pystk.Track()

  def getState(self):
    """Generator over frames: yields the current rendered image when a race
    exists, then yields an all-zero frame of the configured screen size.

    NOTE(review): the zero frame is yielded even after a real frame was
    yielded; if it was meant only as a fallback, a `return` is missing -- confirm.
    """
    if (self.k is not None):
      yield np.array(self.k.render_data[0].image)

    yield np.zeros((self.config.screen_height,
                    self.config.screen_width, 3))

  def step(self, action, verbose=False):
    """
    Advance the current race by one step using `action`.

    :param action: pystk.Action to apply; its `rescue` flag is forced to True
                   when the kart is slower than 1.0 and more than
                   RESCUE_TIMEOUT steps have passed since the last forced rescue
    :param verbose: print a message when the finish condition is detected
    :return: (image, t, reward) where
             reward = 1000 when the completed track fraction is within 2e-3 of 1.0,
             reward = 1000 * completed fraction once t has reached 1000,
             reward = -1 otherwise.
             In the first two cases the race is NOT advanced.

    The image is read from `render_data` before `self.k.step(action)` is called.
    NOTE(review): whether the returned array already reflects the frame after
    the action depends on pystk's buffer handling -- confirm.
    """

    self.state.update()
    self.track.update()

    kart = self.state.players[0].kart

    im = self.k.render_data[0].image

    # Fraction of the track length covered so far.
    current_distance = kart.overall_distance / self.track.length

    if np.isclose(current_distance, 1.0, atol=2e-3):
      if verbose:
        print("Finished at t=%d" % self.t)
      return np.array(im), self.t, 1000  # reward for finish

    if (self.t == 1000):
      # Step budget exhausted: partial credit proportional to progress.
      return np.array(im), self.t, 1000 * current_distance

    current_vel = np.linalg.norm(kart.velocity)

    if current_vel < 1.0 and self.t - self.last_rescue > RESCUE_TIMEOUT:
      self.last_rescue = self.t
      action.rescue = True

    self.k.step(action)
    self.t += 1
    return np.array(im), self.t, -1  # penalty for each additional step

  def close(self):
    """
    Call this function, once you're done with PyTux.
    Safe to call more than once: the race is stopped only if one is running.
    """
    if self.k is not None:
      self.k.stop()
      # Keep the attribute (set to None) so a repeated close() does not raise.
      self.k = None
    pystk.clean()

In [None]:
# Configuration parameters for the whole setup
seed = 42
gamma = 0.99  # Discount factor for past rewards
max_steps_per_episode = 1000
max_episodes = 50
eps = np.finfo(np.float32).eps.item()  # Smallest number such that 1.0 + eps != 1.0

# Apply the seed: the training loop samples actions with np.random.choice, so
# without this call `seed` has no effect. This seeds NumPy's global generator only.
np.random.seed(seed)

In [None]:
num_actions = 7 # 'acceleration', 'brake', 'drift', 'fire', 'nitro', 'rescue', 'steer'
num_hidden = 48

# Make weight initialisation reproducible (uses `seed` from the configuration cell).
tf.random.set_seed(seed)

# Shared convolutional trunk over a (height=96, width=128, RGB) frame.
inputs = keras.Input(shape=(96, 128, 3))
common = layers.Conv2D(num_hidden, (3, 3))(inputs)
common = layers.LeakyReLU()(common)
common = layers.Conv2D(num_hidden, (3, 3))(common)
common = layers.LeakyReLU()(common)
# The filter count must be an integer: `//` instead of `/` (which yields 24.0).
common = layers.Conv2D(num_hidden // 2, (3, 3))(common)
common = layers.LeakyReLU()(common)
common = layers.Flatten()(common)

# Actor heads. Each is a categorical distribution that the training loop samples from.
# acceleration / steer: 3 classes, used by the training loop as an index into `space`
# (an increment applied to the running throttle / steering value).
acceleration = layers.Dense(3, activation="softmax")(common)
steer = layers.Dense(3, activation="softmax")(common)
# Boolean controls: 2 classes each (index 0 = off, index 1 = on).
drift = layers.Dense(2, activation="softmax")(common)
brake = layers.Dense(2, activation="softmax")(common)
nitro = layers.Dense(2, activation="softmax")(common)
rescue = layers.Dense(2, activation="softmax")(common)
fire = layers.Dense(2, activation="softmax")(common)

# Critic head: a single unbounded value estimate.
critic = layers.Dense(1)(common)

# Output order must match the unpacking in `controller`.
model = keras.Model(inputs=inputs, outputs=[steer, acceleration, brake, drift, nitro, fire, rescue, critic])

In [None]:
# `summary()` prints the table itself and returns None, so wrapping it in
# print() would emit a stray "None" after the table.
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 96, 128, 3)  0           []                               
                                ]                                                                 
                                                                                                  
 conv2d (Conv2D)                (None, 94, 126, 48)  1344        ['input_1[0][0]']                
                                                                                                  
 leaky_re_lu (LeakyReLU)        (None, 94, 126, 48)  0           ['conv2d[0][0]']                 
                                                                                                  
 conv2d_1 (Conv2D)              (None, 92, 124, 48)  20784       ['leaky_re_lu[0][0]']        

In [None]:
from matplotlib import rc

# Optimiser and critic loss. A fixed learning rate is used.
optimizer = keras.optimizers.Adam(learning_rate=0.01)
huber_loss = keras.losses.Huber()

# Per-episode buffers filled by the training loop.
action_probs_history, critic_value_history, rewards_history = [], [], []

# Training progress counters.
running_reward = 0
episode_count = 0

# Running throttle and steering values; the training loop adds a sampled
# increment from `space` to them at every step.
accel = 0
ster = 0

# Increment selected by the 3-way acceleration / steer heads.
# NOTE(review): the values are asymmetric (-0.1 vs 1); confirm that 1 rather
# than 0.1 is intended.
space = [-0.1, 0, 1]

# Tracks to train on, in order.
tracks = ['hacienda']

# Render matplotlib animations as interactive JS/HTML in the notebook.
rc('animation', html='jshtml')

In [None]:
# Create the pytux environment. PyTuxActionCritic allows only one instance, so
# reuse the existing one when this cell is re-run instead of tripping its assertion.
pytux = PyTuxActionCritic._singleton
if pytux is None:
  pytux = PyTuxActionCritic()

#TODO: add animation
# fig, ax = plt.subplots()

# imageAni = ImageAni(ax, shape=(96, 128,3))
# ani = animation.FuncAnimation(fig, imageAni.update, pytux.getState)

In [None]:
# Actor-critic training loop.
# One episode = one race on `track`, capped at max_steps_per_episode steps. The whole
# episode is recorded on a single GradientTape and the network is updated once at
# the end of the episode.
for track in tracks:
  while episode_count < max_episodes:  # Run until solved
    episode_reward = 0

    # Start every episode from neutral controls and empty buffers, so that state
    # left over from a previous (possibly aborted) episode cannot leak in.
    accel = 0.0
    ster = 0.0
    action_probs_history.clear()
    critic_value_history.clear()
    rewards_history.clear()

    pytux.restart(track)
    state, steps, reward = pytux.step(pystk.Action())
    with tf.GradientTape() as tape:
      for timestep in tqdm(range(1, max_steps_per_episode + 1)):
        # Frames are converted to a float32 batch of one.
        state = tf.cast(tf.convert_to_tensor(state), tf.float32)
        state = tf.expand_dims(state, 0)

        # Predict per-control action probabilities and the critic's value estimate.
        steer, acceleration, brake, drift, nitro, fire, rescue, critic_value = controller(model, state)

        # Sample one class from every actor head.
        a = int(np.random.choice(3, p=np.squeeze(acceleration)))
        s = int(np.random.choice(3, p=np.squeeze(steer)))
        b = int(np.random.choice(2, p=np.squeeze(brake)))
        d = int(np.random.choice(2, p=np.squeeze(drift)))
        n = int(np.random.choice(2, p=np.squeeze(nitro)))
        f = int(np.random.choice(2, p=np.squeeze(fire)))
        r = int(np.random.choice(2, p=np.squeeze(rescue)))

        # The 3-way heads select an increment for the running throttle / steering.
        accel = float(np.clip(accel + space[a], 0, 1))
        ster = float(np.clip(ster + space[s], -1, 1))

        critic_value_history.append(critic_value[0, 0])

        # Joint log-probability of the sampled action (sum over the heads).
        # tf.stack keeps the values on the tape; going through np.array /
        # tf.constant would cut the gradient to the actor heads.
        chosen_probs = tf.stack([
            steer[0, s], acceleration[0, a], brake[0, b], drift[0, d],
            nitro[0, n], fire[0, f], rescue[0, r],
        ])
        action_probs_history.append(tf.reduce_sum(tf.math.log(chosen_probs)))

        # Apply the sampled action in our environment (exactly once per step).
        # Fields are set by name so nothing depends on the positional order of
        # the pystk.Action constructor.
        action = pystk.Action()
        action.steer = ster
        action.acceleration = accel
        action.brake = bool(b)
        action.drift = bool(d)
        action.nitro = bool(n)
        action.fire = bool(f)
        action.rescue = bool(r)
        state, steps, reward = pytux.step(action)
        rewards_history.append(reward)
        episode_reward += reward

        if reward == 1000:
          break

      # TODO: find better update rule and loss metric

      # Update running reward to check condition for solving
      running_reward = 0.05 * episode_reward + (1 - 0.05) * running_reward

      # Calculate expected value from rewards
      # - At each timestep what was the total reward received after that timestep
      # - Rewards in the past are discounted by multiplying them with gamma
      # - These are the labels for our critic
      returns = []
      discounted_sum = 0
      for step_reward in reversed(rewards_history):
        discounted_sum = step_reward + gamma * discounted_sum
        returns.append(discounted_sum)
      returns.reverse()

      # Normalize
      returns = np.array(returns)
      returns = (returns - np.mean(returns)) / (np.std(returns) + eps)
      returns = returns.tolist()

      # Calculating loss values to update our network
      actor_losses = []
      critic_losses = []
      for log_prob, value, ret in zip(action_probs_history, critic_value_history, returns):
        # At this point in history, the critic estimated that we would get a
        # total reward = `value` in the future. We took an action with log probability
        # of `log_prob` and ended up receiving a total reward = `ret`.
        # The actor must be updated so that it predicts an action that leads to
        # high rewards (compared to critic's estimate) with high probability.
        diff = ret - value
        actor_losses.append(-log_prob * diff)  # actor loss

        # The critic must be updated so that it predicts a better estimate of
        # the future rewards.
        critic_losses.append(
            huber_loss(tf.expand_dims(value, 0), tf.expand_dims(ret, 0))
        )

      # Backpropagation
      loss_value = sum(actor_losses) + sum(critic_losses)
      grads = tape.gradient(loss_value, model.trainable_variables)
      optimizer.apply_gradients(zip(grads, model.trainable_variables))

      # Clear the loss and reward history
      action_probs_history.clear()
      critic_value_history.clear()
      rewards_history.clear()

    # Log details
    episode_count += 1
    template = "episode reward: {:.2f} at episode {}"
    print(template.format(episode_reward, episode_count))

    if running_reward > 500:  # Condition to consider the task solved
      print("Solved at episode {}!".format(episode_count))
      break

  0%|          | 0/1000 [00:01<?, ?it/s]


TypeError: ignored