Skip to content

Commit

Permalink
make v1,v2 Iterators in DirectDruidClient driven by factory
Browse files Browse the repository at this point in the history
  • Loading branch information
himanshug committed Aug 16, 2016
1 parent 0d77581 commit f6a7142
Show file tree
Hide file tree
Showing 10 changed files with 523 additions and 287 deletions.
2 changes: 1 addition & 1 deletion server/src/main/java/io/druid/client/BrokerServerView.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ private DirectDruidClient makeDirectClient(DruidServer server)
httpClient,
server.getHost(),
emitter,
directDruidClientConfig.isUseV3QueryUrl()
directDruidClientConfig
);
}

Expand Down
275 changes: 16 additions & 259 deletions server/src/main/java/io/druid/client/DirectDruidClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

package io.druid.client;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -35,7 +32,6 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.RE;
import com.metamx.common.guava.BaseSequence;
Expand All @@ -54,14 +50,12 @@
import io.druid.query.BaseQuery;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.aggregation.MetricManipulatorFns;
import io.druid.server.QueryResourceV3;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.handler.codec.http.HttpChunk;
Expand All @@ -70,18 +64,14 @@
import org.jboss.netty.handler.codec.http.HttpResponse;

import javax.ws.rs.core.MediaType;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URL;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -105,7 +95,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private final AtomicInteger openConnections;
private final boolean isSmile;

private final boolean useV3QueryUrl;
private final DirectDruidClientConfig config;

