Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public RESTPubSub(BulletConfig config) throws PubSubException {

@Override
public Publisher getPublisher() {
int connectTimeout = config.getAs(RESTPubSubConfig.PUBLISHER_CONNECT_TIMEOUT, Integer.class);
if (context == Context.QUERY_PROCESSING) {
return new RESTResultPublisher(HttpClients.createDefault());
return new RESTResultPublisher(HttpClients.createDefault(), connectTimeout);
} else {
String queryURL = ((List<String>) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class)).get(0);
String resultURL = config.getAs(RESTPubSubConfig.RESULT_URL, String.class);
return new RESTQueryPublisher(HttpClients.createDefault(), queryURL, resultURL);
return new RESTQueryPublisher(HttpClients.createDefault(), queryURL, resultURL, connectTimeout);
}
}

Expand All @@ -53,7 +54,7 @@ public List<Publisher> getPublishers(int n) {
@Override
public Subscriber getSubscriber() {
int maxUncommittedMessages = config.getAs(RESTPubSubConfig.MAX_UNCOMMITTED_MESSAGES, Integer.class);
int connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Integer.class);
int connectTimeout = config.getAs(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT, Integer.class);
List<String> urls;
Long minWait;

Expand Down
14 changes: 10 additions & 4 deletions src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@
public class RESTPubSubConfig extends BulletConfig {
// Field names
public static final String PREFIX = "bullet.pubsub.rest.";
public static final String CONNECT_TIMEOUT = PREFIX + "connect.timeout.ms";
public static final String SUBSCRIBER_CONNECT_TIMEOUT = PREFIX + "subscriber.connect.timeout.ms";
public static final String PUBLISHER_CONNECT_TIMEOUT = PREFIX + "publisher.connect.timeout.ms";
public static final String MAX_UNCOMMITTED_MESSAGES = PREFIX + "subscriber.max.uncommitted.messages";
public static final String QUERY_URLS = PREFIX + "query.urls";
public static final String RESULT_URL = PREFIX + "result.url";
public static final String RESULT_SUBSCRIBER_MIN_WAIT = PREFIX + "result.subscriber.min.wait.ms";
public static final String QUERY_SUBSCRIBER_MIN_WAIT = PREFIX + "query.subscriber.min.wait.ms";

// Defaults
public static final int DEFAULT_CONNECT_TIMEOUT = 5000;
public static final int DEFAULT_SUBSCRIBER_CONNECT_TIMEOUT = 5000;
public static final int DEFAULT_PUBLISHER_CONNECT_TIMEOUT = 5000;
public static final int DEFAULT_MAX_UNCOMMITTED_MESSAGES = 100;
public static final List<String> DEFAULT_QUERY_URLS = Arrays.asList("http://localhost:9901/api/bullet/pubsub/query",
"http://localhost:9902/api/bullet/pubsub/query");
Expand All @@ -36,10 +38,14 @@ public class RESTPubSubConfig extends BulletConfig {

private static final Validator VALIDATOR = new Validator();
static {
VALIDATOR.define(CONNECT_TIMEOUT)
.defaultTo(DEFAULT_CONNECT_TIMEOUT)
VALIDATOR.define(SUBSCRIBER_CONNECT_TIMEOUT)
.defaultTo(DEFAULT_SUBSCRIBER_CONNECT_TIMEOUT)
.checkIf(Validator::isPositiveInt)
.castTo(Validator::asInt);
VALIDATOR.define(PUBLISHER_CONNECT_TIMEOUT)
.defaultTo(DEFAULT_PUBLISHER_CONNECT_TIMEOUT)
.checkIf(Validator::isPositiveInt)
.castTo(Validator::asInt);
VALIDATOR.define(MAX_UNCOMMITTED_MESSAGES)
.defaultTo(DEFAULT_MAX_UNCOMMITTED_MESSAGES)
.checkIf(Validator::isPositiveInt)
Expand Down
25 changes: 19 additions & 6 deletions src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.pubsub.Publisher;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.HttpResponse;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

@Slf4j
public abstract class RESTPublisher implements Publisher {
public static final String APPLICATION_JSON = "application/json";
public static final String CONTENT_TYPE = "content-type";
private int connectTimeout;

private CloseableHttpClient client;

Expand All @@ -26,8 +29,9 @@ public abstract class RESTPublisher implements Publisher {
*
* @param client The client.
*/
public RESTPublisher(CloseableHttpClient client) {
public RESTPublisher(CloseableHttpClient client, int connectTimeout) {
this.client = client;
this.connectTimeout = connectTimeout;
}

@Override
Expand All @@ -48,17 +52,26 @@ public void close() {
protected void sendToURL(String url, PubSubMessage message) {
log.debug("Sending message: {} to url: {}", message, url);
try {
HttpPost httpPost = new HttpPost(url);
httpPost.setEntity(new StringEntity(message.asJSON()));
httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON);
HttpResponse response = client.execute(httpPost);
HttpResponse response = client.execute(makeHttpPost(url, message));
if (response == null || response.getStatusLine().getStatusCode() != RESTPubSub.OK_200) {
log.error("Couldn't reach REST pubsub server. Got response: {}", response);
return;
}
log.debug("Successfully wrote message with status code {}. Response was: {}", response.getStatusLine().getStatusCode(), response);
} catch (Exception e) {
log.error("Error encoding message in preparation for POST: ", e);
log.error("Error when trying to POST. Message was: {}. Error was: ", message.asJSON(), e);
}
}

private HttpPost makeHttpPost(String url, PubSubMessage message) throws UnsupportedEncodingException {
HttpPost httpPost = new HttpPost(url);
httpPost.setEntity(new StringEntity(message.asJSON()));
httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON);
RequestConfig requestConfig =
RequestConfig.custom().setConnectTimeout(connectTimeout)
.setSocketTimeout(connectTimeout)
.build();
httpPost.setConfig(requestConfig);
return httpPost;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public class RESTQueryPublisher extends RESTPublisher {
* @param queryURL The URL to which to POST queries.
* @param resultURL The URL that will be added to the Metadata (results will be sent to this URL from the backend).
*/
public RESTQueryPublisher(CloseableHttpClient client, String queryURL, String resultURL) {
super(client);
public RESTQueryPublisher(CloseableHttpClient client, String queryURL, String resultURL, int connectTimeout) {
super(client, connectTimeout);
this.queryURL = queryURL;
this.resultURL = resultURL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ public class RESTResultPublisher extends RESTPublisher {
*
* @param client The client.
*/
public RESTResultPublisher(CloseableHttpClient client) {
super(client);
public RESTResultPublisher(CloseableHttpClient client, int connectTimeout) {
super(client, connectTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ public List<PubSubMessage> getMessages() {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == RESTPubSub.OK_200) {
String message = EntityUtils.toString(response.getEntity(), RESTPubSub.UTF_8);
log.debug("Received message from url: {}. Message was {}", url, message);
messages.add(PubSubMessage.fromJSON(message));
} else if (statusCode != RESTPubSub.NO_CONTENT_204) {
// NO_CONTENT_204 indicates there are no new messages - anything else indicates a problem
log.error("Http call failed with status code {} and response {}.", statusCode, response);
}
} catch (Exception e) {
log.error("Http call to {} failed with error: {}", url, e);
log.error("Http call to {} failed with error:", url, e);
}
}
return messages;
Expand Down
6 changes: 4 additions & 2 deletions src/main/resources/rest_pubsub_defaults.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Http connection timout (used by both the web service and the backend)
bullet.pubsub.rest.connect.timeout.ms: 5000
# Http connection timout for subscribers
bullet.pubsub.rest.subscriber.connect.timeout.ms: 5000
# Http connection timout for publishers
bullet.pubsub.rest.publisher.connect.timeout.ms: 5000
# Maxiumum number of uncommitted messages allowed before read requests will wait for commits (used by both the web service and the backend)
bullet.pubsub.rest.subscriber.max.uncommitted.messages: 100
# Minimum time (ms) between http calls to the result subscriber REST endpoint. This can be used to limit the number of http requests to the REST endpoints
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,25 @@ public class RESTPubSubConfigTest {
@Test
public void testNoFiles() {
RESTPubSubConfig config = new RESTPubSubConfig((String) null);
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000);
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 5000);

config = new RESTPubSubConfig((Config) null);
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000);
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 5000);

config = new RESTPubSubConfig("");
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000);
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 5000);
}

@Test
public void testMissingFile() {
RESTPubSubConfig config = new RESTPubSubConfig("/path/to/non/existant/file");
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 5000);
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 5000);
}

@Test
public void testCustomConfig() {
RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 88);
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 88);
Assert.assertEquals(config.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), "com.yahoo.bullet.pubsub.MockPubSub");
List<String> queries = ((List<String>) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class));
Assert.assertEquals(queries.size(), 2);
Expand All @@ -57,7 +57,7 @@ public void testCustomProperties() {
@Test
public void testGettingWithDefault() {
RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
Assert.assertEquals(config.getOrDefault(RESTPubSubConfig.CONNECT_TIMEOUT, "51"), 88);
Assert.assertEquals(config.getOrDefault(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT, "51"), 88);
Assert.assertEquals(config.getOrDefault("does.not.exist", "foo"), "foo");
Assert.assertEquals(config.getOrDefault("fake.setting", "bar"), "bar");
}
Expand Down Expand Up @@ -90,12 +90,12 @@ public void testMerging() {
RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");

int configSize = config.getAll(Optional.empty()).size();
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 88);
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 88);
Assert.assertEquals(config.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), "com.yahoo.bullet.pubsub.MockPubSub");

Config another = new RESTPubSubConfig((String) null);
another.clear();
another.set(RESTPubSubConfig.CONNECT_TIMEOUT, 51L);
another.set(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT, 51L);
// This is a bad setting
another.set(RESTPubSubConfig.AGGREGATION_MAX_SIZE, -1);
// Some other non-Bullet setting
Expand All @@ -104,7 +104,7 @@ public void testMerging() {
config.merge(another);

Assert.assertEquals(config.getAll(Optional.empty()).size(), configSize + 1);
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 51);
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 51);
// Bad setting gets defaulted.
Assert.assertEquals(config.get(RESTPubSubConfig.AGGREGATION_MAX_SIZE), RESTPubSubConfig.DEFAULT_AGGREGATION_MAX_SIZE);
// Other setting is preserved.
Expand All @@ -113,7 +113,7 @@ public void testMerging() {
// Test null and verify it is unchanged
config.merge(null);
Assert.assertEquals(config.getAll(Optional.empty()).size(), configSize + 1);
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), 51);
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), 51);
Assert.assertEquals(config.get(RESTPubSubConfig.AGGREGATION_MAX_SIZE), RESTPubSubConfig.DEFAULT_AGGREGATION_MAX_SIZE);
Assert.assertEquals(config.get("pi"), 3.14);
}
Expand All @@ -125,7 +125,7 @@ public void testPropertiesWithPrefix() {
String fieldValue = "com.yahoo.bullet.pubsub.MockPubSub";

int configSize = config.getAllWithPrefix(Optional.empty(), prefix, false).size();
Assert.assertEquals(configSize, 8);
Assert.assertEquals(configSize, 9);

Map<String, Object> properties = config.getAllWithPrefix(Optional.empty(), prefix, false);
Assert.assertEquals(properties.get(RESTPubSubConfig.PUBSUB_CLASS_NAME), fieldValue);
Expand All @@ -139,7 +139,7 @@ public void testPropertiesStripPrefix() {
String fieldValue = "com.yahoo.bullet.pubsub.MockPubSub";

int configSize = config.getAllWithPrefix(Optional.empty(), prefix, true).size();
Assert.assertEquals(configSize, 8);
Assert.assertEquals(configSize, 9);

Map<String, Object> properties = config.getAllWithPrefix(Optional.empty(), prefix, true);
Assert.assertNull(properties.get(RESTPubSubConfig.PUBSUB_CLASS_NAME));
Expand Down Expand Up @@ -196,9 +196,9 @@ public void testValidate() {
Assert.assertEquals(config.get(BulletConfig.AGGREGATION_DEFAULT_SIZE), BulletConfig.DEFAULT_AGGREGATION_SIZE);

// Test validate() corrects RESTPubSubConfig settings
config.set(RESTPubSubConfig.CONNECT_TIMEOUT, -88);
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), -88);
config.set(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT, -88);
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), -88);
config.validate();
Assert.assertEquals(config.get(RESTPubSubConfig.CONNECT_TIMEOUT), RESTPubSubConfig.DEFAULT_CONNECT_TIMEOUT);
Assert.assertEquals(config.get(RESTPubSubConfig.SUBSCRIBER_CONNECT_TIMEOUT), RESTPubSubConfig.DEFAULT_SUBSCRIBER_CONNECT_TIMEOUT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void testSendResultUrlPutInMetadataAckPreserved() throws Exception {
doReturn(200).when(mockStatusLine).getStatusCode();
doReturn(mockStatusLine).when(mockResponse).getStatusLine();
doReturn(mockResponse).when(mockClient).execute(any());
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url");
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url", 5000);
publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.ACKNOWLEDGE));

