Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@
<allow pkg="kafka.zk" />
<allow pkg="kafka.utils" />
<allow class="javax.servlet.http.HttpServletResponse" />
<allow class="javax.ws.rs.core.Response" />
</subpackage>
</subpackage>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class ErrorHandlingIntegrationTest {
private static final int NUM_RECORDS_PRODUCED = 20;
private static final int EXPECTED_CORRECT_RECORDS = 19;
private static final int EXPECTED_INCORRECT_RECORDS = 1;
private static final int NUM_TASKS = 1;
private static final int CONNECTOR_SETUP_DURATION_MS = 5000;
private static final int CONSUME_MAX_DURATION_MS = 5000;

Expand Down Expand Up @@ -105,7 +107,7 @@ public void testSkipRetryAndDLQWithHeaders() throws Exception {
// setup connector config
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG, "MonitorableSink");
props.put(TASKS_MAX_CONFIG, "1");
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put(TOPICS_CONFIG, "test-topic");
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
Expand All @@ -132,8 +134,7 @@ public void testSkipRetryAndDLQWithHeaders() throws Exception {

connect.configureConnector(CONNECTOR_NAME, props);

waitForCondition(() -> connect.connectorStatus(CONNECTOR_NAME).tasks().size() == 1
&& connectorHandle.taskHandle(TASK_ID).partitionsAssigned() == 1,
waitForCondition(this::checkForPartitionAssignment,
CONNECTOR_SETUP_DURATION_MS,
"Connector task was not assigned a partition.");

Expand Down Expand Up @@ -172,6 +173,26 @@ public void testSkipRetryAndDLQWithHeaders() throws Exception {
connect.deleteConnector(CONNECTOR_NAME);
}

/**
 * Check whether the connector's task has been assigned its partition. Exceptions are swallowed
 * deliberately: this method is polled from
 * {@link org.apache.kafka.test.TestUtils#waitForCondition}, which fails the test by itself once
 * the condition has stayed false for the configured duration.
 *
 * @return true when the expected number of tasks is running and the task has been assigned
 *         exactly one partition; false otherwise, or when querying the connector status fails.
 */
private boolean checkForPartitionAssignment() {
    try {
        ConnectorStateInfo stateInfo = connect.connectorStatus(CONNECTOR_NAME);
        if (stateInfo == null || stateInfo.tasks().size() != NUM_TASKS) {
            return false;
        }
        return connectorHandle.taskHandle(TASK_ID).partitionsAssigned() == 1;
    } catch (Exception e) {
        // Treat any failure to query the REST API as "not assigned yet" so the caller retries.
        log.error("Could not check connector state info.", e);
        return false;
    }
}

private void assertValue(String expected, Headers headers, String headerKey) {
byte[] actual = headers.lastHeader(headerKey).value();
if (expected == null && actual == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
*/
package org.apache.kafka.connect.integration;

import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
Expand All @@ -47,10 +50,13 @@
@Category(IntegrationTest.class)
public class ExampleConnectIntegrationTest {

private static final Logger log = LoggerFactory.getLogger(ExampleConnectIntegrationTest.class);

private static final int NUM_RECORDS_PRODUCED = 2000;
private static final int NUM_TOPIC_PARTITIONS = 3;
private static final int CONSUME_MAX_DURATION_MS = 5000;
private static final int CONNECTOR_SETUP_DURATION_MS = 15000;
private static final int NUM_TASKS = 3;
private static final String CONNECTOR_NAME = "simple-conn";

private EmbeddedConnectCluster connect;
Expand Down Expand Up @@ -103,7 +109,7 @@ public void testProduceConsumeConnector() throws Exception {
// setup up props for the sink connector
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG, "MonitorableSink");
props.put(TASKS_MAX_CONFIG, "3");
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put(TOPICS_CONFIG, "test-topic");
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
Expand All @@ -114,8 +120,7 @@ public void testProduceConsumeConnector() throws Exception {
// start a sink connector
connect.configureConnector(CONNECTOR_NAME, props);

waitForCondition(() -> connect.connectorStatus(CONNECTOR_NAME).tasks().size() == 3
&& connectorHandle.tasks().stream().allMatch(th -> th.partitionsAssigned() == 1),
waitForCondition(this::checkForPartitionAssignment,
CONNECTOR_SETUP_DURATION_MS,
"Connector tasks were not assigned a partition each.");

Expand All @@ -134,4 +139,24 @@ public void testProduceConsumeConnector() throws Exception {
// delete connector
connect.deleteConnector(CONNECTOR_NAME);
}

/**
 * Check whether every task of the connector has been assigned a partition. Exceptions are
 * swallowed deliberately: this method is polled from
 * {@link org.apache.kafka.test.TestUtils#waitForCondition}, which fails the test by itself once
 * the condition has stayed false for the configured duration.
 *
 * @return true when the expected number of tasks is running and each task has been assigned
 *         exactly one partition; false otherwise, or when querying the connector status fails.
 */
private boolean checkForPartitionAssignment() {
    try {
        ConnectorStateInfo stateInfo = connect.connectorStatus(CONNECTOR_NAME);
        if (stateInfo == null || stateInfo.tasks().size() != NUM_TASKS) {
            return false;
        }
        return connectorHandle.tasks().stream()
                .allMatch(task -> task.partitionsAssigned() == 1);
    } catch (Exception e) {
        // Treat any failure to query the REST API as "not assigned yet" so the caller retries.
        log.error("Could not check connector state info.", e);
        return false;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.LoggerFactory;

import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
Expand Down Expand Up @@ -169,6 +170,14 @@ public void deleteConnector(String connName) throws IOException {
}
}

/**
* Get the status for a connector running in this cluster.
*
* @param connectorName name of the connector
* @return an instance of {@link ConnectorStateInfo} populated with state information of the connector and its tasks.
* @throws ConnectRestException if the HTTP request to the REST API failed with a valid status code.
* @throws ConnectException for any other error.
*/
public ConnectorStateInfo connectorStatus(String connectorName) {
ObjectMapper mapper = new ObjectMapper();
String url = endpointForResource(String.format("connectors/%s/status", connectorName));
Expand Down Expand Up @@ -215,6 +224,14 @@ public int executePut(String url, String body) throws IOException {
return httpCon.getResponseCode();
}

/**
* Execute a GET request on the given URL.
*
* @param url the HTTP endpoint
* @return response body encoded as a String
* @throws ConnectRestException if the HTTP request fails with a valid status code
* @throws IOException for any other I/O error.
*/
public String executeGet(String url) throws IOException {
log.debug("Executing GET request to URL={}.", url);
HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
Expand All @@ -228,6 +245,13 @@ public String executeGet(String url) throws IOException {
}
log.debug("Get response for URL={} is {}", url, response);
return response.toString();
} catch (IOException e) {
Response.Status status = Response.Status.fromStatusCode(httpCon.getResponseCode());
if (status != null) {
throw new ConnectRestException(status, "Invalid endpoint: " + url, e);
}
// invalid response code, re-throw the IOException.
throw e;
}
}

Expand Down