In [None]:
!pip install swig
!pip install gymnasium[box2d]
!pip install box2d



In [None]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import os
from PIL import Image, ImageDraw, ImageFont
import numpy as np


In [None]:
# Run on the GPU when CUDA is usable, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cuda')

In [None]:
class ActorCritic(nn.Module):
  """Actor-critic network with a shared trunk for discrete action spaces.

  A two-layer ReLU MLP feeds two linear heads: ``policy`` produces one
  unnormalised logit per action and ``value`` produces a scalar state value.
  """

  def __init__(self, obs_dim, act_dim):
    """Build the trunk and both heads.

    Args:
      obs_dim: Length of the flat observation vector.
      act_dim: Number of discrete actions.
    """
    super().__init__()
    self.net = nn.Sequential(
      nn.Linear(obs_dim, 128),
      nn.ReLU(),
      nn.Linear(128, 128),
      nn.ReLU()
    )
    self.policy = nn.Linear(128, act_dim)
    self.value = nn.Linear(128, 1)

  def forward(self, x):
    """Return ``(logits, value)`` for a batch of observations.

    Args:
      x: Float tensor whose last dimension is ``obs_dim``.

    Returns:
      Tuple of action logits (last dimension ``act_dim``) and state values
      (last dimension 1).
    """
    x = self.net(x)
    return self.policy(x), self.value(x)

  def step(self, state):
    """Sample one action for a single observation during a rollout.

    Args:
      state: One observation (array-like of length ``obs_dim``).

    Returns:
      Tuple ``(action, log_prob, value)`` of plain Python scalars: the sampled
      action index, its log-probability under the current policy, and the
      critic's value estimate.
    """
    # Follow the module's own device rather than a notebook-level global, so
    # the method works wherever the model has been moved.
    model_device = next(self.parameters()).device
    obs = torch.as_tensor(state, dtype=torch.float32, device=model_device).unsqueeze(0)

    # Only detached scalars leave this method, so building an autograd graph
    # for every environment step would be wasted work.
    with torch.no_grad():
      logits, value = self.forward(obs)
      # Built from logits, the same way the PPO update builds its
      # distribution, so old and new log-probabilities are directly comparable.
      dist = torch.distributions.Categorical(logits=logits)
      action = dist.sample()
      log_prob = dist.log_prob(action)

    return action.item(), log_prob.item(), value.item()

In [None]:
class LunarLanderShaped(gym.Wrapper):
  """LunarLander wrapper that adds shaping terms to the environment reward.

  On top of the wrapped environment's reward it adds:
    * a bonus proportional to the reduction in distance to the origin
      (where the landing pad is),
    * a small penalty while the lander is almost motionless (hovering),
    * a one-off bonus when the episode terminates with a landing.
  """

  def __init__(self, env=None):
    """Wrap ``env``, or a fresh ``LunarLander-v3`` when none is given.

    Args:
      env: Optional environment to wrap. Defaults to
        ``gym.make("LunarLander-v3", render_mode="rgb_array")``.
    """
    if env is None:
      env = gym.make("LunarLander-v3", render_mode="rgb_array")
    super().__init__(env)
    # Distance to the pad at the previous step; set by reset().
    self.prev_dist = None

  def reset(self, **kwargs):
    """Reset the wrapped environment and remember the starting distance."""
    obs, info = self.env.reset(**kwargs)
    self.prev_dist = float(np.sqrt(obs[0]**2 + obs[1]**2))
    return obs, info

  def step(self, action):
    """Step the wrapped environment and return the shaped reward.

    Returns:
      The usual ``(obs, reward, terminated, truncated, info)`` tuple, where
      ``reward`` is a Python float that includes the shaping terms.
    """
    obs, env_reward, term, trunc, info = self.env.step(action)
    reward = float(env_reward)

    x, y, vx, vy, angle, vangle, leg1, leg2 = obs

    # distance to landing pad
    dist = float(np.sqrt(x*x + y*y))

    # If step() is called before reset() there is no previous distance yet;
    # treat the first step as having made no progress instead of failing.
    if self.prev_dist is None:
      self.prev_dist = dist

    # shaping: reward for moving closer
    reward += 2.0 * (self.prev_dist - dist)
    self.prev_dist = dist

    # shaping: penalty for hovering with almost zero movement
    if abs(vx) < 0.03 and abs(vy) < 0.03:
      reward -= 0.05

    # shaping: encourage touching down.
    # `term` alone is also true for crashes and for flying out of bounds, so
    # the bonus must not key on it alone. Per the Gymnasium LunarLander docs a
    # safe landing ends with a positive terminal reward and a crash with a
    # negative one; both legs must also be in contact with the ground.
    landed = (
      term and not trunc and
      leg1 == 1 and leg2 == 1 and
      env_reward > 0
    )
    if landed:
      reward += 50.0   # successful landing bonus

    return obs, reward, term, trunc, info

