In [27]:
import os
import pandas as pd
import numpy as np

# Set the current working directory; the relative paths used in later cells
# (e.g. 'MOT15/train/...', 'data/train') are resolved against it.
# NOTE(review): this is an absolute, machine-specific Windows path — adjust before running elsewhere.
os.chdir(r'E:\ML\Pedestrian_trackingMOT') 

In [28]:
import cv2
import pandas as pd
import os
def construct_dataset(dataset_name, max_images_per_id=200):
    """Crop per-pedestrian patches out of one sequence under ``MOT15/train``.

    Reads ``MOT15/train/<dataset_name>/gt/gt.txt`` (no header; columns are
    frame, id, bb_left, bb_top, bb_width, bb_height, confidence, x3d, y3d, z3d),
    drops rows whose confidence is 0, and for every remaining annotation crops
    the box out of ``img1/<frame:06d>.jpg``, resizes it to 240x480 (width x
    height) and writes it to
    ``data/train/<dataset_name>_<id>_frame_<frame>.jpg``.

    Args:
        dataset_name: Name of the sequence folder under ``MOT15/train``.
        max_images_per_id: Upper bound on the crops saved for a single id.

    Annotations are skipped (with a printed message) when the frame file is
    unreadable, when the box does not lie fully inside the image, or when the
    crop is empty. Missing frame files are skipped silently.
    """
    file_path='MOT15/train/'+dataset_name
    gt_file_path = os.path.join(file_path, 'gt/gt.txt')

    df = pd.read_csv(gt_file_path, header=None)

    # frame number, ID, top-left corner (x, y), box width and height, confidence, 3D position (x, y, z)
    df.columns = ["frame", "id", "bb_left", "bb_top", "bb_width", "bb_height", "confidence", "x3d", "y3d", "z3d"]

    # Keep only rows with non-zero confidence and the columns needed for cropping.
    df = df.loc[df["confidence"] != 0, ["frame", "id", "bb_left", "bb_top", "bb_width", "bb_height"]]

    video_frames_dir=os.path.join(file_path, 'img1')

    output_dir = os.path.join('data', 'train')
    os.makedirs(output_dir, exist_ok=True)

    # Iterate over unique IDs
    for person_id in df['id'].unique():
        person_df = df[df['id'] == person_id]
        images_saved = 0

        for _, row in person_df.iterrows():
            frame_number = int(row['frame'])  # Ensure frame number is an integer
            bb_left, bb_top, bb_width, bb_height = int(row['bb_left']), int(row['bb_top']), int(row['bb_width']), int(row['bb_height'])

            # Construct image path
            frame_path = os.path.join(video_frames_dir, f'{frame_number:06d}.jpg')

            if not os.path.exists(frame_path):
                continue

            image = cv2.imread(frame_path)
            if image is None:
                # cv2.imread signals an unreadable/corrupt file by returning None.
                print(f"Could not read image {frame_path}. Skipping crop.")
                continue

            # A negative origin would be interpreted by NumPy as "count from the
            # end" and silently crop the wrong region, so treat it as out of bounds.
            out_of_bounds = (
                bb_left < 0
                or bb_top < 0
                or bb_left + bb_width > image.shape[1]
                or bb_top + bb_height > image.shape[0]
            )
            if out_of_bounds:
                print(f"Cropping parameters are out of bounds for the image size. Skipping crop.")
                continue

            crop = image[bb_top:bb_top+bb_height, bb_left:bb_left+bb_width]

            if crop.size == 0:
                print("Cropped image is empty. Skipping resize.")
                continue
            crop = cv2.resize(crop, (240, 480))  # Resize to a consistent size

            # Save cropped image
            output_path = os.path.join(output_dir, f'{dataset_name}_{person_id}_frame_{frame_number}.jpg')
            cv2.imwrite(output_path, crop)
            images_saved += 1

            # Check if max images per ID is reached
            if images_saved >= max_images_per_id:
                break



In [97]:
# construct_dataset("ADL-Rundle-6")
# construct_dataset("ADL-Rundle-8")
# construct_dataset("ETH-Bahnhof")



Number of images in the folder: 15085


In [29]:
import random
from PIL import Image
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms


