Skip to content

Commit

Permalink
Add method to RocksDB for backing up partitions (#304)
Browse files Browse the repository at this point in the history
* create method for backing up partition

* if we're not flushing, just directly read the db

* annotate backup method

* Define backup_partition in StoreT baseclass and derivatives

* change partition to tp

* change partition to union tp or int since all we care about is partition index

* fix error log

* add method to restore backups

* add forgotten ellipses

* remove misleading docstring

* Check if backup path is directory and make paths

* Convert partition paths used in restoration to str

* dedicate backup path by tablename

* update backup docstring

* dont import BackupEngine to fix linting

* commit lint changes

* reformat docstrings

* add general Exception

* add backup_partition and restore_backup to MyStore test class

* add backup_partition and restore_backup to MySerializedStore test class

* check permissions to create dirs and write to backup dir before spawning backupengine

* remove redundant exception handle

* add backup methods to ChangeloggedObjectManager

Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
  • Loading branch information
wbarnha and patkivikram committed Jul 19, 2022
1 parent a887571 commit 0bb2685
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 1 deletion.
20 changes: 20 additions & 0 deletions faust/stores/aerospike.py
Expand Up @@ -262,3 +262,23 @@ def aerospike_fun_call_with_retry(self, fun, *args, **kwargs):
ex
) # crash the app to prevent the offset from progressing
raise ex

async def backup_partition(
self, tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1
) -> None:
"""Backup partition from this store.
Not yet implemented for Aerospike.
"""
raise NotImplementedError("Not yet implemented for Aerospike.")

def restore_backup(
self, tp: Union[TP, int], latest: bool = True, backup_id: int = 0
) -> None:
"""Restore partition backup from this store.
Not yet implemented for Aerospike.
"""
raise NotImplementedError("Not yet implemented for Aerospike.")
22 changes: 21 additions & 1 deletion faust/stores/memory.py
@@ -1,5 +1,5 @@
"""In-memory table storage."""
from typing import Any, Callable, Iterable, MutableMapping, Optional, Set, Tuple
from typing import Any, Callable, Iterable, MutableMapping, Optional, Set, Tuple, Union

from faust.types import TP, EventT
from faust.types.stores import KT, VT
Expand Down Expand Up @@ -82,3 +82,23 @@ def reset_state(self) -> None:
"""
...

async def backup_partition(
self, tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1
) -> None:
"""Backup partition from this store.
This does nothing when using the in-memory store.
"""
...

def restore_backup(
self, tp: Union[TP, int], latest: bool = True, backup_id: int = 0
) -> None:
"""Restore partition backup from this store.
This does nothing when using the in-memory store.
"""
...
79 changes: 79 additions & 0 deletions faust/stores/rocksdb.py
Expand Up @@ -2,7 +2,9 @@
import asyncio
import gc
import math
import os
import shutil
import tempfile
import typing
from collections import defaultdict
from contextlib import suppress
Expand Down Expand Up @@ -183,6 +185,83 @@ def __init__(
self._key_index = LRUCache(limit=self.key_index_size)
self.db_lock = asyncio.Lock()
self.rebalance_ack = False
self._backup_path = os.path.join(self.path, f"{str(self.basename)}-backups")
try:
self._backup_engine = None
if not os.path.isdir(self._backup_path):
os.makedirs(self._backup_path, exist_ok=True)
testfile = tempfile.TemporaryFile(dir=self._backup_path)
testfile.close()
except PermissionError:
self.log.warning(
f'Unable to make directory for path "{self._backup_path}",'
f"disabling backups."
)
except OSError:
self.log.warning(
f'Unable to create files in "{self._backup_path}",' f"disabling backups"
)
else:
self._backup_engine = rocksdb.BackupEngine(self._backup_path)

async def backup_partition(
self, tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1
) -> None:
"""Backup partition from this store.
This will be saved in a separate directory in the data directory called
'{table-name}-backups'.
Arguments:
tp: Partition to backup
flush: Flush the memset before backing up the state of the table.
purge: Purge old backups in the process
keep: How many backups to keep after purging
This is only supported in newer versions of python-rocksdb which can read
the RocksDB database using multi-process read access.
See https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB to know more.
"""
if self._backup_engine:
partition = tp
if isinstance(tp, TP):
partition = tp.partition
try:
if flush:
db = await self._try_open_db_for_partition(partition)
else:
db = self.rocksdb_options.open(
self.partition_path(partition), read_only=True
)
self._backup_engine.create_backup(db, flush_before_backup=flush)
if purge:
self._backup_engine.purge_old_backups(keep)
except Exception:
self.log.info(f"Unable to backup partition {partition}.")

def restore_backup(
self, tp: Union[TP, int], latest: bool = True, backup_id: int = 0
) -> None:
"""Restore partition backup from this store.
Arguments:
tp: Partition to restore
latest: Restore the latest backup, set as False to restore a specific ID
backup_id: Backup to restore
"""
if self._backup_engine:
partition = tp
if isinstance(tp, TP):
partition = tp.partition
if latest:
self._backup_engine.restore_latest_backup(
str(self.partition_path(partition)), self._backup_path
)
else:
self._backup_engine.restore_backup(
backup_id, str(self.partition_path(partition)), self._backup_path
)

def persisted_offset(self, tp: TP) -> Optional[int]:
"""Return the last persisted offset.
Expand Down
13 changes: 13 additions & 0 deletions faust/tables/objects.py
Expand Up @@ -183,3 +183,16 @@ def apply_changelog_batch(

for tp, offset in tp_offsets.items():
self.set_persisted_offset(tp, offset)

async def backup_partition(
self, tp, flush: bool = True, purge: bool = False, keep: int = 1
) -> None:
raise NotImplementedError

def restore_backup(
self,
tp,
latest: bool = True,
backup_id: int = 0,
) -> None:
raise NotImplementedError
15 changes: 15 additions & 0 deletions faust/types/stores.py
Expand Up @@ -101,3 +101,18 @@ async def on_recovery_completed(
self, active_tps: Set[TP], standby_tps: Set[TP]
) -> None:
...

@abc.abstractmethod
async def backup_partition(
self, tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1
) -> None:
...

@abc.abstractmethod
def restore_backup(
self,
tp: Union[TP, int],
latest: bool = True,
backup_id: int = 0,
) -> None:
...
26 changes: 26 additions & 0 deletions tests/unit/stores/test_base.py
Expand Up @@ -29,6 +29,19 @@ def apply_changelog_batch(self, *args, **kwargs):
def reset_state(self):
...

async def backup_partition(
self, tp, flush: bool = True, purge: bool = False, keep: int = 1
) -> None:
...

def restore_backup(
self,
tp,
latest: bool = True,
backup_id: int = 0,
) -> None:
...


class Test_Store:
@pytest.fixture
Expand Down Expand Up @@ -120,6 +133,19 @@ def _clear(self):
def reset_state(self):
...

async def backup_partition(
self, tp, flush: bool = True, purge: bool = False, keep: int = 1
) -> None:
...

def restore_backup(
self,
tp,
latest: bool = True,
backup_id: int = 0,
) -> None:
...


class Test_SerializedStore:
@pytest.fixture
Expand Down

0 comments on commit 0bb2685

Please sign in to comment.