# <center>基于相关滤波器的目标跟踪</center>

## 视频预处理

输入一个视频，将其转换为图像序列，并转灰度图以便进一步处理。

In [1]:
import math
import os
import numpy
import torch
from torch import nn
from torchvision import transforms
import torchvision.transforms.functional as F
from PIL import Image
import tkinter as tk
from tkinter import filedialog
import matplotlib.pyplot as plt
import copy
import cv2

#显示RGB图像
def show_image(image):
    image_plt = copy.deepcopy(image)
    image_plt = image_plt.permute(1, 2, 0)
    plt.imshow(image_plt)
    plt.axis('off')
    plt.show()

#显示灰度图像
def show_gray_image(image):
    image_plt = copy.deepcopy(image)
    image_plt = image_plt.squeeze()
    plt.imshow(image_plt, cmap = 'gray')
    plt.axis('off')
    plt.show()

#RGB转灰度
def rgb_to_gray(image):
    transform = transforms.Grayscale(num_output_channels = 1)
    image_out = transform(image)
    return image_out

#从本地文件加载视频
def load_video(output_path = './output', frameInterval = 1):
    root = tk.Tk()
    root.withdraw()
    filepath = filedialog.askopenfilename(title = "请选择待追踪的视频", filetypes = [("Video Files", "*.mp4")])
    if filepath:
        if not os.path.exists(output_path):
            os.makedirs(output_path)
        else:
            for filename in os.listdir(output_path):
                file = os.path.join(output_path, filename)
                if os.path.isfile(file) or os.path.islink(file):
                    os.unlink(file)
                elif os.path.isdir(file):
                    shutil.rmtree(file)
        camera = cv2.VideoCapture(filepath)
        index = 0
        count = 0
        while True:
            res, image = camera.read()
            if not res:
                break
            if count % frameInterval == 0:
                cv2.imwrite(output_path + str(index) + '.png', image)
                index += 1
            count += 1
        camera.release()
        return True, index
    else:
        return False, 0

## 图像预处理

实现余弦窗和高斯滤波器，对目标区域进行预处理。

In [2]:
#生成高斯滤波器
def gaussian_kernel(width, height, sigma = 2):
    kernel = numpy.zeros((1, height, width))
    for x in range(width):
        for y in range(height):
            kernel[0, y, x] = numpy.exp(-((((height - 1) / 2 - y) ** 2 + ((width - 1) / 2 - x) ** 2) / (2 * sigma ** 2)))
    kernel /= numpy.sum(kernel)
    return kernel

#生成余弦窗口
def cos_window(width, height):
    kernel = numpy.zeros((height, width))
    for x in range(width):
        for y in range(height):
            kernel[y, x] = numpy.sin(numpy.pi * x / (width - 1)) * numpy.sin(numpy.pi * y / (height - 1))
            #处理浮点数精度问题
            if kernel[y, x] < 1e-15:
                kernel[y, x] = 0.0
    return kernel

#图像预处理
def preprocess(image, epsilon = 1e-5):
    if isinstance(image, torch.Tensor):
        image_out = Image.fromarray(image.numpy().astype('uint8'))
    elif isinstance(image, Image.Image):
        image_out = image
    else:
        image_out = Image.fromarray(image.astype('uint8'))
    transform = transforms.Compose([transforms.PILToTensor(), transforms.Grayscale(num_output_channels = 1)])
    image_out = transform(image_out)
    image_out = image_out.to(torch.float)
    image_out /= 255
    image_out = torch.squeeze(image_out, 0)
    height, width = image_out.shape[-2:]
    image_out = torch.log(image_out + 1) #对图像取对数，降低背景噪声
    image_out = (image_out - torch.mean(image_out)) / (torch.std(image_out) + epsilon) #归一化
    image_out = torch.tensor(cos_window(width, height)) * image_out #降低频谱泄漏
    image_out = (torch.unsqueeze(image_out, 0)).numpy()
    return image_out

## 确定目标初始位置

用户在视频第一帧画出一个矩形框，表示待跟踪目标的位置。在实际应用中可以搭配YOLO等目标检测算法。

