In [None]:
import ipywidgets as widgets
import math
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import random
from collections import Counter
from dataclasses import dataclass
from IPython.display import display
from time import time


@dataclass
class Vector:
    x: float
    y: float
    z: float

    def add(self, o):
        return Vector(self.x + o.x, self.y + o.y, self.z + o.z)

    def sub(self, o):
        return Vector(self.x - o.x, self.y - o.y, self.z - o.z)

    def mul(self, f):
        return Vector(self.x * f, self.y * f, self.z * f)

    def div(self, f):
        return Vector(self.x / f, self.y / f, self.z / f)

    def unm(self):
        return Vector(-self.x, -self.y, -self.z)

    def dot(self, o):
        return self.x * o.x + self.y * o.y + self.z * o.z

    def cross(self, o):
        return Vector(
            self.y * o.z - self.z * o.y,
            self.z * o.x - self.x * o.z,
            self.x * o.y - self.y * o.x,
        )

    def length(self):
        return math.sqrt(self.x * self.x + self.y * self.y + self.z * self.z)

    def round(self, tol: float = 1):
        return Vector(
            round(self.x / tol), round(self.y / tol), round(self.z / tol)
        ).mul(tol)

    def __str__(self):
        return f"{self.x} {self.y} {self.z}"

    def copy(self):
        return Vector(self.x, self.y, self.z)


@dataclass
class Cannon:
    pos: Vector
    v_ms: float
    length: int
    g: float
    c_d: float


@dataclass
class Radar:
    pos: Vector
    range: int
    interval: float
    drop_rate: float


# UTILS


def timed_function(fn):
    """
    Usage:
    ```
    @timed_function
    def foo(..):
        ...
    ```
    """

    def wrap_function(*args, **kwargs):
        t1 = time()
        result = fn(*args, **kwargs)
        t2 = time()
        print(f"Function {fn.__name__!r} executed in {(t2-t1):.4f}s")
        return result

    return wrap_function


def round_increment(number, increment):
    return round(number * (1 / increment)) / (1 / increment)


def tick2sec(tick: int) -> float:
    return round_increment(tick * 0.05, 0.05)


def sec2tick(sec: float) -> int:
    return round(sec * 20)


# UTILS, BUT FOR PLOTLY


def add_marker(fig, pos: Vector, name: str | None = None, size: int | None = None):
    fig.add_trace(
        go.Scatter3d(
            x=[pos.x],
            y=[pos.z],  # on purpose
            z=[pos.y],
            mode="markers+text",
            name=name,
            text=[name],
            marker=dict(size=size),
        )
    )


def add_hemisphere(fig, pos: Vector, radius: float, name: str | None = None):
    u = np.linspace(0, 2 * np.pi, 25)  # azimuth angle
    v = np.linspace(0, np.pi / 2, 13)  # polar angle (0-90 degrees)

    x = pos.x + radius * np.outer(np.cos(u), np.sin(v))
    y = pos.z + radius * np.outer(np.sin(u), np.sin(v))
    z = pos.y + radius * np.outer(np.ones(np.size(u)), np.cos(v))

    fig.add_trace(
        go.Surface(
            x=x,
            y=y,
            z=z,
            opacity=0.15,
            showscale=False,
            name=name,
            hoverinfo="skip",
            showlegend=True,
        )
    )


# SIMULATION STUFF


