In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal
import numpy as np
import time
import math
import gc

# --- Hyperparameters ---
LR = 0.00005
entropy_coef = 0.01
BATCH_SIZE = 128
GAMMA = 0.99
K_EPOCHS = 10
HIDDEN_DIM = 128
ACTION_STD = 0.5

# --- Physics Constants (Ported from JS) ---
GRAVITY = 100.0
THRUST_POWER = 300.0
MASS = 1.0
MOMENT_OF_INERTIA = 1000.0
DRAG_COEFFICIENT = 0.05
ANGULAR_DRAG = 2.0
RCS_THRUST = 100.0  # Force applied by RCS (updated for force-based application)
MIN_THROTTLE = 0.4  # Minimum throttle when engine is on (realistic rocket behavior)
FRICTION = 0.5
RESTITUTION = 0.0
STOP_THRESHOLD = 0.5

# --- Sim Constants ---
DT = 1 / 60.0
CANVAS_WIDTH = 800
CANVAS_HEIGHT = 600
PAD_X = 400
PAD_Y = CANVAS_HEIGHT - 50
PAD_WIDTH = 120
ROCKET_WIDTH = 24
ROCKET_HEIGHT = 100  # Updated to match JS physics.js

In [2]:

class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.actor = nn.Sequential(
            nn.Linear(state_dim, HIDDEN_DIM),
            nn.ReLU(),
            nn.Linear(HIDDEN_DIM, HIDDEN_DIM),
            nn.ReLU(),
            nn.Linear(HIDDEN_DIM, action_dim)
        )
        self.critic = nn.Sequential(
            nn.Linear(state_dim, HIDDEN_DIM),
            nn.ReLU(),
            nn.Linear(HIDDEN_DIM, HIDDEN_DIM),
            nn.ReLU(),
            nn.Linear(HIDDEN_DIM, 1)
        )
        self.log_std = nn.Parameter(torch.zeros(action_dim))

    def act(self, state):
        mean = self.actor(state)
        std = self.log_std.exp()
        dist = Normal(mean, std)
        raw_action = dist.sample()
        action = torch.tanh(raw_action)
        log_prob = dist.log_prob(raw_action).sum(-1)
        return action, raw_action, log_prob, self.critic(state).squeeze(-1)

    def evaluate(self, states, raw_actions):
        mean = self.actor(states)
        std = self.log_std.exp()
        dist = Normal(mean, std)
        log_probs = dist.log_prob(raw_actions).sum(-1)
        return log_probs, self.critic(states).squeeze(-1), dist.entropy().sum(-1)

