In [1]:
# NOTE(review): each `!` line is executed in its own throwaway subshell, so the
# `source` below cannot change this kernel's environment variables. Presumably the
# ROS environment must already be set in the process that launched Jupyter for
# `import rospy` / `import rosbag` below to work — TODO confirm.
! echo $SHELL
! source /opt/ros/noetic/setup.zsh

/usr/bin/zsh
/opt/ros/noetic/etc/catkin/profile.d/50-rosmon.zsh:66: command not found: compdef


In [2]:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
from sensor_msgs.msg import Image
import cv2
import numpy as np
import os
import re
import rosbag
import rospy
import time
import threading
import yaml

In [3]:
# Supported file types -> the filename extension expected for each type.
# A "directory" has no extension; note that the "YAML" key is upper-case.
file_type_to_extension = dict(
    mp4=".mp4",
    rosbag=".bag",
    directory="",
    YAML=".yaml",
)

In [4]:
def load_config(cfg_path: str) -> dict:
    """Load a YAML configuration file.

    Args:
        cfg_path (str): path to the YAML config file.

    Returns:
        dict: the parsed configuration. An empty file yields ``{}`` rather than
        ``None`` so callers can always treat the result as a mapping.
    """
    with open(cfg_path, encoding="utf-8") as f:
        # safe_load only builds plain Python objects (no arbitrary object tags)
        cfg = yaml.safe_load(f)
    # yaml.safe_load returns None for an empty document
    return cfg if cfg is not None else {}

def print_dict(d: dict, indent: int = 0):
    """Pretty-print a (possibly nested) dict, indenting two spaces per level.

    A nested dict is printed as a ``key: `` header line followed by its own
    entries one level deeper; any other value is printed as ``key: value``.
    """
    pad = '  ' * indent
    for k, v in d.items():
        if not isinstance(v, dict):
            print(f"{pad}{k!s}: {v!s}")
            continue
        print(f"{pad}{k!s}: ")
        print_dict(v, indent + 1)

In [5]:
class ImgDirOutput:
    """Output sink that writes every frame as a PNG file into a directory.

    Each frame is saved as ``<time_stamp>.png`` inside ``output['directory']``.
    """

    def __init__(self, name: str, output: dict):
        """
        Args:
            name (str): name of this output in the yaml config (used in messages).
            output (dict): output config; must have ``type == 'directory'`` and
                contain ``path`` and ``directory`` keys.
        """
        self.name = name
        # check the type first so a mis-typed config fails with this message
        # instead of a KeyError on a field the wrong type may not have
        assert output['type'] == 'directory', f'{self.name}\'s output type {output["type"]} is inappropriate for ImgDirOutput Class.'

        self.path = output['path']
        self.directory = output['directory']

    def callback(self, image, time_stamp: int) -> None:
        """Write one frame to ``<directory>/<time_stamp>.png``.

        Args:
            image: frame in RGB channel order; converted to BGR, the order
                cv2.imwrite expects.
            time_stamp (int): absolute timestamp, used as the file name.

        Raises:
            OSError: if OpenCV could not write the file (cv2.imwrite reports
                failure by returning False instead of raising).
        """
        output_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        file_path = os.path.join(self.directory, str(time_stamp) + '.png')
        if not cv2.imwrite(file_path, output_image):
            raise OSError(f'{self.name}: failed to write image {file_path}')

    def close(self) -> None:
        """Nothing to release: each frame is written by a single cv2.imwrite call."""
        pass

In [6]:
class RosbagOutput:
    """Output sink that writes every frame as an rgb8 ``sensor_msgs/Image`` message to a rosbag."""

    def __init__(self, name: str, output: dict):
        """
        Args:
            name (str): name of this output in the yaml config (used in messages).
            output (dict): output config; must have ``type == 'rosbag'`` and contain
                ``path`` (bag file), ``directory`` and ``topic``; ``frame_id`` is optional.
        """
        self.name = name
        # check the type first so a mis-typed config fails with this message
        # instead of a KeyError on a field the wrong type may not have
        assert output['type'] == 'rosbag', f'{self.name}\'s output type {output["type"]} is inappropriate for RosbagOutput Class.'

        self.path = output['path']
        self.directory = output['directory']
        self.topic = output['topic']
        self.frame_id = output.get('frame_id', None)

        self.init_rosbag()

    def init_rosbag(self) -> None:
        """Open the bag for writing and prepare a reusable Image message template."""
        self.bag = rosbag.Bag(self.path, 'w')

        self.image_msg = Image()
        if self.frame_id is not None:
            self.image_msg.header.frame_id = self.frame_id
        self.image_msg.encoding = 'rgb8'
        # incremented before every write, so the first written message has seq 0
        self.image_msg.header.seq = -1

    @staticmethod
    def _split_ns(time_stamp: int):
        """Split an integer nanosecond timestamp into exact ``(secs, nsecs)``.

        Integer divmod is used because float arithmetic (``// 1e9``, ``% 1e9``)
        cannot represent ~1.7e18 ns exactly and corrupts the sub-second part.
        """
        return divmod(int(time_stamp), 1_000_000_000)

    def callback(self, image, time_stamp: int) -> None:
        """Append one frame to the bag.

        Args:
            image: RGB frame indexed as (height, width, channel).
            time_stamp (int): absolute timestamp in nanoseconds; used both as the
                header stamp and as the bag record time.
        """
        secs, nsecs = self._split_ns(time_stamp)
        self.image_msg.header.stamp = rospy.Time(secs=secs, nsecs=nsecs)
        self.image_msg.header.seq += 1
        self.image_msg.height = image.shape[0]
        self.image_msg.width = image.shape[1]
        self.image_msg.step = self.image_msg.width * 3 # bytes per row for packed rgb8
        self.image_msg.data = image.tobytes()

        self.bag.write(self.topic, self.image_msg, t=self.image_msg.header.stamp)

    def close(self) -> None:
        """Close the bag file."""
        self.bag.close()

