In [0]:
import zipfile
from google.colab import drive
# Mount Google Drive so the archives, label CSVs and checkpoints referenced
# by the cells below (all under "/content/drive/My Drive/") are reachable.
drive.mount('/content/drive/')


In [0]:
import zipfile
#path of ssd directory
zip_ref = zipfile.ZipFile("/content/drive/My Drive/ssd.zip", 'r')
zip_ref.extractall()
zip_ref.close()
!pip install filterpy


In [0]:
# Download the UA-DETRAC training archive into the current working directory.
# The next cell extracts "DETRAC-train-data.zip" from there.
!wget http://detrac-db.rit.albany.edu/Data/DETRAC-train-data.zip


In [0]:
# Extract the downloaded dataset archive into the current working directory.
import zipfile
# The context manager guarantees the archive is closed even when extraction
# raises (e.g. a truncated download).
with zipfile.ZipFile("DETRAC-train-data.zip") as zip_ref:
    zip_ref.extractall()


In [0]:
#train script

from ssd import *
import os
import sys
import time
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
import torch.nn.init as init
import torch.utils.data as data
import numpy as np
import argparse
from torch.utils.data.sampler import SubsetRandomSampler

import warnings
warnings.simplefilter("ignore", UserWarning)




# ---- Training configuration -------------------------------------------------
batch_size = 32
# NOTE(review): num_workers is not passed to the DataLoader below, which is
# created with num_workers=0.
num_workers = 4
lr = 1e-3
momentum=0.9
weight_decay= 5e-4
# NOTE(review): gamma is not referenced anywhere else in this cell.
gamma=0.1
# Path of the training image directory.
imgs_path = './Insight-MVT_Annotation_Train/'
# Path of the image annotation CSV. Layout, one image per row:
# image_path                                   box1                 box2                   box3....
#./Insight-MVT_Annotation_Train/image1.jpg, x1_1,y1_1,x2_1,y2_1, x1_2,y1_2,x2_2,y2_2 .....
labels_path = '/content/drive/My Drive/ssd_train.csv'

# NOTE(review): input_size, num_classes and use_cuda are not referenced below;
# the model and loss are configured from `cfg`, and the `cuda` flag computed
# next is what the rest of the cell uses.
input_size = 300
num_classes = 2
use_cuda = True
# Exclusive upper bound of the training loop. Each loop step consumes one
# mini-batch, so this counts iterations rather than passes over the dataset.
epochs=60000

# Choose the default tensor type before any model or loss object is created,
# and remember whether CUDA is in use.
if torch.cuda.is_available():
    torch.set_default_tensor_type('torch.cuda.FloatTensor')
    cuda=True
else:
    torch.set_default_tensor_type('torch.FloatTensor')
    cuda = False


# Transform, SSDAugmentation, DataLoader, detection_collate, cfg, build_ssd
# and MultiBoxLoss are not imported by name in this cell; presumably they all
# come from `from ssd import *`.
dataset = Transform(imgs_path,labels_path,transform=SSDAugmentation())

dataset_len = len(dataset)
print("dataset len",dataset_len)

# Build the batch loader.
train_loader = DataLoader(dataset,batch_size,num_workers=0, collate_fn=detection_collate)
# NOTE(review): both keys map to the same loader and this dict is not used
# again in this cell.
data_loaders = {"train": train_loader, "val": train_loader}


# Iteration index of the checkpoint to resume from; the training loop
# continues at start_epoch + 1.
start_epoch=48000
# Paths of the saved model and optimizer state for that iteration.
model_path = "/content/drive/My Drive/checkpoints/ssd_ckpt_%d.pth"% start_epoch
optim_path = "/content/drive/My Drive/checkpoints/optim_ckpt_%d.pth"% start_epoch

ssd_net = build_ssd('train', cfg['min_dim'], cfg['num_classes'])
# `ssd_net` keeps a handle on the unwrapped model (its state_dict is what the
# training loop checkpoints); `net` may be wrapped in DataParallel below.
net = ssd_net
ssd_net.load_state_dict(torch.load(model_path))

