## Create, store and slice events

In [7]:
import dv_processing as dv

# Initialize an empty store
store = dv.EventStore()

# Get the current timestamp
timestamp = dv.now()

# Add some events into the event store
# This allocates and inserts events at the back, the function arguments are:
# timestamp, x, y, polarity
store.push_back(timestamp, 0, 0, True)
store.push_back(timestamp + 1000, 1, 1, False)
store.push_back(timestamp + 2000, 2, 2, False)
store.push_back(timestamp + 3000, 3, 3, True)

# Perform time-based slicing of event store, the output event store "sliced" will contain
# the second and third events from above. The end timestamp (second argument) is 2001, since start
# timestamp (first argument) is inclusive and timestamp is exclusive, so 1 is added.
sliced = store.sliceTime(timestamp + 1000, timestamp + 2001)

# This should print two events
for ev in store:
    print(f"Sliced event [{ev.timestamp()}, {ev.x()}, {ev.y()}, {ev.polarity()}]")

print("---------------------------------------------")

for ev in sliced:
    print(f"Sliced event [{ev.timestamp()}, {ev.x()}, {ev.y()}, {ev.polarity()}]")

Sliced event [1756835056449278, 0, 0, True]
Sliced event [1756835056450278, 1, 1, False]
Sliced event [1756835056451278, 2, 2, False]
Sliced event [1756835056452278, 3, 3, True]
---------------------------------------------
Sliced event [1756835056450278, 1, 1, False]
Sliced event [1756835056451278, 2, 2, False]


### Slicing by time

In [None]:
import dv_processing as dv
import cv2
import numpy as np
from datetime import timedelta

# Generate synthetic events
store = dv.data.generate.uniformEventsWithinTimeRange(10000, timedelta(milliseconds=10), (100, 100), 10)

# Loop over each event
for i, ev in enumerate(store):
    # Create a blank black image
    image = np.zeros((180, 240, 3), dtype=np.uint8)

    # Draw the event
    x, y, polarity = ev.x(), ev.y(), ev.polarity()
    color = (255, 255, 255) if polarity else (0, 0, 255)
    cv2.circle(image, (x, y), 2, color, -1)

    # Show the image
    cv2.imshow("Event", image)
    cv2.waitKey(500)  # show each image for 500 ms

cv2.destroyAllWindows()

# Get all events with timestamp above 12500, it will be 13000 and up
eventsAfterTimestamp = store.sliceTime(12500)

# Print the timestamp ranges
print(f"1. {eventsAfterTimestamp}")

# Slice event within time range [12000; 16000); the end time is exclusive
eventsInRange = store.sliceTime(12000, 16000)

# Print the timestamp ranges; It will print that range is [12000; 15000] since end time is exclusive and
# event at timestamp 16000 is not going to be included.
print(f"2. {eventsInRange}")

1. EventStore containing 7 events within 6000µs duration; time range within [13000; 19000]
2. EventStore containing 4 events within 3000µs duration; time range within [12000; 15000]


### Slicing by number of events

In [None]:
import dv_processing as dv
from datetime import timedelta

# Generate 10 events with time range [10000; 20000]
store = dv.data.generate.uniformEventsWithinTimeRange(10000, timedelta(milliseconds=10), (100, 100), 10)

# Get all events beyond and including index 5
events_after_index = store.slice(5)
print(f"1. {events_after_index}")

# Get 3 events starting with index 2  
events_in_range = store.slice(2, 3)
print(f"2. {events_in_range}")

# Use sliceBack to retrieve event from the end; this call will retrieve last 3 events
last_events = store.sliceBack(3)
print(f"3. {last_events}")

1. EventStore containing 5 events within 4000µs duration; time range within [15000; 19000]
2. EventStore containing 3 events within 2000µs duration; time range within [12000; 14000]
3. EventStore containing 3 events within 2000µs duration; time range within [17000; 19000]


### Combining multiple EventStores

In [10]:
import dv_processing as dv
from datetime import timedelta

# Generate 10 events with timestamps in range [10000; 20000]
store1 = dv.data.generate.uniformEventsWithinTimeRange(10000, timedelta(milliseconds=10), (100, 100), 10)

