diff --git a/openeo/extra/job_management/__init__.py b/openeo/extra/job_management/__init__.py
index ced5cee37..5fb811737 100644
--- a/openeo/extra/job_management/__init__.py
+++ b/openeo/extra/job_management/__init__.py
@@ -538,7 +538,7 @@ def _job_update_loop(
                         self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats)
                         stats["job launch"] += 1

-                        job_db.persist(not_started.loc[i : i + 1])
+                        job_db.persist(not_started.loc[[i]])
                         stats["job_db persist"] += 1
                         total_added += 1
diff --git a/openeo/extra/job_management/stac_job_db.py b/openeo/extra/job_management/stac_job_db.py
index ac19fc352..ff83b8bdd 100644
--- a/openeo/extra/job_management/stac_job_db.py
+++ b/openeo/extra/job_management/stac_job_db.py
@@ -51,18 +51,35 @@ def __init__(
         self.base_url = stac_root_url
         self.bulk_size = 500

+        # TODO: is the "item_id" column a feature we can/should deprecate?
+        self._ensure_item_id_column = True
+
     def exists(self) -> bool:
         return any(c.id == self.collection_id for c in self.client.get_collections())

     def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
         """
         Normalize the given dataframe to be compatible with :py:class:`MultiBackendJobManager`
-        by adding the default columns and setting the index.
+        by adding the default columns and using the STAC item ids as index values.
         """
         df = MultiBackendJobManager._normalize_df(df)
-        # If the user doesn't specify the item_id column, we will use the index.
-        if "item_id" not in df.columns:
-            df = df.reset_index(names=["item_id"])
+
+        if isinstance(df.index, pd.RangeIndex):
+            # Index is supposed to contain meaningful STAC item ids,
+            # not some default RangeIndex.
+            if "item_id" in df.columns:
+                # Leverage legacy usage pattern with an "item_id" column: copy that over as index.
+                df.index = df["item_id"]
+            elif df.shape[0] > 0:
+                _log.warning(
+                    "STAC API oriented dataframe normalization: no meaningful index. This might cause consistency issues."
+                )
+
+        if self._ensure_item_id_column and "item_id" not in df.columns:
+            df["item_id"] = df.index
+
+        # Make sure the index (of item ids) consists of strings, to play well with (py)STAC schemas
+        df.index = df.index.astype(str)
+
         return df

     def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
@@ -128,7 +145,7 @@ def item_from(self, series: pd.Series) -> pystac.Item:
         :return: pystac.Item
         """
         series_dict = series.to_dict()
-        item_id = series_dict.pop("item_id")
+        item_id = str(series.name)
         item_dict = {}
         item_dict.setdefault("stac_version", pystac.get_stac_version())
         item_dict.setdefault("type", "Feature")
@@ -165,6 +182,13 @@ def count_by_status(self, statuses: Iterable[str] = ()) -> dict:
         else:
             return items["status"].value_counts().to_dict()

+    def _search_result_to_df(self, search_result: pystac_client.ItemSearch) -> pd.DataFrame:
+        series = [self.series_from(item) for item in search_result.items()]
+        # Note: `series_from` sets the item id as the series "name",
+        # which ends up in the index of the dataframe
+        df = pd.DataFrame(series)
+        return df
+
     def get_by_status(self, statuses: Iterable[str], max: Optional[int] = None) -> pd.DataFrame:
         if isinstance(statuses, str):
             statuses = {statuses}
@@ -177,11 +201,9 @@ def get_by_status(self, statuses: Iterable[str], max: Optional[int] = None) -> p
             filter=status_filter,
             max_items=max,
         )
+        df = self._search_result_to_df(search_results)

-        series = [self.series_from(item) for item in search_results.items()]
-
-        df = pd.DataFrame(series).reset_index(names=["item_id"])
-        if len(series) == 0:
+        if df.shape[0] == 0:
             # TODO: What if default columns are overwritten by the user?
             df = self._normalize_df(
                 df
@@ -189,6 +211,10 @@ def get_by_status(self, statuses: Iterable[str], max: Optional[int] = None) -> p
         return df

     def persist(self, df: pd.DataFrame):
+        if df.empty:
+            _log.warning("No data to persist in STAC API job database, skipping.")
+            return
+
         if not self.exists():
             spatial_extent = pystac.SpatialExtent([[-180, -90, 180, 90]])
             temporal_extent = pystac.TemporalExtent([[None, None]])
@@ -196,16 +222,26 @@ def persist(self, df: pd.DataFrame):
             c = pystac.Collection(id=self.collection_id, description="STAC API job database collection.", extent=extent)
             self._create_collection(c)

-        all_items = []
-        if not df.empty:
-
-            def handle_row(series):
-                item = self.item_from(series)
-                all_items.append(item)
-
-            df.apply(handle_row, axis=1)
+        # Merge updates with existing items (if any)
+        existing_items = self.client.search(
+            method="GET",
+            collections=[self.collection_id],
+            ids=[str(i) for i in df.index.tolist()],
+        )
+        existing_df = self._search_result_to_df(existing_items)

-        self._upload_items_bulk(self.collection_id, all_items)
+        if existing_df.empty:
+            df_to_persist = df
+        else:
+            # Merge data on item_id (in the index)
+            df_to_persist = existing_df
+            # TODO: better way to do the update without risk of data loss?
+            assert set(df.index).issubset(df_to_persist.index)
+            df_to_persist.update(df, overwrite=True)
+
+        items_to_persist = [self.item_from(s) for _, s in df_to_persist.iterrows()]
+        _log.info(f"Bulk upload of {len(items_to_persist)} items to STAC API collection {self.collection_id!r}")
+        self._upload_items_bulk(self.collection_id, items_to_persist)

     def _prepare_item(self, item: pystac.Item, collection_id: str):
         item.collection_id = collection_id
diff --git a/tests/extra/job_management/test_stac_job_db.py b/tests/extra/job_management/test_stac_job_db.py
index 24ebc25e3..a9e53db4e 100644
--- a/tests/extra/job_management/test_stac_job_db.py
+++ b/tests/extra/job_management/test_stac_job_db.py
@@ -1,6 +1,6 @@
 import datetime
 import re
-from typing import Any, Dict, Optional, Union
+from typing import Any, Dict, List, Optional, Union
 from unittest import mock
 from unittest.mock import MagicMock, patch

@@ -65,10 +65,6 @@ def job_db_not_exists(mock_pystac_client) -> STACAPIJobDatabase:
     )


-@pytest.fixture
-def dummy_dataframe() -> pd.DataFrame:
-    return pd.DataFrame({"no": [1], "geometry": [2], "here": [3]})
-

 @pytest.fixture
 def normalized_dummy_dataframe() -> pd.DataFrame:
@@ -117,37 +113,23 @@ def normalized_merged_dummy_dataframe() -> pd.DataFrame:
     )


-@pytest.fixture
-def dummy_geodataframe() -> gpd.GeoDataFrame:
-    return gpd.GeoDataFrame(
-        {
-            "there": [1],
-            "is": [2],
-            "geometry": [Point(1, 1)],
-        },
-        geometry="geometry",
-    )
-
-
-@pytest.fixture
-def normalized_dummy_geodataframe() -> pd.DataFrame:
-    return pd.DataFrame(
-        {
-            "item_id": [0],
-            "there": [1],
-            "is": [2],
-            "geometry": [{"type": "Point", "coordinates": (1.0, 1.0)}],
-            "id": None,
-            "backend_name": None,
-            "status": ["not_started"],
-            "start_time": None,
-            "running_start_time": None,
-            "cpu": None,
-            "memory": None,
-            "duration": None,
-            "costs": None,
-        }
-    )
+def _common_normalized_df_data(rows: int = 1) -> dict:
+    """
+    Helper to build a dict (to be passed to `pd.DataFrame`)
+    with common columns that are the result of normalization,
+    but mainly boilerplate data that is not relevant to these tests.
+    """
+    return {
+        "id": None,
+        "backend_name": None,
+        "status": ["not_started"] * rows,
+        "start_time": None,
+        "running_start_time": None,
+        "cpu": None,
+        "memory": None,
+        "duration": None,
+        "costs": None,
+    }


 def _pystac_item(
@@ -157,6 +139,8 @@ def _pystac_item(
     geometry: Optional = None,
     bbox: Optional = None,
     datetime_: Union[None, str, datetime.date, datetime.date] = "2025-06-07",
+    links: Optional[List[Union[pystac.Link, dict]]] = None,
+    **kwargs,
 ) -> pystac.Item:
     """Helper to easily construct a dummy but valid pystac.Item"""
     if isinstance(datetime_, str):
@@ -164,13 +148,18 @@ def _pystac_item(
     elif isinstance(datetime_, datetime.date):
         datetime_ = datetime.datetime.combine(datetime_, datetime.time.min, tzinfo=datetime.timezone.utc)

-    return pystac.Item(
+    item = pystac.Item(
         id=id,
         geometry=geometry,
         bbox=bbox,
         properties=properties or {},
         datetime=datetime_,
+        **kwargs,
     )
+    if links:
+        for link in links:
+            item.add_link(pystac.Link.from_dict(link) if isinstance(link, dict) else link)
+    return item


 @pytest.fixture
@@ -184,21 +173,15 @@ def dummy_stac_item() -> pystac.Item:
     )


-@pytest.fixture
-def dummy_series_no_item_id() -> pd.Series:
-    return pd.Series({"datetime": "2020-05-22T00:00:00Z", "some_property": "value"}, name="test")
-
-
 @pytest.fixture
 def bulk_dataframe():
     return pd.DataFrame(
-        {
-            "item_id": [f"test-{i}" for i in range(10)],
+        index=[f"item-{i}" for i in range(10)],
+        data={
+            "datetime": [f"2020-{i + 1:02d}-01" for i in range(10)],
             "some_property": [f"value-{i}" for i in range(10)],
-            "datetime": [f"2020-{i+1:02d}-01" for i in range(10)],
         },
-        index=[i for i in range(10)],
     )


@@ -208,20 +191,60 @@ def test_exists(self, job_db_exists, job_db_not_exists):
         assert job_db_exists.exists() == True
         assert job_db_not_exists.exists() == False

+    @pytest.mark.parametrize(
+        ["df", "expected"],
+        [
+            (
+                pd.DataFrame({"no": [1], "geometry": [2], "here": [3]}),
+                pd.DataFrame(
+                    index=["0"],
+                    data={
+                        "no": [1],
+                        "geometry": [2],
+                        "here": [3],
+                        "item_id": [0],
+                        **_common_normalized_df_data(),
+                    },
+                ),
+            ),
+            # Item id used as index
+            (
+                pd.DataFrame(index=["item-123", "item-456"], data={"hello": ["world", "earth"]}),
+                pd.DataFrame(
+                    index=["item-123", "item-456"],
+                    data={
+                        "item_id": ["item-123", "item-456"],
+                        "hello": ["world", "earth"],
+                        **_common_normalized_df_data(rows=2),
+                    },
+                ),
+            ),
+            # Legacy: item_id column
+            (
+                pd.DataFrame({"item_id": ["item-123", "item-456"], "hello": ["world", "earth"]}),
+                pd.DataFrame(
+                    index=pd.Index(["item-123", "item-456"], name="item_id"),
+                    data={
+                        "item_id": ["item-123", "item-456"],
+                        "hello": ["world", "earth"],
+                        **_common_normalized_df_data(rows=2),
+                    },
+                ),
+            ),
+        ],
+    )
     @patch("openeo.extra.job_management.stac_job_db.STACAPIJobDatabase.persist", return_value=None)
-    def test_initialize_from_df_non_existing(
-        self, mock_persist, job_db_not_exists, dummy_dataframe, normalized_dummy_dataframe
-    ):
-
-        job_db_not_exists.initialize_from_df(dummy_dataframe)
+    def test_initialize_from_df_non_existing(self, mock_persist, job_db_not_exists, df, expected):
+        job_db_not_exists.initialize_from_df(df)

         mock_persist.assert_called_once()
-        pdt.assert_frame_equal(mock_persist.call_args[0][0], normalized_dummy_dataframe)
+        pdt.assert_frame_equal(mock_persist.call_args[0][0], expected, check_like=True)
         assert job_db_not_exists.has_geometry == False

-    def test_initialize_from_df_existing_error(self, job_db_exists, dummy_dataframe):
+    def test_initialize_from_df_existing_error(self, job_db_exists):
+        df = pd.DataFrame({"hello": ["world"]})
         with pytest.raises(FileExistsError):
-            job_db_exists.initialize_from_df(dummy_dataframe)
+            job_db_exists.initialize_from_df(df)

     @patch("openeo.extra.job_management.stac_job_db.STACAPIJobDatabase.persist", return_value=None)
     @patch("openeo.extra.job_management.stac_job_db.STACAPIJobDatabase.get_by_status")
@@ -241,36 +264,75 @@ def test_initialize_from_df_existing_append(
         pdt.assert_frame_equal(mock_persist.call_args[0][0], normalized_merged_dummy_dataframe)
         assert job_db_exists.has_geometry == False

+    @pytest.mark.parametrize(
+        ["df", "expected"],
+        [
+            (
+                gpd.GeoDataFrame(
+                    index=["item-123", "item-456"],
+                    data={"hello": ["world", "earth"]},
+                    geometry=[Point(1, 1), Point(2, 2)],
+                ),
+                pd.DataFrame(
+                    index=["item-123", "item-456"],
+                    data={
+                        "item_id": ["item-123", "item-456"],
+                        "hello": ["world", "earth"],
+                        "geometry": [
+                            {"type": "Point", "coordinates": (1.0, 1.0)},
+                            {"type": "Point", "coordinates": (2.0, 2.0)},
+                        ],
+                        **_common_normalized_df_data(),
+                    },
+                ),
+            )
+        ],
+    )
     @patch("openeo.extra.job_management.stac_job_db.STACAPIJobDatabase.persist", return_value=None)
-    def test_initialize_from_df_with_geometry(
-        self, mock_persists, job_db_not_exists, dummy_geodataframe, normalized_dummy_geodataframe
-    ):
-        job_db_not_exists.initialize_from_df(dummy_geodataframe)
+    def test_initialize_from_df_with_geometry(self, mock_persists, job_db_not_exists, df, expected):
+        job_db_not_exists.initialize_from_df(df)
         mock_persists.assert_called_once()
-        pdt.assert_frame_equal(mock_persists.call_args[0][0], normalized_dummy_geodataframe)
+        pdt.assert_frame_equal(mock_persists.call_args[0][0], expected, check_like=True)
         assert job_db_not_exists.has_geometry == True
         assert job_db_not_exists.geometry_column == "geometry"

-    def test_series_from(self, job_db_exists, dummy_series_no_item_id, dummy_stac_item):
-        pdt.assert_series_equal(job_db_exists.series_from(dummy_stac_item), dummy_series_no_item_id)
+    @pytest.mark.parametrize(
+        ["item", "expected"],
+        [
+            (
+                _pystac_item(
+                    id="item-123",
+                    properties={"some_property": "value"},
+                    datetime_=datetime.datetime(2020, 5, 22),
+                ),
+                pd.Series(
+                    name="item-123",
+                    data={"some_property": "value", "datetime": "2020-05-22T00:00:00Z"},
+                ),
+            ),
+        ],
+    )
+    def test_series_from(self, job_db_exists, item, expected):
+        actual = job_db_exists.series_from(item)
+        pdt.assert_series_equal(actual, expected)

     @pytest.mark.parametrize(
         ["series", "expected"],
         [
             # Minimal (using current time as datetime)
             (
-                pd.Series({"item_id": "item-123"}),
+                pd.Series({}, name="item-123"),
                 _pystac_item(id="item-123", datetime_="2022-02-02"),
             ),
             # With explicit datetime, and some other properties
             (
                 pd.Series(
-                    {
-                        "item_id": "item-123",
+                    name="item-123",
+                    data={
                         "datetime": "2023-04-05",
                         "hello": "world",
-                    }
+                    },
                 ),
                 _pystac_item(
                     id="item-123",
@@ -280,16 +342,21 @@ def test_series_from(self, job_db_exists, dummy_series_no_item_id, dummy_stac_it
             ),
             (
                 pd.Series(
-                    {
-                        "item_id": "item-123",
-                        "datetime": datetime.datetime(2023, 4, 5, 12, 34),
+                    name="item-123",
+                    data={
+                        "datetime": "2023-04-05",
                         "hello": "world",
-                    }
+                    },
                 ),
-                _pystac_item(
+                pystac.Item(
                     id="item-123",
-                    properties={"hello": "world"},
-                    datetime_="2023-04-05T12:34:00Z",
+                    geometry=None,
+                    bbox=None,
+                    properties={
+                        "datetime": "2023-04-05",
+                        "hello": "world",
+                    },
+                    datetime=datetime.datetime(2023, 4, 5),
                 ),
             ),
         ],
@@ -305,8 +372,8 @@ def test_item_from(self, job_db_exists, series, expected, time_machine):
         [
             (
                 pd.Series(
-                    {
-                        "item_id": "item-123",
+                    name="item-123",
+                    data={
                         "datetime": "2023-04-05",
                         "geometry": {"type": "Polygon", "coordinates": (((1, 2), (3, 4), (0, 5), (1, 2)),)},
                     },
@@ -354,47 +421,48 @@ def test_get_by_status_result(self, job_db_exists):
         pdt.assert_frame_equal(
             df,
             pd.DataFrame(
-                {
-                    "item_id": ["test"],
+                index=["test"],
+                data={
                     "datetime": ["2020-05-22T00:00:00Z"],
                     "some_property": ["value"],
                 },
-                index=[0],
             ),
         )

-    @patch("requests.post")
-    def test_persist_single_chunk(self, mock_requests_post, bulk_dataframe, job_db_exists):
-        def bulk_items(df):
-            all_items = []
-            if not df.empty:
-
-                def handle_row(series):
-                    item = job_db_exists.item_from(series)
-                    job_db_exists._prepare_item(item, job_db_exists.collection_id)
-                    all_items.append(item)
-
-                df.apply(handle_row, axis=1)
-            return all_items
+    def test_persist_single_chunk(self, requests_mock, job_db_exists, mock_pystac_client):
+        rows = 5
+        bulk_dataframe = pd.DataFrame(
+            index=[f"item-{i}" for i in range(rows)],
+            data={
+                "datetime": [f"2020-{i + 1:02d}-01" for i in range(rows)],
+                "some_property": [f"value-{i}" for i in range(rows)],
+            },
+        )
+        mock_pystac_client.search.return_value.items.return_value = []
+
+        expected_items = [
+            _pystac_item(
+                id=f"item-{i}",
+                properties={"some_property": f"value-{i}"},
+                datetime_=f"2020-{i + 1:02d}-01",
+                collection="collection-1",
+                links=[{"rel": "collection", "href": "collection-1"}],
+            )
+            for i in range(rows)
+        ]
+        expected_items = {item.id: item.to_dict() for item in expected_items}

-        items = bulk_items(bulk_dataframe)
+        def post_bulk_items(request, context):
+            post_data = request.json()
+            assert post_data == {"method": "upsert", "items": expected_items}
+            return {"status": "success"}

-        mock_requests_post.return_value.status_code = 200
-        mock_requests_post.return_value.json.return_value = {"status": "success"}
-        mock_requests_post.reason = "OK"
+        post_bulk_items_mock = requests_mock.post(
+            re.compile(r"http://fake-stac-api/collections/.*/bulk_items"), json=post_bulk_items
+        )

         job_db_exists.persist(bulk_dataframe)
-
-        mock_requests_post.assert_called_once()
-
-        mock_requests_post.assert_called_with(
-            url=f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items",
-            auth=None,
-            json={
-                "method": "upsert",
-                "items": {item.id: item.to_dict() for item in items},
-            },
-        )
+        assert post_bulk_items_mock.called

     @patch("requests.post")
     def test_persist_multiple_chunks(self, mock_requests_post, bulk_dataframe, job_db_exists):
@@ -547,27 +615,59 @@ def _get_search(self, request, context):


 def test_run_jobs_basic(tmp_path, dummy_backend_foo, requests_mock, sleep_mock):
-    job_manager = MultiBackendJobManager(root_dir=tmp_path, poll_sleep=2)
-    job_manager.add_backend("foo", connection=dummy_backend_foo.connection)
-
     stac_api_url = "http://stacapi.test"
     dummy_stac_api = DummyStacApi(root_url=stac_api_url, requests_mock=requests_mock)

+    # Initialize job db
     job_db = STACAPIJobDatabase(collection_id="collection-123", stac_root_url=stac_api_url)
     df = pd.DataFrame(
-        {
-            "item_id": ["item-2024", "item-2025"],
-            "year": [2024, 2025],
-        }
+        {"year": [2024, 2025]},
+        index=["item-2024", "item-2025"],
     )
     job_db.initialize_from_df(df=df)

+    assert dummy_stac_api.items == {
+        "collection-123": {
+            "item-2024": dirty_equals.IsPartialDict(
+                {
+                    "type": "Feature",
+                    "id": "item-2024",
+                    "properties": dirty_equals.IsPartialDict(
+                        {
+                            "year": 2024,
+                            "id": None,
+                            "status": "not_started",
+                            "backend_name": None,
+                        }
+                    ),
+                }
+            ),
+            "item-2025": dirty_equals.IsPartialDict(
+                {
+                    "type": "Feature",
+                    "id": "item-2025",
+                    "properties": dirty_equals.IsPartialDict(
+                        {
+                            "year": 2025,
+                            "id": None,
+                            "status": "not_started",
+                            "backend_name": None,
+                        }
+                    ),
+                }
+            ),
+        }
+    }
+
+    # Set up job manager
+    job_manager = MultiBackendJobManager(root_dir=tmp_path, poll_sleep=2)
+    job_manager.add_backend("foo", connection=dummy_backend_foo.connection)
+
+    # Run job manager loop
     def create_job(row, connection, **kwargs):
         year = int(row["year"])
         pg = {"dummy1": {"process_id": "dummy", "arguments": {"year": year}, "result": True}}
         job = connection.create_job(pg)
         return job
-
     run_stats = job_manager.run_jobs(job_db=job_db, start_job=create_job)

     assert run_stats == dirty_equals.IsPartialDict(