In [None]:
from IPython.display import display, clear_output
from PIL import Image
import dv_processing as dv
import time
import numpy as np

recording = dv.io.MonoCameraRecording("/home/rokas/Downloads/logo_slow_2.aedat4")

def run_frames():
    frame = recording.getNextFrame()

    while frame is not None:
        display(Image.fromarray(frame.image))
        frame = recording.getNextFrame()
        time.sleep(0.010)
        clear_output(wait=True)

run_frames()
recording.resetSequentialRead()

In [None]:
def run_events():
    acc = dv.Accumulator((346, 260))
    
    def preview_accumulated(events):    
        acc.accept(events)
        frame = acc.generateFrame()
        display(Image.fromarray((frame.image * 3 * 255).astype(np.uint8)))
        time.sleep(0.005)
        clear_output(wait=True)
    
    eventStreamSlicer = dv.EventStreamSlicer()
    eventStreamSlicer.doEveryTimeInterval(33000, preview_accumulated)
    
    events = recording.getNextEventBatch()
    while events is not None:
        eventStreamSlicer.accept(events)
        events = recording.getNextEventBatch()
        
run_events()
recording.resetSequentialRead()

In [None]:
def run_pix_events():
    acc = dv.EdgeMapAccumulator((346, 260), 0.25)
    
    def preview_accumulated(events):    
        acc.accept(events)
        frame = acc.generateFrame()
        display(Image.fromarray(frame.image))
        time.sleep(0.005)
        clear_output(wait=True)
    
    eventStreamSlicer = dv.EventStreamSlicer()
    eventStreamSlicer.doEveryTimeInterval(33000, preview_accumulated)
    
    events = recording.getNextEventBatch()
    while events is not None:
        eventStreamSlicer.accept(events)
        events = recording.getNextEventBatch()
        
run_pix_events()
recording.resetSequentialRead()

In [None]:
def run_tracking():
    recording = dv.io.MonoCameraRecording("/home/rokas/Downloads/logo_slow_2.aedat4")
    acc = dv.EdgeMapAccumulator((346, 260), 0.25)
    config = dv.features.LucasKanadeConfig()
    tracker = dv.features.EventFeatureLKTracker.RegularTracker((346, 260), config)
    tracks = dv.features.FeatureTracks()
    
    def preview_accumulated(events):    
        tracker.accept(events)
        result = tracker.runTracking()
        if result is not None:
            tracks.accept(result)
            image = tracker.getAccumulatedFrame()
            display(Image.fromarray(tracks.visualize(image)))
            time.sleep(0.005)
            clear_output(wait=True)
            
    eventStreamSlicer = dv.EventStreamSlicer()
    eventStreamSlicer.doEveryTimeInterval(5000, preview_accumulated)
    
    events = recording.getNextEventBatch()
    while events is not None:
        eventStreamSlicer.accept(events)
        events = recording.getNextEventBatch()
        
run_tracking()

In [None]:
import torch
import pandas
from torch import hub
import cv2

model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

def plot_boxes(results, frame):
    labels, cord = results
    n = len(labels)
    x_shape, y_shape = frame.shape[1], frame.shape[0]
    for i in range(n):
        row = cord[i]
        # If score is less than 0.2 we avoid making a prediction.
        if row[4] < 0.01: 
            continue
        x1 = int(row[0]*x_shape)
        y1 = int(row[1]*y_shape)
        x2 = int(row[2]*x_shape)
        y2 = int(row[3]*y_shape)
        bgr = (0, 255, 0) # color of the box
        classes = model.names # Get the name of label index
        label_font = cv2.FONT_HERSHEY_SIMPLEX #Font for the label.
        cv2.rectangle(frame, (x1, y1), (x2, y2),  bgr, 2) #Plot the boxes
        # cv2.putText(frame,classes[labels[i]], (x1, y1), label_font, 0.9, bgr, 2) #Put a label over box.
        return frame

def score_frame(frame):
    results = model(frame)
    labels = results.xyxyn[0][:, -1].cpu().numpy()
    cord = results.xyxyn[0][:, :-1].cpu().numpy()
    return labels, cord

def run_detection():
    acc = dv.Accumulator((346, 260))
    
    def preview_accumulated(events):    
        acc.accept(events)
        frame = acc.generateFrame()
        image = cv2.cvtColor((frame.image * 3 * 255).astype(np.uint8), cv2.COLOR_GRAY2BGR)
        detection = score_frame(image)
        plot_boxes(detection, image)
        display(Image.fromarray(image))
        clear_output(wait=True)
    
    eventStreamSlicer = dv.EventStreamSlicer()
    eventStreamSlicer.doEveryTimeInterval(33000, preview_accumulated)
    
    events = recording.getNextEventBatch()
    while events is not None:
        eventStreamSlicer.accept(events)
        events = recording.getNextEventBatch()

run_detection()
