# Bag Synchronizer
Reads the input bag in MCAP format and generates a new bag with synchronized timestamps for the different messages.

In [None]:
%load_ext autoreload
%autoreload 2
    
import sys
sys.path.append('..')
import ha_python_utils.constants as const
from  ha_python_utils.bag_reader import BagReader
from rclpy.serialization import serialize_message
import numpy as np
from pprint import pprint
import matplotlib.pyplot as plt
from pathlib import Path
import rosbag2_py
from event_camera_py import Decoder
import copy
import time
import collections
import cv2
import pdb
import std_msgs.msg

In [None]:
# Input synchronized bag: path of the .mcap file that the following cells open
# with BagReader and verify. Keep exactly one SYNCED_BAG assignment active; the
# commented-out assignments are alternative bag paths (presumably the other
# recordings of the same campaigns) kept for quick switching.
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.19.Pennov.Flight.and.Calib/cam.calib.1/synchronized/synchronized_0.mcap")
SYNCED_BAG = Path("/data/ha_ec_data/2024.08.19.Pennov.Flight.and.Calib/cam.calib.2/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.19.Pennov.Flight.and.Calib/flight.day.1/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.19.Pennov.Flight.and.Calib/flight.day.2/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.19.Pennov.Flight.and.Calib/imu.calib.1/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.19.Pennov.Flight.and.Calib/imu.calib.2/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.25.Pennov.Flight.and.Calib/cam.calib.1/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.25.Pennov.Flight.and.Calib/cam.calib.2/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.25.Pennov.Flight.and.Calib/flight.day.1/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.25.Pennov.Flight.and.Calib/flight.day.2/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.25.Pennov.Flight.and.Calib/flight.day.3/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.25.Pennov.Flight.and.Calib/flight.day.4/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.25.Pennov.Flight.and.Calib/flight.night.1/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.25.Pennov.Flight.and.Calib/flight.night.2/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.25.Pennov.Flight.and.Calib/flight.night.3/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.25.Pennov.Flight.and.Calib/imu.calib.1/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.25.Pennov.Flight.and.Calib/imu.calib.2/synchronized/synchronized_0.mcap")
# SYNCED_BAG = Path("/data/ha_ec_data/2024.08.25.Pennov.Flight.and.Calib/imu.calib.3/synchronized/synchronized_0.mcap")

In [None]:
# Sanity-check the per-topic message counts of the synchronized bag
bag_reader = BagReader(SYNCED_BAG, const.ALL_OUTPUT_TOPICS)
bag_reader.print_stats(all_topics=True)

num_imu_msgs = bag_reader.get_num_msgs(const.OUTPUT_VNAV_IMU_TOPIC)
num_flir_msgs = bag_reader.get_num_msgs(const.OUTPUT_FLIR_IMAGE_TOPIC)
num_gps_msgs = bag_reader.get_num_msgs(const.OUTPUT_GPS_TOPIC)

# Each of the three sensors must have at least one message in the bag
assert all(count > 0 for count in (num_imu_msgs, num_flir_msgs, num_gps_msgs))

# Message count divided by 400 / 50 / 5 (presumably the nominal IMU / FLIR /
# GPS rates in Hz - confirm against ha_python_utils.constants), i.e. the
# recording length in seconds implied by each sensor on its own.
imu_span_s = num_imu_msgs/400
flir_span_s = num_flir_msgs/50
gps_span_s = num_gps_msgs/5

# The implied lengths may differ by less than one sample period of each sensor
assert np.abs(imu_span_s - flir_span_s) < (1/50 + 1/400)
assert np.abs(flir_span_s - gps_span_s) < (1/50 + 1/5)

In [None]:
# Iterate over the bag once and record, for every tracked topic, the first and
# last message, the number of messages, and the bag timestamp of each message.

bag_reader = BagReader(SYNCED_BAG, const.ALL_OUTPUT_TOPICS)

# The timestamp array is pre-sized from the message count reported by the
# reader, so it can be filled by index while streaming through the bag.
msg_info = {i: {"first_msg": None,
                "first_bag_ts": None,
                "last_msg": None,
                "last_bag_ts": None,
                "cnt": 0,
                "bag_timestamps": np.zeros(bag_reader.get_num_msgs(i), dtype=np.int64)} for i in \
            [const.OUTPUT_GPS_TOPIC, const.OUTPUT_VNAV_IMU_TOPIC, const.OUTPUT_EC_EVENTS_TOPIC, const.OUTPUT_FLIR_IMAGE_TOPIC, const.OUTPUT_EC_TRIGGER_TOPIC, const.OUTPUT_RANGE_TOPIC]}

for topic, msg, timestamp in bag_reader.read_all():
    # Single dict lookup per message; untracked topics are skipped
    info = msg_info.get(topic)
    if info is None:
        continue
    if info["first_msg"] is None:
        info["first_msg"] = msg
    info["last_msg"] = msg
    info["bag_timestamps"][info["cnt"]] = timestamp
    info["cnt"] += 1

