In [None]:
import os, re, struct, numpy as np, pandas as pd, matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401
from google.colab import drive

In [None]:
# Mount Google Drive so files under MyDrive are reachable from this Colab runtime.
drive.mount('/content/drive')

# Record cap; not referenced by the cells visible in this notebook.
MAX_RECORDS = 50000
file_path = "/content/drive/MyDrive/input/20250907-150827_Rtk.fmnav"  # adjust as needed

# Stop immediately with a clear message when the capture file is missing.
if os.path.exists(file_path) is False:
    raise FileNotFoundError(f"‚ùå File not found: {file_path}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
output_path = "ascii_output.txt"

# Load the raw capture in this cell so it runs on a fresh kernel. It previously
# relied on `data` being created by a later cell and failed with NameError.
with open(file_path, "rb") as f:
    data = f.read()

# ASCII view: printable bytes are kept, everything else is shown as '.', so
# character offsets in `ascii_text` equal byte offsets in `data`.
ascii_text = ''.join(chr(b) if 32 <= b < 127 else '.' for b in data)

# Write the view to both the console and a text file, in fixed-size chunks.
chunk_size = 512
# Ceiling division: an exact multiple of chunk_size must not count an extra
# (empty) chunk, and an empty file has zero chunks.
total_chunks = -(-len(ascii_text) // chunk_size)

with open(output_path, "w", encoding="utf-8") as out:

    header = (
        f"Read {len(data):,} bytes from file\n"
        f"ASCII content in {total_chunks} chunks of {chunk_size} characters:\n\n"
    )

    print(header)
    out.write(header)

    for i in range(0, len(ascii_text), chunk_size):
        segment = ascii_text[i:i + chunk_size]
        # Label with the real (exclusive) end offset so the final, shorter
        # chunk does not claim bytes past the end of the data.
        line = f"[{i:06d}-{i + len(segment):06d}]  {segment}"

        print(line)              # console
        out.write(line + "\n")   # file

print(f"\n‚úÖ Completed ASCII rendering. Output written to: {output_path}")


NameError: name 'data' is not defined

In [None]:
# # ===== HEX DUMP =====

# output_hex = "hex_output.txt"

# with open(file_path, "rb") as f:
#     data = f.read()

# hex_lines = []
# bytes_per_line = 16

# with open(output_hex, "w") as out:
#     header = f"HEX DUMP ‚Äî {len(data):,} bytes\n\n"
#     print(header)
#     out.write(header)

#     for i in range(0, len(data), bytes_per_line):
#         chunk = data[i:i+bytes_per_line]
#         hex_str = ' '.join(f"{b:02X}" for b in chunk)
#         line = f"{i:08X}  {hex_str}"

#         print(line)      # console
#         out.write(line + "\n")

# print(f"\n‚úÖ HEX dump saved to {output_hex}")


In [None]:
# Size of the leading region treated as a file header (chosen from the ASCII preview).
HEADER_SKIP = 1024
# Everything after the header region is analysed as the payload.
payload = data[slice(HEADER_SKIP, None)]
print(
    f"üîπ Skipped first {HEADER_SKIP} bytes (header region). Remaining: {len(payload):,} bytes"
)


In [None]:
# Byte -> byte table: printable ASCII maps to itself, anything else to '.'.
# The result has one character per payload byte, so match positions are byte offsets.
_printable_table = bytes(b if 32 <= b < 127 else ord('.') for b in range(256))
ascii_view = payload.translate(_printable_table).decode("ascii")

# Collect (offset, talker+sentence id) for every NMEA-looking "$G?XXX" prefix.
nmea_matches = []
for hit in re.finditer(r'\$G[NPL][A-Z]{3}', ascii_view):
    nmea_matches.append((hit.start(), hit.group()))

print(f"üõ∞Ô∏è Found {len(nmea_matches)} NMEA-like sentences:")
# Show the first ten hits with offsets relative to the start of the file.
for rank, (pos, snippet) in enumerate(nmea_matches[:10], start=1):
    print(f"   {rank:02d}. Offset {pos + HEADER_SKIP:>8,} ‚Üí {snippet}")



# Decode GNSS binary frames (AA44121C)

In [None]:
import os, struct, numpy as np, pandas as pd
import folium

# =====================================================
# Load file
# =====================================================
file_path = "/content/drive/MyDrive/input/20250907-150827_Rtk.fmnav"

# Read the whole capture into memory as bytes.
with open(file_path, "rb") as f:
    data = f.read()

# Drop the leading header region; all offsets below are relative to `payload`.
HEADER_SKIP = 1024
payload = data[HEADER_SKIP:]

print(f"üì¶ File loaded ({len(data):,} bytes), payload = {len(payload):,} bytes")

# =====================================================
# Step 1 - Locate all AA44121C binary frames
# =====================================================
MAGIC = b"\xAA\x44\x12\x1C"

# Walk from one occurrence of MAGIC to the next; `hits` holds every payload
# offset at which the 4-byte marker starts, in ascending order.
hits = []
cursor = payload.find(MAGIC)
while cursor != -1:
    hits.append(cursor)
    cursor = payload.find(MAGIC, cursor + 1)
print(f"üì° Found {len(hits)} AA44121C frames")

In [None]:
# =====================================================
# Step 2 - Decode GNSS blocks (struct discovered earlier)
# =====================================================
OFFSET = 12              # bytes between the MAGIC marker and the decoded fields
FMT = "<Qiii fff H"      # little-endian: uint64, 3 x int32, 3 x float32, uint16
SIZE = struct.calcsize(FMT)

rows = []
for h in hits:
    block = payload[h+OFFSET : h+OFFSET+SIZE]
    if len(block) != SIZE:
        continue  # marker too close to the end of the payload for a full record
    try:
        rows.append(struct.unpack(FMT, block))
    except struct.error:
        # Only unpack failures are skipped; anything else is a real bug and
        # must surface instead of being swallowed by a bare `except`.
        continue

df = pd.DataFrame(rows, columns=[
    "timestamp", "lat_i", "lon_i", "alt_i",
    "v1", "v2", "v3", "quality"
])

print(f"‚úÖ Successfully decoded {len(df)} GNSS rows")
print(df.head(10))

# =====================================================
# Step 3 - AUTO-CALIBRATE coordinates
# =====================================================
# NOTE(review): the scale factors below are chosen so that the MEDIAN raw value
# maps exactly onto the expected location. The converted track is therefore
# centred on that location by construction; this does not validate the decode.
# Confirm the true integer scaling independently.

# Median raw integer values
raw_lat  = df["lat_i"].median()
raw_lon  = df["lon_i"].median()

# Expected survey region (Oulu, Finland)
expected_lat = 65.06
expected_lon = 25.47

# Scale factors that map the median raw value onto the expected coordinate
lat_scale = expected_lat / (raw_lat / 1000)
lon_scale = expected_lon / (raw_lon / 10000)

print(f"\nüîß Auto-calibration:")
print(f"lat_scale = {lat_scale}")
print(f"lon_scale = {lon_scale}")

# Apply the calibrated conversion
df["lat_deg"] = (df["lat_i"] / 1000.0) * lat_scale
df["lon_deg"] = (df["lon_i"] / 10000.0) * lon_scale
df["alt_m"]   = df["alt_i"] * 1.0

print("\nüìç Converted GNSS coordinates (first 10):")
print(df[["lat_deg", "lon_deg", "alt_m"]].head(10))

# Keep only points inside a broad box around the expected region
df_valid = df[
    df["lat_deg"].between(40, 80) &
    df["lon_deg"].between(0, 40)
]

print(f"\nüìç Valid coordinate points: {len(df_valid)}")

# Also reached when no rows were decoded at all (the medians are then NaN).
if df_valid.empty:
    raise ValueError("‚ùå No valid calibrated GNSS points ‚Äî scaling still incorrect.")

# =====================================================
# Step 4 - Plot on Folium map (Oulu region expected)
# =====================================================
center_lat = float(df_valid["lat_deg"].mean())
center_lon = float(df_valid["lon_deg"].mean())
print(f"üó∫Ô∏è Map center: {center_lat:.6f}, {center_lon:.6f}")

m = folium.Map(location=[center_lat, center_lon], zoom_start=14)

# One small circle per decoded point
for _, r in df_valid.iterrows():
    folium.CircleMarker(
        location=[r["lat_deg"], r["lon_deg"]],
        radius=2, color="blue",
        fill=True, fill_opacity=0.6
    ).add_to(m)

# Marker at the mean position
folium.Marker(
    [center_lat, center_lon],
    popup="Estimated GNSS center",
    icon=folium.Icon(color="red")
).add_to(m)

# Last expression of the cell: renders the map inline
m


## OLD

In [None]:
# Sliding-window settings: window length in bytes and stride between windows.
window, step = 200, 4
# Receives one unique-byte ratio per window position.
entropy = list()

In [None]:


# Unique-byte ratio of each `window`-byte slice, sampled every `step` bytes.
# The list is rebuilt from scratch so that re-running this cell does not append
# a second copy of the series to a list left over from an earlier run.
entropy = [
    len(set(payload[i:i + window])) / window
    for i in range(0, len(payload) - window, step)
]

plt.figure(figsize=(10,3))
plt.plot(entropy, color="purple")
plt.title("Local Byte Uniqueness (proxy for record structure)")
# The x axis counts windows, i.e. byte offset divided by the stride.
plt.xlabel(f"Byte offset / {step}")
plt.ylabel("Unique-byte ratio")
plt.tight_layout()
plt.show()


In [None]:
# 1 where the payload byte is printable ASCII (32..126), 0 elsewhere.
_payload_u8 = np.frombuffer(payload, dtype=np.uint8)
ascii_mask = ((_payload_u8 >= 32) & (_payload_u8 < 127)).astype(np.uint8)

# Moving sum over `block` consecutive bytes, normalised to a fraction.
# ratio[k] describes payload[k:k+block] (stride 1, "valid" windows only).
block = 120
_box_kernel = np.ones(block, dtype=np.uint8)
ratio = np.convolve(ascii_mask, _box_kernel, "valid") / block

plt.figure(figsize=(10,3))
plt.plot(ratio, color="darkgreen")
plt.ylabel("ASCII fraction")
plt.xlabel("Byte offset (windowed)")
plt.title("Printable-character ratio per 120-byte window (binary/text alternation)")
plt.tight_layout()
plt.show()

In [None]:
# Window start indices whose printable fraction is clearly binary / clearly text.
binary_regions = np.where(ratio < 0.05)[0]
text_regions = np.where(ratio > 0.2)[0]
if len(binary_regions) and len(text_regions):
    # `ratio` comes from a stride-1 "valid" convolution, so index k is already
    # the payload byte offset of the window start. It must NOT be multiplied by
    # the window length; only the header offset is added back.
    print(f"üîç Binary regions start near byte {int(binary_regions[0]) + HEADER_SKIP:,}")
    print(f"üîç First text/NMEA region near byte {int(text_regions[0]) + HEADER_SKIP:,}")
else:
    print("‚ö†Ô∏è Could not clearly separate binary vs text windows. Check plots above for visual pattern.")

In [None]:
# =====================================================
# üß† Step 3 ‚Äî Structured Decoding Attempt (First Binary Block)
# =====================================================

import struct, re, numpy as np, pandas as pd

HEADER_SKIP = 1024  # as confirmed earlier
BLOCK_BYTES = 3653  # observed cycle size between NMEA pairs

# --- Find NMEA anchors again (within payload) ---
# Printable bytes are kept, all others become '.', so string offsets equal byte offsets.
ascii_payload = data[HEADER_SKIP:].translate(
    bytes(b if 32 <= b < 127 else ord('.') for b in range(256))
).decode("ascii")
nmea_hits = [hit.start() for hit in re.finditer(r'\$G[NPL][A-Z]{3}', ascii_payload)]
if not len(nmea_hits) >= 2:
    raise ValueError("Not enough NMEA sentences detected for segmentation.")

# --- Slice first binary segment (up to BLOCK_BYTES bytes before the first NMEA hit) ---
first_bin_end = nmea_hits[0]
first_bin_start = max(0, first_bin_end - BLOCK_BYTES)
bin_block = data[HEADER_SKIP + first_bin_start : HEADER_SKIP + first_bin_end]
print(f"üß© Binary segment range: {HEADER_SKIP+first_bin_start:,}‚Äì{HEADER_SKIP+first_bin_end:,}  "
      f"({len(bin_block):,} bytes)")

# --- Candidate record sizes: report how evenly each divides the segment ---
cand_sizes = [120, 124, 128, 180]
print("\nRecord-size divisibility check:")
for sz in cand_sizes:
    leftover = len(bin_block) % sz
    print(f"  {sz:>3} bytes ‚Üí remainder {leftover}")

# --- Candidate layouts: name -> (struct format, column names) ---
candidates = {
    "A_QdddfffH": ("<QdddfffH", ["timestamp","lat","lon","alt","vx","vy","vz","quality"]),
    "B_QfffffH":  ("<QfffffH",  ["timestamp","x","y","z","extra1","extra2","quality"]),
    "C_Qdddfff":  ("<Qdddfff",  ["timestamp","lat","lon","alt","vx","vy","vz"]),
    "D_Qffffff":  ("<Qffffff",  ["timestamp","ax","ay","az","gx","gy","gz"]),
}

# --- Decode the segment as back-to-back records under each layout ---
for name, (fmt, headers) in candidates.items():
    rec_sz = struct.calcsize(fmt)
    nrec = len(bin_block) // rec_sz
    if nrec == 0:
        continue

    print(f"\nüîπ Testing {name}: record {rec_sz} bytes √ó {nrec} records")
    rows = []
    for rec_start in range(0, nrec * rec_sz, rec_sz):
        try:
            rows.append(struct.unpack_from(fmt, bin_block, rec_start))
        except struct.error:
            break

    # Infinite values are turned into NaN so the statistics below ignore them.
    df = pd.DataFrame(rows, columns=headers).replace([np.inf, -np.inf], np.nan)
    print(f"Decoded {len(df)} rows")

    # Per-column min / median / max, skipping columns with no finite value.
    for c in df.columns:
        s = pd.to_numeric(df[c], errors="coerce")
        if np.isfinite(s).sum() > 0:
            print(f"  {c:<10} ‚Üí min={s.min():.4g}, median={np.nanmedian(s):.4g}, max={s.max():.4g}")
    display(df.head(5))


In [None]:
# =====================================================
# Step 4 ‚Äî Decode full 180-byte frames with sync check
# =====================================================
import numpy as np, pandas as pd, struct

REC_SIZE = 180
HEADER_SKIP = 1024
payload = data[HEADER_SKIP:]
nrecs = len(payload) // REC_SIZE
print(f"üì¶ Total records (180 B each): {nrecs}")

fmt_180 = "<H H Q ddd ddd fff fff fff fff H B 7s"  # extended Feima pattern
# The format describes 118 bytes, not the full 180-byte frame. Calling
# struct.unpack on a 180-byte chunk therefore always raised struct.error and
# every frame was silently dropped. Decode the leading 118 bytes of each frame
# with unpack_from instead; the remaining bytes of the frame are not decoded.
frame_struct = struct.Struct(fmt_180)

# The format yields 24 values, but only 21 names were given, which would make
# the DataFrame constructor fail as soon as any row decoded.
# NOTE(review): the fourth float triplet had no name; it is exposed here as
# extra1..extra3 - confirm the intended layout.
headers_180 = [
    "sync","pkt_id","timestamp",
    "lat","lon","alt",
    "vx","vy","vz",
    "ax","ay","az",
    "gx","gy","gz",
    "roll","pitch","yaw",
    "extra1","extra2","extra3",
    "sats","fix_type","pad"
]

# Frame i starts at i*REC_SIZE and is complete for every i < nrecs, so the
# 118-byte prefix is always available.
rows = [frame_struct.unpack_from(payload, i * REC_SIZE) for i in range(nrecs)]

df = pd.DataFrame(rows, columns=headers_180)

# --- Filter valid syncs (0xAA55 or 0x55AA) ---
df = df[df["sync"].isin([0xAA55, 0x55AA])]

# --- Quick numeric cleanup (new frame, no in-place edit of the filtered view) ---
df = df.replace([np.inf, -np.inf], np.nan).fillna(0)

# --- Display sample ---
print(f"‚úÖ Decoded {len(df)} valid frames ({len(df)*REC_SIZE:,} bytes)\n")
display(df.head(5))

# --- Basic sanity check ---
for col in ["lat","lon","alt","vx","vy","vz","ax","ay","az","gx","gy","gz"]:
    s = pd.to_numeric(df[col], errors="coerce")
    print(f"{col:>5}: min={s.min():.4g}, med={np.nanmedian(s):.4g}, max={s.max():.4g}")


In [None]:
# =====================================================
# Step 5 ‚Äî Find true sync / magic header pattern
# =====================================================
import struct, collections

REC_SIZE = 180
HEADER_SKIP = 1024
payload = data[HEADER_SKIP:]
nrecs = len(payload) // REC_SIZE
print(f"Scanning {nrecs} √ó {REC_SIZE}-byte frames...")

# --- Tally the 2- and 4-byte patterns found at offsets 0, 2, 4 and 6 of each frame
freq_2 = collections.Counter()
freq_4 = collections.Counter()

for frame_no in range(nrecs):
    frame_start = frame_no * REC_SIZE
    head = payload[frame_start:frame_start + 16]  # first 16 bytes of the frame
    for off in (0, 2, 4, 6):
        if len(head) >= off + 2:
            freq_2[head[off:off + 2]] += 1
        if len(head) >= off + 4:
            freq_4[head[off:off + 4]] += 1

def show_top(counter, label, n=10):
    """Print the ``n`` most common keys of ``counter`` as upper-case hex with their counts.

    ``counter`` must provide ``most_common(n)`` yielding ``(bytes, count)`` pairs;
    ``label`` is only used in the heading line. Returns ``None``.
    """
    print(f"\nTop {n} {label} patterns:")
    rendered = [
        f"  {pattern.hex().upper():<12}  {count:>6} √ó"
        for pattern, count in counter.most_common(n)
    ]
    for text in rendered:
        print(text)

# Report the most frequent leading byte patterns.
for _counter, _label in ((freq_2, "2-byte"), (freq_4, "4-byte")):
    show_top(_counter, _label)

# --- Optional: count 8-byte runs of printable ASCII sampled every 180 bytes
#     (helpful for spotting textual header markers)
ascii_freq = collections.Counter(
    chunk
    for chunk in (payload[i:i+8] for i in range(0, len(payload)-4, 180))
    if all(32 <= b < 127 for b in chunk)
)
show_top(ascii_freq, "ASCII", 5)


In [None]:
# =====================================================
# Step 6 ‚Äî Locate and decode frames starting with AA44121C
# =====================================================
import struct, numpy as np, pandas as pd

MAGIC = b"\xAA\x44\x12\x1C"
REC_SIZE = 180
HEADER_SKIP = 1024
payload = data[HEADER_SKIP:]

# Collect every payload offset where MAGIC starts. The whole payload is scanned,
# including a marker in the last four bytes, which the previous
# `range(0, len(payload)-4)` bound skipped. This matches the scan in the load
# cell. bytes.find also avoids slicing the payload once per byte.
hits = []
cursor = payload.find(MAGIC)
while cursor != -1:
    hits.append(cursor)
    cursor = payload.find(MAGIC, cursor + 1)

print(f"Found {len(hits)} frame headers with AA44121C")

# Show spacing to verify periodicity
if len(hits) > 1:
    diffs = np.diff(hits)
    print("Most common spacing between headers:", pd.Series(diffs).value_counts().head())

# --- Extract complete REC_SIZE-byte frames (truncated tail frames are dropped)
records = [payload[h:h+REC_SIZE] for h in hits if h + REC_SIZE <= len(payload)]
print(f"Extracted {len(records)} √ó {REC_SIZE}-byte frames ({len(records)*REC_SIZE:,} bytes)")

# --- Try dynamic decoding (mix of double/float/int)
fmt_guess = "<Q ddd fff fff H"
# The format yields 11 values (Q + 3 doubles + 3 floats + 3 floats + H). Only 8
# names were supplied before, so pd.DataFrame raised as soon as one frame decoded.
# NOTE(review): the second float triplet had no name; it is exposed as
# extra1..extra3 - confirm the intended meaning.
headers = ["timestamp","lat","lon","alt","vx","vy","vz","extra1","extra2","extra3","sats"]
guess_struct = struct.Struct(fmt_guess)
rec_sz = guess_struct.size

# Every record is a full REC_SIZE-byte frame, so the fixed-size prefix is always present.
decoded = [guess_struct.unpack_from(rec) for rec in records]

df = pd.DataFrame(decoded, columns=headers)
df = df.replace([np.inf, -np.inf], np.nan)

print(f"‚úÖ Decoded {len(df)} candidate GNSS frames")
display(df.head(10))

# --- Simple summary to assess realism
for c in ["lat","lon","alt","vx","vy","vz"]:
    s = pd.to_numeric(df[c], errors="coerce")
    if np.isfinite(s).sum() > 0:
        print(f"{c:<4}: min={s.min():.6g}, med={np.nanmedian(s):.6g}, max={s.max():.6g}")


In [None]:
# =====================================================
# Step 7 ‚Äî Dynamic struct analysis for AA44121C frames
# =====================================================
import struct, pandas as pd, numpy as np

MAGIC = b"\xAA\x44\x12\x1C"
payload = data[1024:]
# Offsets (within payload) at which the 4-byte marker begins.
hits = [pos for pos in range(len(payload) - 4) if payload.startswith(MAGIC, pos)]

print(f"Found {len(hits)} AA44121C headers")

# Candidate struct patterns to test (growing complexity): label -> format
candidates = {
    "A_<QdddfffH": "<QdddfffH",
    "B_<QdddffffH": "<QdddffffH",
    "C_<QdddfffffH": "<QdddfffffH",
    "D_<QdddffffffH": "<QdddffffffH",
}

# Keep only markers followed by a full 180-byte chunk.
records = [payload[h:h+180] for h in hits if h + 180 <= len(payload)]
print(f"Collected {len(records)} 180B chunks")

# Decode a sample of the chunks with every candidate format.
results = []
for name, fmt in candidates.items():
    unpacker = struct.Struct(fmt)
    size = unpacker.size
    valid_rows = []
    for rec in records[:500]:  # sample only a few hundred to test
        try:
            valid_rows.append(unpacker.unpack(rec[:size]))
        except struct.error:
            pass
    if not valid_rows:
        continue
    print(f"\nüîπ {name}: unpacked {len(valid_rows)} rows ({len(valid_rows[0])} cols), size={size}B")
    df = pd.DataFrame(valid_rows)
    print(df.head(5))
    results.append((name, df))

# Report the candidate that produced the most columns (first one wins ties).
if not results:
    print("‚ö†Ô∏è No pattern decoded successfully.")
else:
    longest = max(results, key=lambda item: item[1].shape[1])
    print(f"\n‚úÖ Best candidate appears to be {longest[0]} with {longest[1].shape[1]} columns.")


In [None]:
# =====================================================
# Step 9 ‚Äî Hybrid (int/float) sweep after MAGIC; rank by plausibility and show best
# =====================================================
import struct, re, numpy as np, pandas as pd
import math
import matplotlib.pyplot as plt

HEADER_SKIP = 1024
REC_SIZE = 180
MAGIC = b"\xAA\x44\x12\x1C"

payload = data[HEADER_SKIP:]
# Frame starts are the offsets where MAGIC begins; each frame is the REC_SIZE
# bytes from there, and markers too close to the end are dropped.
hits = [pos for pos in range(len(payload) - 4) if payload.startswith(MAGIC, pos)]
records = []
for h in hits:
    if h + REC_SIZE <= len(payload):
        records.append(payload[h:h+REC_SIZE])
print(f"üß© Frames collected from MAGIC: {len(records)} √ó {REC_SIZE} B")

# ---------- helpers ----------
def try_unpack(rec, start, fmt):
    """Unpack ``fmt`` from ``rec`` beginning at byte ``start``.

    Returns the tuple produced by ``struct.unpack``, or ``None`` when the
    record is too short for the format or unpacking fails.
    """
    end = start + struct.calcsize(fmt)
    if end > len(rec):
        return None
    try:
        fields = struct.unpack(fmt, rec[start:end])
    except struct.error:
        return None
    return fields

def timestamp_score(ts):
    """Score how much ``ts`` looks like a timestamp column.

    The score is ``0.6 * fraction of strictly increasing steps`` plus
    ``0.4 * span term``, where the span term is ``log10(max - min) / 12``
    limited to the range [0, 1].

    Returns ``-1.0`` when fewer than 8 finite values are available.
    """
    s = pd.Series(pd.to_numeric(ts, errors="coerce"), dtype="float64")
    s = s[np.isfinite(s)]
    if len(s) < 8:
        return -1.0
    d = np.diff(s)
    frac_pos = (d > 0).sum() / max(1, len(d))
    span = s.max() - s.min()
    if span <= 0:
        span_score = 0.0
    else:
        # log10 is negative for spans below 1; clamp from below as well as
        # above so a tiny span cannot subtract from the monotonicity score.
        span_score = min(1.0, max(0.0, math.log10(span + 1e-9) / 12.0))
    return 0.6*frac_pos + 0.4*span_score

def range_fraction(x, lo, hi):
    """Return the share of finite values of ``x`` lying in ``[lo, hi]`` (inclusive).

    Non-numeric entries are coerced to NaN and ignored; ``0.0`` is returned
    when no finite value remains.
    """
    values = pd.to_numeric(x, errors="coerce")
    finite = values[np.isfinite(values)]
    if len(finite) == 0:
        return 0.0
    inside = (finite >= lo) & (finite <= hi)
    return inside.mean()

def accel_score(ax, ay, az):
    """Score in [0, 1] for how close the median acceleration magnitude is to 9.8.

    The magnitude is ``sqrt(ax**2 + ay**2 + az**2)``; non-finite magnitudes are
    dropped and ``0.0`` is returned when none remain. The score falls linearly
    with the relative deviation from 9.8 and never goes below zero.
    """
    x, y, z = (pd.to_numeric(axis, errors="coerce") for axis in (ax, ay, az))
    magnitude = np.sqrt(x**2 + y**2 + z**2)
    magnitude = magnitude[np.isfinite(magnitude)]
    if len(magnitude) == 0:
        return 0.0
    med = float(np.nanmedian(magnitude))
    relative_error = abs(med - 9.8)/9.8
    return max(0.0, 1.0 - relative_error)

def clip_df(df):
    """Return a copy of ``df`` with +inf / -inf replaced by NaN (keeps plots and stats sane)."""
    infinities = [np.inf, -np.inf]
    return df.replace(infinities, np.nan)

# ---------- candidate schemas ----------
# Decoding starts AFTER the 4-byte MAGIC; a few extra start offsets (e.g. a 2 B /
# 4 B packet id in between) are tried as well.
relative_starts = list(range(4, 13, 2))

# Scaling hints shared by several schemas (column -> scaling kind).
_GEO_HINTS = {"lat_i": "deg1e7", "lon_i": "deg1e7", "alt_i": "milli"}
_ACC_HINTS = {"ax_i16": "g16", "ay_i16": "g16", "az_i16": "g16"}

# Each entry: (name, struct format, column names, scaling hints for integer columns).
schemas = [
    (
        "Qiii_fff_H", "<Qiii fff H",
        ["timestamp", "lat_i", "lon_i", "alt_i", "v1", "v2", "v3", "q"],
        dict(_GEO_HINTS),
    ),
    (
        "Qiiifff_H", "<Qiiifff H",
        ["timestamp", "x_i", "y_i", "z_i", "v1", "v2", "v3", "q"],
        dict.fromkeys(["x_i", "y_i", "z_i"], "milli"),
    ),
    (
        "Qfff_fff_H", "<Qfff fff H",
        ["timestamp", "x", "y", "z", "v1", "v2", "v3", "q"],
        {},
    ),
    (
        "Qiii_hhh_H", "<Qiii hhh H",
        ["timestamp", "lat_i", "lon_i", "alt_i", "ax_i16", "ay_i16", "az_i16", "q"],
        {**_GEO_HINTS, **_ACC_HINTS},
    ),
    (
        "Qfff_hhh_H", "<Qfff hhh H",
        ["timestamp", "x", "y", "z", "ax_i16", "ay_i16", "az_i16", "q"],
        dict(_ACC_HINTS),
    ),
    (
        "Qiiiiii_H", "<Qiiiiii H",
        ["timestamp", "a_i", "b_i", "c_i", "d_i", "e_i", "f_i", "q"],
        dict.fromkeys(["a_i", "b_i", "c_i", "d_i", "e_i", "f_i"], "milli"),
    ),
]

def apply_scaling(df, hints):
    """Return a copy of ``df`` with the hinted columns rescaled.

    ``hints`` maps column name -> kind. Supported kinds:
      * ``"deg1e7"`` - multiply by 1e-7
      * ``"milli"``  - multiply by 1e-3
      * ``"g16"``    - divide by 16384 and multiply by 9.80665
        (assumes int16 counts where 16384 is about 1 g)
    Columns missing from ``df`` and unknown kinds are left untouched.
    """
    converters = {
        "deg1e7": lambda s: s * 1e-7,
        "milli":  lambda s: s * 1e-3,
        "g16":    lambda s: s / 16384.0 * 9.80665,
    }
    out = df.copy()
    for col, kind in (hints or {}).items():
        if col not in out:
            continue
        convert = converters.get(kind)
        if convert is not None:
            out[col] = convert(pd.to_numeric(out[col], errors="coerce"))
    return out

def score_decode(df):
    """Add up weighted plausibility terms for a decoded candidate frame.

    Terms are only added when their columns exist: timestamp behaviour (x1.5),
    lat/lon within geodetic limits (x1.0 each), x/y within +/-1e6 (x0.6 each),
    altitude within [-500, 10000] (x0.8), speed magnitude below 100 (x0.6) and
    acceleration magnitude near gravity (x0.8).
    """
    present = set(df.columns)
    sc = 0.0

    if "timestamp" in present:
        sc += 1.5 * timestamp_score(df["timestamp"])

    # prefer plausible geodetic values if present
    if {"lat_i", "lon_i"} <= present:
        sc += 1.0 * range_fraction(df["lat_i"], -90, 90)
        sc += 1.0 * range_fraction(df["lon_i"], -180, 180)

    # planar coordinates: sanity range of +/- 1e6
    if {"x", "y"} <= present:
        sc += 0.6 * range_fraction(df["x"], -1e6, 1e6)
        sc += 0.6 * range_fraction(df["y"], -1e6, 1e6)

    if "alt_i" in present:
        sc += 0.8 * range_fraction(df["alt_i"], -500, 10000)

    if {"v1", "v2", "v3"} <= present:
        v1, v2, v3 = (pd.to_numeric(df[name], errors="coerce") for name in ("v1", "v2", "v3"))
        vmag = np.sqrt(v1**2 + v2**2 + v3**2)
        vmag = vmag[np.isfinite(vmag)]
        if len(vmag):
            sc += 0.6 * ( (vmag < 100).mean() )  # prefer magnitudes below 100

    # closeness of the acceleration magnitude to g, if those columns exist
    if {"ax_i16", "ay_i16", "az_i16"} <= present:
        sc += 0.8 * accel_score(df["ax_i16"], df["ay_i16"], df["az_i16"])

    return sc

# ---------- sweep ----------
# Decode a sample of the frames with every (start offset, schema) pair and score it.
results = []
sample_n = min(1500, len(records))  # limit for speed
for rel_start in relative_starts:
    for name, fmt, headers, hints in schemas:
        size = struct.calcsize(fmt)
        rows = [
            vals
            for vals in (try_unpack(rec, rel_start, fmt) for rec in records[:sample_n])
            if vals is not None
        ]
        if not rows:
            continue
        df = clip_df(apply_scaling(pd.DataFrame(rows, columns=headers), hints))
        sc = score_decode(df)
        results.append((sc, rel_start, name, fmt, headers, hints, df))

# Nothing decoded at all: stop here.
if len(results) == 0:
    raise RuntimeError("No hybrid schema produced rows.")

# Highest score first (stable sort keeps sweep order among ties); list the top 3.
results.sort(key=lambda entry: entry[0], reverse=True)
topk = results[:3]
print("üèÅ Top candidates (score, start offset, schema):")
for sc, rel_start, name, fmt, headers, hints, df in topk:
    print(f"  score={sc:6.3f}  start=+{rel_start:2d}  {name:<12}  fmt={fmt}  rows={len(df)}")

# Unpack and show the winner.
best_sc, best_off, best_name, best_fmt, best_headers, best_hints, best_df = topk[0]
print(f"\n‚úÖ BEST ‚Üí score={best_sc:.3f}, start=+{best_off}, schema={best_name}, fmt={best_fmt}")
display(best_df.head(25))

# Quick lat/lon scatter when the winner has geodetic columns.
if {"lat_i","lon_i"}.issubset(best_df.columns):
    lat = pd.to_numeric(best_df["lat_i"], errors="coerce")
    lon = pd.to_numeric(best_df["lon_i"], errors="coerce")
    m = np.isfinite(lat) & np.isfinite(lon)
    if m.sum() > 0:
        # lat and lon have the same length, so one decimation stride serves both.
        stride = max(1, len(lon)//5000)
        plt.figure(figsize=(5,4))
        plt.scatter(lon[m][::stride], lat[m][::stride], s=2, alpha=0.5)
        plt.xlabel("lon")
        plt.ylabel("lat")
        plt.title("Lat/Lon scatter (best candidate)")
        plt.tight_layout()
        plt.show()

# print key ranges
def stats(df, cols):
    """Print min / median / max of the finite values of each listed column.

    Columns absent from ``df`` and columns without any finite value are skipped.
    """
    for c in cols:
        if c not in df:
            continue
        s = pd.to_numeric(df[c], errors="coerce")
        s = s[np.isfinite(s)]
        if len(s) == 0:
            continue
        print(f"{c:>8}: min={s.min():.6g}  med={np.nanmedian(s):.6g}  max={s.max():.6g}")

# Value ranges of the winning candidate; columns it does not have are skipped by stats().
print("\nüìä Ranges (best candidate):")
stats(
    best_df,
    [
        "timestamp",
        "lat_i", "lon_i", "alt_i",
        "x", "y", "z",
        "v1", "v2", "v3",
        "ax_i16", "ay_i16", "az_i16",
        "q",
    ],
)


In [None]:
# =====================================================
# Step 10 ‚Äî Decode ALL frames with the confirmed schema and visualize
#   Best from Step 9: start offset = +12, fmt = "<Qiii fff H"
#   We export BOTH interpretations:
#     (A) Geodetic scaling:  int32 ‚Üí degrees/meters via √ó1e-7 and √ó1e-3
#     (B) ENU/meters scaling: int32 ‚Üí meters via √ó1e-3
# =====================================================
import struct, numpy as np, pandas as pd
import matplotlib.pyplot as plt

HEADER_SKIP = 1024
REC_SIZE    = 180
MAGIC       = b"\xAA\x44\x12\x1C"
START_OFF   = 12           # bytes from frame start (MAGIC at offset 0)
FMT         = "<Qiii fff H"  # timestamp, 3 x int32, 3 x float32, uint16
COLS        = ["timestamp","i1","i2","i3","v1","v2","v3","q"]

payload = data[HEADER_SKIP:]

# ---- Collect frames that start at MAGIC and span a full REC_SIZE bytes
hits = [pos for pos in range(len(payload) - 4) if payload.startswith(MAGIC, pos)]
frames = [payload[h:h+REC_SIZE] for h in hits if h + REC_SIZE <= len(payload)]
print(f"üì¶ Frames (MAGIC-aligned @180B): {len(frames)}")

# ---- Unpack FMT at START_OFF inside every frame; frames that are too short
#      or fail to unpack contribute no row
rec_size = struct.calcsize(FMT)
rows = []
for fr in frames:
    if len(fr) < START_OFF + rec_size:
        continue
    try:
        rows.append(struct.unpack_from(FMT, fr, START_OFF))
    except struct.error:
        continue

df = pd.DataFrame(rows, columns=COLS)
print(f"‚úÖ Decoded rows: {len(df)}")

# ---- Derive BOTH interpretations of the three int32 fields
df = df.assign(
    # Geodetic-style scaling: int32 * 1e-7 -> degrees, int32 * 1e-3 -> metres
    lat_deg=df["i1"] * 1e-7,
    lon_deg=df["i2"] * 1e-7,
    alt_m=df["i3"] * 1e-3,
    # Local metric scaling: int32 * 1e-3 -> metres
    x_m=df["i1"] * 1e-3,
    y_m=df["i2"] * 1e-3,
    z_m=df["i3"] * 1e-3,
)

# ---- Show head for quick inspection
display(df[["timestamp","lat_deg","lon_deg","alt_m","v1","v2","v3","q","x_m","y_m","z_m"]].head(25))

# ---- Basic stats
def stats(s):
    """Return ``"min=..., med=..., max=..."`` over the finite values of ``s``.

    Values are coerced to numbers first; ``"n/a"`` is returned when no finite
    value is left.
    """
    numeric = pd.to_numeric(s, errors="coerce")
    finite = numeric[np.isfinite(numeric)]
    if len(finite) == 0:
        return "n/a"
    lowest, middle, highest = finite.min(), np.nanmedian(finite), finite.max()
    return f"min={lowest:.6g}, med={middle:.6g}, max={highest:.6g}"

print("\nüìä RANGES")
for c in ["lat_deg","lon_deg","alt_m","v1","v2","v3","x_m","y_m","z_m"]:
    print(f"{c:>8}: {stats(df[c])}")

# ---- Visualizations
# Differences between consecutive timestamps (first 2000 shown).
ts = pd.to_numeric(df["timestamp"], errors="coerce")
dts = np.diff(ts.values.astype(np.float64))
plt.figure(figsize=(6,3))
plt.plot(dts[:2000])
plt.title("Timestamp deltas (first ~2000)")
plt.xlabel("record")
plt.ylabel("Œî timestamp")
plt.tight_layout()
plt.show()

# Decide which 2D scatter to show: lat/lon when at least half of the points
# fall inside valid geodetic limits, otherwise the metre-scaled x/y view.
valid_lat = df["lat_deg"].between(-90, 90)
valid_lon = df["lon_deg"].between(-180, 180)
geodetic_ok = (valid_lat & valid_lon).mean() >= 0.5

plt.figure(figsize=(5,4))
if not geodetic_ok:
    # metre-scaled planar view
    xm = pd.to_numeric(df["x_m"], errors="coerce")
    ym = pd.to_numeric(df["y_m"], errors="coerce")
    m = np.isfinite(xm) & np.isfinite(ym)
    step = max(1, m.sum()//8000)
    plt.scatter(xm[m][::step], ym[m][::step], s=2, alpha=0.5)
    plt.xlabel("x (m)")
    plt.ylabel("y (m)")
    plt.title("Planar scatter (assuming int32 √ó1e-3 m)")
else:
    m = (valid_lat & valid_lon)
    # decimate so roughly 8000 points at most are drawn
    step = max(1, m.sum()//8000)
    plt.scatter(df.loc[m, "lon_deg"][::step], df.loc[m, "lat_deg"][::step], s=2, alpha=0.5)
    plt.xlabel("lon (deg)")
    plt.ylabel("lat (deg)")
    plt.title("Lat/Lon scatter (scaled int32 √ó1e-7)")
plt.tight_layout()
plt.show()

# ---- Optional: save CSV in the Colab workspace (not Drive)
out_path = "/content/x200go_gnss_decoded.csv"
_export_cols = ["timestamp","lat_deg","lon_deg","alt_m","v1","v2","v3","q","x_m","y_m","z_m"]
df_out = df[_export_cols].copy()
df_out.to_csv(out_path, index=False)
print(f"üíæ Saved CSV to {out_path}")


In [None]:
# =====================================================
# Step 11 ‚Äî Extract IMU subpacket from each 180B frame
#   GNSS subpacket confirmed at start+12 with <Qiii fff H> (34 B)
#   IMU likely lives later in the same 180B frame.
#   We sweep offsets and formats to find plausible accel/gyro.
# =====================================================
import struct, numpy as np, pandas as pd
import matplotlib.pyplot as plt
import math

HEADER_SKIP = 1024
REC_SIZE    = 180
MAGIC       = b"\xAA\x44\x12\x1C"

GNSS_OFF    = 12
GNSS_FMT    = "<Qiii fff H"
GNSS_SIZE   = struct.calcsize(GNSS_FMT)  # 34 bytes

payload = data[HEADER_SKIP:]
# Full REC_SIZE-byte frames starting at each MAGIC occurrence.
hits = [pos for pos in range(len(payload) - 4) if payload.startswith(MAGIC, pos)]
frames = [payload[h:h+REC_SIZE] for h in hits if h + REC_SIZE <= len(payload)]
print(f"Frames (MAGIC-aligned @180B): {len(frames)}")

# ---- Decode GNSS for reference (also gives usable timestamps);
#      frames that fail to unpack contribute no row
gnss_rows = []
for fr in frames:
    try:
        gnss_rows.append(struct.unpack_from(GNSS_FMT, fr, GNSS_OFF))
    except struct.error:
        continue

gnss = pd.DataFrame(gnss_rows, columns=["timestamp","i1","i2","i3","v1","v2","v3","q"])
# Metre interpretation of the three int32 fields (int32 * 1e-3)
gnss = gnss.assign(
    x_m=gnss["i1"] * 1e-3,
    y_m=gnss["i2"] * 1e-3,
    z_m=gnss["i3"] * 1e-3,
)

# ---- Define IMU candidate formats
# Encodings tried:
#  - int16 accel/gyro counts (scaled by apply_scaling)
#  - float32 accel/gyro
#  - mixed int16/float32 triplets
#  - two accel+gyro triplet pairs back-to-back
_ACC16 = ["ax_i16", "ay_i16", "az_i16"]
_GYR16 = ["gx_i16", "gy_i16", "gz_i16"]

# Each entry: (name, struct format, column names, column -> scaling kind)
imu_candidates = [
    (
        "h6", "<hhhhhh",
        _ACC16 + _GYR16,
        {**dict.fromkeys(_ACC16, "g16"), **dict.fromkeys(_GYR16, "dps16")},
    ),
    (
        "f6", "<ffffff",
        ["ax", "ay", "az", "gx", "gy", "gz"],
        {},
    ),
    (
        "h3f3", "<hhhfff",
        _ACC16 + ["gx", "gy", "gz"],
        dict.fromkeys(_ACC16, "g16"),
    ),
    (
        "f3h3", "<fffhhh",
        ["ax", "ay", "az"] + _GYR16,
        dict.fromkeys(_GYR16, "dps16"),
    ),
    # two triplets accel+gyro repeated (12 fields total)
    (
        "h12", "<hhhhhhhhhhhh",
        ["a1", "a2", "a3", "g1", "g2", "g3", "a4", "a5", "a6", "g4", "g5", "g6"],
        {
            **dict.fromkeys(["a1", "a2", "a3", "a4", "a5", "a6"], "g16"),
            **dict.fromkeys(["g1", "g2", "g3", "g4", "g5", "g6"], "dps16"),
        },
    ),
]

# Candidate IMU start offsets: directly after the GNSS subpacket, plus small paddings
_imu_base = GNSS_OFF + GNSS_SIZE
IMU_STARTS = [_imu_base + pad for pad in (0, 2, 4, 6, 8, 10, 12, 16, 20, 24)]

def apply_scaling(df, scale_map):
    """Return a copy of ``df`` with the columns named in ``scale_map`` rescaled.

    ``scale_map`` maps column name -> kind. Supported kinds:
      * ``"g16"``    - divide by 16384 and multiply by 9.80665
        (assumes 16384 counts = 1 g)
      * ``"dps16"``  - divide by 16.4 (assumes 16.4 counts per deg/s)
      * ``"deg1e7"`` - multiply by 1e-7
      * ``"milli"``  - multiply by 1e-3

    ``deg1e7`` and ``milli`` are handled here as well because this definition
    replaces the earlier ``apply_scaling`` of the same name in the kernel.
    Without them, re-running the schema sweep after this cell would silently
    leave its integer columns unscaled.

    Columns missing from ``df`` and unknown kinds are left untouched.
    """
    out = df.copy()
    for col, kind in (scale_map or {}).items():
        if col not in out:
            continue
        s = pd.to_numeric(out[col], errors="coerce")
        if kind == "g16":
            out[col] = s / 16384.0 * 9.80665
        elif kind == "dps16":
            out[col] = s / 16.4
        elif kind == "deg1e7":
            out[col] = s * 1e-7
        elif kind == "milli":
            out[col] = s * 1e-3
    return out

def accel_score(ax, ay, az):
    """Score in [0, 1] for how close the median acceleration magnitude is to 9.80665.

    Non-finite magnitudes are ignored; ``0.0`` is returned when none remain.
    The score drops linearly with the relative deviation and is floored at zero.
    """
    x, y, z = (pd.to_numeric(axis, errors="coerce") for axis in (ax, ay, az))
    magnitude = np.sqrt(x**2 + y**2 + z**2)
    magnitude = magnitude[np.isfinite(magnitude)]
    if len(magnitude) == 0:
        return 0.0
    med = float(np.nanmedian(magnitude))
    # 1.0 at exactly 9.80665, falling to 0.0 at a relative error of 100 % or more
    relative_error = abs(med - 9.80665)/9.80665
    return max(0.0, 1.0 - relative_error)

def gyro_score(gx, gy, gz):
    """Score in [0, 1] favouring a small, non-zero angular-rate magnitude.

    The result is ``max(0, 1 - median/500)`` multiplied by the share of samples
    whose magnitude is not exactly zero; ``0.0`` when no finite magnitude exists.
    """
    x, y, z = (pd.to_numeric(axis, errors="coerce") for axis in (gx, gy, gz))
    magnitude = np.sqrt(x**2 + y**2 + z**2)
    magnitude = magnitude[np.isfinite(magnitude)]
    if len(magnitude) == 0:
        return 0.0
    # prefer a median below 500, and penalise data that is all zeros
    med = float(np.nanmedian(magnitude))
    zfrac = (magnitude == 0).mean()
    rate_term = max(0.0, 1.0 - med/500.0)
    return rate_term * (1.0 - zfrac)

def try_imu_at(start, fmt, headers, scale_map):
    """Decode one candidate IMU layout at byte offset ``start`` of every frame.

    Iterates the notebook-level ``frames`` sequence, unpacks ``fmt`` from
    ``frame[start:start + struct.calcsize(fmt)]`` and skips frames that are
    too short or fail to unpack.

    Parameters
    ----------
    start : int
        Byte offset inside each frame.
    fmt : str
        ``struct`` format string of the candidate layout.
    headers : list of str
        Column names, one per unpacked field.
    scale_map : dict or None
        Passed through to ``apply_scaling``.

    Returns
    -------
    pandas.DataFrame or None
        Scaled records, or ``None`` when no frame could be decoded.
    """
    record_len = struct.calcsize(fmt)
    stop = start + record_len

    decoded = []
    for frame in frames:
        if stop > len(frame):
            continue  # frame ends before the candidate record does
        try:
            decoded.append(struct.unpack(fmt, frame[start:stop]))
        except struct.error:
            continue

    if not decoded:
        return None
    return apply_scaling(pd.DataFrame(decoded, columns=headers), scale_map)

# Sweep every (start offset, layout) candidate, score it, and rank the results.
#
# Column-name triplets that carry accelerometer / gyroscope axes in the
# candidate layouts.  The first triplet fully present in a decoded frame is the
# one that gets scored and plotted.  "a1".."a3" / "g1".."g3" are the leading
# triplets of the 12-field "h12" layout; without them that layout matched no
# name set, always scored 0 and could never be ranked on merit.
ACCEL_TRIPLETS = (("ax", "ay", "az"), ("ax_i16", "ay_i16", "az_i16"), ("a1", "a2", "a3"))
GYRO_TRIPLETS = (("gx", "gy", "gz"), ("gx_i16", "gy_i16", "gz_i16"), ("g1", "g2", "g3"))


def _first_triplet(df, triplets):
    """Return the first triplet whose three columns all exist in ``df``, else None."""
    for cols in triplets:
        if set(cols).issubset(df.columns):
            return cols
    return None


def _plot_triplet(df, cols, title, limit=3000):
    """Plot the first ``limit`` samples of each column in ``cols`` on one figure."""
    plt.figure(figsize=(6, 3))
    for c in cols:
        plt.plot(pd.to_numeric(df[c], errors="coerce").values[:limit], alpha=0.8, label=c)
    plt.title(title)
    plt.xlabel("sample index")
    plt.legend()
    plt.tight_layout()
    plt.show()


results = []
for st in IMU_STARTS:
    for name, fmt, headers, scale_map in imu_candidates:
        df_imu = try_imu_at(st, fmt, headers, scale_map)
        # Fewer than 50 decodable frames is too little data for a stable median.
        if df_imu is None or len(df_imu) < 50:
            continue

        accel_cols = _first_triplet(df_imu, ACCEL_TRIPLETS)
        gyro_cols = _first_triplet(df_imu, GYRO_TRIPLETS)
        a_sc = accel_score(*(df_imu[c] for c in accel_cols)) if accel_cols else 0.0
        g_sc = gyro_score(*(df_imu[c] for c in gyro_cols)) if gyro_cols else 0.0

        score = a_sc + g_sc
        results.append((score, st, name, fmt, headers, df_imu))

# Show top hits (highest combined score first)
results.sort(key=lambda x: x[0], reverse=True)
print("\nTop IMU candidates (score = accel‚âà1g + gyro<500dps):")
for score, st, name, fmt, headers, df_imu in results[:5]:
    print(f"  score={score:5.3f}  start=+{st:3d}  {name:<6}  fmt={fmt}  rows={len(df_imu)}")

# Display best
if results:
    best_score, best_start, best_name, best_fmt, best_headers, best_df = results[0]
    print(f"\n‚úÖ BEST IMU ‚Üí score={best_score:.3f}  start=+{best_start}  {best_name}  fmt={best_fmt}")
    display(best_df.head(25))

    # Quick plots of whichever accel / gyro triplet the winning layout carries
    # (integer columns have already been scaled by try_imu_at).
    cols_a = list(_first_triplet(best_df, ACCEL_TRIPLETS) or [])
    cols_g = list(_first_triplet(best_df, GYRO_TRIPLETS) or [])

    if cols_a:
        _plot_triplet(best_df, cols_a, "Accel (first 3000 samples)")

    if cols_g:
        _plot_triplet(best_df, cols_g, "Gyro (first 3000 samples)")
else:
    print("‚ö†Ô∏è No plausible IMU layout found yet. We can extend the search window/variants.")


In [None]:
import re

# Render the raw bytes as text: printable ASCII (32..126) is kept verbatim and
# every other byte is shown as '.', so NMEA sentences embedded in the binary
# stream become searchable with a regular expression.
ascii_text = ''.join('.' if (b < 32 or b >= 127) else chr(b) for b in data)

# Collect every "$GNRMC," candidate: the tag followed by 20-100 characters that
# are not '$', i.e. the match never runs into the next sentence start.
matches = [hit for hit in re.finditer(r'\$GNRMC,[^$]{20,100}', ascii_text)]
print(f"Found {len(matches)} GNRMC sentences")

# Inspect the first candidate field by field.
if len(matches) > 0:
    s = matches[0].group(0)
    print("\nüîπ Raw GNRMC sentence:\n", s)

    parts = s.split(',')
    print("\nüîπ Split fields:")
    for idx, field in enumerate(parts):
        print(f"{idx:02d}: {field}")


In [None]:
# Keep only GNRMC candidates whose status field reports a valid fix.
# RMC field layout: 0 = "$GNRMC", 1 = UTC time, 2 = status ("A" = data valid,
# "V" = warning).  The status is tested by position: a plain `",A," in s`
# substring test also fires on an "A" in any later field, or in the residue
# that the loose 20-100 character regex can capture after the sentence itself.
valid = []
for m in re.finditer(r'\$GNRMC,[^$]{20,100}', ascii_text):
    s = m.group(0)
    fields = s.split(',')
    if len(fields) > 2 and fields[2] == "A":
        valid.append(s)

print(f"Found {len(valid)} valid (A) GNRMC sentences")

# Show the first valid fix field by field.
if valid:
    print("\nüîπ First valid fix:\n", valid[0])
    print("\nüîπ Split fields:")
    parts = valid[0].split(',')
    for i, p in enumerate(parts):
        print(f"{i:02d}: {p}")


In [None]:
# Path of the raw .fmnav recording under the mounted Drive folder.
# NOTE(review): this repeats the assignment made near the top of the notebook;
# presumably it restores `file_path` after another cell repointed it — confirm.
file_path = "/content/drive/MyDrive/input/20250907-150827_Rtk.fmnav"

In [None]:

# =====================================================
# Step: auto-detect schema, scale and visualize RTK data
# =====================================================
import folium
import pandas as pd
import numpy as np

# Load the decoded GNSS records written by the binary decoding step.
# A missing CSV is the failure that can actually happen here, so report it with
# a hint (the exception type stays FileNotFoundError, as pandas raises it).
csv_path = "/content/x200go_gnss_decoded.csv"
try:
    df_best = pd.read_csv(csv_path)
except FileNotFoundError as exc:
    raise FileNotFoundError(
        f"Decoded GNSS CSV not found: {csv_path}. Run the binary decoding cell first."
    ) from exc

# -----------------------------------------------------
# 1) Auto-detect schema
# -----------------------------------------------------
col_count = df_best.shape[1]

# Column names are assigned purely by position, chosen by the number of
# columns found in the CSV.
schema_map = {
    8:  ["timestamp", "lat_i", "lon_i", "alt_i", "v1", "v2", "v3", "q"],
    9:  ["timestamp", "lat_i", "lon_i", "alt_i", "v1", "v2", "v3", "extra", "q"],
    10: ["timestamp", "lat_i", "lon_i", "alt_i", "v1", "v2", "v3", "extra1", "extra2", "q"],
    11: ["timestamp", "lat_i", "lon_i", "alt_i", "v1", "v2", "v3", "ax", "ay", "az", "q"]
}

if col_count not in schema_map:
    raise ValueError(f"Unexpected number of columns ({col_count}). Please check the decoding step.")

df_best.columns = schema_map[col_count]

print(f"‚úÖ Assigned column names for {col_count} columns: {df_best.columns.tolist()}")

# -----------------------------------------------------
# 2) Convert integer fields to real-world units
# -----------------------------------------------------
df_best["lat_deg"] = df_best["lat_i"] * 1e-7
df_best["lon_deg"] = df_best["lon_i"] * 1e-7
df_best["alt_m"]   = df_best["alt_i"] * 0.001  # if in millimeters

# -----------------------------------------------------
# 3) Clean and filter
# -----------------------------------------------------
df_best = df_best.replace([np.inf, -np.inf], np.nan).dropna(subset=["lat_deg", "lon_deg"])
df_best = df_best[df_best["lat_deg"].between(-90, 90)]
df_best = df_best[df_best["lon_deg"].between(-180, 180)]

# Drop records that repeat the previous position.  diff() is NaN on the first
# row and `NaN > 0` is False, so the comparison alone would always discard the
# first record of the track; keep that row explicitly.
pos_step = df_best["lat_deg"].diff().abs() + df_best["lon_deg"].diff().abs()
df_best = df_best[pos_step.isna() | (pos_step > 0)]

print(f"‚úÖ {len(df_best)} valid records after filtering.")

# -----------------------------------------------------
# 4) Summary
# -----------------------------------------------------
if not df_best.empty:
    print(f"lat_deg: min={df_best['lat_deg'].min():.6f}, med={df_best['lat_deg'].median():.6f}, max={df_best['lat_deg'].max():.6f}")
    print(f"lon_deg: min={df_best['lon_deg'].min():.6f}, med={df_best['lon_deg'].median():.6f}, max={df_best['lon_deg'].max():.6f}")
    print(f"alt_m:   min={df_best['alt_m'].min():.3f}, med={df_best['alt_m'].median():.3f}, max={df_best['alt_m'].max():.3f}")

# -----------------------------------------------------
# 5) Visualize trajectory
# -----------------------------------------------------
if len(df_best) > 0:
    # Center the map on the mean position of the filtered track.
    avg_lat = df_best["lat_deg"].mean()
    avg_lon = df_best["lon_deg"].mean()

    m = folium.Map(location=[avg_lat, avg_lon], zoom_start=15, tiles="OpenStreetMap")

    folium.PolyLine(
        df_best[["lat_deg", "lon_deg"]].values.tolist(),
        color="blue", weight=2, opacity=0.8
    ).add_to(m)

    # Mark the first and last filtered records.
    folium.Marker(
        [df_best.iloc[0]["lat_deg"], df_best.iloc[0]["lon_deg"]],
        popup="Start", icon=folium.Icon(color="green")
    ).add_to(m)
    folium.Marker(
        [df_best.iloc[-1]["lat_deg"], df_best.iloc[-1]["lon_deg"]],
        popup="End", icon=folium.Icon(color="red")
    ).add_to(m)

    print("\nüó∫Ô∏è Interactive trajectory map:")
    display(m)
else:
    print("‚ö†Ô∏è No valid coordinates to visualize.")




In [None]:
# =====================================================
# Search header for possible base coordinates
# =====================================================
import re

# Only the first 2 KiB of the decoded CSV are inspected.
with open("/content/x200go_gnss_decoded.csv", "rb") as f:
    head = f.read(2048)

# Printable ASCII is kept; any other byte is rendered as '.'.
# The text goes into its own name: `ascii_text` holds the rendering of the raw
# recording used by the GNRMC cells, and overwriting it here would make those
# cells search the CSV header if they are re-run afterwards.
head_text = ''.join(chr(b) if 32 <= b < 127 else '.' for b in head)

# Look for degree-like numeric patterns (e.g., 24.xxxxxx or 67.xxxxxx):
# exactly two integer digits followed by 4-6 decimals.  The pattern is not
# anchored, so it can also match inside a longer number.
matches = re.findall(r"\d{2}\.\d{4,6}", head_text)
print("üîç Possible coordinate-like patterns in header:")
for m in matches:
    print("  ", m)


In [None]:
# =====================================================
# Extract and plot all coordinate-like values on Folium map
# =====================================================
import re
import folium
import numpy as np

# The CSV path and its bytes get dedicated names.  Reusing `file_path` and
# `data` here would repoint the notebook's raw-recording variables at the CSV
# for every cell executed afterwards.
csv_path = "/content/x200go_gnss_decoded.csv"

# 1) Read the file as bytes and render it as text
#    (printable ASCII kept, any other byte shown as '.')
with open(csv_path, "rb") as f:
    csv_bytes = f.read()

ascii_data = ''.join(chr(b) if 32 <= b < 127 else '.' for b in csv_bytes)

# 2) Extract all numeric-like substrings that look like coordinates:
#    exactly two integer digits followed by 3-6 decimals
matches = re.findall(r"\d{2}\.\d{3,6}", ascii_data)
values = [float(v) for v in matches]

# 3) Separate plausible latitude/longitude ranges by magnitude alone.
#    The pattern allows only two integer digits, so every value is below 100
#    and the `< 180` bound never actually excludes anything.
lat_vals = [v for v in values if 0 < v < 60]     # latitudes
lon_vals = [v for v in values if 60 <= v < 180]  # longitudes

print(f"üìÑ Extracted {len(values)} numeric-like values")
print(f"   ‚Üí {len(lat_vals)} possible latitudes")
print(f"   ‚Üí {len(lon_vals)} possible longitudes")

# 4) Pair roughly by index; zip() already stops at the shorter list,
#    so no explicit truncation is needed
pairs = list(zip(lat_vals, lon_vals))

if not pairs:
    raise ValueError("‚ö†Ô∏è No coordinate-like pairs found!")

# 5) Compute map center as the mean of the paired candidates
avg_lat = np.mean([p[0] for p in pairs])
avg_lon = np.mean([p[1] for p in pairs])

print(f"üó∫Ô∏è Centering map around: lat={avg_lat:.6f}, lon={avg_lon:.6f}")
print(f"   Showing {len(pairs)} coordinate candidates")

# 6) Create Folium map
m = folium.Map(location=[avg_lat, avg_lon], zoom_start=6, tiles="OpenStreetMap")

# Add all points
for lat, lon in pairs:
    folium.CircleMarker(
        location=[lat, lon],
        radius=3,
        color="blue",
        fill=True,
        fill_opacity=0.6
    ).add_to(m)

# Highlight approximate cluster center
folium.Marker(
    [avg_lat, avg_lon],
    popup=f"Cluster Center\n({avg_lat:.6f}, {avg_lon:.6f})",
    icon=folium.Icon(color="red", icon="info-sign")
).add_to(m)

display(m)
