-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support for JSON Smile format for EventReceiverFirehoseFactory #1622
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
|
||
import com.fasterxml.jackson.core.type.TypeReference; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; | ||
import com.google.api.client.util.Charsets; | ||
import com.google.common.base.Throwables; | ||
import com.metamx.common.ISE; | ||
|
@@ -44,14 +45,22 @@ public class EventReceiverFirehoseTestClient | |
private final ObjectMapper jsonMapper; | ||
private final HttpClient httpClient; | ||
private final String chatID; | ||
private final ObjectMapper smileMapper; | ||
|
||
public EventReceiverFirehoseTestClient(String host, String chatID, ObjectMapper jsonMapper, HttpClient httpClient) | ||
public EventReceiverFirehoseTestClient( | ||
String host, | ||
String chatID, | ||
ObjectMapper jsonMapper, | ||
HttpClient httpClient, | ||
ObjectMapper smileMapper | ||
) | ||
{ | ||
this.host = host; | ||
this.jsonMapper = jsonMapper; | ||
this.responseHandler = new StatusResponseHandler(Charsets.UTF_8); | ||
this.httpClient = httpClient; | ||
this.chatID = chatID; | ||
this.smileMapper = smileMapper; | ||
} | ||
|
||
private String getURL() | ||
|
@@ -70,15 +79,15 @@ private String getURL() | |
* | ||
* @return | ||
*/ | ||
public int postEvents(Collection<Map<String, Object>> events) | ||
public int postEvents(Collection<Map<String, Object>> events, ObjectMapper objectMapper, String mediaType) | ||
{ | ||
try { | ||
StatusResponseHolder response = httpClient.go( | ||
new Request( | ||
HttpMethod.POST, new URL(getURL()) | ||
).setContent( | ||
MediaType.APPLICATION_JSON, | ||
this.jsonMapper.writeValueAsBytes(events) | ||
mediaType, | ||
objectMapper.writeValueAsBytes(events) | ||
), | ||
responseHandler | ||
).get(); | ||
|
@@ -91,7 +100,7 @@ HttpMethod.POST, new URL(getURL()) | |
response.getContent() | ||
); | ||
} | ||
Map<String, Integer> responseData = jsonMapper.readValue( | ||
Map<String, Integer> responseData = objectMapper.readValue( | ||
response.getContent(), new TypeReference<Map<String, Integer>>() | ||
{ | ||
} | ||
|
@@ -103,18 +112,31 @@ HttpMethod.POST, new URL(getURL()) | |
} | ||
} | ||
|
||
/** | ||
* Reads each events from file and post them to the indexing service. | ||
* Uses both smileMapper and jsonMapper to send events alternately. | ||
* | ||
* @param file location of file to post events from | ||
* | ||
* @return number of events sent to the indexing service | ||
*/ | ||
public int postEventsFromFile(String file) | ||
{ | ||
try { | ||
BufferedReader reader = new BufferedReader( | ||
new InputStreamReader( | ||
EventReceiverFirehoseTestClient.class.getResourceAsStream( | ||
file | ||
) | ||
) | ||
); | ||
try ( | ||
BufferedReader reader = new BufferedReader( | ||
new InputStreamReader( | ||
EventReceiverFirehoseTestClient.class.getResourceAsStream( | ||
file | ||
) | ||
) | ||
); | ||
) { | ||
|
||
String s; | ||
Collection<Map<String, Object>> events = new ArrayList<Map<String, Object>>(); | ||
// Test sending events using both jsonMapper and smileMapper. | ||
// sends events one by one using both jsonMapper and smileMapper. | ||
int totalEventsPosted = 0; | ||
while ((s = reader.readLine()) != null) { | ||
events.add( | ||
(Map<String, Object>) this.jsonMapper.readValue( | ||
|
@@ -123,12 +145,19 @@ public int postEventsFromFile(String file) | |
} | ||
) | ||
); | ||
ObjectMapper mapper = (totalEventsPosted % 2 == 0) ? jsonMapper : smileMapper; | ||
String mediaType = (totalEventsPosted % 2 == 0) | ||
? MediaType.APPLICATION_JSON | ||
: SmileMediaTypes.APPLICATION_JACKSON_SMILE; | ||
totalEventsPosted += postEvents(events, mapper, mediaType); | ||
; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. extraneous semicolon |
||
events = new ArrayList<>(); | ||
} | ||
int eventsPosted = postEvents(events); | ||
if (eventsPosted != events.size()) { | ||
throw new ISE("All events not posted, expected : %d actual : %d", events.size(), eventsPosted); | ||
|
||
if (totalEventsPosted != events.size()) { | ||
throw new ISE("All events not posted, expected : %d actual : %d", events.size(), totalEventsPosted); | ||
} | ||
return eventsPosted; | ||
return totalEventsPosted; | ||
} | ||
catch (Exception e) { | ||
throw Throwables.propagate(e); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,10 @@ | |
import com.fasterxml.jackson.annotation.JacksonInject; | ||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.core.type.TypeReference; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; | ||
import com.google.common.base.Optional; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.base.Throwables; | ||
|
@@ -32,9 +36,15 @@ | |
import io.druid.data.input.Rows; | ||
import io.druid.data.input.impl.MapInputRowParser; | ||
|
||
import io.druid.guice.annotations.Json; | ||
import io.druid.guice.annotations.Smile; | ||
import java.io.InputStream; | ||
import javax.servlet.http.HttpServletRequest; | ||
import javax.ws.rs.Consumes; | ||
import javax.ws.rs.POST; | ||
import javax.ws.rs.Path; | ||
import javax.ws.rs.Produces; | ||
import javax.ws.rs.core.Context; | ||
import javax.ws.rs.core.MediaType; | ||
import javax.ws.rs.core.Response; | ||
import java.io.IOException; | ||
|
@@ -58,19 +68,25 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow | |
private final String serviceName; | ||
private final int bufferSize; | ||
private final Optional<ChatHandlerProvider> chatHandlerProvider; | ||
private final ObjectMapper jsonMapper; | ||
private final ObjectMapper smileMapper; | ||
|
||
@JsonCreator | ||
public EventReceiverFirehoseFactory( | ||
@JsonProperty("serviceName") String serviceName, | ||
@JsonProperty("bufferSize") Integer bufferSize, | ||
@JacksonInject ChatHandlerProvider chatHandlerProvider | ||
@JacksonInject ChatHandlerProvider chatHandlerProvider, | ||
@JacksonInject @Json ObjectMapper jsonMapper, | ||
@JacksonInject @Smile ObjectMapper smileMapper | ||
) | ||
{ | ||
Preconditions.checkNotNull(serviceName, "serviceName"); | ||
|
||
this.serviceName = serviceName; | ||
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize; | ||
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); | ||
this.jsonMapper = jsonMapper; | ||
this.smileMapper = smileMapper; | ||
} | ||
|
||
@Override | ||
|
@@ -123,9 +139,30 @@ public EventReceiverFirehose(MapInputRowParser parser) | |
|
||
@POST | ||
@Path("/push-events") | ||
@Produces(MediaType.APPLICATION_JSON) | ||
public Response addAll(Collection<Map<String, Object>> events) | ||
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) | ||
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) | ||
public Response addAll( | ||
InputStream in, | ||
@Context final HttpServletRequest req // used only to get request content-type | ||
) | ||
{ | ||
final String reqContentType = req.getContentType(); | ||
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType); | ||
final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON; | ||
|
||
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; | ||
|
||
Collection<Map<String, Object>> events = null; | ||
try { | ||
events = objectMapper.readValue( | ||
in, new TypeReference<Collection<Map<String, Object>>>() | ||
{ | ||
} | ||
); | ||
} | ||
catch (IOException e) { | ||
return Response.serverError().entity(ImmutableMap.<String, Object>of("error", e.getMessage())).build(); | ||
} | ||
log.debug("Adding %,d events to firehose: %s", events.size(), serviceName); | ||
|
||
final List<InputRow> rows = Lists.newArrayList(); | ||
|
@@ -146,12 +183,18 @@ public Response addAll(Collection<Map<String, Object>> events) | |
} | ||
} | ||
|
||
return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build(); | ||
return Response.ok( | ||
objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())), | ||
contentType | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there are a lot of other places that do not do this (add content type to the Response). Do we need to add it elsewhere? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the general contract that the existing apis follow is that if the content received is smile, the response is also sent in smile format. e.g. in QueryResource. Its better to set content type if someone relies on that to parse the response. |
||
).build(); | ||
} | ||
catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
throw Throwables.propagate(e); | ||
} | ||
catch (JsonProcessingException e) { | ||
throw Throwables.propagate(e); | ||
} | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be try with resources? I don't see the reader closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.