diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2ce502b7f..7ca18c6d9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 
+- `MultiBackendJobManager`: added `costs` as a column of the job tracking databases ([#588](https://github.com/Open-EO/openeo-python-client/issues/588))
+
 ### Removed
 
 ### Fixed
diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py
index 4b56a9f25..799a22f95 100644
--- a/openeo/extra/job_management.py
+++ b/openeo/extra/job_management.py
@@ -207,6 +207,7 @@ def start_job(
         "cpu": _ColumnProperties(dtype="str"),
         "memory": _ColumnProperties(dtype="str"),
         "duration": _ColumnProperties(dtype="str"),
+        "costs": _ColumnProperties(dtype="float64"),
     }
 
     def __init__(
@@ -744,6 +745,8 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
                 for key in job_metadata.get("usage", {}).keys():
                     if key in active.columns:
                         active.loc[i, key] = _format_usage_stat(job_metadata, key)
+                if "costs" in job_metadata:
+                    active.loc[i, "costs"] = job_metadata["costs"]
 
             except OpenEoApiError as e:
                 # TODO: inspect status code and e.g. differentiate between 4xx/5xx
diff --git a/openeo/rest/_testing.py b/openeo/rest/_testing.py
index a1e036be1..05210c114 100644
--- a/openeo/rest/_testing.py
+++ b/openeo/rest/_testing.py
@@ -225,11 +225,19 @@ def _handle_get_job(self, request, context):
         self.batch_jobs[job_id]["status"] = self._get_job_status(
             job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
         )
-        return {
+        result = {
             # TODO: add some more required fields like "process" and "created"?
             "id": job_id,
             "status": self.batch_jobs[job_id]["status"],
         }
+        if self.batch_jobs[job_id]["status"] == "finished":  # HACK: some realistic values for a small job
+            result["costs"] = 123
+            result["usage"] = {
+                "cpu": {"unit": "cpu-seconds", "value": 1234.5},
+                "memory": {"unit": "mb-seconds", "value": 34567.89},
+                "duration": {"unit": "seconds", "value": 2345},
+            }
+        return result
 
     def _handle_get_job_results(self, request, context):
         """Handler of `GET /job/{job_id}/results` (list batch job results)."""
diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index 4c04f5512..3419b46e1 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -18,6 +18,7 @@
 # httpretty avoids this specific problem because it mocks at the socket level,
 # But I would rather not have two dependencies with almost the same goal.
 import httpretty
+import numpy as np
 import pandas
 import pandas as pd
 import pytest
@@ -166,12 +167,15 @@ def test_basic(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock):
             }
         )
 
-        assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
-            ("job-2018", "finished", "foo"),
-            ("job-2019", "finished", "foo"),
-            ("job-2020", "finished", "bar"),
-            ("job-2021", "finished", "bar"),
-            ("job-2022", "finished", "foo"),
+        assert [
+            (r.id, r.status, r.backend_name, r.cpu, r.memory, r.duration, r.costs)
+            for r in pd.read_csv(job_db_path).itertuples()
+        ] == [
+            ("job-2018", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2019", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2020", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2021", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2022", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
         ]
 
         # Check downloaded results and metadata.
@@ -204,6 +208,10 @@ def test_db_class(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock,
         assert len(result) == 5
         assert set(result.status) == {"finished"}
         assert set(result.backend_name) == {"foo", "bar"}
+        assert set(result.cpu) == {"1234.5 cpu-seconds"}
+        assert set(result.memory) == {"34567.89 mb-seconds"}
+        assert set(result.duration) == {"2345 seconds"}
+        assert set(result.costs) == {123}
 
     @pytest.mark.parametrize(
         ["filename", "expected_db_class"],
@@ -254,12 +262,15 @@ def test_basic_threading(self, tmp_path, job_manager, job_manager_root_dir, slee
         # TODO #645 how to collect stats with the threaded run_job?
         assert sleep_mock.call_count > 10
 
-        assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
-            ("job-2018", "finished", "foo"),
-            ("job-2019", "finished", "foo"),
-            ("job-2020", "finished", "bar"),
-            ("job-2021", "finished", "bar"),
-            ("job-2022", "finished", "foo"),
+        assert [
+            (r.id, r.status, r.backend_name, r.cpu, r.memory, r.duration, r.costs)
+            for r in pd.read_csv(job_db_path).itertuples()
+        ] == [
+            ("job-2018", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2019", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2020", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2021", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2022", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
         ]
 
         # Check downloaded results and metadata.
@@ -283,6 +294,7 @@ def test_normalize_df(self):
                 "memory",
                 "duration",
                 "backend_name",
+                "costs",
             ]
         )
 
@@ -333,12 +345,15 @@ def start_worker_thread():
         )
 
         # Also check that we got sensible end results in the job db.
-        assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
-            ("job-2018", "finished", "foo"),
-            ("job-2019", "finished", "foo"),
-            ("job-2020", "finished", "bar"),
-            ("job-2021", "finished", "bar"),
-            ("job-2022", "error", "foo"),
+        results = pd.read_csv(job_db_path).replace({np.nan: None})  # replace np.nan with None for easy comparison
+        assert [
+            (r.id, r.status, r.backend_name, r.cpu, r.memory, r.duration, r.costs) for r in results.itertuples()
+        ] == [
+            ("job-2018", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2019", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2020", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2021", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2022", "error", "foo", None, None, None, None),
         ]
 
         # Check downloaded results and metadata.
@@ -673,6 +688,7 @@ def test_initialize_from_df(self, tmp_path, db_class):
             "memory",
             "duration",
             "backend_name",
+            "costs",
         }
 
         actual_columns = set(db_class(path).read().columns)
@@ -852,6 +868,7 @@ def test_initialize_from_df(self, tmp_path):
             "memory",
             "duration",
             "backend_name",
+            "costs",
         }
 
         # Raw file content check
@@ -930,6 +947,7 @@ def test_initialize_from_df(self, tmp_path):
             "memory",
             "duration",
             "backend_name",
+            "costs",
         }
 
         df_from_disk = ParquetJobDatabase(path).read()
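
Note: as a minimal usage sketch (not part of this patch), this is how the new `costs` column surfaces for users of `MultiBackendJobManager` with a CSV-backed job database; the file name `jobs.csv` and the aggregation are hypothetical:

    import pandas as pd

    # Hypothetical job tracking database produced by a MultiBackendJobManager run.
    df = pd.read_csv("jobs.csv")
    # "costs" is declared as a float64 column; it stays NaN for jobs that never
    # reached "finished" (e.g. errored jobs), so aggregate with a NaN-aware sum.
    total = df.loc[df["status"] == "finished", "costs"].sum()
    print(f"Total costs of finished jobs: {total}")

Pandas' `sum()` skips NaN values by default, so errored jobs do not distort the total.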