In [3]:
class FalconEnv:
    def __init__(self):
        self.state_dim = 6
        self.action_dim = 3
        self.prev_shaping = None
        self.reset()

    def reset(self):
        # Result: [x, y, vx, vy, angle, angularVelocity, throttle, gimbal, fuel, groundContact, landingResult, done, stepCount]
        # Randomize "Anywhere" - matches JS state.js reset logic
        # Random Start Conditions: Center +/- 150px (300px total range)
        # Random initial Angle: 3 to 15 degrees (wider range)
        angle_mag = np.random.uniform(3.0, 15.0) * (np.pi / 180.0)  # 3-15 degrees in radians
        random_angle = angle_mag * (1 if np.random.random() < 0.5 else -1)

        self.state = {
            'x': PAD_X + (np.random.random() - 0.5) * 300.0,  # Center +/- 150 (matches JS)
            'y': CANVAS_HEIGHT * 0.1,  # Start high (matches JS)
            'vx': 0.0,  # No initial horizontal velocity (matches JS)
            'vy': 50.0,  # Initial drop speed (matches JS)
            'angle': random_angle,  # 3-15 degrees either direction (matches JS)
            'angularVelocity': 0.0,  # No initial rotation (matches JS)
            'throttle': 0.0,
            'engineGimbal': 0.0,
            'fuel': 100.0,
            'groundContact': False,
            'landingResult': None,
            'done': False,
            'stepCount': 0
        }
        self.prev_shaping = None
        return self._get_obs()

    def step(self, action):
        # Action: [throttle (0-1), gimbal (-1 to 1), rcs (-1 to 1)]
        throttle, gimbal, rcs = action

        # Clamp Inputs
        throttle = np.clip(throttle, 0.0, 1.0)
        gimbal = np.clip(gimbal, -1.0, 1.0)
        rcs = np.clip(rcs, -1.0, 1.0)

        s = self.state
        s['throttle'] = throttle
        s['engineGimbal'] = gimbal * 0.6  # Max 0.6 rad (updated to match JS autopilot.js)

        # --- Physics Update ---
        fx = 0.0
        fy = GRAVITY * MASS
        torque = 0.0

        # Thrust with minimum throttle (matches JS physics.js)
        if s['throttle'] > 0:
            # Calculate power with minimum throttle (realistic rocket behavior)
            # When throttle > 0, engine power is MIN_THROTTLE + throttle * (1 - MIN_THROTTLE)
            power = MIN_THROTTLE + s['throttle'] * (1 - MIN_THROTTLE)
            thrust_mag = power * THRUST_POWER * MASS
            thrust_angle = s['angle'] + s['engineGimbal']

            # Thrust vector components
            fx += math.sin(thrust_angle) * thrust_mag
            fy -= math.cos(thrust_angle) * thrust_mag

            # Torque from gimbal (increased by 1.5x for better responsiveness, matches JS)
            lever_arm = ROCKET_HEIGHT / 2.0
            torque += -lever_arm * thrust_mag * math.sin(s['engineGimbal']) * 1.5

        # RCS Force Application (at thruster position, matches JS physics.js)
        # RCS thrusters are at the top of the rocket and apply force perpendicular to rocket axis
        if abs(rcs) > 0.3:  # Deadband threshold
            # Calculate thruster position (top of rocket, 86% of height from center)
            THRUSTER_HEIGHT = ROCKET_HEIGHT * 0.86

            # Force direction: perpendicular to rocket axis
            # rcs < -0.3 (left) pushes rocket right (positive X)
            # rcs > 0.3 (right) pushes rocket left (negative X)
            force_dir = 1.0 if rcs < -0.3 else -1.0

            # Force components perpendicular to rocket axis
            force_x = force_dir * math.cos(s['angle']) * RCS_THRUST
            force_y = force_dir * math.sin(s['angle']) * RCS_THRUST

            # Apply force at center of mass (simplified)
            fx += force_x
            fy += force_y

            # Force at thruster position also creates torque
            lever_arm = THRUSTER_HEIGHT
            force_perpendicular = force_dir * RCS_THRUST
            # Torque sign: rcsLeft (forceDir=1) creates CCW rotation (negative torque)
            torque += -lever_arm * force_perpendicular

        # Drag
        fx -= s['vx'] * DRAG_COEFFICIENT
        fy -= s['vy'] * DRAG_COEFFICIENT
        torque -= s['angularVelocity'] * ANGULAR_DRAG

        # Integration
        s['vx'] += (fx / MASS) * DT
        s['vy'] += (fy / MASS) * DT
        s['angularVelocity'] += (torque / MOMENT_OF_INERTIA) * DT

        s['x'] += s['vx'] * DT
        s['y'] += s['vy'] * DT
        s['angle'] += s['angularVelocity'] * DT

        # Collision
        ground_y = PAD_Y - 10 # Approx pad top
        rocket_bottom = s['y'] + ROCKET_HEIGHT / 2.0

        if rocket_bottom >= ground_y:
            s['groundContact'] = True
            s['y'] = ground_y - ROCKET_HEIGHT / 2.0

            if s['vy'] > 0:
                s['vx'] *= FRICTION
                s['angularVelocity'] *= 0.8
                if s['vy'] < STOP_THRESHOLD:
                    s['vy'] = 0.0
                else:
                    s['vy'] = -s['vy'] * RESTITUTION

        # --- Outcome Detection ---
        # Out of bounds
        if s['x'] < 0 or s['x'] > CANVAS_WIDTH or s['y'] > CANVAS_HEIGHT + 100 or s['y'] < -500:
            s['landingResult'] = 'FAILURE'
            s['done'] = True

        # Replace the "Landing Logic" block in step()
        # -------- Replace the Landing Logic in step() --------
        if s['groundContact'] and not s['done']:
            # Relaxed constraints to let the model learn "almost" landings
            angle_deg = abs(s['angle'] * 180.0 / math.pi)
            on_pad = abs(s['x'] - PAD_X) < (PAD_WIDTH * 0.8) # Generous pad width

            # Widen the success window:
            # - Speed < 5.0 (was 1.0)
            # - Angle < 15.0 (was 5.0)
            is_stable = angle_deg < 15.0
            is_slow = abs(s['vy']) < 5.0 and abs(s['vx']) < 5.0

            if on_pad and is_stable and is_slow:
                s['landingResult'] = 'SUCCESS'
            else:
                s['landingResult'] = 'FAILURE'

            # End episode immediately on contact
            s['done'] = True

        # Timeout
        s['stepCount'] += 1
        if s['stepCount'] > 1000:
            s['landingResult'] = 'FAILURE'
            s['done'] = True

        reward = self._compute_reward()

        return self._get_obs(), reward, s['done'], {'result': s['landingResult']}

    def _get_obs(self):
        s = self.state
        # Wrap angle to [-pi, pi]
        wrapped_angle = (s['angle'] + math.pi) % (2 * math.pi) - math.pi
        return np.array([
            (s['x'] - PAD_X) / 200.0,
            (PAD_Y - s['y']) / 400.0,
            s['vx'] / 50.0,
            s['vy'] / 50.0,
            wrapped_angle / math.pi, # Normalized -1 to 1
            s['angularVelocity'] / 10.0
        ], dtype=np.float32)

    def _compute_reward(self):
        s = self.state
        TARGET_X = PAD_X
        TARGET_Y = PAD_Y - ROCKET_HEIGHT / 2.0

        dist_x = abs(s['x'] - TARGET_X) / CANVAS_WIDTH
        dist_y = abs(s['y'] - TARGET_Y) / CANVAS_HEIGHT

        vel_penalty = math.sqrt(s['vx']**2 + s['vy']**2) / 100.0
        angle_penalty = abs(s['angle'])

        shaping = -100 * math.sqrt(dist_x**2 + dist_y**2) \
                  - 100 * vel_penalty \
                  - 100 * angle_penalty

        if self.prev_shaping is not None:
            reward = shaping - self.prev_shaping
        else:
            reward = 0
        self.prev_shaping = shaping

        if s['landingResult'] == 'FAILURE': return reward - 100.0
        if s['landingResult'] == 'SUCCESS': return reward + 500.0 # Increased

        return reward - 0.05

