Skip to content

Commit

Permalink
[feat][task] Support Kubernetes task (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
tuchg committed Nov 16, 2022
1 parent 3ada193 commit 89f7407
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/source/tasks/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ In this section
flink
map_reduce
procedure
kubernetes

datax
sub_process
Expand Down
42 changes: 42 additions & 0 deletions docs/source/tasks/kubernetes.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Kubernetes
==========


A Kubernetes task type's example and dive into information of **PyDolphinScheduler**.

Example
-------

.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_kubernetes_example.py
:start-after: [start workflow_declare]
:end-before: [end workflow_declare]

Dive Into
---------

.. automodule:: pydolphinscheduler.tasks.kubernetes


YAML file example
-----------------

.. literalinclude:: ../../../examples/yaml_define/Kubernetes.yaml
:start-after: # under the License.
:language: yaml
29 changes: 29 additions & 0 deletions examples/yaml_define/Kubernetes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Define the workflow
workflow:
name: "kubernetes"

# Define the tasks under the workflow
tasks:
- name: kubernetes
task_type: K8S
image: ds-dev
namespace: '{ "name": "default","cluster": "lab" }'
minCpuCores: 2.0
minMemorySpace: 10.0
1 change: 1 addition & 0 deletions src/pydolphinscheduler/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class TaskType(str):
OPENMLDB = "OPENMLDB"
PYTORCH = "PYTORCH"
DVC = "DVC"
KUBERNETES = "K8S"


class DefaultTaskCodeNum(str):
Expand Down
36 changes: 36 additions & 0 deletions src/pydolphinscheduler/examples/task_kubernetes_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# [start workflow_declare]
"""A example workflow for task kubernetes."""

from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.kubernetes import Kubernetes

with ProcessDefinition(
name="task_kubernetes_example",
tenant="tenant_exists",
) as pd:
task_k8s = Kubernetes(
name="task_k8s",
image="ds-dev",
namespace=str({"name": "default", "cluster": "lab"}),
min_cpu_cores=2.0,
min_memory_space=10.0,
)
pd.submit()
# [end workflow_declare]
2 changes: 2 additions & 0 deletions src/pydolphinscheduler/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from pydolphinscheduler.tasks.dvc import DVCDownload, DVCInit, DVCUpload
from pydolphinscheduler.tasks.flink import Flink
from pydolphinscheduler.tasks.http import Http
from pydolphinscheduler.tasks.kubernetes import Kubernetes
from pydolphinscheduler.tasks.map_reduce import MR
from pydolphinscheduler.tasks.mlflow import (
MLflowModels,
Expand Down Expand Up @@ -66,4 +67,5 @@
"SubProcess",
"Switch",
"SageMaker",
"Kubernetes",
]
55 changes: 55 additions & 0 deletions src/pydolphinscheduler/tasks/kubernetes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Task Kubernetes."""
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task


class Kubernetes(Task):
"""Task Kubernetes object, declare behavior for Kubernetes task to dolphinscheduler.
:param name: task name
:param image: the registry url for image.
:param namespace: the namespace for running Kubernetes task.
:param min_cpu_cores: min CPU requirement for running Kubernetes task.
:param min_memory_space: min memory requirement for running Kubernetes task.
:param params_map: It is a local user-defined parameter for Kubernetes task.
"""

_task_custom_attr = {
"image",
"namespace",
"min_cpu_cores",
"min_memory_space",
}

def __init__(
self,
name: str,
image: str,
namespace: str,
min_cpu_cores: float,
min_memory_space: float,
*args,
**kwargs
):
super().__init__(name, TaskType.KUBERNETES, *args, **kwargs)
self.image = image
self.namespace = namespace
self.min_cpu_cores = min_cpu_cores
self.min_memory_space = min_memory_space
68 changes: 68 additions & 0 deletions tests/tasks/test_kubernetes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Test Task Kubernetes."""

from unittest.mock import patch

from pydolphinscheduler.tasks.kubernetes import Kubernetes


def test_kubernetes_get_define():
"""Test task kubernetes function get_define."""
code = 123
version = 1
name = "test_kubernetes_get_define"
image = "ds-dev"
namespace = str({"name": "default", "cluster": "lab"})
minCpuCores = 2.0
minMemorySpace = 10.0

expect = {
"code": code,
"name": name,
"version": 1,
"description": None,
"delayTime": 0,
"taskType": "K8S",
"taskParams": {
"resourceList": [],
"localParams": [],
"image": image,
"namespace": namespace,
"minCpuCores": minCpuCores,
"minMemorySpace": minMemorySpace,
"dependence": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
"waitStartTimeout": {},
},
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": None,
"timeout": 0,
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
k8s = Kubernetes(name, image, namespace, minCpuCores, minMemorySpace)
assert k8s.get_define() == expect

0 comments on commit 89f7407

Please sign in to comment.