# 自定义数据集
class CustomDataset(Dataset):
    """Dataset yielding ``(image, masked_image)`` pairs from a flat folder.

    Every regular file directly inside ``data_dir`` is treated as an image.
    Each item is the RGB image together with a copy in which one random
    rectangle has been blacked out.
    """

    def __init__(self, data_dir, transform=None):
        """Remember the folder and optional transform and index its files."""
        self.data_dir = data_dir
        self.transform = transform
        entries = []
        for entry in os.listdir(data_dir):
            if os.path.isfile(os.path.join(data_dir, entry)):
                entries.append(entry)
        self.image_files = entries

    def __len__(self):
        """Number of files found in ``data_dir``."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Return ``(image, masked_image)`` for the file at position ``idx``."""
        source_path = os.path.join(self.data_dir, self.image_files[idx])
        clean = Image.open(source_path).convert('RGB')

        # Build the occluded variant before any transform is applied.
        occluded = self.random_mask(clean)

        if not self.transform:
            return clean, occluded

        # The masked image is transformed first, then the clean one.
        occluded = self.transform(occluded)
        clean = self.transform(clean)
        return clean, occluded

    def random_mask(self, image):
        """
        Black out a random rectangle covering 25% to 40% of the image area.

        The rectangle has (up to integer truncation) the same aspect ratio as
        the image. The input PIL image is left untouched; a new PIL image is
        returned.
        """
        # Work on a NumPy copy of the PIL image.
        pixels = np.array(image)
        height, width, _ = pixels.shape

        fraction = random.uniform(0.25, 0.40)
        target_area = height * width * fraction

        # Rectangle size with the image's aspect ratio and the target area.
        mask_height = int(np.sqrt(target_area * height / width))
        mask_width = int(mask_height * width / height)

        # Random top-left corner such that the rectangle stays inside the image.
        top = random.randint(0, height - mask_height)
        left = random.randint(0, width - mask_width)
        bottom = top + mask_height
        right = left + mask_width

        # Paint the occluded region black.
        pixels[top:bottom, left:right, :] = 0

        return Image.fromarray(pixels)



In [30]:

import torchvision
import torch.nn as nn

# Define the models

# VGG-19 requested with pretrained weights; MyVGG below takes a prefix of its `features` stack.
pretrained_net_vgg=torchvision.models.vgg19(pretrained=True, progress=True)

class MyVGG(nn.Module):
    """Multi-scale feature extractor built from a prefix of VGG-19's ``features``.

    Each instance owns an independent copy of the pretrained layers. Slicing
    ``pretrained_net_vgg.features`` only creates a new container around the
    *same* layer objects, so without the copy every ``MyVGG`` instance would
    share one set of weights (loading a fine-tuned state dict into one
    instance would silently change all the others).
    """

    def __init__(self,feature_layers):
        '''
        feature_layers: indices (into ``pretrained_net_vgg.features``) of the
            layers whose outputs are collected by ``forward``.
        '''
        import copy

        super(MyVGG,self).__init__()
        self.feature_layers=feature_layers
        # Only layers up to the deepest requested index are kept; deep-copied so
        # this instance's parameters are not shared with other instances.
        self.net=copy.deepcopy(pretrained_net_vgg.features[:max(feature_layers)+1])

    def forward(self,x):
        '''
        Run ``x`` through the kept layers and return the list of outputs of the
        layers listed in ``feature_layers`` (in layer order).

        NOTE(review): if the following layer is an in-place activation, the
        tensor collected here is modified by it afterwards — confirm this is
        the intended feature (the behaviour is unchanged from before).
        '''
        features=[]
        for i, layer in enumerate(self.net):
            x=layer(x)
            if i in self.feature_layers:
                features.append(x)
        return features



# Load ResNet-18 with pretrained weights; MyResNet below takes its layers from this network.
pretrained_net_resnet =torchvision.models.resnet18(pretrained=True, progress=True)

class MyResNet(nn.Module):
    """Feature extractor built from the early stages of the pretrained ResNet-18.

    Each instance deep-copies the layers it uses. Assigning the modules of
    ``pretrained_net_resnet`` directly would make every ``MyResNet`` instance
    share the same parameters, so a "frozen" reference network and a network
    being trained would in fact be one and the same model.
    """

    def __init__(self):
        import copy

        super(MyResNet, self).__init__()

        # First five child modules of the pretrained network (the original
        # comment states this runs up to and including `layer1`).
        stem = list(pretrained_net_resnet.children())[:5]
        self.layer1 = nn.Sequential(*(copy.deepcopy(module) for module in stem))
        self.layer2 = copy.deepcopy(pretrained_net_resnet.layer2)
        self.layer3 = copy.deepcopy(pretrained_net_resnet.layer3)

    def forward(self, x):
        """Return four feature maps: the outputs of ``layer2[0]``,
        ``layer2[1]``, ``layer3[0]`` and ``layer3[1]``, in that order."""
        features = []

        x = self.layer1(x)

        # Output of each block of layer2.
        x = self.layer2[0](x)
        features.append(x)

        x = self.layer2[1](x)
        features.append(x)

        # Output of each block of layer3.
        x = self.layer3[0](x)
        features.append(x)

        x = self.layer3[1](x)
        features.append(x)

        return features




# 损失函数
def feature_loss(features_original, features_masked):
    """Mean of the per-layer MSE between two lists of feature maps.

    The lists are paired position by position (they are expected to have the
    same length) and the summed MSE is divided by ``len(features_original)``.
    """
    mse = nn.MSELoss()
    per_layer = (
        mse(reference, occluded)
        for reference, occluded in zip(features_original, features_masked)
    )
    # Start from 0.0 so the running total is accumulated left to right.
    return sum(per_layer, 0.0) / len(features_original)