In [7]:
class MP4Input:
    """Decode an MP4 file with GStreamer and hand every RGB frame to the outputs.

    Each frame's absolute timestamp (ns) is the start time parsed from the file
    name plus the buffer's PTS.
    """

    def __init__(self, name: str, input: dict, outputs: dict):
        """initialize the MP4Input class

        Args:
            name (str): name of this pipeline in the yaml config, used in named sink of gstreamer pipeline
            input (dict): input config. ``path`` and ``type`` (must be ``'mp4'``) are required;
                optional ``width``/``height`` are added to the RGB caps after nvvideoconvert, and
                optional ``start``/``end`` (seconds, compared against the buffer PTS) limit which
                frames are forwarded
            outputs (dict): output class instances keyed by output name; only their
                ``callback`` and ``close`` methods are used

        Raises:
            ValueError: if ``input['type']`` is not ``'mp4'``.
        """
        Gst.init(None)
        self.name = name
        self.path = input['path']
        if input['type'] != 'mp4': # verify type is correct
            raise ValueError(f'Input type {input["type"]} is inappropriate for MP4Input Class.')

        self.width = input.get('width', None)
        self.height = input.get('height', None)
        self.start = input.get('start', None)
        self.end = input.get('end', None)
        self.output_callbacks = [output.callback for output in outputs.values()]
        self.close_handlers = [output.close for output in outputs.values()]

        self.setup()

    def setup(self):
        """Parse the start time and build the GStreamer pipeline with a probe on its sink pad."""
        self.parse_start_time()

        self.bytes_per_pixel = 3 # since it's in RGB format
        self.current_relative_time = 0

        caps = "video/x-raw,format=RGB"
        if self.width:
            caps += ",width=" + str(self.width)
        if self.height:
            caps += ",height=" + str(self.height)
        # quote the location so paths containing spaces survive Gst.parse_launch
        self.gstreamer_pipeline_str = (
            'filesrc location="' + self.path + '" ! decodebin ! nvvideoconvert ! ' + caps
            + " ! progressreport update-freq=1 name=" + self.name + "_progress ! fakesink name=" + self.name
        )
        print(self.name + " MP4Input's gstreamer pipeline string: " + self.gstreamer_pipeline_str)
        self.gstreamer_pipeline = Gst.parse_launch(self.gstreamer_pipeline_str)
        # every buffer arriving at the fakesink's sink pad is passed to self.callback
        self.gstreamer_pipeline.get_by_name(self.name).get_static_pad('sink').add_probe(Gst.PadProbeType.BUFFER, self.callback)

    def parse_start_time(self):
        """parse the absolute start time in nanoseconds from the file name

        The file name must end in ``<10-digit seconds>_<9-digit nanoseconds>.<ext>``.
        """
        match = re.search(r'(\d{10}_\d{9})(?=\.\w+$)', self.path)
        assert match is not None, f'{self.path} does not contain the proper timestamp sub-string.'
        time_str = match[1].split('_')
        self.absolute_start_time = int(time_str[0]) * int(1e9) + int(time_str[1])

    def callback(self, pad, info):
        """Pad-probe handler: forward each readable frame inside [start, end] to every output."""
        image, self.current_relative_time = self.get_frame(pad, info)
        if image is None:
            print(f'{self.name}: could not read frame at pts {self.current_relative_time}, skipping.')
            return Gst.PadProbeReturn.OK

        # check if the current frame is within the start and end time;
        # `is not None` so that an explicit 0 is honoured as a bound
        relative_seconds = self.current_relative_time * 1e-9
        if (self.start is not None and relative_seconds < self.start) or (self.end is not None and relative_seconds > self.end):
            return Gst.PadProbeReturn.OK

        current_absolute_time = self.absolute_start_time + self.current_relative_time

        # calling callbacks for each output pipeline
        for callback in self.output_callbacks:
            callback(image, current_absolute_time)

        return Gst.PadProbeReturn.OK

    def get_frame(self, pad, info):
        """Copy the probed buffer into a numpy image.

        Returns:
            tuple: ``(image, pts)``; ``image`` is a ``(height, width, 3)`` uint8 array,
            or ``None`` if the buffer could not be mapped or is smaller than the caps imply.
        """
        buffer = info.get_buffer()
        caps_structure = pad.get_current_caps().get_structure(0)
        height, width = caps_structure.get_value('height'), caps_structure.get_value('width')

        image = None
        is_mapped, map_info = buffer.map(Gst.MapFlags.READ)
        if is_mapped:
            try:
                row_bytes = width * self.bytes_per_pixel
                # raw video rows may be padded (GStreamer's default RGB stride is rounded
                # up to a multiple of 4 bytes), so derive the row stride from the buffer
                # size instead of assuming tightly packed rows
                row_stride = len(map_info.data) // height
                if row_stride >= row_bytes:
                    image = np.ndarray(
                        (height, width, self.bytes_per_pixel), dtype='uint8', buffer=map_info.data,
                        strides=(row_stride, self.bytes_per_pixel, 1),
                    ).copy() # extend array lifetime beyond subsequent unmap
            finally:
                buffer.unmap(map_info)

        return image, buffer.pts # using pts instead of dts since it's set in the mp4 file recorded using https://bitbucket.org/castacks/image_sharing/src/c5ab07622155793ff756020d1a8117a7d5036325/src/tools/to_gstreamer.cpp#lines-156

    def loop(self):
        """Run the pipeline until end-of-stream, then tear it down and close every output.

        Progress is printed by the ``progressreport`` element in the pipeline.

        Raises:
            RuntimeError: if GStreamer posts an ERROR message on the bus.
        """
        self.gstreamer_pipeline.set_state(Gst.State.PLAYING)
        bus = self.gstreamer_pipeline.get_bus()

        try:
            while True:
                # blocks for up to 5 s waiting for EOS/ERROR; no extra sleep is needed
                msg = bus.timed_pop_filtered(Gst.SECOND * 5, Gst.MessageType.EOS | Gst.MessageType.ERROR)
                if msg is None:
                    continue
                if msg.type == Gst.MessageType.EOS:
                    break
                if msg.type == Gst.MessageType.ERROR:
                    err, debug = msg.parse_error()
                    raise RuntimeError(f'GStreamer stream Error occurred in pipeline: {self.name}: {err.message} ({debug})')
        finally:
            self.gstreamer_pipeline.set_state(Gst.State.NULL)

            # call close handlers for each output pipeline
            for close_handler in self.close_handlers:
                close_handler()
            print(f'Pipeline {self.name} finished.')


