Skip to content

Commit

Permalink
Merge pull request #367 from datalad/issue-287
Browse files · Browse the repository at this point in the history
Refactor extraction code
  • Loading branch information
christian-monch committed Mar 17, 2023
2 parents c741d49 + 9eb3a51 commit 30ed90b
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 173 deletions.
167 changes: 71 additions & 96 deletions datalad_metalad/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class ExtractionArguments:
source_dataset_version: str
local_source_object_path: Path
extractor_class: Union[type(MetadataExtractor), type(FileMetadataExtractor)]
extractor_type: str
extractor_name: str
extraction_parameter: Dict[str, str]
file_tree_path: Optional[MetadataPath]
Expand Down Expand Up @@ -297,6 +298,7 @@ def __call__(
local_source_object_path=(
source_dataset.pathobj / file_tree_path).absolute(),
extractor_class=extractor_class,
extractor_type=None,
extractor_name=extractor_name,
extraction_parameter=args_to_dict(extractor_args),
file_tree_path=file_tree_path,
Expand All @@ -310,11 +312,13 @@ def __call__(
# requested and the extractor class is a subclass of
# DatasetMetadataExtractor (or a legacy extractor class).
if path:
extraction_arguments.extractor_type = 'file'
# Check whether the path points to a sub_dataset.
ensure_path_validity(source_dataset, file_tree_path)
yield from do_file_extraction(extraction_arguments)
else:
yield from do_dataset_extraction(extraction_arguments)
extraction_arguments.extractor_type = 'dataset'

yield from do_extraction(ep=extraction_arguments)
return

@staticmethod
Expand Down Expand Up @@ -349,116 +353,83 @@ def custom_result_renderer(res, **kwargs):
ui.message(json.dumps(context))


def do_dataset_extraction(ep: ExtractionArguments):
    """Yield DataLad result records from dataset-level metadata extraction.

    Dispatches on the extractor generation: extractor classes that are
    not derived from ``MetadataExtractorBase`` are treated as legacy
    extractors and routed to ``legacy_extract_dataset``; current-generation
    extractors must be subclasses of ``DatasetMetadataExtractor``.

    Parameters
    ----------
    ep : ExtractionArguments
        Bundle of extraction inputs: the source dataset and version, the
        extractor class and name, and the extraction parameters.

    Raises
    ------
    ValueError
        If the configured extractor class is a current-generation
        extractor but not a dataset-level one.
    """
    # Legacy extractors predate MetadataExtractorBase: hand off to the
    # legacy code path and stop here.
    if not issubclass(ep.extractor_class, MetadataExtractorBase):

        lgr.debug(
            "performing legacy dataset level metadata "
            "extraction for dataset at at %s",
            ep.source_dataset.path)

        yield from legacy_extract_dataset(ep)
        return

    if not issubclass(ep.extractor_class, DatasetMetadataExtractor):
        raise ValueError(
            "A dataset-level metadata-extraction was attempted (since no "
            "path argument was given), but the specified extractor "
            f"({ep.extractor_name}) is not a dataset-level extractor.")

    lgr.debug(
        "extracting dataset level metadata for dataset at %s",
        ep.source_dataset.path)

    # Instantiate the extractor with the dataset context; the actual
    # record generation happens in perform_dataset_metadata_extraction.
    extractor = ep.extractor_class(
        ep.source_dataset,
        ep.source_dataset_version,
        ep.extraction_parameter)

    yield from perform_dataset_metadata_extraction(ep, extractor)


def do_file_extraction(ep: ExtractionArguments):
def do_extraction(ep: ExtractionArguments):
extractor_type = ep.extractor_type

# Legacy extraction
legacy_extractor_map = {
'file': legacy_extract_file,
'dataset': legacy_extract_dataset,
}
if not issubclass(ep.extractor_class, MetadataExtractorBase):

lgr.debug(
"performing legacy file level metadata "
"extraction for file at %s/%s",
ep.source_dataset.path,
ep.file_tree_path)

yield from legacy_extract_file(ep)
"performing legacy %s-level metadata "
"extraction for %s at %s",
extractor_type,
extractor_type,
ep.source_dataset.path / ep.file_tree_path
if extractor_type == 'file' else ep.source_dataset.path)

yield from legacy_extractor_map[extractor_type](ep)
return

if not issubclass(ep.extractor_class, FileMetadataExtractor):

raise ValueError(
"A file-level metadata-extraction was attempted, but the "
f"specified extractor ({ep.extractor_name}) is not a "
f"file-level extractor.")


# Latest generation extraction
extractor_class_map = {
'file': FileMetadataExtractor,
'dataset': DatasetMetadataExtractor,
}
if not issubclass(ep.extractor_class, extractor_class_map[extractor_type]):
msg = (
f"A {extractor_type}-level metadata-extraction was attempted",
"since no path argument was given" if extractor_type == 'dataset' else "",
f"but the specified extractor ({ep.extractor_name})",
f"is not a {extractor_type}-level extractor"
)
raise ValueError(msg)

lgr.debug(
"performing file level extracting for file at %s/%s",
ep.source_dataset.path,
ep.file_tree_path)

file_info = get_file_info(ep.source_dataset, ep.file_tree_path)
extractor = ep.extractor_class(
"performing %s-level metadata "
"extraction for %s at %s",
extractor_type,
extractor_type,
ep.source_dataset.path / ep.file_tree_path \
if extractor_type == 'file' else ep.source_dataset.path)

if extractor_type == 'file':
file_info = get_file_info(ep.source_dataset, ep.file_tree_path)
extractor = ep.extractor_class(
ep.source_dataset,
ep.source_dataset_version,
file_info,
ep.extraction_parameter)
ensure_content_availability(extractor, file_info)
else:
extractor = ep.extractor_class(
ep.source_dataset,
ep.source_dataset_version,
file_info,
ep.extraction_parameter)

ensure_content_availability(extractor, file_info)

yield from perform_file_metadata_extraction(ep, extractor)

yield from perform_metadata_extraction(ep, extractor)

def perform_file_metadata_extraction(extraction_arguments: ExtractionArguments,
extractor: FileMetadataExtractor):

def perform_metadata_extraction(
ep: ExtractionArguments,
extractor: Union[DatasetMetadataExtractor, FileMetadataExtractor]
):

# Get output category; only IMMEDIATE is supported
output_category = extractor.get_data_output_category()
if output_category != DataOutputCategory.IMMEDIATE:
raise NotImplementedError(
f"Output category {output_category} not supported")

result = extractor.extract(None)
result.datalad_result_dict["action"] = "meta_extract"
result.datalad_result_dict["path"] = extraction_arguments.local_source_object_path
if result.extraction_success:
result.datalad_result_dict["metadata_record"] = dict(
type="file",
dataset_id=extraction_arguments.source_dataset_id,
dataset_version=extraction_arguments.source_dataset_version,
path=extraction_arguments.file_tree_path,
extractor_name=extraction_arguments.extractor_name,
extractor_version=extractor.get_version(),
extraction_parameter=extraction_arguments.extraction_parameter,
extraction_time=time.time(),
agent_name=extraction_arguments.agent_name,
agent_email=extraction_arguments.agent_email,
extracted_metadata=result.immediate_data)

yield result.datalad_result_dict


def perform_dataset_metadata_extraction(ep: ExtractionArguments,
extractor: DatasetMetadataExtractor):

output_category = extractor.get_data_output_category()
if output_category != DataOutputCategory.IMMEDIATE:
raise NotImplementedError(
f"Output category {output_category} not supported")


# Prepare result record
result_template = {
"action": "meta_extract",
"path": ep.local_source_object_path
}

# Let the extractor get the files it requires
# Handle both return possibilities of bool and Generator

# Get required content
res = extractor.get_required_content()
if isinstance(res, bool):
if res is False:
Expand All @@ -475,8 +446,8 @@ def perform_dataset_metadata_extraction(ep: ExtractionArguments,
yield r
if failure_count > 0:
return

# Process results
# Run extraction and update result
result = extractor.extract(None)
result.datalad_result_dict.update(result_template)
if result.extraction_success:
Expand All @@ -491,7 +462,11 @@ def perform_dataset_metadata_extraction(ep: ExtractionArguments,
agent_name=ep.agent_name,
agent_email=ep.agent_email,
extracted_metadata=result.immediate_data)

if issubclass(ep.extractor_class, FileMetadataExtractor):
result.datalad_result_dict["metadata_record"].update(
dict(path=ep.file_tree_path)
)

yield result.datalad_result_dict


Expand Down
50 changes: 25 additions & 25 deletions datalad_metalad/extractors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,31 @@ def get_version(self) -> str: # TODO shall we remove this and regard it as
@abc.abstractmethod
def get_data_output_category(self) -> DataOutputCategory:
raise NotImplementedError

def get_required_content(self) -> Union[bool, Generator]:
    """Let the extractor get the content that it needs locally.

    The default implementation is to do nothing and return ``True``.
    Extractors that overwrite this function can return a boolean
    (True/False) value OR yield DataLad result records.

    Returns
    -------
    bool
        True if all required content could be fetched, False
        otherwise. If False is returned, the extractor
        infrastructure will signal an error and the extractor's
        extract method will not be called.

    Yields
    ------
    dict
        DataLad result records. If a result record is yielded
        with a failure 'status' (i.e. equal to 'impossible' or
        'error') the extractor infrastructure will signal an error
        and the extractor's extract method will not be called.
    """
    # Default: no content needs to be fetched; signal success.
    return True

def get_state(self, dataset):
"""Report on extractor-related state and configuration
Expand Down Expand Up @@ -159,31 +184,6 @@ def __init__(self,
self.ref_commit = ref_commit
self.parameter = parameter or {}

def get_required_content(self) -> Union[bool, Generator]:
    """Let the extractor get the content that it needs locally.

    The default implementation is to do nothing and return ``True``.
    Extractors that overwrite this function can return a boolean
    (True/False) value OR yield DataLad result records.

    Returns
    -------
    bool
        True if all required content could be fetched, False
        otherwise. If False is returned, the extractor
        infrastructure will signal an error and the extractor's
        extract method will not be called.

    Yields
    ------
    dict
        DataLad result records. If a result record is yielded
        with a failure 'status' (i.e. equal to 'impossible' or
        'error') the extractor infrastructure will signal an error
        and the extractor's extract method will not be called.
    """
    # Default: no content needs to be fetched; signal success.
    return True


class FileMetadataExtractor(MetadataExtractorBase, metaclass=abc.ABCMeta):
def __init__(self,
Expand Down

0 comments on commit 30ed90b

Please sign in to comment.