Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions faust/stores/rocksdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ def as_options(self) -> Options:


class Store(base.SerializedStore):
"""RocksDB table storage."""
"""RocksDB table storage.
Pass 'options={'read_only': True}' as an option into a Table class
to allow a RocksDB store be used by multiple apps.
"""

offset_key = b"__faust\0offset__"

Expand All @@ -161,6 +164,7 @@ def __init__(
*,
key_index_size: Optional[int] = None,
options: Optional[Mapping[str, Any]] = None,
read_only: Optional[bool] = False,
**kwargs: Any,
) -> None:
if rocksdb is None:
Expand All @@ -177,6 +181,7 @@ def __init__(
if not self.url.path:
self.url /= self.table_name
self.options = options or {}
self.read_only = self.options.pop("read_only", read_only)
self.rocksdb_options = RocksDBOptions(**self.options)
if key_index_size is None:
key_index_size = app.conf.table_key_index_size
Expand Down Expand Up @@ -364,7 +369,10 @@ def _db_for_partition(self, partition: int) -> DB:
return db

def _open_for_partition(self, partition: int) -> DB:
return self.rocksdb_options.open(self.partition_path(partition))
path = self.partition_path(partition)
return self.rocksdb_options.open(
path, read_only=self.read_only if os.path.isfile(path) else False
)

def _get(self, key: bytes) -> Optional[bytes]:
event = current_event()
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/stores/test_rocksdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def test_db_for_partition(self, *, store):
def test_open_for_partition(self, *, store):
open = store.rocksdb_options.open = Mock(name="options.open")
assert store._open_for_partition(1) is open.return_value
open.assert_called_once_with(store.partition_path(1))
open.assert_called_once_with(store.partition_path(1), read_only=False)

def test__get__missing(self, *, store):
store._get_bucket_for_key = Mock(name="get_bucket_for_key")
Expand Down