# Tube Arrangement Using a Mesh-Based Sorting Approach in Video Synopsis
DOI: 10.1109/LSP.2025.3631432

https://ieeexplore.ieee.org/document/11242038

## Display system status

In [None]:
# IPython shell escapes: print GPU status, memory usage and CPU details of the host.
!nvidia-smi
!free -h
!lscpu

## Install dependencies

In [None]:
!pip install --cache-dir=pip_cache tensorflow
!pip install --cache-dir=pip_cache ultralytics
!pip install --cache-dir=pip_cache supervision
!pip install --cache-dir=pip_cache python-opencv
!pip install --cache-dir=pip_cache scikit-image
!pip install --cache-dir=pip_cache trimesh
!pip install --cache-dir=pip_cache matplotlib
!pip install --cache-dir=pip_cache moviepy
!pip install --cache-dir=pip_cache numpy

## Import dependencies

In [None]:
from tqdm import tqdm
from collections import defaultdict
import random
import tensorflow as tf
import numpy as np
from gc import collect as garbage_collect
from skimage.measure import marching_cubes
import trimesh
from ultralytics import YOLO
from moviepy import ImageSequenceClip
import cv2
import matplotlib.pyplot as plt

## Load input video details

In [None]:
from os import getcwd
# Working directory of the notebook; the input and output paths are built from it.
HOME = getcwd()
INPUT_VIDEO_PATH = f"{HOME}/TownCentreXVID-mini-nowatermark.mp4"
#INPUT_VIDEO_PATH = f"{HOME}/inputs/30s.mp4"
OUTPUT_VIDEO_PATH = f"{HOME}/output.mp4"

import supervision as sv
# Metadata of the input video; later cells read its height, width, fps and
# total_frames fields.
video_info = sv.VideoInfo.from_video_path(INPUT_VIDEO_PATH)
print(video_info)

## YOLO11 segmentation and tracking

### Calculate the width and height of the output tensor for the YOLO11 model