if cuda:
    net = torch.nn.DataParallel(net)
    cudnn.benchmark = True
    net = net.cuda()


optimizer = optim.SGD(net.parameters(), lr=lr, momentum=momentum,
                      weight_decay=weight_decay)
# Restore the optimizer state saved alongside the model checkpoint.
optimizer.load_state_dict(torch.load(optim_path))

# NOTE(review): the meaning of the positional arguments is defined by the
# project's MultiBoxLoss and is not visible here - confirm against ssd.
criterion = MultiBoxLoss(cfg['num_classes'], 0.5, True, 0, True, 3, 0.5,
                         False, cuda)

net.train()
# Loss counters (re-initialised inside the training loop).
loc_loss = 0
conf_loss = 0
epoch = 0

# Batches are pulled from this iterator with next(); the training loop
# recreates it when the loader is exhausted.
train_iterator = iter(train_loader)


# Accumulates one log line per iteration; written to Drive and reset every
# time a checkpoint is saved.
message_s = ""

# NOTE: `epoch` counts training iterations (one mini-batch each), not passes
# over the dataset. The name is kept because it appears in the log text and
# in the checkpoint file names.
# NOTE(review): the range excludes `epochs`, and checkpoints are only written
# on multiples of 4000, so iterations after the last multiple (and their log
# lines) are never saved - confirm whether a final save is wanted.
for epoch in range(start_epoch + 1, epochs):
    net.train(True)  # Set model to training mode

    # Fetch the next batch; restart the loader once it is exhausted.
    try:
        images, targets = next(train_iterator)
    except StopIteration:
        train_iterator = iter(train_loader)
        images, targets = next(train_iterator)

    # Honour the `cuda` flag computed during setup instead of calling .cuda()
    # unconditionally. Targets are ground truth and need no gradients.
    if cuda:
        images = images.cuda()
        targets = [ann.cuda() for ann in targets]

    # forward
    t0 = time.time()
    out = net(images)
    # backprop
    optimizer.zero_grad()
    loss_l, loss_c = criterion(out, targets)
    loss = loss_l + loss_c
    loss.backward()
    optimizer.step()
    t1 = time.time()

    # Per-iteration loss components (the original reset these counters to 0
    # on every step before adding, which is the same value).
    loc_loss = loss_l.item()
    conf_loss = loss_c.item()

    message_s += ("epoch:" + str(epoch) + ' timer: ' + str(t1 - t0)
                  + " loss: " + str(loss.item())
                  + " loc_loss: " + str(loc_loss)
                  + " conf_loss: " + str(conf_loss) + "\n")

    if epoch % 1000 == 0:
        # Print the scalar value rather than the tensor repr.
        print(" epoch: " + str(epoch) + " loss:" + str(loss.item()))

    # Every 4000 iterations save the trained model, the optimizer state and
    # the accumulated loss log.
    if epoch % 4000 == 0:
        print("saveing...")
        torch.save(ssd_net.state_dict(),
                   "/content/drive/My Drive/checkpoints/ssd_ckpt_%d.pth" % epoch)
        torch.save(optimizer.state_dict(),
                   "/content/drive/My Drive/checkpoints/optim_ckpt_%d.pth" % epoch)
        # save loss log
        with open("/content/drive/My Drive/ssdlog_message_%d.txt" % (epoch), 'w') as the_file:
            the_file.write(message_s)
        message_s = ""



In [0]:
#eval script
from ssd import *
import os
import torch
from torch.autograd import Variable
import torch.backends.cudnn as cudnn
import torch.nn.init as init
import torch.utils.data as data
import numpy as np
import zipfile
import warnings
warnings.simplefilter("ignore", UserWarning)


imgs_path = './Insight-MVT_Annotation_Train/'
labels_path = '/content/drive/My Drive/ssd_val.csv'
model_path = "/content/drive/My Drive/checkpoints/ssd_ckpt_%d.pth"% 57000

# Select CUDA tensors *before* the network is built. The inputs below are
# moved to the GPU, so the weights must live there too; setting this only
# after build_ssd() made the cell depend on an earlier cell having already
# switched the default tensor type.
torch.set_default_tensor_type('torch.cuda.FloatTensor')

