### Free falling ball catch visualization using open3d

In [1]:
import zmq, time, math, numpy as np, open3d as o3d, threading
from collections import deque

# ---------- CONFIG ----------
ZMQ_ADDR = "tcp://localhost:5566"   # predictor publishes here by default. See predictor.py.
TARGET_Z = 0.20                     # 20 cm intercept height
CAR_START = np.array([0.30, 0.30, 0.0])  # meters (x,y,z)
CAR_L = 0.25   # length (m)
CAR_W = 0.25   # width  (m)
CAR_H = 0.075  # height (m)
TURRET_H = 0.125   # cylinder height (m)
TURRET_D = 0.11    # cylinder diameter (m)
CAR_SPEED = 0.5    # m/s (constant velocity for planner simulation)
SIM_DT = 0.03      # seconds per simulation tick (~33Hz)

# Visualization colors
COLOR_CAR = [0.2, 0.6, 0.2]
COLOR_TURRET = [0.8, 0.7, 0.2]
COLOR_BALL = [1.0, 0.4, 0.0]
COLOR_TRAIL = [0.9, 0.2, 0.2]
COLOR_LANDING_OK = [0.0, 0.8, 0.0]
COLOR_LANDING_FAIL = [0.8, 0.0, 0.0]

# ---------- Helper functions ----------
def solve_time_to_z(z0, vz0, target_z=TARGET_Z, g=9.81):
    a = -0.5 * g
    b = vz0
    c = z0 - target_z
    disc = b*b - 4*a*c
    if disc < 0: return None
    r1 = (-b + math.sqrt(disc)) / (2*a)
    r2 = (-b - math.sqrt(disc)) / (2*a)
    ts = [t for t in (r1, r2) if t > 0]
    return min(ts) if ts else None

def is_safe_catch(car_center, ball_xy):
    # Dimensions
    turret_radius = TURRET_D / 2.0  # 0.055 m
    ball_radius = 0.025             # 0.025 m (Matches mesh radius)
    
    # Calculate distance between Car Center and Ball Center
    dx = ball_xy[0] - car_center[0]
    dy = ball_xy[1] - car_center[1]
    dist = math.sqrt(dx*dx + dy*dy)

    # STRICT CHECK: Ball must be fully inside the turret (not hitting the rim)
    # Distance + BallRadius <= TurretRadius
    safe_margin = turret_radius - ball_radius  # 0.055 - 0.025 = 0.03 m
    
    return dist <= safe_margin

# ---------- ZMQ Subscriber Thread ----------
class PredictorSubscriber(threading.Thread):
    def __init__(self, zmq_addr=ZMQ_ADDR):
        super().__init__(daemon=True)
        self.ctx = zmq.Context()
        self.sub = self.ctx.socket(zmq.SUB)
        self.sub.connect(zmq_addr)
        self.sub.setsockopt_string(zmq.SUBSCRIBE, "")
        self.latest = None
        self.lock = threading.Lock()
        self.running = True

    def run(self):
        while self.running:
            try:
                msg = self.sub.recv_json(flags=0)
            except Exception:
                time.sleep(0.005)
                continue
            with self.lock:
                self.latest = msg

    def get(self):
        with self.lock:
            return self.latest

    def stop(self):
        self.running = False
        try: self.sub.close()
        except: pass
        try: self.ctx.term()
        except: pass

