Skip to content

Commit

Permalink
NIFI-8067 - Fix 1-way SSL in GRPC processors
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes apache#4733.
  • Loading branch information
turcsanyip authored and driesva committed Mar 19, 2021
1 parent 31c4bfc commit 4fd6793
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,6 @@
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.SslContextBuilder;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.KeyStore;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
Expand All @@ -49,6 +34,8 @@
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
Expand All @@ -58,9 +45,23 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;

import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

@EventDriven
@SupportsBatching
@Tags({"grpc", "rpc", "client"})
Expand Down Expand Up @@ -108,16 +109,18 @@ public class InvokeGRPC extends AbstractProcessor {
.build();
public static final PropertyDescriptor PROP_USE_SECURE = new PropertyDescriptor.Builder()
.name("Use SSL/TLS")
.description("Whether or not to use SSL/TLS to send the contents of the gRPC messages.")
.displayName("Use TLS")
.description("Whether or not to use TLS to send the contents of the gRPC messages.")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
.description("The SSL Context Service used to provide client certificate information for TLS (https) connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.dependsOn(PROP_USE_SECURE, "true")
.build();
public static final PropertyDescriptor PROP_SEND_CONTENT = new PropertyDescriptor.Builder()
.name("Send FlowFile Content")
Expand Down Expand Up @@ -207,6 +210,24 @@ public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}

@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
List<ValidationResult> results = new ArrayList<>(super.customValidate(context));

final boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean();
final boolean sslContextServiceConfigured = context.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet();

if (useSecure && !sslContextServiceConfigured) {
results.add(new ValidationResult.Builder()
.subject(PROP_SSL_CONTEXT_SERVICE.getDisplayName())
.valid(false)
.explanation(String.format("'%s' must be configured when '%s' is true", PROP_SSL_CONTEXT_SERVICE.getDisplayName(), PROP_USE_SECURE.getDisplayName()))
.build());
}

return results;
}

/**
* Whenever this processor is triggered, we need to construct a client in order to communicate
* with the configured gRPC service.
Expand All @@ -222,7 +243,7 @@ public void initializeClient(final ProcessContext context) throws Exception {

final String host = context.getProperty(PROP_SERVICE_HOST).getValue();
final int port = context.getProperty(PROP_SERVICE_PORT).asInteger();
final Integer maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
final int maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
String userAgent = USER_AGENT_PREFIX;
try {
userAgent += "_" + InetAddress.getLocalHost().getHostName();
Expand All @@ -240,33 +261,20 @@ public void initializeClient(final ProcessContext context) throws Exception {
// configure whether or not we're using secure comms
final boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean();
final SSLContextService sslContextService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(ClientAuth.NONE);

if (useSecure && sslContext != null) {
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
if(StringUtils.isNotBlank(sslContextService.getKeyStoreFile())) {
final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm(),
sslContext.getProvider());
final KeyStore keyStore = KeyStore.getInstance(sslContextService.getKeyStoreType());
try (final InputStream is = new FileInputStream(sslContextService.getKeyStoreFile())) {
keyStore.load(is, sslContextService.getKeyStorePassword().toCharArray());
}
keyManagerFactory.init(keyStore, sslContextService.getKeyStorePassword().toCharArray());
sslContextBuilder.keyManager(keyManagerFactory);

if (useSecure) {
final TlsConfiguration tlsConfiguration = sslContextService.createTlsConfiguration();
final SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();

if (StringUtils.isNotBlank(sslContextService.getKeyStoreFile())) {
sslContextBuilder.keyManager(KeyStoreUtils.loadKeyManagerFactory(tlsConfiguration));
}

if(StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) {
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm(),
sslContext.getProvider());
final KeyStore trustStore = KeyStore.getInstance(sslContextService.getTrustStoreType());
try (final InputStream is = new FileInputStream(sslContextService.getTrustStoreFile())) {
trustStore.load(is, sslContextService.getTrustStorePassword().toCharArray());
}
trustManagerFactory.init(trustStore);
sslContextBuilder.trustManager(trustManagerFactory);
if (StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) {
sslContextBuilder.trustManager(KeyStoreUtils.loadTrustManagerFactory(tlsConfiguration));
}
nettyChannelBuilder.sslContext(sslContextBuilder.build());

nettyChannelBuilder.sslContext(sslContextBuilder.build());
} else {
nettyChannelBuilder.usePlaintext();
}
Expand Down

0 comments on commit 4fd6793

Please sign in to comment.