Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions f/common_logic/db_connection.py

This file was deleted.

52 changes: 52 additions & 0 deletions f/common_logic/db_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging

from psycopg2 import Error, connect, sql

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Alias marking dicts that hold a Windmill `postgresql` resource.
postgresql = dict


def conninfo(db: postgresql):
    """Build a psycopg-style connection string from a `postgresql` Windmill Resource."""
    # Assemble the key=value pairs explicitly; password is optional in the resource.
    parts = [
        f"dbname={db['dbname']}",
        f"user={db['user']}",
        f"host={db['host']}",
        f"port={db['port']}",
    ]
    if "password" in db:
        parts.append(f"password={db['password']}")
    return " ".join(parts)


def fetch_data_from_postgres(db_connection_string: str, table_name: str):
    """
    Fetches all data from a specified PostgreSQL table.

    Parameters
    ----------
    db_connection_string (str): The connection string for the PostgreSQL database.
    table_name (str): The name of the table to fetch data from.

    Returns
    -------
    tuple: A tuple containing a list of column names and a list of rows fetched from the table.

    Raises
    ------
    psycopg2.Error
        If the connection or query fails; the error is logged before re-raising.
    """

    # Initialize to None so the finally block can't hit an UnboundLocalError
    # when connect() or cursor() raises before these names are bound.
    conn = None
    cursor = None
    try:
        conn = connect(db_connection_string)
        cursor = conn.cursor()
        # sql.Identifier quotes the table name, guarding against SQL injection.
        cursor.execute(
            sql.SQL("SELECT * FROM {table_name}").format(
                table_name=sql.Identifier(table_name)
            )
        )
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()
    except Error as e:
        logger.error(f"Error fetching data from {table_name}: {e}")
        raise
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    logger.info(f"Data fetched from {table_name}")
    return columns, rows
11 changes: 11 additions & 0 deletions f/common_logic/db_operations.script.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
summary: 'Shared logic for database connections and operations'
description: ''
lock: '!inline f/common_logic/db_operations.script.lock'
has_preprocessor: false
kind: script
no_main_func: true
schema:
$schema: 'https://json-schema.org/draft/2020-12/schema'
type: object
properties: {}
required: []
55 changes: 55 additions & 0 deletions f/common_logic/save_disk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import csv
import json
import logging
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def get_safe_file_path(storage_path: str, db_table_name: str, file_type: str):
    """
    Build the destination file path inside the storage directory, guaranteeing
    it cannot escape that directory; raises ValueError on a traversal attempt.
    """
    base_dir = Path(storage_path).resolve()
    target = (base_dir / f"{db_table_name}.{file_type}").resolve()

    # A table name like "../x" would resolve outside base_dir — reject it.
    if not target.is_relative_to(base_dir):
        raise ValueError("Invalid path: possible path traversal detected.")

    return target


