Skip to content

Commit

Permalink
DRILL-2172: make web UI bound check before accessing a vector, return…
Browse files Browse the repository at this point in the history
…ing null for overflowing indices; fix a non-string type rendering problem;
  • Loading branch information
Hanifi Gunes authored and parthchandra committed Feb 9, 2015
1 parent 0d867b0 commit 3d863b5
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 116 deletions.
Expand Up @@ -17,7 +17,7 @@
*/ */
package org.apache.drill.exec.server.rest; package org.apache.drill.exec.server.rest;


import java.util.ArrayList; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;


Expand All @@ -30,6 +30,7 @@
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;


import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.ClusterCoordinator;
Expand All @@ -55,7 +56,7 @@ public Viewable getQuery() {
@Path("/query.json") @Path("/query.json")
@Consumes(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public List<Map<String, Object>> submitQueryJSON(QueryWrapper query) throws Exception { public QueryWrapper.QueryResult submitQueryJSON(QueryWrapper query) throws Exception {
final DrillConfig config = work.getContext().getConfig(); final DrillConfig config = work.getContext().getConfig();
final ClusterCoordinator coordinator = work.getContext().getClusterCoordinator(); final ClusterCoordinator coordinator = work.getContext().getClusterCoordinator();
final BufferAllocator allocator = work.getContext().getAllocator(); final BufferAllocator allocator = work.getContext().getAllocator();
Expand All @@ -67,58 +68,43 @@ public List<Map<String, Object>> submitQueryJSON(QueryWrapper query) throws Exce
@Consumes(MediaType.APPLICATION_FORM_URLENCODED) @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces(MediaType.TEXT_HTML) @Produces(MediaType.TEXT_HTML)
public Viewable submitQuery(@FormParam("query") String query, @FormParam("queryType") String queryType) throws Exception { public Viewable submitQuery(@FormParam("query") String query, @FormParam("queryType") String queryType) throws Exception {
List<Map<String, Object>> result = submitQueryJSON(new QueryWrapper(query, queryType)); final QueryWrapper.QueryResult result = submitQueryJSON(new QueryWrapper(query, queryType));

return new Viewable("/rest/query/result.ftl", new TabularResult(result));
List<String> columnNames;
if (result.isEmpty()) {
columnNames = Lists.newArrayList();
} else {
columnNames = Lists.newArrayList(result.get(0).keySet());
}
List<List<Object>> records = new ArrayList<>();

if(!isEmptyResult(result)) {
for (Map m : result) {
records.add(new ArrayList<Object>(m.values()));
}
}

Table table = new Table(columnNames, records);

return new Viewable("/rest/query/result.ftl", table);
} }


private boolean isEmptyResult(List<Map<String, Object>> result) { public static class TabularResult {
if (result.size() > 1) { private final List<String> columns;
return false; private final List<List<String>> rows;
} else {
for(Object col : result.get(0).values()) { public TabularResult(QueryWrapper.QueryResult result) {
if(col != null) { final List<List<String>> rows = Lists.newArrayList();
return false; for (Map<String, String> rowMap:result.rows) {
final List<String> row = Lists.newArrayList();
for (String col:result.columns) {
row.add(rowMap.get(col));
} }
rows.add(row);
} }


return true; this.columns = ImmutableList.copyOf(result.columns);
this.rows = rows;
} }
}


public class Table { public TabularResult(List<String> columns, List<List<String>> rows) {
private List<String> columnNames; this.columns = columns;
private List<List<Object>> records; this.rows = rows;

public Table(List<String> columnNames, List<List<Object>> records) {
this.columnNames = columnNames;
this.records = records;
} }


public boolean isEmpty() { return getColumnNames().isEmpty(); } public boolean isEmpty() {
return columns.isEmpty();
}


public List<String> getColumnNames() { public List<String> getColumns() {
return columnNames; return columns;
} }


public List<List<Object>> getRecords() { public List<List<String>> getRows() {
return records; return rows;
} }
} }
} }
Expand Up @@ -18,35 +18,34 @@


package org.apache.drill.exec.server.rest; package org.apache.drill.exec.server.rest;


import java.util.ArrayList; import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;


import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;


import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.client.DrillClient; import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.ConnectionThrottle; import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener; import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;


import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import parquet.Preconditions;


@XmlRootElement @XmlRootElement
public class QueryWrapper { public class QueryWrapper {
Expand Down Expand Up @@ -80,24 +79,36 @@ public UserBitShared.QueryType getType() {
return type; return type;
} }


public List<Map<String, Object>> run(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) public QueryResult run(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator)
throws Exception { throws Exception {
try(DrillClient client = new DrillClient(config, coordinator, allocator)){ try(DrillClient client = new DrillClient(config, coordinator, allocator)){
Listener listener = new Listener(new RecordBatchLoader(allocator)); Listener listener = new Listener(allocator);


client.connect(); client.connect();
client.runQuery(getType(), query, listener); client.runQuery(getType(), query, listener);


List<Map<String, Object>> result = listener.waitForCompletion(); listener.waitForCompletion();
if (result.isEmpty()) { if (listener.results.isEmpty()) {
Map<String, Object> dumbRecord = new HashMap<>(); listener.results.add(Maps.<String, String>newHashMap());
for (String columnName : listener.getColumnNames()) { }
dumbRecord.put(columnName, null); final Map<String, String> first = listener.results.get(0);
for (String columnName : listener.columns) {
if (!first.containsKey(columnName)) {
first.put(columnName, null);
} }
result.add(dumbRecord);
} }


return result; return new QueryResult(listener.columns, listener.results);
}
}

public static class QueryResult {
public final Collection<String> columns;
public final List<Map<String, String>> rows;

public QueryResult(Collection<String> columns, List<Map<String, String>> rows) {
this.columns = columns;
this.rows = rows;
} }
} }


Expand All @@ -109,15 +120,13 @@ public String toString() {


private static class Listener implements UserResultsListener { private static class Listener implements UserResultsListener {
private volatile Exception exception; private volatile Exception exception;
private AtomicInteger count = new AtomicInteger(); private final CountDownLatch latch = new CountDownLatch(1);
private CountDownLatch latch = new CountDownLatch(1); private final BufferAllocator allocator;
private List<Map<String, Object>> output = new LinkedList<>(); public final List<Map<String, String>> results = Lists.newArrayList();
private ArrayList<String> columnNames; public final Set<String> columns = Sets.newLinkedHashSet();
private RecordBatchLoader loader;
private boolean schemaAdded = false; Listener(BufferAllocator allocator) {

this.allocator = Preconditions.checkNotNull(allocator, "allocator cannot be null");
Listener(RecordBatchLoader loader) {
this.loader = loader;
} }


@Override @Override
Expand All @@ -129,68 +138,45 @@ public void submissionFailed(RpcException ex) {


@Override @Override
public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) { public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
int rows = result.getHeader().getRowCount(); try {
if (result.hasData()) { final int rows = result.getHeader().getRowCount();
count.addAndGet(rows); if (result.hasData()) {
try { final RecordBatchLoader loader = new RecordBatchLoader(allocator);
loader.load(result.getHeader().getDef(), result.getData()); loader.load(result.getHeader().getDef(), result.getData());
if (!schemaAdded || output.isEmpty()) { for (int i = 0; i < loader.getSchema().getFieldCount(); ++i) {
columnNames = new ArrayList<>(); columns.add(loader.getSchema().getColumn(i).getPath().getAsUnescapedPath());
for (int i = 0; i < loader.getSchema().getFieldCount(); ++i) {
columnNames.add(loader.getSchema().getColumn(i).getPath().getAsUnescapedPath());
}
schemaAdded = true;
} }
} catch (SchemaChangeException e) { for (int i = 0; i < rows; ++i) {
throw new RuntimeException(e); final Map<String, String> record = Maps.newHashMap();
} for (VectorWrapper<?> vw : loader) {
for (int i = 0; i < rows; ++i) { final String field = vw.getValueVector().getMetadata().getNamePart().getName();
Map<String, Object> record = new HashMap<>(); final ValueVector.Accessor accessor = vw.getValueVector().getAccessor();
int j = 0; final Object value = i < accessor.getValueCount() ? accessor.getObject(i) : null;
for (VectorWrapper<?> vw : loader) { final String display = value == null ? null : value.toString();
ValueVector.Accessor accessor = vw.getValueVector().getAccessor(); record.put(field, display);
Object object = accessor.getObject(i);
if (object != null && (! object.getClass().getName().startsWith("java.lang"))) {
object = object.toString();
}
if (object != null) {
record.put(columnNames.get(j), object);
} else {
record.put(columnNames.get(j), null);
} }
++j; results.add(record);
} }
output.add(record);
} }
} else if (!schemaAdded) { } catch (SchemaChangeException e) {
columnNames = new ArrayList<>(); throw new RuntimeException(e);
schemaAdded = true; } finally {
for (SerializedField fmd : result.getHeader().getDef().getFieldList()) { result.release();
MaterializedField fieldDef = MaterializedField.create(fmd); if (result.getHeader().getIsLastChunk()) {
columnNames.add(fieldDef.getPath().getAsUnescapedPath()); latch.countDown();
} }
} }

result.release();
if (result.getHeader().getIsLastChunk()) {
latch.countDown();
}
} }


@Override @Override
public void queryIdArrived(UserBitShared.QueryId queryId) { public void queryIdArrived(UserBitShared.QueryId queryId) {
} }


public List<Map<String, Object>> waitForCompletion() throws Exception { public void waitForCompletion() throws Exception {
latch.await(); latch.await();
if (exception != null) { if (exception != null) {
throw exception; throw exception;
} }
return output;
}

public List<String> getColumnNames() {
return new ArrayList<String> (columnNames);
} }
} }
} }
8 changes: 4 additions & 4 deletions exec/java-exec/src/main/resources/rest/query/result.ftl
Expand Up @@ -33,16 +33,16 @@
<table id="result" class="table table-striped table-bordered table-condensed" style="table-layout: auto; width=100%;"> <table id="result" class="table table-striped table-bordered table-condensed" style="table-layout: auto; width=100%;">
<thead> <thead>
<tr> <tr>
<#list model.getColumnNames() as value> <#list model.getColumns() as value>
<th>${value}</th> <th>${value?html}</th>
</#list> </#list>
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
<#list model.getRecords() as record> <#list model.getRows() as record>
<tr> <tr>
<#list record as value> <#list record as value>
<td><#if value??>${value}<#else>null</#if></td> <td>${value!"null"?html}</td>
</#list> </#list>
</tr> </tr>
</#list> </#list>
Expand Down

0 comments on commit 3d863b5

Please sign in to comment.