Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.expr.holders.BigIntHolder;
import org.apache.drill.exec.expr.holders.BitHolder;
import org.apache.drill.exec.expr.holders.Float4Holder;
import org.apache.drill.exec.expr.holders.Float8Holder;
import org.apache.drill.exec.expr.holders.IntHolder;
import org.apache.drill.exec.expr.holders.VarBinaryHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
Expand Down Expand Up @@ -84,7 +77,7 @@ public class AvroRecordReader extends AbstractRecordReader {
private final String opUserName;
private final String queryUserName;

private static final int DEFAULT_BATCH_SIZE = 1000;
private static final int DEFAULT_BATCH_SIZE = 4096;


public AvroRecordReader(final FragmentContext fragmentContext,
Expand All @@ -94,18 +87,6 @@ public AvroRecordReader(final FragmentContext fragmentContext,
final FileSystem fileSystem,
final List<SchemaPath> projectedColumns,
final String userName) {
this(fragmentContext, inputPath, start, length, fileSystem, projectedColumns, userName, DEFAULT_BATCH_SIZE);
}

public AvroRecordReader(final FragmentContext fragmentContext,
final String inputPath,
final long start,
final long length,
final FileSystem fileSystem,
List<SchemaPath> projectedColumns,
final String userName,
final int defaultBatchSize) {

hadoop = new Path(inputPath);
this.start = start;
this.end = start + length;
Expand Down Expand Up @@ -161,10 +142,9 @@ public int next() {
writer.reset();

try {

// XXX - Implement batch size

for (GenericContainer container = null; reader.hasNext() && !reader.pastSync(end); recordCount++) {
for (GenericContainer container = null;
recordCount < DEFAULT_BATCH_SIZE && reader.hasNext() && !reader.pastSync(end);
recordCount++) {
writer.setPosition(recordCount);
container = reader.next(container);
processRecord(container, container.getSchema());
Expand Down Expand Up @@ -284,56 +264,39 @@ private void processPrimitive(final Object value, final Schema.Type type, final
switch (type) {
case STRING:
byte[] binary = null;
final int length;
if (value instanceof Utf8) {
binary = ((Utf8) value).getBytes();
length = ((Utf8) value).getByteLength();
} else {
binary = value.toString().getBytes(Charsets.UTF_8);
length = binary.length;
}
final int length = binary.length;
final VarCharHolder vh = new VarCharHolder();
ensure(length);
buffer.setBytes(0, binary);
vh.buffer = buffer;
vh.start = 0;
vh.end = length;
writer.varChar(fieldName).write(vh);
writer.varChar(fieldName).writeVarChar(0, length, buffer);
break;
case INT:
final IntHolder ih = new IntHolder();
ih.value = (Integer) value;
writer.integer(fieldName).write(ih);
writer.integer(fieldName).writeInt((Integer) value);
break;
case LONG:
final BigIntHolder bh = new BigIntHolder();
bh.value = (Long) value;
writer.bigInt(fieldName).write(bh);
writer.bigInt(fieldName).writeBigInt((Long) value);
break;
case FLOAT:
final Float4Holder fh = new Float4Holder();
fh.value = (Float) value;
writer.float4(fieldName).write(fh);
writer.float4(fieldName).writeFloat4((Float) value);
break;
case DOUBLE:
final Float8Holder f8h = new Float8Holder();
f8h.value = (Double) value;
writer.float8(fieldName).write(f8h);
writer.float8(fieldName).writeFloat8((Double) value);
break;
case BOOLEAN:
final BitHolder bit = new BitHolder();
bit.value = (Boolean) value ? 1 : 0;
writer.bit(fieldName).write(bit);
writer.bit(fieldName).writeBit((Boolean) value ? 1 : 0);
break;
case BYTES:
// XXX - Not sure if this is correct. Nothing prints from sqlline for byte fields.
final VarBinaryHolder vb = new VarBinaryHolder();
final ByteBuffer buf = (ByteBuffer) value;
final byte[] bytes = buf.array();
ensure(bytes.length);
buffer.setBytes(0, bytes);
vb.buffer = buffer;
vb.start = 0;
vb.end = bytes.length;
writer.binary(fieldName).write(vb);
length = buf.remaining();
ensure(length);
buffer.setBytes(0, buf);
writer.binary(fieldName).writeVarBinary(0, length, buffer);
break;
case NULL:
// Nothing to do for null type
Expand All @@ -346,13 +309,10 @@ private void processPrimitive(final Object value, final Schema.Type type, final
} catch (UnsupportedEncodingException e) {
throw new DrillRuntimeException("Unable to read enum value for field: " + fieldName, e);
}
final VarCharHolder vch = new VarCharHolder();
ensure(b.length);
buffer.setBytes(0, b);
vch.buffer = buffer;
vch.start = 0;
vch.end = b.length;
writer.varChar(fieldName).write(vch);
writer.varChar(fieldName).writeVarChar(0, b.length, buffer);

break;
default:
throw new DrillRuntimeException("Unhandled Avro type: " + type.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private static void createRecordReadersData(String user, String group) throws Ex
fs.setOwner(dfsFile, user, group);
fs.setPermission(dfsFile, new FsPermission((short) 0700));

localFile = new Path(AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues());
localFile = new Path(AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath());
dfsFile = new Path(getUserHome(user), "simple.avro");
fs.copyFromLocalFile(localFile, dfsFile);
fs.setOwner(dfsFile, user, group);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,36 +32,65 @@ public class AvroFormatTest extends BaseTestQuery {
// 1. Need to test nested field names with same name as top-level names for conflict.
// 2. Does Avro support recursive types? If so, can we test this?

/**
 * Queries a generated Avro file holding 5000 records and verifies both the
 * returned rows and that the result arrives in exactly two batches.
 * NOTE(review): two batches presumably follows from the reader's batch size
 * being smaller than 5000 - confirm against AvroRecordReader.
 */
@Test
public void testBatchCutoff() throws Exception {
  final int recordCount = 5000;
  final AvroTestUtil.AvroTestRecordWriter generated =
      AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(recordCount);
  final String avroPath = generated.getFilePath();
  final String query = "select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null "
      + "from dfs_test.`" + avroPath + "`";

  // Plain execution first, then the verified run through the test builder.
  test(query);

  testBuilder()
      .sqlQuery(query)
      .unOrdered()
      .expectsNumBatches(2)
      .baselineRecords(generated.getExpectedRecords())
      .go();
}

@Test
public void testSimplePrimitiveSchema_NoNullValues() throws Exception {

final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
final String file = testSetup.getFilePath();
final String sql =
"select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null " +
"from dfs_test.`" + file + "`";
test(sql);
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineRecords(testSetup.getExpectedRecords())
.go();
}

@Test
public void testSimplePrimitiveSchema_StarQuery() throws Exception {

final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
final String file = testSetup.getFilePath();
final String sql = "select * from dfs_test.`" + file + "`";
test(sql);
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineRecords(testSetup.getExpectedRecords())
.go();
}

@Test
public void testSimplePrimitiveSchema_SelectColumnSubset() throws Exception {

final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath();
final String sql = "select h_boolean, e_double from dfs_test.`" + file + "`";
test(sql);
}

@Test
public void testSimplePrimitiveSchema_NoColumnsExistInTheSchema() throws Exception {

final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath();
final String sql = "select h_dummy1, e_dummy2 from dfs_test.`" + file + "`";
try {
test(sql);
Expand All @@ -75,7 +104,7 @@ public void testSimplePrimitiveSchema_NoColumnsExistInTheSchema() throws Excepti
@Test
public void testSimplePrimitiveSchema_OneExistAndOneDoesNotExistInTheSchema() throws Exception {

final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath();
final String sql = "select h_boolean, e_dummy2 from dfs_test.`" + file + "`";
try {
test(sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.avro;

import com.google.common.base.Charsets;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.SchemaBuilder;
Expand All @@ -26,9 +27,13 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -38,7 +43,86 @@ public class AvroTestUtil {

public static final int RECORD_COUNT = 10;

public static String generateSimplePrimitiveSchema_NoNullValues() throws Exception {
/**
 * Immutable holder pairing a file path with a list of expected records.
 */
public static class AvroTestSetup {
  private final String filePath;
  private final List<Map> expectedRecords;

  /**
   * @param filePath path stored by this holder
   * @param expectedRecords expected records stored by this holder (kept by reference, not copied)
   */
  public AvroTestSetup(String filePath, List<Map> expectedRecords) {
    this.filePath = filePath;
    this.expectedRecords = expectedRecords;
  }

  /**
   * @return the file path given at construction
   */
  public String getFilePath() {
    return filePath;
  }

  /**
   * @return the expected-record list given at construction
   */
  public List<Map> getExpectedRecords() {
    return expectedRecords;
  }
}

/**
 * Class to write records to an Avro file while simultaneously
 * constructing a corresponding list of records in the format taken in
 * by the Drill test builder to describe expected results.
 *
 * <p>Usage: call {@link #startRecord()}, then {@link #put(String, Object)} for each
 * field, then {@link #endRecord()}; call {@link #close()} once all records are written.
 */
public static class AvroTestRecordWriter implements Closeable {
  private final List<Map> expectedRecords;
  GenericData.Record currentAvroRecord;
  Map<String, Object> currentExpectedRecord;
  private final Schema schema;
  private final DataFileWriter<GenericData.Record> writer;
  private final String filePath;

  /**
   * Opens an Avro data file for writing.
   *
   * @param schema schema used for the file and for every record created by {@link #startRecord()}
   * @param file destination file
   * @throws RuntimeException wrapping the {@link IOException} if the file cannot be created
   */
  private AvroTestRecordWriter(Schema schema, File file) {
    this.schema = schema;
    writer = new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(schema));
    try {
      writer.create(schema, file);
    } catch (IOException e) {
      throw new RuntimeException("Error creating file in Avro test setup.", e);
    }
    currentExpectedRecord = new HashMap<>();
    expectedRecords = new ArrayList<>();
    filePath = file.getAbsolutePath();
  }

  /**
   * Begins a new record: creates a fresh Avro record and a fresh expected-value map.
   */
  public void startRecord() {
    currentAvroRecord = new GenericData.Record(schema);
    currentExpectedRecord = new HashMap<>();
  }

  /**
   * Sets a field on the current Avro record and stores the matching expected value
   * under the back-tick quoted field name.
   *
   * @param key field name
   * @param value field value; a {@link ByteBuffer} is recorded on the expected side as the
   *              UTF-8 decoding of its remaining bytes
   * @throws IllegalStateException if {@link #startRecord()} has not been called
   */
  public void put(String key, Object value) {
    checkRecordStarted();
    currentAvroRecord.put(key, value);
    Object expectedValue = value;
    if (value instanceof ByteBuffer) {
      // Read through a duplicate so the caller's buffer keeps its own position; the same
      // buffer is only serialized later, in endRecord(), and must not be disturbed here.
      final ByteBuffer view = ((ByteBuffer) value).duplicate();
      final byte[] bytes = new byte[view.remaining()];
      view.get(bytes);
      // Binary values are compared as UTF-8 strings on the expected side
      // (presumably how the test framework reports them - confirm).
      expectedValue = new String(bytes, Charsets.UTF_8);
    }
    currentExpectedRecord.put("`" + key + "`", expectedValue);
  }

  /**
   * Appends the current record to the Avro file and adds its expected values to the
   * list returned by {@link #getExpectedRecords()}.
   *
   * @throws IOException if the record cannot be appended to the file
   * @throws IllegalStateException if {@link #startRecord()} has not been called
   */
  public void endRecord() throws IOException {
    checkRecordStarted();
    writer.append(currentAvroRecord);
    expectedRecords.add(currentExpectedRecord);
  }

  /**
   * Closes the underlying Avro file writer.
   */
  @Override
  public void close() throws IOException {
    writer.close();
  }

  /**
   * @return absolute path of the file being written
   */
  public String getFilePath() {
    return filePath;
  }

  /**
   * @return the expected records accumulated so far (the live list, not a copy)
   */
  public List<Map> getExpectedRecords() {
    return expectedRecords;
  }

  /** Fails fast with a clear message instead of a bare NullPointerException. */
  private void checkRecordStarted() {
    if (currentAvroRecord == null) {
      throw new IllegalStateException("startRecord() must be called before writing record fields.");
    }
  }
}


/**
 * Convenience overload that generates the simple primitive-schema file using
 * {@link #RECORD_COUNT} as the number of records.
 *
 * @return the result of the {@code int} overload invoked with {@code RECORD_COUNT}
 * @throws Exception propagated from the {@code int} overload
 */
public static AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues() throws Exception {
  final int defaultRecordCount = RECORD_COUNT;
  return generateSimplePrimitiveSchema_NoNullValues(defaultRecordCount);
}

public static AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues(int numRecords) throws Exception {

final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
.namespace("org.apache.drill.exec.store.avro")
Expand All @@ -56,15 +140,14 @@ public static String generateSimplePrimitiveSchema_NoNullValues() throws Excepti
final File file = File.createTempFile("avro-primitive-test", ".avro");
file.deleteOnExit();

final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
try {
writer.create(schema, file);

ByteBuffer bb = ByteBuffer.allocate(1);
bb.put(0, (byte) 1);
bb.put(0, (byte) 'a');

for (int i = 0; i < RECORD_COUNT; i++) {
final GenericRecord record = new GenericData.Record(schema);
for (int i = 0; i < numRecords; i++) {
bb.position(0);
record.startRecord();
record.put("a_string", "a_" + i);
record.put("b_int", i);
record.put("c_long", (long) i);
Expand All @@ -73,13 +156,13 @@ public static String generateSimplePrimitiveSchema_NoNullValues() throws Excepti
record.put("f_bytes", bb);
record.put("g_null", null);
record.put("h_boolean", (i % 2 == 0));
writer.append(record);
record.endRecord();
}
} finally {
writer.close();
record.close();
}

return file.getAbsolutePath();
return record;
}

public static String generateUnionSchema_WithNullValues() throws Exception {
Expand Down