Merged

Changes from all commits
@@ -91,17 +91,44 @@ public int numValues() {
return vector.getValueCount();
}

public static <T> VectorHolder constantHolder(int numRows, T constantValue) {
return new ConstantVectorHolder(numRows, constantValue);
}

public static VectorHolder dummyHolder(int numRows) {
return new VectorHolder() {
@Override
public int numValues() {
return numRows;
}
};
return new ConstantVectorHolder(numRows);
}

public boolean isDummy() {
return vector == null;
}

/**
* A VectorHolder that does not actually produce values. Consumers of this class should
* use the constantValue to populate their ColumnVector implementation.
*/
public static class ConstantVectorHolder<T> extends VectorHolder {
private final T constantValue;
private final int numRows;

public ConstantVectorHolder(int numRows) {
this.numRows = numRows;
this.constantValue = null;
}

public ConstantVectorHolder(int numRows, T constantValue) {
this.numRows = numRows;
this.constantValue = constantValue;
}

@Override
public int numValues() {
return this.numRows;
}

public Object getConstant() {
return constantValue;
}
}

}
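
The javadoc above says consumers should use the constant to populate their ColumnVector implementation. A minimal sketch of what a consumer might look like, assuming a Spark-style ColumnVector wrapper (the helper name is made up, and the ConstantColumnVector/IcebergArrowColumnVector types mirror the Spark-side change later in this diff; this is illustrative, not part of the patch):

    // Illustrative consumer: branch on isDummy() and either fill from the constant or wrap
    // the underlying Arrow vector.
    static ColumnVector columnFor(VectorHolder holder, int numRows) {
      if (holder.isDummy()) {
        Object constant = ((VectorHolder.ConstantVectorHolder) holder).getConstant();
        return new ConstantColumnVector(Types.IntegerType.get(), numRows, constant);
      }
      return new IcebergArrowColumnVector(holder);
    }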
@@ -356,5 +356,36 @@ public String toString() {
public void setBatchSize(int batchSize) {}
}

/**
* A dummy vector reader that does not actually read files. Instead, it returns a constant
* VectorHolder that indicates the constant value to be used for this column.
* @param <T> the type of the constant value
*/
public static class ConstantVectorReader<T> extends VectorizedArrowReader {
private final T value;

public ConstantVectorReader(T value) {
this.value = value;
}

@Override
public VectorHolder read(VectorHolder reuse, int numValsToRead) {
return VectorHolder.constantHolder(numValsToRead, value);
}

@Override
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
}

@Override
public String toString() {
return String.format("ConstantReader: %s", value);
}

@Override
public void setBatchSize(int batchSize) {}

}

}
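
As a usage note, the reader above can be exercised on its own; a small hedged example (the column name and values are made up):

    // Illustrative only: a constant reader standing in for an identity-partitioned "date" column.
    // read() never touches Parquet pages; it just reports the constant for the requested batch size.
    ConstantVectorReader<String> dateReader = new ConstantVectorReader<>("2020-08-03");
    VectorHolder holder = dateReader.read(null, 1000);
    // holder.numValues() == 1000
    // ((VectorHolder.ConstantVectorHolder) holder).getConstant() returns "2020-08-03"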

@@ -21,6 +21,7 @@

import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
import org.apache.iceberg.arrow.vectorized.VectorHolder;
import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.Decimal;
@@ -144,7 +145,8 @@ public ArrowColumnVector getChild(int ordinal) {
}

static ColumnVector forHolder(VectorHolder holder, int numRows) {
return holder.isDummy() ? new ConstantColumnVector(Types.IntegerType.get(), numRows, null) :
return holder.isDummy() ?
new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) :
new IcebergArrowColumnVector(holder);
}

@@ -26,6 +26,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.arrow.ArrowAllocation;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.ConstantVectorReader;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -47,26 +48,36 @@ public static ColumnarBatchReader buildReader(
Schema expectedSchema,
MessageType fileSchema,
boolean setArrowValidityVector) {
return buildReader(expectedSchema, fileSchema, setArrowValidityVector, Maps.newHashMap());
}

public static ColumnarBatchReader buildReader(
Schema expectedSchema,
MessageType fileSchema,
boolean setArrowValidityVector,
Map<Integer, ?> idToConstant) {
return (ColumnarBatchReader)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new VectorizedReaderBuilder(expectedSchema, fileSchema, setArrowValidityVector));
new VectorizedReaderBuilder(expectedSchema, fileSchema, setArrowValidityVector, idToConstant));
}

private static class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
private final MessageType parquetSchema;
private final Schema icebergSchema;
private final BufferAllocator rootAllocator;
private final Map<Integer, ?> idToConstant;
private final boolean setArrowValidityVector;

VectorizedReaderBuilder(
Schema expectedSchema,
MessageType parquetSchema,
boolean setArrowValidityVector) {
boolean setArrowValidityVector, Map<Integer, ?> idToConstant) {
this.parquetSchema = parquetSchema;
this.icebergSchema = expectedSchema;
this.rootAllocator = ArrowAllocation.rootAllocator()
.newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
this.setArrowValidityVector = setArrowValidityVector;
this.idToConstant = idToConstant;
}

@Override
@@ -90,7 +101,9 @@ public VectorizedReader<?> message(
for (Types.NestedField field : icebergFields) {
int id = field.fieldId();
VectorizedReader<?> reader = readersById.get(id);
if (reader != null) {
if (idToConstant.containsKey(id)) {
reorderedFields.add(new ConstantVectorReader(idToConstant.get(id)));
} else if (reader != null) {
reorderedFields.add(reader);
} else {
reorderedFields.add(VectorizedArrowReader.nulls());
@@ -89,7 +89,7 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
.project(expectedSchema)
.split(task.start(), task.length())
.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(expectedSchema,
fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED))
fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant))
.recordsPerBatch(batchSize)
.filter(task.residual())
.caseSensitive(caseSensitive)
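
For context, the idToConstant map passed to buildReader above associates the field ids of identity-partition columns with the constant value for the file being read; the reader builder then substitutes a ConstantVectorReader for those ids. A hedged sketch of how such a map could be assembled from a FileScanTask (the PR does not show Iceberg's own helper for this, so the loop below is purely illustrative):

    // Illustrative: collect identity-partition constants for one file scan task.
    Map<Integer, Object> idToConstant = Maps.newHashMap();
    List<PartitionField> fields = task.spec().fields();
    StructLike partition = task.file().partition();
    for (int pos = 0; pos < fields.size(); pos += 1) {
      PartitionField field = fields.get(pos);
      if ("identity".equals(field.transform().toString())) {
        // only identity transforms map one-to-one to a per-file constant
        idToConstant.put(field.sourceId(), partition.get(pos, Object.class));
      }
    }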
@@ -31,10 +31,13 @@
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -110,24 +113,52 @@ public static void stopSpark() {
private Table table = null;
private Dataset<Row> logs = null;

@Before
public void setupTable() throws Exception {
/**
* Use a Hive-based table to make identity partition columns with no duplication of the data in the underlying
* Parquet files. This makes sure that if the identity mapping fails, the test will also fail.
*/
private void setupParquet() throws Exception {
File location = temp.newFolder("logs");
File hiveLocation = temp.newFolder("hive");
String hiveTable = "hivetable";
Assert.assertTrue("Temp folder should exist", location.exists());

Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
spark.sql(String.format("DROP TABLE IF EXISTS %s", hiveTable));
logs.orderBy("date", "level", "id").write().partitionBy("date", "level").format("parquet")
.option("path", hiveLocation.toString()).saveAsTable(hiveTable);

this.table = TABLES.create(SparkSchemaUtil.schemaForTable(spark, hiveTable),
SparkSchemaUtil.specForTable(spark, hiveTable), properties, location.toString());

SparkTableUtil.importSparkTable(spark, new TableIdentifier(hiveTable), table, location.toString());
}

logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
@Before
public void setupTable() throws Exception {
if (format.equals("parquet")) {
setupParquet();
} else {
File location = temp.newFolder("logs");
Assert.assertTrue("Temp folder should exist", location.exists());

Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");

logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
}
}

@Test
public void testFullProjection() {
List<Row> expected = logs.orderBy("id").collectAsList();
List<Row> actual = spark.read().format("iceberg")
.option("vectorization-enabled", String.valueOf(vectorized))
.load(table.location()).orderBy("id").collectAsList();
.load(table.location()).orderBy("id")
.select("id", "date", "level", "message")
Contributor:
Isn't this the default? Why was it necessary to add select?

Member Author:
When I added the Hive import, it gets the schema in a different order. I think this may be an issue with the import code? I'm not sure, but I know the default column order does not come out the same way :/

Contributor:
That's suspicious. We'll have to look into why the schema has the wrong order. I see select before all the writes, so it shouldn't need the reorder here.

Member Author:
I'll try to figure out the actual issue today, but I agree it shouldn't work this way. My assumption is that the Hive table schema is just being listed in a different order or when we use SparkSchemaUtil the order is getting scrambled.

Member Author (@RussellSpitzer), Aug 3, 2020:
I spent some time digging into this. When you call saveAsTable, it ends up in this bit of code in DataFrameWriter:

    val tableDesc = CatalogTable(
      identifier = tableIdent,
      tableType = tableType,
      storage = storage,
      schema = new StructType,
      provider = Some(source),
      partitionColumnNames = partitioningColumns.getOrElse(Nil),
      bucketSpec = getBucketSpec)

This strips out whatever incoming schema you have, so the new table is created without any information about the actual ordering of the columns you used in the create.

Then when the Relation is resolved, that's when the attributes are looked up again and the schema is created from the Attribute output. So long story short, saveAsTable doesn't care about your field ordering as far as I can tell. This is all in Spark and I'm not sure we can do anything about it here.
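
A hedged way to observe the behavior described above (not part of this PR): compare the DataFrame's column order with the schema Spark reports for the table created by saveAsTable.

    // Hypothetical check: the saved table can report its columns in a different order than
    // the DataFrame used to create it, which is why the test selects columns explicitly.
    String[] written = logs.schema().fieldNames();
    String[] reported = spark.table("hivetable").schema().fieldNames();
    // The two arrays contain the same names but are not guaranteed to be in the same order.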

Contributor:
I'm fine with this, then. Thanks for looking into it!

.collectAsList();
Assert.assertEquals("Rows should match", expected, actual);
}

@@ -315,15 +315,10 @@ public boolean enableBatchRead() {

boolean atLeastOneColumn = lazySchema().columns().size() > 0;

boolean hasNoIdentityProjections = tasks().stream()
.allMatch(combinedScanTask -> combinedScanTask.files()
.stream()
.allMatch(fileScanTask -> fileScanTask.spec().identitySourceIds().isEmpty()));

boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());

this.readUsingBatch = batchReadsEnabled && (allOrcFileScanTasks ||
(allParquetFileScanTasks && atLeastOneColumn && hasNoIdentityProjections && onlyPrimitives));
(allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
}
return readUsingBatch;
}