Skip to content

Commit c01bb7c

Browse files
authored
[Kernel] Allow setting arbitrary properties when creating/updating the table (#4107)
## Description Currently, Kernel doesn't allow setting arbitrary table properties other than the `delta.*` properties that Kernel understands. We have valid use cases where we need to allow storing properties with arbitrary key names. As part of this, we also don't want to set any `delta.*` properties that Kernel doesn't understand or support yet. ## How was this patch tested? UTs ## Does this PR introduce _any_ user-facing changes? Now the connectors can set properties with any name. Resolves #3149
1 parent f32539d commit c01bb7c

File tree

5 files changed

+133
-40
lines changed

5 files changed

+133
-40
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -191,32 +191,41 @@ public class TableConfig<T> {
191191
///////////////////////////
192192

193193
/**
194-
* Validates that the given properties have the delta prefix in the key name, and they are in the
195-
* set of valid properties. The caller should get the validated configurations and store the case
196-
* of the property name defined in TableConfig.
194+
* Validates the given new properties that the txn is trying to update in the table. Properties
195+
* that have the `delta.` prefix in the key name must be in the valid list and be editable. The caller
196+
* is expected to store the returned properties in the table metadata after further validation
197+
* from a protocol point of view. The returned properties will have the key's case normalized as
198+
* defined in its {@link TableConfig}.
197199
*
198-
* @param configurations the properties to validate
200+
* @param newProperties the properties to validate
199201
* @throws InvalidConfigurationValueException if any of the properties are invalid
200202
* @throws UnknownConfigurationException if any of the `delta.`-prefixed properties are unknown
201203
*/
202-
public static Map<String, String> validateProperties(Map<String, String> configurations) {
203-
Map<String, String> validatedConfigurations = new HashMap<>();
204-
for (Map.Entry<String, String> kv : configurations.entrySet()) {
204+
public static Map<String, String> validateDeltaProperties(Map<String, String> newProperties) {
205+
Map<String, String> validatedProperties = new HashMap<>();
206+
for (Map.Entry<String, String> kv : newProperties.entrySet()) {
205207
String key = kv.getKey().toLowerCase(Locale.ROOT);
206208
String value = kv.getValue();
207-
if (key.startsWith("delta.") && VALID_PROPERTIES.containsKey(key)) {
209+
210+
if (key.startsWith("delta.")) {
211+
// If it is a delta table property, make sure it is a supported property and editable
212+
if (!VALID_PROPERTIES.containsKey(key)) {
213+
throw DeltaErrors.unknownConfigurationException(kv.getKey());
214+
}
215+
208216
TableConfig<?> tableConfig = VALID_PROPERTIES.get(key);
209-
if (tableConfig.editable) {
210-
tableConfig.validate(value);
211-
validatedConfigurations.put(tableConfig.getKey(), value);
212-
} else {
217+
if (!tableConfig.editable) {
213218
throw DeltaErrors.cannotModifyTableProperty(kv.getKey());
214219
}
220+
221+
tableConfig.validate(value);
222+
validatedProperties.put(tableConfig.getKey(), value);
215223
} else {
216-
throw DeltaErrors.unknownConfigurationException(kv.getKey());
224+
// allow unknown properties to be set
225+
validatedProperties.put(key, value);
217226
}
218227
}
219-
return validatedConfigurations;
228+
return validatedProperties;
220229
}
221230

222231
private static void addConfig(HashMap<String, TableConfig<?>> configs, TableConfig<?> config) {

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public Transaction build(Engine engine) {
137137
Protocol protocol = snapshot.getProtocol();
138138
if (tableProperties.isPresent()) {
139139
Map<String, String> validatedProperties =
140-
TableConfig.validateProperties(tableProperties.get());
140+
TableConfig.validateDeltaProperties(tableProperties.get());
141141
Map<String, String> newProperties =
142142
metadata.filterOutUnchangedProperties(validatedProperties);
143143

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableConfigSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
2323
class TableConfigSuite extends AnyFunSuite {
2424

2525
test("check TableConfig.editable is true") {
26-
TableConfig.validateProperties(
26+
TableConfig.validateDeltaProperties(
2727
Map(
2828
TableConfig.TOMBSTONE_RETENTION.getKey -> "interval 2 week",
2929
TableConfig.CHECKPOINT_INTERVAL.getKey -> "20",
@@ -36,7 +36,7 @@ class TableConfigSuite extends AnyFunSuite {
3636

3737
test("check TableConfig.MAX_COLUMN_ID.editable is false") {
3838
val e = intercept[KernelException] {
39-
TableConfig.validateProperties(
39+
TableConfig.validateDeltaProperties(
4040
Map(
4141
TableConfig.TOMBSTONE_RETENTION.getKey -> "interval 2 week",
4242
TableConfig.CHECKPOINT_INTERVAL.getKey -> "20",

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -292,29 +292,6 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
292292
}
293293
}
294294

295-
test("create table - invalid properties - expect failure") {
296-
withTempDirAndEngine { (tablePath, engine) =>
297-
val ex1 = intercept[UnknownConfigurationException] {
298-
createTxn(
299-
engine, tablePath, isNewTable = true, testSchema, Seq.empty, Map("invalid key" -> "10"))
300-
}
301-
assert(ex1.getMessage.contains("Unknown configuration was specified: invalid key"))
302-
303-
val ex2 = intercept[InvalidConfigurationValueException] {
304-
createTxn(
305-
engine,
306-
tablePath,
307-
isNewTable = true,
308-
testSchema, Seq.empty, Map(TableConfig.CHECKPOINT_INTERVAL.getKey -> "-1"))
309-
}
310-
assert(
311-
ex2.getMessage.contains(
312-
String.format(
313-
"Invalid value for table property '%s': '%s'. %s",
314-
TableConfig.CHECKPOINT_INTERVAL.getKey, "-1", "needs to be a positive integer.")))
315-
}
316-
}
317-
318295
test("create partitioned table - partition column is not part of the schema") {
319296
withTempDirAndEngine { (tablePath, engine) =>
320297
val table = Table.forPath(engine, tablePath)
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.defaults
17+
18+
import io.delta.kernel.Table
19+
import io.delta.kernel.exceptions.UnknownConfigurationException
20+
import io.delta.kernel.internal.SnapshotImpl
21+
import io.delta.kernel.utils.CloseableIterable.emptyIterable
22+
23+
import scala.collection.immutable.Seq
24+
25+
/**
26+
* Suite to set or get table properties.
27+
* TODO: for now we just have the support for `set`. API `get` will be added in the next PRs.
28+
*/
29+
class TablePropertiesSuite extends DeltaTableWriteSuiteBase {
30+
test("create/update table - allow arbitrary properties") {
31+
withTempDir { tempFile =>
32+
val tablePath = tempFile.getAbsolutePath
33+
34+
// create table with arbitrary properties and check if they are set
35+
createUpdateTableWithProps(
36+
tablePath,
37+
createTable = true,
38+
props = Map("my key" -> "10", "my key2" -> "20")
39+
)
40+
assertHasProp(tablePath, expProps = Map("my key" -> "10", "my key2" -> "20"))
41+
42+
// update table by modifying the arbitrary properties and check if they are updated
43+
createUpdateTableWithProps(tablePath, props = Map("my key" -> "30"))
44+
assertHasProp(tablePath, expProps = Map("my key" -> "30", "my key2" -> "20"))
45+
46+
// update table without any new properties and check if the existing properties are retained
47+
createUpdateTableWithProps(tablePath)
48+
assertHasProp(tablePath, expProps = Map("my key" -> "30", "my key2" -> "20"))
49+
50+
// update table by adding new arbitrary properties and check if they are set
51+
createUpdateTableWithProps(tablePath, props = Map("new key3" -> "str"))
52+
assertHasProp(
53+
tablePath,
54+
expProps = Map("my key" -> "30", "my key2" -> "20", "new key3" -> "str"))
55+
}
56+
}
57+
58+
test("create/update table - disallow unknown delta.* properties to Kernel") {
59+
withTempDir { tempFile =>
60+
val tablePath = tempFile.getAbsolutePath
61+
val ex1 = intercept[UnknownConfigurationException] {
62+
createUpdateTableWithProps(tablePath, createTable = true, Map("delta.unknown" -> "str"))
63+
}
64+
assert(ex1.getMessage.contains("Unknown configuration was specified: delta.unknown"))
65+
66+
// Try updating in an existing table
67+
createUpdateTableWithProps(tablePath, createTable = true)
68+
val ex2 = intercept[UnknownConfigurationException] {
69+
createUpdateTableWithProps(tablePath, props = Map("Delta.unknown" -> "str"))
70+
}
71+
assert(ex2.getMessage.contains("Unknown configuration was specified: Delta.unknown"))
72+
}
73+
}
74+
75+
test("create/update table - delta configs are stored with same case as defined in TableConfig") {
76+
withTempDir { tempFile =>
77+
val tablePath = tempFile.getAbsolutePath
78+
createUpdateTableWithProps(tablePath,
79+
createTable = true,
80+
Map("delta.CHECKPOINTINTERVAL" -> "20"))
81+
assertHasProp(tablePath, expProps = Map("delta.checkpointInterval" -> "20"))
82+
83+
// Try updating in an existing table
84+
createUpdateTableWithProps(
85+
tablePath,
86+
props = Map("DELTA.CHECKPOINTINTERVAL" -> "30"))
87+
assertHasProp(tablePath, expProps = Map("delta.checkpointInterval" -> "30"))
88+
}
89+
}
90+
91+
def createUpdateTableWithProps(
92+
tablePath: String,
93+
createTable: Boolean = false,
94+
props: Map[String, String] = null): Unit = {
95+
createTxn(defaultEngine, tablePath, createTable, testSchema, Seq.empty, props)
96+
.commit(defaultEngine, emptyIterable())
97+
}
98+
99+
// TODO: this will be replaced with get API in the next PRs.
100+
def assertHasProp(tablePath: String, expProps: Map[String, String]): Unit = {
101+
val snapshot = Table.forPath(defaultEngine, tablePath)
102+
.getLatestSnapshot(defaultEngine).asInstanceOf[SnapshotImpl]
103+
expProps.foreach { case (key, value) =>
104+
assert(snapshot.getMetadata.getConfiguration.get(key) === value, key)
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)