In [None]:
def draw_instrument_panel(state, action, base_frame):
  """Return ``base_frame`` with a 250-pixel instrument panel appended below.

  The panel shows, in drawing order: an artificial horizon rotated by the
  lander angle, a vertical-speed scale, a horizontal-speed scale, an altitude
  scale, three engine indicators lit according to ``action``, and a
  "LANDED" lamp.

  Args:
    state: 8-element observation unpacked as
      ``(x, y, vx, vy, angle, ang_vel, l1, l2)``.
    action: Action index; 1, 2 and 3 light the LEFT, MAIN and RIGHT buttons.
    base_frame: Rendered frame as an array accepted by ``Image.fromarray``.

  Returns:
    ``np.ndarray`` of the composed RGB image.
  """
  gauge_green = (200, 255, 200)
  axis_grey = (150, 150, 150)
  label_white = (255, 255, 255)

  _, altitude, vel_x, vel_y, tilt, _, leg_a, leg_b = state

  scene = Image.fromarray(base_frame)
  W, H = scene.size

  # Dark canvas: the rendered scene on top, 250 extra rows for the panel.
  canvas = Image.new("RGB", (W, H + 250), (18, 18, 18))
  canvas.paste(scene, (0, 0))
  pen = ImageDraw.Draw(canvas)

  def vertical_ruler(axis_x, top, bottom, caption, caption_dx):
    """Draw a captioned vertical axis with six evenly spaced ticks."""
    pen.text((axis_x - caption_dx, top - 30), caption, fill=gauge_green)
    pen.line((axis_x, top, axis_x, bottom), fill=axis_grey, width=3)
    for k in range(6):
      tick_y = top + k * (bottom - top) / 5
      pen.line((axis_x - 12, tick_y, axis_x + 12, tick_y), fill=axis_grey, width=2)

  def horizontal_marker(axis_x, marker_y):
    """Draw the value marker that crosses a vertical axis."""
    pen.line((axis_x - 22, marker_y, axis_x + 22, marker_y), fill=gauge_green, width=4)

  # -------------------------------------------------
  # 1. Artificial horizon
  # -------------------------------------------------
  r = 75
  side = 2 * r
  hub_x, hub_y = W // 2, H + 70

  disc = Image.new("RGB", (side, side), (0, 0, 0))
  disc_pen = ImageDraw.Draw(disc)
  disc_pen.rectangle((0, 0, side, r), fill=(40, 90, 180))      # sky
  disc_pen.rectangle((0, r, side, side), fill=(110, 80, 50))   # ground
  disc_pen.line((0, r, side, r), fill=(255, 255, 255), width=4)
  # Lander angle is converted from radians to degrees for PIL.
  disc = disc.rotate(tilt * 180 / np.pi, resample=Image.BICUBIC)

  # Circular mask so only the round instrument face is pasted.
  round_mask = Image.new("L", (side, side), 0)
  ImageDraw.Draw(round_mask).ellipse((0, 0, side, side), fill=255)
  canvas.paste(disc, (hub_x - r, hub_y - r), round_mask)

  pen.ellipse(
    (hub_x - r - 4, hub_y - r - 4, hub_x + r + 4, hub_y + r + 4),
    outline=(220, 220, 220), width=4
  )

  # Velocities are amplified before being clamped to the +/-5 scale range.
  gain = 3.0

  # -------------------------------------------------
  # 2. Vertical speed
  # -------------------------------------------------
  vs_top, vs_bottom = H + 40, H + 200
  vertical_ruler(70, vs_top, vs_bottom, "VERT SPD", 30)
  vs_value = np.clip(vel_y * gain, -5, 5)
  horizontal_marker(70, vs_top + (vs_value + 5) * (vs_bottom - vs_top) / 10)

  # -------------------------------------------------
  # 3. Horizontal speed
  # -------------------------------------------------
  hs_y = H + 225
  hs_left, hs_right = 150, W - 150

  pen.text((hs_left, hs_y - 30), "HORZ SPD", fill=gauge_green)
  pen.line((hs_left, hs_y, hs_right, hs_y), fill=axis_grey, width=3)
  for k in range(6):
    tick_x = hs_left + k * (hs_right - hs_left) / 5
    pen.line((tick_x, hs_y - 12, tick_x, hs_y + 12), fill=axis_grey, width=2)

  hs_value = np.clip(vel_x * gain, -5, 5)
  marker_x = hs_left + (hs_value + 5) * (hs_right - hs_left) / 10
  pen.line((marker_x, hs_y - 22, marker_x, hs_y + 22), fill=gauge_green, width=4)

  # -------------------------------------------------
  # 4. ALTITUDE - right scale
  # -------------------------------------------------
  alt_x = W - 70
  alt_top, alt_bottom = H + 40, H + 200
  vertical_ruler(alt_x, alt_top, alt_bottom, "ALT", 20)
  alt_value = np.clip(altitude, 0, 1.4)  # altitude in Lunar Lander is ~1.4
  horizontal_marker(alt_x, alt_bottom - alt_value * (alt_bottom - alt_top) / 1.4)

  # -------------------------------------------------
  # 5. ENGINES BUTTONS
  # -------------------------------------------------
  btn_top, btn_w, btn_h = H + 10, 110, 35
  btn_left = W // 2 - 180

  # (button x offset, caption x offset, action code, caption)
  buttons = (
    (0, 30, 1, "LEFT"),
    (140, 165, 2, "MAIN"),
    (280, 310, 3, "RIGHT"),
  )
  for offset, caption_dx, code, caption in buttons:
    lit = (255, 80, 80) if action == code else (80, 80, 80)
    x0 = btn_left + offset
    pen.rectangle((x0, btn_top, x0 + btn_w, btn_top + btn_h), fill=lit)
    pen.text((btn_left + caption_dx, btn_top + 10), caption, fill=label_white)

  # -------------------------------------------------
  # 6. Lamp LANDED
  # -------------------------------------------------
  landed = (
    leg_a == 1 and leg_b == 1 and
    abs(vel_x) < 0.4 and abs(vel_y) < 0.4 and abs(tilt) < 0.4
  )
  if landed:
    lamp_fill, lamp_text = (0, 180, 0), "LANDED"
  else:
    lamp_fill, lamp_text = (150, 0, 0), "-----"

  lamp_x, lamp_y = W - 100, H + 210
  pen.rectangle((lamp_x, lamp_y, lamp_x + 70, lamp_y + 20), fill=lamp_fill)
  pen.text((lamp_x + 18, lamp_y + 7), lamp_text, fill=label_white)

  return np.array(canvas)