In [31]:
# Use the first CUDA device when available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Reference ("left-hand", fixed) network; train_model only runs it under torch.no_grad().
feature_extractor = MyResNet().to(device)  # fixed network on the left-hand side
def train_model(model,dataloader, epochs=10):
    """Train ``model`` so its features on masked images match the reference
    network's features on the unmasked images.

    Args:
        model: Network to optimise; called on the masked batch and expected to
            return a list of feature tensors.
        dataloader: Yields ``(images, masked_images)`` batches.
        epochs: Number of passes over ``dataloader``.

    Uses the module-level ``feature_extractor`` as the fixed reference and
    ``device`` for tensor placement. Prints the loss per batch and per epoch.
    """
    model.train()
    # The reference network must stay fixed: in training mode its BatchNorm
    # layers would keep updating their running statistics (and normalise with
    # batch statistics) even inside torch.no_grad().
    feature_extractor.eval()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    total_batches = len(dataloader)

    for epoch in range(epochs):
        running_loss = 0.0
        for i, (images, masked_images) in enumerate(dataloader):
            # Move both batches to the same device as the networks.
            images = images.to(device)
            masked_images = masked_images.to(device)

            # Reference features: no gradients are tracked for the fixed network.
            with torch.no_grad():
                original_features = [feature.detach() for feature in feature_extractor(images)]

            # Features of the network being trained, computed on the masked batch.
            masked_features = model(masked_images)

            loss = feature_loss(original_features, masked_features)

            # Backpropagation and optimisation step.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            print(f'Epoch {epoch+1}/{epochs}, Batch {i+1}/{total_batches}, Batch Loss: {loss.item():.4f}')
        # Guard against an empty dataloader instead of dividing by zero.
        epoch_loss = running_loss / total_batches if total_batches else float('nan')
        print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}')


In [None]:
# Transformations: resize to 224x224, convert to tensor, normalise per RGB channel.
rgb_mean = np.array([0.485, 0.456, 0.406])
rgb_std = np.array([0.229, 0.224, 0.225])
transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=rgb_mean, std=rgb_std)
])

# Dataset and DataLoader
dataset = CustomDataset(data_dir='data/train', transform=transform)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Model

model = MyResNet().to(device)   # right-hand network, the one being trained

# Resume from the previous checkpoint. map_location makes a checkpoint saved
# on a GPU loadable on a CPU-only machine (and vice versa).
model.load_state_dict(torch.load('fine-tune-MyResNet.pth', map_location=device))

# Train
train_model(model, dataloader, epochs=5)

# Save model
torch.save(model.state_dict(), 'fine-tune-MyResNet_10.pth')


In [40]:
# Loads a whole pickled object (not a state dict) and maps its tensors to the CPU.
# NOTE(review): torch.load unpickles the file, which can execute arbitrary code —
# only load checkpoints from a trusted source.
model=torch.load('full-fine-tune-MyResNet.pth',map_location=torch.device('cpu'))
# Per-channel mean and standard deviation used by Normalize in preprocess() below.
rgb_mean = np.array([0.485, 0.456, 0.406])
rgb_std = np.array([0.229, 0.224, 0.225])
def preprocess(PIL_img, image_shape):
    '''
    Resize the image to ``image_shape``, convert it to a tensor and normalise
    each RGB channel with ``rgb_mean`` / ``rgb_std``; a leading batch
    dimension is then added.
    The input is expected to be a PIL image (the Resize step is applied to it
    directly).
    '''
    steps = [
        torchvision.transforms.Resize(image_shape),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(mean=rgb_mean, std=rgb_std),
    ]
    pipeline = torchvision.transforms.Compose(steps)
    normalised = pipeline(PIL_img)

    return normalised.unsqueeze(dim = 0) # (batch_size, 3, H, W)
img=Image.open('data/train/ADL-Rundle-6_1_frame_1.jpg')
img=preprocess(img,(224,224))
# Put the input on the device the model's parameters actually live on; the
# model was loaded onto the CPU, so moving the input to a CUDA `device` would
# raise a device-mismatch error.
img=img.to(next(model.parameters()).device)
model.eval()
# Inference only: no autograd graph is needed.
with torch.no_grad():
    f=model(img)

In [None]:
import os
from ultralytics import YOLO
# Run the detector over the sequence and delete every frame in which no
# person (class 0) is detected with confidence >= 0.65.
# WARNING: this permanently removes files from the dataset folder.
data_source = r'MOT15/train/KITTI-13/img1'
model = YOLO(r'ultralytics\yolov8n.pt')
results = model(data_source,stream=True,classes=[0],conf=0.65)
file_paths=[]
for r in results:
    # Check the box count via the shape: converting with np.array() fails when
    # the boxes are a tensor on a CUDA device.
    if r.boxes.xyxy.shape[0]==0:
        file_paths.append(r.path)

# Deletion happens after the detection pass so the stream is not disturbed.
for file_path in file_paths:
    if os.path.exists(file_path):
        os.remove(file_path)
        print(f"File {file_path} has been deleted.")
    else:
        print(f"File {file_path} does not exist.")