def calculate_pitch(
    distance: float,
    velocity_ms: float,
    target_height: float,
    cannon_length: int,
    gravity: float,
    drag_coefficient: float,
    low: bool,
):

    # Constants
    g = gravity  # CBC gravity
    c_d = drag_coefficient  # drag coefficient
    t0 = 0  # Minimum projectile flight time in ticks
    tn = 750  # Maximum projectile flight time in ticks
    start_step_size = 18.75

    # Inputs
    X_R = distance
    v_m = velocity_ms
    h = target_height
    L = cannon_length

    u = v_m / 20  # Convert to velocity per tick

    # Higher order parameters
    A = g * c_d / (u * (1 - c_d))

    def B(t):
        return t * (g * c_d / (1 - c_d)) * 1 / X_R

    C = L / (u * X_R) * (g * c_d / (1 - c_d)) + h / X_R

    # The idea is to start with very large steps and decrease step size
    # the closer we get to the actual value.
    num_halvings = 10  # This is fine, too many halvings will slow down
    acceptable_threshold = 0.001  # How close to X_R is good enough

    def a_R(t):
        # "watch out for the square root" -Endal
        B_t = B(t)
        in_root = -(A**2) + B_t**2 + C**2 + 2 * B_t * C + 1
        if in_root < 0:
            return None

        num_a_R = math.sqrt(in_root) - 1
        den_a_R = A + B_t + C
        return 2 * math.atan(num_a_R / den_a_R)

    # t = time projectile, either start from t0 and increment or tn and decrement
    t = t0 if low else tn
    increasing_t = low
    step_size = start_step_size

    a_R1 = None  # scope reasons, since we need to return it

    for _ in range(num_halvings):
        while True:
            # It's taking too long, give up
            if (low and t >= tn) or (not low and t <= t0):
                return None, None

            # Angle of projectile at t
            a_R1 = a_R(t)
            # a square root being negative means something
            # has gone wrong, so give up
            if a_R1 is None:
                return None, None

            # Distance of projectile at t
            p1_X_R1 = u * math.cos(a_R1) / math.log(c_d)
            p2_X_R1 = c_d**t - 1
            p3_X_R1 = L * math.cos(a_R1)
            X_R1 = p1_X_R1 * p2_X_R1 + p3_X_R1

            # Good enough, let's call it quits
            if abs(X_R1 - X_R) <= acceptable_threshold:
                break

            # We've passed the target (aka we're close), now oscillate around the actual
            # target value until it's 'good enough' or it's taking too long.
            if (increasing_t and X_R1 > X_R) or (not increasing_t and X_R1 < X_R):
                increasing_t = not increasing_t
                break

            t = t + (step_size if increasing_t == low else -step_size)

        # Increase the precision after breaking out, since we're closer to target
        step_size = step_size / 2

    return t, math.degrees(a_R1)


def calculate_yaw_pitch_t(cannon: Cannon, target_pos: Vector, low: bool = True):
    dpos = target_pos.sub(cannon.pos)
    horizontal_dist = math.sqrt(dpos.x**2 + dpos.z**2)
    t, pitch = calculate_pitch(
        distance=horizontal_dist,
        velocity_ms=cannon.v_ms,
        target_height=dpos.y,
        cannon_length=cannon.length,
        gravity=cannon.g,
        drag_coefficient=cannon.c_d,
        low=low,
    )

    if t is None:
        return None, None, None

    yaw = math.degrees(math.atan2(dpos.x, dpos.z))
    return yaw, pitch, t


def simulate_trajectory(
    cannon: Cannon, yaw: float, pitch: float, max_ticks: int
) -> list[tuple[int, Vector]]:
    yaw_rad = math.radians(yaw)
    pitch_rad = math.radians(pitch)

    # Cannon aiming direction
    dir = Vector(
        math.cos(pitch_rad) * math.sin(yaw_rad),
        math.sin(pitch_rad),
        math.cos(pitch_rad) * math.cos(yaw_rad),
    )
    # Muzzle (initial projectile) position
    pos = cannon.pos.add(dir.mul(cannon.length))
    # v/s -> v/t
    vel = dir.mul(cannon.v_ms / 20)

    trajectory: list[tuple[int, Vector]] = []
    for t in range(max_ticks + 1):
        trajectory.append((t, pos.copy()))
        pos = pos.add(vel)
        vel = vel.mul(cannon.c_d)
        vel.y -= cannon.g

    return trajectory


