Skip to content

Commit

Permalink
First implementation of export/import using new disk object store
Browse files Browse the repository at this point in the history
Use the `Container.export` method in export/import functionality
  • Loading branch information
sphuber committed Apr 28, 2021
1 parent a3930ff commit f30f000
Show file tree
Hide file tree
Showing 33 changed files with 294 additions and 587 deletions.
3 changes: 0 additions & 3 deletions aiida/orm/nodes/process/calculation/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ class CalcJobNode(CalculationNode):
SCHEDULER_LAST_JOB_INFO_KEY = 'last_job_info'
SCHEDULER_DETAILED_JOB_INFO_KEY = 'detailed_job_info'

# Base path within the repository where to put objects by default
_repository_base_path = 'raw_input'

# An optional entry point for a CalculationTools instance
_tools = None

Expand Down
2 changes: 1 addition & 1 deletion aiida/tools/graph/deletions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from aiida.backends.utils import delete_nodes_and_connections
from aiida.common.log import AIIDA_LOGGER
from aiida.common.warnings import AiidaDeprecationWarning
from aiida.orm import Group, Node, QueryBuilder, load_node
from aiida.orm import Group, Node, QueryBuilder
from aiida.tools.graph.graph_traversers import get_nodes_delete

__all__ = ('DELETE_LOGGER', 'delete_nodes', 'delete_group_nodes')
Expand Down
7 changes: 4 additions & 3 deletions aiida/tools/importexport/archive/migrations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
# For further information please visit http://www.aiida.net #
###########################################################################
"""Migration archive files from old export versions to the newest, used by `verdi export migrate` command."""
from pathlib import Path
from typing import Any, Callable, Dict, Tuple, Union
from typing import Callable, Dict, Tuple

from aiida.tools.importexport.archive.common import CacheFolder

Expand All @@ -22,6 +21,7 @@
from .v07_to_v08 import migrate_v7_to_v8
from .v08_to_v09 import migrate_v8_to_v9
from .v09_to_v10 import migrate_v9_to_v10
from .v10_to_v11 import migrate_v10_to_v11

# version from -> version to, function which acts on the cache folder
_vtype = Dict[str, Tuple[str, Callable[[CacheFolder], None]]]
Expand All @@ -34,5 +34,6 @@
'0.6': ('0.7', migrate_v6_to_v7),
'0.7': ('0.8', migrate_v7_to_v8),
'0.8': ('0.9', migrate_v8_to_v9),
'0.9': ('0.10', migrate_v9_to_v10)
'0.9': ('0.10', migrate_v9_to_v10),
'0.10': ('0.11', migrate_v10_to_v11),
}
4 changes: 1 addition & 3 deletions aiida/tools/importexport/archive/migrations/v03_to_v04.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,12 @@ def migration_trajectory_symbols_to_attribute(data: dict, folder: CacheFolder):
"""Apply migrations: 0026 - REV. 1.0.26 and 0027 - REV. 1.0.27
Create the symbols attribute from the repository array for all `TrajectoryData` nodes.
"""
from aiida.tools.importexport.common.config import NODES_EXPORT_SUBFOLDER

path = folder.get_path(flush=False)