net = build_ssd('test', 300, num_classes=2)            # initialize SSD
net.load_state_dict(torch.load(model_path))
net.eval()

# NOTE(review): this is the same SSDAugmentation transform the training cell
# uses; if it applies random augmentation the metrics will vary between runs -
# confirm whether a deterministic transform should be used for validation.
dataset = Transform(imgs_path,labels_path,transform=SSDAugmentation())

# batch_size=1: the loop below reads exactly one image and one annotation
# tensor per step.
val_loader = DataLoader(dataset,batch_size=1,num_workers=0)


iou_threshold=0.5
# Boxes are multiplied by these factors before matching.
# NOTE(review): presumably the dataset yields coordinates normalised to [0, 1]
# and w/h is the frame size in pixels - confirm. Ground truth and detections
# are scaled identically.
w = 1280
h = 720
all_detections = []
all_annotations = []

# Inference only: no autograd graph is needed for the stored detections.
with torch.no_grad():
    for data_ in val_loader:
        im = data_[0][0]
        gt = data_[1][0]
        gt[:, 0] *= w
        gt[:, 2] *= w
        gt[:, 1] *= h
        gt[:, 3] *= h
        all_annotations.append(gt)

        x = im.unsqueeze(0).cuda().float()
        detections = net(x)[0][1]  # just the vehicle class (index 1)

        # Keep only rows whose first column (the score) is positive; each row
        # is [score, x1, y1, x2, y2].
        detections = detections[detections[:, 0] > 0.].view(-1, 5)
        detections[:, 1] *= w
        detections[:, 3] *= w
        detections[:, 2] *= h
        detections[:, 4] *= h
        all_detections.append(detections)
    

def _compute_ap(recall, precision):
    """ Compute the average precision, given the recall and precision curves.
    Code originally from https://github.com/rbgirshick/py-faster-rcnn.
    # Arguments
        recall:    The recall curve (list).
        precision: The precision curve (list).
    # Returns
        The average precision as computed in py-faster-rcnn.
    """
    # correct AP calculation
    # first append sentinel values at the end
    mrec = np.concatenate(([0.], recall, [1.]))
    mpre = np.concatenate(([0.], precision, [0.]))

    # compute the precision envelope
    for i in range(mpre.size - 1, 0, -1):
        mpre[i - 1] = np.maximum(mpre[i - 1], mpre[i])

    # to calculate area under PR curve, look for points
    # where X axis (recall) changes value
    i = np.where(mrec[1:] != mrec[:-1])[0]

    # and sum (\Delta recall) * prec
    ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1])
    return ap


# Match detections to ground truth image by image, then derive recall,
# precision, average precision and F1 over the whole validation set.
# Plain lists are filled in the loops (np.append re-allocates on every call)
# and converted to arrays once at the end.
fp_flags = []
tp_flags = []
score_list = []
num_annotations = 0.0
print("dataset len: ",len(dataset))
for detections, annotations in zip(all_detections, all_annotations):
    num_annotations += annotations.shape[0]
    # Indices of ground-truth boxes already claimed by a detection in this image.
    detected_annotations = set()

    for d in detections:
        score_list.append(d[0].item())

        # No ground truth in this image: every detection is a false positive.
        if annotations.shape[0] == 0:
            fp_flags.append(1)
            tp_flags.append(0)
            continue

        d = d.unsqueeze(0)
        # Put the ground truth on the detection's device before computing IoU
        # (a no-op when both already share a device).
        gt_boxes = annotations[:, :4].float().to(d.device)
        overlaps = jaccard(d[:, 1:].float(), gt_boxes)
        overlaps = overlaps.cpu().detach().numpy()
        assigned_annotation = int(np.argmax(overlaps, axis=1)[0])
        max_overlap = overlaps[0, assigned_annotation]

        # True positive only if the best-overlapping ground-truth box clears
        # the IoU threshold and has not been matched before.
        if max_overlap >= iou_threshold and assigned_annotation not in detected_annotations:
            fp_flags.append(0)
            tp_flags.append(1)
            detected_annotations.add(assigned_annotation)
        else:
            fp_flags.append(1)
            tp_flags.append(0)

