DRILL-7006: Add type conversion to row writers
Modifies the column metadata and writer abstractions so that a type conversion "shim" can be specified as part of the schema and inserted as part of the row set writer. This allows, say, setting an Int or Date column from a string: the shim parses the string to obtain the proper data type to store in the vector.

Type conversion is not yet supported in the result set loader; some additional complexity needs to be resolved.

Adds unit tests for this functionality. Refactors some existing tests to remove rough edges.
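
A minimal sketch of the intended usage, pieced together from the APIs added in this commit (the column name "n1" is illustrative, and the allocator is assumed to come from a test fixture such as SubOperatorTest's fixture, as in the new tests). The new TestColumnConvertor test below exercises the same pattern in full.

  // Declare an INT column, then attach a conversion shim via a factory;
  // a fresh shim instance is created for each row set writer.
  TupleMetadata schema = new SchemaBuilder()
      .add("n1", MinorType.INT)
      .buildSchema();
  schema.metadata("n1").setTypeConverter(new ColumnConversionFactory() {
    @Override
    public ConcreteWriter newWriter(ColumnMetadata colDefn, ConcreteWriter baseWriter) {
      return new AbstractWriteConvertor(baseWriter) {
        @Override
        public void setString(String value) {
          // Parse the string form into the int actually stored in the vector.
          setInt(Integer.parseInt(value));
        }
      };
    }
  });

  // The writer for "n1" now accepts either a string or an int.
  RowSet rows = new RowSetBuilder(fixture.allocator(), schema)
      .addRow("123")
      .addRow(456)
      .build();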

closes #1623
Paul Rogers authored and arina-ielchiieva committed Jan 30, 2019
1 parent 85c656e commit 8fb85cd
Showing 13 changed files with 654 additions and 80 deletions.
@@ -21,6 +21,8 @@
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;

/**
* Abstract definition of column metadata. Allows applications to create
@@ -178,6 +180,20 @@ public void setProjected(boolean projected) {
@Override
public boolean isProjected() { return projected; }

@Override
public void setDefaultValue(Object value) { }

@Override
public Object defaultValue() { return null; }

@Override
public void setTypeConverter(ColumnConversionFactory factory) {
throw new UnsupportedConversionError("Type conversion not supported for non-scalar writers");
}

@Override
public ColumnConversionFactory typeConverter() { return null; }

@Override
public String toString() {
final StringBuilder buf = new StringBuilder()
@@ -23,16 +23,52 @@
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;

/**
* Primitive (non-map) column. Describes non-nullable, nullable and array types
* (which differ only in mode, but not in metadata structure.)
* <p>
* Metadata is of two types:
* <ul>
* <li>Storage metadata that describes how the column is materialized in a
* vector. Storage metadata is immutable because revising an existing vector is
* a complex operation.</li>
* <li>Supplemental metadata used when reading or writing the column.
* Supplemental metadata can be changed after the column is created, though it
* should generally be set before invoking code that uses the metadata.</li>
* </ul>
*/

public class PrimitiveColumnMetadata extends AbstractColumnMetadata {

/**
* Expected (average) width for variable-width columns.
*/

private int expectedWidth;

/**
* Default value to use for filling a vector when no real data is
* available, such as for columns that appear in new files but do not
* exist in older files. The ultimate default value is the SQL null
* value, which works only for nullable columns.
*/

private Object defaultValue;

/**
* Factory for an optional shim writer that translates between the type of
* data available to the code that creates the vectors and the actual type
* of the column. For example, a shim might parse the string form of a
* date into the form stored in vectors.
* <p>
* The default is to use the "natural" type: that is, to insert no
* conversion shim.
*/

private ColumnConversionFactory shimFactory;

public PrimitiveColumnMetadata(MaterializedField schema) {
super(schema);
expectedWidth = estimateWidth(schema.getType());
@@ -98,6 +134,22 @@ public void setExpectedWidth(int width) {
}
}

@Override
public void setDefaultValue(Object value) {
defaultValue = value;
}

@Override
public Object defaultValue() { return defaultValue; }

@Override
public void setTypeConverter(ColumnConversionFactory factory) {
shimFactory = factory;
}

@Override
public ColumnConversionFactory typeConverter() { return shimFactory; }

@Override
public ColumnMetadata cloneEmpty() {
return new PrimitiveColumnMetadata(this);
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.rowSet.impl;

import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -43,8 +44,10 @@
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
import org.apache.drill.test.rowSet.test.TestColumnConvertor.TestConvertor;
import org.apache.drill.test.rowSet.RowSetReader;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.Ignore;
import org.junit.Test;

/**
@@ -602,4 +605,55 @@ public void testCloseWithoutHarvest() {

rsLoader.close();
}

/**
* Test the use of a column type converter in the result set loader for
* required, nullable and repeated columns.
*/

@Ignore("Not yet")
@Test
public void testTypeConversion() {
TupleMetadata schema = new SchemaBuilder()
.add("n1", MinorType.INT)
.addNullable("n2", MinorType.INT)
.addArray("n3", MinorType.INT)
.buildSchema();

// Add a type convertor. Passed in as a factory
// since we must create a new one for each row set writer.

schema.metadata("n1").setTypeConverter(TestConvertor.factory());
schema.metadata("n2").setTypeConverter(TestConvertor.factory());
schema.metadata("n3").setTypeConverter(TestConvertor.factory());

ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.setRowCountLimit(ValueVector.MAX_ROW_COUNT)
.build();
ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);

// Write data both as a string and as an integer

RowSetLoader rootWriter = rsLoader.writer();
rootWriter.addRow("123", "12", strArray("123", "124"));
rootWriter.addRow(234, 23, intArray(234, 235));
RowSet actual = fixture.wrap(rsLoader.harvest());

// Build the expected vector without a type convertor.

TupleMetadata expectedSchema = new SchemaBuilder()
.add("n1", MinorType.INT)
.addNullable("n2", MinorType.INT)
.addArray("n3", MinorType.INT)
.buildSchema();
final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(123, 12, intArray(123, 124))
.addRow(234, 23, intArray(234, 235))
.build();

// Compare

RowSetUtilities.verify(expected, actual);
}
}
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.test.rowSet.test;

import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;

import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractWriteConvertor;
import org.apache.drill.exec.vector.accessor.writer.ConcreteWriter;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetBuilder;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
import org.junit.Test;

/**
* Tests the column type convertor feature of the column metadata
* and of the RowSetWriter.
*/

public class TestColumnConvertor extends SubOperatorTest {

/**
* Simple type converter that allows string-to-int conversions.
* Inherits usual int value support from the base writer.
*/
public static class TestConvertor extends AbstractWriteConvertor {

public TestConvertor(ScalarWriter baseWriter) {
super(baseWriter);
}

@Override
public void setString(String value) {
setInt(Integer.parseInt(value));
}

public static ColumnConversionFactory factory() {
return new ColumnConversionFactory() {
@Override
public ConcreteWriter newWriter(ColumnMetadata colDefn,
ConcreteWriter baseWriter) {
return new TestConvertor(baseWriter);
}
};
}
}

@Test
public void testScalarConvertor() {

// Create the schema

TupleMetadata schema = new SchemaBuilder()
.add("n1", MinorType.INT)
.addNullable("n2", MinorType.INT)
.buildSchema();

// Add a type convertor. Passed in as a factory
// since we must create a new one for each row set writer.

schema.metadata("n1").setTypeConverter(TestConvertor.factory());
schema.metadata("n2").setTypeConverter(TestConvertor.factory());

// Write data both as a string and as an integer

RowSet actual = new RowSetBuilder(fixture.allocator(), schema)
.addRow("123", "12")
.addRow(234, 23)
.build();

// Build the expected vector without a type convertor.

TupleMetadata expectedSchema = new SchemaBuilder()
.add("n1", MinorType.INT)
.addNullable("n2", MinorType.INT)
.buildSchema();
final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(123, 12)
.addRow(234, 23)
.build();

// Compare

RowSetUtilities.verify(expected, actual);
}

@Test
public void testArrayConvertor() {

// Create the schema

TupleMetadata schema = new SchemaBuilder()
.addArray("n", MinorType.INT)
.buildSchema();

// Add a type convertor. Passed in as a factory
// since we must create a new one for each row set writer.

schema.metadata("n").setTypeConverter(TestConvertor.factory());

// Write data both as a string and as an integer

RowSet actual = new RowSetBuilder(fixture.allocator(), schema)
.addSingleCol(strArray("123", "124"))
.addSingleCol(intArray(234, 235))
.build();

// Build the expected vector without a type convertor.

TupleMetadata expectedSchema = new SchemaBuilder()
.addArray("n", MinorType.INT)
.buildSchema();
final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addSingleCol(intArray(123, 124))
.addSingleCol(intArray(234, 235))
.build();

// Compare

RowSetUtilities.verify(expected, actual);
}
}
