DRILL-6248: Added limit push down support for system tables
1. PojoRecordReader started returning data in batches instead of returning everything in one batch. Default batch size is 4000.
2. SystemTableScan supports limit push down; the record reader extracts data based on the given maximum number of records to read (see the sketch after the changed-files summary below).
3. Profiles and profiles_json tables apply limit push down while extracting data from the store, accessing profiles by range (see the range-fetch sketch at the end of the diff).

closes #1183
arina-ielchiieva authored and vdiravka committed Mar 24, 2018
1 parent 3595664 commit 8663e8a
Showing 14 changed files with 265 additions and 145 deletions.
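
Before the file-by-file diff, here is a minimal, self-contained Java sketch of the two mechanisms named in points 1 and 2 of the commit message: a record reader that returns rows in batches capped at a default of 4000 (mirroring AbstractPojoRecordReader) and a group scan whose applyLimit(int) returns a pruned copy that carries the pushed-down row count into the reader it creates. SketchReader, SketchScan and LimitPushdownSketch are hypothetical stand-ins, not the actual Drill classes.

import java.util.Iterator;
import java.util.List;

public class LimitPushdownSketch {

  /** Mirrors AbstractPojoRecordReader: rows are returned in batches of at most recordsPerBatch. */
  static final class SketchReader {
    static final int DEFAULT_RECORDS_PER_BATCH = 4000;

    private final Iterator<String> rows;
    private final int recordsPerBatch;

    SketchReader(List<String> rows, int recordsPerBatch) {
      this.rows = rows.iterator();
      // The patch caps the requested batch size at the default.
      this.recordsPerBatch = Math.min(recordsPerBatch, DEFAULT_RECORDS_PER_BATCH);
    }

    /** Returns the number of rows emitted for this batch; 0 means the reader is exhausted. */
    int next() {
      int recordCount = 0;
      while (rows.hasNext() && recordCount < recordsPerBatch) {
        rows.next();   // a real reader would write the row into value vectors here
        recordCount++;
      }
      return recordCount;
    }
  }

  /** Mirrors the GroupScan.applyLimit(int) contract: return a pruned copy, or null if unsupported. */
  static final class SketchScan {
    private final int maxRecordsToRead;   // -1 means no limit was pushed down

    SketchScan(int maxRecordsToRead) {
      this.maxRecordsToRead = maxRecordsToRead;
    }

    boolean supportsLimitPushdown() {
      return true;
    }

    SketchScan applyLimit(int maxRecords) {
      // Ask for at least one row, mirroring the Math.max(maxRecords, 1) guard in the diff below.
      return new SketchScan(Math.max(maxRecords, 1));
    }

    SketchReader createReader(List<String> rows) {
      // With a pushed-down LIMIT n, the first batch holds at most n rows, so the
      // downstream LIMIT operator never forces extra rows to be materialized.
      int batchSize = maxRecordsToRead < 0
          ? SketchReader.DEFAULT_RECORDS_PER_BATCH
          : maxRecordsToRead;
      return new SketchReader(rows, batchSize);
    }
  }

  public static void main(String[] args) {
    List<String> rows = List.of("a", "b", "c", "d", "e");
    SketchScan scan = new SketchScan(-1).applyLimit(2);   // SELECT ... LIMIT 2
    SketchReader reader = scan.createReader(rows);
    int batch;
    while ((batch = reader.next()) > 0) {
      System.out.println("batch with " + batch + " row(s)");
    }
  }
}

Capping the batch at the pushed-down limit is sufficient in this sketch because the LIMIT operator above the scan stops requesting rows once it has n of them; the reader itself does not need to track a global total.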
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -30,10 +30,8 @@
import org.apache.drill.exec.planner.physical.PlannerSettings;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Iterators;

public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);

public AbstractGroupScan(String userName) {
super(userName);
@@ -45,7 +43,7 @@ public AbstractGroupScan(AbstractGroupScan that) {

@Override
public Iterator<PhysicalOperator> iterator() {
return Iterators.emptyIterator();
return Collections.emptyIterator();
}

@Override
@@ -135,7 +133,6 @@ public List<SchemaPath> getPartitionColumns() {

/**
* Default is not to support limit pushdown.
* @return
*/
@Override
@JsonIgnore
@@ -144,12 +141,12 @@ public boolean supportsLimitPushdown() {
}

/**
* By default, return null to indicate rowcount based prune is not supported.
* Each groupscan subclass should override, if it supports rowcount based prune.
* By default, return null to indicate row count based prune is not supported.
* Each group scan subclass should override, if it supports row count based prune.
*/
@Override
@JsonIgnore
public GroupScan applyLimit(long maxRecords) {
public GroupScan applyLimit(int maxRecords) {
return null;
}

@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -40,24 +40,24 @@ public interface GroupScan extends Scan, HasAffinity{
* 2) NULL is interpreted as ALL_COLUMNS.
* How to handle skipAll query is up to each storage plugin, with different policy in corresponding RecordReader.
*/
public static final List<SchemaPath> ALL_COLUMNS = ImmutableList.of(SchemaPath.STAR_COLUMN);
List<SchemaPath> ALL_COLUMNS = ImmutableList.of(SchemaPath.STAR_COLUMN);

public static final long NO_COLUMN_STATS = -1;
long NO_COLUMN_STATS = -1;

public abstract void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException;
void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException;

public abstract SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException;
SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException;

@JsonIgnore
public int getMaxParallelizationWidth();
int getMaxParallelizationWidth();

/**
* At minimum, the GroupScan requires these many fragments to run.
* Currently, this is used in {@link org.apache.drill.exec.planner.fragment.SimpleParallelizer}
* @return the minimum number of fragments that should run
*/
@JsonIgnore
public int getMinParallelizationWidth();
int getMinParallelizationWidth();

/**
* Check if GroupScan enforces width to be maximum parallelization width.
@@ -69,69 +69,69 @@ public interface GroupScan extends Scan, HasAffinity{
*/
@JsonIgnore
@Deprecated
public boolean enforceWidth();
boolean enforceWidth();

/**
* Returns a signature of the {@link GroupScan} which should usually be composed of
* all its attributes which could describe it uniquely.
*/
@JsonIgnore
public abstract String getDigest();
String getDigest();

@JsonIgnore
public ScanStats getScanStats(PlannerSettings settings);
ScanStats getScanStats(PlannerSettings settings);

/**
* Returns a clone of GroupScan instance, except that the new GroupScan will use the provided list of columns .
*/
public GroupScan clone(List<SchemaPath> columns);
GroupScan clone(List<SchemaPath> columns);

/**
* GroupScan should check the list of columns, and see if it could support all the columns in the list.
*/
public boolean canPushdownProjects(List<SchemaPath> columns);
boolean canPushdownProjects(List<SchemaPath> columns);

/**
* Return the number of non-null value in the specified column. Raise exception, if groupscan does not
* have exact column row count.
*/
public long getColumnValueCount(SchemaPath column);
long getColumnValueCount(SchemaPath column);

/**
* Whether or not this GroupScan supports pushdown of partition filters (directories for filesystems)
*/
public boolean supportsPartitionFilterPushdown();
boolean supportsPartitionFilterPushdown();

/**
* Returns a list of columns that can be used for partition pruning
*
*/
@JsonIgnore
public List<SchemaPath> getPartitionColumns();
List<SchemaPath> getPartitionColumns();

/**
* Whether or not this GroupScan supports limit pushdown
*/
public boolean supportsLimitPushdown();
boolean supportsLimitPushdown();

/**
* Apply rowcount based prune for "LIMIT n" query.
* @param maxRecords : the number of rows requested from group scan.
* @return a new instance of group scan if the prune is successful.
* null when either if row-based prune is not supported, or if prune is not successful.
*/
public GroupScan applyLimit(long maxRecords);
GroupScan applyLimit(int maxRecords);

/**
* Return true if this GroupScan can return its selection as a list of file names (retrieved by getFiles()).
*/
@JsonIgnore
public boolean hasFiles();
boolean hasFiles();

/**
* Returns a collection of file names associated with this GroupScan. This should be called after checking
* hasFiles(). If this GroupScan cannot provide file names, it returns null.
*/
public Collection<String> getFiles();
Collection<String> getFiles();

}
@@ -1189,7 +1189,7 @@ public boolean supportsLimitPushdown() {
}

@Override
public GroupScan applyLimit(long maxRecords) {
public GroupScan applyLimit(int maxRecords) {
Preconditions.checkArgument(rowGroupInfos.size() >= 0);

maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup.
@@ -41,15 +41,23 @@ public abstract class AbstractPojoRecordReader<T> extends AbstractRecordReader i

private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPojoRecordReader.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(AbstractPojoRecordReader.class);
public static final int DEFAULT_RECORDS_PER_BATCH = 4000;

@JsonProperty private final int recordsPerBatch;
@JsonProperty protected final List<T> records;

protected List<PojoWriter> writers;

private Iterator<T> currentIterator;
private OperatorContext operatorContext;

protected AbstractPojoRecordReader(List<T> records) {
this(records, DEFAULT_RECORDS_PER_BATCH);
}

protected AbstractPojoRecordReader(List<T> records, int recordsPerBatch) {
this.records = records;
this.recordsPerBatch = Math.min(recordsPerBatch, DEFAULT_RECORDS_PER_BATCH);
}

@Override
@@ -65,7 +73,7 @@ public int next() {
injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);

int recordCount = 0;
while (currentIterator.hasNext()) {
while (currentIterator.hasNext() && recordCount < recordsPerBatch) {
if (!allocated) {
allocate();
allocated = true;
@@ -88,7 +96,7 @@
@Override
public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
for (final ValueVector v : vectorMap.values()) {
AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
AllocationHelper.allocate(v, recordsPerBatch, 50, 10);
}
}

@@ -45,12 +45,16 @@
public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>> {

@JsonProperty
private final LinkedHashMap<String, Class<?>> schema;
private LinkedHashMap<String, Class<?>> schema;

public DynamicPojoRecordReader(LinkedHashMap<String, Class<?>> schema, List<List<T>> records) {
super(records);
Preconditions.checkState(schema != null && !schema.isEmpty(), "Undefined schema is not allowed.");
this.schema = schema;
validateAndSetSchema(schema);
}

public DynamicPojoRecordReader(LinkedHashMap<String, Class<?>> schema, List<List<T>> records, int maxRecordsToRead) {
super(records, maxRecordsToRead);
validateAndSetSchema(schema);
}

/**
@@ -80,12 +84,16 @@ public String toString() {
"}";
}

private void validateAndSetSchema(LinkedHashMap<String, Class<?>> schema) {
Preconditions.checkState(schema != null && !schema.isEmpty(), "Undefined schema is not allowed.");
this.schema = schema;
}

/**
* An utility class that converts from {@link com.fasterxml.jackson.databind.JsonNode}
* to DynamicPojoRecordReader during physical plan fragment deserialization.
*/
public static class Converter extends StdConverter<JsonNode, DynamicPojoRecordReader>
{
public static class Converter extends StdConverter<JsonNode, DynamicPojoRecordReader> {
private static final TypeReference<LinkedHashMap<String, Class<?>>> schemaType =
new TypeReference<LinkedHashMap<String, Class<?>>>() {};

@@ -105,7 +113,8 @@ public DynamicPojoRecordReader convert(JsonNode value) {
for (Class<?> fieldType : schema.values()) {
records.add(mapper.convertValue(recordsIterator.next(), fieldType));
}
return new DynamicPojoRecordReader(schema, Collections.singletonList(records));
int maxRecordsToRead = value.get("recordsPerBatch").asInt();
return new DynamicPojoRecordReader(schema, Collections.singletonList(records), maxRecordsToRead);
}
}
}
@@ -43,6 +43,12 @@ public PojoRecordReader(Class<T> pojoClass, List<T> records) {
this.fields = new ArrayList<>();
}

public PojoRecordReader(Class<T> pojoClass, List<T> records, int maxRecordToRead) {
super(records, maxRecordToRead);
this.pojoClass = pojoClass;
this.fields = new ArrayList<>();
}

/**
* Creates writers based on pojo field class types. Ignores static fields.
*
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -37,24 +37,25 @@ public class ProfileInfoIterator extends ProfileIterator {

private final Iterator<ProfileInfo> itr;

public ProfileInfoIterator(ExecutorFragmentContext context) {
super(context);
itr = iterateProfileInfo();
public ProfileInfoIterator(ExecutorFragmentContext context, int maxRecords) {
super(context, maxRecords);
this.itr = iterateProfileInfo();
}

@Override
protected Iterator<Entry<String, QueryProfile>> getProfiles(int skip, int take) {
return profileStoreContext
.getCompletedProfileStore()
.getRange(skip, take);
}

//Returns an iterator for authorized profiles
private Iterator<ProfileInfo> iterateProfileInfo() {
try {
//Transform authorized profiles to iterator for ProfileInfo
return transform(
getAuthorizedProfiles(
profileStoreContext
.getCompletedProfileStore()
.getAll(),
queryingUsername, isAdmin));

return transform(getAuthorizedProfiles(queryingUsername, isAdmin));
} catch (Exception e) {
logger.error(e.getMessage());
logger.error(e.getMessage(), e);
return Iterators.singletonIterator(ProfileInfo.getDefault());
}
}
@@ -109,7 +110,7 @@ public void remove() {
}

public static class ProfileInfo {
private static final String UnknownValue = "N/A";
private static final String UNKNOWN_VALUE = "N/A";

private static final ProfileInfo DEFAULT = new ProfileInfo();

@@ -144,14 +145,16 @@ public ProfileInfo(String query_id, Timestamp time, String foreman, long fragmen
}

private ProfileInfo() {
this(UnknownValue, new Timestamp(0), UnknownValue, 0L, UnknownValue, UnknownValue, 0L, 0L, 0L, UnknownValue, UnknownValue);
this(UNKNOWN_VALUE, new Timestamp(0), UNKNOWN_VALUE, 0L,
UNKNOWN_VALUE, UNKNOWN_VALUE, 0L, 0L,
0L, UNKNOWN_VALUE, UNKNOWN_VALUE);
}

/**
* If unable to get ProfileInfo, use this default instance instead.
* @return the default instance
*/
public static final ProfileInfo getDefault() {
public static ProfileInfo getDefault() {
return DEFAULT;
}
}
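
Point 3 of the commit message corresponds to the ProfileInfoIterator change above, where a getAll() fetch is replaced by a range fetch sized by the pushed-down limit (getRange(skip, take) via the new getProfiles(int skip, int take) hook). Below is a standalone sketch of that pattern; ProfileStore and RangeFetchSketch are hypothetical stand-ins, not Drill's profile store API.

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class RangeFetchSketch {

  /** Hypothetical stand-in for the completed-profile store used by ProfileIterator. */
  static final class ProfileStore {
    private final TreeMap<String, String> profiles = new TreeMap<>();

    void put(String queryId, String profileJson) {
      profiles.put(queryId, profileJson);
    }

    /** Mirrors getRange(skip, take): only the requested window is materialized. */
    Iterator<Map.Entry<String, String>> getRange(int skip, int take) {
      return profiles.entrySet().stream()
          .skip(skip)
          .limit(take)
          .iterator();
    }
  }

  /** Mirrors the new getProfiles hook: with no pushed-down limit, fall back to a large take. */
  static Iterator<Map.Entry<String, String>> getProfiles(ProfileStore store, int maxRecords) {
    int take = maxRecords < 0 ? Integer.MAX_VALUE : maxRecords;
    return store.getRange(0, take);
  }

  public static void main(String[] args) {
    ProfileStore store = new ProfileStore();
    for (int i = 1; i <= 5; i++) {
      store.put("query-" + i, "{...}");
    }
    // LIMIT 2 pushed down: only two profiles are pulled from the store.
    getProfiles(store, 2).forEachRemaining(e -> System.out.println(e.getKey()));
  }
}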
