Skip to content

Commit

Permalink
SqliteDosStorage: Implement the backup functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
sphuber committed May 7, 2024
1 parent ea5f51b commit 18e447c
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 21 deletions.
85 changes: 67 additions & 18 deletions src/aiida/storage/sqlite_dos/backend.py
Expand Up @@ -16,11 +16,12 @@
from typing import TYPE_CHECKING, Optional
from uuid import uuid4

from disk_objectstore import Container
from disk_objectstore import Container, backup_utils
from pydantic import BaseModel, Field, field_validator
from sqlalchemy import insert
from sqlalchemy.orm import scoped_session, sessionmaker

from aiida.common import exceptions
from aiida.common.log import AIIDA_LOGGER
from aiida.manage import Profile
from aiida.manage.configuration.settings import AIIDA_CONFIG_FOLDER
Expand All @@ -40,6 +41,8 @@
__all__ = ('SqliteDosStorage',)

LOGGER = AIIDA_LOGGER.getChild(__file__)
FILENAME_DATABASE = 'database.sqlite'
FILENAME_CONTAINER = 'container'


class SqliteDosMigrator(PsqlDosMigrator):
Expand All @@ -52,7 +55,7 @@ class SqliteDosMigrator(PsqlDosMigrator):
"""

def __init__(self, profile: Profile) -> None:
filepath_database = Path(profile.storage_config['filepath']) / 'database.sqlite'
filepath_database = Path(profile.storage_config['filepath']) / FILENAME_DATABASE
filepath_database.touch()

self.profile = profile
Expand All @@ -64,7 +67,7 @@ def get_container(self) -> Container:
:returns: The disk-object store container configured for the repository path of the current profile.
"""
filepath_container = Path(self.profile.storage_config['filepath']) / 'container'
filepath_container = Path(self.profile.storage_config['filepath']) / FILENAME_CONTAINER
return Container(str(filepath_container))

def initialise_database(self) -> None:
Expand Down Expand Up @@ -112,6 +115,18 @@ def filepath_is_absolute(cls, value: str) -> str:
"""Return the resolved and absolute filepath."""
return str(Path(value).resolve().absolute())

@property
def filepath_root(self) -> Path:
    """Return the path to the root directory of the storage as configured in the profile."""
    return Path(self.profile.storage_config['filepath'])

@property
def filepath_container(self) -> Path:
    """Return the path to the disk-object store container directory inside the storage root."""
    return self.filepath_root / FILENAME_CONTAINER

@property
def filepath_database(self) -> Path:
    """Return the path to the sqlite database file inside the storage root."""
    return self.filepath_root / FILENAME_DATABASE

@classmethod
def initialise(cls, profile: Profile, reset: bool = False) -> bool:
filepath = Path(profile.storage_config['filepath'])
Expand All @@ -132,7 +147,7 @@ def initialise(cls, profile: Profile, reset: bool = False) -> bool:

def __str__(self) -> str:
    """Return a human-readable representation showing the storage root path and open/closed state."""
    state = 'closed' if self.is_closed else 'open'
    return f'SqliteDosStorage[{self.filepath_root}]: {state},'

def _initialise_session(self):
"""Initialise the SQLAlchemy session factory.
Expand All @@ -144,28 +159,22 @@ def _initialise_session(self):
Multi-thread support is currently required by the REST API.
Although, in the future, we may want to move the multi-thread handling to higher in the AiiDA stack.
"""
engine = create_sqla_engine(Path(self._profile.storage_config['filepath']) / 'database.sqlite')
engine = create_sqla_engine(self.filepath_database)
self._session_factory = scoped_session(sessionmaker(bind=engine, future=True, expire_on_commit=True))

def _backup(
self,
dest: str,
keep: Optional[int] = None,
):
raise NotImplementedError

def delete(self) -> None:  # type: ignore[override]
    """Delete the storage and all the data.

    Removes the entire storage root directory, which contains both the sqlite database and the
    disk-object store container. A no-op if the directory does not exist.
    """
    if self.filepath_root.exists():
        rmtree(self.filepath_root)
        LOGGER.report(f'Deleted storage directory at `{self.filepath_root}`.')

def get_container(self) -> 'Container':
    """Return the disk-object store container for this profile's file repository."""
    filepath = str(self.filepath_container)
    return Container(filepath)

def get_repository(self) -> 'DiskObjectStoreRepositoryBackend':
    """Return the file repository backend, backed by the profile's disk-object store container."""
    # Local import to avoid a circular dependency between the storage and repository packages
    # -- NOTE(review): assumed from the deferred import style; confirm.
    from aiida.repository.backend import DiskObjectStoreRepositoryBackend

    return DiskObjectStoreRepositoryBackend(container=self.get_container())