In [37]:
def rename_images(folder_path, extension='jpg'):
    """Renumber the images in a folder as ``000001.<extension>``, ``000002.<extension>``, ...

    Files whose name ends with ``extension`` (case-insensitive) are taken in
    sorted (lexicographic) filename order.

    The renaming is done in two passes through unique temporary names. Renaming
    directly could hit a target name that belongs to a file not yet processed
    (e.g. ``000000a.jpg`` -> ``000001.jpg`` while ``000001.jpg`` still exists),
    which overwrites that file on POSIX and raises on Windows.

    Args:
        folder_path: Folder containing the images.
        extension: File-name suffix to match and to use for the new names.
    """
    import uuid

    suffix = extension.lower()
    files = sorted(file for file in os.listdir(folder_path) if file.lower().endswith(suffix))

    # Pass 1: move every file to a temporary name that cannot clash with any
    # source or target name.
    token = uuid.uuid4().hex
    staged = []
    for i, file in enumerate(files, start=1):
        staged_path = os.path.join(folder_path, f".{token}_{i:06d}.tmp")
        new_file_path = os.path.join(folder_path, f"{i:06d}.{extension}")
        os.rename(os.path.join(folder_path, file), staged_path)
        staged.append((staged_path, new_file_path))

    # Pass 2: all original names are free now, so the final names are safe.
    for staged_path, new_file_path in staged:
        os.rename(staged_path, new_file_path)

folder_path = 'MOT15/train/KITTI-13/img1'  # replace with the path of the image folder
# Renumbers the .jpg files of that folder in place.
rename_images(folder_path)

In [54]:
import cv2
import os
import numpy as np
import torch
import torch.nn.functional as F
import colorsys
from scipy.optimize import linear_sum_assignment

def read_imgs(img_dir):
    '''
    img_dir: path of the image folder
    Load every ``.jpg`` / ``.png`` file of the folder, in sorted filename
    order, and return the list of loaded images (the ``cv2.imread`` results,
    not the paths).
    '''
    return [
        cv2.imread(os.path.join(img_dir, file_name))
        for file_name in sorted(os.listdir(img_dir))
        if file_name.endswith(('.jpg', '.png'))
    ]

# Per-channel mean and standard deviation used by Normalize in preprocess() below.
rgb_mean = np.array([0.485, 0.456, 0.406])
rgb_std = np.array([0.229, 0.224, 0.225])
def preprocess(PIL_img, image_shape):
    '''
    Resize the image to ``image_shape``, convert it to a tensor and normalise
    each RGB channel with ``rgb_mean`` / ``rgb_std``; a leading batch
    dimension is then added.
    The input is expected to be a PIL image (the Resize step is applied to it
    directly).
    NOTE(review): a function with the same name and body is also defined in an
    earlier cell; this later definition is the one in effect once this cell runs.
    '''
    steps = [
        torchvision.transforms.Resize(image_shape),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(mean=rgb_mean, std=rgb_std),
    ]
    pipeline = torchvision.transforms.Compose(steps)
    normalised = pipeline(PIL_img)

    return normalised.unsqueeze(dim = 0) # (batch_size, 3, H, W)

