Skip to content

Commit

Permalink
feat: Apply PEP-563 for codebase (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjiajie committed Dec 25, 2023
1 parent 1b2ad57 commit e1d1527
Show file tree
Hide file tree
Showing 51 changed files with 404 additions and 383 deletions.
2 changes: 2 additions & 0 deletions .ruff.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
src = ["src"]

target-version = "py38"

# max-line-length = 110
line-length = 110

Expand Down
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
# under the License.

"""The script for setting up pydolphinscheduler."""
from __future__ import annotations

import logging
import os
import sys
from distutils.command.sdist import sdist
from distutils.dir_util import remove_tree
from distutils.errors import DistutilsExecError
from typing import List

from setuptools import Command, setup

Expand All @@ -38,7 +39,7 @@ class CleanCommand(Command):
"""Command to clean up python api before setup by running `python setup.py clean`."""

description = "Clean up project root"
user_options: List[str] = []
user_options: list[str] = []
clean_list = [
"build",
"htmlcov",
Expand Down
8 changes: 4 additions & 4 deletions src/pydolphinscheduler/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

"""Module engine."""

from typing import Dict, Optional
from __future__ import annotations

from py4j.protocol import Py4JJavaError

Expand Down Expand Up @@ -47,9 +47,9 @@ def __init__(
task_type: str,
main_class: str,
main_package: str,
program_type: Optional[ProgramType] = ProgramType.SCALA,
program_type: ProgramType | None = ProgramType.SCALA,
*args,
**kwargs
**kwargs,
):
super().__init__(name, task_type, *args, **kwargs)
self.main_class = main_class
Expand All @@ -76,7 +76,7 @@ def get_jar_id(self) -> int:
return self.get_resource_info(self.program_type, self.main_package).get("id")

@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> dict:
"""Override Task.task_params for engine children task.
children task have some specials attribute for task_params, and is odd if we
Expand Down
8 changes: 4 additions & 4 deletions src/pydolphinscheduler/core/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

"""Module resource."""

from typing import Optional
from __future__ import annotations

from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.java_gateway import gateway
Expand All @@ -38,9 +38,9 @@ class Resource(Base):
def __init__(
self,
name: str,
content: Optional[str] = None,
description: Optional[str] = None,
user_name: Optional[str] = None,
content: str | None = None,
description: str | None = None,
user_name: str | None = None,
):
super().__init__(name, description)
self.content = content
Expand Down
97 changes: 49 additions & 48 deletions src/pydolphinscheduler/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
# under the License.

"""DolphinScheduler Task and TaskRelation object."""

from __future__ import annotations

import copy
import types
import warnings
from collections.abc import Sequence
from datetime import timedelta
from logging import getLogger
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union

from pydolphinscheduler import configuration
from pydolphinscheduler.constants import (
Expand Down Expand Up @@ -72,7 +75,7 @@ def __init__(
self,
pre_task_code: int,
post_task_code: int,
name: Optional[str] = None,
name: str | None = None,
):
super().__init__(name)
self.pre_task_code = pre_task_code
Expand Down Expand Up @@ -147,35 +150,35 @@ class Task(Base):
_task_custom_attr: set = set()

ext: set = None
ext_attr: Union[str, types.FunctionType] = None
ext_attr: str | types.FunctionType = None

DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}

def __init__(
self,
name: str,
task_type: str,
description: Optional[str] = None,
flag: Optional[str] = TaskFlag.YES,
task_priority: Optional[str] = TaskPriority.MEDIUM,
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
environment_name: Optional[str] = None,
task_group_id: Optional[int] = 0,
task_group_priority: Optional[int] = 0,
delay_time: Optional[int] = 0,
fail_retry_times: Optional[int] = 0,
fail_retry_interval: Optional[int] = 1,
timeout_notify_strategy: Optional = None,
timeout: Optional[Union[timedelta, int]] = None,
workflow: Optional[Workflow] = None,
resource_list: Optional[List] = None,
dependence: Optional[Dict] = None,
wait_start_timeout: Optional[Dict] = None,
condition_result: Optional[Dict] = None,
resource_plugin: Optional[ResourcePlugin] = None,
is_cache: Optional[bool] = False,
input_params: Optional[Dict] = None,
output_params: Optional[Dict] = None,
description: str | None = None,
flag: str | None = TaskFlag.YES,
task_priority: str | None = TaskPriority.MEDIUM,
worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
environment_name: str | None = None,
task_group_id: int | None = 0,
task_group_priority: int | None = 0,
delay_time: int | None = 0,
fail_retry_times: int | None = 0,
fail_retry_interval: int | None = 1,
timeout_notify_strategy: str | None = None,
timeout: timedelta | int | None = None,
workflow: Workflow | None = None,
resource_list: list | None = None,
dependence: dict | None = None,
wait_start_timeout: dict | None = None,
condition_result: dict | None = None,
resource_plugin: ResourcePlugin | None = None,
is_cache: bool | None = False,
input_params: dict | None = None,
output_params: dict | None = None,
*args,
**kwargs,
):
Expand All @@ -192,7 +195,7 @@ def __init__(
self.fail_retry_interval = fail_retry_interval
self.delay_time = delay_time
self.timeout_notify_strategy = timeout_notify_strategy
self._timeout: Union[timedelta, int] = timeout
self._timeout: timedelta | int = timeout
self._workflow = None
self._input_params = input_params or {}
self._output_params = output_params or {}
Expand All @@ -214,9 +217,9 @@ def __init__(
)
self._local_params = kwargs.get("local_params")

self._upstream_task_codes: Set[int] = set()
self._downstream_task_codes: Set[int] = set()
self._task_relation: Set[TaskRelation] = set()
self._upstream_task_codes: set[int] = set()
self._downstream_task_codes: set[int] = set()
self._task_relation: set[TaskRelation] = set()
# move attribute code and version after _workflow and workflow declare
self.code, self.version = self.gen_code_and_version()
# Add task to workflow, maybe we could put into property workflow latter
Expand All @@ -238,12 +241,12 @@ def __init__(
self.get_content()

@property
def workflow(self) -> Optional[Workflow]:
def workflow(self) -> Workflow | None:
"""Get attribute workflow."""
return self._workflow

@workflow.setter
def workflow(self, workflow: Optional[Workflow]):
def workflow(self, workflow: Workflow | None):
"""Set attribute workflow."""
self._workflow = workflow

Expand All @@ -270,7 +273,7 @@ def is_cache(self) -> str:
raise PyDSParamException("is_cache must be a bool")

@property
def resource_list(self) -> List[Dict[str, Resource]]:
def resource_list(self) -> list[dict[str, Resource]]:
"""Get task define attribute `resource_list`."""
resources = set()
for res in self._resource_list:
Expand All @@ -292,24 +295,24 @@ def resource_list(self) -> List[Dict[str, Resource]]:
return [{ResourceKey.NAME: r} for r in resources]

@property
def user_name(self) -> Optional[str]:
def user_name(self) -> str | None:
"""Return username of workflow."""
if self.workflow:
return self.workflow.user.name
else:
raise PyDSParamException("`user_name` cannot be empty.")

@property
def condition_result(self) -> Dict:
def condition_result(self) -> dict:
"""Get attribute condition_result."""
return self._condition_result

@condition_result.setter
def condition_result(self, condition_result: Optional[Dict]):
def condition_result(self, condition_result: dict | None):
"""Set attribute condition_result."""
self._condition_result = condition_result

def _get_attr(self) -> Set[str]:
def _get_attr(self) -> set[str]:
"""Get final task task_params attribute.
Base on `_task_default_attr`, append attribute from `_task_custom_attr` and subtract attribute from
Expand All @@ -321,7 +324,7 @@ def _get_attr(self) -> Set[str]:
return attr

@property
def task_params(self) -> Optional[Dict]:
def task_params(self) -> dict | None:
"""Get task parameter object.
Will get result to combine _task_custom_attr and custom_attr.
Expand Down Expand Up @@ -373,29 +376,27 @@ def get_content(self):
def __hash__(self):
return hash(self.code)

def __lshift__(self, other: Union["Task", Sequence["Task"]]):
def __lshift__(self, other: Task | Sequence[Task]):
"""Implement Task << Task."""
self.set_upstream(other)
return other

def __rshift__(self, other: Union["Task", Sequence["Task"]]):
def __rshift__(self, other: Task | Sequence[Task]):
"""Implement Task >> Task."""
self.set_downstream(other)
return other

def __rrshift__(self, other: Union["Task", Sequence["Task"]]):
def __rrshift__(self, other: Task | Sequence[Task]):
"""Call for Task >> [Task] because list don't have __rshift__ operators."""
self.__lshift__(other)
return self

def __rlshift__(self, other: Union["Task", Sequence["Task"]]):
def __rlshift__(self, other: Task | Sequence[Task]):
"""Call for Task << [Task] because list don't have __lshift__ operators."""
self.__rshift__(other)
return self

def _set_deps(
self, tasks: Union["Task", Sequence["Task"]], upstream: bool = True
) -> None:
def _set_deps(self, tasks: Task | Sequence[Task], upstream: bool = True) -> None:
"""Set parameter tasks dependent to current task.
it is a wrapper for :func:`set_upstream` and :func:`set_downstream`.
Expand Down Expand Up @@ -427,16 +428,16 @@ def _set_deps(
)
self.workflow._task_relations.add(task_relation)

def set_upstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
def set_upstream(self, tasks: Task | Sequence[Task]) -> None:
"""Set parameter tasks as upstream to current task."""
self._set_deps(tasks, upstream=True)

def set_downstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
def set_downstream(self, tasks: Task | Sequence[Task]) -> None:
"""Set parameter tasks as downstream to current task."""
self._set_deps(tasks, upstream=False)

# TODO code should better generate in bulk mode when :ref: workflow run submit or start
def gen_code_and_version(self) -> Tuple:
def gen_code_and_version(self) -> tuple:
"""Generate task code and version from java gateway.
If task name do not exists in workflow before, if will generate new code and version id
Expand Down Expand Up @@ -474,7 +475,7 @@ def local_params(self):
def add_in(
self,
name: str,
value: Optional[Union[int, str, float, bool, BaseDataType]] = None,
value: int | str | float | bool | BaseDataType | None = None,
):
"""Add input parameters.
Expand All @@ -494,7 +495,7 @@ def add_in(
def add_out(
self,
name: str,
value: Optional[Union[int, str, float, bool, BaseDataType]] = None,
value: int | str | float | bool | BaseDataType | None = None,
):
"""Add output parameters.
Expand Down

0 comments on commit e1d1527

Please sign in to comment.