diff --git a/faust/stores/rocksdb.py b/faust/stores/rocksdb.py index 3181951b2..9d7b06d7c 100644 --- a/faust/stores/rocksdb.py +++ b/faust/stores/rocksdb.py @@ -138,7 +138,10 @@ def as_options(self) -> Options: class Store(base.SerializedStore): - """RocksDB table storage.""" + """RocksDB table storage. + Pass 'options={'read_only': True}' as an option into a Table class + to allow a RocksDB store be used by multiple apps. + """ offset_key = b"__faust\0offset__" @@ -161,6 +164,7 @@ def __init__( *, key_index_size: Optional[int] = None, options: Optional[Mapping[str, Any]] = None, + read_only: Optional[bool] = False, **kwargs: Any, ) -> None: if rocksdb is None: @@ -177,6 +181,7 @@ def __init__( if not self.url.path: self.url /= self.table_name self.options = options or {} + self.read_only = self.options.pop("read_only", read_only) self.rocksdb_options = RocksDBOptions(**self.options) if key_index_size is None: key_index_size = app.conf.table_key_index_size @@ -364,7 +369,10 @@ def _db_for_partition(self, partition: int) -> DB: return db def _open_for_partition(self, partition: int) -> DB: - return self.rocksdb_options.open(self.partition_path(partition)) + path = self.partition_path(partition) + return self.rocksdb_options.open( + path, read_only=self.read_only if os.path.isfile(path) else False + ) def _get(self, key: bytes) -> Optional[bytes]: event = current_event() diff --git a/tests/unit/stores/test_rocksdb.py b/tests/unit/stores/test_rocksdb.py index bde79685b..a59eb54f2 100644 --- a/tests/unit/stores/test_rocksdb.py +++ b/tests/unit/stores/test_rocksdb.py @@ -236,7 +236,7 @@ def test_db_for_partition(self, *, store): def test_open_for_partition(self, *, store): open = store.rocksdb_options.open = Mock(name="options.open") assert store._open_for_partition(1) is open.return_value - open.assert_called_once_with(store.partition_path(1)) + open.assert_called_once_with(store.partition_path(1), read_only=False) def test__get__missing(self, *, store): store._get_bucket_for_key = Mock(name="get_bucket_for_key")