In [1]:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import subprocess
import numpy as np

In [None]:
class live_model_run(Node):
    def __init__(self):
        super().__init__('live_model_run')
        # subscribe to the ros camera topic
        self.subscription = self.create_subscription(
            Image,
            '/zed/zed_node/left/image_rect_color',  # Path to the live webcam topic
            self.image_callback,
            10
        )
        self.bridge = CvBridge()
        # run monoloco as a subprocess with the webcam option
        self.model_process = subprocess.Popen(
            ["python3", "-m", "monoloco.run", "predict", "--webcam"],
            stdin = subprocess.PIPE , stdout = subprocess.PIPE , stderr = subprocess.PIPE , text = True
        )
        # video writer to save output video
        output_video_path = "/Users/youssefelsaady/Desktop/Practical work/Output_webcam_run/live_run.mp4"  # Change to your desired path
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")  
        self.video_writer = cv2.VideoWriter(output_video_path , fourcc,  20.0 , (1280 , 720))  
        self.get_logger().info("Live Model Processor Node Initialized")
    
    def image_callback(self, msg):
        # converting ros_images message to opencv
        cv_image = self.bridge.imgmsg_to_cv2(msg , desired_encoding = "bgr8")
        # changing opencv image to bytes and write it to model to process
        _, buffer = cv2.imencode(".jpg", cv_image)
        self.model_process.stdin.write(buffer.tobytes())
        self.model_process.stdin.flush()
        self.get_logger().info("Sent frame to model")
        
        # reading model output 
        output_data = self.model_process.stdout.read()  
        if output_data:
            # converting output to opencv 
            np_arr = np.frombuffer(output_data , np.uint8)
            output_image = cv2.imdecode(np_arr , cv2.IMREAD_COLOR)
            if output_image is not None:
                # showing and saving processed video in live
                cv2.imshow("Processed Output", output_image)
                cv2.waitKey(1)
                self.video_writer.write(output_image)
                self.get_logger().info("Frame written to video file")

    def destroy_node(self):
        # cleaning up subprocess before shutting down
        self.model_process.terminate()
        self.video_writer.release()
        cv2.destroyAllWindows()
        super().destroy_node()

In [None]:
def main(args=None):
    rclpy.init(args = args)
    node = LiveModelProcessor()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

In [None]:
if __name__ == '__main__':
    main()