class Tracker(object):
    def __init__(self,source):
        """Load all frames from ``source`` and build the feature networks.

        source: folder of frame images, read eagerly with ``read_imgs``.

        The checkpoint files 'fine-tune-MyVGG.pth' and
        'full-fine-tune-MyResNet.pth' are read from the current working
        directory.
        """
        self.boxes_xyxy = [] # one array of boxes per frame, appended by put_bbox
        self.crop_imgs=[]   # one list of cropped images per crop_img call
        self.GT_sequence=[] # one dict per frame: {id: box}
        self.frames_features=[] # dicts of {id: features}, filled by trackBYnet
        self.orig_imgs=read_imgs(source)
        self.vgg=MyVGG(feature_layers=[0, 5, 10, 19, 25,28])
        self.fine_tune_vgg = MyVGG(feature_layers=[0, 5, 10, 19, 25, 28])
        self.fine_tune_vgg.load_state_dict(torch.load('fine-tune-MyVGG.pth'))
        self.resnet=MyResNet()
        # Loads a whole pickled object rather than a state dict, mapped to the CPU.
        # NOTE(review): torch.load unpickles the file — only use trusted checkpoints.
        self.fine_tune_resnet= torch.load('full-fine-tune-MyResNet.pth',map_location=torch.device('cpu'))


    @property
    def get_xyxy(self):
        return self.boxes_xyxy
    
    def put_bbox(self,boxes_xyxy):
        '''
        boxes_xyxy:当前帧的所有行人边界框
        '''
        self.boxes_xyxy.append(np.array(boxes_xyxy))

    def crop_img(self,idx):
        '''
        idx:要裁剪图像的索引与这张图象对应的边界框的索引
        裁剪图片,获得行人的图片,存储在crop_imgs中
        '''
        boxes_xyxy=self.boxes_xyxy[idx]
        img=self.orig_imgs[idx]
        cropped_image_list=[]
        for _, bbox in enumerate(boxes_xyxy):
            x1, y1, x2, y2 = map(int, [bbox[0], bbox[1], bbox[2], bbox[3]])
            cropped_image = img[y1:y2, x1:x2]
            cropped_image_list.append(cropped_image)

        self.crop_imgs.append(cropped_image_list)
        return cropped_image_list

    @property
    def get_crop_imgs(self):
        return self.crop_imgs


    
    def compute_jaccard(self,box1,box2):
        """Intersection over union (IoU) of two boxes given as (x1, y1, x2, y2).

        Returns 0.0 when the union has no area (e.g. two degenerate boxes)
        instead of dividing by zero.
        """
        # Corners of the overlap rectangle.
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])

        # Clamp at 0 so disjoint boxes have no intersection.
        intersection_area = max(0, x2 - x1) * max(0, y2 - y1)
        box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
        box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
        union_area = box1_area + box2_area - intersection_area

        if union_area <= 0:
            return 0.0
        return intersection_area / union_area


    def trackByIoU(self,iou=0.5,interval=1):
        '''
        iou:IoU阈值
        interval:间隔帧数
        计算当前帧某一行人与前interval帧所有行人的IoU的极大值,且该值大于阈值则认为是同一个行人
        如果没有符合条件的，就认为是新的行人,分配新的ID,否则分配与之IoU最大的行人的ID
        '''
        boxes_xyxy=self.boxes_xyxy[-1]
        last_interval_frame_results=[]
        if self.GT_sequence == []:
            self.GT_sequence.append(dict(enumerate(boxes_xyxy)))
        else:
            last_interval_frame_results=self.GT_sequence[-interval:]

            last_ids = []
            for result in reversed(last_interval_frame_results):
                for last_id in result.keys():
                    if last_id not in last_ids:
                        last_ids.append(last_id)
            max_id=max(last_ids)

            # 匈牙利算法
            cost_matrix = []
            for current_id, current_bbox in enumerate(boxes_xyxy):
                row = []
                appeared_ids=[]
                for result in reversed(last_interval_frame_results):
                    for last_id, last_bbox in result.items():
                        if last_id not in appeared_ids:
                            appeared_ids.append(last_id)
                            IoU = self.compute_jaccard(current_bbox, last_bbox)                            
                            cost=1-IoU
                            row.append(cost)
                cost_matrix.append(row)


                
            row_ind, col_ind = linear_sum_assignment(cost_matrix)
            
            best_match = {}  # 最终匹配结果
            for current_id, col in zip(row_ind, col_ind):
                if cost_matrix[current_id][col] < 1 - iou:
                    matched_id = last_ids[col]
                    best_match[matched_id] = current_id

            for current_id, _ in enumerate(boxes_xyxy):
                if current_id not in best_match.values():
                    max_id += 1
                    best_match[max_id] = current_id
            
            best_match = {last_id: boxes_xyxy[current_id] for last_id, current_id in best_match.items()}

            self.GT_sequence.append(best_match)
        

    def compute_cosine_similarity(self,features1,features2):
        '''
        features1: features of pedestrian 1, a list of tensors
        features2: features of pedestrian 2, a list of tensors
        Returns the cosine similarity of the two feature sets as a Python
        float: each pair of feature maps is flattened and compared, and the
        per-pair similarities are averaged.

        The lists must not be empty (``torch.stack`` raises on an empty list).
        '''
        cosine_similarities = []
        for feature1_map, feature2_map in zip(features1, features2):
            # Flatten the feature maps. reshape (unlike view) also works for
            # non-contiguous tensors such as transposed or sliced maps.
            vec1 = feature1_map.reshape(-1)
            vec2 = feature2_map.reshape(-1)

            cosine_sim = F.cosine_similarity(vec1.unsqueeze(0), vec2.unsqueeze(0))
            cosine_similarities.append(cosine_sim)

        avg_cosine_similarity = torch.mean(torch.stack(cosine_similarities))
        return avg_cosine_similarity.item()


    def trackBYnet(self,fine_tune=False,threshold=0.25,interval=1,net_type='vgg'):
        '''
        fine_tune: use the fine-tuned network instead of the pretrained one
        threshold: cosine-similarity threshold for accepting a match
        interval: number of previous frames to look back
        net_type: 'vgg' selects the VGG networks, anything else the ResNet ones

        Assign ids to the detections of the newest frame by appearance. Each
        detection is cropped, passed through the selected network, and matched
        (Hungarian algorithm on ``1 - cosine similarity``) against the most
        recent features of every id seen in the last ``interval`` entries of
        ``self.frames_features``. Unmatched detections get fresh ids.

        Appends ``{id: features}`` to ``self.frames_features`` and
        ``{id: box}`` to ``self.GT_sequence`` and prints the latter.

        Each id is paired with the box of the detection it was matched to.
        Fresh ids continue from the largest id ever stored, and frames without
        detections are handled without error.
        '''
        idx = len(self.boxes_xyxy) - 1
        cropped_images_list = self.crop_img(idx)

        if fine_tune:
            Net = self.fine_tune_vgg if net_type == 'vgg' else self.fine_tune_resnet
        else:
            Net = self.vgg if net_type == 'vgg' else self.resnet
        Net.eval()

        # Features of every detection in the current frame, keyed by detection index.
        current_frame_features = {}
        # Inference only: without no_grad every stored feature keeps its whole
        # autograd graph alive, which quickly exhausts memory.
        with torch.no_grad():
            for i, cropped_image in enumerate(cropped_images_list):
                # BGR -> RGB, then to a PIL image for preprocessing.
                pil_image = Image.fromarray(cropped_image[..., ::-1])
                current_frame_features[i] = Net(preprocess(pil_image, (224, 224)))

        if self.frames_features == []:
            # The initial ids are simply the detection indices of the first frame.
            assigned = {i: i for i in current_frame_features}
        else:
            if len(self.frames_features) > interval:
                self.frames_features.pop(0)  # drop the oldest frame to bound memory use

            # Most recent features of every id inside the look-back window;
            # the order of last_ids defines the columns of the cost matrix.
            last_features = {}
            for frame_feature in reversed(self.frames_features[-interval:]):
                for last_id, features in frame_feature.items():
                    if last_id not in last_features:
                        last_features[last_id] = features
            last_ids = list(last_features)

            # Largest id used so far (-1 if none), so ids are never reused.
            known_ids = [max(result) for result in self.GT_sequence if result] + last_ids
            max_id = max(known_ids, default=-1)

            assigned = {}  # id -> index of the current detection
            if current_frame_features and last_ids:
                cost_matrix = np.array(
                    [[1 - self.compute_cosine_similarity(current_features, last_features[last_id])
                      for last_id in last_ids]
                     for current_features in current_frame_features.values()],
                    dtype=float,
                )
                row_ind, col_ind = linear_sum_assignment(cost_matrix)
                for current_id, col in zip(row_ind, col_ind):
                    if cost_matrix[current_id][col] < 1 - threshold:
                        assigned[last_ids[col]] = int(current_id)

            matched_detections = set(assigned.values())
            for current_id in current_frame_features:
                if current_id not in matched_detections:
                    max_id += 1
                    assigned[max_id] = current_id

        self.frames_features.append(
            {track_id: current_frame_features[current_id] for track_id, current_id in assigned.items()}
        )

        # Store id -> box using the detection index each id was matched to.
        current_bbox = self.boxes_xyxy[idx]
        self.GT_sequence.append(
            {track_id: current_bbox[current_id] for track_id, current_id in assigned.items()}
        )
        print(self.GT_sequence[-1])



    def trackByIoU_net(self,iou=0.5,interval=1,threshold=0.8,fine_tune=True,net_type='vgg'):
        '''
        iou:IoU阈值
        interval:间隔帧数
        '''
        boxes_xyxy=self.boxes_xyxy[-1]
        last_interval_frame_results=[]
        if self.GT_sequence == []:
            self.GT_sequence.append(dict(enumerate(boxes_xyxy)))
        else:
            last_interval_frame_results=self.GT_sequence[-interval:]

            last_ids = []
            for result in reversed(last_interval_frame_results):
                for last_id in result.keys():
                    if last_id not in last_ids:
                        last_ids.append(last_id)
            max_id=max(last_ids)

            # 匈牙利算法
            #但是这么做的话，会有可能错误匹配：比如上一帧检测出两个人，这一帧有一个人没被检测到，但是又有前面被遮挡的人出现，导致画面中总人数没变
            # 但是那个没被检测出来的行人的ID会被分配给这个新出现的人，这样就会导致错误匹配
            cost_matrix = []
            for current_id, current_bbox in enumerate(boxes_xyxy):
                row = []
                appeared_ids=[]
                for result in reversed(last_interval_frame_results):
                    for last_id, last_bbox in result.items():
                        if last_id not in appeared_ids:
                            appeared_ids.append(last_id)
                            IoU = self.compute_jaccard(current_bbox, last_bbox)
                            cost=1-IoU
                            row.append(cost)
                cost_matrix.append(row)
                        
            row_ind, col_ind = linear_sum_assignment(cost_matrix)
            # 候选匹配结果
            potential_matches={}

            best_match = {}  # 最终匹配结果
            for current_id, col in zip(row_ind, col_ind):
                matched_id = last_ids[col]
                if cost_matrix[current_id][col] < 1 - iou:
                    best_match[matched_id] = current_id
                else:
                    potential_matches[matched_id]=current_id
            

            # if len(self.boxes_xyxy) ==238:
            #     print(best_match,potential_matches)
                
            
            if fine_tune:
                if net_type=='vgg':
                    Net=self.fine_tune_vgg
                else:
                    Net=self.fine_tune_resnet
            else:
                if net_type=='vgg':
                    Net=self.vgg
                else:
                    Net=self.resnet
            

            
            # 我们认为IoU匹配结果是可靠的，也就是在一段时间内没被遮挡的人的匹配是鲁棒的，
            # 出现遮挡会导致有追踪目标短暂消失，因此在其再次出现时，IoU的值会低于阈值/或其不是最优解,导致被识别为新目标
            # 因此，对于那些因为IoU匹配度过低被筛选掉的人，我们需要再次判断是否在之前出现过，通过检查其特征与last_ids（interval的长度决定了目标最多可以消失多久）
            # 中未被匹配到的人的特征的余弦相似度来决定匹配
            # 为增加匹配鲁棒性，增加纠错机制，防止因为IoU跟踪错之后后面一直跟踪错。
            
            not_matched_last_ids=[]
            for last_id in last_ids:
                if last_id not in best_match.keys():
                    not_matched_last_ids.append(last_id)

            current_ids=[]
            for current_id in potential_matches.values():
                current_ids.append(current_id)
            
            vgg_cost_matrix = []

            if potential_matches != {}:
                for _, current_id in potential_matches.items():
                    idx=len(self.boxes_xyxy)-1
                    current_image=self.orig_imgs[idx]
                    current_bbox=boxes_xyxy[current_id]
                    row = []
                    for id in not_matched_last_ids:
                        i=0
                        idx=len(self.boxes_xyxy)-1
                        for result in reversed(last_interval_frame_results):
                            i+=1
                            if id in result.keys():
                                last_bbox=result[id]
                                break
                            
                        idx=idx-i

                        last_image=self.orig_imgs[idx]
                        last_cropped_image=self.crop_one_img(last_image,last_bbox)
                        current_cropped_image=self.crop_one_img(current_image,current_bbox)
                        cosine_similarity=self.caculate_TwoImages_similarity(Net,last_cropped_image,current_cropped_image)
                        row.append(1-cosine_similarity)
                    vgg_cost_matrix.append(row)

                row_ind, col_ind = linear_sum_assignment(vgg_cost_matrix)

                if len(self.boxes_xyxy) == 84 or len(self.boxes_xyxy) == 122 :
                    print(vgg_cost_matrix,not_matched_last_ids)
                    
                # 阈值过低，会导致新出现的人被匹配到之前已经消失的人的编号上
                # 阈值过高，会导致之前被遮挡的人再次出现时无法被匹配到。
                for row, col in zip(row_ind, col_ind):
                    current_id=current_ids[row]
                    matched_id=not_matched_last_ids[col]
                    if vgg_cost_matrix[row][col]<1-threshold:
                        best_match[matched_id]=current_id


            for current_id, _ in enumerate(boxes_xyxy):
                if current_id not in best_match.values():
                    max_id += 1
                    best_match[max_id] = current_id
            
            best_match = {last_id: boxes_xyxy[current_id] for last_id, current_id in best_match.items()}

            self.GT_sequence.append(best_match)



    def crop_one_img(self,orig_img,box_xyxy):
        '''
        orig_img: original image (indexed as rows, columns)
        box_xyxy: pedestrian bounding box (x1, y1, x2, y2)
        Returns the region of ``orig_img`` inside the box; the coordinates are
        truncated to integers first.
        '''
        left, top, right, bottom = (int(box_xyxy[k]) for k in range(4))
        return orig_img[top:bottom, left:right]

    def caculate_TwoImages_similarity(self,feature_extractor,img1,img2):
        '''
        feature_extractor: network that returns a list of feature tensors
        img1: image 1 (array with BGR channel order)
        img2: image 2 (array with BGR channel order)
        Returns the cosine similarity of the two images' features.
        '''
        # BGR -> RGB, then preprocess both images to 224x224 network input.
        img1 = Image.fromarray(img1[..., ::-1])
        img2 = Image.fromarray(img2[..., ::-1])
        img1=preprocess(img1,(224,224))
        img2=preprocess(img2,(224,224))
        # Run on whatever device the network's parameters live on.
        device = next(feature_extractor.parameters()).device
        img1 = img1.to(device)
        img2 = img2.to(device)
        # Inference only: skip building autograd graphs for the two forward passes.
        with torch.no_grad():
            img1_features=feature_extractor(img1)
            img2_features=feature_extractor(img2)
        return self.compute_cosine_similarity(img1_features,img2_features)
    

    def id_to_color(self,id):
        """Map an id to a fully saturated colour, returned as three ints in 0..255.

        The hue is the fractional part of ``id * 0.618033988749895``.
        """
        hue = id * 0.618033988749895 % 1
        channels = colorsys.hsv_to_rgb(hue, 1, 1)
        return tuple(int(channel * 255) for channel in channels)
    

    def save_result(self, folder_path):
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)
        for index, (img, current_bbox) in enumerate(zip(self.orig_imgs, self.GT_sequence)):
            for id, bbox in current_bbox.items():
                # 为每个ID生成一个颜色
                color = self.id_to_color(id)

                x1, y1, x2, y2 = map(int, bbox)
                cv2.rectangle(img, (x1, y1), (x2, y2), color, 5)
                cv2.putText(img, str(id), (x1, y1), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 5)

            filename = os.path.join(folder_path, f'{index + 1:06d}.jpg')
            cv2.imwrite(filename, img)

    

        
        


