Skip to content

Commit dd68c06

Browse files
hailin0 and wanghailin authored
[Feature][Connector-V2] Add json file sink & json format (#2385)
* [Feature][Connector-V2] Add [File]JSON sink & json format Co-authored-by: wanghailin <hailin@fiture.com>
1 parent a039407 commit dd68c06

File tree

22 files changed

+986
-5
lines changed

22 files changed

+986
-5
lines changed

docs/en/connector-v2/sink/HdfsFile.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
4040

4141
We support the following file types:
4242

43-
`text` `csv` `parquet` `orc`
43+
`text` `csv` `parquet` `orc` `json`
4444

4545
Please note that the final file name will end with the file_format's suffix; the suffix of the text file is `txt`.
4646

docs/en/connector-v2/sink/LocalFile.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
3838

3939
We support the following file types:
4040

41-
`text` `csv` `parquet` `orc`
41+
`text` `csv` `parquet` `orc` `json`
4242

4343
Please note that the final file name will end with the file_format's suffix; the suffix of the text file is `txt`.
4444

seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@
3535
<artifactId>seatunnel-api</artifactId>
3636
<version>${project.version}</version>
3737
</dependency>
38+
<dependency>
39+
<groupId>org.apache.seatunnel</groupId>
40+
<artifactId>seatunnel-format-json</artifactId>
41+
<version>${project.version}</version>
42+
</dependency>
3843

3944
<dependency>
4045
<groupId>org.apache.seatunnel</groupId>

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ public enum FileFormat implements Serializable {
2323
CSV("csv"),
2424
TEXT("txt"),
2525
PARQUET("parquet"),
26-
ORC("orc");
26+
ORC("orc"),
27+
JSON("json");
2728

2829
private String suffix;
2930

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs;
19+
20+
import org.apache.seatunnel.api.serialization.SerializationSchema;
21+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
23+
import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
24+
import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
25+
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
26+
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
27+
import org.apache.seatunnel.format.json.JsonSerializationSchema;
28+
29+
import lombok.NonNull;
30+
import lombok.extern.slf4j.Slf4j;
31+
import org.apache.hadoop.fs.FSDataOutputStream;
32+
33+
import java.io.IOException;
34+
import java.util.HashMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
38+
@Slf4j
39+
public class HdfsJsonTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
40+
41+
private static final long serialVersionUID = -5432828969702531646L;
42+
43+
private final byte[] rowDelimiter;
44+
private final SerializationSchema serializationSchema;
45+
private Map<String, FSDataOutputStream> beingWrittenOutputStream;
46+
47+
public HdfsJsonTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
48+
@NonNull TransactionFileNameGenerator transactionFileNameGenerator,
49+
@NonNull PartitionDirNameGenerator partitionDirNameGenerator,
50+
@NonNull List<Integer> sinkColumnsIndexInRow,
51+
@NonNull String tmpPath,
52+
@NonNull String targetPath,
53+
@NonNull String jobId,
54+
int subTaskIndex,
55+
@NonNull String rowDelimiter,
56+
@NonNull FileSystem fileSystem) {
57+
super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
58+
59+
this.rowDelimiter = rowDelimiter.getBytes();
60+
this.serializationSchema = new JsonSerializationSchema(seaTunnelRowTypeInfo);
61+
beingWrittenOutputStream = new HashMap<>();
62+
}
63+
64+
@Override
65+
public void beginTransaction(String transactionId) {
66+
this.beingWrittenOutputStream = new HashMap<>();
67+
}
68+
69+
@Override
70+
public void abortTransaction(String transactionId) {
71+
this.beingWrittenOutputStream = new HashMap<>();
72+
}
73+
74+
@Override
75+
public void write(@NonNull SeaTunnelRow seaTunnelRow) {
76+
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
77+
FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
78+
try {
79+
byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
80+
fsDataOutputStream.write(rowBytes);
81+
fsDataOutputStream.write(rowDelimiter);
82+
} catch (IOException e) {
83+
log.error("write data to file {} error", filePath);
84+
throw new RuntimeException(e);
85+
}
86+
}
87+
88+
@Override
89+
public void finishAndCloseWriteFile() {
90+
beingWrittenOutputStream.entrySet().forEach(entry -> {
91+
try {
92+
entry.getValue().flush();
93+
} catch (IOException e) {
94+
log.error("error when flush file {}", entry.getKey());
95+
throw new RuntimeException(e);
96+
} finally {
97+
try {
98+
entry.getValue().close();
99+
} catch (IOException e) {
100+
log.error("error when close output stream {}", entry.getKey());
101+
}
102+
}
103+
104+
needMoveFiles.put(entry.getKey(), getTargetLocation(entry.getKey()));
105+
});
106+
}
107+
108+
private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
109+
FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
110+
if (fsDataOutputStream == null) {
111+
try {
112+
fsDataOutputStream = HdfsUtils.getOutputStream(filePath);
113+
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
114+
} catch (IOException e) {
115+
log.error("can not get output file stream");
116+
throw new RuntimeException(e);
117+
}
118+
}
119+
return fsDataOutputStream;
120+
}
121+
}

seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,19 @@ public static TransactionStateFileWriter of(@NonNull SeaTunnelRowType seaTunnelR
8585
subTaskIndex,
8686
fileSystem);
8787
}
88+
if (fileFormat.equals(FileFormat.JSON)) {
89+
return new HdfsJsonTransactionStateFileWriter(
90+
seaTunnelRowTypeInfo,
91+
transactionFileNameGenerator,
92+
partitionDirNameGenerator,
93+
sinkColumnsIndexInRow,
94+
tmpPath,
95+
targetPath,
96+
jobId,
97+
subTaskIndex,
98+
rowDelimiter,
99+
fileSystem);
100+
}
88101
// if file type not supported by file connector, default txt writer will be generated
89102
return new HdfsTxtTransactionStateFileWriter(
90103
seaTunnelRowTypeInfo,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.file.sink.local;

import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A transactional file writer that serializes each {@link SeaTunnelRow} to a JSON line
 * and appends it (followed by the row delimiter) to a local file. One output stream is
 * kept open per target file path for the lifetime of a transaction.
 */
@Slf4j
public class LocalJsonTransactionStateFileWriter extends AbstractTransactionStateFileWriter {

    private static final long serialVersionUID = -3834472539886339383L;

    // Bytes written after every serialized row, e.g. "\n" for JSON-lines output.
    private final byte[] rowDelimiter;
    private final SerializationSchema serializationSchema;
    // Open streams keyed by file path; reset at transaction boundaries.
    // NOTE(review): FileOutputStream is not serializable — confirm this writer is
    // never serialized while streams are open.
    private Map<String, FileOutputStream> beingWrittenOutputStream;

    /**
     * @param seaTunnelRowTypeInfo row schema used to build the JSON serializer
     * @param rowDelimiter delimiter string appended after each serialized row
     * (remaining parameters are forwarded unchanged to the superclass)
     */
    public LocalJsonTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
                                               @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
                                               @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
                                               @NonNull List<Integer> sinkColumnsIndexInRow,
                                               @NonNull String tmpPath,
                                               @NonNull String targetPath,
                                               @NonNull String jobId,
                                               int subTaskIndex,
                                               @NonNull String rowDelimiter,
                                               @NonNull FileSystem fileSystem) {
        super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);

        // Use an explicit charset: the no-arg getBytes() depends on the JVM's platform
        // default and would make the written files platform-dependent.
        this.rowDelimiter = rowDelimiter.getBytes(StandardCharsets.UTF_8);
        this.serializationSchema = new JsonSerializationSchema(seaTunnelRowTypeInfo);
        this.beingWrittenOutputStream = new HashMap<>();
    }

    @Override
    public void beginTransaction(String transactionId) {
        // Each transaction starts with a fresh set of output streams.
        this.beingWrittenOutputStream = new HashMap<>();
    }

    @Override
    public void abortTransaction(String transactionId) {
        // Drop stream references; the abandoned temp files are handled by the framework.
        this.beingWrittenOutputStream = new HashMap<>();
    }

    /**
     * Serializes the row to JSON and appends it plus the row delimiter to the
     * stream of the file this row is routed to.
     *
     * @throws RuntimeException wrapping any {@link IOException} from the write
     */
    @Override
    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
        FileOutputStream fileOutputStream = getOrCreateOutputStream(filePath);
        try {
            byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
            fileOutputStream.write(rowBytes);
            fileOutputStream.write(rowDelimiter);
        } catch (IOException e) {
            // Include the exception so the stack trace reaches the log.
            log.error("write data to file {} error", filePath, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Flushes and closes every open stream, then records each temp file and its
     * final target location in {@code needMoveFiles} for the commit phase.
     */
    @Override
    public void finishAndCloseWriteFile() {
        beingWrittenOutputStream.forEach((path, stream) -> {
            try {
                stream.flush();
            } catch (IOException e) {
                log.error("error when flush file {}", path, e);
                throw new RuntimeException(e);
            } finally {
                try {
                    stream.close();
                } catch (IOException e) {
                    // Best-effort close: log but do not mask a flush failure.
                    log.error("error when close output stream {}", path, e);
                }
            }
            needMoveFiles.put(path, getTargetLocation(path));
        });
    }

    /** Returns the open stream for {@code filePath}, creating the file and caching the stream on first use. */
    private FileOutputStream getOrCreateOutputStream(@NonNull String filePath) {
        FileOutputStream fileOutputStream = beingWrittenOutputStream.get(filePath);
        if (fileOutputStream == null) {
            try {
                // NOTE(review): FileUtils has no import statement visible in this view —
                // confirm it resolves (likely ...file.sink.util.FileUtils, as used by the txt writer).
                FileUtils.createFile(filePath);
                fileOutputStream = new FileOutputStream(filePath);
                beingWrittenOutputStream.put(filePath, fileOutputStream);
            } catch (IOException e) {
                log.error("can not get output file stream", e);
                throw new RuntimeException(e);
            }
        }
        return fileOutputStream;
    }
}

seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,19 @@ public static TransactionStateFileWriter of(@NonNull SeaTunnelRowType seaTunnelR
8585
subTaskIndex,
8686
fileSystem);
8787
}
88+
if (fileFormat.equals(FileFormat.JSON)) {
89+
return new LocalJsonTransactionStateFileWriter(
90+
seaTunnelRowTypeInfo,
91+
transactionFileNameGenerator,
92+
partitionDirNameGenerator,
93+
sinkColumnsIndexInRow,
94+
tmpPath,
95+
targetPath,
96+
jobId,
97+
subTaskIndex,
98+
rowDelimiter,
99+
fileSystem);
100+
}
88101
// if file type not supported by file connector, default txt writer will be generated
89102
return new LocalTxtTransactionStateFileWriter(
90103
seaTunnelRowTypeInfo,

seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,15 @@ public void testFakeSourceToLocalFileParquet() throws IOException, InterruptedEx
4545
Assertions.assertEquals(0, execResult.getExitCode());
4646
}
4747

48+
/**
49+
* fake source -> local json file sink
50+
*/
51+
@Test
52+
public void testFakeSourceToLocalFileJson() throws IOException, InterruptedException {
53+
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_local_json.conf");
54+
Assertions.assertEquals(0, execResult.getExitCode());
55+
}
56+
4857
/**
4958
* fake source -> hdfs text file sink
5059
*/
@@ -62,4 +71,13 @@ public void testFakeSourceToHdfsFileParquet() throws IOException, InterruptedExc
6271
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_parquet.conf");
6372
Assertions.assertEquals(0, execResult.getExitCode());
6473
}
74+
75+
/**
76+
* fake source -> hdfs json file sink
77+
*/
78+
@Test
79+
public void testFakeSourceToHdfsFileJson() throws IOException, InterruptedException {
80+
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_json.conf");
81+
Assertions.assertEquals(0, execResult.getExitCode());
82+
}
6583
}

0 commit comments

Comments
 (0)