In [None]:
def record_eval_video(model, epoch, folder="training_stats"):
  """Roll out one greedy episode and save an annotated video plus metrics.

  The policy acts greedily (argmax over logits) in an unshaped
  ``LunarLander-v3`` environment for at most 1000 steps. Every rendered frame
  is decorated with ``draw_instrument_panel`` and written to
  ``{folder}/epoch_{epoch}_eval.mp4``; a summary is written to
  ``{folder}/epoch_{epoch}_metrics.json``.

  Args:
    model: Policy network returning ``(logits, value)``.
    epoch: Epoch number used in the output file names and metrics.
    folder: Output directory; created if it does not exist.

  Returns:
    Path of the saved video file.
  """
  import imageio
  import json

  # Make the function usable on its own, not only after the notebook cell
  # that prepares the output directory has run.
  os.makedirs(folder, exist_ok=True)

  env = gym.make("LunarLander-v3", render_mode="rgb_array")

  frames = []
  total_reward = 0
  final_obs = None

  # ---- 1 episode rollout ----
  try:
    state, _ = env.reset()
    for _ in range(1000):
      st = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
      # Inference only: no autograd graph needed.
      with torch.no_grad():
        logits, _ = model(st)
      # softmax is monotonic, so argmax over logits picks the same action.
      action = torch.argmax(logits, dim=-1).item()

      next_state, reward, term, trunc, _ = env.step(action)

      # render() shows the world *after* the step, so the instruments must be
      # drawn from the post-step observation to match the picture.
      base = env.render()
      frames.append(draw_instrument_panel(next_state, action, base))

      total_reward += reward
      state = next_state

      if term or trunc:
        final_obs = next_state
        break
  finally:
    # Release the environment even if rendering or the model raises.
    env.close()

  # ---- determine success ----
  def is_soft_landing(obs):
    """True when both legs touch and speeds/angle are within relaxed limits."""
    x, y, vx, vy, angle, ang_vel, l1, l2 = obs
    return (
      l1 == 1 and l2 == 1 and
      abs(vx) < 0.6 and     # relaxed
      abs(vy) < 0.6 and
      abs(angle) < 0.6
    )

  success = is_soft_landing(final_obs) if final_obs is not None else False

  # ---- save video ----
  filename = f"{folder}/epoch_{epoch}_eval.mp4"
  imageio.mimsave(filename, frames, fps=60)

  # ---- save small metrics ----
  metrics = {
    "epoch": epoch,
    "reward": float(total_reward),
    "success": bool(success),
    "final_state": [float(x) for x in final_obs] if final_obs is not None else None
  }

  with open(f"{folder}/epoch_{epoch}_metrics.json", "w") as f:
    json.dump(metrics, f, indent=2)

  print(f"[SAVE] Video + metrics saved for epoch {epoch}")

  return filename


In [None]:
def _compute_gae(rewards, values, dones, last_value, gamma, lam):
  """Compute GAE(lambda) advantages and value targets for one rollout.

  Args:
    rewards: Per-step rewards.
    values: Per-step critic estimates V(s_t), same length as ``rewards``.
    dones: Per-step flags, truthy when the episode ended at that step.
    last_value: Critic estimate for the state that follows the final step
      (ignored when the final step ended an episode).
    gamma: Discount factor.
    lam: GAE lambda.

  Returns:
    Tuple ``(advantages, returns)`` of float64 arrays.
  """
  rewards = np.asarray(rewards, dtype=np.float64)
  vals = np.append(np.asarray(values, dtype=np.float64), last_value)
  # 1.0 while the episode continues, 0.0 at the step that ended it.
  alive = 1.0 - np.asarray(dones, dtype=np.float64)

  adv = np.zeros(len(rewards), dtype=np.float64)
  last_gae = 0.0
  for t in reversed(range(len(rewards))):
    delta = rewards[t] + gamma * vals[t + 1] * alive[t] - vals[t]
    last_gae = delta + gamma * lam * alive[t] * last_gae
    adv[t] = last_gae

  return adv, adv + vals[:-1]


def train_ppo(epochs=200, steps_per_epoch=3000, gamma=0.99, lam=0.95, clip_ratio=0.2, lr=3e-4, train_iters=80):
  """Train an ``ActorCritic`` policy on ``LunarLanderShaped`` with PPO-clip.

  Each epoch collects ``steps_per_epoch`` environment steps, computes GAE
  advantages, and runs ``train_iters`` full-batch gradient steps on the
  clipped surrogate loss plus 0.5 times the value loss. Every 20 epochs an
  evaluation video is recorded with ``record_eval_video``.

  Args:
    epochs: Number of collect-and-update cycles.
    steps_per_epoch: Environment steps collected per epoch.
    gamma: Discount factor.
    lam: GAE lambda.
    clip_ratio: PPO clipping range for the probability ratio.
    lr: Adam learning rate.
    train_iters: Gradient steps per epoch.

  Returns:
    The trained model.
  """
  env = LunarLanderShaped()
  try:
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.n

    model = ActorCritic(obs_dim, act_dim).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
      obs_buf, act_buf, logp_buf = [], [], []
      val_buf, rew_buf, done_buf = [], [], []

      state, _ = env.reset()

      # ---- rollout ----
      for _ in range(steps_per_epoch):
        action, logp, value = model.step(state)
        next_state, reward, term, trunc, _ = env.step(action)
        done = term or trunc

        obs_buf.append(state)
        act_buf.append(action)
        logp_buf.append(logp)
        val_buf.append(value)
        rew_buf.append(reward)
        done_buf.append(done)

        state = next_state
        if done:
          state, _ = env.reset()

      # ---- GAE advantage ----
      # When the rollout is cut off mid-episode, the return after the last
      # step is not zero: bootstrap from the critic's estimate of the state
      # we stopped in. After a finished episode the estimate is unused.
      if not done_buf or done_buf[-1]:
        last_val = 0.0
      else:
        with torch.no_grad():
          _, v_last = model(
            torch.as_tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
          )
        last_val = v_last.item()

      adv_np, ret_np = _compute_gae(rew_buf, val_buf, done_buf, last_val, gamma, lam)

      # ---- to tensors ----
      # Stack observations into one ndarray first; building a tensor from a
      # Python list of arrays is very slow.
      obs = torch.as_tensor(np.asarray(obs_buf), dtype=torch.float32, device=device)
      act = torch.as_tensor(act_buf, dtype=torch.long, device=device)
      logp_old = torch.as_tensor(logp_buf, dtype=torch.float32, device=device)
      adv = torch.as_tensor(adv_np, dtype=torch.float32, device=device)
      ret = torch.as_tensor(ret_np, dtype=torch.float32, device=device)

      adv = (adv - adv.mean()) / (adv.std() + 1e-8)

      # ---- update PPO ----
      for _ in range(train_iters):
        logits, values = model(obs)
        dist = torch.distributions.Categorical(logits=logits)
        logp = dist.log_prob(act)

        ratio = torch.exp(logp - logp_old)
        surr1 = ratio * adv
        surr2 = torch.clamp(ratio, 1 - clip_ratio, 1 + clip_ratio) * adv

        loss_pi = -torch.min(surr1, surr2).mean()
        # squeeze(-1) drops only the trailing value dimension, so the shape
        # still matches `ret` even for a batch of one.
        loss_v = ((values.squeeze(-1) - ret) ** 2).mean()
        loss = loss_pi + 0.5 * loss_v

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

      # Shaped reward per episode; +1 accounts for the unfinished last episode.
      print(f"Epoch {epoch + 1}: Reward={sum(rew_buf) / (sum(done_buf)+1):.1f}")

      if (epoch + 1) % 20 == 0:
        record_eval_video(model, epoch + 1)
  finally:
    # Release the simulator even if training is interrupted.
    env.close()

  return model


In [None]:
import shutil, os
import imageio, base64
from IPython.display import HTML

folder = "training_stats"

# Start every run from an empty output directory: drop whatever an earlier
# run left behind, then create the directory again.
if os.path.exists(folder): shutil.rmtree(folder)
os.makedirs(folder, exist_ok=True)

In [None]:
# Train the PPO agent for 200 epochs; train_ppo prints one reward line per
# epoch and records an evaluation video every 20 epochs.
model = train_ppo(epochs=200)

Epoch 1: Reward=-126.0
Epoch 2: Reward=-152.9
Epoch 3: Reward=-151.0
Epoch 4: Reward=-120.3
Epoch 5: Reward=-134.9
Epoch 6: Reward=-70.4
Epoch 7: Reward=-79.3
Epoch 8: Reward=-104.2
Epoch 9: Reward=-97.9
Epoch 10: Reward=-66.9
Epoch 11: Reward=-73.1
Epoch 12: Reward=-49.4
Epoch 13: Reward=-42.9
Epoch 14: Reward=-39.2
Epoch 15: Reward=-53.1
Epoch 16: Reward=-18.6
Epoch 17: Reward=-21.1
Epoch 18: Reward=-41.5
Epoch 19: Reward=-25.6
Epoch 20: Reward=18.6




