Skip to content

Commit

Permalink
[Kernel] Add monotonic inCommitTimestamp and read support for inCommi…
Browse files Browse the repository at this point in the history
…tTimestamp
  • Loading branch information
EstherBear committed Jun 26, 2024
1 parent 71eaed9 commit ea40a0f
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package io.delta.kernel.internal;

import java.io.IOException;
import java.util.Optional;

import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.StructType;

import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
Expand All @@ -41,6 +43,7 @@ public class SnapshotImpl implements Snapshot {
private final Protocol protocol;
private final Metadata metadata;
private final LogSegment logSegment;
private Optional<Long> inCommitTimestampOpt;

public SnapshotImpl(
Path dataPath,
Expand All @@ -55,6 +58,7 @@ public SnapshotImpl(
this.logReplay = logReplay;
this.protocol = protocol;
this.metadata = metadata;
this.inCommitTimestampOpt = Optional.empty();
}

@Override
Expand Down Expand Up @@ -121,4 +125,38 @@ public Path getLogPath() {
public Path getDataPath() {
return dataPath;
}

/**
* Returns the timestamp of the latest commit of this snapshot.
* For an uninitialized snapshot, this returns -1.
* <p>
* When InCommitTimestampTableFeature is enabled, the timestamp
* is retrieved from the CommitInfo of the latest commit which
* can result in an IO operation.
* <p>
* For non-ICT tables, this is the same as the file modification time of the latest commit in
* the snapshot.
*
* @param engine the engine to use for IO operations
* @return the timestamp of the latest commit
*/
public long getTimestamp(Engine engine) {
if (TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata)) {
if (!inCommitTimestampOpt.isPresent()) {
try {
Optional<CommitInfo> commitInfoOpt = CommitInfo.getCommitInfoOpt(
engine, logPath, logSegment.version);
inCommitTimestampOpt = Optional.of(CommitInfo.getRequiredInCommitTimestamp(
commitInfoOpt,
String.valueOf(logSegment.version),
dataPath));
} catch (IOException e) {
throw new RuntimeException("Failed to get inCommitTimestamp with IO", e);
}
}
return inCommitTimestampOpt.get();
} else {
return logSegment.lastCommitTimestamp;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,43 @@
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.snapshot.SnapshotManager;
import io.delta.kernel.internal.util.Clock;

public class TableImpl implements Table {
public static Table forPath(Engine engine, String path) {
return forPath(engine, path, System::currentTimeMillis);
}

/**
* Instantiate a table object for the Delta Lake table at the given path. It takes an additional
* parameter called {@link Clock} which helps in testing.
*
* @param engine {@link Engine} instance to use in Delta Kernel.
* @param path location of the table.
* @param clock {@link Clock} instance to use for time-related operations.
*
* @return an instance of {@link Table} representing the Delta table at the given path
*/
public static Table forPath(Engine engine, String path, Clock clock) {
String resolvedPath;
try {
resolvedPath = engine.getFileSystemClient().resolvePath(path);
} catch (IOException io) {
throw new RuntimeException(io);
}
return new TableImpl(resolvedPath);
return new TableImpl(resolvedPath, clock);
}

private final SnapshotManager snapshotManager;
private final String tablePath;
private final Clock clock;

public TableImpl(String tablePath) {
public TableImpl(String tablePath, Clock clock) {
this.tablePath = tablePath;
final Path dataPath = new Path(tablePath);
final Path logPath = new Path(dataPath, "_delta_log");
this.snapshotManager = new SnapshotManager(logPath, dataPath);
this.clock = clock;
}

@Override
Expand Down Expand Up @@ -81,6 +98,10 @@ public TransactionBuilder createTransactionBuilder(
return new TransactionBuilderImpl(this, engineInfo, operation);
}

public Clock getClock() {
return clock;
}

protected Path getDataPath() {
return new Path(tablePath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ public Transaction build(Engine engine) {
metadata,
setTxnOpt,
shouldUpdateMetadata,
shouldUpdateProtocol);
shouldUpdateProtocol,
table.getClock());
}

/**
Expand Down Expand Up @@ -206,6 +207,11 @@ private class InitialSnapshot extends SnapshotImpl {
InitialSnapshot(Path dataPath, LogReplay logReplay, Metadata metadata, Protocol protocol) {
super(dataPath, LogSegment.empty(table.getLogPath()), logReplay, protocol, metadata);
}

@Override
public long getTimestamp(Engine engine) {
return -1L;
}
}

private LogReplay getEmptyLogReplay(Engine engine, Metadata metadata, Protocol protocol) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.util.Clock;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.InCommitTimestampUtils;
import io.delta.kernel.internal.util.VectorUtils;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class TransactionImpl
private final SnapshotImpl readSnapshot;
private final Optional<SetTransaction> setTxnOpt;
private final boolean shouldUpdateProtocol;
private final Clock clock;
private Metadata metadata;
private boolean shouldUpdateMetadata;

Expand All @@ -88,7 +90,8 @@ public TransactionImpl(
Metadata metadata,
Optional<SetTransaction> setTxnOpt,
boolean shouldUpdateMetadata,
boolean shouldUpdateProtocol) {
boolean shouldUpdateProtocol,
Clock clock) {
this.isNewTable = isNewTable;
this.dataPath = dataPath;
this.logPath = logPath;
Expand All @@ -100,6 +103,7 @@ public TransactionImpl(
this.setTxnOpt = setTxnOpt;
this.shouldUpdateMetadata = shouldUpdateMetadata;
this.shouldUpdateProtocol = shouldUpdateProtocol;
this.clock = clock;
}

@Override
Expand Down Expand Up @@ -239,14 +243,22 @@ public Optional<SetTransaction> getSetTxnOpt() {
return setTxnOpt;
}

/**
* Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This
* can result in an additional file read and that this will only happen if ICT is enabled.
*/
private Optional<Long> generateInCommitTimestampForFirstCommitAttempt(
Engine engine, long currentTimestamp) {
boolean ictEnabled = IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata);
return ictEnabled ? Optional.of(currentTimestamp) : Optional.empty();
if (IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata)) {
long lastCommitTimestamp = readSnapshot.getTimestamp(engine);
return Optional.of(Math.max(currentTimestamp, lastCommitTimestamp + 1));
} else {
return Optional.empty();
}
}

private CommitInfo generateCommitAction(Engine engine) {
long commitAttemptStartTime = System.currentTimeMillis();
long commitAttemptStartTime = clock.getTimeMillis();
return new CommitInfo(
generateInCommitTimestampForFirstCommitAttempt(
engine, commitAttemptStartTime),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,27 @@
*/
package io.delta.kernel.internal.actions;

import java.io.IOException;
import java.util.*;
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.InvalidTableException;
import io.delta.kernel.types.*;

import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.VectorUtils;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
import static io.delta.kernel.internal.util.VectorUtils.stringStringMapValue;

/**
Expand Down Expand Up @@ -84,6 +95,8 @@ public static CommitInfo fromColumnVector(ColumnVector vector, int rowId) {
.boxed()
.collect(toMap(i -> FULL_SCHEMA.at(i).getName(), i -> i));

private static final Logger logger = LoggerFactory.getLogger(CommitInfo.class);

private final long timestamp;
private final String engineInfo;
private final String operation;
Expand Down Expand Up @@ -148,4 +161,64 @@ public Row toRow() {

return new GenericRow(CommitInfo.FULL_SCHEMA, commitInfo);
}

/**
* Returns the `inCommitTimestamp` of the given `commitInfoOpt` if it is defined.
* Throws an exception if `commitInfoOpt` is empty or contains an empty `inCommitTimestamp`.
*/
public static long getRequiredInCommitTimestamp(
Optional<CommitInfo> commitInfoOpt, String version, Path dataPath) {
CommitInfo commitInfo = commitInfoOpt
.orElseThrow(() -> new InvalidTableException(
dataPath.toString(),
String.format("This table has the feature inCommitTimestamp-preview " +
"enabled which requires the presence of the CommitInfo action " +
"in every commit. However, the CommitInfo action is " +
"missing from commit version %s.", version)));
return commitInfo
.inCommitTimestamp
.orElseThrow(() -> new InvalidTableException(
dataPath.toString(),
String.format("This table has the feature inCommitTimestamp-preview " +
"enabled which requires the presence of inCommitTimestamp in the " +
"CommitInfo action. However, this field has not " +
"been set in commit version %s.", version)));
}

/** Get the persisted commit info (if available) for the given delta file. */
public static Optional<CommitInfo> getCommitInfoOpt(
Engine engine,
Path logPath,
long version) throws IOException {
final FileStatus file = FileStatus.of(
FileNames.deltaFile(logPath, version), /* path */
0, /* size */
0 /* modification time */);
final StructType COMMITINFO_READ_SCHEMA = new StructType()
.add("commitInfo", CommitInfo.FULL_SCHEMA);
try (CloseableIterator<ColumnarBatch> columnarBatchIter = engine.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(file),
COMMITINFO_READ_SCHEMA,
Optional.empty())) {
while (columnarBatchIter.hasNext()) {
final ColumnarBatch columnarBatch = columnarBatchIter.next();
assert(columnarBatch.getSchema().equals(COMMITINFO_READ_SCHEMA));
final ColumnVector commitInfoVector = columnarBatch.getColumnVector(0);
for (int i = 0; i < commitInfoVector.getSize(); i++) {
if (!commitInfoVector.isNullAt(i)) {
CommitInfo commitInfo = CommitInfo.fromColumnVector(commitInfoVector, i);
if (commitInfo != null) {
return Optional.of(commitInfo);
}
}
}
}
} catch (IOException ex) {
throw new RuntimeException("Could not close iterator", ex);
}

logger.info("No commit info found for commit of version {}", version);
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.util;

/**
* An interface to represent clocks, so that they can be mocked out in unit tests.
*/
public interface Clock {
/** @return Current system time, in ms. */
long getTimeMillis();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.util;

/**
* A clock whose time can be manually set and modified.
*/
public class ManualClock implements Clock {
private long timeMillis;
public ManualClock(long timeMillis) {
this.timeMillis = timeMillis;
}

/**
* @param timeToSet new time (in milliseconds) that the clock should represent
*/
public synchronized void setTime(long timeToSet) {
this.timeMillis = timeToSet;
this.notifyAll();
}

@Override
public long getTimeMillis() {
return timeMillis;
}
}
Loading

0 comments on commit ea40a0f

Please sign in to comment.