for node_id, content in data['export_data'].get('Node', {}).items():
if content.get('type', '') == 'node.data.array.trajectory.TrajectoryData.':
uuid = content['uuid']
symbols_path = path.joinpath(NODES_EXPORT_SUBFOLDER, uuid[0:2], uuid[2:4], uuid[4:], 'path', 'symbols.npy')
symbols_path = path.joinpath('nodes', uuid[0:2], uuid[2:4], uuid[4:], 'path', 'symbols.npy')
symbols = np.load(os.path.abspath(symbols_path)).tolist()
symbols_path.unlink()
# Update 'node_attributes'
Expand Down
74 changes: 74 additions & 0 deletions aiida/tools/importexport/archive/migrations/v10_to_v11.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Migration from v0.10 to v0.11, used by `verdi export migrate` command.
This migration deals with the file repository. In the old version, the
"""
import os
import shutil

from aiida.tools.importexport.archive.common import CacheFolder
from .utils import verify_metadata_version, update_metadata


def migrate_repository(metadata: dict, data: dict, folder):
    """Migrate the file repository to a disk object store container.

    For each exported node, the contents of its repository folder (either the `raw_input`
    subfolder for calculation nodes, or the `path` subfolder for data nodes) are written to a
    single `disk_objectstore` container at the root of the archive. The serialized repository
    metadata is stored under the new `repository_metadata` key of the node's entity data. The
    old sharded `nodes` directory is removed once all contents have been packed.

    :param metadata: the parsed content of `metadata.json`; its `all_fields_info` is updated
        in place to register the new `repository_metadata` field for nodes.
    :param data: the parsed content of `data.json`; node entries are updated in place.
    :param folder: the cache folder containing the extracted archive.
    :raises CorruptArchive: if a node repository contains neither a `raw_input` nor a `path`
        subfolder.
    """
    from disk_objectstore import Container

    from aiida.repository import Repository, File
    from aiida.repository.backend import DiskObjectStoreRepositoryBackend
    from aiida.tools.importexport.common.exceptions import CorruptArchive

    container = Container(os.path.join(folder.get_path(), 'container'))
    container.init_container()
    backend = DiskObjectStoreRepositoryBackend(container=container)
    repository = Repository(backend=backend)

    for values in data.get('export_data', {}).get('Node', {}).values():
        uuid = values['uuid']
        dirpath_calc = os.path.join(folder.get_path(), 'nodes', uuid[:2], uuid[2:4], uuid[4:], 'raw_input')
        dirpath_data = os.path.join(folder.get_path(), 'nodes', uuid[:2], uuid[2:4], uuid[4:], 'path')

        if os.path.isdir(dirpath_calc):
            dirpath = dirpath_calc
        elif os.path.isdir(dirpath_data):
            dirpath = dirpath_data
        else:
            # A missing subfolder means the archive itself is broken, not that the code has a
            # bug, so raise the archive-specific exception instead of an `AssertionError`.
            raise CorruptArchive('node repository contains neither `raw_input` nor `path` subfolder.')

        if not os.listdir(dirpath):
            # Nothing to store for this node; leave it without `repository_metadata`.
            continue

        repository.put_object_from_tree(dirpath)
        values['repository_metadata'] = repository.serialize()
        # Artificially reset the metadata so the next node starts from an empty virtual
        # hierarchy; the backend (and so the container contents) is reused across nodes.
        repository._directory = File()  # pylint: disable=protected-access

    container.pack_all_loose(compress=False)
    shutil.rmtree(os.path.join(folder.get_path(), 'nodes'))

    metadata['all_fields_info']['Node']['repository_metadata'] = {}


def migrate_v10_to_v11(folder: CacheFolder):
    """Migration of export files from v0.10 to v0.11.

    Verifies the current archive version, bumps it, migrates the node repositories into a
    disk object store container, and writes the updated JSON files back to the cache folder.

    :param folder: the cache folder containing the extracted archive.
    """
    _, metadata = folder.load_json('metadata.json')

    verify_metadata_version(metadata, '0.10')
    update_metadata(metadata, '0.11')

    _, data = folder.load_json('data.json')

    # Apply migrations
    migrate_repository(metadata, data, folder)

    for filename, content in (('metadata.json', metadata), ('data.json', data)):
        folder.write_json(filename, content)
70 changes: 11 additions & 59 deletions aiida/tools/importexport/archive/readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,23 @@
"""Archive reader classes."""
from abc import ABC, abstractmethod
import json
import os
from pathlib import Path
import tarfile
from types import TracebackType
from typing import Any, Callable, cast, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Type
from typing import Any, Callable, cast, Dict, Iterator, List, Optional, Set, Tuple, Type
import zipfile

from distutils.version import StrictVersion
from archive_path import TarPath, ZipPath, read_file_in_tar, read_file_in_zip
from disk_objectstore import Container

from aiida.common.log import AIIDA_LOGGER
from aiida.common.exceptions import InvalidOperation
from aiida.common.folders import Folder, SandboxFolder
from aiida.tools.importexport.common.config import EXPORT_VERSION, ExportFileFormat, NODES_EXPORT_SUBFOLDER
from aiida.common.folders import SandboxFolder
from aiida.tools.importexport.common.config import EXPORT_VERSION, ExportFileFormat
from aiida.tools.importexport.common.exceptions import (CorruptArchive, IncompatibleArchiveVersionError)
from aiida.tools.importexport.archive.common import (ArchiveMetadata, null_callback)
from aiida.tools.importexport.common.config import NODE_ENTITY_NAME, GROUP_ENTITY_NAME
from aiida.tools.importexport.common.utils import export_shard_uuid

__all__ = (
'ArchiveReaderAbstract',
Expand Down Expand Up @@ -184,41 +183,15 @@ def iter_link_data(self) -> Iterator[dict]:
"""Iterate over links: {'input': <UUID>, 'output': <UUID>, 'label': <LABEL>, 'type': <TYPE>}"""

@abstractmethod
def iter_node_repos(
self,
uuids: Iterable[str],
callback: Callable[[str, Any], None] = null_callback,
) -> Iterator[Folder]:
"""Yield temporary folders containing the contents of the repository for each node.
:param uuids: UUIDs of the nodes over whose repository folders to iterate
:param callback: a callback to report on the process, ``callback(action, value)``,
with the following callback signatures:
- ``callback('init', {'total': <int>, 'description': <str>})``,
to signal the start of a process, its total iterations and description
- ``callback('update', <int>)``,
to signal an update to the process and the number of iterations to progress
:raises `~aiida.tools.importexport.common.exceptions.CorruptArchive`: If the repository does not exist.
"""

def node_repository(self, uuid: str) -> Folder:
"""Return a temporary folder with the contents of the repository for a single node.
:param uuid: The UUID of the node
:raises `~aiida.tools.importexport.common.exceptions.CorruptArchive`: If the repository does not exist.
"""
return next(self.iter_node_repos([uuid]))
def get_repository_container(self) -> Container:
"""Return an instance mapped to the repository container."""


class ReaderJsonBase(ArchiveReaderAbstract):
"""A reader base for the JSON compressed formats."""

FILENAME_DATA = 'data.json'
FILENAME_METADATA = 'metadata.json'
REPO_FOLDER = NODES_EXPORT_SUBFOLDER

def __init__(self, filename: str, sandbox_in_repo: bool = False, **kwargs: Any):
"""A reader for JSON compressed archives.
Expand Down Expand Up @@ -265,7 +238,7 @@ def _get_data(self):
"""Retrieve the data JSON."""
raise NotImplementedError()

