Skip to content

Commit 837cf22

Browse files
[Kernel] Assign base row ID and default row commit version to AddFile (#3894)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> This PR implements the row tracking _support_ requirements in Delta Kernel, according to the [Delta Protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#row-tracking). Specifically, it includes: - assigning `baseRowId` and `defaultRowCommitVersion` to `AddFile` actions prior to committing them - maintaining the `rowIdHighWaterMark` of the `delta.rowTracking` metadata domain during the base row ID assignment, which is the highest assigned fresh row id for the table ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Added tests in `RowTrackingSuite.scala`. This includes unit tests and integration tests with Delta-Spark. ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No.
1 parent 633214e commit 837cf22

File tree

11 files changed

+559
-28
lines changed

11 files changed

+559
-28
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,19 @@ public static ConcurrentWriteException concurrentDomainMetadataAction(
288288
return new ConcurrentWriteException(message);
289289
}
290290

291+
public static KernelException missingNumRecordsStatsForRowTracking() {
292+
return new KernelException(
293+
"Cannot write to a rowTracking-supported table without 'numRecords' statistics. "
294+
+ "Connectors are expected to populate the number of records statistics when "
295+
+ "writing to a Delta table with 'rowTracking' table feature supported.");
296+
}
297+
298+
public static KernelException rowTrackingSupportedWithDomainMetadataUnsupported() {
299+
return new KernelException(
300+
"Feature 'rowTracking' is supported and depends on feature 'domainMetadata',"
301+
+ " but 'domainMetadata' is unsupported");
302+
}
303+
291304
/* ------------------------ HELPER METHODS ----------------------------- */
292305
private static String formatTimestamp(long millisSinceEpochUTC) {
293306
return new Timestamp(millisSinceEpochUTC).toInstant().toString();

kernel/kernel-api/src/main/java/io/delta/kernel/internal/InternalScanFileUtils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.net.URI;
3333
import java.util.HashMap;
3434
import java.util.Map;
35+
import java.util.Optional;
3536

3637
/**
3738
* Utilities to extract information out of the scan file rows returned by {@link
@@ -42,6 +43,7 @@ private InternalScanFileUtils() {}
4243

4344
private static final String TABLE_ROOT_COL_NAME = "tableRoot";
4445
private static final DataType TABLE_ROOT_DATA_TYPE = StringType.STRING;
46+
4547
/** {@link Column} expression referring to the `partitionValues` in scan `add` file. */
4648
public static final Column ADD_FILE_PARTITION_COL_REF =
4749
new Column(new String[] {"add", "partitionValues"});
@@ -190,4 +192,14 @@ public static DeletionVectorDescriptor getDeletionVectorDescriptorFromRow(Row sc
190192
public static Column getPartitionValuesParsedRefInAddFile(String partitionColName) {
191193
return new Column(new String[] {"add", "partitionValues_parsed", partitionColName});
192194
}
195+
196+
public static Optional<Long> getBaseRowId(Row scanFile) {
197+
Row addFileRow = getAddFileEntry(scanFile);
198+
return new AddFile(addFileRow).getBaseRowId();
199+
}
200+
201+
public static Optional<Long> getDefaultRowCommitVersion(Row scanFile) {
202+
Row addFileRow = getAddFileEntry(scanFile);
203+
return new AddFile(addFileRow).getDefaultRowCommitVersion();
204+
}
193205
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class TableFeatures {
4040
add("typeWidening-preview");
4141
add("typeWidening");
4242
add(DOMAIN_METADATA_FEATURE_NAME);
43+
add(ROW_TRACKING_FEATURE_NAME);
4344
}
4445
});
4546

@@ -59,9 +60,10 @@ public class TableFeatures {
5960
}
6061
});
6162

62-
/** The feature name for domain metadata. */
6363
public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata";
6464

65+
public static final String ROW_TRACKING_FEATURE_NAME = "rowTracking";
66+
6567
/** The minimum writer version required to support table features. */
6668
public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7;
6769