# ---------- Open3D Visualizer / Simulator ----------
class PlannerSim:
    def __init__(self):
        # state
        self.ball_pos = None      # [x,y,z]
        self.ball_prev_z = None
        self.ball_trail = deque(maxlen=200)
        self.predicted_landing = None
        self.landed = False
        self.landing_point = None
        self.landing_inside = False

        # car state
        self.car_center = CAR_START.copy()
        self.car_vel = np.array([0.0, 0.0, 0.0])

        # tracking for optimized updates
        self.last_ball_pos = None
        self.last_car_center = None

        # ZMQ subscriber
        self.sub = PredictorSubscriber(ZMQ_ADDR)
        self.sub.start()

        # open3d setup
        self.vis = o3d.visualization.VisualizerWithKeyCallback()
        self.vis.create_window("Planner Simulation (Free-Fall)", width=1280, height=720)
        self._setup_scene()
        self.vis.register_animation_callback(self.update)

        # timing
        self.last_time = time.time()

    def _setup_scene(self):
        # floor grid and axes
        grid = o3d.geometry.LineSet()
        pts = []
        lines = []
        size = 1.5
        step = 0.25
        for i in np.arange(-size, size+1e-6, step):
            pts.append([i, -size, 0]); pts.append([i, size, 0]); lines.append([len(pts)-2, len(pts)-1])
            pts.append([-size, i, 0]); pts.append([size, i, 0]); lines.append([len(pts)-2, len(pts)-1])
        grid.points = o3d.utility.Vector3dVector(np.array(pts))
        grid.lines = o3d.utility.Vector2iVector(np.array(lines))
        grid.colors = o3d.utility.Vector3dVector([[0.5,0.5,0.5] for _ in lines])
        self.vis.add_geometry(grid)

        # ball (sphere)
        self.ball_mesh = o3d.geometry.TriangleMesh.create_sphere(radius=0.025)
        self.ball_mesh.paint_uniform_color(COLOR_BALL)
        self.ball_mesh.compute_vertex_normals()
        self.vis.add_geometry(self.ball_mesh)

        # trail point cloud
        self.trail_pcd = o3d.geometry.PointCloud()
        self.trail_pcd.points = o3d.utility.Vector3dVector(np.zeros((0, 3)))
        self.trail_pcd.paint_uniform_color(COLOR_TRAIL)
        self.vis.add_geometry(self.trail_pcd)

        # car box
        box = o3d.geometry.TriangleMesh.create_box(width=CAR_L, height=CAR_W, depth=CAR_H)
        # Open3D box origin at (0,0,0) with x in width dir; we want center at car_center and z base at 0.
        box.translate([-CAR_L/2.0, -CAR_W/2.0, 0.0])
        box.paint_uniform_color(COLOR_CAR)
        box.compute_vertex_normals()
        self.car_mesh = box
        self.vis.add_geometry(self.car_mesh)

        # turret cylinder
        self.turret_mesh = o3d.geometry.TriangleMesh.create_cylinder(radius=TURRET_D / 2.0, height=TURRET_H)
        self.turret_mesh.paint_uniform_color(COLOR_TURRET)
        self.turret_mesh.compute_vertex_normals()
        # initial position: bottom at z=0
        self.turret_mesh.translate([0, 0, CAR_H])
        self.vis.add_geometry(self.turret_mesh)

        # landing marker (sphere) - hidden initially
        self.land_marker = o3d.geometry.TriangleMesh.create_sphere(radius=0.02)
        self.land_marker.paint_uniform_color(COLOR_LANDING_FAIL)
        self.land_marker.compute_vertex_normals()
        # hide initially
        lm_cur = self.land_marker.get_center()
        hidden_pos = np.array([0.0, 0.0, -10.0])
        self.land_marker.translate(hidden_pos - lm_cur, relative=True)
        self.land_marker_visible = False
        self.vis.add_geometry(self.land_marker)

        # camera
        ctr = self.vis.get_view_control()
        ctr.set_lookat([0.45, 0.45, 0.1])
        ctr.set_front([-0.5, -0.8, -0.3])
        ctr.set_up([0,0,1])
        ctr.set_zoom(0.6)

    def update(self, vis):
        # dt
        now = time.time()
        dt = now - self.last_time
        if dt <= 0: dt = SIM_DT
        self.last_time = now

        msg = self.sub.get()
        ball_updated = False
        if msg is not None:
            # payload format per predictor/planner: "ball": {x,y,z,valid}, "tag4":..., "tag5":...
            b = msg.get("ball", {})
            bx = b.get("x"); by = b.get("y"); bz = b.get("z")
            valid = b.get("valid", False)
            if valid and (bx is not None) and (by is not None) and (bz is not None):
                new_pos = np.array([float(bx), float(by), float(bz)])
                # check if significantly different to avoid redundant updates
                if self.ball_pos is None or np.linalg.norm(new_pos - self.ball_pos) > 1e-6:
                    self.ball_pos = new_pos
                    self.ball_trail.append(self.ball_pos.copy())
                    ball_updated = True

                    # compute free-fall predicted landing XY: for free fall we aim directly under current xy
                    # Compute time to TARGET_Z with vz=0 (free-fall)
                    vz_assumed = 0.0
                    t_hit = solve_time_to_z(self.ball_pos[2], vz_assumed, TARGET_Z)
                    if t_hit is None: 
                        self.predicted_landing = None
                    else:
                        # free-fall landing xy = current xy (no horizontal motion predicted)
                        self.predicted_landing = np.array([self.ball_pos[0], self.ball_pos[1]])
            else:
                # no valid ball -> keep previous
                pass

        # Move car toward predicted landing XY at CAR_SPEED
        car_moved = False
        if self.predicted_landing is not None and not self.landed:
            target = np.array([self.predicted_landing[0], self.predicted_landing[1], 0.0])
            vec = target - self.car_center
            dist = np.linalg.norm(vec[:2])
            if dist > 1e-6:
                dir2 = vec[:2] / dist
                step = CAR_SPEED * dt
                old_car_center = self.car_center.copy()
                if step >= dist:
                    self.car_center[:2] = target[:2]
                else:
                    self.car_center[:2] += dir2 * step
                # check if moved
                if self.last_car_center is None or np.linalg.norm(self.car_center - self.last_car_center) > 1e-6:
                    car_moved = True

        # update ball and trail only if updated
        if ball_updated:
            # ball
            b_center = self.ball_mesh.get_center()
            self.ball_mesh.translate(self.ball_pos - b_center, relative=True)
            self.vis.update_geometry(self.ball_mesh)

            # trail update
            if len(self.ball_trail) > 0:
                pts = np.array(self.ball_trail)
                self.trail_pcd.points = o3d.utility.Vector3dVector(pts)
                self.trail_pcd.paint_uniform_color(COLOR_TRAIL)
                self.vis.update_geometry(self.trail_pcd)

        # update car and turret only if moved
        if car_moved:
            # car mesh update: we created box with base at z=0 and origin shifted to center; simply translate to car_center
            cur_center = self.car_mesh.get_center()
            desired_center = np.array([self.car_center[0], self.car_center[1], CAR_H/2.0])
            self.car_mesh.translate(desired_center - cur_center, relative=True)
            self.vis.update_geometry(self.car_mesh)

            # turret update (place at car top center)
            t_cur_center = self.turret_mesh.get_center()
            turret_des = np.array([self.car_center[0], self.car_center[1], CAR_H + TURRET_H/2.0])
            self.turret_mesh.translate(turret_des - t_cur_center, relative=True)
            self.vis.update_geometry(self.turret_mesh)

            self.last_car_center = self.car_center.copy()

        # landing detection: when ball crosses TARGET_Z from above to <=TARGET_Z
        # MULTI-CATCH LOGIC (safe version)
        if self.ball_pos is not None:
            prev_z = self.ball_prev_z if self.ball_prev_z is not None else self.ball_pos[2]

            # Reset landing when ball rises above catch height again
            if self.landed and self.ball_pos[2] > TARGET_Z + 0.01:
                self.landed = False
                self.landing_point = None
                self.landing_inside = False

                # hide landing marker
                lm_cur = self.land_marker.get_center()
                hidden_pos = np.array([0.0, 0.0, -10.0])
                self.land_marker.translate(hidden_pos - lm_cur, relative=True)
                self.vis.update_geometry(self.land_marker)

            # Detect NEW landing event
            if (not self.landed) and (prev_z > TARGET_Z) and (self.ball_pos[2] <= TARGET_Z):
                self.landed = True
                self.landing_point = np.array([self.ball_pos[0], self.ball_pos[1], TARGET_Z])
                self.landing_inside = is_safe_catch(self.car_center, self.landing_point[:2])
                print(f"[PlannerSim] Ball landed at {self.landing_point[:2]} inside_car={self.landing_inside}")

                # show marker
                self.land_marker.paint_uniform_color(
                    COLOR_LANDING_OK if self.landing_inside else COLOR_LANDING_FAIL
                )
                lm_cur = self.land_marker.get_center()
                lm_des = self.landing_point
                self.land_marker.translate(lm_des - lm_cur, relative=True)
                self.vis.update_geometry(self.land_marker)

            # save for next frame
            self.ball_prev_z = float(self.ball_pos[2])

        # pause small time implicitly done by Open3D callback; request redraw
        return False

    def run(self):
        try:
            self.vis.run()
        finally:
            self.sub.stop()
            self.vis.destroy_window()

