Skip to content
This repository has been archived by the owner on May 14, 2021. It is now read-only.

Commit

Permalink
MINIFI-434 - PullHttpChangeIngestor should preserve security properties
Browse files Browse the repository at this point in the history
This closes #114.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
Signed-off-by: Aldrin Piri <aldrin@apache.org>
  • Loading branch information
pvillard31 authored and apiri committed Feb 8, 2018
1 parent 3125b00 commit 270a308
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 10 deletions.
Expand Up @@ -27,23 +27,23 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


import java.util.concurrent.atomic.AtomicReference;

public abstract class AbstractPullChangeIngestor implements Runnable, ChangeIngestor {


// 5 minute default pulling period
protected static final String DEFAULT_POLLING_PERIOD = "300000";
protected static Logger logger;

protected final AtomicInteger pollingPeriodMS = new AtomicInteger();
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
protected volatile ConfigurationChangeNotifier configurationChangeNotifier;
protected final AtomicReference<Properties> properties = new AtomicReference<>();

@Override
public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
this.configurationChangeNotifier = configurationChangeNotifier;
this.properties.set(properties);
}

@Override
Expand All @@ -56,5 +56,6 @@ public void close() throws IOException {
scheduledThreadPoolExecutor.shutdownNow();
}

@Override
public abstract void run();
}
Expand Up @@ -25,18 +25,27 @@
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
import org.apache.nifi.minifi.commons.schema.common.StringUtil;
import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -91,6 +100,7 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
public static final String READ_TIMEOUT_KEY = PULL_HTTP_BASE_KEY + ".read.timeout.ms";
public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY + ".differentiator";
public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
public static final String OVERRIDE_SECURITY = PULL_HTTP_BASE_KEY + ".override.security";

private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
private final AtomicReference<Integer> portReference = new AtomicReference<>();
Expand All @@ -101,6 +111,7 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
private volatile String connectionScheme;
private volatile String lastEtag = "";
private volatile boolean useEtag = false;
private volatile boolean overrideSecurity = false;

public PullHttpChangeIngestor() {
logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
Expand Down Expand Up @@ -144,6 +155,14 @@ public void initialize(Properties properties, ConfigurationFileHolder configurat
"the default value of \"false\". It is set to \"" + useEtagString + "\".");
}

final String overrideSecurityProperties = (String) properties.getOrDefault(OVERRIDE_SECURITY, "false");
if ("true".equalsIgnoreCase(overrideSecurityProperties) || "false".equalsIgnoreCase(overrideSecurityProperties)){
overrideSecurity = Boolean.parseBoolean(overrideSecurityProperties);
} else {
throw new IllegalArgumentException("Property, " + OVERRIDE_SECURITY + ", to specify whether to override security properties must either be a value boolean value (\"true\" or \"false\")" +
" or left to the default value of \"false\". It is set to \"" + overrideSecurityProperties + "\".");
}

httpClientReference.set(null);

final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
Expand Down Expand Up @@ -252,12 +271,31 @@ public void run() {
}

ByteBuffer bodyByteBuffer = ByteBuffer.wrap(body.bytes());
ByteBuffer readOnlyNewConfig = null;

if (differentiator.isNew(bodyByteBuffer)) {
logger.debug("New change, notifying listener");
// checking if some parts of the configuration must be preserved
if(overrideSecurity) {
readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer();
} else {
logger.debug("Preserving previous security properties...");

// get the current security properties from the current configuration file
final File configFile = new File(properties.get().getProperty(RunMiNiFi.MINIFI_CONFIG_FILE_KEY));
ConvertableSchema<ConfigSchema> configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new FileInputStream(configFile));
ConfigSchema currentSchema = configSchema.convert();
SecurityPropertiesSchema secProps = currentSchema.getSecurityProperties();

ByteBuffer readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer();
// override the security properties in the pulled configuration with the previous properties
configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(bodyByteBuffer.duplicate()));
ConfigSchema newSchema = configSchema.convert();
newSchema.setSecurityProperties(secProps);

// return the updated configuration preserving the previous security configuration
readOnlyNewConfig = ByteBuffer.wrap(new Yaml().dump(newSchema.toMap()).getBytes()).asReadOnlyBuffer();
}

if (differentiator.isNew(readOnlyNewConfig)) {
logger.debug("New change received, notifying listener");
configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
logger.debug("Listeners notified");
} else {
Expand Down
Expand Up @@ -75,6 +75,7 @@ public void pullHttpChangeIngestorInit(Properties properties) {
port = ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port));
properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost");
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");

pullHttpChangeIngestor = new PullHttpChangeIngestor();

