diff --git a/docs/dev/table/connectors/upsert-kafka.md b/docs/dev/table/connectors/upsert-kafka.md index c92cf4b8053cc..3cd0fc3951635 100644 --- a/docs/dev/table/connectors/upsert-kafka.md +++ b/docs/dev/table/connectors/upsert-kafka.md @@ -47,9 +47,6 @@ key will fall into the same partition. Dependencies ------------ -In order to set up the upsert-kafka connector, the following table provide dependency information for -both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. - {% assign connector = site.data.sql-connectors['upsert-kafka'] %} {% include sql-connector-download-table.html connector=connector @@ -143,7 +140,7 @@ Connector Options required (none) String - The format used to deserialize and serialize the key part of the Kafka messages. The key part + The format used to serialize and deserialize the key part of the Kafka messages. The key part fields are specified by the PRIMARY KEY syntax. The supported formats include 'csv', 'json', 'avro'. Please refer to Formats page for more details and more format options. @@ -154,7 +151,7 @@ Connector Options required (none) String - The format used to deserialize and serialize the value part of the Kafka messages. + The format used to serialize and deserialize the value part of the Kafka messages. The supported formats include 'csv', 'json', 'avro'. Please refer to Formats page for more details and more format options. @@ -205,7 +202,7 @@ Data Type Mapping ---------------- Upsert Kafka stores message keys and values as bytes, so Upsert Kafka doesn't have schema or data types. -The messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping +The messages are serialized and deserialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by specific formats. Please refer to [Formats]({% link dev/table/connectors/formats/index.md %}) pages for more details. diff --git a/docs/dev/table/connectors/upsert-kafka.zh.md b/docs/dev/table/connectors/upsert-kafka.zh.md index 137408e65e775..e700460afbd0d 100644 --- a/docs/dev/table/connectors/upsert-kafka.zh.md +++ b/docs/dev/table/connectors/upsert-kafka.zh.md @@ -1,5 +1,5 @@ --- -title: "Upsert Kafka SQL Connector" +title: "Upsert Kafka SQL 连接器" nav-title: Upsert Kafka nav-parent_id: sql-connectors nav-pos: 3 @@ -29,36 +29,24 @@ under the License. * This will be replaced by the TOC {:toc} -The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion. +Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。 -As a source, the upsert-kafka connector produces a changelog stream, where each data record represents -an update or delete event. More precisely, the value in a data record is interpreted as an UPDATE of -the last value for the same key, if any (if a corresponding key doesn’t exist yet, the update will -be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted -as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null -values are interpreted in a special way: a record with a null value represents a “DELETE”. +作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。 -As a sink, the upsert-kafka connector can consume a changelog stream. 
It will write INSERT/UPDATE_AFTER -data as normal Kafka messages value, and write DELETE data as Kafka messages with null values -(indicate tombstone for the key). Flink will guarantee the message ordering on the primary key by -partition data on the values of the primary key columns, so the update/deletion messages on the same -key will fall into the same partition. +作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。 -Dependencies +依赖 ------------ -In order to set up the upsert-kafka connector, the following table provide dependency information for -both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. - {% assign connector = site.data.sql-connectors['upsert-kafka'] %} {% include sql-connector-download-table.html connector=connector %} -Full Example +完整示例 ---------------- -The example below shows how to create and use an Upsert Kafka table: +下面的示例展示了如何创建和使用 Upsert Kafka 表:
@@ -89,7 +77,7 @@ CREATE TABLE pageviews ( 'format' = 'json' ); --- calculate the pv, uv and insert into the upsert-kafka sink +-- 计算 pv、uv 并插入到 upsert-kafka sink INSERT INTO pageviews_per_region SELECT region, @@ -101,112 +89,97 @@ GROUP BY region; {% endhighlight %}
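(Illustration, not part of the patch.) The hunk above elides most of the example, so the sketch below shows what a complete upsert-kafka pipeline typically looks like; the column names (pv, uv, user_id), topic, bootstrap servers, and format choices are assumptions made only for illustration.

{% highlight sql %}
-- Sketch with assumed schema and option values: an upsert-kafka sink table keyed on region.
-- The PRIMARY KEY columns are serialized with 'key.format'; the remaining fields go into the value.
CREATE TABLE pageviews_per_region (
  region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Hypothetical completion of the aggregation shown in the hunk: aggregate the append-only
-- pageviews source into the upsert sink, so each new result per region overwrites the
-- previous record for that key.
INSERT INTO pageviews_per_region
SELECT region, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv
FROM pageviews
GROUP BY region;
{% endhighlight %}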
-Attention Make sure to define the primary key in the DDL. +注意 确保在 DDL 中定义主键。 -Connector Options +连接器参数 ----------------
Option Required Default Type Description 参数 是否必选 默认值 数据类型 描述
connector
required必选 (none) StringSpecify which connector to use, for the Upsert Kafka use: 'upsert-kafka'.指定要使用的连接器,Upsert Kafka 连接器使用:'upsert-kafka'
topic
required必选 (none) StringThe Kafka topic name to read from and write to.用于读取和写入的 Kafka topic 名称。
properties.bootstrap.servers
required必选 (none) StringComma separated list of Kafka brokers.以逗号分隔的 Kafka brokers 列表。
key.format
required必选 (none) StringThe format used to deserialize and serialize the key part of the Kafka messages. The key part - fields are specified by the PRIMARY KEY syntax. The supported formats include 'csv', - 'json', 'avro'. Please refer to Formats - page for more details and more format options. + 用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 'csv''json''avro'。请参考格式页面以获取更多详细信息和格式参数。
value.format
required必选 (none) StringThe format used to deserialize and serialize the value part of the Kafka messages. - The supported formats include 'csv', 'json', 'avro'. - Please refer to Formats page for more details and more format options. + 用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 'csv''json''avro'。请参考格式页面以获取更多详细信息和格式参数。
value.fields-include
required必选 'ALL' StringControls which fields should end up in the value as well. Available values: + 控制哪些字段应该出现在 value 中。可取值:
- ALL: the value part of the record contains all fields of the schema, even if they are part of the key.
- EXCEPT_KEY: the value part of the record contains all fields of the schema except the key fields.
+ ALL：消息的 value 部分将包含 schema 中所有的字段，包括定义为主键的字段。
+ EXCEPT_KEY：记录的 value 部分包含 schema 的所有字段，定义为主键的字段除外。
sink.parallelism
optional可选 (none) IntegerDefines the parallelism of the upsert-kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.定义 upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。
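(Illustration, not part of the patch.) A hedged sketch of how the options above combine in a single DDL; every identifier and value below is assumed for the example:

{% highlight sql %}
-- Sketch with assumed names/values, illustrating 'value.fields-include' and 'sink.parallelism'.
CREATE TABLE users_upsert (
  user_id BIGINT,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  -- EXCEPT_KEY: user_id is written only to the Kafka message key, not repeated in the value
  'value.fields-include' = 'EXCEPT_KEY',
  -- pin the sink parallelism instead of inheriting it from the upstream operator
  'sink.parallelism' = '2'
);
{% endhighlight %}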
-Features +特性 ---------------- -### Primary Key Constraints +### 主键约束 -The Upsert Kafka always works in the upsert fashion and requires to define the primary key in the DDL. -With the assumption that records with the same key should be ordered in the same partition, the -primary key semantic on the changelog source means the materialized changelog is unique on the primary -keys. The primary key definition will also control which fields should end up in Kafka’s key. +Upsert Kafka 始终以 upsert 方式工作，并且需要在 DDL 中定义主键。在具有相同主键值的消息按序存储在同一个分区的前提下，在 changelog source 定义主键意味着在物化后的 changelog 上主键具有唯一性。定义的主键将决定哪些字段出现在 Kafka 消息的 key 中。 -### Consistency Guarantees +### 一致性保证 -By default, an Upsert Kafka sink ingests data with at-least-once guarantees into a Kafka topic if -the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.zh.md %}#enabling-and-configuring-checkpointing). +默认情况下，如果[启用 checkpoint]({% link dev/stream/state/checkpointing.zh.md %}#enabling-and-configuring-checkpointing)，Upsert Kafka sink 会保证至少一次将数据插入 Kafka topic。 -This means, Flink may write duplicate records with the same key into the Kafka topic. But as the -connector is working in the upsert mode, the last record on the same key will take effect when -reading back as a source. Therefore, the upsert-kafka connector achieves idempotent writes just like -the [HBase sink]({{ site.baseurl }}/dev/table/connectors/hbase.html). +这意味着，Flink 可能会将具有相同 key 的重复记录写入 Kafka topic。但由于该连接器以 upsert 的模式工作，该连接器作为 source 读入时，可以确保具有相同主键值下仅最后一条消息会生效。因此，upsert-kafka 连接器可以像 [HBase sink]({{ site.baseurl }}/dev/table/connectors/hbase.html) 一样实现幂等写入。 -Data Type Mapping +数据类型映射 ---------------- -Upsert Kafka stores message keys and values as bytes, so Upsert Kafka doesn't have schema or data types. -The messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping -is determined by specific formats. Please refer to [Formats]({% link dev/table/connectors/formats/index.zh.md %}) -pages for more details. +Upsert Kafka 用字节存储消息的 key 和 value，因此没有 schema 或数据类型。消息按格式进行序列化和反序列化，例如：csv、json、avro。因此数据类型映射表由指定的格式确定。请参考[格式]({% link dev/table/connectors/formats/index.zh.md %})页面以获取更多详细信息。 {% top %}
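(Illustration, not part of the patch.) Reading the upsert-kafka table back as a source materializes the changelog on its primary key, so a hypothetical query against the pageviews_per_region table from the earlier example (column names assumed) returns at most one row per region, even if the at-least-once sink wrote duplicate records for that key:

{% highlight sql %}
-- Hypothetical read-back of the sink table from the example above.
-- Even if duplicate records were written for a region, the materialized result
-- contains at most one row per primary key, reflecting the last value written.
SELECT region, pv, uv FROM pageviews_per_region;
{% endhighlight %}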