diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java index b6df08cf..1c6a8f6a 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java @@ -36,12 +36,13 @@ public RESTPubSub(BulletConfig config) throws PubSubException { @Override public Publisher getPublisher() { + int connectTimeout = config.getAs(RESTPubSubConfig.PUBLISHER_CONNECT_TIMEOUT, Integer.class); if (context == Context.QUERY_PROCESSING) { - return new RESTResultPublisher(HttpClients.createDefault()); + return new RESTResultPublisher(HttpClients.createDefault(), connectTimeout); } else { String queryURL = ((List) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class)).get(0); String resultURL = config.getAs(RESTPubSubConfig.RESULT_URL, String.class); - return new RESTQueryPublisher(HttpClients.createDefault(), queryURL, resultURL); + return new RESTQueryPublisher(HttpClients.createDefault(), queryURL, resultURL, connectTimeout); } } @@ -53,7 +54,7 @@ public List getPublishers(int n) { @Override public Subscriber getSubscriber() { int maxUncommittedMessages = config.getAs(RESTPubSubConfig.MAX_UNCOMMITTED_MESSAGES, Integer.class); - int connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Integer.class); + int connectTimeout = config.getAs(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT, Integer.class); List urls; Long minWait; diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java index 4108f27c..cfebee7c 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java @@ -16,7 +16,8 @@ public class RESTPubSubConfig extends BulletConfig { // Field names public static final String PREFIX = "bullet.pubsub.rest."; - public static final String CONNECT_TIMEOUT = PREFIX + "connect.timeout.ms"; + public static final String SUBSCRIBER_CONNECT_TIMEOUT = PREFIX + "subscriber.connect.timeout.ms"; + public static final String PUBLISHER_CONNECT_TIMEOUT = PREFIX + "publisher.connect.timeout.ms"; public static final String MAX_UNCOMMITTED_MESSAGES = PREFIX + "subscriber.max.uncommitted.messages"; public static final String QUERY_URLS = PREFIX + "query.urls"; public static final String RESULT_URL = PREFIX + "result.url"; @@ -24,7 +25,8 @@ public class RESTPubSubConfig extends BulletConfig { public static final String QUERY_SUBSCRIBER_MIN_WAIT = PREFIX + "query.subscriber.min.wait.ms"; // Defaults - public static final int DEFAULT_CONNECT_TIMEOUT = 5000; + public static final int DEFAULT_SUBSCRIBER_CONNECT_TIMEOUT = 5000; + public static final int DEFAULT_PUBLISHER_CONNECT_TIMEOUT = 5000; public static final int DEFAULT_MAX_UNCOMMITTED_MESSAGES = 100; public static final List DEFAULT_QUERY_URLS = Arrays.asList("http://localhost:9901/api/bullet/pubsub/query", "http://localhost:9902/api/bullet/pubsub/query"); @@ -36,10 +38,14 @@ public class RESTPubSubConfig extends BulletConfig { private static final Validator VALIDATOR = new Validator(); static { - VALIDATOR.define(CONNECT_TIMEOUT) - .defaultTo(DEFAULT_CONNECT_TIMEOUT) + VALIDATOR.define(SUBSCRIBER_CONNECT_TIMEOUT) + .defaultTo(DEFAULT_SUBSCRIBER_CONNECT_TIMEOUT) .checkIf(Validator::isPositiveInt) .castTo(Validator::asInt); + VALIDATOR.define(PUBLISHER_CONNECT_TIMEOUT) + .defaultTo(DEFAULT_PUBLISHER_CONNECT_TIMEOUT) + .checkIf(Validator::isPositiveInt) + .castTo(Validator::asInt); VALIDATOR.define(MAX_UNCOMMITTED_MESSAGES) .defaultTo(DEFAULT_MAX_UNCOMMITTED_MESSAGES) .checkIf(Validator::isPositiveInt) diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java index 5914d300..97337ac4 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java @@ -8,16 +8,19 @@ import com.yahoo.bullet.pubsub.PubSubMessage; import com.yahoo.bullet.pubsub.Publisher; import lombok.extern.slf4j.Slf4j; +import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpPost; import org.apache.http.HttpResponse; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import java.io.IOException; +import java.io.UnsupportedEncodingException; @Slf4j public abstract class RESTPublisher implements Publisher { public static final String APPLICATION_JSON = "application/json"; public static final String CONTENT_TYPE = "content-type"; + private int connectTimeout; private CloseableHttpClient client; @@ -26,8 +29,9 @@ public abstract class RESTPublisher implements Publisher { * * @param client The client. */ - public RESTPublisher(CloseableHttpClient client) { + public RESTPublisher(CloseableHttpClient client, int connectTimeout) { this.client = client; + this.connectTimeout = connectTimeout; } @Override @@ -48,17 +52,26 @@ public void close() { protected void sendToURL(String url, PubSubMessage message) { log.debug("Sending message: {} to url: {}", message, url); try { - HttpPost httpPost = new HttpPost(url); - httpPost.setEntity(new StringEntity(message.asJSON())); - httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON); - HttpResponse response = client.execute(httpPost); + HttpResponse response = client.execute(makeHttpPost(url, message)); if (response == null || response.getStatusLine().getStatusCode() != RESTPubSub.OK_200) { log.error("Couldn't reach REST pubsub server. Got response: {}", response); return; } log.debug("Successfully wrote message with status code {}. Response was: {}", response.getStatusLine().getStatusCode(), response); } catch (Exception e) { - log.error("Error encoding message in preparation for POST: ", e); + log.error("Error when trying to POST. Message was: {}. Error was: ", message.asJSON(), e); } } + + private HttpPost makeHttpPost(String url, PubSubMessage message) throws UnsupportedEncodingException { + HttpPost httpPost = new HttpPost(url); + httpPost.setEntity(new StringEntity(message.asJSON())); + httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON); + RequestConfig requestConfig = + RequestConfig.custom().setConnectTimeout(connectTimeout) + .setSocketTimeout(connectTimeout) + .build(); + httpPost.setConfig(requestConfig); + return httpPost; + } } diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java index 800fc720..e465844a 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java @@ -26,8 +26,8 @@ public class RESTQueryPublisher extends RESTPublisher { * @param queryURL The URL to which to POST queries. * @param resultURL The URL that will be added to the Metadata (results will be sent to this URL from the backend). */ - public RESTQueryPublisher(CloseableHttpClient client, String queryURL, String resultURL) { - super(client); + public RESTQueryPublisher(CloseableHttpClient client, String queryURL, String resultURL, int connectTimeout) { + super(client, connectTimeout); this.queryURL = queryURL; this.resultURL = resultURL; } diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java index 7e09afdf..a98cadc7 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java @@ -16,8 +16,8 @@ public class RESTResultPublisher extends RESTPublisher { * * @param client The client. */ - public RESTResultPublisher(CloseableHttpClient client) { - super(client); + public RESTResultPublisher(CloseableHttpClient client, int connectTimeout) { + super(client, connectTimeout); } @Override diff --git a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java index 9949eba3..62295d40 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java +++ b/src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java @@ -64,13 +64,14 @@ public List getMessages() { int statusCode = response.getStatusLine().getStatusCode(); if (statusCode == RESTPubSub.OK_200) { String message = EntityUtils.toString(response.getEntity(), RESTPubSub.UTF_8); + log.debug("Received message from url: {}. Message was {}", url, message); messages.add(PubSubMessage.fromJSON(message)); } else if (statusCode != RESTPubSub.NO_CONTENT_204) { // NO_CONTENT_204 indicates there are no new messages - anything else indicates a problem log.error("Http call failed with status code {} and response {}.", statusCode, response); } } catch (Exception e) { - log.error("Http call to {} failed with error: {}", url, e); + log.error("Http call to {} failed with error:", url, e); } } return messages; diff --git a/src/main/resources/rest_pubsub_defaults.yaml b/src/main/resources/rest_pubsub_defaults.yaml index e8ba5d28..dfd94525 100644 --- a/src/main/resources/rest_pubsub_defaults.yaml +++ b/src/main/resources/rest_pubsub_defaults.yaml @@ -1,5 +1,7 @@ -# Http connection timout (used by both the web service and the backend) -bullet.pubsub.rest.connect.timeout.ms: 5000 +# Http connection timout for subscribers +bullet.pubsub.rest.subscriber.connect.timeout.ms: 5000 +# Http connection timout for publishers +bullet.pubsub.rest.publisher.connect.timeout.ms: 5000 # Maxiumum number of uncommitted messages allowed before read requests will wait for commits (used by both the web service and the backend) bullet.pubsub.rest.subscriber.max.uncommitted.messages: 100 # Minimum time (ms) between http calls to the result subscriber REST endpoint. This can be used to limit the number of http requests to the REST endpoints diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java index 572a85f2..9c1f04d5 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java @@ -20,25 +20,25 @@ public class RESTPubSubConfigTest { @Test public void testNoFiles() { RESTPubSubConfig config = new RESTPubSubConfig((String) null); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000); + Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 5000); config = new RESTPubSubConfig((Config) null); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000); + Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 5000); config = new RESTPubSubConfig(""); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000); + Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 5000); } @Test public void testMissingFile() { RESTPubSubConfig config = new RESTPubSubConfig("/path/to/non/existant/file"); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000); + Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 5000); } @Test public void testCustomConfig() { RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 88); + Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 88); Assert.assertEquals(config.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), "com.yahoo.bullet.pubsub.MockPubSub"); List queries = ((List) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class)); Assert.assertEquals(queries.size(), 2); @@ -57,7 +57,7 @@ public void testCustomProperties() { @Test public void testGettingWithDefault() { RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); - Assert.assertEquals(config.getOrDefault(RESTPubSubConfig.CONNECT_TIMEOUT, "51"), 88); + Assert.assertEquals(config.getOrDefault(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT, "51"), 88); Assert.assertEquals(config.getOrDefault("does.not.exist", "foo"), "foo"); Assert.assertEquals(config.getOrDefault("fake.setting", "bar"), "bar"); } @@ -90,12 +90,12 @@ public void testMerging() { RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml"); int configSize = config.getAll(Optional.empty()).size(); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 88); + Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 88); Assert.assertEquals(config.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), "com.yahoo.bullet.pubsub.MockPubSub"); Config another = new RESTPubSubConfig((String) null); another.clear(); - another.set(RESTPubSubConfig.CONNECT_TIMEOUT, 51L); + another.set(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT, 51L); // This is a bad setting another.set(RESTPubSubConfig.AGGREGATION_MAX_SIZE, -1); // Some other non-Bullet setting @@ -104,7 +104,7 @@ public void testMerging() { config.merge(another); Assert.assertEquals(config.getAll(Optional.empty()).size(), configSize + 1); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 51); + Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 51); // Bad setting gets defaulted. Assert.assertEquals(config.get(RESTPubSubConfig.AGGREGATION_MAX_SIZE), RESTPubSubConfig.DEFAULT_AGGREGATION_MAX_SIZE); // Other setting is preserved. @@ -113,7 +113,7 @@ public void testMerging() { // Test null and verify it is unchanged config.merge(null); Assert.assertEquals(config.getAll(Optional.empty()).size(), configSize + 1); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 51); + Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 51); Assert.assertEquals(config.get(RESTPubSubConfig.AGGREGATION_MAX_SIZE), RESTPubSubConfig.DEFAULT_AGGREGATION_MAX_SIZE); Assert.assertEquals(config.get("pi"), 3.14); } @@ -125,7 +125,7 @@ public void testPropertiesWithPrefix() { String fieldValue = "com.yahoo.bullet.pubsub.MockPubSub"; int configSize = config.getAllWithPrefix(Optional.empty(), prefix, false).size(); - Assert.assertEquals(configSize, 8); + Assert.assertEquals(configSize, 9); Map properties = config.getAllWithPrefix(Optional.empty(), prefix, false); Assert.assertEquals(properties.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), fieldValue); @@ -139,7 +139,7 @@ public void testPropertiesStripPrefix() { String fieldValue = "com.yahoo.bullet.pubsub.MockPubSub"; int configSize = config.getAllWithPrefix(Optional.empty(), prefix, true).size(); - Assert.assertEquals(configSize, 8); + Assert.assertEquals(configSize, 9); Map properties = config.getAllWithPrefix(Optional.empty(), prefix, true); Assert.assertNull(properties.get(RESTPubSubConfig.PUBSUB_CLASS_NAME)); @@ -196,9 +196,9 @@ public void testValidate() { Assert.assertEquals(config.get(BulletConfig.AGGREGATION_DEFAULT_SIZE), BulletConfig.DEFAULT_AGGREGATION_SIZE); // Test validate() corrects RESTPubSubConfig settings - config.set(RESTPubSubConfig.CONNECT_TIMEOUT, -88); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), -88); + config.set(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT, -88); + Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), -88); config.validate(); - Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), RESTPubSubConfig.DEFAULT_CONNECT_TIMEOUT); + Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), RESTPubSubConfig.DEFAULT_SUBSCRIBER_CONNECT_TIMEOUT); } } diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java index c42c10f5..5cb03388 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisherTest.java @@ -34,7 +34,7 @@ public void testSendResultUrlPutInMetadataAckPreserved() throws Exception { doReturn(200).when(mockStatusLine).getStatusCode(); doReturn(mockStatusLine).when(mockResponse).getStatusLine(); doReturn(mockResponse).when(mockClient).execute(any()); - RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); + RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url", 5000); publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.ACKNOWLEDGE)); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); @@ -52,7 +52,7 @@ public void testSendResultUrlPutInMetadataAckPreserved() throws Exception { @Test public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception { CloseableHttpClient mockClient = mock(CloseableHttpClient.class); - RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); + RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url", 5000); publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE)); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); @@ -70,7 +70,7 @@ public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception { @Test public void testSendMetadataCreated() throws Exception { CloseableHttpClient mockClient = mock(CloseableHttpClient.class); - RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); + RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url", 5000); publisher.send("foo", "bar"); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpPost.class); @@ -89,7 +89,7 @@ public void testSendMetadataCreated() throws Exception { public void testClose() throws Exception { CloseableHttpClient mockClient = mock(CloseableHttpClient.class); doNothing().when(mockClient).close(); - RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url"); + RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url", 5000); publisher.close(); verify(mockClient).close(); } @@ -98,7 +98,7 @@ public void testClose() throws Exception { public void testCloseDoesNotThrow() throws Exception { CloseableHttpClient mockClient = mock(CloseableHttpClient.class); doThrow(new IOException("error!")).when(mockClient).close(); - RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null); + RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null, 5000); publisher.close(); verify(mockClient).close(); @@ -107,7 +107,7 @@ public void testCloseDoesNotThrow() throws Exception { @Test public void testBadResponseDoesNotThrow() throws Exception { CloseableHttpClient mockClient = mock(CloseableHttpClient.class); - RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url"); + RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url", 5000); PubSubMessage message = mock(PubSubMessage.class); // This will compel the HttpPost object to throw an exception in RESTPublisher.sendToURL() doReturn(null).when(message).asJSON(); diff --git a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java index de80120b..5f9a47cb 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisherTest.java @@ -24,7 +24,7 @@ public class RESTResultPublisherTest { @Test public void testSendPullsURLFromMessage() throws Exception { CloseableHttpClient mockClient = mock(CloseableHttpClient.class); - RESTResultPublisher publisher = new RESTResultPublisher(mockClient); + RESTResultPublisher publisher = new RESTResultPublisher(mockClient, 5000); Metadata metadata = new Metadata(null, "my/custom/url"); publisher.send(new PubSubMessage("foo", "bar", metadata)); @@ -49,7 +49,7 @@ public void testSendPullsURLFromMessage() throws Exception { public void testClose() throws Exception { CloseableHttpClient mockClient = mock(CloseableHttpClient.class); doNothing().when(mockClient).close(); - RESTResultPublisher publisher = new RESTResultPublisher(mockClient); + RESTResultPublisher publisher = new RESTResultPublisher(mockClient, 5000); publisher.close(); verify(mockClient).close(); @@ -59,7 +59,7 @@ public void testClose() throws Exception { public void testCloseDoesNotThrow() throws Exception { CloseableHttpClient mockClient = mock(CloseableHttpClient.class); doThrow(new IOException("error!")).when(mockClient).close(); - RESTResultPublisher publisher = new RESTResultPublisher(mockClient); + RESTResultPublisher publisher = new RESTResultPublisher(mockClient, 5000); publisher.close(); verify(mockClient).close(); diff --git a/src/test/resources/test_config.yaml b/src/test/resources/test_config.yaml index d0568a9c..d1f30eaf 100644 --- a/src/test/resources/test_config.yaml +++ b/src/test/resources/test_config.yaml @@ -5,7 +5,7 @@ bullet.query.aggregation.max.size: 100 fake.setting: null bullet.pubsub.context.name: "QUERY_SUBMISSION" bullet.pubsub.class.name: "com.yahoo.bullet.pubsub.MockPubSub" -bullet.pubsub.rest.connect.timeout.ms: 88 +bullet.pubsub.rest.subscriber.connect.timeout.ms: 88 bullet.pubsub.rest.query.urls: - "http://localhost:9901/CUSTOM/query" - "http://localhost:9902/CUSTOM/query"