
Commit 80218d0
TAJO-2010: Parquet can not read null value.
Closes #903
jinossy committed Dec 4, 2015
Parent: 0ec2a89
Showing 4 changed files with 124 additions and 28 deletions.
CHANGES (2 additions, 0 deletions)
@@ -55,6 +55,8 @@ Release 0.12.0 - unreleased
 
   BUG FIXES
 
+    TAJO-2010: Parquet can not read null value. (jinho)
+
     TAJO-2001: DirectRawFileScanner.getProgress occasionally fails. (jinho)
 
     TAJO-1753: GlobalEngine causes NPE occurs occasionally. (jinho)
TajoRecordConverter.java

@@ -35,6 +35,7 @@
 import parquet.schema.Type;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 /**
  * Converter to convert a Parquet record into a Tajo Tuple.
@@ -146,22 +147,16 @@ public Converter getConverter(int fieldIndex) {
    */
   @Override
   public void start() {
-    currentTuple = new VTuple(projectionMap.length);
+    Datum[] datums = new Datum[projectionMap.length];
+    Arrays.fill(datums, NullDatum.get());
+    currentTuple = new VTuple(datums);
   }
 
   /**
    * Called after all fields have been processed.
    */
   @Override
   public void end() {
-    for (int i = 0; i < projectionMap.length; ++i) {
-      final int projectionIndex = projectionMap[i];
-      Column column = tajoReadSchema.getColumn(projectionIndex);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE
-          || currentTuple.isBlankOrNull(i)) {
-        set(projectionIndex, NullDatum.get());
-      }
-    }
   }
 
   /**
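Why this fixes the null read: Parquet invokes a field converter only for values that are physically present, so a null column never writes into its tuple slot. Pre-filling every slot with NullDatum in start() makes untouched slots correct by construction, which is also why the patch can drop the per-column repair loop in end(). A minimal standalone sketch of the effect, with illustrative names rather than Tajo's API:

import java.util.Arrays;

// Illustrative sketch (not Tajo code): a callback-style reader only
// writes slots for fields that are present, so null fields leave
// their slots untouched. Pre-filling with a null sentinel makes the
// untouched slots read back as proper nulls.
public class NullFillSketch {
  private static final String NULL_SENTINEL = "NULL_DATUM";

  public static void main(String[] args) {
    int[] presentFields = {0, 2};         // field 1 is null in this record

    String[] oldStyle = new String[3];    // old start(): slots begin as raw Java null
    String[] newStyle = new String[3];
    Arrays.fill(newStyle, NULL_SENTINEL); // new start(): every slot defaults to a null datum

    for (int f : presentFields) {         // simulated converter callbacks
      oldStyle[f] = "value-" + f;
      newStyle[f] = "value-" + f;
    }

    System.out.println(Arrays.toString(oldStyle)); // [value-0, null, value-2]
    System.out.println(Arrays.toString(newStyle)); // [value-0, NULL_DATUM, value-2]
  }
}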
TajoRecordMaterializer.java

@@ -18,8 +18,8 @@
 
 package org.apache.tajo.storage.parquet;
 
-import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.Tuple;
 import parquet.io.api.GroupConverter;
 import parquet.io.api.RecordMaterializer;
@@ -35,24 +35,13 @@ class TajoRecordMaterializer extends RecordMaterializer<Tuple> {
    * Creates a new TajoRecordMaterializer.
    *
    * @param parquetSchema The Parquet schema of the projection.
-   * @param tajoSchema The Tajo schema of the projection.
+   * @param tajoRequestSchema The Tajo schema of the projection.
    * @param tajoReadSchema The Tajo schema of the table.
    */
-  public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoSchema,
+  public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoRequestSchema,
                                 Schema tajoReadSchema) {
-    int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema);
-    this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema,
-        projectionMap);
-  }
-
-  private int[] getProjectionMap(Schema schema, Schema projection) {
-    Column[] targets = projection.toArray();
-    int[] projectionMap = new int[targets.length];
-    for (int i = 0; i < targets.length; ++i) {
-      int tid = schema.getColumnId(targets[i].getQualifiedName());
-      projectionMap[i] = tid;
-    }
-    return projectionMap;
+    int[] projectionMap = PlannerUtil.getTargetIds(tajoReadSchema, tajoRequestSchema.toArray());
+    this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema, projectionMap);
   }
 
  /**
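PlannerUtil.getTargetIds replaces the private getProjectionMap helper with a shared utility; judging from the removed code, it resolves each requested column to its ordinal in the full read schema. A small self-contained sketch of that mapping (hypothetical names, not the PlannerUtil signature):

import java.util.Arrays;
import java.util.List;

// Sketch of the projection map the materializer needs: for each
// requested column, the index of that column in the table's read schema.
// (Assumed behavior, inferred from the removed getProjectionMap helper.)
public class ProjectionMapSketch {
  static int[] targetIds(List<String> readSchema, String... requested) {
    int[] ids = new int[requested.length];
    for (int i = 0; i < requested.length; i++) {
      ids[i] = readSchema.indexOf(requested[i]); // column id in the read schema
    }
    return ids;
  }

  public static void main(String[] args) {
    List<String> readSchema = Arrays.asList("col1", "col2", "col3", "col4");
    // A projection requesting col4 and col2 maps to read-schema ids 3 and 1:
    System.out.println(Arrays.toString(targetIds(readSchema, "col4", "col2"))); // [3, 1]
  }
}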
TestStorages.java

@@ -40,6 +40,7 @@
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatumFactory;
 import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.rcfile.RCFile;
@@ -55,10 +56,9 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 @RunWith(Parameterized.class)
 public class TestStorages {
@@ -540,6 +540,116 @@ public void testNullHandlingTypes() throws IOException {
     scanner.close();
   }
 
+  @Test
+  public void testNullHandlingTypesWithProjection() throws IOException {
+    if (internalType) return;
+
+    boolean handleProtobuf = !dataFormat.equalsIgnoreCase(BuiltinStorages.JSON);
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+
+    if (handleProtobuf) {
+      schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+    }
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(dataFormat, options);
+    meta.setPropertySet(CatalogUtil.newDefaultProperty(dataFormat));
+    meta.putProperty(StorageConstants.TEXT_NULL, "\\\\N");
+    meta.putProperty(StorageConstants.RCFILE_NULL, "\\\\N");
+    meta.putProperty(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+    meta.putProperty(StorageConstants.SEQUENCEFILE_NULL, "\\");
+    if (dataFormat.equalsIgnoreCase("AVRO")) {
+      meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL, TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
+    }
+
+    Path tablePath = new Path(testDir, "testProjectedNullHandlingTypes.data");
+    FileTablespace sm = TablespaceManager.getLocalFs();
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+    int columnNum = 11 + (handleProtobuf ? 1 : 0);
+    VTuple seedTuple = new VTuple(columnNum);
+    seedTuple.put(new Datum[]{
+        DatumFactory.createBool(true),                 // col1
+        DatumFactory.createChar("hyunsik"),            // col2
+        DatumFactory.createInt2((short) 17),           // col3
+        DatumFactory.createInt4(59),                   // col4
+        DatumFactory.createInt8(23L),                  // col5
+        DatumFactory.createFloat4(77.9f),              // col6
+        DatumFactory.createFloat8(271.9f),             // col7
+        DatumFactory.createText("hyunsik"),            // col8
+        DatumFactory.createBlob("hyunsik".getBytes()), // col9
+        DatumFactory.createInet4("192.168.0.1"),       // col10
+        NullDatum.get(),                               // col11
+    });
+
+    if (handleProtobuf) {
+      seedTuple.put(11, factory.createDatum(queryid.getProto())); // col12
+    }
+
+    // Make tuples with a different null column position in each row
+    Tuple tuple;
+    for (int i = 0; i < columnNum; i++) {
+      tuple = new VTuple(columnNum);
+      for (int j = 0; j < columnNum; j++) {
+        if (i == j) { // the i'th column of the i'th row is NULL
+          tuple.put(j, NullDatum.get());
+        } else {
+          tuple.put(j, seedTuple.get(j));
+        }
+      }
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    // Make a projection schema that keeps a random subset of the columns
+    Schema target = new Schema();
+    Random random = new Random();
+    for (int i = 1; i < schema.size(); i++) {
+      int num = random.nextInt(schema.size() - 1) + 1;
+      if (i % num == 0) {
+        target.addColumn(schema.getColumn(i));
+      }
+    }
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, target);
+    scanner.init();
+
+    Tuple retrieved;
+    int[] targetIds = PlannerUtil.getTargetIds(schema, target.toArray());
+    int i = 0;
+    while ((retrieved = scanner.next()) != null) {
+      assertEquals(target.size(), retrieved.size());
+      for (int j = 0; j < targetIds.length; j++) {
+        if (i == targetIds[j]) { // row i nulled column i, so this projected slot must be null
+          assertEquals(NullDatum.get(), retrieved.asDatum(j));
+        } else {
+          assertEquals(seedTuple.get(targetIds[j]), retrieved.asDatum(j));
+        }
+      }
+      i++;
+    }
+    scanner.close();
+  }
+
   @Test
   public void testRCFileTextSerializeDeserialize() throws IOException {
     if(!dataFormat.equalsIgnoreCase(BuiltinStorages.RCFILE)) return;
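The new test's assertion hinges on the write pattern it sets up: row i is the seed tuple with column i nulled, so after projection, slot j of row i must be null exactly when targetIds[j] == i. A compact model of that invariant, with illustrative names only:

import java.util.Arrays;

// Model of the test's check: row i nulls column i of a seed row; after
// projecting the columns in targetIds, slot j is null iff targetIds[j] == i.
public class ProjectionNullCheckSketch {
  public static void main(String[] args) {
    String[] seed = {"a", "b", "c", "d"};
    int[] targetIds = {3, 1};                  // projected column ids

    for (int i = 0; i < seed.length; i++) {    // row i: seed with column i nulled
      String[] projected = new String[targetIds.length];
      for (int j = 0; j < targetIds.length; j++) {
        projected[j] = (targetIds[j] == i) ? null : seed[targetIds[j]];
      }
      System.out.println("row " + i + " -> " + Arrays.toString(projected));
    }
    // Only rows 1 and 3 show a null, each in the slot that projects that column.
  }
}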
