In [20]:
# Download video from youtube, using pytube as API

In [13]:
import sys
from pytube import YouTube
import os

def download_video(url, output_path='videos'):   ## downlod video under /video folder
    """
    funtion to download videos
    """
    try:

        os.makedirs(output_path, exist_ok=True)
        yt = YouTube(url)
        video = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
        
        # download video
        if video:
            video.download(output_path=output_path)
            print(f"Downloaded video to {os.path.join(output_path, video.default_filename)}")
        else:
            print("No mp4 video available for download.")
                
    except Exception as e:
        print(f"An error occurred: {e}")

In [2]:
## Call download_video function to download the video
youtube_url = 'https://youtu.be/WeF4wpw7w9k'
download_video(youtube_url)

Downloaded video to videos/Cyclist and vehicle Tracking - 1.mp4


In [3]:
youtube_url = 'https://youtu.be/2NFwY15tRtA'
download_video(youtube_url)

Downloaded video to videos/Cyclist and vehicle tracking - 2.mp4


In [4]:
youtube_url = 'https://youtu.be/5dRramZVu2Q'
download_video(youtube_url)

Downloaded video to videos/Drone Tracking Video.mp4


In [14]:
youtube_url ='https://youtu.be/2hQx48U1L-Y'
download_video(youtube_url)

Downloaded video to videos/Dji Mavic air 2 drone using litchi app with follow me mode on a bike occluded by trees.mp4


In [18]:
## Video Preprocess

In [None]:
## Finetune the detection model using Visdrone

In [None]:
## Download dataset

In [69]:
!pip install gdown

Defaulting to user installation because normal site-packages is not writeable
Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Collecting gdown
  Downloading gdown-5.1.0-py3-none-any.whl.metadata (5.7 kB)
Collecting PySocks!=1.5.7,>=1.5.6 (from requests[socks]->gdown)
  Downloading PySocks-1.7.1-py3-none-any.whl.metadata (13 kB)
Downloading gdown-5.1.0-py3-none-any.whl (17 kB)
Downloading PySocks-1.7.1-py3-none-any.whl (16 kB)
Installing collected packages: PySocks, gdown
Successfully installed PySocks-1.7.1 gdown-5.1.0


In [75]:
!gdown 'https://drive.google.com/uc?id=1a2oHjcEcwXP8oUF95qiwrqzACb2YlUhn'

Downloading...
From (original): https://drive.google.com/uc?id=1a2oHjcEcwXP8oUF95qiwrqzACb2YlUhn
From (redirected): https://drive.google.com/uc?id=1a2oHjcEcwXP8oUF95qiwrqzACb2YlUhn&confirm=t&uuid=5ecafc02-e30d-4cec-bc47-6bc055396b04
To: /workspaces/artificial_intelligence/Drone_follow_me/VisDrone2019-DET-train.zip
100%|██████████████████████████████████████| 1.55G/1.55G [01:07<00:00, 22.8MB/s]


In [None]:
!unzip VisDrone2019-DET-train.zip

In [None]:
## Load fintune dataset

