def _add_article(xml_file: IO, article: dict[str, Any]) -> None:
    """Create an XML element representing an article and write it to an output file.

    The function will create a single 'ARTICLE' element that contains subelements
    representing fields in a record from the 'AA_ARTICLE' table. The 'ARTICLE'
    element is written to an XML file.

    Args:
        xml_file (IO): A file-like object (stream) that writes to an XML file.
        article (dict[str, Any]): A record from the 'AA_ARTICLE' table.
    """
    record = ET.Element("ARTICLE")
    add_child(record, "AA_MATCH_SCORE", str(article["AA_MATCH_SCORE"]))
    add_child(record, "ARTICLE_ID", article["ARTICLE_ID"])
    add_child(record, "ARTICLE_TITLE", article["ARTICLE_TITLE"])
    add_child(record, "ARTICLE_YEAR", article["ARTICLE_YEAR"])
    add_child(record, "AUTHORS", article["AUTHORS"])
    add_child(record, "DOI", article["DOI"])
    add_child(record, "ISSN_ELECTRONIC", article["ISSN_ELECTRONIC"])
    add_child(record, "ISSN_PRINT", article["ISSN_PRINT"])
    add_child(record, "IS_CONFERENCE_PROCEEDING", article["IS_CONFERENCE_PROCEEDING"])
    add_child(record, "JOURNAL_FIRST_PAGE", article["JOURNAL_FIRST_PAGE"])
    add_child(record, "JOURNAL_LAST_PAGE", article["JOURNAL_LAST_PAGE"])
    add_child(record, "JOURNAL_ISSUE", article["JOURNAL_ISSUE"])
    add_child(record, "JOURNAL_VOLUME", article["JOURNAL_VOLUME"])
    add_child(record, "JOURNAL_NAME", article["JOURNAL_NAME"])
    add_child(record, "MIT_ID", article["MIT_ID"])
    add_child(record, "PUBLISHER", article["PUBLISHER"])
    xml_file.write(record)


def _add_person(xml_file: IO, person: dict[str, Any]) -> None:
    """Create an XML element representing a person and write it to an output file.

    The function will create a single 'record' element that contains subelements
    representing fields from the 'HR_PERSON_EMPLOYEE_LIMITED', 'HR_ORG_UNIT',
    and 'ORCID_TO_MITID' tables. The 'record' element is written to an XML file.

    Args:
        xml_file (IO): A file-like object (stream) that writes to an XML file.
        person (dict[str, Any]): A record from a data model that includes fields
            from the 'HR_PERSON_EMPLOYEE_LIMITED', 'HR_ORG_UNIT', and
            'ORCID_TO_MITID' tables.
    """
    record = ET.Element("record")
    add_child(record, "field", person["MIT_ID"], name="[Proprietary_ID]")
    add_child(record, "field", person["KRB_NAME_UPPERCASE"], name="[Username]")
    add_child(
        record,
        "field",
        get_initials(person["FIRST_NAME"], person["MIDDLE_NAME"]),
        name="[Initials]",
    )
    add_child(record, "field", person["LAST_NAME"], name="[LastName]")
    add_child(record, "field", person["FIRST_NAME"], name="[FirstName]")
    add_child(record, "field", person["EMAIL_ADDRESS"], name="[Email]")
    add_child(record, "field", "MIT", name="[AuthenticatingAuthority]")
    add_child(record, "field", "1", name="[IsAcademic]")
    add_child(record, "field", "1", name="[IsCurrent]")
    add_child(record, "field", "1", name="[LoginAllowed]")
    add_child(
        record,
        "field",
        get_group_name(person["DLC_NAME"], person["PERSONNEL_SUBAREA_CODE"]),
        name="[PrimaryGroupDescriptor]",
    )
    add_child(
        record,
        "field",
        get_hire_date_string(person["ORIGINAL_HIRE_DATE"], person["DATE_TO_FACULTY"]),
        name="[ArriveDate]",
    )
    add_child(
        record,
        "field",
        person["APPOINTMENT_END_DATE"].strftime("%Y-%m-%d"),
        name="[LeaveDate]",
    )
    add_child(record, "field", person["ORCID"], name="[Generic01]")
    add_child(record, "field", person["PERSONNEL_SUBAREA_CODE"], name="[Generic02]")
    add_child(record, "field", person["ORG_HIER_SCHOOL_AREA_NAME"], name="[Generic03]")
    add_child(record, "field", person["DLC_NAME"], name="[Generic04]")
    add_child(record, "field", person.get("HR_ORG_LEVEL5_NAME"), name="[Generic05]")
    xml_file.write(record)


def add_child(
    parent: ET._Element,  # noqa: SLF001
    element_name: str,
    element_text: str | None = None,
    **kwargs: str,
) -> ET._Element:  # noqa: SLF001
    """Add a subelement to an existing element.

    Args:
        parent (ET._Element): The root element.
        element_name (str): The name of the subelement.
        element_text (str | None, optional): The value stored in the subelement.
            Defaults to None.
        **kwargs (str): Keyword arguments representing attributes for the subelement.
            The 'name' argument is set for 'people' elements.

    Returns:
        ET._Element: The subelement.
    """
    child = ET.SubElement(parent, element_name, attrib=kwargs)
    child.text = element_text
    return child


@contextmanager
def article_feed(output_file: IO) -> Generator:
    """Generate XML feed of articles.

    Args:
        output_file (IO): A file-like object (stream) into which normalized XML
            strings are written.

    Yields:
        Generator: A function that adds an 'ARTICLE' element to an 'ARTICLES'
            root element.
    """
    with ET.xmlfile(output_file, encoding="UTF-8") as xml_file:
        xml_file.write_declaration()
        with xml_file.element("ARTICLES"):
            yield partial(_add_article, xml_file)


def articles() -> Generator[dict[str, Any], Any, None]:
    """Create a generator of article records.

    Yields:
        Generator[dict[str, Any], Any, None]: Results matching the query submitted to
            the Data Warehouse for retrieving 'articles' records.
    """
    # records missing any of these identifying fields cannot be matched and
    # are excluded from the feed
    sql = (
        select(aa_articles)
        .where(aa_articles.c.ARTICLE_ID.is_not(None))
        .where(aa_articles.c.ARTICLE_TITLE.is_not(None))
        .where(aa_articles.c.DOI.is_not(None))
        .where(aa_articles.c.MIT_ID.is_not(None))
    )
    with closing(engine().connect()) as connection:
        result = connection.execute(sql)
        for row in result:
            yield dict(zip(result.keys(), row, strict=True))


@contextmanager
def person_feed(output_file: IO) -> Generator:
    """Generate XML feed of people.

    This is a streaming XML generator for people. Output will be
    written to the provided output destination which can be a file
    or file-like object. The context manager returns a function
    which can be called repeatedly to add a person to the feed::

        with person_feed(sys.stdout) as f:
            f({"MIT_ID": "1234", ...})
            f({"MIT_ID": "5678", ...})

    Args:
        output_file (IO): A file-like object (stream) into which normalized XML
            strings are written.

    Yields:
        Generator: A function that adds a 'record' element to a 'records' root
            element.
    """
    # namespace assigned to the xmlns attribute of the root 'records' element
    symplectic_elements_namespace = "http://www.symplectic.co.uk/hrimporter"
    qualified_tag_name = ET.QName(symplectic_elements_namespace, tag="records")
    nsmap = {None: symplectic_elements_namespace}

    with ET.xmlfile(output_file, encoding="UTF-8") as xml_file:
        xml_file.write_declaration()
        with xml_file.element(tag=qualified_tag_name, nsmap=nsmap):
            yield partial(_add_person, xml_file)
