[Feature] Data Quality Design #4283

Closed
zixi0825 opened this issue Dec 21, 2020 · 23 comments
Labels
feature new feature

Comments

@zixi0825
Member

zixi0825 commented Dec 21, 2020

1 Summary

Data quality inspection is an important part of the data processing pipeline. After data synchronization and data processing, it is usually necessary to check the accuracy of the data, for example by comparing the difference in row counts between the source table and the target table, or by applying a rule that computes a metric over a column and compares the computed value with a standard value. At present, DolphinScheduler (DS) has no data quality check among its task types, so a new data quality task type needs to be added. A data quality check task can then be added directly when defining a workflow, making the entire data processing pipeline more complete.

2 Requirements Analysis

For data quality inspection tasks, the core functions are rule management, task execution, and alerting on execution results. A lightweight data quality feature must provide the following functions:

2.1 Rule Manager

2.1.1 RuleType

  • SingleTableRule
  • MultiTableRule

2.1.2 Rule Implementation

  • InnerRule
    • NullCheck
    • RowCountCheck
    • AverageCheck
    • TimelinessCheck
    • DuplicateCheck
    • AccuracyCheck
    • others
  • CustomRule

2.1.3 Rule Definition and Parser

2.1.3.1 Rule Definition

A complete rule should include the connector information, the SQL statements to execute, the type of comparison value, the type of check, and so on. In other words, all the parameters needed to define a data quality task can be obtained from the rule.
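
A minimal sketch of how such a rule definition could be modeled in Java, reusing the entity names from section 4.2.1.1; the concrete fields and types below are illustrative assumptions, not the final design:

```java
import java.util.List;

// Hypothetical model of a rule definition; names follow section 4.2.1.1,
// but the concrete fields are illustrative assumptions only.
public class RuleDefinitionSketch {

    enum RuleType { SINGLE_TABLE, MULTI_TABLE }
    enum ComparsionValueType { FIXED, CALCULATE }
    enum CheckType { FIXED_VALUE, PERCENTAGE }

    static class RuleInputEntry {        // one user-facing input item, e.g. table, field, filter
        String field;                    // e.g. "src_table"
        String defaultValue;
    }

    static class ExecuteSqlDefinition {  // one SQL step plus the temp table alias it produces
        String sql;
        String tableAlias;
    }

    String ruleName;                                 // rule_name
    RuleType ruleType;                               // rule_type
    List<RuleInputEntry> inputEntries;               // connector_type, datasource_id, table, field, filter ...
    List<ExecuteSqlDefinition> midExecuteSql;        // RuleMidExecuteSQLDefinition
    List<ExecuteSqlDefinition> statisticsExecuteSql; // StatisticsExecuteSQLDefinition
    ComparsionValueType comparsionValueType;         // fixed value or calculated value
    CheckType checkType;                             // ${check_type}: fixed / percentage
    String operator;                                 // ${operator}: =, <, >, >=, <=
    String threshold;                                // ${threshold}
}
```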

2.1.3.2 Rule Parser

The main responsibility of the rule parser is to produce the input parameters required to run a data quality task by parsing the parameter values entered by the user together with the rule definition.

2.2 Task Execution Mode

Based on DolphinScheduler's existing task execution mechanism, a suitable approach is to use Spark as the execution engine for data quality tasks: the concrete SQL to execute is passed to the Spark job through configuration, and the execution results are written to the specified storage engine.
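
As a rough illustration of this approach, the scheduler could hand the rule output to Spark through a spark-submit command; the main class name and paths below are placeholders, only the generic spark-submit flags are standard:

```java
// Hypothetical assembly of the command that hands the data quality configuration to Spark.
// The main class and jar path are placeholders for whatever the implementation ships.
public class SparkSubmitCommandSketch {
    public static void main(String[] args) {
        // JSON built by the rule parser (see section 4.2.1.2); the shape is illustrative
        String dqConfigJson = "{\"executeSqls\":[\"SELECT COUNT(*) FROM src_table\"]}";
        String command = String.join(" ",
                "spark-submit",
                "--master", "yarn",
                "--deploy-mode", "cluster",
                "--class", "org.example.dq.DataQualityApplication",  // placeholder main class
                "/path/to/DQApplication.jar",
                "'" + dqConfigJson + "'");
        System.out.println(command);
    }
}
```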

2.3 Check Result Alert

Each rule is configured with alert rules; when a check result is abnormal, an alert is raised. DolphinScheduler's alert module is used for alerting.
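
To make the "abnormal result" condition concrete, here is a small sketch of how a check could be evaluated from the fields stored in CheckResultInfo (section 4.1.2); the helper and the exact pass/fail semantics are assumptions for illustration:

```java
// Hypothetical check evaluation: compare the statistics value with the comparison value
// using the configured check type, operator and threshold. Whether a satisfied condition
// means "failed" or "passed" is a design choice; here it is treated as the failure condition.
public class CheckEvaluatorSketch {

    enum CheckType { FIXED_VALUE, PERCENTAGE }

    static boolean isAbnormal(double statisticsValue, double comparsionValue,
                              CheckType checkType, String operator, double threshold) {
        double actual = (checkType == CheckType.PERCENTAGE)
                ? statisticsValue / comparsionValue * 100   // percentage of the comparison value
                : statisticsValue;                          // raw statistics value
        switch (operator) {
            case "=":  return actual == threshold;
            case ">":  return actual > threshold;
            case ">=": return actual >= threshold;
            case "<":  return actual < threshold;
            case "<=": return actual <= threshold;
            default:   throw new IllegalArgumentException("unknown operator: " + operator);
        }
    }

    public static void main(String[] args) {
        // e.g. a null check found 120 null rows and the rule says "more than 100 nulls is abnormal"
        boolean abnormal = isAbnormal(120, 0, CheckType.FIXED_VALUE, ">", 100);
        System.out.println(abnormal ? "raise an alert via the DS alert module" : "check passed");
    }
}
```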

3 Summary Design

3.1 Rule Manager Design

3.1.1 Rule Component Design

3.1.1.1 Single Rule

  • RuleDefinition
    • RuleType
      • rule_type
    • RuleName
      • rule_name
    • RuleInputEntry
      • DefaultInputEntry
        • connector_type, e.g. JDBC, HIVE
        • datasource_id
        • table
        • field
        • filter
      • StatisticsInputEntry
        • statistics_name
    • RuleMidExecuteSQLDefinition
      • ExecuteSQLDefinition
        • ExecuteSQL & TableAlias
    • StatisticsExecuteSQLDefinition
      • ExecuteSQLDefinition
        • ExecuteSQL & TableAlias
    • ComparsionExecuteSQLDefinition
      • FixedValue
        • InputEntry
          • ComparsionRuleTitle:comparsion_title
          • ComparsionName:comparsion_name
          • ComparsionValue:comparsion_value
      • CalculateValue
        • InputEntry
          • ComparsionRuleTitle:comparsion_title
          • ComparsionName:comparsion_name
          • ComparsionValue:comparsion_value
        • ExecuteSQLDefinition
          • ExecuteSQL & TableAlias
    • CheckInputEntry
      • CheckType:fixed/percentage, ${check_type}
      • Threshold:1000/30%,${threshold}
      • Operator:=,<,>,>=,<= ${operator}
    • ResultOutputSQL (generated automatically; it writes the check data to a storage engine)
      • generated according to the comparison value type (see the example sketch after this list)
        • Fixed
          • use ${comparsion_value} directly as the result field value
        • Calculate
          • use the value produced by the comparison execute SQL as the result field value
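
As an illustration, the statements a single-table NullCheck rule with a fixed comparison value might generate could look like the following; the concrete SQL is an assumption for this sketch, not the final rule content:

```java
// Hypothetical SQL generated for a single-table NullCheck rule with a fixed comparison value.
// The ${...} placeholders come from the rule input entries above.
public class NullCheckRuleSketch {
    // statistics SQL: count null values in the checked column
    static final String STATISTICS_SQL =
            "SELECT COUNT(*) AS ${statistics_name} FROM ${table} WHERE ${field} IS NULL AND (${filter})";

    // result output SQL: write both values so the check (operator + threshold) can be evaluated later
    static final String RESULT_OUTPUT_SQL =
            "SELECT ${statistics_name} AS statistics_value, ${comparsion_value} AS comparsion_value "
          + "FROM ${statistics_table_name}";
}
```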

3.1.1.2 MultiTableAccuracyRule

  • RuleDefinition
    • RuleType
      • rule_type
    • RuleName
      • rule_name
    • RuleInputEntry
      • DefaultInputEntry
        • src_connector_type
        • src_datasource_id
        • src_table
        • src_filter
        • target_connector_type
        • target_datasource_id
        • target_table
        • target_filter
        • mapping_columns
        • on_clause
      • StatisticsInputEntry
        • statistics_name
    • RuleMidExecuteSQLDefinition
      • ExecuteSQLDefinition
        • ExecuteSQL & TableAlias
    • StatisticsExecuteSQLDefinition
      • ExecuteSQLDefinition
        • ExecuteSQL & TableAlias
    • ComparsionExecuteSQLDefinition
      • FixedValue
        • InputEntry
          • ComparsionRuleTitle:comparsion_title
          • ComparsionName:comparsion_name
          • ComparsionValue:comparsion_value
      • CalculateValue
        • InputEntry
          • ComparsionRuleTitle:comparsion_title
          • ComparsionName:comparsion_name
          • ComparsionValue:comparsion_value
        • ExecuteSQLDefinition
          • ExecuteSQL & TableAlias
    • CheckInputEntry
      • CheckType:fixed/percentage, ${check_type}
      • Threshold:1000/30%,${threshold}
      • Operator:=,<,>,>=,<= ${operator}
    • ResultOutputSQL (generated automatically; it writes the check data to a storage engine)
      • generated according to the comparison value type (see the example sketch after this list)
        • Fixed
          • use ${comparsion_value} directly as the result field value
        • Calculate
          • use the value produced by the comparison execute SQL as the result field value
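
A sketch of the kind of SQL a cross-table accuracy rule might produce from on_clause and mapping_columns; the join shape below is an assumption, meant only to show how the input entries could be combined:

```java
// Hypothetical SQL for a MultiTableAccuracyRule: count source rows with no matching
// target row (statistics value) and the total source row count (calculated comparison value).
public class MultiTableAccuracySketch {
    static final String MISS_COUNT_SQL =
            "SELECT COUNT(*) AS ${statistics_name} "
          + "FROM ${src_table} src LEFT JOIN ${target_table} tgt ON ${on_clause} "
          + "WHERE tgt.key_column IS NULL AND (${src_filter})";  // key_column: one of the mapped target columns

    static final String TOTAL_COUNT_SQL =
            "SELECT COUNT(*) AS ${comparsion_name} FROM ${src_table} src WHERE (${src_filter})";
}
```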

3.1.1.3 MultiTableValueComparsionRule

  • RuleDefinition
    • RuleType
      • rule_type
    • RuleName
      • rule_name
    • RuleInputEntry
      • DefaultInputEntry
        • src_connector_type
        • src_datasource_id
        • statistics_name
        • statistics_execute_sql
        • target_connector_type
        • target_datasource_id
        • comparsion_name
        • comparsion_execute_sql
    • CheckInputEntry
      • CheckType:fixed/percentage, ${check_type}
      • Threshold:1000/30%,${threshold}
      • Operator:=,<,>,>=,<= ${operator}
    • ResultOutputSQL (generated automatically; an example substitution follows)
      • select ${statistics_name} as statistics_value, ${comparsion_name} as comparsion_value from ${statistics_execute_sql} full join ${comparsion_execute_sql}
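
For example, when comparing the row counts of a source table and a target table, the placeholders might be filled in roughly like this (table and column names are placeholders; cross joins are enabled in the Spark configuration):

```java
// Hypothetical substitution of the value-comparison output SQL: the two user-supplied
// statements become sub-queries and their single-row results are joined side by side.
public class ValueComparsionSketch {
    static final String STATISTICS_EXECUTE_SQL = "(SELECT COUNT(*) AS src_count FROM ods_order) t1";
    static final String COMPARSION_EXECUTE_SQL = "(SELECT COUNT(*) AS dw_count FROM dw_order) t2";

    static final String RESULT_OUTPUT_SQL =
            "SELECT src_count AS statistics_value, dw_count AS comparsion_value "
          + "FROM " + STATISTICS_EXECUTE_SQL + " FULL JOIN " + COMPARSION_EXECUTE_SQL;
}
```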

3.1.2 Custom Rule

  • Support custom rule calculation on a single column of a single table
  • Support cross-table value comparison on a single column

3.2 Task Execute Process Design

3.2.1 Execution Engine

  • Spark 2.0 or above

3.2.2 Task Execution Process

(Figure: data quality task execution flow)

3.3 Task Manager Design

Data quality tasks do not support standalone definition and scheduled execution; they can be defined and scheduled within a workflow.

3.4 Data Quality Task Definition UI Design

3.4.1 UI Generation Method

The data quality task definition UI is generated automatically by the front-end component from a JSON string built from the parameters of the selected rule.

3.4.2 Task Define UI Prototype Diagram

(Figure: data quality task definition UI prototype)

3.4.3 Custom Rule UI Prototype Diagram

(Figure: custom rule UI prototype)

4 Detail Design

4.1 Database Design

4.1.1 RuleInfo

| column      | type   | comment                             |
|-------------|--------|-------------------------------------|
| id          | int    | id                                  |
| name        | string | rule name                           |
| type        | int    | rule type: single-table/multi-table |
| rule_json   | text   | rule definition                     |
| create_time | date   | create time                         |
| update_time | date   | update time                         |

4.1.2 CheckResultInfo

| column           | type   | comment                               |
|------------------|--------|---------------------------------------|
| id               | int    | id                                    |
| task_id          | long   | task ID                               |
| task_instance_id | long   | task instance ID                      |
| rule_type        | int    | rule type                             |
| statistics_value | double | statistics value                      |
| comparsion_value | double | comparison value                      |
| check_type       | int    | check type: fixed value or percentage |
| threshold        | double | threshold                             |
| operator         | int    | operator: >, <, =, >=, <=             |
| create_time      | date   | create time                           |
| update_time      | date   | update time                           |

4.1.3 CheckResultStatisticsInfo

4.2 Class Design

4.2.1 Rule Design

4.2.1.1 Rule Related Model

  • RuleDefinition: rule definition
  • RuleInputEntry: input entry definition; every RuleInputEntry has a default value and need not be modified
  • ExecuteSqlDefinition: definition of an executed SQL statement
  • InputType: type of an input entry
  • FormType: front-end form control type
  • OptionSourceType: source of the options required by a front-end form control
  • ValueType: value type of an input entry
  • RuleType: rule type
  • ConnectorType: data source type
  • ComparsionValueType: comparison value type
  • CheckType: check type
  • FixedComparsionValueParameter: fixed comparison value parameter
  • CalculateComparsionValueParameter: calculated comparison value parameter
  • ConnectorDefinition

4.2.1.2 RuleParser

  • Different types of rules have default input entries; exclusive input entries can be added on top of them
  • After a rule is selected, the rule input entries in the rule content are read, a JSON string conforming to the form-create specification is constructed, and it is returned to the front end to generate the corresponding UI
  • After the rule parameters are filled in and submitted, the configured parameters are assembled into a map and stored in the task parameters
  • When the task is dispatched, the parameters are parsed and the parameters required to run the Spark job are constructed

1) Connector Parameter Parser

Obtain the data source information (url, database, table, username, password) according to the datasource_id, and construct the connector configuration.

2) Replace the placeholders in executeSQL to construct an executeSQL list

3) Construct the Writer configuration, including the Writer connector configuration and the saveSQL

  • The pseudo code for constructing the save SQL in the Writer is as follows:
if (comparsionType == FIXED) {
  map.put("${comparsion_name}", "fixed_value");
  sql = "select ${comparsion_name} as comparsion_value from ${statistics_table_name}";
} else {
  sql = "select ${comparsion_name} as comparsion_value from ${statistics_table_name} full join ${comparsion_table_name}";
}
resultSQL = sql.replacePlaceholder(map);

Finally, everything is assembled into a JSON string parameter and passed to the Spark application.
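
A rough illustration of the assembled parameter, built here as a Java map before serialization; the key names (readers, executeSqls, writer) and the structure are assumptions for this sketch, not the final schema:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical assembly of the parameter handed to the Spark application.
public class DataQualityParameterSketch {
    public static void main(String[] args) {
        Map<String, Object> reader = new HashMap<>();
        reader.put("type", "JDBC");                                 // connector_type
        reader.put("url", "jdbc:mysql://<host>:3306/<database>");   // resolved from datasource_id
        reader.put("table", "<table>");
        reader.put("user", "<username>");
        reader.put("password", "<password>");

        Map<String, Object> writer = new HashMap<>();
        writer.put("type", "JDBC");
        writer.put("table", "t_dq_check_result");                   // write the result back to the DS database
        writer.put("saveSql",
                "select ${comparsion_name} as comparsion_value from ${statistics_table_name}");

        Map<String, Object> parameter = new HashMap<>();
        parameter.put("readers", Arrays.asList(reader));
        parameter.put("executeSqls", Arrays.asList("<mid sql>", "<statistics sql>", "<comparison sql>"));
        parameter.put("writer", writer);

        // serialize to JSON (e.g. with Jackson) and pass it to the Spark job as an argument
        System.out.println(parameter);
    }
}
```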

4.2.2 Task Design

4.2.2.1 DolphinScheduler Task Design

  • DataQualityParameter

  • DataQualityTask

    • The main responsibility is to run the Spark job

4.2.2.2 Spark Data Quality Task Design

1) The data quality task is actually a Spark task. Its main responsibilities are as follows (a sketch follows this list):

  • Parse the parameters and obtain the parameters needed to construct the Reader, Executor and Writer
  • Construct the corresponding types of Reader, Executor and Writer according to the parameters
    • The main responsibility of the Reader is to read data from the specified data source and create a temporary table for subsequent use
    • The main responsibility of the Executor is to run the intermediate-step SQL statements, the statistics-step SQL statements and the comparison value calculation SQL statements
    • The main responsibility of the Writer is to write the calculation results of the data quality task to the corresponding storage engine; currently only writing back to the DS database is supported
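
A minimal sketch of that Reader, Executor and Writer flow on Spark; the class layout, table names and connection details are illustrative, not the actual implementation:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative flow of the Spark data quality application: read the source into a temp view,
// run the configured SQL steps, then write the final result to the storage engine.
public class DataQualityApplicationSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("dq-application").getOrCreate();

        // Reader: load the source table and register a temp view for the SQL steps
        Dataset<Row> src = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://<host>:3306/<database>")
                .option("dbtable", "<table>")
                .option("user", "<username>")
                .option("password", "<password>")
                .load();
        src.createOrReplaceTempView("src_table");

        // Executor: run the mid, statistics and comparison SQL steps in order;
        // each step registers a temp view that later steps (and the writer) can use
        List<String> executeSqls = Arrays.asList(
                "SELECT COUNT(*) AS nulls FROM src_table WHERE <field> IS NULL");
        Dataset<Row> result = null;
        for (String sql : executeSqls) {
            result = spark.sql(sql);
            result.createOrReplaceTempView("statistics_table");
        }

        // Writer: persist the check result, currently back to the DS database
        Properties props = new Properties();
        props.put("user", "<username>");
        props.put("password", "<password>");
        result.write().mode("append")
              .jdbc("jdbc:mysql://<host>:3306/dolphinscheduler", "t_dq_check_result", props);

        spark.stop();
    }
}
```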

2) The execution mode has the following options:

  • Package DQApplication.jar and upload it to HDFS, then load it from the default upload address at runtime
    • This approach saves storage space and reduces jar upload time
  • Put the packaged DQApplication.jar into the lib directory and load the jar from there at runtime
    • This approach is more transparent to users during deployment

5 Todo List

  • DataQualityTask UI Component
  • DataQualityTask Component
    • DataQualityParameter
    • DataQualityTask
  • Rule Component
    • RuleModel
    • RuleManager
    • RuleConverter
  • Spark DataQuality Application

6 Related Issue and PR

issue: DataQuality Application
pr: DataQuality Common Entity


@zixi0825 zixi0825 added the feature new feature label Dec 21, 2020
@davidzollo
Contributor

good feature

@Kyofin
Contributor

Kyofin commented Dec 31, 2020

LGTM

@lbjyuer

lbjyuer commented Jan 5, 2021

Essential functions of big data ETL System~~Looking forward to going online soon

@davidzollo
Contributor

Essential functions of big data ETL System~~Looking forward to going online soon

+1

@zixi0825
Member Author

zixi0825 commented Feb 1, 2021

Development Planning:

Version 1.0 (over 95% complete in local development, PR not yet submitted)

  • Data quality task type development, including front end and back end (development completed)

    • Rule input items are generated automatically when a rule is selected in the front-end interface (development completed)

    • Provide a variety of detection methods (development completed)

    • Provide multiple failure strategies (development completed)

  • Executor with Spark as the computing engine; its main responsibility is to run the data quality detection SQL (development completed)

  • Built-in detection rules, including single-table null value detection, cross-table accuracy detection, cross-table value comparison, single-table custom SQL detection, etc. (development completed)

  • Quality inspection result view, including front end and back end (development completed)

    • The workflow a task belongs to can be viewed (development completed)
  • Rule management, view only (development completed)

    • Rule definitions can be viewed (development completed)
  • Data sources: only JDBC and Hive are supported (development completed)

  • Some front-end experience optimization (not completed)

Version 2.0 (Time to be determined)

  • Optimize the front-end user experience of input items, introduce metadata management for multiple data sources, table and column selection, etc. (to be developed)

  • Provide custom rule templates, support customization of single-table rules (to be developed)

  • Add rule modification and deletion (to be developed)

  • Support abnormal data export (to be developed)

  • Support more data source types, such as files, ES, etc. (to be developed)

  • Support running data quality inspection tasks independently (to be developed)



@597365581
Contributor

+1

2 similar comments
@davidzollo
Contributor

+1

@ATLgo

ATLgo commented Mar 2, 2021

+1

@ATLgo

ATLgo commented Mar 4, 2021

@zixi0825 Could you please tell me how to get the feature you have completed up and running?

@zixi0825
Member Author

zixi0825 commented Mar 4, 2021

@zixi0825 Could you please tell me how to get the feature you have completed up and running?

It is not completed yet

@JacobZheng0927

I have also implemented the data quality management function in my company. The difference is that my work is based on Spark + parquet/avro to complete the data quality calculation. So I can provide some reference for the design of the calculation rules or the implementation of the specific code. I am very interested in participating in the development of this feature. How can I help you?

@davidzollo
Contributor

I have also implemented the data quality management function in my company. The difference is that my work is based on Spark + parquet/avro to complete the data quality calculation. So I can provide some reference for the design of the calculation rules or the implementation of the specific code. I am very interested in participating in the development of this feature. How can I help you?

Once you have looked at the current solution, please give some suggestions so we can discuss them.

@zixi0825 zixi0825 reopened this Jun 29, 2021
caishunfeng pushed a commit that referenced this issue Jan 27, 2022
* add data quality module

* add license

* add package configuration in dist pom

* fix license and jar import bug

* replace apache/skywalking-eyes@9bd5feb SHA

* refacotr jbdc-connector and writer

* modify parameter name in HiveConnector

* fix checkstyle error

* fix checkstyle error in dolphinschesuler-dist

* fix checkstyle error in dolphinschesuler-dist

* fix checkstyle error in dolphinschesuler-dist

* fix duplicate code bug

* fix code style bug

* fix code smells

* add dq relevant enums and parameter

* replace apache/skywalking-eyes@9bd5feb SHA

* fix Constants bug

* remove the unused class

* add unit test

* fix code style error

* add unit test

* refactor data quality common entity

* fix code style error

* add unit test

* close e2e test

* fix code smell bug

* modify dataquality enum value to 14 in TaskType

* add data qualtiy task

* update

* add getDatasourceOptions interface

* fix checkstyle

* close e2e test

* add data quality task ui

* update skywalking-eyes SHA

* fix style

* fix eslint error

* fix eslint error

* test e2e

* add unit test and alter dataquality task result

* fix checkstyle

* fix process service test error

* add unit test and fix code smells

* fix checkstyle error

* fix unit test error

* fix checkstyle error

* change execute sql type name

* revert ui pom.xml

* fix data quality task error

* fix checkstyle error

* fix dq task src_connector_type ui select bug

* fix spark rw postgresql bug

* change mysql driver scope

* fix form-create json bug

* fix code smell

* fix DolphinException Bug

* fix ui validate rule and Alert title

* fix target connection param bug

* fix threshold validate change

* add rule input entry index

* change statistic_comparison_check logic

* remove check type change

* add DateExpressionReplaceUtil

* fix null point expetion

* fix null point expetion

* fix test error

* add more sql driver

* fix test error and remove DateExprReplaceUtil

* add get datasource tables and columns

* add get datasource tables and columns

* remove hive-jdbc in pom.xml

* fix code smells

* update sql

* change the pom.xml

* optimize multi_table_accuracy ui

* fix v-show error

* fix code smells

* update sql

* [Feature][DataQuality] Add data quality task ui (#5054)

* add data quality task ui

* update skywalking-eyes SHA

* fix style

* fix eslint error

* fix eslint error

* test e2e

* fix dq task src_connector_type ui select bug

* fix threshold validate change

* remove check type change

* add get datasource tables and columns

* optimize multi_table_accuracy ui

* fix v-show error

* fix code smells

Co-authored-by: sunchaohe <sunzhaohe@linklogis.com>

* [Feature][DataQuality] Add data quality module  (#4830)

* add data quality module

* add license

* add package configuration in dist pom

* fix license and jar import bug

* replace apache/skywalking-eyes@9bd5feb SHA

* refacotr jbdc-connector and writer

* modify parameter name in HiveConnector

* fix checkstyle error

* fix checkstyle error in dolphinschesuler-dist

* fix checkstyle error in dolphinschesuler-dist

* fix checkstyle error in dolphinschesuler-dist

* fix duplicate code bug

* fix code style bug

* fix code smells

* update

* close e2e test

* fix spark rw postgresql bug

* change mysql driver scope

* add more sql driver

* remove hive-jdbc in pom.xml

* change the pom.xml

Co-authored-by: sunchaohe <sunzhaohe@linklogis.com>

* [Feature][DataQuality] Add data quality task backend (#4883)

* add dq relevant enums and parameter

* replace apache/skywalking-eyes@9bd5feb SHA


Co-authored-by: sunchaohe <sunzhaohe@linklogis.com>

* refactor data_quality_module

* add header license

* data quality module refactor

* fix unit test error

* fix checkstyle error

* fix unit test error

* fix checkstyle error

* fix unit test error

* fix code smell

* fix check style

* fix unit test error

* task statistics value add unique code

* fix unit test error

* fix checkstyle error

* fix checkstyle

* fix security hotspot

* fix unit test error

* fix security hotspot

* fix check

* add data quality task error handling

* fix unit test error

* add unit test

* add unit test

* optimize data quality result alert

* fix unit test

* fix sql script error

* fix bug

* update sql script

* fix checkstyle

* add license

* fix checkstyle

* fix checkstyle

* fix unit test

* add jacoco dependencies

* fix unit test

* fix unit test

* add jacoco dependencies

* add unit test

* add unit test

* add license

* fix checkstyle

* fix pom

* fix checkstyle

* fix checkstyle

* merge dev

* fix ui error

* fix pom error

* fix pom error

* fix test error

* fix test error

* mssql-jdbc exclude azure-keyvault

* fix test error

* merge dev and add unit test

* add notes

* rollback the CollectionUtils

* fix

* update sql

* fix

* fix

* fix query rule page error

* change dq.jar path

* fix sql error

* fix ui error

* fix(dq): jar path&task enum description

* add notes on DataQualityApplication

* fix dq result jump error

* fix(ui): page condition

* feat(ui): add show error output path

* change version

* remove all chinese word in sql

* merge

Co-authored-by: sunchaohe <sunzhaohe@linklogis.com>
@wangqinghuan

@zixi0825 Thanks for this amazing job. Do we only output check result to csv file at present?

@zixi0825
Member Author

@zixi0825 Thanks for this amazing job. Do we only output check result to csv file at present?

At present, only CSV is supported. I will add other formats.

@wangqinghuan

@zixi0825 Thanks for this amazing job. Do we only output check result to csv file at present?

At present, only CSV is supported. I will add other formats.

Do we have a plan to support output to a database? I want to show these results in UI or BI tools.

@davidzollo davidzollo reopened this Feb 22, 2022
@davidzollo
Contributor

@zixi0825 Thanks for this amazing job. Do we only output check result to csv file at present?

At present, only CSV is supported. I will add other formats.

Do we have a plan to support output to a database? I want to show these results in UI or BI tools.

I think this issue can be discussed in the dev@dolphinscheduler.apache.org mailing list

@davidzollo
Contributor

Or you can submit a new issue to describe what you want.

@wangqinghuan

I have opened a new issue: #8586

@a092cc

a092cc commented May 5, 2022

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found:
When executing a Hive or MySQL statement, the table clearly exists; what could cause this error? @zixi0825

@yxsgao

yxsgao commented May 19, 2022

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: When executing a Hive or MySQL statement, the table clearly exists; what could cause this error? @zixi0825

Have you solved this problem? I ran into the same issue. I modified the hive-site.xml under hive/spark, but the error still occurs.

@a092cc

a092cc commented May 20, 2022

You need to load hive-site.xml into the conf. This is how I added it:

* The SparkRuntimeEnvironment is responsible for creating SparkSession and SparkExecution
  */
@@ -47,14 +52,29 @@ public class SparkRuntimeEnvironment {
     }
 
     public void prepare() {
-        sparkSession = SparkSession.builder().config(createSparkConf()).getOrCreate();
+        sparkSession = SparkSession.builder().config(createSparkConf())
+                .enableHiveSupport()
+                .getOrCreate();
     }
 
     private SparkConf createSparkConf() {
         SparkConf conf = new SparkConf();
+
         this.config.entrySet()
             .forEach(entry -> conf.set(entry.getKey(), String.valueOf(entry.getValue())));
+
         conf.set("spark.sql.crossJoin.enabled","true");
+
+        Configuration cf = new Configuration();
+        cf.addResource("hive-site.xml");
+        cf.addResource("hdfs-site.xml");
+        cf.addResource("core-site.xml");
+        for (Map.Entry<String, String> next : cf) {
+            String key = next.getKey();
+            String value = next.getValue();
+            conf.set(key, value);
+        }
+
         return conf;
    }

@feixiameiruhua
Contributor

You need to load hive-site.xml into the conf. This is how I added it:

(quoting the SparkRuntimeEnvironment changes from the previous comment)

Nice 👍🏻, but for me it was enough to change just the line below, then package and replace the corresponding jar (dolphinscheduler-data-quality-xxx.jar):
sparkSession = SparkSession.builder().config(createSparkConf())
.enableHiveSupport()
.getOrCreate();

@zixi0825
Member Author

zixi0825 commented Oct 11, 2022 via email