In [3]:
#绘制矩形框并保存
def draw_rect(input_path, rect, color):
    image = cv2.imread(input_path)
    cv2.rectangle(image, (int(rect[0]), int(rect[1])), (int(rect[0] + rect[2]), int(rect[1] + rect[3])), color, 1)
    cv2.imwrite(input_path, image)

#使用opencv库提供的RoI选择
def select_target(path):
    image = cv2.imread(path)
    rect = cv2.selectROI("Press enter to select target", image)
    if rect[2] == 0 or rect[3] == 0: #未选择有效的范围
        return False, None, None
    cv2.destroyAllWindows()
    #截取选中的区域并预处理
    target = image[int(rect[1]):int(rect[1] + rect[3]), int(rect[0]):int(rect[0] + rect[2])]
    target = Image.fromarray(cv2.cvtColor(target, cv2.COLOR_BGR2RGB)) #opencv图像转PIL
    target = preprocess(target)
    return True, rect, target

#仿射变换时的镜像反射
def border_reflect(n, border):
    if n < 0:
        return -n
    elif n >= border:
        return 2 * border - n - 1
    else:
        return n

#对目标进行随机重定位
def random_warp(image, rand = 0.1):
    height, width = image.shape[-2:]
    if image.ndim == 3:
        image = image[0]
    angle = numpy.random.uniform(-rand, rand)
    c, s = numpy.cos(angle), numpy.sin(angle)
    warp_mat = numpy.array([[c + numpy.random.uniform(-rand, rand), -s + numpy.random.uniform(-rand, rand), 0],
                  [s + numpy.random.uniform(-rand, rand), c + numpy.random.uniform(-rand, rand), 0]])
    center_warp = numpy.array([[width / 2], [height / 2]])
    tmp = numpy.sum(warp_mat[:, :2], axis = 1).reshape((2, 1))
    warp_mat[:, 2:] = center_warp - center_warp * tmp
    #仿射变换
    image_out = numpy.zeros((height, width))
    for v in range(height):
        for u in range(width):
            x = border_reflect(round(warp_mat[0][0] * u + warp_mat[0][1] * v + warp_mat[0][2]), width)
            y = border_reflect(round(warp_mat[1][0] * u + warp_mat[1][1] * v + warp_mat[1][2]), height)
            image_out[v][u] = image[y][x]
    return image_out

## 更新目标位置

通过目标响应的最大值确定目标在当前帧的位置。采用傅立叶变换来加快计算速度。

In [4]:
#初始化相关滤波器
def init_corr_filter(target, warp_time = 8):
    height, width = target.shape[-2:]
    a = numpy.zeros((1, height, width), dtype = "complex128")
    b = numpy.zeros((1, height, width), dtype = "complex128")
    g = numpy.fft.fft2(gaussian_kernel(width, height)) #初始目标响应
    for i in range(warp_time): #多次随机重定位后确定初始相关滤波器
        image_out = random_warp(target)
        f = numpy.fft.fft2(preprocess(image_out))
        a += g * numpy.conj(f)
        b += f * numpy.conj(f)
    return a, b, g

#更新相关滤波器和目标位置
def update_corr_filter(image, target_rect, a, b, gaussian, eta = 0.125):
    height = target_rect[3]
    width = target_rect[2]
    h = a / b #相关滤波器
    target = image[int(target_rect[1]):int(target_rect[1] + target_rect[3]), int(target_rect[0]):int(target_rect[0] + target_rect[2])]
    f = preprocess(target)
    g = h * numpy.fft.fft2(f)
    g = numpy.real(numpy.fft.ifft2(g)) #目标响应
    cur_pos = numpy.unravel_index(numpy.argmax(g, axis = None), g.shape) #目标响应最大值位置是当前帧的目标位置
    psr = (numpy.max(g) + numpy.mean(g)) / numpy.std(g) #使用峰值旁瓣比来量化目标跟踪效果
    offset_x = cur_pos[2] - width // 2
    offset_y = cur_pos[1] - height // 2
    rect = [target_rect[0] + offset_x, target_rect[1] + offset_y, target_rect[2], target_rect[3]]
    target = image[int(rect[1]):int(rect[1] + rect[3]), int(rect[0]):int(rect[0] + rect[2])]
    f = preprocess(target)
    f = numpy.fft.fft2(f)
    a_ = eta * (gaussian * numpy.conj(f)) + (1 - eta) * a #求相关相当于旋转180度（即复共轭）后求卷积
    b_ = eta * (f * numpy.conj(f)) + (1 - eta) * b
    return rect, a_, b_, psr