def get_observed_trajectory(
    trajectory: list[tuple[int, Vector]], radar: Radar
) -> list[tuple[float, Vector]]:
    """
    Args:
        trajectory (list[tuple[int, Vector]]): timestamp in **ticks**, position
        radar (Radar):
        drop_rate (float, optional): Portion of measurements that get skipped range: [0, 1]. Defaults to 0.

    Returns:
        list[tuple[float, Vector]]: timestamp in **seconds**, position
    """
    t0 = time()  # Pretend this is when the radar sees projectile.
    # Game works in 0.05 intervals.
    scan_interval_ticks = max(1, sec2tick(radar.interval))
    items = []
    for t, pos in trajectory:
        if t % scan_interval_ticks != 0:
            continue
        if random.random() < radar.drop_rate:
            continue
        # distance
        if pos.sub(radar.pos).length() <= radar.range:
            items.append((t0 + tick2sec(t), pos))
    return items


EPSILON_ANGLE = 1e-12
EPSILON_TIME = 1e-9
CDG_DIGITS = 5
ACCEPTABLE_RMSE = 1e-3


# TODO: maybe sec2tick here is a bad idea, because information might be lost.
def compute_obs_velocities(
    datapoints: list[tuple[float, Vector]],
) -> list[tuple[int, Vector]]:
    """
    Note that this will return list of len(datapoints) - 1, because we can't know
    the velocity of the first datapoint.

    Args:
        datapoints (list[tuple[float, Vector]]): **seconds**, position

    Returns:
        list[tuple[float, Vector]]: **ticks**, velocity
    """
    velocities = []
    for i in range(len(datapoints) - 1):
        t0, p0 = datapoints[i]
        t1, p1 = datapoints[i + 1]
        dt = sec2tick(t1 - t0)  # You never know what in-game lag will do.
        if dt <= 0:
            continue
        vel = p1.sub(p0).div(dt)
        velocities.append((dt, vel))
    return velocities


