Skip to content

Commit 9268f5a

Browse files
authored
[Fix][Connector-V2] Fix CSV String type write type (#8499)
1 parent 2bfb97e commit 9268f5a

File tree

21 files changed

+1812
-29
lines changed

21 files changed

+1812
-29
lines changed

.licenserc.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ header:
3838
- '**/*.ini'
3939
- '**/*.svg'
4040
- '**/*.txt'
41+
- '**/*.csv'
4142
- '**/.gitignore'
4243
- '**/LICENSE'
4344
- '**/NOTICE'

seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@
7575
<version>${project.version}</version>
7676
</dependency>
7777

78+
<dependency>
79+
<groupId>org.apache.seatunnel</groupId>
80+
<artifactId>seatunnel-format-csv</artifactId>
81+
<version>${project.version}</version>
82+
</dependency>
83+
7884
<dependency>
7985
<groupId>org.apache.seatunnel</groupId>
8086
<artifactId>connector-common</artifactId>

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
2121
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.BinaryWriteStrategy;
22+
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.CsvWriteStrategy;
2223
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ExcelWriteStrategy;
2324
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.JsonWriteStrategy;
2425
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
@@ -27,6 +28,7 @@
2728
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
2829
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.XmlWriteStrategy;
2930
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy;
31+
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.CsvReadStrategy;
3032
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy;
3133
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
3234
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
@@ -43,12 +45,12 @@ public enum FileFormat implements Serializable {
4345
@Override
4446
public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
4547
fileSinkConfig.setFieldDelimiter(",");
46-
return new TextWriteStrategy(fileSinkConfig);
48+
return new CsvWriteStrategy(fileSinkConfig);
4749
}
4850

4951
@Override
5052
public ReadStrategy getReadStrategy() {
51-
return new TextReadStrategy();
53+
return new CsvReadStrategy();
5254
}
5355
},
5456
TEXT("txt") {
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
19+
20+
import org.apache.seatunnel.api.serialization.SerializationSchema;
21+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
23+
import org.apache.seatunnel.common.exception.CommonError;
24+
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
25+
import org.apache.seatunnel.common.utils.DateTimeUtils;
26+
import org.apache.seatunnel.common.utils.DateUtils;
27+
import org.apache.seatunnel.common.utils.EncodingUtils;
28+
import org.apache.seatunnel.common.utils.TimeUtils;
29+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
30+
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
31+
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
32+
import org.apache.seatunnel.format.csv.CsvSerializationSchema;
33+
34+
import org.apache.hadoop.fs.FSDataOutputStream;
35+
36+
import io.airlift.compress.lzo.LzopCodec;
37+
import lombok.NonNull;
38+
39+
import java.io.IOException;
40+
import java.io.OutputStream;
41+
import java.nio.charset.Charset;
42+
import java.util.HashMap;
43+
import java.util.LinkedHashMap;
44+
import java.util.Map;
45+
46+
public class CsvWriteStrategy extends AbstractWriteStrategy<FSDataOutputStream> {
47+
private final LinkedHashMap<String, FSDataOutputStream> beingWrittenOutputStream;
48+
private final Map<String, Boolean> isFirstWrite;
49+
private final String fieldDelimiter;
50+
private final String rowDelimiter;
51+
private final DateUtils.Formatter dateFormat;
52+
private final DateTimeUtils.Formatter dateTimeFormat;
53+
private final TimeUtils.Formatter timeFormat;
54+
private final FileFormat fileFormat;
55+
private final Boolean enableHeaderWriter;
56+
private final Charset charset;
57+
private SerializationSchema serializationSchema;
58+
59+
public CsvWriteStrategy(FileSinkConfig fileSinkConfig) {
60+
super(fileSinkConfig);
61+
this.beingWrittenOutputStream = new LinkedHashMap<>();
62+
this.isFirstWrite = new HashMap<>();
63+
this.fieldDelimiter = fileSinkConfig.getFieldDelimiter();
64+
this.rowDelimiter = fileSinkConfig.getRowDelimiter();
65+
this.dateFormat = fileSinkConfig.getDateFormat();
66+
this.dateTimeFormat = fileSinkConfig.getDatetimeFormat();
67+
this.timeFormat = fileSinkConfig.getTimeFormat();
68+
this.fileFormat = fileSinkConfig.getFileFormat();
69+
this.enableHeaderWriter = fileSinkConfig.getEnableHeaderWriter();
70+
this.charset = EncodingUtils.tryParseCharset(fileSinkConfig.getEncoding());
71+
}
72+
73+
@Override
74+
public void setCatalogTable(CatalogTable catalogTable) {
75+
super.setCatalogTable(catalogTable);
76+
this.serializationSchema =
77+
CsvSerializationSchema.builder()
78+
.seaTunnelRowType(
79+
buildSchemaWithRowType(
80+
catalogTable.getSeaTunnelRowType(), sinkColumnsIndexInRow))
81+
.delimiter(fieldDelimiter)
82+
.dateFormatter(dateFormat)
83+
.dateTimeFormatter(dateTimeFormat)
84+
.timeFormatter(timeFormat)
85+
.charset(charset)
86+
.build();
87+
}
88+
89+
@Override
90+
public void write(@NonNull SeaTunnelRow seaTunnelRow) {
91+
super.write(seaTunnelRow);
92+
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
93+
FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
94+
try {
95+
if (isFirstWrite.get(filePath)) {
96+
isFirstWrite.put(filePath, false);
97+
} else {
98+
fsDataOutputStream.write(rowDelimiter.getBytes(charset));
99+
}
100+
fsDataOutputStream.write(
101+
serializationSchema.serialize(
102+
seaTunnelRow.copy(
103+
sinkColumnsIndexInRow.stream()
104+
.mapToInt(Integer::intValue)
105+
.toArray())));
106+
} catch (IOException e) {
107+
throw CommonError.fileOperationFailed("CsvFile", "write", filePath, e);
108+
}
109+
}
110+
111+
@Override
112+
public void finishAndCloseFile() {
113+
beingWrittenOutputStream.forEach(
114+
(key, value) -> {
115+
try {
116+
value.flush();
117+
} catch (IOException e) {
118+
throw new FileConnectorException(
119+
CommonErrorCodeDeprecated.FLUSH_DATA_FAILED,
120+
String.format("Flush data to this file [%s] failed", key),
121+
e);
122+
} finally {
123+
try {
124+
value.close();
125+
} catch (IOException e) {
126+
log.error("error when close output stream {}", key, e);
127+
}
128+
}
129+
needMoveFiles.put(key, getTargetLocation(key));
130+
});
131+
beingWrittenOutputStream.clear();
132+
isFirstWrite.clear();
133+
}
134+
135+
@Override
136+
public FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
137+
FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
138+
if (fsDataOutputStream == null) {
139+
try {
140+
switch (compressFormat) {
141+
case LZO:
142+
LzopCodec lzo = new LzopCodec();
143+
OutputStream out =
144+
lzo.createOutputStream(
145+
hadoopFileSystemProxy.getOutputStream(filePath));
146+
fsDataOutputStream = new FSDataOutputStream(out, null);
147+
enableWriteHeader(fsDataOutputStream);
148+
break;
149+
case NONE:
150+
fsDataOutputStream = hadoopFileSystemProxy.getOutputStream(filePath);
151+
enableWriteHeader(fsDataOutputStream);
152+
break;
153+
default:
154+
log.warn(
155+
"Csv file does not support this compress type: {}",
156+
compressFormat.getCompressCodec());
157+
fsDataOutputStream = hadoopFileSystemProxy.getOutputStream(filePath);
158+
enableWriteHeader(fsDataOutputStream);
159+
break;
160+
}
161+
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
162+
isFirstWrite.put(filePath, true);
163+
} catch (IOException e) {
164+
throw CommonError.fileOperationFailed("CsvFile", "open", filePath, e);
165+
}
166+
}
167+
return fsDataOutputStream;
168+
}
169+
170+
private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOException {
171+
if (enableHeaderWriter) {
172+
fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes());
173+
fsDataOutputStream.write(rowDelimiter.getBytes());
174+
}
175+
}
176+
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,7 @@ public FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
170170
private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOException {
171171
if (enableHeaderWriter) {
172172
fsDataOutputStream.write(
173-
String.join(
174-
FileFormat.CSV.equals(fileFormat) ? "," : fieldDelimiter,
175-
seaTunnelRowType.getFieldNames())
176-
.getBytes());
173+
String.join(fieldDelimiter, seaTunnelRowType.getFieldNames()).getBytes());
177174
fsDataOutputStream.write(rowDelimiter.getBytes());
178175
}
179176
}

0 commit comments

Comments
 (0)