
Commit f39e3a5

[Improve][Connector-V2][File] Support split file based on batch size (#3625)
1 parent 58e5604 commit f39e3a5

16 files changed (+129 −13 lines)

16 files changed

+129
-13
lines changed

docs/en/connector-v2/sink/FtpFile.md

Lines changed: 7 additions & 0 deletions
@@ -39,6 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | is_partition_field_write_in_file | boolean | no | false |
 | sink_columns | array | no | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction | boolean | no | true |
+| batch_size | int | no | 1000000 |
 | common-options | | no | - |

 ### host [string]
@@ -127,6 +128,10 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran

 Only support `true` now.

+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer keeps writing rows to the same file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer creates a new file whenever a checkpoint is triggered.
+
 ### common options

 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -173,3 +178,5 @@ FtpFile {
 - When field from upstream is null it will throw NullPointerException
 - Sink columns mapping failed
 - When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
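To make the new option concrete, here is a minimal sketch of an FtpFile sink block using `batch_size`; the host, credentials, and path are placeholders, and everything except `batch_size` simply follows the option table above. The same option and semantics apply to the other five file sinks below, so the example is shown only once.

```
FtpFile {
    host = "xxx.xxx.xxx.xxx"
    port = 21
    username = "user"
    password = "pass"
    path = "/data/output"
    file_format = "text"
    # illustration only: roll to a new file after one million rows;
    # a checkpoint (checkpoint.interval) may cut the file earlier
    batch_size = 1000000
}
```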

docs/en/connector-v2/sink/HdfsFile.md

Lines changed: 8 additions & 1 deletion
@@ -38,6 +38,7 @@ In order to use this connector, You must ensure your spark/flink cluster already
 | is_partition_field_write_in_file | boolean | no | false |
 | sink_columns | array | no | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction | boolean | no | true |
+| batch_size | int | no | 1000000 |
 | common-options | | no | - |

 ### fs.defaultFS [string]
@@ -115,6 +116,10 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran

 Only support `true` now.

+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer keeps writing rows to the same file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer creates a new file whenever a checkpoint is triggered.
+
 ### common options

 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
@@ -195,4 +200,6 @@ HdfsFile {
 - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
 - When field from upstream is null it will throw NullPointerException
 - Sink columns mapping failed
-- When restore writer from states getting transaction directly failed
+- When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))

docs/en/connector-v2/sink/LocalFile.md

Lines changed: 7 additions & 0 deletions
@@ -35,6 +35,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | is_partition_field_write_in_file | boolean | no | false |
 | sink_columns | array | no | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction | boolean | no | true |
+| batch_size | int | no | 1000000 |
 | common-options | | no | - |

 ### path [string]
@@ -108,6 +109,10 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran

 Only support `true` now.

+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer keeps writing rows to the same file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer creates a new file whenever a checkpoint is triggered.
+
 ### common options

 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -186,3 +191,5 @@ LocalFile {
 - When field from upstream is null it will throw NullPointerException
 - Sink columns mapping failed
 - When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))

docs/en/connector-v2/sink/OssFile.md

Lines changed: 8 additions & 1 deletion
@@ -42,6 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | is_partition_field_write_in_file | boolean | no | false |
 | sink_columns | array | no | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction | boolean | no | true |
+| batch_size | int | no | 1000000 |
 | common-options | | no | - |

 ### path [string]
@@ -131,6 +132,10 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran

 Only support `true` now.

+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer keeps writing rows to the same file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer creates a new file whenever a checkpoint is triggered.
+
 ### common options

 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -224,4 +229,6 @@ For orc file format
 - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
 - When field from upstream is null it will throw NullPointerException
 - Sink columns mapping failed
-- When restore writer from states getting transaction directly failed
+- When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))

docs/en/connector-v2/sink/S3File.md

Lines changed: 7 additions & 1 deletion
@@ -43,6 +43,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | is_partition_field_write_in_file | boolean | no | false |
 | sink_columns | array | no | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction | boolean | no | true |
+| batch_size | int | no | 1000000 |
 | common-options | | no | - |

 ### path [string]
@@ -137,6 +138,10 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran

 Only support `true` now.

+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer keeps writing rows to the same file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer creates a new file whenever a checkpoint is triggered.
+
 ### common options

 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -229,4 +234,5 @@ For orc file format
 - [Feature] Support S3A protocol ([3632](https://github.com/apache/incubator-seatunnel/pull/3632))
 - Allow user to add additional hadoop-s3 parameters
 - Allow the use of the s3a protocol
-- Decouple hadoop-aws dependencies
+- Decouple hadoop-aws dependencies
+- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))

docs/en/connector-v2/sink/SftpFile.md

Lines changed: 8 additions & 1 deletion
@@ -39,6 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | is_partition_field_write_in_file | boolean | no | false |
 | sink_columns | array | no | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction | boolean | no | true |
+| batch_size | int | no | 1000000 |
 | common-options | | no | - |

 ### host [string]
@@ -127,6 +128,10 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran

 Only support `true` now.

+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer keeps writing rows to the same file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer creates a new file whenever a checkpoint is triggered.
+
 ### common options

 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -165,4 +170,6 @@ SftpFile {
 - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
 - When field from upstream is null it will throw NullPointerException
 - Sink columns mapping failed
-- When restore writer from states getting transaction directly failed
+- When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java

Lines changed: 5 additions & 0 deletions
@@ -35,6 +35,7 @@ public class BaseSinkConfig {
     public static final String DEFAULT_PARTITION_DIR_EXPRESSION = "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/";
     public static final String DEFAULT_TMP_PATH = "/tmp/seatunnel";
     public static final String DEFAULT_FILE_NAME_EXPRESSION = "${transactionId}";
+    public static final int DEFAULT_BATCH_SIZE = 1000000;

     public static final Option<String> COMPRESS_CODEC = Options.key("compress_codec")
             .stringType()
@@ -107,4 +108,8 @@ public class BaseSinkConfig {
             .booleanType()
             .defaultValue(true)
             .withDescription("If or not enable transaction");
+    public static final Option<Integer> BATCH_SIZE = Options.key("batch_size")
+            .intType()
+            .defaultValue(DEFAULT_BATCH_SIZE)
+            .withDescription("The batch size of each split file");
 }

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java

Lines changed: 4 additions & 1 deletion
@@ -40,6 +40,7 @@ public class BaseTextFileConfig implements DelimiterConfig, CompressConfig, Seri
     protected String compressCodec;
     protected String fieldDelimiter = BaseSinkConfig.FIELD_DELIMITER.defaultValue();
     protected String rowDelimiter = BaseSinkConfig.ROW_DELIMITER.defaultValue();
+    protected int batchSize = BaseSinkConfig.BATCH_SIZE.defaultValue();
     protected String path;
     protected String fileNameExpression;
     protected FileFormat fileFormat = FileFormat.TEXT;
@@ -52,7 +53,9 @@ public BaseTextFileConfig(@NonNull Config config) {
             throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
                     "Compress not supported by SeaTunnel file connector now");
         }
-
+        if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
+            this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
+        }
         if (config.hasPath(BaseSinkConfig.FIELD_DELIMITER.key()) &&
                 StringUtils.isNotEmpty(config.getString(BaseSinkConfig.FIELD_DELIMITER.key()))) {
             this.fieldDelimiter = config.getString(BaseSinkConfig.FIELD_DELIMITER.key());

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java

Lines changed: 32 additions & 2 deletions
@@ -22,14 +22,21 @@
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;

+import org.apache.hadoop.fs.Path;
+
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;

 public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> {
     private final WriteStrategy writeStrategy;
@@ -46,8 +53,31 @@ public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, Si
         this.subTaskIndex = context.getIndexOfSubtask();
         writeStrategy.init(hadoopConf, jobId, subTaskIndex);
         if (!fileSinkStates.isEmpty()) {
-            List<String> transactionIds = writeStrategy.getTransactionIdFromStates(fileSinkStates);
-            transactionIds.forEach(writeStrategy::abortPrepare);
+            try {
+                List<Path> paths = FileSystemUtils.dirList(writeStrategy.getFileSinkConfig().getTmpPath());
+                List<String> transactions = paths.stream().map(Path::getName).collect(Collectors.toList());
+                FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(hadoopConf);
+                HashMap<String, FileSinkState> fileStatesMap = new HashMap<>();
+                fileSinkStates.forEach(fileSinkState ->
+                        fileStatesMap.put(fileSinkState.getTransactionId(), fileSinkState));
+                for (String transaction : transactions) {
+                    if (fileStatesMap.containsKey(transaction)) {
+                        // need commit
+                        FileSinkState fileSinkState = fileStatesMap.get(transaction);
+                        FileAggregatedCommitInfo fileCommitInfo = fileSinkAggregatedCommitter
+                                .combine(Collections.singletonList(new FileCommitInfo(fileSinkState.getNeedMoveFiles(),
+                                        fileSinkState.getPartitionDirAndValuesMap(),
+                                        fileSinkState.getTransactionDir())));
+                        fileSinkAggregatedCommitter.commit(Collections.singletonList(fileCommitInfo));
+                    } else {
+                        // need abort
+                        writeStrategy.abortPrepare(transaction);
+                    }
+                }
+            } catch (IOException e) {
+                String errorMsg = String.format("Try to process these fileStates %s failed", fileSinkStates);
+                throw new FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
+            }
             writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId() + 1);
         } else {
             writeStrategy.beginTransaction(1L);
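The hunks above cover option plumbing and restore-time commit/abort handling; the file rolling that `batch_size` actually drives lives in the write strategies, which this excerpt does not show. A minimal, self-contained sketch of that rolling behavior, with hypothetical class and method names (only the `batch_size` semantics come from this commit):

```java
// Hypothetical sketch of batch-size-based file rolling; the class and
// method names are illustrative, not the connector's real write strategy.
public class RollingWriterSketch {
    private final int batchSize;      // e.g. BaseSinkConfig.BATCH_SIZE, default 1000000
    private long rowsInCurrentFile;

    public RollingWriterSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    public void write(String row) {
        if (rowsInCurrentFile >= batchSize) {
            rollToNewFile();          // close the current part file, open the next one
            rowsInCurrentFile = 0;
        }
        appendToCurrentFile(row);
        rowsInCurrentFile++;
    }

    // A checkpoint (checkpoint.interval) can also cut a file before the
    // threshold is reached, which is why the two settings interact.
    private void rollToNewFile() { /* open the next part file */ }
    private void appendToCurrentFile(String row) { /* serialize and write the row */ }
}
```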

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java

Lines changed: 5 additions & 0 deletions
@@ -21,10 +21,15 @@
 import lombok.Data;

 import java.io.Serializable;
+import java.util.List;
+import java.util.Map;

 @Data
 @AllArgsConstructor
 public class FileSinkState implements Serializable {
     private final String transactionId;
     private final Long checkpointId;
+    private final Map<String, String> needMoveFiles;
+    private final Map<String, List<String>> partitionDirAndValuesMap;
+    private final String transactionDir;
 }
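The three new fields are what let a restored writer finish a half-done transaction instead of aborting it (see BaseFileSinkWriter above). A small hedged usage sketch, assuming the connector classes are on the classpath; the paths and IDs are placeholders:

```java
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class FileSinkStateDemo {
    public static void main(String[] args) {
        // Placeholder transaction: one file waiting to move from tmp to its final path
        Map<String, String> needMoveFiles =
                Collections.singletonMap("/tmp/seatunnel/T1/part-0", "/data/out/part-0");
        Map<String, List<String>> partitionDirAndValuesMap = Collections.emptyMap();
        FileSinkState state = new FileSinkState(
                "T1", 1L, needMoveFiles, partitionDirAndValuesMap, "/tmp/seatunnel/T1");
        // On restore, BaseFileSinkWriter rebuilds a FileCommitInfo from these
        // fields and commits the transaction rather than aborting it.
        System.out.println(state.getTransactionId());
    }
}
```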