In [8]:
class Pipeline(threading.Thread):
    """One input -> many outputs conversion job, run on its own thread.

    An exception raised while running is stored in ``self.error`` (``None`` on
    success) before being re-raised, so the launching code can check the outcome
    after ``join()``.
    """

    def __init__(self, name: str, pipeline: dict):
        """
        Args:
            name (str): pipeline name from the yaml config (also used as the thread name).
            pipeline (dict): pipeline config with ``input`` and ``output`` sections.
        """
        super().__init__()
        self.name = name
        self.error = None
        self.outputs = self.init_outputs(pipeline['output'])
        try:
            self.input = self.init_input(pipeline['input'])
        except BaseException:
            # don't leak already-opened outputs (e.g. rosbag files) if the input fails
            for output in self.outputs.values():
                output.close()
            raise

    def init_outputs(self, outputs_dict: dict):
        """Instantiate one output sink per entry of the pipeline's ``output`` config.

        Raises:
            AssertionError: if an output type is not in ``file_type_to_extension``.
            NotImplementedError: if the type is known but has no output class.
        """
        outputs = {}
        try:
            for name, output in outputs_dict.items(): # name is arbitrary and doesn't matter
                output_type = output['type']
                assert output_type in file_type_to_extension.keys(), f'pipeline {self.name} output\'s file type {output_type} is not supported'
                if output_type == 'directory':
                    outputs[name] = ImgDirOutput(name, output)
                elif output_type == 'rosbag':
                    outputs[name] = RosbagOutput(name, output)
                else:
                    raise NotImplementedError(f'pipeline {self.name}: output type {output_type} has no output implementation')
        except BaseException:
            # close outputs created before the failure so their files are not left open
            for created in outputs.values():
                created.close()
            raise
        return outputs

    def init_input(self, input_dict: dict):
        """Instantiate the input source that drives this pipeline.

        Raises:
            AssertionError: if the input type is not in ``file_type_to_extension``.
            NotImplementedError: if the type is known but has no input class.
        """
        assert input_dict['type'] in file_type_to_extension.keys(), f'pipeline {self.name} input\'s file type {input_dict["type"]} is not supported'
        if input_dict['type'] == 'mp4':
            return MP4Input(self.name, input_dict, self.outputs)
        raise NotImplementedError(f'pipeline {self.name}: input type {input_dict["type"]} has no input implementation')

    def run(self):
        """Thread body: run the input's loop, recording any exception in ``self.error``."""
        print(f"{self.name} pipeline started.")
        try:
            self.input.loop()
        except BaseException as e:
            self.error = e
            raise # still reported through threading.excepthook

In [9]:
def verify_file(path: str, type: str) -> bool:
    """Check that ``path`` exists and matches the declared file ``type``.

    A "directory" type must be an actual directory; every other type must carry
    the extension listed for it in ``file_type_to_extension``.

    Returns:
        bool: ``True``; a failed check raises ``AssertionError`` instead.
    """
    assert os.path.exists(path), f'file {path} does not exist'
    assert type in file_type_to_extension, f'file type {type} is not supported'

    if type == "directory":
        assert os.path.isdir(path), f'{path} is not a directory'
        return True

    _, extension = os.path.splitext(path)
    assert extension == file_type_to_extension[type], f'file {path} is not a {type} file'
    return True


