DRILL-3735: For partition pruning divide up the partition lists into sublists of 64K each and iterate over each sublist.

Add abstract base class for various partition descriptors. Add logging messages in PruneScanRule for better debuggability.

Address review comments.

Close #156
Aman Sinha committed Sep 21, 2015
1 parent 3c89b30 commit 9f54aac
Showing 6 changed files with 215 additions and 112 deletions.
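For context: the "64K" in the commit title is the new PartitionDescriptor.PARTITION_BATCH_SIZE constant introduced below, defined as Character.MAX_VALUE (65,535), and each descriptor splits its full partition list with Guava's Lists.partition. A standalone sketch of how that batching behaves (not part of the commit; assumes Guava on the classpath):

import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.Lists;

public class PartitionBatchSizes {
  public static void main(String[] args) {
    final int PARTITION_BATCH_SIZE = Character.MAX_VALUE;  // 65535, the "64K" in the title
    // Stand-in for a large list of partition locations.
    List<String> locations = new ArrayList<>();
    for (int i = 0; i < 150_000; i++) {
      locations.add("/table/part" + i);
    }
    // Lists.partition returns consecutive sublist views of the backing list,
    // each of at most the given size; the 150,000 entries are not copied.
    for (List<String> batch : Lists.partition(locations, PARTITION_BATCH_SIZE)) {
      System.out.println("batch of " + batch.size());  // 65535, 65535, 18930
    }
  }
}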
HivePartitionDescriptor.java
@@ -18,11 +18,13 @@
 package org.apache.drill.exec.planner.sql;

 import io.netty.buffer.DrillBuf;

 import org.apache.calcite.util.BitSets;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.AbstractPartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionLocation;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
@@ -43,8 +45,10 @@
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

+import com.google.common.collect.Lists;
+
 // Partition descriptor for hive tables
-public class HivePartitionDescriptor implements PartitionDescriptor {
+public class HivePartitionDescriptor extends AbstractPartitionDescriptor {

   private final Map<String, Integer> partitionMap = new HashMap<>();
   private final int numPartitionLevels;
@@ -106,16 +110,6 @@ public GroupScan createNewGroupScan(List<String> newFiles) throws ExecutionSetup
     return newScan;
   }

-  @Override
-  public List<PartitionLocation> getPartitions() {
-    List<PartitionLocation> partitions = new LinkedList<>();
-    HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry;
-    for (Partition partition: origEntry.getPartitions()) {
-      partitions.add(new HivePartitionLocation(partition.getValues(), partition.getSd().getLocation()));
-    }
-    return partitions;
-  }
-
   @Override
   public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions,
                                        BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) {
@@ -164,4 +158,15 @@ public Integer getIdIfValid(String name) {
     return partitionMap.get(name);
   }

+  @Override
+  protected void createPartitionSublists() {
+    List<PartitionLocation> locations = new LinkedList<>();
+    HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry;
+    for (Partition partition: origEntry.getPartitions()) {
+      locations.add(new HivePartitionLocation(partition.getValues(), partition.getSd().getLocation()));
+    }
+    locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
+    sublistsCreated = true;
+  }
+
 }
AbstractPartitionDescriptor.java (new file)
@@ -0,0 +1,58 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.planner;

import java.util.Iterator;
import java.util.List;

/**
 * Abstract base class for file system based partition descriptors and Hive partition descriptors.
 */
public abstract class AbstractPartitionDescriptor implements PartitionDescriptor, Iterable<List<PartitionLocation>> {

  /**
   * A sequence of sublists of partition locations combined into a single super list.
   * The size of each sublist is at most {@link PartitionDescriptor#PARTITION_BATCH_SIZE}.
   * For example, if the batch size is 3, the complete list could be: {(a, b, c), (d, e, f), (g, h)}.
   */
  protected List<List<PartitionLocation>> locationSuperList;

  /**
   * Flag to indicate whether the sublists of the partition locations have been created.
   */
  protected boolean sublistsCreated = false;

  /**
   * Create sublists of the partition locations, each sublist of size
   * at most {@link PartitionDescriptor#PARTITION_BATCH_SIZE}.
   */
  protected abstract void createPartitionSublists();

  /**
   * Iterator that traverses over the super list of partition locations and
   * returns one sublist of partition locations at a time.
   */
  @Override
  public Iterator<List<PartitionLocation>> iterator() {
    if (!sublistsCreated) {
      createPartitionSublists();
    }
    return locationSuperList.iterator();
  }

}
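The class above is a template method: the first for-each over a descriptor triggers createPartitionSublists(), and later passes reuse the cached super list. Below is a standalone toy version of that lazy-batching pattern (illustrative only, not Drill code: String stands in for PartitionLocation, and the batch size is shrunk to 3 to match the Javadoc example above):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import com.google.common.collect.Lists;

// Standalone re-creation of the AbstractPartitionDescriptor pattern; the field and
// method names mirror the commit, but this class has no Drill dependencies.
public class LazyBatchingDemo implements Iterable<List<String>> {
  static final int BATCH_SIZE = 3;  // PartitionDescriptor.PARTITION_BATCH_SIZE is 65535 in the commit

  protected List<List<String>> locationSuperList;
  protected boolean sublistsCreated = false;

  protected void createPartitionSublists() {
    // Same Lists.partition call the Hive/file-system/Parquet descriptors use.
    locationSuperList = Lists.partition(
        Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h"), BATCH_SIZE);
    sublistsCreated = true;
  }

  @Override
  public Iterator<List<String>> iterator() {
    if (!sublistsCreated) {  // lazy: built on first iteration, reused afterwards
      createPartitionSublists();
    }
    return locationSuperList.iterator();
  }

  public static void main(String[] args) {
    for (List<String> batch : new LazyBatchingDemo()) {
      System.out.println(batch);  // [a, b, c] then [d, e, f] then [g, h]
    }
  }
}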
FileSystemPartitionDescriptor.java
@@ -24,7 +24,9 @@
 import java.util.Map;

 import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;

 import org.apache.calcite.util.BitSets;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -40,7 +42,7 @@

 // partition descriptor for file system based tables
-public class FileSystemPartitionDescriptor implements PartitionDescriptor {
+public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {

   static final int MAX_NESTED_SUBDIRS = 10; // allow up to 10 nested sub-directories

@@ -86,16 +88,6 @@ public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
     return newScan;
   }

-  @Override
-  public List<PartitionLocation> getPartitions() {
-    List<String> fileLocations = ((FormatSelection) scanRel.getDrillTable().getSelection()).getAsFiles();
-    List<PartitionLocation> partitions = new LinkedList<>();
-    for (String file: fileLocations) {
-      partitions.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
-    }
-    return partitions;
-  }
-
   @Override
   public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions,
                                        BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) {
@@ -133,4 +125,16 @@ private String getBaseTableLocation() {
     final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
     return origSelection.getSelection().selectionRoot;
   }
+
+  @Override
+  protected void createPartitionSublists() {
+    List<String> fileLocations = ((FormatSelection) scanRel.getDrillTable().getSelection()).getAsFiles();
+    List<PartitionLocation> locations = new LinkedList<>();
+    for (String file: fileLocations) {
+      locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
+    }
+    locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
+    sublistsCreated = true;
+  }
+
 }
ParquetPartitionDescriptor.java
@@ -29,6 +29,8 @@
 import org.apache.drill.exec.store.parquet.ParquetGroupScan;
 import org.apache.drill.exec.vector.ValueVector;

+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.BitSet;
 import java.util.LinkedList;
@@ -40,7 +42,7 @@
 /**
  * PartitionDescriptor that describes partitions based on column names instead of directory structure
  */
-public class ParquetPartitionDescriptor implements PartitionDescriptor {
+public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {

   private final List<SchemaPath> partitionColumns;
   private final DrillScanRel scanRel;
@@ -84,16 +86,6 @@ public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
     return newScan;
   }

-  @Override
-  public List<PartitionLocation> getPartitions() {
-    Set<String> fileLocations = ((ParquetGroupScan) scanRel.getGroupScan()).getFileSet();
-    List<PartitionLocation> partitions = new LinkedList<>();
-    for (String file: fileLocations) {
-      partitions.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
-    }
-    return partitions;
-  }
-
   @Override
   public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions,
                                        BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) {
@@ -125,4 +117,16 @@ private String getBaseTableLocation() {
     final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
     return origSelection.getSelection().selectionRoot;
   }
+
+  @Override
+  protected void createPartitionSublists() {
+    Set<String> fileLocations = ((ParquetGroupScan) scanRel.getGroupScan()).getFileSet();
+    List<PartitionLocation> locations = new LinkedList<>();
+    for (String file: fileLocations) {
+      locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
+    }
+    locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
+    sublistsCreated = true;
+  }
+
 }
PartitionDescriptor.java
@@ -17,21 +17,20 @@
  */
 package org.apache.drill.exec.planner;

-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.vector.ValueVector;

-import java.io.IOException;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;

 // Interface used to describe partitions. Currently used by file system based partitions and hive partitions
-public interface PartitionDescriptor {
+public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
+
+  public static final int PARTITION_BATCH_SIZE = Character.MAX_VALUE;

   /* Get the hierarchy index of the given partition
    * For eg: if we have the partition laid out as follows
@@ -56,12 +55,10 @@ public interface PartitionDescriptor {

   public GroupScan createNewGroupScan(List<String> newFiles) throws Exception;

-  public List<PartitionLocation> getPartitions();
-
   /**
    * Method creates an in memory representation of all the partitions. For each level of partitioning we
    * will create a value vector which this method will populate for all the partitions with the values of the
-   * partioning key
+   * partitioning key
    * @param vectors - Array of vectors in the container that need to be populated
    * @param partitions - List of all the partitions that exist in the table
    * @param partitionColumnBitSet - Partition columns selected in the query
