diff --git a/datascience/src/pipeline/flows/email_actions_to_units.py b/datascience/src/pipeline/flows/email_actions_to_units.py index 68487740b..ad83e61be 100644 --- a/datascience/src/pipeline/flows/email_actions_to_units.py +++ b/datascience/src/pipeline/flows/email_actions_to_units.py @@ -87,11 +87,6 @@ def get_control_unit_ids(env_action: pd.DataFrame) -> List[int]: @task(checkpoint=False) def extract_control_units(control_unit_ids: List[str]) -> pd.DataFrame: - if not control_unit_ids: - return pd.DataFrame( - columns=["control_unit_id", "control_unit_name", "email_addresses"] - ) - return extract( "monitorenv_remote", "monitorenv/control_units.sql", @@ -330,6 +325,11 @@ def load_emails_sent_to_control_units( ) +@task(checkpoint=False) +def get_is_control_unit_ids_empty(control_unit_ids: List[int]) -> bool: + return len(control_unit_ids) == 0 + + with Flow("Email actions to units", executor=LocalDaskExecutor()) as flow: flow_not_running = check_flow_not_running() with case(flow_not_running, True): @@ -348,29 +348,36 @@ def load_emails_sent_to_control_units( ) env_actions = extract_env_actions(period=period) control_unit_ids = get_control_unit_ids(env_actions) - control_units = extract_control_units(control_unit_ids) - control_unit_actions = to_control_unit_actions( - env_actions, period, control_units + is_control_unit_ids_empty = get_is_control_unit_ids_empty( + control_unit_ids ) + with case(is_control_unit_ids_empty, False): + control_units = extract_control_units(control_unit_ids) + + control_unit_actions = to_control_unit_actions( + env_actions, period, control_units + ) - html = render.map(control_unit_actions, template=unmapped(template)) + html = render.map( + control_unit_actions, template=unmapped(template) + ) - message = create_email.map( - html=html, - actions=control_unit_actions, - test_mode=unmapped(test_mode), - ) - message = filter_results(message) + message = create_email.map( + html=html, + actions=control_unit_actions, + test_mode=unmapped(test_mode), + ) + message = filter_results(message) - sent_messages = send_env_actions_email.map( - message, - control_unit_actions, - is_integration=unmapped(is_integration), - ) + sent_messages = send_env_actions_email.map( + message, + control_unit_actions, + is_integration=unmapped(is_integration), + ) - sent_messages = flatten(sent_messages) - sent_messages = control_unit_actions_list_to_df(sent_messages) - load_emails_sent_to_control_units(sent_messages) + sent_messages = flatten(sent_messages) + sent_messages = control_unit_actions_list_to_df(sent_messages) + load_emails_sent_to_control_units(sent_messages) flow.file_name = Path(__file__).name diff --git a/datascience/tests/conftest.py b/datascience/tests/conftest.py index ad8564c8f..478acfb84 100644 --- a/datascience/tests/conftest.py +++ b/datascience/tests/conftest.py @@ -142,7 +142,7 @@ def start_remote_database_container( ) timeout = 30 - stop_time = 10 + stop_time = 5 elapsed_time = 0 healthcheck_exit_code = None diff --git a/datascience/tests/test_pipeline/test_flows/test_email_actions_to_units.py b/datascience/tests/test_pipeline/test_flows/test_email_actions_to_units.py index 48652af60..08d646b42 100644 --- a/datascience/tests/test_pipeline/test_flows/test_email_actions_to_units.py +++ b/datascience/tests/test_pipeline/test_flows/test_email_actions_to_units.py @@ -415,19 +415,6 @@ def test_extract_control_units( pd.testing.assert_frame_equal(units, expected_control_units) -def test_extract_control_units_without_control_unit_ids(reset_test_data): - control_units_as_data_frame = extract_control_units.run( - control_unit_ids=[] - ) - - pd.testing.assert_frame_equal( - control_units_as_data_frame, - pd.DataFrame( - columns=["control_unit_id", "control_unit_name", "email_addresses"] - ), - ) - - def test_to_control_unit_actions(expected_env_actions, expected_control_units): period = Period(