Skip to content

Commit

Permalink
Merge pull request #205 from MITLibraries/GDT-226-alma-marc-fetch-and…
Browse files Browse the repository at this point in the history
…-filter

Fetch and filter records for Alma MARC harvest
  • Loading branch information
ghukill authored Mar 13, 2024
2 parents 0168a10 + 411a2e4 commit b0da3bb
Show file tree
Hide file tree
Showing 19 changed files with 4,733 additions and 384 deletions.
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pygit2 = "*"
shapely = "*"
requests = "*"
pyaml = "*"
marcalyx = "*"

[dev-packages]
black = "*"
Expand Down
846 changes: 465 additions & 381 deletions Pipfile.lock

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions docs/alma_harvests.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Alma GIS Harvests

The following are sequence diagrams related to harvesting of GIS data from Alma MARC records.

## Records added or modified in Alma trigger TIMDEX harvests
```mermaid
sequenceDiagram
participant gis_team as GIS Team
participant alma_staff as Alma Staff
participant alma as Alma
participant s3_sftp as S3 SFTP
participant webhook as Alma Webhook Lambda
participant timdex_sf as TIMDEX StepFunction
gis_team->>alma_staff: Create or help<br>identify GIS records
alma_staff->>alma: Create or modify records
alma->>s3_sftp: Daily and Full Exports<br>write records
alma->>webhook: Send TIMDEX<br>Export Confirmation
webhook->>timdex_sf: Invoke for TIMDEX<br>"alma" source
note right of timdex_sf: Pre-existing "alma" invocation
webhook->>timdex_sf: Invoke for TIMDEX<br>"gisalma" source
note right of timdex_sf: New "gisalma" invocation
```

## GeoHarvester fetching and filtering Alma MARC records for `gisalma` source

```mermaid
sequenceDiagram
participant webhook as Alma Webhook Lambda
participant s3alma as TIMDEX s3 "alma" folder
participant gh as GeoHarvester
participant s3gisalma as TIMDEX s3 "gisalma" folder
participant geopipeline as Geo Records Pipeline
webhook->>gh: Invoke "alma" harvest via CLI
gh->>s3alma: Identify records for scanning
s3alma-->>gh: Iterator of records
loop For each MARC record
alt Matches geospatial filtering
gh->>gh: Transform to MITAardvark
gh->>s3gisalma: Write
else Does not match geospatial filtering
note right of gh: Skip
end
end
gh->>geopipeline: Follows same pipeline as<br>"gismit" and "gisogm" from here
```
214 changes: 214 additions & 0 deletions harvester/harvest/alma.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
"""harvester.harvest.alma"""

import glob
import logging
import re
from collections.abc import Iterator
from typing import Literal, cast

import smart_open # type: ignore[import-untyped]
from attrs import define, field
from dateutil.parser import parse as date_parser
from lxml import etree
from marcalyx import Record as MARCRecord # type: ignore[import-untyped]

from harvester.aws.s3 import S3Client
from harvester.config import Config
from harvester.harvest import Harvester
from harvester.records import MARC, Record
from harvester.utils import convert_to_utc

logger = logging.getLogger(__name__)

# module-level config instance; env vars are validated at harvest time via
# CONFIG.check_required_env_vars()
CONFIG = Config()

# map GeoHarvest harvest type to the record "stage" token embedded in Alma export
# filepaths (e.g. "alma-2024-03-01-daily-extracted-records-to-index_19.xml")
HARVEST_TYPE_MAP = {"full": "full", "incremental": "daily"}

# regex to extract YYYY-MM-DD from filepath,
# e.g. "alma-2024-03-01-daily-..." -> "2024-03-01"
FILEPATH_DATE_REGEX = re.compile(r".+?alma-(\d{4}-\d{2}-\d{2})-.*")


