diff --git a/pom.xml b/pom.xml
index 97f78d9e..fa0702da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,9 +103,9 @@
0.2.1
- org.asynchttpclient
- async-http-client
- 2.0.37
+ org.apache.httpcomponents
+ httpclient
+ 4.3.4
diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java
index a3a74255..b6df08cf 100644
--- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java
+++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java
@@ -11,11 +11,7 @@
import com.yahoo.bullet.pubsub.Publisher;
import com.yahoo.bullet.pubsub.Subscriber;
import lombok.extern.slf4j.Slf4j;
-import org.asynchttpclient.AsyncHttpClient;
-import org.asynchttpclient.AsyncHttpClientConfig;
-import org.asynchttpclient.DefaultAsyncHttpClient;
-import org.asynchttpclient.DefaultAsyncHttpClientConfig;
-
+import org.apache.http.impl.client.HttpClients;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -23,15 +19,15 @@
@Slf4j
public class RESTPubSub extends PubSub {
- private static final int NO_TIMEOUT = -1;
public static final int OK_200 = 200;
public static final int NO_CONTENT_204 = 204;
+ public static final String UTF_8 = "UTF-8";
/**
* Create a RESTPubSub from a {@link BulletConfig}.
*
* @param config The config.
- * @throws PubSubException
+ * @throws PubSubException if the context name is not present or cannot be parsed.
*/
public RESTPubSub(BulletConfig config) throws PubSubException {
super(config);
@@ -41,11 +37,11 @@ public RESTPubSub(BulletConfig config) throws PubSubException {
@Override
public Publisher getPublisher() {
if (context == Context.QUERY_PROCESSING) {
- return new RESTResultPublisher(getClient());
+ return new RESTResultPublisher(HttpClients.createDefault());
} else {
String queryURL = ((List) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class)).get(0);
String resultURL = config.getAs(RESTPubSubConfig.RESULT_URL, String.class);
- return new RESTQueryPublisher(getClient(), queryURL, resultURL);
+ return new RESTQueryPublisher(HttpClients.createDefault(), queryURL, resultURL);
}
}
@@ -57,7 +53,7 @@ public List getPublishers(int n) {
@Override
public Subscriber getSubscriber() {
int maxUncommittedMessages = config.getAs(RESTPubSubConfig.MAX_UNCOMMITTED_MESSAGES, Integer.class);
- AsyncHttpClient client = getClient();
+ int connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Integer.class);
List urls;
Long minWait;
@@ -68,23 +64,11 @@ public Subscriber getSubscriber() {
urls = Collections.singletonList(config.getAs(RESTPubSubConfig.RESULT_URL, String.class));
minWait = config.getAs(RESTPubSubConfig.RESULT_SUBSCRIBER_MIN_WAIT, Long.class);
}
- return new RESTSubscriber(maxUncommittedMessages, urls, client, minWait);
+ return new RESTSubscriber(maxUncommittedMessages, urls, HttpClients.createDefault(), minWait, connectTimeout);
}
@Override
public List getSubscribers(int n) {
return IntStream.range(0, n).mapToObj(i -> getSubscriber()).collect(Collectors.toList());
}
-
- private AsyncHttpClient getClient() {
- Long connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Long.class);
- int retryLimit = config.getAs(RESTPubSubConfig.CONNECT_RETRY_LIMIT, Integer.class);
- AsyncHttpClientConfig clientConfig =
- new DefaultAsyncHttpClientConfig.Builder().setConnectTimeout(connectTimeout.intValue())
- .setMaxRequestRetry(retryLimit)
- .setReadTimeout(NO_TIMEOUT)
- .setRequestTimeout(NO_TIMEOUT)
- .build();
- return new DefaultAsyncHttpClient(clientConfig);
- }
}
diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java
index c8a9e6b8..4108f27c 100644
--- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java
+++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java
@@ -17,7 +17,6 @@ public class RESTPubSubConfig extends BulletConfig {
// Field names
public static final String PREFIX = "bullet.pubsub.rest.";
public static final String CONNECT_TIMEOUT = PREFIX + "connect.timeout.ms";
- public static final String CONNECT_RETRY_LIMIT = PREFIX + "connect.retry.limit";
public static final String MAX_UNCOMMITTED_MESSAGES = PREFIX + "subscriber.max.uncommitted.messages";
public static final String QUERY_URLS = PREFIX + "query.urls";
public static final String RESULT_URL = PREFIX + "result.url";
@@ -25,8 +24,7 @@ public class RESTPubSubConfig extends BulletConfig {
public static final String QUERY_SUBSCRIBER_MIN_WAIT = PREFIX + "query.subscriber.min.wait.ms";
// Defaults
- public static final long DEFAULT_CONNECT_TIMEOUT = 5000L;
- public static final int DEFAULT_CONNECT_RETRY_LIMIT = 3;
+ public static final int DEFAULT_CONNECT_TIMEOUT = 5000;
public static final int DEFAULT_MAX_UNCOMMITTED_MESSAGES = 100;
public static final List DEFAULT_QUERY_URLS = Arrays.asList("http://localhost:9901/api/bullet/pubsub/query",
"http://localhost:9902/api/bullet/pubsub/query");
@@ -41,10 +39,6 @@ public class RESTPubSubConfig extends BulletConfig {
VALIDATOR.define(CONNECT_TIMEOUT)
.defaultTo(DEFAULT_CONNECT_TIMEOUT)
.checkIf(Validator::isPositiveInt)
- .castTo(Validator::asLong);
- VALIDATOR.define(CONNECT_RETRY_LIMIT)
- .defaultTo(DEFAULT_CONNECT_RETRY_LIMIT)
- .checkIf(Validator::isPositiveInt)
.castTo(Validator::asInt);
VALIDATOR.define(MAX_UNCOMMITTED_MESSAGES)
.defaultTo(DEFAULT_MAX_UNCOMMITTED_MESSAGES)
diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java
index 039413df..5914d300 100644
--- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java
+++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java
@@ -8,25 +8,25 @@
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.pubsub.Publisher;
import lombok.extern.slf4j.Slf4j;
-import org.asynchttpclient.AsyncHttpClient;
-import org.asynchttpclient.Response;
-
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
import java.io.IOException;
-import java.util.function.Consumer;
@Slf4j
public abstract class RESTPublisher implements Publisher {
public static final String APPLICATION_JSON = "application/json";
public static final String CONTENT_TYPE = "content-type";
- private AsyncHttpClient client;
+ private CloseableHttpClient client;
/**
- * Create a RESTQueryPublisher from a {@link RESTPubSubConfig} and a {@link AsyncHttpClient}.
+ * Create a RESTQueryPublisher from a {@link CloseableHttpClient}.
*
* @param client The client.
*/
- public RESTPublisher(AsyncHttpClient client) {
+ public RESTPublisher(CloseableHttpClient client) {
this.client = client;
}
@@ -35,7 +35,7 @@ public void close() {
try {
client.close();
} catch (IOException e) {
- log.error("Caught exception when closing AsyncHttpClient...: ", e);
+ log.error("Caught exception when closing client: ", e);
}
}
@@ -47,30 +47,18 @@ public void close() {
*/
protected void sendToURL(String url, PubSubMessage message) {
log.debug("Sending message: {} to url: {}", message, url);
- client.preparePost(url)
- .setBody(message.asJSON())
- .setHeader(CONTENT_TYPE, APPLICATION_JSON)
- .execute()
- .toCompletableFuture()
- .exceptionally(this::handleException)
- .thenAcceptAsync(createResponseConsumer(message.getId()));
- }
-
- private Consumer createResponseConsumer(String id) {
- // Create a closure with id
- return response -> handleResponse(id, response);
- }
-
- private void handleResponse(String id, Response response) {
- if (response == null || response.getStatusCode() != RESTPubSub.OK_200) {
- log.error("Failed to write message with id: {}. Couldn't reach pubsub server. Got response: {}", id, response);
- return;
+ try {
+ HttpPost httpPost = new HttpPost(url);
+ httpPost.setEntity(new StringEntity(message.asJSON()));
+ httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON);
+ HttpResponse response = client.execute(httpPost);
+ if (response == null || response.getStatusLine().getStatusCode() != RESTPubSub.OK_200) {
+ log.error("Couldn't reach REST pubsub server. Got response: {}", response);
+ return;
+ }
+ log.debug("Successfully wrote message with status code {}. Response was: {}", response.getStatusLine().getStatusCode(), response);
+ } catch (Exception e) {
+ log.error("Error encoding message in preparation for POST: ", e);
}
- log.debug("Successfully wrote message with id {}. Response was: {} {}", id, response.getStatusCode(), response.getStatusText());
- }
-
- private Response handleException(Throwable throwable) {
- log.error("Received error while posting query", throwable);
- return null;
}
}
diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java
index 79d19a68..19d1796c 100644
--- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java
+++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java
@@ -11,7 +11,7 @@
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.asynchttpclient.AsyncHttpClient;
+import org.apache.http.impl.client.CloseableHttpClient;
@Slf4j
public class RESTQueryPublisher extends RESTPublisher {
@@ -20,14 +20,14 @@ public class RESTQueryPublisher extends RESTPublisher {
private String resultURL;
/**
- * Create a RESTQueryPublisher from a {@link AsyncHttpClient}, queryURL and resultURL. The BulletConfig must
+ * Create a RESTQueryPublisher from a {@link CloseableHttpClient}, queryURL and resultURL. The BulletConfig must
* contain a valid url in the bullet.pubsub.rest.query.urls field.
*
* @param client The client.
* @param queryURL The URL to which to POST queries.
* @param resultURL The URL that will be added to the Metadata (results will be sent to this URL from the backend).
*/
- public RESTQueryPublisher(AsyncHttpClient client, String queryURL, String resultURL) {
+ public RESTQueryPublisher(CloseableHttpClient client, String queryURL, String resultURL) {
super(client);
this.queryURL = queryURL;
this.resultURL = resultURL;
diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java
index d4e5dc24..7e09afdf 100644
--- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java
+++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java
@@ -5,24 +5,23 @@
*/
package com.yahoo.bullet.pubsub.rest;
-import com.yahoo.bullet.pubsub.PubSubException;
import com.yahoo.bullet.pubsub.PubSubMessage;
import lombok.extern.slf4j.Slf4j;
-import org.asynchttpclient.AsyncHttpClient;
+import org.apache.http.impl.client.CloseableHttpClient;
@Slf4j
public class RESTResultPublisher extends RESTPublisher {
/**
- * Create a RESTQueryPublisher from a {@link AsyncHttpClient}.
+ * Create a RESTQueryPublisher from a {@link CloseableHttpClient}.
*
* @param client The client.
*/
- public RESTResultPublisher(AsyncHttpClient client) {
+ public RESTResultPublisher(CloseableHttpClient client) {
super(client);
}
@Override
- public void send(PubSubMessage message) throws PubSubException {
+ public void send(PubSubMessage message) {
String url = (String) message.getMetadata().getContent();
log.debug("Extracted url to which to send results: {}", url);
sendToURL(url, message);
diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java
index 02ba628c..0cd38351 100644
--- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java
+++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java
@@ -11,35 +11,40 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.asynchttpclient.AsyncHttpClient;
-import org.asynchttpclient.Response;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import org.apache.http.util.EntityUtils;
@Slf4j
public class RESTSubscriber extends BufferingSubscriber {
@Getter(AccessLevel.PACKAGE)
private List urls;
- private AsyncHttpClient client;
+ private CloseableHttpClient client;
private long minWait;
private long lastRequest;
+ private int connectTimeout;
/**
- * Create a RESTSubscriber from a {@link RESTPubSubConfig}.
+ * Create a RESTSubscriber.
*
* @param maxUncommittedMessages The maximum number of records that will be buffered before commit() must be called.
* @param urls The URLs which will be used to make the http request.
* @param client The client to use to make http requests.
* @param minWait The minimum time (ms) to wait between subsequent http requests.
*/
- public RESTSubscriber(int maxUncommittedMessages, List urls, AsyncHttpClient client, long minWait) {
+ public RESTSubscriber(int maxUncommittedMessages, List urls, CloseableHttpClient client, long minWait, int connectTimeout) {
super(maxUncommittedMessages);
this.client = client;
this.urls = urls;
this.minWait = minWait;
this.lastRequest = 0;
+ this.connectTimeout = connectTimeout;
}
@Override
@@ -52,17 +57,18 @@ public List getMessages() throws PubSubException {
lastRequest = currentTime;
for (String url : urls) {
try {
- log.debug("Getting messages from url: ", url);
- Response response = client.prepareGet(url).execute().get();
- int statusCode = response.getStatusCode();
+ log.debug("Getting messages from url: {}", url);
+ HttpResponse response = client.execute(makeHttpGet(url));
+ int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == RESTPubSub.OK_200) {
- messages.add(PubSubMessage.fromJSON(response.getResponseBody()));
+ String message = EntityUtils.toString(response.getEntity(), RESTPubSub.UTF_8);
+ messages.add(PubSubMessage.fromJSON(message));
} else if (statusCode != RESTPubSub.NO_CONTENT_204) {
// NO_CONTENT_204 indicates there are no new messages - anything else indicates a problem
log.error("Http call failed with status code {} and response {}.", statusCode, response);
}
} catch (Exception e) {
- log.error("Http call failed with error: ", e);
+ log.error("Http call to {} failed with error: {}", url, e);
}
}
return messages;
@@ -76,4 +82,14 @@ public void close() {
log.warn("Caught exception when closing AsyncHttpClient: ", e);
}
}
+
+ private HttpGet makeHttpGet(String url) {
+ HttpGet httpGet = new HttpGet(url);
+ RequestConfig requestConfig =
+ RequestConfig.custom().setConnectTimeout(connectTimeout)
+ .setSocketTimeout(connectTimeout)
+ .build();
+ httpGet.setConfig(requestConfig);
+ return httpGet;
+ }
}
diff --git a/src/main/resources/rest_pubsub_defaults.yaml b/src/main/resources/rest_pubsub_defaults.yaml
index ddc71a22..e8ba5d28 100644
--- a/src/main/resources/rest_pubsub_defaults.yaml
+++ b/src/main/resources/rest_pubsub_defaults.yaml
@@ -1,7 +1,5 @@
# Http connection timout (used by both the web service and the backend)
bullet.pubsub.rest.connect.timeout.ms: 5000
-# Http connection retry limit (used by both the web service and the backend)
-bullet.pubsub.rest.connect.retry.limit: 3
# Maxiumum number of uncommitted messages allowed before read requests will wait for commits (used by both the web service and the backend)
bullet.pubsub.rest.subscriber.max.uncommitted.messages: 100
# Minimum time (ms) between http calls to the result subscriber REST endpoint. This can be used to limit the number of http requests to the REST endpoints
diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java
index dcab425f..572a85f2 100644
--- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java
+++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java
@@ -20,25 +20,25 @@ public class RESTPubSubConfigTest {
@Test
public void testNoFiles() {
RESTPubSubConfig config = new RESTPubSubConfig((String) null);
- Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 3);
+ Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000);
config = new RESTPubSubConfig((Config) null);
- Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 3);
+ Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000);
config = new RESTPubSubConfig("");
- Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 3);
+ Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000);
}
@Test
public void testMissingFile() {
RESTPubSubConfig config = new RESTPubSubConfig("/path/to/non/existant/file");
- Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 3);
+ Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000);
}
@Test
public void testCustomConfig() {
RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
- Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 88);
+ Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 88);
Assert.assertEquals(config.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), "com.yahoo.bullet.pubsub.MockPubSub");
List queries = ((List) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class));
Assert.assertEquals(queries.size(), 2);
@@ -57,7 +57,7 @@ public void testCustomProperties() {
@Test
public void testGettingWithDefault() {
RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
- Assert.assertEquals(config.getOrDefault(RESTPubSubConfig.CONNECT_RETRY_LIMIT, "51"), 88);
+ Assert.assertEquals(config.getOrDefault(RESTPubSubConfig.CONNECT_TIMEOUT, "51"), 88);
Assert.assertEquals(config.getOrDefault("does.not.exist", "foo"), "foo");
Assert.assertEquals(config.getOrDefault("fake.setting", "bar"), "bar");
}
@@ -90,12 +90,12 @@ public void testMerging() {
RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
int configSize = config.getAll(Optional.empty()).size();
- Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 88);
+ Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 88);
Assert.assertEquals(config.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), "com.yahoo.bullet.pubsub.MockPubSub");
Config another = new RESTPubSubConfig((String) null);
another.clear();
- another.set(RESTPubSubConfig.CONNECT_RETRY_LIMIT, 51L);
+ another.set(RESTPubSubConfig.CONNECT_TIMEOUT, 51L);
// This is a bad setting
another.set(RESTPubSubConfig.AGGREGATION_MAX_SIZE, -1);
// Some other non-Bullet setting
@@ -104,7 +104,7 @@ public void testMerging() {
config.merge(another);
Assert.assertEquals(config.getAll(Optional.empty()).size(), configSize + 1);
- Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 51);
+ Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 51);
// Bad setting gets defaulted.
Assert.assertEquals(config.get(RESTPubSubConfig.AGGREGATION_MAX_SIZE), RESTPubSubConfig.DEFAULT_AGGREGATION_MAX_SIZE);
// Other setting is preserved.
@@ -113,7 +113,7 @@ public void testMerging() {
// Test null and verify it is unchanged
config.merge(null);
Assert.assertEquals(config.getAll(Optional.empty()).size(), configSize + 1);
- Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), 51);
+ Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 51);
Assert.assertEquals(config.get(RESTPubSubConfig.AGGREGATION_MAX_SIZE), RESTPubSubConfig.DEFAULT_AGGREGATION_MAX_SIZE);
Assert.assertEquals(config.get("pi"), 3.14);
}
@@ -125,7 +125,7 @@ public void testPropertiesWithPrefix() {
String fieldValue = "com.yahoo.bullet.pubsub.MockPubSub";
int configSize = config.getAllWithPrefix(Optional.empty(), prefix, false).size();
- Assert.assertEquals(configSize, 9);
+ Assert.assertEquals(configSize, 8);
Map properties = config.getAllWithPrefix(Optional.empty(), prefix, false);
Assert.assertEquals(properties.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), fieldValue);
@@ -139,7 +139,7 @@ public void testPropertiesStripPrefix() {
String fieldValue = "com.yahoo.bullet.pubsub.MockPubSub";
int configSize = config.getAllWithPrefix(Optional.empty(), prefix, true).size();
- Assert.assertEquals(configSize, 9);
+ Assert.assertEquals(configSize, 8);
Map properties = config.getAllWithPrefix(Optional.empty(), prefix, true);
Assert.assertNull(properties.get(RESTPubSubConfig.PUBSUB_CLASS_NAME));
@@ -196,9 +196,9 @@ public void testValidate() {
Assert.assertEquals(config.get(BulletConfig.AGGREGATION_DEFAULT_SIZE), BulletConfig.DEFAULT_AGGREGATION_SIZE);
// Test validate() corrects RESTPubSubConfig settings
- config.set(RESTPubSubConfig.CONNECT_RETRY_LIMIT, -88);
- Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), -88);
+ config.set(RESTPubSubConfig.CONNECT_TIMEOUT, -88);
+ Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), -88);
config.validate();
- Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_RETRY_LIMIT), RESTPubSubConfig.DEFAULT_CONNECT_RETRY_LIMIT);
+ Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), RESTPubSubConfig.DEFAULT_CONNECT_TIMEOUT);
}
}
diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubTest.java
index 3535c613..e6f7659b 100644
--- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubTest.java
+++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubTest.java
@@ -9,21 +9,9 @@
import com.yahoo.bullet.pubsub.PubSubException;
import com.yahoo.bullet.pubsub.Publisher;
import com.yahoo.bullet.pubsub.Subscriber;
-import org.asynchttpclient.AsyncHttpClient;
-import org.asynchttpclient.BoundRequestBuilder;
-import org.asynchttpclient.ListenableFuture;
-import org.asynchttpclient.Response;
import org.testng.Assert;
import org.testng.annotations.Test;
-
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
public class RESTPubSubTest {
@Test
@@ -43,54 +31,6 @@ public void testSettings() throws PubSubException {
Assert.assertEquals(urls.get(0), "http://localhost:9901/api/bullet/pubsub/result");
}
- public static AsyncHttpClient mockClientWith(BoundRequestBuilder builder) {
- AsyncHttpClient mockClient = mock(AsyncHttpClient.class);
- doReturn(builder).when(mockClient).preparePost(anyString());
- doReturn(builder).when(mockClient).prepareGet(anyString());
- return mockClient;
- }
-
- public static BoundRequestBuilder mockBuilderWith(CompletableFuture future) throws Exception {
- ListenableFuture mockListenable = (ListenableFuture) mock(ListenableFuture.class);
- doReturn(future.get()).when(mockListenable).get();
- doReturn(future).when(mockListenable).toCompletableFuture();
-
- BoundRequestBuilder mockBuilder = mock(BoundRequestBuilder.class);
- doReturn(mockBuilder).when(mockBuilder).setHeader(any(), anyString());
- doReturn(mockBuilder).when(mockBuilder).setBody(anyString());
- doReturn(mockListenable).when(mockBuilder).execute();
- return mockBuilder;
- }
-
- public static CompletableFuture getOkFuture(Response response) throws Exception {
- CompletableFuture finished = CompletableFuture.completedFuture(response);
- CompletableFuture mock = mock(CompletableFuture.class);
- // This is the weird bit. We mock the call to exceptionally to return the finished response so that chaining
- // a thenAcceptAsync on it will call the consumer of it with the finished response. This is why it looks
- // weird: mocking the exceptionally to take the "good" path.
- doReturn(finished).when(mock).exceptionally(any());
- doReturn(finished.get()).when(mock).get();
- // So if we do get to thenAccept on our mock, we should throw an exception because we shouldn't get there.
- doThrow(new RuntimeException("Good futures don't throw")).when(mock).thenAcceptAsync(any());
- return mock;
- }
-
- public static Response getOkResponse(String data) {
- return getResponse(RESTPubSub.OK_200, "Ok", data);
- }
-
- public static Response getNotOkResponse(int status) {
- return getResponse(status, "Error", null);
- }
-
- public static Response getResponse(int status, String statusText, String body) {
- Response mock = mock(Response.class);
- doReturn(status).when(mock).getStatusCode();
- doReturn(statusText).when(mock).getStatusText();
- doReturn(body).when(mock).getResponseBody();
- return mock;
- }
-
@Test
public void testGetPublisher() throws PubSubException {
BulletConfig config = new BulletConfig("src/test/resources/test_config.yaml");
diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java
index 0bbdb471..c42c10f5 100644
--- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java
+++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java
@@ -7,81 +7,96 @@
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
-import org.asynchttpclient.AsyncHttpClient;
-import org.asynchttpclient.AsyncHttpClientConfig;
-import org.asynchttpclient.BoundRequestBuilder;
-import org.asynchttpclient.DefaultAsyncHttpClient;
-import org.asynchttpclient.DefaultAsyncHttpClientConfig;
-import org.asynchttpclient.Response;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
+import org.testng.Assert;
+
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkFuture;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkResponse;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockBuilderWith;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockClientWith;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getNotOkResponse;
public class RESTQueryPublisherTest {
@Test
public void testSendResultUrlPutInMetadataAckPreserved() throws Exception {
- CompletableFuture response = getOkFuture(getOkResponse(null));
- BoundRequestBuilder mockBuilder = mockBuilderWith(response);
- AsyncHttpClient mockClient = mockClientWith(mockBuilder);
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
+ CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+ StatusLine mockStatusLine = mock(StatusLine.class);
+ doReturn(200).when(mockStatusLine).getStatusCode();
+ doReturn(mockStatusLine).when(mockResponse).getStatusLine();
+ doReturn(mockResponse).when(mockClient).execute(any());
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url");
-
publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.ACKNOWLEDGE));
- verify(mockClient).preparePost("my/custom/query/url");
- verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"ACKNOWLEDGE\",\"content\":\"my/custom/url\"}}");
- verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON);
+
+ ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class);
+ verify(mockClient).execute(argumentCaptor.capture());
+ HttpPost post = argumentCaptor.getValue();
+ String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8);
+ String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"ACKNOWLEDGE\",\"content\":\"my/custom/url\"}}";
+ String actualHeader = post.getHeaders(RESTPublisher.CONTENT_TYPE)[0].getValue();
+ String expectedHeader = RESTPublisher.APPLICATION_JSON;
+ Assert.assertEquals(expectedMessage, actualMessage);
+ Assert.assertEquals(expectedHeader, actualHeader);
+ Assert.assertEquals("my/custom/query/url", post.getURI().toString());
}
@Test
public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception {
- CompletableFuture response = getOkFuture(getOkResponse(null));
- BoundRequestBuilder mockBuilder = mockBuilderWith(response);
- AsyncHttpClient mockClient = mockClientWith(mockBuilder);
- RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
- config.set(RESTPubSubConfig.RESULT_URL, "my/custom/url");
- RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url");
-
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
+ RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url");
publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE));
- verify(mockClient).preparePost("my/custom/query/url");
- verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"COMPLETE\",\"content\":\"my/custom/result/url\"}}");
- verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON);
+
+ ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class);
+ verify(mockClient).execute(argumentCaptor.capture());
+ HttpPost post = argumentCaptor.getValue();
+ String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8);
+ String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"COMPLETE\",\"content\":\"my/custom/url\"}}";
+ String actualHeader = post.getHeaders(RESTPublisher.CONTENT_TYPE)[0].getValue();
+ String expectedHeader = RESTPublisher.APPLICATION_JSON;
+ Assert.assertEquals(expectedMessage, actualMessage);
+ Assert.assertEquals(expectedHeader, actualHeader);
+ Assert.assertEquals("my/custom/query/url", post.getURI().toString());
}
@Test
public void testSendMetadataCreated() throws Exception {
- CompletableFuture response = getOkFuture(getOkResponse(null));
- BoundRequestBuilder mockBuilder = mockBuilderWith(response);
- AsyncHttpClient mockClient = mockClientWith(mockBuilder);
- RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url");
-
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
+ RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url");
publisher.send("foo", "bar");
- verify(mockClient).preparePost("my/custom/query/url");
- verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":null,\"content\":\"my/custom/result/url\"}}");
- verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON);
+
+ ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class);
+ verify(mockClient).execute(argumentCaptor.capture());
+ HttpPost post = argumentCaptor.getValue();
+ String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8);
+ String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":null,\"content\":\"my/custom/url\"}}";
+ String actualHeader = post.getHeaders(RESTPublisher.CONTENT_TYPE)[0].getValue();
+ String expectedHeader = RESTPublisher.APPLICATION_JSON;
+ Assert.assertEquals(expectedMessage, actualMessage);
+ Assert.assertEquals(expectedHeader, actualHeader);
+ Assert.assertEquals("my/custom/query/url", post.getURI().toString());
}
@Test
public void testClose() throws Exception {
- AsyncHttpClient mockClient = mock(AsyncHttpClient.class);
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
doNothing().when(mockClient).close();
- RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null);
-
+ RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url");
publisher.close();
verify(mockClient).close();
}
@Test
public void testCloseDoesNotThrow() throws Exception {
- AsyncHttpClient mockClient = mock(AsyncHttpClient.class);
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
doThrow(new IOException("error!")).when(mockClient).close();
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null);
@@ -90,31 +105,14 @@ public void testCloseDoesNotThrow() throws Exception {
}
@Test
- public void testHandleBadResponse() throws Exception {
- CompletableFuture response = getOkFuture(getNotOkResponse(500));
- BoundRequestBuilder mockBuilder = mockBuilderWith(response);
- AsyncHttpClient mockClient = mockClientWith(mockBuilder);
+ public void testBadResponseDoesNotThrow() throws Exception {
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url");
-
- publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE));
- verify(mockClient).preparePost("my/custom/query/url");
- verify(mockBuilder).setBody("{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":\"COMPLETE\",\"content\":\"my/custom/result/url\"}}");
- verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON);
- }
-
- @Test(timeOut = 5000L)
- public void testException() throws Exception {
- // This will hit a non-existent url and fail, testing our exceptions
- AsyncHttpClientConfig clientConfig = new DefaultAsyncHttpClientConfig.Builder().setConnectTimeout(100)
- .setMaxRequestRetry(1)
- .setReadTimeout(-1)
- .setRequestTimeout(-1)
- .build();
- AsyncHttpClient client = new DefaultAsyncHttpClient(clientConfig);
- AsyncHttpClient spyClient = spy(client);
- RESTQueryPublisher publisher = new RESTQueryPublisher(spyClient, "http://this/does/not/exist:8080", "my/custom/result/url");
-
- publisher.send(new PubSubMessage("foo", "bar"));
- verify(spyClient).preparePost("http://this/does/not/exist:8080");
+ PubSubMessage message = mock(PubSubMessage.class);
+ // This will compel the HttpPost object to throw an exception in RESTPublisher.sendToURL()
+ doReturn(null).when(message).asJSON();
+ publisher.send(message);
+ Assert.assertTrue(true);
+ verify(mockClient, never()).execute(any());
}
}
diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java
index 6fe93d61..de80120b 100644
--- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java
+++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java
@@ -7,17 +7,14 @@
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
-import org.asynchttpclient.AsyncHttpClient;
-import org.asynchttpclient.BoundRequestBuilder;
-import org.asynchttpclient.Response;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.mockito.ArgumentCaptor;
+import org.testng.Assert;
import org.testng.annotations.Test;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkFuture;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkResponse;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockBuilderWith;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockClientWith;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -25,33 +22,32 @@
public class RESTResultPublisherTest {
@Test
- public void testSend() throws Exception {
- CompletableFuture response = getOkFuture(getOkResponse(null));
- BoundRequestBuilder mockBuilder = mockBuilderWith(response);
- AsyncHttpClient mockClient = mockClientWith(mockBuilder);
+ public void testSendPullsURLFromMessage() throws Exception {
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
RESTResultPublisher publisher = new RESTResultPublisher(mockClient);
+ Metadata metadata = new Metadata(null, "my/custom/url");
+ publisher.send(new PubSubMessage("foo", "bar", metadata));
- PubSubMessage message = new PubSubMessage("someId", "someContent", new Metadata(null, "custom/url"));
- publisher.send(message);
- verify(mockClient).preparePost("custom/url");
- verify(mockBuilder).setBody("{\"id\":\"someId\",\"sequence\":-1,\"content\":\"someContent\",\"metadata\":{\"signal\":null,\"content\":\"custom/url\"}}");
- verify(mockBuilder).setHeader(RESTPublisher.CONTENT_TYPE, RESTPublisher.APPLICATION_JSON);
- }
+ ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class);
+ verify(mockClient).execute(argumentCaptor.capture());
+ HttpPost post = argumentCaptor.getValue();
- @Test(expectedExceptions = ClassCastException.class)
- public void testSendBadURL() throws Exception {
- CompletableFuture response = getOkFuture(getOkResponse(null));
- BoundRequestBuilder mockBuilder = mockBuilderWith(response);
- AsyncHttpClient mockClient = mockClientWith(mockBuilder);
- RESTResultPublisher publisher = new RESTResultPublisher(mockClient);
+ String actualURI = post.getURI().toString();
+ String actualMessage = EntityUtils.toString(post.getEntity(), RESTPubSub.UTF_8);
+ String actualHeader = post.getHeaders(RESTPublisher.CONTENT_TYPE)[0].getValue();
+
+ String expectedURI = "my/custom/url";
+ String expectedMessage = "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":{\"signal\":null,\"content\":\"my/custom/url\"}}";
+ String expectedHeader = RESTPublisher.APPLICATION_JSON;
- PubSubMessage message = new PubSubMessage("someId", "someContent", new Metadata(null, 88));
- publisher.send(message);
+ Assert.assertEquals(expectedMessage, actualMessage);
+ Assert.assertEquals(expectedHeader, actualHeader);
+ Assert.assertEquals(expectedURI, actualURI);
}
@Test
public void testClose() throws Exception {
- AsyncHttpClient mockClient = mock(AsyncHttpClient.class);
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
doNothing().when(mockClient).close();
RESTResultPublisher publisher = new RESTResultPublisher(mockClient);
@@ -61,7 +57,7 @@ public void testClose() throws Exception {
@Test
public void testCloseDoesNotThrow() throws Exception {
- AsyncHttpClient mockClient = mock(AsyncHttpClient.class);
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
doThrow(new IOException("error!")).when(mockClient).close();
RESTResultPublisher publisher = new RESTResultPublisher(mockClient);
diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java
index 77515590..c23bfa87 100644
--- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java
+++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTSubscriberTest.java
@@ -6,48 +6,57 @@
package com.yahoo.bullet.pubsub.rest;
import com.yahoo.bullet.pubsub.PubSubMessage;
-import org.asynchttpclient.AsyncHttpClient;
-import org.asynchttpclient.BoundRequestBuilder;
-import org.asynchttpclient.Response;
+import org.apache.http.HttpEntity;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getNotOkResponse;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkFuture;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.getOkResponse;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockBuilderWith;
-import static com.yahoo.bullet.pubsub.rest.RESTPubSubTest.mockClientWith;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class RESTSubscriberTest {
+ private CloseableHttpClient mockClient(int responseCode, String message) throws Exception {
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
+ CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+ StatusLine mockStatusLine = mock(StatusLine.class);
+ doReturn(responseCode).when(mockStatusLine).getStatusCode();
+ doReturn(mockStatusLine).when(mockResponse).getStatusLine();
+ HttpEntity mockEntity = mock(HttpEntity.class);
+ InputStream inputStream = new ByteArrayInputStream(message.getBytes(RESTPubSub.UTF_8));
+ doReturn(inputStream).when(mockEntity).getContent();
+ doReturn(mockEntity).when(mockResponse).getEntity();
+ doReturn(mockResponse).when(mockClient).execute(any());
+
+ return mockClient;
+ }
+
@Test
public void testGetMessages() throws Exception {
- PubSubMessage responseData = new PubSubMessage("someID", "someContent");
- CompletableFuture response = getOkFuture(getOkResponse(responseData.asJSON()));
- BoundRequestBuilder mockBuilder = mockBuilderWith(response);
- AsyncHttpClient mockClient = mockClientWith(mockBuilder);
- RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
- RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10);
-
+ String message = new PubSubMessage("foo", "bar").asJSON();
+ CloseableHttpClient mockClient = mockClient(RESTPubSub.OK_200, message);
+ RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10, 3000);
List messages = subscriber.getMessages();
Assert.assertEquals(messages.size(), 2);
- Assert.assertEquals(messages.get(0).asJSON(), "{\"id\":\"someID\",\"sequence\":-1,\"content\":\"someContent\",\"metadata\":null}");
+ Assert.assertEquals(messages.get(0).asJSON(), "{\"id\":\"foo\",\"sequence\":-1,\"content\":\"bar\",\"metadata\":null}");
}
@Test
public void testGetMessages204() throws Exception {
- CompletableFuture response = getOkFuture(getNotOkResponse(204));
- BoundRequestBuilder mockBuilder = mockBuilderWith(response);
- AsyncHttpClient mockClient = mockClientWith(mockBuilder);
- RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
- RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10);
+ String message = new PubSubMessage("foo", "bar").asJSON();
+ CloseableHttpClient mockClient = mockClient(RESTPubSub.NO_CONTENT_204, message);
+ RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10, 3000);
List messages = subscriber.getMessages();
Assert.assertEquals(messages.size(), 0);
@@ -55,11 +64,9 @@ public void testGetMessages204() throws Exception {
@Test
public void testGetMessages500() throws Exception {
- CompletableFuture response = getOkFuture(getNotOkResponse(500));
- BoundRequestBuilder mockBuilder = mockBuilderWith(response);
- AsyncHttpClient mockClient = mockClientWith(mockBuilder);
- RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
- RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10);
+ String message = new PubSubMessage("foo", "bar").asJSON();
+ CloseableHttpClient mockClient = mockClient(500, message);
+ RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10, 3000);
List messages = subscriber.getMessages();
Assert.assertEquals(messages.size(), 0);
@@ -67,12 +74,12 @@ public void testGetMessages500() throws Exception {
@Test
public void testGetMessagesDoesNotThrow() throws Exception {
- // PubSubMessage will throw an error when it fails to parse this into a PubSubMessage
- CompletableFuture response = getOkFuture(getOkResponse("thisCannotBeTurnedIntoAPubSubMessage"));
- BoundRequestBuilder mockBuilder = mockBuilderWith(response);
- AsyncHttpClient mockClient = mockClientWith(mockBuilder);
- RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
- RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10);
+ String message = new PubSubMessage("foo", "bar").asJSON();
+ CloseableHttpClient mockClient = mockClient(500, message);
+ List urls = new ArrayList<>();
+ // A null url will throw an error - make sure it handled eloquently
+ urls.add(null);
+ RESTSubscriber subscriber = new RESTSubscriber(88, urls, mockClient, 10, 3000);
List messages = subscriber.getMessages();
Assert.assertEquals(messages.size(), 0);
@@ -80,9 +87,10 @@ public void testGetMessagesDoesNotThrow() throws Exception {
@Test
public void testClose() throws Exception {
- AsyncHttpClient mockClient = mock(AsyncHttpClient.class);
+ String message = new PubSubMessage("foo", "bar").asJSON();
+ CloseableHttpClient mockClient = mockClient(500, message);
doNothing().when(mockClient).close();
- RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10);
+ RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10, 3000);
subscriber.close();
verify(mockClient).close();
@@ -90,9 +98,10 @@ public void testClose() throws Exception {
@Test
public void testCloseDoesNotThrow() throws Exception {
- AsyncHttpClient mockClient = mock(AsyncHttpClient.class);
+ String message = new PubSubMessage("foo", "bar").asJSON();
+ CloseableHttpClient mockClient = mockClient(500, message);
doThrow(new IOException("error!")).when(mockClient).close();
- RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10);
+ RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 10, 3000);
subscriber.close();
verify(mockClient).close();
@@ -100,23 +109,19 @@ public void testCloseDoesNotThrow() throws Exception {
@Test
public void testMinWait() throws Exception {
- PubSubMessage responseData = new PubSubMessage("someID", "someContent");
- CompletableFuture response = getOkFuture(getOkResponse(responseData.asJSON()));
- BoundRequestBuilder mockBuilder = mockBuilderWith(response);
- AsyncHttpClient mockClient = mockClientWith(mockBuilder);
- RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 1000);
+ String message = new PubSubMessage("someID", "someContent").asJSON();
+ CloseableHttpClient mockClient = mockClient(RESTPubSub.OK_200, message);
+ RESTSubscriber subscriber = new RESTSubscriber(88, Arrays.asList("url", "anotherURL"), mockClient, 100, 3000);
// First response should give content (2 events since we have 2 endpoints in the config)
List messages = subscriber.getMessages();
Assert.assertEquals(messages.size(), 2);
- // Second and third response should give nothing since the wait duration hasn't passed
- messages = subscriber.getMessages();
- Assert.assertEquals(messages.size(), 0);
+ // Second response should give nothing since the wait duration hasn't passed
messages = subscriber.getMessages();
Assert.assertEquals(messages.size(), 0);
- // After waiting a second it should return messages again
- Thread.sleep(3000);
+ // After waiting it should return messages again
+ Thread.sleep(150);
messages = subscriber.getMessages();
Assert.assertEquals(messages.size(), 2);
}
diff --git a/src/test/resources/test_config.yaml b/src/test/resources/test_config.yaml
index 3027b029..d0568a9c 100644
--- a/src/test/resources/test_config.yaml
+++ b/src/test/resources/test_config.yaml
@@ -5,7 +5,7 @@ bullet.query.aggregation.max.size: 100
fake.setting: null
bullet.pubsub.context.name: "QUERY_SUBMISSION"
bullet.pubsub.class.name: "com.yahoo.bullet.pubsub.MockPubSub"
-bullet.pubsub.rest.connect.retry.limit: 88
+bullet.pubsub.rest.connect.timeout.ms: 88
bullet.pubsub.rest.query.urls:
- "http://localhost:9901/CUSTOM/query"
- "http://localhost:9902/CUSTOM/query"