Commit ff2592d

Merge pull request #154 from MITLibraries/GDT-109-no-cdn-write-ogm
OGM harvests do not write metadata to S3 Public CDN
ghukill committed Feb 15, 2024
2 parents 2d2c49c + 2da32d5 commit ff2592d
Showing 10 changed files with 236 additions and 229 deletions.
61 changes: 31 additions & 30 deletions README.md
@@ -3,7 +3,7 @@
Geo-Harvester is a Python CLI application for harvesting, normalizing, and writing GIS and geospatial metadata, with a focus on providing this metadata for TIMDEX.

At a high level, this is accomplished by:
1. fetching metadata records generated by MIT (S3) or from OpenGeoMetadata (OGM) repositories (GitHub)
1. fetching metadata records generated by MIT (S3) or from OpenGeoMetadata (OGM) repositories (Github)
2. normalizing this metadata to the [Aardvark metadata format](https://opengeometadata.org/ogm-aardvark/)
3. based on normalized metadata, sending EventBridge events for other applications to potentially move and copy data

@@ -21,12 +21,12 @@ At a high level, this is accomplished by:
```shell
SENTRY_DSN=### If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development.
WORKSPACE=### Set to `dev` for local development, this will be set to `stage` and `prod` in those environments by Terraform.
S3_RESTRICTED_CDN_ROOT=### S3 bucket + prefix for CDN restricted, e.g. 's3://<bucket>/path/to/restricted'
S3_PUBLIC_CDN_ROOT=### S3 bucket + prefix for CDN public, e.g. 's3://<bucket>/path/to/public'
```

### Optional
```shell
S3_RESTRICTED_CDN_ROOT=### S3 bucket + prefix for CDN restricted, e.g. 's3://<bucket>/path/to/restricted'
S3_PUBLIC_CDN_ROOT=### S3 bucket + prefix for CDN public, e.g. 's3://<bucket>/path/to/public'
GEOHARVESTER_SQS_TOPIC_NAME=### default value for CLI argument --sqs-topic-name
OGM_CONFIG_FILEPATH=### optional location for OGM configuration YAML
OGM_CLONE_ROOT_URL=### optional base URL or filepath for where to clone OGM repositories from
@@ -86,19 +86,6 @@ Options:
date; format YYYY-MM-DD.
-u, --until-date TEXT filter for files modified before this date;
format YYYY-MM-DD.
-osd, --output-source-directory TEXT
Directory to write source metadata for EACH
harvested record file with naming convention
'<identifier>.<format>.source.xml|json'.
Defaults to env var S3_PUBLIC_CDN_ROOT if
not set.
-ond, --output-normalized-directory TEXT
Directory to write normalized MITAardvark
metadata for EACH harvested record file with
naming convention
'<identifier>.aardvark.normalized.json'.
Defaults to env var S3_PUBLIC_CDN_ROOT if
not set.
-o, --output-file TEXT Filepath to write single, combined JSONLines
file of normalized MITAardvark metadata for
ALL harvested records. This is the expected
@@ -118,18 +105,32 @@ Usage: -c harvest mit [OPTIONS]
Harvest and normalize MIT geospatial metadata records.
Options:
-i, --input-files TEXT Directory location of source record zip files
(may be local or s3). Defaults to env var
S3_RESTRICTED_CDN_ROOT if not set. [required]
-s, --sqs-topic-name TEXT SQS topic name with messages capturing zip file
modifications. Defaults to env var
GEOHARVESTER_SQS_TOPIC_NAME if not set.
[required]
--preserve-sqs-messages If set, SQS messages will remain in the queue
after incremental harvest.
--skip-eventbridge-events If set, will skip sending EventBridge events to
manage files in CDN.
-h, --help Show this message and exit.
-i, --input-files TEXT Directory location of source record zip
files (may be local or s3). Defaults to env
var S3_RESTRICTED_CDN_ROOT if not set.
[required]
-osd, --output-source-directory TEXT
Directory to write source metadata for EACH
harvested record file with naming convention
'<identifier>.<format>.source.xml|json'.
Defaults to env var S3_PUBLIC_CDN_ROOT if
not set.
-ond, --output-normalized-directory TEXT
Directory to write normalized MITAardvark
metadata for EACH harvested record file with
naming convention
'<identifier>.aardvark.normalized.json'.
Defaults to env var S3_PUBLIC_CDN_ROOT if
not set.
-s, --sqs-topic-name TEXT SQS topic name with messages capturing zip
file modifications. Defaults to env var
GEOHARVESTER_SQS_TOPIC_NAME if not set.
[required]
--preserve-sqs-messages If set, SQS messages will remain in the
queue after incremental harvest.
--skip-eventbridge-events If set, will skip sending EventBridge events
to manage files in CDN.
-h, --help Show this message and exit.
```
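For reference, a minimal sketch of driving this subcommand programmatically with click's `CliRunner`. The root group name (`main`), the `--harvest-type` flag spelling, and all paths and topic names below are assumptions for illustration, not taken from this commit:

```python
from click.testing import CliRunner

from harvester.cli import main  # assumed name of the root click group

runner = CliRunner()
result = runner.invoke(
    main,
    [
        "harvest",
        "--harvest-type", "full",  # assumed flag behind the group's harvest_type
        "--output-file", "output/mit.jsonl",  # hypothetical local filepath
        "mit",
        "--input-files", "tests/fixtures/zips",  # hypothetical local directory
        "--sqs-topic-name", "geo-harvester-zips",  # hypothetical topic name
        "--skip-eventbridge-events",
    ],
)
print(result.exit_code, result.output)
```

Note that `-osd` and `-ond` now default to `S3_PUBLIC_CDN_ROOT` on this subcommand only; if neither the flags nor the env var are set, the per-record CDN writes are skipped entirely.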

### `harvester harvest ogm`
@@ -140,9 +141,9 @@ Usage: -c harvest ogm [OPTIONS]
Harvest and normalize OpenGeoMetadata (OGM) geospatial metadata records.
Options:
--include-repositories TEXT If set, limit to only this comma separated
--include-repositories TEXT If set, limit to only these comma separated
list of repositories for harvest.
--exclude-repositories TEXT If set, exclude this comma separated list of
--exclude-repositories TEXT If set, exclude these comma separated list of
repositories from harvest.
-h, --help Show this message and exit.
```
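The include/exclude values are plain comma-separated strings. A sketch of the parsing this implies, using hypothetical repository names (the actual parsing in `harvester/cli.py` is collapsed in this diff):

```python
def parse_repository_list(raw: str | None) -> list[str] | None:
    """Split a comma-separated CLI value into a list of repository names."""
    if not raw:
        return None
    return [name.strip() for name in raw.split(",") if name.strip()]


# Hypothetical OGM repository names, for illustration only.
assert parse_repository_list("edu.princeton, edu.stanford") == [
    "edu.princeton",
    "edu.stanford",
]
```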
5 changes: 1 addition & 4 deletions docs/ogm_harvests.md
@@ -12,7 +12,6 @@ sequenceDiagram
participant ogm_config_yaml as OGM Config YAML
participant geo_harv as GeoHarvester
participant ogm_repo as OGM Institution Repository
participant s3_cdn_pub as S3:CDN:Public
participant s3_timdex as S3:TIMDEX
geo_harv->>ogm_config_yaml: Load OGM harvest config YAML
@@ -22,8 +21,7 @@ sequenceDiagram
geo_harv->>ogm_repo: Clone repo
ogm_repo-->>geo_harv: Repo files
geo_harv->>geo_harv: Filter to list of files based on <br> supported metadata formats from config
geo_harv->>geo_harv: Normalize source metadata to Aardvark
geo_harv->>s3_cdn_pub: Write source and MIT aardvark metadata
geo_harv->>geo_harv: Normalize source metadata to Aardvark
geo_harv->>s3_timdex: Write MIT aardvark
end
```
@@ -38,7 +36,6 @@ sequenceDiagram
participant ogm_config_yaml as OGM Config YAML
participant geo_harv as GeoHarvester
participant ogm_repo as OGM Institution Repository
participant s3_cdn_pub as S3: CDN:Public/OGM
participant s3_timdex as S3: TIMDEX
geo_harv->>ogm_config_yaml: Load OGM harvest config YAML
52 changes: 24 additions & 28 deletions harvester/cli.py
@@ -77,26 +77,6 @@ def ping(ctx: click.Context) -> None:
type=str,
help="filter for files modified before this date; format YYYY-MM-DD.",
)
@click.option(
"-osd",
"--output-source-directory",
required=False,
envvar="S3_PUBLIC_CDN_ROOT",
type=str,
help="Directory to write source metadata for EACH harvested record file with naming "
"convention '<identifier>.<format>.source.xml|json'. Defaults to env var "
"S3_PUBLIC_CDN_ROOT if not set.",
)
@click.option(
"-ond",
"--output-normalized-directory",
required=False,
envvar="S3_PUBLIC_CDN_ROOT",
type=str,
help="Directory to write normalized MITAardvark metadata for EACH harvested record "
"file with naming convention '<identifier>.aardvark.normalized.json'. Defaults "
"to env var S3_PUBLIC_CDN_ROOT if not set.",
)
@click.option(
"-o",
"--output-file",
@@ -112,16 +92,12 @@ def harvest(
harvest_type: str,
from_date: str,
until_date: str,
output_source_directory: str,
output_normalized_directory: str,
output_file: str,
) -> None:
"""Harvest command with sub-commands for different sources."""
ctx.obj["HARVEST_TYPE"] = harvest_type
ctx.obj["FROM_DATE"] = from_date
ctx.obj["UNTIL_DATE"] = until_date
ctx.obj["OUTPUT_SOURCE_DIRECTORY"] = output_source_directory
ctx.obj["OUTPUT_NORMALIZED_DIRECTORY"] = output_normalized_directory
ctx.obj["OUTPUT_FILE"] = output_file


@@ -139,6 +115,26 @@ def harvest(
help="Directory location of source record zip files (may be local or s3). Defaults to"
" env var S3_RESTRICTED_CDN_ROOT if not set.",
)
@click.option(
"-osd",
"--output-source-directory",
required=False,
envvar="S3_PUBLIC_CDN_ROOT",
type=str,
help="Directory to write source metadata for EACH harvested record file with naming "
"convention '<identifier>.<format>.source.xml|json'. Defaults to env var "
"S3_PUBLIC_CDN_ROOT if not set.",
)
@click.option(
"-ond",
"--output-normalized-directory",
required=False,
envvar="S3_PUBLIC_CDN_ROOT",
type=str,
help="Directory to write normalized MITAardvark metadata for EACH harvested record "
"file with naming convention '<identifier>.aardvark.normalized.json'. Defaults "
"to env var S3_PUBLIC_CDN_ROOT if not set.",
)
@click.option(
"-s",
"--sqs-topic-name",
@@ -164,6 +160,8 @@ def harvest(
def harvest_mit(
ctx: click.Context,
input_files: str,
output_source_directory: str,
output_normalized_directory: str,
sqs_topic_name: str,
preserve_sqs_messages: bool,
skip_eventbridge_events: bool,
Expand All @@ -177,8 +175,8 @@ def harvest_mit(
sqs_topic_name=sqs_topic_name,
preserve_sqs_messages=preserve_sqs_messages,
skip_eventbridge_events=skip_eventbridge_events,
output_source_directory=ctx.obj["OUTPUT_SOURCE_DIRECTORY"],
output_normalized_directory=ctx.obj["OUTPUT_NORMALIZED_DIRECTORY"],
output_source_directory=output_source_directory,
output_normalized_directory=output_normalized_directory,
output_file=ctx.obj["OUTPUT_FILE"],
)
results = harvester.harvest()
@@ -223,8 +221,6 @@ def harvest_ogm(
from_date=ctx.obj["FROM_DATE"],
include_repositories=include_list,
exclude_repositories=exclude_list,
output_source_directory=ctx.obj["OUTPUT_SOURCE_DIRECTORY"],
output_normalized_directory=ctx.obj["OUTPUT_NORMALIZED_DIRECTORY"],
output_file=ctx.obj["OUTPUT_FILE"],
)

Expand Down
4 changes: 2 additions & 2 deletions harvester/config.py
@@ -11,10 +11,10 @@ class Config:
REQUIRED_ENV_VARS = (
"WORKSPACE",
"SENTRY_DSN",
"S3_RESTRICTED_CDN_ROOT",
"S3_PUBLIC_CDN_ROOT",
)
OPTIONAL_ENV_VARS = (
"S3_RESTRICTED_CDN_ROOT",
"S3_PUBLIC_CDN_ROOT",
"GEOHARVESTER_SQS_TOPIC_NAME",
"OGM_CONFIG_FILEPATH",
"OGM_CLONE_ROOT_URL",
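The net effect: only `WORKSPACE` and `SENTRY_DSN` are enforced at startup, and the two CDN roots become opt-in. A minimal sketch of the validation this implies, assuming a `check_required_env_vars` helper (the rest of `Config` is collapsed in this diff):

```python
import os


class Config:
    REQUIRED_ENV_VARS = ("WORKSPACE", "SENTRY_DSN")
    OPTIONAL_ENV_VARS = (
        "S3_RESTRICTED_CDN_ROOT",
        "S3_PUBLIC_CDN_ROOT",
        "GEOHARVESTER_SQS_TOPIC_NAME",
        "OGM_CONFIG_FILEPATH",
        "OGM_CLONE_ROOT_URL",
    )

    def check_required_env_vars(self) -> None:
        """Raise if any required env var is unset; optional vars may be absent."""
        missing = [name for name in self.REQUIRED_ENV_VARS if not os.getenv(name)]
        if missing:
            raise OSError(f"Missing required environment variables: {missing}")
```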
56 changes: 0 additions & 56 deletions harvester/harvest/__init__.py
@@ -47,7 +47,6 @@ def harvest(self) -> dict:
"""
records = self.filter_failed_records(self.get_source_records())
records = self.filter_failed_records(self.normalize_source_records(records))
records = self.filter_failed_records(self.write_source_and_normalized(records))
records = self.filter_failed_records(self.write_combined_normalized(records))
records = self.filter_failed_records(self.harvester_specific_steps(records))
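Each step in `harvest()` above is a generator over records, with `filter_failed_records` between steps so a record that raised in one stage drops out before the next. A free-function sketch of that filter, assuming records expose an `exception` attribute (the real method is collapsed in this diff):

```python
from collections.abc import Iterator
from typing import Any


def filter_failed_records(
    records: Iterator[Any], failed_records: list[Any]
) -> Iterator[Any]:
    """Yield only records without an exception; set failed ones aside."""
    for record in records:
        if record.exception:
            failed_records.append(record)  # collected for later reporting
        else:
            yield record
```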

@@ -114,61 +113,6 @@ def normalize_source_records(self, records: Iterator[Record]) -> Iterator[Record]:
record.exception = exc
yield record

def write_source_and_normalized(self, records: Iterator[Record]) -> Iterator[Record]:
"""Write source and normalized metadata as standalone files.
This step is driven by presence of one or both CLI options:
"--output-source-directory": write source records
"--output-normalized-directory": write normalized records
Source and normalized metadata files are most commonly written to the public CDN
bucket to facilitate download.
"""
for record in records:
# write source
if self.output_source_directory:
message = f"Record {record.identifier}: writing source metadata"
logger.debug(message)
try:
self._write_source_metadata(record)
except Exception as exc: # noqa: BLE001
record.exception_stage = "write_metadata.source"
record.exception = exc
yield record
continue # pragma: nocover

# write normalized
if self.output_normalized_directory:
message = f"Record {record.identifier}: writing normalized metadata"
logger.debug(message)
try:
self._write_normalized_metadata(record)
except Exception as exc: # noqa: BLE001
record.exception_stage = "write_metadata.normalized"
record.exception = exc
yield record
continue # pragma: nocover

yield record

def _write_source_metadata(self, record: Record) -> None:
"""Write source metadata file."""
source_metadata_filepath = (
f"{self.output_source_directory.rstrip('/')}/"
f"{record.source_record.source_metadata_filename.lstrip('/')}"
)
with smart_open.open(source_metadata_filepath, "wb") as source_file:
source_file.write(record.source_record.data)

def _write_normalized_metadata(self, record: Record) -> None:
"""Write normalized metadata file."""
normalized_metadata_filepath = (
f"{self.output_normalized_directory.rstrip('/')}/"
f"{record.source_record.normalized_metadata_filename.lstrip('/')}"
)
with smart_open.open(normalized_metadata_filepath, "w") as normalized_file:
normalized_file.write(record.normalized_record.to_json(pretty=False))

def write_combined_normalized(self, records: Iterator[Record]) -> Iterator[Record]:
"""Write single, combined JSONLines file of all normalized MITAardvark.
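`write_combined_normalized`, which stays in the base class, is now the only write step shared by all harvesters. A free-function sketch of what it does per its docstring, assuming an `output_file` path and one JSON document per line (the method body is collapsed in this diff):

```python
from collections.abc import Iterator
from typing import Any

import smart_open


def write_combined_normalized(
    records: Iterator[Any], output_file: str
) -> Iterator[Any]:
    """Append each record's normalized metadata as one JSONLines row."""
    with smart_open.open(output_file, "w") as jsonlines_file:
        for record in records:
            jsonlines_file.write(record.normalized_record.to_json(pretty=False) + "\n")
            yield record
```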
57 changes: 57 additions & 0 deletions harvester/harvest/mit.py
@@ -86,14 +86,71 @@ def harvester_specific_steps(self, records: Iterator[Record]) -> Iterator[Record]:
"""Harvest steps specific to MITHarvester
Additional steps included:
- writing source and normalized metadata records to CDN bucket
- sending EventBridge events
- managing SQS messages after processing
"""
records = self.filter_failed_records(self.write_source_and_normalized(records))
records = self.filter_failed_records(self.send_eventbridge_event(records))
if self.harvest_type == "incremental":
records = self.filter_failed_records(self.delete_sqs_messages(records))
yield from records

def write_source_and_normalized(self, records: Iterator[Record]) -> Iterator[Record]:
"""Write source and normalized metadata as standalone files.
This step is driven by presence of one or both CLI options:
"--output-source-directory": write source records
"--output-normalized-directory": write normalized records
Source and normalized metadata files are most commonly written to the public CDN
bucket to facilitate download.
"""
for record in records:
# write source
if self.output_source_directory:
message = f"Record {record.identifier}: writing source metadata"
logger.debug(message)
try:
self._write_source_metadata(record)
except Exception as exc: # noqa: BLE001
record.exception_stage = "write_metadata.source"
record.exception = exc
yield record
continue # pragma: nocover

# write normalized
if self.output_normalized_directory:
message = f"Record {record.identifier}: writing normalized metadata"
logger.debug(message)
try:
self._write_normalized_metadata(record)
except Exception as exc: # noqa: BLE001
record.exception_stage = "write_metadata.normalized"
record.exception = exc
yield record
continue # pragma: nocover

yield record

def _write_source_metadata(self, record: Record) -> None:
"""Write source metadata file."""
source_metadata_filepath = (
f"{self.output_source_directory.rstrip('/')}/"
f"{record.source_record.source_metadata_filename.lstrip('/')}"
)
with smart_open.open(source_metadata_filepath, "wb") as source_file:
source_file.write(record.source_record.data)

def _write_normalized_metadata(self, record: Record) -> None:
"""Write normalized metadata file."""
normalized_metadata_filepath = (
f"{self.output_normalized_directory.rstrip('/')}/"
f"{record.source_record.normalized_metadata_filename.lstrip('/')}"
)
with smart_open.open(normalized_metadata_filepath, "w") as normalized_file:
normalized_file.write(record.normalized_record.to_json(pretty=False))

def send_eventbridge_event(self, records: Iterator[Record]) -> Iterator[Record]:
"""Method to send EventBridge event indicating access restrictions.
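A worked example of the naming conventions enforced by the two `_write_*` helpers above, with hypothetical values throughout:

```python
# Hypothetical inputs illustrating '<identifier>.<format>.source.xml|json' and
# '<identifier>.aardvark.normalized.json' from the helpers above.
output_source_directory = "s3://example-bucket/public"  # e.g. S3_PUBLIC_CDN_ROOT
source_metadata_filename = "abc123.fgdc.source.xml"
normalized_metadata_filename = "abc123.aardvark.normalized.json"

source_path = (
    f"{output_source_directory.rstrip('/')}/{source_metadata_filename.lstrip('/')}"
)
print(source_path)  # s3://example-bucket/public/abc123.fgdc.source.xml
```

With this move, only `harvest mit` performs per-record CDN writes; OGM harvests write a single combined file to the TIMDEX location, matching the updated sequence diagrams in docs/ogm_harvests.md.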