[SAVE] Video + metrics saved for epoch 20
Epoch 21: Reward=27.3
Epoch 22: Reward=-16.5
Epoch 23: Reward=-8.9
Epoch 24: Reward=-7.3
Epoch 25: Reward=11.7
Epoch 26: Reward=-20.0
Epoch 27: Reward=-5.1
Epoch 28: Reward=2.5
Epoch 29: Reward=22.1
Epoch 30: Reward=10.1
Epoch 31: Reward=8.1
Epoch 32: Reward=86.1
Epoch 33: Reward=62.8
Epoch 34: Reward=61.1
Epoch 35: Reward=38.3
Epoch 36: Reward=52.4
Epoch 37: Reward=55.7
Epoch 38: Reward=41.0
Epoch 39: Reward=62.5
Epoch 40: Reward=79.7




[SAVE] Video + metrics saved for epoch 40
Epoch 41: Reward=40.9
Epoch 42: Reward=38.4
Epoch 43: Reward=17.7
Epoch 44: Reward=42.1
Epoch 45: Reward=30.8
Epoch 46: Reward=11.1
Epoch 47: Reward=83.2
Epoch 48: Reward=8.0
Epoch 49: Reward=42.8
Epoch 50: Reward=5.1
Epoch 51: Reward=26.3
Epoch 52: Reward=88.5
Epoch 53: Reward=57.4
Epoch 54: Reward=39.6
Epoch 55: Reward=44.1
Epoch 56: Reward=95.0
Epoch 57: Reward=75.8
Epoch 58: Reward=109.3
Epoch 59: Reward=71.7
Epoch 60: Reward=86.6




[SAVE] Video + metrics saved for epoch 60
Epoch 61: Reward=60.0
Epoch 62: Reward=105.0
Epoch 63: Reward=80.2
Epoch 64: Reward=124.4
Epoch 65: Reward=62.2
Epoch 66: Reward=80.3
Epoch 67: Reward=80.0
Epoch 68: Reward=77.4
Epoch 69: Reward=98.3
Epoch 70: Reward=76.6
Epoch 71: Reward=101.2
Epoch 72: Reward=239.9
Epoch 73: Reward=179.0
Epoch 74: Reward=136.9
Epoch 75: Reward=107.0
Epoch 76: Reward=192.0
Epoch 77: Reward=247.4
Epoch 78: Reward=-16.1
Epoch 79: Reward=164.5
Epoch 80: Reward=212.4




[SAVE] Video + metrics saved for epoch 80
Epoch 81: Reward=208.9
Epoch 82: Reward=161.1
Epoch 83: Reward=136.6
Epoch 84: Reward=94.9
Epoch 85: Reward=193.1
Epoch 86: Reward=239.3
Epoch 87: Reward=236.7
Epoch 88: Reward=198.6
Epoch 89: Reward=145.1
Epoch 90: Reward=202.4
Epoch 91: Reward=189.3
Epoch 92: Reward=182.6
Epoch 93: Reward=276.8
Epoch 94: Reward=235.0
Epoch 95: Reward=241.3
Epoch 96: Reward=178.3
Epoch 97: Reward=137.9
Epoch 98: Reward=208.1
Epoch 99: Reward=302.2
Epoch 100: Reward=236.1