# First/last bag timestamps are read from the ends of the filled arrays.
# NOTE(review): a tracked topic with zero messages raises IndexError here.
for topic, info in msg_info.items():
    info["first_bag_ts"] = info["bag_timestamps"][0]
    info["last_bag_ts"] = info["bag_timestamps"][-1]

In [None]:
# Check that the events are synchronized with the decoder: read the bag up to
# the first event packet that contains an external trigger and compare its
# stamps with the latest message seen on the trigger topic.
decoder = Decoder()
bag_reader = BagReader(SYNCED_BAG, [const.OUTPUT_EC_EVENTS_TOPIC, const.OUTPUT_EC_TRIGGER_TOPIC])

# Pre-bind everything the assertions read, so that a bag without triggers
# fails with an explicit assertion message instead of a NameError.
first_trigger_ts = None
first_trigger_ts_bag = None
trigger_msg_ts = None
trigger_msg_ts_bag = None
msg_ts = None
trigger = []

for topic, msg, timestamp in bag_reader.read_all():
    if topic == const.OUTPUT_EC_EVENTS_TOPIC:
        msg_ts = msg.header.stamp
        decoder.decode(msg)
        trigger = decoder.get_ext_trig_events()
        if len(trigger) > 0:
            # Stop at the first event packet carrying an external trigger
            first_trigger_ts = msg_ts
            first_trigger_ts_bag = timestamp
            break
    elif topic == const.OUTPUT_EC_TRIGGER_TOPIC:
        # Remember the most recent trigger-topic message seen so far
        trigger_msg_ts = msg.stamp
        trigger_msg_ts_bag = timestamp

assert first_trigger_ts is not None, "No external trigger was decoded from the event stream"
assert trigger_msg_ts is not None, "No trigger topic message was read before the first decoded trigger"

# Check that the trigger topic and the trigger in the event stream agree
assert msg_ts == trigger_msg_ts
# Check that the first message is a trigger message
# NOTE(review): first_trigger_ts is bound to msg_ts right before the loop
# breaks, so this assertion always holds as written - confirm the intent.
assert first_trigger_ts == msg_ts
# check that the timestamp and the event stream agree for the event topic
# (stamp converted to microseconds; trigger[0][1] is presumably the decoded
# trigger time in microseconds - confirm against event_camera_py)
assert first_trigger_ts.sec*1000000+first_trigger_ts.nanosec//1000 == trigger[0][1]
# check that the timestamp and the event stream agree for the trigger topic
assert trigger_msg_ts.sec*1000000+trigger_msg_ts.nanosec//1000 == trigger[0][1]
# Check that the bag timestamp is correct for the event topic (nanoseconds)
assert first_trigger_ts.sec*1000000000+first_trigger_ts.nanosec == first_trigger_ts_bag
# Check that the bag timestamp is correct for the trigger topic (nanoseconds)
assert trigger_msg_ts.sec*1000000000+trigger_msg_ts.nanosec == trigger_msg_ts_bag

In [None]:
def freq_calculate(bag_timestamps, target_freq):
    """Plot the message periods and summarise the instantaneous frequency.

    Args:
        bag_timestamps: sequence of message timestamps (assumed to be in
            nanoseconds, given the 1e9 scaling used below).
        target_freq: expected message rate, used for the plot title and for
            the lost-trigger threshold.

    Returns:
        Tuple ``(min, mean, std, max, lost)`` where the first four entries
        are statistics of the per-sample frequency and ``lost`` is the number
        of periods that are at least 1.8 target periods long.

    Side effects:
        Opens a new matplotlib figure showing the period of every sample.
    """
    ns_per_s = 1000000000
    deltas = np.diff(bag_timestamps)
    nominal_period = 1/target_freq*ns_per_s
    inst_freq = 1/deltas*ns_per_s

    plt.figure()
    plt.plot(deltas)
    plt.title(f"Target freq: {target_freq}")

    # A gap of 1.8 target periods or more is counted as a lost trigger
    gap_mask = deltas >= 1.8*nominal_period
    stats = (np.min(inst_freq), np.mean(inst_freq), np.std(inst_freq), np.max(inst_freq))
    return (*stats, np.count_nonzero(gap_mask))

def get_ts_in_ns(header):
    """Return the stamp of a std_msgs Header as a single nanosecond count.

    Raises AssertionError when ``header`` is not a ``std_msgs.msg.Header``.
    """
    assert isinstance(header, std_msgs.msg.Header)
    stamp = header.stamp
    whole_seconds_ns = stamp.sec*1000000000
    return whole_seconds_ns + stamp.nanosec

