diff --git a/f/common_logic/db_connection.py b/f/common_logic/db_connection.py
deleted file mode 100644
index 991d821..0000000
--- a/f/common_logic/db_connection.py
+++ /dev/null
@@ -1,9 +0,0 @@
-postgresql = dict
-
-
-def conninfo(db: postgresql):
-    """Convert a `postgresql` Windmill Resources to psycopg-style connection string"""
-    # password is optional
-    password_part = f" password={db['password']}" if "password" in db else ""
-    conn = "dbname={dbname} user={user} host={host} port={port}".format(**db)
-    return conn + password_part
diff --git a/f/common_logic/db_operations.py b/f/common_logic/db_operations.py
new file mode 100644
index 0000000..833c2a0
--- /dev/null
+++ b/f/common_logic/db_operations.py
@@ -0,0 +1,52 @@
+import logging
+
+from psycopg2 import Error, connect, sql
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+postgresql = dict
+
+
+def conninfo(db: postgresql):
+    """Convert a `postgresql` Windmill Resource to a psycopg-style connection string"""
+    # password is optional
+    password_part = f" password={db['password']}" if "password" in db else ""
+    conn = "dbname={dbname} user={user} host={host} port={port}".format(**db)
+    return conn + password_part
+
+
+def fetch_data_from_postgres(db_connection_string: str, table_name: str):
+    """
+    Fetches all data from a specified PostgreSQL table.
+
+    Parameters
+    ----------
+    db_connection_string (str): The connection string for the PostgreSQL database.
+    table_name (str): The name of the table to fetch data from.
+
+    Returns
+    -------
+    tuple: A tuple containing a list of column names and a list of rows fetched from the table.
+    """
+
+    try:
+        conn = connect(db_connection_string)
+        cursor = conn.cursor()
+        cursor.execute(
+            sql.SQL("SELECT * FROM {table_name}").format(
+                table_name=sql.Identifier(table_name)
+            )
+        )
+        columns = [desc[0] for desc in cursor.description]
+        rows = cursor.fetchall()
+    except Error as e:
+        logger.error(f"Error fetching data from {table_name}: {e}")
+        raise
+    finally:
+        cursor.close()
+        conn.close()
+
+    logger.info(f"Data fetched from {table_name}")
+    return columns, rows
diff --git a/f/common_logic/db_connection.script.lock b/f/common_logic/db_operations.script.lock
similarity index 100%
rename from f/common_logic/db_connection.script.lock
rename to f/common_logic/db_operations.script.lock
diff --git a/f/common_logic/db_operations.script.yaml b/f/common_logic/db_operations.script.yaml
new file mode 100644
index 0000000..b8d117f
--- /dev/null
+++ b/f/common_logic/db_operations.script.yaml
@@ -0,0 +1,11 @@
+summary: 'Shared logic for database connections and operations'
+description: ''
+lock: '!inline f/common_logic/db_operations.script.lock'
+has_preprocessor: false
+kind: script
+no_main_func: true
+schema:
+  $schema: 'https://json-schema.org/draft/2020-12/schema'
+  type: object
+  properties: {}
+  required: []
diff --git a/f/common_logic/save_disk.py b/f/common_logic/save_disk.py
new file mode 100644
index 0000000..855ace3
--- /dev/null
+++ b/f/common_logic/save_disk.py
@@ -0,0 +1,55 @@
+import csv
+import json
+import logging
+from pathlib import Path
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def get_safe_file_path(storage_path: str, db_table_name: str, file_type: str):
+    """
+    Construct a safe file path for storing data, ensuring it remains within the specified storage directory;
+    otherwise, raises a ValueError.
+ """ + storage_path = Path(storage_path).resolve() + file_path = (storage_path / f"{db_table_name}.{file_type}").resolve() + + if not file_path.is_relative_to(storage_path): + raise ValueError("Invalid path: possible path traversal detected.") + + return file_path + + +def save_export_file( + data, db_table_name: str, storage_path: str, file_type: str = "json" +): + """ + Saves the provided data to a file in the specified format and storage path. + + Parameters + ---------- + data : list or dict + The data to be saved. For CSV, should be a list of rows including a header. + db_table_name : str + The name of the database table, used to name the output file. + storage_path : str + The directory path where the file will be saved. + file_type : str + The format to save the file as: "json", "geojson", or "csv". + """ + storage_path = Path(storage_path) + storage_path.mkdir(parents=True, exist_ok=True) + file_path = get_safe_file_path(storage_path, db_table_name, file_type) + + if file_type in {"geojson", "json"}: + with file_path.open("w") as f: + json.dump(data, f) + elif file_type == "csv": + with file_path.open("w", newline="") as f: + writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC) + writer.writerows(data) + else: + raise ValueError(f"Unsupported file type: {file_type}") + + logger.info(f"{file_type.upper()} file saved to {file_path}") diff --git a/f/common_logic/save_disk.script.lock b/f/common_logic/save_disk.script.lock new file mode 100644 index 0000000..b53e2c4 --- /dev/null +++ b/f/common_logic/save_disk.script.lock @@ -0,0 +1 @@ +# py311 diff --git a/f/common_logic/db_connection.script.yaml b/f/common_logic/save_disk.script.yaml similarity index 63% rename from f/common_logic/db_connection.script.yaml rename to f/common_logic/save_disk.script.yaml index bef890d..97af2f1 100644 --- a/f/common_logic/db_connection.script.yaml +++ b/f/common_logic/save_disk.script.yaml @@ -1,6 +1,6 @@ -summary: 'Shared logic for database connection' +summary: 'Shared logic for saving files to disk' description: '' -lock: '!inline f/common_logic/db_connection.script.lock' +lock: '!inline f/common_logic/save_disk.script.lock' has_preprocessor: false kind: script no_main_func: true diff --git a/f/connectors/alerts/alerts_gcs.py b/f/connectors/alerts/alerts_gcs.py index d8b0b87..deeb4f1 100644 --- a/f/connectors/alerts/alerts_gcs.py +++ b/f/connectors/alerts/alerts_gcs.py @@ -22,7 +22,7 @@ from PIL import Image from psycopg2 import sql -from f.common_logic.db_connection import conninfo, postgresql +from f.common_logic.db_operations import conninfo, postgresql # type names that refer to Windmill Resources gcp_service_account = dict diff --git a/f/connectors/arcgis/arcgis_feature_layer.py b/f/connectors/arcgis/arcgis_feature_layer.py index d389e8b..2d38539 100644 --- a/f/connectors/arcgis/arcgis_feature_layer.py +++ b/f/connectors/arcgis/arcgis_feature_layer.py @@ -9,7 +9,7 @@ import requests -from f.common_logic.db_connection import postgresql +from f.common_logic.db_operations import postgresql from f.connectors.geojson.geojson_to_postgres import main as save_geojson_to_postgres # type names that refer to Windmill Resources diff --git a/f/connectors/comapeo/comapeo_alerts.py b/f/connectors/comapeo/comapeo_alerts.py index 17ccd7f..e15519f 100644 --- a/f/connectors/comapeo/comapeo_alerts.py +++ b/f/connectors/comapeo/comapeo_alerts.py @@ -10,7 +10,7 @@ import psycopg2 import requests -from f.common_logic.db_connection import conninfo, postgresql +from f.common_logic.db_operations import conninfo, 
 
 
 class comapeo_server(TypedDict):