false_positives = np.asarray(fp_flags, dtype=np.float64)
true_positives = np.asarray(tp_flags, dtype=np.float64)
scores = np.asarray(score_list, dtype=np.float64)

if num_annotations == 0 or scores.size == 0:
    # Without ground truth or without detections the curves are empty;
    # report zeros instead of dividing by zero or indexing an empty array.
    recall = np.zeros((0,))
    precision = np.zeros((0,))
    average_precision = 0.0
    r = 0.0
    p = 0.0
else:
    # sort by descending score
    indices = np.argsort(-scores)
    false_positives = false_positives[indices]
    true_positives = true_positives[indices]

    # cumulative false / true positives along the ranked list
    false_positives = np.cumsum(false_positives)
    true_positives = np.cumsum(true_positives)

    # compute recall and precision
    recall = true_positives / num_annotations
    precision = true_positives / np.maximum(true_positives + false_positives, np.finfo(np.float64).eps)

    average_precision = _compute_ap(recall, precision)
    # Recall / precision when every detection is accepted.
    r = recall[-1]
    p = precision[-1]

f1 = 2*r*p/(r+p+1e-6)
print("recall: ",r)
print("precision: ", p)
print("average_precision: ",average_precision)
print("f1_score: ", f1)




In [0]:
#detector script

from google.colab.patches import cv2_imshow
import os
import torch
from ssd import *
import numpy as np 
import cv2
import time

# Newly created tensors default to CUDA floats from here on.
# NOTE(review): presumably this is what places the SSD built later in this
# cell on the GPU, since the model is never moved with .cuda() - confirm.
torch.set_default_tensor_type('torch.cuda.FloatTensor')
def base_transform(image, size, mean):
    """Resize `image` to a `size` x `size` square and subtract `mean`.

    Args:
        image: image array accepted by cv2.resize.
        size: side length of the square output, in pixels.
        mean: value(s) subtracted from the resized image; must broadcast
            against it.

    Returns:
        A new float32 array holding the resized, mean-subtracted image.
    """
    resized = cv2.resize(image, (size, size))
    shifted = resized.astype(np.float32)
    shifted -= mean
    return shifted.astype(np.float32)

class BaseTransform:
    """Callable wrapper that applies base_transform with a fixed size and mean."""

    def __init__(self, size, mean):
        # Side length of the square network input.
        self.size = size
        # Mean stored as a float32 array so the subtraction stays in float32.
        self.mean = np.array(mean, dtype=np.float32)

    def __call__(self, image, boxes=None, labels=None):
        """Transform `image`; `boxes` and `labels` are returned unchanged."""
        transformed = base_transform(image, self.size, self.mean)
        return transformed, boxes, labels

model_path = "/content/drive/My Drive/checkpoints/ssd_ckpt_%d.pth"% 57000
net = build_ssd('test', 300, 2)    # initialize SSD
net.load_state_dict(torch.load(model_path))
net.eval()  # inference mode, as in the evaluation cell

COLORS = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
FONT = cv2.FONT_HERSHEY_SIMPLEX
# NOTE(review): the mean is on a 0-1 scale while base_transform subtracts it
# from raw pixel values - confirm this matches the training preprocessing.
transform = BaseTransform(net.size, (104/256.0, 117/256.0, 123/256.0))


# path of test image
image_path = './Insight-MVT_Annotation_Train/MVI_20061/img00300.jpg'
frame = cv2.imread(image_path)
if frame is None:
    # cv2.imread returns None instead of raising when the file cannot be read.
    raise FileNotFoundError(image_path)
height, width = frame.shape[:2]
# HWC image -> CHW tensor with a leading batch dimension.
x = torch.from_numpy(transform(frame)[0]).permute(2, 0, 1)
x = x.unsqueeze(0)
start_time = time.time()
with torch.no_grad():
    y = net(x.cuda())  # forward pass
end_time = time.time()