[SAVE] Video + metrics saved for epoch 100
Epoch 101: Reward=237.7
Epoch 102: Reward=239.7
Epoch 103: Reward=261.8
Epoch 104: Reward=188.5
Epoch 105: Reward=199.3
Epoch 106: Reward=262.6
Epoch 107: Reward=246.8
Epoch 108: Reward=274.2
Epoch 109: Reward=192.3
Epoch 110: Reward=201.6
Epoch 111: Reward=277.1
Epoch 112: Reward=196.0
Epoch 113: Reward=276.8
Epoch 114: Reward=274.2
Epoch 115: Reward=250.2
Epoch 116: Reward=299.7
Epoch 117: Reward=258.0
Epoch 118: Reward=237.7
Epoch 119: Reward=281.1
Epoch 120: Reward=279.9




[SAVE] Video + metrics saved for epoch 120
Epoch 121: Reward=270.2
Epoch 122: Reward=253.5
Epoch 123: Reward=234.1
Epoch 124: Reward=249.7
Epoch 125: Reward=198.6
Epoch 126: Reward=197.8
Epoch 127: Reward=259.9
Epoch 128: Reward=196.3
Epoch 129: Reward=308.7
Epoch 130: Reward=267.9
Epoch 131: Reward=274.9
Epoch 132: Reward=283.1
Epoch 133: Reward=181.0
Epoch 134: Reward=283.6
Epoch 135: Reward=310.4
Epoch 136: Reward=133.5
Epoch 137: Reward=252.3
Epoch 138: Reward=286.6
Epoch 139: Reward=268.8
Epoch 140: Reward=296.2




[SAVE] Video + metrics saved for epoch 140
Epoch 141: Reward=267.4
Epoch 142: Reward=305.9
Epoch 143: Reward=243.8
Epoch 144: Reward=249.6
Epoch 145: Reward=202.0
Epoch 146: Reward=226.0
Epoch 147: Reward=292.0
Epoch 148: Reward=272.3
Epoch 149: Reward=271.1
Epoch 150: Reward=266.8
Epoch 151: Reward=290.6
Epoch 152: Reward=260.4
Epoch 153: Reward=259.0
Epoch 154: Reward=239.0
Epoch 155: Reward=290.7
Epoch 156: Reward=265.7
Epoch 157: Reward=283.5
Epoch 158: Reward=268.1
Epoch 159: Reward=214.2
Epoch 160: Reward=199.0




[SAVE] Video + metrics saved for epoch 160
Epoch 161: Reward=275.5
Epoch 162: Reward=269.2
Epoch 163: Reward=294.3
Epoch 164: Reward=233.9
Epoch 165: Reward=235.3
Epoch 166: Reward=203.0
Epoch 167: Reward=252.7
Epoch 168: Reward=255.6
Epoch 169: Reward=226.2
Epoch 170: Reward=252.8
Epoch 171: Reward=224.6
Epoch 172: Reward=244.5
Epoch 173: Reward=251.9
Epoch 174: Reward=271.1
Epoch 175: Reward=231.8
Epoch 176: Reward=257.3
Epoch 177: Reward=225.4
Epoch 178: Reward=316.0
Epoch 179: Reward=288.1
Epoch 180: Reward=286.8