In [1]:
from torchvision.transforms import Compose, Resize, ToTensor
import torch
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torch.utils.data import DataLoader
from torchvision.transforms import Compose, Resize, ToTensor, Normalize

  _torch_pytree._register_pytree_node(


In [1]:
from torchvision.transforms import Compose, Resize, ToTensor, Normalize

def get_transform():
    transform = Compose([
        Resize((896, 896)),
        ToTensor(),
        Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    return transform

  _torch_pytree._register_pytree_node(


In [15]:
import os
import torch
from PIL import Image
from torchvision.transforms import functional as F
from torch.utils.data import Dataset, DataLoader

class VisDroneDataset(Dataset):
    def __init__(self, img_dir, ann_dir, transform=None):
        self.img_dir = img_dir
        self.ann_dir = ann_dir
        self.transform = transform or get_transform()
        self.img_names = self.filter_images_with_targets()

    def filter_images_with_targets(self):
        img_names = []
        for img_name in os.listdir(self.img_dir):
            if img_name.endswith('.jpg'):
                ann_path = os.path.join(self.ann_dir, img_name.replace('.jpg', '.txt'))
                if self.has_interested_objects(ann_path):
                    img_names.append(img_name)
        return img_names

    def has_interested_objects(self, ann_path):
        with open(ann_path) as f:
            for line in f:
                _, _, _, _, _, class_label, _, _ = map(int, line.split(',')[:8])
                if class_label in [3, 4]:  # bicycle or car
                    return True
        return False
    
    def __len__(self):
        return len(self.img_names)

    def __getitem__(self, idx):
        img_name = self.img_names[idx]
        img_path = os.path.join(self.img_dir, img_name)
        image = Image.open(img_path).convert("RGB")
        
        orig_size = torch.tensor([image.width, image.height], dtype=torch.float32)
        target_size = torch.tensor([896, 896], dtype=torch.float32)
    
        ann_path = os.path.join(self.ann_dir, img_name.replace('.jpg', '.txt'))
        boxes, labels = self.parse_annotation(ann_path)
        
        scale_factor = target_size / orig_size

        ## Resize bounding box according to image transform factor
        boxes = boxes * scale_factor.repeat(2)
        
        if self.transform:
            image = self.transform(image)
        
        # target dictionary for object detection
        target = {"boxes": boxes, "labels": labels}
        return image, target

    ## Filter the bicycle and car class from the annotation file
    def parse_annotation(self, annotation_path):
        boxes = []
        labels = []
        with open(annotation_path) as f:
            for line in f:
                x_min, y_min, width, height, _, class_label, _, _ = map(int, line.split(',')[:8])
                if class_label in [3,4]:  # select bicycle and car
                    boxes.append([x_min, y_min, x_min + width, y_min + height])
                    labels.append(class_label - 2)
        boxes = torch.tensor(boxes, dtype=torch.float32) if boxes else torch.zeros((0, 4), dtype=torch.float32)
        labels = torch.tensor(labels, dtype=torch.int64) if labels else torch.zeros((0,), dtype=torch.int64)
        return boxes, labels

def collate_fn(batch):
    images = torch.stack([item[0] for item in batch], 0)
    targets = [item[1] for item in batch]
    return images, targets

In [16]:
## Finetune

In [17]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FasterRCNN_ResNet50_FPN_Weights
import torch
from torch.optim import SGD
from torch.optim.lr_scheduler import StepLR

def get_model(num_classes):
    # load pretrained model
    model = fasterrcnn_resnet50_fpn(weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT)
    # number of features
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # replace class number
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    return model

def train_model(model, data_loader, device, num_epochs):
    optimizer = SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=0.0005)
    scheduler = StepLR(optimizer, step_size=10, gamma=0.1) 
    
    model.to(device)
    print('Start training')
    
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        print(f'Training epoch {epoch}')
        
        for images, targets in data_loader:
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
            
            optimizer.zero_grad()
            
            try:
                loss_dict = model(images, targets)
                losses = sum(loss for loss in loss_dict.values())
                
                losses.backward()
                optimizer.step()
                
                total_loss += losses.item()
            except Exception as e:
                print(f"An error occurred: {e}")
                continue
                
        scheduler.step()
        
        print(f"Epoch #{epoch} loss: {total_loss / len(data_loader)}")
        
        if epoch % 3 == 0:
            save_path = f'checkpoints_1/epoch_{epoch}.pth'
            torch.save(model.state_dict(), save_path)
            print(f"Saved model to {save_path}")

In [18]:
from torch.utils.data import DataLoader
from torchvision.transforms import Compose, Resize, ToTensor, Normalize

img_dir = 'VisDrone2019-DET-train/images'
ann_dir='VisDrone2019-DET-train/annotations'

print('Start loading dataset')
train_dataset = VisDroneDataset(img_dir=img_dir, ann_dir=ann_dir, transform=None)
data_loader = DataLoader(train_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)
print(len(data_loader))

Start loading dataset
1563


In [19]:
num_epochs = 19
num_classes = 3
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = get_model(num_classes)
train_model(model, data_loader, device, num_epochs)

Start training
Training epoch 0
An error occurred: All bounding boxes should have positive height and width. Found invalid box [349.142822265625, 120.3809585571289, 351.4285583496094, 120.3809585571289] for target at index 3.
Epoch #0 loss: 0.903699917541203
Saved model to checkpoints_1/epoch_0.pth
Training epoch 1
An error occurred: All bounding boxes should have positive height and width. Found invalid box [349.142822265625, 120.3809585571289, 351.4285583496094, 120.3809585571289] for target at index 3.
Epoch #1 loss: 0.765516576720062
Training epoch 2
An error occurred: All bounding boxes should have positive height and width. Found invalid box [349.142822265625, 120.3809585571289, 351.4285583496094, 120.3809585571289] for target at index 3.
Epoch #2 loss: 0.7156013125435747
Training epoch 3
An error occurred: All bounding boxes should have positive height and width. Found invalid box [349.142822265625, 120.3809585571289, 351.4285583496094, 120.3809585571289] for target at index 3.


RuntimeError: CUDA error: an illegal memory access was encountered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [None]:
## Load fintune model for inference

In [9]:
# Detection

In [9]:
import torch
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import Compose, Resize, ToTensor, Normalize
from PIL import Image, ImageDraw
import numpy as np


def get_transform():
    transform = Compose([
        Resize((896, 896)),
        ToTensor(),
        Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    return transform

def load_model_for_inference(path, device):
    model = fasterrcnn_resnet50_fpn(pretrained=False, num_classes=3)
    model.load_state_dict(torch.load(path, map_location=device))
    model = model.to(device)
    model.eval()
    return model

def inference(model, image, score_threshold_bi, score_threshold_car):
    pred = {}
    transform = get_transform()
    img_transformed = transform(image).unsqueeze(0)
    img_transformed = img_transformed.to(device)
    with torch.no_grad():
        predictions = model(img_transformed)
    predictions = predictions[0]
    
    # All detections
    all_labels = predictions['labels'].cpu().numpy()
    all_scores = predictions['scores'].cpu().numpy()
    all_boxes = predictions['boxes'].cpu().numpy()
    
    # Filter the detections with given thresholds
    filtered_boxes = []
    filtered_scores = []
    filtered_labels = []

    for label, score, box in zip(all_labels, all_scores, all_boxes):
        if label == 1 and score > score_threshold_bi:
            filtered_labels.append(label)
            filtered_scores.append(score)
            filtered_boxes.append(box)
        elif label != 1 and score > score_threshold_car:
            filtered_labels.append(label)
            filtered_scores.append(score)
            filtered_boxes.append(box)

    pred['boxes'] = np.array(filtered_boxes)
    pred['scores'] = np.array(filtered_scores)
    pred['labels'] = np.array(filtered_labels)
    # print(pred['scores'])
    return pred

In [10]:
import cv2
import imageio
import numpy as np
import torch
from filterpy.kalman import KalmanFilter
from scipy.optimize import linear_sum_assignment
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import functional as F

In [11]:
# Kalman filter

In [67]:
def adjust_boxes(boxes, transformed_dim=(896, 896), new_dim=(1280, 720)):
    # Resize the bouding box according the target frame size
    scale_w, scale_h = new_dim[0] / transformed_dim[0], new_dim[1] / transformed_dim[1]
    
    adjusted_boxes = []
    for box in boxes:
        x1, y1, x2, y2 = box
        adjusted_box = [x1 * scale_w, y1 * scale_h, x2 * scale_w, y2 * scale_h]
        adjusted_boxes.append(adjusted_box)
    
    return adjusted_boxes

def adjust_frame_size(frame, target_size=(1280, 720), macro_block_size=16):
    # Adjust the frame size
    target_width = int(np.ceil(target_size[0] / macro_block_size) * macro_block_size)
    target_height = int(np.ceil(target_size[1] / macro_block_size) * macro_block_size)
    
    if isinstance(frame, Image.Image):
        frame = np.array(frame)
    
    adjusted_frame = cv2.resize(frame, (target_width, target_height))
    
    return adjusted_frame

def visualize_frame(frame, boxes, scores, labels, trackers):
    ## Visualize trace and detection boxes
    for trk in trackers:
        if 'last_box' in trk:
            box = trk['last_box']
            x1, y1, x2, y2 = [int(coord) for coord in box]
            color = (255, 255, 255) 
            cls = ''
            ## Choose diffrent color of bounding box and trace for different class
            if trk['type'] == 1:
                cls, color = 'bicycle', (150, 123, 238)
            elif trk['type'] == 2:
                # cls, color = 'bicycle', (150, 123, 238)
                cls, color = 'car', (123, 238, 176)
            
            # draw bounding box
            frame = cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, cls, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
        
        # Draw the trace using history points
        if 'history' in trk:
            for pt in trk['history'][-20:]:  # Use 80 most recent points only because change of the scene
                cv2.circle(frame, pt, 3, color, -1) 
                
        # Plot the predict points in red
        pred_x, pred_y = int(trk['kf'].x[0]), int(trk['kf'].x[1])
        cv2.circle(frame, (pred_x, pred_y), 5, (255, 0, 0), -1) 

    return frame

In [68]:
def initialize_kalman():
    kf = KalmanFilter(dim_x=4, dim_z=2)
    dt = 1.0  # time gap

    #  transition matrix
    kf.F = np.array([[1, 0, dt, 0],
                     [0, 1, 0, dt],
                     [0, 0, 1, 0],
                     [0, 0, 0, 1]])

    #  measurement matrix
    kf.H = np.array([[1, 0, 0, 0],
                     [0, 1, 0, 0]])

    kf.P *= 1000.  #  initial state covariance
    kf.R = np.eye(2) * 10  # measurement noise
    kf.Q = np.eye(4) * 0.1  # process noise

    # dictionary containing the initialized Kalman filter instance
    return {'kf': kf, 'missed_count': 0, 'history': []}

def compute_iou(boxA, boxB):
    ## Compute the intersection over union of two boxes
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])

    interArea = max(0, xB - xA) * max(0, yB - yA)

    boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])

    iou = interArea / float(boxAArea + boxBArea - interArea)
    return iou

def compute_cost_matrix(detections, trackers,adjusted_boxes_resized):
    ## coumpute cost matrix for assign detection to trackers
    cost_matrix = np.zeros((len(detections), len(trackers)), dtype=np.float32)
    for d, det in enumerate(detections):
        for t, trk in enumerate(trackers):
            if 'last_box' in trk:
                iou = compute_iou(adjusted_boxes_resized[d], trk['last_box'])
                cost_matrix[d, t] = 1 - iou 
            else:
                cost_matrix[d, t] = np.linalg.norm(np.array(det[:2]) - np.array(trk['kf'].x[:2].reshape(-1)))
    return cost_matrix

def assign_detections_to_trackers(detections, trackers,adjusted_boxes_resized):
    ## assign detection to trackers using hungarian algorithm
    cost_matrix = compute_cost_matrix(detections, trackers,adjusted_boxes_resized)
    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    return row_ind, col_ind, cost_matrix

def update_trackers(trackers, adjusted_boxes_resized, pred_scores, row_ind, col_ind, pred_labels):
    matched_indices = set(row_ind)
    matched_trackers = set(col_ind)
    detections = [[(box[0] + box[2]) / 2, (box[1] + box[3]) / 2] for box in adjusted_boxes_resized]

    # update the existing trackers
    for d, t in zip(row_ind, col_ind):
        trk = trackers[t]
        det = detections[d]
        trk['kf'].update(np.array([[det[0]], [det[1]]]))  # update karlman filter
        trk['missed_count'] = 0  #  reset missed_count
        trk['last_box'] = adjusted_boxes_resized[d]
        trk['type'] = pred_labels[d]

    # Evaluate whether to create a tracker for unassociated detections
    for d in range(len(detections)):
        if d not in matched_indices:
            det = detections[d]
            should_create_new_tracker = True
            if should_create_new_tracker:
                kf = initialize_kalman()
                kf['kf'].update(np.array([[det[0]], [det[1]]]))
                new_tracker = {
                    'kf': kf['kf'],
                    'missed_count': 0,
                    'last_box':adjusted_boxes_resized[d],
                    'history': [(int(det[0]), int(det[1]))],
                    'type': pred_labels[d]  # 保type of target
                }
                trackers.append(new_tracker)

    # Increase the missed_count for unassigned trackers
    for t, trk in enumerate(trackers):
        if t not in matched_trackers:
            trk['missed_count'] += 1

In [69]:
def get_transform():
    transform = Compose([
        Resize((896, 896)),
        ToTensor(),
        Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    return transform

In [70]:
# Multi-object Tracking in videos

In [73]:
import cv2
import imageio
from PIL import Image
import numpy as np

## read video
print("Reading video")
video_path = 'videos/2.mp4'
output_filename = 'videos/Cyclist and vehicle 2.mp4'
video_reader = imageio.get_reader(video_path)  ## read video
writer = imageio.get_writer(output_filename, fps=20)   

## Load the detection model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = load_model_for_inference('checkpoints_1/epoch_9.pth',device)
print("Object Tracking")
## Object tracking using kalman filter
trackers = []
frame_index=0
for frame in video_reader:
    original_dim = (896, 896)  #  Size of object detection image after transform
    frame_resized = adjust_frame_size(frame, target_size=(1280, 720))   ## resize frame


    frame_pil = Image.fromarray(frame_resized)
    pred = inference(model, frame_pil, 0.4,0.8)  #Object detection
    pred_boxes, pred_scores, pred_labels = pred['boxes'], pred['scores'], pred['labels']
    
    if len(pred_boxes) > 0:
        adjusted_boxes_resized = adjust_boxes(pred_boxes, original_dim, (1280, 720))
        detections = [[(box[0] + box[2]) / 2, (box[1] + box[3]) / 2] for box in adjusted_boxes_resized]
        row_ind, col_ind, _ = assign_detections_to_trackers(detections, trackers,adjusted_boxes_resized)
        update_trackers(trackers, adjusted_boxes_resized, pred_scores, row_ind, col_ind, pred_labels)  # 使用检测结果更新跟踪器状态
    else:
        writer.append_data(frame_resized)
        continue

    
     # Predict
    for trk in trackers:
        trk['kf'].predict()
        pred_x, pred_y = int(trk['kf'].x[0]), int(trk['kf'].x[1])
        trk['history'].append((pred_x, pred_y))  # Update history

    trackers = [trk for trk in trackers if trk['missed_count'] < 20]  # Clear unupdated trackers

    # Visualize the object tracking and object detection results
    vis_frame = visualize_frame(frame_resized, adjusted_boxes_resized, pred_scores, pred_labels, trackers)
    # output_temp=f"videos/temp/{frame_index}.jpg"
    # cv2.imwrite(output_temp,vis_frame)   ## write the results into a new video
    writer.append_data(vis_frame)  

    frame_index += 1

    # print(frame_index )
print("Object tracking complete")
writer.close()

Reading video
Object Tracking


  pred_x, pred_y = int(trk['kf'].x[0]), int(trk['kf'].x[1])
  pred_x, pred_y = int(trk['kf'].x[0]), int(trk['kf'].x[1])


Object tracking complete
