[AMORO-2266] Improve combined data reader performance #2268

Merged: 24 commits merged into master on Nov 14, 2023

Commits (24)
960cf88
[AMORO-2266] Improve combined data reader performance
zhongqishang Nov 8, 2023
b3a0f31
Add doc
zhongqishang Nov 9, 2023
e151ff6
Fix spotless apply
zhongqishang Nov 9, 2023
6d0146a
[AMORO-2266] Use guava bloom filter
zhongqishang Nov 9, 2023
369889c
[AMORO-2266] Use guava bloom filter
zhongqishang Nov 9, 2023
33c4e8a
Merge branch 'master' into AMORO-2266
zhongqishang Nov 9, 2023
142f917
Fix structLike serialize
zhongqishang Nov 10, 2023
d93953c
Merge branch 'master' into AMORO-2266
zhongqishang Nov 10, 2023
409f576
Fix `rewrittenDataRecordCnt` to constructor
zhongqishang Nov 10, 2023
8766f92
Add test case for various types
zhongqishang Nov 10, 2023
b799144
Fix test name and add assert
zhongqishang Nov 10, 2023
306216c
Fix comment
zhongqishang Nov 10, 2023
5a62e0d
Move readIdentifierData() to CombinedDeleteFilter
zhongqishang Nov 10, 2023
a2b8f24
Remove StructLikeWrapper
zhongqishang Nov 10, 2023
acb3048
Fix comment
zhongqishang Nov 13, 2023
7ca89cb
Merge branch 'master' into AMORO-2266
zhongqishang Nov 13, 2023
80b6d53
Fix comment and add test
zhongqishang Nov 13, 2023
cfcf8fb
Add units test
zhongqishang Nov 13, 2023
dfcab5b
Merge branch 'master' into AMORO-2266
zhongqishang Nov 13, 2023
667473e
fix log
zhongqishang Nov 13, 2023
52687d5
Fix filterEqDelete trigger logic
zhongqishang Nov 14, 2023
3a33fb1
Add parameter FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT
zhongqishang Nov 14, 2023
0df2a78
Merge branch 'master' into AMORO-2266
zhongqishang Nov 14, 2023
8c24587
Fix test
zhongqishang Nov 14, 2023
========== File 1 of 7 ==========
@@ -30,6 +30,7 @@
import com.netease.arctic.hive.TestHMS;
import com.netease.arctic.hive.catalog.HiveCatalogTestHelper;
import com.netease.arctic.hive.catalog.HiveTableTestHelper;
import com.netease.arctic.io.reader.CombinedDeleteFilter;
import com.netease.arctic.server.optimizing.flow.checker.DataConcurrencyChecker;
import com.netease.arctic.server.optimizing.flow.checker.FullOptimizingMove2HiveChecker;
import com.netease.arctic.server.optimizing.flow.checker.FullOptimizingWrite2HiveChecker;
@@ -61,7 +62,7 @@ public static Object[] parameters() {
{
new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG), new BasicTableTestHelper(true, false)
},
- {new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(true, false)},
+ {new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(true, true)},
{new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(true, false)},
{
new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf()),
@@ -88,6 +89,7 @@ public void run() throws Exception {

int cycle = 5;
int recordCountOnceWrite = 2500;
CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 2499L;

// close full optimize
table.updateProperties().set(SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, "-1").commit();
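Note: the test writes 2500 records per cycle while pinning the trigger at 2499, so the equality-delete volume crosses the threshold and the Bloom-filter path is exercised. A minimal sketch of the trigger arithmetic, mirroring the filterEqDelete() method shown further down (the assumption that one write's worth of records becomes equality deletes is for illustration only):

    long recordCountOnceWrite = 2500L;                    // from the test above
    CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 2499L;
    long eqDeleteRecordCnt = recordCountOnceWrite;        // assumed: one write's worth of eq-deletes
    boolean filterEqDelete =
        eqDeleteRecordCnt > CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT; // true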
========== File 2 of 7 ==========
@@ -65,7 +65,7 @@ public static Object[] parameters() {
new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
new BasicTableTestHelper(false, false)
},
- {new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(false, false)},
+ {new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(false, true)},
{new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(false, false)},
{
new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf()),
========== File 3 of 7 ==========
@@ -254,6 +254,9 @@ private List<Record> randomRecord(int count) {
}

private boolean equRecord(Record r1, Record r2) {
if ((r1 == null && r2 != null) || (r1 != null && r2 == null)) {
return false;
}
if (r2.size() < schemaSize) {
return false;
}
========== File 4 of 7: core/src/main/java/com/netease/arctic/io/reader/CombinedDeleteFilter.java ==========
@@ -20,12 +20,14 @@

import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.io.CloseablePredicate;
import com.netease.arctic.optimizing.RewriteFilesInput;
import com.netease.arctic.utils.ContentFiles;
import com.netease.arctic.utils.map.StructLikeBaseMap;
import com.netease.arctic.utils.map.StructLikeCollections;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
@@ -41,29 +43,38 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.Filter;
import org.apache.paimon.shade.guava30.com.google.common.hash.BloomFilter;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
- * Special point: 1. Apply all delete file to all data file 2. EQUALITY_DELETES only be written by
- * flink in current, so the schemas of all EQUALITY_DELETES is primary key
+ * Special points:
+ *
+ * <ul>
+ *   <li>Apply all delete files to all data files
+ *   <li>EQUALITY_DELETES are currently written only by Flink, so the schema of every
+ *       equality-delete file is the primary key
+ * </ul>
*/
public abstract class CombinedDeleteFilter<T extends StructLike> {

@@ -76,6 +87,9 @@
private static final Accessor<StructLike> POSITION_ACCESSOR =
POS_DELETE_SCHEMA.accessorForField(MetadataColumns.DELETE_FILE_POS.fieldId());

@VisibleForTesting public static long FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 1000000L;

private final RewriteFilesInput input;
private final List<DeleteFile> posDeletes;
private final List<DeleteFile> eqDeletes;

@@ -91,15 +105,20 @@

private StructLikeCollections structLikeCollections = StructLikeCollections.DEFAULT;

private final long dataRecordCnt;
private final boolean filterEqDelete;

protected CombinedDeleteFilter(
-    ContentFile<?>[] deleteFiles,
-    Set<String> positionPathSets,
+    RewriteFilesInput rewriteFilesInput,
Schema tableSchema,
StructLikeCollections structLikeCollections) {
this.input = rewriteFilesInput;
this.dataRecordCnt =
Arrays.stream(rewriteFilesInput.dataFiles()).mapToLong(ContentFile::recordCount).sum();
ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
-    if (deleteFiles != null) {
-      for (ContentFile<?> delete : deleteFiles) {
+    if (rewriteFilesInput.deleteFiles() != null) {
+      for (ContentFile<?> delete : rewriteFilesInput.deleteFiles()) {
switch (delete.content()) {
case POSITION_DELETES:
posDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
@@ -121,15 +140,36 @@
}
}
}

-    this.positionPathSets = positionPathSets;
+    this.positionPathSets =
+        Arrays.stream(rewriteFilesInput.dataFiles())
+            .map(s -> s.path().toString())
+            .collect(Collectors.toSet());
this.posDeletes = posDeleteBuilder.build();
this.eqDeletes = eqDeleteBuilder.build();
this.deleteSchema = TypeUtil.select(tableSchema, deleteIds);

if (structLikeCollections != null) {
this.structLikeCollections = structLikeCollections;
}
this.filterEqDelete = filterEqDelete();
}

/**
 * Whether to use a {@link BloomFilter} to pre-filter equality deletes, reducing the amount of
 * data written into the {@link StructLikeBaseMap}.
 */
private boolean filterEqDelete() {
long eqDeleteRecordCnt =
Arrays.stream(input.deleteFiles())
.filter(file -> file.content() == FileContent.EQUALITY_DELETES)
.mapToLong(ContentFile::recordCount)
.sum();
return eqDeleteRecordCnt > FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT;
}

@VisibleForTesting
public boolean isFilterEqDelete() {
return filterEqDelete;
}

protected abstract InputFile getInputFile(String location);
@@ -186,20 +226,42 @@
return record -> false;
}

InternalRecordWrapper internalRecordWrapper =
new InternalRecordWrapper(deleteSchema.asStruct());

BloomFilter<StructLike> bloomFilter = null;
if (filterEqDelete) {
LOG.debug(
"Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) data count is {}",
dataRecordCnt);
// one million records is about 1.71 MB of memory usage
bloomFilter = BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
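// Sizing note (standard Bloom-filter formula; an editorial check, not part of the PR):
// bits = -n * ln(p) / (ln 2)^2, so for n = 1,000,000 and p = 0.001 this gives roughly
// 14.4 million bits ≈ 1.8 MB, consistent with the ~1.71 MB figure above.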
try (CloseableIterable<Record> deletes =
CloseableIterable.concat(
CloseableIterable.transform(
CloseableIterable.withNoopClose(
Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
s -> openFile(s, deleteSchema)))) {
for (Record record : deletes) {
StructLike identifier = internalRecordWrapper.copyFor(record);
bloomFilter.put(identifier);
}
} catch (IOException e) {
throw new RuntimeException(e);
// codecov warning: the two added exception-handling lines are not covered by tests
}
}

CloseableIterable<RecordWithLsn> deleteRecords =
CloseableIterable.transform(
CloseableIterable.concat(
Iterables.transform(
eqDeletes,
s ->
CloseableIterable.transform(
-                        openDeletes(ContentFiles.asDeleteFile(s), deleteSchema),
+                        openFile(s, deleteSchema),
r -> new RecordWithLsn(s.dataSequenceNumber(), r)))),
RecordWithLsn::recordCopy);

-    InternalRecordWrapper internalRecordWrapper =
-        new InternalRecordWrapper(deleteSchema.asStruct());

StructLikeBaseMap<Long> structLikeMap =
structLikeCollections.createStructLikeMap(deleteSchema.asStruct());

@@ -211,8 +273,11 @@
: getArcticFileIo().doAs(deletes::iterator);
while (it.hasNext()) {
RecordWithLsn recordWithLsn = it.next();
-        Long lsn = recordWithLsn.getLsn();
StructLike deletePK = internalRecordWrapper.copyFor(recordWithLsn.getRecord());
if (filterEqDelete && !bloomFilter.mightContain(deletePK)) {
continue;
}
Long lsn = recordWithLsn.getLsn();
Long old = structLikeMap.get(deletePK);
if (old == null || old.compareTo(lsn) <= 0) {
structLikeMap.put(deletePK, lsn);
@@ -318,12 +383,12 @@
}

private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
-    return openDeletes(file, POS_DELETE_SCHEMA);
+    return openFile(file, POS_DELETE_SCHEMA);
}

-  private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema deleteSchema) {
-    InputFile input = getInputFile(deleteFile.path().toString());
-    switch (deleteFile.format()) {
+  private CloseableIterable<Record> openFile(ContentFile<?> contentFile, Schema deleteSchema) {
+    InputFile input = getInputFile(contentFile.path().toString());
+    switch (contentFile.format()) {
case AVRO:
return Avro.read(input)
.project(deleteSchema)
Expand All @@ -350,7 +415,7 @@
throw new UnsupportedOperationException(
String.format(
"Cannot read deletes, %s is not a supported format: %s",
-            deleteFile.format().name(), deleteFile.path()));
+            contentFile.format().name(), contentFile.path()));
// codecov warning: the added line is not covered by tests
}
}

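Taken together, the change gates the equality-delete map behind a Bloom filter built from the identifier fields of the files being rewritten. A condensed, self-contained sketch of that flow using plain Guava (the class, the string-keyed map, and the LSN counter are illustrative stand-ins, not the PR's API — the real code hashes StructLike rows through StructLikeFunnel into a StructLikeBaseMap):

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class EqDeletePreFilterSketch {

      // Keys stand in for serialized primary-key values of rows/deletes.
      public static Map<String, Long> buildEqDeleteMap(
          List<String> dataFileKeys, List<String> eqDeleteKeys, long triggerRecordCount) {

        // Mirror filterEqDelete(): only pay for the Bloom filter on large delete volumes.
        boolean filterEqDelete = eqDeleteKeys.size() > triggerRecordCount;

        BloomFilter<CharSequence> bloom = null;
        if (filterEqDelete) {
          // ~1.71 MB per million expected entries at a 0.1% false-positive rate
          bloom =
              BloomFilter.create(
                  Funnels.stringFunnel(StandardCharsets.UTF_8), dataFileKeys.size(), 0.001);
          dataFileKeys.forEach(bloom::put);
        }

        Map<String, Long> deleteMap = new HashMap<>();
        long lsn = 0L;
        for (String key : eqDeleteKeys) {
          lsn++; // stand-in for the delete file's data sequence number
          // Deletes that cannot hit any rewritten row never reach the map; a false
          // positive merely stores one harmless extra entry.
          if (filterEqDelete && !bloom.mightContain(key)) {
            continue;
          }
          deleteMap.merge(key, lsn, Math::max); // keep the newest LSN, as the PR does
        }
        return deleteMap;
      }
    }

The win is that equality deletes aimed at data files outside this rewrite never enter the map, so its memory footprint tracks the rewritten data rather than the total delete volume.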
========== File 5 of 7: GenericCombinedIcebergDataReader.java ==========
@@ -23,7 +23,6 @@
import com.netease.arctic.optimizing.RewriteFilesInput;
import com.netease.arctic.scan.CombinedIcebergScanTask;
import com.netease.arctic.utils.map.StructLikeCollections;
- import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
@@ -36,6 +35,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -63,8 +63,6 @@ public class GenericCombinedIcebergDataReader implements OptimizingDataReader {
protected final ArcticFileIO fileIO;
protected final BiFunction<Type, Object, Object> convertConstant;
protected final boolean reuseContainer;

- protected final ContentFile[] deleteFiles;
protected CombinedDeleteFilter<Record> deleteFilter;

protected PartitionSpec spec;
@@ -89,15 +87,11 @@ public GenericCombinedIcebergDataReader(
this.convertConstant = convertConstant;
this.reuseContainer = reuseContainer;
this.input = rewriteFilesInput;
-    this.deleteFiles = rewriteFilesInput.deleteFiles();
-    Set<String> positionPathSet =
-        Arrays.stream(rewriteFilesInput.dataFiles())
-            .map(s -> s.path().toString())
-            .collect(Collectors.toSet());
this.deleteFilter =
-        new GenericDeleteFilter(deleteFiles, positionPathSet, tableSchema, structLikeCollections);
+        new GenericDeleteFilter(rewriteFilesInput, tableSchema, structLikeCollections);
}

@Override
public CloseableIterable<Record> readData() {
if (input.rewrittenDataFiles() == null) {
return CloseableIterable.empty();
@@ -116,14 +110,15 @@ public CloseableIterable<Record> readData() {
StructForDelete<Record> structForDelete =
new StructForDelete<>(requireSchema, deleteFilter.deleteIds());
CloseableIterable<StructForDelete<Record>> structForDeleteCloseableIterable =
-        CloseableIterable.transform(concat, record -> structForDelete.wrap(record));
+        CloseableIterable.transform(concat, structForDelete::wrap);

CloseableIterable<Record> iterable =
CloseableIterable.transform(
deleteFilter.filter(structForDeleteCloseableIterable), StructForDelete::recover);
return iterable;
}

@Override
public CloseableIterable<Record> readDeletedData() {
if (input.rePosDeletedDataFiles() == null) {
return CloseableIterable.empty();
@@ -147,7 +142,7 @@
StructForDelete<Record> structForDelete =
new StructForDelete<>(requireSchema, deleteFilter.deleteIds());
CloseableIterable<StructForDelete<Record>> structForDeleteCloseableIterable =
-        CloseableIterable.transform(concat, record -> structForDelete.wrap(record));
+        CloseableIterable.transform(concat, structForDelete::wrap);

CloseableIterable<Record> iterable =
CloseableIterable.transform(
@@ -286,14 +281,18 @@ private static Schema fileProjection(
return new Schema(columns);
}

@VisibleForTesting
public CombinedDeleteFilter<Record> getDeleteFilter() {
return deleteFilter;
}

protected class GenericDeleteFilter extends CombinedDeleteFilter<Record> {

public GenericDeleteFilter(
-        ContentFile[] deleteFiles,
-        Set<String> positionPathSets,
+        RewriteFilesInput rewriteFilesInput,
Schema tableSchema,
StructLikeCollections structLikeCollections) {
-      super(deleteFiles, positionPathSets, tableSchema, structLikeCollections);
+      super(rewriteFilesInput, tableSchema, structLikeCollections);
}

@Override
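With getDeleteFilter() and isFilterEqDelete() now exposed via @VisibleForTesting, a unit test can pin the static trigger low and assert that the Bloom-filter path switched on. A hedged sketch (createReaderUnderTest() is a hypothetical helper; the assertion is JUnit 4 style):

    // Hypothetical test flow: force a low trigger, then check the filter flag.
    CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 100L;
    GenericCombinedIcebergDataReader reader = createReaderUnderTest(); // hypothetical helper
    Assert.assertTrue(reader.getDeleteFilter().isFilterEqDelete());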
========== File 6 of 7: core/src/main/java/com/netease/arctic/io/reader/StructLikeFunnel.java ==========
@@ -0,0 +1,25 @@
package com.netease.arctic.io.reader;

import com.netease.arctic.utils.SerializationUtil;
import org.apache.iceberg.StructLike;
import org.apache.paimon.shade.guava30.com.google.common.hash.Funnel;
import org.apache.paimon.shade.guava30.com.google.common.hash.PrimitiveSink;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;

public enum StructLikeFunnel implements Funnel<StructLike> {
INSTANCE;

StructLikeFunnel() {}

@Override
public void funnel(@NotNull StructLike structLike, PrimitiveSink primitiveSink) {
StructLike copy = SerializationUtil.StructLikeCopy.copy(structLike);
try {
primitiveSink.putBytes(SerializationUtil.kryoSerialize(copy));
} catch (IOException e) {
throw new RuntimeException(e);
// codecov warning: the two added exception-handling lines are not covered by tests
}
}
}
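The funnel is what lets the shaded Guava BloomFilter hash Iceberg StructLike rows: each row is defensively copied (StructLikeCopy), kryo-serialized, and the resulting bytes are fed to the sink. A usage sketch (row is any StructLike; the expected-insertions count is illustrative):

    BloomFilter<StructLike> bloom =
        BloomFilter.create(StructLikeFunnel.INSTANCE, 1_000_000L, 0.001);
    bloom.put(row);                            // hashes the kryo-serialized copy
    boolean maybePresent = bloom.mightContain(row);

Hashing serialized bytes presumably keeps hashing consistent across StructLike implementations, at the cost of one copy and one kryo round-trip per probe.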
========== File 7 of 7: SerializationUtil.java ==========
@@ -225,7 +225,7 @@ public T deserialize(byte[] bytes) {
}
}

-  private static class StructLikeCopy implements StructLike {
+  public static class StructLikeCopy implements StructLike {

public static StructLike copy(StructLike struct) {
return struct != null ? new StructLikeCopy(struct) : null;