public DirectDruidClient(
QueryToolChestWarehouse warehouse,
Expand All @@ -114,7 +104,7 @@ public DirectDruidClient(
HttpClient httpClient,
String host,
ServiceEmitter emitter,
boolean useV3QueryUrl
DirectDruidClientConfig config
)
{
this.warehouse = warehouse;
Expand All @@ -123,7 +113,7 @@ public DirectDruidClient(
this.httpClient = httpClient;
this.host = host;
this.emitter = emitter;
this.useV3QueryUrl = useV3QueryUrl;
this.config = config;

this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory;
this.openConnections = new AtomicInteger();
Expand Down Expand Up @@ -159,7 +149,7 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
}

final ListenableFuture<InputStream> future;
final String url = String.format("http://%s/druid/%s/", host, useV3QueryUrl ? "v3" : "v2");
final String url = String.format("http://%s/%s", host, config.getQueryResponseIteratorFactory().getQueryUrlPath());
final String cancelUrl = String.format("http://%s/druid/v2/%s", host, query.getId());

try {
Expand Down Expand Up @@ -389,42 +379,22 @@ public void onFailure(Throwable t)
throw Throwables.propagate(e);
}

Sequence<T> retVal;
if (useV3QueryUrl) {
retVal = new BaseSequence<>(
new BaseSequence.IteratorMaker<T, JsonParserV3ResponseIterator<T>>()
Sequence<T> retVal = new BaseSequence<>(
new BaseSequence.IteratorMaker<T, QueryResponseIterator<T>>()
{
@Override
public QueryResponseIterator make()
{
@Override
public JsonParserV3ResponseIterator<T> make()
{
return new JsonParserV3ResponseIterator<T>(typeRef, future, url, objectMapper, host, context);
}

@Override
public void cleanup(JsonParserV3ResponseIterator<T> iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
return config.getQueryResponseIteratorFactory().make(typeRef, future, url, objectMapper, host, context);
}
);
} else {
retVal = new BaseSequence<>(
new BaseSequence.IteratorMaker<T, JsonParserIterator<T>>()
{
@Override
public JsonParserIterator<T> make()
{
return new JsonParserIterator<T>(typeRef, future, url);
}

@Override
public void cleanup(JsonParserIterator<T> iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
@Override
public void cleanup(QueryResponseIterator<T> iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
);
}
}
);

// bySegment queries are de-serialized after caching results in order to
// avoid the cost of de-serializing and then re-serializing again when adding to cache
Expand All @@ -440,217 +410,4 @@ public void cleanup(JsonParserIterator<T> iterFromMake)

return retVal;
}

private class JsonParserIterator<T> implements Iterator<T>, Closeable
{
private JsonParser jp;
private ObjectCodec objectCodec;
private final JavaType typeRef;
private final Future<InputStream> future;
private final String url;

public JsonParserIterator(JavaType typeRef, Future<InputStream> future, String url)
{
this.typeRef = typeRef;
this.future = future;
this.url = url;
jp = null;
}

@Override
public boolean hasNext()
{
init();

if (jp.isClosed()) {
return false;
}
if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
CloseQuietly.close(jp);
return false;
}

return true;
}

@Override
public T next()
{
init();
try {
final T retVal = objectCodec.readValue(jp, typeRef);
jp.nextToken();
return retVal;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}

@Override
public void remove()
{
throw new UnsupportedOperationException();
}

private void init()
{
if (jp == null) {
try {
jp = objectMapper.getFactory().createParser(future.get());
final JsonToken nextToken = jp.nextToken();
if (nextToken == JsonToken.START_OBJECT) {
QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
throw new QueryInterruptedException(cause, host);
} else if (nextToken != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else {
jp.nextToken();
objectCodec = jp.getCodec();
}
}
catch (IOException | InterruptedException | ExecutionException e) {
throw new RE(e, "Failure getting results from[%s] because of [%s]", url, e.getMessage());
}
catch (CancellationException e) {
throw new QueryInterruptedException(e, host);
}
}
}

@Override
public void close() throws IOException
{
if (jp != null) {
jp.close();
}
}
}

// Mostly same as JsonParserIterator with updates to handle v3 query response of type
// { "result": [....], "context": { .... } }
// adds all the context entries from response into the Map responseContext passed to it.
// against the key <host>
static class JsonParserV3ResponseIterator<T> implements Iterator<T>, Closeable
{
private JsonParser jp;
private ObjectCodec objectCodec;
private final JavaType typeRef;
private final Future<InputStream> future;
private final String url;

private final ObjectMapper objectMapper;
private final String host;
private final Map<String, Object> responseContext;

public JsonParserV3ResponseIterator(
JavaType typeRef,
Future<InputStream> future,
String url,
ObjectMapper objectMapper,
String host,
Map<String, Object> responseContext
)
{
this.typeRef = typeRef;
this.future = future;
this.url = url;
jp = null;
this.objectMapper = objectMapper;
this.host = host;
this.responseContext = responseContext;
}

@Override
public boolean hasNext()
{
init();

if (jp.isClosed()) {
return false;
}

if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
// now we are finised reading all the result values.
try {
jp.nextToken(); //read off FIELD_NAME token for "context"
jp.nextToken();
Map<String, Object> ctx = objectCodec.readValue(jp, Map.class);
if (ctx.size() > 0) {
responseContext.put(host, ctx);
}
jp.nextToken(); //read off END_OBJECT token
} catch (IOException ex) {
throw Throwables.propagate(ex);
}

return false;
}


return true;
}

@Override
public T next()
{
init();
try {
final T retVal = objectCodec.readValue(jp, typeRef);
jp.nextToken();
return retVal;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}

@Override
public void remove()
{
throw new UnsupportedOperationException();
}

private void init()
{
if (jp == null) {
try {
jp = objectMapper.getFactory().createParser(future.get());
if (jp.nextToken() == JsonToken.START_OBJECT) {
if (jp.nextToken() == JsonToken.FIELD_NAME) {
if (QueryResourceV3.KEY_RESULT.equals(jp.getCurrentName())) {
if (jp.nextToken() == JsonToken.START_ARRAY) {
jp.nextToken();
objectCodec = jp.getCodec();
} else {
throw new IAE("result must be array, token was[%s] from url [%s]", jp.getCurrentToken(), url);
}
} else {
QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
throw new QueryInterruptedException(cause, host);
}
} else {
throw new IAE("Next token wasn't a FIELD_NAME, was[%s] from url [%s]", jp.getCurrentToken(), url);
}
} else {
throw new IAE("expecting json object, was[%s] from url [%s]", jp.getCurrentToken(), url);
}
}
catch (IOException | InterruptedException | ExecutionException e) {
throw new RE(e, "Failure getting results from[%s] because of [%s]", url, e.getMessage());
}
catch (CancellationException e) {
throw new QueryInterruptedException(e, host);
}
}
}

@Override
public void close() throws IOException
{
if (jp != null) {
jp.close();
}
}
}
}
14 changes: 11 additions & 3 deletions server/src/main/java/io/druid/client/DirectDruidClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@
*/
public class DirectDruidClientConfig
{
@JsonProperty("queryResponseIterator")
private QueryResponseIteratorFactory queryResponseIteratorFactory = new V2QueryResponseIteratorFactory();

@JsonProperty
private boolean useV3QueryUrl = false;
private String stuff;

public QueryResponseIteratorFactory getQueryResponseIteratorFactory()
{
return queryResponseIteratorFactory;
}

public boolean isUseV3QueryUrl()
public String getStuff()
{
return useV3QueryUrl;
return stuff;
}
}

0 comments on commit f6a7142

Please sign in to comment.