In [1]:
from cv_bridge import CvBridge
import cv2
import rosbag
import bagpy as bg
from tqdm import tqdm
import numpy as np

import math
from ultralytics import YOLO

import gc
import time
import bagpy

import matplotlib.pyplot as plt

Failed to load Python extension for LZ4 support. LZ4 compression will not be available.


# pip3 install --force-reinstall --extra-index-url https://rospypi.github.io/simple/ rospy rosbag


# écriture dans un nouveau fichier bag

In [2]:
# ROS_FILE = "../../2022-11-17_pietons/Pietons2_1_1.bag"

In [3]:
# bag = rosbag.Bag(ROS_FILE, "w")
# bag

In [4]:
# bag_data = bg.bagreader("../../2022-11-17_pietons/Pietons2.bag")
# bridge = CvBridge()
# images = bag_data.reader.read_messages("/camera/color/image_raw")

In [5]:
# while True:
#     try:
#         bag.write("/camera/color/image_raw", next(images).message)
#     except:
#         break

In [6]:
# bag.close()

In [7]:
# bag_data = bg.bagreader(ROS_FILE)
# images = bag_data.reader.read_messages("/camera/color/image_raw")

In [8]:
# next(images)

# écriture dans un fichier bag

In [2]:
def fill_list(box_list: list, frame_rate:int=1, box_difference:int=5):
    """
    Modifies a list of lists (`box_list`) by filling in gaps and ensuring continuity between frames
    based on spatial proximity of bounding boxes.

    This function iterates through each list of boxes (representing frames) and checks for continuity 
    of each box between consecutive frames. If a box in a previous frame does not have a close match 
    in the current frame but has one in subsequent frames (within a specified `frame_rate`), a mean 
    box is created to bridge the gap.

    It will also add a box at before the first occurance of a box, for a more smooth blurring

    Parameters
    ----------
    box_list : list of list of list of float
        A list where each element is a list representing a frame of bounding boxes, and each bounding 
        box is a list of floats representing its coordinates.
    frame_rate : int, optional
        The number of frames to look ahead for matching a box from the previous frame, default is 1.
    box_difference : int, optional
        The allowed difference between the coordinates of boxes for them to be considered close, 
        default is 5.

    Returns
    -------
    list of list of list of float
        The modified `box_list` with added boxes to ensure continuity between frames.

    Examples
    --------
    >>> boxes = [
            [
                [10, 10, 20, 20],
                [20, 20, 40, 40]
            ],
            [
                [100, 100, 150, 150]
            ],
            [
                [20, 20, 40, 40]
            ],
            [
                [15, 15, 25, 25]
            ]
        ]
    >>> fill_list(boxes, frame_rate=3)
    [
        [
            [10, 10, 20, 20], 
            [20, 20, 40, 40], 
            [100, 100, 150, 150]        # was added 
        ],
        [
            [100, 100, 150, 150], 
            [12.5, 12.5, 22.5, 22.5],   # was added
            [20.0, 20.0, 40.0, 40.0]    # was added
        ],
        [
            [20, 20, 40, 40], 
            [13.75, 13.75, 23.75, 23.75]    # was added
        ],
        [
            [15, 15, 25, 25]
        ]
    ]
    """
    for i in range(1, len(box_list)-1):
        # on regarde les boxes de la frame précédente
        for last_boxes in box_list[i-1]:
            correspondance_now = False
            for present_boxe in box_list[i]:
                # on trouve une frame ressemblante dans la frame actuelle
                if np.isclose(last_boxes, present_boxe, atol=box_difference).all():
                    correspondance_now = True
            # si on trouve, alors on s'arrête là (pas besoin de créer de boxe)
            if correspondance_now:
                continue
            
            # on regarde si les frames d'après ressemble à une box de la frame précédente
            correspondance_after = False
            for j in range(frame_rate):
                if len(box_list) > i+j+1:
                    for next_boxe in box_list[i+j+1]:
                        # on trouve une frame ressemblante
                        if np.isclose(last_boxes, next_boxe, atol=box_difference).all():
                            correspondance_after = True
                            break
                    if correspondance_after:
                        break
            
            # si on trouve une frame ressemblante, alors on créer une approximation entre la
            # boxe de la frame précédente et suivante
            if correspondance_after:
                box_list[i].append(np.mean([last_boxes, next_boxe], axis=0).tolist())

        # on rajoute des boxes aux frames précédentes
        for present_boxe in box_list[i]:
            correspondance = False
            for last_boxe in box_list[i-1]:
                if np.isclose(last_boxe, present_boxe, atol=box_difference).all():
                    correspondance = True
            if not correspondance:
                box_list[i-1].append(present_boxe) 

    return box_list

