From ec533ecd9a7d94d1c3e6d6ad7cc0440315d897e5 Mon Sep 17 00:00:00 2001 From: "AzkabanWarden.Gf" Date: Thu, 7 Mar 2024 10:38:53 +0800 Subject: [PATCH] Add support for XML file type to various file connectors such as SFTP, FTP, LocalFile, HdfsFile, and more. (#6327) --- docs/en/connector-v2/sink/CosFile.md | 18 +- docs/en/connector-v2/sink/FtpFile.md | 18 +- docs/en/connector-v2/sink/HdfsFile.md | 6 +- docs/en/connector-v2/sink/LocalFile.md | 18 +- docs/en/connector-v2/sink/OssFile.md | 18 +- docs/en/connector-v2/sink/OssJindoFile.md | 18 +- docs/en/connector-v2/sink/S3File.md | 18 +- docs/en/connector-v2/sink/SftpFile.md | 18 +- docs/en/connector-v2/source/CosFile.md | 19 +- docs/en/connector-v2/source/FtpFile.md | 19 +- docs/en/connector-v2/source/HdfsFile.md | 7 +- docs/en/connector-v2/source/LocalFile.md | 19 +- docs/en/connector-v2/source/OssFile.md | 47 +-- docs/en/connector-v2/source/OssJindoFile.md | 7 +- docs/en/connector-v2/source/S3File.md | 9 +- docs/en/connector-v2/source/SftpFile.md | 5 +- .../file/hdfs/source/BaseHdfsFileSource.java | 1 + .../connector-file-base/pom.xml | 14 + .../file/config/BaseFileSourceConfig.java | 1 + .../seatunnel/file/config/BaseSinkConfig.java | 21 ++ .../file/config/BaseSourceConfigOptions.java | 14 + .../seatunnel/file/config/FileFormat.java | 13 + .../file/sink/config/FileSinkConfig.java | 27 ++ .../seatunnel/file/sink/util/XmlWriter.java | 145 +++++++++ .../file/sink/writer/XmlWriteStrategy.java | 78 +++++ .../file/source/reader/XmlReadStrategy.java | 305 ++++++++++++++++++ .../file/writer/XmlReadStrategyTest.java | 159 +++++++++ .../resources/xml/name=xmlTest/test_read.xml | 24 ++ .../src/test/resources/xml/test_read_xml.conf | 39 +++ .../file/cos/sink/CosFileSinkFactory.java | 4 + .../file/cos/source/CosFileSource.java | 1 + .../file/cos/source/CosFileSourceFactory.java | 11 +- .../file/ftp/sink/FtpFileSinkFactory.java | 4 + .../file/ftp/source/FtpFileSource.java | 1 + .../file/ftp/source/FtpFileSourceFactory.java | 11 +- .../file/hdfs/sink/HdfsFileSinkFactory.java | 4 + .../hdfs/source/HdfsFileSourceFactory.java | 11 +- .../file/oss/sink/OssFileSinkFactory.java | 4 + .../file/oss/source/OssFileSource.java | 1 + .../file/oss/source/OssFileSourceFactory.java | 11 +- .../file/local/sink/LocalFileSinkFactory.java | 4 + .../local/source/LocalFileSourceFactory.java | 11 +- .../file/oss/sink/OssFileSinkFactory.java | 4 + .../file/oss/source/OssFileSourceFactory.java | 11 +- .../file/s3/sink/S3FileSinkFactory.java | 4 + .../file/s3/source/S3FileSource.java | 1 + .../file/s3/source/S3FileSourceFactory.java | 11 +- .../file/sftp/sink/SftpFileSinkFactory.java | 4 + .../file/sftp/source/SftpFileSource.java | 3 +- .../sftp/source/SftpFileSourceFactory.java | 11 +- .../e2e/connector/file/fstp/SftpFileIT.java | 9 + .../src/test/resources/xml/e2e.xml | 24 ++ .../resources/xml/fake_to_sftp_file_xml.conf | 88 +++++ .../xml/sftp_file_xml_to_assert.conf | 121 +++++++ 54 files changed, 1421 insertions(+), 53 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/XmlWriter.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/XmlWriteStrategy.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java create mode 100644 
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/xml/name=xmlTest/test_read.xml create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/xml/test_read_xml.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/xml/e2e.xml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/xml/fake_to_sftp_file_xml.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/xml/sftp_file_xml_to_assert.conf diff --git a/docs/en/connector-v2/sink/CosFile.md b/docs/en/connector-v2/sink/CosFile.md index f0d6517a055..6c88e922947 100644 --- a/docs/en/connector-v2/sink/CosFile.md +++ b/docs/en/connector-v2/sink/CosFile.md @@ -29,6 +29,7 @@ By default, we use 2PC commit to ensure `exactly-once` - [x] orc - [x] json - [x] excel + - [x] xml ## Options @@ -57,6 +58,9 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | ### path [string] @@ -110,7 +114,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file We supported as the following file types: -`text` `json` `csv` `orc` `parquet` `excel` +`text` `json` `csv` `orc` `parquet` `excel` `xml` Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. @@ -189,6 +193,18 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### xml_root_tag [string] + +Specifies the tag name of the root element within the XML file. + +### xml_row_tag [string] + +Specifies the tag name of the data rows within the XML file. + +### xml_use_attr_format [boolean] + +Specifies Whether to process data using the tag attribute format. + ## Example For text file format with `have_partition` and `custom_filename` and `sink_columns` diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md index cdc3512485e..9a3af0e744c 100644 --- a/docs/en/connector-v2/sink/FtpFile.md +++ b/docs/en/connector-v2/sink/FtpFile.md @@ -27,6 +27,7 @@ By default, we use 2PC commit to ensure `exactly-once` - [x] orc - [x] json - [x] excel + - [x] xml ## Options @@ -56,6 +57,9 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. 
| ### host [string] @@ -115,7 +119,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file We supported as the following file types: -`text` `json` `csv` `orc` `parquet` `excel` +`text` `json` `csv` `orc` `parquet` `excel` `xml` Please note that, The final file name will end with the file_format_type's suffix, the suffix of the text file is `txt`. @@ -194,6 +198,18 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### xml_root_tag [string] + +Specifies the tag name of the root element within the XML file. + +### xml_row_tag [string] + +Specifies the tag name of the data rows within the XML file. + +### xml_use_attr_format [boolean] + +Specifies Whether to process data using the tag attribute format. + ## Example For text file format simple config diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md index 535b4fc6cda..4df905ff439 100644 --- a/docs/en/connector-v2/sink/HdfsFile.md +++ b/docs/en/connector-v2/sink/HdfsFile.md @@ -21,6 +21,7 @@ By default, we use 2PC commit to ensure `exactly-once` - [x] orc - [x] json - [x] excel + - [x] xml - [x] compress codec - [x] lzo @@ -45,7 +46,7 @@ Output data to hdfs file | custom_filename | boolean | no | false | Whether you need custom the filename | | file_name_expression | string | no | "${transactionId}" | Only used when `custom_filename` is `true`.`file_name_expression` describes the file expression which will be created into the `path`. We can add the variable `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`,`${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`.Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file. | | filename_time_format | string | no | "yyyy.MM.dd" | Only used when `custom_filename` is `true`.When the format in the `file_name_expression` parameter is `xxxx-${now}` , `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . The commonly used time formats are listed as follows:[y:Year,M:Month,d:Day of month,H:Hour in day (0-23),m:Minute in hour,s:Second in minute] | -| file_format_type | string | no | "csv" | We supported as the following file types:`text` `json` `csv` `orc` `parquet` `excel`.Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. | +| file_format_type | string | no | "csv" | We supported as the following file types:`text` `json` `csv` `orc` `parquet` `excel` `xml`.Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. | | field_delimiter | string | no | '\001' | Only used when file_format is text,The separator between columns in a row of data. Only needed by `text` file format. | | row_delimiter | string | no | "\n" | Only used when file_format is text,The separator between rows in a file. Only needed by `text` file format. | | have_partition | boolean | no | false | Whether you need processing partitions. | @@ -63,6 +64,9 @@ Output data to hdfs file | common-options | object | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | | max_rows_in_memory | int | no | - | Only used when file_format is excel.When File Format is Excel,The maximum number of data items that can be cached in the memory. 
| | sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel.Writer the sheet of the workbook | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml, specifies the tag name of the root element within the XML file. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml, specifies the tag name of the data rows within the XML file | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml, specifies Whether to process data using the tag attribute format. | ### Tips diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 2f88f0fe720..e16c81c3f3a 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -27,6 +27,7 @@ By default, we use 2PC commit to ensure `exactly-once` - [x] orc - [x] json - [x] excel + - [x] xml ## Options @@ -51,6 +52,9 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | | enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | ### path [string] @@ -89,7 +93,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file We supported as the following file types: -`text` `json` `csv` `orc` `parquet` `excel` +`text` `json` `csv` `orc` `parquet` `excel` `xml` Please note that, The final file name will end with the file_format_type's suffix, the suffix of the text file is `txt`. @@ -168,6 +172,18 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### xml_root_tag [string] + +Specifies the tag name of the root element within the XML file. + +### xml_row_tag [string] + +Specifies the tag name of the data rows within the XML file. + +### xml_use_attr_format [boolean] + +Specifies Whether to process data using the tag attribute format. + ### enable_header_write [boolean] Only used when file_format_type is text,csv.false:don't write header,true:write header. diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md index 7cbab4347de..4c85121c20c 100644 --- a/docs/en/connector-v2/sink/OssFile.md +++ b/docs/en/connector-v2/sink/OssFile.md @@ -32,6 +32,7 @@ By default, we use 2PC commit to ensure `exactly-once` - [x] orc - [x] json - [x] excel + - [x] xml ## Data Type Mapping @@ -108,6 +109,9 @@ If write to `csv`, `text` file type, All column will be string. | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | ### path [string] @@ -161,7 +165,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${Now}` , `file We supported as the following file types: -`text` `json` `csv` `orc` `parquet` `excel` +`text` `json` `csv` `orc` `parquet` `excel` `xml` Please note that, The final file name will end with the file_format_type's suffix, the suffix of the text file is `txt`. @@ -240,6 +244,18 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### xml_root_tag [string] + +Specifies the tag name of the root element within the XML file. + +### xml_row_tag [string] + +Specifies the tag name of the data rows within the XML file. + +### xml_use_attr_format [boolean] + +Specifies Whether to process data using the tag attribute format. + ## How to Create an Oss Data Synchronization Jobs The following example demonstrates how to create a data synchronization job that reads data from Fake Source and writes it to the Oss: diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md index 40441ea83ec..1a55c319704 100644 --- a/docs/en/connector-v2/sink/OssJindoFile.md +++ b/docs/en/connector-v2/sink/OssJindoFile.md @@ -33,6 +33,7 @@ By default, we use 2PC commit to ensure `exactly-once` - [x] orc - [x] json - [x] excel + - [x] xml ## Options @@ -61,6 +62,9 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. 
| +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | ### path [string] @@ -114,7 +118,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file We supported as the following file types: -`text` `json` `csv` `orc` `parquet` `excel` +`text` `json` `csv` `orc` `parquet` `excel` `xml` Please note that, The final file name will end with the file_format_type's suffix, the suffix of the text file is `txt`. @@ -193,6 +197,18 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### xml_root_tag [string] + +Specifies the tag name of the root element within the XML file. + +### xml_row_tag [string] + +Specifies the tag name of the data rows within the XML file. + +### xml_use_attr_format [boolean] + +Specifies Whether to process data using the tag attribute format. + ## Example For text file format with `have_partition` and `custom_filename` and `sink_columns` diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index 84bca3cb80c..a3811ea34ac 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -22,6 +22,7 @@ By default, we use 2PC commit to ensure `exactly-once` - [x] orc - [x] json - [x] excel + - [x] xml ## Description @@ -116,6 +117,9 @@ If write to `csv`, `text` file type, All column will be string. | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml, specifies the tag name of the root element within the XML file. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml, specifies the tag name of the data rows within the XML file | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml, specifies Whether to process data using the tag attribute format. | | hadoop_s3_properties | map | no | | If you need to add a other option, you could add it here and refer to this [link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) | | schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Before turning on the synchronous task, do different treatment of the target path | | data_save_mode | Enum | no | APPEND_DATA | Before opening the synchronous task, the data file in the target path is differently processed | @@ -167,7 +171,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file We supported as the following file types: -`text` `json` `csv` `orc` `parquet` `excel` +`text` `json` `csv` `orc` `parquet` `excel` `xml` Please note that, The final file name will end with the file_format_type's suffix, the suffix of the text file is `txt`. @@ -246,6 +250,18 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### xml_root_tag [string] + +Specifies the tag name of the root element within the XML file. + +### xml_row_tag [string] + +Specifies the tag name of the data rows within the XML file. + +### xml_use_attr_format [boolean] + +Specifies Whether to process data using the tag attribute format. 
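For illustration, assuming a row with columns `name` and `age` and the default `xml_root_tag`/`xml_row_tag` values, the two formats would look roughly as follows (a sketch based on the writer's defaults; actual indentation depends on its pretty-print settings):

```xml
<!-- xml_use_attr_format = true: each field is written as an attribute of the row tag -->
<RECORDS>
    <RECORD name="tyrantlucifer" age="26"/>
</RECORDS>

<!-- xml_use_attr_format = false: each field is written as a child element -->
<RECORDS>
    <RECORD>
        <name>tyrantlucifer</name>
        <age>26</age>
    </RECORD>
</RECORDS>
```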
+ ### schema_save_mode[Enum] Before turning on the synchronous task, do different treatment of the target path. diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md index 7bb3f12559b..448d1dd050d 100644 --- a/docs/en/connector-v2/sink/SftpFile.md +++ b/docs/en/connector-v2/sink/SftpFile.md @@ -27,6 +27,7 @@ By default, we use 2PC commit to ensure `exactly-once` - [x] orc - [x] json - [x] excel + - [x] xml ## Options @@ -55,6 +56,9 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | ### host [string] @@ -108,7 +112,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file We supported as the following file types: -`text` `json` `csv` `orc` `parquet` `excel` +`text` `json` `csv` `orc` `parquet` `excel` `xml` Please note that, The final file name will end with the file_format_type's suffix, the suffix of the text file is `txt`. @@ -187,6 +191,18 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### xml_root_tag [string] + +Specifies the tag name of the root element within the XML file. + +### xml_row_tag [string] + +Specifies the tag name of the data rows within the XML file. + +### xml_use_attr_format [boolean] + +Specifies Whether to process data using the tag attribute format. + ## Example For text file format with `have_partition` and `custom_filename` and `sink_columns` diff --git a/docs/en/connector-v2/source/CosFile.md b/docs/en/connector-v2/source/CosFile.md index 406c86fab5b..7f0d6020800 100644 --- a/docs/en/connector-v2/source/CosFile.md +++ b/docs/en/connector-v2/source/CosFile.md @@ -26,6 +26,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa - [x] orc - [x] json - [x] excel + - [x] xml ## Description @@ -60,6 +61,8 @@ To use this connector you need put hadoop-cos-{hadoop.version}-{version}.jar and | time_format | string | no | HH:mm:ss | | schema | config | no | - | | sheet_name | string | no | - | +| xml_row_tag | string | no | - | +| xml_use_attr_format | boolean | no | - | | file_filter_pattern | string | no | - | | compress_codec | string | no | none | | common-options | | no | - | @@ -72,7 +75,7 @@ The source file path. File type, supported as the following file types: -`text` `csv` `parquet` `orc` `json` `excel` +`text` `csv` `parquet` `orc` `json` `excel` `xml` If you assign file type to `json`, you should also assign schema option to tell connector how to parse data to the row you want. @@ -236,7 +239,7 @@ default `HH:mm:ss` ### schema [config] -Only need to be configured when the file_format_type are text, json, excel or csv ( Or other format we can't read the schema from metadata). +Only need to be configured when the file_format_type are text, json, excel, xml or csv ( Or other format we can't read the schema from metadata). #### fields [Config] @@ -248,6 +251,18 @@ Only need to be configured when file_format is excel. Reader the sheet of the workbook. +### xml_row_tag [string] + +Only need to be configured when file_format is xml. 
+ +Specifies the tag name of the data rows within the XML file. + +### xml_use_attr_format [boolean] + +Only need to be configured when file_format is xml. + +Specifies Whether to process data using the tag attribute format. + ### file_filter_pattern [string] Filter pattern, which used for filtering files. diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md index ee231bb087b..e103c14a9ae 100644 --- a/docs/en/connector-v2/source/FtpFile.md +++ b/docs/en/connector-v2/source/FtpFile.md @@ -21,6 +21,7 @@ - [x] csv - [x] json - [x] excel + - [x] xml ## Description @@ -54,6 +55,8 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you | skip_header_row_number | long | no | 0 | | schema | config | no | - | | sheet_name | string | no | - | +| xml_row_tag | string | no | - | +| xml_use_attr_format | boolean | no | - | | file_filter_pattern | string | no | - | | compress_codec | string | no | none | | common-options | | no | - | @@ -82,7 +85,7 @@ The source file path. File type, supported as the following file types: -`text` `csv` `parquet` `orc` `json` `excel` +`text` `csv` `parquet` `orc` `json` `excel` `xml` If you assign file type to `json` , you should also assign schema option to tell connector how to parse data to the row you want. @@ -221,7 +224,7 @@ then SeaTunnel will skip the first 2 lines from source files ### schema [config] -Only need to be configured when the file_format_type are text, json, excel or csv ( Or other format we can't read the schema from metadata). +Only need to be configured when the file_format_type are text, json, excel, xml or csv ( Or other format we can't read the schema from metadata). The schema information of upstream data. @@ -233,6 +236,18 @@ The read column list of the data source, user can use it to implement field proj Reader the sheet of the workbook,Only used when file_format_type is excel. +### xml_row_tag [string] + +Only need to be configured when file_format is xml. + +Specifies the tag name of the data rows within the XML file. + +### xml_use_attr_format [boolean] + +Only need to be configured when file_format is xml. + +Specifies Whether to process data using the tag attribute format. + ### compress_codec [string] The compress codec of files and the details that supported as the following shown: diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md index ffcb0b68678..5534dcd9653 100644 --- a/docs/en/connector-v2/source/HdfsFile.md +++ b/docs/en/connector-v2/source/HdfsFile.md @@ -26,6 +26,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa - [x] orc - [x] json - [x] excel + - [x] xml ## Description @@ -42,9 +43,9 @@ Read data from hdfs file system. | Name | Type | Required | Default | Description | |---------------------------|---------|----------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | path | string | yes | - | The source file path. | -| file_format_type | string | yes | - | We supported as the following file types:`text` `json` `csv` `orc` `parquet` `excel`.Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. 
| +| file_format_type | string | yes | - | We supported as the following file types:`text` `json` `csv` `orc` `parquet` `excel` `xml`.Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. | | fs.defaultFS | string | yes | - | The hadoop cluster address that start with `hdfs://`, for example: `hdfs://hadoopcluster` | -| read_columns | list | yes | - | The read column list of the data source, user can use it to implement field projection.The file type supported column projection as the following shown:[text,json,csv,orc,parquet,excel].Tips: If the user wants to use this feature when reading `text` `json` `csv` files, the schema option must be configured. | +| read_columns | list | yes | - | The read column list of the data source, user can use it to implement field projection.The file type supported column projection as the following shown:[text,json,csv,orc,parquet,excel,xml].Tips: If the user wants to use this feature when reading `text` `json` `csv` files, the schema option must be configured. | | hdfs_site_path | string | no | - | The path of `hdfs-site.xml`, used to load ha configuration of namenodes | | delimiter/field_delimiter | string | no | \001 | Field delimiter, used to tell connector how to slice and dice fields when reading text files. default `\001`, the same as hive's default delimiter | | parse_partition_from_path | boolean | no | true | Control whether parse the partition keys and values from file path. For example if you read a file from path `hdfs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`. Every record data from file will be added these two fields:[name:tyrantlucifer,age:26].Tips:Do not define partition fields in schema option. | @@ -58,6 +59,8 @@ Read data from hdfs file system. | skip_header_row_number | long | no | 0 | Skip the first few lines, but only for the txt and csv.For example, set like following:`skip_header_row_number = 2`.then Seatunnel will skip the first 2 lines from source files | | schema | config | no | - | the schema fields of upstream data | | sheet_name | string | no | - | Reader the sheet of the workbook,Only used when file_format is excel. | +| xml_row_tag | string | no | - | Specifies the tag name of the data rows within the XML file, only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Specifies whether to process data using the tag attribute format, only used when file_format is xml. | | compress_codec | string | no | none | The compress codec of files | | common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index 4d20ca532d1..172049498cc 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -26,6 +26,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa - [x] orc - [x] json - [x] excel + - [x] xml ## Description @@ -54,6 +55,8 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you | skip_header_row_number | long | no | 0 | | schema | config | no | - | | sheet_name | string | no | - | +| xml_row_tag | string | no | - | +| xml_use_attr_format | boolean | no | - | | file_filter_pattern | string | no | - | | compress_codec | string | no | none | | common-options | | no | - | @@ -67,7 +70,7 @@ The source file path. 
File type, supported as the following file types: -`text` `csv` `parquet` `orc` `json` `excel` +`text` `csv` `parquet` `orc` `json` `excel` `xml` If you assign file type to `json`, you should also assign schema option to tell connector how to parse data to the row you want. @@ -215,7 +218,7 @@ then SeaTunnel will skip the first 2 lines from source files ### schema [config] -Only need to be configured when the file_format_type are text, json, excel or csv ( Or other format we can't read the schema from metadata). +Only need to be configured when the file_format_type are text, json, excel, xml or csv ( Or other format we can't read the schema from metadata). #### fields [Config] @@ -227,6 +230,18 @@ Only need to be configured when file_format is excel. Reader the sheet of the workbook. +### xml_row_tag [string] + +Only need to be configured when file_format is xml. + +Specifies the tag name of the data rows within the XML file. + +### xml_use_attr_format [boolean] + +Only need to be configured when file_format is xml. + +Specifies Whether to process data using the tag attribute format. + ### file_filter_pattern [string] Filter pattern, which used for filtering files. diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md index 233eb76800f..85d922644de 100644 --- a/docs/en/connector-v2/source/OssFile.md +++ b/docs/en/connector-v2/source/OssFile.md @@ -37,12 +37,13 @@ Read all the data in a split in a pollNext call. What splits are read will be sa - [x] orc - [x] json - [x] excel + - [x] xml ## Data Type Mapping Data type mapping is related to the type of file being read, We supported as the following file types: -`text` `csv` `parquet` `orc` `json` `excel` +`text` `csv` `parquet` `orc` `json` `excel` `xml` ### JSON File Type @@ -188,26 +189,28 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto ## Options -| name | type | required | default value | Description | -|---------------------------|---------|----------|---------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| path | string | yes | - | The Oss path that needs to be read can have sub paths, but the sub paths need to meet certain format requirements. Specific requirements can be referred to "parse_partition_from_path" option | -| file_format_type | string | yes | - | File type, supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` | -| bucket | string | yes | - | The bucket address of oss file system, for example: `oss://seatunnel-test`. | -| endpoint | string | yes | - | fs oss endpoint | -| read_columns | list | no | - | The read column list of the data source, user can use it to implement field projection. The file type supported column projection as the following shown: `text` `csv` `parquet` `orc` `json` `excel` . If the user wants to use this feature when reading `text` `json` `csv` files, the "schema" option must be configured. | -| access_key | string | no | - | | -| access_secret | string | no | - | | -| delimiter | string | no | \001 | Field delimiter, used to tell connector how to slice and dice fields when reading text files. Default `\001`, the same as hive's default delimiter. 
| -| parse_partition_from_path | boolean | no | true | Control whether parse the partition keys and values from file path. For example if you read a file from path `oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`. Every record data from file will be added these two fields: name="tyrantlucifer", age=16 | -| date_format | string | no | yyyy-MM-dd | Date type format, used to tell connector how to convert string to date, supported as the following formats:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`. default `yyyy-MM-dd` | -| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` | -| time_format | string | no | HH:mm:ss | Time type format, used to tell connector how to convert string to time, supported as the following formats:`HH:mm:ss` `HH:mm:ss.SSS` | -| skip_header_row_number | long | no | 0 | Skip the first few lines, but only for the txt and csv. For example, set like following:`skip_header_row_number = 2`. Then SeaTunnel will skip the first 2 lines from source files | -| schema | config | no | - | The schema of upstream data. | -| sheet_name | string | no | - | Reader the sheet of the workbook,Only used when file_format is excel. | -| compress_codec | string | no | none | Which compress codec the files used. | -| file_filter_pattern | string | no | | `*.txt` means you only need read the files end with `.txt` | -| common-options | config | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | +| name | type | required | default value | Description | +|---------------------------|---------|----------|---------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | - | The Oss path that needs to be read can have sub paths, but the sub paths need to meet certain format requirements. Specific requirements can be referred to "parse_partition_from_path" option | +| file_format_type | string | yes | - | File type, supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` `xml` | +| bucket | string | yes | - | The bucket address of oss file system, for example: `oss://seatunnel-test`. | +| endpoint | string | yes | - | fs oss endpoint | +| read_columns | list | no | - | The read column list of the data source, user can use it to implement field projection. The file type supported column projection as the following shown: `text` `csv` `parquet` `orc` `json` `excel` `xml` . If the user wants to use this feature when reading `text` `json` `csv` files, the "schema" option must be configured. | +| access_key | string | no | - | | +| access_secret | string | no | - | | +| delimiter | string | no | \001 | Field delimiter, used to tell connector how to slice and dice fields when reading text files. Default `\001`, the same as hive's default delimiter. | +| parse_partition_from_path | boolean | no | true | Control whether parse the partition keys and values from file path. For example if you read a file from path `oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`. 
Every record data from file will be added these two fields: name="tyrantlucifer", age=16 | +| date_format | string | no | yyyy-MM-dd | Date type format, used to tell connector how to convert string to date, supported as the following formats:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`. default `yyyy-MM-dd` | +| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` | +| time_format | string | no | HH:mm:ss | Time type format, used to tell connector how to convert string to time, supported as the following formats:`HH:mm:ss` `HH:mm:ss.SSS` | +| skip_header_row_number | long | no | 0 | Skip the first few lines, but only for the txt and csv. For example, set like following:`skip_header_row_number = 2`. Then SeaTunnel will skip the first 2 lines from source files | +| schema | config | no | - | The schema of upstream data. | +| sheet_name | string | no | - | Reader the sheet of the workbook,Only used when file_format is excel. | +| xml_row_tag | string | no | - | Specifies the tag name of the data rows within the XML file, only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Specifies whether to process data using the tag attribute format, only used when file_format is xml. | +| compress_codec | string | no | none | Which compress codec the files used. | +| file_filter_pattern | string | no | | `*.txt` means you only need read the files end with `.txt` | +| common-options | config | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | ### compress_codec [string] @@ -225,7 +228,7 @@ Filter pattern, which used for filtering files. ### schema [config] -Only need to be configured when the file_format_type are text, json, excel or csv ( Or other format we can't read the schema from metadata). +Only need to be configured when the file_format_type are text, json, excel, xml or csv ( Or other format we can't read the schema from metadata). #### fields [Config] diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md index 27b710cfb8a..d1a28265539 100644 --- a/docs/en/connector-v2/source/OssJindoFile.md +++ b/docs/en/connector-v2/source/OssJindoFile.md @@ -26,6 +26,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa - [x] orc - [x] json - [x] excel + - [x] xml ## Description @@ -64,6 +65,8 @@ It only supports hadoop version **2.9.X+**. | skip_header_row_number | long | no | 0 | | schema | config | no | - | | sheet_name | string | no | - | +| xml_row_tag | string | no | - | +| xml_use_attr_format | boolean | no | - | | file_filter_pattern | string | no | - | | compress_codec | string | no | none | | common-options | | no | - | @@ -76,7 +79,7 @@ The source file path. File type, supported as the following file types: -`text` `csv` `parquet` `orc` `json` `excel` +`text` `csv` `parquet` `orc` `json` `excel` `xml` If you assign file type to `json`, you should also assign schema option to tell connector how to parse data to the row you want. @@ -240,7 +243,7 @@ then SeaTunnel will skip the first 2 lines from source files ### schema [config] -Only need to be configured when the file_format_type are text, json, excel or csv ( Or other format we can't read the schema from metadata). 
+Only need to be configured when the file_format_type are text, json, excel, xml or csv ( Or other format we can't read the schema from metadata). #### fields [Config] diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md index 7ad6f5735cc..0387af044d6 100644 --- a/docs/en/connector-v2/source/S3File.md +++ b/docs/en/connector-v2/source/S3File.md @@ -26,6 +26,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa - [x] orc - [x] json - [x] excel + - [x] xml ## Description @@ -48,7 +49,7 @@ Read data from aws s3 file system. Data type mapping is related to the type of file being read, We supported as the following file types: -`text` `csv` `parquet` `orc` `json` `excel` +`text` `csv` `parquet` `orc` `json` `excel` `xml` ### JSON File Type @@ -197,11 +198,11 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto | name | type | required | default value | Description | |---------------------------------|---------|----------|-------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | path | string | yes | - | The s3 path that needs to be read can have sub paths, but the sub paths need to meet certain format requirements. Specific requirements can be referred to "parse_partition_from_path" option | -| file_format_type | string | yes | - | File type, supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` | +| file_format_type | string | yes | - | File type, supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` `xml` | | bucket | string | yes | - | The bucket address of s3 file system, for example: `s3n://seatunnel-test`, if you use `s3a` protocol, this parameter should be `s3a://seatunnel-test`. | | fs.s3a.endpoint | string | yes | - | fs s3a endpoint | | fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. More information about the credential provider you can see [Hadoop AWS Document](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Simple_name.2Fsecret_credentials_with_SimpleAWSCredentialsProvider.2A) | -| read_columns | list | no | - | The read column list of the data source, user can use it to implement field projection. The file type supported column projection as the following shown: `text` `csv` `parquet` `orc` `json` `excel` . If the user wants to use this feature when reading `text` `json` `csv` files, the "schema" option must be configured. | +| read_columns | list | no | - | The read column list of the data source, user can use it to implement field projection. The file type supported column projection as the following shown: `text` `csv` `parquet` `orc` `json` `excel` `xml` . If the user wants to use this feature when reading `text` `json` `csv` files, the "schema" option must be configured. 
| | access_key | string | no | - | Only used when `fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider ` | | access_secret | string | no | - | Only used when `fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider ` | | hadoop_s3_properties | map | no | - | If you need to add other option, you could add it here and refer to this [link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) | @@ -213,6 +214,8 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto | skip_header_row_number | long | no | 0 | Skip the first few lines, but only for the txt and csv. For example, set like following:`skip_header_row_number = 2`. Then SeaTunnel will skip the first 2 lines from source files | | schema | config | no | - | The schema of upstream data. | | sheet_name | string | no | - | Reader the sheet of the workbook,Only used when file_format is excel. | +| xml_row_tag | string | no | - | Specifies the tag name of the data rows within the XML file, only valid for XML files. | +| xml_use_attr_format | boolean | no | - | Specifies whether to process data using the tag attribute format, only valid for XML files. | | compress_codec | string | no | none | | common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md index 4f6e9af44bc..0f179749fbc 100644 --- a/docs/en/connector-v2/source/SftpFile.md +++ b/docs/en/connector-v2/source/SftpFile.md @@ -21,6 +21,7 @@ - [x] csv - [x] json - [x] excel + - [x] xml ## Description @@ -86,6 +87,8 @@ The File does not have a specific type list, and we can indicate which SeaTunnel | skip_header_row_number | Long | No | 0 | Skip the first few lines, but only for the txt and csv.
For example, set it as follows:
`skip_header_row_number = 2`
then SeaTunnel will skip the first 2 lines from source files | | read_columns | list | no | - | The read column list of the data source, user can use it to implement field projection. | | sheet_name | String | No | - | Reader the sheet of the workbook,Only used when file_format is excel. | +| xml_row_tag | string | no | - | Specifies the tag name of the data rows within the XML file, only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Specifies whether to process data using the tag attribute format, only used when file_format is xml. | | schema | Config | No | - | Please check #schema below | | compress_codec | String | No | None | The compress codec of files and the details that supported as the following shown:
- txt: `lzo` `None`
- json: `lzo` `None`
- csv: `lzo` `None`
- orc: `lzo` `snappy` `lz4` `zlib` `None`
- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `None`
Tips: excel type does Not support any compression format | | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | @@ -93,7 +96,7 @@ The File does not have a specific type list, and we can indicate which SeaTunnel ### file_format_type [string] File type, supported as the following file types: -`text` `csv` `parquet` `orc` `json` `excel` +`text` `csv` `parquet` `orc` `json` `excel` `xml` If you assign file type to `json`, you should also assign schema option to tell connector how to parse data to the row you want. For example: upstream data is the following: diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java index cd42af0b931..75fbd04e68f 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java @@ -102,6 +102,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException { case TEXT: case JSON: case EXCEL: + case XML: SeaTunnelRowType userDefinedSchema = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml index 486b75939af..f091e7023d9 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml @@ -37,6 +37,8 @@ 4.1.2 4.1.2 3.1.4 + 2.1.4 + 2.0.0 @@ -144,6 +146,18 @@ ${hadoop-minikdc.version} test + + + org.dom4j + dom4j + ${dom4j.version} + + + + jaxen + jaxen + ${jaxen.version} + diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java index 520d40f9be2..c08a7a11def 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java @@ -93,6 +93,7 @@ private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) { case TEXT: case JSON: case EXCEL: + case XML: readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType()); return newCatalogTable(catalogTable, readStrategy.getActualSeaTunnelRowTypeInfo()); case ORC: diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java index cff9fc87a98..c7d4576f288 100644 --- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -246,6 +246,27 @@ public class BaseSinkConfig {
                     .noDefaultValue()
                     .withDescription("To be written sheet name,only valid for excel files");
 
+    public static final Option<String> XML_ROOT_TAG =
+            Options.key("xml_root_tag")
+                    .stringType()
+                    .defaultValue("RECORDS")
+                    .withDescription(
+                            "Specifies the tag name of the root element within the XML file, only valid for xml files, default value is 'RECORDS'");
+
+    public static final Option<String> XML_ROW_TAG =
+            Options.key("xml_row_tag")
+                    .stringType()
+                    .defaultValue("RECORD")
+                    .withDescription(
+                            "Specifies the tag name of the data rows within the XML file, only valid for xml files, default value is 'RECORD'");
+
+    public static final Option<Boolean> XML_USE_ATTR_FORMAT =
+            Options.key("xml_use_attr_format")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specifies whether to process data using the tag attribute format, only valid for XML files.");
+
     public static final Option<Boolean> ENABLE_HEADER_WRITE =
             Options.key("enable_header_write")
                     .booleanType()
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
index 11f9488ab4e..4e4c0bbef5f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
@@ -130,6 +130,20 @@ public class BaseSourceConfigOptions {
                     .noDefaultValue()
                     .withDescription("To be read sheet name,only valid for excel files");
 
+    public static final Option<String> XML_ROW_TAG =
+            Options.key("xml_row_tag")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specifies the tag name of the data rows within the XML file, only valid for XML files.");
+
+    public static final Option<Boolean> XML_USE_ATTR_FORMAT =
+            Options.key("xml_use_attr_format")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specifies whether to process data using the tag attribute format, only valid for XML files.");
+
     public static final Option<String> FILE_FILTER_PATTERN =
             Options.key("file_filter_pattern")
                     .stringType()
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 1de8a5e02b4..52465fa48a4 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -24,12 +24,14 @@
 import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.TextWriteStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.XmlWriteStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.XmlReadStrategy;
 
 import java.io.Serializable;
 
@@ -100,6 +102,17 @@ public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
         public ReadStrategy getReadStrategy() {
             return new ExcelReadStrategy();
         }
+    },
+    XML("xml") {
+        @Override
+        public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
+            return new XmlWriteStrategy(fileSinkConfig);
+        }
+
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new XmlReadStrategy();
+        }
     };
 
     private final String suffix;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
index 387b4c12710..7fe10224b71 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
@@ -23,6 +23,7 @@
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.PartitionConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
@@ -71,6 +72,12 @@ public class FileSinkConfig extends BaseFileSinkConfig implements PartitionConfi
 
     private String sheetName;
 
+    private String xmlRootTag = BaseSinkConfig.XML_ROOT_TAG.defaultValue();
+
+    private String xmlRowTag = BaseSinkConfig.XML_ROW_TAG.defaultValue();
+
+    private Boolean xmlUseAttrFormat;
+
     public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaTunnelRowTypeInfo) {
         super(config);
         checkArgument(
@@ -184,5 +191,25 @@ public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaTunne
         if (config.hasPath(BaseSinkConfig.SHEET_NAME.key())) {
             this.sheetName = config.getString(BaseSinkConfig.SHEET_NAME.key());
         }
+
+        if (FileFormat.XML
+                .name()
+                .equalsIgnoreCase(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
+            if (!config.hasPath(BaseSinkConfig.XML_USE_ATTR_FORMAT.key())) {
+                throw new FileConnectorException(
+                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                        "User must define xml_use_attr_format when file_format_type is xml");
+            }
+
+            this.xmlUseAttrFormat = config.getBoolean(BaseSinkConfig.XML_USE_ATTR_FORMAT.key());
+
+            if (config.hasPath(BaseSinkConfig.XML_ROOT_TAG.key())) {
+                this.xmlRootTag = config.getString(BaseSinkConfig.XML_ROOT_TAG.key());
+            }
+
+            if (config.hasPath(BaseSinkConfig.XML_ROW_TAG.key())) {
+                this.xmlRowTag = config.getString(BaseSinkConfig.XML_ROW_TAG.key());
+            }
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/XmlWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/XmlWriter.java
new file mode 100644
index 00000000000..2617817f7d7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/XmlWriter.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.util;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+
+import org.dom4j.Document;
+import org.dom4j.DocumentHelper;
+import org.dom4j.Element;
+import org.dom4j.io.OutputFormat;
+import org.dom4j.io.XMLWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
+import java.util.List;
+
+/** The XmlWriter class provides functionality to write data in XML format. */
+public class XmlWriter {
+
+    private final FileSinkConfig fileSinkConfig;
+    private final List<Integer> sinkColumnsIndexInRow;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final Document document;
+    private final Element rootElement;
+    private final String fieldDelimiter;
+    private OutputFormat format;
+
+    public XmlWriter(
+            FileSinkConfig fileSinkConfig,
+            List<Integer> sinkColumnsIndexInRow,
+            SeaTunnelRowType seaTunnelRowType) {
+        this.fileSinkConfig = fileSinkConfig;
+        this.sinkColumnsIndexInRow = sinkColumnsIndexInRow;
+        this.seaTunnelRowType = seaTunnelRowType;
+
+        this.fieldDelimiter = fileSinkConfig.getFieldDelimiter();
+
+        setXmlOutputFormat();
+        document = DocumentHelper.createDocument();
+        rootElement = document.addElement(fileSinkConfig.getXmlRootTag());
+    }
+
+    public void writeData(SeaTunnelRow seaTunnelRow) {
+        Element rowElement = rootElement.addElement(fileSinkConfig.getXmlRowTag());
+        boolean useAttributeFormat = fileSinkConfig.getXmlUseAttrFormat();
+
+        sinkColumnsIndexInRow.stream()
+                .map(
+                        index ->
+                                new AbstractMap.SimpleEntry<>(
+                                        seaTunnelRowType.getFieldName(index),
+                                        convertToXmlString(
+                                                seaTunnelRow.getField(index),
+                                                seaTunnelRowType.getFieldType(index))))
+                .forEach(
+                        entry -> {
+                            if (useAttributeFormat) {
+                                rowElement.addAttribute(entry.getKey(), entry.getValue());
+                            } else {
+                                rowElement.addElement(entry.getKey()).addText(entry.getValue());
+                            }
+                        });
+    }
+
+    private String convertToXmlString(Object fieldValue, SeaTunnelDataType<?> fieldType) {
+        if (fieldValue == null) {
+            return "";
+        }
+
+        switch (fieldType.getSqlType()) {
+            case STRING:
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+            case BIGINT:
+            case DOUBLE:
+            case FLOAT:
+            case DECIMAL:
+            case BOOLEAN:
+                return fieldValue.toString();
+            case NULL:
+                return "";
+            case ROW:
+                Object[] fields = ((SeaTunnelRow) fieldValue).getFields();
+                String[] strings = new String[fields.length];
+                for (int i = 0; i < fields.length; i++) {
+                    strings[i] =
+                            convertToXmlString(
+                                    fields[i], ((SeaTunnelRowType) fieldType).getFieldType(i));
+                }
+                return String.join(fieldDelimiter, strings);
+            case MAP:
+            case ARRAY:
+                return JsonUtils.toJsonString(fieldValue);
+            case BYTES:
+                return new String((byte[]) fieldValue, StandardCharsets.UTF_8);
+            default:
+                throw new FileConnectorException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                        "SeaTunnel format does not support this data type "
+                                + fieldType.getSqlType());
+        }
+    }
+
+    public void flushAndCloseXmlWriter(OutputStream output) throws IOException {
+        XMLWriter xmlWriter = new XMLWriter(output, format);
+        xmlWriter.write(document);
+        xmlWriter.close();
+    }
+
+    private void setXmlOutputFormat() {
+        this.format = OutputFormat.createPrettyPrint();
+        this.format.setNewlines(true);
+        this.format.setNewLineAfterDeclaration(true);
+        this.format.setSuppressDeclaration(false);
+        this.format.setExpandEmptyElements(false);
+        this.format.setEncoding(StandardCharsets.UTF_8.name());
+        this.format.setIndent("\t");
+    }
+}
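Reviewer note (not part of the patch): a minimal sketch of the two output shapes `XmlWriter` produces, using the default `xml_root_tag`/`xml_row_tag` values and two field values borrowed from the unit-test fixture below. With `xml_use_attr_format = true` each field becomes an attribute on the row element (`rowElement.addAttribute(...)`); with `false` each field becomes a child element (`rowElement.addElement(...).addText(...)`). The tab indentation matches `setXmlOutputFormat()`'s `setIndent("\t")`.

```xml
<!-- xml_use_attr_format = true: one attribute per field -->
<RECORDS>
	<RECORD c_int="333" c_string="DusayI"/>
</RECORDS>

<!-- xml_use_attr_format = false: one child element per field -->
<RECORDS>
	<RECORD>
		<c_int>333</c_int>
		<c_string>DusayI</c_string>
	</RECORD>
</RECORDS>
```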
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/XmlWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/XmlWriteStrategy.java
new file mode 100644
index 00000000000..74fa220031d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/XmlWriteStrategy.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.util.XmlWriter;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+
+/**
+ * An implementation of the AbstractWriteStrategy class that writes data in XML format.
+ *
+ * <p>This strategy keeps one XmlWriter instance per file being written, so each file is built by
+ * exactly one writer. It writes the data by passing each data row to the XmlWriter instance of
+ * the corresponding file.
+ */
+public class XmlWriteStrategy extends AbstractWriteStrategy {
+
+    private final LinkedHashMap<String, XmlWriter> beingWrittenWriter;
+
+    public XmlWriteStrategy(FileSinkConfig fileSinkConfig) {
+        super(fileSinkConfig);
+        this.beingWrittenWriter = new LinkedHashMap<>();
+    }
+
+    @Override
+    public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException {
+        super.write(seaTunnelRow);
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        XmlWriter xmlDocWriter = getOrCreateXmlWriter(filePath);
+        xmlDocWriter.writeData(seaTunnelRow);
+    }
+
+    @Override
+    public void finishAndCloseFile() {
+        this.beingWrittenWriter.forEach(
+                (k, v) -> {
+                    try {
+                        hadoopFileSystemProxy.createFile(k);
+                        FSDataOutputStream fileOutputStream =
+                                hadoopFileSystemProxy.getOutputStream(k);
+                        v.flushAndCloseXmlWriter(fileOutputStream);
+                        fileOutputStream.close();
+                    } catch (IOException e) {
+                        throw CommonError.fileOperationFailed("XmlFile", "write", k, e);
+                    }
+                    needMoveFiles.put(k, getTargetLocation(k));
+                });
+        this.beingWrittenWriter.clear();
+    }
+
+    private XmlWriter getOrCreateXmlWriter(String filePath) {
+        return beingWrittenWriter.computeIfAbsent(
+                filePath,
+                k -> new XmlWriter(fileSinkConfig, sinkColumnsIndexInRow, seaTunnelRowType));
+    }
+}
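For reviewers unfamiliar with dom4j: a self-contained sketch (not from the patch; the class name is invented) of the same build-then-serialize pipeline XmlWriter uses. The document is assembled in memory row by row and only serialized once, when `XmlWriteStrategy#finishAndCloseFile` calls `flushAndCloseXmlWriter`; one consequence worth noting in review is that each output file is buffered entirely in memory until commit.

```java
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.io.OutputFormat;
import org.dom4j.io.XMLWriter;

public class Dom4jPipelineSketch {
    public static void main(String[] args) throws Exception {
        // Build the document in memory, as XmlWriter#writeData does once per row.
        Document document = DocumentHelper.createDocument();
        document.addElement("RECORDS").addElement("RECORD").addAttribute("c_int", "333");

        // Mirror XmlWriter#setXmlOutputFormat: pretty print, UTF-8, tab indent.
        OutputFormat format = OutputFormat.createPrettyPrint();
        format.setEncoding("UTF-8");
        format.setIndent("\t");

        // Serialize once, as flushAndCloseXmlWriter does at commit time.
        XMLWriter xmlWriter = new XMLWriter(System.out, format);
        xmlWriter.write(document);
        xmlWriter.close();
    }
}
```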
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
new file mode 100644
index 00000000000..0752bf52a85
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
+import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.dom4j.Document;
+import org.dom4j.DocumentException;
+import org.dom4j.Element;
+import org.dom4j.Node;
+import org.dom4j.io.SAXReader;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** The XmlReadStrategy class is used to read data from XML files in SeaTunnel. */
+@Slf4j
+public class XmlReadStrategy extends AbstractReadStrategy {
+
+    private String tableRowName;
+    private Boolean useAttrFormat;
+    private String delimiter;
+
+    private int fieldCount;
+
+    private DateUtils.Formatter dateFormat;
+    private DateTimeUtils.Formatter datetimeFormat;
+    private TimeUtils.Formatter timeFormat;
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    @Override
+    public void init(HadoopConf conf) {
+        super.init(conf);
+        preCheckAndInitializeConfiguration();
+    }
+
+    @Override
+    public void read(String path, String tableId, Collector<SeaTunnelRow> output)
+            throws IOException, FileConnectorException {
+        Map<String, String> partitionsMap = parsePartitionsByPath(path);
+        SAXReader saxReader = new SAXReader();
+        Document document;
+        try {
+            document = saxReader.read(hadoopFileSystemProxy.getInputStream(path));
+        } catch (DocumentException e) {
+            throw new FileConnectorException(
+                    FileConnectorErrorCode.FILE_READ_FAILED, "Failed to read xml file: " + path, e);
+        }
+        Element rootElement = document.getRootElement();
+
+        fieldCount =
+                isMergePartition
+                        ? seaTunnelRowTypeWithPartition.getTotalFields()
+                        : seaTunnelRowType.getTotalFields();
+
+        rootElement
+                .selectNodes(getXPathExpression(tableRowName))
+                .forEach(
+                        node -> {
+                            SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fieldCount);
+
+                            List<Node> fields =
+                                    new ArrayList<Node>(
+                                                    (useAttrFormat
+                                                            ? ((Element) node).attributes()
+                                                            : node.selectNodes("./*")))
+                                            .stream()
+                                            .filter(
+                                                    field ->
+                                                            ArrayUtils.contains(
+                                                                    seaTunnelRowType
+                                                                            .getFieldNames(),
+                                                                    field.getName()))
+                                            .collect(Collectors.toList());
+
+                            if (CollectionUtils.isEmpty(fields)) return;
+
+                            fields.forEach(
+                                    field -> {
+                                        int fieldIndex =
+                                                ArrayUtils.indexOf(
+                                                        seaTunnelRowType.getFieldNames(),
+                                                        field.getName());
+                                        seaTunnelRow.setField(
+                                                fieldIndex,
+                                                convert(
+                                                        field.getText(),
+                                                        seaTunnelRowType
+                                                                .getFieldTypes()[fieldIndex]));
+                                    });
+
+                            if (isMergePartition) {
+                                int partitionIndex = seaTunnelRowType.getTotalFields();
+                                for (String value : partitionsMap.values()) {
+                                    seaTunnelRow.setField(partitionIndex++, value);
+                                }
+                            }
+
+                            seaTunnelRow.setTableId(tableId);
+                            output.collect(seaTunnelRow);
+                        });
+    }
+
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException {
+        throw new FileConnectorException(
+                CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                "User must define schema for xml file type");
+    }
+
+    @Override
+    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        if (ArrayUtils.isEmpty(seaTunnelRowType.getFieldNames())
+                || ArrayUtils.isEmpty(seaTunnelRowType.getFieldTypes())) {
+            throw new FileConnectorException(
+                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                    "Schema information is undefined or misconfigured, please check your configuration file.");
+        }
+
+        if (readColumns.isEmpty()) {
+            this.seaTunnelRowType = seaTunnelRowType;
+            this.seaTunnelRowTypeWithPartition =
+                    mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+        } else {
+            if (readColumns.retainAll(Arrays.asList(seaTunnelRowType.getFieldNames()))) {
+                log.warn(
+                        "The read columns configuration will be filtered by the schema configuration; this may cause the actual results to be inconsistent with expectations. This is due to read_columns not being a subset of the schema, "
+                                + "maybe you should check the schema and read_columns!");
+            }
+            int[] indexes = new int[readColumns.size()];
+            String[] fields = new String[readColumns.size()];
+            SeaTunnelDataType<?>[] types = new SeaTunnelDataType<?>[readColumns.size()];
+            for (int i = 0; i < readColumns.size(); i++) {
+                indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
+                fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
+                types[i] = seaTunnelRowType.getFieldType(indexes[i]);
+            }
+            this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
+            this.seaTunnelRowTypeWithPartition =
+                    mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
+        }
+    }
+
+    @SneakyThrows
+    private Object convert(String fieldValue, SeaTunnelDataType<?> fieldType) {
+        if (StringUtils.isBlank(fieldValue)) {
+            return "";
+        }
+        SqlType sqlType = fieldType.getSqlType();
+        switch (sqlType) {
+            case STRING:
+                return fieldValue;
+            case DATE:
+                return DateUtils.parse(fieldValue, dateFormat);
+            case TIME:
+                return TimeUtils.parse(fieldValue, timeFormat);
+            case TIMESTAMP:
+                return DateTimeUtils.parse(fieldValue, datetimeFormat);
+            case TINYINT:
+                return (byte) Double.parseDouble(fieldValue);
+            case SMALLINT:
+                return (short) Double.parseDouble(fieldValue);
+            case INT:
+                return (int) Double.parseDouble(fieldValue);
+            case BIGINT:
+                return (long) Double.parseDouble(fieldValue);
+            case DOUBLE:
+                return Double.parseDouble(fieldValue);
+            case FLOAT:
+                return (float) Double.parseDouble(fieldValue);
+            case DECIMAL:
+                return BigDecimal.valueOf(Double.parseDouble(fieldValue));
+            case BOOLEAN:
+                return Boolean.parseBoolean(fieldValue);
+            case BYTES:
+                return fieldValue.getBytes(StandardCharsets.UTF_8);
+            case NULL:
+                return "";
+            case ROW:
+                String[] context = fieldValue.split(delimiter);
+                SeaTunnelRowType ft = (SeaTunnelRowType) fieldType;
+                SeaTunnelRow row = new SeaTunnelRow(context.length);
+                IntStream.range(0, context.length)
+                        .forEach(i -> row.setField(i, convert(context[i], ft.getFieldTypes()[i])));
+                return row;
+            case MAP:
+            case ARRAY:
+                return objectMapper.readValue(fieldValue, fieldType.getTypeClass());
+            default:
+                throw new FileConnectorException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                        String.format("Unsupported data type: %s", sqlType));
+        }
+    }
+
+    private String getXPathExpression(String tableRowIdentification) {
+        return String.format("//%s", tableRowIdentification);
+    }
+
+    /** Performs pre-checks and initialization of the configuration for reading XML files. */
+    private void preCheckAndInitializeConfiguration() {
+        this.tableRowName = getPrimitiveConfigValue(BaseSourceConfigOptions.XML_ROW_TAG);
+        this.useAttrFormat = getPrimitiveConfigValue(BaseSourceConfigOptions.XML_USE_ATTR_FORMAT);
+
+        // Check mandatory configurations
+        if (StringUtils.isEmpty(tableRowName) || useAttrFormat == null) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "Mandatory configurations '%s' and '%s' must be specified when reading XML files.",
+                            BaseSourceConfigOptions.XML_ROW_TAG.key(),
+                            BaseSourceConfigOptions.XML_USE_ATTR_FORMAT.key()));
+        }
+
+        this.delimiter = getPrimitiveConfigValue(BaseSourceConfigOptions.FIELD_DELIMITER);
+
+        this.dateFormat =
+                getComplexDateConfigValue(
+                        BaseSourceConfigOptions.DATE_FORMAT, DateUtils.Formatter::parse);
+        this.timeFormat =
+                getComplexDateConfigValue(
+                        BaseSourceConfigOptions.TIME_FORMAT, TimeUtils.Formatter::parse);
+        this.datetimeFormat =
+                getComplexDateConfigValue(
+                        BaseSourceConfigOptions.DATETIME_FORMAT, DateTimeUtils.Formatter::parse);
+    }
+
+    /**
+     * Retrieves the value of a primitive configuration option.
+     *
+     * @param option the configuration option to retrieve the value for
+     * @param <T> the type of the configuration option
+     * @return the value of the configuration option, or the default value if the option is not set
+     */
+    @SuppressWarnings("unchecked")
+    private <T> T getPrimitiveConfigValue(Option<?> option) {
+        if (!pluginConfig.hasPath(option.key())) {
+            return (T) option.defaultValue();
+        }
+        return (T) pluginConfig.getAnyRef(option.key());
+    }
+
+    /**
+     * Retrieves the complex date configuration value for the given option.
+     *
+     * @param option The configuration option to retrieve.
+     * @param parser The function used to parse the configuration value.
+     * @param <T> The type of the configuration value.
+     * @return The parsed configuration value or the default value if not found.
+     */
+    @SuppressWarnings("unchecked")
+    private <T> T getComplexDateConfigValue(Option<?> option, Function<String, T> parser) {
+        if (!pluginConfig.hasPath(option.key())) {
+            return (T) option.defaultValue();
+        }
+        return parser.apply(pluginConfig.getString(option.key()));
+    }
+}
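Reader side, the same shape in miniature (again a standalone sketch, not patch code; the class name is invented, and it assumes dom4j plus jaxen on the classpath for XPath support). `XmlReadStrategy` builds the XPath `"//" + xml_row_tag`, so row elements are matched at any depth under the root; with `xml_use_attr_format = true` it takes fields from the row element's attributes rather than its children.

```java
import java.io.StringReader;
import java.util.List;

import org.dom4j.Attribute;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;

public class XmlRowTagSketch {
    public static void main(String[] args) throws Exception {
        String xml = "<RECORDS><RECORD c_int=\"333\" c_string=\"DusayI\"/></RECORDS>";
        Document document = new SAXReader().read(new StringReader(xml));

        // Same XPath shape as XmlReadStrategy#getXPathExpression: "//" + xml_row_tag.
        List<Node> rows = document.getRootElement().selectNodes("//RECORD");
        for (Node row : rows) {
            // xml_use_attr_format = true: fields come from the row element's attributes;
            // with false the strategy uses row.selectNodes("./*") instead.
            for (Attribute field : ((Element) row).attributes()) {
                System.out.println(field.getName() + " -> " + field.getValue());
            }
        }
    }
}
```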
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
new file mode 100644
index 00000000000..0679bade2d6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.writer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
+import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.XmlReadStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import lombok.Getter;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
+public class XmlReadStrategyTest {
+
+    @Test
+    public void testXmlRead() throws IOException, URISyntaxException {
+        URL xmlFile = XmlReadStrategyTest.class.getResource("/xml/name=xmlTest/test_read.xml");
+        URL conf = XmlReadStrategyTest.class.getResource("/xml/test_read_xml.conf");
+        Assertions.assertNotNull(xmlFile);
+        Assertions.assertNotNull(conf);
+        String xmlFilePath = Paths.get(xmlFile.toURI()).toString();
+        String confPath = Paths.get(conf.toURI()).toString();
+        Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
+        XmlReadStrategy xmlReadStrategy = new XmlReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        xmlReadStrategy.setPluginConfig(pluginConfig);
+        xmlReadStrategy.init(localConf);
+        List<String> fileNamesByPath = xmlReadStrategy.getFileNamesByPath(xmlFilePath);
+        SeaTunnelRowType userDefinedSchema =
+                CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
+        xmlReadStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+        TestCollector testCollector = new TestCollector();
+        xmlReadStrategy.read(fileNamesByPath.get(0), "", testCollector);
+        for (SeaTunnelRow seaTunnelRow : testCollector.getRows()) {
+            Assertions.assertEquals(seaTunnelRow.getArity(), 15);
+            Assertions.assertEquals(seaTunnelRow.getField(0).getClass(), Byte.class);
+            Assertions.assertEquals(seaTunnelRow.getField(1).getClass(), Short.class);
+            Assertions.assertEquals(seaTunnelRow.getField(2).getClass(), Integer.class);
+            Assertions.assertEquals(seaTunnelRow.getField(3).getClass(), Long.class);
+            Assertions.assertEquals(seaTunnelRow.getField(4).getClass(), String.class);
+            Assertions.assertEquals(seaTunnelRow.getField(5).getClass(), Double.class);
+            Assertions.assertEquals(seaTunnelRow.getField(6).getClass(), Float.class);
+            Assertions.assertEquals(seaTunnelRow.getField(7).getClass(), BigDecimal.class);
+            Assertions.assertEquals(seaTunnelRow.getField(8).getClass(), Boolean.class);
+            Assertions.assertEquals(seaTunnelRow.getField(9).getClass(), LinkedHashMap.class);
+            Assertions.assertEquals(seaTunnelRow.getField(10).getClass(), String[].class);
+            Assertions.assertEquals(seaTunnelRow.getField(11).getClass(), LocalDate.class);
+            Assertions.assertEquals(seaTunnelRow.getField(12).getClass(), LocalDateTime.class);
+            Assertions.assertEquals(seaTunnelRow.getField(13).getClass(), LocalTime.class);
+            Assertions.assertEquals(seaTunnelRow.getField(14).getClass(), String.class);
+
+            Assertions.assertEquals(seaTunnelRow.getField(0), (byte) 1);
+            Assertions.assertEquals(seaTunnelRow.getField(1), (short) 22);
+            Assertions.assertEquals(seaTunnelRow.getField(2), 333);
+            Assertions.assertEquals(seaTunnelRow.getField(3), 4444L);
+            Assertions.assertEquals(seaTunnelRow.getField(4), "DusayI");
+            Assertions.assertEquals(seaTunnelRow.getField(5), 5.555);
+            Assertions.assertEquals(seaTunnelRow.getField(6), (float) 6.666);
+            Assertions.assertEquals(seaTunnelRow.getField(7), new BigDecimal("7.78"));
+            Assertions.assertEquals(seaTunnelRow.getField(8), Boolean.FALSE);
+            Assertions.assertEquals(
+                    seaTunnelRow.getField(9),
+                    new LinkedHashMap<String, String>() {
+                        {
+                            put("name", "Ivan");
+                            put("age", "26");
+                        }
+                    });
+            Assertions.assertArrayEquals(
+                    (String[]) seaTunnelRow.getField(10), new String[] {"Ivan", "Dusayi"});
+            Assertions.assertEquals(
+                    seaTunnelRow.getField(11),
+                    DateUtils.parse("2024-01-31", DateUtils.Formatter.YYYY_MM_DD));
+            Assertions.assertEquals(
+                    seaTunnelRow.getField(12),
+                    DateTimeUtils.parse(
+                            "2024-01-31 16:00:48", DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS));
+            Assertions.assertEquals(
+                    seaTunnelRow.getField(13),
+                    TimeUtils.parse("16:00:48", TimeUtils.Formatter.HH_MM_SS));
+            Assertions.assertEquals(seaTunnelRow.getField(14), "xmlTest");
+        }
+    }
+
+    @Getter
+    public static class TestCollector implements Collector<SeaTunnelRow> {
+        private final List<SeaTunnelRow> rows = new ArrayList<>();
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            System.out.println(record);
+            rows.add(record);
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return null;
+        }
+    }
+
+    public static class LocalConf extends HadoopConf {
+        private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem";
+        private static final String SCHEMA = "file";
+
+        public LocalConf(String hdfsNameKey) {
+            super(hdfsNameKey);
+        }
+
+        @Override
+        public String getFsHdfsImpl() {
+            return HDFS_IMPL;
+        }
+
+        @Override
+        public String getSchema() {
+            return SCHEMA;
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/xml/name=xmlTest/test_read.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/xml/name=xmlTest/test_read.xml
new file mode 100644
index 00000000000..0ffec43a150
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/xml/name=xmlTest/test_read.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+<RECORDS>
+	<RECORD c_bytes="1" c_short="22" c_int="333" c_bigint="4444" c_string="DusayI"
+	        c_double="5.555" c_float="6.666" c_decimal="7.78" c_boolean="false"
+	        c_map="{&quot;name&quot;:&quot;Ivan&quot;,&quot;age&quot;:&quot;26&quot;}" c_array="[&quot;Ivan&quot;,&quot;Dusayi&quot;]"
+	        c_date="2024-01-31" c_datetime="2024-01-31 16:00:48" c_time="16:00:48"/>
+</RECORDS>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/xml/test_read_xml.conf b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/xml/test_read_xml.conf
new file mode 100644
index 00000000000..f81534b8b78
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/xml/test_read_xml.conf
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+{
+  xml_row_tag = "RECORD"
+  xml_use_attr_format = true
+  schema = {
+    fields {
+      c_bytes = "tinyint"
+      c_short = "smallint"
+      c_int = "int"
+      c_bigint = "bigint"
+      c_string = "string"
+      c_double = "double"
+      c_float = "float"
+      c_decimal = "decimal(10, 2)"
+      c_boolean = "boolean"
+      c_map = "map<string, string>"
+      c_array = "array<string>"
+      c_date = "date"
+      c_datetime = "timestamp"
+      c_time = "time"
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
index 451221248d3..fb45c947172 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
@@ -67,6 +67,10 @@ public OptionRule optionRule() {
                         BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSinkConfig.XML_USE_ATTR_FORMAT)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
                 .conditional(
                         BaseSinkConfig.CUSTOM_FILENAME,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
index b2a5d931a18..476a3878fe1 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
@@ -94,6 +94,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
             case TEXT:
             case JSON:
             case EXCEL:
+            case XML:
                 SeaTunnelRowType userDefinedSchema =
                         CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
                 readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
index b92c67c4c60..6ab9ef17f48 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
@@ -51,10 +51,19 @@ public OptionRule optionRule() {
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSourceConfigOptions.FIELD_DELIMITER)
+                .conditional(
+                        BaseSourceConfigOptions.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSourceConfigOptions.XML_ROW_TAG,
+                        BaseSourceConfigOptions.XML_USE_ATTR_FORMAT)
                 .conditional(
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         Arrays.asList(
-                                FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV),
+                                FileFormat.TEXT,
+                                FileFormat.JSON,
+                                FileFormat.EXCEL,
+                                FileFormat.CSV,
+                                FileFormat.XML),
                         TableSchemaOptions.SCHEMA)
                 .optional(BaseSourceConfigOptions.PARSE_PARTITION_FROM_PATH)
                 .optional(BaseSourceConfigOptions.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index a3fbf886fbb..f3a12d117aa 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -67,6 +67,10 @@ public OptionRule optionRule() {
                         BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSinkConfig.XML_USE_ATTR_FORMAT)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
                 .conditional(
                         BaseSinkConfig.CUSTOM_FILENAME,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index b8ccb69d836..19ea0c0ba09 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -99,6 +99,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
             case TEXT:
             case JSON:
             case EXCEL:
+            case XML:
                 SeaTunnelRowType userDefinedSchema =
                         CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
                 readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index 529c93a3f79..249deac26da 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -51,10 +51,19 @@ public OptionRule optionRule() {
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSourceConfigOptions.FIELD_DELIMITER)
+                .conditional(
+                        BaseSourceConfigOptions.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSourceConfigOptions.XML_ROW_TAG,
+                        BaseSourceConfigOptions.XML_USE_ATTR_FORMAT)
                 .conditional(
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         Arrays.asList(
-                                FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV),
+                                FileFormat.TEXT,
+                                FileFormat.JSON,
+                                FileFormat.EXCEL,
+                                FileFormat.CSV,
+                                FileFormat.XML),
                         TableSchemaOptions.SCHEMA)
                 .optional(BaseSourceConfigOptions.PARSE_PARTITION_FROM_PATH)
                 .optional(BaseSourceConfigOptions.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index f5243edbda8..fa2f872b0e8 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -64,6 +64,10 @@ public OptionRule optionRule() {
                         BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSinkConfig.XML_USE_ATTR_FORMAT)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
                 .conditional(
                         BaseSinkConfig.CUSTOM_FILENAME,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 82db2773ee3..e02f7ad42c2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -48,10 +48,19 @@ public OptionRule optionRule() {
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSourceConfigOptions.FIELD_DELIMITER)
+                .conditional(
+                        BaseSourceConfigOptions.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSourceConfigOptions.XML_ROW_TAG,
+                        BaseSourceConfigOptions.XML_USE_ATTR_FORMAT)
                 .conditional(
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         Arrays.asList(
-                                FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV),
+                                FileFormat.TEXT,
+                                FileFormat.JSON,
+                                FileFormat.EXCEL,
+                                FileFormat.CSV,
+                                FileFormat.XML),
                         TableSchemaOptions.SCHEMA)
                 .optional(BaseSourceConfigOptions.PARSE_PARTITION_FROM_PATH)
                 .optional(BaseSourceConfigOptions.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 4adf578b697..60a426ccb90 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -67,6 +67,10 @@ public OptionRule optionRule() {
                         BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSinkConfig.XML_USE_ATTR_FORMAT)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
                 .conditional(
                         BaseSinkConfig.CUSTOM_FILENAME,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 857e8d89ff1..7d73f16e7b6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -95,6 +95,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
             case TEXT:
             case JSON:
             case EXCEL:
+            case XML:
                 SeaTunnelRowType userDefinedSchema =
                         CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
                 readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index df640e44bd8..5a31832b33f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -51,10 +51,19 @@ public OptionRule optionRule() {
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSourceConfigOptions.FIELD_DELIMITER)
+                .conditional(
+                        BaseSourceConfigOptions.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSourceConfigOptions.XML_ROW_TAG,
+                        BaseSourceConfigOptions.XML_USE_ATTR_FORMAT)
                 .conditional(
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         Arrays.asList(
-                                FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV),
+                                FileFormat.TEXT,
+                                FileFormat.JSON,
+                                FileFormat.EXCEL,
+                                FileFormat.CSV,
+                                FileFormat.XML),
                         TableSchemaOptions.SCHEMA)
                 .optional(BaseSourceConfigOptions.PARSE_PARTITION_FROM_PATH)
                 .optional(BaseSourceConfigOptions.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index f65a93f9095..770e8866b54 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -70,6 +70,10 @@ public OptionRule optionRule() {
                         BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSinkConfig.XML_USE_ATTR_FORMAT)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
                 .conditional(
                         BaseSinkConfig.CUSTOM_FILENAME,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 44d5dab2015..450561a6081 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -57,10 +57,19 @@ public OptionRule optionRule() {
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSourceConfigOptions.FIELD_DELIMITER)
+                .conditional(
+                        BaseSourceConfigOptions.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSourceConfigOptions.XML_ROW_TAG,
+                        BaseSourceConfigOptions.XML_USE_ATTR_FORMAT)
                 .conditional(
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         Arrays.asList(
-                                FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV),
+                                FileFormat.TEXT,
+                                FileFormat.JSON,
+                                FileFormat.EXCEL,
+                                FileFormat.CSV,
+                                FileFormat.XML),
                         TableSchemaOptions.SCHEMA)
                 .optional(BaseSourceConfigOptions.PARSE_PARTITION_FROM_PATH)
                 .optional(BaseSourceConfigOptions.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 49b5ff8bfa4..5931a46977b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -81,6 +81,10 @@ public OptionRule optionRule() {
                         BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSinkConfig.XML_USE_ATTR_FORMAT)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
                 .conditional(
                         BaseSinkConfig.CUSTOM_FILENAME,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index b332d99d472..6fb4a55c5d2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -64,10 +64,19 @@ public OptionRule optionRule() {
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSourceConfigOptions.FIELD_DELIMITER)
+                .conditional(
+                        BaseSourceConfigOptions.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSourceConfigOptions.XML_ROW_TAG,
+                        BaseSourceConfigOptions.XML_USE_ATTR_FORMAT)
                 .conditional(
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         Arrays.asList(
-                                FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV),
+                                FileFormat.TEXT,
+                                FileFormat.JSON,
+                                FileFormat.EXCEL,
+                                FileFormat.CSV,
+                                FileFormat.XML),
                         TableSchemaOptions.SCHEMA)
                 .optional(BaseSourceConfigOptions.PARSE_PARTITION_FROM_PATH)
                 .optional(BaseSourceConfigOptions.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index d366449356c..3a4e3df2fb8 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -86,6 +86,10 @@ public OptionRule optionRule() {
                         BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSinkConfig.XML_USE_ATTR_FORMAT)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
                 .conditional(
                         BaseSinkConfig.CUSTOM_FILENAME,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index aa1e0cb952d..f36e935818c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -91,6 +91,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
             case TEXT:
             case JSON:
             case EXCEL:
+            case XML:
                 SeaTunnelRowType userDefinedSchema =
                         CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
                 readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index 552bff51fc9..ebd752fbf09 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -56,10 +56,19 @@ public OptionRule optionRule() {
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSourceConfigOptions.FIELD_DELIMITER)
+                .conditional(
+                        BaseSourceConfigOptions.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSourceConfigOptions.XML_ROW_TAG,
+                        BaseSourceConfigOptions.XML_USE_ATTR_FORMAT)
                 .conditional(
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         Arrays.asList(
-                                FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV),
+                                FileFormat.TEXT,
+                                FileFormat.JSON,
+                                FileFormat.EXCEL,
+                                FileFormat.CSV,
+                                FileFormat.XML),
                         TableSchemaOptions.SCHEMA)
                 .optional(BaseSourceConfigOptions.PARSE_PARTITION_FROM_PATH)
                 .optional(BaseSourceConfigOptions.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
index 3fe6e3abea5..2dc2c29bd99 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
@@ -67,6 +67,10 @@ public OptionRule optionRule() {
                         BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSinkConfig.XML_USE_ATTR_FORMAT)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
                 .conditional(
                         BaseSinkConfig.CUSTOM_FILENAME,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index 5efaea3137b..0ccee6c281e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -75,7 +75,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
         if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
             throw new FileConnectorException(
                     CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                    "Sftp file source connector only support read [text, csv, json] files");
+                    "Sftp file source connector only supports reading [text, csv, json, xml] files");
         }
         String path = pluginConfig.getString(SftpConfigOptions.FILE_PATH.key());
         hadoopConf = SftpConf.buildWithConfig(pluginConfig);
@@ -99,6 +99,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
             case TEXT:
             case JSON:
             case EXCEL:
+            case XML:
                 SeaTunnelRowType userDefinedSchema =
                         CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
                 readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index 939ecc985f0..f66db3996b7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -51,10 +51,19 @@ public OptionRule optionRule() {
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSourceConfigOptions.FIELD_DELIMITER)
+                .conditional(
+                        BaseSourceConfigOptions.FILE_FORMAT_TYPE,
+                        FileFormat.XML,
+                        BaseSourceConfigOptions.XML_ROW_TAG,
+                        BaseSourceConfigOptions.XML_USE_ATTR_FORMAT)
                 .conditional(
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         Arrays.asList(
-                                FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV),
+                                FileFormat.TEXT,
+                                FileFormat.JSON,
+                                FileFormat.EXCEL,
+                                FileFormat.CSV,
+                                FileFormat.XML),
                         TableSchemaOptions.SCHEMA)
                 .optional(BaseSourceConfigOptions.PARSE_PARTITION_FROM_PATH)
                 .optional(BaseSourceConfigOptions.DATE_FORMAT)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
index e5fbcb5f5ef..9645268882e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
@@ -93,6 +93,11 @@ public void startUp() throws Exception {
                 "/home/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
                 sftpContainer);
 
+        ContainerUtil.copyFileIntoContainers(
+                "/xml/e2e.xml",
+                "/home/seatunnel/tmp/seatunnel/read/xml/name=tyrantlucifer/hobby=coding/e2e.xml",
+                sftpContainer);
+
         sftpContainer.execInContainer("sh", "-c", "chown -R seatunnel /home/seatunnel/tmp/");
     }
 
@@ -120,6 +125,10 @@ public void testSftpFileReadAndWrite(TestContainer container)
         helper.execute("/json/fake_to_sftp_file_json.conf");
         // test read sftp json file
         helper.execute("/json/sftp_file_json_to_assert.conf");
+        // test write sftp xml file
+        helper.execute("/xml/fake_to_sftp_file_xml.conf");
+        // test read sftp xml file
+        helper.execute("/xml/sftp_file_xml_to_assert.conf");
     }
 
     @AfterAll
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/xml/e2e.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/xml/e2e.xml
new file mode 100644
index 00000000000..0ffec43a150
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/xml/e2e.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+<RECORDS>
+	<RECORD c_bytes="1" c_short="22" c_int="333" c_bigint="4444" c_string="DusayI"
+	        c_double="5.555" c_float="6.666" c_decimal="7.78" c_boolean="false"
+	        c_map="{&quot;name&quot;:&quot;Ivan&quot;,&quot;age&quot;:&quot;26&quot;}" c_array="[&quot;Ivan&quot;,&quot;Dusayi&quot;]"
+	        c_date="2024-01-31" c_datetime="2024-01-31 16:00:48" c_time="16:00:48"/>
+</RECORDS>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/xml/fake_to_sftp_file_xml.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/xml/fake_to_sftp_file_xml.conf
new file mode 100644
index 00000000000..9b25ec0446b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/xml/fake_to_sftp_file_xml.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    result_table_name = "sftp"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<string>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<string>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+    path = "tmp/seatunnel/xml"
+    source_table_name = "sftp"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "xml"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    xml_root_tag = "RECORDS"
+    xml_row_tag = "RECORD"
+    xml_use_attr_format = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/xml/sftp_file_xml_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/xml/sftp_file_xml_to_assert.conf
new file mode 100644
index 00000000000..f9c26128ce8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/xml/sftp_file_xml_to_assert.conf
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+    path = "tmp/seatunnel/read/xml"
+    file_format_type = "xml"
+    result_table_name = "sftp"
+    xml_row_tag = "RECORD"
+    xml_use_attr_format = true
+    schema = {
+      fields {
+        c_bytes = "tinyint"
+        c_short = "smallint"
+        c_int = "int"
+        c_bigint = "bigint"
+        c_string = "string"
+        c_double = "double"
+        c_float = "float"
+        c_decimal = "decimal(10, 2)"
+        c_boolean = "boolean"
+        c_map = "map<string, string>"
+        c_array = "array<string>"
+        c_date = "date"
+        c_datetime = "timestamp"
+        c_time = "time"
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    result_table_name = "sftp"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 1
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file