Skip to content

Commit 0ecd790

Browse files
[Improve][Connector-V2] Improve text write (#2971)
* [Improve][Connector-V2] Improve writing of text files
* [Improve][Connector-V2][File] Improve e2e test case
* [Docs][Connector-V2-status] Fix dead link
* [Improve][Connector-V2][File] Fix e2e test cases
* [Improve][Connector-V2][File] Fix the bug that wrote one extra line break at the end of the file
1 parent 5b9033e commit 0ecd790

File tree

5 files changed

+95
-37
lines changed

5 files changed

+95
-37
lines changed

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java

Lines changed: 19 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,10 @@
1919

2020
import static com.google.common.base.Preconditions.checkNotNull;
2121

22+
import org.apache.seatunnel.common.utils.DateTimeUtils;
23+
import org.apache.seatunnel.common.utils.DateUtils;
24+
import org.apache.seatunnel.common.utils.TimeUtils;
25+
2226
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2327

2428
import lombok.Data;
@@ -31,16 +35,15 @@
3135
@Data
3236
public class BaseTextFileConfig implements DelimiterConfig, CompressConfig, Serializable {
3337
private static final long serialVersionUID = 1L;
34-
3538
protected String compressCodec;
36-
3739
protected String fieldDelimiter = String.valueOf('\001');
38-
3940
protected String rowDelimiter = "\n";
40-
4141
protected String path;
4242
protected String fileNameExpression;
4343
protected FileFormat fileFormat = FileFormat.TEXT;
44+
protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
45+
protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
46+
protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
4447

4548
public BaseTextFileConfig(@NonNull Config config) {
4649
if (config.hasPath(Constant.COMPRESS_CODEC)) {
@@ -67,6 +70,18 @@ public BaseTextFileConfig(@NonNull Config config) {
6770
if (config.hasPath(Constant.FILE_FORMAT) && !StringUtils.isBlank(config.getString(Constant.FILE_FORMAT))) {
6871
this.fileFormat = FileFormat.valueOf(config.getString(Constant.FILE_FORMAT).toUpperCase(Locale.ROOT));
6972
}
73+
74+
if (config.hasPath(Constant.DATE_FORMAT)) {
75+
dateFormat = DateUtils.Formatter.parse(config.getString(Constant.DATE_FORMAT));
76+
}
77+
78+
if (config.hasPath(Constant.DATETIME_FORMAT)) {
79+
datetimeFormat = DateTimeUtils.Formatter.parse(config.getString(Constant.DATETIME_FORMAT));
80+
}
81+
82+
if (config.hasPath(Constant.TIME_FORMAT)) {
83+
timeFormat = TimeUtils.Formatter.parse(config.getString(Constant.TIME_FORMAT));
84+
}
7085
}
7186

7287
public BaseTextFileConfig() {}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/Constant.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,10 +22,11 @@ public class Constant {
2222
public static final String NON_PARTITION = "NON_PARTITION";
2323
public static final String TRANSACTION_ID_SPLIT = "_";
2424
public static final String TRANSACTION_EXPRESSION = "transactionId";
25-
2625
public static final String SAVE_MODE = "save_mode";
2726
public static final String COMPRESS_CODEC = "compress_codec";
28-
27+
public static final String DATE_FORMAT = "date_format";
28+
public static final String DATETIME_FORMAT = "datetime_format";
29+
public static final String TIME_FORMAT = "time_format";
2930
public static final String PATH = "path";
3031
public static final String FIELD_DELIMITER = "field_delimiter";
3132
public static final String ROW_DELIMITER = "row_delimiter";

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -35,10 +35,12 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
3535
private final byte[] rowDelimiter;
3636
private SerializationSchema serializationSchema;
3737
private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
38+
private final Map<String, Boolean> isFirstWrite;
3839

3940
public JsonWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
4041
super(textFileSinkConfig);
4142
this.beingWrittenOutputStream = new HashMap<>();
43+
this.isFirstWrite = new HashMap<>();
4244
this.rowDelimiter = textFileSinkConfig.getRowDelimiter().getBytes();
4345
}
4446

@@ -54,8 +56,12 @@ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
5456
FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
5557
try {
5658
byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
59+
if (isFirstWrite.get(filePath)) {
60+
isFirstWrite.put(filePath, false);
61+
} else {
62+
fsDataOutputStream.write(rowDelimiter);
63+
}
5764
fsDataOutputStream.write(rowBytes);
58-
fsDataOutputStream.write(rowDelimiter);
5965
} catch (IOException e) {
6066
log.error("write data to file {} error", filePath);
6167
throw new RuntimeException(e);
@@ -88,6 +94,7 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
8894
try {
8995
fsDataOutputStream = FileSystemUtils.getOutputStream(filePath);
9096
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
97+
isFirstWrite.put(filePath, true);
9198
} catch (IOException e) {
9299
log.error("can not get output file stream");
93100
throw new RuntimeException(e);

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java

Lines changed: 34 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -17,39 +17,67 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
1919

20+
import org.apache.seatunnel.api.serialization.SerializationSchema;
2021
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
23+
import org.apache.seatunnel.common.utils.DateTimeUtils;
24+
import org.apache.seatunnel.common.utils.DateUtils;
25+
import org.apache.seatunnel.common.utils.TimeUtils;
2126
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
2227
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
28+
import org.apache.seatunnel.format.text.TextSerializationSchema;
2329

2430
import lombok.NonNull;
2531
import org.apache.hadoop.fs.FSDataOutputStream;
2632

2733
import java.io.IOException;
28-
import java.util.Arrays;
2934
import java.util.HashMap;
3035
import java.util.Map;
31-
import java.util.stream.Collectors;
3236

3337
public class TextWriteStrategy extends AbstractWriteStrategy {
3438
private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
39+
private final Map<String, Boolean> isFirstWrite;
3540
private final String fieldDelimiter;
3641
private final String rowDelimiter;
42+
private final DateUtils.Formatter dateFormat;
43+
private final DateTimeUtils.Formatter dateTimeFormat;
44+
private final TimeUtils.Formatter timeFormat;
45+
private SerializationSchema serializationSchema;
3746

3847
public TextWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
3948
super(textFileSinkConfig);
4049
this.beingWrittenOutputStream = new HashMap<>();
50+
this.isFirstWrite = new HashMap<>();
4151
this.fieldDelimiter = textFileSinkConfig.getFieldDelimiter();
4252
this.rowDelimiter = textFileSinkConfig.getRowDelimiter();
53+
this.dateFormat = textFileSinkConfig.getDateFormat();
54+
this.dateTimeFormat = textFileSinkConfig.getDatetimeFormat();
55+
this.timeFormat = textFileSinkConfig.getTimeFormat();
56+
}
57+
58+
@Override
59+
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
60+
super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
61+
this.serializationSchema = TextSerializationSchema.builder()
62+
.seaTunnelRowType(seaTunnelRowType)
63+
.delimiter(fieldDelimiter)
64+
.dateFormatter(dateFormat)
65+
.dateTimeFormatter(dateTimeFormat)
66+
.timeFormatter(timeFormat)
67+
.build();
4368
}
4469

4570
@Override
4671
public void write(@NonNull SeaTunnelRow seaTunnelRow) {
4772
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
4873
FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
49-
String line = transformRowToLine(seaTunnelRow);
5074
try {
51-
fsDataOutputStream.write(line.getBytes());
52-
fsDataOutputStream.write(rowDelimiter.getBytes());
75+
if (isFirstWrite.get(filePath)) {
76+
isFirstWrite.put(filePath, false);
77+
} else {
78+
fsDataOutputStream.write(rowDelimiter.getBytes());
79+
}
80+
fsDataOutputStream.write(serializationSchema.serialize(seaTunnelRow));
5381
} catch (IOException e) {
5482
log.error("write data to file {} error", filePath);
5583
throw new RuntimeException(e);
@@ -81,29 +109,12 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
81109
try {
82110
fsDataOutputStream = FileSystemUtils.getOutputStream(filePath);
83111
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
112+
isFirstWrite.put(filePath, true);
84113
} catch (IOException e) {
85114
log.error("can not get output file stream");
86115
throw new RuntimeException(e);
87116
}
88117
}
89118
return fsDataOutputStream;
90119
}
91-
92-
private String transformRowToLine(@NonNull SeaTunnelRow seaTunnelRow) {
93-
return Arrays.stream(seaTunnelRow.getFields()).map(v -> {
94-
if (v == null) {
95-
return "";
96-
} else if (v.getClass().isArray()) {
97-
if (v instanceof byte[]) {
98-
return Arrays.toString((byte[]) v);
99-
} else {
100-
return Arrays.toString((Object[]) v);
101-
}
102-
} else if (v instanceof SeaTunnelRow) {
103-
return "{" + transformRowToLine((SeaTunnelRow) v) + "}";
104-
} else {
105-
return v.toString();
106-
}
107-
}).collect(Collectors.joining(fieldDelimiter));
108-
}
109120
}

seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_local_text.conf

Lines changed: 31 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -30,8 +30,36 @@ source {
3030
result_table_name = "fake"
3131
schema = {
3232
fields {
33-
name = "string"
34-
age = "int"
33+
c_map = "map<string, string>"
34+
c_array = "array<int>"
35+
c_string = string
36+
c_boolean = boolean
37+
c_tinyint = tinyint
38+
c_smallint = smallint
39+
c_int = int
40+
c_bigint = bigint
41+
c_float = float
42+
c_double = double
43+
c_bytes = bytes
44+
c_date = date
45+
c_decimal = "decimal(30, 8)"
46+
c_timestamp = timestamp
47+
c_row = {
48+
c_map = "map<string, string>"
49+
c_array = "array<int>"
50+
c_string = string
51+
c_boolean = boolean
52+
c_tinyint = tinyint
53+
c_smallint = smallint
54+
c_int = int
55+
c_bigint = bigint
56+
c_float = float
57+
c_double = double
58+
c_bytes = bytes
59+
c_date = date
60+
c_decimal = "decimal(30, 8)"
61+
c_timestamp = timestamp
62+
}
3563
}
3664
}
3765
}
@@ -42,7 +70,7 @@ source {
4270

4371
transform {
4472
sql {
45-
sql = "select name,age from fake"
73+
sql = "select * from fake"
4674
}
4775

4876
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
@@ -54,12 +82,8 @@ sink {
5482
path="/tmp/test/text"
5583
field_delimiter="\t"
5684
row_delimiter="\n"
57-
partition_by=["age"]
58-
partition_dir_expression="${k0}=${v0}"
59-
is_partition_field_write_in_file=true
6085
file_name_expression="${transactionId}_${now}"
6186
file_format="text"
62-
sink_columns=["name","age"]
6387
filename_time_format="yyyy.MM.dd"
6488
is_enable_transaction=true
6589
save_mode="error"

0 commit comments

Comments
 (0)