def _extract(self, *, path_prefix: str, callback: Callable[[str, Any], None]):
def _extract(self, *, path_prefix: str, callback: Callable[[str, Any], None] = null_callback):
"""Extract repository data to a temporary folder.
:param path_prefix: Only extract paths starting with this prefix.
Expand Down Expand Up @@ -355,32 +328,11 @@ def iter_link_data(self) -> Iterator[dict]:
for value in self._get_data()['links_uuid']:
yield value

def iter_node_repos(
self,
uuids: Iterable[str],
callback: Callable[[str, Any], None] = null_callback,
) -> Iterator[Folder]:
path_prefixes = [os.path.join(self.REPO_FOLDER, export_shard_uuid(uuid)) for uuid in uuids]

if not path_prefixes:
return
self.assert_within_context()
def get_repository_container(self) -> Container:
"""Return an instance mapped to the repository container."""
self._extract(path_prefix='')
assert self._sandbox is not None # required by mypy

# unarchive the common folder if it does not exist
common_prefix = os.path.commonpath(path_prefixes)
if not self._sandbox.get_subfolder(common_prefix).exists():
self._extract(path_prefix=common_prefix, callback=callback)

callback('init', {'total': len(path_prefixes), 'description': 'Iterating node repositories'})
for uuid, path_prefix in zip(uuids, path_prefixes):
callback('update', 1)
subfolder = self._sandbox.get_subfolder(path_prefix)
if not subfolder.exists():
raise CorruptArchive(
f'Unable to find the repository folder for Node with UUID={uuid} in the exported file'
)
yield subfolder
return Container(Path(self._sandbox.abspath) / 'container')


class ReaderJsonZip(ReaderJsonBase):
Expand Down
43 changes: 24 additions & 19 deletions aiida/tools/importexport/archive/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"""Archive writer classes."""
from abc import ABC, abstractmethod
from copy import deepcopy
import os
from pathlib import Path
import shelve
import shutil
Expand All @@ -21,15 +20,13 @@
import zipfile

from archive_path import TarPath, ZipPath
from disk_objectstore import Container

from aiida.common import json
from aiida.common.exceptions import InvalidOperation
from aiida.common.folders import Folder
from aiida.tools.importexport.archive.common import ArchiveMetadata
from aiida.tools.importexport.common.config import (
EXPORT_VERSION, NODE_ENTITY_NAME, NODES_EXPORT_SUBFOLDER, ExportFileFormat
)
from aiida.tools.importexport.common.utils import export_shard_uuid
from aiida.tools.importexport.common.config import EXPORT_VERSION, NODE_ENTITY_NAME, ExportFileFormat

__all__ = ('ArchiveWriterAbstract', 'get_writer', 'WriterJsonZip', 'WriterJsonTar', 'WriterJsonFolder')

Expand Down Expand Up @@ -184,13 +181,10 @@ def write_entity_data(self, name: str, pk: int, id_key: str, fields: Dict[str, A
"""