In [3]:
def blur_box(frame, box, min_conf=0.3):
    x1, y1, x2, y2 = box.xyxy[0]
    x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
    h, w = y2-y1, x2-x1

    ROI = frame[y1:y1+h, x1:x1+w]
    blur = cv2.GaussianBlur(ROI, (51,51), 0) 
    frame[y1:y1+h, x1:x1+w] = blur
    # frame[y1:y1+h, x1:x1+w] = np.zeros((h, w, 3))
    return frame

In [4]:
def blur_box1(frame: np.ndarray, 
             box: list, 
             black_box: bool=False):
    """
    Apply a blur or black box to a specified region of an image if the confidence level is above a threshold.

    Parameters
    ----------
    frame : numpy.ndarray
        The image on which the blur or black box is to be applied. It should be a 3D array representing an RGB image.
    box : object
        An object containing the bounding box coordinates and confidence score. It should have attributes `conf` and `xyxy`:
        - `box.conf` : list or numpy.ndarray
            The confidence score(s) of the bounding box, with values between 0 and 1.
        - `box.xyxy` : list or numpy.ndarray
            The coordinates of the bounding box in the format [x1, y1, x2, y2].
    black_box : bool, optional
        If True, the specified region is filled with a black box instead of being blurred. Default is False.
    min_conf : float, optional
        The minimum confidence threshold to apply the blur or black box. Default is 0.3. Must in [0, 1].

    Returns
    -------
    numpy.ndarray
        The modified image with the blur or black box applied to the specified region.

    Notes
    -----
    - The input `frame` must be a 3D NumPy array representing an image with shape (height, width, channels).
    - The bounding box coordinates and confidence score must be provided in the `box` object (already implemented in the Yolov8 results).
    - If `black_box` is set to True, the region within the bounding box will be replaced with black pixels (faster than blurring).
    - The Gaussian blur applied uses a kernel size of (51, 51) with a standard deviation of 0.
    """
    x1, y1, x2, y2 = box
    x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
    h, w = y2-y1, x2-x1

    if black_box:
        blur = np.zeros((h, w, 3))
    else:
        ROI = frame[y1:y1+h, x1:x1+w]
        blur = cv2.GaussianBlur(ROI, (51,51), 0) 
    frame[y1:y1+h, x1:x1+w] = blur
    return frame

In [5]:
model = YOLO("models/yolov8n-face.pt")

In [6]:
min_conf = 0.2
bridge = CvBridge()
frame_rate = 5

In [7]:
%%script false --no-raise-error

# version 1
with rosbag.Bag('output.bag', 'w') as outbag:
    for topic, msg, t in tqdm(rosbag.Bag("../../2022-11-17_pietons/Pietons2.bag").read_messages()):
        if topic == "/camera/color/image_raw":
            img = bridge.imgmsg_to_cv2(msg, desired_encoding="rgb8").copy()
            boxes = next(model(img, stream=True, verbose=False)).boxes
            for box in boxes:
                img = blur_box(img, box)
            image_message = bridge.cv2_to_imgmsg(img, encoding="passthrough")
            outbag.write(topic, image_message, msg.header.stamp)
        else:   
            outbag.write(topic, msg, msg.header.stamp if msg._has_header else t)

In [8]:
%%script false --no-raise-error

# version 2
with rosbag.Bag('output.bag', 'w') as outbag:
    last_imgs = []
    begin = True
    save_imgs = False
    for topic, msg, t in tqdm(rosbag.Bag("../../2022-11-17_pietons/Pietons2.bag").read_messages()):
        if topic == "/camera/color/image_raw":
            img = bridge.imgmsg_to_cv2(msg, desired_encoding="rgb8").copy()
            last_imgs.append(img)

            if begin and len(last_imgs) == math.ceil(frame_rate/2):
                begin = False
                save_imgs = True
                idx_img = 0
            elif len(last_imgs) == frame_rate:
                save_imgs = True
                idx_img = math.ceil(frame_rate/2)-1

            if save_imgs:
                save_imgs = False
                boxes = next(model(last_imgs[idx_img], stream=True, verbose=False)).boxes
                to_blur = len(boxes) > 0
                for img in last_imgs:
                    if to_blur:
                        boxes = next(model(img, stream=True, verbose=False)).boxes
                        
                        for box in boxes:
                            if math.ceil((box.conf[0]*100))/100 > min_conf:
                                img = blur_box(img, box)
                    image_message = bridge.cv2_to_imgmsg(img, encoding="passthrough")
                    outbag.write(topic, image_message, msg.header.stamp)
                last_imgs = []

        else:   
            outbag.write(topic, msg, msg.header.stamp if msg._has_header else t)

