diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala
index 12a4b05827..7e3269df33 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala
@@ -17,7 +17,7 @@
 
 package org.apache.fluss.spark
 
-import org.apache.fluss.spark.procedure.{GetClusterConfigsProcedure, ProcedureBuilder}
+import org.apache.fluss.spark.procedure.{GetClusterConfigsProcedure, ProcedureBuilder, ResetClusterConfigsProcedure, SetClusterConfigsProcedure}
 
 import java.util.Locale
 
@@ -34,7 +34,9 @@ object SparkProcedures {
 
   private def initProcedureBuilders(): Map[String, () => ProcedureBuilder] = {
     Map(
-      "get_cluster_configs" -> (() => GetClusterConfigsProcedure.builder())
+      "get_cluster_configs" -> (() => GetClusterConfigsProcedure.builder()),
+      "set_cluster_configs" -> (() => SetClusterConfigsProcedure.builder()),
+      "reset_cluster_configs" -> (() => ResetClusterConfigsProcedure.builder())
     )
   }
 }
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ResetClusterConfigsProcedure.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ResetClusterConfigsProcedure.scala
new file mode 100644
index 0000000000..2e797c7e02
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ResetClusterConfigsProcedure.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure
+
+import org.apache.fluss.config.cluster.{AlterConfig, AlterConfigOpType}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+
+/**
+ * Procedure to reset cluster configurations to their default values.
+ *
+ * This procedure reverts the configurations to their initial system defaults. The changes are:
+ *  - Validated by the CoordinatorServer before persistence
+ *  - Persisted in ZooKeeper for durability
+ *  - Applied to all relevant servers (Coordinator and TabletServers)
+ *  - Durable across server restarts
+ *
+ * Usage examples:
+ * {{{
+ *   -- Reset a single configuration
+ *   CALL sys.reset_cluster_configs(config_keys => array('key1'))
+ *
+ *   -- Reset multiple configurations at once
+ *   CALL sys.reset_cluster_configs(config_keys => array('key1', 'key2'))
+ * }}}
+ */
+class ResetClusterConfigsProcedure(tableCatalog: TableCatalog) extends BaseProcedure(tableCatalog) {
+
+  override def parameters(): Array[ProcedureParameter] = {
+    ResetClusterConfigsProcedure.PARAMETERS
+  }
+
+  override def outputType(): StructType = {
+    ResetClusterConfigsProcedure.OUTPUT_TYPE
+  }
+
+  override def call(args: InternalRow): Array[InternalRow] = {
+    val configKeys = if (args.numFields > 0 && !args.isNullAt(0)) {
+      val keysArray = args.getArray(0)
+      (0 until keysArray.numElements()).map {
+        i =>
+          if (keysArray.isNullAt(i)) {
+            throw new IllegalArgumentException(
+              s"config_keys contains a null element at position $i. " +
+                "Please specify valid configuration keys.")
+          }
+          keysArray.getUTF8String(i).toString
+      }.toArray
+    } else {
+      Array.empty[String]
+    }
+
+    resetConfigs(configKeys)
+  }
+
+  private def resetConfigs(configKeys: Array[String]): Array[InternalRow] = {
+    if (configKeys.isEmpty) {
+      throw new IllegalArgumentException(
+        "config_keys cannot be null or empty. " +
+          "Please specify valid configuration keys.")
+    }
+
+    try {
+      val configList = new java.util.ArrayList[AlterConfig]()
+      val results = scala.collection.mutable.ArrayBuffer[InternalRow]()
+
+      for (key <- configKeys) {
+        val configKey = key.trim
+        if (configKey.isEmpty) {
+          throw new IllegalArgumentException(
+            "Config key cannot be null or empty. " +
+              "Please specify a valid configuration key.")
+        }
+
+        configList.add(new AlterConfig(configKey, null, AlterConfigOpType.DELETE))
+        results += newInternalRow(
+          UTF8String.fromString(configKey),
+          UTF8String.fromString(
+            s"Successfully deleted (reset to default) configuration '$configKey'.")
+        )
+      }
+
+      admin.alterClusterConfigs(configList).get()
+
+      results.toArray
+    } catch {
+      case e: IllegalArgumentException =>
+        throw e
+      case e: Exception =>
+        throw new RuntimeException(s"Failed to reset cluster config: ${e.getMessage}", e)
+    }
+  }
+
+  override def description(): String = {
+    "Reset cluster configuration values to their defaults."
+  }
+}
+
+object ResetClusterConfigsProcedure {
+
+  private val PARAMETERS: Array[ProcedureParameter] = Array(
+    ProcedureParameter.required("config_keys", DataTypes.createArrayType(DataTypes.StringType))
+  )
+
+  private val OUTPUT_TYPE: StructType = new StructType(
+    Array(
+      new StructField("config_key", DataTypes.StringType, nullable = false, Metadata.empty),
+      new StructField("result", DataTypes.StringType, nullable = false, Metadata.empty)
+    )
+  )
+
+  def builder(): ProcedureBuilder = {
+    new BaseProcedure.Builder[ResetClusterConfigsProcedure]() {
+      override protected def doBuild(): ResetClusterConfigsProcedure = {
+        new ResetClusterConfigsProcedure(getTableCatalog)
+      }
+    }
+  }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/SetClusterConfigsProcedure.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/SetClusterConfigsProcedure.scala
new file mode 100644
index 0000000000..7a8b05a7ab
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/SetClusterConfigsProcedure.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure
+
+import org.apache.fluss.config.cluster.{AlterConfig, AlterConfigOpType}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+
+/**
+ * Procedure to set cluster configurations dynamically.
+ *
+ * This procedure allows modifying dynamic cluster configurations. The changes are:
+ *  - Validated by the CoordinatorServer before persistence
+ *  - Persisted in ZooKeeper for durability
+ *  - Applied to all relevant servers (Coordinator and TabletServers)
+ *  - Durable across server restarts
+ *
+ * Usage examples:
+ * {{{
+ *   -- Set a single configuration
+ *   CALL sys.set_cluster_configs(config_pairs => array('key1', 'value1'))
+ *
+ *   -- Set multiple configurations at once
+ *   CALL sys.set_cluster_configs(config_pairs => array('key1', 'value1', 'key2', 'value2'))
+ * }}}
+ *
+ * Note: Not all configurations support dynamic changes. The server will validate the change and
+ * reject it if the configuration cannot be modified dynamically or if the new value is invalid.
+ */
+class SetClusterConfigsProcedure(tableCatalog: TableCatalog) extends BaseProcedure(tableCatalog) {
+
+  override def parameters(): Array[ProcedureParameter] = {
+    SetClusterConfigsProcedure.PARAMETERS
+  }
+
+  override def outputType(): StructType = {
+    SetClusterConfigsProcedure.OUTPUT_TYPE
+  }
+
+  override def call(args: InternalRow): Array[InternalRow] = {
+    val configPairs = if (args.numFields > 0 && !args.isNullAt(0)) {
+      val pairsArray = args.getArray(0)
+      (0 until pairsArray.numElements()).map {
+        i =>
+          if (pairsArray.isNullAt(i)) {
+            throw new IllegalArgumentException(
+              s"config_pairs contains a null element at position $i. " +
+                "Please specify valid configuration key/value pairs.")
+          }
+          pairsArray.getUTF8String(i).toString
+      }.toArray
+    } else {
+      Array.empty[String]
+    }
+
+    setConfigs(configPairs)
+  }
+
+  private def setConfigs(configPairs: Array[String]): Array[InternalRow] = {
+    if (configPairs.isEmpty) {
+      throw new IllegalArgumentException(
+        "config_pairs cannot be null or empty. " +
+          "Please specify valid configuration pairs.")
+    }
+
+    if (configPairs.length % 2 != 0) {
+      throw new IllegalArgumentException(
+        "config_pairs must be set in pairs (key, value). " +
+          "Please specify valid configuration pairs.")
+    }
+
+    try {
+      val configList = new java.util.ArrayList[AlterConfig]()
+      val results = scala.collection.mutable.ArrayBuffer[InternalRow]()
+
+      for (i <- configPairs.indices by 2) {
+        val configKey = configPairs(i).trim
+        if (configKey.isEmpty) {
+          throw new IllegalArgumentException(
+            "Config key cannot be null or empty. " +
+              "Please specify a valid configuration key.")
+        }
+        val configValue = configPairs(i + 1)
+
+        configList.add(new AlterConfig(configKey, configValue, AlterConfigOpType.SET))
+        results += newInternalRow(
+          UTF8String.fromString(configKey),
+          UTF8String.fromString(configValue),
+          UTF8String.fromString(s"Successfully set configuration '$configKey' to '$configValue'.")
+        )
+      }
+
+      admin.alterClusterConfigs(configList).get()
+
+      results.toArray
+    } catch {
+      case e: IllegalArgumentException =>
+        throw e
+      case e: Exception =>
+        throw new RuntimeException(s"Failed to set cluster config: ${e.getMessage}", e)
+    }
+  }
+
+  override def description(): String = {
+    "Set cluster configuration values dynamically."
+  }
+}
+
+object SetClusterConfigsProcedure {
+
+  private val PARAMETERS: Array[ProcedureParameter] = Array(
+    ProcedureParameter.required("config_pairs", DataTypes.createArrayType(DataTypes.StringType))
+  )
+
+  private val OUTPUT_TYPE: StructType = new StructType(
+    Array(
+      new StructField("config_key", DataTypes.StringType, nullable = false, Metadata.empty),
+      new StructField("config_value", DataTypes.StringType, nullable = false, Metadata.empty),
+      new StructField("result", DataTypes.StringType, nullable = false, Metadata.empty)
+    )
+  )
+
+  def builder(): ProcedureBuilder = {
+    new BaseProcedure.Builder[SetClusterConfigsProcedure]() {
+      override protected def doBuild(): SetClusterConfigsProcedure = {
+        new SetClusterConfigsProcedure(getTableCatalog)
+      }
+    }
+  }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/ResetClusterConfigsProcedureTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/ResetClusterConfigsProcedureTest.scala
new file mode 100644
index 0000000000..cc0c0c51aa
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/ResetClusterConfigsProcedureTest.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure
+
+import org.apache.fluss.config.ConfigOptions
+import org.apache.fluss.spark.FlussSparkTestBase
+
+class ResetClusterConfigsProcedureTest extends FlussSparkTestBase {
+
+  test("reset_cluster_configs: set and then reset a configuration") {
+    val configKey = ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()
+    val configValue = "300MB"
+
+    // First, set a dynamic configuration
+    sql(
+      s"CALL $DEFAULT_CATALOG.sys.set_cluster_configs(config_pairs => array('$configKey', '$configValue'))")
+      .collect()
+
+    // Verify it was set
+    val getResult =
+      sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => array('$configKey'))")
+        .collect()
+    assert(getResult.length == 1)
+    assert(getResult.head.getString(2) == "DYNAMIC")
+
+    // Reset the configuration
+    val resetResult =
+      sql(s"CALL $DEFAULT_CATALOG.sys.reset_cluster_configs(config_keys => array('$configKey'))")
+        .collect()
+
+    assert(resetResult.length == 1)
+    assert(resetResult.head.getString(0) == configKey)
+    assert(resetResult.head.getString(1).contains("Successfully"))
+
+    // Verify it was reset (should no longer be DYNAMIC).
+    // describeClusterConfigs only returns entries that exist as either static (initial)
+    // or dynamic configs. If the key was not present in initial configs, an empty result
+    // is expected after reset; otherwise the source should no longer be DYNAMIC.
+    val getResultAfterReset =
+      sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => array('$configKey'))")
+        .collect()
+    assert(getResultAfterReset.length <= 1)
+    if (getResultAfterReset.length == 1) {
+      assert(getResultAfterReset.head.getString(2) != "DYNAMIC")
+    }
+  }
+
+  test("reset_cluster_configs: reset multiple configurations") {
+    val key1 = ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()
+    val key2 = ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key()
+
+    // First, set dynamic configurations
+    sql(
+      s"CALL $DEFAULT_CATALOG.sys.set_cluster_configs(config_pairs => array('$key1', '100MB', '$key2', '2'))")
+      .collect()
+
+    // Reset both configurations
+    val result = sql(
+      s"CALL $DEFAULT_CATALOG.sys.reset_cluster_configs(config_keys => array('$key1', '$key2'))")
+      .collect()
+
+    assert(result.length == 2)
+    assert(result(0).getString(0) == key1)
+    assert(result(1).getString(0) == key2)
+  }
+
+  test("reset_cluster_configs: empty config_keys should fail") {
+    val exception = intercept[RuntimeException] {
+      sql(s"CALL $DEFAULT_CATALOG.sys.reset_cluster_configs(config_keys => array())").collect()
+    }
+    assert(exception.getMessage.contains("cannot be null or empty"))
+  }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/SetClusterConfigsProcedureTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/SetClusterConfigsProcedureTest.scala
new file mode 100644
index 0000000000..e12c5e549a
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/SetClusterConfigsProcedureTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure
+
+import org.apache.fluss.config.ConfigOptions
+import org.apache.fluss.spark.FlussSparkTestBase
+
+class SetClusterConfigsProcedureTest extends FlussSparkTestBase {
+
+  test("set_cluster_configs: set a single configuration") {
+    val configKey = ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()
+    val configValue = "200MB"
+
+    val result = sql(
+      s"CALL $DEFAULT_CATALOG.sys.set_cluster_configs(config_pairs => array('$configKey', '$configValue'))")
+      .collect()
+
+    assert(result.length == 1)
+    assert(result.head.getString(0) == configKey)
+    assert(result.head.getString(1) == configValue)
+    assert(result.head.getString(2).contains("Successfully"))
+
+    // Verify the configuration was actually set
+    val getResult =
+      sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => array('$configKey'))")
+        .collect()
+
+    assert(getResult.length == 1)
+    assert(getResult.head.getString(0) == configKey)
+    assert(getResult.head.getString(1) == configValue)
+    assert(getResult.head.getString(2) == "DYNAMIC")
+  }
+
+  test("set_cluster_configs: set multiple configurations") {
+    val key1 = ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()
+    val value1 = "100MB"
+    val key2 = ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key()
+    val value2 = "2"
+
+    val result = sql(
+      s"CALL $DEFAULT_CATALOG.sys.set_cluster_configs(config_pairs => array('$key1', '$value1', '$key2', '$value2'))")
+      .collect()
+
+    assert(result.length == 2)
+    assert(result(0).getString(0) == key1)
+    assert(result(0).getString(1) == value1)
+    assert(result(1).getString(0) == key2)
+    assert(result(1).getString(1) == value2)
+  }
+
+  test("set_cluster_configs: empty config_pairs should fail") {
+    val exception = intercept[RuntimeException] {
+      sql(s"CALL $DEFAULT_CATALOG.sys.set_cluster_configs(config_pairs => array())").collect()
+    }
+    assert(exception.getMessage.contains("cannot be null or empty"))
+  }
+
+  test("set_cluster_configs: odd number of config_pairs should fail") {
+    val exception = intercept[RuntimeException] {
+      sql(s"CALL $DEFAULT_CATALOG.sys.set_cluster_configs(config_pairs => array('key1'))").collect()
+    }
+    assert(exception.getMessage.contains("must be set in pairs"))
+  }
+}
diff --git a/website/docs/engine-spark/procedures.md b/website/docs/engine-spark/procedures.md
index ace2d3a6df..7fbb5947bb 100644
--- a/website/docs/engine-spark/procedures.md
+++ b/website/docs/engine-spark/procedures.md
@@ -87,6 +87,69 @@ CALL sys.get_cluster_configs();
 CALL sys.get_cluster_configs(config_keys => ARRAY('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format'));
 ```
 
+### set_cluster_configs
+
+Set cluster configuration values dynamically. The changes are validated, persisted, and applied to all servers without requiring a restart.
+
+**Syntax:**
+
+```sql
+CALL [catalog_name.]sys.set_cluster_configs(config_pairs => ARRAY('key1', 'value1', 'key2', 'value2'))
+```
+
+**Parameters:**
+
+- `config_pairs` (required): Array of configuration key-value pairs. Keys and values must alternate (key1, value1, key2, value2, ...).
+
+**Returns:** A table with columns:
+
+- `config_key`: The configuration key that was set
+- `config_value`: The value that was set
+- `result`: A message indicating the result of the operation
+
+**Example:**
+
+```sql title="Spark SQL"
+-- Set a single configuration
+CALL sys.set_cluster_configs(config_pairs => ARRAY('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB'));
+
+-- Set multiple configurations at once
+CALL sys.set_cluster_configs(config_pairs => ARRAY('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB', 'datalake.format', 'paimon'));
+```
+
+:::note
+Not all configurations support dynamic changes. The server will validate the change and reject it if the configuration cannot be modified dynamically or if the new value is invalid.
+:::
+
+### reset_cluster_configs
+
+Reset cluster configurations to their default values. The changes are validated, persisted, and applied to all servers without requiring a restart.
+
+**Syntax:**
+
+```sql
+CALL [catalog_name.]sys.reset_cluster_configs(config_keys => ARRAY('key1', 'key2'))
+```
+
+**Parameters:**
+
+- `config_keys` (required): Array of configuration keys to reset to their default values.
+
+**Returns:** A table with columns:
+
+- `config_key`: The configuration key that was reset
+- `result`: A message indicating the result of the operation
+
+**Example:**
+
+```sql title="Spark SQL"
+-- Reset a single configuration
+CALL sys.reset_cluster_configs(config_keys => ARRAY('kv.rocksdb.shared-rate-limiter.bytes-per-sec'));
+
+-- Reset multiple configurations at once
+CALL sys.reset_cluster_configs(config_keys => ARRAY('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format'));
+```
+
 ## Error Handling
 
 Procedures will throw exceptions in the following cases: