[AMORO-2266] Improve combined data reader performance (apache#2268)
* [AMORO-2266] Improve combined data reader performance

Signed-off-by: Qishang Zhong <zhongqishang@gmail.com>

* Add doc

* Fix spotless apply

* [AMORO-2266] Use guava bloom filter

Signed-off-by: Qishang Zhong <zhongqishang@gmail.com>

* [AMORO-2266] Use guava bloom filter

Signed-off-by: Qishang Zhong <zhongqishang@gmail.com>

* Fix structLike serialize

* Fix `rewrittenDataRecordCnt` to constructor

* Add test case for various types

* Fix test name and add assert

* Fix comment

* Move readIdentifierData() to CombinedDeleteFilter

* Remove StructLikeWrapper

* Fix comment

* Fix comment and add test

* Add units test

* fix log

Signed-off-by: Qishang Zhong <zhongqishang@gmail.com>

* Fix filterEqDelete trigger logic

* Add parameter FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT

* Fix test

---------

Signed-off-by: Qishang Zhong <zhongqishang@gmail.com>
zhongqishang authored and ShawHee committed Dec 29, 2023
1 parent 32a453b commit 3accd93
Showing 9 changed files with 240 additions and 35 deletions.
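The change in a nutshell: when a rewrite task carries more equality-delete records than a configurable trigger (FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT, one million by default), the combined reader now builds a Guava BloomFilter over the identifier fields of the data files being rewritten and uses it to drop equality-delete records that cannot match any data record before they reach the StructLikeBaseMap. The sketch below is a minimal, self-contained illustration of that idea with identifiers simplified to plain strings; it is not the committed code, and buildDeleteMap and its arguments are hypothetical stand-ins for the table-specific readers.

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the eq-delete pre-filtering idea (identifiers simplified to strings).
public class EqDeleteBloomFilterSketch {

  // Mirrors the default of CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT.
  static final long TRIGGER_RECORD_COUNT = 1_000_000L;

  // Keeps, per delete key, the highest sequence number (LSN) that might affect the data files.
  static Map<String, Long> buildDeleteMap(
      List<String> dataIdentifiers, List<String> eqDeleteKeys, List<Long> eqDeleteLsns) {

    boolean filterEqDelete = eqDeleteKeys.size() > TRIGGER_RECORD_COUNT;

    BloomFilter<CharSequence> bloomFilter = null;
    if (filterEqDelete) {
      // Roughly 1.7 MB per million entries at a 0.1% false-positive probability.
      bloomFilter =
          BloomFilter.create(
              Funnels.stringFunnel(StandardCharsets.UTF_8), dataIdentifiers.size(), 0.001);
      dataIdentifiers.forEach(bloomFilter::put);
    }

    Map<String, Long> deleteMap = new HashMap<>();
    for (int i = 0; i < eqDeleteKeys.size(); i++) {
      String key = eqDeleteKeys.get(i);
      // Skip deletes that definitely do not hit any rewritten data record.
      if (bloomFilter != null && !bloomFilter.mightContain(key)) {
        continue;
      }
      deleteMap.merge(key, eqDeleteLsns.get(i), Long::max);
    }
    return deleteMap;
  }
}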
@@ -30,6 +30,7 @@
import com.netease.arctic.hive.TestHMS;
import com.netease.arctic.hive.catalog.HiveCatalogTestHelper;
import com.netease.arctic.hive.catalog.HiveTableTestHelper;
import com.netease.arctic.io.reader.CombinedDeleteFilter;
import com.netease.arctic.server.optimizing.flow.checker.DataConcurrencyChecker;
import com.netease.arctic.server.optimizing.flow.checker.FullOptimizingMove2HiveChecker;
import com.netease.arctic.server.optimizing.flow.checker.FullOptimizingWrite2HiveChecker;
@@ -61,7 +62,7 @@ public static Object[] parameters() {
{
new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG), new BasicTableTestHelper(true, false)
},
{new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(true, false)},
{new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(true, true)},
{new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(true, false)},
{
new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf()),
@@ -88,6 +89,7 @@ public void run() throws Exception {

int cycle = 5;
int recordCountOnceWrite = 2500;
CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 2499L;

// close full optimize
table.updateProperties().set(SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, "-1").commit();
@@ -65,7 +65,7 @@ public static Object[] parameters() {
new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
new BasicTableTestHelper(false, false)
},
{new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(false, false)},
{new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(false, true)},
{new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(false, false)},
{
new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf()),
@@ -254,6 +254,9 @@ private List<Record> randomRecord(int count) {
}

private boolean equRecord(Record r1, Record r2) {
if ((r1 == null && r2 != null) || (r1 != null && r2 == null)) {
return false;
}
if (r2.size() < schemaSize) {
return false;
}
@@ -20,12 +20,14 @@

import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.io.CloseablePredicate;
import com.netease.arctic.optimizing.RewriteFilesInput;
import com.netease.arctic.utils.ContentFiles;
import com.netease.arctic.utils.map.StructLikeBaseMap;
import com.netease.arctic.utils.map.StructLikeCollections;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
@@ -41,29 +43,38 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.Filter;
import org.apache.paimon.shade.guava30.com.google.common.hash.BloomFilter;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Special point: 1. Apply all delete file to all data file 2. EQUALITY_DELETES only be written by
* flink in current, so the schemas of all EQUALITY_DELETES is primary key
* Special point:
*
* <ul>
 * <li>Apply all delete files to all data files
 * <li>EQUALITY_DELETES are currently only written by Flink, so the schema of all
 *     EQUALITY_DELETES is the primary key
* </ul>
*/
public abstract class CombinedDeleteFilter<T extends StructLike> {

@@ -76,6 +87,9 @@ public abstract class CombinedDeleteFilter<T extends StructLike> {
private static final Accessor<StructLike> POSITION_ACCESSOR =
POS_DELETE_SCHEMA.accessorForField(MetadataColumns.DELETE_FILE_POS.fieldId());

@VisibleForTesting public static long FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 1000000L;

private final RewriteFilesInput input;
private final List<DeleteFile> posDeletes;
private final List<DeleteFile> eqDeletes;

@@ -91,15 +105,20 @@ public abstract class CombinedDeleteFilter<T extends StructLike> {

private StructLikeCollections structLikeCollections = StructLikeCollections.DEFAULT;

private final long dataRecordCnt;
private final boolean filterEqDelete;

protected CombinedDeleteFilter(
ContentFile<?>[] deleteFiles,
Set<String> positionPathSets,
RewriteFilesInput rewriteFilesInput,
Schema tableSchema,
StructLikeCollections structLikeCollections) {
this.input = rewriteFilesInput;
this.dataRecordCnt =
Arrays.stream(rewriteFilesInput.dataFiles()).mapToLong(ContentFile::recordCount).sum();
ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
if (deleteFiles != null) {
for (ContentFile<?> delete : deleteFiles) {
if (rewriteFilesInput.deleteFiles() != null) {
for (ContentFile<?> delete : rewriteFilesInput.deleteFiles()) {
switch (delete.content()) {
case POSITION_DELETES:
posDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
@@ -121,15 +140,36 @@ protected CombinedDeleteFilter(
}
}
}

this.positionPathSets = positionPathSets;
this.positionPathSets =
Arrays.stream(rewriteFilesInput.dataFiles())
.map(s -> s.path().toString())
.collect(Collectors.toSet());
this.posDeletes = posDeleteBuilder.build();
this.eqDeletes = eqDeleteBuilder.build();
this.deleteSchema = TypeUtil.select(tableSchema, deleteIds);

if (structLikeCollections != null) {
this.structLikeCollections = structLikeCollections;
}
this.filterEqDelete = filterEqDelete();
}

/**
 * Whether to use a {@link BloomFilter} to filter equality-delete records and reduce the amount of
 * delete data written to the {@link StructLikeBaseMap}
*/
private boolean filterEqDelete() {
long eqDeleteRecordCnt =
Arrays.stream(input.deleteFiles())
.filter(file -> file.content() == FileContent.EQUALITY_DELETES)
.mapToLong(ContentFile::recordCount)
.sum();
return eqDeleteRecordCnt > FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT;
}

@VisibleForTesting
public boolean isFilterEqDelete() {
return filterEqDelete;
}

protected abstract InputFile getInputFile(String location);
@@ -186,20 +226,42 @@ private Predicate<StructForDelete<T>> applyEqDeletes() {
return record -> false;
}

InternalRecordWrapper internalRecordWrapper =
new InternalRecordWrapper(deleteSchema.asStruct());

BloomFilter<StructLike> bloomFilter = null;
if (filterEqDelete) {
LOG.debug(
"Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) data count is {}",
dataRecordCnt);
// one million entries take about 1.71 MB of memory
bloomFilter = BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
try (CloseableIterable<Record> deletes =
CloseableIterable.concat(
CloseableIterable.transform(
CloseableIterable.withNoopClose(
Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
s -> openFile(s, deleteSchema)))) {
for (Record record : deletes) {
StructLike identifier = internalRecordWrapper.copyFor(record);
bloomFilter.put(identifier);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

CloseableIterable<RecordWithLsn> deleteRecords =
CloseableIterable.transform(
CloseableIterable.concat(
Iterables.transform(
eqDeletes,
s ->
CloseableIterable.transform(
openDeletes(ContentFiles.asDeleteFile(s), deleteSchema),
openFile(s, deleteSchema),
r -> new RecordWithLsn(s.dataSequenceNumber(), r)))),
RecordWithLsn::recordCopy);

InternalRecordWrapper internalRecordWrapper =
new InternalRecordWrapper(deleteSchema.asStruct());

StructLikeBaseMap<Long> structLikeMap =
structLikeCollections.createStructLikeMap(deleteSchema.asStruct());

Expand All @@ -211,8 +273,11 @@ private Predicate<StructForDelete<T>> applyEqDeletes() {
: getArcticFileIo().doAs(deletes::iterator);
while (it.hasNext()) {
RecordWithLsn recordWithLsn = it.next();
Long lsn = recordWithLsn.getLsn();
StructLike deletePK = internalRecordWrapper.copyFor(recordWithLsn.getRecord());
if (filterEqDelete && !bloomFilter.mightContain(deletePK)) {
continue;
}
Long lsn = recordWithLsn.getLsn();
Long old = structLikeMap.get(deletePK);
if (old == null || old.compareTo(lsn) <= 0) {
structLikeMap.put(deletePK, lsn);
@@ -318,12 +383,12 @@ protected boolean shouldKeep(StructForDelete<T> item) {
}

private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
return openDeletes(file, POS_DELETE_SCHEMA);
return openFile(file, POS_DELETE_SCHEMA);
}

private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema deleteSchema) {
InputFile input = getInputFile(deleteFile.path().toString());
switch (deleteFile.format()) {
private CloseableIterable<Record> openFile(ContentFile<?> contentFile, Schema deleteSchema) {
InputFile input = getInputFile(contentFile.path().toString());
switch (contentFile.format()) {
case AVRO:
return Avro.read(input)
.project(deleteSchema)
Expand All @@ -350,7 +415,7 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
throw new UnsupportedOperationException(
String.format(
"Cannot read deletes, %s is not a supported format: %s",
deleteFile.format().name(), deleteFile.path()));
contentFile.format().name(), contentFile.path()));
}
}

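As a side note on the BloomFilter.create(..., dataRecordCnt, 0.001) call above: the sizing comment (one million entries at roughly 1.71 MB) can be checked against the standard Bloom filter formula m = -n * ln(p) / (ln 2)^2 bits, which Guava also uses to size its filters. The snippet below is only a back-of-the-envelope verification, not project code.

// Back-of-the-envelope bloom filter sizing for the values used in CombinedDeleteFilter:
// one million expected insertions at a 0.1% false-positive probability.
public class BloomFilterSizing {
  public static void main(String[] args) {
    long n = 1_000_000L; // expected insertions (record count of the rewritten data files)
    double p = 0.001;    // desired false-positive probability

    // Optimal number of bits: m = -n * ln(p) / (ln 2)^2
    double bits = -n * Math.log(p) / (Math.log(2) * Math.log(2));

    // ~14.4 million bits, i.e. roughly 1.71 MiB
    System.out.printf("%.0f bits ~= %.2f MiB%n", bits, bits / 8 / 1024 / 1024);
  }
}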
@@ -23,7 +23,6 @@
import com.netease.arctic.optimizing.RewriteFilesInput;
import com.netease.arctic.scan.CombinedIcebergScanTask;
import com.netease.arctic.utils.map.StructLikeCollections;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
@@ -36,6 +35,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -63,8 +63,6 @@ public class GenericCombinedIcebergDataReader implements OptimizingDataReader {
protected final ArcticFileIO fileIO;
protected final BiFunction<Type, Object, Object> convertConstant;
protected final boolean reuseContainer;

protected final ContentFile[] deleteFiles;
protected CombinedDeleteFilter<Record> deleteFilter;

protected PartitionSpec spec;
@@ -89,15 +87,11 @@ public GenericCombinedIcebergDataReader(
this.convertConstant = convertConstant;
this.reuseContainer = reuseContainer;
this.input = rewriteFilesInput;
this.deleteFiles = rewriteFilesInput.deleteFiles();
Set<String> positionPathSet =
Arrays.stream(rewriteFilesInput.dataFiles())
.map(s -> s.path().toString())
.collect(Collectors.toSet());
this.deleteFilter =
new GenericDeleteFilter(deleteFiles, positionPathSet, tableSchema, structLikeCollections);
new GenericDeleteFilter(rewriteFilesInput, tableSchema, structLikeCollections);
}

@Override
public CloseableIterable<Record> readData() {
if (input.rewrittenDataFiles() == null) {
return CloseableIterable.empty();
@@ -116,14 +110,15 @@ public CloseableIterable<Record> readData() {
StructForDelete<Record> structForDelete =
new StructForDelete<>(requireSchema, deleteFilter.deleteIds());
CloseableIterable<StructForDelete<Record>> structForDeleteCloseableIterable =
CloseableIterable.transform(concat, record -> structForDelete.wrap(record));
CloseableIterable.transform(concat, structForDelete::wrap);

CloseableIterable<Record> iterable =
CloseableIterable.transform(
deleteFilter.filter(structForDeleteCloseableIterable), StructForDelete::recover);
return iterable;
}

@Override
public CloseableIterable<Record> readDeletedData() {
if (input.rePosDeletedDataFiles() == null) {
return CloseableIterable.empty();
@@ -147,7 +142,7 @@ public CloseableIterable<Record> readDeletedData() {
StructForDelete<Record> structForDelete =
new StructForDelete<>(requireSchema, deleteFilter.deleteIds());
CloseableIterable<StructForDelete<Record>> structForDeleteCloseableIterable =
CloseableIterable.transform(concat, record -> structForDelete.wrap(record));
CloseableIterable.transform(concat, structForDelete::wrap);

CloseableIterable<Record> iterable =
CloseableIterable.transform(
@@ -286,14 +281,18 @@ private static Schema fileProjection(
return new Schema(columns);
}

@VisibleForTesting
public CombinedDeleteFilter<Record> getDeleteFilter() {
return deleteFilter;
}

protected class GenericDeleteFilter extends CombinedDeleteFilter<Record> {

public GenericDeleteFilter(
ContentFile[] deleteFiles,
Set<String> positionPathSets,
RewriteFilesInput rewriteFilesInput,
Schema tableSchema,
StructLikeCollections structLikeCollections) {
super(deleteFiles, positionPathSets, tableSchema, structLikeCollections);
super(rewriteFilesInput, tableSchema, structLikeCollections);
}

@Override
@@ -0,0 +1,25 @@
package com.netease.arctic.io.reader;

import com.netease.arctic.utils.SerializationUtil;
import org.apache.iceberg.StructLike;
import org.apache.paimon.shade.guava30.com.google.common.hash.Funnel;
import org.apache.paimon.shade.guava30.com.google.common.hash.PrimitiveSink;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;

public enum StructLikeFunnel implements Funnel<StructLike> {
INSTANCE;

StructLikeFunnel() {}

@Override
public void funnel(@NotNull StructLike structLike, PrimitiveSink primitiveSink) {
StructLike copy = SerializationUtil.StructLikeCopy.copy(structLike);
try {
primitiveSink.putBytes(SerializationUtil.kryoSerialize(copy));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
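For context, the new StructLikeFunnel is what lets a Guava BloomFilter hash Iceberg StructLike rows: it copies the struct positionally and feeds its kryo-serialized bytes into the sink. A usage sketch follows; the one-column schema and record are invented for illustration and are not taken from the tests.

package com.netease.arctic.io.reader;

import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.apache.paimon.shade.guava30.com.google.common.hash.BloomFilter;

// Illustrative only: shows how the funnel plugs into BloomFilter.create, the same
// construction pattern CombinedDeleteFilter uses (funnel, expected insertions, fpp).
public class StructLikeFunnelExample {
  public static void main(String[] args) {
    Schema idSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

    BloomFilter<StructLike> filter = BloomFilter.create(StructLikeFunnel.INSTANCE, 10_000, 0.001);

    GenericRecord record = GenericRecord.create(idSchema);
    record.setField("id", 42L);

    filter.put(record);                              // hashes the kryo-serialized copy of the row
    System.out.println(filter.mightContain(record)); // true: bloom filters report no false negatives
  }
}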
@@ -225,7 +225,7 @@ public T deserialize(byte[] bytes) {
}
}

private static class StructLikeCopy implements StructLike {
public static class StructLikeCopy implements StructLike {

public static StructLike copy(StructLike struct) {
return struct != null ? new StructLikeCopy(struct) : null;