From 9388198b492656265bcff494401af73779fe446a Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 28 Apr 2016 16:54:15 -0700 Subject: [PATCH 1/3] Allow specifying allowed cross origin methods --- .../apache/kafka/connect/runtime/WorkerConfig.java | 10 +++++++++- .../kafka/connect/runtime/rest/RestServer.java | 14 ++++++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 471e4a57975a5..d2383a3591ee2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -109,6 +109,11 @@ public class WorkerConfig extends AbstractConfig { " from the domain of the REST API."; protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT = ""; + public static final String ACCESS_CONTROL_ALLOW_METHODS_CONFIG = "access.control.allow.methods"; + protected static final String ACCESS_CONTROL_ALLOW_METHODS_DOC = + "Set value to Jetty Access-Control-Allow-Origin header for the specified methods. " + + "The default value of the Access-Control-Allow-Origin header allows cross origin requests for GET, POST and HEAD methods."; + protected static final String ACCESS_CONTROL_ALLOW_METHODS_DEFAULT = ""; /** * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to @@ -141,7 +146,10 @@ protected static ConfigDef baseConfigDef() { .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC) .define(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, Type.STRING, ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT, Importance.LOW, - ACCESS_CONTROL_ALLOW_ORIGIN_DOC); + ACCESS_CONTROL_ALLOW_ORIGIN_DOC) + .define(ACCESS_CONTROL_ALLOW_METHODS_CONFIG, Type.STRING, + ACCESS_CONTROL_ALLOW_METHODS_DEFAULT, Importance.LOW, + ACCESS_CONTROL_ALLOW_METHODS_DOC); } public WorkerConfig(ConfigDef definition, Map props) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index 3475e1c5683c6..a878fb0eab810 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -47,9 +48,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.DispatcherType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -60,6 +58,10 @@ import java.util.List; import java.util.Map; +import javax.servlet.DispatcherType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; + /** * Embedded server for the REST API that provides the control plane for Kafka Connect workers. */ @@ -115,7 +117,11 @@ public void start(Herder herder) { if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) { FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter()); filterHolder.setName("cross-origin"); - filterHolder.setInitParameter("allowedOrigins", allowedOrigins); + filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, allowedOrigins); + String allowedMethods = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG); + if (allowedMethods != null && !allowedOrigins.trim().isEmpty()) { + filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, allowedMethods); + } context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST)); } From 9ab79910162708713c60d42e6a09e5ed6260dd6d Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 28 Apr 2016 18:22:37 -0700 Subject: [PATCH 2/3] Add unit test for CORS allow methods --- .../connect/runtime/rest/RestServerTest.java | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index 8e9d52b4ca431..8ef24f60e144d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -31,11 +31,6 @@ import org.powermock.api.easymock.annotation.MockStrict; import org.powermock.modules.junit4.PowerMockRunner; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Invocation; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.Response; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; @@ -43,6 +38,12 @@ import java.util.HashMap; import java.util.Map; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; + import static org.junit.Assert.assertEquals; @RunWith(PowerMockRunner.class) @@ -71,15 +72,15 @@ private Map baseWorkerProps() { @Test public void testCORSEnabled() { - checkCORSRequest("*", "http://bar.com", "http://bar.com"); + checkCORSRequest("*", "http://bar.com", "http://bar.com", "PUT"); } @Test public void testCORSDisabled() { - checkCORSRequest("", "http://bar.com", null); + checkCORSRequest("", "http://bar.com", null, null); } - public void checkCORSRequest(String corsDomain, String origin, String expectedHeader) { + public void checkCORSRequest(String corsDomain, String origin, String expectedHeader, String method) { // To be able to set the Origin, we need to toggle this flag System.setProperty("sun.net.http.allowRestrictedHeaders", "true"); @@ -92,10 +93,12 @@ public Object answer() throws Throwable { return null; } }); + PowerMock.replayAll(); Map workerProps = baseWorkerProps(); workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain); + workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method); WorkerConfig workerConfig = new StandaloneConfig(workerProps); server = new RestServer(workerConfig); server.start(herder); @@ -107,6 +110,14 @@ public Object answer() throws Throwable { assertEquals(200, response.getStatus()); assertEquals(expectedHeader, response.getHeaderString("Access-Control-Allow-Origin")); + + response = request("/connector-plugins/FileStreamSource/validate") + .header("Referer", origin + "/page") + .header("Origin", origin) + .header("Access-Control-Request-Method", method) + .options(); + assertEquals(404, response.getStatus()); + assertEquals(method, response.getHeaderString("Access-Control-Allow-Methods")); PowerMock.verifyAll(); } From b5d28ffbdcd1df2af09caa65931416cd4d86ca22 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Fri, 29 Apr 2016 10:23:05 -0700 Subject: [PATCH 3/3] Improve unit test and documentation --- .../java/org/apache/kafka/connect/runtime/WorkerConfig.java | 4 ++-- .../org/apache/kafka/connect/runtime/rest/RestServerTest.java | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index d2383a3591ee2..7ede1307e12a6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -111,8 +111,8 @@ public class WorkerConfig extends AbstractConfig { public static final String ACCESS_CONTROL_ALLOW_METHODS_CONFIG = "access.control.allow.methods"; protected static final String ACCESS_CONTROL_ALLOW_METHODS_DOC = - "Set value to Jetty Access-Control-Allow-Origin header for the specified methods. " - + "The default value of the Access-Control-Allow-Origin header allows cross origin requests for GET, POST and HEAD methods."; + "Sets the methods supported for cross origin requests by setting the Access-Control-Allow-Methods header. " + + "The default value of the Access-Control-Allow-Methods header allows cross origin requests for GET, POST and HEAD."; protected static final String ACCESS_CONTROL_ALLOW_METHODS_DEFAULT = ""; /** diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index 8ef24f60e144d..64d5b5efc072c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -117,6 +117,7 @@ public Object answer() throws Throwable { .header("Access-Control-Request-Method", method) .options(); assertEquals(404, response.getStatus()); + assertEquals(expectedHeader, response.getHeaderString("Access-Control-Allow-Origin")); assertEquals(method, response.getHeaderString("Access-Control-Allow-Methods")); PowerMock.verifyAll(); }