# Generate second event store with 10 events with timestamps in range [20000; 29000] to the second store
store2 = dv.data.generate.uniformEventsWithinTimeRange(20000, timedelta(milliseconds=10), (100, 100), 10)

# Final event store which will contain all events
final_store = dv.EventStore()

# Add the events into the final store; this operation is shallow, so no data copies
# are performed, but the underlying data has shared ownership between all stores
final_store.add(store1)
final_store.add(store2)

# Print specific information on what we contain in the final event store
print(f"{final_store}")

EventStore containing 20 events within 19000µs duration; time range within [10000; 29000]


## Event stream

### Event Batch

In [4]:
import time

import dv_processing as dv

# Open any camera
capture = dv.io.camera.open()

# Run the loop while camera is still connected
while capture.isRunning():
    # Read batch of events
    events = capture.getNextEventBatch()

    # The method does not wait for data arrive, it returns immediately with
    # latest available data or if no data is available, returns a `None`
    if events is not None:
        # Print received packet time range
        print(events)
        #print(events[0].x())
        
    else:
        # No data has arrived yet, short sleep to reduce CPU load
        time.sleep(0.001)

EventStore containing 41 events within 9733µs duration; time range within [1757014899888594; 1757014899898327]
EventStore containing 39 events within 9882µs duration; time range within [1757014899898673; 1757014899908555]
EventStore containing 45 events within 9504µs duration; time range within [1757014899908755; 1757014899918259]
EventStore containing 36 events within 9648µs duration; time range within [1757014899918701; 1757014899928349]
EventStore containing 42 events within 9989µs duration; time range within [1757014899928584; 1757014899938573]
EventStore containing 43 events within 9245µs duration; time range within [1757014899939043; 1757014899948288]
EventStore containing 39 events within 9864µs duration; time range within [1757014899948607; 1757014899958471]
EventStore containing 47 events within 9619µs duration; time range within [1757014899958865; 1757014899968484]
EventStore containing 42 events within 9692µs duration; time range within [1757014899968875; 1757014899978567]
E

KeyboardInterrupt: 

### Visualizing each batch

In [3]:
import cv2 as cv
import dv_processing as dv

# Abrir la cámara
capture = dv.io.camera.open()

if not capture.isEventStreamAvailable():
    raise RuntimeError("Input camera does not provide an event stream.")

# Inicializar visualizador
visualizer = dv.visualization.EventVisualizer(capture.getEventResolution())
visualizer.setBackgroundColor(dv.visualization.colors.white())
visualizer.setPositiveColor(dv.visualization.colors.iniBlue())
visualizer.setNegativeColor(dv.visualization.colors.darkGray())

# Ventana de OpenCV
cv.namedWindow("Preview", cv.WINDOW_NORMAL)

# Loop principal: mostrar imagen cada vez que llegan eventos
while capture.isRunning():
    events = capture.getNextEventBatch()
    if events is not None and len(events) > 0:
        # Generar imagen y mostrarla inmediatamente
        frame = visualizer.generateImage(events)
        cv.imshow("Preview", frame)

    # Salir con ESC
    if cv.waitKey(1) == 27:
        break

cv.destroyAllWindows()
capture.close()


KeyboardInterrupt: 

## Event stream slicing

### EventStreamSlicer

In [15]:
import dv_processing as dv
from datetime import timedelta

# Initialize slicer, it will have no jobs at this time
slicer = dv.EventStreamSlicer()


def print_time_interval(events: dv.EventStore):
    # Print the time duration received by this method
    print(f"* Received event with {events.duration()} duration in time-based slicing")


# Register this method to be called every 33 millisecond worth of event data
time_job_id = slicer.doEveryTimeInterval(timedelta(milliseconds=33), print_time_interval)


def print_event_number(events: dv.EventStore):
    # Print the number of events received here
    print(f"# Received {events.size()} events in number-based slicing")


# Register this method to be called every 100 events
number_job_id = slicer.doEveryNumberOfElements(100, print_event_number)

