Skip to content

Commit

Permalink
Merge 96e5c79 into 9efab2a
Browse files Browse the repository at this point in the history
  • Loading branch information
dqops committed Nov 23, 2023
2 parents 9efab2a + 96e5c79 commit acbeea8
Show file tree
Hide file tree
Showing 153 changed files with 8,778 additions and 2,679 deletions.
2 changes: 1 addition & 1 deletion .run/dqo run.run.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="dqo run" type="JarApplication">
<option name="JAR_PATH" value="$PROJECT_DIR$/dqops/target/dqo-dqops-0.4.0.jar" />
<option name="JAR_PATH" value="$PROJECT_DIR$/dqops/target/dqo-dqops-0.4.1.jar" />
<option name="VM_PARAMETERS" value="-XX:MaxRAMPercentage=60.0 --add-opens java.base/java.nio=ALL-UNNAMED" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="ALTERNATIVE_JRE_PATH" value="temurin-17" />
Expand Down
34 changes: 7 additions & 27 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,7 @@
# 0.4.0
* Data quality dashboards reorganized, layout unified
* Python client and REST API documentation
* Additional Airflow operators
* Small bug fixes in the UI
* DQOps concepts documentation

# 0.3.0
* Improvements for running DQOps as a cloud hosted SaaS platform
* Reorganized data quality dashboard tree to match the structure of checks
* UI screens for managing the configuration: users, default schedules, notifications and more
* Airflow operator for running checks

# 0.2.1
* Table comparison support
* Anomaly detection checks renamed
* Bug fixes

# 0.2.0
* Reorganization of all data quality checks
* User interface created
* Documentation created for all checks
* Extensive REST API for all operations
* Additional connectors for PostgreSQL, MySQL, SQL Server, Oracle

# 0.1.0
* Initial command-line version (preview release)
# 0.4.1
* More Airflow operators - waiting for a data quality job to finish
* Table quality status summary screen
* Renames of some dashboards
* Updates to dashboards - unification of filters
* Current table status detects statuses at column and check levels, also returning the current and highest historical quality status
* Fixes in the UI - filtering detailed check results
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ RUN mkdir $DQO_USER_HOME
RUN touch $DQO_USER_HOME/.DQO_USER_HOME_NOT_MOUNTED
COPY --from=dqo-home /dqo/home home

COPY distribution/dqo_docker_entrypoint.sh /dqo/home/
RUN chmod +x /dqo/home/dqo_docker_entrypoint.sh
COPY distribution/dqo_docker_entrypoint.sh /dqo/home/bin/
RUN chmod +x /dqo/home/bin/dqo_docker_entrypoint.sh

# copy spring dependencies
ARG DEPENDENCY=/workspace/app/dqops/target/dependency
COPY --from=dqo-libs /workspace/app/lib/target/output/dqo-lib/jars /dqo/app/lib
COPY --from=dqo-libs ${DEPENDENCY}/BOOT-INF/lib /dqo/app/lib
COPY --from=dqo-libs ${DEPENDENCY}/META-INF /dqo/app/META-INF
COPY --from=dqo-libs ${DEPENDENCY}/BOOT-INF/classes /dqo/app
ENTRYPOINT ["/dqo/home/dqo_docker_entrypoint.sh"]
ENTRYPOINT ["/dqo/home/bin/dqo_docker_entrypoint.sh"]
6 changes: 3 additions & 3 deletions Dockerfile-fast
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ RUN mkdir $DQO_USER_HOME
RUN touch $DQO_USER_HOME/.DQO_USER_HOME_NOT_MOUNTED
COPY --from=dqo-home /dqo/home home

COPY distribution/dqo_docker_entrypoint.sh /dqo/home/
RUN chmod +x /dqo/home/dqo_docker_entrypoint.sh
COPY distribution/dqo_docker_entrypoint.sh /dqo/home/bin/
RUN chmod +x /dqo/home/bin/dqo_docker_entrypoint.sh

# copy spring dependencies
ARG DEPENDENCY=/workspace/app/home
COPY --from=dqo-libs ${DEPENDENCY}/jars /dqo/app/lib
COPY --from=dqo-libs ${DEPENDENCY}/expanded/BOOT-INF/lib /dqo/app/lib
COPY --from=dqo-libs ${DEPENDENCY}/expanded/META-INF /dqo/app/META-INF
COPY --from=dqo-libs ${DEPENDENCY}/expanded/BOOT-INF/classes /dqo/app
ENTRYPOINT ["/dqo/home/dqo_docker_entrypoint.sh"]
ENTRYPOINT ["/dqo/home/bin/dqo_docker_entrypoint.sh"]
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.4.0
0.4.1
2 changes: 1 addition & 1 deletion distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<groupId>com.dqops</groupId>
<artifactId>dqo-distribution</artifactId>
<version>0.4.0</version> <!-- DQOps Version, do not touch (changed automatically) -->
<version>0.4.1</version> <!-- DQOps Version, do not touch (changed automatically) -->
<name>dqo-distribution</name>
<description>DQOps Data Quality Operations Center final assembly</description>
<packaging>pom</packaging>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from airflow.exceptions import AirflowException
from airflow.models.taskinstance import TaskInstance