[SAVE] Video + metrics saved for epoch 180
Epoch 181: Reward=285.9
Epoch 182: Reward=250.6
Epoch 183: Reward=246.5
Epoch 184: Reward=283.9
Epoch 185: Reward=294.5
Epoch 186: Reward=274.8
Epoch 187: Reward=301.4
Epoch 188: Reward=275.5
Epoch 189: Reward=282.6
Epoch 190: Reward=268.5
Epoch 191: Reward=252.6
Epoch 192: Reward=282.4
Epoch 193: Reward=312.1
Epoch 194: Reward=268.9
Epoch 195: Reward=255.2
Epoch 196: Reward=275.1
Epoch 197: Reward=313.2
Epoch 198: Reward=282.7
Epoch 199: Reward=277.0
Epoch 200: Reward=314.4




[SAVE] Video + metrics saved for epoch 200


In [None]:
# Persist only the learned parameters (state_dict) to disk.
torch.save(model.state_dict(), "ppo_lunarlander.pt")

In [None]:
import imageio
from IPython.display import HTML
from base64 import b64encode


In [None]:
def play_colab(model, episodes=1):
  """Play greedy episodes, print a verdict, and return an inline video.

  The policy acts greedily (argmax over logits) in an unshaped
  ``LunarLander-v3`` environment. Frames from all episodes are concatenated
  into ``lander.mp4``; the printed verdict refers to the last episode only.

  Args:
    model: Policy network returning ``(logits, value)``.
    episodes: Number of episodes to play.

  Returns:
    ``IPython.display.HTML`` object embedding the video as a base64 data URL.
  """
  import base64
  import imageio
  from IPython.display import HTML

  env = gym.make("LunarLander-v3", render_mode="rgb_array")
  frames = []
  final_reward = 0
  final_obs = None

  try:
    for _ in range(episodes):
      state, _ = env.reset()
      done = False
      total_reward = 0

      while not done:
        st = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)

        # Inference only: no autograd graph needed. softmax is monotonic, so
        # argmax over the logits selects the same action.
        with torch.no_grad():
          logits, _ = model(st)
        action = torch.argmax(logits, dim=-1).item()

        state, reward, term, trunc, _ = env.step(action)
        total_reward += reward
        done = term or trunc

        frames.append(env.render())

        if done:
          final_obs = state
          final_reward = total_reward
  finally:
    # Release the environment even if the rollout raises.
    env.close()

  # ----------- Success Evaluation -----------
  def is_soft_landing(obs):
    """True when both legs touch and speeds/angle are within limits."""
    x, y, vx, vy, angle, ang_vel, l1, l2 = obs
    return (
      l1 == 1 and l2 == 1 and
      abs(vx) < 0.4 and
      abs(vy) < 0.4 and
      abs(angle) < 0.4
    )

  print(f"Total reward: {final_reward:.1f}")

  if final_reward > 200:
    print("✔ Successful landing (reward-based check)")
  elif final_obs is not None and is_soft_landing(final_obs):
    print("✔ Soft landing")
  elif final_reward < -50:
    print("✖ Crash | Lander destruction")
  else:
    print("✖ Unsuccessful landing | Timeout")

  # ----------- Video export -----------
  imageio.mimsave("lander.mp4", frames, fps=60)

  # Context manager so the file handle is closed deterministically.
  with open("lander.mp4", 'rb') as video_file:
    mp4 = video_file.read()
  data_url = "data:video/mp4;base64," + base64.b64encode(mp4).decode()
  return HTML(f"<video width=480 controls><source src='{data_url}' type='video/mp4'></video>")


In [None]:
# Rebuild the network with the same dimensions used for training, load the
# saved weights, and play one greedy episode.
obs_dim = 8         # LunarLander-v3 observations always have 8 components
act_dim = 4         # 4 discrete actions

model = ActorCritic(obs_dim, act_dim).to(device)
model.load_state_dict(torch.load("ppo_lunarlander.pt", map_location=device))
model.eval()

play_colab(model)




Total reward: 298.5
✔ Успешная посадка (reward-based check)


In [None]:
import shutil
from google.colab import files

# Pack the "training_stats" directory into training_stats.zip
shutil.make_archive("training_stats", "zip", "training_stats")

# Ask Colab to download the archive through the browser
files.download("training_stats.zip")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>