@@ -101,7 +103,8 @@ public static void validateReadSupportedTable(
101103
* <li>protocol writer version 1.
102104
* <li>protocol writer version 2 only with appendOnly feature enabled.
103105
* <li>protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code
104-
* columnMapping}, {@code typeWidening}, {@code domainMetadata} feature enabled.
106+
* columnMapping}, {@code typeWidening}, {@code domainMetadata}, {@code rowTracking} feature
107+
* enabled.
105108
* </ul>
106109
*
107110
* @param protocol Table protocol
@@ -137,6 +140,12 @@ public static void validateWriteSupportedTable(
137140
throw unsupportedWriterFeature(tablePath, writerFeature);
138141
}
139142
}
143+
// Eventually we may have a way to declare and enforce dependencies between features.
144+
// By putting this check for row tracking here, it makes it easier to spot that row
145+
// tracking defines such a dependency that can be implicitly checked.
146+
if (isRowTrackingSupported(protocol) && !isDomainMetadataSupported(protocol)) {
147+
throw DeltaErrors.rowTrackingSupportedWithDomainMetadataUnsupported();
148+
}
140149
break;
141150
default:
142151
throw unsupportedWriterProtocol(tablePath, minWriterVersion);
@@ -190,12 +199,17 @@ public static Set<String> extractAutomaticallyEnabledWriterFeatures(
190199
* @return true if the "domainMetadata" feature is supported, false otherwise
191200
*/
192201
public static boolean isDomainMetadataSupported(Protocol protocol) {
193-
List<String> writerFeatures = protocol.getWriterFeatures();
194-
if (writerFeatures == null) {
195-
return false;
196-
}
197-
return writerFeatures.contains(DOMAIN_METADATA_FEATURE_NAME)
198-
&& protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION;
202+
return isWriterFeatureSupported(protocol, DOMAIN_METADATA_FEATURE_NAME);
203+
}
204+
205+
/**
206+
* Check if the table protocol supports the "rowTracking" writer feature.
207+
*
208+
* @param protocol the protocol to check
209+
* @return true if the protocol supports row tracking, false otherwise
210+
*/
211+
public static boolean isRowTrackingSupported(Protocol protocol) {
212+
return isWriterFeatureSupported(protocol, ROW_TRACKING_FEATURE_NAME);
199213
}
200214

201215
/**
@@ -254,4 +268,13 @@ private static void validateNoInvariants(StructType tableSchema) {
254268
throw columnInvariantsNotSupported();
255269
}
256270
}
271+
272+
private static boolean isWriterFeatureSupported(Protocol protocol, String featureName) {
273+
List<String> writerFeatures = protocol.getWriterFeatures();
274+
if (writerFeatures == null) {
275+
return false;
276+
}
277+
return writerFeatures.contains(featureName)
278+
&& protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION;
279+
}
257280
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.delta.kernel.internal.fs.Path;
3333
import io.delta.kernel.internal.replay.ConflictChecker;
3434
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
35+
import io.delta.kernel.internal.rowtracking.RowTracking;
3536
import io.delta.kernel.internal.util.*;
3637
import io.delta.kernel.types.StructType;
3738
import io.delta.kernel.utils.CloseableIterable;
@@ -145,6 +146,18 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data
145146
CommitInfo attemptCommitInfo = generateCommitAction(engine);
146147
updateMetadataWithICTIfRequired(
147148
engine, attemptCommitInfo.getInCommitTimestamp(), readSnapshot.getVersion(engine));
149+
150+
// If row tracking is supported, assign base row IDs and default row commit versions to any
151+
// AddFile actions that do not yet have them. If the row ID high watermark changes, emit a
152+
// DomainMetadata action to update it.
153+
if (TableFeatures.isRowTrackingSupported(protocol)) {
154+
RowTracking.createNewHighWaterMarkIfNeeded(readSnapshot, dataActions)
155+
.ifPresent(domainMetadatas::add);
156+
dataActions =
157+
RowTracking.assignBaseRowIdAndDefaultRowCommitVersion(
158+
readSnapshot, commitAsVersion, dataActions);
159+
}
160+
148161
int numRetries = 0;
149162
do {
150163
logger.info("Committing transaction as version = {}.", commitAsVersion);

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public static Row createAddFileRow(
136136

137137
/** Constructs an {@link AddFile} action from the given 'AddFile' {@link Row}. */
138138
public AddFile(Row row) {
139-
super(row, FULL_SCHEMA);
139+
super(row);
140140
}
141141

142142
public String getPath() {
@@ -181,10 +181,12 @@ public Optional<Long> getDefaultRowCommitVersion() {
181181
}
182182

183183
public Optional<DataFileStatistics> getStats() {
184-
int index = getFieldIndex("stats");
185-
return row.isNullAt(index)
186-
? Optional.empty()
187-
: DataFileStatistics.deserializeFromJson(row.getString(index));
184+
return getFieldIndexOpt("stats")
185+
.flatMap(
186+
index ->
187+
row.isNullAt(index)
188+
? Optional.empty()
189+
: DataFileStatistics.deserializeFromJson(row.getString(index)));
188190
}
189191

190192
public Optional<Long> getNumRecords() {

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/RowBackedAction.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.delta.kernel.types.*;
2323
import java.util.Collections;
2424
import java.util.Map;
25+
import java.util.Optional;
2526

2627
/**
2728
* An abstract base class for Delta Log actions that are backed by a {@link Row}. This design is to
@@ -34,26 +35,30 @@ public abstract class RowBackedAction {
3435
/** The underlying {@link Row} that represents an action and contains all its field values. */
3536
protected final Row row;
3637

37-
protected RowBackedAction(Row row, StructType expectedSchema) {
38-
checkArgument(
39-
row.getSchema().equals(expectedSchema),
40-
"Expected row schema: %s, found: %s",
41-
expectedSchema,
42-
row.getSchema());
43-
38+
protected RowBackedAction(Row row) {
4439
this.row = row;
4540
}
4641

4742
/**
48-
* Returns the index of the field with the given name in the full schema of the row. Throws an
49-
* {@link IllegalArgumentException} if the field is not found.
43+
* Returns the index of the field with the given name in the schema of the row. Throws an {@link
44+
* IllegalArgumentException} if the field is not found.
5045
*/
5146
protected int getFieldIndex(String fieldName) {
5247
int index = row.getSchema().indexOf(fieldName);
5348
checkArgument(index >= 0, "Field '%s' not found in schema: %s", fieldName, row.getSchema());
5449
return index;
5550
}
5651

52+
/**
53+
* Returns the index of the field with the given name in the schema of the row, or {@link
54+
* Optional#empty()} if the field is not found. This should be used when the underlying row may or
55+
* may not contain that field.
56+
*/
57+
protected Optional<Integer> getFieldIndexOpt(String fieldName) {
58+
int index = row.getSchema().indexOf(fieldName);
59+
return index >= 0 ? Optional.of(index) : Optional.empty();
60+
}
61+
5762
/**
5863
* Returns a new {@link Row} with the same schema and values as the row backing this action, but
5964
* with the value of the field with the given name overridden by the given value.

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class SingleAction {
7272
// schema for those fields here.
7373

7474
private static final int TXN_ORDINAL = FULL_SCHEMA.indexOf("txn");
75-
private static final int ADD_FILE_ORDINAL = FULL_SCHEMA.indexOf("add");
75+
public static final int ADD_FILE_ORDINAL = FULL_SCHEMA.indexOf("add");
7676
private static final int REMOVE_FILE_ORDINAL = FULL_SCHEMA.indexOf("remove");
7777
private static final int METADATA_ORDINAL = FULL_SCHEMA.indexOf("metaData");
7878
private static final int PROTOCOL_ORDINAL = FULL_SCHEMA.indexOf("protocol");
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.rowtracking;
17+
18+
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
19+
20+
import io.delta.kernel.data.Row;
21+
import io.delta.kernel.internal.DeltaErrors;
22+
import io.delta.kernel.internal.SnapshotImpl;
23+
import io.delta.kernel.internal.TableFeatures;
24+
import io.delta.kernel.internal.actions.*;
25+
import io.delta.kernel.utils.CloseableIterable;
26+
import io.delta.kernel.utils.CloseableIterator;
27+
import java.io.IOException;
28+
import java.util.Optional;
29+
import java.util.concurrent.atomic.AtomicLong;
30+
31+
/** A collection of helper methods for working with row tracking. */
32+
public class RowTracking {
33+
private RowTracking() {
34+
// Empty constructor to prevent instantiation of this class
35+
}
36+
37+
/**
38+
* Assigns base row IDs and default row commit versions to {@link AddFile} actions in the provided
39+
* {@code dataActions}. This method should be called when processing data actions during commit
40+
* preparation and before the actual commit.
41+
*
42+
* <p>This method should be called exactly once per transaction during commit preparation, i.e.,
43+
* not for each commit attempt. And it should only be called when the 'rowTracking' feature is
44+
* supported.
45+
*
46+
* <p>For {@link AddFile} actions missing a base row ID, assigns the current row ID high watermark
47+
* plus 1. The high watermark is then incremented by the number of records in the file. For
48+
* actions missing a default row commit version, assigns the specified commit version.
49+
*
50+
* @param snapshot the snapshot of the table that this transaction is reading at
51+
* @param commitVersion the version of the commit for default row commit version assignment
52+
* @param dataActions the {@link CloseableIterable} of data actions to process
53+
* @return an {@link CloseableIterable} of data actions with base row IDs and default row commit
54+
* versions assigned
55+
*/
56+
public static CloseableIterable<Row> assignBaseRowIdAndDefaultRowCommitVersion(
57+
SnapshotImpl snapshot, long commitVersion, CloseableIterable<Row> dataActions) {
58+
checkArgument(
59+
TableFeatures.isRowTrackingSupported(snapshot.getProtocol()),
60+
"Base row ID and default row commit version are assigned "
61+
+ "only when feature 'rowTracking' is supported.");
62+
63+
return new CloseableIterable<Row>() {
64+
@Override
65+
public void close() throws IOException {
66+
dataActions.close();
67+
}
68+
69+
@Override
70+
public CloseableIterator<Row> iterator() {
71+
// Used to keep track of the current high watermark as we iterate through the data actions.
72+
// Use an AtomicLong to allow for updating the high watermark in the lambda.
73+
final AtomicLong currRowIdHighWatermark = new AtomicLong(readRowIdHighWaterMark(snapshot));
74+
return dataActions
75+
.iterator()
76+
.map(
77+
row -> {
78+
// Non-AddFile actions are returned unchanged
79+
if (row.isNullAt(SingleAction.ADD_FILE_ORDINAL)) {
80+
return row;
81+
}
82+
83+
AddFile addFile = new AddFile(row.getStruct(SingleAction.ADD_FILE_ORDINAL));
84+
85+
// Assign base row ID if missing
86+
if (!addFile.getBaseRowId().isPresent()) {
87+
final long numRecords = getNumRecordsOrThrow(addFile);
88+
addFile = addFile.withNewBaseRowId(currRowIdHighWatermark.get() + 1L);
89+
currRowIdHighWatermark.addAndGet(numRecords);
90+
}
91+
92+
// Assign default row commit version if missing
93+
if (!addFile.getDefaultRowCommitVersion().isPresent()) {
94+
addFile = addFile.withNewDefaultRowCommitVersion(commitVersion);
95+
}
96+
97+
return SingleAction.createAddFileSingleAction(addFile.toRow());
98+
});
99+
}
100+
};
101+
}
102+
103+
/**
104+
* Returns a {@link DomainMetadata} action if the row ID high watermark has changed due to newly
105+
* processed {@link AddFile} actions.
106+
*
107+
* <p>This method should be called during commit preparation to prepare the domain metadata
108+
* actions for commit. It should be called only when the 'rowTracking' feature is supported.
109+
*
110+
* @param snapshot the snapshot of the table that this transaction is reading at
111+
* @param dataActions the iterable of data actions that may update the high watermark
112+
*/
113+
public static Optional<DomainMetadata> createNewHighWaterMarkIfNeeded(
114+
SnapshotImpl snapshot, CloseableIterable<Row> dataActions) {
115+
checkArgument(
116+
TableFeatures.isRowTrackingSupported(snapshot.getProtocol()),
117+
"Row ID high watermark is updated only when feature 'rowTracking' is supported.");
118+
119+
final long prevRowIdHighWatermark = readRowIdHighWaterMark(snapshot);
120+
// Use an AtomicLong to allow for updating the high watermark in the lambda
121+
final AtomicLong newRowIdHighWatermark = new AtomicLong(prevRowIdHighWatermark);
122+
123+
dataActions.forEach(
124+
row -> {
125+
if (!row.isNullAt(SingleAction.ADD_FILE_ORDINAL)) {
126+
AddFile addFile = new AddFile(row.getStruct(SingleAction.ADD_FILE_ORDINAL));
127+
if (!addFile.getBaseRowId().isPresent()) {
128+
newRowIdHighWatermark.addAndGet(getNumRecordsOrThrow(addFile));
129+
}
130+
}
131+
});
132+
133+
return (newRowIdHighWatermark.get() != prevRowIdHighWatermark)
134+
? Optional.of(new RowTrackingMetadataDomain(newRowIdHighWatermark.get()).toDomainMetadata())
135+
: Optional.empty();
136+
}
137+
138+
/**
139+
* Reads the current row ID high watermark from the snapshot, or returns a default value if
140+
* missing.
141+
*/
142+
private static long readRowIdHighWaterMark(SnapshotImpl snapshot) {
143+
return RowTrackingMetadataDomain.fromSnapshot(snapshot)
144+
.map(RowTrackingMetadataDomain::getRowIdHighWaterMark)
145+
.orElse(RowTrackingMetadataDomain.MISSING_ROW_ID_HIGH_WATERMARK);
146+
}
147+
148+
/**
149+
* Get the number of records from the AddFile's statistics. It errors out if statistics are
150+
* missing.
151+
*/
152+
private static long getNumRecordsOrThrow(AddFile addFile) {
153+
return addFile.getNumRecords().orElseThrow(DeltaErrors::missingNumRecordsStatsForRowTracking);
154+
}
155+
}

0 commit comments

Comments
 (0)