Metadata Center - real-time data warehouse metadata and resource metadata (such as parsed-out UDFs, Connectors and other key information, and their attributes) #607

Closed
wolfboys opened this issue Jan 12, 2022 · 2 comments

Comments

@wolfboys
Member

No description provided.

@wolfboys wolfboys mentioned this issue Jan 12, 2022
@Whojohn
Contributor

Whojohn commented Sep 20, 2022

Flink metadata management

Metadata definition (StreamPark RoadMap)

Step 1

I commit to implementing the catalog functionality, with source management taking part in the design, except for partial-column support. (That part of the metadata depends on syntax analysis, which would make the project complicated, so it is split out separately.)

reference:

https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.html

definition:

  1. First, we must satisfy the most basic metadata management, including: a version-controlled catalog, source management (Kafka, ES, and so on), and simple permission control.

Requires implementation

  1. Flink catalog: includes connector information and schema management based on physical columns. Schema management covers sources both with and without their own metadata. For example, Kafka has no metadata, while MySQL/ES do.

No metadata management, Kafka for example

  1. A topic carries JSON data; we store the JSON schema definition ourselves (see the sketch after the MySQL example below).

With metadata management, MySQL for example

  1. Only record the metadata obtained from MySQL.
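For the no-meta case, the stored definition could be replayed into a Flink schema when the table is resolved from the catalog. A minimal sketch using Flink's Schema builder, assuming the physical columns were already parsed from the stored JSON definition (the column names are made up for illustration):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

public class KafkaTopicSchemaExample {
    public static void main(String[] args) {
        // Physical columns only, as proposed in Step 1: no time columns,
        // no primary keys, no computed columns.
        Schema schema = Schema.newBuilder()
                .column("user_id", DataTypes.BIGINT())    // hypothetical field from the stored JSON definition
                .column("event_type", DataTypes.STRING())
                .column("payload", DataTypes.STRING())
                .build();

        // For a meta-backed source such as MySQL, the same builder would be fed
        // from column metadata read out of the database instead of a stored file.
        System.out.println(schema);
    }
}
```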
  2. Source management: simple catalog-level permissions, i.e. check whether a user has permission to use a table at all, without distinguishing read and write permissions. (A plain catalog cannot tell reads from writes.)
  3. Catalog version control: catalog tables have a notion of version. Unless the user specifies one, the default is the version current at the time the task was first initialized.
  4. Flink feature support (not only physical columns). **Note that schema management does not include the definition of time columns, primary-key columns, or computed columns, only physical columns.**
# flink feature syntax is as follows
Supported syntax 1 (define a new table)
create table as tablea
Supported syntax 2 (redefine the table structure within the session, e.g. a bigint column redefined as timestamp)
create tablea (a bigint, ts as xxx)

  5. Schema management: map source schema type --> data type --> Flink SQL type, instead of mapping source type --> Flink SQL type directly (hard to scale, not uniformly defined); see the sketch after the references below.

reference (reference design documents):

https://atlas.apache.org/2.0.0/TypeSystem.html

https://datahubproject.io/docs/advanced/field-path-spec-v2

https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py
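One way to read item 5 is a two-stage mapping: each source dialect maps its native types onto a small, source-agnostic intermediate type set, and only that intermediate set is mapped onto Flink SQL types. A minimal sketch, assuming a hypothetical intermediate enum (the MySQL type names shown are only examples):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import java.util.Map;

public class TypeMappingSketch {

    // Hypothetical source-agnostic intermediate type set.
    enum IntermediateType { INT64, FLOAT64, STRING, TIMESTAMP }

    // Stage 1: per-source dialect -> intermediate type (MySQL shown as an example).
    static final Map<String, IntermediateType> MYSQL_TO_INTERMEDIATE = Map.of(
            "bigint", IntermediateType.INT64,
            "double", IntermediateType.FLOAT64,
            "varchar", IntermediateType.STRING,
            "datetime", IntermediateType.TIMESTAMP);

    // Stage 2: intermediate type -> Flink SQL type, shared by every source.
    static DataType toFlinkType(IntermediateType t) {
        switch (t) {
            case INT64:     return DataTypes.BIGINT();
            case FLOAT64:   return DataTypes.DOUBLE();
            case STRING:    return DataTypes.STRING();
            case TIMESTAMP: return DataTypes.TIMESTAMP(3);
            default:        throw new IllegalArgumentException("unmapped type: " + t);
        }
    }

    public static void main(String[] args) {
        DataType flinkType = toFlinkType(MYSQL_TO_INTERMEDIATE.get("datetime"));
        System.out.println(flinkType); // TIMESTAMP(3)
    }
}
```

Adding a new source then only requires a new stage-1 table; the stage-2 mapping stays unchanged.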

  6. Resolve support for partial-column inserts in Flink SQL. For example, a Kafka table defines 10 fields but the insert only uses 6 of them, which is not supported yet. (The catalog declares all columns, but we only need some of them; see the rewrite example and the sketch below.)
Solution:
# sql parsing rewrites the sql as follows:
before:
insert into temp select a,b,c from source
after:
insert into temp select a,b,c, null as `d` from source
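A minimal sketch of the rewrite idea, assuming the full physical column list is available from the catalog and the query has already been parsed into the selected column names; a real implementation would go through SQL parsing rather than string concatenation:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class PartialInsertRewrite {

    // Pad the SELECT list with NULL for every sink column the user did not mention,
    // so the statement matches the full schema declared in the catalog.
    static String rewrite(String sinkTable, List<String> sinkColumns,
                          List<String> selectedColumns, String sourceClause) {
        Set<String> selected = new LinkedHashSet<>(selectedColumns);
        String selectList = sinkColumns.stream()
                .map(c -> selected.contains(c) ? c : "null as `" + c + "`")
                .collect(Collectors.joining(", "));
        return "insert into " + sinkTable + " select " + selectList + " " + sourceClause;
    }

    public static void main(String[] args) {
        String sql = rewrite("temp",
                List.of("a", "b", "c", "d"),
                List.of("a", "b", "c"),
                "from source");
        // prints: insert into temp select a, b, c, null as `d` from source
        System.out.println(sql);
    }
}
```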

Notice:

  1. Complex permission issues are not considered.
  2. Function definitions such as UDXs are not considered.
  3. Principle of simplicity: full metadata managers such as DataHub are out of scope.
  4. Schema evolution and schema compatibility are not considered. (Compatibility is a big issue in Flink SQL; in particular, logical changes that accompany schema changes lead to state incompatibility, so it is meaningless for Flink SQL to tackle this on its own.)

Step 2

Only part of this functionality is committed to be implemented.

Requires implementation

  1. Satisfy Step 1.
  2. Introduce field-level permission management.

Since the catalog cannot distinguish source tables from sink tables, there are two ways to implement permission control:

  1. StreamGraph inspection (our implementation)
  2. Syntax parsing

  3. UDX management support (functions are loaded automatically via a classloader); see the sketch below.
  4. Format enhancement. For example, flattening ROW types to avoid known bugs (Flink bug [FLINK-18027](https://issues.apache.org/jira/browse/FLINK-18027)).
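A minimal sketch of the UDX auto-loading idea from item 3, assuming the UDF jar path, class name, and function name come from the metadata store (all three values are made up for illustration); it uses a URLClassLoader plus the standard TableEnvironment registration call:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.UserDefinedFunction;

import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;

public class UdxAutoLoader {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical values that would come from the metadata store.
        URL udfJar = Paths.get("/tmp/udx/my-udfs.jar").toUri().toURL();
        String functionClass = "com.example.udf.MaskFunction";

        // Keep the loader open for the lifetime of the session so the UDF
        // and its dependencies stay resolvable.
        URLClassLoader loader =
                new URLClassLoader(new URL[]{udfJar}, Thread.currentThread().getContextClassLoader());

        Class<? extends UserDefinedFunction> clazz =
                Class.forName(functionClass, true, loader).asSubclass(UserDefinedFunction.class);

        // Register under a catalog-managed name so SQL can call it directly, e.g. SELECT mask(col) ...
        tEnv.createTemporarySystemFunction("mask", clazz);
    }
}
```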

Step 3

Not committed to be implemented.

Requires implementation

  1. Satisfy Step 2.
  2. Field-level source lineage analysis.
  3. Cross-task lineage analysis, integrating with external metadata tools such as Atlas, DataHub, etc.
  4. Schema evolution & state evolution.

This problem is complicated: we must solve not only the schema problem but also Flink state compatibility; schema compatibility can be achieved through Avro compatibility rules, with reference to Hudi and similar projects.
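A minimal sketch of the Avro-based compatibility check hinted at above, using Avro's built-in SchemaCompatibility utility (the two schema strings are made up for illustration); Flink state compatibility would still have to be handled separately:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;

public class AvroSchemaCompatibilityCheck {
    public static void main(String[] args) {
        Schema oldSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"long\"}]}");

        // The new version adds an optional field with a default, which is backward compatible.
        Schema newSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"long\"},"
                        + "{\"name\":\"source\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Can data written with the old schema be read with the new one?
        SchemaCompatibilityType result = SchemaCompatibility
                .checkReaderWriterCompatibility(newSchema, oldSchema)
                .getType();

        System.out.println("backward compatible: " + (result == SchemaCompatibilityType.COMPATIBLE));
    }
}
```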

Flink metadata management

Metadata definition (StreamPark RoadMap)

Step 1

Except for partial-column support, the catalog functionality is committed to be implemented, with source management taking part in the design. (Partial-column support depends on syntax analysis, which would complicate the project.)

reference:

https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.html

Definition

  1. The first layer satisfies the most basic metadata management: a version-controlled catalog implementation for Flink SQL, plus the most basic permission control.

Requires implementation

  1. Flink catalog: includes connector information and schema management based on physical columns. Schema management covers sources without metadata and sources with metadata. For example, Kafka has no metadata, while MySQL/ES and the like do.

No metadata management, Kafka for example

  1. A topic is in JSON format; we manage that schema definition ourselves.

With metadata management, MySQL for example

  1. Only record the metadata obtained from MySQL.
  2. Source management: simple catalog-level permission control, i.e. whether a user may use a table at all, without distinguishing read and write permissions. (A plain catalog cannot make the distinction; this can be achieved in phase two.)
  3. Catalog version control: catalog tables have a notion of version. Unless explicitly declared, the default is the version current at the time the task was first initialized.
  4. Flink feature support (the above only covers physical columns). Note that schema management does not include the definition of time columns, primary-key columns, or computed columns; it is the plainest physical-column definition. Flink feature support is only effective within a single task.
# Flink feature declaration syntax (the syntax below requires no changes to Flink internals; it only concerns the Flink catalog)
Supported syntax 1 (define a new table)
create table as tablea
Supported syntax 2 (redefine the table structure within the session, e.g. a bigint redefined as timestamp)
create tablea (a bigint, ts as xxx)

  5. Schema management: for different sources, prefer source schema type --> data type --> Flink SQL type, rather than mapping source schema type --> Flink SQL type directly (poor extensibility, not uniform).

reference (extend from the rules below, or design with reference to the Avro schema):

https://atlas.apache.org/2.0.0/TypeSystem.html

https://datahubproject.io/docs/advanced/field-path-spec-v2

https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py

  6. Solve partial-column support in Flink SQL. For example, a Kafka table defines 10 fields but the insert declares only 6, which is currently not supported. (The catalog makes every insert declare all columns, so partial columns cannot be used.)
Solution:
# rewrite the sql, padding null automatically
insert a,b,c, null as `d`, null as `e`

Notice:

  1. Complex permission issues are not considered.
  2. Function definitions such as UDXs are not considered.
  3. Principle of maximum simplicity: lineage and other higher layers are not considered. (For StreamPark, the core is to implement Step 1.)
  4. Schema evolution and schema compatibility are not considered. (Compatibility is a big problem in Flink SQL; in particular, logical changes that come with schema changes lead to state incompatibility, so it is meaningless for Flink SQL to consider this.)

Step 2

Only part of this functionality is committed to be implemented.

Requires implementation
  1. Satisfy the first layer (Step 1).
  2. Introduce field-level permission management.

Since the catalog cannot distinguish source tables from sink tables, there are two approaches to implementing permissions:

  1. StreamGraph inspection (our internal implementation)
  2. Syntax parsing

  3. UDX management support (loaded automatically via a classloader).
  4. Non-metadata items: format enhancements. For example, flattening ROW types (Flink bug [FLINK-18027](https://issues.apache.org/jira/browse/FLINK-18027) and other latent ROW issues), and supporting mixed ETL code and SQL programming via formats. (In our internal implementation, all ETL is mapped to POJOs and supported through a POJO format; this is practical for migrating Spark streaming ETL / Flink streaming ETL to Flink SQL.)

Step 3

Not committed to be implemented.

Requires implementation
  1. Satisfy the second layer (Step 2).
  2. Field-level source lineage analysis.
  3. Cross-task lineage analysis, integrating with external metadata tools such as Atlas, DataHub, etc.
  4. Schema evolution & state evolution.

This problem is somewhat complicated: in practice we must solve not only the schema problem but also Flink state compatibility; schema compatibility can be achieved via Avro compatibility rules, with reference to Hudi and similar implementations.

#609
#602

@wolfboys
Member Author

Currently, there are no plans for this.
