From cc59fadaf5c9cac5fbafd721e59ab454f2bf121f Mon Sep 17 00:00:00 2001 From: lincoln lee Date: Mon, 15 Aug 2022 18:11:47 +0800 Subject: [PATCH 1/7] [FLINK-28971][table-planner][doc] Adds user documentation for the new LOOKUP hint --- .../docs/dev/table/sql/queries/hints.md | 280 ++++++++++++++++ .../docs/dev/table/sql/queries/hints.md | 307 ++++++++++++++++++ 2 files changed, 587 insertions(+) diff --git a/docs/content.zh/docs/dev/table/sql/queries/hints.md b/docs/content.zh/docs/dev/table/sql/queries/hints.md index d6e14e192f953..9b49b085c59d2 100644 --- a/docs/content.zh/docs/dev/table/sql/queries/hints.md +++ b/docs/content.zh/docs/dev/table/sql/queries/hints.md @@ -79,4 +79,284 @@ insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select ``` +## 查询提示 + +### 联接提示 + +#### 联接提示 - LOOKUP + +{{< label Streaming >}} + +LOOKUP 联接提示允许用户建议 Flink 优化器: +1. 使用同步或异步的查找函数 +2. 配置异步查找相关参数 +3. 启用延迟重试查找策略 + +##### 语法 +```sql +SELECT /*+ LOOKUP(hint_options) */ + +hint_options: key=value[, key=value]* + +key: + stringLiteral + +value: + stringLiteral +``` + +可用的提示选项说明: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
选项类型选项名称可选选项值类型默认值选项说明
tabletableNstringN/A查找源表的表名
asyncasyncYbooleanN/A值可以是 'true' 或 'false', 以建议优化器选择对应的查找函数。若底层的连接器无法提供建议模式的查找函数,提示就不会生效
output-modeYstringordered值可以是 'ordered' 或 'allow_unordered','allow_unordered' 代表用户允许不保序的输出, 在优化器判断不影响 + 正确性的情况下会转成 `AsyncDataStream.OutputMode.UNORDERED`, 否则转成 `ORDERED`。 这与作业参数 + `ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE` 是一致的
capacityYinteger100异步查找使用的底层 `AsyncWaitOperator` 算子的缓冲队列大小
timeoutYduration300s异步查找从第一次调用到最终查找完成的超时时间,可能包含了多次重试,在发生 failover 时会重置
retryretry-predicateYstringN/A可以是 'lookup_miss',表示在查找结果为空是启用重试
retry-strategyYstringN/A可以是 'fixed_delay'
fixed-delayYdurationN/A固定延迟策略的延迟时长
max-attemptsYintegerN/A固定延迟策略的最大重试次数
+ +注意:其中 +- 'table' 是必选项,需要填写目标联接表的表名,注意当前不支持填写表的别名(这将在后续版本中支持)。 +- 异步查找参数可按需设置一个或多个,未设置的参数按默认值生效。 +- 重试查找参数没有默认值,在需要开启时所有参数都必须设置为有效值。 + +##### 1. 使用同步或异步的查找函数 +如果连接器同时具备同步和异步查找能力,用户通过给出提示选项值 'async'='false' 来建议优化器选择同步查找, 或 'async'='true' 来建议选择异步查找。 + +示例: +```sql +-- 建议优化器选择同步查找 +LOOKUP('table'='Customers', 'async'='false') + +-- 建议优化器选择异步查找 +LOOKUP('table'='Customers', 'async'='true') +``` +注意:当没有指定 'async' 选项值时,优化器优先选择异步查找,在以下两种情况下优化器会选择同步查找: +1. 当连接器仅实现了同步查找时 +2. 用户在参数 'table.optimizer.non-deterministic-update.strategy' 上启用了 'TRY_RESOLVE' 模式,并且优化器推断用户查询中存在非确定性更新的潜在风险时 + +##### 2. 配置异步查找相关参数 +在异步查找模式下,用户可通过提示选项直接配置异步查找相关参数 + +示例: +```sql +-- 设置异步查找参数 'output-mode', 'capacity', 'timeout', 可按需设置单个或多个参数 +LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s') +``` +注意:联接提示上的异步查找参数和[作业级别配置参数]]({{< ref "docs/dev/table/config/#execution-options" >}})的 +含义是一致的,没有设置的参数值由默认值生效,另一个区别是联接提示作用的范围更小,仅限于当前联接操作中对应联接提示选项设置的表名(未被联接提示作用的其他联接查询不受影响)。 + +例如:作业级别异步查找参数设置为 +```gitexclude +table.exec.async-lookup.output-mode: ORDERED +table.exec.async-lookup.buffer-capacity: 100 +table.exec.async-lookup.timeout: 180s +``` + +那么以下联接提示: +```sql +1. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered') +2. LOOKUP('table'='Customers', 'async'='true', 'timeout'='300s') +``` + +分别等价于: +```sql +1. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s') +2. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s') +``` + +##### 3. 启用延迟重试查找策略 +延迟重试查找希望解决流场景中经常遇到的维表数据更新延迟而不能被流数据正确关联的问题。通过提示选项 'retry-predicate'='lookup_miss' +可设置查找结果为空的重试条件,同时设置重试策略参数来开启重试查找功能(同步或异步查找均可),当前仅支持固定延迟重试策略。 + +固定延迟重试策略参数: +```gitexclude +'retry-strategy'='fixed_delay' +-- 固定重试间隔 +'fixed-delay' +-- 最大重试次数(从重试执行开始计数,比如最大重试次数设置为 1,则对某个具体查找键的一次查找处理实际最多执行 2 次查找请求) +'max-attempts' +``` + +示例: +1. 开启异步查找重试 +```sql +LOOKUP('table'='Customers', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') +``` + +2. 开启同步查找重试 +```sql +LOOKUP('table'='Customers', 'async'='false', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') +``` + +若连接器仅实现了同步或异步中的一种查找能力,'async' 提示选项可以省略: +```sql +LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') +``` + +###### 关于查找键及 'retry-predicate'='lookup_miss' 重试条件的说明 +对不同的连接器,提供的索引查找能力可能是不同的,例如内置的 HBase 连接器,默认仅提供了基于 `rowkey` 的索引查找能力(未 +启用二级索引),而对于内置的 JDBC 连接器,默认情况下任何字段都可以被用作索引查找,这是物理存储的特性不同所决定的。 +查找键即这里提到的作为索引查找的字段或字段组合,以 `lookup join` 文档中的示例为例,联接条件 "ON o.customer_id = c.id" 中 `c.id` 即为查找键 + +```sql +SELECT o.order_id, o.total, c.country, c.zip +FROM Orders AS o + JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c + ON o.customer_id = c.id +``` + +如果联接条件改为 "ON o.customer_id = c.id and c.country = 'US'",即: +```sql +SELECT o.order_id, o.total, c.country, c.zip +FROM Orders AS o + JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c + ON o.customer_id = c.id and c.country = 'US' +``` + +当 `Customers` 表存储在 MySql 中时,`c.id` 和 `c.country` 都会被用作查找键 +```sql +CREATE TEMPORARY TABLE Customers ( + id INT, + name STRING, + country STRING, + zip STRING +) WITH ( + 'connector' = 'jdbc', + 'url' = 'jdbc:mysql://mysqlhost:3306/customerdb', + 'table-name' = 'customers' +) +``` + +而当 `Customers` 表存储在 HBase 中时,仅 `c.id` 会被用作查找键,而 `c.country = 'US'` 会作为剩余的联接条件在查找返回的记录上进一步检查是否满足 +```sql +CREATE TEMPORARY TABLE Customers ( + id INT, + name STRING, + country STRING, + zip STRING, + PRIMARY KEY (id) NOT ENFORCED +) WITH ( + 'connector' = 'hbase-2.2', + ... +) +``` + +相应的,在启用查找结果为空的重试条件和对应的固定间隔重试策略时,上述查询在不同的存储上的重试效果可能是不一样的,比如 `Customers` 表中的有一条记录: +```gitexclude +id=100, country='CN' +``` +处理订单流中一条 'id=100' 的记录,当连接器为 'jdbc' 时,因为 `c.id` 和 `c.country` 都会被用作查找键,对应的查找结果为空(`country='CN'` 不满足条件 `c.country = 'US'`),会触发重试查找; +而当连接器为 'hbase-2.2' 时,因为仅 `c.id` 会被用作查找键,因而对应的查找结果非空(会返回 `id=100, country='CN'` 的记录),因此不会触发重试查找,只是在检查剩余的联接条件 `c.country = 'US'` 时不满足。 + +当前基于 SQL 语义的考虑,仅提供了 'lookup_miss' 重试条件,当需要等待维度表中某些更新时(表中已存在历史版本记录,而非不存在),用户可以尝试两种选择: +1. 利用 DataStream Async I/O 中新增的异步重试支持,实现定制的重试条件(可实现对返回记录更复杂的判断) +2. 利用上述查找键在不同连接器上的特性区别,某些场景下延迟查找维表更新记录的一种解决方案是在联接条件上增加数据的时间版本比较: +比如示例中 `Customers` 维表每小时都会更新,可以新增一个时间相关的版本字段 `update_version`,保留到小时精度(可根据时效性需求修改生成方式),如更新时间 '2022-08-15 12:01:02' 记录 `update_version` 为 '2022-08-15 12:00' +```sql +CREATE TEMPORARY TABLE Customers ( + id INT, + name STRING, + country STRING, + zip STRING, + -- 新增时间相关的数据版本字段, + update_version STRING +) WITH ( + 'connector' = 'jdbc', + 'url' = 'jdbc:mysql://mysqlhost:3306/customerdb', + 'table-name' = 'customers' +) +``` +增加使用订单流的时间字段和维表 `Customers`.`update_version` 的等值联接条件: +```sql +ON o.customer_id = c.id AND DATE_FORMAT(o.order_timestamp, 'yyyy-MM-dd HH:mm') = c.update_version +``` +这样当新来的订单流数据未查到 `Customers` 表 12 点的新数据时,就能开启等待重试来查找期望的更新值。 + +###### 常见问题排查 +开启延迟重试查找后,较容易遇到的问题是维表查找节点形成反压,通过 web ui Task Manager 页面的 Thread Dump 功能可以快速确认是否延迟重试引起。 +从异步和同步查找分别来看,thread sleep 调用栈会出现在: +1. 异步查找:`RetryableAsyncLookupFunctionDelegator` +2. 同步查找:`RetryableLookupFunctionDelegator` + +注意: +- 异步查找时,如果所有流数据需要等待一定时长再去查找维表,我们建议尝试其他更轻量的方式(比如源表延迟一定时间消费)。 +- 同步查找中的延迟等待重试执行是完全同步的,即在当前数据没有完成重试前,不会开始下一条数据的处理。 +- 异步查找中,如果 'output-mode' 最终为 'ORDERED',那延迟重试造成反压的概率相对 'UNORDERED' 更高,这种情况下调大 'capacity' 不一定能有效减轻反压,可能需要考虑减小延迟等待的时长。 + {{< top >}} diff --git a/docs/content/docs/dev/table/sql/queries/hints.md b/docs/content/docs/dev/table/sql/queries/hints.md index fc556c393658c..72cbe6b72fee6 100644 --- a/docs/content/docs/dev/table/sql/queries/hints.md +++ b/docs/content/docs/dev/table/sql/queries/hints.md @@ -84,4 +84,311 @@ insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select ``` +## Query Hints + +### Join Hints + +#### LOOKUP Hint + +{{< label Streaming >}} + +The LOOKUP hint allows users to suggest the Flink optimizer to: +1. use synchronous(sync) or asynchronous(async) lookup function +2. configure the async parameters +3. enable delayed retry strategy for lookup + +```sql +SELECT /*+ LOOKUP(key=value[, key=value]*) */ + +key: + stringLiteral + +value: + stringLiteral +``` + +The available hint options: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
option typeoption nameoptionalvalue typedefault valuedescription
tabletableNstringN/Athe table name of the lookup source
asyncasyncYbooleanN/Avalue can be 'true' or 'false' to suggest the planner choose the corresponding lookup function. + If the backend lookup source does not support the suggested lookup mode, it will take no effect.
output-modeYstringorderedvalue can be 'ordered' or 'allow_unordered'.
'allow_unordered' means if users allow unordered result, it will attempt to use AsyncDataStream.OutputMode.UNORDERED when it does not affect the correctness of the result, otherwise ORDERED will be still used. It is consistent with
`ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE`.
capacityYinteger100the buffer capacity for the backend asyncWaitOperator of the lookup join operator.
timeoutYduration300stimeout from first invoke to final completion of asynchronous operation, may include multiple retries, and will be reset in case of failover
retryretry-predicateYstringN/Acan be 'lookup_miss' which will enable retry if lookup result is empty.
retry-strategyYstringN/Acan be 'fixed_delay'
fixed-delayYdurationN/Adelay time for the 'fixed_delay' strategy
max-attemptsYintegerN/Amax attempt number of the 'fixed_delay' strategy
+ +Note: +- 'table' option is required, only table name is supported, alias name is not supported currently(will be supported in later versions). +- async options are all optional, will use default value if not configured. +- there is no default value for retry options, all retry options should be set to valid values when need to enable retry. + +##### 1. Use Sync And Async Lookup Function +If the connector has both capabilities of async and sync lookup, users can give the option value 'async'='false' +to suggest the planner to use the sync lookup or 'async'='true' to use the async lookup: + +Example: +```sql +-- suggest the optimizer to use sync lookup +LOOKUP('table'='Customers', 'async'='false') + +-- suggest the optimizer to use async lookup +LOOKUP('table'='Customers', 'async'='true') +``` +Note: the optimizer prefers async lookup if no 'async' option is specified, it will always use sync lookup when: +1. the connector only implements the sync lookup +2. user enables 'TRY_RESOLVE' mode of 'table.optimizer.non-deterministic-update.strategy' and the +optimizer has checked there's correctness issue caused by non-deterministic update. + +##### 2. Configure The Async Parameters +Users can configure the async parameters via async options on async lookup mode. + +Example: +```sql +-- configure the async parameters: 'output-mode', 'capacity', 'timeout', can set single one or multi params +LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s') +``` +Note: the async options are consistent with the async options in [job level Execution Options]({{< ref "docs/dev/table/config/#execution-options" >}}), +will use job level configuration if not set. Another difference is that the scope of the LOOKUP hint +is smaller, limited to the table name corresponding to the hint option set in the current lookup +operation (other lookup operations will not be affected by the LOOKUP hint). + +e.g., if the job level configuration is: +```gitexclude +table.exec.async-lookup.output-mode: ORDERED +table.exec.async-lookup.buffer-capacity: 100 +table.exec.async-lookup.timeout: 180s +``` + +then the following hints: +```sql +1. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered') +2. LOOKUP('table'='Customers', 'async'='true', 'timeout'='300s') +``` + +are equivalent to: +```sql +1. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s') +2. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s') +``` + +##### 3. Enable Delayed Retry Strategy For Lookup +Delayed retry for lookup join is intended to solve the problem of delayed updates in external system +which cause unexpected enrichment with stream data. The hint option 'retry-predicate'='lookup_miss' +can enable retry on both sync and async lookup, only support fixed delay retry strategy currently. + +Options of fixed delay retry strategy: +```gitexclude +'retry-strategy'='fixed_delay' +-- fixed delay duration +'fixed-delay'='10s' +-- max number of retry(counting from the retry operation, if set to '1', then a single lookup process +-- for a specific lookup key will actually execute up to 2 lookup requests) +'max-attempts'='3' +``` + +Example: +1. enable retry on async lookup +```sql +LOOKUP('table'='Customers', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') +``` + +2. enable retry on sync lookup +```sql +LOOKUP('table'='Customers', 'async'='false', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') +``` + +If the lookup source only has one capability, then the 'async' mode option can be omitted: + +```sql +LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') +``` + +###### Note On Lookup Keys And 'retry-predicate'='lookup_miss' Retry Conditions +For different connectors, the index-lookup capability maybe different, e.g., builtin HBase connector +can lookup on rowkey only (without secondary index), while builtin JDBC connector can provide more +powerful index-lookup capabilities on arbitrary columns, this is determined by the different physical +storages. +The lookup key mentioned here is the field or combination of fields for the index-lookup, +as the example of [`lookup join`]({{< ref "docs/dev/table/sql/queries/joins/#lookup-join" >}}), where +`c.id` is the lookup key of the join condition "ON o.customer_id = c.id": + +```sql +SELECT o.order_id, o.total, c.country, c.zip +FROM Orders AS o + JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c + ON o.customer_id = c.id +``` + +if we change the join condition to "ON o.customer_id = c.id and c.country = 'US'": +```sql +SELECT o.order_id, o.total, c.country, c.zip +FROM Orders AS o + JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c + ON o.customer_id = c.id and c.country = 'US' +``` + +both `c.id` and `c.country` will be used as lookup key when `Customers` table was stored in MySql: +```sql +CREATE TEMPORARY TABLE Customers ( + id INT, + name STRING, + country STRING, + zip STRING +) WITH ( + 'connector' = 'jdbc', + 'url' = 'jdbc:mysql://mysqlhost:3306/customerdb', + 'table-name' = 'customers' +) +``` + +only `c.id` can be the lookup key when `Customers` table was stored in HBase, and the remaining join +condition `c.country = 'US'` will be evaluated after lookup result returned +```sql +CREATE TEMPORARY TABLE Customers ( + id INT, + name STRING, + country STRING, + zip STRING, + PRIMARY KEY (id) NOT ENFORCED +) WITH ( + 'connector' = 'hbase-2.2', + ... +) +``` + +Accordingly, the above query will have different retry effects on different storages when enable +'lookup_miss' retry predicate and the fixed-delay retry strategy. + +e.g., if there is a row in the `Customers` table: +```gitexclude +id=100, country='CN' +``` +When processing an record with 'id=100' in the order stream, in 'jdbc' connector, the corresponding +lookup result is null (`country='CN'` does not satisfy the condition `c.country = 'US'`) because both +`c.id` and `c.country` are used as lookup keys, so this will trigger a retry. + +When in 'hbase' connector, only `c.id` will be used as the lookup key, the corresponding lookup result +will not be empty(it will return the record `id=100, country='CN'`), so it will not trigger a retry +(the remaining join condition `c.country = 'US'` will be evaluated as not true for returned record). + +Currently, based on SQL semantic considerations, only the 'lookup_miss' retry predicate is provided, +and when it is necessary to wait for delayed updates of the dimension table (where a historical +version record already exists in the table, rather than not), users can try two solutions: +1. implements a custom retry predicate with the new retry support in DataStream Async I/O (allows for more complex judgments on returned records). +2. enable delayed retry by adding another join condition including comparison on some kind of data version generated by timestamp +for the above example, assume the `Customers` table is updated every hour, we can add a new time-dependent +version field `update_version`, which is reserved to hourly precision, e.g., update time '2022-08-15 12:01:02' +of record will store the `update_version` as ' 2022-08-15 12:00' +```sql +CREATE TEMPORARY TABLE Customers ( + id INT, + name STRING, + country STRING, + zip STRING, + -- the newly added time-dependent version field + update_version STRING +) WITH ( + 'connector' = 'jdbc', + 'url' = 'jdbc:mysql://mysqlhost:3306/customerdb', + 'table-name' = 'customers' +) +``` + +append an equal condition on both time field of Order stream and `Customers`.`update_version` to the join condition: +```sql +ON o.customer_id = c.id AND DATE_FORMAT(o.order_timestamp, 'yyyy-MM-dd HH:mm') = c.update_version +``` +then we can enable delayed retry when Order's record can not lookup the new record with '12:00' version in `Customers` table. + + +###### Trouble Shooting +When turning on the delayed retry lookup, it is more likely to encounter a backpressure problem in +the lookup node, this can be quickly confirmed via the 'Thread Dump' on the 'Task Manager' page of web ui. +From async and sync lookups respectively, call stack of thread sleep will appear in: +1. async lookup:`RetryableAsyncLookupFunctionDelegator` +2. sync lookup:`RetryableLookupFunctionDelegator` + +Note: +- async lookup with retry is not capable for fixed delayed processing for all input data (should use other +lighter ways to solve, e.g., pending source consumption or use sync lookup with retry) +- delayed waiting for retry execution in sync lookup is fully synchronous, i.e., processing of the +next record does not begin until the current record has completed. +- in async lookup, if 'output-mode' is 'ORDERED' mode, the probability of backpressure caused by delayed +retry maybe higher than 'UNORDERED' mode, in which case increasing async 'capacity' may not be effective +in reducing backpressure, and it may be necessary to consider reducing the delay duration. + {{< top >}} From edb588ca5a36050ce413ba64257eec5d198c5bff Mon Sep 17 00:00:00 2001 From: lincoln lee Date: Tue, 16 Aug 2022 10:05:16 +0800 Subject: [PATCH 2/7] fix doc ref --- docs/content.zh/docs/dev/table/sql/queries/hints.md | 5 +++-- docs/content/docs/dev/table/sql/queries/hints.md | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/queries/hints.md b/docs/content.zh/docs/dev/table/sql/queries/hints.md index 9b49b085c59d2..c1df741d2a0da 100644 --- a/docs/content.zh/docs/dev/table/sql/queries/hints.md +++ b/docs/content.zh/docs/dev/table/sql/queries/hints.md @@ -218,7 +218,7 @@ LOOKUP('table'='Customers', 'async'='true') -- 设置异步查找参数 'output-mode', 'capacity', 'timeout', 可按需设置单个或多个参数 LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s') ``` -注意:联接提示上的异步查找参数和[作业级别配置参数]]({{< ref "docs/dev/table/config/#execution-options" >}})的 +注意:联接提示上的异步查找参数和[作业级别配置参数]]({{< ref "docs/dev/table/config" >}}#execution-options)的 含义是一致的,没有设置的参数值由默认值生效,另一个区别是联接提示作用的范围更小,仅限于当前联接操作中对应联接提示选项设置的表名(未被联接提示作用的其他联接查询不受影响)。 例如:作业级别异步查找参数设置为 @@ -272,7 +272,8 @@ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='f ###### 关于查找键及 'retry-predicate'='lookup_miss' 重试条件的说明 对不同的连接器,提供的索引查找能力可能是不同的,例如内置的 HBase 连接器,默认仅提供了基于 `rowkey` 的索引查找能力(未 启用二级索引),而对于内置的 JDBC 连接器,默认情况下任何字段都可以被用作索引查找,这是物理存储的特性不同所决定的。 -查找键即这里提到的作为索引查找的字段或字段组合,以 `lookup join` 文档中的示例为例,联接条件 "ON o.customer_id = c.id" 中 `c.id` 即为查找键 +查找键即这里提到的作为索引查找的字段或字段组合,以 [`lookup join`]({{< ref "docs/dev/table/sql/queries/joins" >}}#lookup-join) +文档中的示例为例,联接条件 "ON o.customer_id = c.id" 中 `c.id` 即为查找键 ```sql SELECT o.order_id, o.total, c.country, c.zip diff --git a/docs/content/docs/dev/table/sql/queries/hints.md b/docs/content/docs/dev/table/sql/queries/hints.md index 72cbe6b72fee6..6720f266eb68b 100644 --- a/docs/content/docs/dev/table/sql/queries/hints.md +++ b/docs/content/docs/dev/table/sql/queries/hints.md @@ -221,7 +221,7 @@ Example: -- configure the async parameters: 'output-mode', 'capacity', 'timeout', can set single one or multi params LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s') ``` -Note: the async options are consistent with the async options in [job level Execution Options]({{< ref "docs/dev/table/config/#execution-options" >}}), +Note: the async options are consistent with the async options in [job level Execution Options]({{< ref "docs/dev/table/config" >}}#execution-options), will use job level configuration if not set. Another difference is that the scope of the LOOKUP hint is smaller, limited to the table name corresponding to the hint option set in the current lookup operation (other lookup operations will not be affected by the LOOKUP hint). @@ -283,7 +283,7 @@ can lookup on rowkey only (without secondary index), while builtin JDBC connecto powerful index-lookup capabilities on arbitrary columns, this is determined by the different physical storages. The lookup key mentioned here is the field or combination of fields for the index-lookup, -as the example of [`lookup join`]({{< ref "docs/dev/table/sql/queries/joins/#lookup-join" >}}), where +as the example of [`lookup join`]({{< ref "docs/dev/table/sql/queries/joins" >}}#lookup-join), where `c.id` is the lookup key of the join condition "ON o.customer_id = c.id": ```sql From a7d2efdf215dcca0ea2c7a15f4cee621c82e93d3 Mon Sep 17 00:00:00 2001 From: lincoln lee Date: Mon, 22 Aug 2022 17:48:21 +0800 Subject: [PATCH 3/7] address review comments and add a section of the 'Effect Of Enabling Caching On Retries' --- .../docs/dev/table/sql/queries/hints.md | 113 +++++++++-------- .../docs/dev/table/sql/queries/hints.md | 120 ++++++++++-------- 2 files changed, 126 insertions(+), 107 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/queries/hints.md b/docs/content.zh/docs/dev/table/sql/queries/hints.md index c1df741d2a0da..f43254f209596 100644 --- a/docs/content.zh/docs/dev/table/sql/queries/hints.md +++ b/docs/content.zh/docs/dev/table/sql/queries/hints.md @@ -112,86 +112,86 @@ value: 选项类型 选项名称 - 可选 + 必选 选项值类型 默认值 - 选项说明 + 选项说明 - table - table - N - string - N/A - 查找源表的表名 + table + table + Y + string + N/A + 查找源表的表名 - async - async - Y - boolean - N/A - 值可以是 'true' 或 'false', 以建议优化器选择对应的查找函数。若底层的连接器无法提供建议模式的查找函数,提示就不会生效 + async + async + N + boolean + N/A + 值可以是 'true' 或 'false', 以建议优化器选择对应的查找函数。若底层的连接器无法提供建议模式的查找函数,提示就不会生效 - output-mode - Y - string - ordered - 值可以是 'ordered' 或 'allow_unordered','allow_unordered' 代表用户允许不保序的输出, 在优化器判断不影响 + output-mode + N + string + ordered + 值可以是 'ordered' 或 'allow_unordered','allow_unordered' 代表用户允许不保序的输出, 在优化器判断不影响 正确性的情况下会转成 `AsyncDataStream.OutputMode.UNORDERED`, 否则转成 `ORDERED`。 这与作业参数 - `ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE` 是一致的 + `ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE` 是一致的 - capacity - Y - integer - 100 - 异步查找使用的底层 `AsyncWaitOperator` 算子的缓冲队列大小 + capacity + N + integer + 100 + 异步查找使用的底层 `AsyncWaitOperator` 算子的缓冲队列大小 - timeout - Y - duration - 300s - 异步查找从第一次调用到最终查找完成的超时时间,可能包含了多次重试,在发生 failover 时会重置 + timeout + N + duration + 300s + 异步查找从第一次调用到最终查找完成的超时时间,可能包含了多次重试,在发生 failover 时会重置 - retry - retry-predicate - Y - string - N/A - 可以是 'lookup_miss',表示在查找结果为空是启用重试 + retry + retry-predicate + N + string + N/A + 可以是 'lookup_miss',表示在查找结果为空是启用重试 - retry-strategy - Y - string - N/A - 可以是 'fixed_delay' + retry-strategy + N + string + N/A + 可以是 'fixed_delay' - fixed-delay - Y - duration - N/A - 固定延迟策略的延迟时长 + fixed-delay + N + duration + N/A + 固定延迟策略的延迟时长 - max-attempts - Y - integer - N/A - 固定延迟策略的最大重试次数 + max-attempts + N + integer + N/A + 固定延迟策略的最大重试次数 注意:其中 -- 'table' 是必选项,需要填写目标联接表的表名,注意当前不支持填写表的别名(这将在后续版本中支持)。 +- 'table' 是必选项,需要填写目标联接表的表名(和 FROM 子句引用的表名保持一致),注意当前不支持填写表的别名(这将在后续版本中支持)。 - 异步查找参数可按需设置一个或多个,未设置的参数按默认值生效。 - 重试查找参数没有默认值,在需要开启时所有参数都必须设置为有效值。 @@ -269,6 +269,13 @@ LOOKUP('table'='Customers', 'async'='false', 'retry-predicate'='lookup_miss', 'r LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') ``` +##### 进一步说明 + +###### 开启缓存对重试的影响 +[FLIP-221]({{< ref "https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job" >}}) 引入了对查找源表的缓存支持, +缓存策略有部分缓存、全部缓存两种,当开启全部缓存时('lookup.cache'='FULL'),重试无法起作用(因为查找表被完整缓存,重试查找没有任何实际意义);当开启部分缓存时,当一条数据开始查找处理时, +先在本地缓存中查找,如果没找到则通过连接器进行外部查找(如果存在,则立即返回),此时查不到的记录和不开启缓存时一样,会触发重试查找,重试结束时的结果即为最终的查找结果(在部分缓存模式下,更新本地缓存)。 + ###### 关于查找键及 'retry-predicate'='lookup_miss' 重试条件的说明 对不同的连接器,提供的索引查找能力可能是不同的,例如内置的 HBase 连接器,默认仅提供了基于 `rowkey` 的索引查找能力(未 启用二级索引),而对于内置的 JDBC 连接器,默认情况下任何字段都可以被用作索引查找,这是物理存储的特性不同所决定的。 @@ -349,7 +356,7 @@ ON o.customer_id = c.id AND DATE_FORMAT(o.order_timestamp, 'yyyy-MM-dd HH:mm') = ``` 这样当新来的订单流数据未查到 `Customers` 表 12 点的新数据时,就能开启等待重试来查找期望的更新值。 -###### 常见问题排查 +##### 常见问题排查 开启延迟重试查找后,较容易遇到的问题是维表查找节点形成反压,通过 web ui Task Manager 页面的 Thread Dump 功能可以快速确认是否延迟重试引起。 从异步和同步查找分别来看,thread sleep 调用栈会出现在: 1. 异步查找:`RetryableAsyncLookupFunctionDelegator` diff --git a/docs/content/docs/dev/table/sql/queries/hints.md b/docs/content/docs/dev/table/sql/queries/hints.md index 6720f266eb68b..6c0e2a6efc583 100644 --- a/docs/content/docs/dev/table/sql/queries/hints.md +++ b/docs/content/docs/dev/table/sql/queries/hints.md @@ -114,85 +114,85 @@ The available hint options: option type option name - optional + required value type default value - description + description - table - table - N - string - N/A - the table name of the lookup source + table + table + Y + string + N/A + the table name of the lookup source - async - async - Y - boolean - N/A - value can be 'true' or 'false' to suggest the planner choose the corresponding lookup function. - If the backend lookup source does not support the suggested lookup mode, it will take no effect. + async + async + N + boolean + N/A + value can be 'true' or 'false' to suggest the planner choose the corresponding lookup function. + If the backend lookup source does not support the suggested lookup mode, it will take no effect. - output-mode - Y - string - ordered - value can be 'ordered' or 'allow_unordered'.
'allow_unordered' means if users allow unordered result, it will attempt to use AsyncDataStream.OutputMode.UNORDERED when it does not affect the correctness of the result, otherwise ORDERED will be still used. It is consistent with
`ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE`. + output-mode + N + string + ordered + value can be 'ordered' or 'allow_unordered'.
'allow_unordered' means if users allow unordered result, it will attempt to use AsyncDataStream.OutputMode.UNORDERED when it does not affect the correctness of the result, otherwise ORDERED will be still used. It is consistent with
`ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE`. - capacity - Y - integer - 100 - the buffer capacity for the backend asyncWaitOperator of the lookup join operator. + capacity + N + integer + 100 + the buffer capacity for the backend asyncWaitOperator of the lookup join operator. - timeout - Y - duration - 300s - timeout from first invoke to final completion of asynchronous operation, may include multiple retries, and will be reset in case of failover + timeout + N + duration + 300s + timeout from first invoke to final completion of asynchronous operation, may include multiple retries, and will be reset in case of failover - retry - retry-predicate - Y - string - N/A - can be 'lookup_miss' which will enable retry if lookup result is empty. + retry + retry-predicate + N + string + N/A + can be 'lookup_miss' which will enable retry if lookup result is empty. - retry-strategy - Y - string - N/A - can be 'fixed_delay' + retry-strategy + N + string + N/A + can be 'fixed_delay' - fixed-delay - Y - duration - N/A - delay time for the 'fixed_delay' strategy + fixed-delay + N + duration + N/A + delay time for the 'fixed_delay' strategy - max-attempts - Y - integer - N/A - max attempt number of the 'fixed_delay' strategy + max-attempts + N + integer + N/A + max attempt number of the 'fixed_delay' strategy Note: -- 'table' option is required, only table name is supported, alias name is not supported currently(will be supported in later versions). +- 'table' option is required, only table name is supported(keep consistent with which in the FROM clause), alias name is not supported currently(will be supported in later versions). - async options are all optional, will use default value if not configured. - there is no default value for retry options, all retry options should be set to valid values when need to enable retry. @@ -248,7 +248,7 @@ are equivalent to: ##### 3. Enable Delayed Retry Strategy For Lookup Delayed retry for lookup join is intended to solve the problem of delayed updates in external system which cause unexpected enrichment with stream data. The hint option 'retry-predicate'='lookup_miss' -can enable retry on both sync and async lookup, only support fixed delay retry strategy currently. +can enable retry on both sync and async lookup, only fixed delay retry strategy is supported currently. Options of fixed delay retry strategy: ```gitexclude @@ -277,6 +277,18 @@ If the lookup source only has one capability, then the 'async' mode option can b LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') ``` +##### Further Notes + +###### Effect Of Enabling Caching On Retries +[FLIP-221]({{< ref "https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job" >}}) adds caching support for lookup source, +which has PARTIAL and FULL caching mode(the mode NONE means disable caching). When FULL caching is enabled, there'll +be no retry at all(because it's meaningless to retry lookup via a full cached mirror of lookup source). +When PARTIAL caching is enabled, it will lookup from local cache first for a coming record and will +do an external lookup via backend connector if cache miss(if cache hit, then return the record immediately), +and this will trigger a retry when lookup result is empty(same with caching disabled), the final lookup +result is determined when retry done(in PARTIAL caching mode, it will also update local cache). + + ###### Note On Lookup Keys And 'retry-predicate'='lookup_miss' Retry Conditions For different connectors, the index-lookup capability maybe different, e.g., builtin HBase connector can lookup on rowkey only (without secondary index), while builtin JDBC connector can provide more @@ -375,7 +387,7 @@ ON o.customer_id = c.id AND DATE_FORMAT(o.order_timestamp, 'yyyy-MM-dd HH:mm') = then we can enable delayed retry when Order's record can not lookup the new record with '12:00' version in `Customers` table. -###### Trouble Shooting +##### Trouble Shooting When turning on the delayed retry lookup, it is more likely to encounter a backpressure problem in the lookup node, this can be quickly confirmed via the 'Thread Dump' on the 'Task Manager' page of web ui. From async and sync lookups respectively, call stack of thread sleep will appear in: From 57509bcb0d8d3e824ce166435d3a4590926943d8 Mon Sep 17 00:00:00 2001 From: lincoln lee Date: Mon, 22 Aug 2022 18:34:49 +0800 Subject: [PATCH 4/7] fix flip ref --- docs/content.zh/docs/dev/table/sql/queries/hints.md | 2 +- docs/content/docs/dev/table/sql/queries/hints.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/queries/hints.md b/docs/content.zh/docs/dev/table/sql/queries/hints.md index f43254f209596..3db3c10984190 100644 --- a/docs/content.zh/docs/dev/table/sql/queries/hints.md +++ b/docs/content.zh/docs/dev/table/sql/queries/hints.md @@ -272,7 +272,7 @@ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='f ##### 进一步说明 ###### 开启缓存对重试的影响 -[FLIP-221]({{< ref "https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job" >}}) 引入了对查找源表的缓存支持, +[FLIP-221](https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job) 引入了对查找源表的缓存支持, 缓存策略有部分缓存、全部缓存两种,当开启全部缓存时('lookup.cache'='FULL'),重试无法起作用(因为查找表被完整缓存,重试查找没有任何实际意义);当开启部分缓存时,当一条数据开始查找处理时, 先在本地缓存中查找,如果没找到则通过连接器进行外部查找(如果存在,则立即返回),此时查不到的记录和不开启缓存时一样,会触发重试查找,重试结束时的结果即为最终的查找结果(在部分缓存模式下,更新本地缓存)。 diff --git a/docs/content/docs/dev/table/sql/queries/hints.md b/docs/content/docs/dev/table/sql/queries/hints.md index 6c0e2a6efc583..60c4fb358c7bc 100644 --- a/docs/content/docs/dev/table/sql/queries/hints.md +++ b/docs/content/docs/dev/table/sql/queries/hints.md @@ -280,7 +280,7 @@ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='f ##### Further Notes ###### Effect Of Enabling Caching On Retries -[FLIP-221]({{< ref "https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job" >}}) adds caching support for lookup source, +[FLIP-221](https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job) adds caching support for lookup source, which has PARTIAL and FULL caching mode(the mode NONE means disable caching). When FULL caching is enabled, there'll be no retry at all(because it's meaningless to retry lookup via a full cached mirror of lookup source). When PARTIAL caching is enabled, it will lookup from local cache first for a coming record and will From 87ecee87825d8c92a42ded885ae9c777872c10e7 Mon Sep 17 00:00:00 2001 From: lincoln lee Date: Tue, 23 Aug 2022 15:35:48 +0800 Subject: [PATCH 5/7] address comments --- .../docs/dev/table/sql/queries/hints.md | 22 +++++++++--------- .../docs/dev/table/sql/queries/hints.md | 23 ++++++++++--------- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/queries/hints.md b/docs/content.zh/docs/dev/table/sql/queries/hints.md index 3db3c10984190..7c6670dc9fa54 100644 --- a/docs/content.zh/docs/dev/table/sql/queries/hints.md +++ b/docs/content.zh/docs/dev/table/sql/queries/hints.md @@ -105,7 +105,7 @@ value: stringLiteral ``` -可用的提示选项说明: +#### LOOKUP 提示选项: @@ -195,7 +195,7 @@ value: - 异步查找参数可按需设置一个或多个,未设置的参数按默认值生效。 - 重试查找参数没有默认值,在需要开启时所有参数都必须设置为有效值。 -##### 1. 使用同步或异步的查找函数 +#### 1. 使用同步或异步的查找函数 如果连接器同时具备同步和异步查找能力,用户通过给出提示选项值 'async'='false' 来建议优化器选择同步查找, 或 'async'='true' 来建议选择异步查找。 示例: @@ -208,9 +208,9 @@ LOOKUP('table'='Customers', 'async'='true') ``` 注意:当没有指定 'async' 选项值时,优化器优先选择异步查找,在以下两种情况下优化器会选择同步查找: 1. 当连接器仅实现了同步查找时 -2. 用户在参数 'table.optimizer.non-deterministic-update.strategy' 上启用了 'TRY_RESOLVE' 模式,并且优化器推断用户查询中存在非确定性更新的潜在风险时 +2. 用户在参数 ['table.optimizer.non-deterministic-update.strategy']({{ < ref "docs/dev/table/config" > }}#table-optimizer-non-deterministic-update-strategy) 上启用了 'TRY_RESOLVE' 模式,并且优化器推断用户查询中存在非确定性更新的潜在风险时 -##### 2. 配置异步查找相关参数 +#### 2. 配置异步查找相关参数 在异步查找模式下,用户可通过提示选项直接配置异步查找相关参数 示例: @@ -218,7 +218,7 @@ LOOKUP('table'='Customers', 'async'='true') -- 设置异步查找参数 'output-mode', 'capacity', 'timeout', 可按需设置单个或多个参数 LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s') ``` -注意:联接提示上的异步查找参数和[作业级别配置参数]]({{< ref "docs/dev/table/config" >}}#execution-options)的 +注意:联接提示上的异步查找参数和[作业级别配置参数]({{< ref "docs/dev/table/config" >}}#execution-options)的 含义是一致的,没有设置的参数值由默认值生效,另一个区别是联接提示作用的范围更小,仅限于当前联接操作中对应联接提示选项设置的表名(未被联接提示作用的其他联接查询不受影响)。 例如:作业级别异步查找参数设置为 @@ -240,7 +240,7 @@ table.exec.async-lookup.timeout: 180s 2. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s') ``` -##### 3. 启用延迟重试查找策略 +#### 3. 启用延迟重试查找策略 延迟重试查找希望解决流场景中经常遇到的维表数据更新延迟而不能被流数据正确关联的问题。通过提示选项 'retry-predicate'='lookup_miss' 可设置查找结果为空的重试条件,同时设置重试策略参数来开启重试查找功能(同步或异步查找均可),当前仅支持固定延迟重试策略。 @@ -269,14 +269,14 @@ LOOKUP('table'='Customers', 'async'='false', 'retry-predicate'='lookup_miss', 'r LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') ``` -##### 进一步说明 +#### 进一步说明 -###### 开启缓存对重试的影响 -[FLIP-221](https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job) 引入了对查找源表的缓存支持, +#### 开启缓存对重试的影响 +[FLIP-221](https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric) 引入了对查找源表的缓存支持, 缓存策略有部分缓存、全部缓存两种,当开启全部缓存时('lookup.cache'='FULL'),重试无法起作用(因为查找表被完整缓存,重试查找没有任何实际意义);当开启部分缓存时,当一条数据开始查找处理时, 先在本地缓存中查找,如果没找到则通过连接器进行外部查找(如果存在,则立即返回),此时查不到的记录和不开启缓存时一样,会触发重试查找,重试结束时的结果即为最终的查找结果(在部分缓存模式下,更新本地缓存)。 -###### 关于查找键及 'retry-predicate'='lookup_miss' 重试条件的说明 +#### 关于查找键及 'retry-predicate'='lookup_miss' 重试条件的说明 对不同的连接器,提供的索引查找能力可能是不同的,例如内置的 HBase 连接器,默认仅提供了基于 `rowkey` 的索引查找能力(未 启用二级索引),而对于内置的 JDBC 连接器,默认情况下任何字段都可以被用作索引查找,这是物理存储的特性不同所决定的。 查找键即这里提到的作为索引查找的字段或字段组合,以 [`lookup join`]({{< ref "docs/dev/table/sql/queries/joins" >}}#lookup-join) @@ -356,7 +356,7 @@ ON o.customer_id = c.id AND DATE_FORMAT(o.order_timestamp, 'yyyy-MM-dd HH:mm') = ``` 这样当新来的订单流数据未查到 `Customers` 表 12 点的新数据时,就能开启等待重试来查找期望的更新值。 -##### 常见问题排查 +#### 常见问题排查 开启延迟重试查找后,较容易遇到的问题是维表查找节点形成反压,通过 web ui Task Manager 页面的 Thread Dump 功能可以快速确认是否延迟重试引起。 从异步和同步查找分别来看,thread sleep 调用栈会出现在: 1. 异步查找:`RetryableAsyncLookupFunctionDelegator` diff --git a/docs/content/docs/dev/table/sql/queries/hints.md b/docs/content/docs/dev/table/sql/queries/hints.md index 60c4fb358c7bc..235a09258f59b 100644 --- a/docs/content/docs/dev/table/sql/queries/hints.md +++ b/docs/content/docs/dev/table/sql/queries/hints.md @@ -97,6 +97,7 @@ The LOOKUP hint allows users to suggest the Flink optimizer to: 2. configure the async parameters 3. enable delayed retry strategy for lookup +##### Syntax ```sql SELECT /*+ LOOKUP(key=value[, key=value]*) */ @@ -107,7 +108,7 @@ value: stringLiteral ``` -The available hint options: +#### LOOKUP Hint Options:
@@ -196,7 +197,7 @@ Note: - async options are all optional, will use default value if not configured. - there is no default value for retry options, all retry options should be set to valid values when need to enable retry. -##### 1. Use Sync And Async Lookup Function +#### 1. Use Sync And Async Lookup Function If the connector has both capabilities of async and sync lookup, users can give the option value 'async'='false' to suggest the planner to use the sync lookup or 'async'='true' to use the async lookup: @@ -210,10 +211,10 @@ LOOKUP('table'='Customers', 'async'='true') ``` Note: the optimizer prefers async lookup if no 'async' option is specified, it will always use sync lookup when: 1. the connector only implements the sync lookup -2. user enables 'TRY_RESOLVE' mode of 'table.optimizer.non-deterministic-update.strategy' and the +2. user enables 'TRY_RESOLVE' mode of ['table.optimizer.non-deterministic-update.strategy']({{ < ref "docs/dev/table/config" > }}#table-optimizer-non-deterministic-update-strategy) and the optimizer has checked there's correctness issue caused by non-deterministic update. -##### 2. Configure The Async Parameters +#### 2. Configure The Async Parameters Users can configure the async parameters via async options on async lookup mode. Example: @@ -245,7 +246,7 @@ are equivalent to: 2. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s') ``` -##### 3. Enable Delayed Retry Strategy For Lookup +#### 3. Enable Delayed Retry Strategy For Lookup Delayed retry for lookup join is intended to solve the problem of delayed updates in external system which cause unexpected enrichment with stream data. The hint option 'retry-predicate'='lookup_miss' can enable retry on both sync and async lookup, only fixed delay retry strategy is supported currently. @@ -277,19 +278,19 @@ If the lookup source only has one capability, then the 'async' mode option can b LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') ``` -##### Further Notes +#### Further Notes -###### Effect Of Enabling Caching On Retries -[FLIP-221](https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job) adds caching support for lookup source, +#### Effect Of Enabling Caching On Retries +[FLIP-221](https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric) adds caching support for lookup source, which has PARTIAL and FULL caching mode(the mode NONE means disable caching). When FULL caching is enabled, there'll be no retry at all(because it's meaningless to retry lookup via a full cached mirror of lookup source). When PARTIAL caching is enabled, it will lookup from local cache first for a coming record and will do an external lookup via backend connector if cache miss(if cache hit, then return the record immediately), and this will trigger a retry when lookup result is empty(same with caching disabled), the final lookup -result is determined when retry done(in PARTIAL caching mode, it will also update local cache). +result is determined when retry completed(in PARTIAL caching mode, it will also update local cache). -###### Note On Lookup Keys And 'retry-predicate'='lookup_miss' Retry Conditions +#### Note On Lookup Keys And 'retry-predicate'='lookup_miss' Retry Conditions For different connectors, the index-lookup capability maybe different, e.g., builtin HBase connector can lookup on rowkey only (without secondary index), while builtin JDBC connector can provide more powerful index-lookup capabilities on arbitrary columns, this is determined by the different physical @@ -387,7 +388,7 @@ ON o.customer_id = c.id AND DATE_FORMAT(o.order_timestamp, 'yyyy-MM-dd HH:mm') = then we can enable delayed retry when Order's record can not lookup the new record with '12:00' version in `Customers` table. -##### Trouble Shooting +#### Trouble Shooting When turning on the delayed retry lookup, it is more likely to encounter a backpressure problem in the lookup node, this can be quickly confirmed via the 'Thread Dump' on the 'Task Manager' page of web ui. From async and sync lookups respectively, call stack of thread sleep will appear in: From b9e80b72944acc7a08bd9be2243f1670a54f22bf Mon Sep 17 00:00:00 2001 From: lincoln lee Date: Thu, 25 Aug 2022 10:01:40 +0800 Subject: [PATCH 6/7] address comments --- docs/content.zh/docs/dev/table/sql/queries/hints.md | 8 ++++---- docs/content/docs/dev/table/sql/queries/hints.md | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/queries/hints.md b/docs/content.zh/docs/dev/table/sql/queries/hints.md index 7c6670dc9fa54..b309f3781d20f 100644 --- a/docs/content.zh/docs/dev/table/sql/queries/hints.md +++ b/docs/content.zh/docs/dev/table/sql/queries/hints.md @@ -83,7 +83,7 @@ insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select ### 联接提示 -#### 联接提示 - LOOKUP +#### LOOKUP {{< label Streaming >}} @@ -92,7 +92,7 @@ LOOKUP 联接提示允许用户建议 Flink 优化器: 2. 配置异步查找相关参数 3. 启用延迟重试查找策略 -##### 语法 +#### 语法 ```sql SELECT /*+ LOOKUP(hint_options) */ @@ -208,7 +208,7 @@ LOOKUP('table'='Customers', 'async'='true') ``` 注意:当没有指定 'async' 选项值时,优化器优先选择异步查找,在以下两种情况下优化器会选择同步查找: 1. 当连接器仅实现了同步查找时 -2. 用户在参数 ['table.optimizer.non-deterministic-update.strategy']({{ < ref "docs/dev/table/config" > }}#table-optimizer-non-deterministic-update-strategy) 上启用了 'TRY_RESOLVE' 模式,并且优化器推断用户查询中存在非确定性更新的潜在风险时 +2. 用户在参数 ['table.optimizer.non-deterministic-update.strategy']({{< ref "docs/dev/table/config" >}}#table-optimizer-non-deterministic-update-strategy) 上启用了 'TRY_RESOLVE' 模式,并且优化器推断用户查询中存在非确定性更新的潜在风险时 #### 2. 配置异步查找相关参数 在异步查找模式下,用户可通过提示选项直接配置异步查找相关参数 @@ -273,7 +273,7 @@ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='f #### 开启缓存对重试的影响 [FLIP-221](https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric) 引入了对查找源表的缓存支持, -缓存策略有部分缓存、全部缓存两种,当开启全部缓存时('lookup.cache'='FULL'),重试无法起作用(因为查找表被完整缓存,重试查找没有任何实际意义);当开启部分缓存时,当一条数据开始查找处理时, +缓存策略有部分缓存、全部缓存两种,开启全部缓存时('lookup.cache'='FULL'),重试无法起作用(因为查找表被完整缓存,重试查找没有任何实际意义);开启部分缓存时,当一条数据开始查找处理时, 先在本地缓存中查找,如果没找到则通过连接器进行外部查找(如果存在,则立即返回),此时查不到的记录和不开启缓存时一样,会触发重试查找,重试结束时的结果即为最终的查找结果(在部分缓存模式下,更新本地缓存)。 #### 关于查找键及 'retry-predicate'='lookup_miss' 重试条件的说明 diff --git a/docs/content/docs/dev/table/sql/queries/hints.md b/docs/content/docs/dev/table/sql/queries/hints.md index 235a09258f59b..ddc009d52b4d4 100644 --- a/docs/content/docs/dev/table/sql/queries/hints.md +++ b/docs/content/docs/dev/table/sql/queries/hints.md @@ -88,7 +88,7 @@ insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select ### Join Hints -#### LOOKUP Hint +#### LOOKUP {{< label Streaming >}} @@ -97,7 +97,7 @@ The LOOKUP hint allows users to suggest the Flink optimizer to: 2. configure the async parameters 3. enable delayed retry strategy for lookup -##### Syntax +#### Syntax ```sql SELECT /*+ LOOKUP(key=value[, key=value]*) */ @@ -211,7 +211,7 @@ LOOKUP('table'='Customers', 'async'='true') ``` Note: the optimizer prefers async lookup if no 'async' option is specified, it will always use sync lookup when: 1. the connector only implements the sync lookup -2. user enables 'TRY_RESOLVE' mode of ['table.optimizer.non-deterministic-update.strategy']({{ < ref "docs/dev/table/config" > }}#table-optimizer-non-deterministic-update-strategy) and the +2. user enables 'TRY_RESOLVE' mode of ['table.optimizer.non-deterministic-update.strategy']({{< ref "docs/dev/table/config" >}}#table-optimizer-non-deterministic-update-strategy) and the optimizer has checked there's correctness issue caused by non-deterministic update. #### 2. Configure The Async Parameters From ec0d77b00d946dec242fd6cae7f8b546ffca8dd2 Mon Sep 17 00:00:00 2001 From: lincoln lee Date: Thu, 25 Aug 2022 10:43:44 +0800 Subject: [PATCH 7/7] remove unnecessary newlines --- docs/content.zh/docs/dev/table/sql/queries/hints.md | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/queries/hints.md b/docs/content.zh/docs/dev/table/sql/queries/hints.md index b309f3781d20f..f3afe1973fdfc 100644 --- a/docs/content.zh/docs/dev/table/sql/queries/hints.md +++ b/docs/content.zh/docs/dev/table/sql/queries/hints.md @@ -218,8 +218,7 @@ LOOKUP('table'='Customers', 'async'='true') -- 设置异步查找参数 'output-mode', 'capacity', 'timeout', 可按需设置单个或多个参数 LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s') ``` -注意:联接提示上的异步查找参数和[作业级别配置参数]({{< ref "docs/dev/table/config" >}}#execution-options)的 -含义是一致的,没有设置的参数值由默认值生效,另一个区别是联接提示作用的范围更小,仅限于当前联接操作中对应联接提示选项设置的表名(未被联接提示作用的其他联接查询不受影响)。 +注意:联接提示上的异步查找参数和[作业级别配置参数]({{< ref "docs/dev/table/config" >}}#execution-options)的含义是一致的,没有设置的参数值由默认值生效,另一个区别是联接提示作用的范围更小,仅限于当前联接操作中对应联接提示选项设置的表名(未被联接提示作用的其他联接查询不受影响)。 例如:作业级别异步查找参数设置为 ```gitexclude @@ -241,8 +240,7 @@ table.exec.async-lookup.timeout: 180s ``` #### 3. 启用延迟重试查找策略 -延迟重试查找希望解决流场景中经常遇到的维表数据更新延迟而不能被流数据正确关联的问题。通过提示选项 'retry-predicate'='lookup_miss' -可设置查找结果为空的重试条件,同时设置重试策略参数来开启重试查找功能(同步或异步查找均可),当前仅支持固定延迟重试策略。 +延迟重试查找希望解决流场景中经常遇到的维表数据更新延迟而不能被流数据正确关联的问题。通过提示选项 'retry-predicate'='lookup_miss' 可设置查找结果为空的重试条件,同时设置重试策略参数来开启重试查找功能(同步或异步查找均可),当前仅支持固定延迟重试策略。 固定延迟重试策略参数: ```gitexclude @@ -277,8 +275,7 @@ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='f 先在本地缓存中查找,如果没找到则通过连接器进行外部查找(如果存在,则立即返回),此时查不到的记录和不开启缓存时一样,会触发重试查找,重试结束时的结果即为最终的查找结果(在部分缓存模式下,更新本地缓存)。 #### 关于查找键及 'retry-predicate'='lookup_miss' 重试条件的说明 -对不同的连接器,提供的索引查找能力可能是不同的,例如内置的 HBase 连接器,默认仅提供了基于 `rowkey` 的索引查找能力(未 -启用二级索引),而对于内置的 JDBC 连接器,默认情况下任何字段都可以被用作索引查找,这是物理存储的特性不同所决定的。 +对不同的连接器,提供的索引查找能力可能是不同的,例如内置的 HBase 连接器,默认仅提供了基于 `rowkey` 的索引查找能力(未启用二级索引),而对于内置的 JDBC 连接器,默认情况下任何字段都可以被用作索引查找,这是物理存储的特性不同所决定的。 查找键即这里提到的作为索引查找的字段或字段组合,以 [`lookup join`]({{< ref "docs/dev/table/sql/queries/joins" >}}#lookup-join) 文档中的示例为例,联接条件 "ON o.customer_id = c.id" 中 `c.id` 即为查找键