In [1]:
%pip install expelliarmus aedat event_stream h5py aestream loris brotli plotly kaleido dv_processing

from expelliarmus import Wizard
import aedat
import event_stream
import gc
import hashlib
import h5py
import aestream
import numpy as np
import timeit
import requests
import pickle
import os
import loris
import brotli
import json
from pathlib import Path

Note: you may need to restart the kernel to use updated packages.
NO SPECK module 'aestream.aestream_ext' has no attribute 'SpeckInput'


In [2]:
# fname = "driving_sample"
fname = "construction"  # use this one if you want to include aedat and eventstream benchmarks

# where to download and generate all the benchmark data
folder = Path("data/file-benchmark")
folder.mkdir(parents=True, exist_ok=True)

# key is the name of the encoding, value is the file name ending
# ("loris" and "eventstream" map to the same ending, so both names resolve to the same file)
extension_map = {
    "aedat": ".aedat4",
    "dat": ".dat",
    "evt2": "_evt2.raw",
    "evt3": "_evt3.raw",
    "hdf5": ".hdf5",
    "hdf5_lzf": "_lzf.hdf5",
    "hdf5_gzip": "_gzip.hdf5",
    "numpy": ".npy",
    "loris": ".es",
    "eventstream": ".es",
    "brotli": ".bin.br",
    "undr_numpy": ".dvs",
    "undr_brotli_11": ".11.dvs.br",
    "undr_brotli_6": ".6.dvs.br",
    "undr_brotli_1": ".1.dvs.br",
}


def get_fpath(encoding):
    """Return the path (as a string) of the benchmark file for ``encoding``.

    The path is ``folder`` joined with ``fname`` followed by the file name
    ending registered for ``encoding`` in ``extension_map``.

    Args:
        encoding: a key of ``extension_map``.

    Returns:
        The file path as a ``str``.

    Raises:
        KeyError: if ``encoding`` is not a key of ``extension_map``.
    """
    return f"{folder}/{fname}{extension_map[encoding]}"

## Download the 'base' files
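These are the files with the original data, which are loaded and then converted to all the other formats under test. You can currently choose between events from a Prophesee raw EVT3 sample file and an AEDAT4 sample file by setting `fname` in the cell above; use the AEDAT4 sample (`construction`) if you want to include the aedat and eventstream benchmarks.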

In [3]:
def download_file_from_url(file_path, url):
    """Download ``url`` and store the response body at ``file_path``.

    The body is streamed into ``<file_path>.download`` and only moved to
    ``file_path`` once the transfer has completed, so an interrupted download
    never leaves a truncated file under the final name.

    Args:
        file_path: destination path of the downloaded file.
        url: address to fetch; redirects are followed.

    Raises:
        requests.HTTPError: if the server answers with an error status. The
            status is checked before anything is written, so no partial file
            holding the error page is created.
    """
    print(f"Downloading file to {file_path}... ")
    partial_path = f"{file_path}.download"
    # stream=True avoids holding the whole recording in memory at once
    with requests.get(url, allow_redirects=True, stream=True) as r:
        r.raise_for_status()
        with open(partial_path, "wb") as file:
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                file.write(chunk)
    # os.replace also overwrites an existing destination on Windows
    os.replace(partial_path, file_path)
    print("done!")


# Prophesee sample: fetch the EVT3 recording if it is not on disk yet, then
# decode it with expelliarmus.
# NOTE(review): this branch only defines `events_ti8_xi2_yi2_pu1`; `width`,
# `height`, `events_tu8_xu2_yu2_onb` and `events_tu8_xu2_yu2_pu1`, which later
# cells read, are only defined by the "construction" branch.
if fname == "driving_sample":
    fpath = get_fpath("evt3")
    if not os.path.isfile(fpath):
        download_file_from_url(
            file_path=fpath,
            url="https://dataset.prophesee.ai/index.php/s/nVcLLdWAnNzrmII/download",
        )
    wizard = Wizard(encoding="evt3")
    events_ti8_xi2_yi2_pu1 = wizard.read(fpath)


# AEDAT4 sample: fetch the recording if it is not on disk yet, then decode all
# of its event packets into one array.
if fname == "construction":
    aedat_fpath = get_fpath("aedat")
    if not os.path.isfile(aedat_fpath):
        download_file_from_url(
            file_path=aedat_fpath,
            url="https://cloudstor.aarnet.edu.au/plus/s/ORQ2oOz9NfwiHLZ/download?path=%2F&files=construction.aedat4",
        )
    decoder = aedat.Decoder(aedat_fpath) # type: ignore

    # The sensor size is taken from the first stream of type "events";
    # zero means that no such stream was found.
    width, height = 0, 0
    for stream in decoder.id_to_stream().values():
        if stream["type"] != "events":
            continue
        width, height = stream["width"], stream["height"]
        break
    assert width != 0
    assert height != 0

    events_tu8_xu2_yu2_onb = np.concatenate(
        [packet["events"] for packet in decoder if "events" in packet]
    )
    # Timestamps must never decrease.
    assert not np.any(np.diff(events_tu8_xu2_yu2_onb["t"].astype("<i8")) < 0)
    # Make the timestamps start at zero.
    events_tu8_xu2_yu2_onb["t"] -= events_tu8_xu2_yu2_onb["t"][0]

    # Two casts of the same events: an aligned signed layout (written by the
    # expelliarmus, HDF5 and NumPy generators) and a packed unsigned layout
    # (written to the UNDR .dvs file).
    events_ti8_xi2_yi2_pu1 = events_tu8_xu2_yu2_onb.astype(
        np.dtype([("t", "<i8"), ("x", "<i2"), ("y", "<i2"), ("p", "u1")], align=True)
    )
    events_tu8_xu2_yu2_pu1 = events_tu8_xu2_yu2_onb.astype(
        np.dtype([("t", "<u8"), ("x", "<u2"), ("y", "<u2"), ("p", "<u1")])
    )

## Generate all comparison files

In [4]:
# dat, evt2 and evt3: written with expelliarmus, skipping files that already exist
raw_encodings = ["dat", "evt2", "evt3"]
for encoding in raw_encodings:
    fpath = get_fpath(encoding)
    if os.path.exists(fpath):
        continue
    print(f"Generating {fpath}.")
    wizard = Wizard(encoding=encoding)
    wizard.save(fpath=fpath, arr=events_ti8_xi2_yi2_pu1)

# variants of hdf5: uncompressed, LZF-compressed and gzip-compressed
hdf5_encodings = ["hdf5", "hdf5_lzf", "hdf5_gzip"]
for encoding in hdf5_encodings:
    fpath = get_fpath(encoding)
    if os.path.exists(fpath):
        continue
    # The part after "hdf5_" (if any) names the compression filter;
    # plain "hdf5" is written without one.
    _, _, compression = encoding.partition("_")
    compression_kwargs = {"compression": compression} if compression else {}
    with h5py.File(fpath, "w") as fp:
        print(f"Generating {fpath}.")
        # The arguments are passed directly rather than through a named dict:
        # a module-level dict holding `data` would keep the event array alive
        # after the arrays are deleted ahead of the benchmarks.
        fp.create_dataset(
            name="events",
            shape=events_ti8_xi2_yi2_pu1.shape,
            dtype=events_ti8_xi2_yi2_pu1.dtype,
            data=events_ti8_xi2_yi2_pu1,
            **compression_kwargs,
        )

# Event Stream: written with the event_stream encoder as a "dvs" stream
fpath = get_fpath("eventstream")
if not Path(fpath).exists():
    print(f"Generating {fpath}.")
    with event_stream.Encoder(fpath, "dvs", width, height) as encoder:
        encoder.write(events_tu8_xu2_yu2_onb)

# numpy (pickle): the aligned structured array stored with np.save
fpath = get_fpath("numpy")
if not Path(fpath).exists():
    print(f"Generating {fpath}.")
    np.save(fpath, events_ti8_xi2_yi2_pu1, allow_pickle=True)

# numpy (UNDR): the packed structured array dumped as raw bytes
fpath = get_fpath("undr_numpy")
if not Path(fpath).exists():
    print(f"Generating {fpath}.")
    events_tu8_xu2_yu2_pu1.tofile(fpath)

# brotli (UNDR)
brotli_qualities = [1, 6, 11]


def _generate_brotli_files(qualities):
    """Compress the UNDR ``.dvs`` file once per Brotli quality in ``qualities``.

    Files that already exist are left untouched. The uncompressed file is only
    read when at least one output is missing, and the byte buffers are local to
    this function so that they are not kept in memory during the benchmarks.

    Args:
        qualities: iterable of Brotli quality levels; each one must have an
            ``undr_brotli_<quality>`` entry in ``extension_map``.
    """
    missing = [
        (quality, get_fpath(f"undr_brotli_{quality}")) for quality in qualities
    ]
    missing = [(quality, path) for quality, path in missing if not os.path.exists(path)]
    if not missing:
        return
    with open(get_fpath("undr_numpy"), "rb") as uncompressed_file:
        uncompressed_bytes = uncompressed_file.read()
    for quality, path in missing:
        print(f"Generating {path}.")
        # Compress before opening the output, so a failure leaves no empty file behind.
        compressed_bytes = brotli.compress(uncompressed_bytes, quality=quality)
        with open(path, "wb") as compressed_file:
            compressed_file.write(compressed_bytes)


_generate_brotli_files(brotli_qualities)


In [5]:
# aedat4
# we do this in order to exclude IMU events that are part of the original file
# see here for how to install dv https://dv-processing.inivation.com/rel_1.7/installation.html
fpath = str(folder / "construction_rewritten.aedat4")


def _rewrite_events_as_aedat4(events, output_path):
    """Write ``events`` to ``output_path`` as an event-only AEDAT4 file using dv_processing.

    The work is done inside a function so that the event store, the widened
    copy of the events and the writer are local: they are released when the
    function returns instead of staying alive as notebook globals during the
    benchmarks.
    NOTE(review): releasing the writer here presumably also lets it finalize
    the file before it is read back - confirm against the dv-processing docs.

    Args:
        events: structured event array with four fields, cast here to
            ``(t, x, y, p)`` as ``(int, int, int, bool)``.
        output_path: path of the AEDAT4 file to create.
    """
    # Imported lazily: dv_processing is only needed when the file has to be generated.
    import dv_processing as dv
    from tqdm.notebook import tqdm

    store = dv.EventStore()
    aedat_compatible_events = events.astype(
        np.dtype([("t", int), ("x", int), ("y", int), ("p", bool)])
    )
    for event in tqdm(aedat_compatible_events):
        store.push_back(timestamp=event["t"], x=event["x"], y=event["y"], polarity=event["p"])

    # The resolution is inferred from the largest coordinates present in the events.
    resolution = (aedat_compatible_events['x'].max()+1, aedat_compatible_events['y'].max()+1)
    config = dv.io.MonoCameraWriter.EventOnlyConfig("DVXplorer_sample", resolution)
    writer = dv.io.MonoCameraWriter(output_path, config)
    writer.writeEvents(store)


if not os.path.exists(fpath):
    print(f"Generating {fpath}.")
    _rewrite_events_as_aedat4(events_tu8_xu2_yu2_onb, fpath)

## Run benchmarks

In [6]:
REPEAT = 5  # number of timed reads per benchmark; reported times are the mean over these


def get_fsize_MiB(fpath):
    """Return the size of the file at ``fpath`` in MiB, rounded to the nearest integer.

    Args:
        fpath: path of an existing file, as a ``pathlib.Path`` or a ``str``.

    Returns:
        The file size in mebibytes as an ``int``.

    Raises:
        FileNotFoundError: if ``fpath`` does not exist.
    """
    return round(Path(fpath).stat().st_size / (1024 * 1024))

def hash(events: np.ndarray, normalize_time: bool = False) -> str:
    """Return the SHA3-224 hex digest of ``events`` in a fixed packed layout.

    The events are cast to the packed dtype ``(t: <u8, x: <u2, y: <u2, on: ?)``
    before hashing, so the digest is computed on that layout regardless of the
    field widths of the input. The input array is never modified.

    The name shadows the builtin ``hash``; it is kept because the benchmark
    cells call this function under that name.

    Args:
        events: one-dimensional structured array with four fields
            (timestamp, x, y, polarity).
        normalize_time: when true, the first timestamp is subtracted from every
            timestamp before hashing, so that the recording starts at zero.

    Returns:
        The hexadecimal SHA3-224 digest.
    """
    canonical_dtype = np.dtype([("t", "<u8"), ("x", "<u2"), ("y", "<u2"), ("on", "?")])
    # astype returns a new array, so shifting the timestamps below cannot
    # alter the caller's data.
    canonical = events.astype(canonical_dtype)
    # An empty array has no first timestamp to subtract.
    if normalize_time and len(canonical) > 0:
        canonical["t"] -= canonical["t"][0]
    result = hashlib.sha3_224()
    result.update(canonical.tobytes())
    return result.hexdigest()

# Reference values taken from the decoded source events: every verified
# benchmark reader is compared against this digest, and the event count is
# used in the plot title.
number_of_events = events_tu8_xu2_yu2_onb.shape[0]
reference_hash = hash(events_tu8_xu2_yu2_onb)

In [7]:
# Drop the three in-memory copies of the events so that they do not take up
# memory while the benchmarks run, then force a garbage collection.

del events_tu8_xu2_yu2_onb, events_ti8_xi2_yi2_pu1, events_tu8_xu2_yu2_pu1
gc.collect()

0

In [8]:
# expelliarmus: mean read time and file size for each raw encoding
expelliarmus_times = []
expelliarmus_sizes = []
for encoding in raw_encodings:
    print(f"Benchmarking expelliarmus ({encoding}).")
    gc.collect()
    fpath = get_fpath(encoding)

    def expelliarmus_read() -> np.ndarray:
        """Decode the file at the current ``fpath`` with a new ``Wizard`` for the current ``encoding``."""
        reader = Wizard(encoding)
        reader.set_file(fpath)
        return reader.read(fpath)

    # Verify the decoded events once, outside of the timed runs.
    assert hash(expelliarmus_read()) == reference_hash
    expelliarmus_sizes.append(get_fsize_MiB(Path(fpath)))
    expelliarmus_times.append(timeit.timeit(expelliarmus_read, number=REPEAT) / REPEAT)


Benchmarking expelliarmus (dat).
Benchmarking expelliarmus (evt2).
Benchmarking expelliarmus (evt3).


In [9]:
# hdf5 variants: mean read time and file size for each HDF5 file
hdf5_times = []
hdf5_sizes = []
for encoding in hdf5_encodings:
    print(f"Benchmarking HDF5 ({encoding}).")
    gc.collect()
    fpath = get_fpath(encoding)

    def hdf5_read() -> np.ndarray:
        """Load the whole "events" dataset of the file at the current ``fpath``."""
        with h5py.File(fpath) as h5_file:
            loaded = h5_file["events"][:] # type: ignore
        return loaded

    # Verify the decoded events once, outside of the timed runs.
    assert hash(hdf5_read()) == reference_hash
    hdf5_sizes.append(get_fsize_MiB(Path(fpath)))
    hdf5_times.append(timeit.timeit(hdf5_read, number=REPEAT) / REPEAT)

Benchmarking HDF5 (hdf5).
Benchmarking HDF5 (hdf5_lzf).
Benchmarking HDF5 (hdf5_gzip).


In [10]:
# brotli (UNDR): mean read time and file size for each compression quality
brotli_times = []
brotli_sizes = []
for quality in brotli_qualities:
    print(f"Benchmarking Brotli (Q={quality}).")
    gc.collect()
    fpath = get_fpath(f"undr_brotli_{quality}")

    def brotli_read() -> np.ndarray:
        """Decompress the file at the current ``fpath`` and view the bytes as packed events."""
        packed_dtype = np.dtype([("t", "<u8"), ("x", "<u2"), ("y", "<u2"), ("p", "<u1")])
        with open(fpath, "rb") as compressed_file:
            return np.frombuffer(brotli.decompress(compressed_file.read()), dtype=packed_dtype)

    # Verify the decoded events once, outside of the timed runs.
    assert hash(brotli_read()) == reference_hash
    brotli_sizes.append(get_fsize_MiB(Path(fpath)))
    brotli_times.append(timeit.timeit(brotli_read, number=REPEAT) / REPEAT)

Benchmarking Brotli (Q=1).
Benchmarking Brotli (Q=6).
Benchmarking Brotli (Q=11).


In [11]:
# numpy
print("Benchmarking NumPy.")
gc.collect()
fpath = get_fpath("numpy")


def numpy_read() -> np.ndarray:
    """Load the event array stored in the ``.npy`` file at the current ``fpath``."""
    return np.load(fpath)


# Verify the loaded events once before timing, like the other verified
# benchmarks do; the file stores the same array as the HDF5 files.
assert hash(numpy_read()) == reference_hash
numpy_time = timeit.timeit(numpy_read, number=REPEAT) / REPEAT
numpy_size = get_fsize_MiB(Path(fpath))

Benchmarking NumPy.


In [12]:
# numpy (UNDR): raw packed bytes read back with np.fromfile
print("Benchmarking NumPy (UNDR).")
gc.collect()
fpath = get_fpath("undr_numpy")


def undr_numpy_read() -> np.ndarray:
    """Read the file at the current ``fpath`` as an array of packed events."""
    packed_dtype = np.dtype([("t", "<u8"), ("x", "<u2"), ("y", "<u2"), ("p", "<u1")])
    return np.fromfile(fpath, dtype=packed_dtype)


# Verify the decoded events once, outside of the timed runs.
assert hash(undr_numpy_read()) == reference_hash
undr_numpy_size = get_fsize_MiB(Path(fpath))
undr_numpy_time = timeit.timeit(undr_numpy_read, number=REPEAT) / REPEAT

Benchmarking NumPy (UNDR).


In [13]:
# aedat4: the original recording, decoded with the aedat package
print("Benchmarking AEDAT.")
gc.collect()
fpath = get_fpath("aedat")


def aedat_read() -> np.ndarray:
    """Decode the file at the current ``fpath`` and concatenate its event packets."""
    packets = aedat.Decoder(fpath) # type: ignore
    return np.concatenate([packet["events"] for packet in packets if "events" in packet])


# The file holds the original timestamps, so they are shifted to start at
# zero before being compared with the reference.
assert hash(aedat_read(), normalize_time=True) == reference_hash
aedat_size = get_fsize_MiB(Path(fpath))
aedat_time = timeit.timeit(aedat_read, number=REPEAT) / REPEAT


Benchmarking AEDAT.


In [14]:
# loris: reads the Event Stream (.es) file
print("Benchmarking loris.")
gc.collect()
fpath = get_fpath("loris")


def loris_read() -> np.ndarray:
    """Read the file at the current ``fpath`` with loris and return its "events" entry."""
    contents = loris.read_file(fpath)
    return contents["events"] # type: ignore


# Verify the decoded events once, outside of the timed runs.
assert hash(loris_read()) == reference_hash
loris_size = get_fsize_MiB(Path(fpath))
loris_time = timeit.timeit(loris_read, number=REPEAT) / REPEAT

Benchmarking loris.


In [15]:
# eventstream: reads the Event Stream (.es) file with the event_stream package
print("Benchmarking eventstream.")
gc.collect()
fpath = get_fpath("eventstream")


def eventstream_read() -> np.ndarray:
    """Decode the file at the current ``fpath`` and concatenate all of its packets."""
    with event_stream.Decoder(fpath) as packets:
        return np.concatenate([packet for packet in packets])


# Verify the decoded events once, outside of the timed runs.
assert hash(eventstream_read()) == reference_hash
eventstream_size = get_fsize_MiB(Path(fpath))
eventstream_time = timeit.timeit(eventstream_read, number=REPEAT) / REPEAT

Benchmarking eventstream.


In [16]:
# aestream
aestream_times = []
aestream_sizes = []
for encoding in ["dat", "evt3"]:
    print(f"Benchmarking aestream ({encoding}).")
    gc.collect()
    fpath = get_fpath(encoding)

    def aestream_read() -> np.ndarray:
        """Load the file at the current ``fpath`` with AEStream, passing a (640, 480) shape."""
        return aestream.FileInput(fpath, (640, 480)).load()

    # Known bug: AEStream EVT3 decoding is wrong, so a mismatch is reported
    # instead of aborting the benchmark.
    if hash(aestream_read()) != reference_hash:
        print(f"ERROR: Decoded array does not correspond to original one for {encoding}!")
    # Time the AEStream reader itself (not the expelliarmus reader left over
    # from an earlier cell).
    aestream_times.append(timeit.timeit(aestream_read, number=REPEAT) / REPEAT)
    aestream_sizes.append(get_fsize_MiB(Path(fpath)))

Benchmarking aestream (dat).
Benchmarking aestream (evt3).
Decoded array does not correspond to original one for evt3!


In [None]:
import dv_processing as dv

# DV: reads the rewritten, event-only AEDAT4 file with dv_processing
print("Benchmarking DV.")
gc.collect()
fpath = str(folder / "construction_rewritten.aedat4")


def dv_read() -> np.ndarray:
    """Read every event batch of the recording at the current ``fpath`` and concatenate them.

    Reading stops when the recording is no longer running or when the reader
    returns ``None`` instead of a batch.
    """
    recording = dv.io.MonoCameraRecording(fpath)
    batches = []
    while recording.isRunning():
        batch = recording.getNextEventBatch()
        if batch is None:
            break
        batches.append(batch.numpy())
    return np.concatenate(batches)


dv_size = get_fsize_MiB(Path(fpath))
dv_time = timeit.timeit(dv_read, number=REPEAT) / REPEAT

Benchmarking DV.


In [None]:
# Collect one (encoding, framework, mean read time, file size) tuple per
# benchmark and store them all as JSON.

results = [
    *zip(
        raw_encodings,
        ["expelliarmus"] * len(raw_encodings),
        expelliarmus_times,
        expelliarmus_sizes,
    ),
    *zip(
        ["dat", "evt3"],
        ["AEStream"] * 2,
        aestream_times,
        aestream_sizes,
    ),
    *zip(hdf5_encodings, ["h5py"] * len(hdf5_encodings), hdf5_times, hdf5_sizes),
    *zip(
        [f"numpy/brotli (Q={quality})" for quality in brotli_qualities],
        ["numpy/brotli"] * len(brotli_qualities),
        brotli_times,
        brotli_sizes,
    ),
    ("numpy (pickle)", "numpy", numpy_time, numpy_size),
    ("numpy (UNDR)", "numpy", undr_numpy_time, undr_numpy_size),
    ("aedat4", "aedat", aedat_time, aedat_size),
    ("aedat4", "DV", dv_time, dv_size),
    ("eventstream", "loris", loris_time, loris_size),
    ("eventstream", "event_stream", eventstream_time, eventstream_size),
]

with open("results.json", "w") as results_file:
    json.dump(results, results_file, indent=4)

In [None]:
## Plot results

import pandas
import plotly.express

with open("results.json") as results_file:
    results = json.load(results_file)

# One column per position of the stored result rows.
dataframe = pandas.DataFrame(
    {
        column: [result[index] for result in results]
        for index, column in enumerate(
            ["Encoding", "Framework", "Read time [s]", "File size [MiB]"]
        )
    }
)

title = f"Reading the same {round(number_of_events / 1e6)} million events from different file formats."


def _write_scatter(image_path, scatter_options, layout_options, image_options):
    """Draw the read-time / file-size scatter plot and write it to ``image_path``.

    Args:
        image_path: file name of the image to write.
        scatter_options: extra keyword arguments for ``plotly.express.scatter``.
        layout_options: keyword arguments for ``update_layout``.
        image_options: extra keyword arguments for ``write_image``.

    Returns:
        The figure that was written.
    """
    scatter = plotly.express.scatter(
        dataframe,
        x="Read time [s]",
        y="File size [MiB]",
        color="Framework",
        symbol="Encoding",
        **scatter_options,
    )
    scatter.update_traces(marker_size=13)
    scatter.update_layout(**layout_options)
    scatter.write_image(image_path, **image_options)
    return scatter


# Dark theme, linear axes.
figure = _write_scatter(
    "file_read_benchmark.png",
    scatter_options=dict(template="plotly_dark", title=title),
    layout_options=dict(height=600, width=900),
    image_options=dict(),
)

# White theme, no title, tight margins.
figure = _write_scatter(
    "file_read_benchmark_white.png",
    scatter_options=dict(template="plotly_white"),
    layout_options=dict(height=400, width=1000, margin=dict(l=10, r=10, b=10, t=10)),
    image_options=dict(),
)

# Dark theme, logarithmic axes, written at twice the scale.
figure = _write_scatter(
    "file_read_benchmark_log.png",
    scatter_options=dict(template="plotly_dark", title=title, log_x=True, log_y=True),
    layout_options=dict(height=600, width=900),
    image_options=dict(scale=2),
)
