Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicitly set the content type for flyte deck #1658

Merged
merged 18 commits into from
Jun 1, 2023
6 changes: 5 additions & 1 deletion flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,16 @@ def get_one_of(*args) -> str:
@contextlib.contextmanager
def setup_execution(
raw_output_data_prefix: str,
output_prefix: Optional[str] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this?

checkpoint_path: Optional[str] = None,
prev_checkpoint: Optional[str] = None,
dynamic_addl_distro: Optional[str] = None,
dynamic_dest_dir: Optional[str] = None,
):
"""

:param raw_output_data_prefix:
:param raw_output_data_prefix: Where to write offloaded data (files, directories, dataframes).
:param output_prefix: Where to write primitive outputs.
:param checkpoint_path:
:param prev_checkpoint:
:param dynamic_addl_distro: Works in concert with the other dynamic arg. If present, indicates that if a dynamic
Expand Down Expand Up @@ -247,6 +249,7 @@ def setup_execution(
logging=user_space_logger,
tmp_dir=user_workspace_dir,
raw_output_prefix=raw_output_data_prefix,
output_prefix=output_prefix,
checkpoint=checkpointer,
task_id=_identifier.Identifier(_identifier.ResourceType.TASK, tk_project, tk_domain, tk_name, tk_version),
)
Expand Down Expand Up @@ -337,6 +340,7 @@ def _execute_task(

with setup_execution(
raw_output_data_prefix,
output_prefix,
checkpoint_path,
prev_checkpoint,
dynamic_addl_distro,
Expand Down
6 changes: 6 additions & 0 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
execution_id: typing.Optional[_identifier.WorkflowExecutionIdentifier],
logging,
raw_output_prefix,
output_prefix=None,
checkpoint=None,
decks=None,
task_id: typing.Optional[_identifier.Identifier] = None,
Expand All @@ -173,6 +174,7 @@
self._execution_id = execution_id
self._logging = logging
self._raw_output_prefix = raw_output_prefix
self._output_prefix = output_prefix
# AutoDeletingTempDir's should be used with a with block, which creates upon entry
self._attrs = kwargs
# It is safe to recreate the Secrets Manager
Expand Down Expand Up @@ -201,6 +203,10 @@
def raw_output_prefix(self) -> str:
return self._raw_output_prefix

@property

Check warning on line 206 in flytekit/core/context_manager.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/context_manager.py#L206

Added line #L206 was not covered by tests
def output_prefix(self) -> str:
return self._output_prefix

Check warning on line 208 in flytekit/core/context_manager.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/context_manager.py#L208

Added line #L208 was not covered by tests

@property
def working_directory(self) -> str:
"""
Expand Down
8 changes: 4 additions & 4 deletions flytekit/core/data_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@
return file_system.get(from_path, to_path, recursive=recursive)
raise oe

def put(self, from_path: str, to_path: str, recursive: bool = False):
def put(self, from_path: str, to_path: str, recursive: bool = False, **kwargs):

Check warning on line 206 in flytekit/core/data_persistence.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/data_persistence.py#L206

Added line #L206 was not covered by tests
file_system = self.get_filesystem_for_path(to_path)
from_path = self.strip_file_header(from_path)
if recursive:
Expand All @@ -217,7 +217,7 @@
self.strip_file_header(from_path), self.strip_file_header(to_path), dirs_exist_ok=True
)
from_path, to_path = self.recursive_paths(from_path, to_path)
return file_system.put(from_path, to_path, recursive=recursive)
return file_system.put(from_path, to_path, recursive=recursive, **kwargs)

def get_random_remote_path(self, file_path_or_file_name: typing.Optional[str] = None) -> str:
"""
Expand Down Expand Up @@ -304,7 +304,7 @@
)

@timeit("Upload data to remote")
def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_multipart: bool = False):
def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_multipart: bool = False, **kwargs):
"""
The implication here is that we're always going to put data to the remote location, so we .remote to ensure
we don't use the true local proxy if the remote path is a file://
Expand All @@ -316,7 +316,7 @@
try:
local_path = str(local_path)

self.put(cast(str, local_path), remote_path, recursive=is_multipart)
self.put(cast(str, local_path), remote_path, recursive=is_multipart, **kwargs)
except Exception as ex:
raise FlyteAssertion(
f"Failed to put data from {local_path} to {remote_path} (recursive={is_multipart}).\n\n"
Expand Down
20 changes: 12 additions & 8 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

Each task has a least three decks (input, output, default). Input/output decks are
used to render tasks' input/output data, and the default deck is used to render line plots,
scatter plots or markdown text. In addition, users can create new decks to render
scatter plots or Markdown text. In addition, users can create new decks to render
their data with custom renderers.

.. warning::
Expand Down Expand Up @@ -145,14 +145,18 @@

def _output_deck(task_name: str, new_user_params: ExecutionParameters):
ctx = FlyteContext.current_context()
if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION:
output_dir = ctx.execution_state.engine_dir
else:
output_dir = ctx.file_access.get_random_local_directory()
deck_path = os.path.join(output_dir, DECK_FILE_NAME)
with open(deck_path, "w") as f:
local_dir = ctx.file_access.get_random_local_directory()
local_path = os.path.join(local_dir, DECK_FILE_NAME)
with open(local_path, "w") as f:
f.write(_get_deck(new_user_params, ignore_jupyter=True))
logger.info(f"{task_name} task creates flyte deck html to file://{deck_path}")
logger.info(f"{task_name} task creates flyte deck html to file://{local_path}")
if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION:
remote_path = os.path.join(new_user_params.output_prefix, DECK_FILE_NAME)
kwargs: typing.Dict[str, str] = {

Check warning on line 155 in flytekit/deck/deck.py

View check run for this annotation

Codecov / codecov/patch

flytekit/deck/deck.py#L154-L155

Added lines #L154 - L155 were not covered by tests
"ContentType": "text/html", # For s3
"content_type": "text/html", # For gcs
}
ctx.file_access.put_data(local_path, remote_path, **kwargs)

Check warning on line 159 in flytekit/deck/deck.py

View check run for this annotation

Codecov / codecov/patch

flytekit/deck/deck.py#L159

Added line #L159 was not covered by tests
eapolinario marked this conversation as resolved.
Show resolved Hide resolved


def get_deck_template() -> "Template":
Expand Down