An ETS-based partitioned buffer library for high-throughput data processing in Elixir.
PartitionedBuffer is a generic, reusable buffering system inspired by the
OpenTelemetry Batch Processor.
It efficiently buffers arbitrary data and processes it at regular intervals
using a configurable processor function.
It ships with two concrete buffer implementations:
- PartitionedBuffer.Queue: ordered queue buffer (insertion-time ordered, backed by :ordered_set ETS tables).
- PartitionedBuffer.Map: key-value map buffer (last-write-wins semantics, backed by :set ETS tables).
Both use partitioning to reduce lock contention and double-buffering for zero-downtime processing.
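
To make the partitioning idea concrete, here is a minimal sketch of how keys might be spread across several ETS tables so that concurrent writers rarely touch the same table. It is an illustration only, not PartitionedBuffer's actual internals: the module name PartitionSketch, its functions, and the use of :erlang.phash2/2 to pick a partition are all assumptions. Double-buffering is not shown.

```elixir
defmodule PartitionSketch do
  # Hypothetical illustration; not part of the PartitionedBuffer API.

  # Create one public ETS table per partition and return them as a tuple.
  def new(partitions) when is_integer(partitions) and partitions > 0 do
    tables =
      for _ <- 1..partitions do
        :ets.new(:partition, [:set, :public, write_concurrency: true])
      end

    List.to_tuple(tables)
  end

  # Map a key to a partition index in 0..partitions-1.
  def partition_for(key, partitions), do: :erlang.phash2(key, partitions)

  # Write the entry into the table that owns the key.
  def put(tables, key, value) do
    table = elem(tables, partition_for(key, tuple_size(tables)))
    true = :ets.insert(table, {key, value})
    :ok
  end

  # Read the entry back from the same table, or nil if it is absent.
  def get(tables, key) do
    table = elem(tables, partition_for(key, tuple_size(tables)))

    case :ets.lookup(table, key) do
      [{^key, value}] -> value
      [] -> nil
    end
  end
end

tables = PartitionSketch.new(4)
:ok = PartitionSketch.put(tables, :key1, "value1")
"value1" = PartitionSketch.get(tables, :key1)
```

Because each key always hashes to the same partition, reads and writes for a given key stay consistent, while writes to different keys tend to land on different tables.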
Add :partitioned_buffer to your list of dependencies in mix.exs:

    def deps do
      [
        {:partitioned_buffer, "~> 0.2"}
      ]
    end

PartitionedBuffer.Queue buffers items in insertion order and processes them
in batches:

    # Start a queue buffer
    {:ok, _pid} =
      PartitionedBuffer.Queue.start_link(
        name: :my_queue,
        processor: fn batch -> IO.inspect(batch) end
      )

    # Push items into the buffer
    :ok = PartitionedBuffer.Queue.push(:my_queue, "message1")
    :ok = PartitionedBuffer.Queue.push(:my_queue, ["message2", "message3"])

    # Check buffer size
    PartitionedBuffer.Queue.size(:my_queue)

PartitionedBuffer.Map buffers key-value entries with last-write-wins
semantics. Entries with the same key overwrite previous values:

    # Start a map buffer
    # The processor receives a list of {key, value, version, updates} tuples
    {:ok, _pid} =
      PartitionedBuffer.Map.start_link(
        name: :my_map,
        processor: fn batch -> IO.inspect(batch) end
      )

    # Put entries into the buffer
    :ok = PartitionedBuffer.Map.put(:my_map, :key1, "value1")
    :ok = PartitionedBuffer.Map.put_all(:my_map, %{key2: "value2", key3: "value3"})

    # Read and delete entries
    "value1" = PartitionedBuffer.Map.get(:my_map, :key1)
    :ok = PartitionedBuffer.Map.delete(:my_map, :key1)

    # Check buffer size
    PartitionedBuffer.Map.size(:my_map)

For scenarios requiring "newer version wins" semantics (e.g., event sourcing,
state synchronization), use put_newer/5 and put_all_newer/3:

    # Only updates if the version is greater than the existing one
    :ok = PartitionedBuffer.Map.put_newer(:my_map, :key1, "v1", 100)
    :ok = PartitionedBuffer.Map.put_newer(:my_map, :key1, "v2", 200) # overwrites
    :ok = PartitionedBuffer.Map.put_newer(:my_map, :key1, "v3", 50)  # ignored (50 < 200)
    "v2" = PartitionedBuffer.Map.get(:my_map, :key1)

    # Batch versioned updates
    entries = [
      {:user_1, %{name: "Alice"}, 100},
      {:user_2, %{name: "Bob"}, 200}
    ]

    :ok = PartitionedBuffer.Map.put_all_newer(:my_map, entries)

In most applications, you'll want to add a buffer as a child in your application's supervision tree:

    defmodule MyApp.Application do
      use Application

      @impl true
      def start(_type, _args) do
        children = [
          # Queue buffer
          {PartitionedBuffer.Queue,
           name: :event_queue,
           processor: &MyApp.EventProcessor.process_batch/1,
           processing_interval_ms: 1000,
           partitions: 4},
          # Map buffer
          {PartitionedBuffer.Map,
           name: :state_map,
           processor: &MyApp.StateProcessor.process_batch/1}
        ]

        opts = [strategy: :one_for_one, name: MyApp.Supervisor]
        Supervisor.start_link(children, opts)
      end
    end

The buffer will be automatically started with your application and will process any remaining items during graceful shutdown.
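
The supervision tree references MyApp.StateProcessor.process_batch/1 without defining it. A minimal sketch of what such a processor could look like follows. It assumes each entry in the batch is a {key, value, version, updates} tuple, as the map buffer comment states. The persist/3 helper is a hypothetical placeholder, and the return value the library expects from a processor is not specified here, so returning :ok is an assumption.

```elixir
defmodule MyApp.StateProcessor do
  # Hypothetical processor for a PartitionedBuffer.Map buffer.
  # Assumes every entry is a {key, value, version, updates} tuple.
  def process_batch(batch) when is_list(batch) do
    # Enum.each/2 returns :ok once every entry has been handled.
    Enum.each(batch, fn {key, value, version, _updates} ->
      persist(key, value, version)
    end)
  end

  # Placeholder: replace with a real write to a database or downstream service.
  defp persist(key, value, version) do
    IO.puts("persisting #{inspect(key)} => #{inspect(value)} (version #{inspect(version)})")
  end
end
```

A named function in a module, captured with &MyApp.StateProcessor.process_batch/1, is easier to test in isolation than an anonymous function passed inline.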

Common start options, with their defaults noted in the comments:

    {:ok, _pid} =
      PartitionedBuffer.Queue.start_link(
        name: :my_buffer,
        partitions: 4,                # Number of partitions (default: schedulers_online)
        processing_interval_ms: 1000, # Process every second (default: 5000)
        processing_batch_size: 100,   # Batch size for processing (default: 10)
        processing_timeout_ms: 5000,  # Timeout for processing tasks (default: 60000)
        processor: &MyApp.Exporter.export/1
      )

See the PartitionedBuffer module documentation for the full list of start and
runtime options.