Expand Down
Expand Up @@ -57,8 +57,6 @@ public void pullHttpChangeIngestorInit(Properties properties) {
properties.put(PullHttpChangeIngestor.PULL_HTTP_POLLING_PERIOD_KEY, "30000");

pullHttpChangeIngestor = new PullHttpChangeIngestor();


pullHttpChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
pullHttpChangeIngestor.setDifferentiator(mockDifferentiator);
}
Expand Down
Expand Up @@ -17,11 +17,18 @@

package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common;

import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor;
import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
import org.apache.nifi.util.file.FileUtils;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
Expand All @@ -30,12 +37,14 @@
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
Expand All @@ -44,6 +53,8 @@
import java.util.Properties;

import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PATH_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -89,6 +100,7 @@ public static void shutdown() throws Exception {
@Test
public void testNewUpdate() throws IOException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
pullHttpChangeIngestorInit(properties);
pullHttpChangeIngestor.setUseEtag(false);
when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
Expand All @@ -98,10 +110,31 @@ public void testNewUpdate() throws IOException {
verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(configBuffer.asReadOnlyBuffer()));
}

@Test
public void testSecurityOverride() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "false");
properties.put(RunMiNiFi.MINIFI_CONFIG_FILE_KEY, "src/test/resources/config.yml");
properties.put(PATH_KEY, "/config-minimal.yml");
pullHttpChangeIngestorInit(properties);
when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);

pullHttpChangeIngestor.run();

ArgumentCaptor<ByteBuffer> argument = ArgumentCaptor.forClass(ByteBuffer.class);
verify(testNotifier, Mockito.times(1)).notifyListeners(argument.capture());

ConvertableSchema<ConfigSchema> configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(argument.getValue()));
ConfigSchema newSchema = configSchema.convert();

assertNotNull(newSchema.getSecurityProperties().getKeystore());
assertEquals(newSchema.getProcessGroupSchema().getProcessors().size(), 2);
}

@Test
public void testNoUpdate() throws IOException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
pullHttpChangeIngestorInit(properties);
pullHttpChangeIngestor.setUseEtag(false);
when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
Expand All @@ -114,6 +147,7 @@ public void testNoUpdate() throws IOException {
@Test
public void testUseEtag() throws IOException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
pullHttpChangeIngestorInit(properties);
pullHttpChangeIngestor.setLastEtag("");

Expand All @@ -135,6 +169,7 @@ public void testUseEtag() throws IOException {
public void testNewUpdateWithPath() throws IOException {
Properties properties = new Properties();
properties.put(PATH_KEY, "/config.yml");
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
pullHttpChangeIngestorInit(properties);
pullHttpChangeIngestor.setUseEtag(false);
when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
Expand All @@ -147,6 +182,7 @@ public void testNewUpdateWithPath() throws IOException {
@Test
public void testNoUpdateWithPath() throws IOException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
properties.put(PATH_KEY, "/config.yml");
pullHttpChangeIngestorInit(properties);
pullHttpChangeIngestor.setUseEtag(false);
Expand All @@ -160,6 +196,7 @@ public void testNoUpdateWithPath() throws IOException {
@Test
public void testUseEtagWithPath() throws IOException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
properties.put(PATH_KEY, "/config.yml");
pullHttpChangeIngestorInit(properties);
pullHttpChangeIngestor.setLastEtag("");
Expand Down Expand Up @@ -187,7 +224,6 @@ public JettyHandler(String configResponse, String pathResponse){
this.pathResponse = pathResponse;
}


@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
Expand All @@ -199,9 +235,10 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
if (QUOTED_ETAG.equals(baseRequest.getHeader("If-None-Match"))){
writeOutput(response, null, 304);
} else {

if ("/config.yml".equals(baseRequest.getPathInfo())) {
writeOutput(response, pathResponse, 200);
} else if ("/config-minimal.yml".equals(baseRequest.getPathInfo())) {
writeFileOutput(response, new File("src/test/resources/config-minimal.yml"), 200);
} else {
writeOutput(response, configResponse, 200);
}
Expand All @@ -227,5 +264,16 @@ private void writeOutput(HttpServletResponse response, String responseBuffer, in
}
}

private void writeFileOutput(HttpServletResponse response, File file, int responseCode) throws IOException {
response.setStatus(responseCode);
response.setHeader("ETag", ETAG);
if (file != null) {
response.setContentType("text/plain");
response.setContentLength((int) file.length());
response.setCharacterEncoding(StandardCharsets.UTF_8.displayName());
FileUtils.copyFile(file, response.getOutputStream(), true, true);
}
}

}
}
Expand Up @@ -197,6 +197,10 @@ public SecurityPropertiesSchema getSecurityProperties() {
return securityProperties;
}

public void setSecurityProperties(SecurityPropertiesSchema securityProperties) {
this.securityProperties = securityProperties;
}

public ProcessGroupSchema getProcessGroupSchema() {
return processGroupSchema;
}
Expand Down

0 comments on commit 270a308

Please sign in to comment.