
Revisions from PR 205
ghukill committed Mar 13, 2024
1 parent d2fa2ff commit 411a2e4
Showing 1 changed file with 63 additions and 41 deletions.
104 changes: 63 additions & 41 deletions harvester/harvest/alma.py
@@ -15,16 +15,19 @@
 from harvester.aws.s3 import S3Client
 from harvester.config import Config
 from harvester.harvest import Harvester
-from harvester.records import (
-    MARC,
-    Record,
-)
+from harvester.records import MARC, Record
 from harvester.utils import convert_to_utc
 
 logger = logging.getLogger(__name__)
 
 CONFIG = Config()
 
+# map GeoHarvest harvest type to record stage in filepath
+HARVEST_TYPE_MAP = {"full": "full", "incremental": "daily"}
+
+# regex to extract YYYY-MM-DD from filepath
+FILEPATH_DATE_REGEX = re.compile(r".+?alma-(\d{4}-\d{2}-\d{2})-.*")
+
 
 @define
 class MITAlmaHarvester(Harvester):
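Note: the two module-level constants added above drive the filepath filtering refactored later in this commit. A minimal sketch of how they behave, assuming date_parser is dateutil's parse (its import is outside this diff) and using a hypothetical filepath:

import re
from dateutil.parser import parse as date_parser

HARVEST_TYPE_MAP = {"full": "full", "incremental": "daily"}
FILEPATH_DATE_REGEX = re.compile(r".+?alma-(\d{4}-\d{2}-\d{2})-.*")

# hypothetical filepath; a directory prefix is needed because the regex's
# leading ".+?" requires at least one character before "alma-"
filepath = "output/alma-2024-03-01-daily-extracted-records-to-index_19.xml"

print(HARVEST_TYPE_MAP["incremental"] in filepath)  # True: "daily" is in the filepath
match = FILEPATH_DATE_REGEX.match(filepath)
print(match.group(1))                                # "2024-03-01"
print(date_parser(match.group(1)))                   # datetime.datetime(2024, 3, 1, 0, 0)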
@@ -33,12 +36,22 @@ class MITAlmaHarvester(Harvester):
     input_files: str = field(default=None)
 
     def full_harvest_get_source_records(self) -> Iterator[Record]:
-        """Identify files for harvest by parsing MARC records from FULL Alma exports."""
+        """Identify files for harvest by parsing MARC records from FULL Alma exports.
+
+        While both full and incremental harvests share the same code path for fetching and
+        filtering records, the base Harvester class requires this explicit method to be
+        defined.
+        """
         CONFIG.check_required_env_vars()
         yield from self._get_source_records()
 
     def incremental_harvest_get_source_records(self) -> Iterator[Record]:
-        """Identify files for harvest by parsing MARC records from DAILY Alma exports."""
+        """Identify files for harvest by parsing MARC records from DAILY Alma exports.
+
+        While both full and incremental harvests share the same code path for fetching and
+        filtering records, the base Harvester class requires this explicit method to be
+        defined.
+        """
         CONFIG.check_required_env_vars()
         yield from self._get_source_records()
 
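The duplicated docstrings explain the thin wrappers: both delegate to _get_source_records, but the base class expects each hook to exist. A rough sketch of that contract, purely hypothetical since the Harvester base class is not part of this diff:

from abc import ABC, abstractmethod
from collections.abc import Iterator

class HypotheticalHarvester(ABC):
    """Stand-in for the real Harvester base class (not shown in this diff)."""

    harvest_type: str

    @abstractmethod
    def full_harvest_get_source_records(self) -> Iterator:
        """Each harvester must define how a FULL harvest finds source records."""

    @abstractmethod
    def incremental_harvest_get_source_records(self) -> Iterator:
        """Each harvester must define how an INCREMENTAL harvest finds source records."""

    def harvest(self) -> None:
        # dispatch on harvest type; both hooks may share an implementation
        if self.harvest_type == "full":
            records = self.full_harvest_get_source_records()
        else:
            records = self.incremental_harvest_get_source_records()
        for record in records:
            ...  # downstream processing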
@@ -54,42 +67,62 @@ def _get_source_records(self) -> Iterator[Record]:
                 source_record=source_record,
             )
 
+    def parse_marc_records_from_files(self) -> Iterator[MARCRecord]:
+        """Yield parsed MARCRecords from list of filepaths."""
+        for filepath in self._list_xml_files():
+            with smart_open.open(filepath, "rb") as f:
+                context = etree.iterparse(f, events=("end",), tag="record")
+                for _event, element in context:
+                    yield MARCRecord(element)
+                    element.clear()
+                    while element.getprevious() is not None:
+                        del element.getparent()[0]
+
     def _list_xml_files(self) -> Iterator[str]:
         """Provide list of MARC record set XML files filtered by harvest type and date.
 
-        Example filepath: alma-2024-03-01-daily-extracted-records-to-index_19.xml
-            - run_type=extracted
-            - run_date=2024-03-01
+        Retrieve list of XML files from S3 or local filesystem, then yield filepaths that
+        match harvest options.
         """
         if self.input_files.startswith("s3://"):
             filepaths = self._list_s3_xml_files()
         else:
             filepaths = self._list_local_xml_files()  # pragma: nocover
 
-        harvest_type_map = {"full": "full", "incremental": "daily"}
-        date_regex = re.compile(r".+?alma-(\d{4}-\d{2}-\d{2})-.*")
-
         for filepath in filepaths:
-
-            # skip for run type
-            if harvest_type_map[self.harvest_type] not in filepath:
+            if not self._filter_filepath_by_harvest_type(filepath):
                 continue
-
-            # parse date from filepath
-            match = date_regex.match(filepath)
-            if not match:  # pragma: nocover
-                message = f"Could not parse date from filepath: {filepath}"
-                logger.warning(message)
+            if not self._filter_filepath_by_dates(filepath):
                 continue
-            filepath_date = convert_to_utc(date_parser(match.group(1)))
-
-            # skip if date out of bounds for harvester from/until dates
-            if self.from_datetime_object and filepath_date < self.from_datetime_object:
-                continue
-            if self.until_datetime_object and filepath_date >= self.until_datetime_object:
-                continue
-
             yield filepath
 
+    def _filter_filepath_by_harvest_type(self, filepath: str) -> bool:
+        """Bool if harvest type aligns with record stage in filepath.
+
+        Example filepath: alma-2024-03-01-daily-extracted-records-to-index_19.xml
+            - run_type=daily
+        """
+        return HARVEST_TYPE_MAP[self.harvest_type] in filepath
+
+    def _filter_filepath_by_dates(self, filepath: str) -> bool:
+        """Bool if from and/or until dates align with filepath dates.
+
+        Example filepath: alma-2024-03-01-daily-extracted-records-to-index_19.xml
+            - run_date=2024-03-01
+        """
+        match = FILEPATH_DATE_REGEX.match(filepath)
+        if not match:  # pragma: nocover
+            message = f"Could not parse date from filepath: {filepath}"
+            logger.warning(message)
+            return False
+
+        filepath_date = convert_to_utc(date_parser(match.group(1)))
+        if self.from_datetime_object and filepath_date < self.from_datetime_object:
+            return False
+        if self.until_datetime_object and filepath_date >= self.until_datetime_object:
+            return False
+
+        return True
 
     def _list_s3_xml_files(self) -> list[str]:
         """Return a list of S3 URIs for extracted Alma XML files.
@@ -108,17 +141,6 @@ def _list_local_xml_files(self) -> list[str]:
         """Return a list of local filepaths of extracted Alma XML files."""
         return glob.glob(f"{self.input_files.removesuffix('/')}/*.xml")
 
-    def parse_marc_records_from_files(self) -> Iterator[MARCRecord]:
-        """Yield parsed MARCRecords from list of filepaths."""
-        for filepath in self._list_xml_files():
-            with smart_open.open(filepath, "rb") as f:
-                context = etree.iterparse(f, events=("end",), tag="record")
-                for _event, elem in context:
-                    yield MARCRecord(elem)
-                    elem.clear()
-                    while elem.getprevious() is not None:
-                        del elem.getparent()[0]
-
     def filter_geospatial_marc_records(
         self, marc_records: Iterator[MARCRecord]
     ) -> Iterator[MARCRecord]:
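The method removed here was moved, unchanged apart from renaming elem to element, to sit directly after _get_source_records. Its iterparse pattern is worth a note: clearing each element and deleting processed siblings keeps memory flat while streaming large MARC XML files. A minimal standalone sketch of the pattern:

from io import BytesIO
from lxml import etree

xml = b"<collection><record>one</record><record>two</record></collection>"

context = etree.iterparse(BytesIO(xml), events=("end",), tag="record")
for _event, element in context:
    print(element.text)                # process the fully parsed <record>
    element.clear()                    # free this element's subtree
    while element.getprevious() is not None:
        del element.getparent()[0]     # drop already-processed siblings from the tree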
@@ -129,7 +151,7 @@ def filter_geospatial_marc_records(
             logger.info(message)
 
             # skip if leader doesn't have a/c/n/p
-            if record.leader[5] not in ("a", "c", "n", "p"):
+            if record.leader[5] not in ("a", "c", "d", "n", "p"):
                 continue
 
             # skip if Genre/Form 655 does not contain "Maps."
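Position 5 of the MARC 21 leader is the record status byte; this change lets records marked deleted pass the filter so they can be mapped to a delete action later in this commit. For reference, the accepted values per the MARC 21 spec, with a hypothetical leader:

# MARC 21 Leader/05 (record status) values accepted after this change:
#   a = increase in encoding level
#   c = corrected or revised
#   d = deleted   <- newly accepted by the filter
#   n = new
#   p = increase in encoding level from prepublication

leader = "00000dem a2200000 a 4500"  # hypothetical leader; index 5 is "d"
print(leader[5])                     # "d": a deleted record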
@@ -177,9 +199,9 @@ def create_source_record_from_marc_record(
                 {
                     "a": "created",
                     "c": "created",
-                    "d": "deleted",
                     "n": "created",
                     "p": "created",
+                    "d": "deleted",
                 }[marc_record.leader[5]],
             )
 
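Worth noting: this dict and the Leader/05 filter above must stay in sync. A status byte that passes the filter but has no key here would raise a KeyError, which is why accepting "d" upstream pairs with the "deleted" entry kept here. A tiny sketch with hypothetical status values:

status_to_event = {
    "a": "created",
    "c": "created",
    "n": "created",
    "p": "created",
    "d": "deleted",
}

for status in ("n", "d"):  # hypothetical Leader/05 values
    print(status, "->", status_to_event[status])
# n -> created
# d -> deleted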
