From 75ccbe2f337124cdb72f54c1db376f522efce004 Mon Sep 17 00:00:00 2001 From: jonavellecuerdo Date: Fri, 1 Sep 2023 11:27:17 -0400 Subject: [PATCH 1/5] IN-905 Rename db.py to database.py --- carbon/app.py | 2 +- carbon/cli.py | 2 +- carbon/{db.py => database.py} | 0 tests/conftest.py | 2 +- tests/test_cli.py | 2 +- tests/test_db.py | 2 +- 6 files changed, 5 insertions(+), 5 deletions(-) rename carbon/{db.py => database.py} (100%) diff --git a/carbon/app.py b/carbon/app.py index d356526..6bfe766 100644 --- a/carbon/app.py +++ b/carbon/app.py @@ -14,7 +14,7 @@ from lxml import etree as ET # nosec from sqlalchemy import func, select -from carbon.db import aa_articles, dlcs, engine, orcids, persons +from carbon.database import aa_articles, dlcs, engine, orcids, persons if TYPE_CHECKING: from collections.abc import Callable, Generator diff --git a/carbon/cli.py b/carbon/cli.py index b08bee7..8e1e755 100644 --- a/carbon/cli.py +++ b/carbon/cli.py @@ -5,7 +5,7 @@ from carbon.app import FTPFeeder, sns_log from carbon.config import configure_logger, configure_sentry, load_config_values -from carbon.db import engine +from carbon.database import engine logger = logging.getLogger(__name__) diff --git a/carbon/db.py b/carbon/database.py similarity index 100% rename from carbon/db.py rename to carbon/database.py diff --git a/tests/conftest.py b/tests/conftest.py index c0c374b..3dfac11 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,7 +14,7 @@ from pyftpdlib.handlers import TLS_FTPHandler from pyftpdlib.servers import FTPServer -from carbon.db import aa_articles, dlcs, engine, metadata, orcids, persons +from carbon.database import aa_articles, dlcs, engine, metadata, orcids, persons @pytest.fixture(scope="session", autouse=True) diff --git a/tests/test_cli.py b/tests/test_cli.py index 960dd7c..5dac7b8 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -7,7 +7,7 @@ from lxml import etree as ET from carbon.cli import main -from carbon.db import engine +from 
carbon.database import engine @pytest.fixture diff --git a/tests/test_db.py b/tests/test_db.py index afc3e1a..297a06f 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,6 +1,6 @@ import pytest -from carbon.db import DatabaseEngine +from carbon.database import DatabaseEngine def test_nonconfigured_engine_raises_attributeerror(): From 8e1b767409e21a30f8bc004cf77c2b66c5629eff Mon Sep 17 00:00:00 2001 From: jonavellecuerdo Date: Fri, 1 Sep 2023 11:55:42 -0400 Subject: [PATCH 2/5] IN-905 Refine code naming conventions and in-code documentation Why these changes are being introduced: * Improve code coherency and repo structure How this addresses that need: * Rename FTPFeed class to DatabaseToFtpPipe * Rename FTPReader to FtpFileWriter * Rename PipeWriter.pipe to PipeWriter.connect * Reorganize app module so classes appear after methods * Add docstrings to cli and app modules Side effects of this change: * None Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/IN-905 --- carbon/app.py | 338 ++++++++++++++++++++++------------------------ carbon/cli.py | 40 +++--- carbon/helpers.py | 47 +++++++ tests/test_app.py | 14 +- 4 files changed, 240 insertions(+), 199 deletions(-) create mode 100644 carbon/helpers.py diff --git a/carbon/app.py b/carbon/app.py index 6bfe766..c079afa 100644 --- a/carbon/app.py +++ b/carbon/app.py @@ -5,12 +5,11 @@ import re import threading from contextlib import closing, contextmanager -from datetime import UTC, datetime +from datetime import datetime from ftplib import FTP, FTP_TLS # nosec from functools import partial from typing import IO, TYPE_CHECKING, Any -import boto3 from lxml import etree as ET # nosec from sqlalchemy import func, select @@ -234,134 +233,6 @@ def add_child( return child -class Writer: - """A Symplectic Elements feed writer. - - Use this class to generate and output an HR or AA feed for Symplectic - Elements. 
- """ - - def __init__(self, out: IO): - self.out = out - - def write(self, feed_type: str) -> None: - """Write the specified feed type to the configured output.""" - if feed_type == "people": - with person_feed(self.out) as f: - for person in people(): - f(person) - elif feed_type == "articles": - with article_feed(self.out) as f: - for article in articles(): - f(article) - - -class PipeWriter(Writer): - """A read/write :class:`carbon.app.Writer`. - - This class is intended to provide a buffered read/write connecter. The - :meth:`~carbon.app.PipeWriter.pipe` method should be called before - writing to configure the reader end. For example:: - - PipeWriter(fp_out).pipe(reader).write('people') - - See :class:`carbon.app.FTPReader` for an example reader. - """ - - def write(self, feed_type: str) -> None: - """Concurrently read/write from the configured inputs and outputs. - - This method will block until both the reader and writer are finished. - """ - pipe = threading.Thread(target=self._reader) - pipe.start() - super().write(feed_type) - self.out.close() - pipe.join() - - def pipe(self, reader: FTPReader) -> PipeWriter: - """Connect the read end of the pipe. - - This should be called before :meth:`~carbon.app.PipeWriter.write`. - """ - self._reader = reader - return self - - -class CarbonCopyFTPS(FTP_TLS): - """FTP_TLS subclass with support for SSL session reuse. - - The stdlib version of FTP_TLS creates a new SSL session for data - transfer commands. This results in a cryptic OpenSSL error message - when a server requires SSL session reuse. The ntransfercmd here takes - advantage of the new session parameter to wrap_socket that was added - in 3.6. - - Additionally, in the stdlib, storbinary destroys the SSL session after - transfering the file. Since the session has been shared with the - command connection, OpenSSL will once again generate a cryptic error - message for subsequent commands. The modified storbinary method here - removes the unwrap call. 
Calling quit on the ftp connection should - still cleanly shutdown the connection. - """ - - def ntransfercmd(self, cmd: str, rest: str | int | None = None) -> tuple[socket, int]: - conn, size = FTP.ntransfercmd(self, cmd, rest) - if self._prot_p: # type: ignore[attr-defined] - conn = self.context.wrap_socket( - conn, server_hostname=self.host, session=self.sock.session # type: ignore[union-attr] # noqa: E501 - ) - return conn, size - - def storbinary( - self, - cmd: str, - fp: IO, # type: ignore[override] - blocksize: int = 8192, - callback: Callable | None = None, - rest: str | None = None, # type: ignore[override] - ) -> str: - self.voidcmd("TYPE I") - with self.transfercmd(cmd, rest) as conn: - while 1: - buf = fp.read(blocksize) - if not buf: - break - conn.sendall(buf) - if callback: - callback(buf) - return self.voidresp() - - -class FTPReader: - def __init__( - self, - fp: IO, - user: str, - passwd: str, - path: str, - host: str = "localhost", - port: int = 21, - ctx: SSLContext | None = None, - ): - self.fp = fp - self.user = user - self.passwd = passwd - self.path = path - self.host = host - self.port = port - self.ctx = ctx - - def __call__(self) -> None: - """Transfer a file using FTP over TLS.""" - ftps = CarbonCopyFTPS(context=self.ctx, timeout=30) - ftps.connect(self.host, self.port) - ftps.login(self.user, self.passwd) - ftps.prot_p() - ftps.storbinary("STOR " + self.path, self.fp) - ftps.quit() - - @contextmanager def person_feed(out: IO) -> Generator: """Generate XML feed of people. @@ -455,7 +326,168 @@ def _add_person(xf: IO, person: dict[str, Any]) -> None: xf.write(record) -class FTPFeeder: +class CarbonCopyFTPS(FTP_TLS): + """FTP_TLS subclass with support for SSL session reuse. + + The stdlib version of FTP_TLS creates a new SSL session for data + transfer commands. This results in a cryptic OpenSSL error message + when a server requires SSL session reuse. 
The ntransfercmd here takes + advantage of the new session parameter to wrap_socket that was added + in 3.6. + + Additionally, in the stdlib, storbinary destroys the SSL session after + transfering the file. Since the session has been shared with the + command connection, OpenSSL will once again generate a cryptic error + message for subsequent commands. The modified storbinary method here + removes the unwrap call. Calling quit on the ftp connection should + still cleanly shutdown the connection. + """ + + def ntransfercmd(self, cmd: str, rest: str | int | None = None) -> tuple[socket, int]: + conn, size = FTP.ntransfercmd(self, cmd, rest) + if self._prot_p: # type: ignore[attr-defined] + conn = self.context.wrap_socket( + conn, server_hostname=self.host, session=self.sock.session # type: ignore[union-attr] # noqa: E501 + ) + return conn, size + + def storbinary( + self, + cmd: str, + fp: IO, # type: ignore[override] + blocksize: int = 8192, + callback: Callable | None = None, + rest: str | None = None, # type: ignore[override] + ) -> str: + self.voidcmd("TYPE I") + with self.transfercmd(cmd, rest) as conn: + while 1: + buf = fp.read(blocksize) + if not buf: + break + conn.sendall(buf) + if callback: + callback(buf) + return self.voidresp() + + +class Writer: + """A Symplectic Elements feed writer. + + Use this class to generate and output an HR or AA feed for Symplectic + Elements. + """ + + def __init__(self, out: IO): + self.out = out + + def write(self, feed_type: str) -> None: + """Write the specified feed type to the configured output.""" + if feed_type == "people": + with person_feed(self.out) as f: + for person in people(): + f(person) + elif feed_type == "articles": + with article_feed(self.out) as f: + for article in articles(): + f(article) + + +class PipeWriter(Writer): + """A read/write :class:`carbon.app.Writer`. + + This class is intended to provide a buffered read/write connecter. 
The + :meth:`~carbon.app.PipeWriter.pipe` method should be called before + writing to configure the reader end. For example:: + + PipeWriter(fp_out).pipe(reader).write('people') + + See :class:`carbon.app.FTPReader` for an example reader. + """ + + def write(self, feed_type: str) -> None: + """Concurrently read/write from the configured inputs and outputs. + + This method will block until both the reader and writer are finished. + """ + pipe = threading.Thread(target=self._reader) + pipe.start() + super().write(feed_type) + self.out.close() + pipe.join() + + def connect(self, reader: FtpFileWriter) -> PipeWriter: + """Connect the read end of the pipe. + + This should be called before :meth:`~carbon.app.PipeWriter.write`. + """ + self._reader = reader + return self + + +class FtpFileWriter: + """A file writer for the Symplectic Elements FTP server. + + The FtpFileWriter will read data from a provided feed and write the contents + from the feed to a file on the Symplectic Elements FTP server. + + Attributes: + content_feed: A file-like object (stream) that contains the records + from the Data Warehouse. + user: The username for accessing the Symplectic FTP server. + password: The password for accessing the Symplectic FTP server. + path: The full file path to the XML file (including the file name) that is + uploaded to the Symplectic FTP server. + host: The hostname of the Symplectic FTP server. + port: The port of the Symplectic FTP server. 
+ """ + + def __init__( + self, + content_feed: IO, + user: str, + password: str, + path: str, + host: str = "localhost", + port: int = 21, + ctx: SSLContext | None = None, + ): + self.content_feed = content_feed + self.user = user + self.password = password + self.path = path + self.host = host + self.port = port + self.ctx = ctx + + def __call__(self) -> None: + """Transfer a file using FTP over TLS.""" + ftps = CarbonCopyFTPS(context=self.ctx, timeout=30) + ftps.connect(host=self.host, port=self.port) + ftps.login(user=self.user, passwd=self.password) + ftps.prot_p() + ftps.storbinary(cmd=f"STOR {self.path}", fp=self.content_feed) + ftps.quit() + + +class DatabaseToFtpPipe: + """A pipe feeding data from the Data Warehouse to the Symplectic Elements FTP server. + + The feed consists of a pipe that connects 'read' and 'write' file-like objects + (streams) that allows for one-way passing of information to each other. The flow of + data is as follows: + + 1. The records from the Data Warehouse are transformed into normalized + XML strings and are concurrently written to the 'write' file stream + one record at a time. + + 2. The connected 'read' file stream concurrently transfers data from the + 'write' file stream into an XML file on the Elements FTP server. + + Attributes: + config: A dictionary of required environment variables for running the feed. 
+ """ + def __init__( self, event: dict[str, str], @@ -470,7 +502,7 @@ def run(self) -> None: r, w = os.pipe() feed_type = self.event["feed_type"] with open(r, "rb") as fp_r, open(w, "wb") as fp_w: - ftp_rdr = FTPReader( + ftp_file_writer = FtpFileWriter( fp_r, self.config["SYMPLECTIC_FTP_USER"], self.config["SYMPLECTIC_FTP_PASS"], @@ -479,7 +511,7 @@ def run(self) -> None: int(self.config["SYMPLECTIC_FTP_PORT"]), self.ssl_ctx, ) - PipeWriter(out=fp_w).pipe(ftp_rdr).write(feed_type) + PipeWriter(out=fp_w).connect(reader=ftp_file_writer).write(feed_type) def run_connection_test(self) -> None: """Test connection to the Symplectic Elements FTP server. @@ -506,43 +538,3 @@ def run_connection_test(self) -> None: else: logger.info("Successfully connected to the Symplectic Elements FTP server") ftps.quit() - - -def sns_log( - config_values: dict[str, Any], status: str, error: Exception | None = None -) -> None: - sns_client = boto3.client("sns") - sns_id = config_values.get("SNS_TOPIC") - stage = config_values.get("SYMPLECTIC_FTP_PATH", "").lstrip("/").split("/")[0] - feed = config_values.get("FEED_TYPE", "") - - if status == "start": - sns_client.publish( - TopicArn=sns_id, - Subject="Carbon run", - Message=( - f"[{datetime.now(tz=UTC).isoformat()}] Starting carbon run for the " - f"{feed} feed in the {stage} environment." - ), - ) - elif status == "success": - sns_client.publish( - TopicArn=sns_id, - Subject="Carbon run", - Message=( - f"[{datetime.now(tz=UTC).isoformat()}] Finished carbon run for the " - f"{feed} feed in the {stage} environment." - ), - ) - logger.info("Carbon run has successfully completed.") - elif status == "fail": - sns_client.publish( - TopicArn=sns_id, - Subject="Carbon run", - Message=( - f"[{datetime.now(tz=UTC).isoformat()}] The following problem was " - f"encountered during the carbon run for the {feed} feed " - f"in the {stage} environment: {error}." 
- ), - ) - logger.info("Carbon run has failed.") diff --git a/carbon/cli.py b/carbon/cli.py index 8e1e755..a673cae 100644 --- a/carbon/cli.py +++ b/carbon/cli.py @@ -3,9 +3,10 @@ import click -from carbon.app import FTPFeeder, sns_log +from carbon.app import DatabaseToFtpPipe from carbon.config import configure_logger, configure_sentry, load_config_values from carbon.database import engine +from carbon.helpers import sns_log logger = logging.getLogger(__name__) @@ -14,24 +15,19 @@ @click.version_option() @click.option("--run_connection_tests", is_flag=True) def main(*, run_connection_tests: bool) -> None: - """Generate feeds for Symplectic Elements. - - Specify which FEED_TYPE should be generated. This should be either - 'people' or 'articles'. - - The data is pulled from a database identified by --db, which should - be a valid SQLAlchemy database connection string. This can also be - omitted and pulled from an environment variable named CARBON_DB. For - oracle use: - - oracle://:@:1521/ - - By default, the feed will be printed to stdout. If -o/--out is used the - output will be written to the specified file instead. - - Alternatively, the --ftp switch can be used to send the output to an FTP - server. The server should support FTP over TLS. Only one of -o/--out or - --ftp should be used. + """Generate a data feed that uploads XML files to the Symplectic Elements FTP server. + + The feed uses a SQLAlchemy engine to connect to the Data Warehouse. A query is + submitted to the Data Warehouse to retrieve either 'people' or 'articles' records + depending on the 'FEED_TYPE' environment variable. Several transforms are applied + to normalize the records before it is converted to an XML-formatted string. + The feed builds a pipe that will concurrently read data from the Data Warehouse + and write the normalized, XML-formatted string to an XML file on the Elements + FTP server. For security purposes, the server should support FTP over TLS. 
+ + [wip] By default, the feed will write to an XML file on the Elements FTP server. + If the -o/--out argument is used, the output will be written to the specified + file instead. This latter option is recommended for testing purposes. """ config_values = load_config_values() # [TEMP]: The connection string must use 'oracle+oracledb' to differentiate @@ -52,13 +48,13 @@ def main(*, run_connection_tests: bool) -> None: engine.run_connection_test() # test connection to the Symplectic Elements FTP server - ftp_feed = FTPFeeder({"feed_type": config_values["FEED_TYPE"]}, config_values) - ftp_feed.run_connection_test() + pipe = DatabaseToFtpPipe({"feed_type": config_values["FEED_TYPE"]}, config_values) + pipe.run_connection_test() if not run_connection_tests: sns_log(config_values=config_values, status="start") try: - ftp_feed.run() + pipe.run() except Exception as error: # noqa: BLE001 sns_log(config_values=config_values, status="fail", error=error) else: diff --git a/carbon/helpers.py b/carbon/helpers.py new file mode 100644 index 0000000..78841ce --- /dev/null +++ b/carbon/helpers.py @@ -0,0 +1,47 @@ +import logging +from datetime import UTC, datetime +from typing import Any + +import boto3 + +logger = logging.getLogger(__name__) + + +def sns_log( + config_values: dict[str, Any], status: str, error: Exception | None = None +) -> None: + sns_client = boto3.client("sns") + sns_id = config_values.get("SNS_TOPIC") + stage = config_values.get("SYMPLECTIC_FTP_PATH", "").lstrip("/").split("/")[0] + feed = config_values.get("FEED_TYPE", "") + + if status == "start": + sns_client.publish( + TopicArn=sns_id, + Subject="Carbon run", + Message=( + f"[{datetime.now(tz=UTC).isoformat()}] Starting carbon run for the " + f"{feed} feed in the {stage} environment." 
+ ), + ) + elif status == "success": + sns_client.publish( + TopicArn=sns_id, + Subject="Carbon run", + Message=( + f"[{datetime.now(tz=UTC).isoformat()}] Finished carbon run for the " + f"{feed} feed in the {stage} environment." + ), + ) + logger.info("Carbon run has successfully completed.") + elif status == "fail": + sns_client.publish( + TopicArn=sns_id, + Subject="Carbon run", + Message=( + f"[{datetime.now(tz=UTC).isoformat()}] The following problem was " + f"encountered during the carbon run for the {feed} feed " + f"in the {stage} environment: {error}." + ), + ) + logger.info("Carbon run has failed.") diff --git a/tests/test_app.py b/tests/test_app.py index 62859a7..8a31908 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -8,7 +8,7 @@ from carbon.app import ( NSMAP, - FTPReader, + FtpFileWriter, PipeWriter, Writer, add_child, @@ -19,9 +19,9 @@ ns, people, person_feed, - sns_log, ) from carbon.config import load_config_values +from carbon.helpers import sns_log pytestmark = pytest.mark.usefixtures("_load_data") @@ -100,7 +100,7 @@ def test_pipewriter_writes_person_feed(reader): with open(r, "rb") as fr, open(w, "wb") as fw: wtr = PipeWriter(fw) rdr = reader(fr) - wtr.pipe(rdr).write("people") + wtr.connect(rdr).write("people") xml = ET.XML(rdr.data) xp = xml.xpath( "/s:records/s:record/s:field[@name='[FirstName]']", @@ -112,7 +112,13 @@ def test_pipewriter_writes_person_feed(reader): def test_ftpreader_sends_file(ftp_server_wrapper): s, d = ftp_server_wrapper b = BytesIO(b"Storin' some bits in the FTPz") - ftp = FTPReader(b, "user", "pass", "/warez", port=s[1]) + ftp = FtpFileWriter( + content_feed=b, + user="user", + password="pass", # noqa: S106 + path="/warez", + port=s[1], + ) ftp() with open(os.path.join(d, "warez")) as fp: assert fp.read() == "Storin' some bits in the FTPz" From 4eaa76437803549b7dbe258f3f3ef19f1cc7d0c9 Mon Sep 17 00:00:00 2001 From: jonavellecuerdo Date: Fri, 1 Sep 2023 14:00:45 -0400 Subject: [PATCH 3/5] IN-905 Deprecate 
unused context arguments to simplify use --- carbon/app.py | 30 ++++++++++++------------------ carbon/cli.py | 2 +- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/carbon/app.py b/carbon/app.py index c079afa..d65802b 100644 --- a/carbon/app.py +++ b/carbon/app.py @@ -18,7 +18,6 @@ if TYPE_CHECKING: from collections.abc import Callable, Generator from socket import socket - from ssl import SSLContext logger = logging.getLogger(__name__) @@ -450,7 +449,6 @@ def __init__( path: str, host: str = "localhost", port: int = 21, - ctx: SSLContext | None = None, ): self.content_feed = content_feed self.user = user @@ -458,11 +456,10 @@ def __init__( self.path = path self.host = host self.port = port - self.ctx = ctx def __call__(self) -> None: """Transfer a file using FTP over TLS.""" - ftps = CarbonCopyFTPS(context=self.ctx, timeout=30) + ftps = CarbonCopyFTPS(timeout=30) ftps.connect(host=self.host, port=self.port) ftps.login(user=self.user, passwd=self.password) ftps.prot_p() @@ -490,28 +487,25 @@ class DatabaseToFtpPipe: def __init__( self, - event: dict[str, str], config: dict, - ssl_ctx: SSLContext | None = None, ): - self.event = event self.config = config - self.ssl_ctx = ssl_ctx def run(self) -> None: r, w = os.pipe() - feed_type = self.event["feed_type"] + with open(r, "rb") as fp_r, open(w, "wb") as fp_w: ftp_file_writer = FtpFileWriter( - fp_r, - self.config["SYMPLECTIC_FTP_USER"], - self.config["SYMPLECTIC_FTP_PASS"], - self.config["SYMPLECTIC_FTP_PATH"], - self.config["SYMPLECTIC_FTP_HOST"], - int(self.config["SYMPLECTIC_FTP_PORT"]), - self.ssl_ctx, + content_feed=fp_r, + user=self.config["SYMPLECTIC_FTP_USER"], + password=self.config["SYMPLECTIC_FTP_PASS"], + path=self.config["SYMPLECTIC_FTP_PATH"], + host=self.config["SYMPLECTIC_FTP_HOST"], + port=int(self.config["SYMPLECTIC_FTP_PORT"]), + ) + PipeWriter(out=fp_w).connect(reader=ftp_file_writer).write( + feed_type=self.config["FEED_TYPE"] ) - 
PipeWriter(out=fp_w).connect(reader=ftp_file_writer).write(feed_type) def run_connection_test(self) -> None: """Test connection to the Symplectic Elements FTP server. @@ -521,7 +515,7 @@ def run_connection_test(self) -> None: """ logger.info("Testing connection to the Symplectic Elements FTP server") try: - ftps = CarbonCopyFTPS(context=self.ssl_ctx, timeout=30) + ftps = CarbonCopyFTPS(timeout=30) ftps.connect( self.config["SYMPLECTIC_FTP_HOST"], int(self.config["SYMPLECTIC_FTP_PORT"]), diff --git a/carbon/cli.py b/carbon/cli.py index a673cae..828d67d 100644 --- a/carbon/cli.py +++ b/carbon/cli.py @@ -48,7 +48,7 @@ def main(*, run_connection_tests: bool) -> None: engine.run_connection_test() # test connection to the Symplectic Elements FTP server - pipe = DatabaseToFtpPipe({"feed_type": config_values["FEED_TYPE"]}, config_values) + pipe = DatabaseToFtpPipe(config=config_values) pipe.run_connection_test() if not run_connection_tests: From a5228be54fb13359e71f58b042a6f8aa979e4291 Mon Sep 17 00:00:00 2001 From: jonavellecuerdo Date: Fri, 1 Sep 2023 16:49:59 -0400 Subject: [PATCH 4/5] IN-905 Miscellaneous updates * Add and update docstrings * Simplify PipeWriter class * Additional updates to variable and class names * Move transform functions to helpers.py --- carbon/app.py | 404 +++++++++++++++++++++++---------------------- carbon/cli.py | 4 +- carbon/database.py | 6 - carbon/helpers.py | 43 +++++ tests/conftest.py | 243 +++++++++++++-------------- tests/test_app.py | 147 +++++++++-------- tests/test_cli.py | 8 +- 7 files changed, 453 insertions(+), 402 deletions(-) diff --git a/carbon/app.py b/carbon/app.py index d65802b..437637c 100644 --- a/carbon/app.py +++ b/carbon/app.py @@ -2,7 +2,6 @@ import logging import os -import re import threading from contextlib import closing, contextmanager from datetime import datetime @@ -14,6 +13,7 @@ from sqlalchemy import func, select from carbon.database import aa_articles, dlcs, engine, orcids, persons +from carbon.helpers 
import group_name, hire_date_string, initials if TYPE_CHECKING: from collections.abc import Callable, Generator @@ -21,6 +21,7 @@ logger = logging.getLogger(__name__) +# variables used in query for retrieving 'people' records AREAS = ( "ARCHITECTURE & PLANNING AREA", "ENGINEERING AREA", @@ -96,172 +97,18 @@ "PART-TIME FLEXIBLE/LL", ) -ENV_VARS = ( - "FTP_USER", - "FTP_PASS", - "FTP_PATH", - "FTP_HOST", - "FTP_PORT", - "CARBON_DB", -) - - -def people() -> Generator[dict[str, Any], Any, None]: - """A person generator. - - Returns an iterator of person dictionaries. - """ - sql = ( - select( - persons.c.MIT_ID, - persons.c.KRB_NAME_UPPERCASE, - persons.c.FIRST_NAME, - persons.c.MIDDLE_NAME, - persons.c.LAST_NAME, - persons.c.EMAIL_ADDRESS, - persons.c.DATE_TO_FACULTY, - persons.c.ORIGINAL_HIRE_DATE, - dlcs.c.DLC_NAME, - persons.c.PERSONNEL_SUBAREA_CODE, - persons.c.APPOINTMENT_END_DATE, - orcids.c.ORCID, - dlcs.c.ORG_HIER_SCHOOL_AREA_NAME, - dlcs.c.HR_ORG_LEVEL5_NAME, - ) - .select_from(persons) - .outerjoin(orcids) - .join(dlcs) - .where(persons.c.EMAIL_ADDRESS.is_not(None)) - .where(persons.c.LAST_NAME.is_not(None)) - .where(persons.c.KRB_NAME_UPPERCASE.is_not(None)) - .where(persons.c.KRB_NAME_UPPERCASE != "UNKNOWN") - .where(persons.c.MIT_ID.is_not(None)) - .where(persons.c.ORIGINAL_HIRE_DATE.is_not(None)) - .where( - persons.c.APPOINTMENT_END_DATE # noqa: SIM300 - >= datetime(2009, 1, 1) # noqa: DTZ001 - ) - .where(func.upper(dlcs.c.ORG_HIER_SCHOOL_AREA_NAME).in_(AREAS)) - .where(persons.c.PERSONNEL_SUBAREA_CODE.in_(PS_CODES)) - .where(func.upper(persons.c.JOB_TITLE).in_(TITLES)) - ) - with closing(engine().connect()) as connection: - result = connection.execute(sql) - for row in result: - yield dict(zip(result.keys(), row, strict=True)) - - -def articles() -> Generator[dict[str, Any], Any, None]: - """An article generator. - - Returns an iterator over the AA_ARTICLE table. 
- """ - sql = ( - select(aa_articles) - .where(aa_articles.c.ARTICLE_ID.is_not(None)) - .where(aa_articles.c.ARTICLE_TITLE.is_not(None)) - .where(aa_articles.c.DOI.is_not(None)) - .where(aa_articles.c.MIT_ID.is_not(None)) - ) - with closing(engine().connect()) as connection: - result = connection.execute(sql) - for row in result: - yield dict(zip(result.keys(), row, strict=True)) - - -def initials(*args: str) -> str: - """Turn `*args` into a space-separated string of initials. - - Each argument is processed through :func:`~initialize_part` and - the resulting list is joined with a space. - """ - return " ".join([initialize_part(n) for n in args if n]) - - -def initialize_part(name: str) -> str: - """Turn a name part into uppercased initials. - - This function will do its best to parse the argument into one or - more initials. The first step is to remove any character that is - not alphanumeric, whitespace or a hyphen. The remaining string - is split on word boundaries, retaining both the words and the - boundaries. The first character of each list member is then - joined together, uppercased and returned. - - Some examples:: - - assert initialize_part('Foo Bar') == 'F B' - assert initialize_part('F. 
Bar-Baz') == 'F B-B' - assert initialize_part('Foo-bar') == 'F-B' - assert initialize_part(u'влад') == u'В' - - """ # noqa: RUF002 - name = re.sub(r"[^\w\s-]", "", name, flags=re.UNICODE) - return "".join([x[:1] for x in re.split(r"(\W+)", name, flags=re.UNICODE)]).upper() - - -def group_name(dlc: str, sub_area: str) -> str: - qualifier = "Faculty" if sub_area in ("CFAT", "CFAN") else "Non-faculty" - return f"{dlc} {qualifier}" - - -def hire_date_string(original_start_date: datetime, date_to_faculty: datetime) -> str: - if date_to_faculty: - return date_to_faculty.strftime("%Y-%m-%d") - return original_start_date.strftime("%Y-%m-%d") - - -def _ns(namespace: str, element: str) -> ET.QName: - return ET.QName(namespace, element) - - -SYMPLECTIC_NS = "http://www.symplectic.co.uk/hrimporter" -NSMAP = {None: SYMPLECTIC_NS} -ns = partial(_ns, SYMPLECTIC_NS) - - -def add_child( - parent: ET._Element, # noqa: SLF001 - element: str, - text: str | None = None, - **kwargs: str, -) -> ET._Element: # noqa: SLF001 - """Add a subelement with text.""" - child = ET.SubElement(parent, element, attrib=kwargs) - child.text = text - return child - - -@contextmanager -def person_feed(out: IO) -> Generator: - """Generate XML feed of people. - This is a streaming XML generator for people. Output will be - written to the provided output destination which can be a file - or file-like object. The context manager returns a function - which can be called repeatedly to add a person to the feed:: +def _add_article(xml_file: IO, article: dict[str, Any]) -> None: + """Create an XML element representing an article and write it to an output file. - with person_feed(sys.stdout) as f: - f({"MIT_ID": "1234", ...}) - f({"MIT_ID": "5678", ...}) + The function will create a single 'ARTICLE' element that contains subelements + representing fields in a record from the 'AA_ARTICLE table'. The 'ARTICLE' + element is written to an XML file. 
+ Args: + xml_file (IO): A file-like object (stream) that writes to an XML file. + article (dict[str, Any]): A record from the AA_ARTICLE table. """ - with ET.xmlfile(out, encoding="UTF-8") as xf: - xf.write_declaration() - with xf.element(ns("records"), nsmap=NSMAP): - yield partial(_add_person, xf) - - -@contextmanager -def article_feed(out: IO) -> Generator: - """Generate XML feed of articles.""" - with ET.xmlfile(out, encoding="UTF-8") as xf: - xf.write_declaration() - with xf.element("ARTICLES"): - yield partial(_add_article, xf) - - -def _add_article(xf: IO, article: dict[str, Any]) -> None: record = ET.Element("ARTICLE") add_child(record, "AA_MATCH_SCORE", str(article["AA_MATCH_SCORE"])) add_child(record, "ARTICLE_ID", article["ARTICLE_ID"]) @@ -279,10 +126,23 @@ def _add_article(xf: IO, article: dict[str, Any]) -> None: add_child(record, "JOURNAL_NAME", article["JOURNAL_NAME"]) add_child(record, "MIT_ID", article["MIT_ID"]) add_child(record, "PUBLISHER", article["PUBLISHER"]) - xf.write(record) + xml_file.write(record) -def _add_person(xf: IO, person: dict[str, Any]) -> None: +def _add_person(xml_file: IO, person: dict[str, Any]) -> None: + """Create an XML element representing a person and write it to an output file. + + The function will create a single 'record' element that contains subelements + representing fields from the 'HR_PERSON_EMPLOYEE_LIMITED', 'HR_ORG_UNIT', + and 'ORCID_TO_MITID' tables. The 'record' element is written to + to an XML file. + + Args: + xml_file (IO): A file-like object (stream) that writes to an XML file. + person (dict[str, Any]): A record from from a data model that includes fields + from the 'HR_PERSON_EMPLOYEE_LIMITED', 'HR_ORG_UNIT', and 'ORCID_TO_MITID' + tables. 
+ """ record = ET.Element("record") add_child(record, "field", person["MIT_ID"], name="[Proprietary_ID]") add_child(record, "field", person["KRB_NAME_UPPERCASE"], name="[Username]") @@ -322,10 +182,149 @@ def _add_person(xf: IO, person: dict[str, Any]) -> None: add_child(record, "field", person["ORG_HIER_SCHOOL_AREA_NAME"], name="[Generic03]") add_child(record, "field", person["DLC_NAME"], name="[Generic04]") add_child(record, "field", person.get("HR_ORG_LEVEL5_NAME"), name="[Generic05]") - xf.write(record) + xml_file.write(record) -class CarbonCopyFTPS(FTP_TLS): +def add_child( + parent: ET._Element, # noqa: SLF001 + element: str, + text: str | None = None, + **kwargs: str, +) -> ET._Element: # noqa: SLF001 + """Add a subelement to an existing element. + + Args: + parent (ET._Element): The root element. + element: The tag name assigned to the subelement. + text (str | None, optional): _description_. Defaults to None. + **kwargs (str): See lxml.etree.SubElement for more details. + + Returns: + ET._Element: The subelement. + """ + child = ET.SubElement(parent, element, attrib=kwargs) + child.text = text + return child + + +@contextmanager +def article_feed(output_file: IO) -> Generator: + """Generate XML feed of articles. + + Args: + output_file (IO): A file-like object (stream) into which normalized XML strings + strings are written. + + Yields: + Generator: A function that adds an 'ARTICLE' element to an 'ARTICLES' + root element. + """ + with ET.xmlfile(output_file, encoding="UTF-8") as xf: + xf.write_declaration() + with xf.element("ARTICLES"): + yield partial(_add_article, xf) + + +def articles() -> Generator[dict[str, Any], Any, None]: + """Create a generator of article records. + + Yields: + Generator[dict[str, Any], Any, None]: Results matching the query submitted to + the Data Warehouse for retrieving 'articles' records. 
+ """ + sql = ( + select(aa_articles) + .where(aa_articles.c.ARTICLE_ID.is_not(None)) + .where(aa_articles.c.ARTICLE_TITLE.is_not(None)) + .where(aa_articles.c.DOI.is_not(None)) + .where(aa_articles.c.MIT_ID.is_not(None)) + ) + with closing(engine().connect()) as connection: + result = connection.execute(sql) + for row in result: + yield dict(zip(result.keys(), row, strict=True)) + + +@contextmanager +def person_feed(output_file: IO) -> Generator: + """Generate XML feed of people. + + This is a streaming XML generator for people. Output will be + written to the provided output destination which can be a file + or file-like object. The context manager returns a function + which can be called repeatedly to add a person to the feed:: + + with person_feed(sys.stdout) as f: + f({"MIT_ID": "1234", ...}) + f({"MIT_ID": "5678", ...}) + + Args: + output_file (IO): A file-like object (stream) into which normalized XML + strings are written. + + Yields: + Generator: A function that adds a 'record' element to a 'records' root element. + . + """ + # namespace assigned to the xmlns attribute of the root 'records' element + symplectic_elements_namespace = "http://www.symplectic.co.uk/hrimporter" + qualified_tag_name = ET.QName(symplectic_elements_namespace, tag="records") + nsmap = {None: symplectic_elements_namespace} + + with ET.xmlfile(output_file, encoding="UTF-8") as xml_file: + xml_file.write_declaration() + with xml_file.element(tag=qualified_tag_name, nsmap=nsmap): + yield partial(_add_person, xml_file) + + +def people() -> Generator[dict[str, Any], Any, None]: + """Create a generator of 'people' records. + + Yields: + Generator[dict[str, Any], Any, None]: Results matching the query submitted to + the Data Warehouse for retrieving 'people' records. 
+ """ + sql = ( + select( + persons.c.MIT_ID, + persons.c.KRB_NAME_UPPERCASE, + persons.c.FIRST_NAME, + persons.c.MIDDLE_NAME, + persons.c.LAST_NAME, + persons.c.EMAIL_ADDRESS, + persons.c.DATE_TO_FACULTY, + persons.c.ORIGINAL_HIRE_DATE, + dlcs.c.DLC_NAME, + persons.c.PERSONNEL_SUBAREA_CODE, + persons.c.APPOINTMENT_END_DATE, + orcids.c.ORCID, + dlcs.c.ORG_HIER_SCHOOL_AREA_NAME, + dlcs.c.HR_ORG_LEVEL5_NAME, + ) + .select_from(persons) + .outerjoin(orcids) + .join(dlcs) + .where(persons.c.EMAIL_ADDRESS.is_not(None)) + .where(persons.c.LAST_NAME.is_not(None)) + .where(persons.c.KRB_NAME_UPPERCASE.is_not(None)) + .where(persons.c.KRB_NAME_UPPERCASE != "UNKNOWN") + .where(persons.c.MIT_ID.is_not(None)) + .where(persons.c.ORIGINAL_HIRE_DATE.is_not(None)) + .where( + persons.c.APPOINTMENT_END_DATE # noqa: SIM300 + >= datetime(2009, 1, 1) # noqa: DTZ001 + ) + .where(func.upper(dlcs.c.ORG_HIER_SCHOOL_AREA_NAME).in_(AREAS)) + .where(persons.c.PERSONNEL_SUBAREA_CODE.in_(PS_CODES)) + .where(func.upper(persons.c.JOB_TITLE).in_(TITLES)) + ) + with closing(engine().connect()) as connection: + result = connection.execute(sql) + for row in result: + yield dict(zip(result.keys(), row, strict=True)) + + +class CarbonFtpsTls(FTP_TLS): """FTP_TLS subclass with support for SSL session reuse. The stdlib version of FTP_TLS creates a new SSL session for data @@ -340,6 +339,9 @@ class CarbonCopyFTPS(FTP_TLS): message for subsequent commands. The modified storbinary method here removes the unwrap call. Calling quit on the ftp connection should still cleanly shutdown the connection. + + Attributes: + See ftplib.FTP_TLS for more details. """ def ntransfercmd(self, cmd: str, rest: str | int | None = None) -> tuple[socket, int]: @@ -371,58 +373,58 @@ def storbinary( class Writer: - """A Symplectic Elements feed writer. + """A writer that outputs normalized XML strings to a specified file. + + Use this class to generate and output an HR or AA feed. 
- Use this class to generate and output an HR or AA feed for Symplectic - Elements. + Attributes: + output_file: A file-like object (stream) into which normalized XML + strings are written. """ - def __init__(self, out: IO): - self.out = out + def __init__(self, output_file: IO): + self.output_file = output_file def write(self, feed_type: str) -> None: """Write the specified feed type to the configured output.""" if feed_type == "people": - with person_feed(self.out) as f: + with person_feed(self.output_file) as f: for person in people(): f(person) elif feed_type == "articles": - with article_feed(self.out) as f: + with article_feed(self.output_file) as f: for article in articles(): f(article) class PipeWriter(Writer): - """A read/write :class:`carbon.app.Writer`. - - This class is intended to provide a buffered read/write connecter. The - :meth:`~carbon.app.PipeWriter.pipe` method should be called before - writing to configure the reader end. For example:: + """A read/write carbon.app.Writer for the Symplectic Elements FTP server. - PipeWriter(fp_out).pipe(reader).write('people') + This class is intended to provide a buffered read/write connecter. - See :class:`carbon.app.FTPReader` for an example reader. + Attributes: + output_file: A file-like object (stream) into which normalized XML + strings are written. + ftp_output_file: A file-like object (stream) that reads data from + PipeWriter().output_file and writes its contents to an XML file + on the Symplectic Elements FTP server. """ + def __init__(self, input_file: IO, ftp_output_file: Callable): + super().__init__(input_file) + self.ftp_output_file = ftp_output_file + def write(self, feed_type: str) -> None: """Concurrently read/write from the configured inputs and outputs. This method will block until both the reader and writer are finished. 
""" - pipe = threading.Thread(target=self._reader) + pipe = threading.Thread(target=self.ftp_output_file) pipe.start() super().write(feed_type) - self.out.close() + self.output_file.close() pipe.join() - def connect(self, reader: FtpFileWriter) -> PipeWriter: - """Connect the read end of the pipe. - - This should be called before :meth:`~carbon.app.PipeWriter.write`. - """ - self._reader = reader - return self - class FtpFileWriter: """A file writer for the Symplectic Elements FTP server. @@ -459,7 +461,7 @@ def __init__( def __call__(self) -> None: """Transfer a file using FTP over TLS.""" - ftps = CarbonCopyFTPS(timeout=30) + ftps = CarbonFtpsTls(timeout=30) ftps.connect(host=self.host, port=self.port) ftps.login(user=self.user, passwd=self.password) ftps.prot_p() @@ -492,18 +494,20 @@ def __init__( self.config = config def run(self) -> None: - r, w = os.pipe() + read_file, write_file = os.pipe() - with open(r, "rb") as fp_r, open(w, "wb") as fp_w: - ftp_file_writer = FtpFileWriter( - content_feed=fp_r, + with open(read_file, "rb") as buffered_reader, open( + write_file, "wb" + ) as buffered_writer: + ftp_file = FtpFileWriter( + content_feed=buffered_reader, user=self.config["SYMPLECTIC_FTP_USER"], password=self.config["SYMPLECTIC_FTP_PASS"], path=self.config["SYMPLECTIC_FTP_PATH"], host=self.config["SYMPLECTIC_FTP_HOST"], port=int(self.config["SYMPLECTIC_FTP_PORT"]), ) - PipeWriter(out=fp_w).connect(reader=ftp_file_writer).write( + PipeWriter(input_file=buffered_writer, ftp_output_file=ftp_file).write( feed_type=self.config["FEED_TYPE"] ) @@ -515,10 +519,10 @@ def run_connection_test(self) -> None: """ logger.info("Testing connection to the Symplectic Elements FTP server") try: - ftps = CarbonCopyFTPS(timeout=30) + ftps = CarbonFtpsTls(timeout=30) ftps.connect( - self.config["SYMPLECTIC_FTP_HOST"], - int(self.config["SYMPLECTIC_FTP_PORT"]), + host=self.config["SYMPLECTIC_FTP_HOST"], + port=int(self.config["SYMPLECTIC_FTP_PORT"]), ) ftps.login( 
user=self.config["SYMPLECTIC_FTP_USER"], diff --git a/carbon/cli.py b/carbon/cli.py index 828d67d..2e6bbe0 100644 --- a/carbon/cli.py +++ b/carbon/cli.py @@ -20,9 +20,9 @@ def main(*, run_connection_tests: bool) -> None: The feed uses a SQLAlchemy engine to connect to the Data Warehouse. A query is submitted to the Data Warehouse to retrieve either 'people' or 'articles' records depending on the 'FEED_TYPE' environment variable. Several transforms are applied - to normalize the records before it is converted to an XML-formatted string. + to normalize the records before it is converted to an XML string. The feed builds a pipe that will concurrently read data from the Data Warehouse - and write the normalized, XML-formatted string to an XML file on the Elements + and write the normalized XML string to an XML file on the Elements FTP server. For security purposes, the server should support FTP over TLS. [wip] By default, the feed will write to an XML file on the Elements FTP server. diff --git a/carbon/database.py b/carbon/database.py index 7702268..ce9b5b7 100644 --- a/carbon/database.py +++ b/carbon/database.py @@ -1,5 +1,4 @@ import logging -import os from typing import Any from sqlalchemy import ( @@ -18,11 +17,8 @@ logger = logging.getLogger(__name__) -os.environ["NLS_LANG"] = "AMERICAN_AMERICA.UTF8" - metadata = MetaData() - persons = Table( "HR_PERSON_EMPLOYEE_LIMITED", metadata, @@ -53,7 +49,6 @@ Column("HR_ORG_LEVEL5_NAME", String), ) - orcids = Table( "ORCID_TO_MITID", metadata, @@ -61,7 +56,6 @@ Column("ORCID", String), ) - aa_articles = Table( "AA_ARTICLE", metadata, diff --git a/carbon/helpers.py b/carbon/helpers.py index 78841ce..8359ff8 100644 --- a/carbon/helpers.py +++ b/carbon/helpers.py @@ -1,4 +1,5 @@ import logging +import re from datetime import UTC, datetime from typing import Any @@ -7,6 +8,48 @@ logger = logging.getLogger(__name__) +def group_name(dlc: str, sub_area: str) -> str: + qualifier = "Faculty" if sub_area in ("CFAT", "CFAN") else 
"Non-faculty" + return f"{dlc} {qualifier}" + + +def hire_date_string(original_start_date: datetime, date_to_faculty: datetime) -> str: + if date_to_faculty: + return date_to_faculty.strftime("%Y-%m-%d") + return original_start_date.strftime("%Y-%m-%d") + + +def initialize_part(name: str) -> str: + """Turn a name part into uppercased initials. + + This function will do its best to parse the argument into one or + more initials. The first step is to remove any character that is + not alphanumeric, whitespace or a hyphen. The remaining string + is split on word boundaries, retaining both the words and the + boundaries. The first character of each list member is then + joined together, uppercased and returned. + + Some examples:: + + assert initialize_part('Foo Bar') == 'F B' + assert initialize_part('F. Bar-Baz') == 'F B-B' + assert initialize_part('Foo-bar') == 'F-B' + assert initialize_part(u'влад') == u'В' + + """ # noqa: RUF002 + name = re.sub(r"[^\w\s-]", "", name, flags=re.UNICODE) + return "".join([x[:1] for x in re.split(r"(\W+)", name, flags=re.UNICODE)]).upper() + + +def initials(*args: str) -> str: + """Turn `*args` into a space-separated string of initials. + + Each argument is processed through :func:`~initialize_part` and + the resulting list is joined with a space. 
+ """ + return " ".join([initialize_part(n) for n in args if n]) + + def sns_log( config_values: dict[str, Any], status: str, error: Exception | None = None ) -> None: diff --git a/tests/conftest.py b/tests/conftest.py index 3dfac11..9f897b3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,7 +8,6 @@ import pytest import yaml from botocore.stub import ANY, Stubber -from lxml.builder import E as B from lxml.builder import ElementMaker from pyftpdlib.authorizers import DummyAuthorizer from pyftpdlib.handlers import TLS_FTPHandler @@ -41,22 +40,140 @@ def _test_env(ftp_server): ) +@pytest.fixture +def _load_data(people_records, articles_records): + with closing(engine().connect()) as connection: + connection.execute(persons.delete()) + connection.execute(orcids.delete()) + connection.execute(dlcs.delete()) + connection.execute(aa_articles.delete()) + for record in people_records: + connection.execute(persons.insert(), record["person"]) + connection.execute(orcids.insert(), record["orcid"]) + connection.execute(dlcs.insert(), record["dlc"]) + connection.execute(aa_articles.insert(), articles_records) + connection.commit() + yield + with closing(engine().connect()) as connection: + connection.execute(persons.delete()) + connection.execute(orcids.delete()) + connection.execute(dlcs.delete()) + connection.execute(aa_articles.delete()) + + +@pytest.fixture +def articles_element(): + element_maker = ElementMaker() + return element_maker.ARTICLES( + element_maker.ARTICLE( + element_maker.AA_MATCH_SCORE("0.9"), + element_maker.ARTICLE_ID("1234567"), + element_maker.ARTICLE_TITLE( + "Interaction between hatsopoulos microfluids and " + "the Yawning Abyss of Chaos ☈." 
+ ), + element_maker.ARTICLE_YEAR("1999"), + element_maker.AUTHORS("McRandallson, Randall M.|Lord, Dark|☭"), + element_maker.DOI("10.0000/1234LETTERS56"), + element_maker.ISSN_ELECTRONIC("0987654"), + element_maker.ISSN_PRINT("01234567"), + element_maker.IS_CONFERENCE_PROCEEDING("0"), + element_maker.JOURNAL_FIRST_PAGE("666"), + element_maker.JOURNAL_LAST_PAGE("666"), + element_maker.JOURNAL_ISSUE("10"), + element_maker.JOURNAL_VOLUME("1"), + element_maker.JOURNAL_NAME("Bunnies"), + element_maker.MIT_ID("123456789"), + element_maker.PUBLISHER("MIT Press"), + ) + ) + + @pytest.fixture(scope="session") -def records(): +def articles_records(): current_dir = os.path.dirname(os.path.realpath(__file__)) - data = os.path.join(current_dir, "fixtures/data.yml") + data = os.path.join(current_dir, "fixtures/articles.yml") with open(data) as fp: return list(yaml.safe_load_all(fp)) +@pytest.fixture +def people_element_maker(): + return ElementMaker( + namespace="http://www.symplectic.co.uk/hrimporter", + nsmap={None: "http://www.symplectic.co.uk/hrimporter"}, + ) + + +@pytest.fixture +def people_element(people_element_maker): + people_elements = [ + people_element_maker.record( + people_element_maker.field("123456", {"name": "[Proprietary_ID]"}), + people_element_maker.field("FOOBAR", {"name": "[Username]"}), + people_element_maker.field("F B", {"name": "[Initials]"}), + people_element_maker.field("Gaz", {"name": "[LastName]"}), + people_element_maker.field("Foobar", {"name": "[FirstName]"}), + people_element_maker.field("foobar@example.com", {"name": "[Email]"}), + people_element_maker.field("MIT", {"name": "[AuthenticatingAuthority]"}), + people_element_maker.field("1", {"name": "[IsAcademic]"}), + people_element_maker.field("1", {"name": "[IsCurrent]"}), + people_element_maker.field("1", {"name": "[LoginAllowed]"}), + people_element_maker.field( + "Chemistry Faculty", {"name": "[PrimaryGroupDescriptor]"} + ), + people_element_maker.field("2001-01-01", {"name": 
"[ArriveDate]"}), + people_element_maker.field("2010-01-01", {"name": "[LeaveDate]"}), + people_element_maker.field("http://example.com/1", {"name": "[Generic01]"}), + people_element_maker.field("CFAT", {"name": "[Generic02]"}), + people_element_maker.field("SCIENCE AREA", {"name": "[Generic03]"}), + people_element_maker.field("Chemistry", {"name": "[Generic04]"}), + people_element_maker.field(name="[Generic05]"), + ), + people_element_maker.record( + people_element_maker.field("098754", name="[Proprietary_ID]"), + people_element_maker.field("THOR", name="[Username]"), + people_element_maker.field("Þ H", name="[Initials]"), + people_element_maker.field("Hammerson", name="[LastName]"), + people_element_maker.field("Þorgerðr", name="[FirstName]"), + people_element_maker.field("thor@example.com", name="[Email]"), + people_element_maker.field("MIT", {"name": "[AuthenticatingAuthority]"}), + people_element_maker.field("1", {"name": "[IsAcademic]"}), + people_element_maker.field("1", {"name": "[IsCurrent]"}), + people_element_maker.field("1", {"name": "[LoginAllowed]"}), + people_element_maker.field( + "Nuclear Science Non-faculty", + {"name": "[PrimaryGroupDescriptor]"}, + ), + people_element_maker.field("2015-01-01", {"name": "[ArriveDate]"}), + people_element_maker.field("2999-12-31", {"name": "[LeaveDate]"}), + people_element_maker.field("http://example.com/2", {"name": "[Generic01]"}), + people_element_maker.field("COAC", {"name": "[Generic02]"}), + people_element_maker.field("ENGINEERING AREA", {"name": "[Generic03]"}), + people_element_maker.field("Nuclear Science", {"name": "[Generic04]"}), + people_element_maker.field( + "Nuclear Science and Engineering", {"name": "[Generic05]"} + ), + ), + ] + + return people_element_maker.records(*people_elements) + + @pytest.fixture(scope="session") -def aa_data(): +def people_records(): current_dir = os.path.dirname(os.path.realpath(__file__)) - data = os.path.join(current_dir, "fixtures/articles.yml") + data = 
os.path.join(current_dir, "fixtures/data.yml") with open(data) as fp: return list(yaml.safe_load_all(fp)) +@pytest.fixture +def feed_type(request, monkeypatch): + monkeypatch.setenv("FEED_TYPE", request.param) + return request.param + + @pytest.fixture(scope="session") def ftp_server(): """Starts an FTPS server with an empty temp dir. @@ -94,116 +211,6 @@ def ftp_server_wrapper(ftp_server): return ftp_server -@pytest.fixture -def _load_data(records, aa_data): - with closing(engine().connect()) as connection: - connection.execute(persons.delete()) - connection.execute(orcids.delete()) - connection.execute(dlcs.delete()) - connection.execute(aa_articles.delete()) - for r in records: - connection.execute(persons.insert(), r["person"]) - connection.execute(orcids.insert(), r["orcid"]) - connection.execute(dlcs.insert(), r["dlc"]) - connection.execute(aa_articles.insert(), aa_data) - connection.commit() - yield - with closing(engine().connect()) as connection: - connection.execute(persons.delete()) - connection.execute(orcids.delete()) - connection.execute(dlcs.delete()) - connection.execute(aa_articles.delete()) - - -@pytest.fixture -def xml_records(e): - return [ - e.record( - e.field("123456", {"name": "[Proprietary_ID]"}), - e.field("FOOBAR", {"name": "[Username]"}), - e.field("F B", {"name": "[Initials]"}), - e.field("Gaz", {"name": "[LastName]"}), - e.field("Foobar", {"name": "[FirstName]"}), - e.field("foobar@example.com", {"name": "[Email]"}), - e.field("MIT", {"name": "[AuthenticatingAuthority]"}), - e.field("1", {"name": "[IsAcademic]"}), - e.field("1", {"name": "[IsCurrent]"}), - e.field("1", {"name": "[LoginAllowed]"}), - e.field("Chemistry Faculty", {"name": "[PrimaryGroupDescriptor]"}), - e.field("2001-01-01", {"name": "[ArriveDate]"}), - e.field("2010-01-01", {"name": "[LeaveDate]"}), - e.field("http://example.com/1", {"name": "[Generic01]"}), - e.field("CFAT", {"name": "[Generic02]"}), - e.field("SCIENCE AREA", {"name": "[Generic03]"}), - 
e.field("Chemistry", {"name": "[Generic04]"}), - e.field(name="[Generic05]"), - ), - e.record( - e.field("098754", name="[Proprietary_ID]"), - e.field("THOR", name="[Username]"), - e.field("Þ H", name="[Initials]"), - e.field("Hammerson", name="[LastName]"), - e.field("Þorgerðr", name="[FirstName]"), - e.field("thor@example.com", name="[Email]"), - e.field("MIT", {"name": "[AuthenticatingAuthority]"}), - e.field("1", {"name": "[IsAcademic]"}), - e.field("1", {"name": "[IsCurrent]"}), - e.field("1", {"name": "[LoginAllowed]"}), - e.field( - "Nuclear Science Non-faculty", - {"name": "[PrimaryGroupDescriptor]"}, - ), - e.field("2015-01-01", {"name": "[ArriveDate]"}), - e.field("2999-12-31", {"name": "[LeaveDate]"}), - e.field("http://example.com/2", {"name": "[Generic01]"}), - e.field("COAC", {"name": "[Generic02]"}), - e.field("ENGINEERING AREA", {"name": "[Generic03]"}), - e.field("Nuclear Science", {"name": "[Generic04]"}), - e.field("Nuclear Science and Engineering", {"name": "[Generic05]"}), - ), - ] - - -@pytest.fixture -def people_data(e, xml_records): - return e.records(*xml_records) - - -@pytest.fixture -def e(): - return ElementMaker( - namespace="http://www.symplectic.co.uk/hrimporter", - nsmap={None: "http://www.symplectic.co.uk/hrimporter"}, - ) - - -@pytest.fixture -def articles_data(): - return B.ARTICLES( - B.ARTICLE( - B.AA_MATCH_SCORE("0.9"), - B.ARTICLE_ID("1234567"), - B.ARTICLE_TITLE( - "Interaction between hatsopoulos microfluids and " - "the Yawning Abyss of Chaos ☈." 
- ), - B.ARTICLE_YEAR("1999"), - B.AUTHORS("McRandallson, Randall M.|Lord, Dark|☭"), - B.DOI("10.0000/1234LETTERS56"), - B.ISSN_ELECTRONIC("0987654"), - B.ISSN_PRINT("01234567"), - B.IS_CONFERENCE_PROCEEDING("0"), - B.JOURNAL_FIRST_PAGE("666"), - B.JOURNAL_LAST_PAGE("666"), - B.JOURNAL_ISSUE("10"), - B.JOURNAL_VOLUME("1"), - B.JOURNAL_NAME("Bunnies"), - B.MIT_ID("123456789"), - B.PUBLISHER("MIT Press"), - ) - ) - - @pytest.fixture def reader(): class Reader: @@ -221,12 +228,6 @@ def __call__(self): return Reader -@pytest.fixture -def feed_type(request, monkeypatch): - monkeypatch.setenv("FEED_TYPE", request.param) - return request.param - - @pytest.fixture def symplectic_ftp_path(request, monkeypatch): monkeypatch.setenv("SYMPLECTIC_FTP_PATH", request.param) diff --git a/tests/test_app.py b/tests/test_app.py index 8a31908..52fcec5 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -7,7 +7,6 @@ from lxml import etree as ET from carbon.app import ( - NSMAP, FtpFileWriter, PipeWriter, Writer, @@ -16,7 +15,6 @@ articles, group_name, initials, - ns, people, person_feed, ) @@ -24,43 +22,54 @@ from carbon.helpers import sns_log pytestmark = pytest.mark.usefixtures("_load_data") +symplectic_elements_namespace = "http://www.symplectic.co.uk/hrimporter" +qualified_tag_name = ET.QName(symplectic_elements_namespace, tag="records") +nsmap = {None: symplectic_elements_namespace} def test_people_generates_people(): - peeps = list(people()) - person = peeps[0] + people_records = list(people()) + person = people_records[0] assert person["KRB_NAME_UPPERCASE"] == "FOOBAR" - person = peeps[1] + person = people_records[1] assert person["KRB_NAME_UPPERCASE"] == "THOR" def test_people_adds_orcids(): - peeps = list(people()) - assert peeps[0]["ORCID"] == "http://example.com/1" + people_records = list(people()) + assert people_records[0]["ORCID"] == "http://example.com/1" -def test_people_excludes_no_emails(): - peeps = list(people()) - no_email = [x for x in peeps if 
x["EMAIL_ADDRESS"] is not None] - assert len(no_email) == len(peeps) +def test_people_excludes_records_without_email(): + people_records = list(people()) + people_without_emails = [ + person for person in people_records if person["EMAIL_ADDRESS"] is None + ] + assert len(people_without_emails) == 0 -def test_people_excludes_no_lastname(): - peeps = list(people()) - no_email = [x for x in peeps if x["LAST_NAME"] is not None] - assert len(no_email) == len(peeps) +def test_people_excludes_records_without_last_name(): + people_records = list(people()) + people_without_last_names = [ + person for person in people_records if person["LAST_NAME"] is None + ] + assert len(people_without_last_names) == 0 -def test_people_excludes_no_kerberos(): - peeps = list(people()) - no_email = [x for x in peeps if x["KRB_NAME_UPPERCASE"] is not None] - assert len(no_email) == len(peeps) +def test_people_excludes_records_without_kerberos(): + people_records = list(people()) + people_without_kerberos = [ + person for person in people_records if person["KRB_NAME_UPPERCASE"] is None + ] + assert len(people_without_kerberos) == 0 -def test_people_excludes_no_mitid(): - peeps = list(people()) - no_email = [x for x in peeps if x["MIT_ID"] is not None] - assert len(no_email) == len(peeps) +def test_people_excludes_records_without_mit_id(): + people_records = list(people()) + people_without_mit_id = [ + person for person in people_records if person["MIT_ID"] is None + ] + assert len(people_without_mit_id) == 0 def test_initials_returns_first_and_middle(): @@ -76,9 +85,11 @@ def test_initials_returns_first_and_middle(): assert initials("F. 
M.", "Laxdæla") == "F M L" -def test_add_child_adds_child_element(e): - xml = e.records(e.record("foobar", {"baz": "bazbar"})) - element = ET.Element(ns("records"), nsmap=NSMAP) +def test_add_child_adds_child_element(people_element_maker): + xml = people_element_maker.records( + people_element_maker.record("foobar", {"baz": "bazbar"}) + ) + element = ET.Element(_tag=qualified_tag_name, nsmap=nsmap) add_child(element, "record", "foobar", baz="bazbar") assert ET.tostring(element) == ET.tostring(xml) @@ -96,12 +107,14 @@ def test_writer_writes_person_feed(): def test_pipewriter_writes_person_feed(reader): - r, w = os.pipe() - with open(r, "rb") as fr, open(w, "wb") as fw: - wtr = PipeWriter(fw) - rdr = reader(fr) - wtr.connect(rdr).write("people") - xml = ET.XML(rdr.data) + read_file, write_file = os.pipe() + with open(read_file, "rb") as buffered_reader, open( + write_file, "wb" + ) as buffered_writer: + file = reader(buffered_reader) + writer = PipeWriter(input_file=buffered_writer, ftp_output_file=file) + writer.write("people") + xml = ET.XML(file.data) xp = xml.xpath( "/s:records/s:record/s:field[@name='[FirstName]']", namespaces={"s": "http://www.symplectic.co.uk/hrimporter"}, @@ -109,19 +122,19 @@ def test_pipewriter_writes_person_feed(reader): assert xp[1].text == "Þorgerðr" -def test_ftpreader_sends_file(ftp_server_wrapper): +def test_ftp_file_writer_sends_file(ftp_server_wrapper): s, d = ftp_server_wrapper - b = BytesIO(b"Storin' some bits in the FTPz") + b = BytesIO(b"File uploaded to FTP server.") ftp = FtpFileWriter( content_feed=b, user="user", password="pass", # noqa: S106 - path="/warez", + path="/DEV", port=s[1], ) ftp() - with open(os.path.join(d, "warez")) as fp: - assert fp.read() == "Storin' some bits in the FTPz" + with open(os.path.join(d, "DEV")) as fp: + assert fp.read() == "File uploaded to FTP server." 
def test_person_feed_uses_namespace(): @@ -132,36 +145,32 @@ def test_person_feed_uses_namespace(): assert root.tag == "{http://www.symplectic.co.uk/hrimporter}records" -def test_person_feed_adds_person(records, xml_records, e): - b = BytesIO() - xml = e.records(xml_records[0]) - r = records[0]["person"].copy() - r.update(records[0]["orcid"]) - r.update(records[0]["dlc"]) - with person_feed(b) as f: - f(r) - xml = ET.XML(b.getvalue()) - xp = xml.xpath( +def test_person_feed_adds_person(people_records): + output_file = BytesIO() + record = people_records[0]["person"].copy() + record |= people_records[0]["orcid"] | people_records[0]["dlc"] + with person_feed(output_file) as f: + f(record) + person_element = ET.XML(output_file.getvalue()) + person_first_name_xpath = person_element.xpath( "/s:records/s:record/s:field[@name='[FirstName]']", namespaces={"s": "http://www.symplectic.co.uk/hrimporter"}, ) - assert xp[0].text == "Foobar" + assert person_first_name_xpath[0].text == "Foobar" -def test_person_feed_uses_utf8_encoding(records, xml_records, e): - b = BytesIO() - xml = e.records(xml_records[1]) - r = records[1]["person"].copy() - r.update(records[1]["orcid"]) - r.update(records[1]["dlc"]) - with person_feed(b) as f: - f(r) - xml = ET.XML(b.getvalue()) - xp = xml.xpath( +def test_person_feed_uses_utf8_encoding(people_records): + output_file = BytesIO() + record = people_records[1]["person"].copy() + record |= people_records[1]["orcid"] | people_records[1]["dlc"] + with person_feed(output_file) as f: + f(record) + person_element = ET.XML(output_file.getvalue()) + person_first_name_xpath = person_element.xpath( "/s:records/s:record/s:field[@name='[FirstName]']", namespaces={"s": "http://www.symplectic.co.uk/hrimporter"}, ) - assert xp[0].text == "Þorgerðr" + assert person_first_name_xpath[0].text == "Þorgerðr" def test_group_name_adds_faculty(): @@ -174,22 +183,22 @@ def test_group_name_adds_non_faculty(): def test_articles_generates_articles(): - arts = 
list(articles()) - assert "Yawning Abyss of Chaos" in arts[0]["ARTICLE_TITLE"] + articles_records = list(articles()) + assert "Yawning Abyss of Chaos" in articles_records[0]["ARTICLE_TITLE"] -def test_article_feed_adds_article(aa_data, articles_data): - b = BytesIO() - with article_feed(b) as f: - f(aa_data[0]) - assert b.getvalue() == ET.tostring( - articles_data, encoding="UTF-8", xml_declaration=True +def test_article_feed_adds_article(articles_records, articles_element): + output_file = BytesIO() + with article_feed(output_file) as f: + f(articles_records[0]) + assert output_file.getvalue() == ET.tostring( + articles_element, encoding="UTF-8", xml_declaration=True ) def test_articles_skips_articles_without_required_fields(): - arts = list(articles()) - assert len(arts) == 1 + articles_records = list(articles()) + assert len(articles_records) == 1 @freeze_time("2023-08-18") diff --git a/tests/test_cli.py b/tests/test_cli.py index 5dac7b8..acf6992 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -24,7 +24,7 @@ def test_people_returns_people( feed_type, symplectic_ftp_path, runner, - people_data, + people_element, ftp_server, stubbed_sns_client, ): @@ -40,7 +40,7 @@ def test_people_returns_people( people_element, encoding="UTF-8", xml_declaration=True ) assert people_xml_string == ET.tostring( - people_data, encoding="UTF-8", xml_declaration=True + people_element, encoding="UTF-8", xml_declaration=True ) @@ -53,7 +53,7 @@ def test_articles_returns_articles( feed_type, symplectic_ftp_path, runner, - articles_data, + articles_element, ftp_server, stubbed_sns_client, ): @@ -69,7 +69,7 @@ def test_articles_returns_articles( articles_element, encoding="UTF-8", xml_declaration=True ) assert articles_xml_string == ET.tostring( - articles_data, encoding="UTF-8", xml_declaration=True + articles_element, encoding="UTF-8", xml_declaration=True ) From 5c63810239d28e2b880cb756f91584bc49a40bbf Mon Sep 17 00:00:00 2001 From: jonavellecuerdo Date: Wed, 6 Sep 2023 
10:31:54 -0400 Subject: [PATCH 5/5] Address comments --- carbon/app.py | 37 +++++++++-------- carbon/helpers.py | 102 +++++++++++++++++++++++++++++++++++----------- tests/conftest.py | 42 +++++++++---------- tests/test_app.py | 74 ++++++++++++++++----------------- tests/test_cli.py | 6 +-- 5 files changed, 161 insertions(+), 100 deletions(-) diff --git a/carbon/app.py b/carbon/app.py index 437637c..aa7a3c7 100644 --- a/carbon/app.py +++ b/carbon/app.py @@ -13,7 +13,7 @@ from sqlalchemy import func, select from carbon.database import aa_articles, dlcs, engine, orcids, persons -from carbon.helpers import group_name, hire_date_string, initials +from carbon.helpers import get_group_name, get_hire_date_string, get_initials if TYPE_CHECKING: from collections.abc import Callable, Generator @@ -149,7 +149,7 @@ def _add_person(xml_file: IO, person: dict[str, Any]) -> None: add_child( record, "field", - initials(person["FIRST_NAME"], person["MIDDLE_NAME"]), + get_initials(person["FIRST_NAME"], person["MIDDLE_NAME"]), name="[Initials]", ) add_child(record, "field", person["LAST_NAME"], name="[LastName]") @@ -162,13 +162,13 @@ def _add_person(xml_file: IO, person: dict[str, Any]) -> None: add_child( record, "field", - group_name(person["DLC_NAME"], person["PERSONNEL_SUBAREA_CODE"]), + get_group_name(person["DLC_NAME"], person["PERSONNEL_SUBAREA_CODE"]), name="[PrimaryGroupDescriptor]", ) add_child( record, "field", - hire_date_string(person["ORIGINAL_HIRE_DATE"], person["DATE_TO_FACULTY"]), + get_hire_date_string(person["ORIGINAL_HIRE_DATE"], person["DATE_TO_FACULTY"]), name="[ArriveDate]", ) add_child( @@ -187,23 +187,25 @@ def _add_person(xml_file: IO, person: dict[str, Any]) -> None: def add_child( parent: ET._Element, # noqa: SLF001 - element: str, - text: str | None = None, + element_name: str, + element_text: str | None = None, **kwargs: str, ) -> ET._Element: # noqa: SLF001 """Add a subelement to an existing element. Args: parent (ET._Element): The root element. 
- element: The tag name assigned to the subelement. - text (str | None, optional): _description_. Defaults to None. - **kwargs (str): See lxml.etree.SubElement for more details. + element_name: The name of the subelement. + element_text (str | None, optional): The value stored in the subelement. + Defaults to None. + **kwargs (str): Keyword arguments representing attributes for the subelement. + The 'name' argument is set for 'people' elements. Returns: ET._Element: The subelement. """ - child = ET.SubElement(parent, element, attrib=kwargs) - child.text = text + child = ET.SubElement(parent, element_name, attrib=kwargs) + child.text = element_text return child @@ -219,10 +221,10 @@ def article_feed(output_file: IO) -> Generator: Generator: A function that adds an 'ARTICLE' element to an 'ARTICLES' root element. """ - with ET.xmlfile(output_file, encoding="UTF-8") as xf: - xf.write_declaration() - with xf.element("ARTICLES"): - yield partial(_add_article, xf) + with ET.xmlfile(output_file, encoding="UTF-8") as xml_file: + xml_file.write_declaration() + with xml_file.element("ARTICLES"): + yield partial(_add_article, xml_file) def articles() -> Generator[dict[str, Any], Any, None]: @@ -345,6 +347,7 @@ class CarbonFtpsTls(FTP_TLS): """ def ntransfercmd(self, cmd: str, rest: str | int | None = None) -> tuple[socket, int]: + """Initiate a transfer over the data connection.""" conn, size = FTP.ntransfercmd(self, cmd, rest) if self._prot_p: # type: ignore[attr-defined] conn = self.context.wrap_socket( @@ -360,6 +363,7 @@ def storbinary( callback: Callable | None = None, rest: str | None = None, # type: ignore[override] ) -> str: + """Store a file in binary mode.""" self.voidcmd("TYPE I") with self.transfercmd(cmd, rest) as conn: while 1: @@ -375,7 +379,8 @@ def storbinary( class Writer: """A writer that outputs normalized XML strings to a specified file. - Use this class to generate and output an HR or AA feed. 
+ Use this class to generate either a 'people' or 'articles' feed that is written + to a specified output file. Attributes: output_file: A file-like object (stream) into which normalized XML diff --git a/carbon/helpers.py b/carbon/helpers.py index 8359ff8..c686b37 100644 --- a/carbon/helpers.py +++ b/carbon/helpers.py @@ -8,19 +8,8 @@ logger = logging.getLogger(__name__) -def group_name(dlc: str, sub_area: str) -> str: - qualifier = "Faculty" if sub_area in ("CFAT", "CFAN") else "Non-faculty" - return f"{dlc} {qualifier}" - - -def hire_date_string(original_start_date: datetime, date_to_faculty: datetime) -> str: - if date_to_faculty: - return date_to_faculty.strftime("%Y-%m-%d") - return original_start_date.strftime("%Y-%m-%d") - - -def initialize_part(name: str) -> str: - """Turn a name part into uppercased initials. +def _convert_to_initials(name_component: str) -> str: + """Turn a name component into uppercased initials. This function will do its best to parse the argument into one or more initials. The first step is to remove any character that is @@ -31,28 +20,95 @@ def initialize_part(name: str) -> str: Some examples:: - assert initialize_part('Foo Bar') == 'F B' - assert initialize_part('F. Bar-Baz') == 'F B-B' - assert initialize_part('Foo-bar') == 'F-B' - assert initialize_part(u'влад') == u'В' + assert _convert_to_initials('Foo Bar') == 'F B' + assert _convert_to_initials('F. Bar-Baz') == 'F B-B' + assert _convert_to_initials('Foo-bar') == 'F-B' + assert _convert_to_initials(u'влад') == u'В' """ # noqa: RUF002 - name = re.sub(r"[^\w\s-]", "", name, flags=re.UNICODE) - return "".join([x[:1] for x in re.split(r"(\W+)", name, flags=re.UNICODE)]).upper() + name_component = re.sub(r"[^\w\s-]", "", name_component, flags=re.UNICODE) + return "".join( + [x[:1] for x in re.split(r"(\W+)", name_component, flags=re.UNICODE)] + ).upper() + + +def get_group_name(dlc: str, sub_area: str) -> str: + """Create a primary group name for a 'people' record. 
+    Args:
+        dlc (str): The value for the 'DLC_NAME' field from a 'people' record.
+        sub_area (str): The value for the 'PERSONNEL_SUBAREA_CODE' field from a
+            'people' record.

-def initials(*args: str) -> str:
-    """Turn `*args` into a space-separated string of initials.
+    Returns:
+        str: A group name for a 'people' record, consisting of the DLC name and a flag
+            indicating 'Faculty' or 'Non-faculty'.
+    """
+    qualifier = "Faculty" if sub_area in ("CFAT", "CFAN") else "Non-faculty"
+    return f"{dlc} {qualifier}"

-    Each argument is processed through :func:`~initialize_part` and
+
+def get_hire_date_string(original_start_date: datetime, date_to_faculty: datetime) -> str:
+    """Create a string indicating the hire date for a 'people' record.
+
+    If the record has a value for the 'DATE_TO_FACULTY' field, this value is used;
+    if not, the value for the 'ORIGINAL_HIRE_DATE' field is used. Dates are formatted
+    as: YYYY-MM-DD (e.g., 2023-01-01).
+
+    Args:
+        original_start_date (datetime): The value for the 'ORIGINAL_HIRE_DATE' field
+            from a 'people' record.
+        date_to_faculty (datetime): The value for the 'DATE_TO_FACULTY' field from a
+            'people' record.
+
+    Returns:
+        str: The hire date formatted as a string.
+    """
+    if date_to_faculty:
+        return date_to_faculty.strftime("%Y-%m-%d")
+    return original_start_date.strftime("%Y-%m-%d")
+
+
+def get_initials(*args: str) -> str:
+    """Convert a tuple of name components into a space-separated string of initials.
+
+    Each name component is processed through helpers._convert_to_initials() and
     the resulting list is joined with a space.
+
+    Returns:
+        str: A string containing the initials of the provided name components.
    """
-    return " ".join([initialize_part(n) for n in args if n])
+    return " ".join(
+        [
+            _convert_to_initials(name_component)
+            for name_component in args
+            if name_component
+        ]
+    )


 def sns_log(
     config_values: dict[str, Any], status: str, error: Exception | None = None
 ) -> None:
+    """Send a message to an Amazon SNS topic about the status of the Carbon run.
+
+    When Carbon is run in the 'stage' environment, subscribers to the 'carbon-ecs-stage'
+    topic receive an email with the published message. For a given run, two messages are
+    published:
+
+    1. When status = 'start', a message indicating the Carbon run has started.
+    2. When status = 'success'/'fail', a message indicating if the Carbon run has
+        successfully completed or encountered an error.
+
+    Args:
+        config_values (dict[str, Any]): A dictionary of required environment variables
+            for running the feed.
+        status (str): The status of the Carbon run that is used to determine the message
+            published by SNS. The following values are accepted: 'start', 'success',
+            and 'fail'.
+        error (Exception | None, optional): The exception thrown for a failed Carbon run.
+            Defaults to None.
+ """ sns_client = boto3.client("sns") sns_id = config_values.get("SNS_TOPIC") stage = config_values.get("SYMPLECTIC_FTP_PATH", "").lstrip("/").split("/")[0] diff --git a/tests/conftest.py b/tests/conftest.py index 9f897b3..409f8b9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -24,7 +24,7 @@ def _app_init(): @pytest.fixture(autouse=True) def _test_env(ftp_server): - ftp_socket, ftp_directory = ftp_server + ftp_socket, _ = ftp_server os.environ["FEED_TYPE"] = "test_feed_type" os.environ["LOG_LEVEL"] = "INFO" os.environ["SENTRY_DSN"] = "None" @@ -93,8 +93,8 @@ def articles_element(): def articles_records(): current_dir = os.path.dirname(os.path.realpath(__file__)) data = os.path.join(current_dir, "fixtures/articles.yml") - with open(data) as fp: - return list(yaml.safe_load_all(fp)) + with open(data) as file: + return list(yaml.safe_load_all(file)) @pytest.fixture @@ -164,8 +164,8 @@ def people_element(people_element_maker): def people_records(): current_dir = os.path.dirname(os.path.realpath(__file__)) data = os.path.join(current_dir, "fixtures/data.yml") - with open(data) as fp: - return list(yaml.safe_load_all(fp)) + with open(data) as file: + return list(yaml.safe_load_all(file)) @pytest.fixture @@ -184,43 +184,43 @@ def ftp_server(): Use the ``ftp_server`` wrapper fixture instead as it will clean the directory before each test. 
""" - s = socket.socket() - s.bind(("", 0)) + ftp_socket = socket.socket() + ftp_socket.bind(("", 0)) fixtures = os.path.join(os.path.dirname(os.path.realpath(__file__)), "fixtures") - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory() as ftp_directory: auth = DummyAuthorizer() - auth.add_user("user", "pass", d, perm="elradfmwMT") + auth.add_user("user", "pass", ftp_directory, perm="elradfmwMT") handler = TLS_FTPHandler handler.certfile = os.path.join(fixtures, "server.crt") handler.keyfile = os.path.join(fixtures, "server.key") handler.authorizer = auth - server = FTPServer(s, handler) - t = threading.Thread(target=server.serve_forever, daemon=1) - t.start() - yield s.getsockname(), d + server = FTPServer(ftp_socket, handler) + thread = threading.Thread(target=server.serve_forever, daemon=1) + thread.start() + yield ftp_socket.getsockname(), ftp_directory @pytest.fixture def ftp_server_wrapper(ftp_server): """Wrapper around ``_ftp_server`` to clean directory before each test.""" - d = ftp_server[1] - for f in os.listdir(d): - fpath = os.path.join(d, f) - if os.path.isfile(fpath): - os.unlink(fpath) + ftp_directory = ftp_server[1] + for file in os.listdir(ftp_directory): + file_path = os.path.join(ftp_directory, file) + if os.path.isfile(file_path): + os.unlink(file_path) return ftp_server @pytest.fixture def reader(): class Reader: - def __init__(self, fp): - self.fp = fp + def __init__(self, file): + self.file = file self.data = b"" def __call__(self): while 1: - data = self.fp.read(1024) + data = self.file.read(1024) if not data: break self.data += data diff --git a/tests/test_app.py b/tests/test_app.py index 52fcec5..37090a5 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -13,8 +13,8 @@ add_child, article_feed, articles, - group_name, - initials, + get_group_name, + get_initials, people, person_feed, ) @@ -73,16 +73,16 @@ def test_people_excludes_records_without_mit_id(): def test_initials_returns_first_and_middle(): - 
assert initials("Foo", "Bar") == "F B" - assert initials("Foo") == "F" - assert initials("F", "B") == "F B" - assert initials("Foo-bar", "Gaz") == "F-B G" - assert initials("Foo Bar-baz", "G") == "F B-B G" - assert initials("Foo", "") == "F" - assert initials("Foo", None) == "F" - assert initials("Gull-Þóris") == "G-Þ" - assert initials("владимир", "ильич", "ленин") == "В И Л" # noqa: RUF001 - assert initials("F. M.", "Laxdæla") == "F M L" + assert get_initials("Foo", "Bar") == "F B" + assert get_initials("Foo") == "F" + assert get_initials("F", "B") == "F B" + assert get_initials("Foo-bar", "Gaz") == "F-B G" + assert get_initials("Foo Bar-baz", "G") == "F B-B G" + assert get_initials("Foo", "") == "F" + assert get_initials("Foo", None) == "F" + assert get_initials("Gull-Þóris") == "G-Þ" + assert get_initials("владимир", "ильич", "ленин") == "В И Л" # noqa: RUF001 + assert get_initials("F. M.", "Laxdæla") == "F M L" def test_add_child_adds_child_element(people_element_maker): @@ -95,10 +95,10 @@ def test_add_child_adds_child_element(people_element_maker): def test_writer_writes_person_feed(): - b = BytesIO() - w = Writer(b) - w.write("people") - xml = ET.XML(b.getvalue()) + output_file = BytesIO() + writer = Writer(output_file) + writer.write("people") + xml = ET.XML(output_file.getvalue()) xp = xml.xpath( "/s:records/s:record/s:field[@name='[FirstName]']", namespaces={"s": "http://www.symplectic.co.uk/hrimporter"}, @@ -114,34 +114,34 @@ def test_pipewriter_writes_person_feed(reader): file = reader(buffered_reader) writer = PipeWriter(input_file=buffered_writer, ftp_output_file=file) writer.write("people") - xml = ET.XML(file.data) - xp = xml.xpath( + people_element = ET.XML(file.data) + people_first_names_xpath = people_element.xpath( "/s:records/s:record/s:field[@name='[FirstName]']", namespaces={"s": "http://www.symplectic.co.uk/hrimporter"}, ) - assert xp[1].text == "Þorgerðr" + assert people_first_names_xpath[1].text == "Þorgerðr" def 
test_ftp_file_writer_sends_file(ftp_server_wrapper): - s, d = ftp_server_wrapper - b = BytesIO(b"File uploaded to FTP server.") + ftp_socket, ftp_directory = ftp_server_wrapper + feed = BytesIO(b"File uploaded to FTP server.") ftp = FtpFileWriter( - content_feed=b, + content_feed=feed, user="user", password="pass", # noqa: S106 path="/DEV", - port=s[1], + port=ftp_socket[1], ) ftp() - with open(os.path.join(d, "DEV")) as fp: - assert fp.read() == "File uploaded to FTP server." + with open(os.path.join(ftp_directory, "DEV")) as file: + assert file.read() == "File uploaded to FTP server." def test_person_feed_uses_namespace(): - b = BytesIO() - with person_feed(b): + output_file = BytesIO() + with person_feed(output_file): pass - root = ET.XML(b.getvalue()) + root = ET.XML(output_file.getvalue()) assert root.tag == "{http://www.symplectic.co.uk/hrimporter}records" @@ -149,8 +149,8 @@ def test_person_feed_adds_person(people_records): output_file = BytesIO() record = people_records[0]["person"].copy() record |= people_records[0]["orcid"] | people_records[0]["dlc"] - with person_feed(output_file) as f: - f(record) + with person_feed(output_file) as write_to_file: + write_to_file(record) person_element = ET.XML(output_file.getvalue()) person_first_name_xpath = person_element.xpath( "/s:records/s:record/s:field[@name='[FirstName]']", @@ -163,8 +163,8 @@ def test_person_feed_uses_utf8_encoding(people_records): output_file = BytesIO() record = people_records[1]["person"].copy() record |= people_records[1]["orcid"] | people_records[1]["dlc"] - with person_feed(output_file) as f: - f(record) + with person_feed(output_file) as write_to_file: + write_to_file(record) person_element = ET.XML(output_file.getvalue()) person_first_name_xpath = person_element.xpath( "/s:records/s:record/s:field[@name='[FirstName]']", @@ -174,12 +174,12 @@ def test_person_feed_uses_utf8_encoding(people_records): def test_group_name_adds_faculty(): - assert group_name("FOOBAR", "CFAT") == "FOOBAR 
Faculty" - assert group_name("FOOBAR", "CFAN") == "FOOBAR Faculty" + assert get_group_name("FOOBAR", "CFAT") == "FOOBAR Faculty" + assert get_group_name("FOOBAR", "CFAN") == "FOOBAR Faculty" def test_group_name_adds_non_faculty(): - assert group_name("FOOBAR", "COAC") == "FOOBAR Non-faculty" + assert get_group_name("FOOBAR", "COAC") == "FOOBAR Non-faculty" def test_articles_generates_articles(): @@ -189,8 +189,8 @@ def test_articles_generates_articles(): def test_article_feed_adds_article(articles_records, articles_element): output_file = BytesIO() - with article_feed(output_file) as f: - f(articles_records[0]) + with article_feed(output_file) as write_to_file: + write_to_file(articles_records[0]) assert output_file.getvalue() == ET.tostring( articles_element, encoding="UTF-8", xml_declaration=True ) diff --git a/tests/test_cli.py b/tests/test_cli.py index acf6992..15ef748 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -28,7 +28,7 @@ def test_people_returns_people( ftp_server, stubbed_sns_client, ): - ftp_socket, ftp_directory = ftp_server + _, ftp_directory = ftp_server with patch("boto3.client") as mocked_boto_client: mocked_boto_client.return_value = stubbed_sns_client @@ -57,7 +57,7 @@ def test_articles_returns_articles( ftp_server, stubbed_sns_client, ): - ftp_socket, ftp_directory = ftp_server + _, ftp_directory = ftp_server with patch("boto3.client") as mocked_boto_client: mocked_boto_client.return_value = stubbed_sns_client @@ -114,7 +114,7 @@ def test_cli_connection_tests_success(caplog, runner): def test_cli_connection_tests_fail(caplog, ftp_server, monkeypatch, runner): - ftp_socket, ftp_directory = ftp_server + ftp_socket, _ = ftp_server # override engine from pytest fixture # configure with connection string that will error out with engine.connect()