@abstractmethod
def write_node_repo_folder(self, uuid: str, path: Union[str, Path], overwrite: bool = True):
"""Write a node repository to the archive.
:param uuid: The UUID of the node
:param path: The path to the repository folder on disk
:param overwrite: Allow to overwrite existing path in archive
def write_repository_container(self, container: Container):
"""Write a repository container to the archive.
:param container: the container.
"""


Expand Down Expand Up @@ -223,7 +217,7 @@ def write_group_nodes(self, uuid: str, node_uuids: List[str]):
def write_entity_data(self, name: str, pk: int, id_key: str, fields: Dict[str, Any]):
pass

def write_node_repo_folder(self, uuid: str, path: Union[str, Path], overwrite: bool = True):
def write_repository_container(self, container: Container):
pass


Expand Down Expand Up @@ -328,9 +322,13 @@ def write_entity_data(self, name: str, pk: int, id_key: str, fields: Dict[str, A
self._data['node_extras'][pk] = fields.pop('extras')
self._data['export_data'].setdefault(name, {})[pk] = fields

def write_node_repo_folder(self, uuid: str, path: Union[str, Path], overwrite: bool = True):
def write_repository_container(self, container: Container):
"""Write a repository container to the archive.
:param container: the container.
"""
self.assert_within_context()
(self._archivepath / NODES_EXPORT_SUBFOLDER / export_shard_uuid(uuid)).puttree(path, check_exists=not overwrite)
(self._archivepath / 'container').puttree(container.get_folder())


class WriterJsonTar(ArchiveWriterAbstract):
Expand Down Expand Up @@ -412,9 +410,13 @@ def write_entity_data(self, name: str, pk: int, id_key: str, fields: Dict[str, A
self._data['node_extras'][pk] = fields.pop('extras')
self._data['export_data'].setdefault(name, {})[pk] = fields

def write_node_repo_folder(self, uuid: str, path: Union[str, Path], overwrite: bool = True):
def write_repository_container(self, container: Container):
"""Write a repository container to the archive.
:param container: the container.
"""
self.assert_within_context()
(self._archivepath / NODES_EXPORT_SUBFOLDER / export_shard_uuid(uuid)).puttree(path, check_exists=not overwrite)
(self._archivepath / 'container').puttree(container.get_folder())


class WriterJsonFolder(ArchiveWriterAbstract):
Expand Down Expand Up @@ -495,7 +497,10 @@ def write_entity_data(self, name: str, pk: int, id_key: str, fields: Dict[str, A
self._data['node_extras'][pk] = fields.pop('extras')
self._data['export_data'].setdefault(name, {})[pk] = fields

def write_node_repo_folder(self, uuid: str, path: Union[str, Path], overwrite: bool = True):
def write_repository_container(self, container: Container):
"""Write a repository container to the archive.
:param container: the container.
"""
self.assert_within_context()
repo_folder = self._folder.get_subfolder(NODES_EXPORT_SUBFOLDER).get_subfolder(export_shard_uuid(uuid))
repo_folder.insert_path(src=os.path.abspath(path), dest_name='.', overwrite=overwrite)
self._folder.get_subfolder('container').insert_path(src=container.get_folder(), dest_name='.')
4 changes: 1 addition & 3 deletions aiida/tools/importexport/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
###########################################################################
# pylint: disable=wildcard-import,undefined-variable
"""Common utility functions, classes, and exceptions"""

from .archive import *
from .config import *
from .exceptions import *

__all__ = (archive.__all__ + config.__all__ + exceptions.__all__)
__all__ = (config.__all__ + exceptions.__all__)
Loading

0 comments on commit f30f000

Please sign in to comment.