diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 6153ed6fa9..1888376e50 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -1,7 +1,7 @@ import contextlib import datetime as _datetime import importlib as _importlib -import logging as _logging +import logging as python_logging import os as _os import pathlib import random as _random @@ -37,6 +37,7 @@ from flytekit.interfaces import random as _flyte_random from flytekit.interfaces.data import data_proxy as _data_proxy from flytekit.interfaces.stats.taggable import get_stats as _get_stats +from flytekit.loggers import entrypoint_logger as logger from flytekit.models import dynamic_job as _dynamic_job from flytekit.models import literals as _literal_models from flytekit.models.core import errors as _error_models @@ -95,6 +96,7 @@ def _dispatch_execute( c: OR if an unhandled exception is retrieved - record it as an errors.pb """ output_file_dict = {} + logger.debug(f"Starting _dispatch_execute for {task_def.name}") try: # Step1 local_inputs_file = _os.path.join(ctx.execution_state.working_dir, "inputs.pb") @@ -108,14 +110,14 @@ def _dispatch_execute( outputs = _scoped_exceptions.system_entry_point(task_def.dispatch_execute)(ctx, idl_input_literals) # Step3a if isinstance(outputs, VoidPromise): - _logging.getLogger().warning("Task produces no outputs") + logger.warning("Task produces no outputs") output_file_dict = {_constants.OUTPUT_FILE_NAME: _literal_models.LiteralMap(literals={})} elif isinstance(outputs, _literal_models.LiteralMap): output_file_dict = {_constants.OUTPUT_FILE_NAME: outputs} elif isinstance(outputs, _dynamic_job.DynamicJobSpec): output_file_dict = {_constants.FUTURES_FILE_NAME: outputs} else: - _logging.getLogger().error(f"SystemError: received unknown outputs from task {outputs}") + logger.error(f"SystemError: received unknown outputs from task {outputs}") output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument( _error_models.ContainerError( "UNKNOWN_OUTPUT", @@ -128,30 +130,30 @@ def _dispatch_execute( # Handle user-scoped errors except _scoped_exceptions.FlyteScopedUserException as e: if isinstance(e.value, IgnoreOutputs): - _logging.warning(f"User-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!") + logger.warning(f"User-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!") return output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument( _error_models.ContainerError( e.error_code, e.verbose_message, e.kind, _execution_models.ExecutionError.ErrorKind.USER ) ) - _logging.error("!! Begin User Error Captured by Flyte !!") - _logging.error(e.verbose_message) - _logging.error("!! End Error Captured by Flyte !!") + logger.error("!! Begin User Error Captured by Flyte !!") + logger.error(e.verbose_message) + logger.error("!! End Error Captured by Flyte !!") # Handle system-scoped errors except _scoped_exceptions.FlyteScopedSystemException as e: if isinstance(e.value, IgnoreOutputs): - _logging.warning(f"System-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!") + logger.warning(f"System-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!") return output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument( _error_models.ContainerError( e.error_code, e.verbose_message, e.kind, _execution_models.ExecutionError.ErrorKind.SYSTEM ) ) - _logging.error("!! Begin System Error Captured by Flyte !!") - _logging.error(e.verbose_message) - _logging.error("!! End Error Captured by Flyte !!") + logger.error("!! Begin System Error Captured by Flyte !!") + logger.error(e.verbose_message) + logger.error("!! End Error Captured by Flyte !!") # Interpret all other exceptions (some of which may be caused by the code in the try block outside of # dispatch_execute) as recoverable system exceptions. @@ -166,16 +168,17 @@ def _dispatch_execute( _execution_models.ExecutionError.ErrorKind.SYSTEM, ) ) - _logging.error(f"Exception when executing task {task_def.name or task_def.id.name}, reason {str(e)}") - _logging.error("!! Begin Unknown System Error Captured by Flyte !!") - _logging.error(exc_str) - _logging.error("!! End Error Captured by Flyte !!") + logger.error(f"Exception when executing task {task_def.name or task_def.id.name}, reason {str(e)}") + logger.error("!! Begin Unknown System Error Captured by Flyte !!") + logger.error(exc_str) + logger.error("!! End Error Captured by Flyte !!") for k, v in output_file_dict.items(): _common_utils.write_proto_to_file(v.to_flyte_idl(), _os.path.join(ctx.execution_state.engine_dir, k)) ctx.file_access.put_data(ctx.execution_state.engine_dir, output_prefix, is_multipart=True) - _logging.info(f"Engine folder written successfully to the output prefix {output_prefix}") + logger.info(f"Engine folder written successfully to the output prefix {output_prefix}") + logger.debug("Finished _dispatch_execute") @contextlib.contextmanager @@ -184,14 +187,11 @@ def setup_execution( dynamic_addl_distro: str = None, dynamic_dest_dir: str = None, ): - log_level = _internal_config.LOGGING_LEVEL.get() or _sdk_config.LOGGING_LEVEL.get() - _logging.getLogger().setLevel(log_level) - ctx = FlyteContextManager.current_context() # Create directories user_workspace_dir = ctx.file_access.get_random_local_directory() - _click.echo(f"Using user directory {user_workspace_dir}") + logger.info(f"Using user directory {user_workspace_dir}") pathlib.Path(user_workspace_dir).mkdir(parents=True, exist_ok=True) from flytekit import __version__ as _api_version @@ -219,7 +219,7 @@ def setup_execution( "api_version": _api_version, }, ), - logging=_logging, + logging=python_logging, tmp_dir=user_workspace_dir, ) @@ -231,7 +231,7 @@ def setup_execution( raw_output_prefix=raw_output_data_prefix, ) except TypeError: # would be thrown from DataPersistencePlugins.find_plugin - _logging.error(f"No data plugin found for raw output prefix {raw_output_data_prefix}") + logger.error(f"No data plugin found for raw output prefix {raw_output_data_prefix}") raise else: raise Exception("No raw output prefix detected. Please upgrade your version of Propeller to 0.4.0 or later.") @@ -280,7 +280,6 @@ def _handle_annotated_task( """ Entrypoint for all PythonTask extensions """ - _click.echo("Running native-typed task") _dispatch_execute(ctx, task_def, inputs, output_prefix) @@ -366,7 +365,7 @@ def _execute_task( # Use the resolver to load the actual task object _task_def = resolver_obj.load_task(loader_args=resolver_args) if test: - _click.echo( + logger.info( f"Test detected, returning. Args were {inputs} {output_prefix} {raw_output_data_prefix} {resolver} {resolver_args}" ) return @@ -401,7 +400,7 @@ def _execute_map_task( output_prefix = _os.path.join(output_prefix, str(task_index)) if test: - _click.echo( + logger.info( f"Test detected, returning. Inputs: {inputs} Computed task index: {task_index} " f"New output prefix: {output_prefix} Raw output path: {raw_output_data_prefix} " f"Resolver and args: {resolver} {resolver_args}" @@ -443,7 +442,9 @@ def execute_task_cmd( resolver, resolver_args, ): - _click.echo(_utils.get_version_message()) + logger.info(_utils.get_version_message()) + # We get weird errors if there are no click echo messages at all, so emit an empty string so that unit tests pass. + _click.echo("") # Backwards compatibility - if Propeller hasn't filled this in, then it'll come through here as the original # template string, so let's explicitly set it to None so that the downstream functions will know to fall back # to the original shard formatter/prefix config. @@ -455,10 +456,10 @@ def execute_task_cmd( # The addition of a new top-level command seemed out of scope at the time of this writing to pursue given how # pervasive this top level command already (plugins mostly). if not resolver: - _click.echo("No resolver found, assuming legacy API task...") + logger.info("No resolver found, assuming legacy API task...") _legacy_execute_task(task_module, task_name, inputs, output_prefix, raw_output_data_prefix, test) else: - _click.echo(f"Attempting to run with {resolver}...") + logger.debug(f"Running task execution with resolver {resolver}...") _execute_task( inputs, output_prefix, @@ -527,7 +528,7 @@ def map_execute_task_cmd( resolver, resolver_args, ): - _click.echo(_utils.get_version_message()) + logger.info(_utils.get_version_message()) _execute_map_task( inputs, diff --git a/flytekit/configuration/images.py b/flytekit/configuration/images.py index 10544071d6..092ccaaff7 100644 --- a/flytekit/configuration/images.py +++ b/flytekit/configuration/images.py @@ -2,6 +2,7 @@ import typing from flytekit.configuration import common as _config_common +from flytekit.loggers import logger def get_specified_images() -> typing.Dict[str, str]: @@ -21,7 +22,7 @@ def get_specified_images() -> typing.Dict[str, str]: try: image_names = _config_common.CONFIGURATION_SINGLETON.config.options("images") except configparser.NoSectionError: - print("No images specified, will use the default image") + logger.info("No images specified, will use the default image") image_names = None if image_names: for i in image_names: diff --git a/flytekit/loggers.py b/flytekit/loggers.py index 4527a659e1..bc3e243883 100644 --- a/flytekit/loggers.py +++ b/flytekit/loggers.py @@ -1,29 +1,60 @@ -import logging as _logging -import os as _os +import logging +import os from pythonjsonlogger import jsonlogger -logger = _logging.getLogger("flytekit") -# Always set the root logger to debug until we can add more user based controls -logger.setLevel(_logging.WARNING) +# Note: +# The environment variable controls exposed to affect the individual loggers should be considered to be beta. +# The ux/api may change in the future. +# At time of writing, the code was written to preserve existing default behavior +# For now, assume this is the environment variable whose usage will remain unchanged and controls output for all +# loggers defined in this file. +LOGGING_ENV_VAR = "FLYTE_SDK_LOGGING_LEVEL" + +# By default, the root flytekit logger to debug so everything is logged, but enable fine-tuning +logger = logging.getLogger("flytekit") +# Root logger control +flytekit_root_env_var = f"{LOGGING_ENV_VAR}_ROOT" +if os.getenv(flytekit_root_env_var) is not None: + logger.setLevel(int(os.getenv(flytekit_root_env_var))) +else: + logger.setLevel(logging.DEBUG) + +# Stop propagation so that configuration is isolated to this file (so that it doesn't matter what the +# global Python root logger is set to). +logger.propagate = False # Child loggers -auth_logger = logger.getChild("auth") -cli_logger = logger.getChild("cli") -remote_logger = logger.getChild("remote") +child_loggers = { + "auth": logger.getChild("auth"), + "cli": logger.getChild("cli"), + "remote": logger.getChild("remote"), + "entrypoint": logger.getChild("entrypoint"), +} +auth_logger = child_loggers["auth"] +cli_logger = child_loggers["cli"] +remote_logger = child_loggers["remote"] +entrypoint_logger = child_loggers["entrypoint"] -# create console handler and set level to debug -ch = _logging.StreamHandler() +# create console handler +ch = logging.StreamHandler() # Don't want to import the configuration library since that will cause all sorts of circular imports, let's # just use the environment variable if it's defined. Decide in the future when we implement better controls # if we should control with the channel or with the logger level. -logging_env_var = "FLYTE_SDK_LOGGING_LEVEL" -level_from_env = _os.getenv(logging_env_var) +# The handler log level controls whether log statements will actually print to the screen +level_from_env = os.getenv(LOGGING_ENV_VAR) if level_from_env is not None: ch.setLevel(int(level_from_env)) else: - ch.setLevel(_logging.WARNING) + ch.setLevel(logging.WARNING) + +# Consider this API to be beta +for log_name, child_logger in child_loggers.items(): + env_var = f"{LOGGING_ENV_VAR}_{log_name.upper()}" + level_from_env = os.getenv(env_var) + if level_from_env is not None: + child_logger.setLevel(int(level_from_env)) # create formatter formatter = jsonlogger.JsonFormatter(fmt="%(asctime)s %(name)s %(levelname)s %(message)s")