diff --git a/f/connectors/comapeo/comapeo_observations.py b/f/connectors/comapeo/comapeo_observations.py
index e2a2bac..c64f0c3 100644
--- a/f/connectors/comapeo/comapeo_observations.py
+++ b/f/connectors/comapeo/comapeo_observations.py
@@ -13,7 +13,7 @@
 import requests
 from psycopg2 import errors, sql
 
-from f.common_logic.db_connection import conninfo, postgresql
+from f.common_logic.db_operations import conninfo, postgresql
 
 
 class comapeo_server(TypedDict):
diff --git a/f/connectors/geojson/geojson_to_postgres.py b/f/connectors/geojson/geojson_to_postgres.py
index 10a546f..3a00575 100644
--- a/f/connectors/geojson/geojson_to_postgres.py
+++ b/f/connectors/geojson/geojson_to_postgres.py
@@ -7,7 +7,7 @@
 
 from psycopg2 import connect, errors, sql
 
-from f.common_logic.db_connection import conninfo, postgresql
+from f.common_logic.db_operations import conninfo, postgresql
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
diff --git a/f/connectors/kobotoolbox/kobotoolbox_responses.py b/f/connectors/kobotoolbox/kobotoolbox_responses.py
index 1949c7c..8da40b9 100644
--- a/f/connectors/kobotoolbox/kobotoolbox_responses.py
+++ b/f/connectors/kobotoolbox/kobotoolbox_responses.py
@@ -11,7 +11,7 @@
 import requests
 from psycopg2 import errors, sql
 
