DRILL-7258: Remove field width limit for text reader
The V2 text reader enforced a limit of 64K characters when using
column headers, but not when using the columns[] array. The V3 reader
enforced the 64K limit in both cases.

This patch removes the limit in both cases. The effective limit is now
the 16 MB vector size limit imposed by the Netty memory allocator: with
headers, no single column value can exceed 16 MB; with the columns[]
array, no single row can exceed 16 MB.

Added an "appendBytes()" method to the scalar column writer which adds
additional bytes to those already written for a specific column or
array element value. The method is implemented for VarChar, Var16Char
 and VarBinary vectors. It throws an exception for all other types.
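
As a quick illustration of how the new call composes with the existing
setBytes() (a minimal sketch; the rootWriter plumbing and the head/tail
arrays mirror the result-set-loader test added below, and the
AppendExample/writeLargeValue names are illustrative only):

import org.apache.drill.exec.physical.rowSet.RowSetLoader;
import org.apache.drill.exec.vector.accessor.ScalarWriter;

// Illustrative helper: writes one large VARCHAR value in chunks. The
// first chunk uses setBytes(); each later chunk uses the new
// appendBytes(), so the value can grow past any fixed staging buffer.
class AppendExample {
  static void writeLargeValue(RowSetLoader rootWriter, byte[] head, byte[] tail) {
    ScalarWriter colWriter = rootWriter.scalar(0);
    rootWriter.start();                          // begin the row
    colWriter.setBytes(head, head.length);       // first chunk sets the value
    colWriter.appendBytes(tail, tail.length);    // later chunks extend it
    colWriter.appendBytes(tail, tail.length);
    rootWriter.save();                           // commit the row
  }
}

For a non-VarChar-family column, or when a type conversion shim fronts
the vector, appendBytes() throws, as described below.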

When used with a type conversion shim, the appendBytes() method throws
an exception. This should be OK because the preceding setBytes() would
already have failed: a huge value is not acceptable for numeric or date
conversions.

Added unit tests for the append feature, including the batch overflow
case (when appending bytes causes the vector or batch to overflow).
Also added tests verifying that the text reader no longer imposes a
column width limit, both with and without headers.

closes #1802
paul-rogers authored and arina-ielchiieva committed Jun 7, 2019
1 parent 2615d68 commit 8a7007f
Showing 25 changed files with 376 additions and 27 deletions.
@@ -105,6 +105,9 @@ public void rollover() {
@Override
public void nextElement() { }

@Override
public void prevElement() { }

@Override
public ColumnWriterIndex outerIndex() { return null; }

@@ -17,20 +17,26 @@
*/
package org.apache.drill.exec.store.easy.text.compliant.v3;

import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.rowSet.RowSetLoader;
import org.apache.drill.exec.vector.accessor.ScalarWriter;

public abstract class BaseFieldOutput extends TextOutput {

private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class);
private static final int MAX_FIELD_LENGTH = 1024 * 64;
/**
* Width of the per-field data buffer. Fields can be larger.
* In that case, subsequent buffers are appended to the vector
* to form the full field.
*/
private static final int BUFFER_LEN = 1024;

// track which field is getting appended
protected int currentFieldIndex = -1;
// track chars within field
protected int currentDataPointer;
// track if field is still getting appended
private boolean fieldOpen = true;
// number of bytes written to field thus far
protected int fieldWriteCount;
// holds chars for a field
protected byte[] fieldBytes;
protected final RowSetLoader writer;
@@ -84,7 +90,7 @@ public BaseFieldOutput(RowSetLoader writer, int maxField, boolean[] projectionMa
// If we project at least one field, allocate a buffer.

if (maxField >= 0) {
fieldBytes = new byte[MAX_FIELD_LENGTH];
fieldBytes = new byte[BUFFER_LEN];
}
}

@@ -104,6 +110,7 @@ public void startField(int index) {
assert index == currentFieldIndex + 1;
currentFieldIndex = index;
currentDataPointer = 0;
fieldWriteCount = 0;
fieldOpen = true;

// Figure out if this field is projected.
@@ -122,18 +129,41 @@ public void append(byte data) {
if (! fieldProjected) {
return;
}
if (currentDataPointer >= MAX_FIELD_LENGTH - 1) {
throw UserException
.unsupportedError()
.message("Text column is too large.")
.addContext("Column", currentFieldIndex)
.addContext("Limit", MAX_FIELD_LENGTH)
.build(logger);
if (currentDataPointer >= BUFFER_LEN - 1) {
writeToVector();
}

fieldBytes[currentDataPointer++] = data;
}


/**
* Write a buffer of data to the underlying vector using the
* column writer. The buffer holds a complete or partial chunk
* of data for the field. If this is the first data for the field,
* write the bytes. If this is a second buffer for the same field,
* append the bytes. The append works if the underlying vector
* is VarChar; it fails if a type conversion shim is in between.
* (This is generally OK because the previous setBytes() would
* already have failed, since a large int or date is not supported.)
*/

protected void writeToVector() {
if (!fieldProjected) {
return;
}
ScalarWriter colWriter = columnWriter();
if (fieldWriteCount == 0) {
colWriter.setBytes(fieldBytes, currentDataPointer);
} else {
colWriter.appendBytes(fieldBytes, currentDataPointer);
}
fieldWriteCount += currentDataPointer;
currentDataPointer = 0;
}

protected abstract ScalarWriter columnWriter();

@Override
public boolean endField() {
fieldOpen = false;
@@ -19,6 +19,7 @@

import org.apache.drill.exec.physical.rowSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ScalarWriter;

/**
* Class is responsible for generating record batches for text file inputs. We generate
@@ -52,11 +53,12 @@ private static boolean[] makeMask(RowSetLoader writer) {

@Override
public boolean endField() {
if (fieldProjected) {
writer.scalar(currentFieldIndex)
.setBytes(fieldBytes, currentDataPointer);
}

writeToVector();
return super.endField();
}

@Override
protected ScalarWriter columnWriter() {
return writer.scalar(currentFieldIndex);
}
}
@@ -120,7 +120,7 @@ public boolean endField() {

// Save the field.

columnWriter.setBytes(fieldBytes, currentDataPointer);
writeToVector();
} else {

// The field is not projected.
Expand All @@ -134,4 +134,9 @@ public boolean endField() {

return super.endField();
}

@Override
protected ScalarWriter columnWriter() {
return columnWriter;
}
}
@@ -17,22 +17,21 @@
*/
package org.apache.drill.exec.store.easy.text.compliant.v3;

import io.netty.buffer.DrillBuf;
import io.netty.util.internal.PlatformDependent;
import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.CompressionInputStream;

import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;

import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck;
import io.netty.buffer.DrillBuf;
import io.netty.util.internal.PlatformDependent;

/**
* Class that fronts an InputStream to provide a byte consumption interface.
@@ -37,12 +37,12 @@
import org.apache.drill.exec.vector.accessor.ArrayReader;
import org.apache.drill.exec.vector.accessor.ScalarReader;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetReader;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;

/**
* Exercise the vector overflow functionality for the result set loader.
@@ -706,4 +706,78 @@ public void testOverflowWithNullables() {

rsLoader.close();
}

@Test
public void testVectorSizeLimitWithAppend() {
TupleMetadata schema = new SchemaBuilder()
.add("s", MinorType.VARCHAR)
.buildSchema();
ResultSetOptions options = new OptionBuilder()
.setRowCountLimit(ValueVector.MAX_ROW_COUNT)
.setSchema(schema)
.build();
ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
RowSetLoader rootWriter = rsLoader.writer();

rsLoader.startBatch();
byte head[] = "abc".getBytes();
byte tail[] = new byte[523];
Arrays.fill(tail, (byte) 'X');
int count = 0;
ScalarWriter colWriter = rootWriter.scalar(0);
while (! rootWriter.isFull()) {
rootWriter.start();
colWriter.setBytes(head, head.length);
colWriter.appendBytes(tail, tail.length);
colWriter.appendBytes(tail, tail.length);
rootWriter.save();
count++;
}

// Number of rows should be driven by vector size.
// Our row count should include the overflow row

int valueLength = head.length + 2 * tail.length;
int expectedCount = ValueVector.MAX_BUFFER_SIZE / valueLength;
assertEquals(expectedCount + 1, count);

// Loader's row count should include only "visible" rows

assertEquals(expectedCount, rootWriter.rowCount());

// Total count should include invisible and look-ahead rows.

assertEquals(expectedCount + 1, rsLoader.totalRowCount());

// Result should exclude the overflow row

RowSet result = fixture.wrap(rsLoader.harvest());
assertEquals(expectedCount, result.rowCount());

// Verify that the values were, in fact, appended.

String expected = new String(head, Charsets.UTF_8);
expected += new String(tail, Charsets.UTF_8);
expected += new String(tail, Charsets.UTF_8);
RowSetReader reader = result.reader();
while (reader.next()) {
assertEquals(expected, reader.scalar(0).getString());
}
result.clear();

// Next batch should start with the overflow row

rsLoader.startBatch();
assertEquals(1, rootWriter.rowCount());
assertEquals(expectedCount + 1, rsLoader.totalRowCount());
result = fixture.wrap(rsLoader.harvest());
assertEquals(1, result.rowCount());
reader = result.reader();
while (reader.next()) {
assertEquals(expected, reader.scalar(0).getString());
}
result.clear();

rsLoader.close();
}
}
@@ -29,6 +29,8 @@

public class BaseCsvTest extends ClusterTest {

protected final int BIG_COL_SIZE = 70_000;

protected static final String PART_DIR = "root";
protected static final String NESTED_DIR = "nested";
protected static final String ROOT_FILE = "first.csv";
@@ -118,4 +120,22 @@ protected static void buildFile(File file, String[] data) throws IOException {
}
}
}
protected String buildBigColFile(boolean withHeader) throws IOException {
String fileName = "hugeCol.csv";
try(PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) {
if (withHeader) {
out.println("id,big,n");
}
for (int i = 0; i < 10; i++) {
out.print(i + 1);
out.print(",");
for (int j = 0; j < BIG_COL_SIZE; j++) {
out.print((char) ((j + i) % 26 + 'A'));
}
out.print(",");
out.println((i + 1) * 10);
}
}
return fileName;
}
}
@@ -981,4 +981,29 @@ public void testColumnsIndexMissingV3() throws IOException {
resetV3();
}
}

@Test
public void testHugeColumn() throws IOException {
String fileName = buildBigColFile(true);
try {
enableV3(true);
String sql = "SELECT * FROM `dfs.data`.`%s`";
RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
assertEquals(10, actual.rowCount());
RowSetReader reader = actual.reader();
while (reader.next()) {
int i = reader.logicalIndex();
assertEquals(Integer.toString(i + 1), reader.scalar(0).getString());
String big = reader.scalar(1).getString();
assertEquals(BIG_COL_SIZE, big.length());
for (int j = 0; j < BIG_COL_SIZE; j++) {
assertEquals((char) ((j + i) % 26 + 'A'), big.charAt(j));
}
assertEquals(Integer.toString((i + 1) * 10), reader.scalar(2).getString());
}
actual.clear();
} finally {
resetV3();
}
}
}
@@ -442,4 +442,33 @@ public void testColumnsIndexOverflow() throws IOException {
resetV3();
}
}

@Test
public void testHugeColumn() throws IOException {
String fileName = buildBigColFile(false);
try {
enableV3(true);
String sql = "SELECT * FROM `dfs.data`.`%s`";
RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
assertEquals(10, actual.rowCount());
RowSetReader reader = actual.reader();
ArrayReader arrayReader = reader.array(0);
while (reader.next()) {
int i = reader.logicalIndex();
arrayReader.next();
assertEquals(Integer.toString(i + 1), arrayReader.scalar().getString());
arrayReader.next();
String big = arrayReader.scalar().getString();
assertEquals(BIG_COL_SIZE, big.length());
for (int j = 0; j < BIG_COL_SIZE; j++) {
assertEquals((char) ((j + i) % 26 + 'A'), big.charAt(j));
}
arrayReader.next();
assertEquals(Integer.toString((i + 1) * 10), arrayReader.scalar().getString());
}
actual.clear();
} finally {
resetV3();
}
}
}
@@ -80,6 +80,9 @@ public int size() {
@Override
public final void nextElement() { }

@Override
public final void prevElement() { }

@Override
public void rollover() {
throw new UnsupportedOperationException("Rollover not supported in the row set writer.");
@@ -34,8 +34,8 @@
import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.NullableScalarWriter;
import org.apache.drill.exec.vector.accessor.writer.ScalarArrayWriter;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.test.OperatorFixture;

/**
* Tests the performance of the writers compared to using the value
@@ -179,6 +179,9 @@ private static class TestWriterIndex implements ColumnWriterIndex {
@Override
public final void nextElement() { index++; }

@Override
public final void prevElement() { }

@Override
public void rollover() { }