def save_export_file(
    data, db_table_name: str, storage_path: str, file_type: str = "json"
):
    """
    Saves the provided data to a file in the specified format and storage path.

    Parameters
    ----------
    data : list or dict
        The data to be saved. For CSV, should be a list of rows including a header.
    db_table_name : str
        The name of the database table, used to name the output file.
    storage_path : str
        The directory path where the file will be saved.
    file_type : str
        The format to save the file as: "json", "geojson", or "csv".

    Raises
    ------
    ValueError
        If file_type is unsupported, or the resolved path escapes storage_path.
    """
    storage_path = Path(storage_path)
    storage_path.mkdir(parents=True, exist_ok=True)
    file_path = get_safe_file_path(storage_path, db_table_name, file_type)

    # Pin the encoding to UTF-8 so output is portable and does not depend on
    # the host locale (the previous open() calls used the platform default).
    if file_type in {"geojson", "json"}:
        with file_path.open("w", encoding="utf-8") as f:
            json.dump(data, f)
    elif file_type == "csv":
        with file_path.open("w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
            writer.writerows(data)
    else:
        raise ValueError(f"Unsupported file type: {file_type}")

    logger.info(f"{file_type.upper()} file saved to {file_path}")
1 change: 1 addition & 0 deletions f/common_logic/save_disk.script.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# py311
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
summary: 'Shared logic for database connection'
summary: 'Shared logic for saving files to disk'
description: ''
lock: '!inline f/common_logic/db_connection.script.lock'
lock: '!inline f/common_logic/save_disk.script.lock'
has_preprocessor: false
kind: script
no_main_func: true
Expand Down
2 changes: 1 addition & 1 deletion f/connectors/alerts/alerts_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from PIL import Image
from psycopg2 import sql

from f.common_logic.db_connection import conninfo, postgresql
from f.common_logic.db_operations import conninfo, postgresql

# type names that refer to Windmill Resources
gcp_service_account = dict
Expand Down
2 changes: 1 addition & 1 deletion f/connectors/arcgis/arcgis_feature_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import requests

from f.common_logic.db_connection import postgresql
from f.common_logic.db_operations import postgresql
from f.connectors.geojson.geojson_to_postgres import main as save_geojson_to_postgres

# type names that refer to Windmill Resources
Expand Down
2 changes: 1 addition & 1 deletion f/connectors/comapeo/comapeo_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import psycopg2
import requests

from f.common_logic.db_connection import conninfo, postgresql
from f.common_logic.db_operations import conninfo, postgresql


class comapeo_server(TypedDict):
Expand Down
2 changes: 1 addition & 1 deletion f/connectors/comapeo/comapeo_observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import requests
from psycopg2 import errors, sql

from f.common_logic.db_connection import conninfo, postgresql
from f.common_logic.db_operations import conninfo, postgresql


class comapeo_server(TypedDict):
Expand Down
2 changes: 1 addition & 1 deletion f/connectors/geojson/geojson_to_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from psycopg2 import connect, errors, sql

from f.common_logic.db_connection import conninfo, postgresql
from f.common_logic.db_operations import conninfo, postgresql

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion f/connectors/kobotoolbox/kobotoolbox_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import requests
from psycopg2 import errors, sql

from f.common_logic.db_connection import conninfo, postgresql
from f.common_logic.db_operations import conninfo, postgresql
from f.common_logic.db_transformations import sanitize

# type names that refer to Windmill Resources
Expand Down
2 changes: 1 addition & 1 deletion f/connectors/locusmap/locusmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from lxml import etree
from psycopg2 import connect, errors, sql

from f.common_logic.db_connection import conninfo, postgresql
from f.common_logic.db_operations import conninfo, postgresql

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion f/connectors/odk/odk_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from psycopg2 import errors, sql
from pyodk.client import Client

from f.common_logic.db_connection import conninfo, postgresql
from f.common_logic.db_operations import conninfo, postgresql
from f.common_logic.db_transformations import sanitize

# type names that refer to Windmill Resources
Expand Down
20 changes: 20 additions & 0 deletions f/export/postgres_to_file/postgres_to_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import logging

from f.common_logic.db_operations import conninfo, fetch_data_from_postgres, postgresql
from f.common_logic.save_disk import save_export_file

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def main(
    db: postgresql,
    db_table_name: str,
    storage_path: str = "/persistent-storage/datalake/export",
):
    """Export every row of a Postgres table to a CSV file under storage_path."""
    header, records = fetch_data_from_postgres(conninfo(db), db_table_name)

    # csv.writer needs iterable rows; DB rows arrive as tuples, so listify them
    rows_as_lists = [list(record) for record in records]

    save_export_file([header, *rows_as_lists], db_table_name, storage_path, file_type="csv")
36 changes: 36 additions & 0 deletions f/export/postgres_to_file/postgres_to_csv.script.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
summary: 'Export: Postgres to CSV'
description: >-
This script connects to a PostgreSQL database, retrieves all entries from a
specified table, and converts the data into a CSV file.
lock: '!inline f/export/postgres_to_file/postgres_to_csv.script.lock'
concurrency_time_window_s: 0
kind: script
schema:
$schema: 'https://json-schema.org/draft/2020-12/schema'
type: object
order:
- db
- db_table_name
- storage_path
properties:
db:
type: object
description: A database connection for storing tabular data.
default: null
format: resource-postgresql
db_table_name:
type: string
description: The name of the database table to export to CSV.
default: null
originalType: string
storage_path:
type: string
description: >-
The path to the directory where the CSV file will be stored. The
file will be named after the database table name.
default: /persistent-storage/datalake/export
originalType: string
required:
- db
- db_table_name
- storage_path
76 changes: 76 additions & 0 deletions f/export/postgres_to_file/postgres_to_geojson.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import json
import logging

from f.common_logic.db_operations import conninfo, fetch_data_from_postgres, postgresql
from f.common_logic.save_disk import save_export_file

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def main(
    db: postgresql,
    db_table_name: str,
    storage_path: str = "/persistent-storage/datalake/export",
):
    """Export a Postgres table as a GeoJSON FeatureCollection file under storage_path."""
    table_data = fetch_data_from_postgres(conninfo(db), db_table_name)
    collection = format_data_as_geojson(table_data)
    save_export_file(collection, db_table_name, storage_path, file_type="geojson")


def format_data_as_geojson(data):
    """
    Converts data from a PostgreSQL table into a GeoJSON FeatureCollection.

    Parameters
    ----------
    data (tuple): A tuple containing columns and rows fetched from the database.

    Returns
    -------
    dict: A GeoJSON FeatureCollection with features extracted from the data.
    """

    columns, rows = data
    features = []
    for row in rows:
        properties = {}
        geometry = {}
        feature_id = None

        # The expected schema here is that geometry columns are prefixed with "g__"
        # If an "_id" column is present, it is used as the feature id
        # All other columns are treated as properties
        for col, value in zip(columns, row):
            if col == "_id":
                feature_id = value
            elif col == "g__coordinates":
                # Coordinates are stored as a JSON-encoded string (or NULL)
                geometry["coordinates"] = json.loads(value) if value else None
            elif col == "g__type":
                geometry["type"] = value
            else:
                properties[col] = value

        feature = {
            "type": "Feature",
            "id": feature_id,
            "properties": properties,
            # RFC 7946 requires "geometry" to be a geometry object or null;
            # a row with no g__ columns must yield null, not {}.
            "geometry": geometry if geometry else None,
        }
        features.append(feature)

    feature_collection = {
        "type": "FeatureCollection",
        "features": features,
    }

    logger.info(f"GeoJSON created with {len(features)} features")

    return feature_collection
1 change: 1 addition & 0 deletions f/export/postgres_to_file/postgres_to_geojson.script.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
psycopg2-binary==2.9.10
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ description: >-
This script connects to a PostgreSQL database, retrieves all entries from a
specified table, and converts the data into GeoJSON format. The script assumes
that the geometry fields are properly formatted for GeoJSON.
lock: '!inline f/export/postgres_to_geojson/postgres_to_geojson.script.lock'
lock: '!inline f/export/postgres_to_file/postgres_to_geojson.script.lock'
concurrency_time_window_s: 0
kind: script
schema:
Expand Down
21 changes: 21 additions & 0 deletions f/export/postgres_to_file/tests/postgres_to_csv_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from f.export.postgres_to_file.postgres_to_csv import main


def test_script_e2e(pg_database, database_mock_data, tmp_path):
    # Run the full export against the mock DB, then compare the CSV on disk
    # to a golden string (QUOTE_NONNUMERIC quoting, header row first).
    asset_storage = tmp_path / "datalake/export"

    main(pg_database, "comapeo_data", asset_storage)

    expected_data = """\
"_id","lat","deleted","created_at","updated_at","project_name","type","g__type","g__coordinates","attachments","notes","project_id","animal_type","lon"
"doc_id_1","-33.8688","False","2024-10-14T20:18:14.206Z","2024-10-14T20:18:14.206Z","Forest Expedition","water","Point","[151.2093, -33.8688]","capybara.jpg","Rapid","forest_expedition","","151.2093"
"doc_id_2","48.8566","False","2024-10-15T21:19:15.207Z","2024-10-15T21:19:15.207Z","River Mapping","animal","Point","[2.3522, 48.8566]","capybara.jpg","Capybara","river_mapping","capybara","2.3522"
"doc_id_3","35.6895","False","2024-10-16T22:20:16.208Z","2024-10-16T22:20:16.208Z","Historical Site","location","Point","[139.6917, 35.6895]","","Former village site","historical","","139.6917"
"""
    data = (asset_storage / "comapeo_data.csv").read_text()
    assert data == expected_data
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json

from f.export.postgres_to_geojson.postgres_to_geojson import main
from f.export.postgres_to_file.postgres_to_geojson import main


def test_script_e2e(pg_database, database_mock_data, tmp_path):
Expand Down
Loading