def trace_config(cfg: dict):
    """Validate the config, create output directories and build one Pipeline per entry.

    ``cfg['pipelines']`` lists the pipeline names to build; each name must also be a
    top-level key holding that pipeline's ``input``/``output`` config. Every output
    dict is updated in place with a ``directory`` key: the output path itself for
    ``directory`` outputs, otherwise the parent directory of the output file.

    Returns:
        list: the constructed (not yet started) ``Pipeline`` objects, in config order.
    """
    pipelines = []
    # verify that the config file contains "pipelines"
    assert "pipelines" in cfg and cfg["pipelines"], "pipeline is not specified in config file"

    print('Loaded config:')
    print("pipelines: ", end="")
    print(cfg["pipelines"], end="\n\n")

    for pipeline_name in cfg["pipelines"]:
        # verify that the pipeline is specified in the config file
        assert pipeline_name in cfg, f'pipeline {pipeline_name} is not specified in config file'

        pipeline = cfg[pipeline_name]

        # make directories for output and verify type
        for index, output in pipeline["output"].items():
            # verify the file type is supported
            assert output["type"] in file_type_to_extension.keys(), f'pipeline {pipeline_name}\'s output {index}\'s file type {output["type"]} is not supported'

            # image-directory outputs write *into* `path`; file outputs (e.g. a rosbag)
            # only need the file's parent directory to exist
            if output["type"] == "directory":
                output["directory"] = output["path"]
            else:
                output["directory"] = os.path.dirname(output["path"])
            # dirname of a bare filename is "" (current dir), which os.makedirs rejects
            if output["directory"]:
                os.makedirs(output["directory"], exist_ok=True)

        # verify input
        assert verify_file(path = pipeline["input"]["path"], type = pipeline["input"]["type"]), f'pipeline {pipeline_name}\'s input file {pipeline["input"]["path"]} is not valid'

        print(pipeline_name + ":")
        print_dict(pipeline, indent=1)

        pipelines.append(Pipeline(pipeline_name, pipeline))

    return pipelines

In [10]:
# YAML config describing the pipelines to run (relative to the notebook's working directory)
cfg_path = "./config/wildfire/subcanopy/mp4_to_imgdir_rosbag.yaml"
cfg = load_config(cfg_path=cfg_path)

In [11]:
pipelines = trace_config(cfg)

for pipeline in pipelines:
    pipeline.start()
for pipeline in pipelines:
    pipeline.join()

# exceptions raised on a worker thread never reach this cell, so check each
# pipeline's recorded `error` (pipelines without that attribute count as successful)
failed = [pipeline.name for pipeline in pipelines if getattr(pipeline, 'error', None) is not None]
if failed:
    raise RuntimeError(f"Pipelines failed: {', '.join(failed)}")

print("All pipelines finished successfully.")

Loaded config:
pipelines: ['camera_0', 'camera_1', 'camera_2', 'camera_3']

camera_0:
  input: 
    path: /root/data/wildfire/subcanopy/2023-04-27_FIRE-SGL-228/1/camera_0_1677761985_629121776.mp4
    type: mp4
    width: 612
    height: 514
  output: 
    2: 
      path: /root/data/wildfire/subcanopy/2023-04-27_FIRE-SGL-228/1/camera_0.bag
      type: rosbag
      topic: /camera_0
      frame_id: camera_0/optical_frame
      directory: /root/data/wildfire/subcanopy/2023-04-27_FIRE-SGL-228/1
camera_0 MP4Input's gstreamer pipeline string: filesrc location=/root/data/wildfire/subcanopy/2023-04-27_FIRE-SGL-228/1/camera_0_1677761985_629121776.mp4 ! decodebin ! nvvideoconvert ! video/x-raw,format=RGB,width=612,height=514 ! progressreport update-freq=1 name=camera_0_progress ! fakesink name=camera_0
camera_1:
  input: 
    path: /root/data/wildfire/subcanopy/2023-04-27_FIRE-SGL-228/1/camera_1_1677761985_962133776.mp4
    type: mp4
    width: 612
    height: 514
  output: 
    2: 
      path: /