Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
615 changes: 305 additions & 310 deletions carbon/app.py

Large diffs are not rendered by default.

42 changes: 19 additions & 23 deletions carbon/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

import click

from carbon.app import FTPFeeder, sns_log
from carbon.app import DatabaseToFtpPipe
from carbon.config import configure_logger, configure_sentry, load_config_values
from carbon.db import engine
from carbon.database import engine
from carbon.helpers import sns_log

logger = logging.getLogger(__name__)

Expand All @@ -14,24 +15,19 @@
@click.version_option()
@click.option("--run_connection_tests", is_flag=True)
def main(*, run_connection_tests: bool) -> None:
"""Generate feeds for Symplectic Elements.

Specify which FEED_TYPE should be generated. This should be either
'people' or 'articles'.

The data is pulled from a database identified by --db, which should
be a valid SQLAlchemy database connection string. This can also be
omitted and pulled from an environment variable named CARBON_DB. For
oracle use:

oracle://<username>:<password>@<server>:1521/<sid>

By default, the feed will be printed to stdout. If -o/--out is used the
output will be written to the specified file instead.

Alternatively, the --ftp switch can be used to send the output to an FTP
server. The server should support FTP over TLS. Only one of -o/--out or
--ftp should be used.
"""Generate a data feed that uploads XML files to the Symplectic Elements FTP server.

The feed uses a SQLAlchemy engine to connect to the Data Warehouse. A query is
submitted to the Data Warehouse to retrieve either 'people' or 'articles' records
depending on the 'FEED_TYPE' environment variable. Several transforms are applied
to normalize the records before they are converted to an XML string.
The feed builds a pipe that will concurrently read data from the Data Warehouse
and write the normalized XML string to an XML file on the Elements
FTP server. For security purposes, the server should support FTP over TLS.

