STREAMS API in Garnet #1131

Draft: wants to merge 62 commits into base: research

ramananeesh

Adds support for STREAMS in Garnet.

Index Structure

Uses an in-memory B+tree index (called the B-tree below) with the following features:

  1. Fast insertions through the tail leaf node of the index, taking advantage of sorted data input.
  2. Maximizes leaf occupancy, taking advantage of sorted data input.
  3. Uses tombstones for deletes.
  4. Supports forward and reverse range scans.
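The four properties above can be illustrated with a deliberately simplified model. This is only a sketch, not the PR's implementation: it keeps a flat list of leaves with no internal nodes, and the names `Leaf`, `AppendOnlyIndex`, and `LEAF_CAPACITY` are hypothetical. It assumes keys arrive in strictly increasing order, which is what lets every insert go to the tail leaf and lets each leaf fill completely before a new one is opened.

```python
from bisect import bisect_left, bisect_right

LEAF_CAPACITY = 4  # hypothetical size, chosen small to make the behavior visible


class Leaf:
    def __init__(self):
        self.keys = []    # sorted, because input is sorted
        self.values = []
        self.dead = []    # tombstone flag per slot


class AppendOnlyIndex:
    """Flat list of leaves standing in for the leaf level of a B+tree."""

    def __init__(self):
        self.leaves = [Leaf()]
        self.live = 0

    def insert(self, key, value):
        tail = self.leaves[-1]
        if tail.keys and key <= tail.keys[-1]:
            raise ValueError("keys must be strictly increasing")
        if len(tail.keys) == LEAF_CAPACITY:
            # The tail is 100% full; open a new leaf instead of splitting.
            tail = Leaf()
            self.leaves.append(tail)
        tail.keys.append(key)
        tail.values.append(value)
        tail.dead.append(False)
        self.live += 1

    def _find(self, key):
        if not self.leaves[0].keys:
            return None
        firsts = [leaf.keys[0] for leaf in self.leaves]
        i = bisect_right(firsts, key) - 1
        if i < 0:
            return None
        leaf = self.leaves[i]
        j = bisect_left(leaf.keys, key)
        if j < len(leaf.keys) and leaf.keys[j] == key:
            return leaf, j
        return None

    def delete(self, key):
        """Mark the entry as a tombstone; the slot itself is kept."""
        hit = self._find(key)
        if hit is None or hit[0].dead[hit[1]]:
            return False
        hit[0].dead[hit[1]] = True
        self.live -= 1
        return True

    def scan(self, lo, hi, reverse=False):
        """Yield live (key, value) pairs with lo <= key <= hi.

        Walks every leaf for brevity; a real index would seek to the
        first matching leaf and follow sibling links from there.
        """
        leaves = reversed(self.leaves) if reverse else self.leaves
        for leaf in leaves:
            slots = range(len(leaf.keys))
            for i in (reversed(slots) if reverse else slots):
                if lo <= leaf.keys[i] <= hi and not leaf.dead[i]:
                    yield leaf.keys[i], leaf.values[i]
```

Because a full tail leaf is never split, earlier leaves stay at full occupancy, and a tombstoned entry simply stops appearing in scans while its slot remains in place.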

Supported Operations in API

The following operations are currently supported:

  1. XADD - adds an entry using either an auto-generated ID or a user-defined ID. The [NOMKSTREAM], [MAXLEN], [MINID] and [threshold] options are not yet supported.
  2. XLEN
  3. XRANGE
  4. XDEL

STREAMID

A Stream ID is a 128-bit identifier for an entry in a Stream. It has the format ts-seq, where ts is generally a timestamp and seq is a sequence number.
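As a rough illustration of the ts-seq format, the sketch below parses an ID, orders IDs, and packs one into 16 bytes. It assumes the 128 bits are split into two unsigned 64-bit halves (as in Redis stream IDs) and that the packed form is big-endian; the PR description does not state either detail, and the `StreamId` class here is hypothetical.

```python
from typing import NamedTuple

U64_MAX = (1 << 64) - 1


class StreamId(NamedTuple):
    ts: int   # usually a millisecond timestamp
    seq: int  # sequence number within the same ts

    @classmethod
    def parse(cls, text):
        """Parse 'ts-seq'; both parts are required in this sketch."""
        ts_part, sep, seq_part = text.partition("-")
        if not sep:
            raise ValueError("expected an ID of the form ts-seq")
        ts, seq = int(ts_part), int(seq_part)
        if not (0 <= ts <= U64_MAX and 0 <= seq <= U64_MAX):
            raise ValueError("each half must fit in an unsigned 64-bit integer")
        return cls(ts, seq)

    def to_bytes(self):
        # Assumed layout: big-endian halves, so byte order matches ID order.
        return self.ts.to_bytes(8, "big") + self.seq.to_bytes(8, "big")

    def __str__(self):
        return f"{self.ts}-{self.seq}"
```

Tuple comparison orders IDs by ts first and seq second, and with the assumed big-endian packing the 16-byte keys sort the same way, which is convenient for an ordered index.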

STREAM

The Stream object consists of a B-tree index instance and a Tsavorite log instance for persistence. Every entry added to a Stream is first appended to the Tsavorite log, which returns the address of the appended entry. That address is then inserted into the index as the value, with the STREAMID as the key.
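The write path described above (log first, then index) can be sketched as follows. This is an illustrative model only: a Python list stands in for the Tsavorite log, a dict stands in for the B-tree index, and the `Stream` class with its method names is hypothetical rather than taken from the PR. Deletion here removes the index entry outright instead of writing a tombstone, purely to keep the sketch short.

```python
class Stream:
    def __init__(self):
        self._log = []          # stand-in for the log; address = list position
        self._index = {}        # stand-in for the index: stream ID -> log address
        self._last_id = (0, 0)  # IDs are (ts, seq) tuples in this sketch

    def add(self, entry_id, fields):
        """Append to the log, then index the returned address under the ID."""
        if entry_id <= self._last_id:
            raise ValueError("ID must be greater than the last ID in the stream")
        address = len(self._log)         # address the log hands back
        self._log.append(fields)
        self._index[entry_id] = address  # key = stream ID, value = log address
        self._last_id = entry_id
        return entry_id

    def length(self):
        return len(self._index)

    def delete(self, entry_id):
        # The log record stays in place; only the index entry goes away.
        return self._index.pop(entry_id, None) is not None

    def range(self, start, end):
        # IDs are inserted in increasing order and dicts keep insertion
        # order, so iterating the dict visits entries in ID order.
        return [(i, self._log[a]) for i, a in self._index.items()
                if start <= i <= end]
```

Reads go the other way: the index maps an ID to an address, and the address is used to fetch the payload from the log.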

StreamManager

A container/wrapper that holds all Streams in the server in a dictionary.

SessionStreamCache

A session-local cache of the Streams added by the client, for faster access. It is currently capped at a fixed capacity and, in this initial version, evicts using a simple FIFO policy. It can be extended to support other eviction strategies (preferably LRU).
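A capacity-capped FIFO cache of the kind described above might look like the following minimal sketch. The class name `FifoStreamCache`, its methods, and the capacity value are assumptions for illustration; the PR does not show the cache's actual interface or size.

```python
from collections import OrderedDict


class FifoStreamCache:
    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._capacity = capacity
        self._entries = OrderedDict()  # insertion order doubles as eviction order

    def get(self, key):
        # FIFO: a hit does not change the entry's position.
        return self._entries.get(key)

    def put(self, key, stream):
        if key in self._entries:
            self._entries[key] = stream  # update in place, position unchanged
            return
        if len(self._entries) >= self._capacity:
            self._entries.popitem(last=False)  # evict the oldest insertion
        self._entries[key] = stream

    def __len__(self):
        return len(self._entries)
```

Switching this sketch to LRU would mostly mean calling `self._entries.move_to_end(key)` on every hit in `get`, so that recently used streams move away from the eviction end.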

Contributor

@PaulusParssinen PaulusParssinen left a comment

First, thanks for contributing!

I see the PR is marked as a draft, but there's a misunderstanding about the nature of Unsafe.AsPointer that I want to help with as early as possible: its use is always a mistake and unnecessary. Embrace Span<T> and normal struct usage as much as possible. I left a few preliminary comments.

@prvyk
Contributor

prvyk commented Mar 24, 2025

Two nits:

  1. FLUSHDB/FLUSHALL should clear the sequence number.
  2. Return value is bulk string, not simple string.

@ramananeesh ramananeesh changed the base branch from main to research June 19, 2025 01:12
ramananeesh and others added 25 commits June 23, 2025 13:56
…icrosoft#1248)

* Avoid memory allocation for hash operations wherever possible

* Also reuse the existing value array for HSET

* Changing ReadOnlySpan argument of WriteIntegerFromBytes scoped

* Added a helper for key reading + avoid allocation for double increment

---------

Co-authored-by: Tal Zaccai <talzacc@microsoft.com>
…meout. (microsoft#1262)

Co-authored-by: Tal Zaccai <talzacc@microsoft.com>
* fix regression in script loading; previously, repeatedly reloading the same script into a session would cause the cache to churn each time, substantially increasing allocations and runtime of SCRIPT|LOAD and EVAL

* formatting

* small style cleanups

* min requirement of powershell 7, just for a nicer error message
* fix replication propagation in stored proc

* add test for replication store proc

* nit fix

* enable ReadWriteSession for AofReplay

* use txnVersion for enqueue in transactionManager

* address comments
9 participants