ArgumentCaptor<HttpPost> argumentCaptor = ArgumentCaptor.forClass(HttpPost.class);
Expand All @@ -52,7 +52,7 @@ public void testSendResultUrlPutInMetadataAckPreserved() throws Exception {
@Test
public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception {
CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url");
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url", 5000);
publisher.send(new PubSubMessage("foo", "bar", Metadata.Signal.COMPLETE));

ArgumentCaptor<HttpPost> argumentCaptor = ArgumentCaptor.forClass(HttpPost.class);
Expand All @@ -70,7 +70,7 @@ public void testSendResultUrlPutInMetadataCompletePreserved() throws Exception {
@Test
public void testSendMetadataCreated() throws Exception {
CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url");
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url", 5000);
publisher.send("foo", "bar");

ArgumentCaptor<HttpPost> argumentCaptor = ArgumentCaptor.forClass(HttpPost.class);
Expand All @@ -89,7 +89,7 @@ public void testSendMetadataCreated() throws Exception {
public void testClose() throws Exception {
CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
doNothing().when(mockClient).close();
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url");
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/url", 5000);
publisher.close();
verify(mockClient).close();
}
Expand All @@ -98,7 +98,7 @@ public void testClose() throws Exception {
public void testCloseDoesNotThrow() throws Exception {
CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
doThrow(new IOException("error!")).when(mockClient).close();
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null);
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, null, null, 5000);

publisher.close();
verify(mockClient).close();
Expand All @@ -107,7 +107,7 @@ public void testCloseDoesNotThrow() throws Exception {
@Test
public void testBadResponseDoesNotThrow() throws Exception {
CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url");
RESTQueryPublisher publisher = new RESTQueryPublisher(mockClient, "my/custom/query/url", "my/custom/result/url", 5000);
PubSubMessage message = mock(PubSubMessage.class);
// This will compel the HttpPost object to throw an exception in RESTPublisher.sendToURL()
doReturn(null).when(message).asJSON();
Expand Down
Loading