11 changes: 7 additions & 4 deletions Dockerfile
@@ -1,13 +1,16 @@
-FROM python:3.12-slim as build
+FROM python:3.12-slim AS build
 
 # Set path for Oracle client libraries
-ENV LD_LIBRARY_PATH /opt/lib/
+ENV LD_LIBRARY_PATH=/opt/lib/
 
 RUN apt-get update && apt-get upgrade -y
 
 # Install
 
 # Install Oracle 19.x client libraries
-RUN apt-get install -y unzip libaio1
-ENV INSTANTCLIENT_FILENAME instantclient-basiclite-linux.x64-19.23.0.0.0dbru.zip
+RUN apt-get install -y unzip libaio1t64
+RUN ln -s /usr/lib/x86_64-linux-gnu/libaio.so.1t64 /usr/lib/libaio.so.1
+ENV INSTANTCLIENT_FILENAME=instantclient-basiclite-linux.x64-19.23.0.0.0dbru.zip
 COPY vendor/$INSTANTCLIENT_FILENAME /
 RUN unzip -j $INSTANTCLIENT_FILENAME -d /opt/lib/

1,394 changes: 712 additions & 682 deletions Pipfile.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions README.md
@@ -45,6 +45,7 @@ DATA_WAREHOUSE_CONNECTION_STRING=# Data Warehouse SQLAlchemy connection string,
 DYLD_LIBRARY_PATH=/usr/local/lib:$DYLD_LIBRARY_PATH # used when developing on arm64 architecture + Rosetta2 environment
 TARGETS_DIRECTORY=# Location to store Task Targets, overriding application default of "output"
 LUIGI_NUM_WORKERS=# Number of processes for luigi to run tasks in parallel. If not set, defaults to 1 in application.
+SKIP_TASK_INTEGRITY_CHECKS=# If set to a truthy value like "1", "true", "t", "yes", or "y", any integrity checks defined for a task will be skipped
```

## CLI Commands
3 changes: 3 additions & 0 deletions hrqb/base/task.py
@@ -121,6 +121,9 @@ def run_integrity_checks(  # type: ignore[no-untyped-def]
         **kwargs,  # noqa: ANN003
     ) -> None:
         """Run all registered integrity check methods."""
+        if Config().skip_task_integrity_checks:
+            return
+
         for check_name in getattr(self, "_integrity_checks", []):
             if check_func := getattr(self, check_name, None):
                 try:
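For orientation, here is a minimal sketch (not part of the diff) of how a task opts into this mechanism: a check method is registered with the HRQBTask.integrity_check decorator (used later in this diff), and the early return added above skips every registered check whenever SKIP_TASK_INTEGRITY_CHECKS is set to a truthy value (see the Config change below). The example task and check method are hypothetical; only the decorator, base classes, and exception come from this codebase.

import pandas as pd

from hrqb.base.task import HRQBTask, PandasPickleTask
from hrqb.exceptions import IntegrityCheckError


class TransformExampleTable(PandasPickleTask):  # hypothetical task, for illustration only
    @HRQBTask.integrity_check
    def output_has_rows(self, output_df: pd.DataFrame) -> None:
        # Registered in _integrity_checks and invoked by run_integrity_checks(),
        # unless SKIP_TASK_INTEGRITY_CHECKS is set to a truthy value.
        if output_df.empty:
            raise IntegrityCheckError("Transformed dataframe has no rows.")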
8 changes: 8 additions & 0 deletions hrqb/config.py
@@ -21,6 +21,7 @@ class Config:
         "DYLD_LIBRARY_PATH",
         "TARGETS_DIRECTORY",
         "LUIGI_NUM_WORKERS",
+        "SKIP_TASK_INTEGRITY_CHECKS",
     )
 
     def check_required_env_vars(self) -> None:
@@ -41,6 +42,13 @@ def targets_directory(self) -> str:
         directory = self.TARGETS_DIRECTORY or "output"
         return directory.removesuffix("/")
 
+    @property
+    def skip_task_integrity_checks(self) -> bool:
+        value = os.getenv("SKIP_TASK_INTEGRITY_CHECKS")
+        if value is None:
+            return False
+        return value.strip().lower() in {"1", "true", "t", "yes", "y"}
+
Review comment on lines +45 to +51: Smart option for testing!

 def configure_logger(logger: logging.Logger, *, verbose: bool) -> str:
     # configure app logger
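A minimal usage sketch of the new property, assuming (as the diff above suggests) that Config can be instantiated directly and reads the environment at property-access time; the values below are illustrative:

import os

from hrqb.config import Config

os.environ["SKIP_TASK_INTEGRITY_CHECKS"] = "Yes"
assert Config().skip_task_integrity_checks  # "1", "true", "t", "yes", "y" (any case) enable the skip

os.environ["SKIP_TASK_INTEGRITY_CHECKS"] = "0"  # not in the truthy set
assert not Config().skip_task_integrity_checks

os.environ.pop("SKIP_TASK_INTEGRITY_CHECKS")  # unset: integrity checks run as before
assert not Config().skip_task_integrity_checks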
39 changes: 38 additions & 1 deletion hrqb/tasks/employee_appointments.py
@@ -3,8 +3,15 @@
 import luigi  # type: ignore[import-untyped]
 import pandas as pd
 
-from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask, SQLQueryExtractTask
+from hrqb.base.task import (
+    HRQBTask,
+    PandasPickleTask,
+    QuickbaseUpsertTask,
+    SQLQueryExtractTask,
+)
+from hrqb.exceptions import IntegrityCheckError
 from hrqb.utils import md5_hash_from_values, normalize_dataframe_dates
+from hrqb.utils.quickbase import QBClient
 
 
 class ExtractDWEmployeeAppointments(SQLQueryExtractTask):
@@ -21,6 +28,7 @@ class TransformEmployeeAppointments(PandasPickleTask):
     """Transform Data Warehouse data for Employee Appointments QB table."""
 
     stage = luigi.Parameter("Transform")
+    table_name = "Employee Appointments"
 
     def requires(self) -> list[luigi.Task]:  # pragma: nocover
         return [ExtractDWEmployeeAppointments(pipeline=self.pipeline)]
@@ -80,6 +88,35 @@ def generate_merge_key(
             ]
         )
 
+    @HRQBTask.integrity_check
+    def qb_row_count_less_than_or_equal_transformed_row_count(
+        self, output_df: pd.DataFrame
+    ) -> None:
+        """Ensure Quickbase row count is less than or equal to transformed records.
+
+        Each run of this task retrieves ALL data from the data warehouse. If Quickbase
+        has more rows than the data warehouse transformed data, this suggests a problem.
+
+        Args:
+            - output_df: the dataframe prepared by self.get_dataframe()
+        """
+        qbclient = QBClient()
+        qb_table_df = qbclient.get_table_as_df(
+            qbclient.get_table_id(self.table_name),
+            fields=["Record ID#"],
+        )
+
+        qb_count = len(qb_table_df)
+        transformed_count = len(output_df)
+
+        if qb_count > transformed_count:
+            message = (
+                f"For table '{self.table_name}', the Quickbase row count of {qb_count} "
+                f"exceeds this run's transformed row count of {transformed_count}. "
+                "This should not happen."
+            )
+            raise IntegrityCheckError(message)
+
 
 class LoadEmployeeAppointments(QuickbaseUpsertTask):
46 changes: 43 additions & 3 deletions hrqb/tasks/employee_salary_history.py
@@ -10,8 +10,10 @@
     QuickbaseUpsertTask,
     SQLQueryExtractTask,
 )
+from hrqb.exceptions import IntegrityCheckError
 from hrqb.tasks.employee_appointments import TransformEmployeeAppointments
 from hrqb.utils import md5_hash_from_values, normalize_dataframe_dates
+from hrqb.utils.quickbase import QBClient
 
 PERCENT_DECIMAL_ACCURACY = 5
 
@@ -27,8 +29,13 @@ def sql_file(self) -> str:
 
 
 class TransformEmployeeSalaryHistory(PandasPickleTask):
+    table_name = luigi.Parameter("Employee Salary History")
     stage = luigi.Parameter("Transform")
 
+    @property
+    def merge_field(self) -> str | None:
+        return "Key"
+
     def requires(self) -> list[luigi.Task]:  # pragma: nocover
         from hrqb.tasks.shared import ExtractQBEmployeeAppointments
 
@@ -75,11 +82,9 @@ def get_dataframe(self) -> pd.DataFrame:
                     row.mit_id,
                     row.position_id,
                     str(row.appointment_begin_date),
-                    str(row.appointment_end_date),
                     str(row.hr_personnel_action),
                     str(row.hr_action_reason),
                     str(row.start_date),
-                    str(row.end_date),
                 ]
             ),
             axis=1,
@@ -205,7 +210,42 @@ def all_rows_have_employee_appointments(self, output_df: pd.DataFrame) -> None:
                 f"{missing_appointment_count} rows are missing an Employee "
                 f"Appointment for task '{self.name}'"
             )
-            raise ValueError(message)
+            raise IntegrityCheckError(message)
+
+    @HRQBTask.integrity_check
+    def merge_field_values_are_unique(self, output_df: pd.DataFrame) -> None:
+        if not output_df[self.merge_field].is_unique:
+            message = f"Values for merge field {self.merge_field} are not unique."
+            raise IntegrityCheckError(message)
+
+    @HRQBTask.integrity_check
+    def qb_row_count_less_than_or_equal_transformed_row_count(
+        self, output_df: pd.DataFrame
+    ) -> None:
+        """Ensure Quickbase row count is less than or equal to transformed records.
+
+        Each run of this task retrieves ALL data from the data warehouse. If Quickbase
+        has more rows than the data warehouse transformed data, this suggests a problem.
+
+        Args:
+            - output_df: the dataframe prepared by self.get_dataframe()
+        """
+        qbclient = QBClient()
+        qb_table_df = qbclient.get_table_as_df(
+            qbclient.get_table_id(self.table_name),
+            fields=["Record ID#"],
+        )
+
+        qb_count = len(qb_table_df)
+        transformed_count = len(output_df)
+
+        if qb_count > transformed_count:
+            message = (
+                f"For table '{self.table_name}', the Quickbase row count of {qb_count} "
+                f"exceeds this run's transformed row count of {transformed_count}. "
+                "This should not happen."
+            )
+            raise IntegrityCheckError(message)
 
 
 class LoadEmployeeSalaryHistory(QuickbaseUpsertTask):
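Because this file changes which fields feed the salary history merge key (the appointment and salary end dates are dropped), a short sketch of the resulting Key derivation may help. The literal values are made up, and md5_hash_from_values is assumed to accept an ordered list of string values, as its usage above and in the updated test suggests.

from hrqb.utils import md5_hash_from_values

# Hypothetical row values; the ordering mirrors the updated get_dataframe() above.
key = md5_hash_from_values(
    [
        "912345678",      # mit_id
        "00001234",       # position_id
        "2024-01-01",     # appointment_begin_date
        "Annual Review",  # hr_personnel_action
        "Merit",          # hr_action_reason
        "2024-07-01",     # start_date (appointment/salary end dates no longer included)
    ]
)
print(key)  # stable hash used as the Quickbase "Key" merge field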
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -49,12 +49,15 @@ ignore = [
     "D103",
     "D104",
     "D415",
+    "D417",
     "PLC0415",
     "PLR0912",
     "PLR0913",
     "PLR0915",
     "S301",
     "S321",
+    "TD002",
+    "TD003",
 ]
 
 # allow autofix behavior for specified rules
1 change: 1 addition & 0 deletions tests/conftest.py
@@ -44,6 +44,7 @@ def _test_env(request, monkeypatch, targets_directory, data_warehouse_connection
         "DATA_WAREHOUSE_CONNECTION_STRING",
         data_warehouse_connection_string,
     )
+    monkeypatch.setenv("SKIP_TASK_INTEGRITY_CHECKS", "1")
 
 
 @pytest.fixture
2 changes: 0 additions & 2 deletions tests/tasks/test_employee_salary_history.py
@@ -58,11 +58,9 @@ def test_task_transform_employee_salary_history_key_expected_from_input_data(
             emp_salary_row["MIT ID"],
             qb_emp_appt_row["Position ID"],
             qb_emp_appt_row["Begin Date"],
-            qb_emp_appt_row["End Date"],
             emp_salary_row["Related Salary Change Type"],
             emp_salary_row["Salary Change Reason"],
             emp_salary_row["Start Date"],
-            emp_salary_row["End Date"],
         ]
     )
 
10 changes: 8 additions & 2 deletions tests/test_base_task.py
@@ -346,13 +346,15 @@ def test_base_task_integrity_check_decorator_adds_check_names_to_registered_chec
 
 
 def test_base_task_run_integrity_checks_invokes_present_and_skips_absent_checks(
+    monkeypatch,
     pandas_task_with_integrity_checks,
 ):
     """
     This test simulates another task registering an integrity check that this task does
     not have. The task's defined integrity checks are invoked, while this other one is
     quietly skipped.
     """
+    monkeypatch.delenv("SKIP_TASK_INTEGRITY_CHECKS", raising=False)
     pandas_task_with_integrity_checks._integrity_checks.add("i_do_not_have")
 
     check_one = mock.MagicMock()
@@ -368,8 +370,9 @@ def test_base_task_run_integrity_checks_invokes_present_and_skips_absent_checks(
 
 
 def test_quickbase_upsert_task_integrity_checks_get_upsert_results(
-    upsert_task_with_integrity_checks, mocked_qb_api_upsert
+    monkeypatch, upsert_task_with_integrity_checks, mocked_qb_api_upsert
 ):
+    monkeypatch.delenv("SKIP_TASK_INTEGRITY_CHECKS", raising=False)
     check_one = mock.MagicMock()
     check_two = mock.MagicMock()
     upsert_task_with_integrity_checks.expecting_three_processed_records = check_one
@@ -383,8 +386,9 @@ def test_quickbase_upsert_task_integrity_checks_get_upsert_results(
 
 
 def test_failed_integrity_checks_raise_custom_exception(
-    pandas_task_with_integrity_checks, upsert_task_with_integrity_checks
+    monkeypatch, pandas_task_with_integrity_checks, upsert_task_with_integrity_checks
 ):
+    monkeypatch.delenv("SKIP_TASK_INTEGRITY_CHECKS", raising=False)
     # pandas task
     with pytest.raises(
         IntegrityCheckError, match="Expecting a 'letter' column in dataframe"
@@ -405,8 +409,10 @@ def test_task_without_integrity_checks_run_without_error(
 
 
 def test_upsert_task_duplicate_merge_field_values_raises_error(
+    monkeypatch,
     upsert_task_with_duplicate_merge_field_values,
 ):
+    monkeypatch.delenv("SKIP_TASK_INTEGRITY_CHECKS", raising=False)
     with pytest.raises(
         ValueError,
         match="Merge field 'Key' found to have duplicate values for task "