Skip to content

Commit d928e8b

Browse files
authored
[Improve] Refactor S3FileCatalog and it's factory (#7457)
1 parent 825caa8 commit d928e8b

File tree

3 files changed

+18
-114
lines changed

3 files changed

+18
-114
lines changed

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/catalog/AbstractFileCatalog.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import lombok.SneakyThrows;
3434

35+
import java.io.IOException;
3536
import java.util.List;
3637

3738
public abstract class AbstractFileCatalog implements Catalog {
@@ -51,7 +52,15 @@ protected AbstractFileCatalog(
5152
public void open() throws CatalogException {}
5253

5354
@Override
54-
public void close() throws CatalogException {}
55+
public void close() throws CatalogException {
56+
if (hadoopFileSystemProxy != null) {
57+
try {
58+
hadoopFileSystemProxy.close();
59+
} catch (IOException e) {
60+
throw new CatalogException(e);
61+
}
62+
}
63+
}
5564

5665
@Override
5766
public String name() {

seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java

Lines changed: 6 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -17,120 +17,14 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
1919

20-
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21-
import org.apache.seatunnel.api.table.catalog.Catalog;
22-
import org.apache.seatunnel.api.table.catalog.CatalogTable;
23-
import org.apache.seatunnel.api.table.catalog.TablePath;
24-
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
25-
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
26-
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
27-
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
28-
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
20+
import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
2921
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
30-
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
3122

32-
import org.apache.commons.collections4.CollectionUtils;
33-
import org.apache.hadoop.fs.LocatedFileStatus;
23+
public class S3FileCatalog extends AbstractFileCatalog {
24+
// TODO: this catalog name conflict with a factory identifier
25+
public static final String CATALOG_NAME = "S3File";
3426

35-
import lombok.AllArgsConstructor;
36-
import lombok.SneakyThrows;
37-
38-
import java.io.IOException;
39-
import java.util.List;
40-
41-
@AllArgsConstructor
42-
public class S3FileCatalog implements Catalog {
43-
44-
private final HadoopFileSystemProxy hadoopFileSystemProxy;
45-
private final ReadonlyConfig readonlyConfig;
46-
47-
@Override
48-
public void open() throws CatalogException {}
49-
50-
@Override
51-
public void close() throws CatalogException {
52-
if (hadoopFileSystemProxy != null) {
53-
try {
54-
hadoopFileSystemProxy.close();
55-
} catch (IOException e) {
56-
throw new CatalogException(e);
57-
}
58-
}
59-
}
60-
61-
@Override
62-
public String name() {
63-
return "S3File";
64-
}
65-
66-
@Override
67-
public String getDefaultDatabase() throws CatalogException {
68-
return null;
69-
}
70-
71-
@Override
72-
public boolean databaseExists(String databaseName) throws CatalogException {
73-
return false;
74-
}
75-
76-
@Override
77-
public List<String> listDatabases() throws CatalogException {
78-
return null;
79-
}
80-
81-
@Override
82-
public List<String> listTables(String databaseName)
83-
throws CatalogException, DatabaseNotExistException {
84-
return null;
85-
}
86-
87-
@SneakyThrows
88-
@Override
89-
public boolean tableExists(TablePath tablePath) throws CatalogException {
90-
return hadoopFileSystemProxy.fileExist(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
91-
}
92-
93-
@Override
94-
public CatalogTable getTable(TablePath tablePath)
95-
throws CatalogException, TableNotExistException {
96-
return null;
97-
}
98-
99-
@SneakyThrows
100-
@Override
101-
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
102-
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
103-
hadoopFileSystemProxy.createDir(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
104-
}
105-
106-
@SneakyThrows
107-
@Override
108-
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
109-
throws TableNotExistException, CatalogException {
110-
hadoopFileSystemProxy.deleteFile(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
111-
}
112-
113-
@Override
114-
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
115-
throws DatabaseAlreadyExistException, CatalogException {}
116-
117-
@Override
118-
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
119-
throws DatabaseNotExistException, CatalogException {}
120-
121-
@SneakyThrows
122-
@Override
123-
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
124-
throws TableNotExistException, CatalogException {
125-
hadoopFileSystemProxy.deleteFile(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
126-
hadoopFileSystemProxy.createDir(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
127-
}
128-
129-
@SneakyThrows
130-
@Override
131-
public boolean isExistsData(TablePath tablePath) {
132-
final List<LocatedFileStatus> locatedFileStatuses =
133-
hadoopFileSystemProxy.listFile(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
134-
return CollectionUtils.isNotEmpty(locatedFileStatuses);
27+
public S3FileCatalog(HadoopFileSystemProxy hadoopFileSystemProxy, String filePath) {
28+
super(hadoopFileSystemProxy, filePath, CATALOG_NAME);
13529
}
13630
}

seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalogFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.catalog.Catalog;
2323
import org.apache.seatunnel.api.table.factory.CatalogFactory;
2424
import org.apache.seatunnel.api.table.factory.Factory;
25+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
2526
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
2627
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
2728
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;
@@ -34,7 +35,7 @@ public class S3FileCatalogFactory implements CatalogFactory {
3435
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
3536
HadoopConf hadoopConf = S3HadoopConf.buildWithReadOnlyConfig(options);
3637
HadoopFileSystemProxy fileSystemUtils = new HadoopFileSystemProxy(hadoopConf);
37-
return new S3FileCatalog(fileSystemUtils, options);
38+
return new S3FileCatalog(fileSystemUtils, options.get(BaseSourceConfigOptions.FILE_PATH));
3839
}
3940

4041
@Override

0 commit comments

Comments
 (0)