Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions hrqb/utils/luigi.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging

import luigi # type: ignore[import-untyped]
import sentry_sdk
from luigi.execution_summary import LuigiRunResult # type: ignore[import-untyped]

from hrqb.base.task import HRQBPipelineTask
Expand All @@ -27,8 +28,23 @@ def run_pipeline(pipeline_task: HRQBPipelineTask) -> LuigiRunResult:
if not isinstance(pipeline_task, HRQBPipelineTask):
message = f"{pipeline_task.name} is not a HRQBPipelineTask type task"
raise TypeError(message)

results = run_task(pipeline_task)

if upsert_results := pipeline_task.aggregate_upsert_results():
message = f"Upsert results: {json.dumps(upsert_results)}"
logger.info(message)

if upsert_results.get("qb_upsert_errors"):
tasks_with_errors = [
task_name
for task_name, task_results in upsert_results.get("tasks", {}).items()
if task_results["errors"] is not None
]
sentry_sdk.capture_message(
f"Quickbase upsert error(s) detected for tasks: {tasks_with_errors}. "
"Please see logs for information on specific errors encountered.",
level="warning",
)

return results
31 changes: 31 additions & 0 deletions tests/test_pipelines.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from unittest import mock

import luigi
import pytest
import sentry_sdk

from hrqb.utils.luigi import run_pipeline
from tests.fixtures.full_annotated_pipeline import (
Expand Down Expand Up @@ -76,3 +79,31 @@ def test_run_pipeline_with_non_hrqbpipelinetask_type_raise_error(
match="ExtractAnimalNames is not a HRQBPipelineTask type task",
):
run_pipeline(task_extract_animal_names)


def test_run_pipeline_no_sentry_message_with_no_upsert_errors(mocker):
spy_capture_message = mocker.spy(sentry_sdk, "capture_message")
run_pipeline(AlphaNumeric())
spy_capture_message.assert_not_called()


def test_run_pipeline_send_sentry_message_on_upsert_error(mocker):
spy_capture_message = mocker.spy(sentry_sdk, "capture_message")
with mock.patch.object(
AlphaNumeric, "aggregate_upsert_results"
) as mocked_agg_results:
mocked_agg_results.return_value = {
"tasks": {
"FinickyTask": {"errors": ["Error1", "Error2"]},
"SolidTask": {"errors": None},
},
"qb_upsert_errors": True,
}
run_pipeline(AlphaNumeric())

spy_capture_message.assert_called()
assert (
spy_capture_message.call_args[0][0]
== "Quickbase upsert error(s) detected for tasks: ['FinickyTask']. Please see "
"logs for information on specific errors encountered."
)