-from f.common_logic.db_connection import conninfo, postgresql
+from f.common_logic.db_operations import conninfo, postgresql
 from f.common_logic.db_transformations import sanitize
 
 # type names that refer to Windmill Resources
diff --git a/f/connectors/locusmap/locusmap.py b/f/connectors/locusmap/locusmap.py
index 491db90..9f9af42 100644
--- a/f/connectors/locusmap/locusmap.py
+++ b/f/connectors/locusmap/locusmap.py
@@ -12,7 +12,7 @@
 from lxml import etree
 from psycopg2 import connect, errors, sql
 
-from f.common_logic.db_connection import conninfo, postgresql
+from f.common_logic.db_operations import conninfo, postgresql
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
diff --git a/f/connectors/odk/odk_responses.py b/f/connectors/odk/odk_responses.py
index 5b0fd14..af39141 100644
--- a/f/connectors/odk/odk_responses.py
+++ b/f/connectors/odk/odk_responses.py
@@ -13,7 +13,7 @@
 from psycopg2 import errors, sql
 from pyodk.client import Client
 
-from f.common_logic.db_connection import conninfo, postgresql
+from f.common_logic.db_operations import conninfo, postgresql
 from f.common_logic.db_transformations import sanitize
 
 # type names that refer to Windmill Resources