# Implement data generation; The following loop will generate 10 stores
# of events, each containing 100 events within 10 millisecond duration.
for i in range(10):
    # Generate 100 events within 20 millisecond interval. These will be sliced correctly by the slicer.
    store = dv.data.generate.uniformEventsWithinTimeRange(i * 20000, timedelta(milliseconds=20), (100, 100), 100)

    # Now push the store into the slicer, the data contents within the store
    # can be arbitrary, the slicer implementation takes care of correct slicing
    # algorithm and calls the previously registered callbacks accordingly.
    slicer.accept(store)

    # When a packet with index 5 is reached, modify the parameters
    if i == 5:
        # Modify time range to 10 milliseconds instead of 33
        slicer.modifyTimeInterval(time_job_id, timedelta(milliseconds=10))
        # Modify number to 200 instead of 100
        slicer.modifyNumberInterval(number_job_id, 200)

# Received 100 events in number-based slicing
* Received event with 0:00:00.032800 duration in time-based slicing
# Received 100 events in number-based slicing
# Received 100 events in number-based slicing
* Received event with 0:00:00.032800 duration in time-based slicing
# Received 100 events in number-based slicing
* Received event with 0:00:00.032800 duration in time-based slicing
# Received 100 events in number-based slicing
# Received 100 events in number-based slicing
* Received event with 0:00:00.009800 duration in time-based slicing
* Received event with 0:00:00.009800 duration in time-based slicing
* Received event with 0:00:00.009800 duration in time-based slicing
* Received event with 0:00:00.009800 duration in time-based slicing
* Received event with 0:00:00.009800 duration in time-based slicing
* Received event with 0:00:00.009800 duration in time-based slicing
# Received 200 events in number-based slicing
* Received event with 0:00:00.009800 duration in time-based slicin

### EventVisualizer

In [1]:
from datetime import timedelta

import cv2 as cv
import dv_processing as dv

# Open any camera
capture = dv.io.camera.open()

# Make sure it supports event stream output, throw an error otherwise
if not capture.isEventStreamAvailable():
    raise RuntimeError("Input camera does not provide an event stream.")

# Initialize an accumulator with some resolution
visualizer = dv.visualization.EventVisualizer(capture.getEventResolution())

# Apply color scheme configuration, these values can be modified to taste
visualizer.setBackgroundColor(dv.visualization.colors.white())
visualizer.setPositiveColor(dv.visualization.colors.iniBlue())
visualizer.setNegativeColor(dv.visualization.colors.darkGray())

# Initialize a preview window
cv.namedWindow("Preview", cv.WINDOW_NORMAL)

# Initialize a slicer
slicer = dv.EventStreamSlicer()


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    # Generate a preview frame
    frame = visualizer.generateImage(events)

    # Show the accumulated image
    cv.imshow("Preview", frame)
    cv.waitKey(2)


# Register callback to be performed every 33 milliseconds
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Run the event processing while the camera is connected
while capture.isRunning():
    # Receive events
    events = capture.getNextEventBatch()

    # Check if anything was received
    if events is not None:
        # If so, pass the events into the slicer to handle them
        slicer.accept(events)

KeyboardInterrupt: 

### Visualizer on python

In [15]:
import numpy as np
import cv2 as cv
import dv_processing as dv
from datetime import timedelta

# Abrir la cámara
camera = dv.io.camera.open()
if not camera.isEventStreamAvailable():
    raise RuntimeError("La cámara no proporciona un stream de eventos.")

# Obtener resolución de eventos
res_x, res_y = camera.getEventResolution()

# Crear ventana de OpenCV
cv.namedWindow("Events", cv.WINDOW_NORMAL)

# Crear slicer
slicer = dv.EventStreamSlicer()

# Callback de visualización cada 33 ms
def slicing_callback(events: dv.EventStore):
    if len(events) == 0:
        return

    # Imagen de fondo blanco
    image = 255 * np.ones((res_y, res_x, 3), dtype=np.uint8)

    # Un solo bucle directo sobre los eventos
    for e in events:
        x, y = e.x(), e.y()
        if 0 <= x < res_x and 0 <= y < res_y:
            image[y, x] = (0, 255, 0) if e.polarity() else (0, 0, 255)

    # Mostrar imagen
    cv.imshow("Events", image)
    cv.waitKey(2)

