Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
<td><p>Enum</p></td>
<td>Whether to double write to a changelog file. This changelog file keeps the details of data changes, it can be read directly during stream reads.<br /><br />Possible values:<ul><li>"none": No changelog file.</li><li>"input": Double write to a changelog file when flushing memory table, the changelog is from input.</li><li>"full-compaction": Generate changelog files with each full compaction.</li><li>"lookup": Generate changelog files through 'lookup' before committing the data writing.</li></ul></td>
</tr>
<tr>
<td><h5>changelog-producer.row-deduplicate</h5></td>
<td style="word-wrap: break-word;">false</td>
<td><p>Boolean</p></td>
<td>Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.</td>
</tr>
<tr>
<td><h5>commit.force-compact</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,13 @@ public class CoreOptions implements Serializable {
+ "This changelog file keeps the details of data changes, "
+ "it can be read directly during stream reads.");

public static final ConfigOption<Boolean> CHANGELOG_PRODUCER_ROW_DEDUPLICATE =
key("changelog-producer.row-deduplicate")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.");

@Immutable
public static final ConfigOption<String> SEQUENCE_FIELD =
key("sequence.field")
Expand Down Expand Up @@ -807,6 +814,10 @@ public ChangelogProducer changelogProducer() {
return options.get(CHANGELOG_PRODUCER);
}

public boolean changelogRowDeduplicate() {
return options.get(CHANGELOG_PRODUCER_ROW_DEDUPLICATE);
}

