Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create integration tests for general use case #38

Closed
gardusig opened this issue Feb 16, 2022 · 0 comments · Fixed by #106
Closed

Create integration tests for general use case #38

gardusig opened this issue Feb 16, 2022 · 0 comments · Fixed by #106
Assignees
Labels
enhancement New feature or request

Comments

@gardusig
Copy link
Contributor

gardusig commented Feb 16, 2022

Aim to test all possible ways of application startup, with and without some parameters.

Tool used to test by hand:

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.http.api_client import ApiClient
from conductor.client.http.api.metadata_resource_api import MetadataResourceApi
from conductor.client.http.api.task_resource_api import TaskResourceApi
from conductor.client.http.api.workflow_resource_api import WorkflowResourceApi
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface
from typing import List
import logging

logger = logging.getLogger(
    Configuration.get_logging_formatted_name(
        __name__
    )
)


class SimplePythonWorker(WorkerInterface):
    def execute(self, task: Task) -> TaskResult:
        task_result = self.get_task_result_from_task(task)
        task_result.add_output_data('key1', 'value')
        task_result.add_output_data('key2', 42)
        task_result.add_output_data('key3', False)
        task_result.status = TaskResultStatus.COMPLETED
        return task_result


def get_python_task_definition_example() -> List[dict]:
    return [
        {
            "createTime": 1650595379661,
            "createdBy": "",
            "name": "python_task_example_from_code",
            "description": "Python task example from code",
            "retryCount": 3,
            "timeoutSeconds": 300,
            "inputKeys": [],
            "outputKeys": [],
            "timeoutPolicy": "TIME_OUT_WF",
            "retryLogic": "FIXED",
            "retryDelaySeconds": 10,
            "responseTimeoutSeconds": 180,
            "inputTemplate": {},
            "rateLimitPerFrequency": 0,
            "rateLimitFrequencyInSeconds": 1,
            "ownerEmail": "gustavo.gardusi@orkes.io",
            "backoffScaleFactor": 1
        },
    ]


def get_python_workflow_definition_example() -> dict:
    return {
        "updateTime": 1650595431465,
        "name": "workflow_with_python_task_example_from_code",
        "description": "Workflow with python task example from code",
        "version": 1,
        "tasks": [
            {
                "name": "python_task_example_from_code",
                "taskReferenceName": "python_task_example_from_code_ref_0",
                "inputParameters": {

                },
                "type": "SIMPLE",
                "decisionCases": {

                },
                "defaultCase": [

                ],
                "forkTasks":[

                ],
                "startDelay":0,
                "joinOn":[

                ],
                "optional":False,
                "defaultExclusiveJoinTask":[

                ],
                "asyncComplete":False,
                "loopOver":[

                ]
            }
        ],
        "inputParameters": [

        ],
        "outputParameters": {
            "workerOutput": "${python_task_example_from_code_ref_0.output}"
        },
        "schemaVersion": 2,
        "restartable": True,
        "workflowStatusListenerEnabled": False,
        "ownerEmail": "gustavo.gardusi@orkes.io",
        "timeoutPolicy": "ALERT_ONLY",
        "timeoutSeconds": 0,
        "variables": {

        },
        "inputTemplate": {

        }
    }


def define_task_and_workflow(api_client: ApiClient) -> None:
    metadata_client = MetadataResourceApi(api_client)
    try:
        metadata_client.register_task_def1(
            body=get_python_task_definition_example()
        )
        metadata_client.create(
            body=get_python_workflow_definition_example()
        )
    except Exception as e:
        logger.debug(f'Failed to define task/workflow, reason: {e}')


def start_workflow(api_client: ApiClient, workflow_name: str) -> str:
    workflow_client = WorkflowResourceApi(api_client)
    workflowId = workflow_client.start_workflow(
        body={},
        name=workflow_name
    )
    return workflowId


def start_workflows(api_client: ApiClient, workflow_name: str, qty: int) -> List[str]:
    workflowIdList = []
    for _ in range(qty):
        try:
            workflowId = start_workflow(api_client, workflow_name)
            workflowIdList.append(workflowId)
            logger.debug(
                f'Started workflow: {workflow_name}, with id: {workflowId}'
            )
        except Exception as e:
            logger.debug(
                f'Failed to start workflow: {workflow_name}, reason: {e}'
            )
    return workflowIdList


def main():
    configuration = Configuration(
        base_url='https://play.orkes.io',
        debug=True,
        authentication_settings=AuthenticationSettings(
            key_id='',
            key_secret=''
        )
    )
    configuration.apply_logging_config()

    api_client = ApiClient(configuration)

    workflow_id = start_workflow(
        api_client,
        'workflow_with_python_task_example_from_code'
    )
    logger.debug(f'workflow_id: {workflow_id}')

    task_api = TaskResourceApi(api_client)
    response = task_api.update_task_by_ref_name(
        output={'hello': 'world'},
        workflow_id=workflow_id,
        task_ref_name='python_task_example_from_code_ref_0',
        status=TaskResultStatus.COMPLETED.value,
    )
    logger.debug(f'task update response: {response}')

    workers = [
        SimplePythonWorker('python_task_example_from_code'),
    ]
    workflow_ids = start_workflows(
        api_client,
        'workflow_with_python_task_example_from_code',
        10
    )
    metrics_settings = MetricsSettings()
    with TaskHandler(workers, configuration, metrics_settings) as task_handler:
        task_handler.start_processes()
        task_handler.join_processes()


if __name__ == '__main__':
    main()
@gardusig gardusig changed the title Add integration tests implementation for general task/worker use case Create integration tests for general use case Jul 12, 2022
@gardusig gardusig added the enhancement New feature or request label Jul 12, 2022
@gardusig gardusig self-assigned this Jul 24, 2022
@gardusig gardusig linked a pull request Jul 24, 2022 that will close this issue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant