[FLINK-6711] Activate strict checkstyle for flink-elasticsearch*
zentol committed May 27, 2017
1 parent 88189f2 commit c20b396
Showing 29 changed files with 110 additions and 79 deletions.
@@ -26,8 +26,7 @@
* {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing malformed documents, or
* simply requesting them to be sent to Elasticsearch again if the failure is only temporary.
*
- * <p>
- * Example:
+ * <p>Example:
*
* <pre>{@code
*
@@ -50,12 +49,10 @@
*
* }</pre>
*
- * <p>
- * The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests
+ * <p>The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests
* with malformed documents, without failing the sink. For all other failures, the sink will fail.
*
- * <p>
- * Note: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the exact type
+ * <p>Note: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the exact type
* could not be retrieved through the older version Java client APIs (thus, the types will be general {@link Exception}s
* and only differ in the failure message). In this case, it is recommended to match on the provided REST status code.
*/
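Note: the example body above is collapsed in this view. A failure handler with the behaviour the javadoc describes, re-adding requests rejected due to queue capacity saturation, dropping malformed documents, and failing the sink otherwise, could plausibly look like the sketch below. The onFailure signature matches the javadoc references elsewhere in this commit; ExceptionUtils.containsThrowable and ElasticsearchParseException are assumptions made for illustration.

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

public class ExampleFailureHandler implements ActionRequestFailureHandler {

	private static final long serialVersionUID = 1L;

	@Override
	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
		if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
			// temporary failure: the client's internal queue is saturated, so retry
			indexer.add(action);
		} else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
			// malformed document: drop the request without failing the sink
		} else {
			// all other failures fail the sink
			throw failure;
		}
	}
}

For Elasticsearch 1.x, where failure types cannot be matched, the restStatusCode parameter is the recommended discriminator instead.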
@@ -23,6 +23,7 @@
import org.elasticsearch.client.Client;

import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.Map;

@@ -31,7 +32,7 @@
* This includes calls to create Elasticsearch clients, handle failed item responses, etc. Any incompatible Elasticsearch
* Java APIs should be bridged using this interface.
*
- * Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since connecting via an embedded node
+ * <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since connecting via an embedded node
* is allowed, the call bridge will hold reference to the created embedded node. Each instance of the sink will hold
* exactly one instance of the call bridge, and state cleanup is performed when the sink is closed.
*/
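Note: judging from the imports and javadoc in this hunk, the bridge is roughly shaped like the sketch below. The method names and signatures are assumptions for illustration, not copied from the commit.

public interface ElasticsearchApiCallBridge extends Serializable {

	// create a version-specific Elasticsearch Client from the user-provided config
	Client createClient(Map<String, String> clientConfig);

	// extract the failure cause of one item in a bulk response, or null if it succeeded
	@Nullable
	Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);

	// release any state held by the bridge, e.g. an embedded Elasticsearch 1.x node
	void cleanup();
}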
@@ -25,6 +25,7 @@
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.InstantiationUtil;
+
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
@@ -49,14 +50,12 @@
/**
* Base class for all Flink Elasticsearch Sinks.
*
- * <p>
- * This class implements the common behaviour across Elasticsearch versions, such as
+ * <p>This class implements the common behaviour across Elasticsearch versions, such as
* the use of an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before
* sending the requests to the cluster, as well as passing input records to the user provided
* {@link ElasticsearchSinkFunction} for processing.
*
- * <p>
- * The version specific API calls for different Elasticsearch versions should be defined by a concrete implementation of
+ * <p>The version specific API calls for different Elasticsearch versions should be defined by a concrete implementation of
* a {@link ElasticsearchApiCallBridge}, which is provided to the constructor of this class. This call bridge is used,
* for example, to create a Elasticsearch {@link Client}, handle failed item responses, etc.
*
@@ -80,11 +79,21 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";

+ /**
+  * Used to control whether the retry delay should increase exponentially or remain constant.
+  */
public enum FlushBackoffType {
CONSTANT,
EXPONENTIAL
}

+ /**
+  * Provides a backoff policy for bulk requests. Whenever a bulk request is rejected due to resource constraints
+  * (i.e. the client's internal thread pool is full), the backoff policy decides how long the bulk processor will
+  * wait before the operation is retried internally.
+  *
+  * <p>This is a proxy for version specific backoff policies.
+  */
public class BulkFlushBackoffPolicy implements Serializable {

private static final long serialVersionUID = -6022851996101826049L;
@@ -149,14 +158,14 @@ public void setDelayMillis(long delayMillis) {
// Internals for the Flink Elasticsearch Sink
// ------------------------------------------------------------------------

- /** Call bridge for different version-specfic */
+ /** Call bridge for different version-specific. */
private final ElasticsearchApiCallBridge callBridge;

/**
* Number of pending action requests not yet acknowledged by Elasticsearch.
* This value is maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}.
*
- * This is incremented whenever the user adds (or re-adds through the {@link ActionRequestFailureHandler}) requests
+ * <p>This is incremented whenever the user adds (or re-adds through the {@link ActionRequestFailureHandler}) requests
* to the {@link RequestIndexer}. It is decremented for each completed request of a bulk request, in
* {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, BulkResponse)} and
* {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, Throwable)}.
@@ -174,7 +183,7 @@ public void setDelayMillis(long delayMillis) {
* the user considered it should fail the sink via the
* {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method.
*
- * Errors will be checked and rethrown before processing each input element, and when the sink is closed.
+ * <p>Errors will be checked and rethrown before processing each input element, and when the sink is closed.
*/
private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

@@ -260,7 +269,7 @@ public ElasticsearchSinkBase(
* Disable flushing on checkpoint. When disabled, the sink will not wait for all
* pending action requests to be acknowledged by Elasticsearch on checkpoints.
*
- * NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT
+ * <p>NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT
* provide any strong guarantees for at-least-once delivery of action requests.
*/
public void disableFlushOnCheckpoint() {
@@ -320,8 +329,9 @@ public void close() throws Exception {
/**
* Build the {@link BulkProcessor}.
*
- * Note: this is exposed for testing purposes.
+ * <p>Note: this is exposed for testing purposes.
*/
+ @VisibleForTesting
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
checkNotNull(listener);

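Note: the FlushBackoffType enum and BulkFlushBackoffPolicy javadoc added above document the bulk-flush backoff knobs. A minimal sketch of wiring them through the sink's user config, using only the two public keys visible in this diff (the values and the cluster name are made up):

Map<String, String> userConfig = new HashMap<>();
userConfig.put("cluster.name", "my-cluster"); // hypothetical cluster name
// retry a rejected bulk request up to 3 times, waiting 100 ms between attempts
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, "3");
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, "100");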
@@ -20,18 +20,17 @@

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
+
import org.elasticsearch.action.ActionRequest;

import java.io.Serializable;

/**
* Creates multiple {@link ActionRequest ActionRequests} from an element in a stream.
*
- * <p>
- * This is used by sinks to prepare elements for sending them to Elasticsearch.
+ * <p>This is used by sinks to prepare elements for sending them to Elasticsearch.
*
- * <p>
- * Example:
+ * <p>Example:
*
* <pre>{@code
* private static class TestElasticSearchSinkFunction implements
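Note: the javadoc example above is collapsed in this view. A comparable implementation might look like the following sketch; it assumes the process(element, ctx, indexer) signature implied by the imports above, and the index and type names are made up.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

public class ExampleSinkFunction implements ElasticsearchSinkFunction<String> {

	// build one IndexRequest per stream element
	private IndexRequest createIndexRequest(String element) {
		Map<String, Object> json = new HashMap<>();
		json.put("data", element);
		return Requests.indexRequest()
				.index("my-index") // made-up index name
				.type("my-type") // made-up type name
				.source(json);
	}

	@Override
	public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
		indexer.add(createIndexRequest(element));
	}
}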
@@ -19,6 +19,7 @@

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
import org.elasticsearch.action.ActionRequest;

/**
@@ -21,6 +21,7 @@
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
+
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

@@ -24,6 +24,7 @@
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -41,16 +42,16 @@

import java.util.ArrayList;
import java.util.Collections;
- import java.util.Map;
import java.util.HashMap;
import java.util.List;
+ import java.util.Map;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
- import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+ import static org.mockito.Mockito.when;

/**
* Suite of tests for {@link ElasticsearchSinkBase}.
@@ -123,9 +124,9 @@ public void testItemFailureRethrownOnCheckpoint() throws Throwable {

/**
* Tests that any item failure in the listener callbacks due to flushing on an immediately following checkpoint
- * is rethrown; we set a timeout because the test will not finish if the logic is broken
+ * is rethrown; we set a timeout because the test will not finish if the logic is broken.
*/
- @Test(timeout=5000)
+ @Test(timeout = 5000)
public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable {
final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
@@ -250,7 +251,7 @@ public void testBulkFailureRethrownOnCheckpoint() throws Throwable {
* Tests that any bulk failure in the listener callbacks due to flushing on an immediately following checkpoint
* is rethrown; we set a timeout because the test will not finish if the logic is broken.
*/
- @Test(timeout=5000)
+ @Test(timeout = 5000)
public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
@@ -307,9 +308,9 @@ public void go() throws Exception {

/**
* Tests that the sink correctly waits for pending requests (including re-added requests) on checkpoints;
- * we set a timeout because the test will not finish if the logic is broken
+ * we set a timeout because the test will not finish if the logic is broken.
*/
- @Test(timeout=5000)
+ @Test(timeout = 5000)
public void testAtLeastOnceSink() throws Throwable {
final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
new HashMap<String, String>(),
@@ -365,9 +366,9 @@ public void go() throws Exception {
/**
* This test is meant to assure that testAtLeastOnceSink is valid by testing that if flushing is disabled,
* the snapshot method does indeed finishes without waiting for pending requests;
- * we set a timeout because the test will not finish if the logic is broken
+ * we set a timeout because the test will not finish if the logic is broken.
*/
- @Test(timeout=5000)
+ @Test(timeout = 5000)
public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Exception {
final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
new HashMap<String, String>(), new SimpleSinkFunction<String>(), new DummyRetryFailureHandler());
@@ -409,7 +410,7 @@ public DummyElasticsearchSink(

/**
* This method is used to mimic a scheduled bulk request; we need to do this
- * manually because we are mocking the BulkProcessor
+ * manually because we are mocking the BulkProcessor.
*/
public void manualBulkRequestWithAllPendingRequests() {
flushLatch.trigger(); // let the flush
@@ -429,7 +430,7 @@ public void continueFlush() {
* Set the list of mock failures to use for the next bulk of item responses. A {@code null}
* means that the response is successful, failed otherwise.
*
- * The list is used with corresponding order to the requests in the bulk, i.e. the first
+ * <p>The list is used with corresponding order to the requests in the bulk, i.e. the first
* request uses the response at index 0, the second requests uses the response at index 1, etc.
*/
public void setMockItemFailuresListForNextBulkItemResponses(List<? extends Throwable> mockItemFailuresList) {
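Note: as the javadoc above describes, each queued response is paired with a bulk request by position. A usage sketch, assuming the DummyElasticsearchSink harness from this file and java.util.Arrays:

// first request succeeds (null means success), second fails with an artificial exception
sink.setMockItemFailuresListForNextBulkItemResponses(
		Arrays.asList(null, new Exception("artificial failure for the second request")));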
@@ -24,6 +24,7 @@
import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.InstantiationUtil;
+
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.junit.AfterClass;
@@ -45,7 +46,7 @@
*/
public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgramsTestBase {

- protected final static String CLUSTER_NAME = "test-cluster";
+ protected static final String CLUSTER_NAME = "test-cluster";

protected static EmbeddedElasticsearchNodeEnvironment embeddedNodeEnv;

@@ -116,7 +117,7 @@ public void runNullTransportClientTest() throws Exception {

try {
createElasticsearchSink(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
- } catch(IllegalArgumentException expectedException) {
+ } catch (IllegalArgumentException expectedException) {
// test passes
return;
}
@@ -137,7 +138,7 @@ public void runEmptyTransportClientTest() throws Exception {
userConfig,
Collections.<InetSocketAddress>emptyList(),
new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
- } catch(IllegalArgumentException expectedException) {
+ } catch (IllegalArgumentException expectedException) {
// test passes
return;
}
@@ -162,23 +163,23 @@ public void runTransportClientFailsTest() throws Exception {

try {
env.execute("Elasticsearch Transport Client Test");
- } catch(JobExecutionException expectedException) {
+ } catch (JobExecutionException expectedException) {
assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
return;
}

fail();
}

- /** Creates a version-specific Elasticsearch sink, using arbitrary transport addresses */
+ /** Creates a version-specific Elasticsearch sink, using arbitrary transport addresses. */
protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
List<InetSocketAddress> transportAddresses,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction);

/**
* Creates a version-specific Elasticsearch sink to connect to a local embedded Elasticsearch node.
*
- * This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(Map, List, ElasticsearchSinkFunction)}
+ * <p>This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(Map, List, ElasticsearchSinkFunction)}
* because the Elasticsearch Java API to do so is incompatible across different versions.
*/
protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
@@ -24,7 +24,7 @@
/**
* The {@link EmbeddedElasticsearchNodeEnvironment} is used in integration tests to manage Elasticsearch embedded nodes.
*
- * NOTE: In order for {@link ElasticsearchSinkTestBase} to dynamically load version-specific implementations
+ * <p>NOTE: In order for {@link ElasticsearchSinkTestBase} to dynamically load version-specific implementations
* for the tests, concrete implementations must be named {@code EmbeddedElasticsearchNodeEnvironmentImpl}. It must
* also be located under the same package. The intentional package-private accessibility of this interface
* enforces that.
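Note: the naming contract above implies that the test base resolves the implementation reflectively, along these lines (a sketch; the exact lookup code and package are assumptions):

// resolve the version-specific implementation by its mandated class name,
// located in the same package as this interface
String className = "org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl";
EmbeddedElasticsearchNodeEnvironment env =
		(EmbeddedElasticsearchNodeEnvironment) Class.forName(className).newInstance();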
@@ -22,6 +22,7 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
2 changes: 1 addition & 1 deletion flink-connectors/flink-connector-elasticsearch/pom.xml
@@ -65,7 +65,7 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.util.Preconditions;
+
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;
@@ -29,6 +30,7 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
+
import java.util.List;
import java.util.Map;