# Elapsed time of the forward pass (end - start, so the value is positive).
print("diff time: ", end_time - start_time)
detections = y.detach()
# scale each detection back up to the image
scale = torch.Tensor([width, height, width, height])
rows_per_class = detections.size(2)
for i in range(detections.size(1)):
    j = 0
    # Walk the rows of class i while the score stays at or above 0.5. This
    # relies on rows being ordered by descending score - TODO confirm. The
    # bound on j keeps the loop from indexing past the last row when every
    # row is confident.
    while j < rows_per_class and detections[0, i, j, 0] >= 0.5:
        pt = (detections[0, i, j, 1:] * scale).cpu().numpy()
        cv2.rectangle(frame,
                      (int(pt[0]), int(pt[1])),
                      (int(pt[2]), int(pt[3])),
                      COLORS[i % 3], 2)
        cv2.putText(frame, "vehicle", (int(pt[0]), int(pt[1])),
                    FONT, 0.5, (255, 255, 255), 2, cv2.LINE_AA)
        j += 1

cv2_imshow(frame)



In [0]:

from sort import *
########  tracking video ###########################
%matplotlib inline
from google.colab.patches import cv2_imshow
import numpy as np

from ssd import *
import os
import time
import torch
import cv2
import os
import warnings

import shutil

warnings.simplefilter("ignore", UserWarning)
torch.set_default_tensor_type('torch.cuda.FloatTensor')
VIDEO_SAVE_DIR="traffic_result"
# Start every run from an empty output directory. shutil/os replace the
# `!rm -r traffic_result` shell magic, which repeated the directory name
# instead of using VIDEO_SAVE_DIR and is not valid Python outside IPython.
if os.path.isdir(VIDEO_SAVE_DIR):
    shutil.rmtree(VIDEO_SAVE_DIR)
elif os.path.exists(VIDEO_SAVE_DIR):
    os.remove(VIDEO_SAVE_DIR)
os.mkdir(VIDEO_SAVE_DIR)

def detection(frame, conf_threshold=0.5):
    """Run the module-level SSD `net` on one frame and return confident boxes.

    The frame is preprocessed with the module-level `transform`, so both
    `net` and `transform` must be defined before this is called.

    Args:
        frame: image array as returned by cv2.imread.
        conf_threshold: minimum score for a detection to be kept.

    Returns:
        An array with one row per kept detection. The network emits rows as
        [score, c1, c2, c3, c4]; they are rotated so the score comes last:
        [c1, c2, c3, c4, score]. Coordinates are returned unscaled (callers
        multiply them by the frame size). When nothing is kept the result is
        an empty array (`size == 0`).
    """
    x = torch.from_numpy(transform(frame)[0]).permute(2, 0, 1)
    x = x.unsqueeze(0).cuda()
    with torch.no_grad():
        detections = net(x).detach()  # forward pass

    kept = []
    for class_idx in range(detections.size(1)):
        # Rows are assumed to be ordered by descending score (TODO confirm),
        # so scanning stops at the first one below the threshold. The row is
        # appended *before* moving on: the previous version advanced the index
        # first, which dropped the best detection, appended the first
        # sub-threshold row and could index past the last row.
        for row in detections[0, class_idx]:
            if row[0] < conf_threshold:
                break
            kept.append(row.cpu().numpy())

    result = np.asarray(kept)
    if result.size != 0:
        # Move the score from the first to the last column.
        result = np.roll(result, -1, axis=1)
    return result

def base_transform(image, size, mean):
    """Return `image` resized to (size, size) with `mean` subtracted, as float32.

    `mean` must broadcast against the resized image. The result is a fresh
    array; the input image is left untouched.
    """
    out = cv2.resize(image, (size, size)).astype(np.float32)
    out -= mean
    out = out.astype(np.float32)
    return out

class BaseTransform:
    """Preprocessing callable: resize to `size` and subtract `mean`."""

    def __init__(self, size, mean):
        self.mean = np.array(mean, dtype=np.float32)  # kept as float32
        self.size = size

    def __call__(self, image, boxes=None, labels=None):
        """Return (transformed image, boxes, labels); only the image changes."""
        result = (base_transform(image, self.size, self.mean), boxes, labels)
        return result


