diff --git a/docs/content.zh/docs/dev/table/sql/queries/hints.md b/docs/content.zh/docs/dev/table/sql/queries/hints.md
index d6e14e192f953..f3afe1973fdfc 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/hints.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/hints.md
@@ -79,4 +79,289 @@ insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select
 ```
+## 查询提示
+
+### 联接提示
+
+#### LOOKUP
+
+{{< label Streaming >}}
+
+LOOKUP 联接提示允许用户建议 Flink 优化器:
+1. 使用同步或异步的查找函数
+2. 配置异步查找相关参数
+3. 启用延迟重试查找策略
+
+#### 语法
+```sql
+SELECT /*+ LOOKUP(hint_options) */
+
+hint_options: key=value[, key=value]*
+
+key:
+    stringLiteral
+
+value:
+    stringLiteral
+```
+
+#### LOOKUP 提示选项:
+
+| 选项类型 | 选项名称 | 必选 | 选项值类型 | 默认值 | 选项说明 |
+|---|---|---|---|---|---|
+| table | table | Y | string | N/A | 查找源表的表名 |
+| async | async | N | boolean | N/A | 值可以是 'true' 或 'false',以建议优化器选择对应的查找函数。若底层的连接器无法提供建议模式的查找函数,提示就不会生效 |
+| async | output-mode | N | string | ordered | 值可以是 'ordered' 或 'allow_unordered','allow_unordered' 代表用户允许不保序的输出,在优化器判断不影响正确性的情况下会转成 `AsyncDataStream.OutputMode.UNORDERED`,否则转成 `ORDERED`。这与作业参数 `ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE` 是一致的 |
+| async | capacity | N | integer | 100 | 异步查找使用的底层 `AsyncWaitOperator` 算子的缓冲队列大小 |
+| async | timeout | N | duration | 300s | 异步查找从第一次调用到最终查找完成的超时时间,可能包含了多次重试,在发生 failover 时会重置 |
+| retry | retry-predicate | N | string | N/A | 可以是 'lookup_miss',表示在查找结果为空时启用重试 |
+| retry | retry-strategy | N | string | N/A | 可以是 'fixed_delay' |
+| retry | fixed-delay | N | duration | N/A | 固定延迟策略的延迟时长 |
+| retry | max-attempts | N | integer | N/A | 固定延迟策略的最大重试次数 |
+
+注意:其中
+- 'table' 是必选项,需要填写目标联接表的表名(和 FROM 子句引用的表名保持一致),注意当前不支持填写表的别名(这将在后续版本中支持)。
+- 异步查找参数可按需设置一个或多个,未设置的参数按默认值生效。
+- 重试查找参数没有默认值,在需要开启时所有参数都必须设置为有效值。
+
+#### 1. 使用同步或异步的查找函数
+如果连接器同时具备同步和异步查找能力,用户通过给出提示选项值 'async'='false' 来建议优化器选择同步查找,或 'async'='true' 来建议选择异步查找。
+
+示例:
+```sql
+-- 建议优化器选择同步查找
+LOOKUP('table'='Customers', 'async'='false')
+
+-- 建议优化器选择异步查找
+LOOKUP('table'='Customers', 'async'='true')
+```
+注意:当没有指定 'async' 选项值时,优化器优先选择异步查找,在以下两种情况下优化器会选择同步查找:
+1. 当连接器仅实现了同步查找时
+2. 用户在参数 ['table.optimizer.non-deterministic-update.strategy']({{< ref "docs/dev/table/config" >}}#table-optimizer-non-deterministic-update-strategy) 上启用了 'TRY_RESOLVE' 模式,并且优化器推断用户查询中存在非确定性更新的潜在风险时
+
+#### 2. 配置异步查找相关参数
+在异步查找模式下,用户可通过提示选项直接配置异步查找相关参数
+
+示例:
+```sql
+-- 设置异步查找参数 'output-mode', 'capacity', 'timeout', 可按需设置单个或多个参数
+LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')
+```
+注意:联接提示上的异步查找参数和[作业级别配置参数]({{< ref "docs/dev/table/config" >}}#execution-options)的含义是一致的,没有设置的参数值由作业级别配置生效,另一个区别是联接提示作用的范围更小,仅限于当前联接操作中对应联接提示选项设置的表名(未被联接提示作用的其他联接查询不受影响)。
+
+例如:作业级别异步查找参数设置为
+```gitexclude
+table.exec.async-lookup.output-mode: ORDERED
+table.exec.async-lookup.buffer-capacity: 100
+table.exec.async-lookup.timeout: 180s
+```
+
+那么以下联接提示:
+```sql
+1. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered')
+2. LOOKUP('table'='Customers', 'async'='true', 'timeout'='300s')
+```
+
+分别等价于:
+```sql
+1. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')
+2. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s')
+```
+
+#### 3. 启用延迟重试查找策略
+延迟重试查找希望解决流场景中经常遇到的维表数据更新延迟而不能被流数据正确关联的问题。通过提示选项 'retry-predicate'='lookup_miss' 可设置查找结果为空的重试条件,同时设置重试策略参数来开启重试查找功能(同步或异步查找均可),当前仅支持固定延迟重试策略。
+
+固定延迟重试策略参数:
+```gitexclude
+'retry-strategy'='fixed_delay'
+-- 固定重试间隔
+'fixed-delay'
+-- 最大重试次数(从重试执行开始计数,比如最大重试次数设置为 1,则对某个具体查找键的一次查找处理实际最多执行 2 次查找请求)
+'max-attempts'
+```
+
+示例:
+1. 开启异步查找重试
+```sql
+LOOKUP('table'='Customers', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3')
+```
+
+2. 开启同步查找重试
+```sql
+LOOKUP('table'='Customers', 'async'='false', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3')
+```
+
+若连接器仅实现了同步或异步中的一种查找能力,'async' 提示选项可以省略:
+```sql
+LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3')
+```
+
+#### 进一步说明
+
+#### 开启缓存对重试的影响
+[FLIP-221](https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric) 引入了对查找源表的缓存支持,
+缓存策略有部分缓存、全部缓存两种,开启全部缓存时('lookup.cache'='FULL'),重试无法起作用(因为查找表被完整缓存,重试查找没有任何实际意义);开启部分缓存时,当一条数据开始查找处理时,
+先在本地缓存中查找,如果没找到则通过连接器进行外部查找(如果存在,则立即返回),此时查不到的记录和不开启缓存时一样,会触发重试查找,重试结束时的结果即为最终的查找结果(在部分缓存模式下,更新本地缓存)。
+
+#### 关于查找键及 'retry-predicate'='lookup_miss' 重试条件的说明
+对不同的连接器,提供的索引查找能力可能是不同的,例如内置的 HBase 连接器,默认仅提供了基于 `rowkey` 的索引查找能力(未启用二级索引),而对于内置的 JDBC 连接器,默认情况下任何字段都可以被用作索引查找,这是物理存储的特性不同所决定的。
+查找键即这里提到的作为索引查找的字段或字段组合,以 [`lookup join`]({{< ref "docs/dev/table/sql/queries/joins" >}}#lookup-join)
+文档中的示例为例,联接条件 "ON o.customer_id = c.id" 中 `c.id` 即为查找键
+
+```sql
+SELECT o.order_id, o.total, c.country, c.zip
+FROM Orders AS o
+    JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
+    ON o.customer_id = c.id
+```
+
+如果联接条件改为 "ON o.customer_id = c.id and c.country = 'US'",即:
+```sql
+SELECT o.order_id, o.total, c.country, c.zip
+FROM Orders AS o
+    JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
+    ON o.customer_id = c.id and c.country = 'US'
+```
+
+当 `Customers` 表存储在 MySQL 中时,`c.id` 和 `c.country` 都会被用作查找键
+```sql
+CREATE TEMPORARY TABLE Customers (
+  id INT,
+  name STRING,
+  country STRING,
+  zip STRING
+) WITH (
+  'connector' = 'jdbc',
+  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
+  'table-name' = 'customers'
+)
+```
+
+而当 `Customers` 表存储在 HBase 中时,仅 `c.id` 会被用作查找键,而 `c.country = 'US'` 会作为剩余的联接条件在查找返回的记录上进一步检查是否满足
+```sql
+CREATE TEMPORARY TABLE Customers (
+  id INT,
+  name STRING,
+  country STRING,
+  zip STRING,
+  PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+  'connector' = 'hbase-2.2',
+  ...
+)
+```
+
+相应的,在启用查找结果为空的重试条件和对应的固定间隔重试策略时,上述查询在不同的存储上的重试效果可能是不一样的,比如 `Customers` 表中有一条记录:
+```gitexclude
+id=100, country='CN'
+```
+处理订单流中一条 'id=100' 的记录,当连接器为 'jdbc' 时,因为 `c.id` 和 `c.country` 都会被用作查找键,对应的查找结果为空(`country='CN'` 不满足条件 `c.country = 'US'`),会触发重试查找;
+而当连接器为 'hbase-2.2' 时,因为仅 `c.id` 会被用作查找键,因而对应的查找结果非空(会返回 `id=100, country='CN'` 的记录),因此不会触发重试查找,只是在检查剩余的联接条件 `c.country = 'US'` 时不满足。
+
+当前基于 SQL 语义的考虑,仅提供了 'lookup_miss' 重试条件,当需要等待维度表中某些更新时(表中已存在历史版本记录,而非不存在),用户可以尝试两种选择:
+1. 利用 DataStream Async I/O 中新增的异步重试支持,实现定制的重试条件(可实现对返回记录更复杂的判断)
+2. 利用上述查找键在不同连接器上的特性区别,某些场景下延迟查找维表更新记录的一种解决方案是在联接条件上增加数据的时间版本比较:
+比如示例中 `Customers` 维表每小时都会更新,可以新增一个时间相关的版本字段 `update_version`,保留到小时精度(可根据时效性需求修改生成方式),如更新时间 '2022-08-15 12:01:02' 记录 `update_version` 为 '2022-08-15 12:00'
+```sql
+CREATE TEMPORARY TABLE Customers (
+  id INT,
+  name STRING,
+  country STRING,
+  zip STRING,
+  -- 新增时间相关的数据版本字段,
+  update_version STRING
+) WITH (
+  'connector' = 'jdbc',
+  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
+  'table-name' = 'customers'
+)
+```
+增加使用订单流的时间字段和维表 `Customers`.`update_version` 的等值联接条件:
+```sql
+ON o.customer_id = c.id AND DATE_FORMAT(o.order_timestamp, 'yyyy-MM-dd HH:mm') = c.update_version
+```
+这样当新来的订单流数据未查到 `Customers` 表 12 点的新数据时,就能开启等待重试来查找期望的更新值。
+
+#### 常见问题排查
+开启延迟重试查找后,较容易遇到的问题是维表查找节点形成反压,通过 web ui Task Manager 页面的 Thread Dump 功能可以快速确认是否延迟重试引起。
+从异步和同步查找分别来看,thread sleep 调用栈会出现在:
+1. 异步查找:`RetryableAsyncLookupFunctionDelegator`
+2. 同步查找:`RetryableLookupFunctionDelegator`
+
+注意:
+- 异步查找时,如果所有流数据需要等待一定时长再去查找维表,我们建议尝试其他更轻量的方式(比如源表延迟一定时间消费)。
+- 同步查找中的延迟等待重试执行是完全同步的,即在当前数据没有完成重试前,不会开始下一条数据的处理。
+- 异步查找中,如果 'output-mode' 最终为 'ORDERED',那延迟重试造成反压的概率相对 'UNORDERED' 更高,这种情况下调大 'capacity' 不一定能有效减轻反压,可能需要考虑减小延迟等待的时长。
+
 {{< top >}}
diff --git a/docs/content/docs/dev/table/sql/queries/hints.md b/docs/content/docs/dev/table/sql/queries/hints.md
index fc556c393658c..ddc009d52b4d4 100644
--- a/docs/content/docs/dev/table/sql/queries/hints.md
+++ b/docs/content/docs/dev/table/sql/queries/hints.md
@@ -84,4 +84,324 @@ insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select
 ```
+## Query Hints
+
+### Join Hints
+
+#### LOOKUP
+
+{{< label Streaming >}}
+
+The LOOKUP hint allows users to suggest that the Flink optimizer:
+1. use a synchronous (sync) or asynchronous (async) lookup function
+2. configure the async parameters
+3. enable a delayed retry strategy for lookup
+
+#### Syntax
+```sql
+SELECT /*+ LOOKUP(key=value[, key=value]*) */
+
+key:
+    stringLiteral
+
+value:
+    stringLiteral
+```
+
+#### LOOKUP Hint Options:
+
+| option type | option name | required | value type | default value | description |
+|---|---|---|---|---|---|
+| table | table | Y | string | N/A | the table name of the lookup source |
+| async | async | N | boolean | N/A | value can be 'true' or 'false' to suggest that the planner choose the corresponding lookup function. If the backend lookup source does not support the suggested lookup mode, the hint has no effect. |
+| async | output-mode | N | string | ordered | value can be 'ordered' or 'allow_unordered'. 'allow_unordered' means the user allows unordered results: the planner will attempt to use `AsyncDataStream.OutputMode.UNORDERED` when this does not affect the correctness of the result, otherwise `ORDERED` is still used. This is consistent with `ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE`. |
+| async | capacity | N | integer | 100 | the buffer capacity of the backend `AsyncWaitOperator` of the lookup join operator |
+| async | timeout | N | duration | 300s | timeout from the first invocation to the final completion of the asynchronous operation; it may span multiple retries and is reset in case of failover |
+| retry | retry-predicate | N | string | N/A | can be 'lookup_miss', which enables retry if the lookup result is empty |
+| retry | retry-strategy | N | string | N/A | can be 'fixed_delay' |
+| retry | fixed-delay | N | duration | N/A | delay time for the 'fixed_delay' strategy |
+| retry | max-attempts | N | integer | N/A | maximum number of attempts for the 'fixed_delay' strategy |
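+
+As a rough illustration of where the hint sits in a complete statement, the sketch below attaches a LOOKUP hint to a lookup join. The `Orders` and `Customers` tables, their columns and the `proc_time` attribute are assumptions borrowed from the lookup join examples, and the option values are arbitrary:
+```sql
+-- sketch only: table and column names are assumed, option values are illustrative
+SELECT /*+ LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s') */
+    o.order_id, o.total, c.country, c.zip
+FROM Orders AS o
+    JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
+    ON o.customer_id = c.id
+```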
+
+Note:
+- The 'table' option is required. Only the table name is supported (it must match the name used in the FROM clause); table aliases are not supported currently (they will be supported in later versions).
+- The async options are all optional; the default value is used for any option that is not configured.
+- There are no default values for the retry options; all retry options must be set to valid values when retry is enabled.
+
+#### 1. Use Sync And Async Lookup Function
+If the connector supports both async and sync lookup, users can set the option 'async'='false'
+to suggest that the planner use the sync lookup, or 'async'='true' to suggest the async lookup:
+
+Example:
+```sql
+-- suggest that the optimizer use sync lookup
+LOOKUP('table'='Customers', 'async'='false')
+
+-- suggest that the optimizer use async lookup
+LOOKUP('table'='Customers', 'async'='true')
+```
+Note: the optimizer prefers async lookup if no 'async' option is specified. It will always use sync lookup when:
+1. the connector only implements the sync lookup
+2. the user enables the 'TRY_RESOLVE' mode of ['table.optimizer.non-deterministic-update.strategy']({{< ref "docs/dev/table/config" >}}#table-optimizer-non-deterministic-update-strategy) and the
+optimizer has detected a correctness issue caused by non-deterministic updates.
+
+#### 2. Configure The Async Parameters
+In async lookup mode, users can configure the async parameters via the async options.
+
+Example:
+```sql
+-- configure the async parameters 'output-mode', 'capacity' and 'timeout'; one or more of them can be set
+LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')
+```
+Note: the async options have the same meaning as the async options in [job level Execution Options]({{< ref "docs/dev/table/config" >}}#execution-options),
+and the job level configuration is used for any option that is not set. The difference is that the scope of the LOOKUP hint
+is smaller: it is limited to the table named in the hint within the current lookup
+operation (other lookup operations are not affected by the LOOKUP hint).
+
+e.g., if the job level configuration is:
+```gitexclude
+table.exec.async-lookup.output-mode: ORDERED
+table.exec.async-lookup.buffer-capacity: 100
+table.exec.async-lookup.timeout: 180s
+```
+
+then the following hints:
+```sql
+1. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered')
+2. LOOKUP('table'='Customers', 'async'='true', 'timeout'='300s')
+```
+
+are equivalent to:
+```sql
+1. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')
+2. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s')
+```
+
+#### 3. Enable Delayed Retry Strategy For Lookup
+Delayed retry for lookup join is intended to solve the problem of delayed updates in the external system,
+which cause stream data to be enriched with unexpected results. The hint option 'retry-predicate'='lookup_miss'
+enables retry on both sync and async lookup; only the fixed-delay retry strategy is supported currently.
+
+Options of the fixed-delay retry strategy:
+```gitexclude
+'retry-strategy'='fixed_delay'
+-- fixed delay duration
+'fixed-delay'='10s'
+-- max number of retries (counted from the first retry: if set to '1', a single lookup process
+-- for a specific lookup key will actually execute up to 2 lookup requests)
+'max-attempts'='3'
+```
+
+Example:
+1. enable retry on async lookup
+```sql
+LOOKUP('table'='Customers', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3')
+```
+
+2. enable retry on sync lookup
+```sql
+LOOKUP('table'='Customers', 'async'='false', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3')
+```
+
+If the lookup source only has one capability, then the 'async' mode option can be omitted:
+
+```sql
+LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3')
+```
+
+#### Further Notes
+
+#### Effect Of Enabling Caching On Retries
+[FLIP-221](https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric) adds caching support for lookup sources,
+with PARTIAL and FULL caching modes (the mode NONE disables caching). When FULL caching is enabled, there will
+be no retry at all (because it is meaningless to retry a lookup against a fully cached mirror of the lookup source).
+When PARTIAL caching is enabled, an incoming record is first looked up in the local cache; on a cache hit the
+record is returned immediately, and on a cache miss an external lookup is done via the backend connector.
+An empty lookup result then triggers a retry (the same as with caching disabled), and the final lookup
+result is determined when the retry completes (in PARTIAL caching mode, the local cache is also updated).
+
+
+#### Note On Lookup Keys And 'retry-predicate'='lookup_miss' Retry Conditions
+For different connectors, the index-lookup capability may be different, e.g., the built-in HBase connector
+can look up on the rowkey only (without secondary index), while the built-in JDBC connector provides more
+powerful index-lookup capabilities on arbitrary columns. This is determined by the different physical
+storages.
+The lookup key mentioned here is the field or combination of fields used for the index lookup.
+Take the example from [`lookup join`]({{< ref "docs/dev/table/sql/queries/joins" >}}#lookup-join), where
+`c.id` is the lookup key of the join condition "ON o.customer_id = c.id":
+
+```sql
+SELECT o.order_id, o.total, c.country, c.zip
+FROM Orders AS o
+    JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
+    ON o.customer_id = c.id
+```
+
+If we change the join condition to "ON o.customer_id = c.id and c.country = 'US'":
+```sql
+SELECT o.order_id, o.total, c.country, c.zip
+FROM Orders AS o
+    JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
+    ON o.customer_id = c.id and c.country = 'US'
+```
+
+both `c.id` and `c.country` will be used as the lookup key when the `Customers` table is stored in MySQL:
+```sql
+CREATE TEMPORARY TABLE Customers (
+  id INT,
+  name STRING,
+  country STRING,
+  zip STRING
+) WITH (
+  'connector' = 'jdbc',
+  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
+  'table-name' = 'customers'
+)
+```
+
+Only `c.id` can be the lookup key when the `Customers` table is stored in HBase, and the remaining join
+condition `c.country = 'US'` will be evaluated after the lookup result is returned:
+```sql
+CREATE TEMPORARY TABLE Customers (
+  id INT,
+  name STRING,
+  country STRING,
+  zip STRING,
+  PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+  'connector' = 'hbase-2.2',
+  ...
+)
+```
+
+Accordingly, the above query will have different retry effects on different storages when the
+'lookup_miss' retry predicate and the fixed-delay retry strategy are enabled.
+
+e.g., if there is a row in the `Customers` table:
+```gitexclude
+id=100, country='CN'
+```
+When processing a record with 'id=100' in the order stream with the 'jdbc' connector, the corresponding
+lookup result is empty (`country='CN'` does not satisfy the condition `c.country = 'US'`) because both
+`c.id` and `c.country` are used as lookup keys, so this will trigger a retry.
+
+With the 'hbase' connector, only `c.id` will be used as the lookup key, so the corresponding lookup result
+will not be empty (it will return the record `id=100, country='CN'`) and no retry will be triggered
+(the remaining join condition `c.country = 'US'` will evaluate to not true for the returned record).
+
+Currently, based on SQL semantic considerations, only the 'lookup_miss' retry predicate is provided.
+When it is necessary to wait for delayed updates of the dimension table (where a historical
+version of the record already exists in the table, rather than no record at all), users can try two solutions:
+1. implement a custom retry predicate with the new retry support in DataStream Async I/O (allows for more complex judgments on returned records).
+2. enable delayed retry by adding another join condition that compares some kind of data version generated from a timestamp.
+For the above example, assume the `Customers` table is updated every hour. We can add a new time-dependent
+version field `update_version`, which is kept at hourly precision, e.g., a record with update time '2022-08-15 12:01:02'
+will store the `update_version` as '2022-08-15 12:00':
+```sql
+CREATE TEMPORARY TABLE Customers (
+  id INT,
+  name STRING,
+  country STRING,
+  zip STRING,
+  -- the newly added time-dependent version field
+  update_version STRING
+) WITH (
+  'connector' = 'jdbc',
+  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
+  'table-name' = 'customers'
+)
+```
+
+Append an equality condition between the time field of the Order stream and `Customers`.`update_version` to the join condition:
+```sql
+ON o.customer_id = c.id AND DATE_FORMAT(o.order_timestamp, 'yyyy-MM-dd HH:mm') = c.update_version
+```
+Then delayed retry takes effect when an Order record cannot find the new record with the '12:00' version in the `Customers` table.
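+
+Putting the pieces together, a complete statement for this workaround might look like the sketch below. It only combines the join condition above with the retry options described earlier; the option values are illustrative, and the formatted order time is assumed to be produced at the same precision as `update_version`:
+```sql
+-- sketch only: retries the lookup while no row with the expected version exists yet
+SELECT /*+ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
+    o.order_id, o.total, c.country, c.zip
+FROM Orders AS o
+    JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
+    ON o.customer_id = c.id
+    AND DATE_FORMAT(o.order_timestamp, 'yyyy-MM-dd HH:mm') = c.update_version
+```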
+
+
+#### Troubleshooting
+When delayed retry lookup is turned on, a backpressure problem is more likely to occur in
+the lookup node. This can be quickly confirmed via the 'Thread Dump' on the 'Task Manager' page of the web UI.
+For async and sync lookups respectively, the call stack of the sleeping thread will appear in:
+1. async lookup:`RetryableAsyncLookupFunctionDelegator`
+2. sync lookup:`RetryableLookupFunctionDelegator`
+
+Note:
+- async lookup with retry is not suitable for applying a fixed delay to all input data (use other
+lighter ways instead, e.g., delaying source consumption or using sync lookup with retry)
+- delayed waiting for retry execution in sync lookup is fully synchronous, i.e., processing of the
+next record does not begin until the current record has completed.
+- in async lookup, if 'output-mode' is 'ORDERED', the probability of backpressure caused by delayed
+retry may be higher than in 'UNORDERED' mode, in which case increasing the async 'capacity' may not be effective
+in reducing backpressure, and it may be necessary to consider reducing the delay duration.
+
 {{< top >}}