diff --git a/harvester/harvest/alma.py b/harvester/harvest/alma.py index 1be87c6..0ec233f 100644 --- a/harvester/harvest/alma.py +++ b/harvester/harvest/alma.py @@ -15,16 +15,19 @@ from harvester.aws.s3 import S3Client from harvester.config import Config from harvester.harvest import Harvester -from harvester.records import ( - MARC, - Record, -) +from harvester.records import MARC, Record from harvester.utils import convert_to_utc logger = logging.getLogger(__name__) CONFIG = Config() +# map GeoHarvest harvest type to record stage in filepath +HARVEST_TYPE_MAP = {"full": "full", "incremental": "daily"} + +# regex to extract YYYY-MM-DD from filepath +FILEPATH_DATE_REGEX = re.compile(r".+?alma-(\d{4}-\d{2}-\d{2})-.*") + @define class MITAlmaHarvester(Harvester): @@ -33,12 +36,22 @@ class MITAlmaHarvester(Harvester): input_files: str = field(default=None) def full_harvest_get_source_records(self) -> Iterator[Record]: - """Identify files for harvest by parsing MARC records from FULL Alma exports.""" + """Identify files for harvest by parsing MARC records from FULL Alma exports. + + While both full and incremental harvests share the same code path for fetching and + filtering records, the base Harvester class requires this explicit method to be + defined. + """ CONFIG.check_required_env_vars() yield from self._get_source_records() def incremental_harvest_get_source_records(self) -> Iterator[Record]: - """Identify files for harvest by parsing MARC records from DAILY Alma exports.""" + """Identify files for harvest by parsing MARC records from DAILY Alma exports. + + While both full and incremental harvests share the same code path for fetching and + filtering records, the base Harvester class requires this explicit method to be + defined. + """ CONFIG.check_required_env_vars() yield from self._get_source_records() @@ -54,42 +67,62 @@ def _get_source_records(self) -> Iterator[Record]: source_record=source_record, ) + def parse_marc_records_from_files(self) -> Iterator[MARCRecord]: + """Yield parsed MARCRecords from list of filepaths.""" + for filepath in self._list_xml_files(): + with smart_open.open(filepath, "rb") as f: + context = etree.iterparse(f, events=("end",), tag="record") + for _event, element in context: + yield MARCRecord(element) + element.clear() + while element.getprevious() is not None: + del element.getparent()[0] + def _list_xml_files(self) -> Iterator[str]: """Provide list of MARC record set XML files filtered by harvest type and date. - Example filepath: alma-2024-03-01-daily-extracted-records-to-index_19.xml - - run_type=extracted - - run_date=2024-03-01 + Retrieve list of XML files from S3 or local filesystem, then yield filepaths that + match harvest options. """ if self.input_files.startswith("s3://"): filepaths = self._list_s3_xml_files() else: filepaths = self._list_local_xml_files() # pragma: nocover - harvest_type_map = {"full": "full", "incremental": "daily"} - date_regex = re.compile(r".+?alma-(\d{4}-\d{2}-\d{2})-.*") - for filepath in filepaths: - - # skip for run type - if harvest_type_map[self.harvest_type] not in filepath: + if not self._filter_filepath_by_harvest_type(filepath): continue - - # parse date from filepath - match = date_regex.match(filepath) - if not match: # pragma: nocover - message = f"Could not parse date from filepath: {filepath}" - logger.warning(message) + if not self._filter_filepath_by_dates(filepath): continue - filepath_date = convert_to_utc(date_parser(match.group(1))) + yield filepath - # skip if date out of bounds for harvester from/until dates - if self.from_datetime_object and filepath_date < self.from_datetime_object: - continue - if self.until_datetime_object and filepath_date >= self.until_datetime_object: - continue + def _filter_filepath_by_harvest_type(self, filepath: str) -> bool: + """Bool if harvest type aligns with record stage in filepath. - yield filepath + Example filepath: alma-2024-03-01-daily-extracted-records-to-index_19.xml + - run_type=daily + """ + return HARVEST_TYPE_MAP[self.harvest_type] in filepath + + def _filter_filepath_by_dates(self, filepath: str) -> bool: + """Bool if from and/or until dates align with filepath dates. + + Example filepath: alma-2024-03-01-daily-extracted-records-to-index_19.xml + - run_date=2024-03-01 + """ + match = FILEPATH_DATE_REGEX.match(filepath) + if not match: # pragma: nocover + message = f"Could not parse date from filepath: {filepath}" + logger.warning(message) + return False + + filepath_date = convert_to_utc(date_parser(match.group(1))) + if self.from_datetime_object and filepath_date < self.from_datetime_object: + return False + if self.until_datetime_object and filepath_date >= self.until_datetime_object: + return False + + return True def _list_s3_xml_files(self) -> list[str]: """Return a list of S3 URIs for extracted Alma XML files. @@ -108,17 +141,6 @@ def _list_local_xml_files(self) -> list[str]: """Return a list of local filepaths of extracted Alma XML files.""" return glob.glob(f"{self.input_files.removesuffix('/')}/*.xml") - def parse_marc_records_from_files(self) -> Iterator[MARCRecord]: - """Yield parsed MARCRecords from list of filepaths.""" - for filepath in self._list_xml_files(): - with smart_open.open(filepath, "rb") as f: - context = etree.iterparse(f, events=("end",), tag="record") - for _event, elem in context: - yield MARCRecord(elem) - elem.clear() - while elem.getprevious() is not None: - del elem.getparent()[0] - def filter_geospatial_marc_records( self, marc_records: Iterator[MARCRecord] ) -> Iterator[MARCRecord]: @@ -129,7 +151,7 @@ def filter_geospatial_marc_records( logger.info(message) # skip if leader doesn't have a/c/n/p - if record.leader[5] not in ("a", "c", "n", "p"): + if record.leader[5] not in ("a", "c", "d", "n", "p"): continue # skip if Genre/Form 655 does not contain "Maps." @@ -177,9 +199,9 @@ def create_source_record_from_marc_record( { "a": "created", "c": "created", + "d": "deleted", "n": "created", "p": "created", - "d": "deleted", }[marc_record.leader[5]], )