DRILL-3333: Parquet writer auto-partitioning and partition pruning
StevenMPhillips committed Jun 24, 2015
1 parent 3aa82bc commit 5a34d81
Showing 17 changed files with 1,237 additions and 274 deletions.
@@ -16,6 +16,8 @@
  * limitations under the License.
  */

+import java.lang.UnsupportedOperationException;
+
 <@pp.dropOutputFile />
 <@pp.changeOutputFile name="org/apache/drill/exec/store/AbstractRecordWriter.java" />
 <#include "/@includes/license.ftl" />

@@ -24,13 +26,29 @@
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.BitVector.Accessor;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;

 import java.io.IOException;
 import java.lang.UnsupportedOperationException;

 public abstract class AbstractRecordWriter implements RecordWriter {

+  private Accessor newPartitionVector;
+
+  protected void setPartitionVector(BitVector newPartitionVector) {
+    this.newPartitionVector = newPartitionVector.getAccessor();
+  }
+
+  protected boolean newPartition(int index) {
+    return newPartitionVector.get(index) == 1;
+  }
+
+  public void checkForNewPartition(int index) {
+    // no op
+  }
+
   @Override
   public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
     throw new UnsupportedOperationException("Doesn't support writing Map'");
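A concrete writer opts in to auto-partitioning by overriding the no-op. A minimal hypothetical sketch — flushAndClose() and startNewFile() are assumed helpers, not part of this commit:

  @Override
  public void checkForNewPartition(int index) {
    // newPartition(index) reads the bit vector registered via setPartitionVector():
    // a 1 bit means some partition column changed value at this row.
    if (newPartition(index)) {
      flushAndClose();   // hypothetical: finish the file for the current partition
      startNewFile();    // hypothetical: open a file for the next partition
    }
  }

Existing writers (CSV, JSON, etc.) inherit the no-op and are unaffected.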
@@ -16,6 +16,8 @@
  * limitations under the License.
  */

+import org.apache.drill.exec.planner.physical.WriterPrel;
+
 <@pp.dropOutputFile />
 <@pp.changeOutputFile name="org/apache/drill/exec/store/EventBasedRecordWriter.java" />
 <#include "/@includes/license.ftl" />

@@ -25,6 +27,8 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;

+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.physical.WriterPrel;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;

@@ -54,6 +58,7 @@ public int write(int recordCount) throws IOException {
   int counter = 0;

   for (; counter < recordCount; counter++) {
+    recordWriter.checkForNewPartition(counter);
     recordWriter.startRecord();
     // write the current record
     for (FieldConverter converter : fieldConverters) {

@@ -73,6 +78,9 @@ private void initFieldWriters() throws IOException {
   try {
     int fieldId = 0;
     for (VectorWrapper w : batch) {
+      if (w.getField().getPath().equals(SchemaPath.getSimplePath(WriterPrel.PARTITION_COMPARATOR_FIELD))) {
+        continue;
+      }
       FieldReader reader = w.getValueVector().getReader();
       FieldConverter converter = getConverter(recordWriter, fieldId++, w.getField().getLastName(), reader);
       fieldConverters.add(converter);
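WriterPrel.PARTITION_COMPARATOR_FIELD is the hidden bit column the planner projects upstream of the writer; skipping it here keeps it out of the written output. Conceptually (a hedged sketch, not the literal planner code), the injected projection computes one newPartitionValue term per PARTITION BY column:

  // $partition_comparator = newPartitionValue(col1) OR newPartitionValue(col2) OR ...
  // The bit is 1 whenever any partition column's value differs from the previous
  // row's, i.e. whenever the writer should begin a new output partition.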
103 changes: 103 additions & 0 deletions exec/java-exec/src/main/codegen/templates/NewValueFunctions.java
@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<@pp.dropOutputFile />


<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/GNewValueFunctions.java" />
<#include "/@includes/license.ftl" />

package org.apache.drill.exec.expr.fn.impl;

import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.*;
import javax.inject.Inject;
import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.record.RecordBatch;

public class GNewValueFunctions {
<#list vv.types as type>
<#if type.major == "Fixed">
<#list type.minor as minor>
<#list vv.modes as mode>
<#if mode.name != "Repeated">
<#if !minor.class.startsWith("Decimal28") && !minor.class.startsWith("Decimal38") && !minor.class.startsWith("Interval")>
  @SuppressWarnings("unused")
  @FunctionTemplate(name = "newPartitionValue", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL)
  public static class NewValue${minor.class}${mode.prefix} implements DrillSimpleFunc {
    @Param ${mode.prefix}${minor.class}Holder in;
    @Workspace ${mode.prefix}${minor.class}Holder previous;
    @Workspace Boolean initialized;
    @Output BitHolder out;

    public void setup() {
      initialized = false;
    }
<#if mode.name == "Required">
    public void eval() {
      if (initialized) {
        if (in.value == previous.value) {
          out.value = 0;
        } else {
          previous.value = in.value;
          out.value = 1;
        }
      } else {
        previous.value = in.value;
        out.value = 1;
        initialized = true;
      }
    }
</#if>
<#if mode.name == "Optional">
    public void eval() {
      if (initialized) {
        if (in.isSet == 0 && previous.isSet == 0) {
          out.value = 0;
        } else if (in.value == previous.value) {
          out.value = 0;
        } else {
          previous.value = in.value;
          previous.isSet = in.isSet;
          out.value = 1;
        }
      } else {
        previous.value = in.value;
        previous.isSet = in.isSet;
        out.value = 1;
        initialized = true;
      }
    }
</#if>
  }
</#if> <#-- minor.class.startWith -->
</#if> <#-- mode.name -->
</#list>
</#list>
</#if> <#-- type.major -->
</#list>
}
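For orientation, the Required/BigInt expansion of the template above comes out roughly as follows (reconstructed by hand from the template; the actual generated source may differ in whitespace and class ordering):

  @SuppressWarnings("unused")
  @FunctionTemplate(name = "newPartitionValue", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL)
  public static class NewValueBigInt implements DrillSimpleFunc {
    @Param BigIntHolder in;
    @Workspace BigIntHolder previous;
    @Workspace Boolean initialized;
    @Output BitHolder out;

    public void setup() {
      initialized = false;
    }

    public void eval() {
      if (initialized) {
        if (in.value == previous.value) {
          out.value = 0;                 // same value as previous row: same partition
        } else {
          previous.value = in.value;
          out.value = 1;                 // value changed: a new partition starts here
        }
      } else {
        previous.value = in.value;       // first row always starts a partition
        out.value = 1;
        initialized = true;
      }
    }
  }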
10 changes: 8 additions & 2 deletions exec/java-exec/src/main/codegen/templates/RecordWriter.java
@@ -23,6 +23,7 @@
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;

@@ -43,10 +44,15 @@ public interface RecordWriter {
   /**
    * Update the schema in RecordWriter. Called at least once before starting writing the records.
-   * @param schema
+   * @param batch
    * @throws IOException
    */
-  void updateSchema(BatchSchema schema) throws IOException;
+  void updateSchema(VectorAccessible batch) throws IOException;
+
+  /**
+   * Check if the writer should start a new partition, and if so, start a new partition
+   */
+  public void checkForNewPartition(int index);

   /**
    * Called before starting writing fields in a record.
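The switch from BatchSchema to VectorAccessible is what gives a writer access to the batch's value vectors, not just its schema — in particular, to the hidden partition bit vector. A hedged sketch of what an implementation can now do in updateSchema (simplified; not the literal Parquet writer code from this commit):

  @Override
  public void updateSchema(VectorAccessible batch) throws IOException {
    for (VectorWrapper<?> w : batch) {
      // locate the hidden partition-comparator column and register its accessor
      if (w.getField().getPath().equals(SchemaPath.getSimplePath(WriterPrel.PARTITION_COMPARATOR_FIELD))) {
        setPartitionVector((BitVector) w.getValueVector());
      }
    }
    // ...then derive the output schema from batch.getSchema() as before
  }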
@@ -16,6 +16,8 @@
  * limitations under the License.
  */

+import org.apache.drill.exec.store.AbstractRecordWriter;
+
 import java.lang.Override;
 import java.lang.UnsupportedOperationException;

@@ -31,6 +33,7 @@
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.vector.*;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;

@@ -48,14 +51,16 @@
  *
  * This is useful for text format writers such as CSV, TSV etc.
  */
-public abstract class StringOutputRecordWriter implements RecordWriter {
+public abstract class StringOutputRecordWriter extends AbstractRecordWriter {

   private final BufferAllocator allocator;
   protected StringOutputRecordWriter(BufferAllocator allocator){
     this.allocator = allocator;
   }

-  public void updateSchema(BatchSchema schema) throws IOException {
+  @Override
+  public void updateSchema(VectorAccessible batch) throws IOException {
+    BatchSchema schema = batch.getSchema();
     List<String> columnNames = Lists.newArrayList();
     for (int i=0; i < schema.getFieldCount(); i++) {
       columnNames.add(schema.getColumn(i).getLastName());
@@ -21,6 +21,7 @@
 import java.util.Iterator;
 import java.util.List;

+import com.google.common.collect.Lists;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.planner.physical.PlannerSettings;

@@ -122,4 +123,9 @@ public long getColumnValueCount(SchemaPath column) {
   public int getOperatorType() {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public List<SchemaPath> getPartitionColumns() {
+    return Lists.newArrayList();
+  }
 }
@@ -91,4 +91,11 @@ public interface GroupScan extends Scan, HasAffinity{
    */
   public boolean supportsPartitionFilterPushdown();

+  /**
+   * Returns a list of columns that can be used for partition pruning
+   */
+  @JsonIgnore
+  public List<SchemaPath> getPartitionColumns();
+
 }
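A scan that knows its data layout can advertise real columns here. A hypothetical override (the actual ParquetGroupScan implementation in this commit derives the list from file metadata and is not shown in this excerpt):

  @Override
  @JsonIgnore
  public List<SchemaPath> getPartitionColumns() {
    // hypothetical field: columns the auto-partitioning writer observed to hold
    // a single value per file, making them safe to prune on
    return Lists.newArrayList(singleValueColumns);
  }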
@@ -150,7 +150,7 @@ protected void setupNewSchema() throws IOException {
     try {
       // update the schema in RecordWriter
       stats.startSetup();
-      recordWriter.updateSchema(incoming.getSchema());
+      recordWriter.updateSchema(incoming);
       // Create two vectors for:
       // 1. Fragment unique id.
       // 2. Summary: currently contains number of records written.
@@ -0,0 +1,62 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.planner;

import com.google.common.collect.Maps;
import org.apache.drill.common.expression.SchemaPath;

import java.util.List;
import java.util.Map;

/**
 * PartitionDescriptor that describes partitions based on column names instead of directory structure
 */
public class ParquetPartitionDescriptor implements PartitionDescriptor {

  private final List<SchemaPath> partitionColumns;

  public ParquetPartitionDescriptor(List<SchemaPath> partitionColumns) {
    this.partitionColumns = partitionColumns;
  }

  @Override
  public int getPartitionHierarchyIndex(String partitionName) {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean isPartitionName(String name) {
    return partitionColumns.contains(name);
  }

  @Override
  public Integer getIdIfValid(String name) {
    SchemaPath schemaPath = SchemaPath.getSimplePath(name);
    int id = partitionColumns.indexOf(schemaPath);
    if (id == -1) {
      return null;
    }
    return id;
  }

  @Override
  public int getMaxHierarchyLevel() {
    return partitionColumns.size();
  }
}
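A short usage sketch of how a pruning rule might consult the descriptor (assumed and simplified; the actual PruneScanRule changes are not included in this excerpt):

  List<SchemaPath> partitionColumns = groupScan.getPartitionColumns();
  PartitionDescriptor descriptor = new ParquetPartitionDescriptor(partitionColumns);

  // Resolve a filter column to its partition index; null means it is not a partition column.
  Integer id = descriptor.getIdIfValid("myPartitionCol");   // hypothetical column name
  if (id != null) {
    // the filter references partition column #id, so the scan is a pruning candidate
  }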
@@ -155,6 +155,8 @@ public static RuleSet getDrillBasicRules(QueryContext context) {

     PruneScanRule.getFilterOnProject(context),
     PruneScanRule.getFilterOnScan(context),
+    PruneScanRule.getFilterOnProjectParquet(context),
+    PruneScanRule.getFilterOnScanParquet(context),

     /*
      Convert from Calcite Logical to Drill Logical Rules.
(The remaining changed files in this commit are not included in this excerpt.)