In [12]:
class MOTBenchmark(object):
    # Empty placeholder class: it defines no attributes or methods yet.
    pass


In [60]:
from PIL import Image
import configparser
import math
from ultralytics import YOLO

# Name of the sequence folder under MOT15/train/ that this cell processes.
dataset_name="test22"

def get_src(dataset_name):
    """Return ``(image_folder, seqinfo_path)`` for a sequence under ``MOT15/train/``."""
    sequence_root = "MOT15/train/" + dataset_name
    return sequence_root + "/img1", sequence_root + "/seqinfo.ini"

config = configparser.ConfigParser()
data_source,config_src=get_src(dataset_name)
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it parsed; fail here rather than with a KeyError further down.
if not config.read(config_src):
    raise FileNotFoundError(f"Could not read sequence info file: {config_src}")


# Person detector (class 0 only, confidence >= 0.65), streamed frame by frame.
model = YOLO(r'ultralytics\yolov8n.pt')
results = model(data_source,stream=True,classes=[0],conf=0.65)
tracker=Tracker(data_source)


# Tracking strategy: "iou", "net" or "iou_net".
way="iou_net"

# The look-back window is a multiple of the sequence frame rate.
frame_rate=float(config['Sequence']['frameRate'])
if way=="iou":
    interval=math.ceil(frame_rate*2)
