Skip to content

Commit 337aa50

Browse files
authored
[Feature][Connectors] LocalFile Support reading gz (#8025)
1 parent f855cdd commit 337aa50

File tree

13 files changed

+409
-6
lines changed

13 files changed

+409
-6
lines changed

docs/en/connector-v2/source/CosFile.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ The compress codec of archive files and the details that supported as the follow
343343
| ZIP | txt,json,excel,xml | .zip |
344344
| TAR | txt,json,excel,xml | .tar |
345345
| TAR_GZ | txt,json,excel,xml | .tar.gz |
346+
| GZ | txt,json,xml | .gz |
346347
| NONE | all | .* |
347348

348349
### encoding [string]

docs/en/connector-v2/source/FtpFile.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ The compress codec of archive files and the details that supported as the follow
328328
| ZIP | txt,json,excel,xml | .zip |
329329
| TAR | txt,json,excel,xml | .tar |
330330
| TAR_GZ | txt,json,excel,xml | .tar.gz |
331+
| GZ | txt,json,xml | .gz |
331332
| NONE | all | .* |
332333

333334
### encoding [string]

docs/en/connector-v2/source/HdfsFile.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ The compress codec of archive files and the details that supported as the follow
144144
| ZIP | txt,json,excel,xml | .zip |
145145
| TAR | txt,json,excel,xml | .tar |
146146
| TAR_GZ | txt,json,excel,xml | .tar.gz |
147+
| GZ | txt,json,xml | .gz |
147148
| NONE | all | .* |
148149

149150
### encoding [string]

docs/en/connector-v2/source/LocalFile.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ The compress codec of archive files and the details that supported as the follow
322322
| ZIP | txt,json,excel,xml | .zip |
323323
| TAR | txt,json,excel,xml | .tar |
324324
| TAR_GZ | txt,json,excel,xml | .tar.gz |
325+
| GZ | txt,json,xml | .gz |
325326
| NONE | all | .* |
326327

327328
### encoding [string]
@@ -490,4 +491,6 @@ sink {
490491
- [BugFix] Fix the bug of incorrect path in windows environment ([2980](https://github.com/apache/seatunnel/pull/2980))
491492
- [Improve] Support extract partition from SeaTunnelRow fields ([3085](https://github.com/apache/seatunnel/pull/3085))
492493
- [Improve] Support parse field from file path ([2985](https://github.com/apache/seatunnel/pull/2985))
494+
### 2.3.9-beta 2024-11-12
495+
- [Improve] Support reading gz compressed files ([8019](https://github.com/apache/seatunnel/issues/8019))
493496

docs/en/connector-v2/source/OssJindoFile.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ The compress codec of archive files and the details that supported as the follow
335335
| ZIP | txt,json,excel,xml | .zip |
336336
| TAR | txt,json,excel,xml | .tar |
337337
| TAR_GZ | txt,json,excel,xml | .tar.gz |
338+
| GZ | txt,json,xml | .gz |
338339
| NONE | all | .* |
339340

340341
### encoding [string]

docs/en/connector-v2/source/S3File.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ The compress codec of archive files and the details that supported as the follow
299299
| ZIP | txt,json,excel,xml | .zip |
300300
| TAR | txt,json,excel,xml | .tar |
301301
| TAR_GZ | txt,json,excel,xml | .tar.gz |
302+
| GZ | txt,json,xml | .gz |
302303
| NONE | all | .* |
303304

304305
### encoding [string]

docs/en/connector-v2/source/SftpFile.md

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,11 +235,12 @@ The compress codec of files and the details that supported as the following show
235235
The supported archive compress codecs and their details are shown below:
236236

237237
| archive_compress_codec | file_format | archive_compress_suffix |
238-
|------------------------|--------------------|-------------------------|
239-
| ZIP | txt,json,excel,xml | .zip |
240-
| TAR | txt,json,excel,xml | .tar |
241-
| TAR_GZ | txt,json,excel,xml | .tar.gz |
242-
| NONE | all | .* |
238+
|--------------------|--------------------|---------------------|
239+
| ZIP | txt,json,excel,xml | .zip |
240+
| TAR | txt,json,excel,xml | .tar |
241+
| TAR_GZ | txt,json,excel,xml | .tar.gz |
242+
| GZ | txt,json,xml | .gz |
243+
| NONE | all | .* |
243244

244245
### encoding [string]
245246

@@ -384,4 +385,4 @@ sink {
384385
Console {
385386
}
386387
}
387-
```
388+
```

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/ArchiveCompressFormat.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public enum ArchiveCompressFormat {
3535
ZIP(".zip"),
3636
TAR(".tar"),
3737
TAR_GZ(".tar.gz"),
38+
GZ(".gz"),
3839
;
3940
private final String archiveCompressCodec;
4041

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,11 @@ protected void resolveArchiveCompressedInputStream(
238238
}
239239
}
240240
break;
241+
case GZ:
242+
GzipCompressorInputStream gzipIn =
243+
new GzipCompressorInputStream(hadoopFileSystemProxy.getInputStream(path));
244+
readProcess(path, tableId, output, copyInputStream(gzipIn), partitionsMap, path);
245+
break;
241246
case NONE:
242247
readProcess(
243248
path,

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050

5151
import java.io.ByteArrayOutputStream;
5252
import java.io.File;
53+
import java.io.FileInputStream;
54+
import java.io.FileOutputStream;
5355
import java.io.IOException;
5456
import java.io.OutputStream;
5557
import java.nio.charset.StandardCharsets;
@@ -149,6 +151,13 @@ public class LocalFileIT extends TestSuiteBase {
149151
"/seatunnel/read/tar_gz/txt/multifile/multiTarGz.tar.gz",
150152
container);
151153

154+
Path txtGz =
155+
convertToGzFile(
156+
Lists.newArrayList(ContainerUtil.getResourcesFile("/text/e2e.txt")),
157+
"e2e-txt-gz");
158+
ContainerUtil.copyFileIntoContainers(
159+
txtGz, "/seatunnel/read/gz/txt/single/e2e-txt-gz.gz", container);
160+
152161
Path jsonZip =
153162
convertToZipFile(
154163
Lists.newArrayList(
@@ -168,6 +177,14 @@ public class LocalFileIT extends TestSuiteBase {
168177
"/seatunnel/read/zip/json/multifile/multiJson.zip",
169178
container);
170179

180+
Path jsonGz =
181+
convertToGzFile(
182+
Lists.newArrayList(
183+
ContainerUtil.getResourcesFile("/json/e2e.json")),
184+
"e2e-json-gz");
185+
ContainerUtil.copyFileIntoContainers(
186+
jsonGz, "/seatunnel/read/gz/json/single/e2e-json-gz.gz", container);
187+
171188
ContainerUtil.copyFileIntoContainers(
172189
"/text/e2e_gbk.txt",
173190
"/seatunnel/read/encoding/text/e2e_gbk.txt",
@@ -193,6 +210,13 @@ public class LocalFileIT extends TestSuiteBase {
193210
ContainerUtil.copyFileIntoContainers(
194211
xmlZip, "/seatunnel/read/zip/xml/single/e2e-xml.zip", container);
195212

213+
Path xmlGz =
214+
convertToGzFile(
215+
Lists.newArrayList(ContainerUtil.getResourcesFile("/xml/e2e.xml")),
216+
"e2e-xml-gz");
217+
ContainerUtil.copyFileIntoContainers(
218+
xmlGz, "/seatunnel/read/gz/xml/single/e2e-xml-gz.gz", container);
219+
196220
Path txtLzo = convertToLzoFile(ContainerUtil.getResourcesFile("/text/e2e.txt"));
197221
ContainerUtil.copyFileIntoContainers(
198222
txtLzo, "/seatunnel/read/lzo_text/e2e.txt", container);
@@ -313,6 +337,7 @@ public void testLocalFileReadAndWrite(TestContainer container)
313337
/** Compressed file test */
314338
// test read single local text file with zip compression
315339
helper.execute("/text/local_file_zip_text_to_assert.conf");
340+
helper.execute("/text/local_file_gz_text_to_assert.conf");
316341
// test read multi local text file with zip compression
317342
helper.execute("/text/local_file_multi_zip_text_to_assert.conf");
318343
// test read single local text file with tar compression
@@ -325,10 +350,12 @@ public void testLocalFileReadAndWrite(TestContainer container)
325350
helper.execute("/text/local_file_multi_tar_gz_text_to_assert.conf");
326351
// test read single local json file with zip compression
327352
helper.execute("/json/local_file_json_zip_to_assert.conf");
353+
helper.execute("/json/local_file_json_gz_to_assert.conf");
328354
// test read multi local json file with zip compression
329355
helper.execute("/json/local_file_json_multi_zip_to_assert.conf");
330356
// test read single local xml file with zip compression
331357
helper.execute("/xml/local_file_zip_xml_to_assert.conf");
358+
helper.execute("/xml/local_file_gz_xml_to_assert.conf");
332359
// test read single local excel file with zip compression
333360
helper.execute("/excel/local_excel_zip_to_assert.conf");
334361
// test read multi local excel file with zip compression
@@ -551,4 +578,29 @@ public FileVisitResult visitFile(
551578

552579
return tarGzFilePath;
553580
}
581+
582+
public Path convertToGzFile(List<File> files, String name) throws IOException {
583+
if (files == null || files.isEmpty()) {
584+
throw new IllegalArgumentException("File list is empty or invalid");
585+
}
586+
587+
File firstFile = files.get(0);
588+
Path gzFilePath = Paths.get(firstFile.getParent(), String.format("%s.gz", name));
589+
590+
try (FileInputStream fis = new FileInputStream(firstFile);
591+
FileOutputStream fos = new FileOutputStream(gzFilePath.toFile());
592+
GZIPOutputStream gzos = new GZIPOutputStream(fos)) {
593+
594+
byte[] buffer = new byte[2048];
595+
int length;
596+
597+
while ((length = fis.read(buffer)) > 0) {
598+
gzos.write(buffer, 0, length);
599+
}
600+
gzos.finish();
601+
} catch (IOException e) {
602+
e.printStackTrace();
603+
}
604+
return gzFilePath;
605+
}
554606
}

0 commit comments

Comments
 (0)