# Registrar el callback en el slicer
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Loop principal: alimentar el slicer con eventos
while camera.isRunning():
    events = camera.getNextEventBatch()
    if events is not None:
        slicer.accept(events)

    if cv.waitKey(1) == 27:  # salir con ESC
        break

cv.destroyAllWindows()
camera.close()


KeyboardInterrupt: 

## Frame

In [2]:
import cv2 as cv
import dv_processing as dv

# Open any camera
capture = dv.io.camera.open()

# Initiate a preview window
cv.namedWindow("Preview", cv.WINDOW_NORMAL)

# Run the loop while camera is still connected
while capture.isRunning():
    # Read a frame from the camera
    frame = capture.getNextFrame()

    # The method does not wait for frame arrive, it returns immediately with
    # latest available frame or if no data is available, returns a `None`
    if frame is not None:
        # Print received packet time range
        print(f"Received a frame at time [{frame.timestamp}]")

        # Show a preview of the image
        cv.imshow("Preview", frame.image)
    cv.waitKey(2)

Received a frame at time [1756919996415227]
Received a frame at time [1756919996452342]
Received a frame at time [1756919996490579]
Received a frame at time [1756919996528714]
Received a frame at time [1756919996566850]
Received a frame at time [1756919996604886]
Received a frame at time [1756919996642922]
Received a frame at time [1756919996680862]
Received a frame at time [1756919996718802]
Received a frame at time [1756919996756646]
Received a frame at time [1756919996794490]
Received a frame at time [1756919996832241]
Received a frame at time [1756919996869992]
Received a frame at time [1756919996907652]
Received a frame at time [1756919996945312]
Received a frame at time [1756919996982884]
Received a frame at time [1756919997020455]
Received a frame at time [1756919997057938]
Received a frame at time [1756919997095421]
Received a frame at time [1756919997132818]
Received a frame at time [1756919997170215]
Received a frame at time [1756919997207528]
Received a frame at time [175691

KeyboardInterrupt: 

## MultiStreamSlicer

In [33]:
from datetime import timedelta

import cv2 as cv
import dv_processing as dv

# Open the camera, just use first detected DAVIS camera
camera = dv.io.camera.DAVIS()

# Set ROI area
#camera.setCropArea((50, 5, 255, 255))

# Initialize a multi-stream slicer
slicer = dv.EventMultiStreamSlicer("events")

# Add a frame stream to the slicer
slicer.addFrameStream("frames")

# Initialize a visualizer for the overlay
visualizer = dv.visualization.EventVisualizer(camera.getEventResolution(), dv.visualization.colors.white(),
                                              dv.visualization.colors.green(), dv.visualization.colors.red())

# Create a window for image display
cv.namedWindow("Preview", cv.WINDOW_NORMAL)


# Callback method for time based slicing
def display_preview(data):
    # Retrieve frame data using the named method and stream name
    frames = data.getFrames("frames")

    # Retrieve event data
    events = data.getEvents("events")

    # Retrieve and color convert the latest frame of retrieved frames
    latest_image = None
    if len(frames) > 0:
        if len(frames[-1].image.shape) == 3:
            # We already have colored image, no conversion
            latest_image = frames[-1].image
        else:
            # Image is grayscale, convert to color (BGR image)
            latest_image = cv.cvtColor(frames[-1].image, cv.COLOR_GRAY2BGR)
    else:
        return

    # Generate a preview and show the final image
    cv.imshow("Preview", visualizer.generateImage(events, latest_image))

    # If escape button is pressed (code 27 is escape key), exit the program cleanly
    if cv.waitKey(2) == 27:
        exit(0)


# Register a job to be performed every 33 milliseconds
slicer.doEveryTimeInterval(timedelta(milliseconds=10), display_preview)

# Continue the loop while both cameras are connected
while camera.isRunning():
    events = camera.getNextEventBatch()
    if events is not None:
        slicer.accept("events", events)

    frame = camera.getNextFrame()
    if frame is not None:
        slicer.accept("frames", [frame])

In [5]:
!pip install dv-processing

Collecting dv-processing

  error: subprocess-exited-with-error
  
  Getting requirements to build wheel did not run successfully.
  exit code: 1
  
  [25 lines of output]
  Using git hash
  Traceback (most recent call last):
    File "C:\ProgramData\Lucid Vision Labs\ArenaView\ArenaPy\Miniconda3\envs\lucid_env\lib\site-packages\pip\_vendor\pep517\in_process\_in_process.py", line 351, in <module>
      main()
    File "C:\ProgramData\Lucid Vision Labs\ArenaView\ArenaPy\Miniconda3\envs\lucid_env\lib\site-packages\pip\_vendor\pep517\in_process\_in_process.py", line 333, in main
      json_out['return_val'] = hook(**hook_input['kwargs'])
    File "C:\ProgramData\Lucid Vision Labs\ArenaView\ArenaPy\Miniconda3\envs\lucid_env\lib\site-packages\pip\_vendor\pep517\in_process\_in_process.py", line 118, in get_requires_for_build_wheel
      return hook(config_settings)
    File "C:\Users\bermude8\AppData\Local\Temp\pip-build-env-67lwr3z8\overlay\Lib\site-packages\setuptools\build_meta.py", line 341, in get_requires_for_b


  Using cached dv_processing-1.7.9.tar.gz (9.5 kB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'error'


## Accumulator

### Accumulating event from a camera

In [3]:
from datetime import timedelta

import cv2 as cv
import dv_processing as dv

# Open any camera
capture = dv.io.camera.open()

# Make sure it supports event stream output, throw an error otherwise
if not capture.isEventStreamAvailable():
    raise RuntimeError("Input camera does not provide an event stream.")

# Initialize an accumulator with some resolution
accumulator = dv.Accumulator(capture.getEventResolution())

# Apply configuration, these values can be modified to taste
accumulator.setMinPotential(0.0)
accumulator.setMaxPotential(1.0)
accumulator.setNeutralPotential(0.5)
accumulator.setEventContribution(0.15)
accumulator.setDecayFunction(dv.Accumulator.Decay.EXPONENTIAL)
accumulator.setDecayParam(1e+6)
accumulator.setIgnorePolarity(False)
accumulator.setSynchronousDecay(False)

# Initialize preview window
cv.namedWindow("Preview", cv.WINDOW_NORMAL)

# Initialize a slicer
slicer = dv.EventStreamSlicer()


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    # Pass events into the accumulator and generate a preview frame
    accumulator.accept(events)
    frame = accumulator.generateFrame()

    # Show the accumulated image
    cv.imshow("Preview", frame.image)
    cv.waitKey(2)


# Register a callback every 33 milliseconds
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Run the event processing while the camera is connected
while capture.isRunning():
    # Receive events
    events = capture.getNextEventBatch()

    # Check if anything was received
    if events is not None:
        # If so, pass the events into the slicer to handle them
        slicer.accept(events)

KeyboardInterrupt: 

### Edge accumulation

In [6]:
from datetime import timedelta

import cv2 as cv
import dv_processing as dv

# Open any camera
capture = dv.io.camera.open()

# Make sure it supports event stream output, throw an error otherwise
if not capture.isEventStreamAvailable():
    raise RuntimeError("Input camera does not provide an event stream.")

# Initialize an accumulator with some resolution
accumulator = dv.EdgeMapAccumulator(capture.getEventResolution())

# Apply configuration, these values can be modified to taste
accumulator.setNeutralPotential(0.0)
accumulator.setEventContribution(0.25)
accumulator.setNeutralPotential(1.0)
accumulator.setIgnorePolarity(True)

# Initialize a preview window
cv.namedWindow("Preview", cv.WINDOW_NORMAL)

# Initialize a slicer
slicer = dv.EventStreamSlicer()


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    # Pass events into the accumulator and generate a preview frame
    accumulator.accept(events)
    frame = accumulator.generateFrame()

    # Show the accumulated image
    cv.imshow("Preview", frame.image)
    cv.waitKey(2)


# Register callback to be performed every 33 milliseconds
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Run the event processing while the camera is connected
while capture.isRunning():
    # Receive events
    events = capture.getNextEventBatch()

    # Check if anything was received
    if events is not None:
        # If so, pass the events into the slicer to handle them
        slicer.accept(events)

KeyboardInterrupt: 

### Time surface

In [8]:
from datetime import timedelta

import cv2 as cv
import dv_processing as dv

# Open any camera
capture = dv.io.camera.open()

# Make sure it supports event stream output, throw an error otherwise
if not capture.isEventStreamAvailable():
    raise RuntimeError("Input camera does not provide an event stream.")

# Initialize an accumulator with camera sensor resolution
surface = dv.TimeSurface(capture.getEventResolution())

# Initialize a preview window
cv.namedWindow("Preview", cv.WINDOW_NORMAL)

# Initialize a slicer
slicer = dv.EventStreamSlicer()


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    # Pass the events to update the time surface
    surface.accept(events)

    # Generate a preview frame
    frame = surface.generateFrame()

    # Show the accumulated image
    cv.imshow("Preview", frame.image)
    cv.waitKey(2)


# Register callback to be performed every 33 milliseconds
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Run the event processing while the camera is connected
while capture.isRunning():
    # Receive events
    events = capture.getNextEventBatch()

    # Check if anything was received
    if events is not None:
        # If so, pass the events into the slicer to handle them
        slicer.accept(events)

KeyboardInterrupt: 

### Speed invariant time surface

In [2]:
from datetime import timedelta

import cv2 as cv
import dv_processing as dv

# Open any camera
capture = dv.io.camera.open()

# Make sure it supports event stream output, throw an error otherwise
if not capture.isEventStreamAvailable():
    raise RuntimeError("Input camera does not provide an event stream.")

# Initialize an accumulator with camera sensor resolution
surface = dv.SpeedInvariantTimeSurface(capture.getEventResolution())

# Initialize a preview window
cv.namedWindow("Preview", cv.WINDOW_NORMAL)

# Initialize a slicer
slicer = dv.EventStreamSlicer()


# Declare the callback method for slicer
def slicing_callback(events: dv.EventStore):
    # Pass the events to update the time surface
    surface.accept(events)

    # Generate a preview frame
    frame = surface.generateFrame()

    # Show the accumulated image
    cv.imshow("Preview", frame.image)
    cv.waitKey(2)


# Register callback to be performed every 33 milliseconds
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

# Run the event processing while the camera is connected
while capture.isRunning():
    # Receive events
    events = capture.getNextEventBatch()

    # Check if anything was received
    if events is not None:
        # If so, pass the events into the slicer to handle them
        slicer.accept(events)

KeyboardInterrupt: 

## Filtering events

In [None]:
import dv_processing as dv
import cv2 as cv
from datetime import timedelta

# Hardcoded VGA resolution
resolution = (640, 480)

# Initializing input events with uniformly distributed events which represent noise
events = dv.data.generate.uniformEventsWithinTimeRange(0, timedelta(milliseconds=10), resolution, 1000)

# Adding additional data for drawing, this will give an idea whether the filter removes actual signal events
events.add(dv.data.generate.dvLogoAsEvents(10000, resolution))

# Initialize a background activity noise filter with 1-millisecond activity period
filter = dv.noise.BackgroundActivityNoiseFilter(resolution, backgroundActivityDuration=timedelta(milliseconds=1))

# Pass events to the filter
filter.accept(events)

# Call generate events to apply the noise filter
filtered = filter.generateEvents()

# Print out the reduction factor, which indicates the percentage of discarded events
print(f"Filter reduced number of events by a factor of {filter.getReductionFactor()}")

# Use a visualizer instance to preview the events
visualizer = dv.visualization.EventVisualizer(resolution)

# Generate preview images of data input and output
input = visualizer.generateImage(events)
output = visualizer.generateImage(filtered)

# Concatenate the images into a single image for preview
preview = cv.hconcat([input, output])

# Display the input and output images
cv.namedWindow("preview", cv.WINDOW_NORMAL)
cv.imshow("preview", preview)
cv.waitKey()

Filter reduced number of events by a factor of 0.005161702632904053


## Writing and Reading camera data 

### Writing event

In [19]:
import time
import dv_processing as dv

# --- Abrir la cámara ---
capture = dv.io.camera.open()

# Obtener la resolución de la cámara
resolution = capture.getEventResolution()

# Configuración para guardar solo eventos
config = dv.io.MonoCameraWriter.EventOnlyConfig("DAVIS346_sample", resolution)

# Crear el writer para escribir en archivo AEDAT4
writer = dv.io.MonoCameraWriter("output_davis346.aedat4", config)

print("Grabando eventos... Presiona Ctrl+C para detener.")

try:
    while capture.isRunning():
        # Leer siguiente lote de eventos
        events = capture.getNextEventBatch()

        if events is not None:
            # Guardar los eventos en el archivo
            writer.writeEvents(events)
        else:
            # Si no hay datos, dormir un poco para no sobrecargar la CPU
            time.sleep(0.001)

except KeyboardInterrupt:
    print("Grabación detenida.")

finally:
    # Cerrar cámara y escritor
    del writer
    del capture
    print("Archivo guardado como output_davis346.aedat4")


Grabando eventos... Presiona Ctrl+C para detener.
Grabación detenida.
Archivo guardado como output_davis346.aedat4


### Read events from a file

In [5]:
from datetime import timedelta
import cv2 as cv
import dv_processing as dv

FILE_PATH = "C:/Users/gmarinhu/OneDrive - purdue.edu/Documents/code/output_davis346.aedat4" 

source = dv.io.MonoCameraRecording(FILE_PATH)

# Verificar que el stream de eventos está disponible
if not source.isEventStreamAvailable():
    raise RuntimeError("El archivo no contiene un stream de eventos.")

# Crear visualizador con la resolución correcta
visualizer = dv.visualization.EventVisualizer(source.getEventResolution())
visualizer.setBackgroundColor(dv.visualization.colors.white())
visualizer.setPositiveColor(dv.visualization.colors.iniBlue())
visualizer.setNegativeColor(dv.visualization.colors.darkGray())

# Ventana de visualización
cv.namedWindow("Eventos", cv.WINDOW_NORMAL)

# Crear slicer para agrupar eventos en intervalos de tiempo
slicer = dv.EventStreamSlicer()

# --- Callback para mostrar los eventos ---
def slicing_callback(events: dv.EventStore):
    frame = visualizer.generateImage(events)
    cv.imshow("Eventos", frame)
    if cv.waitKey(1) & 0xFF == ord('q'):
        # salir del loop principal
        global running
        running = False

# Registrar callback cada 33 ms (~30 FPS)
slicer.doEveryTimeInterval(timedelta(milliseconds=33), slicing_callback)

print("Mostrando eventos... (presiona 'q' para salir)")

# --- Loop principal ---
while source.isRunning():
    events = source.getNextEventBatch()
    if events is not None:
        slicer.accept(events)

# Cerrar
cv.destroyAllWindows()
del source
print("Finalizado.")


Mostrando eventos... (presiona 'q' para salir)
Finalizado.


### Read events with Tonic

In [7]:
import tonic

# Ruta a tu archivo .aedat4
FILE_PATH = "C:/Users/gmarinhu/OneDrive - purdue.edu/Documents/code/output_davis346.aedat4" 

# Cargar los eventos
events = tonic.io.read_aedat4(FILE_PATH)

print("Tipo:", type(events))
print("Campos:", events.dtype.names)
print("Número de eventos:", len(events))

# Ejemplo: acceder a los primeros 5 eventos
print(events[0])

print(type(events[0]["p"]))


Tipo: <class 'numpy.ndarray'>
Campos: ('t', 'x', 'y', 'p')
Número de eventos: 2191717
(1758895097842180, 236, 218, False)
<class 'numpy.bool_'>
