Skip to content

Commit 8875d02

Browse files
[Improve][Connector-V2][File] File Connector add lzo compression way. (#3782)
* add lzo compression way. * format code style * add CompressFormat enum. * Judge with switch statement. * fix CompressFormat enum valueOf adapt * Repair NPE when compression is not set. * add license header. * fix CI problem. * CompressFormat enum add default value NONE. * Supplement lzo compressed documents. * Supplement lzo compressed documents. * modify file name ends with *.lzo.txt Co-authored-by: zhaoliang01 <zhaoliang01@58.com>
1 parent 52c6bf2 commit 8875d02

File tree

6 files changed

+94
-8
lines changed

6 files changed

+94
-8
lines changed

docs/en/connector-v2/sink/HdfsFile.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ By default, we use 2PC commit to ensure `exactly-once`
1919
- [x] parquet
2020
- [x] orc
2121
- [x] json
22+
- [x] compress codec
23+
- [x] lzo
24+
2225

2326
## Options
2427

@@ -41,6 +44,7 @@ In order to use this connector, You must ensure your spark/flink cluster already
4144
| is_enable_transaction | boolean | no | true |
4245
| batch_size | int | no | 1000000 |
4346
| common-options | | no | - |
47+
| compressCodec | string | no | none |
4448

4549
### fs.defaultFS [string]
4650

@@ -125,8 +129,10 @@ Only support `true` now.
125129

126130
The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger.
127131

128-
### common options
132+
### compressCodec [string]
133+
Support lzo compression for text files. The generated file name ends with ".lzo.txt".
129134

135+
### common options
130136
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
131137

132138
## Example
@@ -207,4 +213,5 @@ HdfsFile {
207213
- Sink columns mapping failed
208214
- When restore writer from states getting transaction directly failed
209215

210-
- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
216+
- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
217+
- [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782))

docs/en/connector-v2/sink/Hive.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,17 @@ By default, we use 2PC commit to ensure `exactly-once`
2121
- [x] text
2222
- [x] parquet
2323
- [x] orc
24+
- [x] compress codec
25+
- [x] lzo
2426

2527
## Options
2628

2729
| name | type | required | default value |
2830
|----------------|--------|----------|---------------|
2931
| table_name | string | yes | - |
3032
| metastore_uri | string | yes | - |
33+
| compressCodec | string | no | none |
3134
| common-options | | no | - |
32-
3335
### table_name [string]
3436

3537
Target Hive table name eg: db1.table1

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,15 @@ public class BaseFileSinkConfig implements DelimiterConfig, CompressConfig, Seri
5050

5151
public BaseFileSinkConfig(@NonNull Config config) {
5252
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
53-
throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
54-
"Compress not supported by SeaTunnel file connector now");
53+
CompressFormat compressFormat = CompressFormat.valueOf(config.getString(BaseSinkConfig.COMPRESS_CODEC.key()).toUpperCase(Locale.ROOT));
54+
switch (compressFormat) {
55+
case LZO:
56+
this.compressCodec = compressFormat.getCompressCodec();
57+
break;
58+
default:
59+
throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
60+
"Compress not supported this compress code by SeaTunnel file connector now");
61+
}
5562
}
5663
if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
5764
this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.config;
19+
20+
import java.io.Serializable;
21+
22+
/**
 * Compression codecs supported by the file connectors.
 *
 * <p>Each constant carries the lower-case codec name used both in user
 * configuration ({@code compressCodec}) and as part of the generated
 * file-name suffix (e.g. {@code .lzo.txt}).
 */
public enum CompressFormat implements Serializable {

    LZO("lzo"),
    NONE("none");

    // Lower-case codec name as it appears in configuration and file suffixes.
    private final String compressCodec;

    CompressFormat(String compressCodec) {
        this.compressCodec = compressCodec;
    }

    /**
     * Returns the lower-case codec name, e.g. {@code "lzo"}.
     */
    public String getCompressCodec() {
        return compressCodec;
    }