## 输出目标跟踪视频

在图像序列中标注出目标位置，并生成视频。

In [5]:
output_path = './output/'
flag1, frames = load_video(output_path, 5)
green = (0, 255, 0)
red = (0, 0, 255)
threshold = 7
if flag1:
    flag2, rect, target = select_target(output_path + "0.png")
    print("目标初始位置：({},{})".format(rect[0] + rect[2] // 2, rect[1] + rect[3] // 2))
    draw_rect(output_path + "0.png", rect, green)
    if flag2:
        #生成图像序列
        a, b, g = init_corr_filter(target)
        for i in range(1, frames):
            image = cv2.imread(output_path + str(i) + ".png")
            rect, a, b, psr = update_corr_filter(image, rect, a, b, g)
            if psr < threshold:
                state = "目标可能丢失"
                draw_rect(output_path + str(i) + ".png", rect, red)
            else:
                state = "目标跟踪正常"
                draw_rect(output_path + str(i) + ".png", rect, green)
            print("第{}帧位置：({},{})，{}".format(i, rect[0] + rect[2] // 2, rect[1] + rect[3] // 2, state))
        
        #生成视频
        frame = cv2.imread(output_path + "0.png")
        height, width, _ = frame.shape
        video = cv2.VideoWriter(output_path + "output.mp4", cv2.VideoWriter_fourcc(*'mp4v'), 30, (width, height))
        video.write(frame)
        for i in range(1, frames):
            frame = cv2.imread(output_path + str(i) + ".png")
            video.write(frame)
        video.release()

目标初始位置：(1028,519)
第1帧位置：(1038,528)，目标可能丢失
第2帧位置：(1038,528)，目标可能丢失
第3帧位置：(1038,528)，目标可能丢失
第4帧位置：(1038,528)，目标可能丢失
第5帧位置：(1038,528)，目标可能丢失
第6帧位置：(1038,528)，目标可能丢失
第7帧位置：(1038,528)，目标跟踪正常
第8帧位置：(1038,528)，目标跟踪正常
第9帧位置：(1038,528)，目标跟踪正常
第10帧位置：(1038,528)，目标跟踪正常
第11帧位置：(1038,528)，目标跟踪正常
第12帧位置：(1038,528)，目标跟踪正常
第13帧位置：(1038,528)，目标跟踪正常
第14帧位置：(1038,528)，目标跟踪正常
第15帧位置：(1038,528)，目标跟踪正常
第16帧位置：(1039,526)，目标跟踪正常
第17帧位置：(1045,539)，目标可能丢失
第18帧位置：(1045,538)，目标可能丢失
第19帧位置：(1046,538)，目标可能丢失
第20帧位置：(1046,538)，目标跟踪正常
第21帧位置：(1046,538)，目标跟踪正常
第22帧位置：(1046,538)，目标跟踪正常
第23帧位置：(1046,538)，目标跟踪正常
第24帧位置：(1046,538)，目标跟踪正常
第25帧位置：(1046,538)，目标跟踪正常
第26帧位置：(1046,538)，目标跟踪正常
第27帧位置：(1046,538)，目标跟踪正常
第28帧位置：(1046,538)，目标跟踪正常
第29帧位置：(1046,538)，目标跟踪正常
第30帧位置：(1046,538)，目标跟踪正常
第31帧位置：(1046,538)，目标跟踪正常
第32帧位置：(1046,538)，目标跟踪正常
第33帧位置：(1046,538)，目标跟踪正常
第34帧位置：(1046,538)，目标跟踪正常
第35帧位置：(1046,538)，目标跟踪正常
第36帧位置：(1046,538)，目标跟踪正常
第37帧位置：(1046,539)，目标跟踪正常
第38帧位置：(1046,539)，目标跟踪正常
第39帧位置：(1057,543)，目标可能丢失
第40帧位置：(1036,541