In [1]:
import pandas as pd
from groundingdino.util.inference import Model
from typing import List
import os
import supervision as sv
import cv2
import warnings
from tqdm import tqdm
from ultralytics import YOLO, SAM
import torch
import numpy as np
import matplotlib.pyplot as plt
from sam2.sam2_image_predictor import SAM2ImagePredictor
from sam2.build_sam import build_sam2, build_sam2_video_predictor
import rerun as rr
#from track_utils import sample_points_from_masks
#from video_utils import create_video_from_images
import json
import random
from uuid import uuid4
import pandas as pd
import logging

In [2]:
!pip install supervision==0.22.0



In [3]:
sv.__version__

'0.22.0'

In [4]:
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)
loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
for logger in loggers:
    if "transformers" in logger.name.lower():
        logger.setLevel(logging.ERROR)

In [5]:
#video_folder = [f"{i}" for i in range(1, 14)]
#video_paths = [os.path.join(f'/mnt/data/Datasets/Innsbruk/test/output_video_part_{f}.mp4') for f in video_folder]
#video_paths

In [None]:
#get PGD video
import torch
import cv2
import numpy as np
from tqdm import tqdm

# 定义 PGD 攻击函数
def pgd_attack(images, eps=0.3, alpha=0.01, iters=40):
    images = images.clone().detach().requires_grad_(True)
    delta = torch.zeros_like(images).uniform_(-eps, eps).to(images.device)
    delta.requires_grad = True
    
    for _ in range(iters):
        outputs = images + delta  # 仅对图像进行扰动
        loss = torch.mean(outputs)  # 使用简单的损失值，不涉及具体任务
        loss.backward()
        
        grad = delta.grad.detach()
        delta.data = delta + alpha * grad.sign()
        delta.data = torch.clamp(delta, -eps, eps)
        delta.grad.zero_()
    
    return (images + delta).detach()

# 视频处理流程
SOURCE_VIDEO_PATH = "/home/lnt/PycharmProjects/sam/data/videocut.mp4"
output_path = "/home/lnt/PycharmProjects/sam/data/attack.mp4"

cap = cv2.VideoCapture(SOURCE_VIDEO_PATH)
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# 逐帧处理视频
for frame_number in tqdm(range(total_frames)):
    ret, frame = cap.read()
    if not ret:
        break

    # 将帧转换为RGB格式并调整尺寸以符合模型要求
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame_rgb = cv2.resize(frame_rgb, (640, 640))  # 调整为640x640尺寸

    # 将图像转换为Tensor格式
    frame_tensor = torch.from_numpy(frame_rgb).permute(2, 0, 1).unsqueeze(0).float() / 255.0

    # 应用 PGD 攻击
    frame_pgd_tensor = pgd_attack(frame_tensor)
    frame_pgd = frame_pgd_tensor.squeeze().permute(1, 2, 0).numpy() * 255.0
    frame_pgd = frame_pgd.astype(np.uint8)

    # 将结果写入输出视频
    out.write(cv2.cvtColor(frame_pgd, cv2.COLOR_RGB2BGR))

# 关闭视频文件
out.release()
cap.release()


