In [1]:
import h5py
import numpy as np


H, W = 480, 640

drone_data = h5py.File('/home/aric/adasi/data/2025-07-17-12-49-53_full_testing.h5', 'r')
shapes_data = h5py.File('/home/aric/adasi/data/shapes.h5', 'r')

drone_events = drone_data['events_data'][:]
shapes_events = shapes_data['events_data'][:]

print(drone_events.dtype)
print("We have {} events".format(len(drone_events)))
print(f"Our sequence spans {drone_events['t'][-1] - drone_events['t'][0]} seconds.")

[('x', '<i2'), ('y', '<i2'), ('p', '?'), ('t', '<f8')]
We have 1786990 events
Our sequence spans 4.999997000000008 seconds.


In [2]:
# We will use this function later to visualize the events
from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
%matplotlib qt

def animate_events(sliced, H, W):
    fig, ax = plt.subplots()
    sc = ax.scatter([], [], c=[], s=1, cmap='coolwarm')
    ax.set_xlim(0, W)
    ax.set_ylim(0, H)

    def update(i):
        sc.set_offsets(np.column_stack((sliced[i]['x'], sliced[i]['y'])))
        sc.set_array(sliced[i]['p'])
        return sc,

    return fig, update

def animate_seq_frame(frames):
    fig, ax = plt.subplots()
    im = ax.imshow(frames[0], cmap='gray')

    def update(i):
        im.set_array(frames[i])
        return im,

    return fig, update


# Tonic

Rather than manually slicing our event stream, we will now use tonic to slice our stream into 30ms slices. Tonic has similar functions for other slicing methods discussed in the lecture.

In [3]:
import tonic

slicer = tonic.slicers.SliceByTime(30e-3)   # 30 ms slices

sliced_events, _ = slicer.slice(shapes_events, 0)

print(len(sliced_events))

sample_slice = sliced_events[0]
print(f"Our sample slice has {len(sample_slice)} events.")
print(f"Our sample slice spans {sample_slice['t'][-1] - sample_slice['t'][0]} seconds.")

333
Our sample slice has 60113 events.
Our sample slice spans 0.029858112335205078 seconds.


That easy! `sliced_events` is now a list of 30ms time windows. Let's visualize this data using the animation functions above.

In [4]:
fig, update = animate_events(sliced_events, H, W)
ani = FuncAnimation(fig, update, frames=len(sliced_events), interval=30)

In this way we have created all the frames into the sequence

# Denoising

In the lecture today you learned about two types of noise filters for event cameras.

- BAF, most basic filter
- YNoise filter

Below you will find the implementation of the YNoise filter. Run it and display the output. Compare to the raw noisy input.

In [None]:
import numpy as np
from tqdm.auto import tqdm

class YNoiseFilter:
    """
    Spatio-temporal density filter for event streams.
    Keep event e=(x,y,p,t) iff at least `threshold` neighbors (same polarity)
    in the N×N neighborhood fired within the last `delta_t` microseconds.
    """

    def __init__(self, size_w=346, size_h=260, delta_t=0.01, N=3, threshold=2):
        assert N % 2 == 1, "N must be odd (e.g., 3, 5, 7, ...)"
        self.W = int(size_w)
        self.H = int(size_h)
        self.delta_t = (delta_t)      # seconds
        self.N = int(N)                  # neighborhood size
        self.m = self.N // 2             # center index
        self.threshold = int(threshold)

        # last_ts[y, x, pol_idx], where pol_idx=0 for p<0, 1 for p>0
        self.last_ts = np.full((self.H, self.W, 2), np.int64(-(1 << 62)), dtype=np.int64)

    @staticmethod
    def _pol_index(p):
        # map polarity to index: negatives -> 0, positives/nonneg -> 1
        return 1 if p > 0 else 0

    def update_matrix(self, e):
        """Update last timestamp memory at (x,y) for polarity p."""
        x = int(e['x']); y = int(e['y']); p = int(e['p']); t = int(e['t'])
        pol_idx = self._pol_index(p)
        if 0 <= x < self.W and 0 <= y < self.H:
            self.last_ts[y, x, pol_idx] = t

    def calculate_density(self, e):
        """Count same-polarity neighbors in N×N whose timestamps are within delta_t."""
        x = int(e['x']); y = int(e['y']); p = int(e['p']); t = int(e['t'])
        pol_idx = self._pol_index(p)

        # Neighborhood bounds (clamped to image)
        u0 = max(0, x - self.m); u1 = min(self.W, x + self.m + 1)
        v0 = max(0, y - self.m); v1 = min(self.H, y + self.m + 1)

        cnt = 0
        for v in range(v0, v1):
            for u in range(u0, u1):
                if u == x and v == y:
                    continue  # exclude center
                # same-polarity recency check
                if t - self.last_ts[v, u, pol_idx] <= self.delta_t:
                    cnt += 1
        return cnt
    
    def is_noise(self, e):
        self.update_matrix(e)
        return self.calculate_density(e) < self.threshold

    def filter_events(self, events):
        """
        Filter events (structured ndarray with fields 'x','y','p','t').
        Returns a structured ndarray of kept events (same dtype).
        """
        kept = []
        for e in tqdm(events):
            if not self.is_noise(e):
                kept.append(e)

        if isinstance(events, np.ndarray):
            return np.array(kept, dtype=events.dtype)
        return kept

Here is how you use the YNoise filter

In [6]:
n_events_before = len(shapes_events)


ynoise_filter = YNoiseFilter(size_w=W, size_h=H, delta_t=0.1, N=3, threshold=7)
filtered_events = ynoise_filter.filter_events(shapes_events)

n_events_after = len(filtered_events)
print(f"Filtered out {n_events_before - n_events_after} events.")

sliced_events, _ = slicer.slice(filtered_events, 0)

fig, update = animate_events(sliced_events, H, W)
ani = FuncAnimation(fig, update, frames=len(sliced_events), interval=30)

  0%|          | 0/28855004 [00:00<?, ?it/s]

Filtered out 6143749 events.


Your job now to try and implement the simpler Background activity filter. You can reuse some of the same logic as found in the YNoise filter.

In [12]:
class BAFNoiseFilter:
    """Background Activity Filter (BAF) for event streams."""

    def __init__(self, size_w=346, size_h=260, delta_t=0.2, N=3):
        self.W = int(size_w)
        self.H = int(size_h)
        self.delta_t = (delta_t)      # seconds
        self.N = int(N)                  # neighborhood size
        
    
    def is_noise(self, e):
        # Your code here
        return False

    def filter_events(self, events):
        kept = []

        for e in tqdm(events):
            if not self.is_noise(e):
                kept.append(e)

        if isinstance(events, np.ndarray):
            return np.array(kept, dtype=events.dtype)
        return kept

Check results

In [13]:
n_events_before = len(shapes_events)


baf_noise_filter = BAFNoiseFilter(size_w=W, size_h=H, delta_t=0.1, N=3)
filtered_events = baf_noise_filter.filter_events(shapes_events)

n_events_after = len(filtered_events)
print(f"Filtered out {n_events_before - n_events_after} events.")

sliced_events, _ = slicer.slice(filtered_events, 0)

fig, update = animate_events(sliced_events, H, W)
ani = FuncAnimation(fig, update, frames=len(sliced_events), interval=30)

  0%|          | 0/28855004 [00:00<?, ?it/s]

Filtered out 0 events.
