Skip to content

Commit c8d59ec

Browse files
[Improve][Connector-V2][HDFS] Support setting hdfs-site.xml (#3778)
1 parent 14703fe commit c8d59ec

File tree

5 files changed

+20
-0
lines changed

5 files changed

+20
-0
lines changed

seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.seatunnel.common.constants.PluginType;
2727
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
2828
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
29+
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
2930
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
3031

3132
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -42,5 +43,8 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
4243
}
4344
super.prepare(pluginConfig);
4445
hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
46+
if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_PATH.key())) {
47+
hadoopConf.setHdfsSitePath(pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key()));
48+
}
4549
}
4650
}

seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
5252
readStrategy.setPluginConfig(pluginConfig);
5353
String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH.key());
5454
hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS.key()));
55+
if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_PATH.key())) {
56+
hadoopConf.setHdfsSitePath(pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key()));
57+
}
5558
try {
5659
filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
5760
} catch (IOException e) {

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,8 @@ public class BaseSinkConfig {
112112
.intType()
113113
.defaultValue(DEFAULT_BATCH_SIZE)
114114
.withDescription("The batch size of each split file");
115+
public static final Option<String> HDFS_SITE_PATH = Options.key("hdfs_site_path")
116+
.stringType()
117+
.noDefaultValue()
118+
.withDescription("The path of hdfs-site.xml");
115119
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,8 @@ public class BaseSourceConfig {
5252
.booleanType()
5353
.defaultValue(true)
5454
.withDescription("Whether parse partition fields from file path");
55+
public static final Option<String> HDFS_SITE_PATH = Options.key("hdfs_site_path")
56+
.stringType()
57+
.noDefaultValue()
58+
.withDescription("The path of hdfs-site.xml");
5559
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import lombok.Data;
2121
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.Path;
2223

2324
import java.io.Serializable;
2425
import java.util.HashMap;
@@ -30,6 +31,7 @@ public class HadoopConf implements Serializable {
3031
private static final String SCHEMA = "hdfs";
3132
protected Map<String, String> extraOptions = new HashMap<>();
3233
protected String hdfsNameKey;
34+
protected String hdfsSitePath;
3335

3436
public HadoopConf(String hdfsNameKey) {
3537
this.hdfsNameKey = hdfsNameKey;
@@ -47,5 +49,8 @@ public void setExtraOptionsForConfiguration(Configuration configuration) {
4749
if (!extraOptions.isEmpty()) {
4850
extraOptions.forEach(configuration::set);
4951
}
52+
if (hdfsSitePath != null) {
53+
configuration.addResource(new Path(hdfsSitePath));
54+
}
5055
}
5156
}

0 commit comments

Comments
 (0)