@define
class MITAlmaHarvester(Harvester):
    """Harvester of MIT Alma MARC Records.

    Reads MARC XML files previously exported by Alma (full or daily exports),
    filters them down to geospatial records, and yields them as source Records.
    """

    # S3 URI prefix (e.g. "s3://bucket/alma/") or local directory containing
    # the exported Alma XML files
    input_files: str = field(default=None)

    def full_harvest_get_source_records(self) -> Iterator[Record]:
        """Identify files for harvest by parsing MARC records from FULL Alma exports.

        While both full and incremental harvests share the same code path for fetching
        and filtering records, the base Harvester class requires this explicit method
        to be defined.
        """
        CONFIG.check_required_env_vars()
        yield from self._get_source_records()

    def incremental_harvest_get_source_records(self) -> Iterator[Record]:
        """Identify files for harvest by parsing MARC records from DAILY Alma exports.

        While both full and incremental harvests share the same code path for fetching
        and filtering records, the base Harvester class requires this explicit method
        to be defined.
        """
        CONFIG.check_required_env_vars()
        yield from self._get_source_records()

    def _get_source_records(self) -> Iterator[Record]:
        """Shared method to get MARC records for full and incremental harvests."""
        all_marc_records = self.parse_marc_records_from_files()
        for marc_record in self.filter_geospatial_marc_records(all_marc_records):
            identifier, source_record = self.create_source_record_from_marc_record(
                marc_record
            )
            yield Record(
                identifier=identifier,
                source_record=source_record,
            )

    def parse_marc_records_from_files(self) -> Iterator[MARCRecord]:
        """Yield parsed MARCRecords from list of filepaths.

        Uses lxml's iterparse so each <record> element is parsed, yielded, and then
        freed, keeping memory flat while scanning arbitrarily large export files.
        """
        for filepath in self._list_xml_files():
            with smart_open.open(filepath, "rb") as f:
                context = etree.iterparse(f, events=("end",), tag="record")
                for _event, element in context:
                    yield MARCRecord(element)
                    # release the element and any fully-processed preceding
                    # siblings to bound memory use during iterparse
                    element.clear()
                    while element.getprevious() is not None:
                        del element.getparent()[0]

    def _list_xml_files(self) -> Iterator[str]:
        """Provide list of MARC record set XML files filtered by harvest type and date.

        Retrieve list of XML files from S3 or local filesystem, then yield filepaths
        that match harvest options.
        """
        if self.input_files.startswith("s3://"):
            filepaths = self._list_s3_xml_files()
        else:
            filepaths = self._list_local_xml_files()  # pragma: nocover

        for filepath in filepaths:
            if not self._filter_filepath_by_harvest_type(filepath):
                continue
            if not self._filter_filepath_by_dates(filepath):
                continue
            yield filepath

    def _filter_filepath_by_harvest_type(self, filepath: str) -> bool:
        """Bool if harvest type aligns with record stage in filepath.

        Example filepath: alma-2024-03-01-daily-extracted-records-to-index_19.xml
            - run_type=daily
        """
        return HARVEST_TYPE_MAP[self.harvest_type] in filepath

    def _filter_filepath_by_dates(self, filepath: str) -> bool:
        """Bool if from and/or until dates align with filepath dates.

        Example filepath: alma-2024-03-01-daily-extracted-records-to-index_19.xml
            - run_date=2024-03-01

        Date range is inclusive of "from" and exclusive of "until".
        """
        match = FILEPATH_DATE_REGEX.match(filepath)
        if not match:  # pragma: nocover
            message = f"Could not parse date from filepath: {filepath}"
            logger.warning(message)
            return False

        filepath_date = convert_to_utc(date_parser(match.group(1)))
        if self.from_datetime_object and filepath_date < self.from_datetime_object:
            return False
        if self.until_datetime_object and filepath_date >= self.until_datetime_object:
            return False

        return True

    def _list_s3_xml_files(self) -> list[str]:
        """Return a list of S3 URIs for extracted Alma XML files.

        Example self.input_files = "s3://timdex-extract-dev-222053980223/alma/"
        """
        bucket, prefix = self.input_files.replace("s3://", "").split("/", 1)
        s3_objects = S3Client.list_objects_uri_and_date(bucket, prefix)
        return [
            s3_object[0]
            for s3_object in s3_objects
            if s3_object[0].lower().endswith(".xml")
        ]

    def _list_local_xml_files(self) -> list[str]:
        """Return a list of local filepaths of extracted Alma XML files."""
        return glob.glob(f"{self.input_files.removesuffix('/')}/*.xml")

    def filter_geospatial_marc_records(
        self, marc_records: Iterator[MARCRecord]
    ) -> Iterator[MARCRecord]:
        """Yield geospatial MARC records by filtering on defined criteria."""
        for i, record in enumerate(marc_records):
            if i % 10_000 == 0 and i > 0:  # pragma: nocover
                message = f"{i} MARC records scanned for geospatial filtering"
                logger.info(message)

            # skip if leader record status (position 05) is not a/c/d/n/p
            if record.leader[5] not in ("a", "c", "d", "n", "p"):
                continue

            # skip if Genre/Form 655 does not contain "Maps."
            if not any(
                "Maps." in subfield.value
                for form in record["655"]
                for subfield in form.subfield("a")
            ):
                continue

            # skip if call number prefix not in list
            if not any(
                subfield.value in ("MAP", "CDROM", "DVDROM")
                for locations in record["949"]
                for subfield in locations.subfield("k")
            ):
                continue

            # skip if shelving location not in list
            if not any(
                subfield.value in ("MAPRM", "GIS")
                for locations in record["985"]
                for subfield in locations.subfield("aa")
            ):
                continue

            yield record

    def create_source_record_from_marc_record(
        self, marc_record: MARCRecord
    ) -> tuple[str, MARC]:
        """Create MARC SourceRecord from parsed MARC record.

        Returns a tuple of (identifier, MARC source record).

        Raises:
            ValueError: if the record has no 001 control field to use as identifier.
        """
        # derive identifier from ControlField 001
        try:
            identifier = next(
                item for item in marc_record.controlFields() if item.tag == "001"
            ).value
        except StopIteration as exc:  # pragma: nocover
            # BUGFIX: next() on an exhausted generator raises StopIteration, not
            # IndexError; the previous "except IndexError" could never catch a
            # missing 001 field, letting StopIteration escape instead of ValueError.
            message = "Could not extract identifier from ControlField 001"
            raise ValueError(message) from exc

        # derive event from leader record status (position 05)
        event: Literal["created", "deleted"] = cast(
            Literal["created", "deleted"],
            {
                "a": "created",
                "c": "created",
                "d": "deleted",
                "n": "created",
                "p": "created",
            }[marc_record.leader[5]],
        )

        return identifier, MARC(
            origin="alma",
            identifier=identifier,
            data=etree.tostring(marc_record.node),
            marc=marc_record,
            event=event,
        )
