## 1. Read data
### 1.1 Checking available streams

In [2]:
import dv_processing as dv

# Location of the recording to inspect
path_to_file = "../data/robotics.aedat4"

# Load the recording through the mono-camera reader
reader = dv.io.MonoCameraRecording(path_to_file)

# Announce which file and which camera the stream listing below refers to
camera_name = reader.getCameraName()
print(f"Checking available streams in [{path_to_file}] for camera name [{camera_name}]:")

# Report the event stream, if the recording contains one
if reader.isEventStreamAvailable():
    # The resolution is read by position as a (width, height) pair rather
    # than through .width / .height attributes
    resolution = reader.getEventResolution()
    width = resolution[0]
    height = resolution[1]

    # Print that the stream is present together with its resolution
    print(f"  * Event stream with resolution [{width}x{height}]")

# Report the frame stream, if the recording contains one
if reader.isFrameStreamAvailable():
    # The Python bindings return resolutions as a plain (width, height)
    # tuple, so the values are read by index; attribute access such as
    # resolution.width would raise AttributeError on a tuple.
    resolution = reader.getFrameResolution()

    # Print that the stream is available together with its resolution
    print(f"  * Frame stream with resolution [{resolution[0]}x{resolution[1]}]")

# Report the IMU and trigger streams; each entry pairs the reader's
# availability query with the line to print when that stream is present.
# The queries are evaluated in order: IMU first, then trigger.
for is_available, label in (
    (reader.isImuStreamAvailable, "  * IMU stream"),
    (reader.isTriggerStreamAvailable, "  * Trigger stream"),
):
    if is_available():
        print(label)

Checking available streams in [../data/robotics.aedat4] for camera name [DVXplorer_DXM00071]:
  * Event stream with resolution [640x480]
  * IMU stream
  * Trigger stream


### 1.2 Read different types of data

**Read events from a file**

In [3]:
import dv_processing as dv

# Open the recording file
reader = dv.io.MonoCameraRecording("../data/robotics.aedat4")

# Keep pulling event batches for as long as the reader reports it is running
while reader.isRunning():
    events = reader.getNextEventBatch()

    # Nothing was returned on this iteration; poll again
    if events is None:
        continue

    # Print the batch's own textual summary (event count, duration, time range)
    print(f"{events}")