In [None]:
# Round both frame dimensions up to the next multiple of max_stride.
# -(-a // b) is integer ceiling division, so each value becomes
# ceil(value / max_stride) * max_stride. video_info is updated in place.
max_stride = 32
video_info.height = -(-video_info.height // max_stride) * max_stride
video_info.width = -(-video_info.width // max_stride) * max_stride

### Run YOLO11 model

In [None]:
# Load the YOLO11 nano segmentation checkpoint and fuse its layers.
model = YOLO("yolo11n-seg.pt")
model.fuse()
# Track objects over the whole input video at the stride-aligned resolution.
# NOTE(review): the class ids 0/24/26/28 are presumably COCO indices
# (person and carried bags) - confirm against the checkpoint's class list.
seg_track_results = model.track(
    INPUT_VIDEO_PATH,
    conf=0.5,
    iou=0.5,
    imgsz=(video_info.height, video_info.width),
    show=False,
    stream=True,
    verbose=False,
    classes=[0, 24, 26, 28],
)

### Combine YOLO11 outputs into a single array

In [None]:
# One label image per video frame: each pixel holds the tracker id of the
# object mask covering it (0 where no mask is present).
segmented_frames_arr = []
# Tracker ids seen so far.
object_ids = set()
# Per object id: first/last frame index and the union of its bounding boxes
# over all frames (the spatio-temporal box of the object's "tube").
object_tube_box = defaultdict(lambda: {"frame":{"start":np.inf, "end":0},"x":{"start":np.inf, "end":0},"y":{"start":np.inf, "end":0}})
frame_num = 0

# Per object id: list of (frame_num + 1, box centre x, box top y) used later
# to draw timestamps.
timestamp_dict = defaultdict(list)
for res_frame in tqdm(seg_track_results, desc = "Object Segmentation and Tracking"):
    if res_frame.boxes.id is None:
        # No tracked object in this frame: store an empty label image.
        segmented_frames_arr.append(tf.zeros([video_info.height, video_info.width]))
        frame_num += 1
        continue
    frame = None
    # Iterate over EVERY detection, including index 0. Starting at 1 would
    # paint the first detection's mask but never record its id, bounding box
    # or timestamps.
    for i in range(len(res_frame.boxes.id)):
        oid = int(res_frame.boxes.id[i])
        box = res_frame.boxes.xyxy[i]
        if oid not in object_ids:
            object_ids.add(oid)
            object_tube_box[oid]['frame']['start'] = frame_num
        tube_box = object_tube_box[oid]
        tube_box['frame']['end'] = frame_num
        tube_box['x']['start'] = min(tube_box['x']['start'], int(box[0]))
        tube_box['x']['end'] = max(tube_box['x']['end'], int(box[2]))
        tube_box['y']['start'] = min(tube_box['y']['start'], int(box[1]))
        tube_box['y']['end'] = max(tube_box['y']['end'], int(box[3]))
        # NOTE(review): where two masks overlap their ids are summed, which
        # yields a label belonging to neither object - confirm this is acceptable.
        labelled_mask = res_frame.masks.data[i]*res_frame.boxes.id[i]
        frame = labelled_mask if frame is None else frame + labelled_mask
        # Store plain ints so no tensors are kept alive after the model is deleted.
        timestamp_dict[oid].append((frame_num + 1, int((box[0] + box[2]) // 2), int(box[1])))
    segmented_frames_arr.append(frame.cpu())
    frame_num += 1
del model, seg_track_results
# Stack the per-frame label images into one (frames, height, width) int32 array.
segmented_frames_arr = tf.stack(segmented_frames_arr, axis = 0)
segmented_frames_arr = tf.cast(segmented_frames_arr, tf.int32)
segmented_frames_arr = segmented_frames_arr.numpy()
tf.keras.backend.clear_session()
garbage_collect()

## Interaction Detection

In [None]:
def compute_centroids(object_tensor):
    """Return the centroid of every labelled object in every frame.

    Args:
        object_tensor: array indexed as [frame, row, column] whose values are
            object ids; 0 is skipped.

    Returns:
        dict mapping frame index -> {object id: (mean column, mean row)}.
        Frames without any non-zero label map to an empty dict.
    """
    per_frame = {}
    for frame_idx, label_image in enumerate(object_tensor):
        frame_centroids = {}
        for label in np.unique(label_image):
            if label == 0:
                continue
            rows, cols = np.where(label_image == label)
            if len(cols) > 0:
                frame_centroids[label] = (np.mean(cols), np.mean(rows))
        per_frame[frame_idx] = frame_centroids
    return per_frame

def detect_interactions_from_tensor(object_tensor, fps, height, proximity_thresh=60):
    """Find pairs of objects whose centroids stay close for at least 2 seconds.

    Args:
        object_tensor: array indexed as [frame, row, column] holding object
            ids (0 is ignored), as accepted by compute_centroids.
        fps: frames per second; a run must last at least int(2 * fps)
            consecutive frames to count as an interaction.
        height: currently unused. It is kept so existing callers keep working
            (the original code planned to derive the threshold from it).
        proximity_thresh: maximum centroid distance, in pixels, for two
            objects to be considered close in a frame.

    Returns:
        list of dicts with keys "object_id" (smaller id), "interacting_with"
        (larger id), "start_time" and "end_time". The two time values are
        frame indices of the first and last frame of the run.
    """
    from itertools import combinations

    centroids = compute_centroids(object_tensor)
    # smaller id -> larger id -> ascending frame indices in which the pair was close
    close_frames = defaultdict(lambda: defaultdict(list))
    for f in range(object_tensor.shape[0]):
        frame_data = centroids[f]
        for id1, id2 in combinations(frame_data, 2):
            dist = np.linalg.norm(np.array(frame_data[id1]) - np.array(frame_data[id2]))
            if dist < proximity_thresh:
                close_frames[min(id1, id2)][max(id1, id2)].append(f)

    min_duration = int(2 * fps)
    interactions = []
    for id1, partners in close_frames.items():
        # Every list here has at least one entry: it is only created by append.
        for id2, frames in partners.items():
            # Split the frame list into runs of consecutive indices and keep
            # the runs that are long enough.
            runs = []
            run = [frames[0]]
            for previous, current in zip(frames, frames[1:]):
                if current == previous + 1:
                    run.append(current)
                else:
                    if len(run) >= min_duration:
                        runs.append(run)
                    run = [current]
            if len(run) >= min_duration:
                runs.append(run)
            for run in runs:
                interactions.append({
                    "object_id": id1,
                    "interacting_with": id2,
                    "start_time": run[0],
                    "end_time": run[-1],
                })
    return interactions
# Detect interactions on the label volume, then keep only the id pair of each one.
interactions0 = detect_interactions_from_tensor(
    segmented_frames_arr,
    video_info.fps,
    video_info.height,
)
interactions = [(event["object_id"], event["interacting_with"]) for event in interactions0]

## Define Object Tubes

### Grouping of objects according to detected interactions

In [None]:
def find_groups(pairs):
    """Group ids into connected components of the graph defined by *pairs*.

    Args:
        pairs: iterable of (a, b) tuples; each tuple is an undirected edge.

    Returns:
        list of tuples, one per connected component. Components appear in the
        order their first member is encountered, and members are listed in
        depth-first visiting order.
    """
    from collections import defaultdict
    graph = defaultdict(set)
    for a, b in pairs:
        graph[a].add(b)
        graph[b].add(a)
    visited = set()
    groups = []
    for root in graph:
        if root in visited:
            continue
        visited.add(root)
        group = [root]
        # Explicit stack of neighbour iterators: same pre-order as a recursive
        # DFS, but the depth is not bounded by the interpreter recursion limit.
        stack = [iter(graph[root])]
        while stack:
            for neighbor in stack[-1]:
                if neighbor not in visited:
                    visited.add(neighbor)
                    group.append(neighbor)
                    stack.append(iter(graph[neighbor]))
                    break
            else:
                # The iterator on top of the stack is exhausted: backtrack.
                stack.pop()
        groups.append(tuple(group))
    return groups
output_groups = find_groups(interactions)

# Collapse every interaction group onto its smallest id: relabel the pixels,
# widen the surviving tube box, move the timestamps and drop the absorbed ids.
for members in tqdm(output_groups):
    grpid = min(members)
    for member in members:
        if member == grpid:
            continue
        segmented_frames_arr[segmented_frames_arr == member] = grpid
        target_box, source_box = object_tube_box[grpid], object_tube_box[member]
        for axis in ('frame', 'x', 'y'):
            target_box[axis]['start'] = min(target_box[axis]['start'], source_box[axis]['start'])
            target_box[axis]['end'] = max(target_box[axis]['end'], source_box[axis]['end'])
        if member in timestamp_dict:
            timestamp_dict[grpid] += timestamp_dict[member]
            del timestamp_dict[member]
        object_tube_box.pop(member, None)
        object_ids.discard(member)

### Define object tubes

In [None]:
# oid -> {'voxel', 'start_frame', 'start_coord', '2d_dim', 'length', 'mesh'}
object_tubes = defaultdict(dict)
# Ids whose tube is degenerate or cannot be meshed; they are removed after the loop.
object_ids_blacklist = []
itera = tqdm(object_ids, desc = "Object Tube Initialization")
for oid in itera:  # oid: object id
    box = object_tube_box[oid]
    # Crop the label volume to the tube's spatio-temporal bounding box. The crop
    # can still contain pixels of other objects, hence "impure".
    impure_voxel = segmented_frames_arr[
        box['frame']['start']:box['frame']['end']+1,
        box['y']['start']:box['y']['end']+1,
        box['x']['start']:box['x']['end']+1
    ]
    if min(impure_voxel.shape)<2:
        object_ids_blacklist.append(oid)
        continue
    tube = object_tubes[oid]
    tube['voxel'] = (impure_voxel == oid).astype(np.int8)
    tube['start_frame'] = box['frame']['start']
    tube['start_coord'] = (box['y']['start'], box['x']['start'])
    tube['2d_dim'] = impure_voxel.shape[1:3]
    tube['length'] = impure_voxel.shape[0]
    length = tube['length']

    # vox aliases tube['voxel'] (no copy), so the gap filling below is also
    # visible through object_tubes[oid]['voxel'].
    vox = tube['voxel']
    vox_filled = {i for i in range(length) if vox[i].sum()!=0}
    vox_gpu = tf.convert_to_tensor(vox, dtype=tf.float32)
    itera.set_postfix_str(f"{length}")
    # Gap filling is only done for tubes of at most 15 seconds; longer tubes
    # are left as they are.
    if length<=video_info.fps*15:
        for i in range(length):
            if i in vox_filled: continue
            # Frame i has no mask: synthesise one from the other frames,
            # weighted by the inverse of their temporal distance to i. The
            # weights are applied to vox_gpu, i.e. the masks as they were
            # before any gap was filled.
            mul_arr=[(1/abs(i-j) if i!=j else 1) for j in range(length)]
            mul_arr = tf.convert_to_tensor(mul_arr)
            weighted_vox = tf.multiply(vox_gpu, tf.reshape(mul_arr, (length, 1, 1)))
            layer = weighted_vox.cpu().numpy()
            del weighted_vox, mul_arr
            layer = np.sum(layer, axis=0)
            # Keep the pixels whose weighted sum reaches the mean of the non-zero sums.
            vox[i]=(layer >= layer[layer != 0].mean()).astype(int)

    tf.keras.backend.clear_session()
    garbage_collect()
    # Pad by one voxel on every side before extracting the surface.
    padded_voxel = np.pad(vox, pad_width=1, mode='constant', constant_values=0)
    try:
        verts, faces, _, _ = marching_cubes(padded_voxel, level=0.5)
    except (ValueError, RuntimeError):
        # marching_cubes could not extract a surface at this level. Skip the
        # tube, but let KeyboardInterrupt and unrelated errors propagate.
        object_ids_blacklist.append(oid)
        continue
    mesh = trimesh.Trimesh(vertices=verts, faces=faces)
    mesh.vertices -= (1, 1, 1)  # undo the one-voxel padding offset
    # Face budget: 16 faces per started second of tube length, plus 40.
    fc = ((length//video_info.fps)+1)*16+40
    tube['mesh'] = mesh.simplify_quadric_decimation(face_count=fc) if len(mesh.faces)>fc else mesh
    # Shift the mesh so its smallest first-axis (frame) coordinate is 0 and its
    # other two axes are offset by the tube's start_coord in the full frame.
    tube['mesh'].apply_translation((-1*min(v[0] for v in tube['mesh'].vertices), tube['start_coord'][0], tube['start_coord'][1]))
    # Reverse the vertex order of every face (flips the face winding).
    tube['mesh'].faces = tube['mesh'].faces[:, ::-1]

for oid in object_ids_blacklist:
    object_ids.remove(oid)
    object_tubes.pop(oid, None)

## Simulated Annealing for Object Tube Sorting

In [None]:
class BinaryTree:
    """Binary tree of Node objects addressed by integer id.

    Attributes:
        nodes: dict mapping node id -> Node; id 0 is the root.
        leafs: set of ids of the nodes that currently have no children.
        id_count: id that will be given to the next node created.
        max_frame, max_frame_oid: initialised to None here and assigned by
            code outside this class.
    """

    def __init__(self):
        self.id_count = 1
        self.max_frame = None
        self.max_frame_oid = None
        self.leafs = set()
        self.nodes = {}
        self.nodes[0] = Node(self, 0, None, 0)
        self.leafs.add(0)

    def addNode(self):
        """Give the shallowest leaf two new children.

        The chosen leaf stops being a leaf and its two children become
        leaves, so every call increases the number of leaves by one.
        """
        pnode = min(self.leafs, key=lambda leaf_id: self.nodes[leaf_id].level)
        parent = self.nodes[pnode]
        for _ in range(2):
            child_id = self.id_count
            self.id_count += 1
            self.nodes[child_id] = Node(self, child_id, pnode, parent.level + 1)
            parent.cnodes.append(child_id)
            self.leafs.add(child_id)
        self.leafs.remove(pnode)

class Node:
    """One node of a BinaryTree.

    A leaf holds a mesh assigned from outside. An inner node holds the boolean
    union of its children's meshes.

    Attributes:
        tree: the owning tree (gives access to `nodes` and `leafs`).
        id: this node's id in `tree.nodes`.
        pnode: id of the parent node, or None for the root.
        level: depth of the node (the root is created with level 0).
        cnodes: list of child node ids.
        mesh: the node's mesh, None until assigned or built.
    """

    # The parameter is called `id` (shadowing the builtin) because callers may
    # pass it by keyword; the string annotation avoids needing BinaryTree to be
    # defined before this class.
    def __init__(self, tree: "BinaryTree", id: int, pnode: "int | None", level: int):
        self.tree = tree
        self.id = id
        self.pnode = pnode
        self.level = level
        self.cnodes = list()
        self.mesh = None

    def initMesh(self):
        """Build the meshes of all inner nodes below (and including) this one.

        Leaves are left untouched: their mesh must already be assigned.
        """
        if self.id in self.tree.leafs:
            return
        for c in self.cnodes:
            self.tree.nodes[c].initMesh()
        self.mesh = trimesh.boolean.union([self.tree.nodes[i].mesh for i in self.cnodes], check_volume=False)

    def _reconstructMesh_(self):
        """Rebuild this node's union mesh, then every ancestor's up to the root."""
        self.mesh = trimesh.boolean.union([self.tree.nodes[i].mesh for i in self.cnodes], check_volume=False)
        if self.pnode is not None:
            self.tree.nodes[self.pnode]._reconstructMesh_()

    def moveMesh(self, shift):
        """Translate this node's mesh by *shift* along its first axis, in place.

        The union meshes of all ancestors are rebuilt afterwards. A node
        without a parent (a tree that consists of the root only) has nothing
        to rebuild.
        """
        self.mesh.apply_translation((shift, 0, 0))
        if self.pnode is not None:
            self.tree.nodes[self.pnode]._reconstructMesh_()

In [None]:
# A new tree has one leaf and each addNode() call adds one more, so
# len(object_tubes) - 1 calls give one leaf per object tube.
state = BinaryTree()
splits_needed = len(object_tubes) - 1
for _ in range(splits_needed):
    state.addNode()


In [None]:
# Pair every object tube with one leaf of the tree. Each leaf gets a copy of
# the tube mesh, shifted along the first axis by the tube's start frame.
oid_in_state = {}
for oid, nid in tqdm(zip(object_tubes, state.leafs), desc="Initializing workspace variable (Part 1/2)"):
    oid_in_state[oid] = nid
    leaf = state.nodes[nid]
    leaf.mesh = object_tubes[oid]['mesh'].copy()
    leaf.mesh.apply_translation((object_tubes[oid]['start_frame'], 0, 0))

In [None]:
# Build the union mesh of every inner node, starting the recursion at the root.
state.nodes[0].initMesh()

In [None]:
# Volume of the root union mesh before any tube is shifted; fn_fitness divides
# later union volumes by this value.
TOTAL_VOLUME = state.nodes[0].mesh.volume

In [None]:
import copy
import time


def _refresh_max_frame(state, oids):
    """Rescan the leaf meshes of *oids* and store the largest first-axis
    vertex coordinate in state.max_frame and its owner in state.max_frame_oid."""
    state.max_frame = float('-inf')
    state.max_frame_oid = None
    for candidate in oids:
        for vertex in state.nodes[oid_in_state[candidate]].mesh.vertices:
            if vertex[0] > state.max_frame:
                state.max_frame = vertex[0]
                state.max_frame_oid = candidate


_refresh_max_frame(state, tqdm(object_tubes, desc = "Finding current duration"))

def fn_fitness(state):
    """Return (duration, union volume, fitness) of *state*.

    fitness = 0.85 * (input frame count / duration)
            + 0.15 * (union volume / TOTAL_VOLUME),
    where duration is state.max_frame and the union volume is the volume of
    the root mesh.
    """
    volume = state.nodes[0].mesh.volume
    return state.max_frame, volume, 0.85*(video_info.total_frames/state.max_frame)+0.15*(volume/TOTAL_VOLUME)

best_duration, best_union_volume, best_fitness = fn_fitness(state)
current_duration, current_union_volume, current_fitness = best_duration, best_union_volume, best_fitness
best_state = copy.deepcopy(state)
temperature = 5000
cooling_rate = 0.85
tube_ids = list(object_tubes)
oid = random.choice(tube_ids)
print("Iter\tCR\tUoS\tTimestamp")
for i in range(2010):
    leaf = state.nodes[oid_in_state[oid]]
    xs = [v[0] for v in leaf.mesh.vertices]
    # Pick a shift that keeps the tube between frame 0 and the current last frame.
    shift = random.randint(int(-1*min(xs)), int(state.max_frame - max(xs)))
    # Remember the duration bookkeeping so a rejected move can be undone completely.
    saved_max_frame, saved_max_frame_oid = state.max_frame, state.max_frame_oid
    leaf.moveMesh(shift)
    end_frame = max(v[0] for v in leaf.mesh.vertices)
    if end_frame > state.max_frame:
        state.max_frame = end_frame
        state.max_frame_oid = oid
    elif oid == state.max_frame_oid:
        # The tube that defined the duration moved: another tube may now end last.
        _refresh_max_frame(state, object_tubes)
    duration, union_volume, fitness = fn_fitness(state)
    delta_fitness = fitness - current_fitness
    # Improvements are always accepted; other moves with probability
    # exp(delta / temperature). random.random() is only drawn in the second case.
    accepted = delta_fitness > 0 or random.random() < np.exp(delta_fitness / temperature)
    if accepted:
        current_duration = duration
        current_union_volume = union_volume
        current_fitness = fitness
    else:
        leaf.moveMesh(-1*shift)
        # Without this restore, max_frame would describe the rejected position
        # and every later fitness value would use a stale duration.
        state.max_frame, state.max_frame_oid = saved_max_frame, saved_max_frame_oid
    if current_fitness > best_fitness:
        best_state = copy.deepcopy(state)
        best_fitness = current_fitness
        best_duration = current_duration
        best_union_volume = current_union_volume
    temperature *= cooling_rate
    if i%10==0:
        print(f"{i:04}\t{(video_info.total_frames/best_duration):1.16f}\t{(best_union_volume/TOTAL_VOLUME):1.16f}\t{time.time()}")
    # Next candidate: either the tube that currently ends last or a random tube.
    oid = random.choice([state.max_frame_oid, random.choice(tube_ids)])

In [None]:
# Start position of every tube in the best arrangement: the smallest
# first-axis coordinate among the vertices of its leaf mesh.
start_frames = {}
for oid in object_tubes:
    leaf_vertices = best_state.nodes[oid_in_state[oid]].mesh.vertices
    start_frames[oid] = min(vertex[0] for vertex in leaf_vertices)

## Generate Synopsis Video

In [None]:
def load_and_pad_video(video_path):
    """Read a whole video into memory and zero-pad every frame to multiples of 32.

    The padding is split between both sides of each axis; when the amount is
    odd the extra pixel goes to the bottom/right.

    Args:
        video_path: path of the video file to read.

    Returns:
        numpy array built from the list of padded frames, in reading order.

    Raises:
        ValueError: if the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise ValueError("Could not open video file.")
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            height, width = frame.shape[:2]
            pad_height = (32 - height % 32) % 32
            pad_width = (32 - width % 32) % 32
            top_pad = pad_height // 2
            bottom_pad = pad_height - top_pad
            left_pad = pad_width // 2
            right_pad = pad_width - left_pad
            frames.append(np.pad(
                frame,
                ((top_pad, bottom_pad), (left_pad, right_pad), (0, 0)),
                mode='constant',
                constant_values=0
            ))
    finally:
        # Release the capture even if reading or padding raises.
        cap.release()
    return np.array(frames)

def extract_background(input_video_array, segmented_frames_arr):
    """Estimate a background image as the per-pixel mean over object-free frames.

    For every pixel, only the frames in which the label at that pixel is 0
    contribute to the mean.

    Args:
        input_video_array: frames indexed as [frame, row, column, channel]
            with 3 channels; needs at least as many frames as
            segmented_frames_arr.
        segmented_frames_arr: label volume of shape (frames, height, width);
            0 marks background.

    Returns:
        uint8 array of shape (height, width, 3). Means are rounded to the
        nearest integer; a pixel that is never background is 0 (black).
    """
    f, h, w = segmented_frames_arr.shape
    sum_pixels = np.zeros((h, w, 3))
    count_valid_frames = np.zeros((h, w))
    for frame_idx in range(f):
        # Boolean background mask of one frame; no full-size float copy of the
        # label volume is needed.
        valid_pixels = np.asarray(segmented_frames_arr[frame_idx]) == 0
        sum_pixels += input_video_array[frame_idx] * valid_pixels[:, :, np.newaxis]
        count_valid_frames += valid_pixels
    counts = count_valid_frames[:, :, np.newaxis]
    # Divide only where a background sample exists. An epsilon in the
    # denominator followed by a truncating cast would turn an exact mean of 20
    # into 19.
    average_pixels = np.divide(sum_pixels, counts, out=np.zeros_like(sum_pixels), where=counts > 0)
    return np.clip(np.rint(average_pixels), 0, 255).astype(np.uint8)

In [None]:
# Allocate the synopsis video: best duration plus three seconds of extra
# frames, with the spatial size of the label volume.
synopsis_length = int(best_duration) + 3 * video_info.fps
_, frame_height, frame_width = segmented_frames_arr.shape
synopsis_video_array = np.zeros((synopsis_length, frame_height, frame_width, 3), dtype=np.uint8)
input_video_array = load_and_pad_video(INPUT_VIDEO_PATH)

# Fill every synopsis frame with the estimated background.
background_frame = extract_background(input_video_array, segmented_frames_arr)
synopsis_video_array[...] = background_frame

In [None]:
for oid in tqdm(start_frames, desc = "Placing objects on synopsis video"):
    tube = object_tubes[oid]
    obj_tube_sub_array = tube['voxel']
    start_coord = tube['start_coord']

    # 0/1 masks with a trailing axis of length 1 so they broadcast over the
    # three colour channels.
    object_tube_mask = (obj_tube_sub_array != 0).astype(np.uint8)[..., np.newaxis]
    object_tube_mask_neg = (obj_tube_sub_array == 0).astype(np.uint8)[..., np.newaxis]

    length, box_height, box_width = obj_tube_sub_array.shape
    t0, y0, x0 = int(start_frames[oid]), start_coord[0], start_coord[1]
    rows = slice(y0, y0 + box_height)
    cols = slice(x0, x0 + box_width)
    # Basic slicing returns a view, so the in-place operations below write
    # into synopsis_video_array.
    target = synopsis_video_array[t0:t0 + length, rows, cols]

    f1 = tube['start_frame']
    f2 = f1 + tube['length']
    # Clear the pixels covered by the object, then add the object's pixels
    # taken from the tube's original frames of the input video.
    target *= object_tube_mask_neg
    target += object_tube_mask * input_video_array[f1:f2, rows, cols]


In [None]:
def add_text_to_frame(video_array, frame_number, text, desired_width, coordinates):
    """Draw *text* in white on one frame of *video_array*, in place.

    Args:
        video_array: frames indexed by frame number.
        frame_number: index of the frame to draw on.
        text: string to render.
        desired_width: unused.
        coordinates: (x, y) position; the rendered text height is added to y
            before it is passed to cv2.putText as the text origin.
    """
    target_frame = video_array[frame_number]
    font, font_scale, thickness = cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1
    text_size, _baseline = cv2.getTextSize(text, font, font_scale, thickness)
    x, y = coordinates
    origin = (int(x), y + text_size[1])
    cv2.putText(target_frame, text, origin, font, font_scale, (255, 255, 255), thickness)
# Stamp every object with the mm:ss time of its original frame. `change` is
# the frame offset between the tube's position in the synopsis and in the input.
for oid in tqdm(start_frames, desc="Adding timestamp"):
    change = int(start_frames[oid]) - object_tubes[oid]['start_frame']
    for ts in timestamp_dict[oid]:
        label = f"{(ts[0] // video_info.fps) // 60:02}:{(ts[0] // video_info.fps) % 60:02}"
        position = (ts[1], ts[2])
        add_text_to_frame(synopsis_video_array, ts[0]+change, label, 2, position)

In [None]:
# Reverse the channel axis of every frame before handing the frames to MoviePy
# (the frames were read with OpenCV, which delivers BGR; MoviePy presumably
# expects RGB - confirm).
clip = ImageSequenceClip(list(synopsis_video_array[..., ::-1]), fps=video_info.fps)
# Write to the output path declared next to the input path instead of a
# second, hard-coded file name.
clip.write_videofile(OUTPUT_VIDEO_PATH)

## Display system status

In [None]:
# IPython shell escapes: print GPU status, memory usage and CPU details again
# after the run.
!nvidia-smi
!free -h
!lscpu