1 change: 1 addition & 0 deletions harvester/records/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
from harvester.records.iso19139 import ISO19139
from harvester.records.gbl1 import GBL1
from harvester.records.aardvark import Aardvark
from harvester.records.marc import MARC
89 changes: 89 additions & 0 deletions harvester/records/marc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""harvester.harvest.records.marc"""

# ruff: noqa: N802

from typing import Literal

from attrs import define, field

from harvester.records.record import XMLSourceRecord


@define
class MARC(XMLSourceRecord):
    """MIT MARC metadata format SourceRecord class.

    All field methods below are currently stubs that raise NotImplementedError:
    this class establishes the MARC -> Aardvark field-method surface, with the
    actual mappings to be implemented in follow-up work.
    """

    # metadata format discriminator for this SourceRecord subclass
    metadata_format: Literal["marc"] = field(default="marc")

    ##########################
    # Required Field Methods
    ##########################

    def _dct_accessRights_s(self) -> str:
        raise NotImplementedError

    def _dct_title_s(self) -> str | None:
        raise NotImplementedError

    def _gbl_resourceClass_sm(self) -> list[str]:
        raise NotImplementedError

    def _dcat_bbox(self) -> str | None:
        raise NotImplementedError

    def _locn_geometry(self) -> str | None:
        raise NotImplementedError

    ##########################
    # Optional Field Methods
    ##########################

    def _dct_description_sm(self) -> list[str]:
        raise NotImplementedError

    def _dcat_keyword_sm(self) -> list[str]:
        """New field in Aardvark: no mapping defined yet for dcat_keyword_sm."""
        raise NotImplementedError

    def _dct_alternative_sm(self) -> list[str]:
        """New field in Aardvark: no mapping defined yet for dct_alternative_sm."""
        raise NotImplementedError

    def _dct_creator_sm(self) -> list[str] | None:
        raise NotImplementedError

    def _dct_format_s(self) -> str | None:
        raise NotImplementedError

    def _dct_issued_s(self) -> str | None:
        raise NotImplementedError

    def _dct_identifier_sm(self) -> list[str]:
        raise NotImplementedError

    def _dct_language_sm(self) -> list[str]:
        raise NotImplementedError

    def _dct_publisher_sm(self) -> list[str]:
        raise NotImplementedError

    def _dct_rights_sm(self) -> list[str]:
        raise NotImplementedError

    def _dct_spatial_sm(self) -> list[str] | None:
        raise NotImplementedError

    def _dct_subject_sm(self) -> list[str] | None:
        raise NotImplementedError

    def _dct_temporal_sm(self) -> list[str] | None:
        raise NotImplementedError

    def _gbl_dateRange_drsim(self) -> list[str]:
        raise NotImplementedError

    def _gbl_resourceType_sm(self) -> list[str]:
        raise NotImplementedError

    def _gbl_indexYear_im(self) -> list[int]:
        raise NotImplementedError
Loading

0 comments on commit b0da3bb

Please sign in to comment.