# Hardware Log GIF Generator

Generates animated GIFs of planar quadrotor trajectory with forward-looking reachable tubes and time-varying wind field (TVGPR) for all hardware log files.

- **no_wind_estimation/**: HW flights without wind estimation (GIFs saved there)
- **yes_wind_estimation/**: HW flights with wind estimation (GIFs saved there)

In [None]:
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg')  # Non-interactive backend for saving GIFs
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.patches import Rectangle
import glob
import os
import sys
import jax.numpy as jnp

# Add path to GP modules
sys.path.insert(0, '/home/egmc/ws_px4_rta_mm_gpr/src/px4_rta_mm_gpr/px4_rta_mm_gpr/jax_mm_rta')
from TVGPR import TVGPR

plt.rcParams.update({
    'text.usetex': False,
    'font.family': 'sans-serif',
    'font.size': 14
})

print('Imports complete.')

In [None]:
# ====== CONFIGURATION ======
HORIZON_STEPS = 12      # Forward-looking prediction steps per timestep
FRAME_SKIP    = 3       # Skip frames (higher = faster render, choppier)
GIF_FPS       = 10      # Frames per second in saved GIF
FONTSIZE      = 18
LINEWIDTH     = 2.5
QUADSIZE      = 0.25
OBS_WINDOW    = 50      # Sliding window size for TVGPR updates

# Wind arrow settings
WIND_Y_SCALE  = 15
WIND_Z_SCALE  = 20
N_ARROWS_Y    = 8
N_ARROWS_Z    = 10

# TVGPR hyperparameters
SIGMA_F  = 5.0
LENGTH   = 2.0
SIGMA_N  = 0.01
EPSILON  = 0.25

# Paths
BASE_DIR = 'log_files'
FOLDERS  = ['no_wind_estimation', 'yes_wind_estimation']

In [None]:
def quadplot(ypos, zpos, rot, scale):
    """Draw a 2D quadrotor shape at given position and orientation."""
    ts = -0.5 * np.arange(np.pi, 3/2 * np.pi, 0.2) + 0.3
    xtemp = np.cos(ts)
    ytemp = np.sin(ts) * np.cos(ts)
    xs = np.hstack((0.4*xtemp - 1, -1, -1, 1, 1, 0.4*xtemp + 1))
    ys = np.hstack((0.3*ytemp + 0.4, 0.4, 0, 0, 0.4, 0.3*ytemp + 0.4))
    xs, ys = scale * xs, scale * ys
    rotmat = np.array([[np.cos(rot), -np.sin(rot)],
                       [np.sin(rot),  np.cos(rot)]])
    newpos = rotmat @ np.vstack((xs, ys))
    return newpos[0, :] + ypos, newpos[1, :] + zpos


def get_RMSE(data, num_refs_per_step=12):
    """Calculate RMSE between actual position and reference trajectory."""
    y     = data['y'].to_numpy()
    z     = data['z'].to_numpy()
    y_ref = data['y_ref'].to_numpy()
    z_ref = data['z_ref'].to_numpy()
    n_true, n_ref = len(y), len(y_ref)
    squared_errors = []
    n = 0
    while n < n_true:
        if np.isnan(y[n]) or np.isnan(z[n]):
            break
        ref_idx = num_refs_per_step * n
        if ref_idx >= n_ref:
            break
        if np.isnan(y_ref[ref_idx]) or np.isnan(z_ref[ref_idx]):
            n += 1
            continue
        dy = y[n] - y_ref[ref_idx]
        dz = z[n] - z_ref[ref_idx]
        squared_errors.append(dy*dy + dz*dz)
        n += 1
    if len(squared_errors) == 0:
        return np.nan
    return np.sqrt(np.mean(squared_errors))


def generate_gif(log_path, gif_path):
    """
    Generate an animated GIF from a single log file.
    
    Wind arrows (TVGPR) are automatically enabled when the log contains
    non-zero wind data, and disabled otherwise.
    """
    # --- Load data ---
    data = pd.read_csv(log_path)
    if len(data) < HORIZON_STEPS * 2:
        print(f'  SKIP (too few rows: {len(data)})')
        return None

    num_horizon  = HORIZON_STEPS
    max_timestep = len(data) // num_horizon

    # --- Extract arrays ---
    time_all = data['time'].values
    y_all    = data['y'].values
    z_all    = -data['z'].values          # flip Z positive-up
    yaw_all  = data['yaw'].values
    wy_all   = data['wy'].values
    wz_all   = data['wz'].values

    y_ref_all = data['y_ref'].values
    z_ref_all = -data['z_ref'].values     # flip Z
    pyH_all   = data['save_tube_pyH'].values
    pyL_all   = data['save_tube_pyL'].values
    pzH_all   = -data['save_tube_pzL'].values  # flip & swap
    pzL_all   = -data['save_tube_pzH'].values  # flip & swap

    # --- Auto-detect wind ---
    has_wind = (np.nanmax(np.abs(wy_all)) > 1e-6) or (np.nanmax(np.abs(wz_all)) > 1e-6)
    show_wind_y = has_wind
    show_wind_z = has_wind

    # --- RMSE ---
    rmse = get_RMSE(data, num_refs_per_step=num_horizon)

    # --- TVGPR init (only if wind present) ---
    tvgp_wy = tvgp_wz = None
    if has_wind:
        n_init = min(20, max_timestep)
        idx = np.arange(n_init)
        vm_wy = ~(np.isnan(time_all[idx]) | np.isnan(z_all[idx]) | np.isnan(wy_all[idx]))
        vm_wz = ~(np.isnan(time_all[idx]) | np.isnan(y_all[idx]) | np.isnan(wz_all[idx]))
        obs_wy = np.column_stack((time_all[idx][vm_wy], z_all[idx][vm_wy], wy_all[idx][vm_wy]))
        obs_wz = np.column_stack((time_all[idx][vm_wz], y_all[idx][vm_wz], wz_all[idx][vm_wz]))
        tvgp_wy = TVGPR(jnp.array(obs_wy), sigma_f=SIGMA_F, l=LENGTH, sigma_n=SIGMA_N, epsilon=EPSILON)
        tvgp_wz = TVGPR(jnp.array(obs_wz), sigma_f=SIGMA_F, l=LENGTH, sigma_n=SIGMA_N, epsilon=EPSILON)

    # --- Axis limits ---
    y_valid = y_all[~np.isnan(y_all)]
    z_valid = z_all[~np.isnan(z_all)]
    ym = (np.max(y_valid) - np.min(y_valid)) * 0.15
    zm = (np.max(z_valid) - np.min(z_valid)) * 0.15
    y_min, y_max = np.min(y_valid) - ym, np.max(y_valid) + ym
    z_min, z_max = np.min(z_valid) - zm, np.max(z_valid) + zm

    # --- Wind arrow grid ---
    arrow_y_coords = np.linspace(y_min, y_max, N_ARROWS_Y)
    arrow_z_coords = np.linspace(z_min, z_max, N_ARROWS_Z)
    ag_y, ag_z = np.meshgrid(arrow_y_coords, arrow_z_coords)
    arrow_pts = np.column_stack((ag_y.ravel(), ag_z.ravel()))

    # --- Build figure ---
    fig, ax = plt.subplots(figsize=(14, 10))

    wind_arrows_y = wind_arrows_z = None
    if show_wind_y:
        zero_dirs = np.zeros((len(arrow_pts), 2))
        wind_arrows_y = ax.quiver(arrow_pts[:, 0], arrow_pts[:, 1],
                                  zero_dirs[:, 0], zero_dirs[:, 1],
                                  color='tab:gray', alpha=0.5, label='Wind Y',
                                  scale=WIND_Y_SCALE)
    if show_wind_z:
        zero_dirs = np.zeros((len(arrow_pts), 2))
        wind_arrows_z = ax.quiver(arrow_pts[:, 0], arrow_pts[:, 1],
                                  zero_dirs[:, 0], zero_dirs[:, 1],
                                  color='tab:orange', alpha=0.5, label='Wind Z',
                                  scale=WIND_Z_SCALE)

    actual_line, = ax.plot([], [], label='Actual Trajectory',
                           color='blue', linewidth=LINEWIDTH)
    reference_horizon, = ax.plot([], [],
                                 label=f'Reference Horizon ({num_horizon}-step)',
                                 linestyle='dashed', color='green',
                                 linewidth=LINEWIDTH, alpha=0.8,
                                 marker='o', markersize=6)
    tube_rects = []
    for i in range(num_horizon):
        rect = Rectangle((0, 0), 1, 1, linewidth=1.5,
                          edgecolor='red', facecolor='red', alpha=0.15)
        ax.add_patch(rect)
        tube_rects.append(rect)
        if i == 0:
            rect.set_label('Reachable Tube Horizon')

    quad_line, = ax.plot([], [], color='black', linewidth=LINEWIDTH)
    current_pos, = ax.plot([], [], 'ro', markersize=10, label='Current Position')

    log_name = os.path.basename(log_path)
    wind_tag = 'Wind Est. ON' if has_wind else 'Wind Est. OFF'
    ax.set_xlabel('Y Position (m)', fontsize=FONTSIZE)
    ax.set_ylabel('Z Position (m)', fontsize=FONTSIZE)
    ax.set_title(f'{log_name}  |  {wind_tag}  |  RMSE={rmse:.4f}', fontsize=FONTSIZE)
    ax.legend(loc='center left', bbox_to_anchor=(1.02, 0.5), fontsize=FONTSIZE - 4)
    ax.grid(True, alpha=0.3)
    ax.set_aspect('equal', adjustable='box')
    ax.set_xlim(y_min, y_max)
    ax.set_ylim(z_min, z_max)

    time_text = ax.text(0.02, 0.02, '', transform=ax.transAxes,
                        fontsize=FONTSIZE - 2, verticalalignment='bottom',
                        bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))
    step_text = ax.text(0.98, 0.02, '', transform=ax.transAxes,
                        fontsize=FONTSIZE - 4, verticalalignment='bottom',
                        horizontalalignment='right',
                        bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.8))
    plt.tight_layout()

    # --- Mutable state for closure ---
    state = {'tvgp_wy': tvgp_wy, 'tvgp_wz': tvgp_wz}

    # --- Animation update ---
    def update(n):
        ret = [actual_line, reference_horizon, quad_line, current_pos,
               time_text, step_text]
        if wind_arrows_y is not None:
            ret.append(wind_arrows_y)
        if wind_arrows_z is not None:
            ret.append(wind_arrows_z)
        ret.extend(tube_rects)

        if n >= max_timestep or np.isnan(time_all[n]):
            return ret

        actual_line.set_data(y_all[:n+1], z_all[:n+1])
        t_curr = time_all[n]
        y_curr, z_curr, yaw_curr = y_all[n], z_all[n], yaw_all[n]

        # Update TVGPR sliding window
        if has_wind:
            s = max(0, n - OBS_WINDOW)
            idx = np.arange(s, n + 1)
            vm_wy = ~(np.isnan(time_all[idx]) | np.isnan(z_all[idx]) | np.isnan(wy_all[idx]))
            vm_wz = ~(np.isnan(time_all[idx]) | np.isnan(y_all[idx]) | np.isnan(wz_all[idx]))
            if np.sum(vm_wy) > 5:
                obs = np.column_stack((time_all[idx][vm_wy], z_all[idx][vm_wy], wy_all[idx][vm_wy]))
                state['tvgp_wy'] = TVGPR(jnp.array(obs), sigma_f=SIGMA_F, l=LENGTH,
                                         sigma_n=SIGMA_N, epsilon=EPSILON)
            if np.sum(vm_wz) > 5:
                obs = np.column_stack((time_all[idx][vm_wz], y_all[idx][vm_wz], wz_all[idx][vm_wz]))
                state['tvgp_wz'] = TVGPR(jnp.array(obs), sigma_f=SIGMA_F, l=LENGTH,
                                         sigma_n=SIGMA_N, epsilon=EPSILON)

            # Update wind arrows
            if wind_arrows_y is not None and state['tvgp_wy'] is not None:
                dirs = np.zeros_like(arrow_pts)
                for i, (yp, zp) in enumerate(arrow_pts):
                    dirs[i, 0] = float(state['tvgp_wy'].mean(jnp.array([t_curr, zp]))[0][0])
                wind_arrows_y.set_UVC(dirs[:, 0], dirs[:, 1])

            if wind_arrows_z is not None and state['tvgp_wz'] is not None:
                dirs = np.zeros_like(arrow_pts)
                for i, (yp, zp) in enumerate(arrow_pts):
                    dirs[i, 1] = float(state['tvgp_wz'].mean(jnp.array([t_curr, yp]))[0][0])
                wind_arrows_z.set_UVC(dirs[:, 0], dirs[:, 1])

        # Reference horizon & tubes
        rs, re = num_horizon * n, num_horizon * n + num_horizon
        if re <= len(data):
            reference_horizon.set_data(y_ref_all[rs:re], z_ref_all[rs:re])
            for i in range(num_horizon):
                pH, pL = pyH_all[rs+i], pyL_all[rs+i]
                zH, zL = pzH_all[rs+i], pzL_all[rs+i]
                if not (np.isnan(pH) or np.isnan(pL) or np.isnan(zH) or np.isnan(zL)):
                    tw, th = pH - pL, zH - zL
                    if 0 < tw < 50 and 0 < th < 50:
                        tube_rects[i].set_xy((pL, zL))
                        tube_rects[i].set_width(tw)
                        tube_rects[i].set_height(th)
                        tube_rects[i].set_visible(True)
                    else:
                        tube_rects[i].set_visible(False)
                else:
                    tube_rects[i].set_visible(False)
        else:
            reference_horizon.set_data([], [])
            for r in tube_rects:
                r.set_visible(False)

        # Quadrotor
        if not (np.isnan(y_curr) or np.isnan(z_curr)):
            qy, qz = quadplot(y_curr, z_curr, yaw_curr, QUADSIZE)
            quad_line.set_data(qy, qz)
            quad_line.set_visible(True)
            current_pos.set_data([y_curr], [z_curr])
            current_pos.set_visible(True)
        else:
            quad_line.set_visible(False)
            current_pos.set_visible(False)

        time_text.set_text(f't = {t_curr:.3f} s')
        return ret

    # --- Create & save animation ---
    frames = list(range(0, max_timestep, FRAME_SKIP))
    ani = animation.FuncAnimation(fig, update, frames=frames,
                                  interval=50, blit=True, repeat=False)
    ani.save(gif_path, writer='pillow', fps=GIF_FPS)
    plt.close(fig)

    print(f'  Saved: {gif_path}  ({len(frames)} frames, RMSE={rmse:.4f}, wind={has_wind})')
    return rmse

## No Wind Estimation

In [None]:
folder = os.path.join(BASE_DIR, 'no_wind_estimation')
log_files = sorted(glob.glob(os.path.join(folder, '*.log')))
print(f'Found {len(log_files)} log files in {folder}\n')

results_no_wind = {}
for lf in log_files:
    name = os.path.splitext(os.path.basename(lf))[0]
    gif_out = os.path.join(folder, f'{name}.gif')
    print(f'Processing {os.path.basename(lf)} ...')
    rmse = generate_gif(lf, gif_out)
    if rmse is not None:
        results_no_wind[name] = rmse

print('\n--- Summary (no wind estimation) ---')
for name, rmse in results_no_wind.items():
    print(f'  {name}: RMSE = {rmse:.6f}')

## Yes Wind Estimation

In [None]:
folder = os.path.join(BASE_DIR, 'yes_wind_estimation')
log_files = sorted(glob.glob(os.path.join(folder, '*.log')))
print(f'Found {len(log_files)} log files in {folder}\n')

results_yes_wind = {}
for lf in log_files:
    name = os.path.splitext(os.path.basename(lf))[0]
    gif_out = os.path.join(folder, f'{name}.gif')
    print(f'Processing {os.path.basename(lf)} ...')
    rmse = generate_gif(lf, gif_out)
    if rmse is not None:
        results_yes_wind[name] = rmse

print('\n--- Summary (yes wind estimation) ---')
for name, rmse in results_yes_wind.items():
    print(f'  {name}: RMSE = {rmse:.6f}')

## Combined Summary

In [None]:
print('=' * 55)
print(f'{"Log":25s} {"Wind Est.":12s} {"RMSE":>10s}')
print('-' * 55)
for name, rmse in results_no_wind.items():
    print(f'{name:25s} {"OFF":12s} {rmse:10.6f}')
for name, rmse in results_yes_wind.items():
    print(f'{name:25s} {"ON":12s} {rmse:10.6f}')
print('=' * 55)

if results_no_wind:
    avg_no  = np.mean(list(results_no_wind.values()))
    print(f'\nAvg RMSE (no wind est.):  {avg_no:.6f}')
if results_yes_wind:
    avg_yes = np.mean(list(results_yes_wind.values()))
    print(f'Avg RMSE (yes wind est.): {avg_yes:.6f}')