elif way=="net":
    interval=math.ceil(frame_rate/2)
elif way=="iou_net":
    interval=math.ceil(frame_rate*3)
else:
    raise ValueError(f"Unknown tracking method: {way!r}")

for r in results:

    tracker.put_bbox(r.boxes.xyxy)
    if way=="iou":
        tracker.trackByIoU(iou=0.5,interval=interval)
    elif way=="net":
        tracker.trackBYnet(fine_tune=False,threshold=0.9,interval=interval)
    elif way=="iou_net":
        tracker.trackByIoU_net(iou=0.75,interval=interval,threshold=0.9224,fine_tune=True,net_type='vgg')


# NOTE(review): absolute, machine-specific output path whose folder name says
# "test4" while dataset_name is "test22" — confirm this is the intended target.
tracker.save_result(r'E:\ML\Pedestrian_trackingMOT\results\IoU_vgg\test4(fine_tune,vgg)')






image 1/196 e:\ML\Pedestrian_trackingMOT\MOT15\train\test4\img1\000001.jpg: 384x640 4 persons, 135.9ms
image 2/196 e:\ML\Pedestrian_trackingMOT\MOT15\train\test4\img1\000002.jpg: 384x640 4 persons, 114.9ms
image 3/196 e:\ML\Pedestrian_trackingMOT\MOT15\train\test4\img1\000003.jpg: 384x640 4 persons, 115.8ms
image 4/196 e:\ML\Pedestrian_trackingMOT\MOT15\train\test4\img1\000004.jpg: 384x640 5 persons, 104.4ms
image 5/196 e:\ML\Pedestrian_trackingMOT\MOT15\train\test4\img1\000005.jpg: 384x640 4 persons, 102.9ms
image 6/196 e:\ML\Pedestrian_trackingMOT\MOT15\train\test4\img1\000006.jpg: 384x640 4 persons, 106.7ms
image 7/196 e:\ML\Pedestrian_trackingMOT\MOT15\train\test4\img1\000007.jpg: 384x640 3 persons, 102.6ms
image 8/196 e:\ML\Pedestrian_trackingMOT\MOT15\train\test4\img1\000008.jpg: 384x640 4 persons, 102.0ms
image 9/196 e:\ML\Pedestrian_trackingMOT\MOT15\train\test4\img1\000009.jpg: 384x640 5 persons, 107.1ms
image 10/196 e:\ML\Pedestrian_trackingMOT\MOT15\train\test4\img1\000010.