def estimate_cd(velocities: list[int, Vector]) -> float | None:
    if len(velocities) < 2:
        return None

    cd_candidates = []
    for i in range(len(velocities) - 1):
        dt, v0 = velocities[i]
        _, v1 = velocities[i + 1]

        if abs(v0.x) > EPSILON_ANGLE and v1.x * v0.x > EPSILON_ANGLE:
            cd_val = (v1.x / v0.x) ** (1 / dt)
            cd_candidates.append(round(cd_val, CDG_DIGITS))
        if abs(v0.z) > EPSILON_ANGLE and v1.z * v0.z > EPSILON_ANGLE:
            cd_val = (v1.z / v0.z) ** (1 / dt)
            cd_candidates.append(round(cd_val, CDG_DIGITS))

    if len(cd_candidates) == 0:
        return None

    # Either a median (all differing data points) or mode
    mode, freq = Counter(cd_candidates).most_common(1)[0]
    if freq > 1:
        return mode
    else:
        # Median
        return sorted(cd_candidates)[len(cd_candidates) // 2]


def estimate_g(velocities: list[tuple[int, Vector]], cd: float) -> float | None:
    # cd required, because vy = cd * vy - g
    if len(velocities) < 2:
        return None

    g_candidates = []
    for i in range(len(velocities) - 1):
        dt, v0 = velocities[i]
        _, v1 = velocities[i + 1]

        denom = 1 - cd**dt
        if abs(denom) < EPSILON_ANGLE:
            continue

        g_val = (cd**dt * v0.y - v1.y) * (1 - cd) / denom

        # Discard if it doesn't make sense.
        if g_val < 0:
            continue

        g_candidates.append(round(g_val, CDG_DIGITS))

    if len(g_candidates) == 0:
        return None

    # Either a median (all differing data points) or mode
    mode, freq = Counter(g_candidates).most_common(1)[0]
    if freq > 1:
        return mode
    else:
        # Median
        return sorted(g_candidates)[len(g_candidates) // 2]


def velocity_to_angles(vel: Vector) -> tuple[float, float]:
    horiz = math.hypot(vel.x, vel.z) + EPSILON_ANGLE
    pitch = math.degrees(math.atan2(vel.y, horiz))
    yaw = math.degrees(math.atan2(vel.x, vel.z))
    return yaw, pitch


def backpropagate(
    p: Vector, v: Vector, s: int, cd: float, g: float
) -> tuple[Vector, Vector]:
    p_back, v_back = Vector(p.x, p.y, p.z), Vector(v.x, v.y, v.z)
    for _ in range(s):
        prev_v = Vector(v_back.x, v_back.y + g, v_back.z).div(cd)
        prev_p = p_back.sub(prev_v)
        p_back, v_back = prev_p, prev_v

    return p_back, v_back


def avg_to_instantaneous_velocity(
    avg_vel: Vector, dt: int, cd: float, g: float
) -> Vector:
    """
    Convert an observed *average per-tick* velocity computed as:
        avg_vel = (p(t+dt) - p(t)) / dt
    into the instantaneous velocity v(t) at the start of that interval,
    under the discrete model:
        v_{k+1} = cd * v_k   (x,z)
        v_{k+1}.y = cd * v_k.y - g
    dt is integer number of ticks between observations.
    """
    if dt <= 0:
        return avg_vel.copy()

    # handle x,z (closed form)
    if abs(1 - cd) < EPSILON_ANGLE:
        # cd -> 1 (no drag) limit: displacement_x = v0.x * dt
        v_x = avg_vel.x
        v_z = avg_vel.z
    else:
        S = (1 - cd**dt) / (1 - cd)  # sum_{k=0..dt-1} cd^k
        v_x = avg_vel.x * dt / S
        v_z = avg_vel.z * dt / S

    # y component (closed form invert)
    if abs(1 - cd) < EPSILON_ANGLE:
        # cd -> 1 limit: v decreases linearly by g each tick
        # displacement_y = dt * v0.y - g * dt*(dt-1)/2
        # avg_y = v0.y - g*(dt-1)/2  => v0.y = avg_y + g*(dt-1)/2
        v_y = avg_vel.y + g * (dt - 1) / 2.0
    else:
        S = (1 - cd**dt) / (1 - cd)
        # A = dt/(1-cd) - (1 - cd^dt)/(1-cd)^2
        A = dt / (1 - cd) - (1 - cd**dt) / ((1 - cd) ** 2)
        # displacement_y = v0.y * S - g * A
        # avg_y = displacement_y / dt
        # => v0.y = (avg_y * dt + g * A) / S
        v_y = (avg_vel.y * dt + g * A) / S

    return Vector(v_x, v_y, v_z)


def compute_rmse(
    sim_traj: list[tuple[float, Vector]],
    obs_pts: list[tuple[float, Vector]],
    s: int,
    offsets: list[int],
) -> float:
    """
    Compute RMSE between simulated and observed trajectory segments.

    sim_traj: list of (tick, Vector) or plain list indexed by tick (you use simulate_trajectory which appends per tick).
    obs_pts: list of observed (time_seconds, Vector)
    s: number of ticks we backpropagated (muzzle is s ticks before obs_pts[0])
    offsets: integer tick offsets for each obs point relative to obs_pts[0], e.g.
             offsets[0] == 0, offsets[1] == ticks_between(obs1, obs0), ...
    """
    if len(offsets) == 0:
        return float("inf")

    last_offset = offsets[-1]
    if len(sim_traj) < s + last_offset + 1:
        return float("inf")

    err = 0.0
    n = len(obs_pts)
    for i in range(n):
        sim_idx = s + offsets[i]
        diff = sim_traj[sim_idx][1].sub(obs_pts[i][1])
        err += diff.dot(diff)
    return math.sqrt(err / n)


# TODO: clean everything up
@timed_function
def estimate_muzzle(
    partial_trajectory: list[tuple[float, Vector]],
    speed_candidates: list[int] = list(range(40, 341, 20)),
    c_d: float = None,
    g: float = None,
    max_s: int = 750,
):
    # 2 datapoints are enough only if drag and gravity are already known.
    # 3 is usually enough, but weird things may still happen with c_d and g estimation.
    if len(partial_trajectory) < 2:
        return None, None  # not enough info

    obs_v = compute_obs_velocities(partial_trajectory)

    c_d = estimate_cd(obs_v) if c_d is None else c_d
    if c_d is None:
        return None, None  # Not enough info
    g = estimate_g(obs_v, c_d) if g is None else g
    if g is None:
        return None, None  # Not enough info

    first_dt, first_avg_vel = obs_v[0]
    v_curr = avg_to_instantaneous_velocity(first_avg_vel, first_dt, c_d, g)

    # compute tick offsets for each observation relative to the first observed time
    t0 = partial_trajectory[0][0]
    offsets = [sec2tick(t - t0) for t, _ in partial_trajectory]

    best = {"rmse": float("inf"), "muzzle": None, "params": None}
    for s in range(0, max_s):
        muzzle_pos, v0 = backpropagate(partial_trajectory[0][1], v_curr, s, c_d, g)
        speed_ms = v0.length() * 20  # From v/tick to v/sec
        # snap to nearest allowed speed candidate
        best_speed = speed_candidates[0]
        min_diff = abs(best_speed - speed_ms)
        for spd in speed_candidates:
            diff = abs(spd - speed_ms)
            if diff < min_diff:
                min_diff = diff
                best_speed = spd

        cannon_candidate = Cannon(muzzle_pos, best_speed, 0, g, c_d)

        sim_ticks = s + offsets[-1]
        yaw, pitch = velocity_to_angles(v0)
        sim_traj = simulate_trajectory(cannon_candidate, yaw, pitch, sim_ticks)

        rmse = compute_rmse(sim_traj, partial_trajectory, s, offsets)

        if rmse < best["rmse"]:
            best["rmse"] = rmse
            best["muzzle"] = muzzle_pos
            best["params"] = {
                "s": s,
                "cd": c_d,
                "g": g,
                "speed_ms": best_speed,
                "yaw": yaw,
                "pitch": pitch,
                "v0_per_tick": v0,
            }

        # good enough threshold
        if best["rmse"] < ACCEPTABLE_RMSE:
            return best["muzzle"], best

    return best["muzzle"], best

In [None]:
PLAY_AREA = 1500
CANNON = Cannon(Vector(50, 10, 80), 320, 24, 0.05, 0.99)
RADAR = Radar(Vector(0, 0, 0), 500, 0.25, 0.1)

label_target = widgets.Label(value="Target")
label_cannon = widgets.Label(value="Cannon")
label_radar = widgets.Label(value="Radar")
label_estimator = widgets.Label(value="Estimator")
slider_target_x = widgets.IntSlider(
    value=800, min=-PLAY_AREA, max=PLAY_AREA, continuous_update=False, description="X:"
)
slider_target_y = widgets.IntSlider(
    value=10, min=-PLAY_AREA, max=PLAY_AREA, continuous_update=False, description="Y:"
)
slider_target_z = widgets.IntSlider(
    value=340, min=-PLAY_AREA, max=PLAY_AREA, continuous_update=False, description=" Z:"
)
checkbox_trajectory = widgets.Checkbox(
    value=False, description="Low trajectory", indent=False
)
button_estimate_muzzle = widgets.Button(
    description="Estimate muzzle", button_style="info"
)
button_export = widgets.Button(description="Export", button_style="success")
plot_output = widgets.Output()


def update_plot(change=None, est_muzz=False, export=False):
    target_pos = Vector(
        slider_target_x.value, slider_target_y.value, slider_target_z.value
    )
    RADAR.pos = target_pos

    with plot_output:
        plot_output.clear_output(wait=True)
        fig = go.Figure()
        ax_style = dict(
            showbackground=False,
            zeroline=True,
            showgrid=True,
            zerolinewidth=4,
            zerolinecolor="grey",
            gridcolor="LightGrey",
        )
        fig.update_layout(
            width=800,
            height=600,
            margin=dict(l=0, r=0, t=0, b=0),
            scene=dict(
                xaxis=ax_style,
                yaxis=ax_style,
                zaxis=ax_style,
                aspectmode="data",
                xaxis_title="X",
                yaxis_title="Z",
                zaxis_title="Y",
                camera=dict(
                    up=dict(x=0, y=0, z=1),
                    center=dict(x=-0.05, y=-0.05, z=-0.1),
                    eye=dict(x=-0.9, y=-1.45, z=1.3),
                ),
            ),
            legend=dict(x=0, y=0, bgcolor="rgba(0,0,0,0)"),
        )

        yaw, pitch, t = calculate_yaw_pitch_t(
            CANNON, target_pos, low=checkbox_trajectory.value
        )
        if yaw is not None and pitch is not None and t is not None:
            trajectory = simulate_trajectory(CANNON, yaw, pitch, round(t))
            observed_trajectory = get_observed_trajectory(trajectory, RADAR)
            df = pd.DataFrame([t[1] for t in trajectory], columns=["x", "y", "z"])
            df2 = pd.DataFrame(
                [o[1] for o in observed_trajectory], columns=["x", "y", "z"]
            )
            fig.add_trace(
                go.Scatter3d(
                    x=df["x"],
                    y=df["z"],
                    z=df["y"],
                    mode="lines",
                    name="Trajectory",
                    line=dict(width=8),
                )
            )
            fig.add_trace(
                go.Scatter3d(
                    x=df2["x"],
                    y=df2["z"],
                    z=df2["y"],
                    mode="markers",
                    name="Recorded Trajectory",
                    marker=dict(size=4),
                )
            )

            impact_pos = trajectory[-1][1]
            print(f"Impact position: {impact_pos.round(1)}")
            print(f"Target position: {target_pos}")
            print(f"Error position: {impact_pos.sub(target_pos).length():.5f}")

            print(f"# obs: {len(observed_trajectory)}")
            if est_muzz and len(observed_trajectory) > 0:
                est_muzzle_pos, info = estimate_muzzle(observed_trajectory)
                add_marker(fig, est_muzzle_pos, "Muzzle (estimated)")

                muzzle_pos = trajectory[0][1]
                print(f"Act: cd={CANNON.c_d} | g={CANNON.g}")
                print(f"Est: cd={info["params"]["cd"]} | g={info["params"]["g"]}")
                print(f"Muzzle position: {muzzle_pos.round(1)}")
                print(f"Estimated muzzle position: {est_muzzle_pos.round(1)}")
                print(f"Error guess: {muzzle_pos.sub(est_muzzle_pos).length():.5f}")

            add_marker(fig, CANNON.pos, "Cannon", 12)
            add_marker(fig, target_pos, "Target", 12)
            add_hemisphere(fig, RADAR.pos, RADAR.range, "Radar range")

        fig.show()
        if export:
            fig.write_html("./docs/plot.html", full_html=False, include_plotlyjs="cdn")


slider_target_x.observe(update_plot, names="value")
slider_target_y.observe(update_plot, names="value")
slider_target_z.observe(update_plot, names="value")
checkbox_trajectory.observe(lambda change: update_plot(est_muzz=True), names="value")
button_estimate_muzzle.on_click(lambda change: update_plot(est_muzz=True))
button_export.on_click(lambda change: update_plot(est_muzz=True, export=True))

display(
    widgets.HBox(
        [
            widgets.VBox(
                [
                    label_target,
                    slider_target_x,
                    slider_target_y,
                    slider_target_z,
                    checkbox_trajectory,
                    button_estimate_muzzle,
                    button_export,
                    label_radar,
                    label_cannon,
                    label_estimator,
                ],
            ),
            plot_output,
        ]
    )
)
update_plot(est_muzz=True)

# TODO: use groups of widgets instead

# TODO: 
- clean everything up and move stuff from update_plot into a separate function.
- add more interactivity:
    - target: pos
    - Radar: pos (custom/ == targetpos), range, drop, scanrate
    - Cannon: pos, vms, length, g, cd
    - estimator: speed candidates, cd, g (pick between auto and custom)
- more robust estimator: when g is known but not cd, use g for cd estimation
- convert streamlit app to exe