In [4]:
def compute_gae(rewards, values, dones, next_value, gamma=0.99, lam=0.95):
    advantages = []
    gae = 0
    values = values + [next_value]

    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma * values[t+1] * (1 - dones[t]) - values[t]
        gae = delta + gamma * lam * (1 - dones[t]) * gae
        advantages.insert(0, gae)

    returns = [adv + val for adv, val in zip(advantages, values[:-1])]
    return torch.tensor(advantages), torch.tensor(returns)

In [5]:
def train():
    env = FalconEnv()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    policy = ActorCritic(env.state_dim, env.action_dim).to(device)
    optimizer = optim.Adam(policy.parameters(), lr=0.0001)

    clip_eps = 0.2
    entropy_coef = 0.05
    best_success = 0

    state = env.reset()
    success_history = []

    for episode in range(1, 1001):
        states, raw_actions, logprobs, rewards, dones, values = [], [], [], [], [], []
        ep_reward = 0
        success_count = 0

        for _ in range(2048):
            st = torch.tensor(state, dtype=torch.float32).to(device)
            with torch.no_grad():
                action, raw_a, lp, val = policy.act(st)

            a_env = action.cpu().numpy()
            next_state, reward, done, info = env.step([(a_env[0]+1)/2, a_env[1], a_env[2]])

            states.append(st)
            raw_actions.append(raw_a)
            logprobs.append(lp)
            dones.append(done)
            values.append(val.item())
            rewards.append(reward)
            ep_reward += reward

            state = next_state

            if done:
                if info.get('result') == 'SUCCESS':
                    success_count += 1
                state = env.reset()

        with torch.no_grad():
            next_val = policy.critic(torch.tensor(state, dtype=torch.float32).to(device)).item()
        adv, ret = compute_gae(rewards, values, dones, next_val, GAMMA)
        adv = (adv - adv.mean()) / (adv.std() + 1e-8)

        b_s = torch.stack(states)
        b_a = torch.stack(raw_actions)
        b_lp = torch.stack(logprobs)
        b_ret = ret.to(device)
        b_adv = adv.to(device)

        for _ in range(K_EPOCHS):
            inds = np.arange(2048)
            np.random.shuffle(inds)
            for i in range(0, 2048, BATCH_SIZE):
                sb = inds[i:i+BATCH_SIZE]
                lp_n, val_n, ent = policy.evaluate(b_s[sb], b_a[sb])
                ratio = torch.exp(lp_n - b_lp[sb])

                surr1 = ratio * b_adv[sb]
                surr2 = torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps) * b_adv[sb]

                loss = -torch.min(surr1, surr2).mean() + 0.5 * (b_ret[sb] - val_n).pow(2).mean() - entropy_coef * ent.mean()

                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(policy.parameters(), max_norm=0.5)
                optimizer.step()

        if success_count > best_success:
            best_success = success_count
            torch.save(policy.state_dict(), "falcon_ppo_best.pth")
            print(f"New Best Model Saved! Successes: {best_success}")

        success_history.append(success_count)

        if episode % 5 == 0:
            avg_success = (sum(success_history[-10:]) / 10)

            if avg_success > 1.5:
                for param_group in optimizer.param_groups:
                    if param_group['lr'] > 0.00001:
                        param_group['lr'] = 0.00001
                        print("LR Reduced to 1e-5")

            dist_x = abs(next_state[0] * 200.0)
            last_vx = next_state[2] * 50.0
            last_vy = next_state[3] * 50.0
            last_throttle = (a_env[0] + 1) / 2

            print(f"Ep: {episode} | Rew: {ep_reward:.1f} | SR: {avg_success:.1f} | Vy: {last_vy:.1f} | Thr: {last_throttle:.2f} | DistX: {dist_x:.1f}")

    torch.save(policy.state_dict(), "falcon_ppo_final.pth")