# Verify the synchronized bag: first timestamps, header stamps vs bag
# timestamps, coverage of the bag duration, and per-sensor message rates.
# Every check is an assertion so that one except clause can report the failure.
try:
    # Check that events, regular camera and EC trigger start on the same bag timestamp
    ec_first_ts = msg_info[const.OUTPUT_EC_EVENTS_TOPIC]["first_bag_ts"]
    assert ec_first_ts == msg_info[const.OUTPUT_FLIR_IMAGE_TOPIC]["first_bag_ts"], "first_bag_ts for FLIR and EC are not the same"
    assert msg_info[const.OUTPUT_EC_TRIGGER_TOPIC]["first_bag_ts"] == ec_first_ts, "first_bag_ts for EC trigger and topic not the same"

    # The other sensors must start after the first event packet, within one
    # of their own sample periods.
    first_message_diff_gps = msg_info[const.OUTPUT_GPS_TOPIC]["first_bag_ts"] - ec_first_ts
    assert 0 < first_message_diff_gps < const.GPS_T_NS, f"first_msg_diff_gps is greater than 0.2 seconds: {first_message_diff_gps/1e9}"

    first_message_diff_imu = msg_info[const.OUTPUT_VNAV_IMU_TOPIC]["first_bag_ts"] - ec_first_ts
    assert 0 < first_message_diff_imu < const.VNAV_T_NS, f"first_message_diff_imu is greater than 2.5 ms: {first_message_diff_imu/1e6}"

    first_message_diff_range = msg_info[const.OUTPUT_RANGE_TOPIC]["first_bag_ts"] - ec_first_ts
    # Be a bit more lenient for the range measurements as the frequency may fluctuate
    assert 0 < first_message_diff_range < (const.RANGE_T_NS + 0.002*1e9), f"first_message_diff_range is greater than 16 ms: {first_message_diff_range/1e6}"

    # Check that the timestamp in the first and in the last message of every
    # topic agrees with the ROS bag timestamp of that message.
    for which in ("first", "last"):
        for stamped_topic in (const.OUTPUT_FLIR_IMAGE_TOPIC,
                              const.OUTPUT_VNAV_IMU_TOPIC,
                              const.OUTPUT_GPS_TOPIC,
                              const.OUTPUT_RANGE_TOPIC):
            entry = msg_info[stamped_topic]
            assert get_ts_in_ns(entry[f"{which}_msg"].header) == entry[f"{which}_bag_ts"], \
                f"{which} message stamp of {stamped_topic} does not match its bag timestamp"
        # Trigger messages are passed to get_ts_in_ns directly, without .header
        entry = msg_info[const.OUTPUT_EC_TRIGGER_TOPIC]
        assert get_ts_in_ns(entry[f"{which}_msg"]) == entry[f"{which}_bag_ts"], \
            f"{which} message stamp of {const.OUTPUT_EC_TRIGGER_TOPIC} does not match its bag timestamp"

    # Check that every topic spans the bag duration to within 1 s (all msgs are there)
    # NOTE(review): bag_reader is whichever reader the last executed cell created.
    bag_duration = bag_reader.get_duration()
    for topic_name, entry in msg_info.items():
        topic_span = entry["last_bag_ts"] - entry["first_bag_ts"]
        time_diff = np.abs(topic_span - bag_duration)/1e9
        assert time_diff < 1, f"{topic_name} difference between first and last message is greater than 1 second: {time_diff}"

    #  Check rates for all sensors and number of lost triggers
    topic_rates = {const.OUTPUT_FLIR_IMAGE_TOPIC: const.FLIR_FREQ,
                   const.OUTPUT_VNAV_IMU_TOPIC: const.VNAV_FREQ,
                   const.OUTPUT_GPS_TOPIC: const.GPS_FREQ,
                   const.OUTPUT_RANGE_TOPIC: const.RANGE_FREQ,
                   const.OUTPUT_EC_TRIGGER_TOPIC: const.EC_TRIGGER_FREQ,}
    # Topics checked with a relaxed (rtol, atol) on the mean frequency; they
    # also tolerate up to 4 lost triggers. All other topics use the np.isclose
    # defaults and tolerate none.
    relaxed_tolerances = {const.OUTPUT_RANGE_TOPIC: (1e-2, 1e-4),
                          const.OUTPUT_EC_TRIGGER_TOPIC: (1e-3, 1e-6)}
    any_errors = False
    for topic, target_freq in topic_rates.items():
        fmin, fmean, fstd, fmax, lost_triggers = freq_calculate(msg_info[topic]["bag_timestamps"], target_freq)
        if lost_triggers:
            print(f"{topic} - {lost_triggers} lost triggers detected")

        # np.isclose scales rtol by its second argument, so the argument
        # order of each call is kept exactly as it was.
        if topic in relaxed_tolerances:
            rtol, atol = relaxed_tolerances[topic]
            freq_ok = np.isclose(target_freq, fmean, rtol=rtol, atol=atol)
            lost_limit = 5
            lost_ok = lost_triggers < 5
        else:
            freq_ok = np.isclose(fmean, target_freq)
            lost_limit = 0
            lost_ok = lost_triggers == 0

        if not freq_ok:
            print(f"Error checking avg frequency for {topic}. Target frequency: {target_freq}. Average frequency: {fmean}")
            any_errors = True
        if not lost_ok:
            print(f"More than {lost_limit} triggers lost for {topic}")
            any_errors = True
    assert not any_errors, "Errors were detected"
    print("ALL CHECKS WERE SUCCESSFUL. YAY!")

except AssertionError as msg:
    print("Error encountered when verifying bag. Consider deleting the bag.")
    print(msg)
    # delete_synced_bag()