Edit the below cell's `input_log` variable to the path to your BIN (dataflash) log.
The BIN log parser currently assumes the first rangefinder is downward facing.

In [None]:
!pip install pymavlink
!pip install ipympl
!pip install open3d
!pip install plotly
!pip install pandas

In [44]:

from dataclasses import dataclass
from collections import namedtuple
from datetime import datetime
import math
import pandas as pd
from pathlib import Path
from pymavlink import mavutil
from pyproj import Geod
from scipy.spatial.transform import Rotation as R


input_log = Path("00000026.BIN")
stem = input_log.stem  # '00000024' from '00000024.BIN'
output_csv = Path(f"projected_{stem}.csv")

# Open .bin log
log = mavutil.mavlink_connection(str(input_log))

# Create a geodesic calculator
geod = Geod(ellps='WGS84')

# Store messages as lists of tuples
attitudes = []
ahrs_positions = []
rangefinder = []
terrain_data = []


Ahrs = namedtuple("Ahrs", ["timestamp", "lat", "lon", "alt", "roll", "pitch", "yaw"])
Rangefinder = namedtuple("Rangefinder", ["timestamp", "distance"])
Terrain = namedtuple("Terrain", ["timestamp", "terr_height"])

@dataclass
class GroundPoint:
    timestamp: float
    rng_lat: float
    rng_lon: float
    rng_elev: float
    ahrs_lat: float
    ahrs_lon: float
    ahrs_alt: float
    distance: float
    terr_height: float


while True:
    msg = log.recv_match(blocking=False)
    if msg is None:
        break

    t = datetime.fromtimestamp(msg._timestamp)
    mtype = msg.get_type()

    if msg.get_type() == "AHR2":
        # Lat and Lon in degrees, alt in m
        ahrs_positions.append(Ahrs(
            timestamp=t,
            lat=msg.Lat,
            lon=msg.Lng,
            alt=msg.Alt,
            roll=math.radians(msg.Roll),
            pitch=math.radians(msg.Pitch),
            yaw=math.radians(msg.Yaw),
        ))
    elif msg.get_type() == "RFND" and msg.Stat == 4: # Only use healthy readings, see AP_Rangefinder::Status::Good
        rangefinder.append(Rangefinder(
            timestamp=t,
            distance=msg.Dist
        ))
    elif msg.get_type() == "TERR":
        terrain_data.append(Terrain(
            timestamp=t,
            terr_height=msg.TerrH
        ))

ahrs_df = pd.DataFrame(ahrs_positions).set_index("timestamp").sort_index()
rfnd_df = pd.DataFrame(rangefinder).set_index("timestamp").sort_index()
# terr_df = pd.DataFrame(terrain_data).set_index("timestamp").sort_index()

In [45]:
# Compute estimated ground points
ground_points: list[GroundPoint] = []

assert ahrs_positions, "No AHR2 messages found"
assert rangefinder, "No valid RFND messages found"
# assert terrain_data, "No TERR messages found"

for t, row in rfnd_df.iterrows():
    dist = row.distance

    # Efficient nearest neighbor lookup with .asof() for datetime index
    try:
        ahrs_row = ahrs_df.loc[ahrs_df.index.asof(t)]
        # terr_row = terr_df.loc[terr_df.index.asof(t)]
    except KeyError:
        continue  # skip if no data available at or before timestamp

    # Unpack required fields
    lat, lon, alt, roll, pitch, yaw  = ahrs_row.lat, ahrs_row.lon, ahrs_row.alt, ahrs_row.roll, ahrs_row.pitch, ahrs_row.yaw
    # terr_height = terr_row.terr_height

    # Build the rotation using Euler angles (in radians)
    rotation = R.from_euler('zyx', [yaw, pitch, roll])

    # Rotate the down vector (in body frame)
    ned_vector = rotation.apply([0, 0, dist])

    # Calculate ground point in NED frame
    north_offset = ned_vector[0]
    east_offset = ned_vector[1]
    down_offset = ned_vector[2]

    ground_alt = alt - down_offset  # down_offset is positive, so subtract it to get estimated ground height

    # Project horizontal lat/lon offset
    azimuth = math.degrees(math.atan2(east_offset, north_offset))
    horizontal_distance = math.hypot(north_offset, east_offset)
    g_lon, g_lat, _ = geod.fwd(lon, lat, azimuth, horizontal_distance)

    ground_points.append(GroundPoint(
        timestamp=t,
        rng_lat=g_lat,
        rng_lon=g_lon,
        rng_elev=ground_alt,
        ahrs_lat=lat,
        ahrs_lon=lon,
        ahrs_alt=alt,
        distance=dist,
        terr_height=math.nan
    ))

if not ground_points:
    print("Error: No projected ground point data found.")
    exit(1)

In [46]:
# Convert to DataFrame

import pandas as pd

df = pd.DataFrame([gp.__dict__ for gp in ground_points])

In [47]:
# Get the latitude/longitude extent for input to OpenTopography

min_lat = df["rng_lat"].min()
max_lat = df["rng_lat"].max()
min_lon = df["rng_lon"].min()
max_lon = df["rng_lon"].max()

assert min_lat < max_lat
assert min_lon < max_lon

print(f"Latitude range: ({min_lat},{max_lat})")
print(f"Longitude range: ({min_lon},{max_lon})")


Latitude range: (40.05316186681585,40.057205709321124)
Longitude range: (-105.29139324579846,-105.28702229932495)


To compare to other elevation data sources, these must be manually downloaded from OpenTopography.
Once extracted into this directory, rename them and update paths in the below block.
Add as many datasets for comparison as you like. GDAL supports many different file formats.

In [48]:

