Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-7453 In PutKudu creating a new Kudu client when refreshing TGT #4276

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
Expand All @@ -62,6 +64,9 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public abstract class AbstractKuduProcessor extends AbstractProcessor {

Expand Down Expand Up @@ -120,40 +125,70 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();

protected KuduClient kuduClient;
private volatile KuduClient kuduClient;
private final ReadWriteLock kuduClientReadWriteLock = new ReentrantReadWriteLock();
private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
private final Lock kuduClientWriteLock = kuduClientReadWriteLock.writeLock();

private volatile KerberosUser kerberosUser;

public KerberosUser getKerberosUser() {
protected abstract void onTrigger(ProcessContext context, ProcessSession session, KuduClient kuduClient) throws ProcessException;

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
kuduClientReadLock.lock();
try {
onTrigger(context, session, kuduClient);
} finally {
kuduClientReadLock.unlock();
}
}

protected KerberosUser getKerberosUser() {
return this.kerberosUser;
}

public KuduClient getKuduClient() {
return this.kuduClient;
protected void createKerberosUserAndKuduClient(ProcessContext context) throws LoginException {
createKerberosUser(context);
createKuduClient(context);
}

public void createKuduClient(ProcessContext context) throws LoginException {
final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
protected void createKerberosUser(ProcessContext context) throws LoginException {
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();

if (credentialsService != null) {
kerberosUser = loginKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab());
kerberosUser = loginKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab(), context);
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, kerberosPassword);
kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, kerberosPassword, context);
}
}

if (kerberosUser != null) {
final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger());
this.kuduClient = kerberosAction.execute();
} else {
this.kuduClient = buildClient(kuduMasters, context);
protected void createKuduClient(ProcessContext context) {
kuduClientWriteLock.lock();
try {
if (this.kuduClient != null) {
try {
this.kuduClient.close();
} catch (KuduException e) {
getLogger().error("Couldn't close Kudu client.");
}
}

if (kerberosUser != null) {
final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(context), getLogger());
this.kuduClient = kerberosAction.execute();
} else {
this.kuduClient = buildClient(context);
}
} finally {
kuduClientWriteLock.unlock();
}
}


protected KuduClient buildClient(final String masters, final ProcessContext context) {
protected KuduClient buildClient(final ProcessContext context) {
turcsanyip marked this conversation as resolved.
Show resolved Hide resolved
final String masters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();

Expand All @@ -176,14 +211,36 @@ protected void flushKuduSession(final KuduSession kuduSession, boolean close, fi
}
}

protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab) throws LoginException {
final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab, ProcessContext context) throws LoginException {
turcsanyip marked this conversation as resolved.
Show resolved Hide resolved
final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab) {
@Override
public synchronized boolean checkTGTAndRelogin() throws LoginException {
boolean didRelogin = super.checkTGTAndRelogin();

if (didRelogin) {
createKuduClient(context);
}

return didRelogin;
}
};
kerberosUser.login();
return kerberosUser;
}

protected KerberosUser loginKerberosPasswordUser(final String principal, final String password) throws LoginException {
final KerberosUser kerberosUser = new KerberosPasswordUser(principal, password);
protected KerberosUser loginKerberosPasswordUser(final String principal, final String password, ProcessContext context) throws LoginException {
turcsanyip marked this conversation as resolved.
Show resolved Hide resolved
turcsanyip marked this conversation as resolved.
Show resolved Hide resolved
final KerberosUser kerberosUser = new KerberosPasswordUser(principal, password) {
@Override
public synchronized boolean checkTGTAndRelogin() throws LoginException {
boolean didRelogin = super.checkTGTAndRelogin();

if (didRelogin) {
createKuduClient(context);
}

return didRelogin;
}
};
kerberosUser.login();
return kerberosUser;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,35 +260,34 @@ public void onScheduled(final ProcessContext context) throws IOException, LoginE
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase());
createKuduClient(context);
createKerberosUserAndKuduClient(context);
}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
protected void onTrigger(final ProcessContext context, final ProcessSession session, KuduClient kuduClient) throws ProcessException {
final List<FlowFile> flowFiles = session.get(ffbatch);
if (flowFiles.isEmpty()) {
return;
}

final KerberosUser user = getKerberosUser();
if (user == null) {
trigger(context, session, flowFiles);
trigger(context, session, flowFiles, kuduClient);
return;
}

final PrivilegedExceptionAction<Void> privelegedAction = () -> {
trigger(context, session, flowFiles);
final PrivilegedExceptionAction<Void> privilegedAction = () -> {
trigger(context, session, flowFiles, kuduClient);
return null;
};

final KerberosAction<Void> action = new KerberosAction<>(user, privelegedAction, getLogger());
final KerberosAction<Void> action = new KerberosAction<>(user, privilegedAction, getLogger());
action.execute();
}

private void trigger(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles) throws ProcessException {
private void trigger(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles, KuduClient kuduClient) throws ProcessException {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);

final KuduClient kuduClient = getKuduClient();
final KuduSession kuduSession = createKuduSession(kuduClient);

final Map<FlowFile, Integer> numRecords = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.kudu.client.Upsert;
import org.apache.kudu.client.Update;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.Record;

Expand Down Expand Up @@ -83,7 +85,7 @@ protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<Str
}

@Override
public KuduClient buildClient(final String masters, ProcessContext context) {
public KuduClient buildClient(ProcessContext context) {
final KuduClient client = mock(KuduClient.class);

try {
Expand All @@ -96,7 +98,7 @@ public KuduClient buildClient(final String masters, ProcessContext context) {
}

@Override
public KuduClient getKuduClient() {
protected void onTrigger(ProcessContext context, ProcessSession session, KuduClient kuduClient) throws ProcessException {
final KuduClient client = mock(KuduClient.class);

try {
Expand All @@ -105,7 +107,7 @@ public KuduClient getKuduClient() {
throw new AssertionError(e);
}

return client;
super.onTrigger(context, session, client);
}

public boolean loggedIn() {
Expand All @@ -117,12 +119,12 @@ public boolean loggedOut() {
}

@Override
protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab) throws LoginException {
protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab, ProcessContext context) throws LoginException {
return createMockKerberosUser(principal);
}

@Override
protected KerberosUser loginKerberosPasswordUser(String principal, String password) throws LoginException {
protected KerberosUser loginKerberosPasswordUser(String principal, String password, ProcessContext context) throws LoginException {
return createMockKerberosUser(principal);
}

Expand Down