[HUDI-3980] Suport kerberos hbase index #5464

Merged (6 commits, May 14, 2022)
@@ -157,6 +157,33 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
       .withDocumentation("When set to true, the rollback method will delete the last failed task index. "
           + "The default value is false. Because deleting the index will add extra load on the Hbase cluster for each rollback");

+  public static final ConfigProperty<String> SECURITY_AUTHENTICATION = ConfigProperty
+      .key("hoodie.index.hbase.security.authentication")
+      .defaultValue("simple")
+      .withDocumentation("Property to decide if the hbase cluster secure authentication is enabled or not. "
+          + "Possible values are 'simple' (no authentication), and 'kerberos'.");
+
+  public static final ConfigProperty<String> KERBEROS_USER_KEYTAB = ConfigProperty
+      .key("hoodie.index.hbase.kerberos.user.keytab")
+      .noDefaultValue()
+      .withDocumentation("File name of the kerberos keytab file for connecting to the hbase cluster.");
+
+  public static final ConfigProperty<String> KERBEROS_USER_PRINCIPAL = ConfigProperty
+      .key("hoodie.index.hbase.kerberos.user.principal")
+      .noDefaultValue()
+      .withDocumentation("The kerberos principal name for connecting to the hbase cluster.");
+
+  public static final ConfigProperty<String> REGIONSERVER_PRINCIPAL = ConfigProperty
+      .key("hoodie.index.hbase.regionserver.kerberos.principal")
+      .noDefaultValue()
+      .withDocumentation("The value of hbase.regionserver.kerberos.principal in hbase cluster.");
+
+  public static final ConfigProperty<String> MASTER_PRINCIPAL = ConfigProperty
+      .key("hoodie.index.hbase.master.kerberos.principal")
+      .noDefaultValue()
+      .withDocumentation("The value of hbase.master.kerberos.principal in hbase cluster.");
+
+
   /**
    * @deprecated Use {@link #ZKQUORUM} and its methods instead
    */
@@ -444,6 +471,31 @@ public Builder hbaseZkZnodeParent(String zkZnodeParent) {
       return this;
     }

+    public Builder hbaseSecurityAuthentication(String authentication) {
+      hBaseIndexConfig.setValue(SECURITY_AUTHENTICATION, authentication);
+      return this;
+    }
+
+    public Builder hbaseKerberosUserKeytab(String keytab) {
+      hBaseIndexConfig.setValue(KERBEROS_USER_KEYTAB, keytab);
+      return this;
+    }
+
+    public Builder hbaseKerberosUserPrincipal(String principal) {
+      hBaseIndexConfig.setValue(KERBEROS_USER_PRINCIPAL, principal);
+      return this;
+    }
+
+    public Builder hbaseKerberosRegionserverPrincipal(String principal) {
+      hBaseIndexConfig.setValue(REGIONSERVER_PRINCIPAL, principal);
+      return this;
+    }
+
+    public Builder hbaseKerberosMasterPrincipal(String principal) {
+      hBaseIndexConfig.setValue(MASTER_PRINCIPAL, principal);
+      return this;
+    }
+
     /**
      * <p>
      * Method to set maximum QPS allowed per Region Server. This should be same across various jobs. This is intended to
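For reference, a minimal sketch of how a writer job could enable the options introduced above. The configuration keys and the 'kerberos' value are taken from the ConfigProperty definitions in this change; the hoodie.index.type pairing, the principal and keytab names, and the plain options map are illustrative assumptions, not part of this patch.

import java.util.HashMap;
import java.util.Map;

public class HBaseIndexKerberosOptionsSketch {
  // Builds write options that switch the HBase index to Kerberos authentication.
  public static Map<String, String> kerberosIndexOptions() {
    Map<String, String> opts = new HashMap<>();
    opts.put("hoodie.index.type", "HBASE");
    opts.put("hoodie.index.hbase.security.authentication", "kerberos");
    opts.put("hoodie.index.hbase.kerberos.user.principal", "hudi-writer@EXAMPLE.COM");         // placeholder principal
    opts.put("hoodie.index.hbase.kerberos.user.keytab", "hudi-writer.keytab");                 // file name only, see note below
    opts.put("hoodie.index.hbase.regionserver.kerberos.principal", "hbase/_HOST@EXAMPLE.COM"); // placeholder
    opts.put("hoodie.index.hbase.master.kerberos.principal", "hbase/_HOST@EXAMPLE.COM");       // placeholder
    return opts;
  }
}

Note that the keytab option holds a file name rather than an absolute path, because the getHBaseConnection() change further below resolves it through SparkFiles.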
@@ -1488,6 +1488,26 @@ public boolean getHBaseIndexShouldComputeQPSDynamically() {
     return getBoolean(HoodieHBaseIndexConfig.COMPUTE_QPS_DYNAMICALLY);
   }

+  public String getHBaseIndexSecurityAuthentication() {
+    return getString(HoodieHBaseIndexConfig.SECURITY_AUTHENTICATION);
+  }
+
+  public String getHBaseIndexKerberosUserKeytab() {
+    return getString(HoodieHBaseIndexConfig.KERBEROS_USER_KEYTAB);
+  }
+
+  public String getHBaseIndexKerberosUserPrincipal() {
+    return getString(HoodieHBaseIndexConfig.KERBEROS_USER_PRINCIPAL);
+  }
+
+  public String getHBaseIndexRegionserverPrincipal() {
+    return getString(HoodieHBaseIndexConfig.REGIONSERVER_PRINCIPAL);
+  }
+
+  public String getHBaseIndexMasterPrincipal() {
+    return getString(HoodieHBaseIndexConfig.MASTER_PRINCIPAL);
+  }
+
   public int getHBaseIndexDesiredPutsTime() {
     return getInt(HoodieHBaseIndexConfig.DESIRED_PUTS_TIME_IN_SECONDS);
   }
@@ -42,7 +42,6 @@
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -60,10 +59,12 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -72,6 +73,7 @@

 import java.io.IOException;
 import java.io.Serializable;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -150,9 +152,28 @@ private Connection getHBaseConnection() {
     }
     String port = String.valueOf(config.getHbaseZkPort());
     hbaseConfig.set("hbase.zookeeper.property.clientPort", port);
+
     try {
-      return ConnectionFactory.createConnection(hbaseConfig);
-    } catch (IOException e) {
+      String authentication = config.getHBaseIndexSecurityAuthentication();
+      if (authentication.equals("kerberos")) {
+        hbaseConfig.set("hbase.security.authentication", "kerberos");
+        hbaseConfig.set("hadoop.security.authentication", "kerberos");
+        hbaseConfig.set("hbase.security.authorization", "true");
+        hbaseConfig.set("hbase.regionserver.kerberos.principal", config.getHBaseIndexRegionserverPrincipal());
+        hbaseConfig.set("hbase.master.kerberos.principal", config.getHBaseIndexMasterPrincipal());
+
+        String principal = config.getHBaseIndexKerberosUserPrincipal();
+        String keytab = SparkFiles.get(config.getHBaseIndexKerberosUserKeytab());
+
+        UserGroupInformation.setConfiguration(hbaseConfig);
+        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+        return ugi.doAs((PrivilegedExceptionAction<Connection>) () ->
+            (Connection) ConnectionFactory.createConnection(hbaseConfig)
+        );
+      } else {
+        return ConnectionFactory.createConnection(hbaseConfig);
+      }
+    } catch (IOException | InterruptedException e) {
       throw new HoodieDependentSystemUnavailableException(HoodieDependentSystemUnavailableException.HBASE,
           quorum + ":" + port, e);
     }
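One operational detail implied by the getHBaseConnection() change: the value of hoodie.index.hbase.kerberos.user.keytab is resolved through SparkFiles.get(...), so the keytab file must be distributed with the Spark application (for example via spark-submit --files, or addFile as below). A minimal sketch of that step, with placeholder names and paths:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ShipKeytabSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("hudi-hbase-index-kerberos-sketch");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Ship the keytab so it can be located via SparkFiles.get("hudi-writer.keytab"),
      // matching the file name set in hoodie.index.hbase.kerberos.user.keytab.
      jsc.addFile("/etc/security/keytabs/hudi-writer.keytab");  // placeholder path
      // ... build the Hudi write config with the Kerberos index options shown earlier and run the writer ...
    }
  }
}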