Commit 48adcd3

Merge 35e5d25 into 5cec92b
dqops committed Nov 9, 2023
2 parents 5cec92b + 35e5d25
Showing 1,047 changed files with 481,669 additions and 284,888 deletions.
2 changes: 1 addition & 1 deletion .run/dqo run.run.xml
@@ -1,6 +1,6 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="dqo run" type="JarApplication">
<option name="JAR_PATH" value="$PROJECT_DIR$/dqops/target/dqo-dqops-0.3.0.jar" />
<option name="JAR_PATH" value="$PROJECT_DIR$/dqops/target/dqo-dqops-0.4.0.jar" />
<option name="VM_PARAMETERS" value="-XX:MaxRAMPercentage=60.0 --add-opens java.base/java.nio=ALL-UNNAMED" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="ALTERNATIVE_JRE_PATH" value="temurin-17" />
26 changes: 25 additions & 1 deletion CHANGELOG.md
@@ -1,3 +1,27 @@
# 0.4.0
* Data quality dashboards reorganized, layout unified
* Python client and REST API documentation
* Additional Airflow operators
* Small bug fixes in the UI
* DQOps concepts documentation

# 0.3.0
* Improvements for running DQOps as a cloud hosted SaaS platform
* Reorganized data quality dashboard tree to match the structure of checks
* UI screens for managing the configuration: users, default schedules, notifications and more
* Airflow operator for running checks

# 0.2.1
* Table comparison support
* Anomaly detection checks renamed
* Bug fixes

# 0.2.0
* Reorganization of all data quality checks
* User interface created
* Documentation created for all checks
* Extensive REST API for all operations
* Additional connectors for PostgreSQL, MySQL, SQL Server, Oracle

# 0.1.0
* Initial command-line version (preview release)
9 changes: 7 additions & 2 deletions README.md
@@ -22,7 +22,7 @@ To use DQOps you need:

- Python version 3.8 or greater (for details see [Python's documentation](https://www.python.org/doc/) and [download sites](https://www.python.org/downloads/)).
- Ability to install Python packages with pip.
- Installed JDK software (version 17) and set the JAVA_HOME environment variable.
- If you want to compile DQOps locally, you also need a Java JDK (version 17 or higher) and a configured JAVA_HOME environment variable.


DQOps is available on [PyPi repository](https://pypi.org/project/dqops/).
@@ -63,7 +63,12 @@ DQOps is available on [PyPi repository](https://pypi.org/project/dqops/).

## Documentation

For full documentation with guides and use cases, visit https://dqops.com/docs
For full documentation with guides and use cases, visit https://dqops.com/docs/

The [getting started](https://dqops.com/docs/getting-started/) guide shows how to start using DQOps.

Also, read the [DQOps concepts](https://dqops.com/docs/dqo-concepts/) guide to learn how DQOps operates
and how to configure data quality checks.

## Contact and issues

2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.3.0
0.4.0
2 changes: 1 addition & 1 deletion distribution/pom.xml
@@ -11,7 +11,7 @@

<groupId>com.dqops</groupId>
<artifactId>dqo-distribution</artifactId>
<version>0.3.0</version> <!-- DQOps Version, do not touch (changed automatically) -->
<version>0.4.0</version> <!-- DQOps Version, do not touch (changed automatically) -->
<name>dqo-distribution</name>
<description>DQOps Data Quality Operations Center final assembly</description>
<packaging>pom</packaging>
5 changes: 4 additions & 1 deletion distribution/python/README.md
@@ -22,7 +22,6 @@ To use DQOps you need:

- Python version 3.8 or greater (for details see [Python's documentation](https://www.python.org/doc/) and [download sites](https://www.python.org/downloads/)).
- Ability to install Python packages with pip.
- Installed JDK software (version 17) and set the JAVA_HOME environment variable.


DQOps is available on [PyPi repository](https://pypi.org/project/dqops/).
@@ -65,6 +64,10 @@ DQOps is available on [PyPi repository](https://pypi.org/project/dqops/).

For full documentation with guides and use cases, visit https://dqops.com/docs/

The [getting started](https://dqops.com/docs/getting-started/) guide shows how to start using DQOps.

Also, read the [DQOps concepts](https://dqops.com/docs/dqo-concepts/) guide to learn how DQOps operates
and how to configure data quality checks.

## DQOps client
The package also contains a remote DQOps client that can connect to a DQOps instance and perform all operations supported by the user interface.
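For example, a minimal sketch of creating the client against a locally running instance (URL and timeout values are illustrative):

```python
from httpx import Timeout

from dqops.client import Client

# Connect to a locally running DQOps instance (default port 8888)
# and raise the request timeout to two minutes.
client = Client(base_url="http://localhost:8888/").with_timeout(Timeout(120))
```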
@@ -0,0 +1,142 @@
import json
import logging
from typing import Any, Dict, List, Union

from airflow.models.baseoperator import BaseOperator
from httpx import ReadTimeout

from dqops.airflow.common.exceptions.dqops_job_failed_exception import (
DqopsJobFailedException,
)
from dqops.airflow.common.tools.client_creator import create_client
from dqops.airflow.common.tools.server_response_verifier import (
verify_server_response_correctness,
)
from dqops.airflow.common.tools.timeout.dqo_timeout import handle_dqo_timeout
from dqops.airflow.common.tools.timeout.python_client_timeout import (
handle_python_timeout,
)
from dqops.airflow.common.tools.url_resolver import extract_base_url
from dqops.client import Client
from dqops.client.api.jobs.collect_statistics_on_table import sync_detailed
from dqops.client.models.collect_statistics_queue_job_result import (
CollectStatisticsQueueJobResult,
)
from dqops.client.models.dqo_job_status import DqoJobStatus
from dqops.client.models.statistics_collector_search_filters import (
StatisticsCollectorSearchFilters,
)
from dqops.client.models.statistics_collector_target import StatisticsCollectorTarget
from dqops.client.types import UNSET, Response, Unset


class DqopsCollectStatisticsOperator(BaseOperator):
"""
    Airflow operator that runs statistics collection on tables in DQOps.
"""

def __init__(
self,
*,
        connection: Union[Unset, str] = UNSET,
full_table_name: Union[Unset, str] = UNSET,
enabled: Union[Unset, bool] = UNSET,
labels: Union[Unset, List[str]] = UNSET,
column_names: Union[Unset, List[str]] = UNSET,
sensor_name: Union[Unset, str] = UNSET,
target: Union[Unset, StatisticsCollectorTarget] = UNSET,
base_url: str = "http://localhost:8888/",
job_business_key: Union[Unset, None, str] = UNSET,
wait_timeout: Union[Unset, None, int] = UNSET,
fail_on_timeout: bool = True,
**kwargs
    ) -> None:
"""
        All parameters are optional. When not set, all statistics will be collected.

        Parameters
        ----------
        connection : Union[Unset, str]
            The connection name to the data source in DQOps.
        full_table_name : Union[Unset, str]
            The schema name with the table name.
        enabled : Union[Unset, bool]
            If set to true, only enabled connections and tables are filtered. Otherwise, only disabled connections or tables are used.
        labels : Union[Unset, List[str]]
            The labels assigned by the user to connections, tables, and columns in the DQOps platform.
        column_names : Union[Unset, List[str]]
            The names of the columns.
        sensor_name : Union[Unset, str]
            The name of the sensor.
        target : Union[Unset, StatisticsCollectorTarget]
            The statistics collection target, whose value is either column or table.
        base_url : str [optional, default="http://localhost:8888/"]
            The base URL of the DQOps application.
        job_business_key : Union[Unset, None, str]
            Job business key that is a user-assigned unique job ID, used to check the job status by looking up the job by a user-assigned identifier instead of the DQOps-assigned job identifier.
        wait_timeout : Union[Unset, None, int]
            Time in seconds that the client will wait for the job to finish. It prevents the task from hanging on an action that never completes. If not set, the timeout is read from the client defaults, whose value is 120 seconds.
        fail_on_timeout : bool [optional, default=True]
            By default, a timeout marks the task as Failed. Set this flag to False to mark the task as Success on a timeout instead.
"""

super().__init__(**kwargs)
self.connection: Union[Unset, str] = connection
self.full_table_name: Union[Unset, str] = full_table_name
self.enabled: Union[Unset, bool] = enabled
self.labels: Union[Unset, List[str]] = labels
self.column_names: Union[Unset, List[str]] = column_names
self.sensor_name: Union[Unset, str] = sensor_name
self.target: Union[Unset, StatisticsCollectorTarget] = target

self.base_url: str = extract_base_url(base_url)
self.job_business_key: Union[Unset, None, str] = job_business_key
        self.wait_timeout: Union[Unset, None, int] = wait_timeout
self.fail_on_timeout: bool = fail_on_timeout

    def execute(self, context) -> Union[Dict[str, Any], None]:
client: Client = create_client(
base_url=self.base_url, wait_timeout=self.wait_timeout
)

try:
search_filters: StatisticsCollectorSearchFilters = (
StatisticsCollectorSearchFilters(
connection=self.connection,
full_table_name=self.full_table_name,
enabled=self.enabled,
labels=self.labels,
column_names=self.column_names,
sensor_name=self.sensor_name,
target=self.target,
)
)

response: Response[CollectStatisticsQueueJobResult] = sync_detailed(
client=client,
json_body=search_filters,
job_business_key=self.job_business_key,
wait=True,
wait_timeout=self.wait_timeout,
)
except ReadTimeout as exception:
handle_python_timeout(exception, self.fail_on_timeout)
return None

verify_server_response_correctness(response)

job_result: CollectStatisticsQueueJobResult = (
CollectStatisticsQueueJobResult.from_dict(
json.loads(response.content.decode("utf-8"))
)
)
logging.info(job_result.to_dict())

if job_result.status == DqoJobStatus.FAILED:
raise DqopsJobFailedException(context["ti"], job_result.to_dict())

if job_result.status == DqoJobStatus.RUNNING:
handle_dqo_timeout(self.fail_on_timeout)

return job_result.to_dict()
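As a usage illustration, here is a minimal DAG sketch for the operator above. The import path, connection name, and table name are assumptions for illustration only, since this diff does not show the operator's module path:

```python
from datetime import datetime

from airflow import DAG

# Assumed module path; adjust to the actual location of the operator.
from dqops.airflow.collect_statistics.dqops_collect_statistics_operator import (
    DqopsCollectStatisticsOperator,
)

with DAG(
    dag_id="dqops_collect_statistics",
    start_date=datetime(2023, 11, 1),
    schedule=None,
) as dag:
    # Collect statistics for a single table; names are illustrative.
    collect_statistics = DqopsCollectStatisticsOperator(
        task_id="collect_statistics_on_table",
        base_url="http://localhost:8888/",
        connection="example_connection",
        full_table_name="public.example_table",
        wait_timeout=120,
        fail_on_timeout=True,
    )
```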
@@ -0,0 +1,14 @@
from airflow.exceptions import AirflowException
from airflow.models.taskinstance import TaskInstance

class DqopsDataQualityIssueDetectedException(AirflowException):
"""
Exception used in airflow to mark status of task execution as Failed.
    The exception informs that DQOps has detected a data quality issue.
"""

def __init__(self, ti: TaskInstance, return_value: dict):
error_message: str = "DQOps has detected a data quality issue!"
super().__init__(error_message)
ti.xcom_push("return_value", return_value)
@@ -2,6 +2,12 @@


class DqopsEmptyResponseException(AirflowException):
"""
Exception used in airflow to mark status of task execution as Failed.
    The exception informs that the response from the DQOps API is empty.
"""

def __init__(self):
error_message: str = "DQOps did not return any data!"
super().__init__(error_message)
@@ -0,0 +1,13 @@
from airflow.exceptions import AirflowException


class DqopsInternalServerErrorException(AirflowException):
"""
Exception used in airflow to mark status of task execution as Failed.
The exception is thrown on internal server error from DQOps server API.
"""

def __init__(self):
error_message: str = "DQOps server responded with Internal Server Error!"
super().__init__(error_message)
@@ -0,0 +1,14 @@
from airflow.exceptions import AirflowException
from airflow.models.taskinstance import TaskInstance

class DqopsJobFailedException(AirflowException):
"""
Exception used in airflow to mark status of task execution as Failed.
    The exception informs that a DQOps job has failed.
"""

def __init__(self, ti: TaskInstance, return_value: dict):
error_message: str = "DQOps job has failed!"
super().__init__(error_message)
ti.xcom_push("return_value", return_value)
@@ -2,6 +2,12 @@


class DqopsTimeoutException(AirflowException):
"""
Exception used in airflow to mark status of task execution as Failed.
    The exception informs that the response from the DQOps API has timed out.
"""

def __init__(self):
error_message: str = "DQOps' job has timed out!"
super().__init__(error_message)
@@ -0,0 +1,13 @@
from airflow.exceptions import AirflowException


class DqopsUnfinishedJobException(AirflowException):
"""
Exception used in airflow to mark status of task execution as Failed.
    The exception informs that the awaited job has not completed yet.
"""

def __init__(self):
error_message: str = "Job has not completed yet."
super().__init__(error_message)
29 changes: 29 additions & 0 deletions distribution/python/dqops/airflow/common/tools/client_creator.py
@@ -0,0 +1,29 @@
from typing import Union

from httpx import Timeout

from dqops.client import Client
from dqops.client.types import UNSET, Unset

# Extra time for the Python client to wait for DQOps after it times out.
EXTRA_TIMEOUT_SECONDS: int = 1


def create_client(base_url: str, *, wait_timeout: Union[int, Unset] = UNSET) -> Client:
"""Creates python client for airflow operators.
Parameters
----------
base_url : str
The base url to DQOps application. Default value is http://localhost:8888/
wait_timeout : int
Time in seconds for execution that client will wait. It prevents from hanging the task for an action that is never completed. If not set, the timeout is read form the client defaults, which value is 120 seconds.
Returns
DQOps client object.
"""
client: Client = Client(base_url=base_url)
if wait_timeout is not UNSET:
        client = client.with_timeout(Timeout(wait_timeout + EXTRA_TIMEOUT_SECONDS))

return client
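A brief usage sketch of this helper (URL and timeout values are illustrative):

```python
from dqops.client import Client

from dqops.airflow.common.tools.client_creator import create_client

# Client that waits up to 60 seconds, plus the one-second margin above.
client: Client = create_client("http://localhost:8888/", wait_timeout=60)
```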