if __name__ == "__main__":
    print("Starting Planner + Open3D Simulation (Free-Fall). Subscribe:", ZMQ_ADDR)
    sim = PlannerSim()
    sim.run()

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.
Starting Planner + Open3D Simulation (Free-Fall). Subscribe: tcp://localhost:5566
[PlannerSim] Ball landed at [0.33552288 0.36539285] inside_car=False


### 3D Visualization of Projectile Predictions and catching.

In [7]:
#!/usr/bin/env python3
# planner_sim_open3d.py (updated to consume predictor predicted/current fields)

import zmq, time, math, numpy as np, open3d as o3d, threading
from collections import deque

# ---------- CONFIG ----------
ZMQ_ADDR = "tcp://localhost:5566"   # predictor publishes here by default. See predictor.py.
TARGET_Z = 0.20                     # 20 cm intercept height (matches predictor)
CAR_START = np.array([0.30, 0.30, 0.0])  # meters (x,y,z)
CAR_L = 0.25
CAR_W = 0.25
CAR_H = 0.075
TURRET_H = 0.125
TURRET_D = 0.11
CAR_SPEED = 2.0    # Increased from 0.5 for better catching
SIM_DT = 0.03

COLOR_CAR = [0.2, 0.6, 0.2]
COLOR_TURRET = [0.8, 0.7, 0.2]
COLOR_BALL = [1.0, 0.4, 0.0]
COLOR_PREDICTED = [0.0, 0.0, 1.0]  # Blue for predicted ball
COLOR_TRAIL = [0.9, 0.2, 0.2]
COLOR_LANDING_OK = [0.0, 0.8, 0.0]
COLOR_LANDING_FAIL = [0.8, 0.0, 0.0]

def solve_time_to_z(z0, vz0, target_z=TARGET_Z, g=9.81):
    a = -0.5 * g
    b = vz0
    c = z0 - target_z
    disc = b*b - 4*a*c
    if disc < 0: return None
    r1 = (-b + math.sqrt(disc)) / (2*a)
    r2 = (-b - math.sqrt(disc)) / (2*a)
    ts = [t for t in (r1, r2) if t > 0]
    return min(ts) if ts else None

def is_safe_catch(car_center, ball_xy):
    turret_radius = TURRET_D / 2.0
    ball_radius = 0.025
    dx = ball_xy[0] - car_center[0]
    dy = ball_xy[1] - car_center[1]
    dist = math.sqrt(dx*dx + dy*dy)
    safe_margin = turret_radius - ball_radius
    return dist <= safe_margin

