Merge remote-tracking branch 'origin/master' into duckdb-integration
samhita-alla committed Feb 2, 2023
2 parents 18ffb76 + ea39054 commit 0b75b43
Showing 55 changed files with 1,180 additions and 74 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
@@ -67,6 +67,7 @@ jobs:
- flytekit-aws-batch
- flytekit-aws-sagemaker
- flytekit-bigquery
- flytekit-dask
- flytekit-data-fsspec
- flytekit-dbt
- flytekit-deck-standard
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -8,7 +8,7 @@ repos:
hooks:
- id: black
- repo: https://github.com/PyCQA/isort
rev: 5.10.1
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black"]
1 change: 1 addition & 0 deletions .readthedocs.yml
@@ -18,3 +18,4 @@ sphinx:
python:
install:
- requirements: doc-requirements.txt
- requirements: docs/requirements.txt
1 change: 1 addition & 0 deletions doc-requirements.in
@@ -45,5 +45,6 @@ whylogs # whylogs
whylabs-client # whylogs
ray # ray
scikit-learn # scikit-learn
dask[distributed] # dask
vaex # vaex
mlflow # mlflow
7 changes: 7 additions & 0 deletions docs/requirements.txt
@@ -0,0 +1,7 @@
# TODO: Remove after buf migration is done and packages updated, see doc-requirements.in
# skl2onnx and tf2onnx added here so that the plugin API reference is rendered,
# with the caveat that the docs build environment has the environment variable
# PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python set so that protobuf can be parsed
# using Python, which is acceptable for docs building.
skl2onnx
tf2onnx
4 changes: 4 additions & 0 deletions docs/source/clients.rst
@@ -0,0 +1,4 @@
.. automodule:: flytekit.clients
:no-members:
:no-inherited-members:
:no-special-members:
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -76,6 +76,7 @@ Expected output:
flytekit
configuration
remote
clients
testing
extend
deck
12 changes: 12 additions & 0 deletions docs/source/plugins/dask.rst
@@ -0,0 +1,12 @@
.. _dask:

###################################################
Dask API reference
###################################################

.. tags:: Integration, DistributedComputing, KubernetesOperator

.. automodule:: flytekitplugins.dask
:no-members:
:no-inherited-members:
:no-special-members:
2 changes: 2 additions & 0 deletions docs/source/plugins/index.rst
@@ -9,6 +9,7 @@ Plugin API reference
* :ref:`AWS Sagemaker <awssagemaker>` - AWS Sagemaker plugin reference
* :ref:`Google Bigquery <bigquery>` - Google Bigquery plugin reference
* :ref:`FS Spec <fsspec>` - FS Spec API reference
* :ref:`Dask <dask>` - Dask standard API reference
* :ref:`Deck standard <deck>` - Deck standard API reference
* :ref:`Dolt standard <dolt>` - Dolt standard API reference
* :ref:`Great expectations <greatexpectations>` - Great expectations API reference
@@ -41,6 +42,7 @@ Plugin API reference
AWS Sagemaker <awssagemaker>
Google Bigquery <bigquery>
FS Spec <fsspec>
Dask <dask>
Deck standard <deck>
Dolt standard <dolt>
Great expectations <greatexpectations>
2 changes: 1 addition & 1 deletion flytekit/__init__.py
@@ -184,7 +184,7 @@
from flytekit.core.workflow import ImperativeWorkflow as Workflow
from flytekit.core.workflow import WorkflowFailurePolicy, reference_workflow, workflow
from flytekit.deck import Deck
from flytekit.extras import pytorch, tensorflow
from flytekit.extras import pytorch, sklearn, tensorflow
from flytekit.extras.persistence import GCSPersistence, HttpPersistence, S3Persistence
from flytekit.loggers import logger
from flytekit.models.common import Annotations, AuthRole, Labels
6 changes: 6 additions & 0 deletions flytekit/bin/entrypoint.py
@@ -161,6 +161,12 @@ def _dispatch_execute(
logger.info(f"Engine folder written successfully to the output prefix {output_prefix}")
logger.debug("Finished _dispatch_execute")

if os.environ.get("FLYTE_FAIL_ON_ERROR", "").lower() == "true" and _constants.ERROR_FILE_NAME in output_file_dict:
# This env var is set by flytepropeller
# AWS Batch jobs read their status from the exit code, so once we catch the error,
# we should exit with a non-zero code here
exit(1)


def get_one_of(*args) -> str:
"""
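With this guard in place, a recorded task error surfaces as a non-zero exit code. A minimal sketch of the behavior, assuming the error document is named error.pb (the helper below is illustrative, not a flytekit API):

import os

def _maybe_fail_fast(output_file_dict: dict, error_file_name: str = "error.pb") -> None:
    # If flytepropeller sets FLYTE_FAIL_ON_ERROR and an error document was produced,
    # exit non-zero so schedulers that key off exit codes (e.g. AWS Batch) see the failure.
    if os.environ.get("FLYTE_FAIL_ON_ERROR", "").lower() == "true" and error_file_name in output_file_dict:
        raise SystemExit(1)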
19 changes: 19 additions & 0 deletions flytekit/clients/__init__.py
@@ -0,0 +1,19 @@
"""
=====================
Clients
=====================
.. currentmodule:: flytekit.clients
This module provides lower level access to a Flyte backend.
.. _clients_module:
.. autosummary::
:template: custom.rst
:toctree: generated/
:nosignatures:
~friendly.SynchronousFlyteClient
~raw.RawSynchronousFlyteClient
"""
15 changes: 10 additions & 5 deletions flytekit/clis/sdk_in_container/helpers.py
@@ -4,7 +4,7 @@
import click

from flytekit.clis.sdk_in_container.constants import CTX_CONFIG_FILE
from flytekit.configuration import Config, ImageConfig
from flytekit.configuration import Config, ImageConfig, get_config_file
from flytekit.loggers import cli_logger
from flytekit.remote.remote import FlyteRemote

@@ -25,10 +25,15 @@ def get_and_save_remote_with_click_context(
:return: FlyteRemote instance
"""
cfg_file_location = ctx.obj.get(CTX_CONFIG_FILE)
cfg_obj = Config.auto(cfg_file_location)
cli_logger.info(
f"Creating remote with config {cfg_obj}" + (f" with file {cfg_file_location}" if cfg_file_location else "")
)
cfg_file = get_config_file(cfg_file_location)
if cfg_file is None:
cfg_obj = Config.for_sandbox()
cli_logger.info("No config files found, creating remote with sandbox config")
else:
cfg_obj = Config.auto(cfg_file_location)
cli_logger.info(
f"Creating remote with config {cfg_obj}" + (f" with file {cfg_file_location}" if cfg_file_location else "")
)
r = FlyteRemote(cfg_obj, default_project=project, default_domain=domain)
if save:
ctx.obj[FLYTE_REMOTE_INSTANCE_KEY] = r
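The net effect of the change above is that CLI commands fall back to the sandbox configuration when no config file can be found. A small sketch of that decision, assuming no config file exists on the machine:

from flytekit.configuration import Config, get_config_file

cfg_file = get_config_file(None)
# New fallback: no config file anywhere -> sandbox defaults instead of Config.auto().
cfg_obj = Config.for_sandbox() if cfg_file is None else Config.auto(None)
print(cfg_obj.platform.endpoint)  # "localhost:30080" when the sandbox fallback is taken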
10 changes: 5 additions & 5 deletions flytekit/configuration/__init__.py
@@ -300,7 +300,7 @@ class PlatformConfig(object):
:param endpoint: DNS for Flyte backend
:param insecure: Whether or not to use SSL
:param insecure_skip_verify: Wether to skip SSL certificate verification
:param console_endpoint: endpoint for console if differenet than Flyte backend
:param console_endpoint: endpoint for console if different than Flyte backend
:param command: This command is executed to return a token using an external process.
:param client_id: This is the public identifier for the app which handles authorization for a Flyte deployment.
More details here: https://www.oauth.com/oauth2-servers/client-registration/client-id-secret/.
@@ -311,7 +311,7 @@ class PlatformConfig(object):
:param auth_mode: The OAuth mode to use. Defaults to pkce flow.
"""

endpoint: str = "localhost:30081"
endpoint: str = "localhost:30080"
insecure: bool = False
insecure_skip_verify: bool = False
console_endpoint: typing.Optional[str] = None
@@ -529,7 +529,7 @@ def with_params(
)

@classmethod
def auto(cls, config_file: typing.Union[str, ConfigFile] = None) -> Config:
def auto(cls, config_file: typing.Union[str, ConfigFile, None] = None) -> Config:
"""
Automatically constructs the Config Object. The order of precedence is as follows
1. first try to find any env vars that match the config vars specified in the FLYTE_CONFIG format.
@@ -558,9 +558,9 @@ def for_sandbox(cls) -> Config:
:return: Config
"""
return Config(
platform=PlatformConfig(endpoint="localhost:30081", auth_mode="Pkce", insecure=True),
platform=PlatformConfig(endpoint="localhost:30080", auth_mode="Pkce", insecure=True),
data_config=DataConfig(
s3=S3Config(endpoint="http://localhost:30084", access_key_id="minio", secret_access_key="miniostorage")
s3=S3Config(endpoint="http://localhost:30002", access_key_id="minio", secret_access_key="miniostorage")
),
)

12 changes: 9 additions & 3 deletions flytekit/configuration/file.py
@@ -18,6 +18,11 @@
FLYTECTL_CONFIG_ENV_VAR = "FLYTECTL_CONFIG"


def _exists(val: typing.Any) -> bool:
"""Check if a value is defined."""
return isinstance(val, bool) or bool(val is not None and val)


@dataclass
class LegacyConfigEntry(object):
"""
@@ -63,7 +68,7 @@ def read_from_file(
@dataclass
class YamlConfigEntry(object):
"""
Creates a record for the config entry. contains
Creates a record for the config entry.
Args:
switch: dot-delimited string that should match flytectl args. Leaving it as dot-delimited instead of a list
of strings because it's easier to maintain alignment with flytectl.
@@ -80,10 +85,11 @@ def read_from_file(
return None
try:
v = cfg.get(self)
if v:
if _exists(v):
return transform(v) if transform else v
except Exception:
...

return None


@@ -273,7 +279,7 @@ def set_if_exists(d: dict, k: str, v: typing.Any) -> dict:
The input dictionary ``d`` will be mutated.
"""
if v:
if _exists(v):
d[k] = v
return d

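The new _exists helper is there so explicitly set booleans (notably False) are written through, while None and empty values are still skipped. A quick sketch of the intended behavior:

from flytekit.configuration.file import _exists

assert _exists(False) is True   # booleans always count as set, even when False
assert _exists(True) is True
assert _exists(None) is False   # unset values are skipped
assert _exists("") is False     # so are empty strings
assert _exists("localhost:30080") is True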
2 changes: 1 addition & 1 deletion flytekit/core/base_sql_task.py
@@ -41,7 +41,7 @@ def __init__(
task_config=task_config,
**kwargs,
)
self._query_template = query_template.replace("\n", "\\n").replace("\t", "\\t")
self._query_template = re.sub(r"\s+", " ", query_template.replace("\n", " ").replace("\t", " ")).strip()

@property
def query_template(self) -> str:
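The replacement above moves from escaping newlines and tabs to collapsing all whitespace, so a multi-line query template serializes as one normalized line. A sketch of the effect on an illustrative template:

import re

query_template = """
    SELECT *
    FROM   tracks
    WHERE  artist = '{{ .inputs.artist }}'
"""
# Same normalization as the new line above: whitespace runs collapse to single spaces.
print(re.sub(r"\s+", " ", query_template.replace("\n", " ").replace("\t", " ")).strip())
# SELECT * FROM tracks WHERE artist = '{{ .inputs.artist }}'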
23 changes: 17 additions & 6 deletions flytekit/core/node.py
@@ -4,8 +4,9 @@
import typing
from typing import Any, List

from flytekit.core.resources import Resources
from flytekit.core.resources import Resources, convert_resources_to_resource_model
from flytekit.core.utils import _dnsify
from flytekit.loggers import logger
from flytekit.models import literals as _literal_models
from flytekit.models.core import workflow as _workflow_model
from flytekit.models.task import Resources as _resources_model
@@ -92,9 +93,14 @@ def with_overrides(self, *args, **kwargs):
for k, v in alias_dict.items():
self._aliases.append(_workflow_model.Alias(var=k, alias=v))
if "requests" in kwargs or "limits" in kwargs:
requests = _convert_resource_overrides(kwargs.get("requests"), "requests")
limits = _convert_resource_overrides(kwargs.get("limits"), "limits")
self._resources = _resources_model(requests=requests, limits=limits)
requests = kwargs.get("requests")
if requests and not isinstance(requests, Resources):
raise AssertionError("requests should be specified as flytekit.Resources")
limits = kwargs.get("limits")
if limits and not isinstance(limits, Resources):
raise AssertionError("limits should be specified as flytekit.Resources")

self._resources = convert_resources_to_resource_model(requests=requests, limits=limits)
if "timeout" in kwargs:
timeout = kwargs["timeout"]
if timeout is None:
@@ -114,6 +120,12 @@ def with_overrides(self, *args, **kwargs):
self._metadata._interruptible = kwargs["interruptible"]
if "name" in kwargs:
self._metadata._name = kwargs["name"]
if "task_config" in kwargs:
logger.warning("This override is beta. We may want to revisit this in the future.")
new_task_config = kwargs["task_config"]
if not isinstance(new_task_config, type(self.flyte_entity._task_config)):
raise ValueError("can't change the type of the task config")
self.flyte_entity._task_config = new_task_config
return self


@@ -122,8 +134,7 @@ def _convert_resource_overrides(
) -> [_resources_model.ResourceEntry]:
if resources is None:
return []
if not isinstance(resources, Resources):
raise AssertionError(f"{resource_name} should be specified as flytekit.Resources")

resource_entries = []
if resources.cpu is not None:
resource_entries.append(_resources_model.ResourceEntry(_resources_model.ResourceName.CPU, resources.cpu))
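A hedged usage sketch of the reworked override path: requests and limits must be flytekit.Resources instances (anything else now raises an AssertionError) and are converted to the resource model on the node. The task and values below are illustrative:

from flytekit import Resources, task, workflow

@task
def double(x: int) -> int:
    return x * 2

@workflow
def wf(x: int) -> int:
    # Per-node resource overrides applied at workflow composition time.
    return double(x=x).with_overrides(
        requests=Resources(cpu="1", mem="400Mi"),
        limits=Resources(cpu="2", mem="600Mi"),
    )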
3 changes: 2 additions & 1 deletion flytekit/core/promise.py
@@ -854,7 +854,8 @@ def create_and_link_node_from_remote(
extra_inputs = used_inputs ^ set(kwargs.keys())
if len(extra_inputs) > 0:
raise _user_exceptions.FlyteAssertion(
"Too many inputs were specified for the interface. Extra inputs were: {}".format(extra_inputs)
f"Too many inputs for [{entity.name}] Expected inputs: {typed_interface.inputs.keys()} "
f"- extra inputs: {extra_inputs}"
)

# Detect upstream nodes
Expand Down
43 changes: 42 additions & 1 deletion flytekit/core/resources.py
@@ -1,5 +1,7 @@
from dataclasses import dataclass
from typing import Optional
from typing import List, Optional

from flytekit.models import task as task_models


@dataclass
@@ -35,3 +37,42 @@ class Resources(object):
class ResourceSpec(object):
requests: Optional[Resources] = None
limits: Optional[Resources] = None


_ResouceName = task_models.Resources.ResourceName
_ResourceEntry = task_models.Resources.ResourceEntry


def _convert_resources_to_resource_entries(resources: Resources) -> List[_ResourceEntry]:
resource_entries = []
if resources.cpu is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.CPU, value=resources.cpu))
if resources.mem is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.MEMORY, value=resources.mem))
if resources.gpu is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.GPU, value=resources.gpu))
if resources.storage is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.STORAGE, value=resources.storage))
if resources.ephemeral_storage is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.EPHEMERAL_STORAGE, value=resources.ephemeral_storage))
return resource_entries


def convert_resources_to_resource_model(
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
) -> task_models.Resources:
"""
Convert flytekit ``Resources`` objects to a Resources model
:param requests: Resource requests. Optional, defaults to ``None``
:param limits: Resource limits. Optional, defaults to ``None``
:return: The given resources as requests and limits
"""
request_entries = []
limit_entries = []
if requests is not None:
request_entries = _convert_resources_to_resource_entries(requests)
if limits is not None:
limit_entries = _convert_resources_to_resource_entries(limits)
return task_models.Resources(requests=request_entries, limits=limit_entries)
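A quick sketch of what the new converter returns for illustrative values (the model exposes requests and limits as lists of resource entries):

from flytekit import Resources
from flytekit.core.resources import convert_resources_to_resource_model

model = convert_resources_to_resource_model(
    requests=Resources(cpu="1", mem="400Mi"),
    limits=Resources(cpu="2"),
)
print(len(model.requests), len(model.limits))  # 2 request entries, 1 limit entry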
2 changes: 1 addition & 1 deletion flytekit/core/type_engine.py
@@ -1138,7 +1138,7 @@ def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type:

def guess_python_type(self, literal_type: LiteralType) -> type:
if literal_type.union_type is not None:
return typing.Union[tuple(TypeEngine.guess_python_type(v.type) for v in literal_type.union_type.variants)] # type: ignore
return typing.Union[tuple(TypeEngine.guess_python_type(v) for v in literal_type.union_type.variants)]

raise ValueError(f"Union transformer cannot reverse {literal_type}")

(Remaining changed files not shown.)