In [6]:
class GLAMModel:
    def __init__(self, grounding_dino_config_path, grounding_dino_checkpoint_path, sam_model_cfg, sam_checkpoint_path, prompt=None):
        if prompt is None:
            # prompt = ['pathways', 'trails', 'walkways', 'sidewalks', 'tracks', 'footpaths', 'routes', 'pedestrian paths', 'walking paths', 'lanes']
            prompt = ['pavement', 'fence', 'cyclepath', 'trees', 'grasses', 'sidewalk', 'buildings', 'skies', 'streetlights']
            '''
            prompt = ["street lamp", "street light", "lamppost", "road light",
                      "trash can", "dustbin", "garbage bin", "waste container",
                      "path", "pathway", "trail", "sidewalk", "footpath", # , "path edge", "trail border"
                      "sculpture", "statue", "art installation", "monument", # , "public art"
                      "pavement", "road", "lane", # ,"restroom", "toilet", "bathroom", "lavatory", "public restroom", "public toilet"
                      "signpost", "tree", "bush", "fence", # "bench", , "street furniture", "bicycle rack"
                      "swing", "slide", # ,"playground",  "merry-go-round", "sandbox"
                      "grass", "lawn", "green area", "garden", "park", "meadow",
                      "pond", "lake", "fountain", "stream", "river" # , "water feature"
                      ]
            '''
        self.prompt = prompt
        self.grounding_dino_model = Model(model_config_path=grounding_dino_config_path, model_checkpoint_path=grounding_dino_checkpoint_path)
        self.sam = build_sam2(sam_model_cfg, sam_checkpoint_path, device="cuda")
        self.sam_predictor = SAM2ImagePredictor(self.sam)
        self.yolo = YOLO('yolov8x-seg.pt')
        with open('class_descriptions.json', 'r', encoding='utf-8') as file:
            self.class_names = json.load(file)

        self.class_names += [{'id': 80+i, 'color': self.generate_random_color(), 'name': p} for i, p in enumerate(self.prompt)]
        self.class_dict = {item['id']: item['name'] for item in self.class_names}
        # self.dino_classes = 'pathways . trails . walkways'
        self.dino_classes = self.enhance_class_name(self.prompt)
        # self.dino_classes = str.join(' . ', self.prompt) + ' .'
        self.dino_box_threshold = 0.35
        self.dino_text_threshold = 0.25
        self.class_descriptions = [rr.AnnotationInfo(id=cat["id"], color=cat["color"], label=cat["name"]) for cat in self.class_names]
        self.yolo_classes = [0, 1, 2, 3, 5, 7, 9, 11, 30]  # [0, 1, 2, 3, 5, 7, 9, 10, 11, 13, 14, 15, 16, 56, 60, 67]
        self.persist = []
        self.video_outs = dict()
        
    @staticmethod
    def enhance_class_name(class_names: List[str]) -> List[str]:
        return [f"{class_name}" for class_name in class_names]

    @staticmethod
    def generate_random_color():
        r = random.randint(0, 255)
        g = random.randint(0, 255)
        b = random.randint(0, 255)
        return r, g, b
    
    def add_dino_class(self, _phrase):
        _class_id = max(self.class_dict.keys()) + 1
        self.class_names.append({'id': _class_id, 'color': self.generate_random_color(), 'name': _phrase})
        self.class_dict = {item['id']: item['name'] for item in self.class_names}
        self.class_descriptions = [rr.AnnotationInfo(id=cat["id"], color=cat["color"], label=cat["name"]) for cat in self.class_names]
        return _class_id
    
    def dino_id_to_class_name(self, dino_id):
        return self.class_dict[dino_id]
    
    def phrases2classes(self, phrases: List[str]) -> (np.ndarray, bool):
        class_ids = []
        ret = False
        for phrase in phrases:
            if phrase in self.class_dict.values():
                for k, v in self.class_dict.items():
                    if v == phrase:
                        class_ids.append(k)
            else:
                _class_id = self.add_dino_class(phrase)
                class_ids.append(_class_id)
                ret = True
        return np.array(class_ids), ret

In [7]:
GROUNDING_DINO_CONFIG_PATH = os.path.join('../', "GroundingDINO/groundingdino/config/GroundingDINO_SwinT_OGC.py")
GROUNDING_DINO_CHECKPOINT_PATH = os.path.join('../', "weights", "groundingdino_swint_ogc.pth")
SAM_CHECKPOINT_PATH = os.path.join("/home/lnt/PycharmProjects/sam/weights/sam2_hiera_large.pt")
SAM_MODEL_CFG = "sam2_hiera_l.yaml"

In [14]:
import cv2
import torch
import numpy as np
from tqdm import tqdm
import pandas as pd

# 计算 IoU 和 Dice 指数
def calculate_iou(pred, target):
    intersection = np.logical_and(pred, target).sum()
    union = np.logical_or(pred, target).sum()
    return intersection / union if union != 0 else 1.0

def calculate_dice(pred, target):
    intersection = np.logical_and(pred, target).sum()
    return 2 * intersection / (pred.sum() + target.sum()) if (pred.sum() + target.sum()) != 0 else 1.0

# 初始化GLAM模型
glam_model = GLAMModel(
    grounding_dino_config_path=GROUNDING_DINO_CONFIG_PATH, 
    grounding_dino_checkpoint_path=GROUNDING_DINO_CHECKPOINT_PATH, 
    sam_model_cfg=SAM_MODEL_CFG, 
    sam_checkpoint_path=SAM_CHECKPOINT_PATH
)

