Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@
<version>0.2.1</version>
</dependency>
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<version>2.0.37</version>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.4</version>
</dependency>
</dependencies>

Expand Down
30 changes: 7 additions & 23 deletions src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,23 @@
import com.yahoo.bullet.pubsub.Publisher;
import com.yahoo.bullet.pubsub.Subscriber;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;

import org.apache.http.impl.client.HttpClients;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Slf4j
public class RESTPubSub extends PubSub {
private static final int NO_TIMEOUT = -1;
public static final int OK_200 = 200;
public static final int NO_CONTENT_204 = 204;
public static final String UTF_8 = "UTF-8";

/**
* Create a RESTPubSub from a {@link BulletConfig}.
*
* @param config The config.
* @throws PubSubException
* @throws PubSubException if the context name is not present or cannot be parsed.
*/
public RESTPubSub(BulletConfig config) throws PubSubException {
super(config);
Expand All @@ -41,11 +37,11 @@ public RESTPubSub(BulletConfig config) throws PubSubException {
@Override
public Publisher getPublisher() {
if (context == Context.QUERY_PROCESSING) {
return new RESTResultPublisher(getClient());
return new RESTResultPublisher(HttpClients.createDefault());
} else {
String queryURL = ((List<String>) config.getAs(RESTPubSubConfig.QUERY_URLS, List.class)).get(0);
String resultURL = config.getAs(RESTPubSubConfig.RESULT_URL, String.class);
return new RESTQueryPublisher(getClient(), queryURL, resultURL);
return new RESTQueryPublisher(HttpClients.createDefault(), queryURL, resultURL);
}
}

Expand All @@ -57,7 +53,7 @@ public List<Publisher> getPublishers(int n) {
@Override
public Subscriber getSubscriber() {
int maxUncommittedMessages = config.getAs(RESTPubSubConfig.MAX_UNCOMMITTED_MESSAGES, Integer.class);
AsyncHttpClient client = getClient();
int connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Integer.class);
List<String> urls;
Long minWait;

Expand All @@ -68,23 +64,11 @@ public Subscriber getSubscriber() {
urls = Collections.singletonList(config.getAs(RESTPubSubConfig.RESULT_URL, String.class));
minWait = config.getAs(RESTPubSubConfig.RESULT_SUBSCRIBER_MIN_WAIT, Long.class);
}
return new RESTSubscriber(maxUncommittedMessages, urls, client, minWait);
return new RESTSubscriber(maxUncommittedMessages, urls, HttpClients.createDefault(), minWait, connectTimeout);
}

@Override
public List<Subscriber> getSubscribers(int n) {
return IntStream.range(0, n).mapToObj(i -> getSubscriber()).collect(Collectors.toList());
}

private AsyncHttpClient getClient() {
Long connectTimeout = config.getAs(RESTPubSubConfig.CONNECT_TIMEOUT, Long.class);
int retryLimit = config.getAs(RESTPubSubConfig.CONNECT_RETRY_LIMIT, Integer.class);
AsyncHttpClientConfig clientConfig =
new DefaultAsyncHttpClientConfig.Builder().setConnectTimeout(connectTimeout.intValue())
.setMaxRequestRetry(retryLimit)
.setReadTimeout(NO_TIMEOUT)
.setRequestTimeout(NO_TIMEOUT)
.build();
return new DefaultAsyncHttpClient(clientConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@ public class RESTPubSubConfig extends BulletConfig {
// Field names
public static final String PREFIX = "bullet.pubsub.rest.";
public static final String CONNECT_TIMEOUT = PREFIX + "connect.timeout.ms";
public static final String CONNECT_RETRY_LIMIT = PREFIX + "connect.retry.limit";
public static final String MAX_UNCOMMITTED_MESSAGES = PREFIX + "subscriber.max.uncommitted.messages";
public static final String QUERY_URLS = PREFIX + "query.urls";
public static final String RESULT_URL = PREFIX + "result.url";
public static final String RESULT_SUBSCRIBER_MIN_WAIT = PREFIX + "result.subscriber.min.wait.ms";
public static final String QUERY_SUBSCRIBER_MIN_WAIT = PREFIX + "query.subscriber.min.wait.ms";

// Defaults
public static final long DEFAULT_CONNECT_TIMEOUT = 5000L;
public static final int DEFAULT_CONNECT_RETRY_LIMIT = 3;
public static final int DEFAULT_CONNECT_TIMEOUT = 5000;
public static final int DEFAULT_MAX_UNCOMMITTED_MESSAGES = 100;
public static final List<String> DEFAULT_QUERY_URLS = Arrays.asList("http://localhost:9901/api/bullet/pubsub/query",
"http://localhost:9902/api/bullet/pubsub/query");
Expand All @@ -41,10 +39,6 @@ public class RESTPubSubConfig extends BulletConfig {
VALIDATOR.define(CONNECT_TIMEOUT)
.defaultTo(DEFAULT_CONNECT_TIMEOUT)
.checkIf(Validator::isPositiveInt)
.castTo(Validator::asLong);
VALIDATOR.define(CONNECT_RETRY_LIMIT)
.defaultTo(DEFAULT_CONNECT_RETRY_LIMIT)
.checkIf(Validator::isPositiveInt)
.castTo(Validator::asInt);
VALIDATOR.define(MAX_UNCOMMITTED_MESSAGES)
.defaultTo(DEFAULT_MAX_UNCOMMITTED_MESSAGES)
Expand Down
52 changes: 20 additions & 32 deletions src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.pubsub.Publisher;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Response;

import org.apache.http.client.methods.HttpPost;
import org.apache.http.HttpResponse;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import java.io.IOException;
import java.util.function.Consumer;

@Slf4j
public abstract class RESTPublisher implements Publisher {
public static final String APPLICATION_JSON = "application/json";
public static final String CONTENT_TYPE = "content-type";

private AsyncHttpClient client;
private CloseableHttpClient client;

/**
* Create a RESTQueryPublisher from a {@link RESTPubSubConfig} and a {@link AsyncHttpClient}.
* Create a RESTQueryPublisher from a {@link CloseableHttpClient}.
*
* @param client The client.
*/
public RESTPublisher(AsyncHttpClient client) {
public RESTPublisher(CloseableHttpClient client) {
this.client = client;
}

Expand All @@ -35,7 +35,7 @@ public void close() {
try {
client.close();
} catch (IOException e) {
log.error("Caught exception when closing AsyncHttpClient...: ", e);
log.error("Caught exception when closing client: ", e);
}
}

Expand All @@ -47,30 +47,18 @@ public void close() {
*/
protected void sendToURL(String url, PubSubMessage message) {
log.debug("Sending message: {} to url: {}", message, url);
client.preparePost(url)
.setBody(message.asJSON())
.setHeader(CONTENT_TYPE, APPLICATION_JSON)
.execute()
.toCompletableFuture()
.exceptionally(this::handleException)
.thenAcceptAsync(createResponseConsumer(message.getId()));
}

private Consumer<Response> createResponseConsumer(String id) {
// Create a closure with id
return response -> handleResponse(id, response);
}

private void handleResponse(String id, Response response) {
if (response == null || response.getStatusCode() != RESTPubSub.OK_200) {
log.error("Failed to write message with id: {}. Couldn't reach pubsub server. Got response: {}", id, response);
return;
try {
HttpPost httpPost = new HttpPost(url);
httpPost.setEntity(new StringEntity(message.asJSON()));
httpPost.setHeader(CONTENT_TYPE, APPLICATION_JSON);
HttpResponse response = client.execute(httpPost);
if (response == null || response.getStatusLine().getStatusCode() != RESTPubSub.OK_200) {
log.error("Couldn't reach REST pubsub server. Got response: {}", response);
return;
}
log.debug("Successfully wrote message with status code {}. Response was: {}", response.getStatusLine().getStatusCode(), response);
} catch (Exception e) {
log.error("Error encoding message in preparation for POST: ", e);
}
log.debug("Successfully wrote message with id {}. Response was: {} {}", id, response.getStatusCode(), response.getStatusText());
}

private Response handleException(Throwable throwable) {
log.error("Received error while posting query", throwable);
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.AsyncHttpClient;
import org.apache.http.impl.client.CloseableHttpClient;

@Slf4j
public class RESTQueryPublisher extends RESTPublisher {
Expand All @@ -20,14 +20,14 @@ public class RESTQueryPublisher extends RESTPublisher {
private String resultURL;

/**
* Create a RESTQueryPublisher from a {@link AsyncHttpClient}, queryURL and resultURL. The BulletConfig must
* Create a RESTQueryPublisher from a {@link CloseableHttpClient}, queryURL and resultURL. The BulletConfig must
* contain a valid url in the bullet.pubsub.rest.query.urls field.
*
* @param client The client.
* @param queryURL The URL to which to POST queries.
* @param resultURL The URL that will be added to the Metadata (results will be sent to this URL from the backend).
*/
public RESTQueryPublisher(AsyncHttpClient client, String queryURL, String resultURL) {
public RESTQueryPublisher(CloseableHttpClient client, String queryURL, String resultURL) {
super(client);
this.queryURL = queryURL;
this.resultURL = resultURL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,23 @@
*/
package com.yahoo.bullet.pubsub.rest;

import com.yahoo.bullet.pubsub.PubSubException;
import com.yahoo.bullet.pubsub.PubSubMessage;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.AsyncHttpClient;
import org.apache.http.impl.client.CloseableHttpClient;

@Slf4j
public class RESTResultPublisher extends RESTPublisher {
/**
* Create a RESTQueryPublisher from a {@link AsyncHttpClient}.
* Create a RESTQueryPublisher from a {@link CloseableHttpClient}.
*
* @param client The client.
*/
public RESTResultPublisher(AsyncHttpClient client) {
public RESTResultPublisher(CloseableHttpClient client) {
super(client);
}

@Override
public void send(PubSubMessage message) throws PubSubException {
public void send(PubSubMessage message) {
String url = (String) message.getMetadata().getContent();
log.debug("Extracted url to which to send results: {}", url);
sendToURL(url, message);
Expand Down
38 changes: 27 additions & 11 deletions src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,40 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Response;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;

import org.apache.http.util.EntityUtils;

@Slf4j
public class RESTSubscriber extends BufferingSubscriber {
@Getter(AccessLevel.PACKAGE)
private List<String> urls;
private AsyncHttpClient client;
private CloseableHttpClient client;
private long minWait;
private long lastRequest;
private int connectTimeout;

/**
* Create a RESTSubscriber from a {@link RESTPubSubConfig}.
* Create a RESTSubscriber.
*
* @param maxUncommittedMessages The maximum number of records that will be buffered before commit() must be called.
* @param urls The URLs which will be used to make the http request.
* @param client The client to use to make http requests.
* @param minWait The minimum time (ms) to wait between subsequent http requests.
*/
public RESTSubscriber(int maxUncommittedMessages, List<String> urls, AsyncHttpClient client, long minWait) {
public RESTSubscriber(int maxUncommittedMessages, List<String> urls, CloseableHttpClient client, long minWait, int connectTimeout) {
super(maxUncommittedMessages);
this.client = client;
this.urls = urls;
this.minWait = minWait;
this.lastRequest = 0;
this.connectTimeout = connectTimeout;
}

@Override
Expand All @@ -52,17 +57,18 @@ public List<PubSubMessage> getMessages() throws PubSubException {
lastRequest = currentTime;
for (String url : urls) {
try {
log.debug("Getting messages from url: ", url);
Response response = client.prepareGet(url).execute().get();
int statusCode = response.getStatusCode();
log.debug("Getting messages from url: {}", url);
HttpResponse response = client.execute(makeHttpGet(url));
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == RESTPubSub.OK_200) {
messages.add(PubSubMessage.fromJSON(response.getResponseBody()));
String message = EntityUtils.toString(response.getEntity(), RESTPubSub.UTF_8);
messages.add(PubSubMessage.fromJSON(message));
} else if (statusCode != RESTPubSub.NO_CONTENT_204) {
// NO_CONTENT_204 indicates there are no new messages - anything else indicates a problem
log.error("Http call failed with status code {} and response {}.", statusCode, response);
}
} catch (Exception e) {
log.error("Http call failed with error: ", e);
log.error("Http call to {} failed with error: {}", url, e);
}
}
return messages;
Expand All @@ -76,4 +82,14 @@ public void close() {
log.warn("Caught exception when closing AsyncHttpClient: ", e);
}
}

private HttpGet makeHttpGet(String url) {
HttpGet httpGet = new HttpGet(url);
RequestConfig requestConfig =
RequestConfig.custom().setConnectTimeout(connectTimeout)
.setSocketTimeout(connectTimeout)
.build();
httpGet.setConfig(requestConfig);
return httpGet;
}
}
2 changes: 0 additions & 2 deletions src/main/resources/rest_pubsub_defaults.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# Http connection timout (used by both the web service and the backend)
bullet.pubsub.rest.connect.timeout.ms: 5000
# Http connection retry limit (used by both the web service and the backend)
bullet.pubsub.rest.connect.retry.limit: 3
# Maxiumum number of uncommitted messages allowed before read requests will wait for commits (used by both the web service and the backend)
bullet.pubsub.rest.subscriber.max.uncommitted.messages: 100
# Minimum time (ms) between http calls to the result subscriber REST endpoint. This can be used to limit the number of http requests to the REST endpoints
Expand Down
Loading