-
Notifications
You must be signed in to change notification settings - Fork 4
/
asset_collection_operations.py
229 lines (204 loc) · 9.58 KB
/
asset_collection_operations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""idmtools comps asset collections operations.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import os
import uuid
from dataclasses import field, dataclass
from functools import partial
from logging import getLogger, DEBUG
from typing import Type, Union, List, TYPE_CHECKING, Optional
from uuid import UUID
import humanfriendly
from COMPS.Data import AssetCollection as COMPSAssetCollection, QueryCriteria, AssetCollectionFile, SimulationFile, OutputFileMetadata, WorkItemFile
from tqdm import tqdm
from idmtools import IdmConfigParser
from idmtools.assets import AssetCollection, Asset
from idmtools.entities.iplatform_ops.iplatform_asset_collection_operations import IPlatformAssetCollectionOperations
from idmtools_platform_comps.utils.general import get_file_as_generator
if TYPE_CHECKING:  # pragma: no cover
    # Only needed for the 'COMPSPlatform' annotation on the operations class;
    # presumably guarded to avoid a circular import at runtime — verify.
    from idmtools_platform_comps.comps_platform import COMPSPlatform

# Module-level diagnostics logger (debug output) and the shared "user" logger used
# for user-facing upload summaries.
logger, user_logger = getLogger(__name__), getLogger("user")
@dataclass
class CompsPlatformAssetCollectionOperations(IPlatformAssetCollectionOperations):
    """
    Provides AssetCollection Operations to COMPSPlatform.
    """
    platform: 'COMPSPlatform'  # noqa F821
    platform_type: Type = field(default=COMPSAssetCollection)

    def get(self, asset_collection_id: Optional[UUID], load_children: Optional[List[str]] = None, query_criteria: Optional[QueryCriteria] = None, **kwargs) -> COMPSAssetCollection:
        """
        Get an asset collection by id.

        Args:
            asset_collection_id: Id of asset collection
            load_children: Optional list of children to load. Defaults to assets and tags
            query_criteria: Optional query_criteria. Ignores children default
            **kwargs: Unused; accepted for interface compatibility.

        Returns:
            COMPSAssetCollection

        Raises:
            ValueError: If neither an id nor a query criteria is given.
        """
        children = load_children if load_children is not None else ["assets", "tags"]
        if asset_collection_id is None and query_criteria is None:
            raise ValueError("You cannot query for all asset collections. Please specify a query criteria or an id")
        query_criteria = query_criteria or QueryCriteria().select_children(children)
        return COMPSAssetCollection.get(id=asset_collection_id, query_criteria=query_criteria)

    @staticmethod
    def _checksum_uuid(asset: Asset) -> UUID:
        """
        Return the md5 checksum of an asset as a UUID.

        The checksum is calculated when the asset does not have one yet; string checksums are parsed into a UUID.

        Args:
            asset: Asset whose checksum is needed

        Returns:
            The checksum as a UUID
        """
        checksum = asset.checksum
        if checksum is None:
            checksum = asset.calculate_checksum()
        return checksum if isinstance(checksum, uuid.UUID) else uuid.UUID(checksum)

    def platform_create(self, asset_collection: AssetCollection, **kwargs) -> COMPSAssetCollection:
        """
        Create AssetCollection.

        The collection is first saved with checksums only. If that save returns missing files, a second
        collection is built that uploads the content of those files and references the rest by checksum.

        Args:
            asset_collection: AssetCollection to create. On return its uid, platform object, platform and
                platform_id are set from the created COMPS collection.
            **kwargs: Unused; accepted for interface compatibility.

        Returns:
            COMPSAssetCollection
        """
        ac = COMPSAssetCollection()
        # (filename, relative_path, checksum) triples; the set drops exact duplicates
        ac_files = set()
        # asset -> checksum UUID, used later to decide which assets must be uploaded
        ac_map = dict()
        for asset in asset_collection:
            # using checksum is not accurate and not all systems will support de-duplication
            checksum = self._checksum_uuid(asset)
            ac_files.add((asset.filename, asset.relative_path, checksum))
            ac_map[asset] = checksum

        if logger.isEnabledFor(DEBUG):
            # Only exact duplicates have been removed at this point; COMPS has not been queried yet.
            logger.debug(f"Building ac. Filtered out {len(asset_collection) - len(ac_files)} duplicate assets")
        for file_name, relative_path, md5_checksum in ac_files:
            ac.add_asset(AssetCollectionFile(file_name=file_name, relative_path=relative_path, md5_checksum=md5_checksum))
        del ac_files

        # Add tags
        if asset_collection.tags:
            ac.set_tags(asset_collection.tags)

        # check for missing files first
        missing_files = ac.save(return_missing_files=True)
        if missing_files:
            ac = self._upload_missing_files(asset_collection, ac, ac_map, missing_files)

        asset_collection.uid = ac.id
        asset_collection._platform_object = ac
        asset_collection.platform = self.platform
        asset_collection.platform_id = self.platform.uid
        return ac

    def _upload_missing_files(self, asset_collection: AssetCollection, ac: COMPSAssetCollection, ac_map: dict, missing_files) -> COMPSAssetCollection:
        """
        Build and save a second COMPS collection that uploads the files reported missing by the first save.

        Args:
            asset_collection: Source idmtools AssetCollection (only used to decide whether tags are copied)
            ac: The first COMPS collection; its tags are copied to the new collection
            ac_map: Mapping of asset to checksum UUID built by platform_create
            missing_files: Value returned by ``ac.save(return_missing_files=True)``; compared against checksums

        Returns:
            The saved COMPS collection containing every asset
        """
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"{len(missing_files)} missing files detected")
        # Build the lookup once: O(1) membership per asset instead of scanning the missing list each time
        missing = set(missing_files)
        ac2 = COMPSAssetCollection()
        if asset_collection.tags:
            ac2.set_tags(ac.tags)

        total_size = 0
        for asset, cksum in ac_map.items():
            if cksum not in missing:
                # not reported missing, so reference the existing file by checksum only
                ac2.add_asset(AssetCollectionFile(
                    file_name=asset.filename,
                    relative_path=asset.relative_path,
                    md5_checksum=cksum
                ))
            elif asset.absolute_path:
                total_size += os.path.getsize(asset.absolute_path)
                ac2.add_asset(
                    AssetCollectionFile(file_name=asset.filename, relative_path=asset.relative_path),
                    file_path=asset.absolute_path
                )
            else:
                data = asset.bytes
                total_size += len(data)
                ac2.add_asset(
                    AssetCollectionFile(file_name=asset.filename, relative_path=asset.relative_path),
                    data=data
                )

        if IdmConfigParser.is_output_enabled():
            user_logger.info(f"Uploading {len(missing_files)} files/{humanfriendly.format_size(total_size)}")

        callback = None
        prog = None
        if not IdmConfigParser.is_progress_bar_disabled():
            prog = tqdm(desc="Uploading files", unit='file', total=len(missing_files))

            def update_progress(total_files_uploaded):
                prog.n = total_files_uploaded
                prog.display()
            callback = update_progress

        try:
            ac2.save(upload_files_callback=callback)
            if callback:
                # make sure the bar shows completion even if the last callback was not for the final file
                callback(len(missing_files))
        finally:
            # close the bar even when the upload raises, so the terminal is not left with a dangling bar
            if prog is not None:
                prog.close()
        return ac2

    def to_entity(self, asset_collection: Union[COMPSAssetCollection, SimulationFile, List[SimulationFile], OutputFileMetadata, List[WorkItemFile]], **kwargs) \
            -> AssetCollection:
        """
        Convert COMPS Asset Collection or Simulation File to IDM Asset Collection.

        Each input file is converted exactly once.

        Args:
            asset_collection: Comps asset/asset collection to convert to idm asset collection
            **kwargs: Unused; accepted for interface compatibility.

        Returns:
            AssetCollection

        Raises:
            ValueError - If a non-empty list's first item is not a SimulationFile or WorkItemFile
        """
        ac = AssetCollection()
        # set the platform/original object
        ac.platform = self.platform
        # we support comps simulations files and experiments as asset collections
        if isinstance(asset_collection, COMPSAssetCollection):
            # only true asset collections have ids
            ac.uid = asset_collection.id
            ac.tags = asset_collection.tags
            self._append_comps_files(ac, asset_collection.assets, from_asset_collection=True)
        elif isinstance(asset_collection, list):
            if asset_collection and not isinstance(asset_collection[0], (SimulationFile, WorkItemFile)):
                raise ValueError("Unknown asset list")
            # NOTE(review): lists are converted only here; previously (as written) they were also appended a
            # second time by the generic loop, producing duplicate assets.
            for file in asset_collection:
                ac.add_asset(self.__simulation_file_to_asset(file))
        elif isinstance(asset_collection, SimulationFile):
            ac.add_asset(self.__simulation_file_to_asset(asset_collection))
        elif isinstance(asset_collection, OutputFileMetadata):
            # if we have just one, make it a list
            self._append_comps_files(ac, [asset_collection], from_asset_collection=False)
        elif asset_collection:
            # any other non-empty iterable of COMPS files is converted item by item
            self._append_comps_files(ac, asset_collection, from_asset_collection=False)
        return ac

    @staticmethod
    def _append_comps_files(ac: AssetCollection, files, from_asset_collection: bool) -> None:
        """
        Convert COMPS file objects to Assets and append them to an idmtools AssetCollection.

        Args:
            ac: Collection the new assets are appended to
            files: Iterable of COMPS file objects (OutputFileMetadata or objects with file_name/md5_checksum)
            from_asset_collection: True when files come from a COMPSAssetCollection; only then is relative_path copied
        """
        for file in files:
            if isinstance(file, OutputFileMetadata):
                a = Asset(filename=file.friendly_name, relative_path=file.path_from_root, persisted=True)
            else:
                a = Asset(filename=file.file_name, checksum=file.md5_checksum)
            a._platform_object = file
            if from_asset_collection:
                a.relative_path = file.relative_path
            a.persisted = True
            a.length = file.length
            if isinstance(file, OutputFileMetadata) or file.uri:
                a.download_generator_hook = partial(get_file_as_generator, file)
            ac.assets.append(a)

    def __simulation_file_to_asset(self, asset_collection: Union[SimulationFile, WorkItemFile]) -> Asset:
        """
        Converts a Simulation File to an Asset.

        Args:
            asset_collection: COMPS SimulationFile or WorkItemFile to convert

        Returns:
            Asset created from sim file.
        """
        asset = Asset(filename=asset_collection.file_name, checksum=asset_collection.md5_checksum)
        # set original object for quick access again later
        asset._platform_object = asset_collection
        asset.is_simulation_file = True
        asset.persisted = True
        asset.length = asset_collection.length
        # The download location belongs to the COMPS file, not to the Asset that was just created
        if getattr(asset_collection, "uri", None):
            asset.download_generator_hook = partial(get_file_as_generator, asset_collection)
        return asset