From a6b3ae7174ccacd001687123cdad3d99c494490b Mon Sep 17 00:00:00 2001
From: Victor Verhaert <33786515+VictorVerhaert@users.noreply.github.com>
Date: Tue, 19 Nov 2024 11:11:12 +0100
Subject: [PATCH 1/4] added costs as a column to tracking databases

---
 CHANGELOG.md                       | 2 ++
 openeo/extra/job_management.py     | 3 +++
 tests/extra/test_job_management.py | 4 ++++
 3 files changed, 9 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2ce502b7f..7ca18c6d9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 
+- `MultiBackendJobManager`: `costs` has been added as a column in tracking databases ([#588](https://github.com/Open-EO/openeo-python-client/issues/588))
+
 ### Removed
 
 ### Fixed
diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py
index 4b56a9f25..10253cf3d 100644
--- a/openeo/extra/job_management.py
+++ b/openeo/extra/job_management.py
@@ -207,6 +207,7 @@ def start_job(
         "cpu": _ColumnProperties(dtype="str"),
         "memory": _ColumnProperties(dtype="str"),
         "duration": _ColumnProperties(dtype="str"),
+        "costs": _ColumnProperties(dtype="str"),
     }
 
     def __init__(
@@ -744,6 +745,8 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
                 for key in job_metadata.get("usage", {}).keys():
                     if key in active.columns:
                         active.loc[i, key] = _format_usage_stat(job_metadata, key)
+                if "costs" in job_metadata.keys():
+                    active.loc[i, "costs"] = job_metadata.get("costs")
 
             except OpenEoApiError as e:
                 # TODO: inspect status code and e.g. differentiate between 4xx/5xx
diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index 4c04f5512..3d06a47fb 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -283,6 +283,7 @@ def test_normalize_df(self):
                 "memory",
                 "duration",
                 "backend_name",
+                "costs",
             ]
         )
 
@@ -673,6 +674,7 @@ def test_initialize_from_df(self, tmp_path, db_class):
             "memory",
             "duration",
             "backend_name",
+            "costs",
         }
 
         actual_columns = set(db_class(path).read().columns)
@@ -852,6 +854,7 @@ def test_initialize_from_df(self, tmp_path):
             "memory",
             "duration",
             "backend_name",
+            "costs",
         }
 
         # Raw file content check
@@ -930,6 +933,7 @@ def test_initialize_from_df(self, tmp_path):
             "memory",
             "duration",
             "backend_name",
+            "costs",
         }
 
         df_from_disk = ParquetJobDatabase(path).read()

From 0f468e0fc47405ab3abcca23623aeacbd32ea40a Mon Sep 17 00:00:00 2001
From: Victor Verhaert <33786515+VictorVerhaert@users.noreply.github.com>
Date: Tue, 19 Nov 2024 17:17:33 +0100
Subject: [PATCH 2/4] changed type of costs column to float64

added testing of usage metrics and costs
---
 openeo/extra/job_management.py     |  2 +-
 openeo/rest/_testing.py            | 13 ++++++++++++-
 tests/extra/test_job_management.py |  9 ++++++++-
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py
index 10253cf3d..799a22f95 100644
--- a/openeo/extra/job_management.py
+++ b/openeo/extra/job_management.py
@@ -207,7 +207,7 @@ def start_job(
         "cpu": _ColumnProperties(dtype="str"),
         "memory": _ColumnProperties(dtype="str"),
         "duration": _ColumnProperties(dtype="str"),
-        "costs": _ColumnProperties(dtype="str"),
+        "costs": _ColumnProperties(dtype="float64"),
     }
 
     def __init__(
diff --git a/openeo/rest/_testing.py b/openeo/rest/_testing.py
index a1e036be1..197515c3a 100644
--- a/openeo/rest/_testing.py
+++ b/openeo/rest/_testing.py
@@ -225,11 +225,22 @@ def _handle_get_job(self, request, context):
self.batch_jobs[job_id]["status"] = self._get_job_status( job_id=job_id, current_status=self.batch_jobs[job_id]["status"] ) - return { + result = { # TODO: add some more required fields like "process" and "created"? "id": job_id, "status": self.batch_jobs[job_id]["status"], } + if self.batch_jobs[job_id]["status"] == "finished": # HACK some realistic values for a small job + result["costs"] = 4 + result["usage"] = { + "cpu": {"unit": "cpu-seconds", "value": 30.0}, + "duration": {"unit": "seconds", "value": 55}, + "input_pixel": {"unit": "mega-pixel", "value": 6.0}, + "max_executor_memory": {"unit": "gb", "value": 0.5}, + "memory": {"unit": "mb-seconds", "value": 150000.0}, + "network_received": {"unit": "b", "value": 200000}, + } + return result def _handle_get_job_results(self, request, context): """Handler of `GET /job/{job_id}/results` (list batch job results).""" diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 3d06a47fb..004337fb3 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -174,6 +174,8 @@ def test_basic(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock): ("job-2022", "finished", "foo"), ] + assert not pd.read_csv(job_db_path)[["cpu", "memory", "duration", "costs"]].isnull().any().any() + # Check downloaded results and metadata. assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == { Path(f"job_{job_id}") / filename @@ -204,6 +206,7 @@ def test_db_class(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock, assert len(result) == 5 assert set(result.status) == {"finished"} assert set(result.backend_name) == {"foo", "bar"} + assert not result[["cpu", "memory", "duration", "costs"]].isnull().any().any() @pytest.mark.parametrize( ["filename", "expected_db_class"], @@ -262,6 +265,8 @@ def test_basic_threading(self, tmp_path, job_manager, job_manager_root_dir, slee ("job-2022", "finished", "foo"), ] + assert not pd.read_csv(job_db_path)[["cpu", "memory", "duration", "costs"]].isnull().any().any() + # Check downloaded results and metadata. assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == { Path(f"job_{job_id}") / filename @@ -334,13 +339,15 @@ def start_worker_thread(): ) # Also check that we got sensible end results in the job db. - assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [ + result = pd.read_csv(job_db_path) + assert [(r.id, r.status, r.backend_name) for r in result.itertuples()] == [ ("job-2018", "finished", "foo"), ("job-2019", "finished", "foo"), ("job-2020", "finished", "bar"), ("job-2021", "finished", "bar"), ("job-2022", "error", "foo"), ] + assert not result[result["status"] == "finished"][["cpu", "memory", "duration", "costs"]].isnull().any().any() # Check downloaded results and metadata. 
assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == { From 8bb37d18af4325c4fc4c7356ceb3bd0d66a8348b Mon Sep 17 00:00:00 2001 From: Victor Verhaert <33786515+VictorVerhaert@users.noreply.github.com> Date: Fri, 22 Nov 2024 13:39:19 +0100 Subject: [PATCH 3/4] removed unused usage metrics from dummy backend --- openeo/rest/_testing.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/openeo/rest/_testing.py b/openeo/rest/_testing.py index 197515c3a..d9e2430fa 100644 --- a/openeo/rest/_testing.py +++ b/openeo/rest/_testing.py @@ -235,10 +235,7 @@ def _handle_get_job(self, request, context): result["usage"] = { "cpu": {"unit": "cpu-seconds", "value": 30.0}, "duration": {"unit": "seconds", "value": 55}, - "input_pixel": {"unit": "mega-pixel", "value": 6.0}, - "max_executor_memory": {"unit": "gb", "value": 0.5}, "memory": {"unit": "mb-seconds", "value": 150000.0}, - "network_received": {"unit": "b", "value": 200000}, } return result From 1e7878e5dd0846ec8084a00561d01ab1163c9dea Mon Sep 17 00:00:00 2001 From: Victor Verhaert <33786515+VictorVerhaert@users.noreply.github.com> Date: Mon, 25 Nov 2024 11:57:16 +0100 Subject: [PATCH 4/4] changed tests to check for specific usage values also changed the values to be more recognizable --- openeo/rest/_testing.py | 8 ++--- tests/extra/test_job_management.py | 57 +++++++++++++++++------------- 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/openeo/rest/_testing.py b/openeo/rest/_testing.py index d9e2430fa..05210c114 100644 --- a/openeo/rest/_testing.py +++ b/openeo/rest/_testing.py @@ -231,11 +231,11 @@ def _handle_get_job(self, request, context): "status": self.batch_jobs[job_id]["status"], } if self.batch_jobs[job_id]["status"] == "finished": # HACK some realistic values for a small job - result["costs"] = 4 + result["costs"] = 123 result["usage"] = { - "cpu": {"unit": "cpu-seconds", "value": 30.0}, - "duration": {"unit": "seconds", "value": 55}, - "memory": {"unit": "mb-seconds", "value": 150000.0}, + "cpu": {"unit": "cpu-seconds", "value": 1234.5}, + "memory": {"unit": "mb-seconds", "value": 34567.89}, + "duration": {"unit": "seconds", "value": 2345}, } return result diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 004337fb3..3419b46e1 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -18,6 +18,7 @@ # httpretty avoids this specific problem because it mocks at the socket level, # But I would rather not have two dependencies with almost the same goal. 
import httpretty +import numpy as np import pandas import pandas as pd import pytest @@ -166,16 +167,17 @@ def test_basic(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock): } ) - assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [ - ("job-2018", "finished", "foo"), - ("job-2019", "finished", "foo"), - ("job-2020", "finished", "bar"), - ("job-2021", "finished", "bar"), - ("job-2022", "finished", "foo"), + assert [ + (r.id, r.status, r.backend_name, r.cpu, r.memory, r.duration, r.costs) + for r in pd.read_csv(job_db_path).itertuples() + ] == [ + ("job-2018", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123), + ("job-2019", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123), + ("job-2020", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123), + ("job-2021", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123), + ("job-2022", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123), ] - assert not pd.read_csv(job_db_path)[["cpu", "memory", "duration", "costs"]].isnull().any().any() - # Check downloaded results and metadata. assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == { Path(f"job_{job_id}") / filename @@ -206,7 +208,10 @@ def test_db_class(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock, assert len(result) == 5 assert set(result.status) == {"finished"} assert set(result.backend_name) == {"foo", "bar"} - assert not result[["cpu", "memory", "duration", "costs"]].isnull().any().any() + assert set(result.cpu) == {"1234.5 cpu-seconds"} + assert set(result.memory) == {"34567.89 mb-seconds"} + assert set(result.duration) == {"2345 seconds"} + assert set(result.costs) == {123} @pytest.mark.parametrize( ["filename", "expected_db_class"], @@ -257,16 +262,17 @@ def test_basic_threading(self, tmp_path, job_manager, job_manager_root_dir, slee # TODO #645 how to collect stats with the threaded run_job? assert sleep_mock.call_count > 10 - assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [ - ("job-2018", "finished", "foo"), - ("job-2019", "finished", "foo"), - ("job-2020", "finished", "bar"), - ("job-2021", "finished", "bar"), - ("job-2022", "finished", "foo"), + assert [ + (r.id, r.status, r.backend_name, r.cpu, r.memory, r.duration, r.costs) + for r in pd.read_csv(job_db_path).itertuples() + ] == [ + ("job-2018", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123), + ("job-2019", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123), + ("job-2020", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123), + ("job-2021", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123), + ("job-2022", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123), ] - assert not pd.read_csv(job_db_path)[["cpu", "memory", "duration", "costs"]].isnull().any().any() - # Check downloaded results and metadata. assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == { Path(f"job_{job_id}") / filename @@ -339,15 +345,16 @@ def start_worker_thread(): ) # Also check that we got sensible end results in the job db. 
-        result = pd.read_csv(job_db_path)
-        assert [(r.id, r.status, r.backend_name) for r in result.itertuples()] == [
-            ("job-2018", "finished", "foo"),
-            ("job-2019", "finished", "foo"),
-            ("job-2020", "finished", "bar"),
-            ("job-2021", "finished", "bar"),
-            ("job-2022", "error", "foo"),
+        results = pd.read_csv(job_db_path).replace({np.nan: None})  # NaN values are replaced by None for easy comparison
+        assert [
+            (r.id, r.status, r.backend_name, r.cpu, r.memory, r.duration, r.costs) for r in results.itertuples()
+        ] == [
+            ("job-2018", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2019", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2020", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2021", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2022", "error", "foo", None, None, None, None),
         ]
-        assert not result[result["status"] == "finished"][["cpu", "memory", "duration", "costs"]].isnull().any().any()
 
         # Check downloaded results and metadata.
         assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == {
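
Usage sketch: the patches above make `MultiBackendJobManager` record each job's
usage metrics ("cpu", "memory", "duration") and a float64 "costs" value in its
tracking database. The snippet below is a minimal sketch of that behaviour,
assuming an openEO backend that reports "costs" and "usage" in its job metadata
(as the DummyBackend above does); the backend URL, collection id, band and
dataframe columns are placeholder values, not anything these patches define.

import pandas as pd

import openeo
from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager


def start_job(row: pd.Series, connection: openeo.Connection, **kwargs):
    # Placeholder batch job: collection id, band and temporal extent are
    # illustrative values only.
    cube = connection.load_collection(
        "SENTINEL2_L2A",
        temporal_extent=[row["start_date"], row["end_date"]],
        bands=["B02"],
    )
    return cube.create_job(title=f"job-{row['year']}")


# Placeholder backend URL; any backend that reports job costs/usage will do.
connection = openeo.connect("openeo.example.com").authenticate_oidc()

manager = MultiBackendJobManager()
manager.add_backend("example", connection=connection)

# One dataframe row per job to run; the job db is initialized from this.
df = pd.DataFrame(
    {
        "year": [2020, 2021],
        "start_date": ["2020-01-01", "2021-01-01"],
        "end_date": ["2020-12-31", "2021-12-31"],
    }
)
job_db = CsvJobDatabase(path="jobs.csv")
manager.run_jobs(df=df, start_job=start_job, job_db=job_db)

# After these patches, finished jobs have their usage metrics stored as
# formatted strings (e.g. "1234.5 cpu-seconds") and costs as a float64 column.
print(pd.read_csv("jobs.csv")[["id", "status", "cpu", "memory", "duration", "costs"]])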