@classmethod
def version_head(cls) -> str:
Expand Down Expand Up @@ -225,3 +234,43 @@ def _get_mapper_from_entity(entity_type: 'EntityTypes', with_pk: bool):
mapper = inspect(model).mapper # type: ignore[union-attr]
keys = {key for key, col in mapper.c.items() if with_pk or col not in mapper.primary_key}
return mapper, keys

def _backup(
    self,
    dest: str,
    keep: Optional[int] = None,
):
    """Create a backup of the storage.

    :param dest: Path to where the backup will be created. Can be a path on the local file system, or a path on a
        remote that can be accessed over SSH in the form ``<user>@<host>:<path>``.
    :param keep: The maximum number of backups to keep. If the number of copies exceeds this number, the oldest
        backups are removed.
    :raises ~aiida.common.exceptions.StorageBackupError: If the backup utility reports a failure.
    """
    def snapshot_callback(path, prev):
        # Delegate the actual copying of database and container to the storage-specific routine.
        return self._backup_storage(backup_manager, path, prev)

    try:
        backup_manager = backup_utils.BackupManager(dest, keep=keep)
        backup_manager.backup_auto_folders(snapshot_callback)
    except backup_utils.BackupError as exc:
        # Re-raise as the storage-layer exception type that callers are expected to catch.
        raise exceptions.StorageBackupError(*exc.args) from exc

def _backup_storage(
    self,
    manager: backup_utils.BackupManager,
    path: Path,
    prev_backup: Path | None = None,
) -> None:
    """Create a backup of the sqlite database and disk-objectstore to the provided path.

    :param manager: BackupManager from backup_utils containing utilities such as for calling the rsync.
    :param path: Path to where the backup will be created.
    :param prev_backup: Path to the previous backup. Rsync calls will be hard-linked to this path, making the backup
        incremental and efficient.
    """
    # Run a light maintenance pass (no full repack, no compression) before copying --
    # presumably to bring the container into a compact, consistent state; confirm against
    # the ``maintain`` implementation.
    LOGGER.report('Running storage maintenance')
    self.maintain(full=False, compress=False)

    LOGGER.report('Backing up disk-objectstore container')
    manager.call_rsync(self.filepath_container, path, link_dest=prev_backup, dest_trailing_slash=True)

    LOGGER.report('Backing up sqlite database')
    manager.call_rsync(self.filepath_database, path, link_dest=prev_backup, dest_trailing_slash=True)
5 changes: 3 additions & 2 deletions src/aiida/tools/pytest_fixtures/configuration.py
Expand Up @@ -115,7 +115,8 @@ def factory(
from aiida.manage.manager import get_manager

manager = get_manager()
storage_config = storage_config or {'filepath': str(pathlib.Path(config.dirpath) / 'storage')}
name = name or secrets.token_hex(16)
storage_config = storage_config or {'filepath': str(pathlib.Path(config.dirpath) / name / 'storage')}

if broker_backend and broker_config is None:
broker_config = {
Expand All @@ -133,7 +134,7 @@ def factory(
storage_config=storage_config,
broker_backend=broker_backend,
broker_config=broker_config,
name=name or secrets.token_hex(16),
name=name,
email=email,
is_test_profile=True,
)
Expand Down
15 changes: 14 additions & 1 deletion tests/storage/sqlite_dos/test_backend.py
Expand Up @@ -3,7 +3,7 @@
import pathlib

import pytest
from aiida.storage.sqlite_dos.backend import SqliteDosStorage
from aiida.storage.sqlite_dos.backend import FILENAME_CONTAINER, FILENAME_DATABASE, SqliteDosStorage


@pytest.mark.usefixtures('chdir_tmp_path')
Expand All @@ -25,3 +25,16 @@ def test_archive_import(aiida_config, aiida_profile_factory):
assert QueryBuilder().append(Node).count() == 0
import_archive(get_archive_file('calcjob/arithmetic.add.aiida'))
assert QueryBuilder().append(Node).count() > 0


def test_backup(aiida_config, aiida_profile_factory, tmp_path, manager):
    """Test the backup implementation."""
    with aiida_profile_factory(aiida_config, storage_backend='core.sqlite_dos'):
        storage = manager.get_profile_storage()
        storage.backup(str(tmp_path))
        filepath_last = tmp_path / 'last-backup'
        # The backup destination should contain a ``config.json`` and a ``last-backup`` symlink
        # pointing at the most recent backup folder -- NOTE(review): layout assumed from the
        # disk-objectstore backup utilities; confirm.
        assert (tmp_path / 'config.json').exists()
        assert filepath_last.is_symlink()
        dirpath_backup = filepath_last.resolve()
        # The resolved backup folder must hold both the sqlite database and the container.
        assert (dirpath_backup / FILENAME_DATABASE).exists()
        assert (dirpath_backup / FILENAME_CONTAINER).exists()

0 comments on commit 18e447c

Please sign in to comment.