From d52df7d45b7ed92f6f4315a048ccd8075f991aa8 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Thu, 6 May 2021 14:17:04 +0200 Subject: [PATCH 1/3] Fix task env vars --- pkg/workloads/k8s.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index 96a326ed45..13694fcb4d 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -657,12 +657,12 @@ func tensorFlowHandlerContainers(api *spec.API, envVars []kcore.EnvVar, isJob bo func taskEnvVars(api *spec.API) []kcore.EnvVar { envVars := apiContainerEnvVars(api) envVars = append(envVars, - kcore.EnvVar{ Name: "CORTEX_TASK_SPEC", Value: TaskSpecPath, }, ) + envVars = append(envVars, getKubexitEnvVars(APIContainerName)...) return envVars } From 81f17cfa8933af921062cf6eeed9a9aeaff86adb Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Thu, 6 May 2021 14:17:31 +0200 Subject: [PATCH 2/3] Add local operator functionality in tasks e2e tests --- test/e2e/e2e/tests.py | 27 +++++++++++++++++++-------- test/e2e/e2e/utils.py | 14 +++++++++----- test/e2e/tests/aws/test_task.py | 1 + 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/test/e2e/e2e/tests.py b/test/e2e/e2e/tests.py index 89c1dead3a..f37fcd3b73 100644 --- a/test/e2e/e2e/tests.py +++ b/test/e2e/e2e/tests.py @@ -183,12 +183,16 @@ def test_batch_api( job_spec = response.json() # monitor job progress + job_id = job_spec["job_id"] + endpoint_override = ( + f"http://localhost:8888/batch/{api_name}?jobID={job_id}" if local_operator else None + ) assert job_done( client=client, - api_name=job_spec["api_name"], - job_id=job_spec["job_id"], + api_name=api_name, + job_id=job_id, timeout=job_timeout, - local_operator=local_operator, + endpoint_override=endpoint_override, ), f"job did not succeed (api_name: {api_name}, job_id: {job_spec['job_id']})" except: @@ -330,6 +334,7 @@ def test_task_api( job_timeout: int = None, retry_attempts: int = 0, api_config_name: str = "cortex.yaml", + local_operator: bool = False, ): api_dir = TEST_APIS_DIR / api with open(str(api_dir / api_config_name)) as f: @@ -341,16 +346,17 @@ def test_task_api( client.create_api(api_spec=api_specs[0], project_dir=str(api_dir)) try: + endpoint_override = f"http://localhost:8888/tasks/{api_name}" if local_operator else None assert endpoint_ready( - client=client, api_name=api_name, timeout=deploy_timeout + client=client, + api_name=api_name, + timeout=deploy_timeout, + endpoint_override=endpoint_override, ), f"api {api_name} not ready" response = None for _ in range(retry_attempts + 1): - response = request_task( - client, - api_name, - ) + response = request_task(client, api_name, local_operator=local_operator) if response.status_code == HTTPStatus.OK: break @@ -358,11 +364,16 @@ def test_task_api( job_spec = response.json() + job_id = job_spec["job_id"] + endpoint_override = ( + f"http://localhost:8888/tasks/{api_name}?jobID={job_id}" if local_operator else None + ) assert job_done( client=client, api_name=api_name, job_id=job_spec["job_id"], timeout=job_timeout, + endpoint_override=endpoint_override, ), f"task job did not succeed (api_name: {api_name}, job_id: {job_spec['job_id']})" except: diff --git a/test/e2e/e2e/utils.py b/test/e2e/e2e/utils.py index 7ec512dfc8..3d89485777 100644 --- a/test/e2e/e2e/utils.py +++ b/test/e2e/e2e/utils.py @@ -96,11 +96,11 @@ def _is_ready(): def job_done( - client: cx.Client, api_name: str, job_id: str, timeout: int = None, local_operator: bool = False + client: cx.Client, api_name: str, job_id: str, timeout: int = None, endpoint_override: str = None ) -> bool: def _is_ready(): - if local_operator: - job_info = requests.get(f"http://localhost:8888/batch/{api_name}?jobID={job_id}") + if endpoint_override: + job_info = requests.get(endpoint_override) job_info = job_info.json() return job_info["job_status"]["status"] == "succeeded" @@ -211,9 +211,13 @@ def request_task( api_name: str, config: Dict = None, timeout: int = None, + local_operator: bool = False, ): - api_info = client.get_api(api_name) - endpoint = api_info["endpoint"] + if local_operator: + endpoint = f"http://localhost:8888/tasks/{api_name}" + else: + api_info = client.get_api(api_name) + endpoint = api_info["endpoint"] payload = {} if config is not None: diff --git a/test/e2e/tests/aws/test_task.py b/test/e2e/tests/aws/test_task.py index 8aba167901..05fec51e75 100644 --- a/test/e2e/tests/aws/test_task.py +++ b/test/e2e/tests/aws/test_task.py @@ -32,4 +32,5 @@ def test_task_api(printer: Callable, config: Dict, client: cx.Client, api: str): retry_attempts=5, deploy_timeout=config["global"]["task_deploy_timeout"], job_timeout=config["global"]["task_job_timeout"], + local_operator=config["global"]["local_operator"], ) From c7e62afb9cb38cf56e8a97eaa093381f1e325b25 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Thu, 6 May 2021 14:23:09 +0200 Subject: [PATCH 3/3] Fix lint errors --- test/e2e/e2e/utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/e2e/e2e/utils.py b/test/e2e/e2e/utils.py index 3d89485777..fdaf54630f 100644 --- a/test/e2e/e2e/utils.py +++ b/test/e2e/e2e/utils.py @@ -96,7 +96,11 @@ def _is_ready(): def job_done( - client: cx.Client, api_name: str, job_id: str, timeout: int = None, endpoint_override: str = None + client: cx.Client, + api_name: str, + job_id: str, + timeout: int = None, + endpoint_override: str = None, ) -> bool: def _is_ready(): if endpoint_override: