From 2d4f34e4308bc2b5863d5cd85f5d0c75ba8d377e Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Wed, 21 Mar 2018 17:53:04 -0700 Subject: [PATCH 01/17] Using CloseableHttpAsyncClient --- pom.xml | 22 +++++++ .../yahoo/bullet/pubsub/rest/RESTPubSub.java | 30 +++++---- .../bullet/pubsub/rest/RESTPublisher.java | 62 ++++++++++--------- .../pubsub/rest/RESTQueryPublisher.java | 6 +- .../pubsub/rest/RESTResultPublisher.java | 6 +- .../bullet/pubsub/rest/RESTSubscriber.java | 23 ++++--- 6 files changed, 88 insertions(+), 61 deletions(-) diff --git a/pom.xml b/pom.xml index 688354d9..64eaa1c9 100644 --- a/pom.xml +++ b/pom.xml @@ -102,11 +102,33 @@ jvyaml 0.2.1 + + + + + + org.asynchttpclient async-http-client 2.0.37 + + + + + org.apache.directory.studio + org.apache.commons.io + 2.4 + + + org.apache.httpcomponents + httpasyncclient + 4.1.3 + + + + diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java index a3a74255..e95b5407 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java @@ -11,11 +11,8 @@ import com.yahoo.bullet.pubsub.Publisher; import com.yahoo.bullet.pubsub.Subscriber; import lombok.extern.slf4j.Slf4j; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.AsyncHttpClientConfig; -import org.asynchttpclient.DefaultAsyncHttpClient; -import org.asynchttpclient.DefaultAsyncHttpClientConfig; - +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClients; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -57,7 +54,7 @@ public List getPublishers(int n) { @Override public Subscriber getSubscriber() { int maxUncommittedMessages = config.getAs(RESTPubSubConfig.MAX_UNCOMMITTED_MESSAGES, Integer.class); - AsyncHttpClient client = getClient(); + CloseableHttpAsyncClient client = getClient(); List urls; Long minWait; @@ -76,15 +73,16 @@ public List getSubscribers(int n) { return IntStream.range(0, n).mapToObj(i -> getSubscriber()).collect(Collectors.toList()); } - private AsyncHttpClient getClient() { - Long connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Long.class); - int retryLimit = config.getAs(RESTPubSubConfig.CONNECT_RETRY_LIMIT, Integer.class); - AsyncHttpClientConfig clientConfig = - new DefaultAsyncHttpClientConfig.Builder().setConnectTimeout(connectTimeout.intValue()) - .setMaxRequestRetry(retryLimit) - .setReadTimeout(NO_TIMEOUT) - .setRequestTimeout(NO_TIMEOUT) - .build(); - return new DefaultAsyncHttpClient(clientConfig); + private CloseableHttpAsyncClient getClient() { + return HttpAsyncClients.createDefault(); +// Long connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Long.class); +// int retryLimit = config.getAs(RESTPubSubConfig.CONNECT_RETRY_LIMIT, Integer.class); +// AsyncHttpClientConfig clientConfig = +// new DefaultAsyncHttpClientConfig.Builder().setConnectTimeout(connectTimeout.intValue()) +// .setMaxRequestRetry(retryLimit) +// .setReadTimeout(NO_TIMEOUT) +// .setRequestTimeout(NO_TIMEOUT) +// .build(); +// return new DefaultAsyncHttpClient(clientConfig); } } diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java index 039413df..2d931598 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java @@ -8,26 +8,27 @@ import com.yahoo.bullet.pubsub.PubSubMessage; import com.yahoo.bullet.pubsub.Publisher; import lombok.extern.slf4j.Slf4j; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.Response; - +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import java.io.IOException; -import java.util.function.Consumer; +import java.io.UnsupportedEncodingException; @Slf4j public abstract class RESTPublisher implements Publisher { public static final String APPLICATION_JSON = "application/json"; public static final String CONTENT_TYPE = "content-type"; - private AsyncHttpClient client; + private CloseableHttpAsyncClient client; /** - * Create a RESTQueryPublisher from a {@link RESTPubSubConfig} and a {@link AsyncHttpClient}. + * Create a RESTQueryPublisher from a {@link CloseableHttpAsyncClient}. * * @param client The client. */ - public RESTPublisher(AsyncHttpClient client) { + public RESTPublisher(CloseableHttpAsyncClient client) { this.client = client; + client.start(); } @Override @@ -47,30 +48,31 @@ public void close() { */ protected void sendToURL(String url, PubSubMessage message) { log.debug("Sending message: {} to url: {}", message, url); - client.preparePost(url) - .setBody(message.asJSON()) - .setHeader(CONTENT_TYPE, APPLICATION_JSON) - .execute() - .toCompletableFuture() - .exceptionally(this::handleException) - .thenAcceptAsync(createResponseConsumer(message.getId())); - } - - private Consumer createResponseConsumer(String id) { - // Create a closure with id - return response -> handleResponse(id, response); - } - - private void handleResponse(String id, Response response) { - if (response == null || response.getStatusCode() != RESTPubSub.OK_200) { - log.error("Failed to write message with id: {}. Couldn't reach pubsub server. Got response: {}", id, response); - return; + try { + HttpPost httpPost = new HttpPost(url); + httpPost.setEntity(new StringEntity(message.asJSON())); + httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON); + client.execute(httpPost, null); + } catch (UnsupportedEncodingException e) { + log.error("Error encoding message in preparation for POST: ", e); } - log.debug("Successfully wrote message with id {}. Response was: {} {}", id, response.getStatusCode(), response.getStatusText()); } - private Response handleException(Throwable throwable) { - log.error("Received error while posting query", throwable); - return null; - } +// private Consumer createResponseConsumer(String id) { +// // Create a closure with id +// return response -> handleResponse(id, response); +// } +// +// private void handleResponse(String id, Response response) { +// if (response == null || response.getStatusCode() != RESTPubSub.OK_200) { +// log.error("Failed to write message with id: {}. Couldn't reach pubsub server. Got response: {}", id, response); +// return; +// } +// log.debug("Successfully wrote message with id {}. Response was: {} {}", id, response.getStatusCode(), response.getStatusText()); +// } +// +// private Response handleException(Throwable throwable) { +// log.error("Received error while posting query", throwable); +// return null; +// } } diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java index 79d19a68..4496854f 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java @@ -11,7 +11,7 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.asynchttpclient.AsyncHttpClient; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; @Slf4j public class RESTQueryPublisher extends RESTPublisher { @@ -20,14 +20,14 @@ public class RESTQueryPublisher extends RESTPublisher { private String resultURL; /** - * Create a RESTQueryPublisher from a {@link AsyncHttpClient}, queryURL and resultURL. The BulletConfig must + * Create a RESTQueryPublisher from a {@link CloseableHttpAsyncClient}, queryURL and resultURL. The BulletConfig must * contain a valid url in the bullet.pubsub.rest.query.urls field. * * @param client The client. * @param queryURL The URL to which to POST queries. * @param resultURL The URL that will be added to the Metadata (results will be sent to this URL from the backend). */ - public RESTQueryPublisher(AsyncHttpClient client, String queryURL, String resultURL) { + public RESTQueryPublisher(CloseableHttpAsyncClient client, String queryURL, String resultURL) { super(client); this.queryURL = queryURL; this.resultURL = resultURL; diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java index d4e5dc24..c5847b8e 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java @@ -8,16 +8,16 @@ import com.yahoo.bullet.pubsub.PubSubException; import com.yahoo.bullet.pubsub.PubSubMessage; import lombok.extern.slf4j.Slf4j; -import org.asynchttpclient.AsyncHttpClient; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; @Slf4j public class RESTResultPublisher extends RESTPublisher { /** - * Create a RESTQueryPublisher from a {@link AsyncHttpClient}. + * Create a RESTQueryPublisher from a {@link CloseableHttpAsyncClient}. * * @param client The client. */ - public RESTResultPublisher(AsyncHttpClient client) { + public RESTResultPublisher(CloseableHttpAsyncClient client) { super(client); } diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java index 02ba628c..56456b42 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java @@ -15,28 +15,31 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.Response; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; @Slf4j public class RESTSubscriber extends BufferingSubscriber { @Getter(AccessLevel.PACKAGE) private List urls; - private AsyncHttpClient client; + private CloseableHttpAsyncClient client; private long minWait; private long lastRequest; /** - * Create a RESTSubscriber from a {@link RESTPubSubConfig}. + * Create a RESTSubscriber. * * @param maxUncommittedMessages The maximum number of records that will be buffered before commit() must be called. * @param urls The URLs which will be used to make the http request. * @param client The client to use to make http requests. * @param minWait The minimum time (ms) to wait between subsequent http requests. */ - public RESTSubscriber(int maxUncommittedMessages, List urls, AsyncHttpClient client, long minWait) { + public RESTSubscriber(int maxUncommittedMessages, List urls, CloseableHttpAsyncClient client, long minWait) { super(maxUncommittedMessages); this.client = client; + this.client.start(); this.urls = urls; this.minWait = minWait; this.lastRequest = 0; @@ -52,11 +55,13 @@ public List getMessages() throws PubSubException { lastRequest = currentTime; for (String url : urls) { try { - log.debug("Getting messages from url: ", url); - Response response = client.prepareGet(url).execute().get(); - int statusCode = response.getStatusCode(); + log.debug("Getting messages from url: {}", url); + HttpGet httpGet = new HttpGet(url); + HttpResponse response = client.execute(httpGet, null).get(); + int statusCode = response.getStatusLine().getStatusCode(); if (statusCode == RESTPubSub.OK_200) { - messages.add(PubSubMessage.fromJSON(response.getResponseBody())); + String message = IOUtils.toString(response.getEntity().getContent(), "UTF-8"); + messages.add(PubSubMessage.fromJSON(message)); } else if (statusCode != RESTPubSub.NO_CONTENT_204) { // NO_CONTENT_204 indicates there are no new messages - anything else indicates a problem log.error("Http call failed with status code {} and response {}.", statusCode, response); From c168f5d7c95e4f48bbbf905ded318435d046be62 Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Wed, 21 Mar 2018 18:12:02 -0700 Subject: [PATCH 02/17] Changed connect_timeout to int --- .../com/yahoo/bullet/pubsub/rest/RESTPubSub.java | 16 +++++----------- .../bullet/pubsub/rest/RESTPubSubConfig.java | 4 ++-- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java index e95b5407..6417dd3d 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java @@ -13,6 +13,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; +import org.apache.http.impl.nio.reactor.IOReactorConfig; + import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -20,7 +22,6 @@ @Slf4j public class RESTPubSub extends PubSub { - private static final int NO_TIMEOUT = -1; public static final int OK_200 = 200; public static final int NO_CONTENT_204 = 204; @@ -74,15 +75,8 @@ public List getSubscribers(int n) { } private CloseableHttpAsyncClient getClient() { - return HttpAsyncClients.createDefault(); -// Long connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Long.class); -// int retryLimit = config.getAs(RESTPubSubConfig.CONNECT_RETRY_LIMIT, Integer.class); -// AsyncHttpClientConfig clientConfig = -// new DefaultAsyncHttpClientConfig.Builder().setConnectTimeout(connectTimeout.intValue()) -// .setMaxRequestRetry(retryLimit) -// .setReadTimeout(NO_TIMEOUT) -// .setRequestTimeout(NO_TIMEOUT) -// .build(); -// return new DefaultAsyncHttpClient(clientConfig); + int connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Integer.class); + IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setConnectTimeout(connectTimeout).build(); + return HttpAsyncClients.custom().setDefaultIOReactorConfig(ioReactorConfig).build(); } } diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java index c8a9e6b8..5b158d3b 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java @@ -25,7 +25,7 @@ public class RESTPubSubConfig extends BulletConfig { public static final String QUERY_SUBSCRIBER_MIN_WAIT = PREFIX + "query.subscriber.min.wait.ms"; // Defaults - public static final long DEFAULT_CONNECT_TIMEOUT = 5000L; + public static final int DEFAULT_CONNECT_TIMEOUT = 5000; public static final int DEFAULT_CONNECT_RETRY_LIMIT = 3; public static final int DEFAULT_MAX_UNCOMMITTED_MESSAGES = 100; public static final List DEFAULT_QUERY_URLS = Arrays.asList("http://localhost:9901/api/bullet/pubsub/query", @@ -41,7 +41,7 @@ public class RESTPubSubConfig extends BulletConfig { VALIDATOR.define(CONNECT_TIMEOUT) .defaultTo(DEFAULT_CONNECT_TIMEOUT) .checkIf(Validator::isPositiveInt) - .castTo(Validator::asLong); + .castTo(Validator::asInt); VALIDATOR.define(CONNECT_RETRY_LIMIT) .defaultTo(DEFAULT_CONNECT_RETRY_LIMIT) .checkIf(Validator::isPositiveInt) From b5503cd506969fb1d0107cd193c8f86ff0054c05 Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Wed, 21 Mar 2018 18:22:03 -0700 Subject: [PATCH 03/17] Minor change --- src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java index 56456b42..36c50c4c 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java @@ -67,7 +67,7 @@ public List getMessages() throws PubSubException { log.error("Http call failed with status code {} and response {}.", statusCode, response); } } catch (Exception e) { - log.error("Http call failed with error: ", e); + log.error("Http call to {} failed with error: {}", url, e); } } return messages; From 7e4a0b8d85ec5fc763ae11d5d89a904d3c3fc697 Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Thu, 22 Mar 2018 14:40:20 -0700 Subject: [PATCH 04/17] Added new callback handling --- .../bullet/pubsub/rest/RESTPublisher.java | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java index 2d931598..48ac7691 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java @@ -9,6 +9,8 @@ import com.yahoo.bullet.pubsub.Publisher; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.methods.HttpPost; +import org.apache.http.concurrent.FutureCallback; +import org.apache.http.HttpResponse; import org.apache.http.entity.StringEntity; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import java.io.IOException; @@ -52,27 +54,30 @@ protected void sendToURL(String url, PubSubMessage message) { HttpPost httpPost = new HttpPost(url); httpPost.setEntity(new StringEntity(message.asJSON())); httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON); - client.execute(httpPost, null); + client.execute(httpPost, new RequestCallback()); } catch (UnsupportedEncodingException e) { log.error("Error encoding message in preparation for POST: ", e); } } -// private Consumer createResponseConsumer(String id) { -// // Create a closure with id -// return response -> handleResponse(id, response); -// } -// -// private void handleResponse(String id, Response response) { -// if (response == null || response.getStatusCode() != RESTPubSub.OK_200) { -// log.error("Failed to write message with id: {}. Couldn't reach pubsub server. Got response: {}", id, response); -// return; -// } -// log.debug("Successfully wrote message with id {}. Response was: {} {}", id, response.getStatusCode(), response.getStatusText()); -// } -// -// private Response handleException(Throwable throwable) { -// log.error("Received error while posting query", throwable); -// return null; -// } + private class RequestCallback implements FutureCallback { + @Override + public void completed(HttpResponse response) { + if (response == null || response.getStatusLine().getStatusCode() != RESTPubSub.OK_200) { + log.error("Couldn't reach REST pubsub server. Got response: {}", response); + return; + } + log.debug("Successfully wrote message with status code {}. Response was: {}", response.getStatusLine().getStatusCode(), response); + } + + @Override + public void failed(Exception e) { + log.error("Failed to post message to RESTPubSub endpoint. Failed with error: ", e); + } + + @Override + public void cancelled() { + log.error("Failed to post message to RESTPubSub endpoint. Request was cancelled."); + } + } } From c5bdc47b05c1258acaaa27029e72465ee85fe574 Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Thu, 22 Mar 2018 14:50:44 -0700 Subject: [PATCH 05/17] Removed retry limit --- .../bullet/pubsub/rest/RESTPubSubConfig.java | 6 - src/main/resources/rest_pubsub_defaults.yaml | 2 - .../pubsub/rest/RESTPubSubConfigTest.java | 30 +-- .../pubsub/rest/RESTQueryPublisherTest.java | 178 +++++++++--------- .../pubsub/rest/RESTResultPublisherTest.java | 88 ++++----- src/test/resources/test_config.yaml | 2 +- 6 files changed, 149 insertions(+), 157 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java index 5b158d3b..4108f27c 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java @@ -17,7 +17,6 @@ public class RESTPubSubConfig extends BulletConfig { // Field names public static final String PREFIX = "bullet.pubsub.rest."; public static final String CONNECT_TIMEOUT = PREFIX + "connect.timeout.ms"; - public static final String CONNECT_RETRY_LIMIT = PREFIX + "connect.retry.limit"; public static final String MAX_UNCOMMITTED_MESSAGES = PREFIX + "subscriber.max.uncommitted.messages"; public static final String QUERY_URLS = PREFIX + "query.urls"; public static final String RESULT_URL = PREFIX + "result.url"; @@ -26,7 +25,6 @@ public class RESTPubSubConfig extends BulletConfig { // Defaults public static final int DEFAULT_CONNECT_TIMEOUT = 5000; - public static final int DEFAULT_CONNECT_RETRY_LIMIT = 3; public static final int DEFAULT_MAX_UNCOMMITTED_MESSAGES = 100; public static final List DEFAULT_QUERY_URLS = Arrays.asList("http://localhost:9901/api/bullet/pubsub/query", "http://localhost:9902/api/bullet/pubsub/query"); @@ -42,10 +40,6 @@ public class RESTPubSubConfig extends BulletConfig { .defaultTo(DEFAULT_CONNECT_TIMEOUT) .checkIf(Validator::isPositiveInt) .castTo(Validator::asInt); - VALIDATOR.define(CONNECT_RETRY_LIMIT) - .defaultTo(DEFAULT_CONNECT_RETRY_LIMIT) - .checkIf(Validator::isPositiveInt) - .castTo(Validator::asInt); VALIDATOR.define(MAX_UNCOMMITTED_MESSAGES) .defaultTo(DEFAULT_MAX_UNCOMMITTED_MESSAGES) .checkIf(Validator::isPositiveInt) diff --git a/src/main/resources/rest_pubsub_defaults.yaml b/src/main/resources/rest_pubsub_defaults.yaml index ddc71a22..e8ba5d28 100644 --- a/src/main/resources/rest_pubsub_defaults.yaml +++ b/src/main/resources/rest_pubsub_defaults.yaml @@ -1,7 +1,5 @@ # Http connection timout (used by both the web service and the backend) bullet.pubsub.rest.connect.timeout.ms: 5000 -# Http connection retry limit (used by both the web service and the backend) -bullet.pubsub.rest.connect.retry.limit: 3 # Maxiumum number of uncommitted messages allowed before read requests will wait for commits (used by both the web service and the backend) bullet.pubsub.rest.subscriber.max.uncommitted.messages: 100 # Minimum time (ms) between http calls to the result subscriber REST endpoint. This can be used to limit the number of http requests to the REST endpoints diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java index dcab425f..572a85f2 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java @@ -20,25 +20,25 @@ public class RESTPubSubConfigTest { @Test public void testNoFiles() { RESTPubSubConfig config = new RESTPubSubConfig((String) null); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 3); + Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000); config = new RESTPubSubConfig((Config) null); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 3); + Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000); config = new RESTPubSubConfig(""); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 3); + Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000); } @Test public void testMissingFile() { RESTPubSubConfig config = new RESTPubSubConfig("/path/to/non/existant/file"); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 3); + Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000); } @Test public void testCustomConfig() { RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 88); + Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 88); Assert.assertEquals(config.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), "com.yahoo.bullet.pubsub.MockPubSub"); List queries = ((List) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class)); Assert.assertEquals(queries.size(), 2); @@ -57,7 +57,7 @@ public void testCustomProperties() { @Test public void testGettingWithDefault() { RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); - Assert.assertEquals(config.getOrDefault(RESTPubSubConfig.CONNECT_RETRY_LIMIT, "51"), 88); + Assert.assertEquals(config.getOrDefault(RESTPubSubConfig.CONNECT_TIMEOUT, "51"), 88); Assert.assertEquals(config.getOrDefault("does.not.exist", "foo"), "foo"); Assert.assertEquals(config.getOrDefault("fake.setting", "bar"), "bar"); } @@ -90,12 +90,12 @@ public void testMerging() { RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); int configSize = config.getAll(Optional.empty()).size(); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 88); + Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 88); Assert.assertEquals(config.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), "com.yahoo.bullet.pubsub.MockPubSub"); Config another = new RESTPubSubConfig((String) null); another.clear(); - another.set(RESTPubSubConfig.CONNECT_RETRY_LIMIT, 51L); + another.set(RESTPubSubConfig.CONNECT_TIMEOUT, 51L); // This is a bad setting another.set(RESTPubSubConfig.AGGREGATION_MAX_SIZE, -1); // Some other non-Bullet setting @@ -104,7 +104,7 @@ public void testMerging() { config.merge(another); Assert.assertEquals(config.getAll(Optional.empty()).size(), configSize + 1); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 51); + Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 51); // Bad setting gets defaulted. Assert.assertEquals(config.get(RESTPubSubConfig.AGGREGATION_MAX_SIZE), RESTPubSubConfig.DEFAULT_AGGREGATION_MAX_SIZE); // Other setting is preserved. @@ -113,7 +113,7 @@ public void testMerging() { // Test null and verify it is unchanged config.merge(null); Assert.assertEquals(config.getAll(Optional.empty()).size(), configSize + 1); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 51); + Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 51); Assert.assertEquals(config.get(RESTPubSubConfig.AGGREGATION_MAX_SIZE), RESTPubSubConfig.DEFAULT_AGGREGATION_MAX_SIZE); Assert.assertEquals(config.get("pi"), 3.14); } @@ -125,7 +125,7 @@ public void testPropertiesWithPrefix() { String fieldValue = "com.yahoo.bullet.pubsub.MockPubSub"; int configSize = config.getAllWithPrefix(Optional.empty(), prefix, false).size(); - Assert.assertEquals(configSize, 9); + Assert.assertEquals(configSize, 8); Map properties = config.getAllWithPrefix(Optional.empty(), prefix, false); Assert.assertEquals(properties.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), fieldValue); @@ -139,7 +139,7 @@ public void testPropertiesStripPrefix() { String fieldValue = "com.yahoo.bullet.pubsub.MockPubSub"; int configSize = config.getAllWithPrefix(Optional.empty(), prefix, true).size(); - Assert.assertEquals(configSize, 9); + Assert.assertEquals(configSize, 8); Map properties = config.getAllWithPrefix(Optional.empty(), prefix, true); Assert.assertNull(properties.get(RESTPubSubConfig.PUBSUB_CLASS_NAME)); @@ -196,9 +196,9 @@ public void testValidate() { Assert.assertEquals(config.get(BulletConfig.AGGREGATION_DEFAULT_SIZE), BulletConfig.DEFAULT_AGGREGATION_SIZE); // Test validate() corrects RESTPubSubConfig settings - config.set(RESTPubSubConfig.CONNECT_RETRY_LIMIT, -88); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), -88); + config.set(RESTPubSubConfig.CONNECT_TIMEOUT, -88); + Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), -88); config.validate(); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), RESTPubSubConfig.DEFAULT_CONNECT_RETRY_LIMIT); + Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), RESTPubSubConfig.DEFAULT_CONNECT_TIMEOUT); } } diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java index 0bbdb471..d8f3d2e8 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java @@ -28,93 +28,93 @@ import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getNotOkResponse; public class RESTQueryPublisherTest { - @Test - public void testSendResultUrlPutInMetadataAckPreserved() throws Exception { - CompletableFuture response = getOkFuture(getOkResponse(null)); - BoundRequestBuilder mockBuilder = mockBuilderWith(response); - AsyncHttpClient mockClient = mockClientWith(mockBuilder); - RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); - - publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.ACKNOWLEDGE)); - verify(mockClient).preparePost("my/custom/query/url"); - verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"ACKNOWLEDGE\",\"content\":\"my/custom/url\"}}"); - verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); - } - - @Test - public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception { - CompletableFuture response = getOkFuture(getOkResponse(null)); - BoundRequestBuilder mockBuilder = mockBuilderWith(response); - AsyncHttpClient mockClient = mockClientWith(mockBuilder); - RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); - config.set(RESTPubSubConfig.RESULT_URL, "my/custom/url"); - RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); - - publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE)); - verify(mockClient).preparePost("my/custom/query/url"); - verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"COMPLETE\",\"content\":\"my/custom/result/url\"}}"); - verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); - } - - @Test - public void testSendMetadataCreated() throws Exception { - CompletableFuture response = getOkFuture(getOkResponse(null)); - BoundRequestBuilder mockBuilder = mockBuilderWith(response); - AsyncHttpClient mockClient = mockClientWith(mockBuilder); - RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); - - publisher.send("foo", "bar"); - verify(mockClient).preparePost("my/custom/query/url"); - verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":null,\"content\":\"my/custom/result/url\"}}"); - verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); - } - - @Test - public void testClose() throws Exception { - AsyncHttpClient mockClient = mock(AsyncHttpClient.class); - doNothing().when(mockClient).close(); - RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null); - - publisher.close(); - verify(mockClient).close(); - } - - @Test - public void testCloseDoesNotThrow() throws Exception { - AsyncHttpClient mockClient = mock(AsyncHttpClient.class); - doThrow(new IOException("error!")).when(mockClient).close(); - RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null); - - publisher.close(); - verify(mockClient).close(); - } - - @Test - public void testHandleBadResponse() throws Exception { - CompletableFuture response = getOkFuture(getNotOkResponse(500)); - BoundRequestBuilder mockBuilder = mockBuilderWith(response); - AsyncHttpClient mockClient = mockClientWith(mockBuilder); - RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); - - publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE)); - verify(mockClient).preparePost("my/custom/query/url"); - verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"COMPLETE\",\"content\":\"my/custom/result/url\"}}"); - verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); - } - - @Test(timeOut = 5000L) - public void testException() throws Exception { - // This will hit a non-existent url and fail, testing our exceptions - AsyncHttpClientConfig clientConfig = new DefaultAsyncHttpClientConfig.Builder().setConnectTimeout(100) - .setMaxRequestRetry(1) - .setReadTimeout(-1) - .setRequestTimeout(-1) - .build(); - AsyncHttpClient client = new DefaultAsyncHttpClient(clientConfig); - AsyncHttpClient spyClient = spy(client); - RESTQueryPublisher publisher = new RESTQueryPublisher(spyClient, "http://this/does/not/exist:8080", "my/custom/result/url"); - - publisher.send(new PubSubMessage("foo", "bar")); - verify(spyClient).preparePost("http://this/does/not/exist:8080"); - } +// @Test +// public void testSendResultUrlPutInMetadataAckPreserved() throws Exception { +// CompletableFuture response = getOkFuture(getOkResponse(null)); +// BoundRequestBuilder mockBuilder = mockBuilderWith(response); +// AsyncHttpClient mockClient = mockClientWith(mockBuilder); +// RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); +// +// publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.ACKNOWLEDGE)); +// verify(mockClient).preparePost("my/custom/query/url"); +// verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"ACKNOWLEDGE\",\"content\":\"my/custom/url\"}}"); +// verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); +// } +// +// @Test +// public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception { +// CompletableFuture response = getOkFuture(getOkResponse(null)); +// BoundRequestBuilder mockBuilder = mockBuilderWith(response); +// AsyncHttpClient mockClient = mockClientWith(mockBuilder); +// RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); +// config.set(RESTPubSubConfig.RESULT_URL, "my/custom/url"); +// RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); +// +// publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE)); +// verify(mockClient).preparePost("my/custom/query/url"); +// verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"COMPLETE\",\"content\":\"my/custom/result/url\"}}"); +// verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); +// } +// +// @Test +// public void testSendMetadataCreated() throws Exception { +// CompletableFuture response = getOkFuture(getOkResponse(null)); +// BoundRequestBuilder mockBuilder = mockBuilderWith(response); +// AsyncHttpClient mockClient = mockClientWith(mockBuilder); +// RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); +// +// publisher.send("foo", "bar"); +// verify(mockClient).preparePost("my/custom/query/url"); +// verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":null,\"content\":\"my/custom/result/url\"}}"); +// verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); +// } +// +// @Test +// public void testClose() throws Exception { +// AsyncHttpClient mockClient = mock(AsyncHttpClient.class); +// doNothing().when(mockClient).close(); +// RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null); +// +// publisher.close(); +// verify(mockClient).close(); +// } +// +// @Test +// public void testCloseDoesNotThrow() throws Exception { +// AsyncHttpClient mockClient = mock(AsyncHttpClient.class); +// doThrow(new IOException("error!")).when(mockClient).close(); +// RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null); +// +// publisher.close(); +// verify(mockClient).close(); +// } +// +// @Test +// public void testHandleBadResponse() throws Exception { +// CompletableFuture response = getOkFuture(getNotOkResponse(500)); +// BoundRequestBuilder mockBuilder = mockBuilderWith(response); +// AsyncHttpClient mockClient = mockClientWith(mockBuilder); +// RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); +// +// publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE)); +// verify(mockClient).preparePost("my/custom/query/url"); +// verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"COMPLETE\",\"content\":\"my/custom/result/url\"}}"); +// verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); +// } +// +// @Test(timeOut = 5000L) +// public void testException() throws Exception { +// // This will hit a non-existent url and fail, testing our exceptions +// AsyncHttpClientConfig clientConfig = new DefaultAsyncHttpClientConfig.Builder().setConnectTimeout(100) +// .setMaxRequestRetry(1) +// .setReadTimeout(-1) +// .setRequestTimeout(-1) +// .build(); +// AsyncHttpClient client = new DefaultAsyncHttpClient(clientConfig); +// AsyncHttpClient spyClient = spy(client); +// RESTQueryPublisher publisher = new RESTQueryPublisher(spyClient, "http://this/does/not/exist:8080", "my/custom/result/url"); +// +// publisher.send(new PubSubMessage("foo", "bar")); +// verify(spyClient).preparePost("http://this/does/not/exist:8080"); +// } } diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java index 6fe93d61..9064fb91 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java @@ -24,48 +24,48 @@ import static org.mockito.Mockito.verify; public class RESTResultPublisherTest { - @Test - public void testSend() throws Exception { - CompletableFuture response = getOkFuture(getOkResponse(null)); - BoundRequestBuilder mockBuilder = mockBuilderWith(response); - AsyncHttpClient mockClient = mockClientWith(mockBuilder); - RESTResultPublisher publisher = new RESTResultPublisher(mockClient); - - PubSubMessage message = new PubSubMessage("someId", "someContent", new Metadata(null, "custom/url")); - publisher.send(message); - verify(mockClient).preparePost("custom/url"); - verify(mockBuilder).setBody("{\"id\":\"someId\",\"sequence\":-1,\"content\":\"someContent\",\"metadata\":{\"signal\":null,\"content\":\"custom/url\"}}"); - verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); - } - - @Test(expectedExceptions = ClassCastException.class) - public void testSendBadURL() throws Exception { - CompletableFuture response = getOkFuture(getOkResponse(null)); - BoundRequestBuilder mockBuilder = mockBuilderWith(response); - AsyncHttpClient mockClient = mockClientWith(mockBuilder); - RESTResultPublisher publisher = new RESTResultPublisher(mockClient); - - PubSubMessage message = new PubSubMessage("someId", "someContent", new Metadata(null, 88)); - publisher.send(message); - } - - @Test - public void testClose() throws Exception { - AsyncHttpClient mockClient = mock(AsyncHttpClient.class); - doNothing().when(mockClient).close(); - RESTResultPublisher publisher = new RESTResultPublisher(mockClient); - - publisher.close(); - verify(mockClient).close(); - } - - @Test - public void testCloseDoesNotThrow() throws Exception { - AsyncHttpClient mockClient = mock(AsyncHttpClient.class); - doThrow(new IOException("error!")).when(mockClient).close(); - RESTResultPublisher publisher = new RESTResultPublisher(mockClient); - - publisher.close(); - verify(mockClient).close(); - } +// @Test +// public void testSend() throws Exception { +// CompletableFuture response = getOkFuture(getOkResponse(null)); +// BoundRequestBuilder mockBuilder = mockBuilderWith(response); +// AsyncHttpClient mockClient = mockClientWith(mockBuilder); +// RESTResultPublisher publisher = new RESTResultPublisher(mockClient); +// +// PubSubMessage message = new PubSubMessage("someId", "someContent", new Metadata(null, "custom/url")); +// publisher.send(message); +// verify(mockClient).preparePost("custom/url"); +// verify(mockBuilder).setBody("{\"id\":\"someId\",\"sequence\":-1,\"content\":\"someContent\",\"metadata\":{\"signal\":null,\"content\":\"custom/url\"}}"); +// verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); +// } +// +// @Test(expectedExceptions = ClassCastException.class) +// public void testSendBadURL() throws Exception { +// CompletableFuture response = getOkFuture(getOkResponse(null)); +// BoundRequestBuilder mockBuilder = mockBuilderWith(response); +// AsyncHttpClient mockClient = mockClientWith(mockBuilder); +// RESTResultPublisher publisher = new RESTResultPublisher(mockClient); +// +// PubSubMessage message = new PubSubMessage("someId", "someContent", new Metadata(null, 88)); +// publisher.send(message); +// } +// +// @Test +// public void testClose() throws Exception { +// AsyncHttpClient mockClient = mock(AsyncHttpClient.class); +// doNothing().when(mockClient).close(); +// RESTResultPublisher publisher = new RESTResultPublisher(mockClient); +// +// publisher.close(); +// verify(mockClient).close(); +// } +// +// @Test +// public void testCloseDoesNotThrow() throws Exception { +// AsyncHttpClient mockClient = mock(AsyncHttpClient.class); +// doThrow(new IOException("error!")).when(mockClient).close(); +// RESTResultPublisher publisher = new RESTResultPublisher(mockClient); +// +// publisher.close(); +// verify(mockClient).close(); +// } } diff --git a/src/test/resources/test_config.yaml b/src/test/resources/test_config.yaml index 3027b029..d0568a9c 100644 --- a/src/test/resources/test_config.yaml +++ b/src/test/resources/test_config.yaml @@ -5,7 +5,7 @@ bullet.query.aggregation.max.size: 100 fake.setting: null bullet.pubsub.context.name: "QUERY_SUBMISSION" bullet.pubsub.class.name: "com.yahoo.bullet.pubsub.MockPubSub" -bullet.pubsub.rest.connect.retry.limit: 88 +bullet.pubsub.rest.connect.timeout.ms: 88 bullet.pubsub.rest.query.urls: - "http://localhost:9901/CUSTOM/query" - "http://localhost:9902/CUSTOM/query" From f782615074a7c8500ddd2ffbb71ad2b780cda71b Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Thu, 22 Mar 2018 16:46:19 -0700 Subject: [PATCH 06/17] Fixed some tests --- .../yahoo/bullet/pubsub/rest/RESTPubSub.java | 1 + .../bullet/pubsub/rest/RESTPublisher.java | 2 +- .../bullet/pubsub/rest/RESTSubscriber.java | 2 +- .../pubsub/rest/RESTQueryPublisherTest.java | 180 +++++++++-------- .../pubsub/rest/RESTSubscriberTest.java | 186 +++++++++--------- 5 files changed, 189 insertions(+), 182 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java index 6417dd3d..89391f52 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java @@ -24,6 +24,7 @@ public class RESTPubSub extends PubSub { public static final int OK_200 = 200; public static final int NO_CONTENT_204 = 204; + public static final String UTF_8 = "UTF-8"; /** * Create a RESTPubSub from a {@link BulletConfig}. diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java index 48ac7691..253e08ee 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java @@ -60,7 +60,7 @@ protected void sendToURL(String url, PubSubMessage message) { } } - private class RequestCallback implements FutureCallback { + static class RequestCallback implements FutureCallback { @Override public void completed(HttpResponse response) { if (response == null || response.getStatusLine().getStatusCode() != RESTPubSub.OK_200) { diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java index 36c50c4c..4aa77b04 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java @@ -60,7 +60,7 @@ public List getMessages() throws PubSubException { HttpResponse response = client.execute(httpGet, null).get(); int statusCode = response.getStatusLine().getStatusCode(); if (statusCode == RESTPubSub.OK_200) { - String message = IOUtils.toString(response.getEntity().getContent(), "UTF-8"); + String message = IOUtils.toString(response.getEntity().getContent(), RESTPubSub.UTF_8); messages.add(PubSubMessage.fromJSON(message)); } else if (statusCode != RESTPubSub.NO_CONTENT_204) { // NO_CONTENT_204 indicates there are no new messages - anything else indicates a problem diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java index d8f3d2e8..14a91fd4 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java @@ -7,101 +7,107 @@ import com.yahoo.bullet.pubsub.Metadata; import com.yahoo.bullet.pubsub.PubSubMessage; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.AsyncHttpClientConfig; -import org.asynchttpclient.BoundRequestBuilder; -import org.asynchttpclient.DefaultAsyncHttpClient; -import org.asynchttpclient.DefaultAsyncHttpClientConfig; -import org.asynchttpclient.Response; +import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.HttpPost; +import org.mockito.ArgumentCaptor; import org.testng.annotations.Test; import java.io.IOException; -import java.util.concurrent.CompletableFuture; +import org.testng.Assert; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkFuture; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkResponse; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockBuilderWith; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockClientWith; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getNotOkResponse; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; public class RESTQueryPublisherTest { -// @Test -// public void testSendResultUrlPutInMetadataAckPreserved() throws Exception { -// CompletableFuture response = getOkFuture(getOkResponse(null)); -// BoundRequestBuilder mockBuilder = mockBuilderWith(response); -// AsyncHttpClient mockClient = mockClientWith(mockBuilder); -// RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); -// -// publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.ACKNOWLEDGE)); -// verify(mockClient).preparePost("my/custom/query/url"); -// verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"ACKNOWLEDGE\",\"content\":\"my/custom/url\"}}"); -// verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); -// } -// -// @Test -// public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception { -// CompletableFuture response = getOkFuture(getOkResponse(null)); -// BoundRequestBuilder mockBuilder = mockBuilderWith(response); -// AsyncHttpClient mockClient = mockClientWith(mockBuilder); -// RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); -// config.set(RESTPubSubConfig.RESULT_URL, "my/custom/url"); -// RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); -// -// publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE)); -// verify(mockClient).preparePost("my/custom/query/url"); -// verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"COMPLETE\",\"content\":\"my/custom/result/url\"}}"); -// verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); -// } -// -// @Test -// public void testSendMetadataCreated() throws Exception { -// CompletableFuture response = getOkFuture(getOkResponse(null)); -// BoundRequestBuilder mockBuilder = mockBuilderWith(response); -// AsyncHttpClient mockClient = mockClientWith(mockBuilder); -// RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); -// -// publisher.send("foo", "bar"); -// verify(mockClient).preparePost("my/custom/query/url"); -// verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":null,\"content\":\"my/custom/result/url\"}}"); -// verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); -// } -// -// @Test -// public void testClose() throws Exception { -// AsyncHttpClient mockClient = mock(AsyncHttpClient.class); -// doNothing().when(mockClient).close(); -// RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null); -// -// publisher.close(); -// verify(mockClient).close(); -// } -// -// @Test -// public void testCloseDoesNotThrow() throws Exception { -// AsyncHttpClient mockClient = mock(AsyncHttpClient.class); -// doThrow(new IOException("error!")).when(mockClient).close(); -// RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null); -// -// publisher.close(); -// verify(mockClient).close(); -// } -// -// @Test -// public void testHandleBadResponse() throws Exception { -// CompletableFuture response = getOkFuture(getNotOkResponse(500)); -// BoundRequestBuilder mockBuilder = mockBuilderWith(response); -// AsyncHttpClient mockClient = mockClientWith(mockBuilder); -// RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); -// -// publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE)); -// verify(mockClient).preparePost("my/custom/query/url"); -// verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"COMPLETE\",\"content\":\"my/custom/result/url\"}}"); -// verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); -// } -// + @Test + public void testSendResultUrlPutInMetadataAckPreserved() throws Exception { + CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); + publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.ACKNOWLEDGE)); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); + ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RequestCallback.class); + verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); + HttpPost post = argumentCaptor.getValue(); + String actualMessage = IOUtils.toString(post.getEntity().getContent(), RESTPubSub.UTF_8); + String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"ACKNOWLEDGE\",\"content\":\"my/custom/url\"}}"; + String actualHeader = post.getHeaders(RESTPublisher.CONTENT_TYPE)[0].getValue(); + String expectedHeader = RESTPublisher.APPLICATION_JSON; + Assert.assertEquals(expectedMessage, actualMessage); + Assert.assertEquals(expectedHeader, actualHeader); + Assert.assertEquals("my/custom/query/url", post.getURI().toString()); + } + + @Test + public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception { + CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); + publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE)); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); + ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RequestCallback.class); + verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); + HttpPost post = argumentCaptor.getValue(); + String actualMessage = IOUtils.toString(post.getEntity().getContent(), RESTPubSub.UTF_8); + String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"COMPLETE\",\"content\":\"my/custom/url\"}}"; + String actualHeader = post.getHeaders(RESTPublisher.CONTENT_TYPE)[0].getValue(); + String expectedHeader = RESTPublisher.APPLICATION_JSON; + Assert.assertEquals(expectedMessage, actualMessage); + Assert.assertEquals(expectedHeader, actualHeader); + Assert.assertEquals("my/custom/query/url", post.getURI().toString()); + } + + @Test + public void testSendMetadataCreated() throws Exception { + CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); + publisher.send("foo", "bar"); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); + ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RequestCallback.class); + verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); + HttpPost post = argumentCaptor.getValue(); + String actualMessage = IOUtils.toString(post.getEntity().getContent(), RESTPubSub.UTF_8); + String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":null,\"content\":\"my/custom/url\"}}"; + String actualHeader = post.getHeaders(RESTPublisher.CONTENT_TYPE)[0].getValue(); + String expectedHeader = RESTPublisher.APPLICATION_JSON; + Assert.assertEquals(expectedMessage, actualMessage); + Assert.assertEquals(expectedHeader, actualHeader); + Assert.assertEquals("my/custom/query/url", post.getURI().toString()); + } + + @Test + public void testClose() throws Exception { + CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + doNothing().when(mockClient).close(); + RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); + publisher.close(); + verify(mockClient).close(); + } + + @Test + public void testCloseDoesNotThrow() throws Exception { + CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + doThrow(new IOException("error!")).when(mockClient).close(); + RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null); + + publisher.close(); + verify(mockClient).close(); + } + + @Test + public void testBadResponseDoesNotThrow() throws Exception { + CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + // This won't work because this method doesn't declare that it throws - figure out how to make the HttpPost + // object throw somehow? + doThrow(new IOException("error!")).when(mockClient).execute(any(), any()); + RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); + + publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE)); + verify(mockClient).execute(any(), any()); + } + // @Test(timeOut = 5000L) // public void testException() throws Exception { // // This will hit a non-existent url and fail, testing our exceptions diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java index 77515590..7992e79f 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java @@ -27,97 +27,97 @@ import static org.mockito.Mockito.verify; public class RESTSubscriberTest { - @Test - public void testGetMessages() throws Exception { - PubSubMessage responseData = new PubSubMessage("someID", "someContent"); - CompletableFuture response = getOkFuture(getOkResponse(responseData.asJSON())); - BoundRequestBuilder mockBuilder = mockBuilderWith(response); - AsyncHttpClient mockClient = mockClientWith(mockBuilder); - RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); - - List messages = subscriber.getMessages(); - Assert.assertEquals(messages.size(), 2); - Assert.assertEquals(messages.get(0).asJSON(), "{\"id\":\"someID\",\"sequence\":-1,\"content\":\"someContent\",\"metadata\":null}"); - } - - @Test - public void testGetMessages204() throws Exception { - CompletableFuture response = getOkFuture(getNotOkResponse(204)); - BoundRequestBuilder mockBuilder = mockBuilderWith(response); - AsyncHttpClient mockClient = mockClientWith(mockBuilder); - RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); - - List messages = subscriber.getMessages(); - Assert.assertEquals(messages.size(), 0); - } - - @Test - public void testGetMessages500() throws Exception { - CompletableFuture response = getOkFuture(getNotOkResponse(500)); - BoundRequestBuilder mockBuilder = mockBuilderWith(response); - AsyncHttpClient mockClient = mockClientWith(mockBuilder); - RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); - - List messages = subscriber.getMessages(); - Assert.assertEquals(messages.size(), 0); - } - - @Test - public void testGetMessagesDoesNotThrow() throws Exception { - // PubSubMessage will throw an error when it fails to parse this into a PubSubMessage - CompletableFuture response = getOkFuture(getOkResponse("thisCannotBeTurnedIntoAPubSubMessage")); - BoundRequestBuilder mockBuilder = mockBuilderWith(response); - AsyncHttpClient mockClient = mockClientWith(mockBuilder); - RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); - - List messages = subscriber.getMessages(); - Assert.assertEquals(messages.size(), 0); - } - - @Test - public void testClose() throws Exception { - AsyncHttpClient mockClient = mock(AsyncHttpClient.class); - doNothing().when(mockClient).close(); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); - - subscriber.close(); - verify(mockClient).close(); - } - - @Test - public void testCloseDoesNotThrow() throws Exception { - AsyncHttpClient mockClient = mock(AsyncHttpClient.class); - doThrow(new IOException("error!")).when(mockClient).close(); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); - - subscriber.close(); - verify(mockClient).close(); - } - - @Test - public void testMinWait() throws Exception { - PubSubMessage responseData = new PubSubMessage("someID", "someContent"); - CompletableFuture response = getOkFuture(getOkResponse(responseData.asJSON())); - BoundRequestBuilder mockBuilder = mockBuilderWith(response); - AsyncHttpClient mockClient = mockClientWith(mockBuilder); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 1000); - - // First response should give content (2 events since we have 2 endpoints in the config) - List messages = subscriber.getMessages(); - Assert.assertEquals(messages.size(), 2); - // Second and third response should give nothing since the wait duration hasn't passed - messages = subscriber.getMessages(); - Assert.assertEquals(messages.size(), 0); - messages = subscriber.getMessages(); - Assert.assertEquals(messages.size(), 0); - - // After waiting a second it should return messages again - Thread.sleep(3000); - messages = subscriber.getMessages(); - Assert.assertEquals(messages.size(), 2); - } +// @Test +// public void testGetMessages() throws Exception { +// PubSubMessage responseData = new PubSubMessage("someID", "someContent"); +// CompletableFuture response = getOkFuture(getOkResponse(responseData.asJSON())); +// BoundRequestBuilder mockBuilder = mockBuilderWith(response); +// AsyncHttpClient mockClient = mockClientWith(mockBuilder); +// RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); +// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); +// +// List messages = subscriber.getMessages(); +// Assert.assertEquals(messages.size(), 2); +// Assert.assertEquals(messages.get(0).asJSON(), "{\"id\":\"someID\",\"sequence\":-1,\"content\":\"someContent\",\"metadata\":null}"); +// } +// +// @Test +// public void testGetMessages204() throws Exception { +// CompletableFuture response = getOkFuture(getNotOkResponse(204)); +// BoundRequestBuilder mockBuilder = mockBuilderWith(response); +// AsyncHttpClient mockClient = mockClientWith(mockBuilder); +// RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); +// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); +// +// List messages = subscriber.getMessages(); +// Assert.assertEquals(messages.size(), 0); +// } +// +// @Test +// public void testGetMessages500() throws Exception { +// CompletableFuture response = getOkFuture(getNotOkResponse(500)); +// BoundRequestBuilder mockBuilder = mockBuilderWith(response); +// AsyncHttpClient mockClient = mockClientWith(mockBuilder); +// RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); +// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); +// +// List messages = subscriber.getMessages(); +// Assert.assertEquals(messages.size(), 0); +// } +// +// @Test +// public void testGetMessagesDoesNotThrow() throws Exception { +// // PubSubMessage will throw an error when it fails to parse this into a PubSubMessage +// CompletableFuture response = getOkFuture(getOkResponse("thisCannotBeTurnedIntoAPubSubMessage")); +// BoundRequestBuilder mockBuilder = mockBuilderWith(response); +// AsyncHttpClient mockClient = mockClientWith(mockBuilder); +// RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); +// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); +// +// List messages = subscriber.getMessages(); +// Assert.assertEquals(messages.size(), 0); +// } +// +// @Test +// public void testClose() throws Exception { +// AsyncHttpClient mockClient = mock(AsyncHttpClient.class); +// doNothing().when(mockClient).close(); +// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); +// +// subscriber.close(); +// verify(mockClient).close(); +// } +// +// @Test +// public void testCloseDoesNotThrow() throws Exception { +// AsyncHttpClient mockClient = mock(AsyncHttpClient.class); +// doThrow(new IOException("error!")).when(mockClient).close(); +// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); +// +// subscriber.close(); +// verify(mockClient).close(); +// } +// +// @Test +// public void testMinWait() throws Exception { +// PubSubMessage responseData = new PubSubMessage("someID", "someContent"); +// CompletableFuture response = getOkFuture(getOkResponse(responseData.asJSON())); +// BoundRequestBuilder mockBuilder = mockBuilderWith(response); +// AsyncHttpClient mockClient = mockClientWith(mockBuilder); +// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 1000); +// +// // First response should give content (2 events since we have 2 endpoints in the config) +// List messages = subscriber.getMessages(); +// Assert.assertEquals(messages.size(), 2); +// // Second and third response should give nothing since the wait duration hasn't passed +// messages = subscriber.getMessages(); +// Assert.assertEquals(messages.size(), 0); +// messages = subscriber.getMessages(); +// Assert.assertEquals(messages.size(), 0); +// +// // After waiting a second it should return messages again +// Thread.sleep(3000); +// messages = subscriber.getMessages(); +// Assert.assertEquals(messages.size(), 2); +// } } From 0ec4acda8e992f4b64034221ea110cfc75eae00a Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Tue, 27 Mar 2018 15:05:54 -0700 Subject: [PATCH 07/17] Fixing tests --- .../bullet/pubsub/rest/RESTPublisher.java | 18 +++- .../pubsub/rest/RESTResultPublisher.java | 2 +- .../pubsub/rest/RESTQueryPublisherTest.java | 66 ++++++++---- .../pubsub/rest/RESTResultPublisherTest.java | 101 +++++++++--------- .../pubsub/rest/RESTSubscriberTest.java | 24 ++++- 5 files changed, 130 insertions(+), 81 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java index 253e08ee..582e145b 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java @@ -55,7 +55,7 @@ protected void sendToURL(String url, PubSubMessage message) { httpPost.setEntity(new StringEntity(message.asJSON())); httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON); client.execute(httpPost, new RequestCallback()); - } catch (UnsupportedEncodingException e) { + } catch (Exception e) { log.error("Error encoding message in preparation for POST: ", e); } } @@ -64,7 +64,7 @@ static class RequestCallback implements FutureCallback { @Override public void completed(HttpResponse response) { if (response == null || response.getStatusLine().getStatusCode() != RESTPubSub.OK_200) { - log.error("Couldn't reach REST pubsub server. Got response: {}", response); + error("Couldn't reach REST pubsub server. Got response: {}", response); return; } log.debug("Successfully wrote message with status code {}. Response was: {}", response.getStatusLine().getStatusCode(), response); @@ -72,12 +72,22 @@ public void completed(HttpResponse response) { @Override public void failed(Exception e) { - log.error("Failed to post message to RESTPubSub endpoint. Failed with error: ", e); + error("Failed to post message to RESTPubSub endpoint. Failed with error: ", e); } @Override public void cancelled() { - log.error("Failed to post message to RESTPubSub endpoint. Request was cancelled."); + error("Failed to post message to RESTPubSub endpoint. Request was cancelled."); + } + + // Exposed for testing + void error(String s) { + log.error(s); + } + + // Exposed for testing + void error(String s, Object o) { + log.error(s, o); } } } diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java index c5847b8e..197b940f 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java @@ -22,7 +22,7 @@ public RESTResultPublisher(CloseableHttpAsyncClient client) { } @Override - public void send(PubSubMessage message) throws PubSubException { + public void send(PubSubMessage message) { String url = (String) message.getMetadata().getContent(); log.debug("Extracted url to which to send results: {}", url); sendToURL(url, message); diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java index 14a91fd4..581c4e06 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java @@ -7,15 +7,24 @@ import com.yahoo.bullet.pubsub.Metadata; import com.yahoo.bullet.pubsub.PubSubMessage; +import org.apache.http.HttpResponse; import org.apache.commons.io.IOUtils; +import org.apache.http.StatusLine; import org.apache.http.client.methods.HttpPost; import org.mockito.ArgumentCaptor; + import org.testng.annotations.Test; import java.io.IOException; import org.testng.Assert; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; @@ -99,28 +108,45 @@ public void testCloseDoesNotThrow() throws Exception { @Test public void testBadResponseDoesNotThrow() throws Exception { CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); - // This won't work because this method doesn't declare that it throws - figure out how to make the HttpPost - // object throw somehow? - doThrow(new IOException("error!")).when(mockClient).execute(any(), any()); RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); + PubSubMessage message = mock(PubSubMessage.class); + // This will compel the HttpPost object to throw an exception in RESTPublisher.sendToURL() + doReturn(null).when(message).asJSON(); + publisher.send(message); + Assert.assertTrue(true); + verify(mockClient, never()).execute(any(), any()); + } - publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE)); - verify(mockClient).execute(any(), any()); + @Test + public void testRequestCallbackCompletedGood() throws Exception { + HttpResponse mockResponse = mock(HttpResponse.class); + StatusLine mockStatusLine = mock(StatusLine.class); + doReturn(RESTPubSub.OK_200).when(mockStatusLine).getStatusCode(); + doReturn(mockStatusLine).when(mockResponse).getStatusLine(); + RESTPublisher.RequestCallback requestCallback = spy(RESTPublisher.RequestCallback.class); + requestCallback.completed(mockResponse); + verify(requestCallback, never()).error(anyString()); + verify(requestCallback, never()).error(anyString(), any()); } -// @Test(timeOut = 5000L) -// public void testException() throws Exception { -// // This will hit a non-existent url and fail, testing our exceptions -// AsyncHttpClientConfig clientConfig = new DefaultAsyncHttpClientConfig.Builder().setConnectTimeout(100) -// .setMaxRequestRetry(1) -// .setReadTimeout(-1) -// .setRequestTimeout(-1) -// .build(); -// AsyncHttpClient client = new DefaultAsyncHttpClient(clientConfig); -// AsyncHttpClient spyClient = spy(client); -// RESTQueryPublisher publisher = new RESTQueryPublisher(spyClient, "http://this/does/not/exist:8080", "my/custom/result/url"); -// -// publisher.send(new PubSubMessage("foo", "bar")); -// verify(spyClient).preparePost("http://this/does/not/exist:8080"); -// } + @Test + public void testRequestCallbackCompletedBad() throws Exception { + RESTPublisher.RequestCallback requestCallback = spy(RESTPublisher.RequestCallback.class); + requestCallback.completed(null); + verify(requestCallback).error(anyString(), any()); + } + + @Test + public void testRequestCallbackFailed() throws Exception { + RESTPublisher.RequestCallback requestCallback = spy(RESTPublisher.RequestCallback.class); + requestCallback.failed(new RuntimeException("error")); + verify(requestCallback).error(anyString(), any()); + } + + @Test + public void testRequestCallbackCancelled() throws Exception { + RESTPublisher.RequestCallback requestCallback = spy(RESTPublisher.RequestCallback.class); + requestCallback.cancelled(); + verify(requestCallback).error(anyString()); + } } diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java index 9064fb91..bcd5553e 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java @@ -7,65 +7,62 @@ import com.yahoo.bullet.pubsub.Metadata; import com.yahoo.bullet.pubsub.PubSubMessage; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.BoundRequestBuilder; -import org.asynchttpclient.Response; +import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.mockito.ArgumentCaptor; +import org.testng.Assert; import org.testng.annotations.Test; import java.io.IOException; -import java.util.concurrent.CompletableFuture; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkFuture; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkResponse; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockBuilderWith; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockClientWith; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; public class RESTResultPublisherTest { -// @Test -// public void testSend() throws Exception { -// CompletableFuture response = getOkFuture(getOkResponse(null)); -// BoundRequestBuilder mockBuilder = mockBuilderWith(response); -// AsyncHttpClient mockClient = mockClientWith(mockBuilder); -// RESTResultPublisher publisher = new RESTResultPublisher(mockClient); -// -// PubSubMessage message = new PubSubMessage("someId", "someContent", new Metadata(null, "custom/url")); -// publisher.send(message); -// verify(mockClient).preparePost("custom/url"); -// verify(mockBuilder).setBody("{\"id\":\"someId\",\"sequence\":-1,\"content\":\"someContent\",\"metadata\":{\"signal\":null,\"content\":\"custom/url\"}}"); -// verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON); -// } -// -// @Test(expectedExceptions = ClassCastException.class) -// public void testSendBadURL() throws Exception { -// CompletableFuture response = getOkFuture(getOkResponse(null)); -// BoundRequestBuilder mockBuilder = mockBuilderWith(response); -// AsyncHttpClient mockClient = mockClientWith(mockBuilder); -// RESTResultPublisher publisher = new RESTResultPublisher(mockClient); -// -// PubSubMessage message = new PubSubMessage("someId", "someContent", new Metadata(null, 88)); -// publisher.send(message); -// } -// -// @Test -// public void testClose() throws Exception { -// AsyncHttpClient mockClient = mock(AsyncHttpClient.class); -// doNothing().when(mockClient).close(); -// RESTResultPublisher publisher = new RESTResultPublisher(mockClient); -// -// publisher.close(); -// verify(mockClient).close(); -// } -// -// @Test -// public void testCloseDoesNotThrow() throws Exception { -// AsyncHttpClient mockClient = mock(AsyncHttpClient.class); -// doThrow(new IOException("error!")).when(mockClient).close(); -// RESTResultPublisher publisher = new RESTResultPublisher(mockClient); -// -// publisher.close(); -// verify(mockClient).close(); -// } + @Test + public void testSendPullsURLFromMessage() throws Exception { + CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + RESTResultPublisher publisher = new RESTResultPublisher(mockClient); + Metadata metadata = new Metadata(null, "my/custom/url"); + publisher.send(new PubSubMessage("foo", "bar", metadata)); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); + ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RequestCallback.class); + verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); + HttpPost post = argumentCaptor.getValue(); + + String actualURI = post.getURI().toString(); + String actualMessage = IOUtils.toString(post.getEntity().getContent(), RESTPubSub.UTF_8); + String actualHeader = post.getHeaders(RESTPublisher.CONTENT_TYPE)[0].getValue(); + + String expectedURI = "my/custom/url"; + String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":null,\"content\":\"my/custom/url\"}}"; + String expectedHeader = RESTPublisher.APPLICATION_JSON; + + Assert.assertEquals(expectedMessage, actualMessage); + Assert.assertEquals(expectedHeader, actualHeader); + Assert.assertEquals(expectedURI, actualURI); + } + + @Test + public void testClose() throws Exception { + CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + doNothing().when(mockClient).close(); + RESTResultPublisher publisher = new RESTResultPublisher(mockClient); + + publisher.close(); + verify(mockClient).close(); + } + + @Test + public void testCloseDoesNotThrow() throws Exception { + CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + doThrow(new IOException("error!")).when(mockClient).close(); + RESTResultPublisher publisher = new RESTResultPublisher(mockClient); + + publisher.close(); + verify(mockClient).close(); + } } diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java index 7992e79f..cc5aa2fc 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java @@ -6,29 +6,45 @@ package com.yahoo.bullet.pubsub.rest; import com.yahoo.bullet.pubsub.PubSubMessage; +import org.apache.http.HttpResponse; +import org.apache.http.StatusLine; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.tools.ant.taskdefs.condition.Http; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.BoundRequestBuilder; import org.asynchttpclient.Response; import org.testng.Assert; import org.testng.annotations.Test; import java.io.IOException; +import java.net.HttpRetryException; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getNotOkResponse; import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkFuture; import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkResponse; import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockBuilderWith; import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockClientWith; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; public class RESTSubscriberTest { -// @Test -// public void testGetMessages() throws Exception { + @Test + public void testGetMessages() throws Exception { + CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + Future mockFuture = mock(Future.class); + HttpResponse mockResponse = mock(HttpResponse.class); + StatusLine mockStatusLine = mock(StatusLine.class); + doReturn(RESTPubSub.OK_200).when(mockStatusLine).getStatusCode(); + doReturn(mockStatusLine).when(mockResponse).getStatusLine(); + doReturn(mockResponse).when(mockFuture).get(); + doReturn(mockFuture).when(mockClient).execute(any(), any()); // PubSubMessage responseData = new PubSubMessage("someID", "someContent"); // CompletableFuture response = getOkFuture(getOkResponse(responseData.asJSON())); // BoundRequestBuilder mockBuilder = mockBuilderWith(response); @@ -39,8 +55,8 @@ public class RESTSubscriberTest { // List messages = subscriber.getMessages(); // Assert.assertEquals(messages.size(), 2); // Assert.assertEquals(messages.get(0).asJSON(), "{\"id\":\"someID\",\"sequence\":-1,\"content\":\"someContent\",\"metadata\":null}"); -// } -// + } + // @Test // public void testGetMessages204() throws Exception { // CompletableFuture response = getOkFuture(getNotOkResponse(204)); From b0a52a8bf86c9610ee0be46f82d5f79b96546526 Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Tue, 27 Mar 2018 16:05:14 -0700 Subject: [PATCH 08/17] Fixed tests --- .../bullet/pubsub/rest/RESTPublisher.java | 1 - .../pubsub/rest/RESTResultPublisher.java | 1 - .../bullet/pubsub/rest/RESTSubscriber.java | 1 - .../pubsub/rest/RESTSubscriberTest.java | 202 +++++++++--------- 4 files changed, 99 insertions(+), 106 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java index 582e145b..c8fc7716 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java @@ -14,7 +14,6 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import java.io.IOException; -import java.io.UnsupportedEncodingException; @Slf4j public abstract class RESTPublisher implements Publisher { diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java index 197b940f..fedfbfd0 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java @@ -5,7 +5,6 @@ */ package com.yahoo.bullet.pubsub.rest; -import com.yahoo.bullet.pubsub.PubSubException; import com.yahoo.bullet.pubsub.PubSubMessage; import lombok.extern.slf4j.Slf4j; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java index 4aa77b04..8a736fd7 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java @@ -11,7 +11,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; - import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java index cc5aa2fc..6dbc876e 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java @@ -6,27 +6,20 @@ package com.yahoo.bullet.pubsub.rest; import com.yahoo.bullet.pubsub.PubSubMessage; +import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.StatusLine; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; -import org.apache.tools.ant.taskdefs.condition.Http; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.BoundRequestBuilder; -import org.asynchttpclient.Response; import org.testng.Assert; import org.testng.annotations.Test; +import java.io.ByteArrayInputStream; import java.io.IOException; -import java.net.HttpRetryException; +import java.io.InputStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getNotOkResponse; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkFuture; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkResponse; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockBuilderWith; -import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockClientWith; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -35,105 +28,108 @@ import static org.mockito.Mockito.verify; public class RESTSubscriberTest { - @Test - public void testGetMessages() throws Exception { + private CloseableHttpAsyncClient mockClient(int responseCode, String message) throws Exception { CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); Future mockFuture = mock(Future.class); HttpResponse mockResponse = mock(HttpResponse.class); StatusLine mockStatusLine = mock(StatusLine.class); - doReturn(RESTPubSub.OK_200).when(mockStatusLine).getStatusCode(); + doReturn(responseCode).when(mockStatusLine).getStatusCode(); doReturn(mockStatusLine).when(mockResponse).getStatusLine(); + + HttpEntity mockEntity = mock(HttpEntity.class); + InputStream inputStream = new ByteArrayInputStream(message.getBytes(RESTPubSub.UTF_8)); + doReturn(inputStream).when(mockEntity).getContent(); + doReturn(mockEntity).when(mockResponse).getEntity(); + doReturn(mockResponse).when(mockFuture).get(); doReturn(mockFuture).when(mockClient).execute(any(), any()); -// PubSubMessage responseData = new PubSubMessage("someID", "someContent"); -// CompletableFuture response = getOkFuture(getOkResponse(responseData.asJSON())); -// BoundRequestBuilder mockBuilder = mockBuilderWith(response); -// AsyncHttpClient mockClient = mockClientWith(mockBuilder); -// RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); -// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); -// -// List messages = subscriber.getMessages(); -// Assert.assertEquals(messages.size(), 2); -// Assert.assertEquals(messages.get(0).asJSON(), "{\"id\":\"someID\",\"sequence\":-1,\"content\":\"someContent\",\"metadata\":null}"); + + return mockClient; + } + + @Test + public void testGetMessages() throws Exception { + String message = new PubSubMessage("foo", "bar").asJSON(); + CloseableHttpAsyncClient mockClient = mockClient(RESTPubSub.OK_200, message); + RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); + List messages = subscriber.getMessages(); + Assert.assertEquals(messages.size(), 2); + Assert.assertEquals(messages.get(0).asJSON(), "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":null}"); + } + + @Test + public void testGetMessages204() throws Exception { + String message = new PubSubMessage("foo", "bar").asJSON(); + CloseableHttpAsyncClient mockClient = mockClient(RESTPubSub.NO_CONTENT_204, message); + RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); + + List messages = subscriber.getMessages(); + Assert.assertEquals(messages.size(), 0); + } + + @Test + public void testGetMessages500() throws Exception { + String message = new PubSubMessage("foo", "bar").asJSON(); + CloseableHttpAsyncClient mockClient = mockClient(500, message); + RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); + + List messages = subscriber.getMessages(); + Assert.assertEquals(messages.size(), 0); } -// @Test -// public void testGetMessages204() throws Exception { -// CompletableFuture response = getOkFuture(getNotOkResponse(204)); -// BoundRequestBuilder mockBuilder = mockBuilderWith(response); -// AsyncHttpClient mockClient = mockClientWith(mockBuilder); -// RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); -// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); -// -// List messages = subscriber.getMessages(); -// Assert.assertEquals(messages.size(), 0); -// } -// -// @Test -// public void testGetMessages500() throws Exception { -// CompletableFuture response = getOkFuture(getNotOkResponse(500)); -// BoundRequestBuilder mockBuilder = mockBuilderWith(response); -// AsyncHttpClient mockClient = mockClientWith(mockBuilder); -// RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); -// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); -// -// List messages = subscriber.getMessages(); -// Assert.assertEquals(messages.size(), 0); -// } -// -// @Test -// public void testGetMessagesDoesNotThrow() throws Exception { -// // PubSubMessage will throw an error when it fails to parse this into a PubSubMessage -// CompletableFuture response = getOkFuture(getOkResponse("thisCannotBeTurnedIntoAPubSubMessage")); -// BoundRequestBuilder mockBuilder = mockBuilderWith(response); -// AsyncHttpClient mockClient = mockClientWith(mockBuilder); -// RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); -// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); -// -// List messages = subscriber.getMessages(); -// Assert.assertEquals(messages.size(), 0); -// } -// -// @Test -// public void testClose() throws Exception { -// AsyncHttpClient mockClient = mock(AsyncHttpClient.class); -// doNothing().when(mockClient).close(); -// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); -// -// subscriber.close(); -// verify(mockClient).close(); -// } -// -// @Test -// public void testCloseDoesNotThrow() throws Exception { -// AsyncHttpClient mockClient = mock(AsyncHttpClient.class); -// doThrow(new IOException("error!")).when(mockClient).close(); -// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); -// -// subscriber.close(); -// verify(mockClient).close(); -// } -// -// @Test -// public void testMinWait() throws Exception { -// PubSubMessage responseData = new PubSubMessage("someID", "someContent"); -// CompletableFuture response = getOkFuture(getOkResponse(responseData.asJSON())); -// BoundRequestBuilder mockBuilder = mockBuilderWith(response); -// AsyncHttpClient mockClient = mockClientWith(mockBuilder); -// RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 1000); -// -// // First response should give content (2 events since we have 2 endpoints in the config) -// List messages = subscriber.getMessages(); -// Assert.assertEquals(messages.size(), 2); -// // Second and third response should give nothing since the wait duration hasn't passed -// messages = subscriber.getMessages(); -// Assert.assertEquals(messages.size(), 0); -// messages = subscriber.getMessages(); -// Assert.assertEquals(messages.size(), 0); -// -// // After waiting a second it should return messages again -// Thread.sleep(3000); -// messages = subscriber.getMessages(); -// Assert.assertEquals(messages.size(), 2); -// } + @Test + public void testGetMessagesDoesNotThrow() throws Exception { + String message = new PubSubMessage("foo", "bar").asJSON(); + CloseableHttpAsyncClient mockClient = mockClient(500, message); + List urls = new ArrayList<>(); + // A null url will throw an error - make sure it handled eloquently + urls.add(null); + RESTSubscriber subscriber = new RESTSubscriber(88, urls, mockClient, 10); + + List messages = subscriber.getMessages(); + Assert.assertEquals(messages.size(), 0); + } + + @Test + public void testClose() throws Exception { + String message = new PubSubMessage("foo", "bar").asJSON(); + CloseableHttpAsyncClient mockClient = mockClient(500, message); + doNothing().when(mockClient).close(); + RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); + + subscriber.close(); + verify(mockClient).close(); + } + + @Test + public void testCloseDoesNotThrow() throws Exception { + String message = new PubSubMessage("foo", "bar").asJSON(); + CloseableHttpAsyncClient mockClient = mockClient(500, message); + doThrow(new IOException("error!")).when(mockClient).close(); + RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); + + subscriber.close(); + verify(mockClient).close(); + } + + @Test + public void testMinWait() throws Exception { + String message = new PubSubMessage("someID", "someContent").asJSON(); + CloseableHttpAsyncClient mockClient = mockClient(RESTPubSub.OK_200, message); + RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 1000); + + // First response should give content (2 events since we have 2 endpoints in the config) + List messages = subscriber.getMessages(); + Assert.assertEquals(messages.size(), 2); + // Second and third response should give nothing since the wait duration hasn't passed + messages = subscriber.getMessages(); + Assert.assertEquals(messages.size(), 0); + messages = subscriber.getMessages(); + Assert.assertEquals(messages.size(), 0); + + // After waiting a second it should return messages again + Thread.sleep(3000); + messages = subscriber.getMessages(); + Assert.assertEquals(messages.size(), 2); + } } From b25bbc1918b6691ce8da3f25d774a6858887af8d Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Tue, 27 Mar 2018 16:10:36 -0700 Subject: [PATCH 09/17] Fixed pom --- pom.xml | 17 ------ .../bullet/pubsub/rest/RESTPubSubTest.java | 60 ------------------- 2 files changed, 77 deletions(-) diff --git a/pom.xml b/pom.xml index 64eaa1c9..fdf06dc5 100644 --- a/pom.xml +++ b/pom.xml @@ -102,20 +102,6 @@ jvyaml 0.2.1 - - - - - - - - org.asynchttpclient - async-http-client - 2.0.37 - - - - org.apache.directory.studio org.apache.commons.io @@ -126,9 +112,6 @@ httpasyncclient 4.1.3 - - - diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubTest.java index 3535c613..e6f7659b 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubTest.java @@ -9,21 +9,9 @@ import com.yahoo.bullet.pubsub.PubSubException; import com.yahoo.bullet.pubsub.Publisher; import com.yahoo.bullet.pubsub.Subscriber; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.BoundRequestBuilder; -import org.asynchttpclient.ListenableFuture; -import org.asynchttpclient.Response; import org.testng.Assert; import org.testng.annotations.Test; - import java.util.List; -import java.util.concurrent.CompletableFuture; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; public class RESTPubSubTest { @Test @@ -43,54 +31,6 @@ public void testSettings() throws PubSubException { Assert.assertEquals(urls.get(0), "http://localhost:9901/api/bullet/pubsub/result"); } - public static AsyncHttpClient mockClientWith(BoundRequestBuilder builder) { - AsyncHttpClient mockClient = mock(AsyncHttpClient.class); - doReturn(builder).when(mockClient).preparePost(anyString()); - doReturn(builder).when(mockClient).prepareGet(anyString()); - return mockClient; - } - - public static BoundRequestBuilder mockBuilderWith(CompletableFuture future) throws Exception { - ListenableFuture mockListenable = (ListenableFuture) mock(ListenableFuture.class); - doReturn(future.get()).when(mockListenable).get(); - doReturn(future).when(mockListenable).toCompletableFuture(); - - BoundRequestBuilder mockBuilder = mock(BoundRequestBuilder.class); - doReturn(mockBuilder).when(mockBuilder).setHeader(any(), anyString()); - doReturn(mockBuilder).when(mockBuilder).setBody(anyString()); - doReturn(mockListenable).when(mockBuilder).execute(); - return mockBuilder; - } - - public static CompletableFuture getOkFuture(Response response) throws Exception { - CompletableFuture finished = CompletableFuture.completedFuture(response); - CompletableFuture mock = mock(CompletableFuture.class); - // This is the weird bit. We mock the call to exceptionally to return the finished response so that chaining - // a thenAcceptAsync on it will call the consumer of it with the finished response. This is why it looks - // weird: mocking the exceptionally to take the "good" path. - doReturn(finished).when(mock).exceptionally(any()); - doReturn(finished.get()).when(mock).get(); - // So if we do get to thenAccept on our mock, we should throw an exception because we shouldn't get there. - doThrow(new RuntimeException("Good futures don't throw")).when(mock).thenAcceptAsync(any()); - return mock; - } - - public static Response getOkResponse(String data) { - return getResponse(RESTPubSub.OK_200, "Ok", data); - } - - public static Response getNotOkResponse(int status) { - return getResponse(status, "Error", null); - } - - public static Response getResponse(int status, String statusText, String body) { - Response mock = mock(Response.class); - doReturn(status).when(mock).getStatusCode(); - doReturn(statusText).when(mock).getStatusText(); - doReturn(body).when(mock).getResponseBody(); - return mock; - } - @Test public void testGetPublisher() throws PubSubException { BulletConfig config = new BulletConfig("src/test/resources/test_config.yaml"); From 0284828c78e1c4467ac86b219c410504501695fa Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Tue, 27 Mar 2018 18:04:04 -0700 Subject: [PATCH 10/17] Tried something --- src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java index 89391f52..5b1b45d6 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java @@ -77,7 +77,9 @@ public List getSubscribers(int n) { private CloseableHttpAsyncClient getClient() { int connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Integer.class); - IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setConnectTimeout(connectTimeout).build(); - return HttpAsyncClients.custom().setDefaultIOReactorConfig(ioReactorConfig).build(); + IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setConnectTimeout(connectTimeout) + .setSoTimeout(connectTimeout).build(); + return HttpAsyncClients.custom().setDefaultIOReactorConfig(ioReactorConfig) + .setMaxConnPerRoute(10).setMaxConnTotal(15).build(); } } From e2381637f634c0fbc0c55b85dc26daf99ce57b00 Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Tue, 27 Mar 2018 18:09:59 -0700 Subject: [PATCH 11/17] Reverts commit 0284828c78e1c4467ac86b219c410504501695fa. --- src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java index 5b1b45d6..89391f52 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java @@ -77,9 +77,7 @@ public List getSubscribers(int n) { private CloseableHttpAsyncClient getClient() { int connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Integer.class); - IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setConnectTimeout(connectTimeout) - .setSoTimeout(connectTimeout).build(); - return HttpAsyncClients.custom().setDefaultIOReactorConfig(ioReactorConfig) - .setMaxConnPerRoute(10).setMaxConnTotal(15).build(); + IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setConnectTimeout(connectTimeout).build(); + return HttpAsyncClients.custom().setDefaultIOReactorConfig(ioReactorConfig).build(); } } From 7376d794e7a9fe0e625e57a5c5b25bedb86f6da8 Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Thu, 29 Mar 2018 14:14:47 -0700 Subject: [PATCH 12/17] Removed apache.commons.io dependecy --- pom.xml | 5 ----- .../java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java | 4 ++-- .../yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java | 8 ++++---- .../yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java | 4 ++-- 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/pom.xml b/pom.xml index fdf06dc5..041d89a6 100644 --- a/pom.xml +++ b/pom.xml @@ -102,11 +102,6 @@ jvyaml 0.2.1 - - org.apache.directory.studio - org.apache.commons.io - 2.4 - org.apache.httpcomponents httpasyncclient diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java index 8a736fd7..51f6d503 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java @@ -14,10 +14,10 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.util.EntityUtils; @Slf4j public class RESTSubscriber extends BufferingSubscriber { @@ -59,7 +59,7 @@ public List getMessages() throws PubSubException { HttpResponse response = client.execute(httpGet, null).get(); int statusCode = response.getStatusLine().getStatusCode(); if (statusCode == RESTPubSub.OK_200) { - String message = IOUtils.toString(response.getEntity().getContent(), RESTPubSub.UTF_8); + String message = EntityUtils.toString(response.getEntity(), RESTPubSub.UTF_8); messages.add(PubSubMessage.fromJSON(message)); } else if (statusCode != RESTPubSub.NO_CONTENT_204) { // NO_CONTENT_204 indicates there are no new messages - anything else indicates a problem diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java index 581c4e06..559f5a89 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java @@ -8,9 +8,9 @@ import com.yahoo.bullet.pubsub.Metadata; import com.yahoo.bullet.pubsub.PubSubMessage; import org.apache.http.HttpResponse; -import org.apache.commons.io.IOUtils; import org.apache.http.StatusLine; import org.apache.http.client.methods.HttpPost; +import org.apache.http.util.EntityUtils; import org.mockito.ArgumentCaptor; import org.testng.annotations.Test; @@ -39,7 +39,7 @@ public void testSendResultUrlPutInMetadataAckPreserved() throws Exception { ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RequestCallback.class); verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); HttpPost post = argumentCaptor.getValue(); - String actualMessage = IOUtils.toString(post.getEntity().getContent(), RESTPubSub.UTF_8); + String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8); String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"ACKNOWLEDGE\",\"content\":\"my/custom/url\"}}"; String actualHeader = post.getHeaders(RESTPublisher.CONTENT_TYPE)[0].getValue(); String expectedHeader = RESTPublisher.APPLICATION_JSON; @@ -58,7 +58,7 @@ public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception { ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RequestCallback.class); verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); HttpPost post = argumentCaptor.getValue(); - String actualMessage = IOUtils.toString(post.getEntity().getContent(), RESTPubSub.UTF_8); + String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8); String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"COMPLETE\",\"content\":\"my/custom/url\"}}"; String actualHeader = post.getHeaders(RESTPublisher.CONTENT_TYPE)[0].getValue(); String expectedHeader = RESTPublisher.APPLICATION_JSON; @@ -77,7 +77,7 @@ public void testSendMetadataCreated() throws Exception { ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RequestCallback.class); verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); HttpPost post = argumentCaptor.getValue(); - String actualMessage = IOUtils.toString(post.getEntity().getContent(), RESTPubSub.UTF_8); + String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8); String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":null,\"content\":\"my/custom/url\"}}"; String actualHeader = post.getHeaders(RESTPublisher.CONTENT_TYPE)[0].getValue(); String expectedHeader = RESTPublisher.APPLICATION_JSON; diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java index bcd5553e..f9089801 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java @@ -7,9 +7,9 @@ import com.yahoo.bullet.pubsub.Metadata; import com.yahoo.bullet.pubsub.PubSubMessage; -import org.apache.commons.io.IOUtils; import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.util.EntityUtils; import org.mockito.ArgumentCaptor; import org.testng.Assert; import org.testng.annotations.Test; @@ -34,7 +34,7 @@ public void testSendPullsURLFromMessage() throws Exception { HttpPost post = argumentCaptor.getValue(); String actualURI = post.getURI().toString(); - String actualMessage = IOUtils.toString(post.getEntity().getContent(), RESTPubSub.UTF_8); + String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8); String actualHeader = post.getHeaders(RESTPublisher.CONTENT_TYPE)[0].getValue(); String expectedURI = "my/custom/url"; From ee3a69d07a2a80fb74392122fe3ed8b93caa5fdb Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Fri, 30 Mar 2018 10:15:39 -0700 Subject: [PATCH 13/17] Changed send logic to be inside callback object --- .../bullet/pubsub/rest/RESTPublisher.java | 41 ++++++++++++++----- .../pubsub/rest/RESTQueryPublisherTest.java | 40 +++++++++--------- .../pubsub/rest/RESTResultPublisherTest.java | 2 +- 3 files changed, 52 insertions(+), 31 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java index c8fc7716..505fba51 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java @@ -48,18 +48,24 @@ public void close() { * @param message The message to send. */ protected void sendToURL(String url, PubSubMessage message) { - log.debug("Sending message: {} to url: {}", message, url); - try { - HttpPost httpPost = new HttpPost(url); - httpPost.setEntity(new StringEntity(message.asJSON())); - httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON); - client.execute(httpPost, new RequestCallback()); - } catch (Exception e) { - log.error("Error encoding message in preparation for POST: ", e); - } + new RESTRequest(url, message.asJSON(), 3, client).send(); } - static class RequestCallback implements FutureCallback { + static class RESTRequest implements FutureCallback { + private String url; + private String message; + private int maxRetries; + private int retries; + private CloseableHttpAsyncClient client; + + public RESTRequest(String url, String message, int maxRetries, CloseableHttpAsyncClient client) { + this.url = url; + this.message = message; + this.maxRetries = maxRetries; + this.client = client; + this.retries = 0; + } + @Override public void completed(HttpResponse response) { if (response == null || response.getStatusLine().getStatusCode() != RESTPubSub.OK_200) { @@ -79,6 +85,21 @@ public void cancelled() { error("Failed to post message to RESTPubSub endpoint. Request was cancelled."); } + public void send() { + this.retries++; + log.debug("Sending message: {} to url: {}", message, url); + try { + synchronized (client) { + HttpPost httpPost = new HttpPost(url); + httpPost.setEntity(new StringEntity(message)); + httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON); + client.execute(httpPost, this); + } + } catch (Exception e) { + log.error("Error encoding message in preparation for POST: ", e); + } + } + // Exposed for testing void error(String s) { log.error(s); diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java index 559f5a89..e52686ac 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java @@ -36,7 +36,7 @@ public void testSendResultUrlPutInMetadataAckPreserved() throws Exception { publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.ACKNOWLEDGE)); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); - ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RequestCallback.class); + ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RESTRequest.class); verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); HttpPost post = argumentCaptor.getValue(); String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8); @@ -55,7 +55,7 @@ public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception { publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE)); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); - ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RequestCallback.class); + ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RESTRequest.class); verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); HttpPost post = argumentCaptor.getValue(); String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8); @@ -74,7 +74,7 @@ public void testSendMetadataCreated() throws Exception { publisher.send("foo", "bar"); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); - ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RequestCallback.class); + ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RESTRequest.class); verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); HttpPost post = argumentCaptor.getValue(); String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8); @@ -118,35 +118,35 @@ public void testBadResponseDoesNotThrow() throws Exception { } @Test - public void testRequestCallbackCompletedGood() throws Exception { + public void testrestRequestCompletedGood() throws Exception { HttpResponse mockResponse = mock(HttpResponse.class); StatusLine mockStatusLine = mock(StatusLine.class); doReturn(RESTPubSub.OK_200).when(mockStatusLine).getStatusCode(); doReturn(mockStatusLine).when(mockResponse).getStatusLine(); - RESTPublisher.RequestCallback requestCallback = spy(RESTPublisher.RequestCallback.class); - requestCallback.completed(mockResponse); - verify(requestCallback, never()).error(anyString()); - verify(requestCallback, never()).error(anyString(), any()); + RESTPublisher.RESTRequest restRequest = spy(RESTPublisher.RESTRequest.class); + restRequest.completed(mockResponse); + verify(restRequest, never()).error(anyString()); + verify(restRequest, never()).error(anyString(), any()); } @Test - public void testRequestCallbackCompletedBad() throws Exception { - RESTPublisher.RequestCallback requestCallback = spy(RESTPublisher.RequestCallback.class); - requestCallback.completed(null); - verify(requestCallback).error(anyString(), any()); + public void testrestRequestCompletedBad() throws Exception { + RESTPublisher.RESTRequest restRequest = spy(RESTPublisher.RESTRequest.class); + restRequest.completed(null); + verify(restRequest).error(anyString(), any()); } @Test - public void testRequestCallbackFailed() throws Exception { - RESTPublisher.RequestCallback requestCallback = spy(RESTPublisher.RequestCallback.class); - requestCallback.failed(new RuntimeException("error")); - verify(requestCallback).error(anyString(), any()); + public void testrestRequestFailed() throws Exception { + RESTPublisher.RESTRequest restRequest = spy(RESTPublisher.RESTRequest.class); + restRequest.failed(new RuntimeException("error")); + verify(restRequest).error(anyString(), any()); } @Test - public void testRequestCallbackCancelled() throws Exception { - RESTPublisher.RequestCallback requestCallback = spy(RESTPublisher.RequestCallback.class); - requestCallback.cancelled(); - verify(requestCallback).error(anyString()); + public void testrestRequestCancelled() throws Exception { + RESTPublisher.RESTRequest restRequest = spy(RESTPublisher.RESTRequest.class); + restRequest.cancelled(); + verify(restRequest).error(anyString()); } } diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java index f9089801..e39d7e81 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java @@ -29,7 +29,7 @@ public void testSendPullsURLFromMessage() throws Exception { publisher.send(new PubSubMessage("foo", "bar", metadata)); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); - ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RequestCallback.class); + ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RESTRequest.class); verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); HttpPost post = argumentCaptor.getValue(); From e612575a99510260b54a626af224f5601d1d30b1 Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Fri, 30 Mar 2018 14:34:36 -0700 Subject: [PATCH 14/17] Use synchronous client no tests --- .../yahoo/bullet/pubsub/rest/RESTPubSub.java | 20 ++++++++----- .../bullet/pubsub/rest/RESTPublisher.java | 30 +++++++++++++++---- .../pubsub/rest/RESTQueryPublisher.java | 6 ++-- .../pubsub/rest/RESTResultPublisher.java | 6 ++-- .../bullet/pubsub/rest/RESTSubscriber.java | 24 +++++++++++---- 5 files changed, 61 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java index 89391f52..a1411eac 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java @@ -11,10 +11,13 @@ import com.yahoo.bullet.pubsub.Publisher; import com.yahoo.bullet.pubsub.Subscriber; import lombok.extern.slf4j.Slf4j; +import org.apache.http.client.HttpRequestRetryHandler; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; +import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.impl.nio.reactor.IOReactorConfig; - import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -30,7 +33,7 @@ public class RESTPubSub extends PubSub { * Create a RESTPubSub from a {@link BulletConfig}. * * @param config The config. - * @throws PubSubException + * @throws PubSubException if the context name is not present or cannot be parsed. */ public RESTPubSub(BulletConfig config) throws PubSubException { super(config); @@ -40,11 +43,13 @@ public RESTPubSub(BulletConfig config) throws PubSubException { @Override public Publisher getPublisher() { if (context == Context.QUERY_PROCESSING) { - return new RESTResultPublisher(getClient()); + return new RESTResultPublisher(HttpClients.createDefault()); } else { String queryURL = ((List) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class)).get(0); String resultURL = config.getAs(RESTPubSubConfig.RESULT_URL, String.class); - return new RESTQueryPublisher(getClient(), queryURL, resultURL); + HttpRequestRetryHandler retryHandler = new DefaultHttpRequestRetryHandler(3, true); // <---- remove this (don't remove just fix the hard-coded "3") + CloseableHttpClient client = HttpClients.custom().setRetryHandler(retryHandler).build(); + return new RESTQueryPublisher(client, queryURL, resultURL); } } @@ -56,7 +61,7 @@ public List getPublishers(int n) { @Override public Subscriber getSubscriber() { int maxUncommittedMessages = config.getAs(RESTPubSubConfig.MAX_UNCOMMITTED_MESSAGES, Integer.class); - CloseableHttpAsyncClient client = getClient(); + int connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Integer.class); List urls; Long minWait; @@ -67,7 +72,7 @@ public Subscriber getSubscriber() { urls = Collections.singletonList(config.getAs(RESTPubSubConfig.RESULT_URL, String.class)); minWait = config.getAs(RESTPubSubConfig.RESULT_SUBSCRIBER_MIN_WAIT, Long.class); } - return new RESTSubscriber(maxUncommittedMessages, urls, client, minWait); + return new RESTSubscriber(maxUncommittedMessages, urls, HttpClients.createDefault(), minWait, connectTimeout); } @Override @@ -75,7 +80,8 @@ public List getSubscribers(int n) { return IntStream.range(0, n).mapToObj(i -> getSubscriber()).collect(Collectors.toList()); } - private CloseableHttpAsyncClient getClient() { + // <------------- remove this function + private CloseableHttpAsyncClient getClient2() { int connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Integer.class); IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setConnectTimeout(connectTimeout).build(); return HttpAsyncClients.custom().setDefaultIOReactorConfig(ioReactorConfig).build(); diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java index 505fba51..e8794f77 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java @@ -12,6 +12,7 @@ import org.apache.http.concurrent.FutureCallback; import org.apache.http.HttpResponse; import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import java.io.IOException; @@ -20,16 +21,15 @@ public abstract class RESTPublisher implements Publisher { public static final String APPLICATION_JSON = "application/json"; public static final String CONTENT_TYPE = "content-type"; - private CloseableHttpAsyncClient client; + private CloseableHttpClient client; /** - * Create a RESTQueryPublisher from a {@link CloseableHttpAsyncClient}. + * Create a RESTQueryPublisher from a {@link CloseableHttpClient}. * * @param client The client. */ - public RESTPublisher(CloseableHttpAsyncClient client) { + public RESTPublisher(CloseableHttpClient client) { this.client = client; - client.start(); } @Override @@ -48,9 +48,25 @@ public void close() { * @param message The message to send. */ protected void sendToURL(String url, PubSubMessage message) { - new RESTRequest(url, message.asJSON(), 3, client).send(); + log.debug("Sending message: {} to url: {}", message, url); + try { + HttpPost httpPost = new HttpPost(url); + httpPost.setEntity(new StringEntity(message.asJSON())); + httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON); + client.execute(httpPost); + } catch (Exception e) { + log.error("Error encoding message in preparation for POST: ", e); + } + + } + + + + + + static class RESTRequest implements FutureCallback { private String url; private String message; @@ -78,6 +94,8 @@ public void completed(HttpResponse response) { @Override public void failed(Exception e) { error("Failed to post message to RESTPubSub endpoint. Failed with error: ", e); + retries++; + send(); } @Override @@ -88,7 +106,7 @@ public void cancelled() { public void send() { this.retries++; log.debug("Sending message: {} to url: {}", message, url); - try { + try { synchronized (client) { HttpPost httpPost = new HttpPost(url); httpPost.setEntity(new StringEntity(message)); diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java index 4496854f..19d1796c 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java @@ -11,7 +11,7 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.client.CloseableHttpClient; @Slf4j public class RESTQueryPublisher extends RESTPublisher { @@ -20,14 +20,14 @@ public class RESTQueryPublisher extends RESTPublisher { private String resultURL; /** - * Create a RESTQueryPublisher from a {@link CloseableHttpAsyncClient}, queryURL and resultURL. The BulletConfig must + * Create a RESTQueryPublisher from a {@link CloseableHttpClient}, queryURL and resultURL. The BulletConfig must * contain a valid url in the bullet.pubsub.rest.query.urls field. * * @param client The client. * @param queryURL The URL to which to POST queries. * @param resultURL The URL that will be added to the Metadata (results will be sent to this URL from the backend). */ - public RESTQueryPublisher(CloseableHttpAsyncClient client, String queryURL, String resultURL) { + public RESTQueryPublisher(CloseableHttpClient client, String queryURL, String resultURL) { super(client); this.queryURL = queryURL; this.resultURL = resultURL; diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java index fedfbfd0..7e09afdf 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java @@ -7,16 +7,16 @@ import com.yahoo.bullet.pubsub.PubSubMessage; import lombok.extern.slf4j.Slf4j; -import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.client.CloseableHttpClient; @Slf4j public class RESTResultPublisher extends RESTPublisher { /** - * Create a RESTQueryPublisher from a {@link CloseableHttpAsyncClient}. + * Create a RESTQueryPublisher from a {@link CloseableHttpClient}. * * @param client The client. */ - public RESTResultPublisher(CloseableHttpAsyncClient client) { + public RESTResultPublisher(CloseableHttpClient client) { super(client); } diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java index 51f6d503..0cd38351 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java @@ -15,17 +15,20 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpResponse; +import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.client.CloseableHttpClient; + import org.apache.http.util.EntityUtils; @Slf4j public class RESTSubscriber extends BufferingSubscriber { @Getter(AccessLevel.PACKAGE) private List urls; - private CloseableHttpAsyncClient client; + private CloseableHttpClient client; private long minWait; private long lastRequest; + private int connectTimeout; /** * Create a RESTSubscriber. @@ -35,13 +38,13 @@ public class RESTSubscriber extends BufferingSubscriber { * @param client The client to use to make http requests. * @param minWait The minimum time (ms) to wait between subsequent http requests. */ - public RESTSubscriber(int maxUncommittedMessages, List urls, CloseableHttpAsyncClient client, long minWait) { + public RESTSubscriber(int maxUncommittedMessages, List urls, CloseableHttpClient client, long minWait, int connectTimeout) { super(maxUncommittedMessages); this.client = client; - this.client.start(); this.urls = urls; this.minWait = minWait; this.lastRequest = 0; + this.connectTimeout = connectTimeout; } @Override @@ -55,8 +58,7 @@ public List getMessages() throws PubSubException { for (String url : urls) { try { log.debug("Getting messages from url: {}", url); - HttpGet httpGet = new HttpGet(url); - HttpResponse response = client.execute(httpGet, null).get(); + HttpResponse response = client.execute(makeHttpGet(url)); int statusCode = response.getStatusLine().getStatusCode(); if (statusCode == RESTPubSub.OK_200) { String message = EntityUtils.toString(response.getEntity(), RESTPubSub.UTF_8); @@ -80,4 +82,14 @@ public void close() { log.warn("Caught exception when closing AsyncHttpClient: ", e); } } + + private HttpGet makeHttpGet(String url) { + HttpGet httpGet = new HttpGet(url); + RequestConfig requestConfig = + RequestConfig.custom().setConnectTimeout(connectTimeout) + .setSocketTimeout(connectTimeout) + .build(); + httpGet.setConfig(requestConfig); + return httpGet; + } } From f7e3c54226494665ee76d7dbe611a3b20631e02d Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Fri, 30 Mar 2018 15:15:58 -0700 Subject: [PATCH 15/17] Using simple synchronous client --- .../yahoo/bullet/pubsub/rest/RESTPubSub.java | 17 +--- .../bullet/pubsub/rest/RESTPublisher.java | 77 ++----------------- 2 files changed, 6 insertions(+), 88 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java index a1411eac..b6df08cf 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java @@ -11,13 +11,7 @@ import com.yahoo.bullet.pubsub.Publisher; import com.yahoo.bullet.pubsub.Subscriber; import lombok.extern.slf4j.Slf4j; -import org.apache.http.client.HttpRequestRetryHandler; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; import org.apache.http.impl.client.HttpClients; -import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; -import org.apache.http.impl.nio.client.HttpAsyncClients; -import org.apache.http.impl.nio.reactor.IOReactorConfig; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -47,9 +41,7 @@ public Publisher getPublisher() { } else { String queryURL = ((List) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class)).get(0); String resultURL = config.getAs(RESTPubSubConfig.RESULT_URL, String.class); - HttpRequestRetryHandler retryHandler = new DefaultHttpRequestRetryHandler(3, true); // <---- remove this (don't remove just fix the hard-coded "3") - CloseableHttpClient client = HttpClients.custom().setRetryHandler(retryHandler).build(); - return new RESTQueryPublisher(client, queryURL, resultURL); + return new RESTQueryPublisher(HttpClients.createDefault(), queryURL, resultURL); } } @@ -79,11 +71,4 @@ public Subscriber getSubscriber() { public List getSubscribers(int n) { return IntStream.range(0, n).mapToObj(i -> getSubscriber()).collect(Collectors.toList()); } - - // <------------- remove this function - private CloseableHttpAsyncClient getClient2() { - int connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Integer.class); - IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setConnectTimeout(connectTimeout).build(); - return HttpAsyncClients.custom().setDefaultIOReactorConfig(ioReactorConfig).build(); - } } diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java index e8794f77..5914d300 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java @@ -9,11 +9,9 @@ import com.yahoo.bullet.pubsub.Publisher; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.methods.HttpPost; -import org.apache.http.concurrent.FutureCallback; import org.apache.http.HttpResponse; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import java.io.IOException; @Slf4j @@ -37,7 +35,7 @@ public void close() { try { client.close(); } catch (IOException e) { - log.error("Caught exception when closing AsyncHttpClient...: ", e); + log.error("Caught exception when closing client: ", e); } } @@ -53,79 +51,14 @@ protected void sendToURL(String url, PubSubMessage message) { HttpPost httpPost = new HttpPost(url); httpPost.setEntity(new StringEntity(message.asJSON())); httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON); - client.execute(httpPost); - } catch (Exception e) { - log.error("Error encoding message in preparation for POST: ", e); - } - - - } - - - - - - - - static class RESTRequest implements FutureCallback { - private String url; - private String message; - private int maxRetries; - private int retries; - private CloseableHttpAsyncClient client; - - public RESTRequest(String url, String message, int maxRetries, CloseableHttpAsyncClient client) { - this.url = url; - this.message = message; - this.maxRetries = maxRetries; - this.client = client; - this.retries = 0; - } - - @Override - public void completed(HttpResponse response) { + HttpResponse response = client.execute(httpPost); if (response == null || response.getStatusLine().getStatusCode() != RESTPubSub.OK_200) { - error("Couldn't reach REST pubsub server. Got response: {}", response); + log.error("Couldn't reach REST pubsub server. Got response: {}", response); return; } log.debug("Successfully wrote message with status code {}. Response was: {}", response.getStatusLine().getStatusCode(), response); - } - - @Override - public void failed(Exception e) { - error("Failed to post message to RESTPubSub endpoint. Failed with error: ", e); - retries++; - send(); - } - - @Override - public void cancelled() { - error("Failed to post message to RESTPubSub endpoint. Request was cancelled."); - } - - public void send() { - this.retries++; - log.debug("Sending message: {} to url: {}", message, url); - try { - synchronized (client) { - HttpPost httpPost = new HttpPost(url); - httpPost.setEntity(new StringEntity(message)); - httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON); - client.execute(httpPost, this); - } - } catch (Exception e) { - log.error("Error encoding message in preparation for POST: ", e); - } - } - - // Exposed for testing - void error(String s) { - log.error(s); - } - - // Exposed for testing - void error(String s, Object o) { - log.error(s, o); + } catch (Exception e) { + log.error("Error encoding message in preparation for POST: ", e); } } } From bd7f6af4c18486516a7dda6f3180498b359936d4 Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Fri, 30 Mar 2018 16:25:13 -0700 Subject: [PATCH 16/17] Finished tests --- .../pubsub/rest/RESTQueryPublisherTest.java | 68 +++++-------------- .../pubsub/rest/RESTResultPublisherTest.java | 11 ++- .../pubsub/rest/RESTSubscriberTest.java | 53 +++++++-------- 3 files changed, 45 insertions(+), 87 deletions(-) diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java index e52686ac..c42c10f5 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java @@ -7,37 +7,38 @@ import com.yahoo.bullet.pubsub.Metadata; import com.yahoo.bullet.pubsub.PubSubMessage; -import org.apache.http.HttpResponse; import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; import org.mockito.ArgumentCaptor; - import org.testng.annotations.Test; import java.io.IOException; import org.testng.Assert; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; -import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; public class RESTQueryPublisherTest { @Test public void testSendResultUrlPutInMetadataAckPreserved() throws Exception { - CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + CloseableHttpClient mockClient = mock(CloseableHttpClient.class); + CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class); + StatusLine mockStatusLine = mock(StatusLine.class); + doReturn(200).when(mockStatusLine).getStatusCode(); + doReturn(mockStatusLine).when(mockResponse).getStatusLine(); + doReturn(mockResponse).when(mockClient).execute(any()); RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.ACKNOWLEDGE)); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); - ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RESTRequest.class); - verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); + verify(mockClient).execute(argumentCaptor.capture()); HttpPost post = argumentCaptor.getValue(); String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8); String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"ACKNOWLEDGE\",\"content\":\"my/custom/url\"}}"; @@ -50,13 +51,12 @@ public void testSendResultUrlPutInMetadataAckPreserved() throws Exception { @Test public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception { - CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + CloseableHttpClient mockClient = mock(CloseableHttpClient.class); RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE)); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); - ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RESTRequest.class); - verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); + verify(mockClient).execute(argumentCaptor.capture()); HttpPost post = argumentCaptor.getValue(); String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8); String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"COMPLETE\",\"content\":\"my/custom/url\"}}"; @@ -69,13 +69,12 @@ public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception { @Test public void testSendMetadataCreated() throws Exception { - CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + CloseableHttpClient mockClient = mock(CloseableHttpClient.class); RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); publisher.send("foo", "bar"); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); - ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RESTRequest.class); - verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); + verify(mockClient).execute(argumentCaptor.capture()); HttpPost post = argumentCaptor.getValue(); String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8); String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":null,\"content\":\"my/custom/url\"}}"; @@ -88,7 +87,7 @@ public void testSendMetadataCreated() throws Exception { @Test public void testClose() throws Exception { - CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + CloseableHttpClient mockClient = mock(CloseableHttpClient.class); doNothing().when(mockClient).close(); RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); publisher.close(); @@ -97,7 +96,7 @@ public void testClose() throws Exception { @Test public void testCloseDoesNotThrow() throws Exception { - CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + CloseableHttpClient mockClient = mock(CloseableHttpClient.class); doThrow(new IOException("error!")).when(mockClient).close(); RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null); @@ -107,46 +106,13 @@ public void testCloseDoesNotThrow() throws Exception { @Test public void testBadResponseDoesNotThrow() throws Exception { - CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + CloseableHttpClient mockClient = mock(CloseableHttpClient.class); RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); PubSubMessage message = mock(PubSubMessage.class); // This will compel the HttpPost object to throw an exception in RESTPublisher.sendToURL() doReturn(null).when(message).asJSON(); publisher.send(message); Assert.assertTrue(true); - verify(mockClient, never()).execute(any(), any()); - } - - @Test - public void testrestRequestCompletedGood() throws Exception { - HttpResponse mockResponse = mock(HttpResponse.class); - StatusLine mockStatusLine = mock(StatusLine.class); - doReturn(RESTPubSub.OK_200).when(mockStatusLine).getStatusCode(); - doReturn(mockStatusLine).when(mockResponse).getStatusLine(); - RESTPublisher.RESTRequest restRequest = spy(RESTPublisher.RESTRequest.class); - restRequest.completed(mockResponse); - verify(restRequest, never()).error(anyString()); - verify(restRequest, never()).error(anyString(), any()); - } - - @Test - public void testrestRequestCompletedBad() throws Exception { - RESTPublisher.RESTRequest restRequest = spy(RESTPublisher.RESTRequest.class); - restRequest.completed(null); - verify(restRequest).error(anyString(), any()); - } - - @Test - public void testrestRequestFailed() throws Exception { - RESTPublisher.RESTRequest restRequest = spy(RESTPublisher.RESTRequest.class); - restRequest.failed(new RuntimeException("error")); - verify(restRequest).error(anyString(), any()); - } - - @Test - public void testrestRequestCancelled() throws Exception { - RESTPublisher.RESTRequest restRequest = spy(RESTPublisher.RESTRequest.class); - restRequest.cancelled(); - verify(restRequest).error(anyString()); + verify(mockClient, never()).execute(any()); } } diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java index e39d7e81..de80120b 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java @@ -8,7 +8,7 @@ import com.yahoo.bullet.pubsub.Metadata; import com.yahoo.bullet.pubsub.PubSubMessage; import org.apache.http.client.methods.HttpPost; -import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; import org.mockito.ArgumentCaptor; import org.testng.Assert; @@ -23,14 +23,13 @@ public class RESTResultPublisherTest { @Test public void testSendPullsURLFromMessage() throws Exception { - CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + CloseableHttpClient mockClient = mock(CloseableHttpClient.class); RESTResultPublisher publisher = new RESTResultPublisher(mockClient); Metadata metadata = new Metadata(null, "my/custom/url"); publisher.send(new PubSubMessage("foo", "bar", metadata)); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); - ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(RESTPublisher.RESTRequest.class); - verify(mockClient).execute(argumentCaptor.capture(), argumentCaptor2.capture()); + verify(mockClient).execute(argumentCaptor.capture()); HttpPost post = argumentCaptor.getValue(); String actualURI = post.getURI().toString(); @@ -48,7 +47,7 @@ public void testSendPullsURLFromMessage() throws Exception { @Test public void testClose() throws Exception { - CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + CloseableHttpClient mockClient = mock(CloseableHttpClient.class); doNothing().when(mockClient).close(); RESTResultPublisher publisher = new RESTResultPublisher(mockClient); @@ -58,7 +57,7 @@ public void testClose() throws Exception { @Test public void testCloseDoesNotThrow() throws Exception { - CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); + CloseableHttpClient mockClient = mock(CloseableHttpClient.class); doThrow(new IOException("error!")).when(mockClient).close(); RESTResultPublisher publisher = new RESTResultPublisher(mockClient); diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java index 6dbc876e..c23bfa87 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java @@ -7,9 +7,9 @@ import com.yahoo.bullet.pubsub.PubSubMessage; import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; import org.apache.http.StatusLine; -import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.impl.client.CloseableHttpClient; import org.testng.Assert; import org.testng.annotations.Test; import java.io.ByteArrayInputStream; @@ -18,7 +18,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Future; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; @@ -28,21 +27,17 @@ import static org.mockito.Mockito.verify; public class RESTSubscriberTest { - private CloseableHttpAsyncClient mockClient(int responseCode, String message) throws Exception { - CloseableHttpAsyncClient mockClient = mock(CloseableHttpAsyncClient.class); - Future mockFuture = mock(Future.class); - HttpResponse mockResponse = mock(HttpResponse.class); + private CloseableHttpClient mockClient(int responseCode, String message) throws Exception { + CloseableHttpClient mockClient = mock(CloseableHttpClient.class); + CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class); StatusLine mockStatusLine = mock(StatusLine.class); doReturn(responseCode).when(mockStatusLine).getStatusCode(); doReturn(mockStatusLine).when(mockResponse).getStatusLine(); - HttpEntity mockEntity = mock(HttpEntity.class); InputStream inputStream = new ByteArrayInputStream(message.getBytes(RESTPubSub.UTF_8)); doReturn(inputStream).when(mockEntity).getContent(); doReturn(mockEntity).when(mockResponse).getEntity(); - - doReturn(mockResponse).when(mockFuture).get(); - doReturn(mockFuture).when(mockClient).execute(any(), any()); + doReturn(mockResponse).when(mockClient).execute(any()); return mockClient; } @@ -50,8 +45,8 @@ private CloseableHttpAsyncClient mockClient(int responseCode, String message) th @Test public void testGetMessages() throws Exception { String message = new PubSubMessage("foo", "bar").asJSON(); - CloseableHttpAsyncClient mockClient = mockClient(RESTPubSub.OK_200, message); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); + CloseableHttpClient mockClient = mockClient(RESTPubSub.OK_200, message); + RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10, 3000); List messages = subscriber.getMessages(); Assert.assertEquals(messages.size(), 2); Assert.assertEquals(messages.get(0).asJSON(), "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":null}"); @@ -60,8 +55,8 @@ public void testGetMessages() throws Exception { @Test public void testGetMessages204() throws Exception { String message = new PubSubMessage("foo", "bar").asJSON(); - CloseableHttpAsyncClient mockClient = mockClient(RESTPubSub.NO_CONTENT_204, message); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); + CloseableHttpClient mockClient = mockClient(RESTPubSub.NO_CONTENT_204, message); + RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10, 3000); List messages = subscriber.getMessages(); Assert.assertEquals(messages.size(), 0); @@ -70,8 +65,8 @@ public void testGetMessages204() throws Exception { @Test public void testGetMessages500() throws Exception { String message = new PubSubMessage("foo", "bar").asJSON(); - CloseableHttpAsyncClient mockClient = mockClient(500, message); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); + CloseableHttpClient mockClient = mockClient(500, message); + RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10, 3000); List messages = subscriber.getMessages(); Assert.assertEquals(messages.size(), 0); @@ -80,11 +75,11 @@ public void testGetMessages500() throws Exception { @Test public void testGetMessagesDoesNotThrow() throws Exception { String message = new PubSubMessage("foo", "bar").asJSON(); - CloseableHttpAsyncClient mockClient = mockClient(500, message); + CloseableHttpClient mockClient = mockClient(500, message); List urls = new ArrayList<>(); // A null url will throw an error - make sure it handled eloquently urls.add(null); - RESTSubscriber subscriber = new RESTSubscriber(88, urls, mockClient, 10); + RESTSubscriber subscriber = new RESTSubscriber(88, urls, mockClient, 10, 3000); List messages = subscriber.getMessages(); Assert.assertEquals(messages.size(), 0); @@ -93,9 +88,9 @@ public void testGetMessagesDoesNotThrow() throws Exception { @Test public void testClose() throws Exception { String message = new PubSubMessage("foo", "bar").asJSON(); - CloseableHttpAsyncClient mockClient = mockClient(500, message); + CloseableHttpClient mockClient = mockClient(500, message); doNothing().when(mockClient).close(); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); + RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10, 3000); subscriber.close(); verify(mockClient).close(); @@ -104,9 +99,9 @@ public void testClose() throws Exception { @Test public void testCloseDoesNotThrow() throws Exception { String message = new PubSubMessage("foo", "bar").asJSON(); - CloseableHttpAsyncClient mockClient = mockClient(500, message); + CloseableHttpClient mockClient = mockClient(500, message); doThrow(new IOException("error!")).when(mockClient).close(); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10); + RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10, 3000); subscriber.close(); verify(mockClient).close(); @@ -115,20 +110,18 @@ public void testCloseDoesNotThrow() throws Exception { @Test public void testMinWait() throws Exception { String message = new PubSubMessage("someID", "someContent").asJSON(); - CloseableHttpAsyncClient mockClient = mockClient(RESTPubSub.OK_200, message); - RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 1000); + CloseableHttpClient mockClient = mockClient(RESTPubSub.OK_200, message); + RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 100, 3000); // First response should give content (2 events since we have 2 endpoints in the config) List messages = subscriber.getMessages(); Assert.assertEquals(messages.size(), 2); - // Second and third response should give nothing since the wait duration hasn't passed - messages = subscriber.getMessages(); - Assert.assertEquals(messages.size(), 0); + // Second response should give nothing since the wait duration hasn't passed messages = subscriber.getMessages(); Assert.assertEquals(messages.size(), 0); - // After waiting a second it should return messages again - Thread.sleep(3000); + // After waiting it should return messages again + Thread.sleep(150); messages = subscriber.getMessages(); Assert.assertEquals(messages.size(), 2); } From 2358e17047c1fed9cc8adb0494b504a24384724f Mon Sep 17 00:00:00 2001 From: Nathan Speidel Date: Fri, 30 Mar 2018 16:54:26 -0700 Subject: [PATCH 17/17] Fixed pom --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 533a6eb7..fa0702da 100644 --- a/pom.xml +++ b/pom.xml @@ -104,8 +104,8 @@ org.apache.httpcomponents - httpasyncclient - 4.1.3 + httpclient + 4.3.4