In [None]:
!pip install aperturedb tqdm
from aperturedb import Utils
c = Utils.create_connector()

In [None]:
# Wrap the connector in the aperturedb Utils helper and call its summary().
# Presumably this reports what the database currently holds, which doubles as
# a quick connectivity check — confirm against the aperturedb docs.
u = Utils.Utils(c)
u.summary()

In [None]:
# Now we retrieve the two external inputs this notebook works with.
# 1. The YOLOv4 interface module. Any earlier copy is removed first so that
#    wget writes a fresh yolo4.py instead of saving the download as yolo4.py.1.
!rm -f yolo4.py
!wget https://raw.githubusercontent.com/drewaogle/YOLOv4-OpenCV-CUDA-DNN/refs/heads/main/yolo4.py
# 2. Retrieve the sample video that detection is run on.
!wget https://aperturedata-public.s3.us-west-2.amazonaws.com/aperturedb_applications/norman.mp4


In [None]:

from importlib import reload
import yolo4
# Re-import the module so that a freshly downloaded yolo4.py replaces any copy
# the kernel already loaded during an earlier run of this cell.
reload(yolo4)
from yolo4 import RemoteYOLOv4
class DetectorOptions:
    """Settings bundle handed to the YOLOv4 detector.

    The class attributes are the defaults; the constructor only overrides the
    two input sources on the instance.
    """

    # --- input sources ---
    image = ""  # path for images
    stream = ""  # path for stream

    # --- model files ---
    cfg = "models/yolov4.cfg"  # path to config
    weights = "models/yolov4.weights"  # path to weights
    namesfile = "models/coco.names"  # maps network outputs to class names

    # --- runtime behaviour ---
    input_size = 416
    use_gpu = False  # use GPU or not
    outdir = "output/norman"
    no_squash_detections = True  # if detections exist, don't rerun.

    def __init__(self, image="", stream=""):
        """Record the image path and the stream path ('webcam' opens the webcam w/ OpenCV)."""
        self.image, self.stream = image, stream

# Now we pull data: point the detector options at the downloaded video.
dopts = DetectorOptions( stream="norman.mp4")
# The detector is allocated with __new__ and then initialised through an
# explicit __init__(dopts) call instead of the usual RemoteYOLOv4(dopts).
# NOTE(review): the two forms are normally equivalent — confirm against
# yolo4.py whether the split is needed. Presumably initialisation is what
# processes the stream and writes results under dopts.outdir (the next cell
# reads output/norman/detections.csv) — TODO confirm.
yolo = RemoteYOLOv4.__new__(RemoteYOLOv4)
yolo.__init__(dopts)


In [None]:
# Now let's check the detections.
import pandas as pd
# Load the detections CSV from the detector's output directory and print it.
df = pd.read_csv("output/norman/detections.csv")
print(df)

In [None]:
# First we'll define the options we're going to use.
class ClipOptions:
    """Tunable settings for turning per-frame detections into clips.

    The class attributes are defaults; override them on an instance. Confidence
    thresholds are percentages (0-100).
    """
    offset_frame=0 # starting offset in frames
    end_frame=-1 # ending offset in frames; -1 means no upper limit
    initconf=50 # minimum confidence to start ( 0-100 )
    initlen=5 # minimum detection duration in frames to start a clip
    dropconf=25 # confidence at or below which a detection counts as missed ( 0-100 )
    droplen=5 # number of missed frames that ends a clip
    # Path to the detections file. Fixed from "output/noman/...": the detector
    # writes to output/norman, which is also where the notebook reads the CSV.
    detections="output/norman/detections.csv"
    verbose=False # lots of info
    flush=False # remove old uuids
    nosave=False # don't add data to db
    label="" # label for video
    def __init__(self,video):
        """Store the path of the video file to add."""
        self.video=video # video file to add

# Options for the sample video. The overrides below lower every class default,
# so clips start on weaker / shorter evidence and end after fewer misses.
opts = ClipOptions( "norman.mp4" )
opts.label="Norman_Bike"
opts.initconf=45 # class default is 50
opts.initlen=3 # class default is 5
opts.dropconf=20 # class default is 25
opts.droplen=3 # class default is 5

In [None]:
# function to prepare dataframe for work; name the columns and trim frames we don't want.
def preprocess(df, args ):
   """Return a cleaned copy of the raw detections table.

   The input frame is left untouched, so re-running later cells always starts
   from the same raw data.

   Args:
      df: raw detections with seven columns, in the order frame, label,
         confidence, left, top, width, height.
      args: options object providing ``offset_frame`` (rows with a smaller
         frame number are removed) and ``end_frame`` (rows with a larger frame
         number are removed; -1 or lower disables the upper limit).

   Returns:
      A new DataFrame with the standard column names and only the rows inside
      the requested frame window. Original index labels are preserved.
   """
   # Work on a copy: renaming columns / dropping rows on the caller's frame
   # would silently change `df` for every other cell that uses it.
   processed = df.copy()
   processed.columns = ["frame","label","confidence","left","top","width","height" ]
   too_early = processed["frame"] < args.offset_frame
   processed = processed[~too_early]
   if args.end_frame > -1:
      too_late = processed["frame"] > args.end_frame
      processed = processed[~too_late]
   return processed

