diff --git a/pyproject.toml b/pyproject.toml index 8528e09c..f540f05d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ build-backend = "setuptools.build_meta" [project] name = "dataops-testgen" -version = "3.1.0" +version = "3.1.2" description = "DataKitchen's Data Quality DataOps TestGen" authors = [ { "name" = "DataKitchen, Inc.", "email" = "info@datakitchen.io" }, diff --git a/testgen/__main__.py b/testgen/__main__.py index fd193792..2fbbe845 100644 --- a/testgen/__main__.py +++ b/testgen/__main__.py @@ -43,6 +43,7 @@ logs, version_service, ) +from testgen.ui.queries import profiling_run_queries, test_run_queries from testgen.utils import plugins LOG = logging.getLogger("testgen") @@ -606,6 +607,11 @@ def run(debug: bool): use_ssl = os.path.isfile(settings.SSL_CERT_FILE) and os.path.isfile(settings.SSL_KEY_FILE) patch_streamlit.patch(force=True) + try: + profiling_run_queries.cancel_all_running() + test_run_queries.cancel_all_running() + except Exception: + LOG.warning("Failed to cancel 'Running' profiling/test runs") try: app_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ui/app.py") diff --git a/testgen/commands/run_profiling_bridge.py b/testgen/commands/run_profiling_bridge.py index 1cdf2161..dadf58d0 100644 --- a/testgen/commands/run_profiling_bridge.py +++ b/testgen/commands/run_profiling_bridge.py @@ -17,8 +17,6 @@ RunActionQueryList, RunThreadedRetrievalQueryList, WriteListToDB, - date_service, - read_template_sql_file, ) from testgen.common.database.database_service import empty_cache @@ -508,14 +506,3 @@ def run_profiling_queries(strTableGroupsID, spinner=None): str_error_status = "successfully." message += str_error_status return message - - -def update_profile_run_status(profile_run_id, status): - sql_template = read_template_sql_file("project_profile_run_record_update_status.sql", sub_directory="profiling") - - sql_template = sql_template.replace("{STATUS}", status) - sql_template = sql_template.replace("{NOW}", date_service.get_now_as_string()) - sql_template = sql_template.replace("{EXCEPTION_MESSAGE}", "") - sql_template = sql_template.replace("{PROFILE_RUN_ID}", profile_run_id) - - RunActionQueryList("DKTG", [sql_template]) diff --git a/testgen/template/profiling/project_profile_run_record_update_status.sql b/testgen/template/profiling/project_profile_run_record_update_status.sql deleted file mode 100644 index 8751c032..00000000 --- a/testgen/template/profiling/project_profile_run_record_update_status.sql +++ /dev/null @@ -1,5 +0,0 @@ -UPDATE profiling_runs -SET status = '{STATUS}', - profiling_endtime = '{NOW}', - log_message = '{EXCEPTION_MESSAGE}' -where id = '{PROFILE_RUN_ID}' :: UUID; diff --git a/testgen/template/score_cards/get_score_card_issues_by_column.sql b/testgen/template/score_cards/get_score_card_issues_by_column.sql index 4ae24956..2844f4be 100644 --- a/testgen/template/score_cards/get_score_card_issues_by_column.sql +++ b/testgen/template/score_cards/get_score_card_issues_by_column.sql @@ -17,7 +17,7 @@ anomalies AS ( EXTRACT( EPOCH FROM runs.profiling_starttime - ) AS time, + ) * 1000 AS time, '' AS name, runs.id::text AS run_id, 'hygiene' AS issue_type @@ -50,7 +50,7 @@ tests AS ( EXTRACT( EPOCH FROM test_time - ) AS time, + ) * 1000 AS time, test_suites.test_suite AS name, test_results.test_run_id::text AS run_id, 'test' AS issue_type @@ -80,4 +80,4 @@ ORDER BY WHEN 'Possible' THEN 4 WHEN 'Warning' THEN 5 ELSE 6 - END \ No newline at end of file + END diff --git a/testgen/template/score_cards/get_score_card_issues_by_dimension.sql b/testgen/template/score_cards/get_score_card_issues_by_dimension.sql index 3d42e958..b27243e4 100644 --- a/testgen/template/score_cards/get_score_card_issues_by_dimension.sql +++ b/testgen/template/score_cards/get_score_card_issues_by_dimension.sql @@ -17,7 +17,7 @@ anomalies AS ( EXTRACT( EPOCH FROM runs.profiling_starttime - ) AS time, + ) * 1000 AS time, '' AS name, runs.id::text AS run_id, 'hygiene' AS issue_type @@ -51,7 +51,7 @@ tests AS ( EXTRACT( EPOCH FROM test_time - ) AS time, + ) * 1000 AS time, test_suites.test_suite AS name, test_results.test_run_id::text AS run_id, 'test' AS issue_type @@ -82,4 +82,4 @@ ORDER BY WHEN 'Possible' THEN 4 WHEN 'Warning' THEN 5 ELSE 6 - END \ No newline at end of file + END diff --git a/testgen/ui/queries/profiling_run_queries.py b/testgen/ui/queries/profiling_run_queries.py new file mode 100644 index 00000000..a2bfa805 --- /dev/null +++ b/testgen/ui/queries/profiling_run_queries.py @@ -0,0 +1,27 @@ +import streamlit as st + +import testgen.ui.services.database_service as db +from testgen.common import date_service + + +def update_status(profile_run_id: str, status: str) -> None: + schema: str = st.session_state["dbschema"] + now = date_service.get_now_as_string() + + sql = f""" + UPDATE {schema}.profiling_runs + SET status = '{status}', + profiling_endtime = '{now}' + WHERE id = '{profile_run_id}'::UUID; + """ + db.execute_sql(sql) + st.cache_data.clear() + + +def cancel_all_running() -> None: + schema: str = db.get_schema() + db.execute_sql(f""" + UPDATE {schema}.profiling_runs + SET status = 'Cancelled' + WHERE status = 'Running'; + """) diff --git a/testgen/ui/queries/test_run_queries.py b/testgen/ui/queries/test_run_queries.py index 34692de2..a1484116 100644 --- a/testgen/ui/queries/test_run_queries.py +++ b/testgen/ui/queries/test_run_queries.py @@ -4,10 +4,11 @@ import testgen.ui.services.database_service as db -def cascade_delete(schema: str, test_suite_ids: list[str]) -> None: +def cascade_delete(test_suite_ids: list[str]) -> None: if not test_suite_ids: raise ValueError("No Test Suite is specified.") + schema: str = st.session_state["dbschema"] ids_str = ", ".join([f"'{item}'" for item in test_suite_ids]) sql = f""" DELETE @@ -23,10 +24,11 @@ def cascade_delete(schema: str, test_suite_ids: list[str]) -> None: st.cache_data.clear() -def update_status(schema: str, test_run_id: str, status: str) -> None: +def update_status(test_run_id: str, status: str) -> None: if not all([test_run_id, status]): raise ValueError("Missing query parameters.") + schema: str = st.session_state["dbschema"] now = date_service.get_now_as_string() sql = f""" @@ -37,3 +39,12 @@ def update_status(schema: str, test_run_id: str, status: str) -> None: """ db.execute_sql(sql) st.cache_data.clear() + + +def cancel_all_running() -> None: + schema: str = db.get_schema() + db.execute_sql(f""" + UPDATE {schema}.test_runs + SET status = 'Cancelled' + WHERE status = 'Running'; + """) diff --git a/testgen/ui/services/test_definition_service.py b/testgen/ui/services/test_definition_service.py index 136151d6..c57f7789 100644 --- a/testgen/ui/services/test_definition_service.py +++ b/testgen/ui/services/test_definition_service.py @@ -4,7 +4,7 @@ import testgen.ui.services.connection_service as connection_service import testgen.ui.services.database_service as database_service import testgen.ui.services.table_group_service as table_group_service -import testgen.ui.services.test_run_service as test_run_service +from testgen.ui.queries import test_run_queries def update_attribute(test_definition_ids, attribute, value): @@ -54,7 +54,7 @@ def delete(test_definition_ids, dry_run=False): def cascade_delete(test_suite_ids: list[str]): schema = st.session_state["dbschema"] - test_run_service.cascade_delete(test_suite_ids) + test_run_queries.cascade_delete(test_suite_ids) test_definition_queries.cascade_delete(schema, test_suite_ids) @@ -138,7 +138,7 @@ def validate_test(test_definition): ) -def move(test_definitions, target_table_group, target_test_suite): +def move(test_definitions, target_table_group, target_test_suite): schema = st.session_state["dbschema"] test_definition_queries.move(schema, test_definitions, target_table_group, target_test_suite) @@ -152,4 +152,3 @@ def copy(test_definitions, target_table_group, target_test_suite): def get_test_definitions_collision(test_definitions, target_table_group, target_test_suite): schema = st.session_state["dbschema"] return test_definition_queries.get_test_definitions_collision(schema, test_definitions, target_table_group, target_test_suite) - diff --git a/testgen/ui/services/test_run_service.py b/testgen/ui/services/test_run_service.py deleted file mode 100644 index ccbbccd1..00000000 --- a/testgen/ui/services/test_run_service.py +++ /dev/null @@ -1,13 +0,0 @@ -import streamlit as st - -import testgen.ui.queries.test_run_queries as test_run_queries - - -def cascade_delete(test_suite_ids): - schema = st.session_state["dbschema"] - test_run_queries.cascade_delete(schema, test_suite_ids) - - -def update_status(test_run_id, status): - schema = st.session_state["dbschema"] - test_run_queries.update_status(schema, test_run_id, status) diff --git a/testgen/ui/views/data_catalog.py b/testgen/ui/views/data_catalog.py index 184aa1a2..d7d18433 100644 --- a/testgen/ui/views/data_catalog.py +++ b/testgen/ui/views/data_catalog.py @@ -217,7 +217,7 @@ def get_latest_test_issues(table_group_id: str, table_name: str, column_name: st result_message, test_suite, test_results.test_run_id::VARCHAR(50), - EXTRACT(EPOCH FROM test_starttime) AS test_run_date + EXTRACT(EPOCH FROM test_starttime) * 1000 AS test_run_date FROM {schema}.test_suites LEFT JOIN {schema}.test_runs ON ( test_suites.last_complete_test_run_id = test_runs.id diff --git a/testgen/ui/views/profiling_runs.py b/testgen/ui/views/profiling_runs.py index db3cf9a1..b2a0a87a 100644 --- a/testgen/ui/views/profiling_runs.py +++ b/testgen/ui/views/profiling_runs.py @@ -8,12 +8,11 @@ import testgen.ui.services.database_service as db import testgen.ui.services.form_service as fm import testgen.ui.services.query_service as dq -from testgen.commands.run_profiling_bridge import update_profile_run_status from testgen.ui.components import widgets as testgen from testgen.ui.components.widgets import testgen_component from testgen.ui.navigation.menu import MenuItem from testgen.ui.navigation.page import Page -from testgen.ui.queries import project_queries +from testgen.ui.queries import profiling_run_queries, project_queries from testgen.ui.services import authentication_service from testgen.ui.session import session from testgen.ui.views.dialogs.run_profiling_dialog import run_profiling_dialog @@ -123,7 +122,7 @@ def render_empty_state(project_code: str) -> bool: def on_cancel_run(profiling_run: pd.Series) -> None: process_status, process_message = process_service.kill_profile_run(to_int(profiling_run["process_id"])) if process_status: - update_profile_run_status(profiling_run["profiling_run_id"], "Cancelled") + profiling_run_queries.update_status(profiling_run["profiling_run_id"], "Cancelled") fm.reset_post_updates(str_message=f":{'green' if process_status else 'red'}[{process_message}]", as_toast=True) @@ -178,11 +177,7 @@ def get_db_profiling_runs(project_code: str, table_group_id: str | None = None) SELECT v_profiling_runs.profiling_run_id::VARCHAR, v_profiling_runs.start_time, v_profiling_runs.table_groups_name, - CASE - WHEN v_profiling_runs.status = 'Running' - AND v_profiling_runs.start_time < CURRENT_DATE - 1 THEN 'Error' - ELSE v_profiling_runs.status - END as status, + v_profiling_runs.status, v_profiling_runs.process_id, v_profiling_runs.duration, v_profiling_runs.log_message, diff --git a/testgen/ui/views/test_runs.py b/testgen/ui/views/test_runs.py index 61e78793..26a8ce21 100644 --- a/testgen/ui/views/test_runs.py +++ b/testgen/ui/views/test_runs.py @@ -8,12 +8,11 @@ import testgen.ui.services.database_service as db import testgen.ui.services.form_service as fm import testgen.ui.services.query_service as dq -import testgen.ui.services.test_run_service as test_run_service from testgen.ui.components import widgets as testgen from testgen.ui.components.widgets import testgen_component from testgen.ui.navigation.menu import MenuItem from testgen.ui.navigation.page import Page -from testgen.ui.queries import project_queries +from testgen.ui.queries import project_queries, test_run_queries from testgen.ui.services import authentication_service from testgen.ui.session import session from testgen.ui.views.dialogs.run_tests_dialog import run_tests_dialog @@ -141,7 +140,7 @@ def render_empty_state(project_code: str) -> bool: def on_cancel_run(test_run: pd.Series) -> None: process_status, process_message = process_service.kill_test_run(to_int(test_run["process_id"])) if process_status: - test_run_service.update_status(test_run["test_run_id"], "Cancelled") + test_run_queries.update_status(test_run["test_run_id"], "Cancelled") fm.reset_post_updates(str_message=f":{'green' if process_status else 'red'}[{process_message}]", as_toast=True)