# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 et:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the datalad package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""
Interface for aggregating metadata from (sub)dataset into (super)datasets
Aggregating a subdataset (sds) into the UUID set of the root dataset (rds)
is relatively simple. (There is a possible error condition, where different
UUIDs would be added to the same rds-pd-version at the same path).
Assumption:
The sds are real sub-datasets of the rds
Issue 1: path determination
----------------------------
For UUID-set aggregation and for tree-version-list aggregation, the new path
of the sds metadata has to be determined.
UUIDSet:
UUID ---connected to--> [{sds-pd-version, path, metadata-root-record}, ...]
Since the sds UUID-set metadata has no path yet in the aggregated rds metadata,
the path has to be determined. For a given uuid and a given rds, with metadata
md(rds) all possible paths would be:
relevant-rds-versions = for all versions(rds) where sds-version in [sds-pd-version of uuid]
for rds-version in relevant-rds-versions:
for sds-pd-version, _, metadata-root-record in version-list(uuid):
if sds-pd-version is sub-dataset of rds-version:
sds-path = path of sds-pd-version in rds-version
add metadata_root_record to uuid-set(rds).sds-pd-version, sds-path
else:
Error("Cannot find path of sds-uuid@sds-pd-version in any rds@version)
Error("What can you do? Not much besides re-aggregating")
Error("What can we do? Add a structure that allows for 'detached' metadata")
"""
import dataclasses
import logging
import subprocess
import time
from pathlib import Path
from typing import (
List,
Tuple,
cast,
)
from datalad.interface.base import (
Interface,
build_doc,
eval_results,
)
from datalad.distribution.dataset import (
Dataset,
EnsureDataset,
datasetmethod,
)
from datalad.support.constraints import (
EnsureStr,
EnsureNone,
)
from datalad.support.exceptions import InsufficientArgumentsError
from datalad.support.param import Parameter
from dataladmetadatamodel.common import get_top_level_metadata_objects
from dataladmetadatamodel.datasettree import DatasetTree
from dataladmetadatamodel.metadatapath import MetadataPath
from dataladmetadatamodel.uuidset import UUIDSet
from dataladmetadatamodel.versionlist import TreeVersionList
from dataladmetadatamodel.mapper.gitmapper.objectreference import (
flush_object_references)
from dataladmetadatamodel.mapper.gitmapper.utils import locked_backend
from .utils import check_dataset


__docformat__ = 'restructuredtext'

lgr = logging.getLogger('datalad.metadata.aggregate')


@dataclasses.dataclass
class AggregateItem:
source_tree_version_list: TreeVersionList
source_uuid_set: UUIDSet
destination_path: MetadataPath


@build_doc
class Aggregate(Interface):
"""Aggregate metadata of one or more sub-datasets for later reporting.
.. note::
MetadataRecord storage is not forced to reside inside the datalad repository
of the dataset. MetadataRecord might be stored within the repository that
is used by a dataset, but it might as well be stored in another
repository (or a non-git backend, once those exist). To distinguish
metadata storage from the dataset storage, we refer to metadata storage
as metadata-store. For now, the metadata-store is usually the
git-repository that holds the dataset.
.. note::
The distinction is the reason for the "double"-path arguments below.
for each source metadata-store that should be integrated into the root
metadata-store, we have to give the source metadata-store itself and the
intra-dataset-path with regard to the root-dataset.
MetadataRecord aggregation refers to a procedure that combines metadata from
different sub-datasets into a root dataset, i.e. a dataset that contains
all the sub-datasets. Aggregated metadata is "prefixed" with the
intra-dataset-paths of the sub-datasets. The intra-dataset-path for a
sub-dataset is the path from the top-level directory of the root dataset,
i.e. the directory that contains the ".datalad"-entry, to the top-level
directory of the respective sub-dataset.
Aggregate works on existing metadata, it will not extract meta data from
data file. To create metadata, use the meta-extract command.
As a result of the aggregation, the metadata of all specified sub-datasets
will be available in the root metadata-store. A datalad meta-dump command
on the root metadata-store will therefore be able to process metadata
from the root dataset, as well as all aggregated sub-datasets.
"""
_examples_ = [
dict(
text="For example, if the root dataset path is '/home/root_ds', "
"the following command can be used to aggregate metadata of "
"two sub-datasets, e.g. '/home/root_ds/sub_ds1' and "
"'/home/root_ds/sub_ds2', into the root dataset",
code_cmd="datalad meta-aggregate -d /home/root_ds "
"/home/root_ds/sub_ds1 /home/root_ds/sub_ds2")
]
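    # The ``datasetmethod`` decorator on ``__call__`` below also exposes this
    # command as a Dataset method. A rough Python-API sketch of the CLI
    # example above (paths are placeholders, not part of any real setup):
    #
    #   from datalad.distribution.dataset import Dataset
    #   for result in Dataset("/home/root_ds").meta_aggregate(
    #           path=["/home/root_ds/sub_ds1", "/home/root_ds/sub_ds2"]):
    #       print(result)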
_params_ = dict(
dataset=Parameter(
args=("-d", "--dataset"),
metavar="ROOT_DATASET",
doc="""Topmost dataset metadata will be aggregated into. If no
dataset is specified, a dataset will be discovered based on the
current working directory. MetadataRecord for aggregated datasets will
contain a dataset path that is relative to the top-dataset""",
constraints=EnsureDataset() | EnsureNone()),
path=Parameter(
args=("path",),
metavar="SUB_DATASET_PATH",
doc=r"""
SUB_DATASET_PATH is a path to a sub-dataset whose metadata shall be
aggregated into the topmost dataset (ROOT_DATASET). The sub-dataset
must be located within the directory of the topmost dataset. Note:
if SUB_DATASET_PATH is relative, it is resolved against the current
working directory, not against the path of the topmost dataset""",
nargs="+",
constraints=EnsureStr() | EnsureNone()))

    @staticmethod
@datasetmethod(name='meta_aggregate')
@eval_results
def __call__(
dataset=None,
path=None):
root_dataset = check_dataset(dataset or ".", "meta_aggregate")
root_realm = root_dataset.path
path_realm_associations = process_path_spec(root_dataset, path)
backend = "git"
# TODO: we should read-lock all ag_realms
# Collect aggregate information
aggregate_items = []
for ag_path, ag_metadata_store in path_realm_associations:
ag_tree_version_list, ag_uuid_set = get_top_level_metadata_objects(
backend,
ag_metadata_store)
if ag_tree_version_list is None or ag_uuid_set is None:
message = (
f"No valid datalad metadata found in: {ag_metadata_store}, "
f"ignoring metadata store at {ag_metadata_store.resolve()} "
f"(and sub-dataset {ag_path}).")
lgr.warning(message)
continue
aggregate_items.append(
AggregateItem(
ag_tree_version_list,
ag_uuid_set,
ag_path))
if not aggregate_items:
raise InsufficientArgumentsError(
"No valid metadata stores were specified for aggregation")
with locked_backend(root_dataset.pathobj):
tree_version_list, uuid_set = get_top_level_metadata_objects(
backend,
root_realm)
if tree_version_list is None:
lgr.warning(
f"no tree version list found in {root_realm}, "
f"creating an empty tree version list")
tree_version_list = TreeVersionList()
if uuid_set is None:
lgr.warning(
f"no uuid set found in {root_realm}, "
f"creating an empty set")
uuid_set = UUIDSet()
perform_aggregation(
root_realm,
tree_version_list,
uuid_set,
aggregate_items)
tree_version_list.write_out(root_realm)
uuid_set.write_out(root_realm)
flush_object_references(root_dataset.pathobj)
yield dict(
action="meta_aggregate",
status='ok',
path=root_realm,
destination=root_realm,
sources=[str(pam[1]) for pam in path_realm_associations],
backend=backend,
message=f"aggregated metadata into {root_realm}")
return


def process_path_spec(root_dataset: Dataset,
paths: List[str]
) -> List[Tuple[MetadataPath, Path]]:
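    # Map every given sub-dataset path to a pair of (intra-dataset path
    # relative to the root dataset, absolute path of the sub-dataset), after
    # verifying that the path denotes a valid dataset and is not the root
    # dataset itself.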
result = []
for path in paths:
path_object = Path(path).absolute()
if path_object == root_dataset.pathobj:
raise ValueError(
f"Cannot aggregate {path_object} into itself")
sub_dataset = check_dataset(str(path_object), "meta_aggregate")
result.append((
MetadataPath(sub_dataset.pathobj.relative_to(root_dataset.pathobj)),
sub_dataset.pathobj))
return result


def perform_aggregation(destination_metadata_store: str,
tree_version_list: TreeVersionList,
destination_uuid_set: UUIDSet,
aggregate_items: List[AggregateItem]
):
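    # For each sub-dataset, merge its metadata into the root metadata store
    # along both axes of the metadata model: the per-dataset UUID sets and
    # the per-version dataset trees.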
for aggregate_item in aggregate_items:
copy_uuid_set(
destination_metadata_store,
destination_uuid_set,
aggregate_item.source_uuid_set,
aggregate_item.destination_path)
copy_tree_version_list(
destination_metadata_store,
tree_version_list,
aggregate_item.source_tree_version_list,
aggregate_item.destination_path)


def copy_uuid_set(destination_metadata_store: str,
destination_uuid_set: UUIDSet,
source_uuid_set: UUIDSet,
destination_path: MetadataPath):
"""
For each uuid in the source uuid set, create a version
list in the destination uuid set, if it does not yet exist
and copy the metadata for all versions into the version list.
"""
# For every uuid in the source uuid set get the source version list
for uuid in source_uuid_set.uuids():
lgr.debug(f"aggregating metadata of dataset UUID: {uuid}")
src_version_list = source_uuid_set.get_version_list(uuid)
        # If the destination does not contain a version list for the
        # source UUID, we add a copy of the source version list with
        # the specified path prefix
if uuid not in destination_uuid_set.uuids():
lgr.debug(
f"no version list for UUID: {uuid} in dest, creating it, "
f"by copying the source version list")
destination_uuid_set.set_version_list(
uuid,
src_version_list.deepcopy(
new_destination=destination_metadata_store,
path_prefix=destination_path))
else:
# Get the destination version list
lgr.debug(f"updating destination version list for UUID: {uuid}")
dest_version_list = destination_uuid_set.get_version_list(uuid)
# Copy the individual version elements from source to destination.
for pd_version in src_version_list.versions():
lgr.debug(
f"reading metadata element for pd version {pd_version} "
f"of UUID: {uuid}")
time_stamp, old_path, element = \
src_version_list.get_versioned_element(
primary_data_version=pd_version,
prefix_path=MetadataPath(""))
new_path = destination_path / old_path
lgr.debug(
f"adding version {pd_version} with path "
f"{new_path} to UUID: {uuid}")
dest_version_list.set_versioned_element(
primary_data_version=pd_version,
time_stamp=time_stamp,
prefix_path=new_path,
element=element.deepcopy(
new_destination=destination_metadata_store))
# Unget the versioned element
lgr.debug(
f"persisting copied metadata element for pd version "
f"{pd_version} of UUID: {uuid}")
dest_version_list.unget_versioned_element(
primary_data_version=pd_version,
prefix_path=new_path,
new_destination=destination_metadata_store)
# Remove the source versioned element from memory
lgr.debug(
f"purging source metadata element for pd version "
f"{pd_version} of UUID: {uuid}")
src_version_list.unget_versioned_element(
primary_data_version=pd_version,
prefix_path=old_path)
        # Unget the version list in the destination; that should persist it
lgr.debug(f"persisting copied version list for UUID: {uuid}")
destination_uuid_set.unget_version_list(uuid)
# Remove the version list from memory
lgr.debug(f"purging source version list for UUID: {uuid}")
source_uuid_set.unget_version_list(uuid)


def copy_to_existing(destination_metadata_store: str,
root_dataset_tree: DatasetTree,
source_dataset_tree: DatasetTree,
destination_path: MetadataPath):
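    # Graft a copy of the source dataset tree into an already existing root
    # dataset tree at destination_path, replacing any subtree that is already
    # recorded there.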
if destination_path in root_dataset_tree:
lgr.warning(
f"replacing subtree {destination_path} for root dataset "
f"{root_dataset_tree}")
root_dataset_tree.delete_subtree(destination_path)
copied_dataset_tree = source_dataset_tree.deepcopy(
"git",
destination_metadata_store)
# TODO: due to the current policy of deepcopy, i.e. write
# out everything that was copied and purge the complete
# object, we have to read in the object after copying it
# only, to write it out again. This is wasteful. We should
# instead specify that the top-level object, here: the
# DatasetTree, is not written out.
copied_dataset_tree.read_in()
root_dataset_tree.add_subtree(
copied_dataset_tree,
destination_path)


def copy_tree_version_list(destination_metadata_store: str,
destination_tree_version_list: TreeVersionList,
source_tree_version_list: TreeVersionList,
destination_path: MetadataPath):
"""Copy the source tree to the versioned or un-versioned destination path
For each source_tree in source_tree_version_list we do:
1. Check whether it has an un-versioned root-path.
(Case 1): If so, we extend the un-versioned root-path by
"destination path", copy the existing tree, and are done.
2. If case 1 is not given, we do the following:
For every destination_tree in destination_tree_version_list that has a
source_tree with the given version at the given path, we copy the
source_tree into the dataset_tree of the destination_tree. (Case 2)
3. If case 2 appeared, the source_tree_version_list entry is copied
into the destination tree version list with an "un-versioned path" and
the version of the source_tree. (Case 3)
"""
for stv_info in source_tree_version_list.versioned_elements:
source_version, (_, source_path, source_dataset_tree) = stv_info
source_dataset_tree = cast(DatasetTree, source_dataset_tree)
# Case 1: extend un-versioned path of dataset tree
if source_path != MetadataPath(""):
            un_versioned_path = destination_path / source_path
copied_dataset_tree = source_dataset_tree.deepcopy(
"git",
destination_metadata_store)
copied_dataset_tree.read_in()
destination_tree_version_list.set_dataset_tree(
primary_data_version=source_version,
time_stamp=str(time.time()),
                prefix_path=un_versioned_path,
dataset_tree=copied_dataset_tree)
# Save newly created tree-copy to new destination
destination_tree_version_list.unget_dataset_tree(
source_version,
destination_path,
destination_metadata_store)
else:
was_contained = False
for dtv_info in destination_tree_version_list.versioned_elements:
dest_version, (_, dest_path, dest_dataset_tree) = dtv_info
dest_dataset_tree = cast(DatasetTree, dest_dataset_tree)
# Ignore dataset trees with un-versioned paths:
if dest_path != MetadataPath(""):
continue
is_contained = does_version_contain_version_at(
superset_path=Path(destination_metadata_store),
superset_version=dest_version,
subset_version=source_version,
subset_path=destination_path)
if is_contained is True:
# Case 2: copy source tree into dataset tree
was_contained = True
copy_to_existing(
destination_metadata_store=destination_metadata_store,
root_dataset_tree=dest_dataset_tree,
source_dataset_tree=source_dataset_tree,
destination_path=destination_path
)
destination_tree_version_list.set_dataset_tree(
primary_data_version=dest_version,
time_stamp=str(time.time()),
prefix_path=MetadataPath(""),
dataset_tree=dest_dataset_tree)
# Save newly created tree-copy to new destination
destination_tree_version_list.unget_dataset_tree(
primary_data_version=dest_version,
prefix_path=MetadataPath(""),
new_destination=destination_metadata_store)
if was_contained is False:
# Case 3: copy anonymously
copied_dataset_tree = source_dataset_tree.deepcopy(
"git",
destination_metadata_store)
# TODO: due to the current policy of deepcopy, i.e. write
# out everything that was copied and purge the complete
# object, we have to read in the object after copying it
# only, to write it out again. This is wasteful. We should
# instead specify that the top-level object, here: the
# DatasetTree, is not written out.
copied_dataset_tree.read_in()
destination_tree_version_list.set_dataset_tree(
primary_data_version=source_version,
time_stamp=str(time.time()),
prefix_path=destination_path,
dataset_tree=copied_dataset_tree)
# Save newly created tree-copy to new destination
destination_tree_version_list.unget_dataset_tree(
source_version,
destination_path,
destination_metadata_store)
# Source should not need saving, so we purge it
source_tree_version_list.purge()
return


def does_version_contain_version_at(superset_path: Path,
superset_version: str,
subset_version: str,
subset_path: MetadataPath
) -> bool:
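    # Determine whether the commit ``superset_version`` of the repository at
    # ``superset_path`` references ``subset_version`` as a submodule at
    # ``subset_path``. ``git log --find-object`` is used to locate the
    # commits that added or removed the submodule object at that path, and
    # ``superset_version`` is then looked up among the commits in the
    # resulting range.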
    result = subprocess.run([
        "git",
        "-P",
        "--git-dir", str(superset_path / ".git"),
        "log",
        f"--find-object={subset_version}",
        "--pretty=tformat:%h",
        "--no-abbrev",
        "--", str(subset_path)],
        stdout=subprocess.PIPE)
submodule_commits = result.stdout.decode().splitlines()
if len(submodule_commits) == 1:
        result = subprocess.run([
            "git",
            "-P",
            "--git-dir", str(superset_path / ".git"),
            "log",
            "-n", "1",
            "--pretty=tformat:%h",
            "--no-abbrev"],
            stdout=subprocess.PIPE)
start_commit = result.stdout.decode().splitlines()[0]
end_commit = submodule_commits[0]
first_index = 0
else:
start_commit, end_commit = submodule_commits
first_index = 1
    result = subprocess.run([
        "git",
        "-P",
        "--git-dir", str(superset_path / ".git"),
        "log",
        f"{start_commit}~{first_index}...{end_commit}",
        "--pretty=tformat:%h",
        "--no-abbrev"],
        stdout=subprocess.PIPE)
all_submodule_holders = result.stdout.decode().splitlines()
lgr.debug(
f"does_version_contain_version_at(super-set version={superset_version}"
f", subset_version={subset_version}): "
f"{superset_version in all_submodule_holders}")
return superset_version in all_submodule_holders