From 863eefb6cdd9951e3eb1260bdcafcd870050a2f0 Mon Sep 17 00:00:00 2001 From: ganfengtan Date: Mon, 13 Jun 2022 21:17:59 +0800 Subject: [PATCH 1/4] [INLONG-405][Sort] Add sqlserver cdc,hdfs,hive doc --- docs/data_node/extract_node/hdfs.md | 7 +- docs/data_node/extract_node/sqlserver-cdc.md | 334 ++++++++++++++++++- docs/data_node/load_node/hdfs.md | 165 ++++++++- docs/data_node/load_node/hive.md | 198 ++++++++++- 4 files changed, 699 insertions(+), 5 deletions(-) diff --git a/docs/data_node/extract_node/hdfs.md b/docs/data_node/extract_node/hdfs.md index 947e2f8c2b8..7a46656e9f9 100644 --- a/docs/data_node/extract_node/hdfs.md +++ b/docs/data_node/extract_node/hdfs.md @@ -1,4 +1,9 @@ --- title: HDFS sidebar_position: 6 ---- \ No newline at end of file +--- +The file system connector can be used to read single files or entire directories into a single table. + +When using a directory as the source path, there is no defined order of ingestion for the files inside the directory. + +Notice:CDC future is developing. diff --git a/docs/data_node/extract_node/sqlserver-cdc.md b/docs/data_node/extract_node/sqlserver-cdc.md index d4f2c1a1918..92372bf48ac 100644 --- a/docs/data_node/extract_node/sqlserver-cdc.md +++ b/docs/data_node/extract_node/sqlserver-cdc.md @@ -1,4 +1,336 @@ --- title: SqlServer-CDC sidebar_position: 11 ---- \ No newline at end of file +--- +## SqlServer Extract Node + +The SqlServer extract node reads data and incremental data from the SqlServer database. The following will describe how to set up the SqlServer extraction node. + +## Supported Version + +| Extract Node | Version | +|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| [sqlserver-cdc](./sqlserver-cdc.md) | [SqlServer](https://docs.microsoft.com/en-us/sql/database-engine/install-windows/install-sql-server?view=sql-server-ver16): 2014、2016、2017、2019、2022 | | + +## Dependencies + +Introduce related SQLServer cdc connector dependencies through maven. + +### Maven dependency + +``` + + org.apache.inlong + sort-connector-sqlserver-cdc + + inlong_version + +``` +## Setup SqlServer CDC + +SqlServer CDC needs to open related libraries and tables, the steps are as follows: + +1.Enable the CDC function for the database. +```sql +if exists(select 1 from sys.databases where name='dbName' and is_cdc_enabled=0) +begin + exec sys.sp_cdc_enable_db +end +``` +2.Check the database CDC capability status. +```sql +select is_cdc_enabled from sys.databases where name='dbName' +``` +nodes: 1 is running CDC of DB. + +3.Turn on CDC for the table +```sql +IF EXISTS(SELECT 1 FROM sys.tables WHERE name='tableName' AND is_tracked_by_cdc = 0) +BEGIN + EXEC sys.sp_cdc_enable_table + @source_schema = 'dbo', -- source_schema + @source_name = 'tableName', -- table_name + @capture_instance = NULL, -- capture_instance + @supports_net_changes = 1, -- supports_net_changes + @role_name = NULL, -- role_name + @index_name = NULL, -- index_name + @captured_column_list = NULL, -- captured_column_list + @filegroup_name = 'PRIMARY' -- filegroup_name +END +``` +node: The table must have a primary key or unique index. + +4.Check the table CDC capability status. 
+```sql +SELECT is_tracked_by_cdc FROM sys.tables WHERE name='tableName' +``` +nodes: 1 is running CDC of table. + +## How to create a SqlServer Extract Node + +### Usage for SQL API + +The example below shows how to create a SqlServer Extract Node with `Flink SQL Cli` : + +```sql +-- Set checkpoint every 3000 milliseconds +Flink SQL> SET 'execution.checkpointing.interval' = '3s'; + +-- Create a SqlServer table 'sqlserver_extract_node' in Flink SQL Cli +Flink SQL> CREATE TABLE sqlserver_extract_node ( + order_id INT, + order_date TIMESTAMP(0), + customer_name STRING, + price DECIMAL(10, 5), + product_id INT, + order_status BOOLEAN, + PRIMARY KEY(order_id) NOT ENFORCED + ) WITH ( + 'connector' = 'sqlserver-cdc', + 'hostname' = 'YourHostname', + 'port' = 'port', --default:1433 + 'username' = 'YourUsername', + 'password' = 'YourPassword', + 'database-name' = 'YourDatabaseName', + 'schema-name' = 'YourSchemaName' -- default:dbo + 'table-name' = 'YourTableName'); + +-- Read snapshot and binlog from sqlserver_extract_node +Flink SQL> SELECT * FROM sqlserver_extract_node; +``` +### Usage for InLong Dashboard +TODO + +### Usage for InLong Manager Client +TODO + +## SqlServer Extract Node Options + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
OptionRequiredDefaultTypeDescription
connectorrequired(none)StringSpecify what connector to use, here should be 'sqlserver-cdc'.
hostnamerequired(none)StringIP address or hostname of the SQLServer database.
usernamerequired(none)StringUsername to use when connecting to the SQLServer database.
passwordrequired(none)StringPassword to use when connecting to the SQLServer database.
database-namerequired(none)StringDatabase name of the SQLServer database to monitor.
schema-namerequireddboStringSchema name of the SQLServer database to monitor.
table-namerequired(none)StringTable name of the SQLServer database to monitor.
portoptional1433IntegerInteger port number of the SQLServer database.
server-time-zoneoptionalUTCStringThe session time zone in database server, e.g. "Asia/Shanghai".
+
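+
+For quick reference, a minimal sketch showing how the optional 'port' and 'server-time-zone' options from the table above can be appended to the WITH clause of the earlier example; the host, credentials, and table names below are placeholders:
+
+```sql
+CREATE TABLE sqlserver_extract_node_tz (
+    order_id INT,
+    order_date TIMESTAMP(0),
+    PRIMARY KEY(order_id) NOT ENFORCED
+) WITH (
+    'connector' = 'sqlserver-cdc',
+    'hostname' = 'YourHostname',
+    'port' = '1433',                      -- optional, defaults to 1433
+    'server-time-zone' = 'Asia/Shanghai', -- optional, defaults to UTC
+    'username' = 'YourUsername',
+    'password' = 'YourPassword',
+    'database-name' = 'YourDatabaseName',
+    'schema-name' = 'dbo',
+    'table-name' = 'YourTableName'
+);
+```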
+ +## Available Metadata +The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
KeyDataTypeDescription
meta.table_nameSTRING NOT NULLName of the table that contains the row.
meta.schema_nameSTRING NOT NULLName of the schema that contains the row.
meta.database_nameSTRING NOT NULLName of the database that contains the row.
meta.op_tsTIMESTAMP_LTZ(3) NOT NULLIt indicates the time that the change was made in the database.
If the record is read from the snapshot of the table instead of the transaction log, the value is always 0.
+ +The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields: +```sql +CREATE TABLE sqlserver_extract_node ( + table_name STRING METADATA FROM 'table_name' VIRTUAL, + schema_name STRING METADATA FROM 'schema_name' VIRTUAL, + db_name STRING METADATA FROM 'database_name' VIRTUAL, + operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, + id INT NOT NULL +) WITH ( + 'connector' = 'sqlserver-cdc', + 'hostname' = 'localhost', + 'port' = '1433', + 'username' = 'sa', + 'password' = 'password', + 'database-name' = 'test', + 'schema-name' = 'dbo', + 'table-name' = 'worker' +); +``` + +## Data Type Mapping +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
SQLServer typeFlink SQL type
char(n)CHAR(n)
+ varchar(n)
+ nvarchar(n)
+ nchar(n)
VARCHAR(n)
+ text
+ ntext
+ xml
STRING
+ decimal(p, s)
+ money
+ smallmoney
DECIMAL(p, s)
numericNUMERIC
+ REAL
+ FLOAT
+
FLOAT
bitBOOLEAN
intINT
tinyintTINYINT
smallintSMALLINT
time (n)TIME (n)
bigintBIGINT
dateDATE
+ datetime2
+ datetime
+ smalldatetime +
TIMESTAMP(n)
+ datetimeoffset + TIMESTAMP_LTZ(3)
+
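+
+As an illustration of the mapping above, a SQLServer table declared with `nvarchar`, `decimal`, `bit` and `datetime2` columns could be exposed in Flink SQL roughly as follows; the table and column names are hypothetical:
+
+```sql
+-- SQLServer side (for reference only):
+--   CREATE TABLE dbo.orders (
+--     order_id   INT PRIMARY KEY,
+--     customer   NVARCHAR(64),
+--     amount     DECIMAL(10, 2),
+--     paid       BIT,
+--     updated_at DATETIME2);
+
+-- Flink SQL side, following the type mapping table:
+CREATE TABLE orders_extract_node (
+    order_id   INT,
+    customer   VARCHAR(64),     -- nvarchar(n)   -> VARCHAR(n)
+    amount     DECIMAL(10, 2),  -- decimal(p, s) -> DECIMAL(p, s)
+    paid       BOOLEAN,         -- bit           -> BOOLEAN
+    updated_at TIMESTAMP(3),    -- datetime2     -> TIMESTAMP(n)
+    PRIMARY KEY(order_id) NOT ENFORCED
+) WITH (
+    'connector' = 'sqlserver-cdc',
+    'hostname' = 'YourHostname',
+    'username' = 'YourUsername',
+    'password' = 'YourPassword',
+    'database-name' = 'YourDatabaseName',
+    'schema-name' = 'dbo',
+    'table-name' = 'orders'
+);
+```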
+ + + diff --git a/docs/data_node/load_node/hdfs.md b/docs/data_node/load_node/hdfs.md index c4a3c386480..f795cb96abb 100644 --- a/docs/data_node/load_node/hdfs.md +++ b/docs/data_node/load_node/hdfs.md @@ -1,4 +1,167 @@ --- title: HDFS sidebar_position: 11 ---- \ No newline at end of file +--- +## HDFS Load Node + +HDFS uses the general capabilities of flink's fileSystem to support single files and partitioned files. +The file system connector itself is included in Flink and does not require an additional dependency. +The corresponding jar can be found in the Flink distribution inside the /lib directory. +A corresponding format needs to be specified for reading and writing rows from and to a file system. + +## How to create a HDFS Load Node + +### Usage for SQL API +The example below shows how to create a HDFS Load Node with `Flink SQL Cli` : + +```sql +CREATE TABLE hdfs_load_node ( + id STRING, + name STRING, + uv BIGINT, + pv BIGINT, + dt STRING, + `hour` STRING + ) PARTITIONED BY (dt, `hour`) WITH ( + 'connector'='filesystem', + 'path'='...', + 'format'='orc', + 'sink.partition-commit.delay'='1 h', + 'sink.partition-commit.policy.kind'='success-file' + ); +``` + +#### File Formats + + +#### Rolling Policy +Data within the partition directories are split into part files. +Each partition will contain at least one part file for each subtask of the sink that has received data for that partition. +The in-progress part file will be closed and additional part file will be created according to the configurable rolling policy. +The policy rolls part files based on size, a timeout that specifies the maximum duration for which a file can be open. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
OptionRequiredDefaultTypeDescription
sink.rolling-policy.file-size
optional128MBMemorySizeThe maximum part file size before rolling.
sink.rolling-policy.rollover-interval
optional30 minStringThe maximum time duration a part file can stay open before rolling (by default 30 min to avoid too many small files). The frequency at which this is checked is controlled by the 'sink.rolling-policy.check-interval' option.
sink.rolling-policy.check-interval
optional1 minStringThe interval for checking time based rolling policies. This controls the frequency to check whether a part file should rollover based on 'sink.rolling-policy.rollover-interval'.
+ +#### File Compaction +The file sink supports file compactions, which allows applications to have smaller checkpoint intervals without generating a large number of files. + + + + + + + + + + + + + + + + + + + + + + + + + + +
OptionRequiredDefaultTypeDescription
auto-compaction
optionalfalseBooleanWhether to enable automatic compaction in streaming sink or not. + The data will be written to temporary files. After the checkpoint is completed, the temporary files generated by a checkpoint will be compacted. + The temporary files are invisible before compaction.
compaction.file-size
optional(none)StringThe compaction target file size, the default value is the rolling file size.
+ +#### Partition Commit +After writing a partition, it is often necessary to notify downstream applications. +For example, add the partition to a Hive metastore or writing a _SUCCESS file in the directory. +The file system sink contains a partition commit feature that allows configuring custom policies. +Commit actions are based on a combination of triggers and policies. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
OptionRequiredDefaultTypeDescription
sink.partition-commit.trigger
optionalprocess-timeStringTrigger type for partition commit: 'process-time': based on the time of the machine, it neither requires partition time extraction nor watermark generation. Commit partition once the 'current system time' passes 'partition creation system time' plus 'delay'. 'partition-time': based on the time that extracted from partition values, it requires watermark generation. Commit partition once the 'watermark' passes 'time extracted from partition values' plus 'delay'.
sink.partition-commit.delay
optional0 sDurationThe partition will not commit until the delay time. If it is a daily partition, it should be '1 d'; if it is an hourly partition, it should be '1 h'.
sink.partition-commit.watermark-time-zone
optionalUTCStringThe time zone used to parse the long watermark value to a TIMESTAMP value, + the parsed watermark timestamp is used to compare with the partition time to decide whether the partition should be committed. + This option only takes effect when `sink.partition-commit.trigger` is set to 'partition-time'. + If this option is not configured correctly, e.g. + source rowtime is defined on a TIMESTAMP_LTZ column but this config is not set, + then users may see the partition committed only after a few hours. The default value 'UTC' + means the watermark is defined on a TIMESTAMP column or not defined at all. + If the watermark is defined on a TIMESTAMP_LTZ column, the time zone of the watermark is the session time zone. + The option value is either a full name such as 'America/Los_Angeles', or a custom timezone id such as 'GMT-08:00'.
+ + + diff --git a/docs/data_node/load_node/hive.md b/docs/data_node/load_node/hive.md index c1ddfdb09f2..25a797a1501 100644 --- a/docs/data_node/load_node/hive.md +++ b/docs/data_node/load_node/hive.md @@ -2,8 +2,202 @@ title: Hive sidebar_position: 3 --- +## Hive Load Node +Hive Load Node can write data to hive. Using the flink dialect, the insert operation is currently supported, and the data in the upper mode will be converted into insert. +Manipulating hive tables using the hive dialect is currently not supported. -## Configuration +## Supported Version + +| Load Node | Version | +|-------------------------------------|----------------------------------------------------| +| [Hive](./hive.md) | [Hive](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/#supported-hive-versions): 1.x, 2.x, 3.x | + +### Dependencies + +Using Hive load requires the introduction of dependencies. + +``` + + org.apache.inlong + sort-connector-hive + inlong_version + +``` +## How to create a Hive Load Node + +### Usage for SQL API + +The example below shows how to create a Hive Load Node with `Flink SQL Cli` : + +```sql +CREATE TABLE hiveTableName ( + id STRING, + name STRING, + uv BIGINT, + pv BIGINT +) WITH ( + 'connector' = 'hive', + 'default-database' = 'default', + 'hive-version' = '3.1.2', + 'hive-conf-dir' = 'hdfs://localhost:9000/user/hive/hive-site.xml' +); +``` +### Usage for InLong Dashboard + +#### Configuration When creating a data flow, select `Hive` for the data stream direction, and click "Add" to configure it. -![Hive Configuration](img/hive.png) \ No newline at end of file +![Hive Configuration](img/hive.png) + +### Usage for InLong Manager Client + +TODO: It will be supported in the future. + +## Hive Load Node Options + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
OptionRequiredDefaultTypeDescription
connector
required(none)StringSpecify what connector to use, here should be 'hive'.
default-database
required(none)StringSpecify the database name to write to.
hive-conf-dir
required(none)StringLocation of hive-site.xml. If you do not want to upload hive-site.xml to HDFS, + you can put the file on the classpath of the project; + in that case this option only needs to be non-empty, + otherwise you must fill in the complete HDFS URL of the file.
sink.partition-commit.trigger
optional(none)StringIf the Hive table is partitioned, you can set the partition commit trigger mode, e.g. 'process-time'.
partition.time-extractor.timestamp-pattern
optional(none)StringIf the Hive table is partitioned, you can set the timestamp pattern used to extract the partition time, e.g. 'yyyy-MM-dd'.
sink.partition-commit.delay
optional(none)StringIf the Hive table is partitioned, you can set the partition commit delay, e.g. '10s', '20s', '1m'.
sink.partition-commit.policy.kind
optional(none)Stringmetastore,success-file
+ +## Data Type Mapping +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Hive typeFlink SQL type
char(p)CHAR(p)
varchar(p)VARCHAR(p)
stringSTRING
booleanBOOLEAN
tinyintTINYINT
smallintSMALLINT
intINT
bigintBIGINT
floatFLOAT
doubleDOUBLE
decimal(p, s)DECIMAL(p, s)
dateDATE
timestampTIMESTAMP(9)
binaryBYTES
arrayARRAY
mapMAP
structROW
+
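+
+Putting the options above together, a partitioned Hive sink could look roughly like the sketch below; the database, table, partition column and option values are illustrative only:
+
+```sql
+CREATE TABLE hive_load_node_partitioned (
+    id   STRING,
+    name STRING,
+    uv   BIGINT,
+    pv   BIGINT,
+    dt   STRING
+) PARTITIONED BY (dt) WITH (
+    'connector' = 'hive',
+    'default-database' = 'default',
+    'hive-version' = '3.1.2',
+    'hive-conf-dir' = 'hdfs://localhost:9000/user/hive/hive-site.xml',
+    -- partition commit settings described in the options table above:
+    'sink.partition-commit.trigger' = 'process-time',
+    'partition.time-extractor.timestamp-pattern' = 'yyyy-MM-dd',
+    'sink.partition-commit.delay' = '10s',
+    'sink.partition-commit.policy.kind' = 'metastore,success-file'
+);
+```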
From a33a30bb16db032654a98d6f3fafff67756c5323 Mon Sep 17 00:00:00 2001 From: ganfengtan Date: Mon, 13 Jun 2022 22:01:34 +0800 Subject: [PATCH 2/4] [INLONG-405][Sort] Fix --- docs/data_node/extract_node/hdfs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/data_node/extract_node/hdfs.md b/docs/data_node/extract_node/hdfs.md index 7a46656e9f9..5de2a982e69 100644 --- a/docs/data_node/extract_node/hdfs.md +++ b/docs/data_node/extract_node/hdfs.md @@ -6,4 +6,4 @@ The file system connector can be used to read single files or entire directories When using a directory as the source path, there is no defined order of ingestion for the files inside the directory. -Notice:CDC future is developing. +Notice:CDC feature is developing. From 163d759d969ca4e978b4b4905bad6fc9522a8b0e Mon Sep 17 00:00:00 2001 From: ganfengtan Date: Tue, 14 Jun 2022 15:47:46 +0800 Subject: [PATCH 3/4] [INLONG-405][Sort] Add zh-CN doc --- docs/data_node/extract_node/sqlserver-cdc.md | 28 +- docs/data_node/load_node/hdfs.md | 58 ++- docs/data_node/load_node/hive.md | 9 +- .../current/data_node/extract_node/hdfs.md | 8 +- .../data_node/extract_node/sqlserver-cdc.md | 334 +++++++++++++++++- .../current/data_node/load_node/hdfs.md | 205 ++++++++++- .../current/data_node/load_node/hive.md | 204 ++++++++++- 7 files changed, 812 insertions(+), 34 deletions(-) diff --git a/docs/data_node/extract_node/sqlserver-cdc.md b/docs/data_node/extract_node/sqlserver-cdc.md index 92372bf48ac..f44c60f1bc2 100644 --- a/docs/data_node/extract_node/sqlserver-cdc.md +++ b/docs/data_node/extract_node/sqlserver-cdc.md @@ -1,8 +1,8 @@ --- -title: SqlServer-CDC +title: SQLServer-CDC sidebar_position: 11 --- -## SqlServer Extract Node +## SQLServer Extract Node The SqlServer extract node reads data and incremental data from the SqlServer database. The following will describe how to set up the SqlServer extraction node. @@ -10,7 +10,7 @@ The SqlServer extract node reads data and incremental data from the SqlServer da | Extract Node | Version | |-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| [sqlserver-cdc](./sqlserver-cdc.md) | [SqlServer](https://docs.microsoft.com/en-us/sql/database-engine/install-windows/install-sql-server?view=sql-server-ver16): 2014、2016、2017、2019、2022 | | +| [sqlserver-cdc](./sqlserver-cdc.md) | [SQLServer](https://docs.microsoft.com/en-us/sql/database-engine/install-windows/install-sql-server?view=sql-server-ver16): 2014、2016、2017、2019、2022 | | ## Dependencies @@ -26,24 +26,24 @@ Introduce related SQLServer cdc connector dependencies through maven. inlong_version ``` -## Setup SqlServer CDC +## Setup SQLServer CDC -SqlServer CDC needs to open related libraries and tables, the steps are as follows: +SQLServer CDC needs to open related libraries and tables, the steps are as follows: -1.Enable the CDC function for the database. +1. Enable the CDC function for the database. ```sql if exists(select 1 from sys.databases where name='dbName' and is_cdc_enabled=0) begin exec sys.sp_cdc_enable_db end ``` -2.Check the database CDC capability status. +2. Check the database CDC capability status. 
```sql select is_cdc_enabled from sys.databases where name='dbName' ``` -nodes: 1 is running CDC of DB. +note: 1 is running CDC of DB. -3.Turn on CDC for the table +3. Turn on CDC for the table ```sql IF EXISTS(SELECT 1 FROM sys.tables WHERE name='tableName' AND is_tracked_by_cdc = 0) BEGIN @@ -58,15 +58,15 @@ BEGIN @filegroup_name = 'PRIMARY' -- filegroup_name END ``` -node: The table must have a primary key or unique index. +note: The table must have a primary key or unique index. -4.Check the table CDC capability status. +4. Check the table CDC capability status. ```sql SELECT is_tracked_by_cdc FROM sys.tables WHERE name='tableName' ``` -nodes: 1 is running CDC of table. +note: 1 is running CDC of table. -## How to create a SqlServer Extract Node +## How to create a SQLServer Extract Node ### Usage for SQL API @@ -104,7 +104,7 @@ TODO ### Usage for InLong Manager Client TODO -## SqlServer Extract Node Options +## SQLServer Extract Node Options
diff --git a/docs/data_node/load_node/hdfs.md b/docs/data_node/load_node/hdfs.md index f795cb96abb..1ae608da948 100644 --- a/docs/data_node/load_node/hdfs.md +++ b/docs/data_node/load_node/hdfs.md @@ -52,7 +52,6 @@ The policy rolls part files based on size, a timeout that specifies the maximum - @@ -61,21 +60,18 @@ The policy rolls part files based on size, a timeout that specifies the maximum - - - @@ -89,7 +85,6 @@ The file sink supports file compactions, which allows applications to have small - @@ -98,7 +93,6 @@ The file sink supports file compactions, which allows applications to have small - - @@ -124,7 +117,6 @@ Commit actions are based on a combination of triggers and policies. - @@ -133,21 +125,18 @@ Commit actions are based on a combination of triggers and policies. - - -
OptionRequired Default Type Description
sink.rolling-policy.file-size
optional 128MB MemorySize The maximum part file size before rolling.
sink.rolling-policy.rollover-interval
optional 30 min String The maximum time duration a part file can stay open before rolling (by default 30 min to avoid to many small files). The frequency at which this is checked is controlled by the 'sink.rolling-policy.check-interval' option.
sink.rolling-policy.check-interval
required 1 min String The interval for checking time based rolling policies. This controls the frequency to check whether a part file should rollover based on 'sink.rolling-policy.rollover-interval'.
OptionRequired Default Type Description
auto-compaction
optional false Boolean Whether to enable automatic compaction in streaming sink or not. @@ -107,7 +101,6 @@ The file sink supports file compactions, which allows applications to have small
compaction.file-size
optional (none) String The compaction target file size, the default value is the rolling file size.
OptionRequired Default Type Description
sink.partition-commit.trigger
optional process-time String Trigger type for partition commit: 'process-time': based on the time of the machine, it neither requires partition time extraction nor watermark generation. Commit partition once the 'current system time' passes 'partition creation system time' plus 'delay'. 'partition-time': based on the time that extracted from partition values, it requires watermark generation. Commit partition once the 'watermark' passes 'time extracted from partition values' plus 'delay'.
sink.partition-commit.delay
optional 0 s Duration The partition will not commit until the delay time. If it is a daily partition, should be '1 d', if it is a hourly partition, should be '1 h'.
sink.partition-commit.watermark-time-zone
optional UTC String The time zone to parse the long watermark value to TIMESTAMP value, @@ -163,5 +152,52 @@ Commit actions are based on a combination of triggers and policies.
+#### Partition Commit Policy + + +The partition strategy defines the specific operation of partition submission. + +- metastore:This strategy is only supported when hive. +- success: The '_SUCCESS' file will be generated after the part file is generated. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
OptionRequiredDefaultTypeDescription
sink.partition-commit.policy.kind
optional(none)StringPolicy to commit a partition is to notify the downstream application that the partition has finished writing, the partition is ready to be read. + metastore: add partition to metastore. Only hive table supports metastore policy, + file system manages partitions through directory structure. success-file: add '_success' file to directory. + Both can be configured at the same time: 'metastore,success-file'. custom: use policy class to create a commit policy. + Support to configure multiple policies: 'metastore,success-file'.
sink.partition-commit.policy.class
optional(none)StringThe partition commit policy class that implements the PartitionCommitPolicy interface. + Only works with the custom commit policy.
sink.partition-commit.success-file.name
optional_SUCCESSStringThe file name for success-file partition commit policy, default is '_SUCCESS'.
diff --git a/docs/data_node/load_node/hive.md b/docs/data_node/load_node/hive.md index 25a797a1501..eac3add7aad 100644 --- a/docs/data_node/load_node/hive.md +++ b/docs/data_node/load_node/hive.md @@ -3,7 +3,7 @@ title: Hive sidebar_position: 3 --- ## Hive Load Node -Hive Load Node can write data to hive. Using the flink dialect, the insert operation is currently supported, and the data in the upper mode will be converted into insert. +Hive Load Node can write data to hive. Using the flink dialect, the insert operation is currently supported, and the data in the upsert mode will be converted into insert. Manipulating hive tables using the hive dialect is currently not supported. ## Supported Version @@ -115,7 +115,12 @@ TODO: It will be supported in the future. optional (none) String - metastore,success-file + Policy to commit a partition is to notify the downstream application that the partition has finished writing, + the partition is ready to be read. metastore: add partition to metastore. + Only hive table supports metastore policy, file system manages partitions through directory structure. + success-file: add '_success' file to directory. Both can be configured at the same time: 'metastore,success-file'. + custom: use policy class to create a commit policy. + Support to configure multiple policies: 'metastore,success-file'. diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/hdfs.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/hdfs.md index 947e2f8c2b8..bda1fdf40e5 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/hdfs.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/hdfs.md @@ -1,4 +1,10 @@ --- title: HDFS sidebar_position: 6 ---- \ No newline at end of file +--- + +文件系统连接器可用于将单个文件或整个目录读取到单个表中。 + +当使用目录作为源路径时,目录中的文件没有定义摄取顺序。 + +注意:CDC功能正在开发中。 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/sqlserver-cdc.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/sqlserver-cdc.md index d4f2c1a1918..ccd2f1d7a80 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/sqlserver-cdc.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/sqlserver-cdc.md @@ -1,4 +1,334 @@ --- -title: SqlServer-CDC +title: SQLServer-CDC sidebar_position: 11 ---- \ No newline at end of file +--- +## SQLServer抽取节点 + +SQLServer 提取节点从 SQLServer 数据库中读取数据和增量数据。下面将介绍如何配置 SQLServer 抽取节点。 + +## 支持的版本 + +| Extract Node | Version | +|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| [sqlserver-cdc](./sqlserver-cdc.md) | [SQLServer](https://docs.microsoft.com/en-us/sql/database-engine/install-windows/install-sql-server?view=sql-server-ver16): 2014、2016、2017、2019、2022 | | + +## 依赖配置 + +通过maven引入sort-connector-sqlserver-cdc构建自己的项目。 + +### Maven依赖配置 + +``` + + org.apache.inlong + sort-connector-sqlserver-cdc + + inlong_version + +``` +## 配置 SQLServer CDC + +SQLServer CDC 需要开启库和表的CDC功能,配置步骤如下: + +1. 
开启数据库CDC能力。 +```sql +if exists(select 1 from sys.databases where name='dbName' and is_cdc_enabled=0) +begin + exec sys.sp_cdc_enable_db +end +``` +2. 检查数据库CDC是否开启。 +```sql +select is_cdc_enabled from sys.databases where name='dbName' +``` +备注: "1"表示数据库CDC开启 + +3. 开启表的CDC能力。 +```sql +IF EXISTS(SELECT 1 FROM sys.tables WHERE name='tableName' AND is_tracked_by_cdc = 0) +BEGIN + EXEC sys.sp_cdc_enable_table + @source_schema = 'dbo', -- source_schema + @source_name = 'tableName', -- table_name + @capture_instance = NULL, -- capture_instance + @supports_net_changes = 1, -- supports_net_changes + @role_name = NULL, -- role_name + @index_name = NULL, -- index_name + @captured_column_list = NULL, -- captured_column_list + @filegroup_name = 'PRIMARY' -- filegroup_name +END +``` +备注: 表必须有主键或者唯一索引。 + +4. 检查表CDC是否开启。 +```sql +SELECT is_tracked_by_cdc FROM sys.tables WHERE name='tableName' +``` +备注: "1"表示表CDC开启 + +## 如何创建一个SQLServer抽取节点 + +### SQL API的使用 + +使用 `Flink SQL Cli` : + +```sql +-- Set checkpoint every 3000 milliseconds +Flink SQL> SET 'execution.checkpointing.interval' = '3s'; + +-- Create a SqlServer table 'sqlserver_extract_node' in Flink SQL Cli +Flink SQL> CREATE TABLE sqlserver_extract_node ( + order_id INT, + order_date TIMESTAMP(0), + customer_name STRING, + price DECIMAL(10, 5), + product_id INT, + order_status BOOLEAN, + PRIMARY KEY(order_id) NOT ENFORCED + ) WITH ( + 'connector' = 'sqlserver-cdc', + 'hostname' = 'YourHostname', + 'port' = 'port', --default:1433 + 'username' = 'YourUsername', + 'password' = 'YourPassword', + 'database-name' = 'YourDatabaseName', + 'schema-name' = 'YourSchemaName' -- default:dbo + 'table-name' = 'YourTableName'); + +-- Read snapshot and binlog from sqlserver_extract_node +Flink SQL> SELECT * FROM sqlserver_extract_node; +``` +### InLong Dashboard方式 +TODO + +### InLong Manager Client方式 +TODO + +## SQLServer抽取节点参数信息 + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
参数是否必须默认值数据类型描述
connector必须(none)String指定使用什么连接器,这里应该是 'sqlserver-cdc'。
hostname必须(none)StringSQLServer数据库IP地址或者hostname。
username必须(none)StringSQLServer数据库用户名。
password必须(none)StringSQLServer数据库用户密码。
database-name必须(none)StringSQLServer数据库监控的数据库名称。
schema-name必须dboStringSQLServer数据库监控的schema名称。
table-name必须(none)StringSQLServer数据库监控的表名称。
port可选1433IntegerSQLServer数据库端口。
server-time-zone可选UTCStringSQLServer数据库连接配置时区。 例如: "Asia/Shanghai"。
+
+ +## 可用的元数据字段 + +以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
字段名称数据类型描述
meta.table_nameSTRING NOT NULL包含该行的表的名称。
meta.schema_nameSTRING NOT NULL包含该行schema的名称。
meta.database_nameSTRING NOT NULL包含该行数据库的名称。
meta.op_tsTIMESTAMP_LTZ(3) NOT NULL它表示在数据库中进行更改的时间。如果记录是从表的快照而不是 binlog 中读取的,则该值始终为 0。
+ +使用元数据字段的例子: +```sql +CREATE TABLE sqlserver_extract_node ( + table_name STRING METADATA FROM 'table_name' VIRTUAL, + schema_name STRING METADATA FROM 'schema_name' VIRTUAL, + db_name STRING METADATA FROM 'database_name' VIRTUAL, + operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, + id INT NOT NULL +) WITH ( + 'connector' = 'sqlserver-cdc', + 'hostname' = 'localhost', + 'port' = '1433', + 'username' = 'sa', + 'password' = 'password', + 'database-name' = 'test', + 'schema-name' = 'dbo', + 'table-name' = 'worker' +); +``` + +## 数据类型映射 +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
SQLServer typeFlink SQL type
char(n)CHAR(n)
+ varchar(n)
+ nvarchar(n)
+ nchar(n)
VARCHAR(n)
+ text
+ ntext
+ xml
STRING
+ decimal(p, s)
+ money
+ smallmoney
DECIMAL(p, s)
numericNUMERIC
+ REAL
+ FLOAT
+
FLOAT
bitBOOLEAN
intINT
tinyintTINYINT
smallintSMALLINT
time (n)TIME (n)
bigintBIGINT
dateDATE
+ datetime2
+ datetime
+ smalldatetime +
TIMESTAMP(n)
+ datetimeoffset + TIMESTAMP_LTZ(3)
+
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hdfs.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hdfs.md index c4a3c386480..d20deecaa93 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hdfs.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hdfs.md @@ -1,4 +1,207 @@ --- title: HDFS sidebar_position: 11 ---- \ No newline at end of file +--- +## HDFS加载节点 +HDFS连接器为Flink内部依赖,支持分区文件。 +在Flink中包含了该文件系统连接器,不需要添加额外的依赖。 +相应的jar包可以在Flink工程项目的/lib目录下找到。 +从文件系统中读取或者向文件系统中写入行时,需要指定相应的format。 + +## 如何创建HDFS加载节点 + +### SQL API的使用 +使用 `Flink SQL Cli` : + +```sql +CREATE TABLE hdfs_load_node ( + id STRING, + name STRING, + uv BIGINT, + pv BIGINT, + dt STRING, + `hour` STRING + ) PARTITIONED BY (dt, `hour`) WITH ( + 'connector'='filesystem', + 'path'='...', + 'format'='orc', + 'sink.partition-commit.delay'='1 h', + 'sink.partition-commit.policy.kind'='success-file' + ); +``` + +#### File Formats +
    +
  • CSV(非压缩格式)
  • JSON(文件系统连接器的 JSON format 与传统的标准的 JSON file 的不同,而是非压缩的。换行符分割的 JSON)
  • Avro(通过配置 avro.codec 属性支持压缩)
  • Parquet(与hive兼容)
  • Orc(与hive兼容)
  • Debezium-JSON
  • Canal-JSON
  • Raw
+ +备注:文件格式明细可以查看[Flink Formats](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/overview/) + +#### 滚动策略 + +数据会被加载到文件的目录下的part文件中,每个分区接收到来之subtask的数据至少会为该分区生成一个part文件。同时可以配置滚动策略 +来生成part文件,生成part文件会将in-progress part文件关闭。该策略基于大小和指定文件被打开的超时时间来生成part文件。 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
参数默认值数据类型描述
sink.rolling-policy.file-size
128MBMemorySize滚动前part文件的最大值。
sink.rolling-policy.rollover-interval
30 minString滚动前,part 文件处于打开状态的最大时长(默认值30分钟,以避免产生大量小文件)。 + 检查频率是由 'sink.rolling-policy.check-interval' 属性控制的。
sink.rolling-policy.check-interval
1 minString基于时间的滚动策略的检查间隔。 + 该属性控制了基于 'sink.rolling-policy.rollover-interval' 属性检查文件是否该被滚动的检查频率。
+ +#### 文件合并 +支持文件能力,允许在较小的checkpoint下不产生大量的小文件。 + + + + + + + + + + + + + + + + + + + + + + + +
参数默认值数据类型描述
auto-compaction
falseBoolean在流式sink中是否开启自动合并功能,数据首先会被写入临时文件。 + 当checkpoint完成后,该检查点产生的临时文件会被合并,这些临时文件在合并前不可见。
compaction.file-size
(none)String合并目标文件大小,默认值为滚动文件大小。
+ +#### 分区提交 + +分区数据写入完成后,一般需要通知下流应用。如:更新hive的元数据信息或者hdfs目录生成_SUCCESS文件。 +分区提交策略是配置的,分区提交行为基于triggers和policies的组合。 + +- Trigger :分区提交时机可以基于分区的watermark或者基于处理时间(process-time)。 +- Policy :分区提交策略,内置策略包括提交hive元数据和生成_SUCCESS文件,同时支持自定策略,如生成hive的统计信息、合并小文件等。 + +备注:分区提交仅支持动态分区插入。 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
参数默认值数据类型描述
sink.partition-commit.trigger
process-timeString分区提交触发器类型: 'process-time':基于机器时间既不需要分区时间提取器也不需要watermark生成器。 + 一旦 "当前系统时间" 超过了 "分区创建系统时间" 和 'sink.partition-commit.delay' 之和立即提交分区。
+ 'partition-time':基于提取的分区时间,需要 watermark 生成。一旦 watermark 超过了 "分区创建系统时间" 和 'sink.partition-commit.delay' 之和立即提交分区。
sink.partition-commit.delay
0 sDuration如果设置分区延迟提交,这个延迟时间之前不会提交。天:'d';小时:'h';秒:'s'等
sink.partition-commit.watermark-time-zone
UTCString 解析Long类型的watermark到TIMESTAMP类型时所采用的时区, + 解析得到的watermark的TIMESTAMP会被用来跟分区时间进行比较以判断是否该被提交。 + 这个属性仅当`sink.partition-commit.trigger`被设置为'partition-time'时有效。 + 如果这个属性设置的不正确,例如在 TIMESTAMP_LTZ 类型的列上定义了 source rowtime, + 如果没有设置该属性,那么用户可能会在若干个小时后才看到分区的提交。 + 默认值为'UTC'意味着 watermark 是定义在 TIMESTAMP 类型的列上或者没有定义 watermark。 + 如果watermark定义在TIMESTAMP_LTZ类型的列上,watermark时区必须是会话时区(session time zone)。 + 该属性的可选值要么是完整的时区名比如 'America/Los_Angeles',要么是自定义时区,例如 'GMT-08:00'。
+ +#### 分区提交策略 + +分区提交策略定义了分区提交使用的具体策略。 + +- metastore:仅在hive时支持该策略。 +- success: part文件生成后会生成'_SUCCESS'文件。 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
参数是否必须默认值数据类型描述
sink.partition-commit.policy.kind
可选(none)String分区策略通知分区part生成可以被访问,仅hive支持metastore策略,文件系统生成'_success'文件表示文件写入完成。 + 两种策略的指定分别为'metastore,success-file',也可以通过custom的指定的类创建提交策略。
sink.partition-commit.policy.class
可选(none)String实现 PartitionCommitPolicy 接口的分区提交策略类,只有在 custom 提交策略下才使用该类。
sink.partition-commit.success-file.name
可选_SUCCESSString使用success-file分区提交策略时的文件名,默认值是 '_SUCCESS'。
+ + + diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hive.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hive.md index 874c5030e00..77dedd91478 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hive.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hive.md @@ -2,8 +2,206 @@ title: Hive sidebar_position: 2 --- +## Hive加载节点 -## 配置 -创建数据流时,数据流向选择 `Hive`,并点击 ”添加“ 进行配置。 +Hive加载节点可以将数据写入hive。使用flink方言,目前仅支持insert操作,upsert模式下的数据会转换成insert方式 +目前暂时不支持使用hive方言操作hive表。 -![Hive Configuration](img/hive.png) \ No newline at end of file +## 支持的版本 + +| Load Node | Version | +|-------------------------------------|----------------------------------------------------| +| [Hive](./hive.md) | [Hive](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/#supported-hive-versions): 1.x, 2.x, 3.x | + +### 依赖 + +通过maven引入sort-connector-hive构建自己的项目。 + +``` + + org.apache.inlong + sort-connector-hive + inlong_version + +``` +## 如何配置Hive数据加载节点 + +### SQL API的使用 + +使用 `Flink SQL Cli` : + +```sql +CREATE TABLE hiveTableName ( + id STRING, + name STRING, + uv BIGINT, + pv BIGINT +) WITH ( + 'connector' = 'hive', + 'default-database' = 'default', + 'hive-version' = '3.1.2', + 'hive-conf-dir' = 'hdfs://localhost:9000/user/hive/hive-site.xml' +); +``` +### InLong Dashboard方式 + +#### 配置 +在创建数据流时,选择数据落地为'Hive然后点击'Add'来配置hive的相关信息。 + +![Hive Configuration](img/hive.png) + +### InLong Manager Client方式 + +TODO: 未来版本支持 + +## Hive加载节点参数信息 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
参数是否必须默认值数据类型描述
connector
必须(none)String指定使用什么连接器,这里应该是 'hive'。
default-database
必须(none)String指定数据库名称。
hive-conf-dir
必须(none)String本地构建项目可以将hive-site.xml构建到classpath中,未来dashboard将支持本地上传能力。 + 目前通用方式只支持配置已经上传文件后的HDFS路径。
sink.partition-commit.trigger
可选(none)String如果表是分区表,可以配置触发模式。如:(process-time)
partition.time-extractor.timestamp-pattern
可选(none)String如果表是分区表,可以配置时间戳。如:(yyyy-MM-dd)
sink.partition-commit.delay
可选(none)String如果表是分区表,可以配置延迟时间。如:(10s,20s,1m...)
sink.partition-commit.policy.kind
可选(none)String分区提交策略通知下游某个分区已经写完毕可以被读取了。 + metastore:向 metadata 增加分区。仅 hive 支持 metastore 策略,文件系统通过目录结构管理分区; + success-file:在目录中增加 '_success' 文件; + 上述两个策略可以同时指定:'metastore,success-file'。 + custom:通过指定的类来创建提交策略, + 支持同时指定多个提交策略:'metastore,success-file'。
+ +## 数据类型映射 +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Hive typeFlink SQL type
char(p)CHAR(p)
varchar(p)VARCHAR(p)
stringSTRING
booleanBOOLEAN
tinyintTINYINT
smallintSMALLINT
intINT
bigintBIGINT
floatFLOAT
doubleDOUBLE
decimal(p, s)DECIMAL(p, s)
dateDATE
timestampTIMESTAMP(9)
binaryBYTES
arrayARRAY
mapMAP
structROW
+
From 42d58afb0ea51b7988e524b83d3a03415e2b3994 Mon Sep 17 00:00:00 2001 From: ganfengtan Date: Tue, 14 Jun 2022 20:27:08 +0800 Subject: [PATCH 4/4] [INLONG-405][Sort] fix checkstyle --- docs/data_node/extract_node/hdfs.md | 4 +- docs/data_node/extract_node/sqlserver-cdc.md | 15 ++--- docs/data_node/load_node/hive.md | 6 +- .../current/data_node/extract_node/hdfs.md | 4 +- .../data_node/extract_node/sqlserver-cdc.md | 61 ++++++++++--------- .../current/data_node/load_node/hdfs.md | 60 +++++++++--------- .../current/data_node/load_node/hive.md | 28 +++++---- 7 files changed, 94 insertions(+), 84 deletions(-) diff --git a/docs/data_node/extract_node/hdfs.md b/docs/data_node/extract_node/hdfs.md index 5de2a982e69..9762d7893f6 100644 --- a/docs/data_node/extract_node/hdfs.md +++ b/docs/data_node/extract_node/hdfs.md @@ -2,8 +2,8 @@ title: HDFS sidebar_position: 6 --- -The file system connector can be used to read single files or entire directories into a single table. +The HDFS connector can be used to read single files or entire directories into a single table. When using a directory as the source path, there is no defined order of ingestion for the files inside the directory. -Notice:CDC feature is developing. +Notice:HDFS CDC feature is developing. diff --git a/docs/data_node/extract_node/sqlserver-cdc.md b/docs/data_node/extract_node/sqlserver-cdc.md index f44c60f1bc2..c4ad66faaff 100644 --- a/docs/data_node/extract_node/sqlserver-cdc.md +++ b/docs/data_node/extract_node/sqlserver-cdc.md @@ -4,17 +4,18 @@ sidebar_position: 11 --- ## SQLServer Extract Node -The SqlServer extract node reads data and incremental data from the SqlServer database. The following will describe how to set up the SqlServer extraction node. +The SQLServer Extract Node reads data and incremental data from the SQLServer database. The following will describe how to set up the SQLServer extraction node. ## Supported Version | Extract Node | Version | |-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| [sqlserver-cdc](./sqlserver-cdc.md) | [SQLServer](https://docs.microsoft.com/en-us/sql/database-engine/install-windows/install-sql-server?view=sql-server-ver16): 2014、2016、2017、2019、2022 | | +| [SQLServer-cdc](./sqlserver-cdc.md) | [SQLServer](https://docs.microsoft.com/en-us/sql/database-engine/install-windows/install-sql-server?view=sql-server-ver16): 2014、2016、2017、2019、2022 | | ## Dependencies -Introduce related SQLServer cdc connector dependencies through maven. +Introduce related SQLServer Extract Node dependencies through maven. +Of course, you can also use INLONG to provide jar packages.([sort-connector-sqlserver-cdc](https://inlong.apache.org/download/main/)) ### Maven dependency @@ -26,9 +27,9 @@ Introduce related SQLServer cdc connector dependencies through maven. inlong_version ``` -## Setup SQLServer CDC +## Setup SQLServer Extract Node -SQLServer CDC needs to open related libraries and tables, the steps are as follows: +SQLServer Extract Node needs to open related libraries and tables, the steps are as follows: 1. Enable the CDC function for the database. ```sql @@ -70,13 +71,13 @@ note: 1 is running CDC of table. 
### Usage for SQL API -The example below shows how to create a SqlServer Extract Node with `Flink SQL Cli` : +The example below shows how to create a SQLServer Extract Node with `Flink SQL Cli` : ```sql -- Set checkpoint every 3000 milliseconds Flink SQL> SET 'execution.checkpointing.interval' = '3s'; --- Create a SqlServer table 'sqlserver_extract_node' in Flink SQL Cli +-- Create a SQLServer table 'sqlserver_extract_node' in Flink SQL Cli Flink SQL> CREATE TABLE sqlserver_extract_node ( order_id INT, order_date TIMESTAMP(0), diff --git a/docs/data_node/load_node/hive.md b/docs/data_node/load_node/hive.md index eac3add7aad..23a7db06879 100644 --- a/docs/data_node/load_node/hive.md +++ b/docs/data_node/load_node/hive.md @@ -15,11 +15,15 @@ Manipulating hive tables using the hive dialect is currently not supported. ### Dependencies Using Hive load requires the introduction of dependencies. +Of course, you can also use INLONG to provide jar packages.([sort-connector-hive](https://inlong.apache.org/download/main/)) + +### Maven dependency ``` org.apache.inlong sort-connector-hive + inlong_version ``` @@ -45,7 +49,7 @@ CREATE TABLE hiveTableName ( ### Usage for InLong Dashboard #### Configuration -When creating a data flow, select `Hive` for the data stream direction, and click "Add" to configure it. +When creating a data stream, select `Hive` for the data stream direction, and click "Add" to configure it. ![Hive Configuration](img/hive.png) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/hdfs.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/hdfs.md index bda1fdf40e5..3ae130e27c0 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/hdfs.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/hdfs.md @@ -3,8 +3,8 @@ title: HDFS sidebar_position: 6 --- -文件系统连接器可用于将单个文件或整个目录读取到单个表中。 +HDFS 连接器可用于将单个文件或整个目录读取到单个表中。 当使用目录作为源路径时,目录中的文件没有定义摄取顺序。 -注意:CDC功能正在开发中。 +注意:HDFS CDC 功能正在开发中。 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/sqlserver-cdc.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/sqlserver-cdc.md index ccd2f1d7a80..6c68e9a419e 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/sqlserver-cdc.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/sqlserver-cdc.md @@ -10,11 +10,12 @@ SQLServer 提取节点从 SQLServer 数据库中读取数据和增量数据。 | Extract Node | Version | |-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| [sqlserver-cdc](./sqlserver-cdc.md) | [SQLServer](https://docs.microsoft.com/en-us/sql/database-engine/install-windows/install-sql-server?view=sql-server-ver16): 2014、2016、2017、2019、2022 | | +| [SQLServer-cdc](./sqlserver-cdc.md) | [SQLServer](https://docs.microsoft.com/en-us/sql/database-engine/install-windows/install-sql-server?view=sql-server-ver16): 2014、2016、2017、2019、2022 | | ## 依赖配置 -通过maven引入sort-connector-sqlserver-cdc构建自己的项目。 +通过 Maven 引入 sort-connector-sqlserver-cdc 构建自己的项目。 +当然,你也可以直接使用 INLONG 提供的 jar 
包。([sort-connector-sqlserver-cdc](https://inlong.apache.org/download/main/)) ### Maven依赖配置 @@ -22,28 +23,28 @@ SQLServer 提取节点从 SQLServer 数据库中读取数据和增量数据。 org.apache.inlong sort-connector-sqlserver-cdc - + inlong_version ``` -## 配置 SQLServer CDC +## 配置 SQLServer 加载节点 -SQLServer CDC 需要开启库和表的CDC功能,配置步骤如下: +SQLServer 加载节点需要开启库和表的 CDC 功能,配置步骤如下: -1. 开启数据库CDC能力。 +1. 开启数据库 CDC 能力。 ```sql if exists(select 1 from sys.databases where name='dbName' and is_cdc_enabled=0) begin exec sys.sp_cdc_enable_db end ``` -2. 检查数据库CDC是否开启。 +2. 检查数据库 CDC 是否开启。 ```sql select is_cdc_enabled from sys.databases where name='dbName' ``` -备注: "1"表示数据库CDC开启 +备注: "1"表示数据库 CDC 开启 -3. 开启表的CDC能力。 +3. 开启表的 CDC 能力。 ```sql IF EXISTS(SELECT 1 FROM sys.tables WHERE name='tableName' AND is_tracked_by_cdc = 0) BEGIN @@ -60,15 +61,15 @@ END ``` 备注: 表必须有主键或者唯一索引。 -4. 检查表CDC是否开启。 +4. 检查表 CDC 是否开启。 ```sql SELECT is_tracked_by_cdc FROM sys.tables WHERE name='tableName' ``` -备注: "1"表示表CDC开启 +备注: "1"表示表 CDC 开启 -## 如何创建一个SQLServer抽取节点 +## 如何创建一个 SQLServer 抽取节点 -### SQL API的使用 +### SQL API 的使用 使用 `Flink SQL Cli` : @@ -98,22 +99,22 @@ Flink SQL> CREATE TABLE sqlserver_extract_node ( -- Read snapshot and binlog from sqlserver_extract_node Flink SQL> SELECT * FROM sqlserver_extract_node; ``` -### InLong Dashboard方式 +### InLong Dashboard 方式 TODO -### InLong Manager Client方式 +### InLong Manager Client 方式 TODO -## SQLServer抽取节点参数信息 +## SQLServer 抽取节点参数信息
- - - + + + @@ -130,56 +131,56 @@ TODO - + - + - + - + - + - + - + - +
参数是否必须默认值数据类型是否必须默认值数据类型 描述
必须 (none) StringSQLServer数据库IP地址或者hostname。SQLServer 数据库 IP 地址或者 hostname。
username 必须 (none) StringSQLServer数据库用户名。SQLServer 数据库用户名。
password 必须 (none) StringSQLServer数据库用户密码。SQLServer 数据库用户密码。
database-name 必须 (none) StringSQLServer数据库监控的数据库名称。SQLServer 数据库监控的数据库名称。
schema-name 必须 dbo StringSQLServer数据库监控的schema名称。SQLServer 数据库监控的 schema 名称。
table-name 必须 (none) StringSQLServer数据库监控的表名称。SQLServer 数据库监控的表名称。
port 可选 1433 IntegerSQLServer数据库端口。SQLServer 数据库端口。
server-time-zone 可选 UTC StringSQLServer数据库连接配置时区。 例如: "Asia/Shanghai"。SQLServer 数据库连接配置时区。 例如: "Asia/Shanghai"。
@@ -193,8 +194,8 @@ TODO 字段名称 - 数据类型 - 描述 + 数据类型 + 描述 @@ -206,7 +207,7 @@ TODO meta.schema_name STRING NOT NULL - 包含该行schema的名称。 + 包含该行 schema 的名称。 meta.database_name diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hdfs.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hdfs.md index d20deecaa93..92c33ba841b 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hdfs.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hdfs.md @@ -2,15 +2,15 @@ title: HDFS sidebar_position: 11 --- -## HDFS加载节点 -HDFS连接器为Flink内部依赖,支持分区文件。 -在Flink中包含了该文件系统连接器,不需要添加额外的依赖。 -相应的jar包可以在Flink工程项目的/lib目录下找到。 -从文件系统中读取或者向文件系统中写入行时,需要指定相应的format。 +## HDFS 加载节点 +HDFS 连接器为 Flink 内部依赖,支持分区文件。 +在 Flink 中包含了该文件系统连接器,不需要添加额外的依赖。 +相应的 jar 包可以在 Flink 工程项目的 /lib 目录下找到。 +从文件系统中读取或者向文件系统中写入行时,需要指定相应的 format。 -## 如何创建HDFS加载节点 +## 如何创建 HDFS 加载节点 -### SQL API的使用 +### SQL API 的使用 使用 `Flink SQL Cli` : ```sql @@ -35,8 +35,8 @@ CREATE TABLE hdfs_load_node (
  • CSV(非压缩格式)
  • JSON(文件系统连接器的 JSON format 与传统的标准的 JSON file 的不同,而是非压缩的。换行符分割的 JSON)
  • Avro(通过配置 avro.codec 属性支持压缩)
  • -
  • Parquet(与hive兼容)
  • -
  • Orc(与hive兼容)
  • +
  • Parquet(与 hive 兼容)
  • +
  • Orc(与 hive 兼容)
  • Debezium-JSON
  • Canal-JSON
  • Raw
  • @@ -46,8 +46,8 @@ CREATE TABLE hdfs_load_node ( #### 滚动策略 -数据会被加载到文件的目录下的part文件中,每个分区接收到来之subtask的数据至少会为该分区生成一个part文件。同时可以配置滚动策略 -来生成part文件,生成part文件会将in-progress part文件关闭。该策略基于大小和指定文件被打开的超时时间来生成part文件。 +数据会被加载到文件的目录下的 part 文件中,每个分区接收到来之 subtask 的数据至少会为该分区生成一个 part 文件。同时可以配置滚动策略 +来生成 part 文件,生成 part 文件会将 in-progress part 文件关闭。该策略基于大小和指定文件被打开的超时时间来生成 part 文件。 @@ -63,7 +63,7 @@ CREATE TABLE hdfs_load_node ( - + @@ -83,7 +83,7 @@ CREATE TABLE hdfs_load_node (
    sink.rolling-policy.file-size
    128MB MemorySize滚动前part文件的最大值。滚动前 part 文件的最大值。
    sink.rolling-policy.rollover-interval
    #### 文件合并 -支持文件能力,允许在较小的checkpoint下不产生大量的小文件。 +支持文件能力,允许在较小的 checkpoint 下不产生大量的小文件。 @@ -98,8 +98,8 @@ CREATE TABLE hdfs_load_node ( - + @@ -112,11 +112,11 @@ CREATE TABLE hdfs_load_node ( #### 分区提交 -分区数据写入完成后,一般需要通知下流应用。如:更新hive的元数据信息或者hdfs目录生成_SUCCESS文件。 -分区提交策略是配置的,分区提交行为基于triggers和policies的组合。 +分区数据写入完成后,一般需要通知下流应用。如:更新 hive 的元数据信息或者 hdfs 目录生成 _SUCCESS 文件。 +分区提交策略是配置的,分区提交行为基于 triggers 和 policies 的组合。 -- Trigger :分区提交时机可以基于分区的watermark或者基于处理时间(process-time)。 -- Policy :分区提交策略,内置策略包括提交hive元数据和生成_SUCCESS文件,同时支持自定策略,如生成hive的统计信息、合并小文件等。 +- Trigger :分区提交时机可以基于分区的 watermark 或者基于处理时间(process-time)。 +- Policy :分区提交策略,内置策略包括提交 hive 元数据和生成 _SUCCESS 文件,同时支持自定策略,如生成 hive 的统计信息、合并小文件等。 备注:分区提交仅支持动态分区插入。 @@ -134,7 +134,7 @@ CREATE TABLE hdfs_load_node ( - @@ -148,13 +148,13 @@ CREATE TABLE hdfs_load_node ( - @@ -164,8 +164,8 @@ CREATE TABLE hdfs_load_node ( 分区提交策略定义了分区提交使用的具体策略。 -- metastore:仅在hive时支持该策略。 -- success: part文件生成后会生成'_SUCCESS'文件。 +- metastore:仅在 hive 时支持该策略。 +- success: part 文件生成后会生成 '_SUCCESS' 文件。
    auto-compaction
    false Boolean在流式sink中是否开启自动合并功能,数据首先会被写入临时文件。 - 当checkpoint完成后,该检查点产生的临时文件会被合并,这些临时文件在合并前不可见。在流式 sink 中是否开启自动合并功能,数据首先会被写入临时文件。 + 当 checkpoint 完成后,该检查点产生的临时文件会被合并,这些临时文件在合并前不可见。
    compaction.file-size
    sink.partition-commit.trigger
    process-time String分区提交触发器类型: 'process-time':基于机器时间既不需要分区时间提取器也不需要watermark生成器。 + 分区提交触发器类型: 'process-time':基于机器时间既不需要分区时间提取器也不需要 watermark 生成器。 一旦 "当前系统时间" 超过了 "分区创建系统时间" 和 'sink.partition-commit.delay' 之和立即提交分区。
    'partition-time':基于提取的分区时间,需要 watermark 生成。一旦 watermark 超过了 "分区创建系统时间" 和 'sink.partition-commit.delay' 之和立即提交分区。
    sink.partition-commit.watermark-time-zone
    UTC String 解析Long类型的watermark到TIMESTAMP类型时所采用的时区, - 解析得到的watermark的TIMESTAMP会被用来跟分区时间进行比较以判断是否该被提交。 - 这个属性仅当`sink.partition-commit.trigger`被设置为'partition-time'时有效。 + 解析 Long 类型的 watermark 到 TIMESTAMP 类型时所采用的时区, + 解析得到的 watermark 的 TIMESTAMP 会被用来跟分区时间进行比较以判断是否该被提交。 + 这个属性仅当 `sink.partition-commit.trigger` 被设置为 'partition-time' 时有效。 如果这个属性设置的不正确,例如在 TIMESTAMP_LTZ 类型的列上定义了 source rowtime, 如果没有设置该属性,那么用户可能会在若干个小时后才看到分区的提交。 - 默认值为'UTC'意味着 watermark 是定义在 TIMESTAMP 类型的列上或者没有定义 watermark。 - 如果watermark定义在TIMESTAMP_LTZ类型的列上,watermark时区必须是会话时区(session time zone)。 + 默认值为 'UTC' 意味着 watermark 是定义在 TIMESTAMP 类型的列上或者没有定义 watermark。 + 如果 watermark 定义在 TIMESTAMP_LTZ 类型的列上,watermark 时区必须是会话时区(session time zone)。 该属性的可选值要么是完整的时区名比如 'America/Los_Angeles',要么是自定义时区,例如 'GMT-08:00'。
    @@ -183,8 +183,8 @@ CREATE TABLE hdfs_load_node ( - + @@ -198,7 +198,7 @@ CREATE TABLE hdfs_load_node ( - +
    可选 (none) String分区策略通知分区part生成可以被访问,仅hive支持metastore策略,文件系统生成'_success'文件表示文件写入完成。 - 两种策略的指定分别为'metastore,success-file',也可以通过custom的指定的类创建提交策略。分区策略通知分区 part 生成可以被访问,仅 hive 支持 metastore 策略,文件系统生成 '_success' 文件表示文件写入完成。 + 两种策略的指定分别为 'metastore,success-file' ,也可以通过 custom 的指定的类创建提交策略。
    sink.partition-commit.policy.class
    可选 _SUCCESS String使用success-file分区提交策略时的文件名,默认值是 '_SUCCESS'。使用 success-file 分区提交策略时的文件名,默认值是 '_SUCCESS'。
    diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hive.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hive.md index 77dedd91478..d8468a225d4 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hive.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hive.md @@ -2,10 +2,10 @@ title: Hive sidebar_position: 2 --- -## Hive加载节点 +## Hive 加载节点 -Hive加载节点可以将数据写入hive。使用flink方言,目前仅支持insert操作,upsert模式下的数据会转换成insert方式 -目前暂时不支持使用hive方言操作hive表。 +Hive 加载节点可以将数据写入 Hive。使用 Flink 方言,目前仅支持 Insert 操作,Upsert 模式下的数据会转换成 Insert 方式 +目前暂时不支持使用 Hive 方言操作 Hive 表。 ## 支持的版本 @@ -15,18 +15,22 @@ Hive加载节点可以将数据写入hive。使用flink方言,目前仅支持i ### 依赖 -通过maven引入sort-connector-hive构建自己的项目。 +通过 Maven 引入 sort-connector-hive 构建自己的项目。 +当然,你也可以直接使用 INLONG 提供的 jar 包。([sort-connector-sqlserver-cdc](https://inlong.apache.org/download/main/)) + +### Maven 依赖 ``` org.apache.inlong sort-connector-hive + inlong_version ``` -## 如何配置Hive数据加载节点 +## 如何配置 Hive 数据加载节点 -### SQL API的使用 +### SQL API 的使用 使用 `Flink SQL Cli` : @@ -43,18 +47,18 @@ CREATE TABLE hiveTableName ( 'hive-conf-dir' = 'hdfs://localhost:9000/user/hive/hive-site.xml' ); ``` -### InLong Dashboard方式 +### InLong Dashboard 方式 #### 配置 -在创建数据流时,选择数据落地为'Hive然后点击'Add'来配置hive的相关信息。 +在创建数据流时,选择数据落地为 'Hive' 然后点击 'Add' 来配置 Hive 的相关信息。 ![Hive Configuration](img/hive.png) -### InLong Manager Client方式 +### InLong Manager Client 方式 TODO: 未来版本支持 -## Hive加载节点参数信息 +## Hive 加载节点参数信息 @@ -85,8 +89,8 @@ TODO: 未来版本支持 - +
    必须 (none) String本地构建项目可以将hive-site.xml构建到classpath中,未来dashboard将支持本地上传能力。 - 目前通用方式只支持配置已经上传文件后的HDFS路径。本地构建项目可以将hive-site.xml构建到 classpath 中,未来 Dashboard 将支持本地上传能力。 + 目前通用方式只支持配置已经上传文件后的 HDFS 路径。
    sink.partition-commit.trigger