    /**
     * Resolves a user-supplied codec name to its enum constant.
     *
     * <p>The lookup is null-safe and case-insensitive: callers elsewhere in the
     * connector upper-case the value before {@code valueOf}, so this helper
     * must accept any casing. Unknown or missing values fall back to
     * {@link #NONE} (no compression) rather than throwing.
     *
     * @param value codec name from configuration, may be {@code null}
     * @return the matching format, or {@link #NONE} when unrecognized
     */
    public static CompressFormat getCompressFormat(String value) {
        if (value != null && LZO.compressCodec.equalsIgnoreCase(value)) {
            return LZO;
        }
        return NONE;
    }
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.seatunnel.common.exception.CommonErrorCode;
2929
import org.apache.seatunnel.common.utils.VariablesSubstitute;
3030
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
31+
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
3132
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
3233
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
3334
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
@@ -197,8 +198,12 @@ public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow
197198
public String generateFileName(String transactionId) {
198199
String fileNameExpression = fileSinkConfig.getFileNameExpression();
199200
FileFormat fileFormat = fileSinkConfig.getFileFormat();
201+
String suffix = fileFormat.getSuffix();
202+
if (CompressFormat.LZO.getCompressCodec().equals(fileSinkConfig.getCompressCodec())) {
203+
suffix = "." + CompressFormat.LZO.getCompressCodec() + "." + suffix;
204+
}
200205
if (StringUtils.isBlank(fileNameExpression)) {
201-
return transactionId + fileFormat.getSuffix();
206+
return transactionId + suffix;
202207
}
203208
String timeFormat = fileSinkConfig.getFileNameTimeFormat();
204209
DateTimeFormatter df = DateTimeFormatter.ofPattern(timeFormat);
@@ -209,7 +214,7 @@ public String generateFileName(String transactionId) {
209214
valuesMap.put(timeFormat, formattedDate);
210215
valuesMap.put(BaseSinkConfig.TRANSACTION_EXPRESSION, transactionId);
211216
String substitute = VariablesSubstitute.substitute(fileNameExpression, valuesMap) + "_" + partId;
212-
return substitute + fileFormat.getSuffix();
217+
return substitute + suffix;
213218
}
214219

215220
/**

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,19 @@
2424
import org.apache.seatunnel.common.utils.DateTimeUtils;
2525
import org.apache.seatunnel.common.utils.DateUtils;
2626
import org.apache.seatunnel.common.utils.TimeUtils;
27+
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
2728
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
2829
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
2930
import org.apache.seatunnel.format.text.TextSerializationSchema;
3031

32+
import io.airlift.compress.lzo.LzopCodec;
3133
import lombok.NonNull;
3234
import org.apache.hadoop.fs.FSDataOutputStream;
3335

3436
import java.io.IOException;
37+
import java.io.OutputStream;
3538
import java.util.HashMap;
39+
import java.util.Locale;
3640
import java.util.Map;
3741

3842
public class TextWriteStrategy extends AbstractWriteStrategy {
@@ -44,6 +48,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
4448
private final DateTimeUtils.Formatter dateTimeFormat;
4549
private final TimeUtils.Formatter timeFormat;
4650
private SerializationSchema serializationSchema;
51+
private String compressCodec;
4752

4853
public TextWriteStrategy(FileSinkConfig textFileSinkConfig) {
4954
super(textFileSinkConfig);
@@ -54,6 +59,7 @@ public TextWriteStrategy(FileSinkConfig textFileSinkConfig) {
5459
this.dateFormat = textFileSinkConfig.getDateFormat();
5560
this.dateTimeFormat = textFileSinkConfig.getDatetimeFormat();
5661
this.timeFormat = textFileSinkConfig.getTimeFormat();
62+
this.compressCodec = textFileSinkConfig.getCompressCodec();
5763
}
5864

5965
@Override
@@ -111,7 +117,21 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
111117
FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
112118
if (fsDataOutputStream == null) {
113119
try {
114-
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
120+
if (compressCodec != null) {
121+
CompressFormat compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase(Locale.ROOT));
122+
switch (compressFormat) {
123+
case LZO:
124+
LzopCodec lzo = new LzopCodec();
125+
OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
126+
fsDataOutputStream = new FSDataOutputStream(out, null);
127+
break;
128+
default:
129+
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
130+
}
131+
} else {
132+
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
133+
}
134+
115135
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
116136
isFirstWrite.put(filePath, true);
117137
} catch (IOException e) {

0 commit comments

Comments
 (0)