Skip to content

Commit

Permalink
Merge pull request #1622 from metamx/smile-eventreceiverfirehose
Browse files Browse the repository at this point in the history
Support for JSON Smile format for EventReceiverFirehoseFactory
  • Loading branch information
drcrallen committed Aug 27, 2015
2 parents c1388a1 + e6b20db commit 10af233
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.api.client.util.Charsets;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
Expand All @@ -44,14 +45,22 @@ public class EventReceiverFirehoseTestClient
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final String chatID;
private final ObjectMapper smileMapper;

public EventReceiverFirehoseTestClient(String host, String chatID, ObjectMapper jsonMapper, HttpClient httpClient)
public EventReceiverFirehoseTestClient(
String host,
String chatID,
ObjectMapper jsonMapper,
HttpClient httpClient,
ObjectMapper smileMapper
)
{
this.host = host;
this.jsonMapper = jsonMapper;
this.responseHandler = new StatusResponseHandler(Charsets.UTF_8);
this.httpClient = httpClient;
this.chatID = chatID;
this.smileMapper = smileMapper;
}

private String getURL()
Expand All @@ -70,15 +79,15 @@ private String getURL()
*
* @return
*/
public int postEvents(Collection<Map<String, Object>> events)
public int postEvents(Collection<Map<String, Object>> events, ObjectMapper objectMapper, String mediaType)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.POST, new URL(getURL())
).setContent(
MediaType.APPLICATION_JSON,
this.jsonMapper.writeValueAsBytes(events)
mediaType,
objectMapper.writeValueAsBytes(events)
),
responseHandler
).get();
Expand All @@ -91,7 +100,7 @@ HttpMethod.POST, new URL(getURL())
response.getContent()
);
}
Map<String, Integer> responseData = jsonMapper.readValue(
Map<String, Integer> responseData = objectMapper.readValue(
response.getContent(), new TypeReference<Map<String, Integer>>()
{
}
Expand All @@ -103,18 +112,31 @@ HttpMethod.POST, new URL(getURL())
}
}

/**
* Reads each events from file and post them to the indexing service.
* Uses both smileMapper and jsonMapper to send events alternately.
*
* @param file location of file to post events from
*
* @return number of events sent to the indexing service
*/
public int postEventsFromFile(String file)
{
try {
BufferedReader reader = new BufferedReader(
new InputStreamReader(
EventReceiverFirehoseTestClient.class.getResourceAsStream(
file
)
)
);
try (
BufferedReader reader = new BufferedReader(
new InputStreamReader(
EventReceiverFirehoseTestClient.class.getResourceAsStream(
file
)
)
);
) {

String s;
Collection<Map<String, Object>> events = new ArrayList<Map<String, Object>>();
// Test sending events using both jsonMapper and smileMapper.
// sends events one by one using both jsonMapper and smileMapper.
int totalEventsPosted = 0;
while ((s = reader.readLine()) != null) {
events.add(
(Map<String, Object>) this.jsonMapper.readValue(
Expand All @@ -123,12 +145,19 @@ public int postEventsFromFile(String file)
}
)
);
ObjectMapper mapper = (totalEventsPosted % 2 == 0) ? jsonMapper : smileMapper;
String mediaType = (totalEventsPosted % 2 == 0)
? MediaType.APPLICATION_JSON
: SmileMediaTypes.APPLICATION_JACKSON_SMILE;
totalEventsPosted += postEvents(events, mapper, mediaType);
;
events = new ArrayList<>();
}
int eventsPosted = postEvents(events);
if (eventsPosted != events.size()) {
throw new ISE("All events not posted, expected : %d actual : %d", events.size(), eventsPosted);

if (totalEventsPosted != events.size()) {
throw new ISE("All events not posted, expected : %d actual : %d", events.size(), totalEventsPosted);
}
return eventsPosted;
return totalEventsPosted;
}
catch (Exception e) {
throw Throwables.propagate(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.testing.clients.CoordinatorResourceTestClient;
import io.druid.testing.clients.OverlordResourceTestClient;
import io.druid.testing.utils.FromFileTestQueryHelper;
Expand All @@ -38,8 +40,11 @@ public abstract class AbstractIndexerTest
@Inject
protected OverlordResourceTestClient indexer;
@Inject
@Json
protected ObjectMapper jsonMapper;

@Inject
@Smile
protected ObjectMapper smileMapper;
@Inject
protected FromFileTestQueryHelper queryHelper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void postEvents() throws Exception
host,
EVENT_RECEIVER_SERVICE_NAME,
jsonMapper,
httpClient
httpClient,
smileMapper
);
client.postEventsFromFile(EVENT_DATA_FILE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ public void postEvents(int id) throws Exception
host,
EVENT_RECEIVER_SERVICE_PREFIX + id,
jsonMapper,
httpClient
httpClient,
smileMapper
);
client.postEventsFromFile(UNION_DATA_FILE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
Expand All @@ -32,9 +36,15 @@
import io.druid.data.input.Rows;
import io.druid.data.input.impl.MapInputRowParser;

import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import java.io.InputStream;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
Expand All @@ -58,19 +68,25 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
private final String serviceName;
private final int bufferSize;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;

@JsonCreator
public EventReceiverFirehoseFactory(
@JsonProperty("serviceName") String serviceName,
@JsonProperty("bufferSize") Integer bufferSize,
@JacksonInject ChatHandlerProvider chatHandlerProvider
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject @Json ObjectMapper jsonMapper,
@JacksonInject @Smile ObjectMapper smileMapper
)
{
Preconditions.checkNotNull(serviceName, "serviceName");

this.serviceName = serviceName;
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
}

@Override
Expand Down Expand Up @@ -123,9 +139,30 @@ public EventReceiverFirehose(MapInputRowParser parser)

@POST
@Path("/push-events")
@Produces(MediaType.APPLICATION_JSON)
public Response addAll(Collection<Map<String, Object>> events)
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response addAll(
InputStream in,
@Context final HttpServletRequest req // used only to get request content-type
)
{
final String reqContentType = req.getContentType();
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType);
final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON;

ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;

Collection<Map<String, Object>> events = null;
try {
events = objectMapper.readValue(
in, new TypeReference<Collection<Map<String, Object>>>()
{
}
);
}
catch (IOException e) {
return Response.serverError().entity(ImmutableMap.<String, Object>of("error", e.getMessage())).build();
}
log.debug("Adding %,d events to firehose: %s", events.size(), serviceName);

final List<InputRow> rows = Lists.newArrayList();
Expand All @@ -146,12 +183,18 @@ public Response addAll(Collection<Map<String, Object>> events)
}
}

return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build();
return Response.ok(
objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())),
contentType
).build();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
}

@Override
Expand Down

0 comments on commit 10af233

Please sign in to comment.