pylibstardos
PyLibSTARDOS is the Python library used to create STARDOS nodes. It also contains utilities available to all STARDOS nodes.
It exposes three base classes: StardosNode, ProducerNode, and PipelineNode. Each class provides the common structure for STARDOS nodes of its type, as described later in this document.
Many features provided by PyLibSTARDOS were built with multithreading in mind, and certain properties of the base classes are protected by Python Lock objects. To ensure thread safety, acquire the corresponding lock before accessing such a property. Always acquire these locks through the lock's context manager (a with statement). If an exception is raised while the lock is held, the with statement still releases the lock. A manual release() call would be skipped, and the next thread trying to acquire the lock would block forever.
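Instead of doing this:

```python
self.some_lock.acquire()
do_a_thing()
self.some_lock.release()  # skipped if do_a_thing() raises
```

Do this:

```python
with self.some_lock:  # released on exit, even if do_a_thing() raises
    do_a_thing()
```

Additionally, these locks should only be held for the minimum amount of time necessary. Calling a function that releases the Global Interpreter Lock (for example, one that performs blocking I/O or sleeps) while holding one of these locks is heavily discouraged, because every other thread waiting on the lock stalls for the duration of the call.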
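The following standalone sketch uses only the standard library to show both rules. First, the with statement releases the lock even when the critical section raises. Second, the expensive work is done before the lock is taken, so the lock is held only briefly. The Example class, its shared_value attribute, and its methods are hypothetical and are not part of PyLibSTARDOS.

```python
import threading


class Example:
    """Hypothetical object with a lock-protected attribute."""

    def __init__(self):
        self.some_lock = threading.Lock()
        self.shared_value = 0

    def failing_update(self):
        # The with statement releases the lock even though the body raises.
        with self.some_lock:
            self.shared_value += 1
            raise RuntimeError("simulated failure inside the critical section")

    def slow_update(self, raw):
        # Do the expensive work outside the lock...
        result = sum(x * x for x in raw)
        # ...and hold the lock only long enough to store the result.
        with self.some_lock:
            self.shared_value = result


ex = Example()
try:
    ex.failing_update()
except RuntimeError:
    pass

# The context manager released the lock, so it is free again.
print(ex.some_lock.locked())  # False
ex.slow_update([1, 2, 3])
print(ex.shared_value)  # 14
```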
The StardosNode class should only be used directly by capture group nodes. It adds minimal functionality beyond the rclpy base Node class.
This feature makes use of synchronization primitives. Variables listed in this section should not be accessed without first acquiring the lock described in this section.
The StardosNode class adds heartbeat publishing on the program's main thread. Once per second, it publishes the NodeHeartbeat object stored in self.heartbeat_message. For thread safety in pipeline nodes, it also provides a self.state_mutex lock, which should be acquired whenever the heartbeat is read or modified. This is not strictly necessary in producer or base nodes. It is still good practice, though, especially as multithreading may be introduced into those nodes later.
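The heartbeat locking rule can be illustrated with a pure-Python model, because rclpy and the real NodeHeartbeat message are not needed to show the pattern. In this sketch, the following are all hypothetical:

- HeartbeatModel
- FakeHeartbeat and its status field
- the list standing in for a publisher

In an actual rclpy node, a timer created with the node's create_timer method would typically drive the periodic publish. The model copies the heartbeat while holding state_mutex and publishes the copy after releasing the lock, so the lock is held only briefly.

```python
import copy
import threading
import time
from dataclasses import dataclass


@dataclass
class FakeHeartbeat:
    # Hypothetical stand-in for NodeHeartbeat; the real fields are not documented here.
    status: str = "starting"


class HeartbeatModel:
    """Simplified model of heartbeat handling (not the real StardosNode)."""

    def __init__(self, period=1.0):
        self.state_mutex = threading.Lock()
        self.heartbeat_message = FakeHeartbeat()
        self.period = period   # StardosNode publishes once per second
        self.published = []    # stands in for a ROS 2 publisher
        self._stop = threading.Event()

    def set_status(self, status):
        # Any thread that modifies the heartbeat holds state_mutex.
        with self.state_mutex:
            self.heartbeat_message.status = status

    def publish_once(self):
        # Snapshot under the lock, then "publish" outside it.
        with self.state_mutex:
            snapshot = copy.copy(self.heartbeat_message)
        self.published.append(snapshot)

    def run(self):
        # Event.wait() returns False on timeout (publish) and True once stop() is called.
        while not self._stop.wait(self.period):
            self.publish_once()

    def stop(self):
        self._stop.set()


node = HeartbeatModel(period=0.01)  # short period so the demo finishes quickly
timer = threading.Thread(target=node.run)
timer.start()
node.set_status("running")  # a worker thread would do this in a real node
time.sleep(0.2)
node.stop()
timer.join()
print(node.published[-1].status)
```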
The StardosNode class parses the node's config from stdin and stores the resulting dictionary in self.args. It also extracts the node-specific config object and stores it, again as a dictionary, in self.config. Neither should ever be modified. As long as they are treated as read-only, they can be accessed from any thread without acquiring a lock.
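The stdin format and the layout of the node-specific section are not specified here. The sketch below therefore assumes JSON input, with per-node settings under a hypothetical "nodes" key. The names load_config and freeze are illustrative and are not PyLibSTARDOS functions.

The sketch also shows one optional way to enforce the rule that these dictionaries are never modified: it wraps them in read-only types.MappingProxyType views, so an accidental write raises TypeError. This is a design suggestion, not a description of what StardosNode does.

```python
import io
import json
from types import MappingProxyType


def freeze(value):
    # Recursively wrap dicts in read-only proxies and turn lists into tuples.
    if isinstance(value, dict):
        return MappingProxyType({k: freeze(v) for k, v in value.items()})
    if isinstance(value, list):
        return tuple(freeze(v) for v in value)
    return value


def load_config(stream, node_name):
    # ASSUMPTION: the config is JSON, and node-specific settings live under
    # a hypothetical "nodes" -> <node_name> key.
    args = json.load(stream)
    config = args["nodes"][node_name]
    return freeze(args), freeze(config)


stdin_stand_in = io.StringIO('{"nodes": {"camera": {"rate_hz": 2}}}')
args, config = load_config(stdin_stand_in, "camera")
print(config["rate_hz"])  # 2
try:
    config["rate_hz"] = 5
except TypeError:
    print("config is read-only")
```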
The ProducerNode class should be used by nodes that control sensors and other data producers. It provides handling for progressive shutdown commands as well as a framework for data collection and publishing.
Nodes of this type should override the following methods:
capture(self, data: Control) -> tuple[list[str], dict]
The capture() method is invoked each time the capture group requests data. It should return a tuple containing a list of the data points created by this capture and a dictionary of metadata for the capture. PyLibSTARDOS then publishes the result. ProducerNode provides only a stub implementation that logs a warning if called, so subclasses should override it.
Note that capture is called from the worker thread.
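The capture contract can be sketched without ROS 2 by using a small stand-in for the base class. ProducerModel, ControlStandIn, FakeCameraNode, and the file paths it returns are all hypothetical. The real ProducerNode publishes the returned tuple itself rather than appending it to an outbox list. Like the documented stub, the base capture() in this sketch logs a warning. The subclass returns a tuple of a list of strings and a metadata dict.

```python
import logging
from dataclasses import dataclass


@dataclass
class ControlStandIn:
    # Hypothetical stand-in for the Control message; its real fields are not documented here.
    command: str = "capture"


class ProducerModel:
    """Simplified model of the ProducerNode capture flow (not the real class)."""

    def __init__(self):
        self.log = logging.getLogger(type(self).__name__)
        self.outbox = []  # stands in for the publisher

    def capture(self, data):
        # Mirrors the documented stub: warn and return nothing useful.
        self.log.warning("capture() was not overridden")
        return [], {}

    def on_control(self, data):
        # In the real node, capture() runs on the worker thread.
        points, metadata = self.capture(data)
        self.outbox.append((points, metadata))


class FakeCameraNode(ProducerModel):
    def __init__(self):
        super().__init__()
        self.frame_count = 0

    def capture(self, data):
        self.frame_count += 1
        path = f"/tmp/frame_{self.frame_count:04d}.jpg"  # hypothetical output path
        return [path], {"frame": self.frame_count}


node = FakeCameraNode()
node.on_control(ControlStandIn())
print(node.outbox)  # [(['/tmp/frame_0001.jpg'], {'frame': 1})]
```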
The PipelineNode class should be used by nodes that take input data from another node and do something with it. It provides handling for progressive shutdown commands as well as a framework for data processing. It uses a separate worker thread so that new requests can be queued while earlier ones are still being processed.
Nodes of this type should override the following methods:
process(self, data: SensorData) -> list[str] | tuple[list[str], dict]
Pipeline nodes call their process() method with each packet of data they receive. The method should return either a list of data to send to the next node, or a tuple of that list and a new metadata dictionary that replaces the previous one. The index and collected_at fields of the SensorData object are preserved and forwarded to the next node.
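Because process() may return either form, the code that forwards its result has to distinguish between them. The sketch below shows one way to do that. SensorDataStandIn and forward() are hypothetical. The index and collected_at fields come from the description above, while the data and metadata field names are assumptions. When process() returns only a list, this sketch assumes the incoming metadata is forwarded unchanged.

```python
from dataclasses import dataclass, field


@dataclass
class SensorDataStandIn:
    # index and collected_at are documented; data and metadata are assumed names.
    index: int
    collected_at: float
    data: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


def forward(incoming, result):
    """Build the outgoing packet from process()'s return value (sketch)."""
    if isinstance(result, tuple):
        new_data, metadata = result                      # (list, dict): replace metadata
    else:
        new_data, metadata = result, incoming.metadata   # list only: keep metadata
    return SensorDataStandIn(
        index=incoming.index,                # preserved
        collected_at=incoming.collected_at,  # preserved
        data=list(new_data),
        metadata=dict(metadata),
    )


packet = SensorDataStandIn(index=7, collected_at=1700000000.0,
                           data=["raw.bin"], metadata={"sensor": "imu"})
a = forward(packet, ["filtered.bin"])
b = forward(packet, (["filtered.bin"], {"filter": "lowpass"}))
print(a.metadata, b.metadata)   # {'sensor': 'imu'} {'filter': 'lowpass'}
print(b.index, b.collected_at)  # 7 1700000000.0
```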
This feature makes use of synchronization primitives. Variables listed in this section should not be accessed without first acquiring the lock described in this section.
The PipelineNode keeps its processing queue in a Python deque called work_queue. If process() takes longer than the interval between processing requests, requests pile up in work_queue. Access to this queue is guarded by the work_queue_mutex lock. The work_queue_sem semaphore is used alongside it. Each time the PipelineNode receives data from the previous node, it posts to (releases) this semaphore. Before the worker thread takes each item from the queue, it acquires the same semaphore.
Note that the process() method is invoked in the worker thread.
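The queue described above is a classic producer/consumer arrangement. The sketch below models it with the standard library and reuses the attribute names from the text: work_queue, work_queue_mutex, and work_queue_sem. WorkQueueModel, on_data(), and the fixed item count that lets the worker exit are illustrative assumptions, not PyLibSTARDOS code.

The semaphore counts queued items, so the worker blocks instead of polling when the queue is empty. In this sketch, process() runs outside the mutex so that new data can be queued while an item is being processed.

```python
import threading
from collections import deque


class WorkQueueModel:
    """Simplified model of PipelineNode's queue handling (names mirror the text)."""

    def __init__(self, process):
        self.work_queue = deque()
        self.work_queue_mutex = threading.Lock()
        self.work_queue_sem = threading.Semaphore(0)  # counts queued items
        self.process = process
        self.results = []

    def on_data(self, item):
        # Called for each packet from the previous node.
        with self.work_queue_mutex:
            self.work_queue.append(item)
        self.work_queue_sem.release()  # "post": one more item is available

    def worker(self, n_items):
        # Worker thread: wait on the semaphore, then pop under the mutex.
        # A real worker would loop until shutdown; n_items just ends the demo.
        for _ in range(n_items):
            self.work_queue_sem.acquire()
            with self.work_queue_mutex:
                item = self.work_queue.popleft()
            self.results.append(self.process(item))


model = WorkQueueModel(process=lambda x: x * 2)
t = threading.Thread(target=model.worker, args=(3,))
t.start()
for value in (1, 2, 3):
    model.on_data(value)
t.join()
print(model.results)  # [2, 4, 6]
```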
Progressive shutdown is not fully implemented as of the time of writing. Implementing this conceptual feature would require redesigning existing nodes to include a new method, cleanup(self). For now, that cleanup logic is implemented as a signal handler. The intended design is discussed below.
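Cleanup logic currently lives in a signal handler. The following hypothetical sketch shows that pattern with Python's signal module: SIGINT and SIGTERM handlers call cleanup() once and then set an event that a main loop could watch. ShutdownModel, install_handlers(), and the choice of signals are assumptions. Note that signal.signal() can only be called from the main thread, and Python always runs signal handlers in the main thread.

```python
import signal
import threading


class ShutdownModel:
    """Hypothetical node whose cleanup() is triggered by a signal handler."""

    def __init__(self):
        self.stop_event = threading.Event()
        self.cleaned_up = False

    def cleanup(self):
        # A real node would stop its sensor or flush output here.
        self.cleaned_up = True

    def handle_signal(self, signum, frame):
        # Run cleanup only once, then ask the main loop to exit.
        if not self.stop_event.is_set():
            self.cleanup()
            self.stop_event.set()


def install_handlers(node):
    # signal.signal() must be called from the main thread.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, node.handle_signal)


node = ShutdownModel()
install_handlers(node)
signal.raise_signal(signal.SIGINT)  # simulate Ctrl+C (Python 3.8+)
print(node.cleaned_up, node.stop_event.is_set())  # True True
```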
Producer nodes are the first nodes in a pipeline to shut down, and they shut down immediately upon receiving the shutdown command from the control node. The ProducerNode class has a cleanup() method that subclasses should override to perform any final cleanup tasks, such as shutting down a sensor. If this method is not overridden, a warning is printed to the console, but the node still shuts down properly.
Just before calling the cleanup method, the ProducerNode sends a shutdown message to the next node in the pipeline, beginning the shutdown process for the data pipeline.
Note that cleanup is called from the worker thread.
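The producer's shutdown ordering, which forwards the shutdown downstream before cleaning up, can be expressed in a few lines. ProducerShutdownModel and the send_downstream callable are hypothetical stand-ins for the real node and its publisher.

```python
class ProducerShutdownModel:
    """Hypothetical model of the documented ProducerNode shutdown order."""

    def __init__(self, send_downstream):
        self.send_downstream = send_downstream  # stands in for a publisher
        self.events = []

    def cleanup(self):
        # A real subclass would stop its sensor here; this sketch records the call.
        self.events.append("cleanup")

    def on_shutdown_command(self):
        # 1. Begin the pipeline's shutdown by notifying the next node...
        self.send_downstream("shutdown")
        self.events.append("forwarded")
        # 2. ...then release this node's own resources.
        self.cleanup()


sent = []
node = ProducerShutdownModel(sent.append)
node.on_shutdown_command()
print(sent, node.events)  # ['shutdown'] ['forwarded', 'cleanup']
```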
Pipeline nodes receive a shutdown command after the previous node in their pipeline has finished processing all of its data. When a pipeline node receives a shutdown command, it stores the command and continues processing data. Once its processing queue is empty, the node forwards the shutdown command to the next node, invokes its own cleanup() method, and then exits.
Note that the cleanup() method is invoked in the worker thread.
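The pipeline rule is sketched below as a single-threaded model so that the ordering is easy to follow. The node remembers the shutdown, drains its queue, and then forwards the shutdown and cleans up. PipelineShutdownModel, the "shutdown" string sentinel, and send_downstream are hypothetical. A real worker would block on work_queue_sem when the queue is empty instead of returning.

```python
from collections import deque


class PipelineShutdownModel:
    """Hypothetical, single-threaded model of the documented pipeline shutdown."""

    def __init__(self, process, send_downstream):
        self.process = process
        self.send_downstream = send_downstream
        self.work_queue = deque()
        self.shutdown_requested = False
        self.events = []

    def on_message(self, msg):
        if msg == "shutdown":               # hypothetical sentinel
            self.shutdown_requested = True  # remember it, keep working
        else:
            self.work_queue.append(msg)

    def cleanup(self):
        self.events.append("cleanup")

    def run_worker(self):
        # Drain the queue first; only then propagate shutdown and exit.
        while True:
            if self.work_queue:
                self.send_downstream(self.process(self.work_queue.popleft()))
            elif self.shutdown_requested:
                self.send_downstream("shutdown")
                self.cleanup()
                return
            else:
                return  # a real worker would wait for more data here


sent = []
node = PipelineShutdownModel(str.upper, sent.append)
for msg in ("a", "b", "shutdown"):
    node.on_message(msg)
node.run_worker()
print(sent, node.events)  # ['A', 'B', 'shutdown'] ['cleanup']
```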