# Name the columns, apply the frame window from opts, and print the result.
norman_detects = preprocess( df, opts )
print(norman_detects)


In [None]:
# process a frame by hand here.
from IPython.display import display as ds
import cv2
from PIL import Image

def display_image_and_bb( num, df ):
    """Display saved frame ``num`` with its detection boxes and labels drawn on.

    Args:
        num: frame number; the image is read from
            ``output/norman/video{num}.jpg``.
        df: detections table with ``frame``, ``label``, ``left``, ``top``,
            ``width`` and ``height`` columns.

    Raises:
        FileNotFoundError: if the frame image is missing or cannot be decoded.
    """
    frame_path = f"output/norman/video{num}.jpg"
    cv_image = cv2.imread(frame_path)
    # cv2.imread reports failure by returning None instead of raising; stop
    # here with a clear message rather than an opaque OpenCV error later on.
    if cv_image is None:
        raise FileNotFoundError(f"Could not read frame image: {frame_path}")

    green = (0, 255, 0)
    # Draw a rectangle and a label for every detection in this frame.
    for _, coords in df[df["frame"] == num].iterrows():
        # OpenCV drawing calls require plain Python ints; values coming out of
        # a DataFrame row may be NumPy scalars or floats.
        left   = int(coords["left"])
        top    = int(coords["top"])
        right  = int(coords["left"] + coords["width"])
        bottom = int(coords["top"] + coords["height"])
        cv2.rectangle(cv_image, (left, top), (right, bottom), green, 2)
        # Put the label above the box unless that would run off the top edge.
        y = top - 15 if top - 15 > 15 else top + 15
        cv2.putText(cv_image, str(coords["label"]), (left, y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.75, green, 2)

    # OpenCV stores pixels as BGR; PIL expects RGB.
    cv_image_rgb = cv2.cvtColor(cv_image, cv2.COLOR_BGR2RGB)
    ds(Image.fromarray(cv_image_rgb))



In [None]:
# Spot-check by eye: show frame 150 with its detections overlaid.
display_image_and_bb( 150, norman_detects)

In [None]:
# simple Clip class for data storage
class Clip:
    """Bookkeeping for one label's run of detections.

    Attributes:
        label: detection label this clip tracks.
        start_frame: frame at which the clip was created.
        total_frames: frames counted after ``start_frame`` (the start frame
            itself is not included).
        missed_frames: misses recorded since the last hit; they are folded
            into ``total_frames`` by the next hit.
        max_confidence: highest confidence seen (constructor + hits).
        min_confidence: lowest confidence seen (constructor + hits).
    """
    def __init__(self, label, start,conf):
        self.label = label
        self.start_frame = start
        self.total_frames = 0 # don't include start in total.
        self.missed_frames = 0
        self.max_confidence = conf
        self.min_confidence = conf
    def is_active( self, current_frame, drop_len ):
        """Return True while ``current_frame`` is fewer than ``drop_len`` frames past the last counted frame."""
        last_seen = self.start_frame + self.total_frames
        # drop_len is the number of frames that can be missed while the label stays "active".
        # drop_len of 1 means active continues only if seen in the previous frame.
        return last_seen + drop_len > current_frame
    def __str__(self):
        # Closing parenthesis added: the original string opened "(" without closing it.
        return f"(Clip [{self.label}] @ {self.start_frame} + {self.total_frames})"
    def __repr__(self):
        return f"C[{self.label} @ {self.start_frame} + {self.total_frames}]"
    def as_finished(self):
        """Return the key used for a finished clip: ``<label>_<last counted frame>``."""
        return f"{self.label}_{self.start_frame+self.total_frames}"
    def add_confidence(self,new_confidence):
        """Record a hit: update the confidence extremes and extend the clip."""
        self.max_confidence = max(self.max_confidence,new_confidence)
        self.min_confidence = min(self.min_confidence,new_confidence)
        # when a frame is a hit, we add the missed frames to what is considered the total length.
        self.total_frames = self.total_frames + 1 + self.missed_frames
        self.missed_frames = 0
    # frames where confidence was below threshold but kept to avoid drop out.
    def add_missed(self,missed_confidence):
        """Record a miss. ``missed_confidence`` is accepted but not stored."""
        self.missed_frames = self.missed_frames + 1

class ClipStorage:
    """Three dictionaries tracking where each clip is in its lifecycle."""

    def __init__(self):
        # Candidates: seen, but not yet for enough frames to count as real
        # (this suppresses mis-identifications).
        self.active = dict()
        # Valid clips that are currently being seen.
        self.registered = dict()
        # Clips that were valid but have since dropped off.
        self.finished = dict()

In [None]:
# Housekeeping that runs whenever the frame number changes.
def process_new_frame( verbose, drop_len, cur_frame, last_frame, storage):
    """Expire stale clips before the rows of ``cur_frame`` are handled.

    Candidates in ``storage.active`` that are no longer active are discarded;
    clips in ``storage.registered`` that are no longer active are moved to
    ``storage.finished`` under their ``as_finished()`` key. Both dictionaries
    are then replaced by new ones holding only the survivors, keyed by label.
    ``last_frame`` is accepted but not used.
    """
    surviving_candidates = {}
    for clip in storage.active.values():
        if clip.is_active(cur_frame, drop_len):
            if verbose:
                print(f"At frame {cur_frame}, kept {clip}")
            surviving_candidates[clip.label] = clip
            continue
        if verbose:
            print(f'At frame {cur_frame}, Dropped {clip}')

    surviving_registered = {}
    for clip in storage.registered.values():
        if clip.is_active(cur_frame, drop_len):
            surviving_registered[clip.label] = clip
            continue
        if verbose:
            print(f'At frame {cur_frame}, Retired {clip}')
        storage.finished[clip.as_finished()] = clip

    # Printed before the swap, so this shows the candidates as they were on entry.
    if verbose:
        print(f"Active dict: {storage.active}")
    storage.registered, storage.active = surviving_registered, surviving_candidates

In [None]:
# Handle one row of the detections table.
# YOLOv4 can report several objects per frame; a row is one detection in one frame.
def process_row(verbose, initconf, initlen, dropconf, cur_frame, label, label_confidence, storage):
    """Update ``storage`` with a single detection of ``label``.

    ``label_confidence`` is scaled by 100 before being compared with the
    ``initconf`` / ``dropconf`` thresholds. Exactly one of three cases applies:
    the label is a candidate, it is registered, or it has not been seen yet.
    """
    percent = label_confidence * 100

    # Case 1: the label is a candidate waiting to be promoted.
    if label in storage.active:
        clip = storage.active[label]
        if not percent > initconf:
            if verbose:
                print(f"{clip} seen at frame {cur_frame}, but confidence [ {label_confidence*100} < { initconf }]" )
            return
        clip.add_confidence(label_confidence)
        # total_frames leaves out the first frame, hence the +1.
        promoted = clip.total_frames + 1 >= initlen
        if promoted:
            if verbose:
                print(f"At {cur_frame}, moved {clip} to registered")
            storage.registered[label] = clip
            del storage.active[label]
        elif verbose:
            print(f"At frame {cur_frame}, saw {clip}")
        return

    # Case 2: the label is registered; above the drop threshold the detection
    # is a hit, otherwise it is a miss.
    if label in storage.registered:
        clip = storage.registered[label]
        if percent > dropconf:
            clip.add_confidence(label_confidence)
        else:
            clip.add_missed(label_confidence)
        return

    # Case 3: unknown label; it becomes a candidate only when confident enough.
    if percent > initconf:
        clip = Clip( label, cur_frame, label_confidence )
        if verbose:
            print(f"* Added {clip} to actived")
        storage.active[label] = clip
        

In [None]:
# main loop over the detections, one row (a single detection) at a time.
def process(args,pf, max_frame=155):
    """Turn a table of detections into clips.

    Args:
        args: options object providing ``verbose``, ``droplen``, ``initconf``,
            ``initlen`` and ``dropconf``. It is only read, never modified.
        pf: detections DataFrame with ``frame``, ``label`` and ``confidence``
            columns. Iteration stops at the first row past ``max_frame``, so
            rows are expected in ascending frame order.
        max_frame: last frame to process. The default of 155 keeps the short
            demo run this notebook was written with; pass ``None`` to process
            every row.

    Returns:
        ``(registered, finished)``: the clips still being tracked when
        processing stopped, and the clips that had already ended.
    """
    clip_store = ClipStorage()
    last_frame = 0
    for _, row in pf.iterrows():
        cur_frame = row['frame']
        label = row['label']
        if max_frame is not None and cur_frame > max_frame:
            break
        if cur_frame != last_frame:
            if args.verbose:
                print(f"Processing switch from {last_frame} to {cur_frame}")
            # Expire clips that have not been seen recently before handling
            # the rows of the new frame.
            process_new_frame(args.verbose,args.droplen, cur_frame, last_frame,clip_store)

        # all old active and registered are dropped prior to this.
        process_row(args.verbose, args.initconf, args.initlen, args.dropconf,cur_frame,label,row['confidence'],clip_store)

        last_frame = cur_frame
    # The buckets live on clip_store; the bare names `registered` / `finished`
    # were never defined and raised NameError.
    return clip_store.registered, clip_store.finished
       

    

In [None]:
# Run the clip extraction on the prepared detections and print both results.
# Going by their names, these are the clips still registered when processing
# stopped and the clips that had already finished.
norman_registered,norman_finished = process(opts,norman_detects)
print(norman_registered)
print(norman_finished)