From 33c74ff4ad1b49089bdd78932dd14e184f55070a Mon Sep 17 00:00:00 2001
From: Andrea Orlandi
Date: Thu, 30 Oct 2025 10:16:42 +0100
Subject: [PATCH 1/2] [forge] Improve get_raster and get_detector docstring
 and test coverage

---
 tests/test_forge_client.py | 48 +++++++++++++++++++++++++++++++++++---
 1 file changed, 45 insertions(+), 3 deletions(-)

diff --git a/tests/test_forge_client.py b/tests/test_forge_client.py
index a88e228..ae7d527 100644
--- a/tests/test_forge_client.py
+++ b/tests/test_forge_client.py
@@ -675,9 +675,51 @@ def test_get_raster(monkeypatch):
     """Test the raster information"""
     RASTER_ID = "foobar"
     client = _client(monkeypatch)
-    _add_api_response(detector_api_url("rasters/%s/" % RASTER_ID), json={}, status=201)
-    client.get_raster(RASTER_ID)
-    assert len(responses.calls) == 1
+    data = {
+        "id": "123e4567-e89b-12d3-a456-426655440000",
+        "name": "my first orthomosaic",
+        "captured_at": "2013-01-29T12:34:56.000000Z",
+        "folder_id": "63530b2e-a315-424a-b4e4-75997a877475",
+        "identity_key": "cd4e4567-c66b-16r2-ba16-126655440088",
+        "type": "wms",
+        "status": "string",
+        "multispectral": True,
+        "tms_url": "https://tms.picterra.ch/67u6hy7695567976/{z}/{x}/{y}",
+        "tiles_max_zoom": 12,
+        "tiles_min_zoom": 12,
+        "footprint": {"type": "Polygon", "coordinates": [[]], "bbox": [0, 0, 0, 0]},
+    }
+    _add_api_response(detector_api_url("rasters/%s/" % RASTER_ID), json=data)
+    assert client.get_raster(RASTER_ID) == data and len(responses.calls) == 1
+
+
+@responses.activate
+def test_get_detector(monkeypatch):
+    """Test the detector information"""
+    DETECTOR_ID = "foobar"
+    client = _client(monkeypatch)
+    data = {
+        "id": "123e4567-e89b-12d3-a456-426655440000",
+        "name": "my first detector",
+        "is_runnable": True,
+        "classes": [
+            {
+                "name": "Feeder ships",
+                "color": "#0000FF",
+                "id": "03bff680-6fb0-45b3-b490-b513beb0d512",
+            }
+        ],
+        "configuration": {
+            "detection_type": "count",
+            "output_type": "polygon",
+            "training_steps": 1200,
+            "backbone": "resnet18",
+            "tile_size": 256,
+            "background_sample_ratio": 1,
+        },
+    }
+    _add_api_response(detector_api_url("detectors/%s/" % DETECTOR_ID), json=data)
+    assert client.get_detector(DETECTOR_ID) == data and len(responses.calls) == 1


 @responses.activate
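Reviewer note (not part of the patch): a minimal usage sketch of the two
endpoints the new tests exercise. It assumes ForgeClient picks up the
PICTERRA_API_KEY environment variable like the rest of this client library;
the ids passed below are hypothetical placeholders.

    from picterra import ForgeClient

    client = ForgeClient()  # assumes PICTERRA_API_KEY is set in the environment
    # get_raster/get_detector return the raw JSON dicts the tests above mock
    raster = client.get_raster("123e4567-e89b-12d3-a456-426655440000")  # hypothetical id
    print(raster["name"], raster["status"])
    detector = client.get_detector("03bff680-6fb0-45b3-b490-b513beb0d512")  # hypothetical id
    print(detector["name"], [c["name"] for c in detector["classes"]])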
From 5c54cb7ee6e60d44807c31f84e36a43a080349cb Mon Sep 17 00:00:00 2001
From: Andrea Orlandi
Date: Thu, 30 Oct 2025 10:20:46 +0100
Subject: [PATCH 2/2] [style] Run and enforce black formatting

---
 scripts/lint.sh               |   2 +
 setup.py                      |   4 +-
 src/picterra/__init__.py      |  11 +++-
 src/picterra/base_client.py   |  50 +++++++++------
 src/picterra/forge_client.py  |  14 ++---
 src/picterra/tracer_client.py | 115 ++++++++++++++++++++++++----------
 6 files changed, 133 insertions(+), 63 deletions(-)

diff --git a/scripts/lint.sh b/scripts/lint.sh
index e1a562e..f773a94 100755
--- a/scripts/lint.sh
+++ b/scripts/lint.sh
@@ -4,3 +4,5 @@ printf "==== Running flake8\n"
 python -m flake8
 printf "==== Running mypy\n"
 mypy src examples
+printf "==== Running black\n"
+black --check --diff src

diff --git a/setup.py b/setup.py
index 3a02a37..9b05758 100644
--- a/setup.py
+++ b/setup.py
@@ -10,9 +10,9 @@
 long_description = (this_directory / "README.md").read_text()

 if sys.version_info >= (3, 8):
-    lint_deps = ["flake8", "mypy==1.8.0", "types-requests"]
+    lint_deps = ["flake8", "mypy==1.8.0", "types-requests", "black"]
 else:
-    lint_deps = ["flake8", "mypy==1.4.1", "types-requests"]
+    lint_deps = ["flake8", "mypy==1.4.1", "types-requests", "black"]
 test_deps = ["pytest==7.1", "responses==0.22", "httpretty"]

 setup(

diff --git a/src/picterra/__init__.py b/src/picterra/__init__.py
index 6d23647..cb08483 100644
--- a/src/picterra/__init__.py
+++ b/src/picterra/__init__.py
@@ -12,4 +12,13 @@
 from .tracer_client import TracerClient
 from .tracer_client import TracerClient as PlotsAnalysisPlatformClient

-__all__ = ["APIClient", "DetectorPlatformClient", "ForgeClient", "PlotsAnalysisPlatformClient", "TracerClient", "nongeo_result_to_pixel", "APIError", "ResultsPage"]
+__all__ = [
+    "APIClient",
+    "DetectorPlatformClient",
+    "ForgeClient",
+    "PlotsAnalysisPlatformClient",
+    "TracerClient",
+    "nongeo_result_to_pixel",
+    "APIError",
+    "ResultsPage",
+]

diff --git a/src/picterra/base_client.py b/src/picterra/base_client.py
index 64b5976..cf0df07 100644
--- a/src/picterra/base_client.py
+++ b/src/picterra/base_client.py
@@ -28,28 +28,35 @@
 # allow injecting a non-existing package name to test the fallback behavior
 # of _get_ua in tests (see test_headers_user_agent_version__fallback)
 def _get_distr_name():
-    return 'picterra'
+    return "picterra"


 def _get_ua():
     import platform
+
     pkg = _get_distr_name()
     if sys.version_info >= (3, 8):
         from importlib.metadata import PackageNotFoundError, version
+
         try:
             ver = version(pkg)
         except PackageNotFoundError:
-            ver = 'no_version'
+            ver = "no_version"
     else:
         import pkg_resources  # type: ignore[import]
+
         try:
             ver = pkg_resources.require(pkg)[0].version
         except pkg_resources.DistributionNotFound:
-            ver = 'no_version'
+            ver = "no_version"
     o_s = " ".join([os.name, platform.system(), platform.release()])
     v_info = sys.version_info
     py = "Python " + str(v_info.major) + "." + str(v_info.minor)
-    return "picterra-python/%s (%s %s)" % (ver, py, o_s,)
+    return "picterra-python/%s (%s %s)" % (
+        ver,
+        py,
+        o_s,
+    )


 class APIError(Exception):
@@ -67,9 +74,7 @@ def __init__(self, *args, **kwargs):
         self.timeout = kwargs.pop("timeout")
         super().__init__(*args, **kwargs)
         self.headers.update(
-            {
-                "User-Agent": "%s - %s" % (_get_ua(), self.headers["User-Agent"])
-            }
+            {"User-Agent": "%s - %s" % (_get_ua(), self.headers["User-Agent"])}
         )

     def request(self, *args, **kwargs):
@@ -109,20 +114,22 @@ def _upload_file_to_blobstore(upload_url: str, filename: str):
 def multipolygon_to_polygon_feature_collection(mp):
     return {
         "type": "FeatureCollection",
-        "features": [{
-            "type": "Feature",
-            "properties": {},
-            "geometry": {
-                "type": "Polygon",
-                "coordinates": p
+        "features": [
+            {
+                "type": "Feature",
+                "properties": {},
+                "geometry": {"type": "Polygon", "coordinates": p},
             }
-        } for p in mp["coordinates"]]
+            for p in mp["coordinates"]
+        ],
     }


 def _check_resp_is_ok(resp: requests.Response, msg: str) -> None:
     if not resp.ok:
-        raise APIError("%s (url %s, status %d): %s" % (msg, resp.url, resp.status_code, resp.text))
+        raise APIError(
+            "%s (url %s, status %d): %s" % (msg, resp.url, resp.status_code, resp.text)
+        )


 T = TypeVar("T")
@@ -148,6 +155,7 @@ class ResultsPage(Generic[T]):

     You can also get a specific page passing the page number to the ``list_XX`` function
     """
+
     _fetch: Callable[[str], requests.Response]
     _next_url: str | None
     _prev_url: str | None
@@ -208,7 +216,7 @@ def __init__(self):
         self.api_key = api_key

     def __call__(self, r):
-        r.headers['X-Api-Key'] = self.api_key
+        r.headers["X-Api-Key"] = self.api_key
         return r


@@ -220,7 +228,11 @@ class BaseAPIClient:
     """

     def __init__(
-        self, api_url: str, timeout: int = 30, max_retries: int = 3, backoff_factor: int = 10
+        self,
+        api_url: str,
+        timeout: int = 30,
+        max_retries: int = 3,
+        backoff_factor: int = 10,
     ):
         """
         Args:
                 retry_strategy comment below
             backoff_factor: factor used in the backoff algorithm; see
                 retry_strategy comment below
         """
-        base_url = os.environ.get(
-            "PICTERRA_BASE_URL", "https://app.picterra.ch/"
-        )
+        base_url = os.environ.get("PICTERRA_BASE_URL", "https://app.picterra.ch/")
         logger.info(
             "Using base_url=%s, api_url=%s; %d max retries, %d backoff and %s timeout.",
             base_url,

diff --git a/src/picterra/forge_client.py b/src/picterra/forge_client.py
index fe9569a..93ed14c 100644
--- a/src/picterra/forge_client.py
+++ b/src/picterra/forge_client.py
@@ -4,6 +4,7 @@
 Note that Forge is separate from Tracer and so an API key which is valid for
 one may encounter permissions issues if used with the other
 """
+
 from __future__ import annotations

 import json
@@ -602,16 +603,14 @@ def download_result_to_feature_collection(self, operation_id: str, filename: str
         for class_result in results["by_class"]:
             with tempfile.NamedTemporaryFile() as f:
                 self.download_vector_layer_to_file(
-                    class_result["result"]["vector_layer_id"], f.name)
+                    class_result["result"]["vector_layer_id"], f.name
+                )
                 with open(f.name) as fr:
                     vl_polygon_fc: FeatureCollection = json.load(fr)
                 mp_feature: Feature = {
                     "type": "Feature",
                     "properties": {"class_name": class_result["class"]["name"]},
-                    "geometry": {
-                        "type": "MultiPolygon",
-                        "coordinates": []
-                    }
+                    "geometry": {"type": "MultiPolygon", "coordinates": []},
                 }
                 for poly_feat in vl_polygon_fc["features"]:
                     mp_feature["geometry"]["coordinates"].append(
@@ -845,7 +844,9 @@ def download_vector_layer_to_file(self, vector_layer_id: str, filename: str):
             vector_layer_id: The id of the vector layer to download
             filename: existing file to save the vector layer in, as a feature collection of polygons
         """
-        resp = self.sess.post(self._full_url("vector_layers/%s/download/" % vector_layer_id))
+        resp = self.sess.post(
+            self._full_url("vector_layers/%s/download/" % vector_layer_id)
+        )
         if not resp.ok:
             raise APIError(resp.text)
         op = self._wait_until_operation_completes(resp.json())
@@ -998,7 +999,6 @@ def list_detector_rasters(
         url = "detectors/%s/training_rasters/" % detector_id
         return self._return_results_page(url, params)

-
     def create_folder(self, name: str) -> str:
         """
         Creates a new folder with the given name

diff --git a/src/picterra/tracer_client.py b/src/picterra/tracer_client.py
index 2e159b3..28894a0 100644
--- a/src/picterra/tracer_client.py
+++ b/src/picterra/tracer_client.py
@@ -4,6 +4,7 @@
 Note that Tracer is separate from Forge and so an API key which is valid for
 one may encounter permissions issues if used with the other
 """
+
 import datetime
 import json
 import os.path
@@ -95,7 +96,7 @@ def create_plots_group(
         plots_group_name: str,
         methodology_id: str,
         plots_geometries_filenames: List[str],
-        columns: Optional[Dict[str, str]] = None
+        columns: Optional[Dict[str, str]] = None,
     ) -> str:
         """
         Creates a new plots group.
@@ -112,15 +113,22 @@
         data = {
             "name": plots_group_name,
             "methodology_id": methodology_id,
-            "custom_columns_values": columns or {}
+            "custom_columns_values": columns or {},
         }
         resp = self.sess.post(self._full_url("plots_groups/"), json=data)
         _check_resp_is_ok(resp, "Failure starting plots group commit")
         op_result = self._wait_until_operation_completes(resp.json())["results"]
-        self.update_plots_group_plots(op_result["plots_group_id"], plots_geometries_filenames)
+        self.update_plots_group_plots(
+            op_result["plots_group_id"], plots_geometries_filenames
+        )
         return op_result["plots_group_id"]

-    def update_plots_group_plots(self, plots_group_id: str, plots_geometries_filenames: List[str], delete_existing_plots: bool = False):
+    def update_plots_group_plots(
+        self,
+        plots_group_id: str,
+        plots_geometries_filenames: List[str],
+        delete_existing_plots: bool = False,
+    ):
         """
         Updates the geometries of a given plots group
@@ -139,13 +147,19 @@ def update_plots_group_plots(self, plots_group_id: str, plots_geometries_filenam
             with open(filename, "rb") as fh:
                 resp = requests.put(upload_url, data=fh.read())
             _check_resp_is_ok(resp, "Failure uploading plots file for group")
-            files.append({"filename": os.path.basename(filename), "upload_id": upload_id})
+            files.append(
+                {"filename": os.path.basename(filename), "upload_id": upload_id}
+            )
         data = {"files": files, "overwrite": delete_existing_plots}
-        resp = self.sess.post(self._full_url(f"plots_groups/{plots_group_id}/upload/commit/"), json=data)
+        resp = self.sess.post(
+            self._full_url(f"plots_groups/{plots_group_id}/upload/commit/"), json=data
+        )
         _check_resp_is_ok(resp, "Failure starting plots group update:")
         return self._wait_until_operation_completes(resp.json())

-    def download_plots_group_to_file(self, plots_group_id: str, format: Literal["excel", "geojson"], filename: str) -> None:
+    def download_plots_group_to_file(
+        self, plots_group_id: str, format: Literal["excel", "geojson"], filename: str
+    ) -> None:
         """
         Downloads a plots group to a local file
@@ -157,7 +171,9 @@ def download_plots_group_to_file(self, plots_group_id: str, format: Literal["exc
             APIError: There was an error while trying to download the plots group id
         """
         data = {"format": format}
-        resp = self.sess.post(self._full_url("plots_groups/%s/export/" % plots_group_id), json=data)
+        resp = self.sess.post(
+            self._full_url("plots_groups/%s/export/" % plots_group_id), json=data
+        )
         _check_resp_is_ok(resp, "Failure starting plots group download")
         op = self._wait_until_operation_completes(resp.json())
         _download_to_file(op["results"]["download_url"], filename)
@@ -167,7 +183,7 @@ def list_plots_groups(
         search: Optional[str] = None,
         page_number: Optional[int] = None,
         include_archived: bool = False,
-        methodology: Optional[str] = None
+        methodology: Optional[str] = None,
     ) -> ResultsPage:
         """
         List all the plots groups the user can access, see `ResultsPage`
@@ -202,7 +218,7 @@ def analyze_plots_precheck(
         plots_analysis_name: str,
         plot_ids: List[str],
         date_from: datetime.date,
-        date_to: datetime.date
+        date_to: datetime.date,
     ) -> dict:
         """
         Check that the analysis for a given date over the plot ids of the specified plot group has no errors
@@ -226,9 +242,12 @@
             "analysis_name": plots_analysis_name,
             "upload_id": upload_id,
             "date_from": date_from.isoformat(),
-            "date_to": date_to.isoformat()
+            "date_to": date_to.isoformat(),
         }
-        resp = self.sess.post(self._full_url(f"plots_groups/{plots_group_id}/analysis/precheck/"), json=data)
+        resp = self.sess.post(
+            self._full_url(f"plots_groups/{plots_group_id}/analysis/precheck/"),
+            json=data,
+        )
         _check_resp_is_ok(resp, "Failure starting analysis precheck")
         op_result = self._wait_until_operation_completes(resp.json())
         url = op_result["results"]["precheck_data_url"]
@@ -240,7 +259,7 @@ def analyze_plots(
         plots_analysis_name: str,
         plot_ids: List[str],
         date_from: datetime.date,
-        date_to: datetime.date
+        date_to: datetime.date,
     ) -> str:
         """
         Runs the analysis for a given date over the plot ids of the specified plot group,
@@ -263,9 +282,11 @@
             "analysis_name": plots_analysis_name,
             "upload_id": upload_id,
             "date_from": date_from.isoformat(),
-            "date_to": date_to.isoformat()
+            "date_to": date_to.isoformat(),
         }
-        resp = self.sess.post(self._full_url(f"plots_groups/{plots_group_id}/analysis/"), json=data)
+        resp = self.sess.post(
+            self._full_url(f"plots_groups/{plots_group_id}/analysis/"), json=data
+        )
         _check_resp_is_ok(resp, "Couldn't start analysis")
         op_result = self._wait_until_operation_completes(resp.json())
         analysis_id = op_result["results"]["analysis_id"]
@@ -300,7 +321,9 @@ def list_plots_analyses(
             data["search"] = search.strip()
         if page_number is not None:
             data["page_number"] = int(page_number)
-        return self._return_results_page(f"plots_groups/{plots_group_id}/analysis/", data)
+        return self._return_results_page(
+            f"plots_groups/{plots_group_id}/analysis/", data
+        )

     def list_plots_analysis_reports(
         self,
@@ -331,7 +354,10 @@
             See https://app.picterra.ch/public/apidocs/plots_analysis/v1/#tag/reports/operation/getReportsList
         """  # noqa[E501]
         if plots_group_id is not None:
-            warnings.warn("Passing plots_group_id is not needed anymore, remove it", DeprecationWarning)
+            warnings.warn(
+                "Passing plots_group_id is not needed anymore, remove it",
+                DeprecationWarning,
+            )

         params: Dict[str, Any] = {}
         if page_number is not None:
@@ -366,14 +392,17 @@ def list_plots_analysis_report_types(
             See https://app.picterra.ch/public/apidocs/plots_analysis/v1/#tag/reports/operation/getReportTypesForAnalysis
         """  # noqa[E501]
         if plots_group_id is not None:
-            warnings.warn("Passing plots_group_id is not needed anymore, remove it", DeprecationWarning)
+            warnings.warn(
+                "Passing plots_group_id is not needed anymore, remove it",
+                DeprecationWarning,
+            )

         params: Dict[str, Any] = {}
         if search is not None:
             params["search"] = search.strip()
         resp = self.sess.get(
             self._full_url(f"plots_analyses/{plots_analysis_id}/reports/types/"),
-            params=params
+            params=params,
         )
         _check_resp_is_ok(resp, "Couldn't list report types")
         return resp.json()
@@ -386,7 +415,7 @@ def create_plots_analysis_report_precheck(
         report_type: str,
         plots_group_id: Optional[str] = None,
         *,
-        metadata: Optional[dict] = None
+        metadata: Optional[dict] = None,
     ) -> Dict[str, Any]:
         """
         Check creation of a report with the given parameters is ok
@@ -407,17 +436,20 @@
             dict: the precheck data
         """
         if plots_group_id is not None:
-            warnings.warn("Passing plots_group_id is not needed anymore, remove it", DeprecationWarning)
+            warnings.warn(
+                "Passing plots_group_id is not needed anymore, remove it",
+                DeprecationWarning,
+            )
         upload_id = self._upload_plot_ids(plot_ids)
         data = {
             "name": report_name,
             "upload_id": upload_id,
             "report_type": report_type,
-            "metadata": metadata if metadata is not None else {}
+            "metadata": metadata if metadata is not None else {},
         }
         resp = self.sess.post(
             self._full_url(f"plots_analyses/{plots_analysis_id}/reports/precheck/"),
-            json=data
+            json=data,
         )
         _check_resp_is_ok(resp, "Failure starting precheck")
         self._wait_until_operation_completes(resp.json())
@@ -431,7 +463,7 @@ def create_plots_analysis_report(
         report_type: str,
         plots_group_id: Optional[str] = None,
         *,
-        metadata: Optional[dict] = None
+        metadata: Optional[dict] = None,
     ) -> str:
         """
         Creates a report
@@ -450,17 +482,19 @@
             str: the id of the new report
         """
         if plots_group_id is not None:
-            warnings.warn("Passing plots_group_id is not needed anymore, remove it", DeprecationWarning)
+            warnings.warn(
+                "Passing plots_group_id is not needed anymore, remove it",
+                DeprecationWarning,
+            )
         upload_id = self._upload_plot_ids(plot_ids)
         data = {
             "name": report_name,
             "upload_id": upload_id,
             "report_type": report_type,
-            "metadata": metadata if metadata is not None else {}
+            "metadata": metadata if metadata is not None else {},
         }
         resp = self.sess.post(
-            self._full_url(f"plots_analyses/{plots_analysis_id}/reports/"),
-            json=data
+            self._full_url(f"plots_analyses/{plots_analysis_id}/reports/"), json=data
         )
         _check_resp_is_ok(resp, "Failure starting analysis precheck")
         op_result = self._wait_until_operation_completes(resp.json())
@@ -484,7 +518,9 @@ def get_plots_group(self, plots_group_id: str) -> dict:
         _check_resp_is_ok(resp, "Failed to get plots group")
         return resp.json()

-    def get_plots_analysis(self, plots_analysis_id: str, plots_group_id: Optional[str] = None) -> Dict[str, Any]:
+    def get_plots_analysis(
+        self, plots_analysis_id: str, plots_group_id: Optional[str] = None
+    ) -> Dict[str, Any]:
         """
         Get plots analysis information
@@ -498,13 +534,21 @@ def get_plots_analysis(self, plots_analysis_id: str, plots_group_id: Optional[st
             dict: see https://app.picterra.ch/public/apidocs/plots_analysis/v1/#tag/analysis/operation/getAnalysis
         """
         if plots_group_id is not None:
-            warnings.warn("Passing plots_group_id is not needed anymore, remove it", DeprecationWarning)
+            warnings.warn(
+                "Passing plots_group_id is not needed anymore, remove it",
+                DeprecationWarning,
+            )

         resp = self.sess.get(self._full_url(f"plots_analyses/{plots_analysis_id}/"))
         _check_resp_is_ok(resp, "Failed to get plots analysis")
         return resp.json()

-    def get_plots_analysis_report(self, plots_analysis_report_id: str, plots_group_id: Optional[str] = None, plots_analysis_id: Optional[str] = None) -> Dict[str, Any]:
+    def get_plots_analysis_report(
+        self,
+        plots_analysis_report_id: str,
+        plots_group_id: Optional[str] = None,
+        plots_analysis_id: Optional[str] = None,
+    ) -> Dict[str, Any]:
         """
         Get plots analysis report information
@@ -522,8 +566,13 @@ def get_plots_analysis_report(self, plots_analysis_report_id: str, plots_group_i
             dict: see https://app.picterra.ch/public/apidocs/plots_analysis/v1/#tag/reports/operation/getReportForAnalysis
         """
         if plots_group_id is not None or plots_analysis_id is not None:
-            warnings.warn("Passing plots_group_id/plots_analysis_id is not needed anymore, remove it", DeprecationWarning)
+            warnings.warn(
+                "Passing plots_group_id/plots_analysis_id is not needed anymore, remove it",
+                DeprecationWarning,
+            )

-        resp = self.sess.get(self._full_url(f"plots_analysis_reports/{plots_analysis_report_id}/"))
+        resp = self.sess.get(
+            self._full_url(f"plots_analysis_reports/{plots_analysis_report_id}/")
+        )
         _check_resp_is_ok(resp, "Failed to get plots analysis report")
         return resp.json()
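Reviewer note (not part of the patch): a minimal sketch of the plots-group
export path that several of the reformatted tracer_client hunks touch. It
assumes TracerClient reads the PICTERRA_API_KEY environment variable like the
other clients; the plots group id and output filename are hypothetical.

    from picterra import TracerClient

    client = TracerClient()  # assumes PICTERRA_API_KEY is set in the environment
    # Starts a server-side export, waits for the operation to complete,
    # then downloads the result (per download_plots_group_to_file above)
    client.download_plots_group_to_file(
        "11111111-2222-3333-4444-555555555555",  # hypothetical plots group id
        "geojson",
        "plots_group.geojson",
    )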