public boolean scanPlanSortPartition() {
return options.get(SCAN_PLAN_SORT_PARTITION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.ValueComparatorSupplier;

import java.util.Comparator;
import java.util.function.Supplier;
Expand All @@ -44,6 +45,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
private final RowType valueType;
private final KeyValueFieldsExtractor keyValueFieldsExtractor;
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
private final Supplier<Comparator<InternalRow>> valueComparatorSupplier;
private final MergeFunctionFactory<KeyValue> mfFactory;

public KeyValueFileStore(
Expand All @@ -64,6 +66,7 @@ public KeyValueFileStore(
this.keyValueFieldsExtractor = keyValueFieldsExtractor;
this.mfFactory = mfFactory;
this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
this.valueComparatorSupplier = new ValueComparatorSupplier(valueType);
}

@Override
Expand Down Expand Up @@ -101,6 +104,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
keyType,
valueType,
keyComparatorSupplier,
valueComparatorSupplier,
mfFactory,
pathFactory(),
snapshotManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,20 @@
/** A {@link MergeTreeCompactRewriter} which produces changelog files for the compaction. */
public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewriter {

protected final Comparator<InternalRow> valueComparator;
protected final boolean changelogRowDeduplicate;

public ChangelogMergeTreeRewriter(
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
SortEngine sortEngine) {
SortEngine sortEngine,
Comparator<InternalRow> valueComparator,
boolean changelogRowDeduplicate) {
super(readerFactory, writerFactory, keyComparator, mfFactory, sortEngine);
this.valueComparator = valueComparator;
this.changelogRowDeduplicate = changelogRowDeduplicate;
}

protected abstract boolean rewriteChangelog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.paimon.mergetree.compact;

import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Preconditions;

import java.util.Comparator;

/**
* Wrapper for {@link MergeFunction}s to produce changelog during a full compaction.
*
Expand All @@ -38,6 +41,8 @@ public class FullChangelogMergeFunctionWrapper implements MergeFunctionWrapper<C

private final MergeFunction<KeyValue> mergeFunction;
private final int maxLevel;
private final Comparator<InternalRow> valueComparator;
private final boolean changelogRowDeduplicate;

// only full compaction will write files into maxLevel, see UniversalCompaction class
private KeyValue topLevelKv;
Expand All @@ -48,13 +53,19 @@ public class FullChangelogMergeFunctionWrapper implements MergeFunctionWrapper<C
private final KeyValue reusedBefore = new KeyValue();
private final KeyValue reusedAfter = new KeyValue();

public FullChangelogMergeFunctionWrapper(MergeFunction<KeyValue> mergeFunction, int maxLevel) {
public FullChangelogMergeFunctionWrapper(
MergeFunction<KeyValue> mergeFunction,
int maxLevel,
Comparator<InternalRow> valueComparator,
boolean changelogRowDeduplicate) {
Preconditions.checkArgument(
!(mergeFunction instanceof ValueCountMergeFunction),
"Value count merge function does not need to produce changelog from full compaction. "
+ "Please set changelog producer to 'input'.");
this.mergeFunction = mergeFunction;
this.maxLevel = maxLevel;
this.valueComparator = valueComparator;
this.changelogRowDeduplicate = changelogRowDeduplicate;
}

@Override
Expand Down Expand Up @@ -99,12 +110,13 @@ public ChangelogResult getResult() {
reusedResult.addChangelog(replace(reusedAfter, RowKind.INSERT, merged));
}
} else {
if (merged != null && isAdd(merged)) {
if (merged == null || !isAdd(merged)) {
reusedResult.addChangelog(replace(reusedBefore, RowKind.DELETE, topLevelKv));
} else if (!changelogRowDeduplicate
|| valueComparator.compare(topLevelKv.value(), merged.value()) != 0) {
reusedResult
.addChangelog(replace(reusedBefore, RowKind.UPDATE_BEFORE, topLevelKv))
.addChangelog(replace(reusedAfter, RowKind.UPDATE_AFTER, merged));
} else {
reusedResult.addChangelog(replace(reusedBefore, RowKind.DELETE, topLevelKv));
}
}
return reusedResult.setResultIfNotRetract(merged);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,17 @@ public FullChangelogMergeTreeCompactRewriter(
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
SortEngine sortEngine) {
super(readerFactory, writerFactory, keyComparator, mfFactory, sortEngine);
SortEngine sortEngine,
Comparator<InternalRow> valueComparator,
boolean changelogRowDeduplicate) {
super(
readerFactory,
writerFactory,
keyComparator,
mfFactory,
sortEngine,
valueComparator,
changelogRowDeduplicate);
this.maxLevel = maxLevel;
}

Expand All @@ -66,7 +75,8 @@ protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) {

@Override
protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int outputLevel) {
return new FullChangelogMergeFunctionWrapper(mfFactory.create(), maxLevel);
return new FullChangelogMergeFunctionWrapper(
mfFactory.create(), maxLevel, valueComparator, changelogRowDeduplicate);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.RowKind;

import java.util.Comparator;
import java.util.function.Function;

import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand Down Expand Up @@ -50,10 +51,14 @@ public class LookupChangelogMergeFunctionWrapper implements MergeFunctionWrapper
private final ChangelogResult reusedResult = new ChangelogResult();
private final KeyValue reusedBefore = new KeyValue();
private final KeyValue reusedAfter = new KeyValue();
private final Comparator<InternalRow> valueComparator;
private final boolean changelogRowDeduplicate;

public LookupChangelogMergeFunctionWrapper(
MergeFunctionFactory<KeyValue> mergeFunctionFactory,
Function<InternalRow, KeyValue> lookup) {
Function<InternalRow, KeyValue> lookup,
Comparator<InternalRow> valueComparator,
boolean changelogRowDeduplicate) {
MergeFunction<KeyValue> mergeFunction = mergeFunctionFactory.create();
checkArgument(
mergeFunction instanceof LookupMergeFunction,
Expand All @@ -62,6 +67,8 @@ public LookupChangelogMergeFunctionWrapper(
this.mergeFunction = (LookupMergeFunction) mergeFunction;
this.mergeFunction2 = mergeFunctionFactory.create();
this.lookup = lookup;
this.valueComparator = valueComparator;
this.changelogRowDeduplicate = changelogRowDeduplicate;
}

@Override
Expand Down Expand Up @@ -114,12 +121,13 @@ private void setChangelog(KeyValue before, KeyValue after) {
reusedResult.addChangelog(replaceAfter(RowKind.INSERT, after));
}
} else {
if (isAdd(after)) {
if (!isAdd(after)) {
reusedResult.addChangelog(replaceBefore(RowKind.DELETE, before));
} else if (!changelogRowDeduplicate
|| valueComparator.compare(before.value(), after.value()) != 0) {
reusedResult
.addChangelog(replaceBefore(RowKind.UPDATE_BEFORE, before))
.addChangelog(replaceAfter(RowKind.UPDATE_AFTER, after));
} else {
reusedResult.addChangelog(replaceBefore(RowKind.DELETE, before));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,17 @@ public LookupMergeTreeCompactRewriter(
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
SortEngine sortEngine) {
super(readerFactory, writerFactory, keyComparator, mfFactory, sortEngine);
SortEngine sortEngine,
Comparator<InternalRow> valueComparator,
boolean changelogRowDeduplicate) {
super(
readerFactory,
writerFactory,
keyComparator,
mfFactory,
sortEngine,
valueComparator,
changelogRowDeduplicate);
this.lookupLevels = lookupLevels;
}

Expand Down Expand Up @@ -85,7 +94,9 @@ protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int outputLev
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
},
valueComparator,
changelogRowDeduplicate);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
private final Supplier<Comparator<InternalRow>> valueComparatorSupplier;
private final MergeFunctionFactory<KeyValue> mfFactory;
private final CoreOptions options;
private final FileIO fileIO;
Expand All @@ -85,6 +86,7 @@ public KeyValueFileStoreWrite(
RowType keyType,
RowType valueType,
Supplier<Comparator<InternalRow>> keyComparatorSupplier,
Supplier<Comparator<InternalRow>> valueComparatorSupplier,
MergeFunctionFactory<KeyValue> mfFactory,
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
Expand Down Expand Up @@ -115,6 +117,7 @@ public KeyValueFileStoreWrite(
pathFactory,
options.targetFileSize());
this.keyComparatorSupplier = keyComparatorSupplier;
this.valueComparatorSupplier = valueComparatorSupplier;
this.mfFactory = mfFactory;
this.options = options;
}
Expand Down Expand Up @@ -211,7 +214,9 @@ private MergeTreeCompactRewriter createRewriter(
writerFactory,
keyComparator,
mfFactory,
options.sortEngine());
options.sortEngine(),
valueComparatorSupplier.get(),
options.changelogRowDeduplicate());
case LOOKUP:
LookupLevels lookupLevels = createLookupLevels(levels, readerFactory);
return new LookupMergeTreeCompactRewriter(
Expand All @@ -220,7 +225,9 @@ private MergeTreeCompactRewriter createRewriter(
writerFactory,
keyComparator,
mfFactory,
options.sortEngine());
options.sortEngine(),
valueComparatorSupplier.get(),
options.changelogRowDeduplicate());
default:
return new MergeTreeCompactRewriter(
readerFactory,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.GeneratedClass;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.RowType;

import java.util.Comparator;
import java.util.function.Supplier;

/** A {@link Supplier} that returns the comparator for the file store value. */
public class ValueComparatorSupplier implements SerializableSupplier<Comparator<InternalRow>> {

private static final long serialVersionUID = 1L;

private final GeneratedClass<RecordComparator> genRecordComparator;

public ValueComparatorSupplier(RowType keyType) {
genRecordComparator =
CodeGenUtils.generateRecordComparator(keyType.getFieldTypes(), "valueComparator");
}

@Override
public RecordComparator get() {
return genRecordComparator.newInstance(ValueComparatorSupplier.class.getClassLoader());
}
}
Loading