diff --git a/f/export/postgres_to_file/postgres_to_csv.py b/f/export/postgres_to_file/postgres_to_csv.py
new file mode 100644
index 0000000..6e10fda
--- /dev/null
+++ b/f/export/postgres_to_file/postgres_to_csv.py
@@ -0,0 +1,20 @@
+import logging
+
+from f.common_logic.db_operations import conninfo, fetch_data_from_postgres, postgresql
+from f.common_logic.save_disk import save_export_file
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def main(
+    db: postgresql,
+    db_table_name: str,
+    storage_path: str = "/persistent-storage/datalake/export",
+):
+    columns, rows = fetch_data_from_postgres(conninfo(db), db_table_name)
+
+    # Convert rows to lists to ensure compatibility with CSV writer, which requires iterable rows
+    data = [columns, *map(list, rows)]
+
+    save_export_file(data, db_table_name, storage_path, file_type="csv")
diff --git a/f/export/postgres_to_geojson/postgres_to_geojson.script.lock b/f/export/postgres_to_file/postgres_to_csv.script.lock
similarity index 100%
rename from f/export/postgres_to_geojson/postgres_to_geojson.script.lock
rename to f/export/postgres_to_file/postgres_to_csv.script.lock
diff --git a/f/export/postgres_to_file/postgres_to_csv.script.yaml b/f/export/postgres_to_file/postgres_to_csv.script.yaml
new file mode 100644
index 0000000..0d95165
--- /dev/null
+++ b/f/export/postgres_to_file/postgres_to_csv.script.yaml
@@ -0,0 +1,36 @@
+summary: 'Export: Postgres to CSV'
+description: >-
+  This script connects to a PostgreSQL database, retrieves all entries from a
+  specified table, and converts the data into a CSV file.
+lock: '!inline f/export/postgres_to_file/postgres_to_csv.script.lock'
+concurrency_time_window_s: 0
+kind: script
+schema:
+  $schema: 'https://json-schema.org/draft/2020-12/schema'
+  type: object
+  order:
+    - db
+    - db_table_name
+    - storage_path
+  properties:
+    db:
+      type: object
+      description: A database connection for fetching tabular data.
+      default: null
+      format: resource-postgresql
+    db_table_name:
+      type: string
+      description: The name of the database table to export to CSV.
+      default: null
+      originalType: string
+    storage_path:
+      type: string
+      description: >-
+        The path to the directory where the CSV file will be stored. The
+        file will be named after the database table name.
+      default: /persistent-storage/datalake/export
+      originalType: string
+  required:
+    - db
+    - db_table_name
+    - storage_path
diff --git a/f/export/postgres_to_file/postgres_to_geojson.py b/f/export/postgres_to_file/postgres_to_geojson.py
new file mode 100644
index 0000000..395b8a6
--- /dev/null
+++ b/f/export/postgres_to_file/postgres_to_geojson.py
@@ -0,0 +1,76 @@
+import json
+import logging
+
+from f.common_logic.db_operations import conninfo, fetch_data_from_postgres, postgresql
+from f.common_logic.save_disk import save_export_file
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def main(
+    db: postgresql,
+    db_table_name: str,
+    storage_path: str = "/persistent-storage/datalake/export",
+):
+    data = fetch_data_from_postgres(conninfo(db), db_table_name)
+
+    feature_collection = format_data_as_geojson(data)
+
+    save_export_file(
+        feature_collection, db_table_name, storage_path, file_type="geojson"
+    )
+
+
+def format_data_as_geojson(data):
+    """
+    Converts data from a PostgreSQL table into a GeoJSON FeatureCollection.
+
+    Parameters
+    ----------
+    data (tuple): A tuple containing columns and rows fetched from the database.
+
+    Returns
+    -------
+    dict: A GeoJSON FeatureCollection with features extracted from the data.
+ """ + + columns, rows = data + features = [] + for row in rows: + properties = {} + geometry = {} + feature_id = None + + # The expected schema here is that geometry columns are prefixed with "g__" + # If an "_id" column is present, it is used as the feature id + # All other columns are treated as properties + for col, value in zip(columns, row): + if col == "_id": + feature_id = value + elif col == "g__coordinates": + if value: + geometry["coordinates"] = json.loads(value) + else: + geometry["coordinates"] = None + elif col == "g__type": + geometry["type"] = value + else: + properties[col] = value + + feature = { + "type": "Feature", + "id": feature_id, + "properties": properties, + "geometry": geometry, + } + features.append(feature) + + feature_collection = { + "type": "FeatureCollection", + "features": features, + } + + logger.info(f"GeoJSON created with {len(features)} features") + + return feature_collection diff --git a/f/export/postgres_to_file/postgres_to_geojson.script.lock b/f/export/postgres_to_file/postgres_to_geojson.script.lock new file mode 100644 index 0000000..64833c1 --- /dev/null +++ b/f/export/postgres_to_file/postgres_to_geojson.script.lock @@ -0,0 +1 @@ +psycopg2-binary==2.9.10 \ No newline at end of file diff --git a/f/export/postgres_to_geojson/postgres_to_geojson.script.yaml b/f/export/postgres_to_file/postgres_to_geojson.script.yaml similarity index 93% rename from f/export/postgres_to_geojson/postgres_to_geojson.script.yaml rename to f/export/postgres_to_file/postgres_to_geojson.script.yaml index cb10754..3c9d8f3 100644 --- a/f/export/postgres_to_geojson/postgres_to_geojson.script.yaml +++ b/f/export/postgres_to_file/postgres_to_geojson.script.yaml @@ -3,7 +3,7 @@ description: >- This script connects to a PostgreSQL database, retrieves all entries from a specified table, and converts the data into GeoJSON format. The script assumes that the geometry fields are properly formatted for GeoJSON. 
-lock: '!inline f/export/postgres_to_geojson/postgres_to_geojson.script.lock'
+lock: '!inline f/export/postgres_to_file/postgres_to_geojson.script.lock'
 concurrency_time_window_s: 0
 kind: script
 schema:
