Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.operation.commit.CommitRollback;
import org.apache.paimon.operation.commit.ConflictDetection;
import org.apache.paimon.operation.commit.StrictModeChecker;
import org.apache.paimon.partition.PartitionExpireStrategy;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
Expand Down Expand Up @@ -291,12 +290,6 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
newIndexFileHandler(),
snapshotManager,
scanner);
StrictModeChecker strictModeChecker =
StrictModeChecker.create(
snapshotManager,
commitUser,
this::newScan,
options.commitStrictModeLastSafeSnapshot().orElse(null));
CommitRollback rollback = null;
TableRollback tableRollback = catalogEnvironment.catalogTableRollback();
if (tableRollback != null) {
Expand All @@ -310,32 +303,17 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
commitUser,
partitionType,
options,
options.partitionDefaultName(),
pathFactory(),
snapshotManager,
manifestFileFactory(),
manifestListFactory(),
indexManifestFileFactory(),
newScan(),
options.bucket(),
options.manifestTargetSize(),
options.manifestFullCompactionThresholdSize(),
options.manifestMergeMinCount(),
partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(),
options.branch(),
this::newScan,
newStatsFileHandler(),
bucketMode(),
options.scanManifestParallelism(),
createCommitPreCallbacks(table),
createCommitCallbacks(commitUser, table),
options.commitMaxRetries(),
options.commitTimeout(),
options.commitMinRetryWait(),
options.commitMaxRetryWait(),
options.rowTrackingEnabled(),
options.commitDiscardDuplicateFiles(),
conflictDetectFactory,
strictModeChecker,
rollback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.paimon.operation.commit.SuccessCommitResult;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.predicate.Predicate;
Expand Down Expand Up @@ -91,6 +90,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -135,31 +135,19 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final String commitUser;
private final RowType partitionType;
private final CoreOptions options;
private final String partitionDefaultName;
private final FileStorePathFactory pathFactory;
private final SnapshotManager snapshotManager;
private final ManifestFile manifestFile;
private final ManifestList manifestList;
private final IndexManifestFile indexManifestFile;
@Nullable private final CommitRollback rollback;
private final CommitScanner scanner;
private final int numBucket;
private final MemorySize manifestTargetSize;
private final MemorySize manifestFullCompactionSize;
private final int manifestMergeMinCount;
private final boolean dynamicPartitionOverwrite;
private final String branchName;
@Nullable private final Integer manifestReadParallelism;
private final List<CommitPreCallback> commitPreCallbacks;
private final List<CommitCallback> commitCallbacks;
private final StatsFileHandler statsFileHandler;
private final BucketMode bucketMode;
private final long commitTimeout;
private final RetryWaiter retryWaiter;
private final int commitMaxRetries;
private final InternalRowPartitionComputer partitionComputer;
private final boolean rowTrackingEnabled;
private final boolean discardDuplicateFiles;
@Nullable private final StrictModeChecker strictModeChecker;
private final ConflictDetection conflictDetection;
private final CommitCleaner commitCleaner;
Expand All @@ -176,32 +164,17 @@ public FileStoreCommitImpl(
String commitUser,
RowType partitionType,
CoreOptions options,
String partitionDefaultName,
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
IndexManifestFile.Factory indexManifestFileFactory,
FileStoreScan scan,
int numBucket,
MemorySize manifestTargetSize,
MemorySize manifestFullCompactionSize,
int manifestMergeMinCount,
boolean dynamicPartitionOverwrite,
String branchName,
Supplier<FileStoreScan> scanSupplier,
StatsFileHandler statsFileHandler,
BucketMode bucketMode,
@Nullable Integer manifestReadParallelism,
List<CommitPreCallback> commitPreCallbacks,
List<CommitCallback> commitCallbacks,
int commitMaxRetries,
long commitTimeout,
long commitMinRetryWait,
long commitMaxRetryWait,
boolean rowTrackingEnabled,
boolean discardDuplicateFiles,
ConflictDetection.Factory conflictDetectFactory,
@Nullable StrictModeChecker strictModeChecker,
@Nullable CommitRollback rollback) {
this.snapshotCommit = snapshotCommit;
this.fileIO = fileIO;
Expand All @@ -210,26 +183,17 @@ public FileStoreCommitImpl(
this.commitUser = commitUser;
this.partitionType = partitionType;
this.options = options;
this.partitionDefaultName = partitionDefaultName;
this.pathFactory = pathFactory;
this.snapshotManager = snapshotManager;
this.manifestFile = manifestFileFactory.create();
this.manifestList = manifestListFactory.create();
this.indexManifestFile = indexManifestFileFactory.create();
this.rollback = rollback;
this.scanner = new CommitScanner(scan, indexManifestFile, options);
this.numBucket = numBucket;
this.manifestTargetSize = manifestTargetSize;
this.manifestFullCompactionSize = manifestFullCompactionSize;
this.manifestMergeMinCount = manifestMergeMinCount;
this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
this.branchName = branchName;
this.manifestReadParallelism = manifestReadParallelism;
this.scanner = new CommitScanner(scanSupplier.get(), indexManifestFile, options);
this.commitPreCallbacks = commitPreCallbacks;
this.commitCallbacks = commitCallbacks;
this.commitMaxRetries = commitMaxRetries;
this.commitTimeout = commitTimeout;
this.retryWaiter = new RetryWaiter(commitMinRetryWait, commitMaxRetryWait);
this.retryWaiter =
new RetryWaiter(options.commitMinRetryWait(), options.commitMaxRetryWait());
this.partitionComputer =
new InternalRowPartitionComputer(
options.partitionDefaultName(),
Expand All @@ -240,9 +204,16 @@ public FileStoreCommitImpl(
this.commitMetrics = null;
this.statsFileHandler = statsFileHandler;
this.bucketMode = bucketMode;
this.rowTrackingEnabled = rowTrackingEnabled;
this.discardDuplicateFiles = discardDuplicateFiles;
this.strictModeChecker = strictModeChecker;
this.strictModeChecker =
options.commitStrictModeLastSafeSnapshot()
.map(
id ->
new StrictModeChecker(
snapshotManager,
commitUser,
scanSupplier.get(),
id))
.orElse(null);
this.conflictDetection = conflictDetectFactory.create(scanner);
this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile);
}
Expand Down Expand Up @@ -468,7 +439,7 @@ public int overwritePartition(
boolean skipOverwrite = false;
// partition filter is built from static or dynamic partition according to properties
PartitionPredicate partitionFilter = null;
if (dynamicPartitionOverwrite) {
if (partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite()) {
if (changes.appendTableFiles.isEmpty()) {
// in dynamic mode, if there is no changes to commit, no data will be deleted
skipOverwrite = true;
Expand All @@ -482,7 +453,8 @@ public int overwritePartition(
} else {
// partition may be partial partition fields, so here must use predicate way.
Predicate partitionPredicate =
createPartitionPredicate(partition, partitionType, partitionDefaultName);
createPartitionPredicate(
partition, partitionType, options.partitionDefaultName());
partitionFilter =
PartitionPredicate.fromPredicate(partitionType, partitionPredicate);
// sanity check, all changes must be done within the given partition
Expand Down Expand Up @@ -613,16 +585,19 @@ public void dropPartitions(List<Map<String, String>> partitions, long commitIden
PartitionPredicate partitionFilter;
if (fullMode) {
List<BinaryRow> binaryPartitions =
createBinaryPartitions(partitions, partitionType, partitionDefaultName);
createBinaryPartitions(
partitions, partitionType, options.partitionDefaultName());
partitionFilter = PartitionPredicate.fromMultiple(partitionType, binaryPartitions);
} else {
// partitions may be partial partition fields, so here must to use predicate way.
// partitions may be partial partition fields, so here must use predicate way.
Predicate predicate =
partitions.stream()
.map(
partition ->
createPartitionPredicate(
partition, partitionType, partitionDefaultName))
partition,
partitionType,
options.partitionDefaultName()))
.reduce(PredicateBuilder::or)
.orElseThrow(
() -> new RuntimeException("Failed to get partition filter."));
Expand Down Expand Up @@ -688,7 +663,7 @@ public FileIO fileIO() {
}

private ManifestEntryChanges collectChanges(List<CommitMessage> commitMessages) {
ManifestEntryChanges changes = new ManifestEntryChanges(numBucket);
ManifestEntryChanges changes = new ManifestEntryChanges(options.bucket());
commitMessages.forEach(changes::collect);
LOG.info("Finished collecting changes, including: {}", changes);
return changes;
Expand Down Expand Up @@ -730,12 +705,12 @@ private int tryCommit(

retryResult = (RetryCommitResult) result;

if (System.currentTimeMillis() - startMillis > commitTimeout
|| retryCount >= commitMaxRetries) {
if (System.currentTimeMillis() - startMillis > options.commitTimeout()
|| retryCount >= options.commitMaxRetries()) {
String message =
String.format(
"Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.",
commitTimeout, retryCount);
options.commitTimeout(), retryCount);
throw new RuntimeException(message, retryResult.exception);
}

Expand All @@ -761,7 +736,11 @@ private int tryOverwritePartition(
return tryCommit(
latestSnapshot ->
scanner.readOverwriteChanges(
numBucket, changes, indexFiles, latestSnapshot, partitionFilter),
options.bucket(),
changes,
indexFiles,
latestSnapshot,
partitionFilter),
identifier,
watermark,
properties,
Expand Down Expand Up @@ -836,7 +815,8 @@ CommitResult tryCommitOnce(
}

List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
boolean discardDuplicate = discardDuplicateFiles && commitKind == CommitKind.APPEND;
boolean discardDuplicate =
options.commitDiscardDuplicateFiles() && commitKind == CommitKind.APPEND;
if (latestSnapshot != null && (discardDuplicate || detectConflicts)) {
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
// so we have to check again
Expand Down Expand Up @@ -920,14 +900,14 @@ CommitResult tryCommitOnce(
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
manifestTargetSize.getBytes(),
manifestMergeMinCount,
manifestFullCompactionSize.getBytes(),
options.manifestTargetSize().getBytes(),
options.manifestMergeMinCount(),
options.manifestFullCompactionThresholdSize().getBytes(),
partitionType,
manifestReadParallelism);
options.scanManifestParallelism());
baseManifestList = manifestList.write(mergeAfterManifests);

if (rowTrackingEnabled) {
if (options.rowTrackingEnabled()) {
RowTrackingAssigned assigned =
assignRowTracking(newSnapshotId, firstRowIdStart, deltaFiles);
nextRowIdStart = assigned.nextRowIdStart;
Expand Down Expand Up @@ -1097,12 +1077,12 @@ public void compactManifest() {
break;
}

if (System.currentTimeMillis() - startMillis > commitTimeout
|| retryCount >= commitMaxRetries) {
if (System.currentTimeMillis() - startMillis > options.commitTimeout()
|| retryCount >= options.commitMaxRetries()) {
throw new RuntimeException(
String.format(
"Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.",
commitTimeout, retryCount));
options.commitTimeout(), retryCount));
}

retryWaiter.retryWait(retryCount);
Expand All @@ -1126,11 +1106,11 @@ private boolean compactManifestOnce() {
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
manifestTargetSize.getBytes(),
options.manifestTargetSize().getBytes(),
1,
1,
partitionType,
manifestReadParallelism);
options.scanManifestParallelism());

if (new HashSet<>(mergeBeforeManifests).equals(new HashSet<>(mergeAfterManifests))) {
// no need to commit this snapshot, because no compact were happened
Expand Down Expand Up @@ -1173,7 +1153,7 @@ private boolean commitSnapshotImpl(Snapshot newSnapshot, List<PartitionEntry> de
for (PartitionEntry entry : deltaStatistics) {
statistics.add(entry.toPartitionStatistics(partitionComputer));
}
return snapshotCommit.commit(newSnapshot, branchName, statistics);
return snapshotCommit.commit(newSnapshot, options.branch(), statistics);
} catch (Throwable e) {
// exception when performing the atomic rename,
// we cannot clean up because we can't determine the success
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;

import javax.annotation.Nullable;

import java.util.Iterator;
import java.util.function.Supplier;

/** A checker to check strict mode based on last safe snapshot. */
public class StrictModeChecker {
Expand All @@ -51,19 +48,6 @@ public StrictModeChecker(
this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
}

@Nullable
public static StrictModeChecker create(
SnapshotManager snapshotManager,
String commitUser,
Supplier<FileStoreScan> scanSupplier,
@Nullable Long strictModeLastSafeSnapshot) {
if (strictModeLastSafeSnapshot == null) {
return null;
}
return new StrictModeChecker(
snapshotManager, commitUser, scanSupplier.get(), strictModeLastSafeSnapshot);
}

public void check(long newSnapshotId, CommitKind newCommitKind) {
for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) {
Snapshot snapshot = snapshotManager.snapshot(id);
Expand Down