class PredictorSubscriber(threading.Thread):
    def __init__(self, zmq_addr=ZMQ_ADDR):
        super().__init__(daemon=True)
        self.ctx = zmq.Context()
        self.sub = self.ctx.socket(zmq.SUB)
        self.sub.connect(zmq_addr)
        self.sub.setsockopt_string(zmq.SUBSCRIBE, "")
        self.latest = None
        self.lock = threading.Lock()
        self.running = True

    def run(self):
        while self.running:
            try:
                msg = self.sub.recv_json(flags=0)
            except Exception:
                time.sleep(0.005)
                continue
            with self.lock:
                self.latest = msg

    def get(self):
        with self.lock:
            return self.latest

    def stop(self):
        self.running = False
        try: self.sub.close()
        except: pass
        try: self.ctx.term()
        except: pass

class PlannerSim:
    def __init__(self):
        self.ball_pos = None
        self.ball_prev_z = None
        self.ball_trail = deque(maxlen=200)
        self.predicted_landing = None
        self.predicted_pos = None  # For 3D predicted position visualization
        self.landed = False
        self.landing_point = None
        self.landing_inside = False

        self.car_center = CAR_START.copy()
        self.car_vel = np.array([0.0, 0.0, 0.0])

        self.last_ball_pos = None
        self.last_car_center = None

        self.sub = PredictorSubscriber(ZMQ_ADDR)
        self.sub.start()

        self.vis = o3d.visualization.VisualizerWithKeyCallback()
        self.vis.create_window("Planner Simulation (Projectile with RLS)", width=1280, height=720)
        self._setup_scene()
        self.vis.register_animation_callback(self.update)
        self.last_time = time.time()

    def _setup_scene(self):
        grid = o3d.geometry.LineSet()
        pts = []
        lines = []
        size = 1.5
        step = 0.25
        for i in np.arange(-size, size+1e-6, step):
            pts.append([i, -size, 0]); pts.append([i, size, 0]); lines.append([len(pts)-2, len(pts)-1])
            pts.append([-size, i, 0]); pts.append([size, i, 0]); lines.append([len(pts)-2, len(pts)-1])
        grid.points = o3d.utility.Vector3dVector(np.array(pts))
        grid.lines = o3d.utility.Vector2iVector(np.array(lines))
        grid.colors = o3d.utility.Vector3dVector([[0.5,0.5,0.5] for _ in lines])
        self.vis.add_geometry(grid)

        self.ball_mesh = o3d.geometry.TriangleMesh.create_sphere(radius=0.025)
        self.ball_mesh.paint_uniform_color(COLOR_BALL)
        self.ball_mesh.compute_vertex_normals()
        self.vis.add_geometry(self.ball_mesh)

        self.trail_pcd = o3d.geometry.PointCloud()
        self.trail_pcd.points = o3d.utility.Vector3dVector(np.zeros((0, 3)))
        self.trail_pcd.paint_uniform_color(COLOR_TRAIL)
        self.vis.add_geometry(self.trail_pcd)

        box = o3d.geometry.TriangleMesh.create_box(width=CAR_L, height=CAR_W, depth=CAR_H)
        box.translate([-CAR_L/2.0, -CAR_W/2.0, 0.0])
        box.paint_uniform_color(COLOR_CAR)
        box.compute_vertex_normals()
        self.car_mesh = box
        self.vis.add_geometry(self.car_mesh)

        self.turret_mesh = o3d.geometry.TriangleMesh.create_cylinder(radius=TURRET_D / 2.0, height=TURRET_H)
        self.turret_mesh.paint_uniform_color(COLOR_TURRET)
        self.turret_mesh.compute_vertex_normals()
        self.turret_mesh.translate([0, 0, CAR_H])
        self.vis.add_geometry(self.turret_mesh)

        # Predicted ball (blue sphere)
        self.predicted_ball_mesh = o3d.geometry.TriangleMesh.create_sphere(radius=0.025)
        self.predicted_ball_mesh.paint_uniform_color(COLOR_PREDICTED)
        self.predicted_ball_mesh.compute_vertex_normals()
        # Hide initially
        p_cur = self.predicted_ball_mesh.get_center()
        hidden_pos = np.array([0.0, 0.0, -10.0])
        self.predicted_ball_mesh.translate(hidden_pos - p_cur, relative=True)
        self.vis.add_geometry(self.predicted_ball_mesh)

        self.land_marker = o3d.geometry.TriangleMesh.create_sphere(radius=0.02)
        self.land_marker.paint_uniform_color(COLOR_LANDING_FAIL)
        self.land_marker.compute_vertex_normals()
        lm_cur = self.land_marker.get_center()
        hidden_pos = np.array([0.0, 0.0, -10.0])
        self.land_marker.translate(hidden_pos - lm_cur, relative=True)
        self.land_marker_visible = False
        self.vis.add_geometry(self.land_marker)

        ctr = self.vis.get_view_control()
        ctr.set_lookat([0.45, 0.45, 0.1])
        ctr.set_front([-0.5, -0.8, -0.3])
        ctr.set_up([0,0,1])
        ctr.set_zoom(0.6)

    def update(self, vis):
        now = time.time()
        dt = now - self.last_time
        if dt <= 0: dt = SIM_DT
        self.last_time = now

        msg = self.sub.get()
        ball_updated = False
        if msg is not None:
            # New predictor payload structure:
            # "ball": { "current": {...}, "predicted": {...} }
            b = msg.get("ball", {})
            cur = b.get("current", {})
            pred = b.get("predicted", {})

            # Prefer using current 3D measurement for moving the ball in viz/trail.
            if cur and cur.get("valid", False):
                bx = cur.get("x"); by = cur.get("y"); bz = cur.get("z")
                if bx is not None and by is not None and bz is not None:
                    new_pos = np.array([float(bx), float(by), float(bz)])
                    if self.ball_pos is None or np.linalg.norm(new_pos - self.ball_pos) > 1e-6:
                        self.ball_pos = new_pos
                        self.ball_trail.append(self.ball_pos.copy())
                        ball_updated = True

            # For planning, prefer the predictor's RLS-based predicted intercept if available
            self.predicted_pos = None
            if pred and pred.get("valid", False):
                px = pred.get("x"); py = pred.get("y"); pz = pred.get("z")
                if px is not None and py is not None and pz is not None:
                    self.predicted_pos = np.array([float(px), float(py), float(pz)])
                    self.predicted_landing = self.predicted_pos[:2]
            else:
                # Fallback: if we have current measurement, compute free-fall landing XY (assume vz=0)
                if self.ball_pos is not None:
                    vz_assumed = 0.0
                    t_hit = solve_time_to_z(self.ball_pos[2], vz_assumed, TARGET_Z)
                    if t_hit is None:
                        self.predicted_landing = None
                    else:
                        self.predicted_landing = np.array([self.ball_pos[0], self.ball_pos[1]])
                        # For viz fallback, show at target z
                        self.predicted_pos = np.array([self.ball_pos[0], self.ball_pos[1], TARGET_Z])
                else:
                    self.predicted_landing = None

        # Move car toward predicted landing XY at fixed speed
        car_moved = False
        if self.predicted_landing is not None and not self.landed:
            target = np.array([self.predicted_landing[0], self.predicted_landing[1], 0.0])
            vec = target - self.car_center
            dist = np.linalg.norm(vec[:2])
            if dist > 1e-6:
                dir2 = vec[:2] / dist
                step = CAR_SPEED * dt
                old_car_center = self.car_center.copy()
                if step >= dist:
                    self.car_center[:2] = target[:2]
                else:
                    self.car_center[:2] += dir2 * step
                if self.last_car_center is None or np.linalg.norm(self.car_center - self.last_car_center) > 1e-6:
                    car_moved = True

        # update viz only when updated
        if ball_updated:
            b_center = self.ball_mesh.get_center()
            self.ball_mesh.translate(self.ball_pos - b_center, relative=True)
            self.vis.update_geometry(self.ball_mesh)
            if len(self.ball_trail) > 0:
                pts = np.array(self.ball_trail)
                self.trail_pcd.points = o3d.utility.Vector3dVector(pts)
                self.trail_pcd.paint_uniform_color(COLOR_TRAIL)
                self.vis.update_geometry(self.trail_pcd)

        if car_moved:
            cur_center = self.car_mesh.get_center()
            desired_center = np.array([self.car_center[0], self.car_center[1], CAR_H/2.0])
            self.car_mesh.translate(desired_center - cur_center, relative=True)
            self.vis.update_geometry(self.car_mesh)

            t_cur_center = self.turret_mesh.get_center()
            turret_des = np.array([self.car_center[0], self.car_center[1], CAR_H + TURRET_H/2.0])
            self.turret_mesh.translate(turret_des - t_cur_center, relative=True)
            self.vis.update_geometry(self.turret_mesh)

            self.last_car_center = self.car_center.copy()

        # Update predicted ball visualization
        if self.predicted_pos is not None:
            p_center = self.predicted_ball_mesh.get_center()
            self.predicted_ball_mesh.translate(self.predicted_pos - p_center, relative=True)
            self.vis.update_geometry(self.predicted_ball_mesh)
        else:
            # Hide
            p_cur = self.predicted_ball_mesh.get_center()
            hidden_pos = np.array([0.0, 0.0, -10.0])
            self.predicted_ball_mesh.translate(hidden_pos - p_cur, relative=True)
            self.vis.update_geometry(self.predicted_ball_mesh)

        # landing detection
        if self.ball_pos is not None:
            prev_z = self.ball_prev_z if self.ball_prev_z is not None else self.ball_pos[2]

            if self.landed and self.ball_pos[2] > TARGET_Z + 0.01:
                self.landed = False
                self.landing_point = None
                self.landing_inside = False
                lm_cur = self.land_marker.get_center()
                hidden_pos = np.array([0.0, 0.0, -10.0])
                self.land_marker.translate(hidden_pos - lm_cur, relative=True)
                self.vis.update_geometry(self.land_marker)

            if (not self.landed) and (prev_z > TARGET_Z) and (self.ball_pos[2] <= TARGET_Z):
                self.landed = True
                self.landing_point = np.array([self.ball_pos[0], self.ball_pos[1], TARGET_Z])
                self.landing_inside = is_safe_catch(self.car_center, self.landing_point[:2])
                print(f"[PlannerSim] Ball landed at {self.landing_point[:2]} inside_car={self.landing_inside}")
                self.land_marker.paint_uniform_color(COLOR_LANDING_OK if self.landing_inside else COLOR_LANDING_FAIL)
                lm_cur = self.land_marker.get_center()
                lm_des = self.landing_point
                self.land_marker.translate(lm_des - lm_cur, relative=True)
                self.vis.update_geometry(self.land_marker)

            self.ball_prev_z = float(self.ball_pos[2])

        return False

    def run(self):
        try:
            self.vis.run()
        finally:
            self.sub.stop()
            self.vis.destroy_window()