diff --git a/f/export/postgres_to_geojson/tests/conftest.py b/f/export/postgres_to_file/tests/conftest.py
similarity index 100%
rename from f/export/postgres_to_geojson/tests/conftest.py
rename to f/export/postgres_to_file/tests/conftest.py
diff --git a/f/export/postgres_to_file/tests/postgres_to_csv_test.py b/f/export/postgres_to_file/tests/postgres_to_csv_test.py
new file mode 100644
index 0000000..24f9edf
--- /dev/null
+++ b/f/export/postgres_to_file/tests/postgres_to_csv_test.py
@@ -0,0 +1,21 @@
+from f.export.postgres_to_file.postgres_to_csv import main
+
+
+def test_script_e2e(pg_database, database_mock_data, tmp_path):
+    asset_storage = tmp_path / "datalake/export"
+
+    main(
+        pg_database,
+        "comapeo_data",
+        asset_storage,
+    )
+
+    with open(asset_storage / "comapeo_data.csv") as f:
+        data = f.read()
+        expected_data = """\
+"_id","lat","deleted","created_at","updated_at","project_name","type","g__type","g__coordinates","attachments","notes","project_id","animal_type","lon"
+"doc_id_1","-33.8688","False","2024-10-14T20:18:14.206Z","2024-10-14T20:18:14.206Z","Forest Expedition","water","Point","[151.2093, -33.8688]","capybara.jpg","Rapid","forest_expedition","","151.2093"
+"doc_id_2","48.8566","False","2024-10-15T21:19:15.207Z","2024-10-15T21:19:15.207Z","River Mapping","animal","Point","[2.3522, 48.8566]","capybara.jpg","Capybara","river_mapping","capybara","2.3522"
+"doc_id_3","35.6895","False","2024-10-16T22:20:16.208Z","2024-10-16T22:20:16.208Z","Historical Site","location","Point","[139.6917, 35.6895]","","Former village site","historical","","139.6917"
+"""
+    assert data == expected_data
diff --git a/f/export/postgres_to_geojson/tests/postgres_to_geojson_test.py b/f/export/postgres_to_file/tests/postgres_to_geojson_test.py
similarity index 92%
rename from f/export/postgres_to_geojson/tests/postgres_to_geojson_test.py
rename to f/export/postgres_to_file/tests/postgres_to_geojson_test.py
index 45d739a..c7042fb 100644
--- a/f/export/postgres_to_geojson/tests/postgres_to_geojson_test.py
+++ b/f/export/postgres_to_file/tests/postgres_to_geojson_test.py
@@ -1,6 +1,6 @@
 import json
 
-from f.export.postgres_to_geojson.postgres_to_geojson import main
+from f.export.postgres_to_file.postgres_to_geojson import main
 
 
 def test_script_e2e(pg_database, database_mock_data, tmp_path):