[wip] By default, the feed will write to an XML file on the Elements FTP server.
If the -o/--out argument is used, the output will be written to the specified
file instead. This latter option is recommended for testing purposes.
"""
config_values = load_config_values()
# [TEMP]: The connection string must use 'oracle+oracledb' to differentiate
Expand All @@ -52,13 +48,13 @@ def main(*, run_connection_tests: bool) -> None:
engine.run_connection_test()

# test connection to the Symplectic Elements FTP server
ftp_feed = FTPFeeder({"feed_type": config_values["FEED_TYPE"]}, config_values)
ftp_feed.run_connection_test()
pipe = DatabaseToFtpPipe(config=config_values)
pipe.run_connection_test()

if not run_connection_tests:
sns_log(config_values=config_values, status="start")
try:
ftp_feed.run()
pipe.run()
except Exception as error: # noqa: BLE001
sns_log(config_values=config_values, status="fail", error=error)
else:
Expand Down
6 changes: 0 additions & 6 deletions carbon/db.py → carbon/database.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import os
from typing import Any

from sqlalchemy import (
Expand All @@ -18,11 +17,8 @@

logger = logging.getLogger(__name__)

os.environ["NLS_LANG"] = "AMERICAN_AMERICA.UTF8"

metadata = MetaData()


persons = Table(
"HR_PERSON_EMPLOYEE_LIMITED",
metadata,
Expand Down Expand Up @@ -53,15 +49,13 @@
Column("HR_ORG_LEVEL5_NAME", String),
)


# Table definition for "ORCID_TO_MITID": pairs an MIT_ID with an ORCID value.
# MIT_ID is declared as a foreign key to HR_PERSON_EMPLOYEE_LIMITED.MIT_ID,
# i.e. the table described by `persons`.
orcids = Table(
"ORCID_TO_MITID",
metadata,
Column("MIT_ID", String, ForeignKey("HR_PERSON_EMPLOYEE_LIMITED.MIT_ID")),
Column("ORCID", String),
)


aa_articles = Table(
"AA_ARTICLE",
metadata,
Expand Down
146 changes: 146 additions & 0 deletions carbon/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import logging
import re
from datetime import UTC, datetime
from typing import Any

import boto3

logger = logging.getLogger(__name__)


def _convert_to_initials(name_component: str) -> str:
    """Reduce a single name component to its uppercased initials.

    The component is processed in three steps:

    1. Every character that is not a word character, whitespace, or a
       hyphen is removed (so punctuation such as '.' disappears).
    2. The cleaned string is split on runs of non-word characters. The
       separators are kept in the result, so spaces and hyphens between
       words survive.
    3. The first character of every piece (words and separators alike) is
       concatenated and the result is uppercased.

    Examples::

        assert _convert_to_initials('Foo Bar') == 'F B'
        assert _convert_to_initials('F. Bar-Baz') == 'F B-B'
        assert _convert_to_initials('Foo-bar') == 'F-B'
        assert _convert_to_initials('влад') == 'В'

    Args:
        name_component (str): One part of a name, e.g. a first or middle name.

    Returns:
        str: The uppercased initials, with the first character of each
            separator preserved between them.
    """  # noqa: RUF002
    cleaned = re.sub(r"[^\w\s-]", "", name_component, flags=re.UNICODE)
    # The capturing group makes re.split keep the separators in the output.
    pieces = re.split(r"(\W+)", cleaned, flags=re.UNICODE)
    leading_characters = "".join(piece[:1] for piece in pieces)
    return leading_characters.upper()


def get_group_name(dlc: str, sub_area: str) -> str:
    """Build the primary group name for a 'people' record.

    The group name is the DLC name followed by a qualifier. Records whose
    personnel sub-area code is 'CFAT' or 'CFAN' are labelled 'Faculty'; every
    other code is labelled 'Non-faculty'.

    Args:
        dlc (str): The value of the 'DLC_NAME' field from a 'people' record.
        sub_area (str): The value of the 'PERSONNEL_SUB_AREA_CODE' field from a
            'people' record.

    Returns:
        str: The DLC name and the qualifier, separated by a single space.
    """
    if sub_area in ("CFAT", "CFAN"):
        return f"{dlc} Faculty"
    return f"{dlc} Non-faculty"


def get_hire_date_string(original_start_date: datetime, date_to_faculty: datetime) -> str:
    """Return the hire date of a 'people' record as a 'YYYY-MM-DD' string.

    The 'DATE_TO_FACULTY' value takes precedence. When it is falsy (for
    example, None), the 'ORIGINAL_HIRE_DATE' value is used instead.

    Args:
        original_start_date (datetime): The value of the 'ORIGINAL_HIRE_DATE'
            field from a 'people' record.
        date_to_faculty (datetime): The value of the 'DATE_TO_FACULTY' field
            from a 'people' record.

    Returns:
        str: The selected date formatted as YYYY-MM-DD (e.g., 2023-01-01).
    """
    hire_date = date_to_faculty or original_start_date
    return hire_date.strftime("%Y-%m-%d")


def get_initials(*args: str) -> str:
    """Convert name components into a space-separated string of initials.

    Falsy components (such as empty strings or None) are skipped. Each
    remaining component is passed through _convert_to_initials() and the
    results are joined with a single space.

    Args:
        *args (str): Name components, e.g. first, middle, and last names.

    Returns:
        str: The initials of the provided name components; an empty string
            when no non-empty component is given.
    """
    return " ".join(_convert_to_initials(part) for part in args if part)


def sns_log(
    config_values: dict[str, Any], status: str, error: Exception | None = None
) -> None:
    """Publish a status message about the Carbon run to an Amazon SNS topic.

    The message is published to the topic identified by the 'SNS_TOPIC'
    config value. The environment named in the message is the first segment
    of the 'SYMPLECTIC_FTP_PATH' config value, and the feed is the
    'FEED_TYPE' config value. For a given run, two messages are expected:

        1. When status = 'start', a message indicating the Carbon run has
           started.
        2. When status = 'success'/'fail', a message indicating whether the
           Carbon run completed successfully or encountered an error. These
           two outcomes are also written to the module logger.

    Any other status publishes nothing and logs nothing.

    Args:
        config_values (dict[str, Any]): A dictionary of required environment
            variables for running the feed.
        status (str): The status of the Carbon run, which determines the
            message published by SNS. Accepted values: 'start', 'success',
            and 'fail'.
        error (Exception | None, optional): The exception raised by a failed
            Carbon run; included in the 'fail' message. Defaults to None.
    """
    sns_client = boto3.client("sns")
    topic_arn = config_values.get("SNS_TOPIC")
    ftp_path = config_values.get("SYMPLECTIC_FTP_PATH", "")
    # The environment name is the first segment of the FTP path.
    stage = ftp_path.lstrip("/").split("/")[0]
    feed = config_values.get("FEED_TYPE", "")

    # Only the terminal statuses ('success' and 'fail') are logged locally.
    outcome_log = None
    if status == "start":
        message = (
            f"[{datetime.now(tz=UTC).isoformat()}] Starting carbon run for the "
            f"{feed} feed in the {stage} environment."
        )
    elif status == "success":
        message = (
            f"[{datetime.now(tz=UTC).isoformat()}] Finished carbon run for the "
            f"{feed} feed in the {stage} environment."
        )
        outcome_log = "Carbon run has successfully completed."
    elif status == "fail":
        message = (
            f"[{datetime.now(tz=UTC).isoformat()}] The following problem was "
            f"encountered during the carbon run for the {feed} feed "
            f"in the {stage} environment: {error}."
        )
        outcome_log = "Carbon run has failed."
    else:
        # Unrecognized statuses are ignored.
        return

    sns_client.publish(TopicArn=topic_arn, Subject="Carbon run", Message=message)
    if outcome_log is not None:
        logger.info(outcome_log)
Loading