In [9]:
# %%script false --no-raise-error
min_conf = None

# version 3
with rosbag.Bag('output.bag', 'w') as outbag:
    last_imgs = []
    boxes_list = []
    begin = True
    save_imgs = False
    for topic, msg, t in tqdm(rosbag.Bag("../../2022-11-17_pietons/Pietons2.bag").read_messages()):
        if topic == "/camera/color/image_raw":
            img = bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8").copy()
            last_imgs.append(img)

            if begin and len(last_imgs) == math.ceil(frame_rate/2):
                begin = False
                save_imgs = True
                idx_img = 0
            elif len(last_imgs) == frame_rate:
                save_imgs = True
                idx_img = math.ceil(frame_rate/2)-1

            if save_imgs:
                save_imgs = False
                boxes = next(model(last_imgs[idx_img], stream=True, verbose=False)).boxes
                to_blur = len(boxes) > 0
                for img in last_imgs:
                    boxes_list.append([])
                    if to_blur:
                        boxes = next(model(img, stream=True, verbose=False)).boxes
                        if len(boxes.conf) > 0:
                            if isinstance(min_conf, type(None)):
                                min_conf = boxes.conf.mean() * 0.7
                            else:
                                min_conf = (min_conf + boxes.conf.mean())/2 * 0.7
                        for box in boxes:
                            if box.conf[0] > min_conf:
                                boxes_list[-1].append(box.xyxy[0].tolist())
                last_imgs = []
        else:   
            outbag.write(topic, msg, msg.header.stamp if msg._has_header else t)
    # on vérifie les dernières frames
    boxes = next(model(last_imgs[-1], stream=True, verbose=False)).boxes
    to_blur = len(boxes) > 0
    for img in last_imgs:
        boxes_list.append([])
        if to_blur:
            boxes = next(model(img, stream=True, verbose=False)).boxes
            
            for box in boxes:
                boxes_list[-1].append(box.xyxy[0].tolist())
    
    boxes_list = fill_list(boxes_list, frame_rate*2, math.ceil(math.sqrt(max(msg.width, msg.height)))*2)
    idx_boxes = 0
    for topic, msg, t in tqdm(rosbag.Bag("../../2022-11-17_pietons/Pietons2.bag").read_messages()):
        if topic == "/camera/color/image_raw":
            img = bridge.imgmsg_to_cv2(msg).copy()
            for box in boxes_list[idx_boxes]:
                img = blur_box1(img, box)
            image_message = bridge.cv2_to_imgmsg(img, encoding="rgb8")
            outbag.write(topic, image_message, msg.header.stamp)
            idx_boxes += 1


174963it [00:32, 5333.71it/s]
174963it [00:03, 54941.29it/s]


In [10]:
bag_data = bg.bagreader("output.bag")
images = bag_data.reader.read_messages("/camera/color/image_raw")
fps = bag_data.topic_table[bag_data.topic_table["Topics"] == "/camera/color/image_raw"]["Frequency"].item()

[INFO]  Data folder output already exists. Not creating.


In [11]:
topic = "/camera/color/image_raw"
# bag_data = bg.bagreader("../../2022-11-17_pietons/Pietons2.bag")
bag_data = bg.bagreader("output.bag")

bridge = CvBridge()
images = bag_data.reader.read_messages(topic)

fps = bag_data.topic_table[bag_data.topic_table["Topics"] == "/camera/color/image_raw"]["Frequency"].item()

frame = next(images).message

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video = cv2.VideoWriter(filename="supp.mp4", 
                    fourcc=fourcc, 
                    fps=fps, 
                    frameSize=(frame.width, frame.height))
print((frame.width, frame.height))
while True:
    try:
        video.write(bridge.imgmsg_to_cv2(frame, desired_encoding="bgr8"))
        frame = next(images).message
    except StopIteration:
        break

video.release()


[INFO]  Data folder output already exists. Not creating.
(640, 480)


In [19]:
import os
os.getcwd()

'/home/franzele/Desktop/univ_lille/m1s2/pji/pgm/test'