- - This function will do its best to parse the argument into one or - more initials. The first step is to remove any character that is - not alphanumeric, whitespace or a hyphen. The remaining string - is split on word boundaries, retaining both the words and the - boundaries. The first character of each list member is then - joined together, uppercased and returned. - - Some examples:: - - assert initialize_part('Foo Bar') == 'F B' - assert initialize_part('F. Bar-Baz') == 'F B-B' - assert initialize_part('Foo-bar') == 'F-B' - assert initialize_part(u'влад') == u'В' - - """ # noqa: RUF002 - name = re.sub(r"[^\w\s-]", "", name, flags=re.UNICODE) - return "".join([x[:1] for x in re.split(r"(\W+)", name, flags=re.UNICODE)]).upper() - - -def group_name(dlc: str, sub_area: str) -> str: - qualifier = "Faculty" if sub_area in ("CFAT", "CFAN") else "Non-faculty" - return f"{dlc} {qualifier}" - - -def hire_date_string(original_start_date: datetime, date_to_faculty: datetime) -> str: - if date_to_faculty: - return date_to_faculty.strftime("%Y-%m-%d") - return original_start_date.strftime("%Y-%m-%d") - - -def _ns(namespace: str, element: str) -> ET.QName: - return ET.QName(namespace, element) - - -SYMPLECTIC_NS = "http://www.symplectic.co.uk/hrimporter" -NSMAP = {None: SYMPLECTIC_NS} -ns = partial(_ns, SYMPLECTIC_NS) - - -def add_child( - parent: ET._Element, # noqa: SLF001 - element: str, - text: str | None = None, - **kwargs: str, -) -> ET._Element: # noqa: SLF001 - """Add a subelement with text.""" - child = ET.SubElement(parent, element, attrib=kwargs) - child.text = text - return child - - -class Writer: - """A Symplectic Elements feed writer. - - Use this class to generate and output an HR or AA feed for Symplectic - Elements. 
- """ - - def __init__(self, out: IO): - self.out = out - - def write(self, feed_type: str) -> None: - """Write the specified feed type to the configured output.""" - if feed_type == "people": - with person_feed(self.out) as f: - for person in people(): - f(person) - elif feed_type == "articles": - with article_feed(self.out) as f: - for article in articles(): - f(article) - - -class PipeWriter(Writer): - """A read/write :class:`carbon.app.Writer`. - - This class is intended to provide a buffered read/write connecter. The - :meth:`~carbon.app.PipeWriter.pipe` method should be called before - writing to configure the reader end. For example:: - - PipeWriter(fp_out).pipe(reader).write('people') - - See :class:`carbon.app.FTPReader` for an example reader. - """ - - def write(self, feed_type: str) -> None: - """Concurrently read/write from the configured inputs and outputs. - - This method will block until both the reader and writer are finished. - """ - pipe = threading.Thread(target=self._reader) - pipe.start() - super().write(feed_type) - self.out.close() - pipe.join() - - def pipe(self, reader: FTPReader) -> PipeWriter: - """Connect the read end of the pipe. - - This should be called before :meth:`~carbon.app.PipeWriter.write`. - """ - self._reader = reader - return self - - -class CarbonCopyFTPS(FTP_TLS): +class CarbonFtpsTls(FTP_TLS): """FTP_TLS subclass with support for SSL session reuse. The stdlib version of FTP_TLS creates a new SSL session for data @@ -303,9 +341,13 @@ class CarbonCopyFTPS(FTP_TLS): message for subsequent commands. The modified storbinary method here removes the unwrap call. Calling quit on the ftp connection should still cleanly shutdown the connection. + + Attributes: + See ftplib.FTP_TLS for more details. 
""" def ntransfercmd(self, cmd: str, rest: str | int | None = None) -> tuple[socket, int]: + """Initiate a transfer over the data connection.""" conn, size = FTP.ntransfercmd(self, cmd, rest) if self._prot_p: # type: ignore[attr-defined] conn = self.context.wrap_socket( @@ -321,6 +363,7 @@ def storbinary( callback: Callable | None = None, rest: str | None = None, # type: ignore[override] ) -> str: + """Store a file in binary mode.""" self.voidcmd("TYPE I") with self.transfercmd(cmd, rest) as conn: while 1: @@ -333,153 +376,145 @@ def storbinary( return self.voidresp() -class FTPReader: +class Writer: + """A writer that outputs normalized XML strings to a specified file. + + Use this class to generate either a 'people' or 'articles' feed that is written + to a specified output file. + + Attributes: + output_file: A file-like object (stream) into which normalized XML + strings are written. + """ + + def __init__(self, output_file: IO): + self.output_file = output_file + + def write(self, feed_type: str) -> None: + """Write the specified feed type to the configured output.""" + if feed_type == "people": + with person_feed(self.output_file) as f: + for person in people(): + f(person) + elif feed_type == "articles": + with article_feed(self.output_file) as f: + for article in articles(): + f(article) + + +class PipeWriter(Writer): + """A read/write carbon.app.Writer for the Symplectic Elements FTP server. + + This class is intended to provide a buffered read/write connecter. + + Attributes: + output_file: A file-like object (stream) into which normalized XML + strings are written. + ftp_output_file: A file-like object (stream) that reads data from + PipeWriter().output_file and writes its contents to an XML file + on the Symplectic Elements FTP server. 
+ """ + + def __init__(self, input_file: IO, ftp_output_file: Callable): + super().__init__(input_file) + self.ftp_output_file = ftp_output_file + + def write(self, feed_type: str) -> None: + """Concurrently read/write from the configured inputs and outputs. + + This method will block until both the reader and writer are finished. + """ + pipe = threading.Thread(target=self.ftp_output_file) + pipe.start() + super().write(feed_type) + self.output_file.close() + pipe.join() + + +class FtpFileWriter: + """A file writer for the Symplectic Elements FTP server. + + The FtpFileWriter will read data from a provided feed and write the contents + from the feed to a file on the Symplectic Elements FTP server. + + Attributes: + content_feed: A file-like object (stream) that contains the records + from the Data Warehouse. + user: The username for accessing the Symplectic FTP server. + password: The password for accessing the Symplectic FTP server. + path: The full file path to the XML file (including the file name) that is + uploaded to the Symplectic FTP server. + host: The hostname of the Symplectic FTP server. + port: The port of the Symplectic FTP server. 
+ """ + def __init__( self, - fp: IO, + content_feed: IO, user: str, - passwd: str, + password: str, path: str, host: str = "localhost", port: int = 21, - ctx: SSLContext | None = None, ): - self.fp = fp + self.content_feed = content_feed self.user = user - self.passwd = passwd + self.password = password self.path = path self.host = host self.port = port - self.ctx = ctx def __call__(self) -> None: """Transfer a file using FTP over TLS.""" - ftps = CarbonCopyFTPS(context=self.ctx, timeout=30) - ftps.connect(self.host, self.port) - ftps.login(self.user, self.passwd) + ftps = CarbonFtpsTls(timeout=30) + ftps.connect(host=self.host, port=self.port) + ftps.login(user=self.user, passwd=self.password) ftps.prot_p() - ftps.storbinary("STOR " + self.path, self.fp) + ftps.storbinary(cmd=f"STOR {self.path}", fp=self.content_feed) ftps.quit() -@contextmanager -def person_feed(out: IO) -> Generator: - """Generate XML feed of people. - - This is a streaming XML generator for people. Output will be - written to the provided output destination which can be a file - or file-like object. The context manager returns a function - which can be called repeatedly to add a person to the feed:: - - with person_feed(sys.stdout) as f: - f({"MIT_ID": "1234", ...}) - f({"MIT_ID": "5678", ...}) - - """ - with ET.xmlfile(out, encoding="UTF-8") as xf: - xf.write_declaration() - with xf.element(ns("records"), nsmap=NSMAP): - yield partial(_add_person, xf) +class DatabaseToFtpPipe: + """A pipe feeding data from the Data Warehouse to the Symplectic Elements FTP server. + The feed consists of a pipe that connects 'read' and 'write' file-like objects + (streams) that allows for one-way passing of information to each other. 
The flow of + data is as follows: -@contextmanager -def article_feed(out: IO) -> Generator: - """Generate XML feed of articles.""" - with ET.xmlfile(out, encoding="UTF-8") as xf: - xf.write_declaration() - with xf.element("ARTICLES"): - yield partial(_add_article, xf) - - -def _add_article(xf: IO, article: dict[str, Any]) -> None: - record = ET.Element("ARTICLE") - add_child(record, "AA_MATCH_SCORE", str(article["AA_MATCH_SCORE"])) - add_child(record, "ARTICLE_ID", article["ARTICLE_ID"]) - add_child(record, "ARTICLE_TITLE", article["ARTICLE_TITLE"]) - add_child(record, "ARTICLE_YEAR", article["ARTICLE_YEAR"]) - add_child(record, "AUTHORS", article["AUTHORS"]) - add_child(record, "DOI", article["DOI"]) - add_child(record, "ISSN_ELECTRONIC", article["ISSN_ELECTRONIC"]) - add_child(record, "ISSN_PRINT", article["ISSN_PRINT"]) - add_child(record, "IS_CONFERENCE_PROCEEDING", article["IS_CONFERENCE_PROCEEDING"]) - add_child(record, "JOURNAL_FIRST_PAGE", article["JOURNAL_FIRST_PAGE"]) - add_child(record, "JOURNAL_LAST_PAGE", article["JOURNAL_LAST_PAGE"]) - add_child(record, "JOURNAL_ISSUE", article["JOURNAL_ISSUE"]) - add_child(record, "JOURNAL_VOLUME", article["JOURNAL_VOLUME"]) - add_child(record, "JOURNAL_NAME", article["JOURNAL_NAME"]) - add_child(record, "MIT_ID", article["MIT_ID"]) - add_child(record, "PUBLISHER", article["PUBLISHER"]) - xf.write(record) + 1. The records from the Data Warehouse are transformed into normalized + XML strings and are concurrently written to the 'write' file stream + one record at a time. + 2. The connected 'read' file stream concurrently transfers data from the + 'write' file stream into an XML file on the Elements FTP server. 
-def _add_person(xf: IO, person: dict[str, Any]) -> None: - record = ET.Element("record") - add_child(record, "field", person["MIT_ID"], name="[Proprietary_ID]") - add_child(record, "field", person["KRB_NAME_UPPERCASE"], name="[Username]") - add_child( - record, - "field", - initials(person["FIRST_NAME"], person["MIDDLE_NAME"]), - name="[Initials]", - ) - add_child(record, "field", person["LAST_NAME"], name="[LastName]") - add_child(record, "field", person["FIRST_NAME"], name="[FirstName]") - add_child(record, "field", person["EMAIL_ADDRESS"], name="[Email]") - add_child(record, "field", "MIT", name="[AuthenticatingAuthority]") - add_child(record, "field", "1", name="[IsAcademic]") - add_child(record, "field", "1", name="[IsCurrent]") - add_child(record, "field", "1", name="[LoginAllowed]") - add_child( - record, - "field", - group_name(person["DLC_NAME"], person["PERSONNEL_SUBAREA_CODE"]), - name="[PrimaryGroupDescriptor]", - ) - add_child( - record, - "field", - hire_date_string(person["ORIGINAL_HIRE_DATE"], person["DATE_TO_FACULTY"]), - name="[ArriveDate]", - ) - add_child( - record, - "field", - person["APPOINTMENT_END_DATE"].strftime("%Y-%m-%d"), - name="[LeaveDate]", - ) - add_child(record, "field", person["ORCID"], name="[Generic01]") - add_child(record, "field", person["PERSONNEL_SUBAREA_CODE"], name="[Generic02]") - add_child(record, "field", person["ORG_HIER_SCHOOL_AREA_NAME"], name="[Generic03]") - add_child(record, "field", person["DLC_NAME"], name="[Generic04]") - add_child(record, "field", person.get("HR_ORG_LEVEL5_NAME"), name="[Generic05]") - xf.write(record) - + Attributes: + config: A dictionary of required environment variables for running the feed. 
+ """ -class FTPFeeder: def __init__( self, - event: dict[str, str], config: dict, - ssl_ctx: SSLContext | None = None, ): - self.event = event self.config = config - self.ssl_ctx = ssl_ctx def run(self) -> None: - r, w = os.pipe() - feed_type = self.event["feed_type"] - with open(r, "rb") as fp_r, open(w, "wb") as fp_w: - ftp_rdr = FTPReader( - fp_r, - self.config["SYMPLECTIC_FTP_USER"], - self.config["SYMPLECTIC_FTP_PASS"], - self.config["SYMPLECTIC_FTP_PATH"], - self.config["SYMPLECTIC_FTP_HOST"], - int(self.config["SYMPLECTIC_FTP_PORT"]), - self.ssl_ctx, + read_file, write_file = os.pipe() + + with open(read_file, "rb") as buffered_reader, open( + write_file, "wb" + ) as buffered_writer: + ftp_file = FtpFileWriter( + content_feed=buffered_reader, + user=self.config["SYMPLECTIC_FTP_USER"], + password=self.config["SYMPLECTIC_FTP_PASS"], + path=self.config["SYMPLECTIC_FTP_PATH"], + host=self.config["SYMPLECTIC_FTP_HOST"], + port=int(self.config["SYMPLECTIC_FTP_PORT"]), + ) + PipeWriter(input_file=buffered_writer, ftp_output_file=ftp_file).write( + feed_type=self.config["FEED_TYPE"] ) - PipeWriter(out=fp_w).pipe(ftp_rdr).write(feed_type) def run_connection_test(self) -> None: """Test connection to the Symplectic Elements FTP server. 
@@ -489,10 +524,10 @@ def run_connection_test(self) -> None: """ logger.info("Testing connection to the Symplectic Elements FTP server") try: - ftps = CarbonCopyFTPS(context=self.ssl_ctx, timeout=30) + ftps = CarbonFtpsTls(timeout=30) ftps.connect( - self.config["SYMPLECTIC_FTP_HOST"], - int(self.config["SYMPLECTIC_FTP_PORT"]), + host=self.config["SYMPLECTIC_FTP_HOST"], + port=int(self.config["SYMPLECTIC_FTP_PORT"]), ) ftps.login( user=self.config["SYMPLECTIC_FTP_USER"], @@ -506,43 +541,3 @@ def run_connection_test(self) -> None: else: logger.info("Successfully connected to the Symplectic Elements FTP server") ftps.quit() - - -def sns_log( - config_values: dict[str, Any], status: str, error: Exception | None = None -) -> None: - sns_client = boto3.client("sns") - sns_id = config_values.get("SNS_TOPIC") - stage = config_values.get("SYMPLECTIC_FTP_PATH", "").lstrip("/").split("/")[0] - feed = config_values.get("FEED_TYPE", "") - - if status == "start": - sns_client.publish( - TopicArn=sns_id, - Subject="Carbon run", - Message=( - f"[{datetime.now(tz=UTC).isoformat()}] Starting carbon run for the " - f"{feed} feed in the {stage} environment." - ), - ) - elif status == "success": - sns_client.publish( - TopicArn=sns_id, - Subject="Carbon run", - Message=( - f"[{datetime.now(tz=UTC).isoformat()}] Finished carbon run for the " - f"{feed} feed in the {stage} environment." - ), - ) - logger.info("Carbon run has successfully completed.") - elif status == "fail": - sns_client.publish( - TopicArn=sns_id, - Subject="Carbon run", - Message=( - f"[{datetime.now(tz=UTC).isoformat()}] The following problem was " - f"encountered during the carbon run for the {feed} feed " - f"in the {stage} environment: {error}." 
- ), - ) - logger.info("Carbon run has failed.") diff --git a/carbon/cli.py b/carbon/cli.py index b08bee7..2e6bbe0 100644 --- a/carbon/cli.py +++ b/carbon/cli.py @@ -3,9 +3,10 @@ import click -from carbon.app import FTPFeeder, sns_log +from carbon.app import DatabaseToFtpPipe from carbon.config import configure_logger, configure_sentry, load_config_values -from carbon.db import engine +from carbon.database import engine +from carbon.helpers import sns_log logger = logging.getLogger(__name__) @@ -14,24 +15,19 @@ @click.version_option() @click.option("--run_connection_tests", is_flag=True) def main(*, run_connection_tests: bool) -> None: - """Generate feeds for Symplectic Elements. - - Specify which FEED_TYPE should be generated. This should be either - 'people' or 'articles'. - - The data is pulled from a database identified by --db, which should - be a valid SQLAlchemy database connection string. This can also be - omitted and pulled from an environment variable named CARBON_DB. For - oracle use: - - oracle://:@:1521/ - - By default, the feed will be printed to stdout. If -o/--out is used the - output will be written to the specified file instead. - - Alternatively, the --ftp switch can be used to send the output to an FTP - server. The server should support FTP over TLS. Only one of -o/--out or - --ftp should be used. + """Generate a data feed that uploads XML files to the Symplectic Elements FTP server. + + The feed uses a SQLAlchemy engine to connect to the Data Warehouse. A query is + submitted to the Data Warehouse to retrieve either 'people' or 'articles' records + depending on the 'FEED_TYPE' environment variable. Several transforms are applied + to normalize the records before it is converted to an XML string. + The feed builds a pipe that will concurrently read data from the Data Warehouse + and write the normalized XML string to an XML file on the Elements + FTP server. For security purposes, the server should support FTP over TLS. 
+ + [wip] By default, the feed will write to an XML file on the Elements FTP server. + If the -o/--out argument is used, the output will be written to the specified + file instead. This latter option is recommended for testing purposes. """ config_values = load_config_values() # [TEMP]: The connection string must use 'oracle+oracledb' to differentiate @@ -52,13 +48,13 @@ def main(*, run_connection_tests: bool) -> None: engine.run_connection_test() # test connection to the Symplectic Elements FTP server - ftp_feed = FTPFeeder({"feed_type": config_values["FEED_TYPE"]}, config_values) - ftp_feed.run_connection_test() + pipe = DatabaseToFtpPipe(config=config_values) + pipe.run_connection_test() if not run_connection_tests: sns_log(config_values=config_values, status="start") try: - ftp_feed.run() + pipe.run() except Exception as error: # noqa: BLE001 sns_log(config_values=config_values, status="fail", error=error) else: diff --git a/carbon/db.py b/carbon/database.py similarity index 98% rename from carbon/db.py rename to carbon/database.py index 7702268..ce9b5b7 100644 --- a/carbon/db.py +++ b/carbon/database.py @@ -1,5 +1,4 @@ import logging -import os from typing import Any from sqlalchemy import ( @@ -18,11 +17,8 @@ logger = logging.getLogger(__name__) -os.environ["NLS_LANG"] = "AMERICAN_AMERICA.UTF8" - metadata = MetaData() - persons = Table( "HR_PERSON_EMPLOYEE_LIMITED", metadata, @@ -53,7 +49,6 @@ Column("HR_ORG_LEVEL5_NAME", String), ) - orcids = Table( "ORCID_TO_MITID", metadata, @@ -61,7 +56,6 @@ Column("ORCID", String), ) - aa_articles = Table( "AA_ARTICLE", metadata, diff --git a/carbon/helpers.py b/carbon/helpers.py new file mode 100644 index 0000000..c686b37 --- /dev/null +++ b/carbon/helpers.py @@ -0,0 +1,146 @@ +import logging +import re +from datetime import UTC, datetime +from typing import Any + +import boto3 + +logger = logging.getLogger(__name__) + + +def _convert_to_initials(name_component: str) -> str: + """Turn a name component into uppercased 
initials. + + This function will do its best to parse the argument into one or + more initials. The first step is to remove any character that is + not alphanumeric, whitespace or a hyphen. The remaining string + is split on word boundaries, retaining both the words and the + boundaries. The first character of each list member is then + joined together, uppercased and returned. + + Some examples:: + + assert _convert_to_initials('Foo Bar') == 'F B' + assert _convert_to_initials('F. Bar-Baz') == 'F B-B' + assert _convert_to_initials('Foo-bar') == 'F-B' + assert _convert_to_initials(u'влад') == u'В' + + """ # noqa: RUF002 + name_component = re.sub(r"[^\w\s-]", "", name_component, flags=re.UNICODE) + return "".join( + [x[:1] for x in re.split(r"(\W+)", name_component, flags=re.UNICODE)] + ).upper() + + +def get_group_name(dlc: str, sub_area: str) -> str: + """Create a primary group name for a 'people' record. + + Args: + dlc (str): The value for the 'DLC_NAME' field from a 'people' record. + sub_area (str): The value for the 'PERSONNEL_SUB_AREA_CODE' field from a + 'people' record. + + Returns: + str: A group name for a 'people' record, consisting of the DLC name and a flag + indicating 'Faculty' or 'Non-faculty'. + """ + qualifier = "Faculty" if sub_area in ("CFAT", "CFAN") else "Non-faculty" + return f"{dlc} {qualifier}" + + +def get_hire_date_string(original_start_date: datetime, date_to_faculty: datetime) -> str: + """Create a string indicating the hire date for a 'people' record. + + If the record has a value for the 'DATE_TO_FACULTY' field, this value is used; + if not, the value for the 'ORIGINAL_HIRE_DATE' field is used. Dates are formatted + as: YYYY-MM-DD (i.e., 2023-01-01). + + Args: + original_start_date (datetime): The value for the 'ORIGINAL_HIRE_DATE' field + from a 'people' record. + date_to_faculty (datetime): The value for the 'DATE_TO_FACULTY field from a + 'people' record. + + Returns: + str: The hire date formatted as a string. 
+ """ + if date_to_faculty: + return date_to_faculty.strftime("%Y-%m-%d") + return original_start_date.strftime("%Y-%m-%d") + + +def get_initials(*args: str) -> str: + """Convert a tuple of name components into a space-separated string of initials. + + Each name component is processed through helpers.get_initials() and + the resulting list is joined with a space. + + Returns: + str: A string containing the initials of the provided name components. + """ + return " ".join( + [ + _convert_to_initials(name_component) + for name_component in args + if name_component + ] + ) + + +def sns_log( + config_values: dict[str, Any], status: str, error: Exception | None = None +) -> None: + """Send a message to an Amazon SNS topic about the status of the Carbon run. + + When Carbon is run in the 'stage' environment, subscribers to the 'carbon-ecs-stage' + topic receive an email with the published message. For a given run, two messages are + published: + + 1. When status = 'start', a message indicating the Carbon run has started. + 2. When status = 'start'/'fail', a message indicating if the Carbon run has + successfully completed or encountered an error. + + Args: + config_values (dict[str, Any]): A dictionary of required environment variables + for running the feed. + status (str): The status of the Carbon run that is used to determine the message + published by SNS. The following values are accepted: 'start', 'success', + and 'fail'. + error (Exception | None, optional): The exception thrown for a failed Carbon run. + Defaults to None. + """ + sns_client = boto3.client("sns") + sns_id = config_values.get("SNS_TOPIC") + stage = config_values.get("SYMPLECTIC_FTP_PATH", "").lstrip("/").split("/")[0] + feed = config_values.get("FEED_TYPE", "") + + if status == "start": + sns_client.publish( + TopicArn=sns_id, + Subject="Carbon run", + Message=( + f"[{datetime.now(tz=UTC).isoformat()}] Starting carbon run for the " + f"{feed} feed in the {stage} environment." 
+ ), + ) + elif status == "success": + sns_client.publish( + TopicArn=sns_id, + Subject="Carbon run", + Message=( + f"[{datetime.now(tz=UTC).isoformat()}] Finished carbon run for the " + f"{feed} feed in the {stage} environment." + ), + ) + logger.info("Carbon run has successfully completed.") + elif status == "fail": + sns_client.publish( + TopicArn=sns_id, + Subject="Carbon run", + Message=( + f"[{datetime.now(tz=UTC).isoformat()}] The following problem was " + f"encountered during the carbon run for the {feed} feed " + f"in the {stage} environment: {error}." + ), + ) + logger.info("Carbon run has failed.") diff --git a/tests/conftest.py b/tests/conftest.py index c0c374b..409f8b9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,13 +8,12 @@ import pytest import yaml from botocore.stub import ANY, Stubber -from lxml.builder import E as B from lxml.builder import ElementMaker from pyftpdlib.authorizers import DummyAuthorizer from pyftpdlib.handlers import TLS_FTPHandler from pyftpdlib.servers import FTPServer -from carbon.db import aa_articles, dlcs, engine, metadata, orcids, persons +from carbon.database import aa_articles, dlcs, engine, metadata, orcids, persons @pytest.fixture(scope="session", autouse=True) @@ -25,7 +24,7 @@ def _app_init(): @pytest.fixture(autouse=True) def _test_env(ftp_server): - ftp_socket, ftp_directory = ftp_server + ftp_socket, _ = ftp_server os.environ["FEED_TYPE"] = "test_feed_type" os.environ["LOG_LEVEL"] = "INFO" os.environ["SENTRY_DSN"] = "None" @@ -41,71 +40,18 @@ def _test_env(ftp_server): ) -@pytest.fixture(scope="session") -def records(): - current_dir = os.path.dirname(os.path.realpath(__file__)) - data = os.path.join(current_dir, "fixtures/data.yml") - with open(data) as fp: - return list(yaml.safe_load_all(fp)) - - -@pytest.fixture(scope="session") -def aa_data(): - current_dir = os.path.dirname(os.path.realpath(__file__)) - data = os.path.join(current_dir, "fixtures/articles.yml") - with open(data) as fp: - return 
list(yaml.safe_load_all(fp)) - - -@pytest.fixture(scope="session") -def ftp_server(): - """Starts an FTPS server with an empty temp dir. - - This fixture returns a tuple with the socketname and the path to the - serving directory. The socketname is a tuple with host and port. - - Use the ``ftp_server`` wrapper fixture instead as it will clean the - directory before each test. - """ - s = socket.socket() - s.bind(("", 0)) - fixtures = os.path.join(os.path.dirname(os.path.realpath(__file__)), "fixtures") - with tempfile.TemporaryDirectory() as d: - auth = DummyAuthorizer() - auth.add_user("user", "pass", d, perm="elradfmwMT") - handler = TLS_FTPHandler - handler.certfile = os.path.join(fixtures, "server.crt") - handler.keyfile = os.path.join(fixtures, "server.key") - handler.authorizer = auth - server = FTPServer(s, handler) - t = threading.Thread(target=server.serve_forever, daemon=1) - t.start() - yield s.getsockname(), d - - @pytest.fixture -def ftp_server_wrapper(ftp_server): - """Wrapper around ``_ftp_server`` to clean directory before each test.""" - d = ftp_server[1] - for f in os.listdir(d): - fpath = os.path.join(d, f) - if os.path.isfile(fpath): - os.unlink(fpath) - return ftp_server - - -@pytest.fixture -def _load_data(records, aa_data): +def _load_data(people_records, articles_records): with closing(engine().connect()) as connection: connection.execute(persons.delete()) connection.execute(orcids.delete()) connection.execute(dlcs.delete()) connection.execute(aa_articles.delete()) - for r in records: - connection.execute(persons.insert(), r["person"]) - connection.execute(orcids.insert(), r["orcid"]) - connection.execute(dlcs.insert(), r["dlc"]) - connection.execute(aa_articles.insert(), aa_data) + for record in people_records: + connection.execute(persons.insert(), record["person"]) + connection.execute(orcids.insert(), record["orcid"]) + connection.execute(dlcs.insert(), record["dlc"]) + connection.execute(aa_articles.insert(), articles_records) 
connection.commit() yield with closing(engine().connect()) as connection: @@ -116,61 +62,43 @@ def _load_data(records, aa_data): @pytest.fixture -def xml_records(e): - return [ - e.record( - e.field("123456", {"name": "[Proprietary_ID]"}), - e.field("FOOBAR", {"name": "[Username]"}), - e.field("F B", {"name": "[Initials]"}), - e.field("Gaz", {"name": "[LastName]"}), - e.field("Foobar", {"name": "[FirstName]"}), - e.field("foobar@example.com", {"name": "[Email]"}), - e.field("MIT", {"name": "[AuthenticatingAuthority]"}), - e.field("1", {"name": "[IsAcademic]"}), - e.field("1", {"name": "[IsCurrent]"}), - e.field("1", {"name": "[LoginAllowed]"}), - e.field("Chemistry Faculty", {"name": "[PrimaryGroupDescriptor]"}), - e.field("2001-01-01", {"name": "[ArriveDate]"}), - e.field("2010-01-01", {"name": "[LeaveDate]"}), - e.field("http://example.com/1", {"name": "[Generic01]"}), - e.field("CFAT", {"name": "[Generic02]"}), - e.field("SCIENCE AREA", {"name": "[Generic03]"}), - e.field("Chemistry", {"name": "[Generic04]"}), - e.field(name="[Generic05]"), - ), - e.record( - e.field("098754", name="[Proprietary_ID]"), - e.field("THOR", name="[Username]"), - e.field("Þ H", name="[Initials]"), - e.field("Hammerson", name="[LastName]"), - e.field("Þorgerðr", name="[FirstName]"), - e.field("thor@example.com", name="[Email]"), - e.field("MIT", {"name": "[AuthenticatingAuthority]"}), - e.field("1", {"name": "[IsAcademic]"}), - e.field("1", {"name": "[IsCurrent]"}), - e.field("1", {"name": "[LoginAllowed]"}), - e.field( - "Nuclear Science Non-faculty", - {"name": "[PrimaryGroupDescriptor]"}, +def articles_element(): + element_maker = ElementMaker() + return element_maker.ARTICLES( + element_maker.ARTICLE( + element_maker.AA_MATCH_SCORE("0.9"), + element_maker.ARTICLE_ID("1234567"), + element_maker.ARTICLE_TITLE( + "Interaction between hatsopoulos microfluids and " + "the Yawning Abyss of Chaos ☈." 
), - e.field("2015-01-01", {"name": "[ArriveDate]"}), - e.field("2999-12-31", {"name": "[LeaveDate]"}), - e.field("http://example.com/2", {"name": "[Generic01]"}), - e.field("COAC", {"name": "[Generic02]"}), - e.field("ENGINEERING AREA", {"name": "[Generic03]"}), - e.field("Nuclear Science", {"name": "[Generic04]"}), - e.field("Nuclear Science and Engineering", {"name": "[Generic05]"}), - ), - ] + element_maker.ARTICLE_YEAR("1999"), + element_maker.AUTHORS("McRandallson, Randall M.|Lord, Dark|☭"), + element_maker.DOI("10.0000/1234LETTERS56"), + element_maker.ISSN_ELECTRONIC("0987654"), + element_maker.ISSN_PRINT("01234567"), + element_maker.IS_CONFERENCE_PROCEEDING("0"), + element_maker.JOURNAL_FIRST_PAGE("666"), + element_maker.JOURNAL_LAST_PAGE("666"), + element_maker.JOURNAL_ISSUE("10"), + element_maker.JOURNAL_VOLUME("1"), + element_maker.JOURNAL_NAME("Bunnies"), + element_maker.MIT_ID("123456789"), + element_maker.PUBLISHER("MIT Press"), + ) + ) -@pytest.fixture -def people_data(e, xml_records): - return e.records(*xml_records) +@pytest.fixture(scope="session") +def articles_records(): + current_dir = os.path.dirname(os.path.realpath(__file__)) + data = os.path.join(current_dir, "fixtures/articles.yml") + with open(data) as file: + return list(yaml.safe_load_all(file)) @pytest.fixture -def e(): +def people_element_maker(): return ElementMaker( namespace="http://www.symplectic.co.uk/hrimporter", nsmap={None: "http://www.symplectic.co.uk/hrimporter"}, @@ -178,42 +106,121 @@ def e(): @pytest.fixture -def articles_data(): - return B.ARTICLES( - B.ARTICLE( - B.AA_MATCH_SCORE("0.9"), - B.ARTICLE_ID("1234567"), - B.ARTICLE_TITLE( - "Interaction between hatsopoulos microfluids and " - "the Yawning Abyss of Chaos ☈." 
+def people_element(people_element_maker): + people_elements = [ + people_element_maker.record( + people_element_maker.field("123456", {"name": "[Proprietary_ID]"}), + people_element_maker.field("FOOBAR", {"name": "[Username]"}), + people_element_maker.field("F B", {"name": "[Initials]"}), + people_element_maker.field("Gaz", {"name": "[LastName]"}), + people_element_maker.field("Foobar", {"name": "[FirstName]"}), + people_element_maker.field("foobar@example.com", {"name": "[Email]"}), + people_element_maker.field("MIT", {"name": "[AuthenticatingAuthority]"}), + people_element_maker.field("1", {"name": "[IsAcademic]"}), + people_element_maker.field("1", {"name": "[IsCurrent]"}), + people_element_maker.field("1", {"name": "[LoginAllowed]"}), + people_element_maker.field( + "Chemistry Faculty", {"name": "[PrimaryGroupDescriptor]"} ), - B.ARTICLE_YEAR("1999"), - B.AUTHORS("McRandallson, Randall M.|Lord, Dark|☭"), - B.DOI("10.0000/1234LETTERS56"), - B.ISSN_ELECTRONIC("0987654"), - B.ISSN_PRINT("01234567"), - B.IS_CONFERENCE_PROCEEDING("0"), - B.JOURNAL_FIRST_PAGE("666"), - B.JOURNAL_LAST_PAGE("666"), - B.JOURNAL_ISSUE("10"), - B.JOURNAL_VOLUME("1"), - B.JOURNAL_NAME("Bunnies"), - B.MIT_ID("123456789"), - B.PUBLISHER("MIT Press"), - ) - ) + people_element_maker.field("2001-01-01", {"name": "[ArriveDate]"}), + people_element_maker.field("2010-01-01", {"name": "[LeaveDate]"}), + people_element_maker.field("http://example.com/1", {"name": "[Generic01]"}), + people_element_maker.field("CFAT", {"name": "[Generic02]"}), + people_element_maker.field("SCIENCE AREA", {"name": "[Generic03]"}), + people_element_maker.field("Chemistry", {"name": "[Generic04]"}), + people_element_maker.field(name="[Generic05]"), + ), + people_element_maker.record( + people_element_maker.field("098754", name="[Proprietary_ID]"), + people_element_maker.field("THOR", name="[Username]"), + people_element_maker.field("Þ H", name="[Initials]"), + people_element_maker.field("Hammerson", name="[LastName]"), 
+ people_element_maker.field("Þorgerðr", name="[FirstName]"), + people_element_maker.field("thor@example.com", name="[Email]"), + people_element_maker.field("MIT", {"name": "[AuthenticatingAuthority]"}), + people_element_maker.field("1", {"name": "[IsAcademic]"}), + people_element_maker.field("1", {"name": "[IsCurrent]"}), + people_element_maker.field("1", {"name": "[LoginAllowed]"}), + people_element_maker.field( + "Nuclear Science Non-faculty", + {"name": "[PrimaryGroupDescriptor]"}, + ), + people_element_maker.field("2015-01-01", {"name": "[ArriveDate]"}), + people_element_maker.field("2999-12-31", {"name": "[LeaveDate]"}), + people_element_maker.field("http://example.com/2", {"name": "[Generic01]"}), + people_element_maker.field("COAC", {"name": "[Generic02]"}), + people_element_maker.field("ENGINEERING AREA", {"name": "[Generic03]"}), + people_element_maker.field("Nuclear Science", {"name": "[Generic04]"}), + people_element_maker.field( + "Nuclear Science and Engineering", {"name": "[Generic05]"} + ), + ), + ] + + return people_element_maker.records(*people_elements) + + +@pytest.fixture(scope="session") +def people_records(): + current_dir = os.path.dirname(os.path.realpath(__file__)) + data = os.path.join(current_dir, "fixtures/data.yml") + with open(data) as file: + return list(yaml.safe_load_all(file)) + + +@pytest.fixture +def feed_type(request, monkeypatch): + monkeypatch.setenv("FEED_TYPE", request.param) + return request.param + + +@pytest.fixture(scope="session") +def ftp_server(): + """Starts an FTPS server with an empty temp dir. + + This fixture returns a tuple with the socketname and the path to the + serving directory. The socketname is a tuple with host and port. + + Use the ``ftp_server`` wrapper fixture instead as it will clean the + directory before each test. 
+ """ + ftp_socket = socket.socket() + ftp_socket.bind(("", 0)) + fixtures = os.path.join(os.path.dirname(os.path.realpath(__file__)), "fixtures") + with tempfile.TemporaryDirectory() as ftp_directory: + auth = DummyAuthorizer() + auth.add_user("user", "pass", ftp_directory, perm="elradfmwMT") + handler = TLS_FTPHandler + handler.certfile = os.path.join(fixtures, "server.crt") + handler.keyfile = os.path.join(fixtures, "server.key") + handler.authorizer = auth + server = FTPServer(ftp_socket, handler) + thread = threading.Thread(target=server.serve_forever, daemon=1) + thread.start() + yield ftp_socket.getsockname(), ftp_directory + + +@pytest.fixture +def ftp_server_wrapper(ftp_server): + """Wrapper around ``_ftp_server`` to clean directory before each test.""" + ftp_directory = ftp_server[1] + for file in os.listdir(ftp_directory): + file_path = os.path.join(ftp_directory, file) + if os.path.isfile(file_path): + os.unlink(file_path) + return ftp_server @pytest.fixture def reader(): class Reader: - def __init__(self, fp): - self.fp = fp + def __init__(self, file): + self.file = file self.data = b"" def __call__(self): while 1: - data = self.fp.read(1024) + data = self.file.read(1024) if not data: break self.data += data @@ -221,12 +228,6 @@ def __call__(self): return Reader -@pytest.fixture -def feed_type(request, monkeypatch): - monkeypatch.setenv("FEED_TYPE", request.param) - return request.param - - @pytest.fixture def symplectic_ftp_path(request, monkeypatch): monkeypatch.setenv("SYMPLECTIC_FTP_PATH", request.param) diff --git a/tests/test_app.py b/tests/test_app.py index 62859a7..37090a5 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -7,87 +7,98 @@ from lxml import etree as ET from carbon.app import ( - NSMAP, - FTPReader, + FtpFileWriter, PipeWriter, Writer, add_child, article_feed, articles, - group_name, - initials, - ns, + get_group_name, + get_initials, people, person_feed, - sns_log, ) from carbon.config import load_config_values +from 
carbon.helpers import sns_log pytestmark = pytest.mark.usefixtures("_load_data") +symplectic_elements_namespace = "http://www.symplectic.co.uk/hrimporter" +qualified_tag_name = ET.QName(symplectic_elements_namespace, tag="records") +nsmap = {None: symplectic_elements_namespace} def test_people_generates_people(): - peeps = list(people()) - person = peeps[0] + people_records = list(people()) + person = people_records[0] assert person["KRB_NAME_UPPERCASE"] == "FOOBAR" - person = peeps[1] + person = people_records[1] assert person["KRB_NAME_UPPERCASE"] == "THOR" def test_people_adds_orcids(): - peeps = list(people()) - assert peeps[0]["ORCID"] == "http://example.com/1" + people_records = list(people()) + assert people_records[0]["ORCID"] == "http://example.com/1" -def test_people_excludes_no_emails(): - peeps = list(people()) - no_email = [x for x in peeps if x["EMAIL_ADDRESS"] is not None] - assert len(no_email) == len(peeps) +def test_people_excludes_records_without_email(): + people_records = list(people()) + people_without_emails = [ + person for person in people_records if person["EMAIL_ADDRESS"] is None + ] + assert len(people_without_emails) == 0 -def test_people_excludes_no_lastname(): - peeps = list(people()) - no_email = [x for x in peeps if x["LAST_NAME"] is not None] - assert len(no_email) == len(peeps) +def test_people_excludes_records_without_last_name(): + people_records = list(people()) + people_without_last_names = [ + person for person in people_records if person["LAST_NAME"] is None + ] + assert len(people_without_last_names) == 0 -def test_people_excludes_no_kerberos(): - peeps = list(people()) - no_email = [x for x in peeps if x["KRB_NAME_UPPERCASE"] is not None] - assert len(no_email) == len(peeps) +def test_people_excludes_records_without_kerberos(): + people_records = list(people()) + people_without_kerberos = [ + person for person in people_records if person["KRB_NAME_UPPERCASE"] is None + ] + assert len(people_without_kerberos) == 0 -def 
test_people_excludes_no_mitid(): - peeps = list(people()) - no_email = [x for x in peeps if x["MIT_ID"] is not None] - assert len(no_email) == len(peeps) +def test_people_excludes_records_without_mit_id(): + people_records = list(people()) + people_without_mit_id = [ + person for person in people_records if person["MIT_ID"] is None + ] + assert len(people_without_mit_id) == 0 def test_initials_returns_first_and_middle(): - assert initials("Foo", "Bar") == "F B" - assert initials("Foo") == "F" - assert initials("F", "B") == "F B" - assert initials("Foo-bar", "Gaz") == "F-B G" - assert initials("Foo Bar-baz", "G") == "F B-B G" - assert initials("Foo", "") == "F" - assert initials("Foo", None) == "F" - assert initials("Gull-Þóris") == "G-Þ" - assert initials("владимир", "ильич", "ленин") == "В И Л" # noqa: RUF001 - assert initials("F. M.", "Laxdæla") == "F M L" - - -def test_add_child_adds_child_element(e): - xml = e.records(e.record("foobar", {"baz": "bazbar"})) - element = ET.Element(ns("records"), nsmap=NSMAP) + assert get_initials("Foo", "Bar") == "F B" + assert get_initials("Foo") == "F" + assert get_initials("F", "B") == "F B" + assert get_initials("Foo-bar", "Gaz") == "F-B G" + assert get_initials("Foo Bar-baz", "G") == "F B-B G" + assert get_initials("Foo", "") == "F" + assert get_initials("Foo", None) == "F" + assert get_initials("Gull-Þóris") == "G-Þ" + assert get_initials("владимир", "ильич", "ленин") == "В И Л" # noqa: RUF001 + assert get_initials("F. 
M.", "Laxdæla") == "F M L" + + +def test_add_child_adds_child_element(people_element_maker): + xml = people_element_maker.records( + people_element_maker.record("foobar", {"baz": "bazbar"}) + ) + element = ET.Element(_tag=qualified_tag_name, nsmap=nsmap) add_child(element, "record", "foobar", baz="bazbar") assert ET.tostring(element) == ET.tostring(xml) def test_writer_writes_person_feed(): - b = BytesIO() - w = Writer(b) - w.write("people") - xml = ET.XML(b.getvalue()) + output_file = BytesIO() + writer = Writer(output_file) + writer.write("people") + xml = ET.XML(output_file.getvalue()) xp = xml.xpath( "/s:records/s:record/s:field[@name='[FirstName]']", namespaces={"s": "http://www.symplectic.co.uk/hrimporter"}, @@ -96,94 +107,98 @@ def test_writer_writes_person_feed(): def test_pipewriter_writes_person_feed(reader): - r, w = os.pipe() - with open(r, "rb") as fr, open(w, "wb") as fw: - wtr = PipeWriter(fw) - rdr = reader(fr) - wtr.pipe(rdr).write("people") - xml = ET.XML(rdr.data) - xp = xml.xpath( + read_file, write_file = os.pipe() + with open(read_file, "rb") as buffered_reader, open( + write_file, "wb" + ) as buffered_writer: + file = reader(buffered_reader) + writer = PipeWriter(input_file=buffered_writer, ftp_output_file=file) + writer.write("people") + people_element = ET.XML(file.data) + people_first_names_xpath = people_element.xpath( "/s:records/s:record/s:field[@name='[FirstName]']", namespaces={"s": "http://www.symplectic.co.uk/hrimporter"}, ) - assert xp[1].text == "Þorgerðr" - - -def test_ftpreader_sends_file(ftp_server_wrapper): - s, d = ftp_server_wrapper - b = BytesIO(b"Storin' some bits in the FTPz") - ftp = FTPReader(b, "user", "pass", "/warez", port=s[1]) + assert people_first_names_xpath[1].text == "Þorgerðr" + + +def test_ftp_file_writer_sends_file(ftp_server_wrapper): + ftp_socket, ftp_directory = ftp_server_wrapper + feed = BytesIO(b"File uploaded to FTP server.") + ftp = FtpFileWriter( + content_feed=feed, + user="user", + 
password="pass", # noqa: S106 + path="/DEV", + port=ftp_socket[1], + ) ftp() - with open(os.path.join(d, "warez")) as fp: - assert fp.read() == "Storin' some bits in the FTPz" + with open(os.path.join(ftp_directory, "DEV")) as file: + assert file.read() == "File uploaded to FTP server." def test_person_feed_uses_namespace(): - b = BytesIO() - with person_feed(b): + output_file = BytesIO() + with person_feed(output_file): pass - root = ET.XML(b.getvalue()) + root = ET.XML(output_file.getvalue()) assert root.tag == "{http://www.symplectic.co.uk/hrimporter}records" -def test_person_feed_adds_person(records, xml_records, e): - b = BytesIO() - xml = e.records(xml_records[0]) - r = records[0]["person"].copy() - r.update(records[0]["orcid"]) - r.update(records[0]["dlc"]) - with person_feed(b) as f: - f(r) - xml = ET.XML(b.getvalue()) - xp = xml.xpath( +def test_person_feed_adds_person(people_records): + output_file = BytesIO() + record = people_records[0]["person"].copy() + record |= people_records[0]["orcid"] | people_records[0]["dlc"] + with person_feed(output_file) as write_to_file: + write_to_file(record) + person_element = ET.XML(output_file.getvalue()) + person_first_name_xpath = person_element.xpath( "/s:records/s:record/s:field[@name='[FirstName]']", namespaces={"s": "http://www.symplectic.co.uk/hrimporter"}, ) - assert xp[0].text == "Foobar" - - -def test_person_feed_uses_utf8_encoding(records, xml_records, e): - b = BytesIO() - xml = e.records(xml_records[1]) - r = records[1]["person"].copy() - r.update(records[1]["orcid"]) - r.update(records[1]["dlc"]) - with person_feed(b) as f: - f(r) - xml = ET.XML(b.getvalue()) - xp = xml.xpath( + assert person_first_name_xpath[0].text == "Foobar" + + +def test_person_feed_uses_utf8_encoding(people_records): + output_file = BytesIO() + record = people_records[1]["person"].copy() + record |= people_records[1]["orcid"] | people_records[1]["dlc"] + with person_feed(output_file) as write_to_file: + write_to_file(record) + 
person_element = ET.XML(output_file.getvalue()) + person_first_name_xpath = person_element.xpath( "/s:records/s:record/s:field[@name='[FirstName]']", namespaces={"s": "http://www.symplectic.co.uk/hrimporter"}, ) - assert xp[0].text == "Þorgerðr" + assert person_first_name_xpath[0].text == "Þorgerðr" def test_group_name_adds_faculty(): - assert group_name("FOOBAR", "CFAT") == "FOOBAR Faculty" - assert group_name("FOOBAR", "CFAN") == "FOOBAR Faculty" + assert get_group_name("FOOBAR", "CFAT") == "FOOBAR Faculty" + assert get_group_name("FOOBAR", "CFAN") == "FOOBAR Faculty" def test_group_name_adds_non_faculty(): - assert group_name("FOOBAR", "COAC") == "FOOBAR Non-faculty" + assert get_group_name("FOOBAR", "COAC") == "FOOBAR Non-faculty" def test_articles_generates_articles(): - arts = list(articles()) - assert "Yawning Abyss of Chaos" in arts[0]["ARTICLE_TITLE"] + articles_records = list(articles()) + assert "Yawning Abyss of Chaos" in articles_records[0]["ARTICLE_TITLE"] -def test_article_feed_adds_article(aa_data, articles_data): - b = BytesIO() - with article_feed(b) as f: - f(aa_data[0]) - assert b.getvalue() == ET.tostring( - articles_data, encoding="UTF-8", xml_declaration=True +def test_article_feed_adds_article(articles_records, articles_element): + output_file = BytesIO() + with article_feed(output_file) as write_to_file: + write_to_file(articles_records[0]) + assert output_file.getvalue() == ET.tostring( + articles_element, encoding="UTF-8", xml_declaration=True ) def test_articles_skips_articles_without_required_fields(): - arts = list(articles()) - assert len(arts) == 1 + articles_records = list(articles()) + assert len(articles_records) == 1 @freeze_time("2023-08-18") diff --git a/tests/test_cli.py b/tests/test_cli.py index 960dd7c..15ef748 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -7,7 +7,7 @@ from lxml import etree as ET from carbon.cli import main -from carbon.db import engine +from carbon.database import engine @pytest.fixture @@ 
-24,11 +24,11 @@ def test_people_returns_people( feed_type, symplectic_ftp_path, runner, - people_data, + people_element, ftp_server, stubbed_sns_client, ): - ftp_socket, ftp_directory = ftp_server + _, ftp_directory = ftp_server with patch("boto3.client") as mocked_boto_client: mocked_boto_client.return_value = stubbed_sns_client @@ -40,7 +40,7 @@ def test_people_returns_people( people_element, encoding="UTF-8", xml_declaration=True ) assert people_xml_string == ET.tostring( - people_data, encoding="UTF-8", xml_declaration=True + people_element, encoding="UTF-8", xml_declaration=True ) @@ -53,11 +53,11 @@ def test_articles_returns_articles( feed_type, symplectic_ftp_path, runner, - articles_data, + articles_element, ftp_server, stubbed_sns_client, ): - ftp_socket, ftp_directory = ftp_server + _, ftp_directory = ftp_server with patch("boto3.client") as mocked_boto_client: mocked_boto_client.return_value = stubbed_sns_client @@ -69,7 +69,7 @@ def test_articles_returns_articles( articles_element, encoding="UTF-8", xml_declaration=True ) assert articles_xml_string == ET.tostring( - articles_data, encoding="UTF-8", xml_declaration=True + articles_element, encoding="UTF-8", xml_declaration=True ) @@ -114,7 +114,7 @@ def test_cli_connection_tests_success(caplog, runner): def test_cli_connection_tests_fail(caplog, ftp_server, monkeypatch, runner): - ftp_socket, ftp_directory = ftp_server + ftp_socket, _ = ftp_server # override engine from pytest fixture # configure with connection string that will error out with engine.connect() diff --git a/tests/test_db.py b/tests/test_db.py index afc3e1a..297a06f 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,6 +1,6 @@ import pytest -from carbon.db import DatabaseEngine +from carbon.database import DatabaseEngine def test_nonconfigured_engine_raises_attributeerror():