Skip to content

Commit eb593f8

Browse files
Allow DomainMetadata to be set in Transaction instead of TransactionBuilder (#4245)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> This PR allows connectors to specify domain metadata in the `Transaction` using `addDomainMetadata` and `removeDomainMetadata` instead of having to specify the domain metadata in the `TransactionBuilder`. The `TransactionBuilder` has the method `withDomainMetadataSupported`, which allows the user to enable domain metadata support. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> - All tests that check that the protocol is automatically updated when passing domain metadata are removed. This is now done explicitly, so the tests are no longer needed. ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> --------- Co-authored-by: Oussama Saoudi <oussama.saoudi@databricks.com>
1 parent 5a46862 commit eb593f8

File tree

5 files changed

+212
-270
lines changed

5 files changed

+212
-270
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.delta.kernel.data.*;
2929
import io.delta.kernel.engine.Engine;
3030
import io.delta.kernel.exceptions.ConcurrentWriteException;
31+
import io.delta.kernel.exceptions.DomainDoesNotExistException;
3132
import io.delta.kernel.expressions.Literal;
3233
import io.delta.kernel.internal.DataWriteContextImpl;
3334
import io.delta.kernel.internal.actions.AddFile;
@@ -97,6 +98,31 @@ public interface Transaction {
9798
TransactionCommitResult commit(Engine engine, CloseableIterable<Row> dataActions)
9899
throws ConcurrentWriteException;
99100

101+
/**
102+
* Commit the provided domain metadata as part of this transaction. If this is called more than
103+
* once with the same {@code domain} the latest provided {@code config} will be committed in the
104+
* transaction. Only user-controlled domains are allowed (aka. domains with a `delta.` prefix are
105+
* not allowed). Adding and removing a domain with the same identifier in the same txn is not
106+
* allowed. Adding domain metadata to a table that does not support the table feature is not
107+
* allowed. To enable the table feature, make sure to call {@link
108+
* TransactionBuilder#withDomainMetadataSupported}
109+
*
110+
* @param domain the domain identifier
111+
* @param config configuration string for this domain
112+
*/
113+
void addDomainMetadata(String domain, String config);
114+
115+
/**
116+
* Mark the domain metadata with identifier {@code domain} as removed in this transaction. If this
117+
* domain does not exist in the latest version of the table, calling {@link
118+
* Transaction#commit(Engine, CloseableIterable)} will throw a {@link
119+
* DomainDoesNotExistException}. Adding and removing a domain with the same identifier in one txn
120+
* is not allowed.
121+
*
122+
* @param domain the domain identifier for the domain to remove
123+
*/
124+
void removeDomainMetadata(String domain);
125+
100126
/**
101127
* Given the logical data that needs to be written to the table, convert it into the required
102128
* physical data depending upon the table Delta protocol and features enabled on the table. Kernel

kernel/kernel-api/src/main/java/io/delta/kernel/TransactionBuilder.java

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -93,38 +93,19 @@ TransactionBuilder withTransactionId(
9393
TransactionBuilder withMaxRetries(int maxRetries);
9494

9595
/**
96-
* Commit the provided domain metadata as part of this transaction. If this is called more than
97-
* once with the same {@code domain} the latest provided {@code config} will be committed in the
98-
* transaction. Only user-controlled domains are allowed (aka. domains with a `delta.` prefix are
99-
* not allowed). Adding and removing a domain with the same identifier in the same txn is not
100-
* allowed.
101-
*
102-
* <p>See the Delta protocol for more information on how to use domain metadata <a
103-
* href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#domain-metadata">Domain
104-
* Metadata</a>.
105-
*
106-
* <p>Please note using this API will automatically upgrade the protocol of the table to support
107-
* Domain Metadata if it is not already supported. See <a
96+
* Enables support for Domain Metadata on this table if it is not supported already. The table
97+
* feature _must_ be supported on the table to add or remove domain metadata using {@link
98+
* Transaction#addDomainMetadata} or {@link Transaction#removeDomainMetadata}. See <a
10899
* href="https://docs.delta.io/latest/versioning.html#how-does-delta-lake-manage-feature-compatibility">
109-
* How does Delta Lake manage feature compatibility?</a> for more details. This may break existing
110-
* writers that do not support the Domain Metadata feature; readers will be unaffected.
111-
*
112-
* @param domain the domain identifier
113-
* @param config configuration string for this domain
114-
* @return updated {@link TransactionBuilder} instance
115-
*/
116-
TransactionBuilder withDomainMetadata(String domain, String config);
117-
118-
/**
119-
* Mark the domain metadata with identifier {@code domain} as removed in this transaction. If this
120-
* domain does not exist in the latest version of the table will throw a {@link
121-
* DomainDoesNotExistException} upon calling {@link TransactionBuilder#build(Engine)}. Adding and
122-
* removing a domain with the same identifier in one txn is not allowed.
100+
* How does Delta Lake manage feature compatibility?</a> for more details on table feature
101+
* support.
123102
*
124-
* @param domain the domain identifier for the domain to remove
125-
* @return updated {@link TransactionBuilder} instance
103+
* <p>See the Delta protocol for more information on how to use <a
104+
* href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#domain-metadata">Domain
105+
* Metadata</a>. This may break existing writers that do not support the Domain Metadata feature;
106+
* readers will be unaffected.
126107
*/
127-
TransactionBuilder withDomainMetadataRemoved(String domain);
108+
TransactionBuilder withDomainMetadataSupported();
128109

129110
/**
130111
* Build the transaction. Also validates the given info to ensure that a valid transaction can be

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

Lines changed: 5 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
import io.delta.kernel.*;
3131
import io.delta.kernel.engine.Engine;
32-
import io.delta.kernel.exceptions.DomainDoesNotExistException;
3332
import io.delta.kernel.exceptions.TableNotFoundException;
3433
import io.delta.kernel.internal.actions.*;
3534
import io.delta.kernel.internal.fs.Path;
@@ -58,12 +57,11 @@ public class TransactionBuilderImpl implements TransactionBuilder {
5857
private final TableImpl table;
5958
private final String engineInfo;
6059
private final Operation operation;
61-
private final Map<String, DomainMetadata> domainMetadatasAdded = new HashMap<>();
62-
private final Set<String> domainMetadatasRemoved = new HashSet<>();
6360
private Optional<StructType> schema = Optional.empty();
6461
private Optional<List<String>> partitionColumns = Optional.empty();
6562
private Optional<SetTransaction> setTxnOpt = Optional.empty();
6663
private Optional<Map<String, String>> tableProperties = Optional.empty();
64+
private boolean needDomainMetadataSupport = false;
6765

6866
/**
6967
* Number of retries for concurrent write exceptions to resolve conflicts and retry commit. In
@@ -119,27 +117,8 @@ public TransactionBuilder withMaxRetries(int maxRetries) {
119117
}
120118

121119
@Override
122-
public TransactionBuilder withDomainMetadata(String domain, String config) {
123-
checkArgument(
124-
DomainMetadata.isUserControlledDomain(domain),
125-
"Setting a system-controlled domain is not allowed: " + domain);
126-
checkArgument(
127-
!domainMetadatasRemoved.contains(domain),
128-
"Cannot add a domain that is removed in this transaction");
129-
// we override any existing value
130-
domainMetadatasAdded.put(domain, new DomainMetadata(domain, config, false /* removed */));
131-
return this;
132-
}
133-
134-
@Override
135-
public TransactionBuilder withDomainMetadataRemoved(String domain) {
136-
checkArgument(
137-
DomainMetadata.isUserControlledDomain(domain),
138-
"Removing a system-controlled domain is not allowed: " + domain);
139-
checkArgument(
140-
!domainMetadatasAdded.containsKey(domain),
141-
"Cannot remove a domain that is added in this transaction");
142-
domainMetadatasRemoved.add(domain);
120+
public TransactionBuilder withDomainMetadataSupported() {
121+
needDomainMetadataSupport = true;
143122
return this;
144123
}
145124

@@ -189,9 +168,7 @@ public Transaction build(Engine engine) {
189168
// Ex: We enable feature `icebergCompatV2` plus dependent features `columnMapping`
190169
Optional<Tuple2<Protocol, Set<TableFeature>>> newProtocolAndFeatures =
191170
TableFeatures.autoUpgradeProtocolBasedOnMetadata(
192-
newMetadata.orElse(snapshotMetadata),
193-
!domainMetadatasAdded.isEmpty(),
194-
snapshotProtocol);
171+
newMetadata.orElse(snapshotMetadata), needDomainMetadataSupport, snapshotProtocol);
195172
if (newProtocolAndFeatures.isPresent()) {
196173
logger.info(
197174
"Automatically enabling table features: {}",
@@ -244,8 +221,7 @@ public Transaction build(Engine engine) {
244221
newMetadata.isPresent() /* shouldUpdateMetadata */,
245222
newProtocol.isPresent() /* shouldUpdateProtocol */,
246223
maxRetries,
247-
table.getClock(),
248-
getDomainMetadatasToCommit(snapshot));
224+
table.getClock());
249225
}
250226

251227
/**
@@ -386,45 +362,4 @@ private Metadata getInitialMetadata() {
386362
private Protocol getInitialProtocol() {
387363
return new Protocol(DEFAULT_READ_VERSION, DEFAULT_WRITE_VERSION);
388364
}
389-
390-
/**
391-
* Returns a list of the domain metadatas to commit. This consists of the domain metadatas added
392-
* in the transaction using {@link TransactionBuilder#withDomainMetadata(String, String)} and the
393-
* tombstones for the domain metadatas removed in the transaction using {@link
394-
* TransactionBuilder#withDomainMetadataRemoved(String)}.
395-
*/
396-
private List<DomainMetadata> getDomainMetadatasToCommit(SnapshotImpl snapshot) {
397-
// Add all domain metadatas added in the transaction
398-
List<DomainMetadata> finalDomainMetadatas = new ArrayList<>(domainMetadatasAdded.values());
399-
400-
// Generate the tombstones for the removed domain metadatas
401-
Map<String, DomainMetadata> snapshotDomainMetadataMap = snapshot.getDomainMetadataMap();
402-
for (String domainName : domainMetadatasRemoved) {
403-
// Note: we know domainName is not already in finalDomainMetadatas because we do not allow
404-
// removing and adding a domain with the same identifier in a single txn!
405-
if (snapshotDomainMetadataMap.containsKey(domainName)) {
406-
DomainMetadata domainToRemove = snapshotDomainMetadataMap.get(domainName);
407-
if (domainToRemove.isRemoved()) {
408-
// If the domain is already removed we throw an error to avoid any inconsistencies or
409-
// ambiguity. The snapshot read by the connector is inconsistent with the snapshot
410-
// loaded here as the domain to remove no longer exists.
411-
throw new DomainDoesNotExistException(
412-
table.getDataPath().toString(), domainName, snapshot.getVersion());
413-
}
414-
finalDomainMetadatas.add(domainToRemove.removed());
415-
} else {
416-
// We must throw an error if the domain does not exist. Otherwise, there could be unexpected
417-
// behavior within conflict resolution. For example, consider the following
418-
// 1. Table has no domains set in V0
419-
// 2. txnA is started and wants to remove domain "foo"
420-
// 3. txnB is started and adds domain "foo" and commits V1 before txnA
421-
// 4. txnA needs to perform conflict resolution against the V1 commit from txnB
422-
// Conflict resolution should fail but since the domain does not exist we cannot create
423-
// a tombstone to mark it as removed and correctly perform conflict resolution.
424-
throw new DomainDoesNotExistException(
425-
table.getDataPath().toString(), domainName, snapshot.getVersion());
426-
}
427-
}
428-
return finalDomainMetadatas;
429-
}
430365
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

Lines changed: 98 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.delta.kernel.data.Row;
3030
import io.delta.kernel.engine.Engine;
3131
import io.delta.kernel.exceptions.ConcurrentWriteException;
32+
import io.delta.kernel.exceptions.DomainDoesNotExistException;
3233
import io.delta.kernel.expressions.Column;
3334
import io.delta.kernel.hook.PostCommitHook;
3435
import io.delta.kernel.internal.actions.*;
@@ -81,7 +82,9 @@ public class TransactionImpl implements Transaction {
8182
private final Optional<SetTransaction> setTxnOpt;
8283
private final boolean shouldUpdateProtocol;
8384
private final Clock clock;
84-
private List<DomainMetadata> domainMetadatas;
85+
private final Map<String, DomainMetadata> domainMetadatasAdded = new HashMap<>();
86+
private final Set<String> domainMetadatasRemoved = new HashSet<>();
87+
private Optional<List<DomainMetadata>> domainMetadatas = Optional.empty();
8588
private Metadata metadata;
8689
private boolean shouldUpdateMetadata;
8790
private int maxRetries;
@@ -101,8 +104,7 @@ public TransactionImpl(
101104
boolean shouldUpdateMetadata,
102105
boolean shouldUpdateProtocol,
103106
int maxRetries,
104-
Clock clock,
105-
List<DomainMetadata> domainMetadatas) {
107+
Clock clock) {
106108
this.isNewTable = isNewTable;
107109
this.dataPath = dataPath;
108110
this.logPath = logPath;
@@ -116,7 +118,6 @@ public TransactionImpl(
116118
this.shouldUpdateProtocol = shouldUpdateProtocol;
117119
this.maxRetries = maxRetries;
118120
this.clock = clock;
119-
this.domainMetadatas = domainMetadatas;
120121
}
121122

122123
@Override
@@ -150,18 +151,94 @@ public Optional<SetTransaction> getSetTxnOpt() {
150151
return setTxnOpt;
151152
}
152153

153-
/**
154-
* Internal API to add domain metadata actions for this transaction. Visible for testing.
155-
*
156-
* @param domainMetadatas List of domain metadata to be added to the transaction.
157-
*/
158154
@VisibleForTesting
159-
public void addDomainMetadatas(List<DomainMetadata> domainMetadatas) {
160-
this.domainMetadatas.addAll(domainMetadatas);
155+
public void addDomainMetadataInternal(String domain, String config) {
156+
checkArgument(
157+
!domainMetadatasRemoved.contains(domain),
158+
"Cannot add a domain that is removed in this transaction");
159+
checkState(!closed, "Cannot add a domain metadata after the transaction has completed");
160+
// we override any existing value
161+
domainMetadatasAdded.put(domain, new DomainMetadata(domain, config, false /* removed */));
162+
}
163+
164+
@Override
165+
public void addDomainMetadata(String domain, String config) {
166+
checkState(
167+
TableFeatures.isDomainMetadataSupported(protocol),
168+
"Unable to add domain metadata when the domain metadata table feature is disabled");
169+
checkArgument(
170+
DomainMetadata.isUserControlledDomain(domain),
171+
"Setting a system-controlled domain is not allowed: " + domain);
172+
addDomainMetadataInternal(domain, config);
173+
}
174+
175+
@VisibleForTesting
176+
public void removeDomainMetadataInternal(String domain) {
177+
checkArgument(
178+
!domainMetadatasAdded.containsKey(domain),
179+
"Cannot remove a domain that is added in this transaction");
180+
checkState(!closed, "Cannot remove a domain after the transaction has completed");
181+
domainMetadatasRemoved.add(domain);
161182
}
162183

184+
@Override
185+
public void removeDomainMetadata(String domain) {
186+
checkState(
187+
TableFeatures.isDomainMetadataSupported(protocol),
188+
"Unable to add domain metadata when the domain metadata table feature is disabled");
189+
checkArgument(
190+
DomainMetadata.isUserControlledDomain(domain),
191+
"Removing a system-controlled domain is not allowed: " + domain);
192+
removeDomainMetadataInternal(domain);
193+
}
194+
195+
/**
196+
* Returns a list of the domain metadatas to commit. This consists of the domain metadatas added
197+
* in the transaction using {@link Transaction#addDomainMetadata(String, String)} and the
198+
* tombstones for the domain metadatas removed in the transaction using {@link
199+
* Transaction#removeDomainMetadata(String)}. The result is stored in {@code domainMetadatas}.
200+
*
201+
* @return A list of {@link DomainMetadata} containing domain metadata to be committed in this
202+
* transaction.
203+
*/
163204
public List<DomainMetadata> getDomainMetadatas() {
164-
return domainMetadatas;
205+
// If we have already processed the domain metadatas, then return the list.
206+
if (domainMetadatas.isPresent()) {
207+
return domainMetadatas.get();
208+
}
209+
// Add all domain metadatas added in the transaction
210+
List<DomainMetadata> finalDomainMetadatas = new ArrayList<>(domainMetadatasAdded.values());
211+
212+
// Generate the tombstones for the removed domain metadatas
213+
Map<String, DomainMetadata> snapshotDomainMetadataMap = readSnapshot.getDomainMetadataMap();
214+
for (String domainName : domainMetadatasRemoved) {
215+
// Note: we know domainName is not already in finalDomainMetadatas because we do not allow
216+
// removing and adding a domain with the same identifier in a single txn!
217+
if (snapshotDomainMetadataMap.containsKey(domainName)) {
218+
DomainMetadata domainToRemove = snapshotDomainMetadataMap.get(domainName);
219+
if (domainToRemove.isRemoved()) {
220+
// If the domain is already removed we throw an error to avoid any inconsistencies or
221+
// ambiguity. The snapshot read by the connector is inconsistent with the snapshot
222+
// loaded here as the domain to remove no longer exists.
223+
throw new DomainDoesNotExistException(
224+
dataPath.toString(), domainName, readSnapshot.getVersion());
225+
}
226+
finalDomainMetadatas.add(domainToRemove.removed());
227+
} else {
228+
// We must throw an error if the domain does not exist. Otherwise, there could be unexpected
229+
// behavior within conflict resolution. For example, consider the following
230+
// 1. Table has no domains set in V0
231+
// 2. txnA is started and wants to remove domain "foo"
232+
// 3. txnB is started and adds domain "foo" and commits V1 before txnA
233+
// 4. txnA needs to perform conflict resolution against the V1 commit from txnB
234+
// Conflict resolution should fail but since the domain does not exist we cannot create
235+
// a tombstone to mark it as removed and correctly perform conflict resolution.
236+
throw new DomainDoesNotExistException(
237+
dataPath.toString(), domainName, readSnapshot.getVersion());
238+
}
239+
}
240+
domainMetadatas = Optional.of(finalDomainMetadatas);
241+
return finalDomainMetadatas;
165242
}
166243

167244
public Protocol getProtocol() {
@@ -201,18 +278,20 @@ private TransactionCommitResult commitWithRetry(
201278
CommitInfo attemptCommitInfo = generateCommitAction(engine);
202279
updateMetadataWithICTIfRequired(
203280
engine, attemptCommitInfo.getInCommitTimestamp(), readSnapshot.getVersion());
281+
List<DomainMetadata> resolvedDomainMetadatas = getDomainMetadatas();
204282

205283
// If row tracking is supported, assign base row IDs and default row commit versions to any
206284
// AddFile actions that do not yet have them. If the row ID high watermark changes, emit a
207285
// DomainMetadata action to update it.
208286
if (TableFeatures.isRowTrackingSupported(protocol)) {
209-
domainMetadatas =
287+
List<DomainMetadata> updatedDomainMetadata =
210288
RowTracking.updateRowIdHighWatermarkIfNeeded(
211289
readSnapshot,
212290
protocol,
213291
Optional.empty() /* winningTxnRowIdHighWatermark */,
214292
dataActions,
215-
domainMetadatas);
293+
resolvedDomainMetadatas);
294+
domainMetadatas = Optional.of(updatedDomainMetadata);
216295
dataActions =
217296
RowTracking.assignBaseRowIdAndDefaultRowCommitVersion(
218297
readSnapshot,
@@ -239,7 +318,7 @@ private TransactionCommitResult commitWithRetry(
239318
resolveConflicts(engine, commitAsVersion, attemptCommitInfo, numTries, dataActions);
240319
commitAsVersion = rebaseState.getLatestVersion() + 1;
241320
dataActions = rebaseState.getUpdatedDataActions();
242-
domainMetadatas = rebaseState.getUpdatedDomainMetadatas();
321+
domainMetadatas = Optional.of(rebaseState.getUpdatedDomainMetadatas());
243322
}
244323
}
245324
numTries++;
@@ -329,10 +408,12 @@ private TransactionCommitResult doCommit(
329408
}
330409
setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow())));
331410

411+
List<DomainMetadata> resolvedDomainMetadatas = getDomainMetadatas();
412+
332413
// Check for duplicate domain metadata and if the protocol supports
333-
DomainMetadataUtils.validateDomainMetadatas(domainMetadatas, protocol);
414+
DomainMetadataUtils.validateDomainMetadatas(resolvedDomainMetadatas, protocol);
334415

335-
domainMetadatas.forEach(
416+
resolvedDomainMetadatas.forEach(
336417
dm -> metadataActions.add(createDomainMetadataSingleAction(dm.toRow())));
337418

338419
try (CloseableIterator<Row> stageDataIter = dataActions.iterator()) {

0 commit comments

Comments
 (0)