DRILL-3688: Drill should honor "skip.header.line.count" and "skip.footer.line.count" attributes of Hive table

1. Functionality to skip header and footer lines while reading Hive data.
2. Unit tests.
arina-ielchiieva authored and parthchandra committed Mar 1, 2016
1 parent b9960f8 commit 84ce21c
Showing 4 changed files with 319 additions and 15 deletions.
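
For context, the two settings are ordinary Hive table properties. The sketch below is illustrative only (table name, schema, and file layout are hypothetical and not part of this commit); it shows how a text-backed Hive table declares the properties and how Drill is expected to behave once this change is in place.

-- Hive DDL (illustrative): the first line of each file is a header, the last two lines are a footer
CREATE TABLE kv_with_header_footer (key INT, `value` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="2");

-- Drill query (illustrative): header and footer lines should no longer show up as data rows
SELECT key, `value` FROM hive.`default`.kv_with_header_footer;

As the new unit tests confirm, the properties are honored only for the text input format; RCFile, Parquet, and SequenceFile tables ignore them.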
@@ -18,9 +18,13 @@
package org.apache.drill.exec.store.hive;


import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;


@@ -46,6 +50,8 @@
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -57,9 +63,9 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import com.google.common.collect.Lists;
@@ -95,15 +101,16 @@ public class HiveRecordReader extends AbstractRecordReader {
// Converter which converts data from partition schema to table schema.
private Converter partTblObjectInspectorConverter;


- protected Object key, value;
- protected org.apache.hadoop.mapred.RecordReader reader;
+ protected Object key;
+ protected RecordReader reader;
protected List<ValueVector> vectors = Lists.newArrayList();
protected List<ValueVector> pVectors = Lists.newArrayList();
protected boolean empty;
private HiveConf hiveConf;
private FragmentContext fragmentContext;
private String defaultPartitionValue;
private final UserGroupInformation proxyUgi;
private SkipRecordsInspector skipRecordsInspector;


protected static final int TARGET_RECORD_COUNT = 4000;


@@ -127,8 +134,9 @@ private void init() throws ExecutionSetupException {
// Get the configured default val
defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);


Properties tableProperties;
try {
- final Properties tableProperties = MetaStoreUtils.getTableMetadata(table);
+ tableProperties = MetaStoreUtils.getTableMetadata(table);
final Properties partitionProperties =
(partition == null) ? tableProperties :
HiveUtilities.getPartitionMetadata(partition, table);
@@ -220,7 +228,7 @@ private void init() throws ExecutionSetupException {
throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
}
key = reader.createKey();
- value = reader.createValue();
+ skipRecordsInspector = new SkipRecordsInspector(tableProperties, reader);
}
}


@@ -286,6 +294,16 @@ public Void call() throws Exception {
}
}


/**
* To take into account Hive "skip.header.line.count" property, the first N values from the file are skipped.
* Since the file can be read in batches (depending on TARGET_RECORD_COUNT), additional checks are made
* to determine whether the batch is the start of a new file or a continuation of the previous one.
*
* To take into account Hive "skip.footer.line.count" property, values are buffered in a queue
* until the queue size exceeds the number of footer lines to skip; then the first value in the queue is retrieved.
* A buffer of value objects is kept so that value objects can be re-used, reducing the number of objects created.
* For each new file the queue is cleared to drop the footer lines of the previous file.
*/
@Override
public int next() {
for (ValueVector vv : vectors) {
@@ -297,18 +315,28 @@ public int next() {
}


try {
+ skipRecordsInspector.reset();
int recordCount = 0;
- while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) {
- Object deSerializedValue = partitionSerDe.deserialize((Writable) value);
- if (partTblObjectInspectorConverter != null) {
- deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
- }
- readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordCount);
- recordCount++;
- }
+ Object value;
+ while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value = skipRecordsInspector.getNextValue())) {
+ if (skipRecordsInspector.doSkipHeader(recordCount++)) {
+ continue;
+ }
+ Object bufferedValue = skipRecordsInspector.bufferAdd(value);
+ if (bufferedValue != null) {
+ Object deSerializedValue = partitionSerDe.deserialize((Writable) bufferedValue);
+ if (partTblObjectInspectorConverter != null) {
+ deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
+ }
+ readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, skipRecordsInspector.getActualCount());
+ skipRecordsInspector.incrementActualCount();
+ }
+ skipRecordsInspector.incrementTempCount();
+ }

- setValueCountAndPopulatePartitionVectors(recordCount);
- return recordCount;
+ setValueCountAndPopulatePartitionVectors(skipRecordsInspector.getActualCount());
+ skipRecordsInspector.updateContinuance();
+ return skipRecordsInspector.getActualCount();
} catch (IOException | SerDeException e) {
throw new DrillRuntimeException(e);
}
@@ -362,4 +390,126 @@ protected void populatePartitionVectors(int recordCount) {
vector.getMutator().setValueCount(recordCount);
}
}

/**
* SkipRecordsInspector encapsulates the logic to skip header and footer lines of a file.
* The logic applies only to the file formats predefined in the constructor.
*/
private class SkipRecordsInspector {

private final Set<Object> fileFormats;
private int headerCount;
private int footerCount;
private Queue<Object> footerBuffer;
// indicates if we continue reading the same file
private boolean continuance;
private int holderIndex;
private List<Object> valueHolder;
private int actualCount;
// actualCount without headerCount, used to determine holderIndex
private int tempCount;

private SkipRecordsInspector(Properties tableProperties, RecordReader reader) {
this.fileFormats = new HashSet<Object>(Arrays.asList(org.apache.hadoop.mapred.TextInputFormat.class.getName()));
this.headerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, 0);
this.footerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, 0);
this.footerBuffer = Lists.newLinkedList();
this.continuance = false;
this.holderIndex = -1;
this.valueHolder = initializeValueHolder(reader, footerCount);
this.actualCount = 0;
this.tempCount = 0;
}

private boolean doSkipHeader(int recordCount) {
return !continuance && recordCount < headerCount;
}

private void reset() {
tempCount = holderIndex + 1;
actualCount = 0;
if (!continuance) {
footerBuffer.clear();
}
}

private Object bufferAdd(Object value) throws SerDeException {
footerBuffer.add(value);
if (footerBuffer.size() <= footerCount) {
return null;
}
return footerBuffer.poll();
}

private Object getNextValue() {
holderIndex = tempCount % getHolderSize();
return valueHolder.get(holderIndex);
}

private int getHolderSize() {
return valueHolder.size();
}

private void updateContinuance() {
this.continuance = actualCount != 0;
}

private int incrementTempCount() {
return ++tempCount;
}

private int getActualCount() {
return actualCount;
}

private int incrementActualCount() {
return ++actualCount;
}

/**
* Retrieves a positive numeric property from the Properties object by name.
* Returns the default value if
* 1. the file format is absent from the predefined file formats list,
* 2. the property does not exist in the table properties, or
* 3. the property value is negative;
* otherwise casts the value to int.
*
* @param tableProperties property holder
* @param propertyName name of the property
* @param defaultValue default value
* @return property numeric value
* @throws NumberFormatException if property value is non-numeric
*/
private int retrievePositiveIntProperty(Properties tableProperties, String propertyName, int defaultValue) {
int propertyIntValue = defaultValue;
if (!fileFormats.contains(tableProperties.get(hive_metastoreConstants.FILE_INPUT_FORMAT))) {
return propertyIntValue;
}
Object propertyObject = tableProperties.get(propertyName);
if (propertyObject != null) {
try {
propertyIntValue = Integer.valueOf((String) propertyObject);
} catch (NumberFormatException e) {
throw new NumberFormatException(String.format("Hive table property %s value '%s' is non-numeric", propertyName, propertyObject.toString()));
}
}
return propertyIntValue < 0 ? defaultValue : propertyIntValue;
}

/**
* Creates a buffer of objects to be used as values, so that these values can be re-used.
* The number of objects depends on the number of lines to skip at the end of the file, plus one.
*
* @param reader RecordReader used to create the value objects
* @param skipFooterLines number of lines to skip at the end of the file
* @return list of objects to be used as values
*/
private List<Object> initializeValueHolder(RecordReader reader, int skipFooterLines) {
List<Object> valueHolder = new ArrayList<>(skipFooterLines + 1);
for (int i = 0; i <= skipFooterLines; i++) {
valueHolder.add(reader.createValue());
}
return valueHolder;
}
}
}
@@ -18,6 +18,7 @@
package org.apache.drill.exec.hive;


import com.google.common.collect.ImmutableMap;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.hadoop.fs.FileSystem;
@@ -29,8 +30,11 @@
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Map;

import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;


public class TestHiveStorage extends HiveTestBase {
@BeforeClass
@@ -409,6 +413,93 @@ public void readingFromStorageHandleBasedTable2() throws Exception {
}
}


@Test // DRILL-3688
public void readingFromSmallTableWithSkipHeaderAndFooter() throws Exception {
testBuilder()
.sqlQuery("select key, `value` from hive.skipper.kv_text_small order by key asc")
.ordered()
.baselineColumns("key", "value")
.baselineValues(1, "key_1")
.baselineValues(2, "key_2")
.baselineValues(3, "key_3")
.baselineValues(4, "key_4")
.baselineValues(5, "key_5")
.go();

testBuilder()
.sqlQuery("select count(1) as cnt from hive.skipper.kv_text_small")
.unOrdered()
.baselineColumns("cnt")
.baselineValues(5L)
.go();
}

@Test // DRILL-3688
public void readingFromLargeTableWithSkipHeaderAndFooter() throws Exception {
testBuilder()
.sqlQuery("select sum(key) as sum_keys from hive.skipper.kv_text_large")
.unOrdered()
.baselineColumns("sum_keys")
.baselineValues((long)(5000*(5000 + 1)/2))
.go();

testBuilder()
.sqlQuery("select count(1) as cnt from hive.skipper.kv_text_large")
.unOrdered()
.baselineColumns("cnt")
.baselineValues(5000L)
.go();
}

@Test // DRILL-3688
public void testIncorrectHeaderFooterProperty() throws Exception {
Map<String, String> testData = ImmutableMap.<String, String>builder()
.put("hive.skipper.kv_incorrect_skip_header","skip.header.line.count")
.put("hive.skipper.kv_incorrect_skip_footer", "skip.footer.line.count")
.build();

String query = "select * from %s";
String exceptionMessage = "Hive table property %s value 'A' is non-numeric";

for (Map.Entry<String, String> entry : testData.entrySet()) {
try {
test(String.format(query, entry.getKey()));
} catch (UserRemoteException e) {
assertThat(e.getMessage(), containsString(String.format(exceptionMessage, entry.getValue())));
}
}
}

@Test // DRILL-3688
public void testIgnoreSkipHeaderFooterForRcfile() throws Exception {
testBuilder()
.sqlQuery("select count(1) as cnt from hive.skipper.kv_rcfile_large")
.unOrdered()
.baselineColumns("cnt")
.baselineValues(5000L)
.go();
}

@Test // DRILL-3688
public void testIgnoreSkipHeaderFooterForParquet() throws Exception {
testBuilder()
.sqlQuery("select count(1) as cnt from hive.skipper.kv_parquet_large")
.unOrdered()
.baselineColumns("cnt")
.baselineValues(5000L)
.go();
}

@Test // DRILL-3688
public void testIgnoreSkipHeaderFooterForSequencefile() throws Exception {
testBuilder()
.sqlQuery("select count(1) as cnt from hive.skipper.kv_sequencefile_large")
.unOrdered()
.baselineColumns("cnt")
.baselineValues(5000L)
.go();
}

@AfterClass
public static void shutdownOptions() throws Exception {
test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
@@ -51,6 +51,19 @@ public void showTablesFromDb() throws Exception{
.baselineValues("hive.db1", "kv_db1")
.baselineValues("hive.db1", "avro")
.go();

testBuilder()
.sqlQuery("SHOW TABLES IN hive.skipper")
.unOrdered()
.baselineColumns("TABLE_SCHEMA", "TABLE_NAME")
.baselineValues("hive.skipper", "kv_text_small")
.baselineValues("hive.skipper", "kv_text_large")
.baselineValues("hive.skipper", "kv_incorrect_skip_header")
.baselineValues("hive.skipper", "kv_incorrect_skip_footer")
.baselineValues("hive.skipper", "kv_rcfile_large")
.baselineValues("hive.skipper", "kv_parquet_large")
.baselineValues("hive.skipper", "kv_sequencefile_large")
.go();
}

@Test
@@ -61,6 +74,7 @@ public void showDatabases() throws Exception{
.baselineColumns("SCHEMA_NAME")
.baselineValues("hive.default")
.baselineValues("hive.db1")
.baselineValues("hive.skipper")
.baselineValues("dfs.default") .baselineValues("dfs.default")
.baselineValues("dfs.root") .baselineValues("dfs.root")
.baselineValues("dfs.tmp") .baselineValues("dfs.tmp")