if __name__ == "__main__":
    print("Starting Planner + Open3D Simulation (uses predictor predicted/current). Subscribe:", ZMQ_ADDR)
    sim = PlannerSim()
    sim.run()

Starting Planner + Open3D Simulation (uses predictor predicted/current). Subscribe: tcp://localhost:5566
[PlannerSim] Ball landed at [0.26495013 0.38598408] inside_car=False
[PlannerSim] Ball landed at [0.31059998 0.47982275] inside_car=True
[PlannerSim] Ball landed at [-0.03817984  0.47272546] inside_car=False
[PlannerSim] Ball landed at [0.09733582 0.4056517 ] inside_car=False
[PlannerSim] Ball landed at [0.17874651 0.65306459] inside_car=False
[PlannerSim] Ball landed at [0.27884107 0.77139905] inside_car=False
[PlannerSim] Ball landed at [0.18669083 1.05199182] inside_car=False
[PlannerSim] Ball landed at [0.02082606 0.46756996] inside_car=False
[PlannerSim] Ball landed at [0.19125625 0.29591302] inside_car=True
[PlannerSim] Ball landed at [0.57355037 0.3820069 ] inside_car=True
[PlannerSim] Ball landed at [0.57670367 0.35420621] inside_car=True
[PlannerSim] Ball landed at [0.53066609 0.40232121] inside_car=True
[PlannerSim] Ball landed at [0.53066609 0.40232121] inside_car=True
[P

In [5]:
import socket
import time

# --- CONFIGURATION ---
ESP_IP = "10.42.0.125" 
PORT = 80
# ---------------------

def send_command(command):
    try:
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.settimeout(2)
        client.connect((ESP_IP, PORT))
        
        client.sendall(command.encode())
        print(f"Sent: {command}")
        
        # VITAL: Wait 0.1s to let ESP32 read the data before killing connection
        time.sleep(0.1) 
        
        client.close()
    except Exception as e:
        print(f"Error: {e}")

print(f"Connecting to ESP32 at {ESP_IP}...")
while True:
    user_input = input("Command (start/stop/q): ").strip().lower()
    if user_input == 'q': break
    if user_input in ['start', 'stop','1','2','3','4']:
        send_command(user_input)

Connecting to ESP32 at 10.42.0.125...
Sent: 1
Sent: 2
Sent: 3
Sent: 4
Sent: 1
Sent: 2
Sent: 3
Sent: 4
Sent: 2
Sent: 1
Sent: 1
Sent: 3
Sent: 3
Sent: 3
Sent: 1
Sent: 1
Sent: 1
Sent: 2
Sent: 4
Sent: 1
Sent: 3
Sent: 3
Sent: 3
Sent: 3


In [1]:
import socket
import time
import math
import numpy as np
import random

# ---------- CONFIG ----------
ESP_IP = "10.42.0.125"  # <--- CHECK THIS
ESP_PORT = 80

# STARTING POSITIONS (As per your data)
# Tag 4 (Front): (30, 41) cm
# Tag 5 (Rear):  (30, 20.4) cm
# Robot Center is approx midpoint
START_POS = np.array([0.30, 0.307]) # Meters
START_YAW = np.pi / 2.0  # 90 degrees (Facing +Y)

# SIMULATION CONFIG
SIM_DT = 0.1          # 100ms per loop step
V_SCALE = 0.002       # How fast the "simulated" robot moves per PWM unit
K_P = 350.0           # Proportional Gain
MAX_SPEED = 200
MIN_SPEED = 65

class WifiRobotController:
    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.sock = None
        self.connect()

    def connect(self):
        try:
            print(f"Connecting to ESP32 at {self.ip}...")
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.settimeout(0.1)
            self.sock.connect((self.ip, self.port))
            print("Connected!")
        except Exception as e:
            self.sock = None

    def send_velocity(self, v_strafe, v_forward, omega):
        if self.sock is None:
            self.connect()
            return
        
        # Command Format: V:strafe,forward,omega
        cmd = f"V:{int(v_strafe)},{int(v_forward)},{int(omega)}\n"
        try:
            self.sock.sendall(cmd.encode())
        except Exception:
            print("Socket lost. Reconnecting...")
            self.sock.close()
            self.sock = None

    def stop(self):
        self.send_velocity(0, 0, 0)
        if self.sock: self.sock.close()

# ---------- MATH HELPERS ----------

def world_to_robot(wx, wy, robot_yaw):
    """ Rotates World Error into Robot Frame """
    cos_a = math.cos(-robot_yaw)
    sin_a = math.sin(-robot_yaw)
    r_fwd = wx * cos_a - wy * sin_a  # Robot X (Forward)
    r_str = wx * sin_a + wy * cos_a  # Robot Y (Strafe)
    return r_fwd, r_str

def clamp(val, min_val, max_val):
    return max(min(max_val, val), min_val)

# ---------- MAIN TEST LOOP ----------

def main():
    robot_comms = WifiRobotController(ESP_IP, ESP_PORT)
    
    # 1. Initialize Simulated Robot State
    current_pos = START_POS.copy()
    current_yaw = START_YAW 

    print("------------------------------------------------")
    print("starting SIMULATION TEST.")
    print(f"Robot Start: ({current_pos[0]:.2f}, {current_pos[1]:.2f}) m")
    print(f"Robot Yaw:   {math.degrees(current_yaw):.1f} deg (Facing +Y)")
    print("------------------------------------------------")

    try:
        while True:
            # 2. Pick a Random Target nearby (within 20cm)
            target_x = 0.30 + random.uniform(-0.2, 0.2)
            target_y = 0.30 + random.uniform(-0.2, 0.2)
            target = np.array([target_x, target_y])
            
            print(f"\n>>> NEW TARGET: ({target[0]:.2f}, {target[1]:.2f})")

            # 3. Move towards target for 20 steps (approx 2 seconds)
            for _ in range(30):
                # A. Calculate Error
                err_x = target[0] - current_pos[0]
                err_y = target[1] - current_pos[1]
                
                # B. Transform to Robot Frame
                dist_fwd, dist_strafe = world_to_robot(err_x, err_y, current_yaw)

                # Check if arrived
                if abs(dist_fwd) < 0.02 and abs(dist_strafe) < 0.02:
                    print("Target Reached!")
                    break

                # C. Compute PWM (P-Control)
                cmd_fwd = clamp(dist_fwd * K_P, -MAX_SPEED, MAX_SPEED)
                cmd_str = clamp(dist_strafe * K_P, -MAX_SPEED, MAX_SPEED)
                
                # Apply Min Speed
                if 0 < abs(cmd_fwd) < MIN_SPEED: cmd_fwd = math.copysign(MIN_SPEED, cmd_fwd)
                if 0 < abs(cmd_str) < MIN_SPEED: cmd_str = math.copysign(MIN_SPEED, cmd_str)

                # D. Send to REAL Robot
                robot_comms.send_velocity(cmd_str, cmd_fwd, 0)

                # E. Update SIMULATED Position (Linear Progression)
                # This makes the "virtual" robot move so the error decreases
                sim_vel_fwd = cmd_fwd * V_SCALE 
                sim_vel_str = cmd_str * V_SCALE
                
                # Rotate virtual velocity back to world to update position
                w_vel_x = sim_vel_fwd * math.cos(current_yaw) - sim_vel_str * math.sin(current_yaw)
                w_vel_y = sim_vel_fwd * math.sin(current_yaw) + sim_vel_str * math.cos(current_yaw)
                
                current_pos[0] += w_vel_x
                current_pos[1] += w_vel_y

                print(f"Err: F{dist_fwd:.2f}/S{dist_strafe:.2f} | PWM: F{int(cmd_fwd)}/S{int(cmd_str)}")
                
                time.sleep(SIM_DT)

            # Stop briefly between targets
            robot_comms.send_velocity(0,0,0)
            time.sleep(1.0)

    except KeyboardInterrupt:
        print("\nStopping Test...")
        robot_comms.stop()

if __name__ == "__main__":
    main()

Connecting to ESP32 at 10.42.0.125...
------------------------------------------------
starting SIMULATION TEST.
Robot Start: (0.30, 0.31) m
Robot Yaw:   90.0 deg (Facing +Y)
------------------------------------------------

>>> NEW TARGET: (0.12, 0.15)
Connecting to ESP32 at 10.42.0.125...
Connected!
Err: F-0.16/S0.18 | PWM: F-65/S65
Err: F-0.03/S0.05 | PWM: F-65/S65
Err: F0.10/S-0.08 | PWM: F65/S-65
Err: F-0.03/S0.05 | PWM: F-65/S65
Err: F0.10/S-0.08 | PWM: F65/S-65
Err: F-0.03/S0.05 | PWM: F-65/S65
Err: F0.10/S-0.08 | PWM: F65/S-65
Err: F-0.03/S0.05 | PWM: F-65/S65
Err: F0.10/S-0.08 | PWM: F65/S-65

Stopping Test...


In [37]:
import zmq
import socket
import time
import math
import numpy as np

# ---------- CONFIG ----------
ZMQ_ADDR = "tcp://localhost:5566"
ESP_IP = "10.42.0.125"  
ESP_PORT = 80

# --- ROBOT GEOMETRY ---
TAG_OFFSET = 0.103 

# --- CONTROL GAINS ---
K_P = 350.0       
MAX_SPEED = 200   
MIN_SPEED = 70    # Increased slightly to prevent stalling

class WifiRobotController:
    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.sock = None
        self.connect()

    def connect(self):
        try:
            print(f"Connecting to Robot at {self.ip}...")
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.settimeout(0.1)
            self.sock.connect((self.ip, self.port))
            print("Connected!")
        except Exception:
            self.sock = None

    def send_velocity(self, vx, vy, omega):
        if self.sock is None:
            self.connect()
            return
        # V:strafe,forward,omega
        cmd = f"V:{int(vx)},{int(vy)},{int(omega)}\n"
        try:
            self.sock.sendall(cmd.encode())
        except Exception:
            self.sock.close()
            self.sock = None

    def stop(self):
        self.send_velocity(0, 0, 0)
        if self.sock: self.sock.close()

# ---------- MATH HELPERS ----------

def get_robot_pose(tag4, tag5):
    # (Same as before - Validated)
    p4_valid = tag4.get("valid")
    p5_valid = tag5.get("valid")
    p4 = np.array(tag4["pos"]) if p4_valid else None
    p5 = np.array(tag5["pos"]) if p5_valid else None
    
    if p4_valid and p5_valid:
        center = (p4 + p5) / 2.0
        dx = p4[0] - p5[0]
        dy = p4[1] - p5[1]
        yaw = math.atan2(dy, dx)
        return center[0], center[1], yaw
    elif p4_valid:
        yaw = float(tag4["yaw"])
        cx = p4[0] - TAG_OFFSET * math.cos(yaw)
        cy = p4[1] - TAG_OFFSET * math.sin(yaw)
        return cx, cy, yaw
    elif p5_valid:
        yaw = float(tag5["yaw"])
        cx = p5[0] + TAG_OFFSET * math.cos(yaw)
        cy = p5[1] + TAG_OFFSET * math.sin(yaw)
        return cx, cy, yaw
    return None, None, None

def world_to_robot_frame(wx, wy, robot_yaw):
    # Standard Rotation Matrix (Verified)
    # wx, wy are World Error vectors
    cos_a = math.cos(-robot_yaw)
    sin_a = math.sin(-robot_yaw)
    
    # Forward is X-axis in Robot Frame
    r_fwd = wx * cos_a - wy * sin_a 
    # Left is Y-axis in Robot Frame (Standard Right-Hand Rule)
    r_left = wx * sin_a + wy * cos_a
    
    return r_fwd, r_left

def clamp(n, minn, maxn):
    return max(min(maxn, n), minn)

# ---------- MAIN LOOP ----------

def main():
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    sub.connect(ZMQ_ADDR)
    sub.setsockopt_string(zmq.SUBSCRIBE, "")

    robot = WifiRobotController(ESP_IP, ESP_PORT)
    print("Planner initialized. Waiting for State Estimate...")

    try:
        while True:
            try:
                msg = sub.recv_json(flags=0)
            except zmq.Again:
                continue

            ball = msg.get("ball", {})
            tag4 = msg.get("tag4", {})
            tag5 = msg.get("tag5", {})

            # 2. Estimate Robot State
            rx, ry, r_yaw = get_robot_pose(tag4, tag5)

            # 3. Determine Target
            target_x, target_y = None, None
            if ball.get("valid"):
                target_x = ball["x"]
                target_y = ball["y"]

            vx_cmd, vy_cmd = 0, 0

            if rx is not None and target_x is not None:
                # A. Calculate World Error
                err_x_world = target_x - rx
                err_y_world = target_y - ry
                
                # B. Transform to Robot Frame (Forward, Left)
                fwd_dist, left_dist = world_to_robot_frame(err_x_world, err_y_world, r_yaw)

                # --- FIX 1: STOPPING LOGIC ---
                # Check absolute distance. If close enough, ZERO the error manually.
                if abs(fwd_dist) < 0.05: fwd_dist = 0.0  # 5cm tolerance
                if abs(left_dist) < 0.05: left_dist = 0.0

                # C. Calculate Velocities (P-Control)
                raw_fwd = fwd_dist * K_P
                raw_strafe = left_dist * K_P

                # --- FIX 2: INVERTED STRAFE ---
                # You said it went "Forward Left" instead of "Forward Right".
                # That means positive 'left_dist' produced Left movement, but we needed Right.
                # So we INVERT the strafe command.
                raw_strafe = -raw_strafe 

                # D. Clamp & Min Speed
                vx_cmd = clamp(raw_fwd, -MAX_SPEED, MAX_SPEED)
                vy_cmd = clamp(raw_strafe, -MAX_SPEED, MAX_SPEED)
                
                if 0 < abs(vx_cmd) < MIN_SPEED: vx_cmd = math.copysign(MIN_SPEED, vx_cmd)
                if 0 < abs(vy_cmd) < MIN_SPEED: vy_cmd = math.copysign(MIN_SPEED, vy_cmd)
                
                print(f"Err: Fwd {fwd_dist:.2f}, Side {left_dist:.2f} | CMD: F{int(vx_cmd)} S{int(vy_cmd)}")

            elif rx is None:
                print("WAITING FOR TAGS...")
            elif target_x is None:
                print("WAITING FOR BALL...")

            # Send Velocity: (Strafe, Forward, Omega)
            robot.send_velocity(vy_cmd, vx_cmd, 0) 
            
            # Small delay to prevent flooding ESP32
            # time.sleep(0.01)

    except KeyboardInterrupt:
        print("Stopping...")
        robot.stop()

if __name__ == "__main__":
    main()

Connecting to Robot at 10.42.0.125...
Planner initialized. Waiting for State Estimate...
WAITING FOR BALL...
Connecting to Robot at 10.42.0.125...
Connected!
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WAITING FOR BALL...
WA