In [None]:
import cv2
import os

# Folder path and video properties
folder_path = 'MOT15/train/KITTI-17/img1'
video_name = 'output_video_original.avi'
frame_rate = 10 # frames per second
frame_size = (1224, 370)  # resolution (width, height)

# Initialise the OpenCV video writer
fourcc = cv2.VideoWriter_fourcc(*'XVID')
video_writer = cv2.VideoWriter(video_name, fourcc, frame_rate, frame_size)

try:
    # Walk through the images of the folder in sorted filename order
    for file_name in sorted(os.listdir(folder_path)):
        if file_name.endswith('.jpg') or file_name.endswith('.png'):
            image_path = os.path.join(folder_path, file_name)
            image = cv2.imread(image_path)
            if image is None:
                # cv2.imread returns None for unreadable files; resizing None would raise.
                print(f"Could not read image {image_path}. Skipping.")
                continue

            # Resize the image to the video resolution
            image = cv2.resize(image, frame_size)

            # Append the image to the video
            video_writer.write(image)
finally:
    # Release the writer even if a frame fails, so the output file is closed.
    video_writer.release()


In [56]:
from moviepy.editor import ImageSequenceClip
import os

# Folder holding the annotated frames and the parameters of the output video.
folder_path = r'results\IoU_vgg\test33(full-fine_tune,vgg)'
output_video_path = 'output_video_test33_0.85vgg.mp4'
# Frame rate taken from the `config` object created in the tracking cell,
# which therefore has to be run before this one.
fps =float(config['Sequence']['frameRate'])

# Paths of all images in the folder, sorted so the frames are in order.
image_files = sorted(
    os.path.join(folder_path, img)
    for img in os.listdir(folder_path)
    if img.endswith((".png", ".jpg", ".jpeg"))
)

# Build the clip from the image sequence and write it out.
clip = ImageSequenceClip(image_files, fps=fps)
clip.write_videofile(output_video_path, fps=fps)



Moviepy - Building video output_video_test33_0.85vgg.mp4.
Moviepy - Writing video output_video_test33_0.85vgg.mp4



                                                              

Moviepy - Done !
Moviepy - video ready output_video_test33_0.85vgg.mp4