COLORS = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
FONT = cv2.FONT_HERSHEY_SIMPLEX
start_epoch=57000
model_path = "/content/drive/My Drive/checkpoints/ssd_ckpt_%d.pth"% start_epoch
net = build_ssd('test', 300, 2)    # initialize SSD
net.load_state_dict(torch.load(model_path))
net.eval()  # inference mode, as in the evaluation cell
# NOTE(review): the mean is on a 0-1 scale while base_transform subtracts it
# from raw pixel values - confirm this matches the training preprocessing.
transform = BaseTransform(net.size, (104/256.0, 117/256.0, 123/256.0))

#capture = cv2.VideoCapture("/content/drive/My Drive/3.mp4")

frame_count=0
mot_tracker = Sort()

# Frames of one sequence, processed in file-name order.
sequence_dir = "/content/Insight-MVT_Annotation_Train/MVI_40131"
frame_path = sorted(sequence_dir + "/" + name for name in os.listdir(sequence_dir))

for path in frame_path:
    frame = cv2.imread(path)
    frame_count += 1
    if frame is None:
        # cv2.imread returns None for files it cannot decode; skip them
        # instead of failing later on frame.shape.
        print("skipping unreadable file:", path)
        continue

    # Only every second frame is run through the detector and tracker.
    if(frame_count % 2== 0):
        height, width = frame.shape[:2]
        detections = detection(frame)

        if detections.size != 0:
            # Scale the four box coordinates to pixels (the score column is dropped).
            scale = np.array([width,height,width,height])
            detections = (detections[:,:4]* scale)
        else:
            # The tracker is updated on every processed frame, including
            # frames without detections, so its internal frame counter and
            # track ages stay in step with the video.
            detections = np.empty((0, 5))
        trackers = mot_tracker.update(detections)

        for d in trackers:
            try:
                # NOTE(review): tracker rows are read as [x1, y1, x2, y2, id],
                # matching the tracking-accuracy cell - confirm against sort.
                xmin=int(d[0])
                ymin=int(d[1])
                xmax=int(d[2])
                ymax=int(d[3])
                label=int(d[4])
                cv2.rectangle(frame,(xmin,ymin),(xmax,ymax),(255,0,0),2)
                cv2.putText(frame, str(label), (int(xmin), int(ymin)),
                            FONT, 0.5, (255, 255, 255), 4, cv2.LINE_AA)
            except Exception as exc:
                # Best effort: one bad track must not abort the video. Catching
                # Exception (not a bare except) keeps Ctrl-C / kernel interrupt
                # working.
                print("error", exc)

        # Write every processed frame, with or without boxes, so the output
        # video has no gaps where nothing was detected.
        name = '{}.jpg'.format(frame_count)
        name = os.path.join(VIDEO_SAVE_DIR, name)
        cv2.imwrite(name, frame)
    if(frame_count % 100==0):
        print(frame_count)

#convert tracked image to video

#-----------------CONVERT IMAGE TO VIDEO--------------------
import glob
import os
import cv2
# video = cv2.VideoCapture("/content/drive/My Drive/3.mp4");

# # Find OpenCV version
# (major_ver, minor_ver, subminor_ver) = (cv2.__version__).split('.')

# if int(major_ver)  < 3 :
#     fps = video.get(cv2.cv.CV_CAP_PROP_FPS)
#     print("Frames per second using video.get(cv2.cv.CV_CAP_PROP_FPS): {0}".format(fps))
# else :
#     fps = video.get(cv2.CAP_PROP_FPS)
#     print("Frames per second using video.get(cv2.CAP_PROP_FPS) : {0}".format(fps))