In [6]:
gc.collect()
torch.cuda.empty_cache()
# train()

In [11]:
import torch
import torch.nn as nn
# from train_falcon_rl import ActorCritic, FalconEnv

def export_to_onnx():
    # 1. Config - IMPORTANT: Use the best model
    MODEL_PATH = "/Users/asutoshdalei/Desktop/Work/Game2/rl/falcon_ppo_best.pth"  # Use best model, not just falcon_ppo.pth
    ONNX_PATH = "/Users/asutoshdalei/Desktop/Work/Game2/rl/falcon_ppo_best.onnx"  # Match the model name

    # 2. Setup Environment & Model
    env = FalconEnv()
    state_dim = env.state_dim
    action_dim = env.action_dim
    device = torch.device("cpu")

    model = ActorCritic(state_dim, action_dim)
    state_dict = torch.load(MODEL_PATH, map_location=device)
    model.load_state_dict(state_dict)
    model.eval()
    model.actor.float()

    # 3. CRITICAL FIX: Create a wrapper that includes tanh
    # During training: action = tanh(sample(Normal(mean, std)))
    # During inference: we want tanh(mean) to get deterministic action in [-1, 1]
    # Without this, the model outputs raw means (like -9.08) which tanh to -1, causing throttle=0
    class ActorWithTanh(nn.Module):
        def __init__(self, actor):
            super().__init__()
            self.actor = actor
        
        def forward(self, x):
            mean = self.actor(x)
            return torch.tanh(mean)  # Apply tanh to get actions in [-1, 1]
    
    actor_with_tanh = ActorWithTanh(model.actor)
    actor_with_tanh.eval()

    # 4. Create Dummy Input
    dummy_input = torch.randn(1, state_dim, dtype=torch.float32, device=device)

    print(f"Exporting model with input shape: {dummy_input.shape}")
    print("⚠️  CRITICAL: Model will output tanh(mean) in [-1, 1] range")
    print("   This fixes the issue where raw means (like -9.08) cause throttle=0")

    # 5. Export with tanh included
    torch.onnx.export(
        actor_with_tanh,          # Export wrapper with tanh (FIXED)
        dummy_input,
        ONNX_PATH,
        export_params=True,
        opset_version=11,
        do_constant_folding=True,
        input_names=['input.1'],   # Match JS input name
        output_names=['output.1'], # Match JS output name
        dynamic_axes={
            'input.1': {0: 'batch_size'},
            'output.1': {0: 'batch_size'}
        }
    )

    print(f"✅ Success! Model exported to: {ONNX_PATH}")
    print("   Model outputs are now in [-1, 1] range (tanh already applied)")
    print("   JavaScript will detect values are in range and use them directly")

export_to_onnx()

Exporting model with input shape: torch.Size([1, 6])
⚠️  CRITICAL: Model will output tanh(mean) in [-1, 1] range
   This fixes the issue where raw means (like -9.08) cause throttle=0


  torch.onnx.export(


✅ Success! Model exported to: /Users/asutoshdalei/Desktop/Work/Game2/rl/falcon_ppo_best.onnx
   Model outputs are now in [-1, 1] range (tanh already applied)
   JavaScript will detect values are in range and use them directly


In [10]:
!pip3 install onnx onnxscript onnxruntime --quiet

In [8]:
pwd

'/Users/asutoshdalei/Desktop/Work/Game2/rl'