# Load elevation from GeoTIFF using GDAL

from osgeo import gdal

dataset_paths = ["/vsizip/vsicurl/https://terrain.ardupilot.org/SRTM1/ap_srtm1.zip/ap_srtm1.vrt"]
for dataset_path in dataset_paths:
    dataset = gdal.Open(str(dataset_path))
    assert dataset is not None, f"Dataset '{dataset_path}' failed to load"
    band = dataset.GetRasterBand(1)
    transform = dataset.GetGeoTransform()

    # Convert lat/lon to row/col
    def latlon_to_rowcol(lat, lon):
        inv_transform = gdal.InvGeoTransform(transform)
        px, py = gdal.ApplyGeoTransform(inv_transform, lon, lat)
        return int(py), int(px)

    elevations = []
    for _, row in df.iterrows():
        r, c = latlon_to_rowcol(row.rng_lat, row.rng_lon)
        grid = band.ReadAsArray(c, r, 1, 1)
        if grid is not None:
            elevation = grid[0][0]
        else:
            # Out of range of the dataset, common with LOG_REPLAY before the EKF initializes
            elevation = math.nan
        elevations.append(elevation)

    df["ap_srtm1_elevation"] = elevations

In [None]:
# plot using plotly, good for smaller logs (<30 minutes)

import plotly.graph_objs as go

fig = go.Figure()
for col in df.columns:
    stems = [f"ap_srtm1_elevation" for p in dataset_paths]
    cols = ("ahrs_alt", "rng_elev", "terr_height", "ap_srtm1_elevation")
    if col in cols:
        print(f"Adding {col}")
        y = pd.to_numeric(df[col], errors='coerce')

        fig.add_trace(go.Scattergl(x=df["timestamp"], y=y, mode='lines+markers', name=col))

fig.update_layout(title=f"Terrain Comparison Over Time for {stem}", xaxis_title="Time", yaxis_title="Altitude (m)", height=600)
fig.show()

In [None]:
import plotly.graph_objects as go
import numpy as np
from scipy import stats
import open3d as o3d
from pyproj import Transformer


# column_names = df.iloc[0]
# df.columns = column_names
# df = df.iloc[1:].reset_index(drop=True)
# print(df)

### basic thresholding and filtering
ndf = df[df["rng_elev"] >= 0]
ndf[["rng_lon", "rng_lat", "rng_elev"]].map(np.isreal)

# sample = ndf.sample(100)
# print(ndf[["rng_lon", "rng_lat", "rng_elev"]].isnull().sum())

### Save to CSV
fig = go.Figure(data=[go.Scatter3d(
    x=ndf["rng_lon"],
    y=ndf["rng_lat"],
    z=ndf["rng_elev"],
    mode='markers',
    marker=dict(size=4, color=ndf["rng_elev"], colorscale='Viridis', opacity=0.8)
)])

fig.update_layout(scene=dict(
    xaxis_title='Longitude',
    yaxis_title='Latitude',
    zaxis_title='Elevation'
))

### plotted without any filtering, so it will be noisy
fig.show()

### convert to numpy -> o3d pcl
coords = ndf[['rng_lat', 'rng_lon', 'rng_elev']].to_numpy()

## conversion to ECEF coordinates for o3d
transformer = Transformer.from_crs("epsg:4326", "epsg:4978", always_xy=True)
x, y, z = transformer.transform(coords[:,1], coords[:,0], coords[:,2])
points = np.vstack((x, y, z)).T

pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points)


### downsampling, outlier removal
vdpcd = pcd.voxel_down_sample(voxel_size=2)

cl, ind = vdpcd.remove_statistical_outlier(nb_neighbors=20,std_ratio=2.0)

iclStd = vdpcd.select_by_index(ind)
oclStd = vdpcd.select_by_index(ind, invert=True)

cl, ind = iclStd.remove_radius_outlier(nb_points=5, radius=5)
icl = iclStd.select_by_index(ind)
ocl = iclStd.select_by_index(ind, invert=True)

ocl.paint_uniform_color([1, 0, 0])
icl.paint_uniform_color([0.8, 0.8, 0.8])
iclStd.paint_uniform_color([0.6, 0.6, 0.6])
oclStd.paint_uniform_color([1, 0, 0])
o3d.visualization.draw_geometries([icl, ocl,oclStd])

### uncomment if want to see voxel grid for later navigation tasks if required

# voxel_grid = o3d.geometry.VoxelGrid.create_from_point_cloud(pcd, voxel_size=2)
# o3d.visualization.draw_geometries([voxel_grid])



In [None]:
# to np array and normalize and center the pcl
points = np.asarray(pcd.points)

centroid = np.mean(points, axis=0)
points_centered = points - centroid

max_distance = np.max(np.linalg.norm(points_centered, axis=1))
min_distance = np.max(np.linalg.norm(points_centered, axis=1))

points_normalized = points_centered / max_distance


fig = go.Figure(data=[go.Scatter3d(
    x=points_normalized[:, 0],
    y=points_normalized[:, 1],
    z=points_normalized[:, 2],
    mode='markers',
    marker=dict(size=1)
)])
fig.update_layout(scene=dict(
    xaxis_title='X',
    yaxis_title='Y',
    zaxis_title='Z'
))

print(points_normalized[:, 0])
fig.show()

In [52]:
# compare with SRTM elevation
df["delta"] = df["rng_elev"] - df["ap_srtm1_elevation"]

In [None]:
# Save plot

import plotly.io as pio
pio.write_html(fig, f'terrain_pyplot_{stem}.html')

In [53]:
# save to CSV to visualize on QGIS or other tools
df.to_csv('output.csv', index=False)