#video.release();
# Frame rate passed to make_video() below for the rendered output video.
fps=25
def make_video(outvid, images=None, fps=30, size=None,
               is_color=True, format="FMP4"):
    """Encode a sequence of image files into a video file.

    Args:
        outvid: path of the video file to write.
        images: iterable of image file paths, in playback order.
        fps: frames per second of the output video.
        size: (width, height) of the output; defaults to the size of the
            first image. Images of a different size are resized to it.
        is_color: passed through to cv2.VideoWriter.
        format: four-character codec code passed to VideoWriter_fourcc.

    Returns:
        The (already released) cv2.VideoWriter, or None when `images` is
        None or empty and nothing was written.

    Raises:
        FileNotFoundError: an image path does not exist.
        ValueError: an image file exists but cannot be decoded.
    """
    images = list(images) if images is not None else []
    if not images:
        # Nothing to encode; previously this failed on `vid.release()`.
        return None

    from cv2 import VideoWriter, VideoWriter_fourcc, imread, resize
    # Honour the `format` argument (it used to be ignored in favour of a
    # hard-coded "FMP4").
    fourcc = VideoWriter_fourcc(*format)
    vid = None
    try:
        for image in images:
            if not os.path.exists(image):
                raise FileNotFoundError(image)
            img = imread(image)
            if img is None:
                raise ValueError("could not decode image: %s" % image)
            if vid is None:
                if size is None:
                    size = img.shape[1], img.shape[0]
                vid = VideoWriter(outvid, fourcc, float(fps), size, is_color)
            # Resize when *either* dimension differs; with `and`, a frame that
            # differed in only one dimension was written at the wrong size.
            if size[0] != img.shape[1] or size[1] != img.shape[0]:
                img = resize(img, size)
            vid.write(img)
    finally:
        # Release the writer even when an image is missing or unreadable.
        if vid is not None:
            vid.release()
    return vid



# Collect every file in the output directory and order the frames by the
# numeric value of the file name. Dropping the last three characters turns
# "12.jpg" into "12.", which float() accepts.
images = sorted(
    glob.glob(os.path.join("traffic_result", '*.*')),
    key=lambda frame_file: float(os.path.basename(frame_file)[:-3]),
)

# Render the ordered frames into a video on Drive.
outvid =  "/content/drive/My Drive/ssd_out.mp4"
make_video(outvid, images, fps)

In [0]:
import zipfile
# Unpack the tracking archive from Drive into the current working directory.
# The context manager closes the archive even if extraction fails.
with zipfile.ZipFile("/content/drive/My Drive/img_tracking2.zip", 'r') as zip_ref:
    zip_ref.extractall()

In [0]:
### calculate accuracy for tracking ---------------


%matplotlib inline
from google.colab.patches import cv2_imshow
import numpy as np

from ssd import *
import os
import time
import torch
import cv2
import os
import warnings
from sort import *
import shutil

warnings.simplefilter("ignore", UserWarning)
torch.set_default_tensor_type('torch.cuda.FloatTensor')
VIDEO_SAVE_DIR="traffic_result"
# Recreate the output directory empty. shutil/os replace the
# `!rm -r traffic_result` shell magic, which repeated the directory name
# instead of using VIDEO_SAVE_DIR and is not valid Python outside IPython.
if os.path.isdir(VIDEO_SAVE_DIR):
    shutil.rmtree(VIDEO_SAVE_DIR)
elif os.path.exists(VIDEO_SAVE_DIR):
    os.remove(VIDEO_SAVE_DIR)
os.mkdir(VIDEO_SAVE_DIR)

def detection(frame, conf_threshold=0.5):
    """Detect objects in one frame with the module-level SSD `net`.

    Preprocessing uses the module-level `transform`; both it and `net` must
    exist before the first call.

    Args:
        frame: image array as returned by cv2.imread.
        conf_threshold: detections scoring below this value are discarded.

    Returns:
        An array with one row per kept detection, laid out as
        [c1, c2, c3, c4, score]: the network's [score, c1, c2, c3, c4] rows
        rotated by one column. Coordinates are returned unscaled (callers
        multiply them by the frame size). An empty array (`size == 0`) is
        returned when nothing passes the threshold.
    """
    x = torch.from_numpy(transform(frame)[0]).permute(2, 0, 1)
    x = x.unsqueeze(0).cuda()
    with torch.no_grad():
        detections = net(x).detach()  # forward pass

    final_detections = []
    num_rows = detections.size(2)
    for i in range(detections.size(1)):
        j = 0
        # Rows are assumed to be ordered by descending score (TODO confirm).
        # Append row j first and only then advance: incrementing before the
        # append skipped the top detection, stored the first sub-threshold
        # row, and could read past the last row. The bound on j covers the
        # case where every row is confident.
        while j < num_rows and detections[0, i, j, 0] >= conf_threshold:
            final_detections.append(detections[0, i, j, :].cpu().numpy())
            j += 1

    result = np.asarray(final_detections)
    if result.size != 0:
        # Rotate columns so the score ends up last.
        result = np.roll(result, -1, axis=1)
    return result