# 视频路径
video1_path = "/home/lnt/PycharmProjects/sam/data/videocut.mp4"
video2_path = "/home/lnt/PycharmProjects/sam/data/attack.mp4"

# 打开两个视频
cap1 = cv2.VideoCapture(video1_path)
cap2 = cv2.VideoCapture(video2_path)

# 获取视频属性
fps = int(cap1.get(cv2.CAP_PROP_FPS))
width = int(cap1.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap1.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')

# 输出视频路径
output_video_path = "//home/lnt/PycharmProjects/sam/data/comparison.mp4"
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width * 2, height))

# 初始化结果存储
results = {
    "Frame": [],
    "IoU": [],
    "Dice": []
}

# 获取视频总帧数
total_frames = min(int(cap1.get(cv2.CAP_PROP_FRAME_COUNT)), int(cap2.get(cv2.CAP_PROP_FRAME_COUNT)))

# 逐帧处理两个视频
for frame_number in tqdm(range(total_frames)):
    ret1, frame1 = cap1.read()
    ret2, frame2 = cap2.read()
    
    if not ret1 or not ret2:
        break
    
    # 调整帧尺寸以匹配
    if frame1.shape != frame2.shape:
        frame2 = cv2.resize(frame2, (frame1.shape[1], frame1.shape[0]))
        
    # 对两个帧分别进行分割
    def segment_frame(frame):
        yolo_results = glam_model.yolo.track(frame, verbose=False, conf=0.6, persist=True, retina_masks=True, classes=glam_model.yolo_classes)[0]
        dino_results, phrases = glam_model.grounding_dino_model.predict_with_caption(
            image=frame,
            caption=str.join(' . ', glam_model.prompt),
            box_threshold=glam_model.dino_box_threshold,
            text_threshold=glam_model.dino_text_threshold
        )
        dino_results.class_id, ret = glam_model.phrases2classes(phrases)
        
        glam_model.sam_predictor.set_image(frame)
        
        # 初始化 annotated_frame
        annotated_frame = frame.copy()
        
        for _box, _cls_id, _confidence, _phrase in zip(dino_results.xyxy, dino_results.class_id, dino_results.confidence, phrases):
            _masks, _scores, _logits = glam_model.sam_predictor.predict(
                box=_box,
                multimask_output=True
            )
            _index = np.argmax(_scores)
            _mask = _masks[_index]
            _mask = _mask.astype('bool')
            mask_result = sv.Detections(np.array([_box]), np.array([_mask]), np.array([_confidence]), np.array([_cls_id]))

            mask_annotator = sv.MaskAnnotator()
            box_annotator = sv.BoxAnnotator()
            label_annotator = sv.LabelAnnotator(text_position=sv.Position.CENTER)
            annotated_frame = mask_annotator.annotate(scene=frame, detections=mask_result)
            annotated_frame = box_annotator.annotate(scene=annotated_frame, detections=mask_result)
            annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=mask_result, labels=[_phrase])
        return annotated_frame

    segmented_frame1 = segment_frame(frame1)
    segmented_frame2 = segment_frame(frame2)
    
    # 计算 IoU 和 Dice 系数
    iou = calculate_iou(segmented_frame1, segmented_frame2)
    dice = calculate_dice(segmented_frame1, segmented_frame2)
    
    # 保存计算结果
    results["Frame"].append(frame_number)
    results["IoU"].append(iou)
    results["Dice"].append(dice)
    
    # 拼接两个分割后的帧用于展示
    comparison_frame = np.hstack((segmented_frame1, segmented_frame2))
    
    # 写入对比视频
    out.write(comparison_frame)

# 释放视频资源
cap1.release()
cap2.release()
out.release()

# 将结果存储为DataFrame并保存为CSV
df_results = pd.DataFrame(results)
df_results.to_csv("comparison_results.csv", index=False)

# 打印结果概览
print(df_results.describe())


final text_encoder_type: bert-base-uncased


100%|███████████████████████████████████████| 1817/1817 [21:53<00:00,  1.38it/s]

             Frame          IoU         Dice
count  1817.000000  1817.000000  1817.000000
mean    908.000000     0.997218     0.009373
std     524.667037     0.003040     0.001334
min       0.000000     0.982170     0.006852
25%     454.000000     0.996004     0.008518
50%     908.000000     0.998290     0.009064
75%    1362.000000     0.999357     0.009925
max    1816.000000     0.999992     0.017289



