diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterStringWriteSupport.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterStringWriteSupport.java new file mode 100644 index 0000000000000..904f6ba3e63ff --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterStringWriteSupport.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage.row; + +import org.apache.hudi.avro.HoodieBloomFilterWriteSupport; +import org.apache.hudi.common.bloom.BloomFilter; + +import java.nio.charset.StandardCharsets; + +/** + * Bloom-filter footer support for Flink RowData Lance writers. + */ +class HoodieBloomFilterStringWriteSupport extends HoodieBloomFilterWriteSupport { + + HoodieBloomFilterStringWriteSupport(BloomFilter bloomFilter) { + super(bloomFilter); + } + + @Override + protected int compareRecordKey(String a, String b) { + return a.compareTo(b); + } + + @Override + protected byte[] getUTF8Bytes(String key) { + return key.getBytes(StandardCharsets.UTF_8); + } + + @Override + protected String dereference(String key) { + return key; + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieFlinkLanceArrowUtils.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieFlinkLanceArrowUtils.java new file mode 100644 index 0000000000000..45a34b607b0cb --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieFlinkLanceArrowUtils.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage.row; + +import org.apache.hudi.exception.HoodieNotSupportedException; + +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; + +/** + * Primitive RowData/Arrow conversion helpers for Flink Lance base files. + */ +public final class HoodieFlinkLanceArrowUtils { + + private HoodieFlinkLanceArrowUtils() { + } + + public static Schema toArrowSchema(RowType rowType) { + List fields = new ArrayList<>(rowType.getFieldCount()); + for (RowType.RowField field : rowType.getFields()) { + fields.add(toArrowField(field.getName(), field.getType())); + } + return new Schema(fields); + } + + public static RowType toRowType(Schema schema) { + List fields = new ArrayList<>(schema.getFields().size()); + for (Field field : schema.getFields()) { + fields.add(new RowType.RowField(field.getName(), toLogicalType(field.getType()))); + } + return new RowType(fields); + } + + public static RowData toRowData(RowType rowType, List vectors, int rowId) { + GenericRowData rowData = new GenericRowData(vectors.size()); + for (int i = 0; i < vectors.size(); i++) { + FieldVector vector = vectors.get(i); + if (vector.isNull(rowId)) { + rowData.setField(i, null); + } else { + rowData.setField(i, readValue(rowType.getTypeAt(i), vector, rowId)); + } + } + return rowData; + } + + public static void writeValue(LogicalType type, FieldVector vector, int rowId, RowData rowData, int ordinal) { + if (rowData.isNullAt(ordinal)) { + vector.setNull(rowId); + return; + } + switch (type.getTypeRoot()) { + case BOOLEAN: + ((BitVector) vector).setSafe(rowId, rowData.getBoolean(ordinal) ? 1 : 0); + return; + case TINYINT: + ((TinyIntVector) vector).setSafe(rowId, rowData.getByte(ordinal)); + return; + case SMALLINT: + ((SmallIntVector) vector).setSafe(rowId, rowData.getShort(ordinal)); + return; + case INTEGER: + ((IntVector) vector).setSafe(rowId, rowData.getInt(ordinal)); + return; + case DATE: + ((DateDayVector) vector).setSafe(rowId, rowData.getInt(ordinal)); + return; + case TIME_WITHOUT_TIME_ZONE: + ((TimeMilliVector) vector).setSafe(rowId, rowData.getInt(ordinal)); + return; + case BIGINT: + ((BigIntVector) vector).setSafe(rowId, rowData.getLong(ordinal)); + return; + case FLOAT: + ((Float4Vector) vector).setSafe(rowId, rowData.getFloat(ordinal)); + return; + case DOUBLE: + ((Float8Vector) vector).setSafe(rowId, rowData.getDouble(ordinal)); + return; + case CHAR: + case VARCHAR: + ((VarCharVector) vector).setSafe(rowId, rowData.getString(ordinal).toBytes()); + return; + case BINARY: + case VARBINARY: + ((VarBinaryVector) vector).setSafe(rowId, rowData.getBinary(ordinal)); + return; + case DECIMAL: + DecimalType decimalType = (DecimalType) type; + DecimalData decimal = rowData.getDecimal(ordinal, decimalType.getPrecision(), decimalType.getScale()); + ((DecimalVector) vector).setSafe(rowId, decimal.toBigDecimal()); + return; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + TimestampData timestamp = rowData.getTimestamp(ordinal, getPrecision(type)); + long micros = timestamp.getMillisecond() * 1000L + timestamp.getNanoOfMillisecond() / 1000L; + ((TimeStampMicroVector) vector).setSafe(rowId, micros); + return; + default: + throw unsupported(type); + } + } + + private static Object readValue(LogicalType type, ValueVector vector, int rowId) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return ((BitVector) vector).get(rowId) == 1; + case TINYINT: + return ((TinyIntVector) vector).get(rowId); + case SMALLINT: + return ((SmallIntVector) vector).get(rowId); + case INTEGER: + return ((IntVector) vector).get(rowId); + case DATE: + return ((DateDayVector) vector).get(rowId); + case TIME_WITHOUT_TIME_ZONE: + return ((TimeMilliVector) vector).get(rowId); + case BIGINT: + return ((BigIntVector) vector).get(rowId); + case FLOAT: + return ((Float4Vector) vector).get(rowId); + case DOUBLE: + return ((Float8Vector) vector).get(rowId); + case CHAR: + case VARCHAR: + return StringData.fromBytes(((VarCharVector) vector).get(rowId)); + case BINARY: + case VARBINARY: + return ((VarBinaryVector) vector).get(rowId); + case DECIMAL: + DecimalType decimalType = (DecimalType) type; + BigDecimal decimal = ((DecimalVector) vector).getObject(rowId); + return DecimalData.fromBigDecimal(decimal, decimalType.getPrecision(), decimalType.getScale()); + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + long micros = ((TimeStampMicroVector) vector).get(rowId); + return TimestampData.fromEpochMillis(micros / 1000L, (int) (micros % 1000L) * 1000); + default: + throw unsupported(type); + } + } + + private static Field toArrowField(String name, LogicalType type) { + return new Field(name, FieldType.nullable(toArrowType(type)), Collections.emptyList()); + } + + private static ArrowType toArrowType(LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return ArrowType.Bool.INSTANCE; + case TINYINT: + return new ArrowType.Int(8, true); + case SMALLINT: + return new ArrowType.Int(16, true); + case INTEGER: + return new ArrowType.Int(32, true); + case BIGINT: + return new ArrowType.Int(64, true); + case FLOAT: + return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE); + case DOUBLE: + return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE); + case CHAR: + case VARCHAR: + return ArrowType.Utf8.INSTANCE; + case BINARY: + case VARBINARY: + return ArrowType.Binary.INSTANCE; + case DATE: + return new ArrowType.Date(DateUnit.DAY); + case TIME_WITHOUT_TIME_ZONE: + return new ArrowType.Time(TimeUnit.MILLISECOND, 32); + case DECIMAL: + DecimalType decimalType = (DecimalType) type; + return new ArrowType.Decimal(decimalType.getPrecision(), decimalType.getScale(), 128); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"); + default: + throw unsupported(type); + } + } + + private static LogicalType toLogicalType(ArrowType arrowType) { + if (arrowType instanceof ArrowType.Bool) { + return new org.apache.flink.table.types.logical.BooleanType(); + } else if (arrowType instanceof ArrowType.Int) { + ArrowType.Int intType = (ArrowType.Int) arrowType; + switch (intType.getBitWidth()) { + case 8: + return new org.apache.flink.table.types.logical.TinyIntType(); + case 16: + return new org.apache.flink.table.types.logical.SmallIntType(); + case 32: + return new org.apache.flink.table.types.logical.IntType(); + case 64: + return new org.apache.flink.table.types.logical.BigIntType(); + default: + throw new HoodieNotSupportedException("Unsupported Arrow int width for Lance Flink reader: " + intType.getBitWidth()); + } + } else if (arrowType instanceof ArrowType.FloatingPoint) { + ArrowType.FloatingPoint fp = (ArrowType.FloatingPoint) arrowType; + return fp.getPrecision() == FloatingPointPrecision.SINGLE + ? new org.apache.flink.table.types.logical.FloatType() + : new org.apache.flink.table.types.logical.DoubleType(); + } else if (arrowType instanceof ArrowType.Utf8) { + return new org.apache.flink.table.types.logical.VarCharType(); + } else if (arrowType instanceof ArrowType.Binary) { + return new org.apache.flink.table.types.logical.VarBinaryType(); + } else if (arrowType instanceof ArrowType.Date) { + return new org.apache.flink.table.types.logical.DateType(); + } else if (arrowType instanceof ArrowType.Time) { + return new org.apache.flink.table.types.logical.TimeType(); + } else if (arrowType instanceof ArrowType.Decimal) { + ArrowType.Decimal decimal = (ArrowType.Decimal) arrowType; + return new DecimalType(decimal.getPrecision(), decimal.getScale()); + } else if (arrowType instanceof ArrowType.Timestamp) { + ArrowType.Timestamp timestamp = (ArrowType.Timestamp) arrowType; + return timestamp.getTimezone() == null + ? new TimestampType(6) + : new LocalZonedTimestampType(6); + } + throw new HoodieNotSupportedException("Unsupported Arrow type for Lance Flink reader: " + arrowType); + } + + private static HoodieNotSupportedException unsupported(LogicalType type) { + return new HoodieNotSupportedException("Flink Lance base-file support currently supports primitive append-only columns; unsupported type: " + type); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java index 45333cf4b5dbe..dbccd3975012d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java @@ -294,6 +294,6 @@ protected HoodieRowDataFileWriter createNewFileWriter( throws IOException { StoragePath storagePath = new StoragePath(path.toUri()); return (HoodieRowDataFileWriter) new HoodieRowDataFileWriterFactory(hoodieTable.getStorage()) - .newParquetFileWriter(instantTime, storagePath, config, rowType, hoodieTable.getTaskContextSupplier()); + .getFileWriter(instantTime, storagePath, config, rowType, hoodieTable.getTaskContextSupplier()); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java index be3242164a40e..4cb3e4f0ab2f0 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java @@ -23,8 +23,10 @@ import org.apache.hudi.common.config.HoodieParquetConfig; import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; @@ -35,6 +37,7 @@ import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; +import org.apache.hudi.util.HoodieSchemaConverter; import org.apache.hudi.util.RowDataQueryContexts; import org.apache.flink.table.types.logical.RowType; @@ -44,6 +47,10 @@ import java.io.IOException; import java.io.OutputStream; +import static org.apache.hudi.common.model.HoodieFileFormat.HFILE; +import static org.apache.hudi.common.model.HoodieFileFormat.LANCE; +import static org.apache.hudi.common.model.HoodieFileFormat.ORC; +import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName; /** @@ -55,6 +62,30 @@ public HoodieRowDataFileWriterFactory(HoodieStorage storage) { super(storage); } + public HoodieFileWriter getFileWriter(String instantTime, StoragePath storagePath, HoodieWriteConfig config, RowType rowType, + TaskContextSupplier taskContextSupplier) throws IOException { + final String extension = FSUtils.getFileExtension(storagePath.getName()); + return getFileWriterByFormat(extension, instantTime, storagePath, config, rowType, taskContextSupplier); + } + + private HoodieFileWriter getFileWriterByFormat( + String extension, String instantTime, StoragePath path, HoodieConfig config, RowType rowType, + TaskContextSupplier taskContextSupplier) throws IOException { + if (PARQUET.getFileExtension().equals(extension)) { + return newParquetFileWriter(instantTime, path, config, rowType, taskContextSupplier); + } + if (HFILE.getFileExtension().equals(extension)) { + return newHFileFileWriter(instantTime, path, config, HoodieSchemaConverter.convertToSchema(rowType), taskContextSupplier); + } + if (ORC.getFileExtension().equals(extension)) { + return newOrcFileWriter(instantTime, path, config, HoodieSchemaConverter.convertToSchema(rowType), taskContextSupplier); + } + if (LANCE.getFileExtension().equals(extension)) { + return newLanceFileWriter(instantTime, path, config, rowType, taskContextSupplier); + } + throw new UnsupportedOperationException(extension + " format not supported yet."); + } + /** * Create a parquet writer on a given OutputStream. * @@ -136,6 +167,25 @@ public HoodieFileWriter newParquetFileWriter( instantTime, taskContextSupplier, populateMetaFields, withOperation); } + public HoodieFileWriter newLanceFileWriter( + String instantTime, + StoragePath path, + HoodieConfig config, + RowType rowType, + TaskContextSupplier taskContextSupplier) { + boolean populateMetaFields = config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS); + Option bloomFilter = enableBloomFilter(populateMetaFields, config) + ? Option.of(createBloomFilter(config)) : Option.empty(); + return new HoodieRowDataLanceWriter( + path, + rowType, + taskContextSupplier, + bloomFilter, + config.getLongOrDefault(HoodieStorageConfig.LANCE_MAX_FILE_SIZE), + config.getLongOrDefault(HoodieStorageConfig.LANCE_WRITE_ALLOCATOR_SIZE_BYTES), + config.getLongOrDefault(HoodieStorageConfig.LANCE_WRITE_FLUSH_BYTE_WATERMARK)); + } + private static HoodieParquetConfig getParquetConfig( HoodieConfig config, HoodieRowDataParquetWriteSupport writeSupport) { return new HoodieParquetConfig<>( diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java new file mode 100644 index 0000000000000..3ea4544c4ba58 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage.row; + +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.io.lance.HoodieBaseLanceWriter; +import org.apache.hudi.storage.StoragePath; + +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import java.io.IOException; + +/** + * Lance writer for Flink {@link RowData} append-only base files. + */ +public class HoodieRowDataLanceWriter extends HoodieBaseLanceWriter + implements HoodieRowDataFileWriter { + + private static final long MIN_RECORDS_FOR_SIZE_CHECK = 100L; + private static final long MAX_RECORDS_FOR_SIZE_CHECK = 10000L; + + private final RowType rowType; + private final Schema arrowSchema; + private final long maxFileSize; + private long recordCountForNextSizeCheck = MIN_RECORDS_FOR_SIZE_CHECK; + + public HoodieRowDataLanceWriter( + StoragePath file, + RowType rowType, + TaskContextSupplier taskContextSupplier, + Option bloomFilterOpt, + long maxFileSize, + long allocatorSize, + long flushByteWatermark) { + super(file, DEFAULT_BATCH_SIZE, allocatorSize, flushByteWatermark, + bloomFilterOpt.map(HoodieBloomFilterStringWriteSupport::new)); + ValidationUtils.checkArgument(maxFileSize > 0, "maxFileSize must be a positive number"); + ValidationUtils.checkArgument(allocatorSize > 0, "allocatorSize must be a positive number"); + ValidationUtils.checkArgument(flushByteWatermark > 0, "flushByteWatermark must be a positive number"); + ValidationUtils.checkArgument(flushByteWatermark < allocatorSize, + "flushByteWatermark (" + flushByteWatermark + ") must be less than allocatorSize (" + + allocatorSize + ")"); + this.rowType = rowType; + this.arrowSchema = HoodieFlinkLanceArrowUtils.toArrowSchema(rowType); + this.maxFileSize = maxFileSize; + } + + @Override + public boolean canWrite() { + long writtenCount = getWrittenRecordCount(); + if (writtenCount >= recordCountForNextSizeCheck) { + long dataSize = getDataSize(); + long avgRecordSize = Math.max(dataSize / writtenCount, 1); + if (dataSize > (maxFileSize - avgRecordSize * 2)) { + return false; + } + recordCountForNextSizeCheck = writtenCount + Math.min( + Math.max(MIN_RECORDS_FOR_SIZE_CHECK, (maxFileSize / avgRecordSize - writtenCount) / 2), + MAX_RECORDS_FOR_SIZE_CHECK); + } + return true; + } + + @Override + public void writeRow(String key, RowData row) throws IOException { + bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport -> bloomFilterWriteSupport.addKey(key)); + super.write(row); + } + + @Override + public void writeRowWithMetaData(HoodieKey key, RowData row) throws IOException { + writeRow(key.getRecordKey(), row); + } + + @Override + protected ArrowWriter createArrowWriter(VectorSchemaRoot root) { + return new RowDataArrowWriter(root); + } + + @Override + protected Schema getArrowSchema() { + return arrowSchema; + } + + private class RowDataArrowWriter implements ArrowWriter { + private final VectorSchemaRoot root; + private int rowId; + + private RowDataArrowWriter(VectorSchemaRoot root) { + this.root = root; + } + + @Override + public void write(RowData row) { + for (int i = 0; i < rowType.getFieldCount(); i++) { + HoodieFlinkLanceArrowUtils.writeValue(rowType.getTypeAt(i), root.getVector(i), rowId, row, i); + } + rowId++; + } + + @Override + public void finishBatch() { + root.getFieldVectors().forEach(vector -> vector.setValueCount(rowId)); + root.setRowCount(rowId); + } + + @Override + public void reset() { + rowId = 0; + } + } + + @Override + public void writeWithMetadata(HoodieKey key, HoodieRecord record, org.apache.hudi.common.schema.HoodieSchema schema, + java.util.Properties props) throws IOException { + writeRowWithMetaData(key, (RowData) record.getData()); + } +} diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieFlinkLanceArrowUtils.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieFlinkLanceArrowUtils.java new file mode 100644 index 0000000000000..a4c955dc3bc43 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieFlinkLanceArrowUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage.row; + +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +/** + * Tests for {@link HoodieFlinkLanceArrowUtils}. + */ +public class TestHoodieFlinkLanceArrowUtils { + + @Test + public void testTimestampSchemaRoundTripPreservesLocalTimezone() { + RowType rowType = RowType.of( + new LogicalType[] {new TimestampType(6), new LocalZonedTimestampType(6)}, + new String[] {"timestamp", "local_timestamp"}); + + RowType roundTripped = HoodieFlinkLanceArrowUtils.toRowType( + HoodieFlinkLanceArrowUtils.toArrowSchema(rowType)); + + assertInstanceOf(TimestampType.class, roundTripped.getTypeAt(0)); + assertInstanceOf(LocalZonedTimestampType.class, roundTripped.getTypeAt(1)); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 2f2ada47731a0..ffc8e78b6dec8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -86,9 +86,9 @@ public DynamicTableSource createDynamicTableSource(Context context) { StoragePath path = new StoragePath(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> new ValidationException("Option [path] should not be empty."))); setupTableOptions(conf.get(FlinkOptions.PATH), conf); - checkBaseFileFormat(conf); ResolvedSchema schema = context.getCatalogTable().getResolvedSchema(); setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema); + checkBaseFileFormatForRead(conf); return new HoodieTableSource( SerializableSchema.create(schema), path, @@ -116,11 +116,6 @@ public DynamicTableSink createDynamicTableSink(Context context) { private void setupTableOptions(String basePath, Configuration conf) { StreamerUtil.getTableConfig(basePath, HadoopConfigurations.getHadoopConf(conf)) .ifPresent(tableConfig -> { - // Guard: reject Lance from existing table config (hoodie.properties); checkBaseFileFormat() handles user-supplied config separately - if (tableConfig.contains(HoodieTableConfig.BASE_FILE_FORMAT) - && HoodieFileFormat.LANCE.name().equalsIgnoreCase(tableConfig.getString(HoodieTableConfig.BASE_FILE_FORMAT))) { - throw new HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG); - } if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS) && !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) { conf.set(FlinkOptions.RECORD_KEY_FIELD, tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)); @@ -177,8 +172,8 @@ public Set> optionalOptions() { * @param schema The table schema */ private void sanityCheck(Configuration conf, ResolvedSchema schema) { - checkBaseFileFormat(conf); checkTableType(conf); + checkBaseFileFormatForWrite(conf); checkIndexType(conf); if (!OptionsResolver.isAppendMode(conf)) { @@ -220,12 +215,29 @@ private void checkIndexType(Configuration conf) { } /** - * Validate the base file format. Lance is only supported with the Spark engine. + * Validate the base file format. Flink Lance support is scoped to append-only COW tables. */ - private void checkBaseFileFormat(Configuration conf) { + private void checkBaseFileFormatForRead(Configuration conf) { + checkBaseFileFormat(conf, false); + } + + private void checkBaseFileFormatForWrite(Configuration conf) { + checkBaseFileFormat(conf, true); + } + + private void checkBaseFileFormat(Configuration conf, boolean isWritePath) { String baseFileFormat = conf.getString(HoodieTableConfig.BASE_FILE_FORMAT.key(), null); if (baseFileFormat != null && HoodieFileFormat.LANCE.name().equalsIgnoreCase(baseFileFormat)) { - throw new HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG); + if (conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) { + throw new HoodieValidationException("Flink Lance base-file support is only available for append-only tables without primary keys."); + } + if (OptionsResolver.isMorTable(conf)) { + throw new HoodieValidationException("Flink Lance base-file support is only available for COPY_ON_WRITE append-only tables."); + } + if (isWritePath && !OptionsResolver.isAppendMode(conf)) { + throw new HoodieValidationException("Flink Lance base-file writes require append-only INSERT mode. Set '" + + FlinkOptions.OPERATION.key() + "' = 'insert'."); + } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java index 2015244e24ceb..b12e4fd408dcb 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java @@ -39,6 +39,7 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.io.storage.HoodieIOFactory; import org.apache.hudi.source.ExpressionPredicates; @@ -98,18 +99,27 @@ public ClosableIterator getFileRecordIterator( HoodieSchema dataSchema, HoodieSchema requiredSchema, HoodieStorage storage) throws IOException { - if (filePath.toString().endsWith(HoodieFileFormat.LANCE.getFileExtension())) { - throw new UnsupportedOperationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG); - } boolean isLogFile = FSUtils.isLogFile(filePath); // disable schema evolution in fileReader if it's log file, since schema evolution for log file is handled in `FileGroupRecordBuffer` InternalSchemaManager schemaManager = isLogFile ? InternalSchemaManager.DISABLED : internalSchemaManager.get(); + if (filePath.getName().endsWith(HoodieFileFormat.LANCE.getFileExtension())) { + HoodieRowDataLanceReader rowDataLanceReader = + (HoodieRowDataLanceReader) HoodieIOFactory.getIOFactory(storage) + .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK) + .getFileReader(tableConfig, filePath, HoodieFileFormat.LANCE, Option.empty()); + try { + return rowDataLanceReader.getRowDataIterator(RowDataQueryContexts.fromSchema(requiredSchema).getRowType(), requiredSchema); + } catch (RuntimeException e) { + rowDataLanceReader.close(); + throw new HoodieException("Failed to get iterator from lance reader", e); + } + } + DataType rowType = RowDataQueryContexts.fromSchema(dataSchema).getRowType(); HoodieRowDataParquetReader rowDataParquetReader = (HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage) .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK) .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET, Option.empty()); - DataType rowType = RowDataQueryContexts.fromSchema(dataSchema).getRowType(); return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, requiredSchema, getSafePredicates(requiredSchema)); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java index 2e7401d4caa0d..3ad64bd3972a5 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.format; +import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.storage.HoodieStorage; @@ -35,4 +36,9 @@ public HoodieRowDataFileReaderFactory(HoodieStorage storage) { protected HoodieFileReader newParquetFileReader(StoragePath path) { return new HoodieRowDataParquetReader(storage, path); } + + @Override + protected HoodieFileReader newLanceFileReader(HoodieConfig hoodieConfig, StoragePath path) { + return new HoodieRowDataLanceReader(path, hoodieConfig); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java new file mode 100644 index 0000000000000..8c7564af67303 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.client.model.HoodieFlinkRecord; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; +import org.apache.hudi.common.bloom.SimpleBloomFilter; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.memory.HoodieArrowAllocator; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.row.HoodieFlinkLanceArrowUtils; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.util.HoodieSchemaConverter; +import org.apache.hudi.util.RowDataQueryContexts; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.lance.file.LanceFileReader; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY; +import static org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE; +import static org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER; +import static org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER; + +/** + * Lance reader for Flink RowData base files. + */ +public class HoodieRowDataLanceReader implements HoodieFileReader { + + private static final int DEFAULT_BATCH_SIZE = 512; + + private final StoragePath path; + private final long dataAllocatorSize; + private final BufferAllocator metadataAllocator; + private final LanceFileReader metadataReader; + private final Schema arrowSchema; + private boolean closed; + + public HoodieRowDataLanceReader(StoragePath path, HoodieConfig hoodieConfig) { + this.path = path; + this.dataAllocatorSize = hoodieConfig.getLongOrDefault(HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES); + this.metadataAllocator = HoodieArrowAllocator.newChildAllocator( + getClass().getSimpleName() + "-metadata-" + path.getName(), + hoodieConfig.getLongOrDefault(HoodieStorageConfig.LANCE_READ_METADATA_ALLOCATOR_SIZE_BYTES)); + try { + this.metadataReader = LanceFileReader.open(path.toString(), metadataAllocator); + this.arrowSchema = metadataReader.schema(); + } catch (Exception e) { + close(); + throw new HoodieException("Failed to create Lance reader for: " + path, e); + } + } + + @Override + public String[] readMinMaxRecordKeys() { + Map metadata = arrowSchema.getCustomMetadata(); + if (metadata != null) { + String minKey = metadata.get(HOODIE_MIN_RECORD_KEY_FOOTER); + String maxKey = metadata.get(HOODIE_MAX_RECORD_KEY_FOOTER); + if (minKey != null && maxKey != null) { + return new String[] {minKey, maxKey}; + } + } + throw new HoodieException("Could not read min/max record key out of Lance file: " + path); + } + + @Override + public BloomFilter readBloomFilter() { + Map metadata = arrowSchema.getCustomMetadata(); + if (metadata == null || !metadata.containsKey(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)) { + return null; + } + String bloomSer = metadata.get(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); + String filterType = metadata.get(HOODIE_BLOOM_FILTER_TYPE_CODE); + if (filterType != null && filterType.contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) { + return new HoodieDynamicBoundedBloomFilter(bloomSer); + } + return new SimpleBloomFilter(bloomSer); + } + + @Override + public Set> filterRowKeys(Set candidateRowKeys) { + throw new HoodieException("Filtering row keys from Lance files is not supported for Flink append-only tables without primary keys: " + path); + } + + @Override + public ClosableIterator> getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema) throws IOException { + ClosableIterator rowDataItr = getRowDataIterator(RowDataQueryContexts.fromSchema(requestedSchema).getRowType(), requestedSchema); + return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new); + } + + @Override + public ClosableIterator getRecordKeyIterator() throws IOException { + HoodieSchema schema = HoodieSchemaUtils.getRecordKeySchema(); + ClosableIterator rowDataItr = getRowDataIterator(RowDataQueryContexts.fromSchema(schema).getRowType(), schema); + return new CloseableMappingIterator<>(rowDataItr, rowData -> rowData.getString(0).toString()); + } + + public ClosableIterator getRowDataIterator(DataType dataType, HoodieSchema requestedSchema) { + RowType rowType = (RowType) dataType.getLogicalType(); + List columnNames = new ArrayList<>(rowType.getFieldCount()); + for (RowType.RowField field : rowType.getFields()) { + columnNames.add(field.getName()); + } + BufferAllocator allocator = HoodieArrowAllocator.newChildAllocator( + getClass().getSimpleName() + "-data-" + path.getName(), dataAllocatorSize); + LanceFileReader lanceReader = null; + ArrowReader arrowReader = null; + try { + lanceReader = LanceFileReader.open(path.toString(), allocator); + arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE); + return new LanceRowDataIterator(allocator, lanceReader, arrowReader, rowType, this); + } catch (Exception e) { + if (arrowReader != null) { + try { + arrowReader.close(); + } catch (Exception closeException) { + e.addSuppressed(closeException); + } + } + if (lanceReader != null) { + try { + lanceReader.close(); + } catch (Exception closeException) { + e.addSuppressed(closeException); + } + } + allocator.close(); + throw new HoodieException("Failed to create Lance row iterator for: " + path, e); + } + } + + @Override + public HoodieSchema getSchema() { + RowType rowType = HoodieFlinkLanceArrowUtils.toRowType(arrowSchema); + return HoodieSchemaConverter.convertToSchema(rowType); + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + if (metadataReader != null) { + try { + metadataReader.close(); + } catch (Exception e) { + // ignore close failure; readers surface data-path exceptions earlier + } + } + if (metadataAllocator != null) { + metadataAllocator.close(); + } + } + + @Override + public long getTotalRecords() { + try { + return metadataReader.numRows(); + } catch (Exception e) { + throw new HoodieException("Failed to read row count from Lance file: " + path, e); + } + } + + private static class LanceRowDataIterator implements ClosableIterator { + private final BufferAllocator allocator; + private final LanceFileReader lanceReader; + private final ArrowReader arrowReader; + private final RowType rowType; + private final HoodieRowDataLanceReader reader; + private VectorSchemaRoot batch; + private List orderedVectors; + private int rowId; + private boolean hasNext; + private boolean closed; + + private LanceRowDataIterator( + BufferAllocator allocator, + LanceFileReader lanceReader, + ArrowReader arrowReader, + RowType rowType, + HoodieRowDataLanceReader reader) { + this.allocator = allocator; + this.lanceReader = lanceReader; + this.arrowReader = arrowReader; + this.rowType = rowType; + this.reader = reader; + loadNextBatch(); + } + + @Override + public boolean hasNext() { + return hasNext; + } + + @Override + public RowData next() { + RowData rowData = HoodieFlinkLanceArrowUtils.toRowData(rowType, orderedVectors, rowId++); + if (rowId >= batch.getRowCount()) { + loadNextBatch(); + } + return rowData; + } + + private void loadNextBatch() { + try { + do { + hasNext = arrowReader.loadNextBatch(); + if (hasNext) { + batch = arrowReader.getVectorSchemaRoot(); + orderedVectors = orderVectors(rowType, batch.getFieldVectors()); + rowId = 0; + } + } while (hasNext && batch.getRowCount() == 0); + } catch (IOException e) { + throw new HoodieIOException("Failed to read Lance batch", e); + } + } + + private static List orderVectors(RowType rowType, List vectors) { + Map vectorsByName = new HashMap<>(); + for (FieldVector vector : vectors) { + vectorsByName.put(vector.getName(), vector); + } + List orderedVectors = new ArrayList<>(rowType.getFieldCount()); + for (RowType.RowField field : rowType.getFields()) { + FieldVector vector = vectorsByName.get(field.getName()); + if (vector == null) { + throw new HoodieException("Missing Lance column in projected batch: " + field.getName()); + } + orderedVectors.add(vector); + } + return orderedVectors; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + try { + arrowReader.close(); + } catch (Exception e) { + throw new HoodieException("Failed to close Lance Arrow reader", e); + } finally { + try { + lanceReader.close(); + } catch (Exception e) { + throw new HoodieException("Failed to close Lance reader", e); + } finally { + try { + allocator.close(); + } finally { + reader.close(); + } + } + } + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java index 5a2e94b833baf..9e59e1ed33447 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java @@ -18,12 +18,19 @@ package org.apache.hudi.table.format.cow; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.source.ExpressionPredicates.Predicate; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.format.FilePathUtils; +import org.apache.hudi.table.format.HoodieRowDataLanceReader; import org.apache.hudi.table.format.InternalSchemaManager; import org.apache.hudi.table.format.RecordIterators; +import org.apache.hudi.util.HoodieSchemaConverter; +import org.apache.hudi.util.StreamerUtil; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.io.FileInputFormat; @@ -33,6 +40,7 @@ import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.utils.SerializableConfiguration; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.hadoop.conf.Configuration; @@ -116,32 +124,50 @@ public CopyOnWriteInputFormat( @Override public void open(FileInputSplit fileSplit) throws IOException { - LinkedHashMap partObjects = FilePathUtils.generatePartitionSpecs( - fileSplit.getPath().getPath(), - Arrays.asList(fullFieldNames), - Arrays.asList(fullFieldTypes), - this.partDefaultName, - this.partPathField, - this.hiveStylePartitioning - ); - - this.itr = RecordIterators.getParquetRecordIterator( - internalSchemaManager, - utcTimestamp, - true, - conf.conf(), - fullFieldNames, - fullFieldTypes, - partObjects, - selectedFields, - 2048, - fileSplit.getPath(), - fileSplit.getStart(), - fileSplit.getLength(), - predicates); + if (fileSplit.getPath().getName().endsWith(HoodieFileFormat.LANCE.getFileExtension())) { + this.itr = getLanceRecordIterator(fileSplit.getPath()); + } else { + LinkedHashMap partObjects = FilePathUtils.generatePartitionSpecs( + fileSplit.getPath().getPath(), + Arrays.asList(fullFieldNames), + Arrays.asList(fullFieldTypes), + this.partDefaultName, + this.partPathField, + this.hiveStylePartitioning + ); + this.itr = RecordIterators.getParquetRecordIterator( + internalSchemaManager, + utcTimestamp, + true, + conf.conf(), + fullFieldNames, + fullFieldTypes, + partObjects, + selectedFields, + 2048, + fileSplit.getPath(), + fileSplit.getStart(), + fileSplit.getLength(), + predicates); + } this.currentReadCount = 0L; } + private ClosableIterator getLanceRecordIterator(Path path) { + DataType selectedDataType = DataTypes.ROW(Arrays.stream(selectedFields) + .mapToObj(i -> DataTypes.FIELD(fullFieldNames[i], fullFieldTypes[i])) + .toArray(DataTypes.Field[]::new)) + .bridgedTo(RowData.class); + HoodieSchema requestedSchema = HoodieSchemaConverter.convertToSchema(selectedDataType.getLogicalType()); + HoodieRowDataLanceReader reader = new HoodieRowDataLanceReader(new StoragePath(path.toString()), StreamerUtil.getLanceReadConfig(conf.conf())); + try { + return reader.getRowDataIterator(selectedDataType, requestedSchema); + } catch (RuntimeException e) { + reader.close(); + throw new HoodieException("Failed to get iterator from lance reader", e); + } + } + @Override public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { if (minNumSplits < 1) { @@ -379,6 +405,10 @@ private int getBlockIndexForPosition(BlockLocation[] blocks, long offset, long h } private boolean testForUnsplittable(FileStatus pathFile) { + if (pathFile.getPath().getName().endsWith(HoodieFileFormat.LANCE.getFileExtension())) { + unsplittable = true; + return true; + } if (getInflaterInputStreamFactory(pathFile.getPath()) != null) { unsplittable = true; return true; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index f88d1a76a3251..c17e0528e4586 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -25,7 +25,9 @@ import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger; import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; import org.apache.hudi.common.config.DFSPropertiesConfiguration; +import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.config.HoodieTimeGeneratorConfig; import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.config.TypedProperties; @@ -275,6 +277,22 @@ public static TypedProperties flinkConf2TypedProperties(Configuration conf) { return properties; } + /** + * Builds a Lance read config from storage options carried in the Hadoop configuration. + */ + public static HoodieConfig getLanceReadConfig(org.apache.hadoop.conf.Configuration conf) { + HoodieConfig hoodieConfig = new HoodieConfig(); + String dataAllocatorSize = conf.get(HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES.key()); + if (dataAllocatorSize != null) { + hoodieConfig.setValue(HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES, dataAllocatorSize); + } + String metadataAllocatorSize = conf.get(HoodieStorageConfig.LANCE_READ_METADATA_ALLOCATOR_SIZE_BYTES.key()); + if (metadataAllocatorSize != null) { + hoodieConfig.setValue(HoodieStorageConfig.LANCE_READ_METADATA_ALLOCATOR_SIZE_BYTES, metadataAllocatorSize); + } + return hoodieConfig; + } + public static void initTableFromClientIfNecessary(Configuration conf) { // Since Flink 2.0, the adaptive execution for batch job will generate job graph incrementally // for multiple stages (FLIP-469). And the write coordinator is initialized along with write @@ -318,6 +336,7 @@ public static HoodieTableMetaClient initTableIfNotExists( .setTableName(conf.get(FlinkOptions.TABLE_NAME)) .setTableVersion(conf.get(FlinkOptions.WRITE_TABLE_VERSION)) .setTableFormat(conf.get(FlinkOptions.WRITE_TABLE_FORMAT)) + .setBaseFileFormat(conf.getString(HoodieTableConfig.BASE_FILE_FORMAT.key(), null)) .setRecordMergeMode(getMergeMode(conf)) .setRecordMergeStrategyId(getMergeStrategyId(conf)) .setPayloadClassName(getPayloadClass(conf)) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 2da870f1ef8cd..51c9097bd559f 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -21,7 +21,6 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; -import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; @@ -65,7 +64,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.types.Row; import org.apache.flink.util.CollectionUtil; -import org.apache.flink.util.ExceptionUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -1360,31 +1358,27 @@ void testBatchReadEmptyTablePath() throws Exception { } @Test - void testLanceFormatRejectedByFlink() { - // Lance base file format is only supported with the Spark engine. - // Flink should reject it early with a clear error on both read and write paths. - String createLanceTable = sql("lance_t1") + void testLanceFormatAppendOnlyWriteAndRead() { + String createHoodieTable = sql("lance_t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .options(getDefaultKeys()) + .option(FlinkOptions.OPERATION, "insert") .option("hoodie.table.base.file.format", "LANCE") .end(); + batchTableEnv.executeSql(createHoodieTable); - // Creating the table itself succeeds (DDL is just metadata registration), - // but any attempt to read or write should fail. - // Flink wraps our HoodieValidationException in its own ValidationException. - batchTableEnv.executeSql(createLanceTable); + execInsertSql(batchTableEnv, "insert into lance_t1 values " + + "('id1', 'Alice', 23, TIMESTAMP '1970-01-01 00:00:01', 'par1')," + + "('id2', 'Bob', 31, TIMESTAMP '1970-01-01 00:00:02', 'par2')"); - // Source (read) path should throw - ValidationException readEx = assertThrows(ValidationException.class, - () -> execSelectSql(batchTableEnv, "select * from lance_t1"), - "Lance format should be rejected when reading via Flink"); - assertTrue(ExceptionUtils.findThrowableWithMessage(readEx, HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG).isPresent()); + List rows = CollectionUtil.iteratorToList( + batchTableEnv.executeSql("select uuid, name, age, ts, `partition` from lance_t1").collect()); + assertRowsEquals(rows, + "[+I[id1, Alice, 23, 1970-01-01T00:00:01, par1], " + + "+I[id2, Bob, 31, 1970-01-01T00:00:02, par2]]"); - // Sink (write) path should throw - ValidationException writeEx = assertThrows(ValidationException.class, - () -> execInsertSql(batchTableEnv, "insert into lance_t1 values ('id1', 'Alice', 23, TIMESTAMP '1970-01-01 00:00:01', 'par1')"), - "Lance format should be rejected when writing via Flink"); - assertTrue(ExceptionUtils.findThrowableWithMessage(writeEx, HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG).isPresent()); + List projectedRows = CollectionUtil.iteratorToList( + batchTableEnv.executeSql("select name, uuid from lance_t1").collect()); + assertRowsEquals(projectedRows, "[+I[Alice, id1], +I[Bob, id2]]"); } @ParameterizedTest diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java index 7a8ce8dcd80a4..84e1136bc2965 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.schema.HoodieSchema; @@ -57,6 +58,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -85,6 +87,12 @@ public class TestHoodieFileGroupReaderOnFlink extends TestHoodieFileGroupReaderB private Configuration conf; private Option instantRangeOpt = Option.empty(); + @BeforeAll + public static void setUpClass() { + // add the lance format when composition type is supported + supportedFileFormats = Collections.singletonList(HoodieFileFormat.PARQUET); + } + @BeforeEach public void setup() { conf = new Configuration(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index f594e9575860c..3bd13748319a8 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -21,7 +21,6 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.EventTimeAvroPayload; -import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieValidationException; @@ -788,28 +787,42 @@ void testSetupWriteOptionsForSink() { } @Test - void testLanceFormatNotSupportedByFlink() { - // Lance base file format is only supported with the Spark engine. - // Both source and sink should reject it with a clear error message. - this.conf.setString("hoodie.table.base.file.format", "LANCE"); - ResolvedSchema schema = SchemaBuilder.instance() + void testLanceFormatSupportedForAppendOnlyTables() { + Configuration lanceConf = new Configuration(); + lanceConf.set(FlinkOptions.PATH, new File(tempFile, "lance").getAbsolutePath()); + lanceConf.set(FlinkOptions.TABLE_NAME, "lance_t1"); + lanceConf.set(FlinkOptions.OPERATION, "insert"); + lanceConf.setString("hoodie.table.base.file.format", "LANCE"); + ResolvedSchema appendOnlySchema = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) .field("ts", DataTypes.TIMESTAMP(3)) - .primaryKey("f0") .build(); - final MockContext context = MockContext.getInstance(this.conf, schema, "f2"); - - // Source path should throw - HoodieValidationException sourceEx = assertThrows(HoodieValidationException.class, - () -> new HoodieTableFactory().createDynamicTableSource(context)); - assertThat(sourceEx.getMessage(), is(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG)); - - // Sink path should throw + final MockContext appendOnlyContext = MockContext.getInstance(lanceConf, appendOnlySchema, "f2"); + + assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(appendOnlyContext)); + + Configuration morConf = new Configuration(lanceConf); + morConf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + final MockContext morContext = MockContext.getInstance(morConf, appendOnlySchema, "f2"); + HoodieValidationException morEx = assertThrows(HoodieValidationException.class, + () -> new HoodieTableFactory().createDynamicTableSink(morContext)); + assertThat(morEx.getMessage(), is("Flink Lance base-file support is only available for COPY_ON_WRITE append-only tables.")); + + Configuration upsertConf = new Configuration(lanceConf); + upsertConf.set(FlinkOptions.OPERATION, "upsert"); + final MockContext upsertContext = MockContext.getInstance(upsertConf, appendOnlySchema, "f2"); + HoodieValidationException operationEx = assertThrows(HoodieValidationException.class, + () -> new HoodieTableFactory().createDynamicTableSink(upsertContext)); + assertThat(operationEx.getMessage(), is("Flink Lance base-file writes require append-only INSERT mode. Set '" + + FlinkOptions.OPERATION.key() + "' = 'insert'.")); + + lanceConf.set(FlinkOptions.RECORD_KEY_FIELD, "f0"); + final MockContext keyedContext = MockContext.getInstance(lanceConf, appendOnlySchema, "f2"); HoodieValidationException sinkEx = assertThrows(HoodieValidationException.class, - () -> new HoodieTableFactory().createDynamicTableSink(context)); - assertThat(sinkEx.getMessage(), is(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG)); + () -> new HoodieTableFactory().createDynamicTableSink(keyedContext)); + assertThat(sinkEx.getMessage(), is("Flink Lance base-file support is only available for append-only tables without primary keys.")); } // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java index 9934d8a4e1605..379a769ec8153 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; import org.apache.hudi.common.schema.HoodieSchema; @@ -380,6 +381,35 @@ public void testCreateNonAppendTableWithoutRecordKey() { assertEquals("Primary key definition is missing", exception.getMessage()); } + @Test + public void testCreateAppendOnlyLanceTableWithoutPrimaryKey() throws Exception { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb_lance_append_only"); + Map lanceOptions = new HashMap<>(EXPECTED_OPTIONS); + lanceOptions.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); + lanceOptions.put(FlinkOptions.OPERATION.key(), "insert"); + lanceOptions.put(FlinkOptions.PRE_COMBINE.key(), "false"); + lanceOptions.put(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.LANCE.name()); + ResolvedSchema appendOnlySchema = new ResolvedSchema(CREATE_COLUMNS, Collections.emptyList(), null); + ResolvedCatalogTable lanceTable = new ResolvedCatalogTable( + CatalogUtils.createCatalogTable( + Schema.newBuilder().fromResolvedSchema(appendOnlySchema).build(), + Arrays.asList("partition"), + lanceOptions, + "test_lance_append_only"), + appendOnlySchema + ); + + catalog.createTable(tablePath, lanceTable, false); + + assertTrue(catalog.tableExists(tablePath)); + CatalogBaseTable actualTable = catalog.getTable(tablePath); + assertFalse(actualTable.getOptions().containsKey(TableOptionProperties.PK_COLUMNS)); + HoodieTableMetaClient metaClient = createMetaClient( + new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(new Configuration())), + catalog.inferTablePath(catalogPathStr, tablePath)); + assertThat(metaClient.getTableConfig().getBaseFileFormat(), is(HoodieFileFormat.LANCE)); + } + @Test void testCreateTableWithPartitionBucketIndex() throws TableAlreadyExistException, DatabaseNotExistException, IOException { String rule = "regex"; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java index 189e297024fb4..751f63d6de75a 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java @@ -28,7 +28,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.source.ExpressionPredicates; import org.apache.hudi.storage.StorageConfiguration; -import org.apache.hudi.storage.StoragePath; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -44,7 +43,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -134,14 +132,6 @@ void testConstructEngineRecordWithNullUpdate() { assertTrue(result.getBoolean(2)); } - @Test - void testLanceFormatThrowsInGetFileRecordIterator() { - StoragePath lancePath = new StoragePath("/tmp/test-table/partition/file.lance"); - UnsupportedOperationException ex = assertThrows(UnsupportedOperationException.class, - () -> readerContext.getFileRecordIterator(lancePath, 0, 100, SCHEMA, SCHEMA, null)); - assertEquals(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG, ex.getMessage()); - } - private GenericRowData createBaseRow(int id, String name, boolean active) { return GenericRowData.of(id, StringData.fromString(name), active); }