def base_transform(image, size, mean):
    """Resize `image` to a square of side `size`, subtract `mean`, return float32.

    Args:
        image: image array accepted by cv2.resize.
        size: output side length in pixels.
        mean: value(s) to subtract; must broadcast against the resized image.
    """
    target_shape = (size, size)
    pixels = cv2.resize(image, target_shape).astype(np.float32)
    pixels -= mean
    return pixels.astype(np.float32)

class BaseTransform:
    """Holds a target size and mean, and applies base_transform when called."""

    def __init__(self, size, mean):
        self.size, self.mean = size, np.array(mean, dtype=np.float32)

    def __call__(self, image, boxes=None, labels=None):
        """Preprocess `image`; pass `boxes` and `labels` through untouched."""
        image_out = base_transform(image, self.size, self.mean)
        return image_out, boxes, labels


COLORS = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
FONT = cv2.FONT_HERSHEY_SIMPLEX
start_epoch=57000
model_path = "/content/drive/My Drive/checkpoints/ssd_ckpt_%d.pth"% start_epoch
net = build_ssd('test', 300, 2)    # initialize SSD
net.load_state_dict(torch.load(model_path))
net.eval()  # inference mode, as in the evaluation cell
# NOTE(review): the mean is on a 0-1 scale while base_transform subtracts it
# from raw pixel values - confirm this matches the training preprocessing.
transform = BaseTransform(net.size, (104/256.0, 117/256.0, 123/256.0))



import os
frame_count=0
mot_tracker = Sort()

# One relative image path per line; blank lines are ignored and the file is
# closed as soon as it has been read.
with open('test_tracking.txt') as the_file:
    list_of_test_img = sorted(line.strip() for line in the_file if line.strip())

# Rows of [filename, track id, x1, y1, x2, y2] collected over all frames.
all_ = []
path = "/content/Insight-MVT_Annotation_Train/"


for filename in list_of_test_img:

    frame = cv2.imread(path+"/"+str(filename),1)
    if frame is None:
        # cv2.imread returns None instead of raising; fail with the offending
        # path rather than an AttributeError on frame.shape.
        raise FileNotFoundError(path+"/"+str(filename))
    frame_count += 1
    height, width = frame.shape[:2]
    detections = detection(frame)
    if frame_count % 100 == 0:
        print(frame_count)

    if detections.size != 0:
        # Scale the four box coordinates to pixels (the score column is dropped).
        scale = np.array([width,height,width,height])
        detections = (detections[:,:4]* scale)
    else:
        # The tracker is updated on every frame, including frames without
        # detections, so its internal frame counter and track ages stay in
        # step with the sequence.
        detections = np.empty((0, 5))
    trackers = mot_tracker.update(detections)

    for d in trackers:
        # NOTE(review): tracker rows are read as [x1, y1, x2, y2, id] -
        # confirm against sort. Boxes are no longer drawn on `frame`, because
        # the frame is neither saved nor displayed in this cell.
        try:
            xmin=int(d[0])
            ymin=int(d[1])
            xmax=int(d[2])
            ymax=int(d[3])
            label=int(d[4])
        except (ValueError, OverflowError) as exc:
            # int() fails on NaN / infinite coordinates; skip that track only.
            print("error", exc)
            continue
        all_.append([filename,label,xmin,ymin,xmax,ymax])

import pandas as pd

df = pd.DataFrame(all_, columns=["name","label","x1","y1",'x2','y2'])
df.to_csv('ssd_track_out.csv', index=False)
