Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,8 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl
descriptors.add(HONOR_TRANSACTIONS);
descriptors.add(SECURITY_PROTOCOL);
descriptors.add(SASL_MECHANISM);
descriptors.add(KERBEROS_CREDENTIALS_SERVICE);
descriptors.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
descriptors.add(KERBEROS_SERVICE_NAME);
descriptors.add(KERBEROS_PRINCIPAL);
descriptors.add(KERBEROS_KEYTAB);
descriptors.add(SASL_USERNAME);
descriptors.add(SASL_PASSWORD);
descriptors.add(TOKEN_AUTHENTICATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,8 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
descriptors.add(SEPARATE_BY_KEY);
descriptors.add(SECURITY_PROTOCOL);
descriptors.add(SASL_MECHANISM);
descriptors.add(KERBEROS_CREDENTIALS_SERVICE);
descriptors.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
descriptors.add(KERBEROS_SERVICE_NAME);
descriptors.add(KERBEROS_PRINCIPAL);
descriptors.add(KERBEROS_KEYTAB);
descriptors.add(SASL_USERNAME);
descriptors.add(SASL_PASSWORD);
descriptors.add(TOKEN_AUTHENTICATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,8 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu
properties.add(MESSAGE_HEADER_ENCODING);
properties.add(SECURITY_PROTOCOL);
properties.add(SASL_MECHANISM);
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
properties.add(KERBEROS_SERVICE_NAME);
properties.add(KERBEROS_PRINCIPAL);
properties.add(KERBEROS_KEYTAB);
properties.add(SASL_USERNAME);
properties.add(SASL_PASSWORD);
properties.add(TOKEN_AUTHENTICATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,8 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC
properties.add(MESSAGE_HEADER_ENCODING);
properties.add(SECURITY_PROTOCOL);
properties.add(SASL_MECHANISM);
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
properties.add(KERBEROS_SERVICE_NAME);
properties.add(KERBEROS_PRINCIPAL);
properties.add(KERBEROS_KEYTAB);
properties.add(SASL_USERNAME);
properties.add(SASL_PASSWORD);
properties.add(AWS_PROFILE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ protected void init(final ControllerServiceInitializationContext context) {
properties.add(DELIVERY_GUARANTEE);
properties.add(MESSAGE_HEADER_ENCODING);
properties.add(SECURITY_PROTOCOL);
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
properties.add(KERBEROS_SERVICE_NAME);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(MAX_REQUEST_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
Expand Down Expand Up @@ -159,7 +161,7 @@ public void validateGetAllMessagesPattern() {
}

@Test
public void validateGetErrorMessages() throws Exception {
public void validateGetErrorMessages() {
String groupName = "validateGetErrorMessages";

when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
Expand All @@ -181,7 +183,7 @@ public void validateGetErrorMessages() throws Exception {
}

@Test
public void testJaasConfigurationWithDefaultMechanism() {
public void testJaasConfigurationWithDefaultMechanism() throws InitializationException {
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
Expand All @@ -192,13 +194,8 @@ public void testJaasConfigurationWithDefaultMechanism() {
runner.setProperty(ConsumeKafkaRecord_2_6.KERBEROS_SERVICE_NAME, "kafka");
runner.assertNotValid();

runner.setProperty(ConsumeKafkaRecord_2_6.KERBEROS_PRINCIPAL, "nifi@APACHE.COM");
runner.assertNotValid();

runner.setProperty(ConsumeKafkaRecord_2_6.KERBEROS_KEYTAB, "not.A.File");
runner.assertNotValid();

runner.setProperty(ConsumeKafkaRecord_2_6.KERBEROS_KEYTAB, "src/test/resources/server.properties");
final KerberosUserService kerberosUserService = enableKerberosUserService(runner);
runner.setProperty(ConsumeKafka_2_6.SELF_CONTAINED_KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
runner.assertValid();
}

Expand Down Expand Up @@ -278,4 +275,12 @@ public void testNonSaslSecurityProtocol() {
runner.assertValid();
}

private SelfContainedKerberosUserService enableKerberosUserService(final TestRunner runner) throws InitializationException {
final SelfContainedKerberosUserService kerberosUserService = mock(SelfContainedKerberosUserService.class);
when(kerberosUserService.getIdentifier()).thenReturn("userService1");
runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService);
runner.enableControllerService(kerberosUserService);
return kerberosUserService;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
import org.apache.nifi.reporting.InitializationException;
Expand Down Expand Up @@ -100,36 +99,12 @@ public void testJaasGssApiConfiguration() throws Exception {
runner.setProperty(ConsumeKafka_2_6.KERBEROS_SERVICE_NAME, "kafka");
runner.assertNotValid();

runner.setProperty(ConsumeKafka_2_6.KERBEROS_PRINCIPAL, "nifi@APACHE.COM");
runner.assertNotValid();

runner.setProperty(ConsumeKafka_2_6.KERBEROS_KEYTAB, "not.A.File");
runner.assertNotValid();

runner.setProperty(ConsumeKafka_2_6.KERBEROS_KEYTAB, "src/test/resources/server.properties");
runner.assertValid();

runner.setVariable("keytab", "src/test/resources/server.properties");
runner.setVariable("principal", "nifi@APACHE.COM");
runner.setVariable("service", "kafka");
runner.setProperty(ConsumeKafka_2_6.KERBEROS_PRINCIPAL, "${principal}");
runner.setProperty(ConsumeKafka_2_6.KERBEROS_KEYTAB, "${keytab}");
runner.setProperty(ConsumeKafka_2_6.KERBEROS_SERVICE_NAME, "${service}");
runner.assertValid();

final KerberosUserService kerberosUserService = enableKerberosUserService(runner);
runner.setProperty(ConsumeKafka_2_6.SELF_CONTAINED_KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
runner.assertNotValid();

runner.removeProperty(ConsumeKafka_2_6.KERBEROS_PRINCIPAL);
runner.removeProperty(ConsumeKafka_2_6.KERBEROS_KEYTAB);
runner.assertValid();

final KerberosCredentialsService kerberosCredentialsService = enabledKerberosCredentialsService(runner);
runner.setProperty(ConsumeKafka_2_6.KERBEROS_CREDENTIALS_SERVICE, kerberosCredentialsService.getIdentifier());
runner.assertNotValid();

runner.removeProperty(ConsumeKafka_2_6.SELF_CONTAINED_KERBEROS_USER_SERVICE);
runner.setVariable("service", "kafka");
runner.setProperty(ConsumeKafka_2_6.KERBEROS_SERVICE_NAME, "${service}");
runner.assertValid();
}

Expand All @@ -141,15 +116,4 @@ private SelfContainedKerberosUserService enableKerberosUserService(final TestRun
return kerberosUserService;
}

private KerberosCredentialsService enabledKerberosCredentialsService(final TestRunner runner) throws InitializationException {
final KerberosCredentialsService credentialsService = mock(KerberosCredentialsService.class);
when(credentialsService.getIdentifier()).thenReturn("credsService1");
when(credentialsService.getPrincipal()).thenReturn("principal1");
when(credentialsService.getKeytab()).thenReturn("keytab1");

runner.addControllerService(credentialsService.getIdentifier(), credentialsService);
runner.enableControllerService(credentialsService);
return credentialsService;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.record.sink.RecordSinkService;
Expand Down Expand Up @@ -163,8 +161,6 @@ private MockKafkaRecordSink_2_6 initTask() throws InitializationException {
when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer);
when(context.getProperty(KafkaRecordSink_2_6.SSL_CONTEXT_SERVICE)).thenReturn(pValue);
when(pValue.asControllerService(SSLContextService.class)).thenReturn(null);
when(context.getProperty(KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE)).thenReturn(pValue);
when(pValue.asControllerService(KerberosCredentialsService.class)).thenReturn(null);

final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(task, UUID.randomUUID().toString(), logger, stateManager);
task.initialize(initContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@
package org.apache.nifi.kafka.shared.component;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
Expand Down Expand Up @@ -137,32 +134,6 @@ public interface KafkaClientComponent {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();

PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.name("sasl.kerberos.principal")
.displayName("Kerberos Principal")
.description("Principal used for authentication with Kerberos")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();

PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder()
.name("sasl.kerberos.keytab")
.displayName("Kerberos Keytab")
.description("Keytab credentials used for authentication with Kerberos")
.required(false)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();

PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Service supporting generalized credentials authentication with Kerberos")
.identifiesControllerService(KerberosCredentialsService.class)
.required(false)
.build();

PropertyDescriptor SELF_CONTAINED_KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@
*/
package org.apache.nifi.kafka.shared.login;

import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;

/**
* Kerberos Delegating Login Module implementation of configuration provider
*/
public class KerberosDelegatingLoginConfigProvider implements LoginConfigProvider {
private static final LoginConfigProvider CREDENTIALS_PROVIDER = new KerberosCredentialsLoginConfigProvider();

private static final LoginConfigProvider USER_SERVICE_PROVIDER = new KerberosUserServiceLoginConfigProvider();

/**
Expand All @@ -36,15 +32,6 @@ public class KerberosDelegatingLoginConfigProvider implements LoginConfigProvide
*/
@Override
public String getConfiguration(final PropertyContext context) {
final PropertyValue userServiceProperty = context.getProperty(KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE);

final String configuration;
if (userServiceProperty.isSet()) {
configuration = USER_SERVICE_PROVIDER.getConfiguration(context);
} else {
configuration = CREDENTIALS_PROVIDER.getConfiguration(context);
}

return configuration;
return USER_SERVICE_PROVIDER.getConfiguration(context);
}
}
Loading