diff --git a/README.md b/README.md
index 8b8c072..1a89d49 100644
--- a/README.md
+++ b/README.md
@@ -11,14 +11,14 @@ secrets management, etc).
 
 Some of the tools available in the Guardian Connector Scripts Hub are:
 
-* Connector scripts to ingest data from data collection tools such as KoboToolbox, ODK, CoMapeo, and Locus Map,
+* Connector scripts to ingest data from data collection tools such as KoboToolbox, ODK, CoMapeo, ArcGIS, and Locus Map,
   and store this data (tabular and media attachments) in a data lake.
 * A flow to download and store GeoJSON and GeoTIFF change detection alerts, post these to a CoMapeo Archive Server
   API, and send a message to WhatsApp recipients via Twilio.
 * Scripts to export data from a database into a specific format (e.g., GeoJSON).
 
 ![Available scripts, flows, and apps in gc-scripts-hub](gc-scripts-hub.jpg)
-_A Windmill Workspace populated with the tools in this repository._
+_A Windmill Workspace populated with some of the tools in this repository._
 
 ## Deploying the code to a Windmill workspace
diff --git a/c_arcgis_account.resource-type.json b/c_arcgis_account.resource-type.json
new file mode 100644
index 0000000..0e3892e
--- /dev/null
+++ b/c_arcgis_account.resource-type.json
@@ -0,0 +1,26 @@
+{
+  "type": "object",
+  "order": [
+    "username",
+    "password"
+  ],
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "required": [
+    "username",
+    "password"
+  ],
+  "properties": {
+    "username": {
+      "type": "string",
+      "default": "",
+      "nullable": false,
+      "description": "The username of your ArcGIS account"
+    },
+    "password": {
+      "type": "string",
+      "default": "",
+      "nullable": false,
+      "description": "The password of your ArcGIS account"
+    }
+  }
+}
\ No newline at end of file
diff --git a/f/connectors/arcgis/README.md b/f/connectors/arcgis/README.md
new file mode 100644
index 0000000..1d7db66
--- /dev/null
+++ b/f/connectors/arcgis/README.md
@@ -0,0 +1,13 @@
+# `arcgis_feature_layer`: Download Feature Layer from ArcGIS REST API
+
+This script fetches the contents of an ArcGIS feature layer and stores it in a PostgreSQL database. Additionally, it downloads any attachments (e.g., from Survey123) and saves them to a specified directory.
+
+Using this script requires an ArcGIS account, which is used to generate an access token.
+
+The feature layer URL can be found on the item details page of your layer on ArcGIS Online:
+
+![Screenshot of a feature layer item page](arcgis.jpg)
+
+This script uses the [ArcGIS REST API Query Feature Service / Layer](https://developers.arcgis.com/rest/services-reference/enterprise/query-feature-service-layer/) endpoint.
+
+Note: we have opted not to use the [ArcGIS API for Python](https://developers.arcgis.com/python/latest/) library because it requires installing `libkrb5-dev` as a system-level dependency. Workers in Windmill can [preinstall binaries](https://www.windmill.dev/docs/advanced/preinstall_binaries), but doing so requires modifying the Windmill `docker-compose.yml`, which is too heavy-handed an approach for this simple fetch script.
\ No newline at end of file
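For orientation, the query endpoint wrapped by this script can be exercised with plain `requests`. The sketch below is illustrative only and mirrors what the connector does internally; the layer URL and token are placeholders you would substitute with your own values:

```python
import requests

# Placeholder layer URL; copy the real one from your layer's item details page.
feature_layer_url = (
    "https://services.arcgis.com/<org-id>/arcgis/rest/services/MyLayer/FeatureServer"
)
# Placeholder token, e.g. obtained from https://www.arcgis.com/sharing/rest/generateToken
token = "<your-arcgis-token>"

response = requests.get(
    f"{feature_layer_url}/0/query",
    params={
        "where": "1=1",    # no filter: return every feature
        "outFields": "*",  # return all attribute fields
        "f": "geojson",    # ask the server to respond with GeoJSON
        "token": token,
    },
)
response.raise_for_status()
print(f"{len(response.json().get('features', []))} features fetched")
```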
diff --git a/f/connectors/arcgis/arcgis.jpg b/f/connectors/arcgis/arcgis.jpg
new file mode 100644
index 0000000..830e44e
Binary files /dev/null and b/f/connectors/arcgis/arcgis.jpg differ
diff --git a/f/connectors/arcgis/arcgis_feature_layer.py b/f/connectors/arcgis/arcgis_feature_layer.py
new file mode 100644
index 0000000..d389e8b
--- /dev/null
+++ b/f/connectors/arcgis/arcgis_feature_layer.py
@@ -0,0 +1,274 @@
+# requirements:
+# psycopg2-binary
+# requests~=2.32
+
+import json
+import logging
+import os
+from pathlib import Path
+
+import requests
+
+from f.common_logic.db_connection import postgresql
+from f.connectors.geojson.geojson_to_postgres import main as save_geojson_to_postgres
+
+# type names that refer to Windmill Resources
+c_arcgis_account = dict
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def main(
+    arcgis_account: c_arcgis_account,
+    feature_layer_url: str,
+    db: postgresql,
+    db_table_name: str,
+    attachment_root: str = "/persistent-storage/datalake",
+):
+    storage_path = Path(attachment_root) / db_table_name
+
+    arcgis_token = get_arcgis_token(arcgis_account)
+
+    features = get_features_from_arcgis(feature_layer_url, arcgis_token)
+
+    features_with_attachments = download_feature_attachments(
+        features, feature_layer_url, arcgis_token, storage_path
+    )
+
+    features_with_global_ids = set_global_id(features_with_attachments)
+
+    save_geojson_file_to_disk(features_with_global_ids, storage_path)
+
+    # At this point, the ArcGIS data is GeoJSON-compliant, and we don't need anything
+    # from the REST API anymore. The data can therefore be handled further using the
+    # existing GeoJSON connector.
+    save_geojson_to_postgres(
+        db,
+        db_table_name,
+        str(storage_path / "data.geojson"),
+        storage_path,
+        False,  # do not delete the GeoJSON file after its contents are written to the database.
+        # Users might like to have access to the GeoJSON file directly, in addition to the data
+        # in the database.
+    )
+
+
+def get_arcgis_token(arcgis_account: c_arcgis_account):
+    """
+    Generate an ArcGIS token using the provided account credentials.
+
+    Parameters
+    ----------
+    arcgis_account : dict
+        A dictionary containing the ArcGIS account credentials with keys "username" and "password".
+
+    Returns
+    -------
+    str
+        The generated ArcGIS token.
+    """
+    arcgis_username = arcgis_account["username"]
+    arcgis_password = arcgis_account["password"]
+
+    # According to the ArcGIS REST API documentation, you can set `client` to `requestip`
+    # to generate a token based on the IP address of the request. However, this does not
+    # seem to work reliably, either in local development or in production. Therefore, we use
+    # `referer` as the client type, and use the base URL of the Windmill app as the referer.
+    # https://developers.arcgis.com/rest/services-reference/enterprise/generate-token/
+    token_response = requests.post(
+        "https://www.arcgis.com/sharing/rest/generateToken",
+        data={
+            "username": arcgis_username,
+            "password": arcgis_password,
+            "client": "referer",
+            "referer": os.environ.get("WM_BASE_URL"),
+            "f": "json",
+        },
+    )
+
+    arcgis_token = token_response.json().get("token")
+
+    return arcgis_token
+
+
+def get_features_from_arcgis(feature_layer_url: str, arcgis_token: str):
+    """
+    Fetch features from an ArcGIS feature layer using the provided token.
+
+    Parameters
+    ----------
+    feature_layer_url : str
+        The URL of the ArcGIS feature layer.
+    arcgis_token : str
+        The ArcGIS token for authentication.
+
+    Returns
+    -------
+    list
+        A list of features retrieved from the ArcGIS feature layer.
+    """
+    response = requests.get(
+        f"{feature_layer_url}/0/query",
+        params={
+            "where": "1=1",  # get all features
+            "outFields": "*",  # get all fields
+            "returnGeometry": "true",
+            "f": "geojson",
+            "token": arcgis_token,
+        },
+    )
+
+    if (
+        response.status_code != 200 or "error" in response.json()
+    ):  # ArcGIS sometimes returns 200 with an error message, e.g., if a token is invalid
+        try:
+            error_message = (
+                response.json().get("error", {}).get("message", "Unknown error")
+            )
+        except (KeyError, ValueError):
+            error_message = "Unknown error"
+        raise ValueError(f"Error fetching features: {error_message}")
+
+    features = response.json().get("features", [])
+
+    logger.info(f"{len(features)} features fetched from the ArcGIS feature layer")
+    return features
+
+
+def download_feature_attachments(
+    features: list, feature_layer_url: str, arcgis_token: str, storage_path: str
+):
+    """
+    Download attachments for each feature and save them to the specified directory.
+
+    Parameters
+    ----------
+    features : list
+        A list of features for which attachments need to be downloaded.
+    feature_layer_url : str
+        The URL of the ArcGIS feature layer.
+    arcgis_token : str
+        The ArcGIS token for authentication.
+    storage_path : str
+        The directory where attachments should be saved.
+
+    Returns
+    -------
+    list
+        The list of features with updated properties including attachment information.
+    """
+    total_downloaded_attachments = 0
+    skipped_attachments = 0
+
+    for feature in features:
+        object_id = feature["properties"]["objectid"]
+
+        attachments_response = requests.get(
+            f"{feature_layer_url}/0/{object_id}/attachments",
+            params={"f": "json", "token": arcgis_token},
+        )
+
+        attachments_response.raise_for_status()
+
+        attachments = attachments_response.json().get("attachmentInfos", [])
+
+        if not attachments:
+            logger.info(f"No attachments found for object_id {object_id}")
+            continue
+
+        for attachment in attachments:
+            attachment_id = attachment["id"]
+            attachment_name = attachment["name"]
+            attachment_content_type = attachment["contentType"]
+            attachment_keywords = attachment["keywords"]
+
+            feature["properties"][f"{attachment_keywords}_filename"] = attachment_name
+            feature["properties"][f"{attachment_keywords}_content_type"] = (
+                attachment_content_type
+            )
+
+            attachment_path = Path(storage_path) / "attachments" / attachment_name
+
+            if attachment_path.exists():
+                logger.debug(
+                    f"File already exists, skipping download: {attachment_path}"
+                )
+                skipped_attachments += 1
+                continue
+
+            attachment_response = requests.get(
+                f"{feature_layer_url}/0/{object_id}/attachments/{attachment_id}",
+                params={"f": "json", "token": arcgis_token},
+            )
+
+            attachment_response.raise_for_status()
+
+            attachment_data = attachment_response.content
+
+            attachment_path.parent.mkdir(parents=True, exist_ok=True)
+
+            with open(attachment_path, "wb") as f:
+                f.write(attachment_data)
+
+            logger.info(
+                f"Downloaded attachment {attachment_name} (content type: {attachment_content_type})"
+            )
+
+            total_downloaded_attachments += 1
+
+    logger.info(f"Total downloaded attachments: {total_downloaded_attachments}")
+    logger.info(f"Total skipped attachments: {skipped_attachments}")
+    return features
+
+
+def set_global_id(features: list):
+    """
+    Set the feature ID of each feature to its global ID (which is a UUID).
+
+    ArcGIS uses global IDs to uniquely identify features, but the
+    feature ID is set to the object ID by default (which is an integer
+    incremented by 1 for each feature). UUIDs are more reliable for
+    uniquely identifying features, and using them instead is consistent
+    with how we store other data in the data warehouse.
+    https://support.esri.com/en-us/gis-dictionary/globalid
+
+    Parameters
+    ----------
+    features : list
+        A list of features to update.
+
+    Returns
+    -------
+    list
+        The list of features with updated feature IDs.
+    """
+    for feature in features:
+        feature["id"] = feature["properties"]["globalid"]
+
+    return features
+
+
+def save_geojson_file_to_disk(
+    features: list,
+    storage_path: str,
+):
+    """
+    Save the GeoJSON file to disk.
+
+    Parameters
+    ----------
+    features : list
+        A list of features to save.
+    storage_path : str
+        The directory where the GeoJSON file should be saved.
+    """
+    geojson = {"type": "FeatureCollection", "features": features}
+
+    geojson_filename = Path(storage_path) / "data.geojson"
+
+    geojson_filename.parent.mkdir(parents=True, exist_ok=True)
+
+    with open(geojson_filename, "w") as f:
+        json.dump(geojson, f)
+
+    logger.info(f"GeoJSON file saved to: {geojson_filename}")
diff --git a/f/connectors/arcgis/arcgis_feature_layer.script.lock b/f/connectors/arcgis/arcgis_feature_layer.script.lock
new file mode 100644
index 0000000..711c818
--- /dev/null
+++ b/f/connectors/arcgis/arcgis_feature_layer.script.lock
@@ -0,0 +1,6 @@
+certifi==2024.12.14
+charset-normalizer==3.4.1
+idna==3.10
+requests==2.32.3
+urllib3==2.3.0
+psycopg2-binary==2.9.10
\ No newline at end of file
diff --git a/f/connectors/arcgis/arcgis_feature_layer.script.yaml b/f/connectors/arcgis/arcgis_feature_layer.script.yaml
new file mode 100644
index 0000000..2da78f5
--- /dev/null
+++ b/f/connectors/arcgis/arcgis_feature_layer.script.yaml
@@ -0,0 +1,50 @@
+summary: 'ArcGIS: Download Feature Layer'
+description: This script fetches the contents of an ArcGIS feature layer and stores it in a PostgreSQL database.
+lock: '!inline f/connectors/arcgis/arcgis_feature_layer.script.lock'
+concurrency_time_window_s: 0
+kind: script
+schema:
+  $schema: 'https://json-schema.org/draft/2020-12/schema'
+  type: object
+  order:
+    - arcgis_account
+    - feature_layer_url
+    - db
+    - db_table_name
+    - attachment_root
+  properties:
+    arcgis_account:
+      type: object
+      description: The name of the ArcGIS account to use for fetching the feature layer.
+      default: null
+      format: resource-c_arcgis_account
+      originalType: string
+    attachment_root:
+      type: string
+      description: >-
+        A path where ArcGIS attachments (e.g., from Survey123) will be stored. Attachment
+        files like photo and audio will be stored in the following directory schema:
+        `{attachment_root}/{db_table_name}/attachments/...`
+      default: /persistent-storage/datalake
+      originalType: string
+    db:
+      type: object
+      description: A database connection for storing tabular data.
+      default: null
+      format: resource-postgresql
+    db_table_name:
+      type: string
+      description: The name of the database table where the data will be stored.
+      default: null
+      originalType: string
+      pattern: '^.{1,54}$'
+    feature_layer_url:
+      type: string
+      description: The URL of the ArcGIS feature layer to fetch.
+      default: null
+      originalType: string
+  required:
+    - arcgis_account
+    - feature_layer_url
+    - db
+    - db_table_name
\ No newline at end of file
diff --git a/f/connectors/arcgis/tests/arcgis_feature_layer_test.py b/f/connectors/arcgis/tests/arcgis_feature_layer_test.py
new file mode 100644
index 0000000..24ae965
--- /dev/null
+++ b/f/connectors/arcgis/tests/arcgis_feature_layer_test.py
@@ -0,0 +1,45 @@
+import psycopg2
+
+from f.connectors.arcgis.arcgis_feature_layer import (
+    main,
+)
+
+
+def test_script_e2e(arcgis_server, pg_database, tmp_path):
+    asset_storage = tmp_path / "datalake"
+
+    main(
+        arcgis_server.account,
+        arcgis_server.feature_layer_url,
+        pg_database,
+        "my_arcgis_data",
+        asset_storage,
+    )
+
+    # Attachments are saved to disk
+    assert (
+        asset_storage / "my_arcgis_data" / "attachments" / "springfield_photo.png"
+    ).exists()
+
+    with psycopg2.connect(**pg_database) as conn:
+        # Survey responses from arcgis_feature_layer are written to a SQL table in the expected format
+        with conn.cursor() as cursor:
+            cursor.execute("SELECT COUNT(*) FROM my_arcgis_data")
+            assert cursor.fetchone()[0] == 1
+
+            cursor.execute("SELECT * FROM my_arcgis_data LIMIT 0")
+            columns = [desc[0] for desc in cursor.description]
+
+            assert "what_is_your_name" in columns
+
+            assert "what_is_the_date_and_time" in columns
+
+            assert "add_a_photo_filename" in columns
+
+            assert "add_an_audio_content_type" in columns
+
+            cursor.execute("SELECT g__type FROM my_arcgis_data LIMIT 1")
+            assert cursor.fetchone()[0] == "Point"
+
+            cursor.execute("SELECT g__coordinates FROM my_arcgis_data LIMIT 1")
+            assert cursor.fetchone()[0] == "[-73.965355, 40.782865]"
diff --git a/f/connectors/arcgis/tests/assets/server_responses.py b/f/connectors/arcgis/tests/assets/server_responses.py
new file mode 100644
index 0000000..8a5e3d7
--- /dev/null
+++ b/f/connectors/arcgis/tests/assets/server_responses.py
@@ -0,0 +1,62 @@
+def arcgis_token():
+    return {
+        "token": "token_value",
+        "expires": 1741109789251,
+        "ssl": True,
+    }
+
+
+def arcgis_features():
+    return {
+        "type": "FeatureCollection",
+        "features": [
+            {
+                "type": "Feature",
+                "id": 1,
+                "geometry": {
+                    "type": "Point",
+                    "coordinates": [-73.965355, 40.782865],
+                },
+                "properties": {
+                    "objectid": 1,
+                    "globalid": "12345678-1234-5678-1234-567812345678",
+                    "CreationDate": 1741017108116,
+                    "Creator": "arcgis_account",
+                    "EditDate": 1741017108116,
+                    "Editor": "arcgis_account",
+                    "what_is_your_name": "Community mapper",
+                    "what_is_your_community": "Springfield",
+                    "what_is_your_community_other": None,
+                    "what_is_the_date_and_time": 1741017060000,
+                    "did_you_like_this_survey": 7,
+                },
+            }
+        ],
+    }
+
+
+def arcgis_attachments():
+    return {
+        "attachmentInfos": [
+            {
+                "id": 1,
+                "globalId": "ab12cd34-56ef-78gh-90ij-klmn12345678",
+                "parentGlobalId": "12345678-1234-5678-1234-567812345678",
+                "name": "springfield_photo.png",
+                "contentType": "image/png",
+                "size": 3632,
+                "keywords": "add_a_photo",
+                "exifInfo": None,
+            },
+            {
+                "id": 2,
+                "globalId": "mnop5678-qrst-uvwx-yzab-cdef98765432",
+                "parentGlobalId": "12345678-1234-5678-1234-567812345678",
+                "name": "springfield_audio.mp4",
+                "contentType": "audio/webm;codecs=opus",
+                "size": 920,
+                "keywords": "add_an_audio",
+                "exifInfo": None,
+            },
+        ]
+    }
diff --git a/f/connectors/arcgis/tests/assets/springfield_audio.mp4 b/f/connectors/arcgis/tests/assets/springfield_audio.mp4
new file mode 100644
index 0000000..4fed7fa
Binary files /dev/null and b/f/connectors/arcgis/tests/assets/springfield_audio.mp4 differ
diff --git a/f/connectors/arcgis/tests/assets/springfield_photo.png b/f/connectors/arcgis/tests/assets/springfield_photo.png
new file mode 100644
index 0000000..7216c48
Binary files /dev/null and b/f/connectors/arcgis/tests/assets/springfield_photo.png differ
diff --git a/f/connectors/arcgis/tests/conftest.py b/f/connectors/arcgis/tests/conftest.py
new file mode 100644
index 0000000..ec475c7
--- /dev/null
+++ b/f/connectors/arcgis/tests/conftest.py
@@ -0,0 +1,77 @@
+import re
+from dataclasses import dataclass
+
+import pytest
+import responses
+import testing.postgresql
+
+from f.connectors.arcgis.tests.assets import server_responses
+
+
+@pytest.fixture
+def mocked_responses():
+    with responses.RequestsMock() as rsps:
+        yield rsps
+
+
+@pytest.fixture
+def arcgis_server(mocked_responses):
+    """A mock ArcGIS Server that you can use to provide feature layer data"""
+
+    @dataclass
+    class ArcGISServer:
+        account: dict
+        feature_layer_url: str
+
+    username = "my_username"
+    password = "my_password"
+    feature_layer_url = "https://services.arcgis.com/abc123/arcgis/rest/services/MyFeatureLayer/FeatureServer"
+
+    mocked_responses.post(
+        "https://www.arcgis.com/sharing/rest/generateToken",
+        json=server_responses.arcgis_token(),
+        status=200,
+    )
+    mocked_responses.get(
+        f"{feature_layer_url}/0/query",
+        json=server_responses.arcgis_features(),
+        status=200,
+    )
+    mocked_responses.get(
+        f"{feature_layer_url}/0/1/attachments",
+        json=server_responses.arcgis_attachments(),
+        status=200,
+    )
+
+    mocked_responses.get(
+        re.compile(rf"{feature_layer_url}/0/1/attachments/1"),
+        body=open(
+            "f/connectors/arcgis/tests/assets/springfield_photo.png", "rb"
+        ).read(),
+        content_type="image/png",
+    )
+
+    mocked_responses.get(
+        re.compile(rf"{feature_layer_url}/0/1/attachments/2"),
+        body=open(
+            "f/connectors/arcgis/tests/assets/springfield_audio.mp4", "rb"
+        ).read(),
+        content_type="video/mp4",
+    )
+
+    account = dict(username=username, password=password)
+
+    return ArcGISServer(
+        account,
+        feature_layer_url,
+    )
+
+
+@pytest.fixture
+def pg_database():
+    """A dsn that may be used to connect to a live (local for test) postgresql server"""
+    db = testing.postgresql.Postgresql(port=7654)
+    dsn = db.dsn()
+    dsn["dbname"] = dsn.pop("database")
+    yield dsn
+    db.stop()
diff --git a/f/connectors/arcgis/tests/requirements-test.txt b/f/connectors/arcgis/tests/requirements-test.txt
new file mode 100644
index 0000000..78c8bf3
--- /dev/null
+++ b/f/connectors/arcgis/tests/requirements-test.txt
@@ -0,0 +1,3 @@
+pytest
+responses
+testing.postgresql
\ No newline at end of file
diff --git a/f/connectors/geojson/README.md b/f/connectors/geojson/README.md
new file mode 100644
index 0000000..a224bc0
--- /dev/null
+++ b/f/connectors/geojson/README.md
@@ -0,0 +1,20 @@
+# `geojson_to_postgres`: Import a GeoJSON file into a PostgreSQL table
+
+This script reads a GeoJSON file and inserts its contents into a PostgreSQL table, flattening all data into TEXT columns (a worked example follows below).
+
+### Behavior
+
+* Each feature's `geometry` object is decomposed into separate TEXT columns, prefixed with `g__`.
+
+  Example: `geometry.type` → `g__type`, `geometry.coordinates` → `g__coordinates`
+
+* Each `properties` field is inserted as-is into a column matching the property name.
+
+  Example: `properties.category` → `category`
+
+* The feature's top-level `id` (if present) is used as the primary key `_id`.
+
+### Notes
+* The data is inserted as flat text fields — no geometry types or JSONB columns are used.
+* PostGIS is _not_ used at this stage. This approach may change based on requirements downstream.
+* Optionally, the input file is deleted after import.
\ No newline at end of file
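To make the flattening concrete, here is a hypothetical input feature (illustrative values only, not one of the test fixtures) and the flat row it would produce:

```python
# A hypothetical input feature...
feature = {
    "type": "Feature",
    "id": "1",
    "geometry": {"type": "Point", "coordinates": [-73.96, 40.78]},
    "properties": {"category": "observation"},
}

# ...is flattened into one row of TEXT columns:
row = {
    "_id": "1",                           # top-level "id" becomes the primary key
    "g__type": "Point",                   # geometry.type
    "g__coordinates": "[-73.96, 40.78]",  # geometry.coordinates, serialized as JSON text
    "category": "observation",            # each property gets its own column
}
```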
diff --git a/f/connectors/geojson/geojson_to_postgres.py b/f/connectors/geojson/geojson_to_postgres.py
new file mode 100644
index 0000000..10a546f
--- /dev/null
+++ b/f/connectors/geojson/geojson_to_postgres.py
@@ -0,0 +1,288 @@
+# requirements:
+# psycopg2-binary
+
+import json
+import logging
+from pathlib import Path
+
+from psycopg2 import connect, errors, sql
+
+from f.common_logic.db_connection import conninfo, postgresql
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def main(
+    db: postgresql,
+    db_table_name: str,
+    geojson_path: str,
+    attachment_root: str = "/persistent-storage/datalake/",
+    delete_geojson_file: bool = False,
+):
+    geojson_path = Path(attachment_root) / Path(geojson_path)
+    transformed_geojson_data = transform_geojson_data(geojson_path)
+
+    db_writer = GeoJSONDbWriter(conninfo(db), db_table_name)
+    db_writer.handle_output(transformed_geojson_data)
+
+    if delete_geojson_file:
+        # The helper is named `remove_geojson_file` so that it is not shadowed by
+        # this boolean parameter, whose name is fixed by the script's schema.
+        remove_geojson_file(geojson_path)
+
+
+def transform_geojson_data(geojson_path):
+    """
+    Transforms GeoJSON data from a file into a list of dictionaries suitable for database insertion.
+
+    Parameters
+    ----------
+    geojson_path : str or Path
+        The file path to the GeoJSON file.
+
+    Returns
+    -------
+    list
+        A list of dictionaries where each dictionary represents a GeoJSON feature with keys:
+        '_id' for the feature's unique identifier,
+        'g__type' for the geometry type,
+        'g__coordinates' for the geometry coordinates,
+        and any additional properties from the feature.
+    """
+    with open(geojson_path, "r") as f:
+        geojson_data = json.load(f)
+
+    transformed_geojson_data = []
+    for feature in geojson_data["features"]:
+        transformed_feature = {
+            "_id": feature[
+                "id"
+            ],  # Assuming that the GeoJSON feature has a unique "id" field that can be used as the primary key
+            "g__type": feature["geometry"]["type"],
+            "g__coordinates": feature["geometry"]["coordinates"],
+            **feature.get("properties", {}),
+        }
+        transformed_geojson_data.append(transformed_feature)
+    return transformed_geojson_data
+
+
+class GeoJSONDbWriter:
+    """
+    Converts GeoJSON spatial data to structured SQL tables.
+    """
+
+    def __init__(self, db_connection_string, table_name):
+        """
+        Initializes the GeoJSONDbWriter with the provided connection string and target table name.
+        """
+        self.db_connection_string = db_connection_string
+        self.table_name = table_name
+
+    def _get_conn(self):
+        """
+        Establishes a connection to the PostgreSQL database using the class's configured connection string.
+        """
+        return connect(dsn=self.db_connection_string)
+
+    def _inspect_schema(self, table_name):
+        """
+        Fetches the column names of the given table.
+        """
+        conn = self._get_conn()
+        cursor = conn.cursor()
+        cursor.execute(
+            "SELECT column_name FROM information_schema.columns WHERE table_name = %s",
+            (table_name,),
+        )
+        columns = [row[0] for row in cursor.fetchall()]
+        cursor.close()
+        conn.close()
+        return columns
+
+    def _create_missing_fields(self, table_name, missing_columns):
+        """
+        Generates and executes SQL statements to add missing fields to the table.
+ """ + table_name = sql.Identifier(table_name) + try: + with self._get_conn() as conn, conn.cursor() as cursor: + query = sql.SQL( + "CREATE TABLE IF NOT EXISTS {table_name} (_id TEXT PRIMARY KEY);" + ).format(table_name=table_name) + cursor.execute(query) + + for sanitized_column in missing_columns: + if sanitized_column == "_id": + continue + try: + query = sql.SQL( + "ALTER TABLE {table_name} ADD COLUMN {colname} TEXT;" + ).format( + table_name=table_name, + colname=sql.Identifier(sanitized_column), + ) + cursor.execute(query) + except errors.DuplicateColumn: + logger.debug( + f"Skipping insert due to DuplicateColumn, this form column has been accounted for already in the past: {sanitized_column}" + ) + continue + except Exception as e: + logger.error( + f"An error occurred while creating missing column: {sanitized_column} for {table_name}: {e}" + ) + raise + finally: + conn.close() + + @staticmethod + def _safe_insert(cursor, table_name, columns, values): + """ + Executes a safe INSERT operation into a PostgreSQL table, ensuring data integrity and preventing SQL injection. + This method also handles conflicts by updating existing records if necessary. + + The function first checks if a row with the same primary key (_id) already exists in the table. If it does, + and the existing row's data matches the new values, the operation is skipped. Otherwise, it performs an + INSERT operation. If a conflict on the primary key occurs, it updates the existing row with the new values. + + Parameters + ---------- + cursor : psycopg2 cursor + The database cursor used to execute SQL queries. + table_name : str + The name of the table where data will be inserted. + columns : list of str + The list of column names corresponding to the values being inserted. + values : list + The list of values to be inserted into the table, aligned with the columns. + + Returns + ------- + tuple + A tuple containing two integers: the count of rows inserted and the count of rows updated. + """ + inserted_count = 0 + updated_count = 0 + + # Check if there is an existing row that is different from the new values + # We are doing this in order to keep track of which rows are actually updated + # (Otherwise all existing rows would be added to updated_count) + id_index = columns.index("_id") + values[id_index] = str(values[id_index]) + select_query = sql.SQL("SELECT {fields} FROM {table} WHERE _id = %s").format( + fields=sql.SQL(", ").join(map(sql.Identifier, columns)), + table=sql.Identifier(table_name), + ) + cursor.execute(select_query, (values[columns.index("_id")],)) + existing_row = cursor.fetchone() + + if existing_row and list(existing_row) == values: + # No changes, skip the update + return inserted_count, updated_count + + query = sql.SQL( + "INSERT INTO {table} ({fields}) VALUES ({placeholders}) " + "ON CONFLICT (_id) DO UPDATE SET {updates} " + # The RETURNING clause is used to determine if the row was inserted or updated. + # xmax is a system column in PostgreSQL that stores the transaction ID of the deleting transaction. + # If xmax is 0, it means the row was newly inserted and not updated. 
+ "RETURNING (xmax = 0) AS inserted" + ).format( + table=sql.Identifier(table_name), + fields=sql.SQL(", ").join(map(sql.Identifier, columns)), + placeholders=sql.SQL(", ").join(sql.Placeholder() for _ in values), + updates=sql.SQL(", ").join( + sql.Composed( + [sql.Identifier(col), sql.SQL(" = EXCLUDED."), sql.Identifier(col)] + ) + for col in columns + if col != "_id" + ), + ) + + cursor.execute(query, values) + result = cursor.fetchone() + if result and result[0]: + inserted_count += 1 + else: + updated_count += 1 + + return inserted_count, updated_count + + def handle_output(self, outputs): + """ + Inserts GeojSON spatial data into the specified PostgreSQL database table. + It checks the database schema and adds any missing fields, then constructs + and executes SQL insert queries to store the data. After processing all data, + it commits the transaction and closes the database connection. + """ + table_name = self.table_name + + conn = self._get_conn() + cursor = conn.cursor() + + existing_fields = self._inspect_schema(table_name) + rows = [] + for entry in outputs: + sanitized_entry = {k: v for k, v in entry.items()} + rows.append(sanitized_entry) + + missing_field_keys = set() + for row in rows: + missing_field_keys.update(set(row.keys()).difference(existing_fields)) + + if missing_field_keys: + self._create_missing_fields(table_name, missing_field_keys) + + logger.info(f"Attempting to write {len(rows)} submissions to the DB.") + + inserted_count = 0 + updated_count = 0 + + for row in rows: + try: + cols, vals = zip(*row.items()) + + # Serialize lists, dict values to JSON text + vals = list(vals) + for i in range(len(vals)): + value = vals[i] + if isinstance(value, list) or isinstance(value, dict): + vals[i] = json.dumps(value) + + result_inserted_count, result_updated_count = self._safe_insert( + cursor, table_name, cols, vals + ) + inserted_count += result_inserted_count + updated_count += result_updated_count + except Exception as e: + logger.error(f"Error inserting data: {e}, {type(e).__name__}") + conn.rollback() + + try: + conn.commit() + except Exception as e: + logger.error(f"Error committing transaction: {e}") + conn.rollback() + + logger.info(f"Total rows inserted: {inserted_count}") + logger.info(f"Total rows updated: {updated_count}") + + cursor.close() + conn.close() + + +def delete_geojson_file( + geojson_path: str, +): + """ + Deletes the GeoJSON file after processing. + + Parameters + ---------- + geojson_path : str + The path to the GeoJSON file to delete. + """ + try: + geojson_path.unlink() + logger.info(f"Deleted GeoJSON file: {geojson_path}") + except FileNotFoundError: + logger.warning(f"GeoJSON file not found: {geojson_path}") + except Exception as e: + logger.error(f"Error deleting GeoJSON file: {e}") + raise diff --git a/f/connectors/geojson/geojson_to_postgres.script.lock b/f/connectors/geojson/geojson_to_postgres.script.lock new file mode 100644 index 0000000..64833c1 --- /dev/null +++ b/f/connectors/geojson/geojson_to_postgres.script.lock @@ -0,0 +1 @@ +psycopg2-binary==2.9.10 \ No newline at end of file diff --git a/f/connectors/geojson/geojson_to_postgres.script.yaml b/f/connectors/geojson/geojson_to_postgres.script.yaml new file mode 100644 index 0000000..0ae567f --- /dev/null +++ b/f/connectors/geojson/geojson_to_postgres.script.yaml @@ -0,0 +1,45 @@ +summary: 'GeoJSON: Upload to Postgres' +description: This script uploads GeoJSON data to a Postgres database. 
diff --git a/f/connectors/geojson/geojson_to_postgres.script.lock b/f/connectors/geojson/geojson_to_postgres.script.lock
new file mode 100644
index 0000000..64833c1
--- /dev/null
+++ b/f/connectors/geojson/geojson_to_postgres.script.lock
@@ -0,0 +1 @@
+psycopg2-binary==2.9.10
\ No newline at end of file
diff --git a/f/connectors/geojson/geojson_to_postgres.script.yaml b/f/connectors/geojson/geojson_to_postgres.script.yaml
new file mode 100644
index 0000000..0ae567f
--- /dev/null
+++ b/f/connectors/geojson/geojson_to_postgres.script.yaml
@@ -0,0 +1,45 @@
+summary: 'GeoJSON: Upload to Postgres'
+description: This script uploads GeoJSON data to a Postgres database.
+lock: '!inline f/connectors/geojson/geojson_to_postgres.script.lock'
+concurrency_time_window_s: 0
+kind: script
+schema:
+  $schema: 'https://json-schema.org/draft/2020-12/schema'
+  type: object
+  order:
+    - geojson_path
+    - db
+    - db_table_name
+    - delete_geojson_file
+    - attachment_root
+  properties:
+    attachment_root:
+      type: string
+      description: >-
+        The directory where the GeoJSON file is located.
+      default: /persistent-storage/datalake
+      originalType: string
+    db:
+      type: object
+      description: A database connection for storing tabular data.
+      default: null
+      format: resource-postgresql
+    db_table_name:
+      type: string
+      description: The name of the database table where the data will be stored.
+      default: null
+      originalType: string
+      pattern: '^.{1,54}$'
+    delete_geojson_file:
+      type: boolean
+      description: Whether to delete the GeoJSON file after processing.
+      default: false
+    geojson_path:
+      type: string
+      description: The path to the GeoJSON file to upload, including the filename.
+      default: null
+      originalType: string
+  required:
+    - geojson_path
+    - db
+    - db_table_name
\ No newline at end of file
diff --git a/f/connectors/geojson/tests/assets/data.geojson b/f/connectors/geojson/tests/assets/data.geojson
new file mode 100644
index 0000000..7d95bdc
--- /dev/null
+++ b/f/connectors/geojson/tests/assets/data.geojson
@@ -0,0 +1,59 @@
+{
+  "type": "FeatureCollection",
+  "features": [
+    {
+      "type": "Feature",
+      "id": "1",
+      "geometry": {
+        "type": "Point",
+        "coordinates": [-105.01621, 39.57422]
+      },
+      "properties": {
+        "name": "Pine Tree",
+        "height": 30,
+        "age": 50,
+        "species": "Pinus ponderosa"
+      }
+    },
+    {
+      "type": "Feature",
+      "id": "2",
+      "geometry": {
+        "type": "LineString",
+        "coordinates": [
+          [-105.01621, 39.57422],
+          [-105.01621, 39.57423],
+          [-105.01622, 39.57424]
+        ]
+      },
+      "properties": {
+        "name": "River Stream",
+        "length": 2.5,
+        "flow_rate": "moderate",
+        "water_type": "freshwater"
+      }
+    },
+    {
+      "type": "Feature",
+      "id": "3",
+      "geometry": {
+        "type": "Polygon",
+        "coordinates": [
+          [
+            [-105.01621, 39.57422],
+            [-105.01621, 39.57423],
+            [-105.01622, 39.57423],
+            [-105.01622, 39.57422],
+            [-105.01621, 39.57422]
+          ]
+        ]
+      },
+      "properties": {
+        "name": "Meadow",
+        "area": 1.2,
+        "flora": ["wildflowers", "grasses"],
+        "fauna": ["deer", "rabbits"]
+      }
+    }
+  ]
+}
diff --git a/f/connectors/geojson/tests/conftest.py b/f/connectors/geojson/tests/conftest.py
new file mode 100644
index 0000000..26b3b2e
--- /dev/null
+++ b/f/connectors/geojson/tests/conftest.py
@@ -0,0 +1,12 @@
+import pytest
+import testing.postgresql
+
+
+@pytest.fixture
+def pg_database():
+    """A dsn that may be used to connect to a live (local for test) postgresql server"""
+    db = testing.postgresql.Postgresql(port=7654)
+    dsn = db.dsn()
+    dsn["dbname"] = dsn.pop("database")
+    yield dsn
+    db.stop()
diff --git a/f/connectors/geojson/tests/geojson_to_postgres_test.py b/f/connectors/geojson/tests/geojson_to_postgres_test.py
new file mode 100644
index 0000000..7e094de
--- /dev/null
+++ b/f/connectors/geojson/tests/geojson_to_postgres_test.py
@@ -0,0 +1,53 @@
+import psycopg2
+
+from f.connectors.geojson.geojson_to_postgres import main
+
+geojson_fixture_path = "f/connectors/geojson/tests/assets/"
+
+
+def test_script_e2e(pg_database):
+    main(pg_database, "my_geojson_data", "data.geojson", geojson_fixture_path, False)
+
+    with psycopg2.connect(**pg_database) as conn:
+        with conn.cursor() as cursor:
+            cursor.execute("SELECT COUNT(*) FROM my_geojson_data")
+            assert cursor.fetchone()[0] == 3
+
+            cursor.execute(
"SELECT g__type, g__coordinates, name, height, age, species FROM my_geojson_data WHERE _id = '1'" + ) + point_data = cursor.fetchone() + assert point_data == ( + "Point", + "[-105.01621, 39.57422]", + "Pine Tree", + "30", + "50", + "Pinus ponderosa", + ) + + cursor.execute( + "SELECT g__type, g__coordinates, name, length, flow_rate, water_type FROM my_geojson_data WHERE _id = '2'" + ) + line_data = cursor.fetchone() + assert line_data == ( + "LineString", + "[[-105.01621, 39.57422], [-105.01621, 39.57423], [-105.01622, 39.57424]]", + "River Stream", + "2.5", + "moderate", + "freshwater", + ) + + cursor.execute( + "SELECT g__type, g__coordinates, name, area, flora, fauna FROM my_geojson_data WHERE _id = '3'" + ) + polygon_data = cursor.fetchone() + assert polygon_data == ( + "Polygon", + "[[[-105.01621, 39.57422], [-105.01621, 39.57423], [-105.01622, 39.57423], [-105.01622, 39.57422], [-105.01621, 39.57422]]]", + "Meadow", + "1.2", + '["wildflowers", "grasses"]', + '["deer", "rabbits"]', + ) diff --git a/f/connectors/geojson/tests/requirements-test.txt b/f/connectors/geojson/tests/requirements-test.txt new file mode 100644 index 0000000..4a80eaa --- /dev/null +++ b/f/connectors/geojson/tests/requirements-test.txt @@ -0,0 +1,2 @@ +pytest +testing.postgresql \ No newline at end of file diff --git a/f/export/folder.meta.yaml b/f/export/folder.meta.yaml index a8209a9..c3792b3 100644 --- a/f/export/folder.meta.yaml +++ b/f/export/folder.meta.yaml @@ -1 +1,2 @@ +summary: '' display_name: export \ No newline at end of file diff --git a/tox.ini b/tox.ini index 72406a8..8fb985e 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] skipsdist = True -env_list = alerts, comapeo, kobotoolbox_responses, locusmap, odk_responses, postgres_to_geojson +env_list = alerts, arcgis, comapeo, geojson, kobotoolbox_responses, locusmap, odk_responses, postgres_to_geojson [testenv] setenv = @@ -23,6 +23,13 @@ environment = expose = TOX_DOCKER_GCS_PORT=10010/tcp +[testenv:arcgis] +deps = + -r{toxinidir}/f/connectors/arcgis/arcgis_feature_layer.script.lock + -r{toxinidir}/f/connectors/arcgis/tests/requirements-test.txt +commands = + pytest {posargs} f/connectors/arcgis + [testenv:comapeo] deps = -r{toxinidir}/f/connectors/comapeo/comapeo_observations.script.lock @@ -31,6 +38,13 @@ deps = commands = pytest {posargs} f/connectors/comapeo +[testenv:geojson] +deps = + -r{toxinidir}/f/connectors/geojson/geojson_to_postgres.script.lock + -r{toxinidir}/f/connectors/geojson/tests/requirements-test.txt +commands = + pytest {posargs} f/connectors/geojson + [testenv:kobotoolbox_responses] deps = -r{toxinidir}/f/connectors/kobotoolbox/kobotoolbox_responses.script.lock