EventStore containing 21759 events within 9808µs duration; time range within [1724848695373335; 1724848695383143]
EventStore containing 16733 events within 9808µs duration; time range within [1724848695383343; 1724848695393151]
EventStore containing 31444 events within 9808µs duration; time range within [1724848695393351; 1724848695403159]
EventStore containing 27270 events within 9807µs duration; time range within [1724848695403360; 1724848695413167]
EventStore containing 30727 events within 9808µs duration; time range within [1724848695413367; 1724848695423175]
EventStore containing 34700 events within 9808µs duration; time range within [1724848695423375; 1724848695433183]
EventStore containing 55218 events within 9810µs duration; time range within [1724848695433383; 1724848695443193]
EventStore containing 30233 events within 9808µs duration; time range within [1724848695443393; 1724848695453201]
EventStore containing 30141 events within 9808µs duration; time range within [1724848695

**Read IMU data**

In [4]:
import dv_processing as dv

# Open the recording file
reader = dv.io.MonoCameraRecording("../data/robotics.aedat4")

# Keep pulling IMU batches for as long as the reader reports it is running
while reader.isRunning():
    imu_batch = reader.getNextImuBatch()

    # Skip iterations that returned nothing or an empty batch
    if imu_batch is None or len(imu_batch) == 0:
        continue

    # Report how many IMU measurements this batch holds
    print(f"Received {len(imu_batch)} IMU measurements")

Received 7 IMU measurements
Received 8 IMU measurements
Received 9 IMU measurements
Received 7 IMU measurements
Received 9 IMU measurements
Received 7 IMU measurements
Received 9 IMU measurements
Received 7 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 7 IMU measurements
Received 9 IMU measurements
Received 7 IMU measurements
Received 8 IMU measurements
Received 9 IMU measurements
Received 8 IMU measurements
Received 7 IMU measurements
Received 9 IMU measurements
Received 7 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 7 IMU measurements
Received 9 IMU measurements
Received 8 IMU measurements
Received 8 IMU measurements
Received 8 IMU measu

**Read trigger data**

In [6]:
import dv_processing as dv

# Open the recording file
reader = dv.io.MonoCameraRecording("../data/robotics.aedat4")

# Keep pulling trigger batches for as long as the reader reports it is running
while reader.isRunning():
    triggers = reader.getNextTriggerBatch()

    # Skip iterations that returned nothing or an empty batch
    if triggers is None or len(triggers) == 0:
        continue

    # Report how many triggers this batch holds
    print(f"Received {len(triggers)} triggers")

In [7]:
## Try to use time surface for visualization
import dv_processing as dv
import cv2 as cv
from datetime import timedelta

# Open the recording and build a time surface sized to its event resolution
reader = dv.io.MonoCameraRecording("../data/robotics.aedat4")
surface = dv.TimeSurface(reader.getEventResolution())

# Slicer that will hand events to the preview callback
slicer = dv.EventStreamSlicer()

# Window in which the generated frames are shown
cv.namedWindow("Preview", cv.WINDOW_NORMAL)


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    """Update the time surface with one slice of events and display it.

    Args:
        events: The events handed over by the slicer for this slice.
    """
    surface.accept(events)

    # Render the surface's current state and show it in the preview window
    preview = surface.generateFrame().image
    cv.imshow("Preview", preview)

    # Wait up to 2 ms for a key press; the pressed key is ignored
    cv.waitKey(2)


# Register the callback to run on every 33 millisecond interval
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Feed the recording into the slicer until the reader stops running.
# The try/finally makes sure the preview window is closed when the loop ends
# or the cell is interrupted, instead of leaving a stale window behind.
try:
    while reader.isRunning():
        # Receive events
        events = reader.getNextEventBatch()

        # Only forward batches that were actually returned
        if events is not None:
            slicer.accept(events)
finally:
    cv.destroyAllWindows()

objc[90109]: Class CaptureDelegate is implemented in both /Users/gg/Library/Python/3.9/lib/python/site-packages/dv_processing.dylibs/libopencv_videoio.4.8.0.dylib (0x10dd90880) and /Users/gg/Library/Python/3.9/lib/python/site-packages/cv2/cv2.abi3.so (0x35c66e5d8). One of the two will be used. Which one is undefined.
objc[90109]: Class CVWindow is implemented in both /Users/gg/Library/Python/3.9/lib/python/site-packages/dv_processing.dylibs/libopencv_highgui.4.8.0.dylib (0x10d410b38) and /Users/gg/Library/Python/3.9/lib/python/site-packages/cv2/cv2.abi3.so (0x35c66e628). One of the two will be used. Which one is undefined.
objc[90109]: Class CVView is implemented in both /Users/gg/Library/Python/3.9/lib/python/site-packages/dv_processing.dylibs/libopencv_highgui.4.8.0.dylib (0x10d410b60) and /Users/gg/Library/Python/3.9/lib/python/site-packages/cv2/cv2.abi3.so (0x35c66e650). One of the two will be used. Which one is undefined.
objc[90109]: Class CVSlider is implemented in both /Users/g

KeyboardInterrupt: 

In [1]:
# speed invariant time surface
## Try to use time surface for visualization
import dv_processing as dv
import cv2 as cv
from datetime import timedelta

# Open the recording and build a speed-invariant time surface sized to its
# event resolution
reader = dv.io.MonoCameraRecording("../data/robotics.aedat4")
surface = dv.SpeedInvariantTimeSurface(reader.getEventResolution())

# Slicer that will hand events to the preview callback
slicer = dv.EventStreamSlicer()

# Window in which the generated frames are shown
cv.namedWindow("Preview", cv.WINDOW_NORMAL)


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    """Update the speed-invariant time surface with a slice and display it.

    Args:
        events: The events handed over by the slicer for this slice.
    """
    surface.accept(events)

    # Render the surface's current state and show it in the preview window
    rendered = surface.generateFrame()
    cv.imshow("Preview", rendered.image)

    # Wait up to 2 ms for a key press; the pressed key is ignored
    cv.waitKey(2)


# Register the callback to run on every 33 millisecond interval
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Feed the recording into the slicer until the reader stops running.
# The try/finally makes sure the preview window is closed when the loop ends
# or the cell is interrupted, instead of leaving a stale window behind.
try:
    while reader.isRunning():
        # Receive events
        events = reader.getNextEventBatch()

        # Only forward batches that were actually returned
        if events is not None:
            slicer.accept(events)
finally:
    cv.destroyAllWindows()

objc[90669]: Class CaptureDelegate is implemented in both /Users/gg/Library/Python/3.9/lib/python/site-packages/dv_processing.dylibs/libopencv_videoio.4.8.0.dylib (0x112568880) and /Users/gg/Library/Python/3.9/lib/python/site-packages/cv2/cv2.abi3.so (0x3455d65d8). One of the two will be used. Which one is undefined.
objc[90669]: Class CVWindow is implemented in both /Users/gg/Library/Python/3.9/lib/python/site-packages/dv_processing.dylibs/libopencv_highgui.4.8.0.dylib (0x1119b8b38) and /Users/gg/Library/Python/3.9/lib/python/site-packages/cv2/cv2.abi3.so (0x3455d6628). One of the two will be used. Which one is undefined.
objc[90669]: Class CVView is implemented in both /Users/gg/Library/Python/3.9/lib/python/site-packages/dv_processing.dylibs/libopencv_highgui.4.8.0.dylib (0x1119b8b60) and /Users/gg/Library/Python/3.9/lib/python/site-packages/cv2/cv2.abi3.so (0x3455d6650). One of the two will be used. Which one is undefined.
objc[90669]: Class CVSlider is implemented in both /Users/g

KeyboardInterrupt: 

In [2]:
import dv_processing as dv
import cv2 as cv
from datetime import timedelta

# Open the recording and build a time surface sized to its event resolution
reader = dv.io.MonoCameraRecording("../data/robotics.aedat4")
surface = dv.TimeSurface(reader.getEventResolution())

# Slicer that will hand events to the callback
slicer = dv.EventStreamSlicer()

# Window in which the generated frames are shown
cv.namedWindow("Preview", cv.WINDOW_NORMAL)

# Video writer: colour MJPG stream written to 'output.avi' at 30 fps, with
# the frame size taken from the event resolution
frame_width, frame_height = reader.getEventResolution()
frame_size = (frame_width, frame_height)
fourcc = cv.VideoWriter_fourcc('M', 'J', 'P', 'G')  # Try different codecs if needed
out = cv.VideoWriter('output.avi', fourcc, 30.0, frame_size, isColor=True)

# Set by the callback when the user presses 'q'; checked by the read loop.
# A plain `return` inside the callback cannot stop the loop by itself.
stop_requested = False


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    """Update the time surface, preview the frame and append it to the video.

    Pressing 'q' in the preview window sets ``stop_requested`` so that the
    read loop terminates.

    Args:
        events: The events handed over by the slicer for this slice.
    """
    global stop_requested

    # Ignore any slices delivered after a stop was requested
    if stop_requested:
        return

    # Pass the events to update the time surface
    surface.accept(events)

    # Generate a preview frame
    frame = surface.generateFrame()

    # The writer was opened with isColor=True, so expand the image to BGR
    bgr_frame = cv.cvtColor(frame.image, cv.COLOR_GRAY2BGR)

    # Show the image and write it to the video file
    cv.imshow("Preview", bgr_frame)
    out.write(bgr_frame)

    # Give the window time to process events; 'q' requests a stop
    if cv.waitKey(33) & 0xFF == ord('q'):
        stop_requested = True


# Register callback to be performed every 33 milliseconds
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Process the recording until it ends or the user asks to stop. The finally
# clause finalises the video file and closes the window even if the cell is
# interrupted.
try:
    while reader.isRunning() and not stop_requested:
        # Receive events
        events = reader.getNextEventBatch()

        # Only forward batches that were actually returned
        if events is not None:
            slicer.accept(events)
finally:
    # Release the VideoWriter object and destroy all windows
    out.release()
    cv.destroyAllWindows()

In [6]:
# accumulating events

import dv_processing as dv
import cv2 as cv
from datetime import timedelta

# Open the recording file
reader = dv.io.MonoCameraRecording("../data/robotics.aedat4")

# Initialize an accumulator sized to the recording's event resolution
accumulator = dv.Accumulator(reader.getEventResolution())

# Apply configuration, these values can be modified to taste.
# Potential bounds are set to 0.0 and 1.0, with 0.5 as the neutral value.
accumulator.setMinPotential(0.0)
accumulator.setMaxPotential(1.0)
accumulator.setNeutralPotential(0.5)
# NOTE(review): presumably the amount a single event changes a pixel's
# potential - confirm against the dv-processing Accumulator documentation.
accumulator.setEventContribution(0.15)
# Exponential decay with parameter 1e+6.
# NOTE(review): the unit of the decay parameter is not visible here
# (possibly microseconds) - confirm.
accumulator.setDecayFunction(dv.Accumulator.Decay.EXPONENTIAL)
accumulator.setDecayParam(1e+6)
# Polarity is not ignored and synchronous decay is turned off
accumulator.setIgnorePolarity(False)
accumulator.setSynchronousDecay(False)

# Slicer that will hand events to the callback
slicer = dv.EventStreamSlicer()

# Window in which the generated frames are shown
cv.namedWindow("Preview", cv.WINDOW_NORMAL)

# Video writer: colour MJPG stream written to 'output.avi' at 30 fps, with
# the frame size taken from the event resolution
frame_width, frame_height = reader.getEventResolution()
frame_size = (frame_width, frame_height)
fourcc = cv.VideoWriter_fourcc('M', 'J', 'P', 'G')  # Try different codecs if needed
out = cv.VideoWriter('output.avi', fourcc, 30.0, frame_size, isColor=True)

# Set by the callback when the user presses 'q'; checked by the read loop.
# A plain `return` inside the callback cannot stop the loop by itself.
stop_requested = False


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    """Accumulate a slice of events, preview the frame and append it to the video.

    Pressing 'q' in the preview window sets ``stop_requested`` so that the
    read loop terminates.

    Args:
        events: The events handed over by the slicer for this slice.
    """
    global stop_requested

    # Ignore any slices delivered after a stop was requested
    if stop_requested:
        return

    # Pass the events to update the accumulator
    accumulator.accept(events)

    # Generate a preview frame
    frame = accumulator.generateFrame()

    # The writer was opened with isColor=True, so expand the image to BGR
    bgr_frame = cv.cvtColor(frame.image, cv.COLOR_GRAY2BGR)

    # Show the accumulated image and write it to the video file
    cv.imshow("Preview", bgr_frame)
    out.write(bgr_frame)

    # Give the window time to process events; 'q' requests a stop
    if cv.waitKey(33) & 0xFF == ord('q'):
        stop_requested = True


# Register callback to be performed every 33 milliseconds
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Process the recording until it ends or the user asks to stop. The finally
# clause finalises the video file and closes the window even if the cell is
# interrupted.
try:
    while reader.isRunning() and not stop_requested:
        # Receive events
        events = reader.getNextEventBatch()

        # Only forward batches that were actually returned
        if events is not None:
            slicer.accept(events)
finally:
    # Release the VideoWriter object and destroy all windows
    out.release()
    cv.destroyAllWindows()

: 

In [9]:
# Edge accumulating events
# Events occur without considering polarity

import dv_processing as dv
import cv2 as cv
from datetime import timedelta

# Open the recording file
reader = dv.io.MonoCameraRecording("../data/robotics.aedat4")

# Event visualizer sized to the recording's event resolution
visualizer = dv.visualization.EventVisualizer(reader.getEventResolution())

# Colour scheme: white background, and the same blue for positive and
# negative events so polarity is not distinguished in the image
colors = dv.visualization.colors
visualizer.setBackgroundColor(colors.white())
visualizer.setPositiveColor(colors.iniBlue())
visualizer.setNegativeColor(colors.iniBlue())

# Slicer that will hand events to the callback
slicer = dv.EventStreamSlicer()

# Window in which the generated images are shown
cv.namedWindow("Preview", cv.WINDOW_NORMAL)

# Video writer: colour MJPG stream written to 'output.avi' at 30 fps, with
# the frame size taken from the event resolution
frame_width, frame_height = reader.getEventResolution()
frame_size = (frame_width, frame_height)
fourcc = cv.VideoWriter_fourcc('M', 'J', 'P', 'G')  # Try different codecs if needed
out = cv.VideoWriter('output.avi', fourcc, 30.0, frame_size, isColor=True)

# Set by the callback when the user presses 'q'; checked by the read loop.
# A plain `return` inside the callback cannot stop the loop by itself.
stop_requested = False


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    """Render a slice of events, preview the image and append it to the video.

    Pressing 'q' in the preview window sets ``stop_requested`` so that the
    read loop terminates.

    Args:
        events: The events handed over by the slicer for this slice.
    """
    global stop_requested

    # Ignore any slices delivered after a stop was requested
    if stop_requested:
        return

    # Generate a preview image from this slice only
    frame = visualizer.generateImage(events)

    # Show the image and write it to the video file
    cv.imshow("Preview", frame)
    out.write(frame)

    # Give the window time to process events; 'q' requests a stop
    if cv.waitKey(33) & 0xFF == ord('q'):
        stop_requested = True


# Register callback to be performed every 33 milliseconds
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Process the recording until it ends or the user asks to stop. The finally
# clause finalises the video file and closes the window even if the cell is
# interrupted.
try:
    while reader.isRunning() and not stop_requested:
        # Receive events
        events = reader.getNextEventBatch()

        # Only forward batches that were actually returned
        if events is not None:
            slicer.accept(events)
finally:
    # Release the VideoWriter object and destroy all windows
    out.release()
    cv.destroyAllWindows()

: 

In [2]:
# for hand
## Try to use time surface for visualization
import dv_processing as dv
import cv2 as cv
from datetime import timedelta

# Open the hand recording and build a time surface sized to its event
# resolution
reader = dv.io.MonoCameraRecording("./hand.aedat4")
surface = dv.TimeSurface(reader.getEventResolution())

# Slicer that will hand events to the preview callback
slicer = dv.EventStreamSlicer()

# Window in which the generated frames are shown
cv.namedWindow("Preview", cv.WINDOW_NORMAL)


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    """Update the time surface with one slice of events and display it.

    Args:
        events: The events handed over by the slicer for this slice.
    """
    surface.accept(events)

    # Render the surface's current state and show it in the preview window
    preview = surface.generateFrame().image
    cv.imshow("Preview", preview)

    # Wait up to 2 ms for a key press; the pressed key is ignored
    cv.waitKey(2)


# Register the callback to run on every 33 millisecond interval
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Feed the recording into the slicer until the reader stops running.
# The try/finally makes sure the preview window is closed when the loop ends
# or the cell is interrupted, instead of leaving a stale window behind.
try:
    while reader.isRunning():
        # Receive events
        events = reader.getNextEventBatch()

        # Only forward batches that were actually returned
        if events is not None:
            slicer.accept(events)
finally:
    cv.destroyAllWindows()

KeyboardInterrupt: 

In [3]:
# hand speed invariant time surface
# speed invariant time surface
## Try to use time surface for visualization
import dv_processing as dv
import cv2 as cv
from datetime import timedelta

# Open the hand recording and build a speed-invariant time surface sized to
# its event resolution
reader = dv.io.MonoCameraRecording("./hand.aedat4")
surface = dv.SpeedInvariantTimeSurface(reader.getEventResolution())

# Slicer that will hand events to the preview callback
slicer = dv.EventStreamSlicer()

# Window in which the generated frames are shown
cv.namedWindow("Preview", cv.WINDOW_NORMAL)


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    """Update the speed-invariant time surface with a slice and display it.

    Args:
        events: The events handed over by the slicer for this slice.
    """
    surface.accept(events)

    # Render the surface's current state and show it in the preview window
    rendered = surface.generateFrame()
    cv.imshow("Preview", rendered.image)

    # Wait up to 2 ms for a key press; the pressed key is ignored
    cv.waitKey(2)


# Register the callback to run on every 33 millisecond interval
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Feed the recording into the slicer until the reader stops running.
# The try/finally makes sure the preview window is closed when the loop ends
# or the cell is interrupted, instead of leaving a stale window behind.
try:
    while reader.isRunning():
        # Receive events
        events = reader.getNextEventBatch()

        # Only forward batches that were actually returned
        if events is not None:
            slicer.accept(events)
finally:
    cv.destroyAllWindows()

In [5]:
import dv_processing as dv
import cv2 as cv
from datetime import timedelta
import os

# Directory that receives the per-frame PNG images; created on demand
output_dir = "./hand_edge"
os.makedirs(output_dir, exist_ok=True)

# Open the hand recording
reader = dv.io.MonoCameraRecording("./hand.aedat4")

# Event visualizer sized to the recording's event resolution
visualizer = dv.visualization.EventVisualizer(reader.getEventResolution())

# Colour scheme: white background, and the same blue for positive and
# negative events so polarity is not distinguished in the image
colors = dv.visualization.colors
visualizer.setBackgroundColor(colors.white())
visualizer.setPositiveColor(colors.iniBlue())
visualizer.setNegativeColor(colors.iniBlue())

# Slicer that will hand events to the callback
slicer = dv.EventStreamSlicer()

# Window in which the generated images are shown
cv.namedWindow("Preview", cv.WINDOW_NORMAL)

# Video writer: colour MJPG stream written to 'hand_edge.avi' at 30 fps, with
# the frame size taken from the event resolution
frame_width, frame_height = reader.getEventResolution()
frame_size = (frame_width, frame_height)
fourcc = cv.VideoWriter_fourcc('M', 'J', 'P', 'G')
out = cv.VideoWriter('hand_edge.avi', fourcc, 30.0, frame_size, isColor=True)

# Running index used to number the saved frame images
frame_count = 0

# Set by the callback when the user presses 'q'; checked by the read loop.
# A plain `return` inside the callback cannot stop the loop by itself.
stop_requested = False


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    """Render a slice of events, preview it, and save it to video and PNG.

    Each call writes one frame to the video file and one numbered PNG into
    ``output_dir``. Pressing 'q' in the preview window sets
    ``stop_requested`` so that the read loop terminates.

    Args:
        events: The events handed over by the slicer for this slice.
    """
    global frame_count, stop_requested

    # Ignore any slices delivered after a stop was requested
    if stop_requested:
        return

    # Generate a preview image from this slice only
    frame = visualizer.generateImage(events)

    # Show the image and write it to the video file
    cv.imshow("Preview", frame)
    out.write(frame)

    # Save the frame as a zero-padded, sequentially numbered PNG
    frame_filename = os.path.join(output_dir, f"frame_{frame_count:04d}.png")
    cv.imwrite(frame_filename, frame)
    frame_count += 1

    # Give the window time to process events; 'q' requests a stop
    if cv.waitKey(33) & 0xFF == ord('q'):
        stop_requested = True


# Register callback to be performed every 33 milliseconds
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Process the recording until it ends or the user asks to stop. The finally
# clause finalises the video file and closes the window even if the cell is
# interrupted.
try:
    while reader.isRunning() and not stop_requested:
        events = reader.getNextEventBatch()
        if events is not None:
            slicer.accept(events)
finally:
    # Release the VideoWriter object and destroy all windows
    out.release()
    cv.destroyAllWindows()