Skip to content

Commit

Permalink
fix: dry merge
Browse files Browse the repository at this point in the history
  • Loading branch information
kennyworkman committed Dec 30, 2021
2 parents 97f1219 + 94cf315 commit 2809118
Show file tree
Hide file tree
Showing 36 changed files with 954 additions and 189 deletions.
Binary file added .setup.py.swo
Binary file not shown.
Binary file added .swp
Binary file not shown.
59 changes: 30 additions & 29 deletions flytekit/bin/entrypoint.py
@@ -1,7 +1,7 @@
import contextlib
import datetime as _datetime
import importlib as _importlib
import logging as _logging
import logging as python_logging
import os as _os
import pathlib
import random as _random
Expand Down Expand Up @@ -37,6 +37,7 @@
from flytekit.interfaces import random as _flyte_random
from flytekit.interfaces.data import data_proxy as _data_proxy
from flytekit.interfaces.stats.taggable import get_stats as _get_stats
from flytekit.loggers import entrypoint_logger as logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import literals as _literal_models
from flytekit.models.core import errors as _error_models
Expand Down Expand Up @@ -95,6 +96,7 @@ def _dispatch_execute(
c: OR if an unhandled exception is retrieved - record it as an errors.pb
"""
output_file_dict = {}
logger.debug(f"Starting _dispatch_execute for {task_def.name}")
try:
# Step1
local_inputs_file = _os.path.join(ctx.execution_state.working_dir, "inputs.pb")
Expand All @@ -108,14 +110,14 @@ def _dispatch_execute(
outputs = _scoped_exceptions.system_entry_point(task_def.dispatch_execute)(ctx, idl_input_literals)
# Step3a
if isinstance(outputs, VoidPromise):
_logging.getLogger().warning("Task produces no outputs")
logger.warning("Task produces no outputs")
output_file_dict = {_constants.OUTPUT_FILE_NAME: _literal_models.LiteralMap(literals={})}
elif isinstance(outputs, _literal_models.LiteralMap):
output_file_dict = {_constants.OUTPUT_FILE_NAME: outputs}
elif isinstance(outputs, _dynamic_job.DynamicJobSpec):
output_file_dict = {_constants.FUTURES_FILE_NAME: outputs}
else:
_logging.getLogger().error(f"SystemError: received unknown outputs from task {outputs}")
logger.error(f"SystemError: received unknown outputs from task {outputs}")
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(
"UNKNOWN_OUTPUT",
Expand All @@ -128,30 +130,30 @@ def _dispatch_execute(
# Handle user-scoped errors
except _scoped_exceptions.FlyteScopedUserException as e:
if isinstance(e.value, IgnoreOutputs):
_logging.warning(f"User-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!")
logger.warning(f"User-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!")
return
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(
e.error_code, e.verbose_message, e.kind, _execution_models.ExecutionError.ErrorKind.USER
)
)
_logging.error("!! Begin User Error Captured by Flyte !!")
_logging.error(e.verbose_message)
_logging.error("!! End Error Captured by Flyte !!")
logger.error("!! Begin User Error Captured by Flyte !!")
logger.error(e.verbose_message)
logger.error("!! End Error Captured by Flyte !!")

# Handle system-scoped errors
except _scoped_exceptions.FlyteScopedSystemException as e:
if isinstance(e.value, IgnoreOutputs):
_logging.warning(f"System-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!")
logger.warning(f"System-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!")
return
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(
e.error_code, e.verbose_message, e.kind, _execution_models.ExecutionError.ErrorKind.SYSTEM
)
)
_logging.error("!! Begin System Error Captured by Flyte !!")
_logging.error(e.verbose_message)
_logging.error("!! End Error Captured by Flyte !!")
logger.error("!! Begin System Error Captured by Flyte !!")
logger.error(e.verbose_message)
logger.error("!! End Error Captured by Flyte !!")

# Interpret all other exceptions (some of which may be caused by the code in the try block outside of
# dispatch_execute) as recoverable system exceptions.
Expand All @@ -166,16 +168,17 @@ def _dispatch_execute(
_execution_models.ExecutionError.ErrorKind.SYSTEM,
)
)
_logging.error(f"Exception when executing task {task_def.name or task_def.id.name}, reason {str(e)}")
_logging.error("!! Begin Unknown System Error Captured by Flyte !!")
_logging.error(exc_str)
_logging.error("!! End Error Captured by Flyte !!")
logger.error(f"Exception when executing task {task_def.name or task_def.id.name}, reason {str(e)}")
logger.error("!! Begin Unknown System Error Captured by Flyte !!")
logger.error(exc_str)
logger.error("!! End Error Captured by Flyte !!")

for k, v in output_file_dict.items():
_common_utils.write_proto_to_file(v.to_flyte_idl(), _os.path.join(ctx.execution_state.engine_dir, k))

ctx.file_access.put_data(ctx.execution_state.engine_dir, output_prefix, is_multipart=True)
_logging.info(f"Engine folder written successfully to the output prefix {output_prefix}")
logger.info(f"Engine folder written successfully to the output prefix {output_prefix}")
logger.debug("Finished _dispatch_execute")


@contextlib.contextmanager
Expand All @@ -184,14 +187,11 @@ def setup_execution(
dynamic_addl_distro: str = None,
dynamic_dest_dir: str = None,
):
log_level = _internal_config.LOGGING_LEVEL.get() or _sdk_config.LOGGING_LEVEL.get()
_logging.getLogger().setLevel(log_level)

ctx = FlyteContextManager.current_context()

# Create directories
user_workspace_dir = ctx.file_access.get_random_local_directory()
_click.echo(f"Using user directory {user_workspace_dir}")
logger.info(f"Using user directory {user_workspace_dir}")
pathlib.Path(user_workspace_dir).mkdir(parents=True, exist_ok=True)
from flytekit import __version__ as _api_version

Expand Down Expand Up @@ -219,7 +219,7 @@ def setup_execution(
"api_version": _api_version,
},
),
logging=_logging,
logging=python_logging,
tmp_dir=user_workspace_dir,
)

Expand All @@ -231,7 +231,7 @@ def setup_execution(
raw_output_prefix=raw_output_data_prefix,
)
except TypeError: # would be thrown from DataPersistencePlugins.find_plugin
_logging.error(f"No data plugin found for raw output prefix {raw_output_data_prefix}")
logger.error(f"No data plugin found for raw output prefix {raw_output_data_prefix}")
raise
else:
raise Exception("No raw output prefix detected. Please upgrade your version of Propeller to 0.4.0 or later.")
Expand Down Expand Up @@ -280,7 +280,6 @@ def _handle_annotated_task(
"""
Entrypoint for all PythonTask extensions
"""
_click.echo("Running native-typed task")
_dispatch_execute(ctx, task_def, inputs, output_prefix)


Expand Down Expand Up @@ -366,7 +365,7 @@ def _execute_task(
# Use the resolver to load the actual task object
_task_def = resolver_obj.load_task(loader_args=resolver_args)
if test:
_click.echo(
logger.info(
f"Test detected, returning. Args were {inputs} {output_prefix} {raw_output_data_prefix} {resolver} {resolver_args}"
)
return
Expand Down Expand Up @@ -401,7 +400,7 @@ def _execute_map_task(
output_prefix = _os.path.join(output_prefix, str(task_index))

if test:
_click.echo(
logger.info(
f"Test detected, returning. Inputs: {inputs} Computed task index: {task_index} "
f"New output prefix: {output_prefix} Raw output path: {raw_output_data_prefix} "
f"Resolver and args: {resolver} {resolver_args}"
Expand Down Expand Up @@ -443,7 +442,9 @@ def execute_task_cmd(
resolver,
resolver_args,
):
_click.echo(_utils.get_version_message())
logger.info(_utils.get_version_message())
# We get weird errors if there are no click echo messages at all, so emit an empty string so that unit tests pass.
_click.echo("")
# Backwards compatibility - if Propeller hasn't filled this in, then it'll come through here as the original
# template string, so let's explicitly set it to None so that the downstream functions will know to fall back
# to the original shard formatter/prefix config.
Expand All @@ -455,10 +456,10 @@ def execute_task_cmd(
# The addition of a new top-level command seemed out of scope at the time of this writing to pursue given how
# pervasive this top level command already is (plugins mostly).
if not resolver:
_click.echo("No resolver found, assuming legacy API task...")
logger.info("No resolver found, assuming legacy API task...")
_legacy_execute_task(task_module, task_name, inputs, output_prefix, raw_output_data_prefix, test)
else:
_click.echo(f"Attempting to run with {resolver}...")
logger.debug(f"Running task execution with resolver {resolver}...")
_execute_task(
inputs,
output_prefix,
Expand Down Expand Up @@ -527,7 +528,7 @@ def map_execute_task_cmd(
resolver,
resolver_args,
):
_click.echo(_utils.get_version_message())
logger.info(_utils.get_version_message())

_execute_map_task(
inputs,
Expand Down
Empty file.
Empty file removed flytekit/common/types/files.py
Empty file.
3 changes: 2 additions & 1 deletion flytekit/configuration/images.py
Expand Up @@ -2,6 +2,7 @@
import typing

from flytekit.configuration import common as _config_common
from flytekit.loggers import logger


def get_specified_images() -> typing.Dict[str, str]:
Expand All @@ -21,7 +22,7 @@ def get_specified_images() -> typing.Dict[str, str]:
try:
image_names = _config_common.CONFIGURATION_SINGLETON.config.options("images")
except configparser.NoSectionError:
print("No images specified, will use the default image")
logger.info("No images specified, will use the default image")
image_names = None
if image_names:
for i in image_names:
Expand Down
16 changes: 10 additions & 6 deletions flytekit/core/interface.py
Expand Up @@ -267,27 +267,31 @@ def _change_unrecognized_type_to_pickle(t: Type[T]) -> Type[T]:
return t


def transform_signature_to_interface(signature: inspect.Signature, docstring: Optional[Docstring] = None) -> Interface:
def transform_function_to_interface(fn: Callable, docstring: Optional[Docstring] = None) -> Interface:
"""
From the annotations on a task function that the user should have provided, and the output names they want to use
for each output parameter, construct the TypedInterface object
For now the fancy object, maybe in the future a dumb object.
"""
outputs = extract_return_annotation(signature.return_annotation)
type_hints = typing.get_type_hints(fn)
signature = inspect.signature(fn)
return_annotation = type_hints.get("return", None)

outputs = extract_return_annotation(return_annotation)
for k, v in outputs.items():
outputs[k] = _change_unrecognized_type_to_pickle(v)
inputs = OrderedDict()
for k, v in signature.parameters.items():
annotation = v.annotation
annotation = type_hints.get(k, None)
default = v.default if v.default is not inspect.Parameter.empty else None
# Inputs with default values are currently ignored, we may want to look into that in the future
inputs[k] = (_change_unrecognized_type_to_pickle(annotation), default)

# This is just for typing.NamedTuples - in those cases, the user can select a name to call the NamedTuple. We
# would like to preserve that name in our custom collections.namedtuple.
custom_name = None
return_annotation = signature.return_annotation
if hasattr(return_annotation, "__bases__"):
bases = return_annotation.__bases__
if len(bases) == 1 and bases[0] == tuple and hasattr(return_annotation, "_fields"):
Expand Down Expand Up @@ -334,7 +338,7 @@ def output_name_generator(length: int) -> Generator[str, None, None]:
yield default_output_name(x)


def extract_return_annotation(return_annotation: Union[Type, Tuple]) -> Dict[str, Type]:
def extract_return_annotation(return_annotation: Union[Type, Tuple, None]) -> Dict[str, Type]:
"""
The purpose of this function is to sort out whether a function is returning one thing, or multiple things, and to
name the outputs accordingly, either by using our default name function, or from a typing.NamedTuple.
Expand Down Expand Up @@ -368,7 +372,7 @@ def t(a: int, b: str) -> Dict[str, int]: ...

# Handle Option 6
# We can think about whether we should add a default output name with type None in the future.
if return_annotation is None or return_annotation is inspect.Signature.empty:
if return_annotation in (None, type(None), inspect.Signature.empty):
return {}

# This statement results in true for typing.Namedtuple, single and void return types, so this
Expand Down
5 changes: 2 additions & 3 deletions flytekit/core/launch_plan.py
@@ -1,11 +1,10 @@
from __future__ import annotations

import inspect
from typing import Any, Callable, Dict, List, Optional, Type

from flytekit.core import workflow as _annotated_workflow
from flytekit.core.context_manager import FlyteContext, FlyteContextManager, FlyteEntities
from flytekit.core.interface import Interface, transform_inputs_to_parameters, transform_signature_to_interface
from flytekit.core.interface import Interface, transform_function_to_interface, transform_inputs_to_parameters
from flytekit.core.promise import create_and_link_node, translate_inputs_to_literals
from flytekit.core.reference_entity import LaunchPlanReference, ReferenceEntity
from flytekit.models import common as _common_models
Expand Down Expand Up @@ -399,7 +398,7 @@ def reference_launch_plan(
"""

def wrapper(fn) -> ReferenceLaunchPlan:
interface = transform_signature_to_interface(inspect.signature(fn))
interface = transform_function_to_interface(fn)
return ReferenceLaunchPlan(project, domain, name, version, interface.inputs, interface.outputs)

return wrapper
2 changes: 2 additions & 0 deletions flytekit/core/map_task.py
Expand Up @@ -58,6 +58,8 @@ def __init__(
self._max_concurrency = concurrency
self._min_success_ratio = min_success_ratio
self._array_task_interface = python_function_task.python_interface
if "metadata" not in kwargs and python_function_task.metadata:
kwargs["metadata"] = python_function_task.metadata
super().__init__(
name=name,
interface=collection_interface,
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/promise.py
Expand Up @@ -500,7 +500,7 @@ def create_task_output(
if len(promises) == 1:
if not entity_interface:
return promises[0]
# See transform_signature_to_interface for more information, we're using the existence of a name as a proxy
# See transform_function_to_interface for more information, we're using the existence of a name as a proxy
# for the user having specified a one-element typing.NamedTuple, which means we should _not_ extract it. We
# should still return a tuple but it should be one of ours.
if not entity_interface.output_tuple_name:
Expand Down
7 changes: 2 additions & 5 deletions flytekit/core/python_function_task.py
Expand Up @@ -14,7 +14,6 @@
"""


import inspect
from abc import ABC
from collections import OrderedDict
from enum import Enum
Expand All @@ -24,7 +23,7 @@
from flytekit.core.base_task import Task, TaskResolverMixin
from flytekit.core.context_manager import ExecutionState, FastSerializationSettings, FlyteContext, FlyteContextManager
from flytekit.core.docstring import Docstring
from flytekit.core.interface import transform_signature_to_interface
from flytekit.core.interface import transform_function_to_interface
from flytekit.core.python_auto_container import PythonAutoContainerTask, default_task_resolver
from flytekit.core.tracker import is_functools_wrapped_module_level, isnested, istestfunction
from flytekit.core.workflow import (
Expand Down Expand Up @@ -114,9 +113,7 @@ def __init__(
"""
if task_function is None:
raise ValueError("TaskFunction is a required parameter for PythonFunctionTask")
self._native_interface = transform_signature_to_interface(
inspect.signature(task_function), Docstring(callable_=task_function)
)
self._native_interface = transform_function_to_interface(task_function, Docstring(callable_=task_function))
mutated_interface = self._native_interface.remove_inputs(ignore_input_vars)
super().__init__(
task_type=task_type,
Expand Down
10 changes: 6 additions & 4 deletions flytekit/core/task.py
@@ -1,9 +1,8 @@
import datetime as _datetime
import inspect
from typing import Any, Callable, Dict, List, Optional, Type, Union

from flytekit.core.base_task import TaskMetadata
from flytekit.core.interface import transform_signature_to_interface
from flytekit.core.base_task import TaskMetadata, TaskResolverMixin
from flytekit.core.interface import transform_function_to_interface
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.core.reference_entity import ReferenceEntity, TaskReference
from flytekit.core.resources import Resources
Expand Down Expand Up @@ -87,6 +86,7 @@ def task(
limits: Optional[Resources] = None,
secret_requests: Optional[List[Secret]] = None,
execution_mode: Optional[PythonFunctionTask.ExecutionBehavior] = PythonFunctionTask.ExecutionBehavior.DEFAULT,
task_resolver: Optional[TaskResolverMixin] = None,
) -> Union[Callable, PythonFunctionTask]:
"""
This is the core decorator to use for any task type in flytekit.
Expand Down Expand Up @@ -170,6 +170,7 @@ def foo2():
Refer to :py:class:`Secret` to understand how to specify the request for a secret. It
may change based on the backend provider.
:param execution_mode: This is mainly for internal use. Please ignore. It is filled in automatically.
:param task_resolver: Provide a custom task resolver.
"""

def wrapper(fn) -> PythonFunctionTask:
Expand All @@ -192,6 +193,7 @@ def wrapper(fn) -> PythonFunctionTask:
limits=limits,
secret_requests=secret_requests,
execution_mode=execution_mode,
task_resolver=task_resolver,
)

return task_instance
Expand Down Expand Up @@ -237,7 +239,7 @@ def reference_task(
"""

def wrapper(fn) -> ReferenceTask:
interface = transform_signature_to_interface(inspect.signature(fn))
interface = transform_function_to_interface(fn)
return ReferenceTask(project, domain, name, version, interface.inputs, interface.outputs)

return wrapper

0 comments on commit 2809118

Please sign in to comment.