From bc26cc90c53160208dbab5fea04fef59620efac3 Mon Sep 17 00:00:00 2001 From: Arjun Satish Date: Sat, 19 Jan 2019 12:47:10 -0800 Subject: [PATCH 1/5] MINOR: Handle case where connector status endpoints returns 404 Signed-off-by: Arjun Satish --- .../ErrorHandlingIntegrationTest.java | 13 ++++++++++--- .../ExampleConnectIntegrationTest.java | 13 ++++++++++--- .../util/clusters/EmbeddedConnectCluster.java | 17 +++++++++++++++++ 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index af3ab4421a363..15ce6a5d9f018 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; @@ -73,6 +74,7 @@ public class ErrorHandlingIntegrationTest { private static final int NUM_RECORDS_PRODUCED = 20; private static final int EXPECTED_CORRECT_RECORDS = 19; private static final int EXPECTED_INCORRECT_RECORDS = 1; + private static final int NUM_TASKS = 1; private static final int CONNECTOR_SETUP_DURATION_MS = 5000; private static final int CONSUME_MAX_DURATION_MS = 5000; @@ -105,7 +107,7 @@ public void testSkipRetryAndDLQWithHeaders() throws Exception { // setup connector config Map props = new HashMap<>(); props.put(CONNECTOR_CLASS_CONFIG, "MonitorableSink"); - props.put(TASKS_MAX_CONFIG, "1"); + props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS)); props.put(TOPICS_CONFIG, "test-topic"); props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); @@ -132,8 +134,7 @@ public void testSkipRetryAndDLQWithHeaders() throws Exception { connect.configureConnector(CONNECTOR_NAME, props); - waitForCondition(() -> connect.connectorStatus(CONNECTOR_NAME).tasks().size() == 1 - && connectorHandle.taskHandle(TASK_ID).partitionsAssigned() == 1, + waitForCondition(this::partitionsAssigned, CONNECTOR_SETUP_DURATION_MS, "Connector task was not assigned a partition."); @@ -172,6 +173,12 @@ public void testSkipRetryAndDLQWithHeaders() throws Exception { connect.deleteConnector(CONNECTOR_NAME); } + private boolean partitionsAssigned() { + ConnectorStateInfo info = connect.connectorStatus(CONNECTOR_NAME); + return info != null && info.tasks().size() == NUM_TASKS + && connectorHandle.taskHandle(TASK_ID).partitionsAssigned() == 1; + } + private void assertValue(String expected, Headers headers, String headerKey) { byte[] actual = headers.lastHeader(headerKey).value(); if (expected == null && actual == null) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java index 5d887cf4cbf89..846606e95e457 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.integration; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.test.IntegrationTest; @@ -51,6 +52,7 @@ public class ExampleConnectIntegrationTest { private static final int NUM_TOPIC_PARTITIONS = 3; private static final int CONSUME_MAX_DURATION_MS = 5000; private static final int CONNECTOR_SETUP_DURATION_MS = 15000; + private static final int NUM_TASKS = 3; private static final String CONNECTOR_NAME = "simple-conn"; private EmbeddedConnectCluster connect; @@ -103,7 +105,7 @@ public void testProduceConsumeConnector() throws Exception { // setup up props for the sink connector Map props = new HashMap<>(); props.put(CONNECTOR_CLASS_CONFIG, "MonitorableSink"); - props.put(TASKS_MAX_CONFIG, "3"); + props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS)); props.put(TOPICS_CONFIG, "test-topic"); props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); @@ -114,8 +116,7 @@ public void testProduceConsumeConnector() throws Exception { // start a sink connector connect.configureConnector(CONNECTOR_NAME, props); - waitForCondition(() -> connect.connectorStatus(CONNECTOR_NAME).tasks().size() == 3 - && connectorHandle.tasks().stream().allMatch(th -> th.partitionsAssigned() == 1), + waitForCondition(this::partitionsAssigned, CONNECTOR_SETUP_DURATION_MS, "Connector tasks were not assigned a partition each."); @@ -134,4 +135,10 @@ public void testProduceConsumeConnector() throws Exception { // delete connector connect.deleteConnector(CONNECTOR_NAME); } + + private boolean partitionsAssigned() { + ConnectorStateInfo info = connect.connectorStatus(CONNECTOR_NAME); + return info != null && info.tasks().size() == NUM_TASKS + && connectorHandle.tasks().stream().allMatch(th -> th.partitionsAssigned() == 1); + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java index 9ba0e06bfd013..9e7dcd13396b4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.cli.ConnectDistributed; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.Connect; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; @@ -174,6 +175,9 @@ public ConnectorStateInfo connectorStatus(String connectorName) { String url = endpointForResource(String.format("connectors/%s/status", connectorName)); try { return mapper.readerFor(ConnectorStateInfo.class).readValue(executeGet(url)); + } catch (NotFoundException e) { + // the connector doesn't exist in the cluster yet + return null; } catch (IOException e) { log.error("Could not read connector state", e); throw new ConnectException("Could not read connector state", e); @@ -215,6 +219,14 @@ public int executePut(String url, String body) throws IOException { return httpCon.getResponseCode(); } + /** + * Execute a GET request on the given URL. + * + * @param url the HTTP endpoint + * @return response body encoded as a String + * @throws NotFoundException if the HTTP request returns a 404 + * @throws IOException for any other I/O error. + */ public String executeGet(String url) throws IOException { log.debug("Executing GET request to URL={}.", url); HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection(); @@ -228,6 +240,11 @@ public String executeGet(String url) throws IOException { } log.debug("Get response for URL={} is {}", url, response); return response.toString(); + } catch (IOException e) { + if (httpCon.getResponseCode() == 404) { + throw new NotFoundException("Invalid endpoint: " + url, e); + } + throw e; } } From 2a0ea83e31f753bf552f49e153a354408896c0eb Mon Sep 17 00:00:00 2001 From: Arjun Satish Date: Sat, 19 Jan 2019 13:29:01 -0800 Subject: [PATCH 2/5] MINOR: Replace NotFoundException with ConnectRestException Signed-off-by: Arjun Satish --- checkstyle/import-control.xml | 1 + .../util/clusters/EmbeddedConnectCluster.java | 21 ++++++++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 8c98f8d447865..a0bf7400407da 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -370,6 +370,7 @@ + diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java index 9e7dcd13396b4..b724fe954e692 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java @@ -19,7 +19,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.cli.ConnectDistributed; import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.Connect; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; @@ -27,6 +26,7 @@ import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.Response; import java.io.IOException; import java.io.InputStream; import java.io.OutputStreamWriter; @@ -175,9 +175,14 @@ public ConnectorStateInfo connectorStatus(String connectorName) { String url = endpointForResource(String.format("connectors/%s/status", connectorName)); try { return mapper.readerFor(ConnectorStateInfo.class).readValue(executeGet(url)); - } catch (NotFoundException e) { + } catch (ConnectRestException e) { // the connector doesn't exist in the cluster yet - return null; + if (e.statusCode() == Response.Status.NOT_FOUND.getStatusCode()) { + log.warn("Could not find connector '{}' in cluster.", connectorName); + return null; + } else { + throw e; + } } catch (IOException e) { log.error("Could not read connector state", e); throw new ConnectException("Could not read connector state", e); @@ -224,7 +229,7 @@ public int executePut(String url, String body) throws IOException { * * @param url the HTTP endpoint * @return response body encoded as a String - * @throws NotFoundException if the HTTP request returns a 404 + * @throws ConnectRestException if the HTTP request fails with a valid status code * @throws IOException for any other I/O error. */ public String executeGet(String url) throws IOException { @@ -241,10 +246,12 @@ public String executeGet(String url) throws IOException { log.debug("Get response for URL={} is {}", url, response); return response.toString(); } catch (IOException e) { - if (httpCon.getResponseCode() == 404) { - throw new NotFoundException("Invalid endpoint: " + url, e); + Response.Status status = Response.Status.fromStatusCode(httpCon.getResponseCode()); + if (status != null) { + throw new ConnectRestException(status, "Invalid endpoint: " + url, e); + } else { + throw e; } - throw e; } } From 1861d24cf65c6b69ea7b262faf857cb901ccced3 Mon Sep 17 00:00:00 2001 From: Arjun Satish Date: Sat, 19 Jan 2019 15:28:58 -0800 Subject: [PATCH 3/5] MINOR: Address reviewer comments Signed-off-by: Arjun Satish --- .../ErrorHandlingIntegrationTest.java | 11 +++++++--- .../ExampleConnectIntegrationTest.java | 15 ++++++++++--- .../util/clusters/EmbeddedConnectCluster.java | 21 ++++++++++--------- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index 15ce6a5d9f018..56332eb345d69 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -174,9 +174,14 @@ public void testSkipRetryAndDLQWithHeaders() throws Exception { } private boolean partitionsAssigned() { - ConnectorStateInfo info = connect.connectorStatus(CONNECTOR_NAME); - return info != null && info.tasks().size() == NUM_TASKS - && connectorHandle.taskHandle(TASK_ID).partitionsAssigned() == 1; + try { + ConnectorStateInfo info = connect.connectorStatus(CONNECTOR_NAME); + return info != null && info.tasks().size() == NUM_TASKS + && connectorHandle.taskHandle(TASK_ID).partitionsAssigned() == 1; + } catch (Exception e) { + log.error("Could not check connector state info. Swallowing exception.", e); + return false; + } } private void assertValue(String expected, Headers headers, String headerKey) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java index 846606e95e457..3c42d160f46d4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java @@ -24,6 +24,8 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; @@ -48,6 +50,8 @@ @Category(IntegrationTest.class) public class ExampleConnectIntegrationTest { + private static final Logger log = LoggerFactory.getLogger(ExampleConnectIntegrationTest.class); + private static final int NUM_RECORDS_PRODUCED = 2000; private static final int NUM_TOPIC_PARTITIONS = 3; private static final int CONSUME_MAX_DURATION_MS = 5000; @@ -137,8 +141,13 @@ public void testProduceConsumeConnector() throws Exception { } private boolean partitionsAssigned() { - ConnectorStateInfo info = connect.connectorStatus(CONNECTOR_NAME); - return info != null && info.tasks().size() == NUM_TASKS - && connectorHandle.tasks().stream().allMatch(th -> th.partitionsAssigned() == 1); + try { + ConnectorStateInfo info = connect.connectorStatus(CONNECTOR_NAME); + return info != null && info.tasks().size() == NUM_TASKS + && connectorHandle.tasks().stream().allMatch(th -> th.partitionsAssigned() == 1); + } catch (Exception e) { + log.error("Could not check connector state info. Swallowing exception.", e); + return false; + } } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java index b724fe954e692..ea8ac13e99333 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java @@ -170,19 +170,19 @@ public void deleteConnector(String connName) throws IOException { } } + /** + * Get the status for a connector running in this cluster. + * + * @param connectorName name of the connector + * @return an instance of {@link ConnectorStateInfo} populated with state informaton of the connector and it's tasks. + * @throws ConnectRestException if the HTTP request to the REST API failed with a valid status code. + * @throws ConnectException for any other error. + */ public ConnectorStateInfo connectorStatus(String connectorName) { ObjectMapper mapper = new ObjectMapper(); String url = endpointForResource(String.format("connectors/%s/status", connectorName)); try { return mapper.readerFor(ConnectorStateInfo.class).readValue(executeGet(url)); - } catch (ConnectRestException e) { - // the connector doesn't exist in the cluster yet - if (e.statusCode() == Response.Status.NOT_FOUND.getStatusCode()) { - log.warn("Could not find connector '{}' in cluster.", connectorName); - return null; - } else { - throw e; - } } catch (IOException e) { log.error("Could not read connector state", e); throw new ConnectException("Could not read connector state", e); @@ -249,9 +249,10 @@ public String executeGet(String url) throws IOException { Response.Status status = Response.Status.fromStatusCode(httpCon.getResponseCode()); if (status != null) { throw new ConnectRestException(status, "Invalid endpoint: " + url, e); - } else { - throw e; } + + // invalid response code, re-throw the IOException. + throw e; } } From f700c80e8a572bbef863caf130f6d8a7a3e5773b Mon Sep 17 00:00:00 2001 From: Arjun Satish Date: Sat, 19 Jan 2019 15:51:49 -0800 Subject: [PATCH 4/5] MINOR: Add javadoc for checkForPartitionAssignment explaining why it swallows exceptions Signed-off-by: Arjun Satish --- .../integration/ErrorHandlingIntegrationTest.java | 15 ++++++++++++--- .../ExampleConnectIntegrationTest.java | 15 ++++++++++++--- .../util/clusters/EmbeddedConnectCluster.java | 1 - 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index 56332eb345d69..7805337e024f5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -134,7 +134,7 @@ public void testSkipRetryAndDLQWithHeaders() throws Exception { connect.configureConnector(CONNECTOR_NAME, props); - waitForCondition(this::partitionsAssigned, + waitForCondition(this::checkForPartitionAssignment, CONNECTOR_SETUP_DURATION_MS, "Connector task was not assigned a partition."); @@ -173,13 +173,22 @@ public void testSkipRetryAndDLQWithHeaders() throws Exception { connect.deleteConnector(CONNECTOR_NAME); } - private boolean partitionsAssigned() { + /** + * Check if a partition was assigned to each task. This method swallows exceptions since it is invoked from from a + * {@link org.apache.kafka.test.TestUtils#waitForCondition} in this test that throws errors if this + * method continues to return false after the specified duration has elapsed. + * + * @return true if each task was assigned a partition each, false if this was not true or an error occurred when + * executing this operation. + */ + private boolean checkForPartitionAssignment() { try { ConnectorStateInfo info = connect.connectorStatus(CONNECTOR_NAME); return info != null && info.tasks().size() == NUM_TASKS && connectorHandle.taskHandle(TASK_ID).partitionsAssigned() == 1; } catch (Exception e) { - log.error("Could not check connector state info. Swallowing exception.", e); + // Log the exception and return that the partitions were not assigned + log.error("Could not check connector state info.", e); return false; } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java index 3c42d160f46d4..85bf9074dd2be 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java @@ -120,7 +120,7 @@ public void testProduceConsumeConnector() throws Exception { // start a sink connector connect.configureConnector(CONNECTOR_NAME, props); - waitForCondition(this::partitionsAssigned, + waitForCondition(this::checkForPartitionAssignment, CONNECTOR_SETUP_DURATION_MS, "Connector tasks were not assigned a partition each."); @@ -140,13 +140,22 @@ public void testProduceConsumeConnector() throws Exception { connect.deleteConnector(CONNECTOR_NAME); } - private boolean partitionsAssigned() { + /** + * Check if a partition was assigned to each task. This method swallows exceptions since it is invoked from from a + * {@link org.apache.kafka.test.TestUtils#waitForCondition} in this test that throws errors if this + * method continues to return false after the specified duration has elapsed. + * + * @return true if each task was assigned a partition each, false if this was not true or an error occurred when + * executing this operation. + */ + private boolean checkForPartitionAssignment() { try { ConnectorStateInfo info = connect.connectorStatus(CONNECTOR_NAME); return info != null && info.tasks().size() == NUM_TASKS && connectorHandle.tasks().stream().allMatch(th -> th.partitionsAssigned() == 1); } catch (Exception e) { - log.error("Could not check connector state info. Swallowing exception.", e); + // Log the exception and return that the partitions were not assigned + log.error("Could not check connector state info.", e); return false; } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java index ea8ac13e99333..b660a1dfa5b94 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java @@ -250,7 +250,6 @@ public String executeGet(String url) throws IOException { if (status != null) { throw new ConnectRestException(status, "Invalid endpoint: " + url, e); } - // invalid response code, re-throw the IOException. throw e; } From fffef42403f757f480d9e3eaf4f7d4f43895d3ae Mon Sep 17 00:00:00 2001 From: Arjun Satish Date: Sun, 20 Jan 2019 02:05:08 -0800 Subject: [PATCH 5/5] MINOR: Reword javadoc Signed-off-by: Arjun Satish --- .../connect/integration/ErrorHandlingIntegrationTest.java | 6 +++--- .../connect/integration/ExampleConnectIntegrationTest.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index 7805337e024f5..5f7cfc93082ea 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -174,9 +174,9 @@ public void testSkipRetryAndDLQWithHeaders() throws Exception { } /** - * Check if a partition was assigned to each task. This method swallows exceptions since it is invoked from from a - * {@link org.apache.kafka.test.TestUtils#waitForCondition} in this test that throws errors if this - * method continues to return false after the specified duration has elapsed. + * Check if a partition was assigned to each task. This method swallows exceptions since it is invoked from a + * {@link org.apache.kafka.test.TestUtils#waitForCondition} that will throw an error if this method continued + * to return false after the specified duration has elapsed. * * @return true if each task was assigned a partition each, false if this was not true or an error occurred when * executing this operation. diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java index 85bf9074dd2be..0648e9ff59ac3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java @@ -141,9 +141,9 @@ public void testProduceConsumeConnector() throws Exception { } /** - * Check if a partition was assigned to each task. This method swallows exceptions since it is invoked from from a - * {@link org.apache.kafka.test.TestUtils#waitForCondition} in this test that throws errors if this - * method continues to return false after the specified duration has elapsed. + * Check if a partition was assigned to each task. This method swallows exceptions since it is invoked from a + * {@link org.apache.kafka.test.TestUtils#waitForCondition} that will throw an error if this method continued + * to return false after the specified duration has elapsed. * * @return true if each task was assigned a partition each, false if this was not true or an error occurred when * executing this operation.