class DqopsDataQualityIssueDetectedException(AirflowException):
"""
Exception used in airflow to mark status of task execution as Failed.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from airflow.exceptions import AirflowException
from airflow.models.taskinstance import TaskInstance


class DqopsJobFailedException(AirflowException):
"""
Exception used in airflow to mark status of task execution as Failed.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, List, Union

from airflow.models.baseoperator import BaseOperator
from httpx import ReadTimeout
Expand All @@ -26,6 +26,8 @@
from dqops.client import Client
from dqops.client.api.jobs.run_checks import sync_detailed
from dqops.client.models.check_search_filters import CheckSearchFilters
from dqops.client.models.check_target import CheckTarget
from dqops.client.models.check_time_scale import CheckTimeScale
from dqops.client.models.check_type import CheckType
from dqops.client.models.dqo_job_status import DqoJobStatus
from dqops.client.models.rule_severity_level import RuleSeverityLevel
Expand All @@ -46,9 +48,21 @@ def __init__(
base_url: str = "http://localhost:8888/",
job_business_key: Union[Unset, None, str] = UNSET,
api_key: str = UNSET,
connection: str = UNSET,
full_table_name: str = UNSET,
check_type: Optional[CheckType] = UNSET,
connection: Union[Unset, str] = UNSET,
full_table_name: Union[Unset, str] = UNSET,
enabled: Union[Unset, bool] = UNSET,
tags: Union[Unset, List[str]] = UNSET,
labels: Union[Unset, List[str]] = UNSET,
column: Union[Unset, str] = UNSET,
column_data_type: Union[Unset, str] = UNSET,
column_nullable: Union[Unset, bool] = UNSET,
check_target: Union[Unset, CheckTarget] = UNSET,
check_type: Union[Unset, CheckType] = UNSET,
time_scale: Union[Unset, CheckTimeScale] = UNSET,
check_category: Union[Unset, str] = UNSET,
table_comparison_name: Union[Unset, str] = UNSET,
check_name: Union[Unset, str] = UNSET,
sensor_name: Union[Unset, str] = UNSET,
wait_timeout: int = UNSET,
fail_on_timeout: bool = True,
fail_at_severity: RuleSeverityLevel = RuleSeverityLevel.FATAL,
Expand All @@ -64,11 +78,55 @@ def __init__(
api_key : str [optional, default=UNSET]
Api key to DQOps application. Not set as default.
connection : str [optional, default=UNSET]
The connection name to the data source in DQOps. When not set, all connection names will be used.
The connection name to the data source in DQOps. When not set, all connection names will be used. Supports search patterns in the format:
'source\*', '\*_prod', 'prefix\*suffix'.
full_table_name : str [optional, default=UNSET]
The name of the table with the schema. When not set, checks from all tables will be run within the specified connection.
The name of the table with the schema. It is provided as *<schema_name>.<table_name>*,
for example *public.fact_sales*.
The schema and table name accept patterns both in the schema name and table name parts.
Sample patterns are: 'schema_name.tab_prefix_\*', 'schema_name.\*', '\*.\*', 'schema_name.\*_customer', 'schema_name.tab_\*_suffix'.
enabled : Union[Unset, bool]
A boolean flag to target enabled tables, columns or checks. When the value of this
field is not set, the default value of this field is *true*, targeting only tables, columns and checks that are
not implicitly disabled.
tags : Union[Unset, List[str]]
An array of tags assigned to the table. All tags must be present on a table to
match. The tags can use patterns: 'prefix\*', '\*suffix', 'prefix\*suffix'. The tags are assigned to the table
on the data grouping screen when any of the data grouping hierarchy levels is assigned a static value, which is a
tag.
labels : Union[Unset, List[str]]
An array of labels assigned to the table. All labels must be present on a
table to match. The labels can use patterns: 'prefix\*', '\*suffix', 'prefix\*suffix'. The labels are assigned
on the labels screen and stored in the *labels* node in the *.dqotable.yaml* file.
column : Union[Unset, str]
The column name. This field accepts search patterns in the format: 'fk_\*', '\*_id',
'prefix\*suffix'.
column_data_type : Union[Unset, str]
The column data type that was imported from the data source and is stored
in the [columns -> column_name -> type_snapshot ->
column_type](../../reference/yaml/TableYaml/#columntypesnapshotspec) field in the *.dqotable.yaml* file.
column_nullable : Union[Unset, bool]
Optional filter to find only nullable (when the value is *true*) or not
nullable (when the value is *false*) columns, based on the value of the [columns -> column_name -> type_snapshot
-> nullable](../../reference/yaml/TableYaml/#columntypesnapshotspec) field in the *.dqotable.yaml* file.
check_target : Union[Unset, CheckTarget]
check_type : CheckType [optional, default=UNSET]
Specifies type of checks to be executed. When not set, all types of checks will be executed. <br/> The enum is stored in _dqops.client.models.check_type_ module.
time_scale : Union[Unset, CheckTimeScale]
check_category : Union[Unset, str]
The target check category, for example: *nulls*, *volume*, *anomaly*.
table_comparison_name : Union[Unset, str]
The name of a configured table comparison. When the table comparison
is provided, DQOps will only perform table comparison checks that compare data between tables.
check_name : Union[Unset, str]
The target check name to run only this named check. Uses the short check name
which is the name of the deepest folder in the *checks* folder. This field supports search patterns such as:
'profiling_\*', '\*_count', 'profiling_\*_percent'.
sensor_name : Union[Unset, str]
The target sensor name to run only data quality checks that are using this
sensor. Uses the full sensor name which is the full folder path within the *sensors* folder. This field supports
search patterns such as: 'table/volume/row_\*', '\*_count', 'table/volume/prefix_\*_suffix'.
wait_timeout : int [optional, default=UNSET]
Time in seconds that the client will wait for the execution. It prevents the task from hanging on an action that never completes. If not set, the timeout is read from the client defaults, which is 120 seconds.
fail_on_timeout : bool [optional, default=True]
Expand All @@ -81,18 +139,44 @@ def __init__(
self.base_url: str = extract_base_url(base_url)
self.job_business_key: Union[Unset, None, str] = job_business_key
self.api_key: str = api_key
self.connection: str = connection
self.full_table_name: str = full_table_name
self.check_type: Optional[CheckType] = check_type

self.connection: Union[Unset, str] = connection
self.full_table_name: Union[Unset, str] = full_table_name
self.enabled: Union[Unset, bool] = enabled
self.tags: Union[Unset, List[str]] = tags
self.labels: Union[Unset, List[str]] = labels
self.column: Union[Unset, str] = column
self.column_data_type: Union[Unset, str] = column_data_type
self.column_nullable: Union[Unset, bool] = column_nullable
self.check_target: Union[Unset, CheckTarget] = check_target
self.check_type: Union[Unset, CheckType] = check_type
self.time_scale: Union[Unset, CheckTimeScale] = time_scale
self.check_category: Union[Unset, str] = check_category
self.table_comparison_name: Union[Unset, str] = table_comparison_name
self.check_name: Union[Unset, str] = check_name
self.sensor_name: Union[Unset, str] = sensor_name

self.wait_timeout: int = wait_timeout
self.fail_on_timeout: bool = fail_on_timeout
self.fail_at_severity: RuleSeverityLevel = fail_at_severity

def execute(self, context):
filters: CheckSearchFilters = CheckSearchFilters(
connection=self.connection,
check_type=self.check_type,
full_table_name=self.full_table_name,
enabled=self.enabled,
tags=self.tags,
labels=self.labels,
column=self.column,
column_data_type=self.column_data_type,
column_nullable=self.column_nullable,
check_target=self.check_target,
check_type=self.check_type,
time_scale=self.time_scale,
check_category=self.check_category,
table_comparison_name=self.table_comparison_name,
check_name=self.check_name,
sensor_name=self.sensor_name,
)
params: RunChecksParameters = RunChecksParameters(check_search_filters=filters)

Expand Down Expand Up @@ -134,6 +218,8 @@ def execute(self, context):
>= get_severity_value_from_rule_severity(self.fail_at_severity)
and job_result.status != DqoJobStatus.CANCELLED
):
raise DqopsDataQualityIssueDetectedException(context["ti"], job_result.to_dict())
raise DqopsDataQualityIssueDetectedException(
context["ti"], job_result.to_dict()
)

return job_result.to_dict()
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
)
from dqops.airflow.common.tools.client_creator import create_client
from dqops.airflow.common.tools.rule_severity_level_utility import (
get_severity_value_from_check_result,
get_severity_value_from_rule_severity,
)
from dqops.airflow.common.tools.server_response_verifier import (
Expand Down Expand Up @@ -153,9 +152,11 @@ def execute(self, context):
)
logging.info(table_dq_status.to_dict())

if get_severity_value_from_check_result(
table_dq_status.highest_severity_level
if get_severity_value_from_rule_severity(
table_dq_status.current_severity
) >= get_severity_value_from_rule_severity(self.fail_at_severity):
raise DqopsDataQualityIssueDetectedException(context["ti"], table_dq_status.to_dict())
raise DqopsDataQualityIssueDetectedException(
context["ti"], table_dq_status.to_dict()
)

return table_dq_status.to_dict()

0 comments on commit acbeea8

Please sign in to comment.