diff --git a/f/export/postgres_to_geojson/tests/requirements-test.txt b/f/export/postgres_to_file/tests/requirements-test.txt
similarity index 100%
rename from f/export/postgres_to_geojson/tests/requirements-test.txt
rename to f/export/postgres_to_file/tests/requirements-test.txt
diff --git a/f/export/postgres_to_geojson/postgres_to_geojson.py b/f/export/postgres_to_geojson/postgres_to_geojson.py
deleted file mode 100644
index 763bb37..0000000
--- a/f/export/postgres_to_geojson/postgres_to_geojson.py
+++ /dev/null
@@ -1,133 +0,0 @@
-# requirements:
-# psycopg2-binary
-
-import json
-import logging
-from pathlib import Path
-
-from psycopg2 import Error, connect, sql
-
-from f.common_logic.db_connection import conninfo, postgresql
-
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-
-def main(
-    db: postgresql,
-    db_table_name: str,
-    storage_path: str = "/persistent-storage/datalake/export",
-):
-    data = fetch_data_from_postgres(conninfo(db), db_table_name)
-
-    feature_collection = format_data_as_geojson(data)
-
-    save_file(feature_collection, db_table_name, storage_path)
-
-
-def fetch_data_from_postgres(db_connection_string, table_name: str):
-    """
-    Fetches all data from a specified PostgreSQL table.
-
-    Parameters
-    ----------
-    db_connection_string (str): The connection string for the PostgreSQL database.
-    table_name (str): The name of the table to fetch data from.
-
-    Returns
-    -------
-    tuple: A tuple containing a list of column names and a list of rows fetched from the table.
-    """
-
-    try:
-        conn = connect(db_connection_string)
-        cursor = conn.cursor()
-        cursor.execute(
-            sql.SQL("SELECT * FROM {table_name}").format(
-                table_name=sql.Identifier(table_name)
-            )
-        )
-        columns = [desc[0] for desc in cursor.description]
-        rows = cursor.fetchall()
-    except Error as e:
-        logger.error(f"Error fetching data from {table_name}: {e}")
-        raise
-    finally:
-        cursor.close()
-        conn.close()
-
-    logger.info(f"Data fetched from {table_name}")
-    return columns, rows
-
-
-def format_data_as_geojson(data):
-    """
-    Converts data from a PostgreSQL table into a GeoJSON FeatureCollection.
-
-    Parameters
-    ----------
-    data (tuple): A tuple containing columns and rows fetched from the database.
-
-    Returns
-    -------
-    dict: A GeoJSON FeatureCollection with features extracted from the data.
-    """
-
-    columns, rows = data
-    features = []
-    for row in rows:
-        properties = {}
-        geometry = {}
-        feature_id = None
-
-        # The expected schema here is that geometry columns are prefixed with "g__"
-        # If an "_id" column is present, it is used as the feature id
-        # All other columns are treated as properties
-        for col, value in zip(columns, row):
-            if col == "_id":
-                feature_id = value
-            elif col == "g__coordinates":
-                if value:
-                    geometry["coordinates"] = json.loads(value)
-                else:
-                    geometry["coordinates"] = None
-            elif col == "g__type":
-                geometry["type"] = value
-            else:
-                properties[col] = value
-
-        feature = {
-            "type": "Feature",
-            "id": feature_id,
-            "properties": properties,
-            "geometry": geometry,
-        }
-        features.append(feature)
-
-    feature_collection = {
-        "type": "FeatureCollection",
-        "features": features,
-    }
-
-    logger.info(f"GeoJSON created with {len(features)} features")
-
-    return feature_collection
-
-
-def save_file(data, db_table_name: str, storage_path: str):
-    """
-    Saves the provided data as a GeoJSON file in the specified storage path.
-
-    Parameters
-    ----------
-    data (dict): The data to be saved, formatted as a GeoJSON FeatureCollection.
-    db_table_name (str): The name of the database table, used to name the output file.
-    storage_path (str): The directory path where the GeoJSON file will be saved.
- """ - - storage_path = Path(storage_path) - storage_path.mkdir(parents=True, exist_ok=True) - geojson_path = storage_path / f"{db_table_name}.geojson" - with geojson_path.open("w") as f: - json.dump(data, f) - logger.info(f"GeoJSON file saved to {geojson_path}") diff --git a/tox.ini b/tox.ini index 8fb985e..2611e0d 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] skipsdist = True -env_list = alerts, arcgis, comapeo, geojson, kobotoolbox_responses, locusmap, odk_responses, postgres_to_geojson +env_list = alerts, arcgis, comapeo, geojson, kobotoolbox_responses, locusmap, odk_responses, postgres_to_file [testenv] setenv = @@ -67,9 +67,11 @@ commands = # Warnings disabled because of a pyODK deprecation issue superfluous to the tests pytest --disable-warnings {posargs} f/connectors/odk -[testenv:postgres_to_geojson] +[testenv:postgres_to_file] deps = - -r{toxinidir}/f/export/postgres_to_geojson/postgres_to_geojson.script.lock - -r{toxinidir}/f/export/postgres_to_geojson/tests/requirements-test.txt + -r{toxinidir}/f/export/postgres_to_file/postgres_to_geojson.script.lock + -r{toxinidir}/f/export/postgres_to_file/postgres_to_csv.script.lock + -r{toxinidir}/f/export/postgres_to_file/tests/requirements-test.txt commands = - pytest {posargs} f/export/postgres_to_geojson \ No newline at end of file + pytest {posargs} f/export/postgres_to_file +