From 866dc3a3f1add783a7615007900fe643b87a3aea Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Thu, 1 Feb 2018 19:20:43 +0100 Subject: [PATCH 1/2] MINIFI-434 - PullHttpChangeIngestor should preserve security properties Signed-off-by: Pierre Villard --- .../ingestors/AbstractPullChangeIngestor.java | 7 +-- .../ingestors/PullHttpChangeIngestor.java | 44 +++++++++++++++++-- .../PullHttpChangeIngestorSSLTest.java | 1 + .../ingestors/PullHttpChangeIngestorTest.java | 1 + .../minifi/commons/schema/ConfigSchema.java | 4 ++ 5 files changed, 51 insertions(+), 6 deletions(-) diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java index 1678f2080..deebe90ce 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java @@ -27,12 +27,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - - +import java.util.concurrent.atomic.AtomicReference; public abstract class AbstractPullChangeIngestor implements Runnable, ChangeIngestor { - // 5 minute default pulling period protected static final String DEFAULT_POLLING_PERIOD = "300000"; protected static Logger logger; @@ -40,10 +38,12 @@ public abstract class AbstractPullChangeIngestor implements Runnable, ChangeInge protected final AtomicInteger pollingPeriodMS = new AtomicInteger(); private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); protected volatile ConfigurationChangeNotifier configurationChangeNotifier; + protected final AtomicReference properties = new AtomicReference<>(); @Override public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) { this.configurationChangeNotifier = configurationChangeNotifier; + this.properties.set(properties); } @Override @@ -56,5 +56,6 @@ public void close() throws IOException { scheduledThreadPoolExecutor.shutdownNow(); } + @Override public abstract void run(); } diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java index 6c8adcc12..f7add36c1 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java @@ -25,11 +25,18 @@ import okhttp3.Response; import okhttp3.ResponseBody; import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder; +import org.apache.nifi.minifi.bootstrap.RunMiNiFi; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier; import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator; import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator; +import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream; +import org.apache.nifi.minifi.commons.schema.ConfigSchema; +import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema; +import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema; import org.apache.nifi.minifi.commons.schema.common.StringUtil; +import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader; import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; @@ -37,6 +44,8 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; + +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.net.InetSocketAddress; @@ -91,6 +100,7 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor { public static final String READ_TIMEOUT_KEY = PULL_HTTP_BASE_KEY + ".read.timeout.ms"; public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY + ".differentiator"; public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag"; + public static final String OVERRIDE_SECURITY = PULL_HTTP_BASE_KEY + ".override.security"; private final AtomicReference httpClientReference = new AtomicReference<>(); private final AtomicReference portReference = new AtomicReference<>(); @@ -101,6 +111,7 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor { private volatile String connectionScheme; private volatile String lastEtag = ""; private volatile boolean useEtag = false; + private volatile boolean overrideSecurity = false; public PullHttpChangeIngestor() { logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class); @@ -144,6 +155,14 @@ public void initialize(Properties properties, ConfigurationFileHolder configurat "the default value of \"false\". It is set to \"" + useEtagString + "\"."); } + final String overrideSecurityProperties = (String) properties.getOrDefault(OVERRIDE_SECURITY, "false"); + if ("true".equalsIgnoreCase(overrideSecurityProperties) || "false".equalsIgnoreCase(overrideSecurityProperties)){ + overrideSecurity = Boolean.parseBoolean(overrideSecurityProperties); + } else { + throw new IllegalArgumentException("Property, " + OVERRIDE_SECURITY + ", to specify whether to override security properties must either be a value boolean value (\"true\" or \"false\")" + + " or left to the default value of \"false\". It is set to \"" + overrideSecurityProperties + "\"."); + } + httpClientReference.set(null); final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder(); @@ -252,12 +271,31 @@ public void run() { } ByteBuffer bodyByteBuffer = ByteBuffer.wrap(body.bytes()); + ByteBuffer readOnlyNewConfig = null; - if (differentiator.isNew(bodyByteBuffer)) { - logger.debug("New change, notifying listener"); + // checking if some parts of the configuration must be preserved + if(overrideSecurity) { + readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer(); + } else { + logger.debug("Preserving previous security properties..."); + + // get the current security properties from the current configuration file + final File configFile = new File(properties.get().getProperty(RunMiNiFi.MINIFI_CONFIG_FILE_KEY)); + ConvertableSchema configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new FileInputStream(configFile)); + ConfigSchema currentSchema = configSchema.convert(); + SecurityPropertiesSchema secProps = currentSchema.getSecurityProperties(); - ByteBuffer readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer(); + // override the security properties in the pulled configuration with the previous properties + configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(bodyByteBuffer.duplicate())); + ConfigSchema newSchema = configSchema.convert(); + newSchema.setSecurityProperties(secProps); + + // return the updated configuration preserving the previous security configuration + readOnlyNewConfig = ByteBuffer.wrap(new Yaml().dump(newSchema.toMap()).getBytes()).asReadOnlyBuffer(); + } + if (differentiator.isNew(readOnlyNewConfig)) { + logger.debug("New change received, notifying listener"); configurationChangeNotifier.notifyListeners(readOnlyNewConfig); logger.debug("Listeners notified"); } else { diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java index 195cf60c8..470b380e9 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java @@ -75,6 +75,7 @@ public void pullHttpChangeIngestorInit(Properties properties) { port = ((ServerConnector) jetty.getConnectors()[0]).getLocalPort(); properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port)); properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost"); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); pullHttpChangeIngestor = new PullHttpChangeIngestor(); diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java index f39fbdf0d..41f6f73c8 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java @@ -55,6 +55,7 @@ public void pullHttpChangeIngestorInit(Properties properties) { properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port)); properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost"); properties.put(PullHttpChangeIngestor.PULL_HTTP_POLLING_PERIOD_KEY, "30000"); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); pullHttpChangeIngestor = new PullHttpChangeIngestor(); diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java index 0b338256e..d871ffd80 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java @@ -197,6 +197,10 @@ public SecurityPropertiesSchema getSecurityProperties() { return securityProperties; } + public void setSecurityProperties(SecurityPropertiesSchema securityProperties) { + this.securityProperties = securityProperties; + } + public ProcessGroupSchema getProcessGroupSchema() { return processGroupSchema; } From b37f43f30d34228c1d834f022e3f7d68b5b6736c Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Mon, 5 Feb 2018 21:09:09 +0100 Subject: [PATCH 2/2] added unit test --- .../ingestors/PullHttpChangeIngestorTest.java | 3 -- .../PullHttpChangeIngestorCommonTest.java | 52 ++++++++++++++++++- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java index 41f6f73c8..5bd218753 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java @@ -55,11 +55,8 @@ public void pullHttpChangeIngestorInit(Properties properties) { properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port)); properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost"); properties.put(PullHttpChangeIngestor.PULL_HTTP_POLLING_PERIOD_KEY, "30000"); - properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); pullHttpChangeIngestor = new PullHttpChangeIngestor(); - - pullHttpChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier); pullHttpChangeIngestor.setDifferentiator(mockDifferentiator); } diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java index 229e33d69..798e285fe 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java @@ -17,11 +17,18 @@ package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common; +import org.apache.nifi.minifi.bootstrap.RunMiNiFi; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier; import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult; import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator; import org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor; +import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream; +import org.apache.nifi.minifi.commons.schema.ConfigSchema; +import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema; +import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException; +import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader; +import org.apache.nifi.util.file.FileUtils; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.AbstractHandler; @@ -30,12 +37,14 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.nio.ByteBuffer; @@ -44,6 +53,8 @@ import java.util.Properties; import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PATH_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -89,6 +100,7 @@ public static void shutdown() throws Exception { @Test public void testNewUpdate() throws IOException { Properties properties = new Properties(); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); pullHttpChangeIngestorInit(properties); pullHttpChangeIngestor.setUseEtag(false); when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true); @@ -98,10 +110,31 @@ public void testNewUpdate() throws IOException { verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(configBuffer.asReadOnlyBuffer())); } + @Test + public void testSecurityOverride() throws IOException, SchemaLoaderException { + Properties properties = new Properties(); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "false"); + properties.put(RunMiNiFi.MINIFI_CONFIG_FILE_KEY, "src/test/resources/config.yml"); + properties.put(PATH_KEY, "/config-minimal.yml"); + pullHttpChangeIngestorInit(properties); + when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true); + + pullHttpChangeIngestor.run(); + + ArgumentCaptor argument = ArgumentCaptor.forClass(ByteBuffer.class); + verify(testNotifier, Mockito.times(1)).notifyListeners(argument.capture()); + + ConvertableSchema configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(argument.getValue())); + ConfigSchema newSchema = configSchema.convert(); + + assertNotNull(newSchema.getSecurityProperties().getKeystore()); + assertEquals(newSchema.getProcessGroupSchema().getProcessors().size(), 2); + } @Test public void testNoUpdate() throws IOException { Properties properties = new Properties(); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); pullHttpChangeIngestorInit(properties); pullHttpChangeIngestor.setUseEtag(false); when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false); @@ -114,6 +147,7 @@ public void testNoUpdate() throws IOException { @Test public void testUseEtag() throws IOException { Properties properties = new Properties(); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); pullHttpChangeIngestorInit(properties); pullHttpChangeIngestor.setLastEtag(""); @@ -135,6 +169,7 @@ public void testUseEtag() throws IOException { public void testNewUpdateWithPath() throws IOException { Properties properties = new Properties(); properties.put(PATH_KEY, "/config.yml"); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); pullHttpChangeIngestorInit(properties); pullHttpChangeIngestor.setUseEtag(false); when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true); @@ -147,6 +182,7 @@ public void testNewUpdateWithPath() throws IOException { @Test public void testNoUpdateWithPath() throws IOException { Properties properties = new Properties(); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); properties.put(PATH_KEY, "/config.yml"); pullHttpChangeIngestorInit(properties); pullHttpChangeIngestor.setUseEtag(false); @@ -160,6 +196,7 @@ public void testNoUpdateWithPath() throws IOException { @Test public void testUseEtagWithPath() throws IOException { Properties properties = new Properties(); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); properties.put(PATH_KEY, "/config.yml"); pullHttpChangeIngestorInit(properties); pullHttpChangeIngestor.setLastEtag(""); @@ -187,7 +224,6 @@ public JettyHandler(String configResponse, String pathResponse){ this.pathResponse = pathResponse; } - @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { @@ -199,9 +235,10 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques if (QUOTED_ETAG.equals(baseRequest.getHeader("If-None-Match"))){ writeOutput(response, null, 304); } else { - if ("/config.yml".equals(baseRequest.getPathInfo())) { writeOutput(response, pathResponse, 200); + } else if ("/config-minimal.yml".equals(baseRequest.getPathInfo())) { + writeFileOutput(response, new File("src/test/resources/config-minimal.yml"), 200); } else { writeOutput(response, configResponse, 200); } @@ -227,5 +264,16 @@ private void writeOutput(HttpServletResponse response, String responseBuffer, in } } + private void writeFileOutput(HttpServletResponse response, File file, int responseCode) throws IOException { + response.setStatus(responseCode); + response.setHeader("ETag", ETAG); + if (file != null) { + response.setContentType("text/plain"); + response.setContentLength((int) file.length()); + response.setCharacterEncoding(StandardCharsets.UTF_8.displayName()); + FileUtils.copyFile(file, response.getOutputStream(), true, true); + } + } + } }