Skip to content

Commit

Permalink
Add optional execution metadata to objects written to storage for eas…
Browse files Browse the repository at this point in the history
…y attribution (#2370)

Signed-off-by: Robert Deaton <robert.deaton@freenome.com>
  • Loading branch information
rdeaton-freenome committed May 6, 2024
1 parent 0869786 commit 5290171
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 0 deletions.
8 changes: 8 additions & 0 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,18 @@ def setup_execution(
task_id=_identifier.Identifier(_identifier.ResourceType.TASK, tk_project, tk_domain, tk_name, tk_version),
)

metadata = {
"flyte-execution-project": exe_project,
"flyte-execution-domain": exe_domain,
"flyte-execution-launchplan": exe_lp,
"flyte-execution-workflow": exe_wf,
"flyte-execution-name": exe_name,
}
try:
file_access = FileAccessProvider(
local_sandbox_dir=tempfile.mkdtemp(prefix="flyte"),
raw_output_prefix=raw_output_data_prefix,
execution_metadata=metadata,
)
except TypeError: # would be thrown from DataPersistencePlugins.find_plugin
logger.error(f"No data plugin found for raw output prefix {raw_output_data_prefix}")
Expand Down
20 changes: 20 additions & 0 deletions flytekit/configuration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,24 @@ def auto(cls, config_file: typing.Union[str, ConfigFile] = None) -> GCSConfig:
return GCSConfig(**kwargs)


@dataclass(init=True, repr=True, eq=True, frozen=True)
class GenericPersistenceConfig(object):
"""
Data storage configuration that applies across any provider.
"""

attach_execution_metadata: bool = True

@classmethod
def auto(cls, config_file: typing.Union[str, ConfigFile] = None) -> GCSConfig:
config_file = get_config_file(config_file)
kwargs = {}
kwargs = set_if_exists(
kwargs, "attach_execution_metadata", _internal.Persistence.ATTACH_EXECUTION_METADATA.read(config_file)
)
return GenericPersistenceConfig(**kwargs)


@dataclass(init=True, repr=True, eq=True, frozen=True)
class AzureBlobStorageConfig(object):
"""
Expand Down Expand Up @@ -631,6 +649,7 @@ class DataConfig(object):
s3: S3Config = S3Config()
gcs: GCSConfig = GCSConfig()
azure: AzureBlobStorageConfig = AzureBlobStorageConfig()
generic: GenericPersistenceConfig = GenericPersistenceConfig()

@classmethod
def auto(cls, config_file: typing.Union[str, ConfigFile] = None) -> DataConfig:
Expand All @@ -639,6 +658,7 @@ def auto(cls, config_file: typing.Union[str, ConfigFile] = None) -> DataConfig:
azure=AzureBlobStorageConfig.auto(config_file),
s3=S3Config.auto(config_file),
gcs=GCSConfig.auto(config_file),
generic=GenericPersistenceConfig.auto(config_file),
)


Expand Down
5 changes: 5 additions & 0 deletions flytekit/configuration/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ def get_specified_images(cfg: typing.Optional[ConfigFile]) -> typing.Dict[str, s
return cfg.yaml_config.get("images", images)


class Persistence(object):
SECTION = "persistence"
ATTACH_EXECUTION_METADATA = ConfigEntry(LegacyConfigEntry(SECTION, "attach_execution_metadata", bool))


class AWS(object):
SECTION = "aws"
S3_ENDPOINT = ConfigEntry(LegacyConfigEntry(SECTION, "endpoint"), YamlConfigEntry("storage.connection.endpoint"))
Expand Down
10 changes: 10 additions & 0 deletions flytekit/core/data_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def __init__(
local_sandbox_dir: Union[str, os.PathLike],
raw_output_prefix: str,
data_config: typing.Optional[DataConfig] = None,
execution_metadata: typing.Optional[dict] = None,
):
"""
Args:
Expand All @@ -148,6 +149,11 @@ def __init__(
self._local = fsspec.filesystem(None)

self._data_config = data_config if data_config else DataConfig.auto()

if self.data_config.generic.attach_execution_metadata:
self._execution_metadata = execution_metadata
else:
self._execution_metadata = None
self._default_protocol = get_protocol(str(raw_output_prefix))
self._default_remote = cast(fsspec.AbstractFileSystem, self.get_filesystem(self._default_protocol))
if os.name == "nt" and raw_output_prefix.startswith("file://"):
Expand Down Expand Up @@ -308,6 +314,10 @@ def put(self, from_path: str, to_path: str, recursive: bool = False, **kwargs):
self.strip_file_header(from_path), self.strip_file_header(to_path), dirs_exist_ok=True
)
from_path, to_path = self.recursive_paths(from_path, to_path)
if self._execution_metadata:
if "metadata" not in kwargs:
kwargs["metadata"] = {}
kwargs["metadata"].update(self._execution_metadata)
dst = file_system.put(from_path, to_path, recursive=recursive, **kwargs)
if isinstance(dst, (str, pathlib.Path)):
return dst
Expand Down

0 comments on commit 5290171

Please sign in to comment.