@@ -18,27 +18,26 @@ package org.apache.spark.sql.connect.config

 import java.util.concurrent.TimeUnit

-import org.apache.spark.internal.config.ConfigBuilder
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.connect.common.config.ConnectCommon

 object Connect {
+  import org.apache.spark.sql.internal.SQLConf.buildStaticConf

   val CONNECT_GRPC_BINDING_ADDRESS =
-    ConfigBuilder("spark.connect.grpc.binding.address")
+    buildStaticConf("spark.connect.grpc.binding.address")
       .version("4.0.0")
       .stringConf
       .createOptional

   val CONNECT_GRPC_BINDING_PORT =
-    ConfigBuilder("spark.connect.grpc.binding.port")
+    buildStaticConf("spark.connect.grpc.binding.port")
       .version("3.4.0")
       .intConf
       .createWithDefault(ConnectCommon.CONNECT_GRPC_BINDING_PORT)

   val CONNECT_GRPC_INTERCEPTOR_CLASSES =
-    ConfigBuilder("spark.connect.grpc.interceptor.classes")
+    buildStaticConf("spark.connect.grpc.interceptor.classes")
       .doc(
         "Comma separated list of class names that must " +
           "implement the io.grpc.ServerInterceptor interface.")
@@ -47,7 +46,7 @@ object Connect {
       .createOptional

   val CONNECT_GRPC_ARROW_MAX_BATCH_SIZE =
-    ConfigBuilder("spark.connect.grpc.arrow.maxBatchSize")
+    buildStaticConf("spark.connect.grpc.arrow.maxBatchSize")
       .doc(
         "When using Apache Arrow, limit the maximum size of one arrow batch, in bytes unless " +
           "otherwise specified, that can be sent from server side to client side. Currently, we " +
@@ -57,15 +56,15 @@ object Connect {
       .createWithDefault(4 * 1024 * 1024)

   val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE =
-    ConfigBuilder("spark.connect.grpc.maxInboundMessageSize")
+    buildStaticConf("spark.connect.grpc.maxInboundMessageSize")
       .doc("Sets the maximum inbound message size in bytes for gRPC requests. " +
         "Requests with a larger payload will fail.")
       .version("3.4.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)

   val CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT =
-    ConfigBuilder("spark.connect.grpc.marshallerRecursionLimit")
+    buildStaticConf("spark.connect.grpc.marshallerRecursionLimit")
       .internal()
       .doc("""
         |Sets the recursion limit to grpc protobuf messages.
@@ -75,31 +74,31 @@ object Connect {
       .createWithDefault(1024)

   val CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT =
-    ConfigBuilder("spark.connect.execute.manager.detachedTimeout")
+    buildStaticConf("spark.connect.execute.manager.detachedTimeout")
       .internal()
       .doc("Timeout after which executions without an attached RPC will be removed.")
       .version("3.5.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("5m")

   val CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL =
-    ConfigBuilder("spark.connect.execute.manager.maintenanceInterval")
+    buildStaticConf("spark.connect.execute.manager.maintenanceInterval")
       .internal()
       .doc("Interval at which execution manager will search for abandoned executions to remove.")
       .version("3.5.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("30s")

   val CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE =
-    ConfigBuilder("spark.connect.execute.manager.abandonedTombstonesSize")
+    buildStaticConf("spark.connect.execute.manager.abandonedTombstonesSize")
       .internal()
       .doc("Maximum size of the cache of abandoned executions.")
       .version("3.5.0")
       .intConf
       .createWithDefaultString("10000")

   val CONNECT_EXECUTE_REATTACHABLE_ENABLED =
-    ConfigBuilder("spark.connect.execute.reattachable.enabled")
+    buildStaticConf("spark.connect.execute.reattachable.enabled")
       .internal()
       .doc("Enables reattachable execution on the server. If disabled and a client requests it, " +
         "non-reattachable execution will follow and should run until query completion. This will " +
@@ -110,7 +109,7 @@ object Connect {
       .createWithDefault(true)

   val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION =
-    ConfigBuilder("spark.connect.execute.reattachable.senderMaxStreamDuration")
+    buildStaticConf("spark.connect.execute.reattachable.senderMaxStreamDuration")
       .internal()
       .doc("For reattachable execution, after this amount of time the response stream will be " +
         "automatically completed and client needs to send a new ReattachExecute RPC to continue. " +
@@ -120,7 +119,7 @@ object Connect {
       .createWithDefaultString("2m")

   val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE =
-    ConfigBuilder("spark.connect.execute.reattachable.senderMaxStreamSize")
+    buildStaticConf("spark.connect.execute.reattachable.senderMaxStreamSize")
       .internal()
       .doc(
         "For reattachable execution, after total responses size exceeds this value, the " +
@@ -131,7 +130,7 @@ object Connect {
       .createWithDefaultString("1g")

   val CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE =
-    ConfigBuilder("spark.connect.execute.reattachable.observerRetryBufferSize")
+    buildStaticConf("spark.connect.execute.reattachable.observerRetryBufferSize")
       .internal()
       .doc(
         "For reattachable execution, the total size of responses that were already sent to be " +
@@ -143,7 +142,7 @@ object Connect {
       .createWithDefaultString("1m")

   val CONNECT_EXTENSIONS_RELATION_CLASSES =
-    ConfigBuilder("spark.connect.extensions.relation.classes")
+    buildStaticConf("spark.connect.extensions.relation.classes")
       .doc("""
         |Comma separated list of classes that implement the trait
         |org.apache.spark.sql.connect.plugin.RelationPlugin to support custom
@@ -155,7 +154,7 @@ object Connect {
       .createWithDefault(Nil)

   val CONNECT_EXTENSIONS_EXPRESSION_CLASSES =
-    ConfigBuilder("spark.connect.extensions.expression.classes")
+    buildStaticConf("spark.connect.extensions.expression.classes")
       .doc("""
         |Comma separated list of classes that implement the trait
         |org.apache.spark.sql.connect.plugin.ExpressionPlugin to support custom
@@ -167,7 +166,7 @@ object Connect {
       .createWithDefault(Nil)

   val CONNECT_EXTENSIONS_COMMAND_CLASSES =
-    ConfigBuilder("spark.connect.extensions.command.classes")
+    buildStaticConf("spark.connect.extensions.command.classes")
       .doc("""
         |Comma separated list of classes that implement the trait
         |org.apache.spark.sql.connect.plugin.CommandPlugin to support custom
@@ -179,7 +178,7 @@ object Connect {
       .createWithDefault(Nil)

   val CONNECT_JVM_STACK_TRACE_MAX_SIZE =
-    ConfigBuilder("spark.connect.jvmStacktrace.maxSize")
+    buildStaticConf("spark.connect.jvmStacktrace.maxSize")
       .doc("""
         |Sets the maximum stack trace size to display when
         |`spark.sql.pyspark.jvmStacktrace.enabled` is true.
@@ -203,13 +202,13 @@ object Connect {
       .createWithDefault(false)

   val CONNECT_UI_STATEMENT_LIMIT =
-    ConfigBuilder("spark.sql.connect.ui.retainedStatements")
+    buildStaticConf("spark.sql.connect.ui.retainedStatements")
       .doc("The number of statements kept in the Spark Connect UI history.")
       .version("3.5.0")
       .intConf
       .createWithDefault(200)

-  val CONNECT_UI_SESSION_LIMIT = ConfigBuilder("spark.sql.connect.ui.retainedSessions")
+  val CONNECT_UI_SESSION_LIMIT = buildStaticConf("spark.sql.connect.ui.retainedSessions")
     .doc("The number of client sessions kept in the Spark Connect UI history.")
     .version("3.5.0")
     .intConf
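For context on the pattern being applied throughout the diff: SQLConf.buildStaticConf returns a ConfigBuilder that also registers the resulting entry with SQLConf as a static configuration, whereas the previous bare ConfigBuilder(...) calls left these entries unregistered. Below is a minimal sketch of that pattern, modeled on the entries above; the key is illustrative only (not an actual Spark Connect configuration), and buildStaticConf is Spark-internal API, so this is meant to compile inside Spark's own modules rather than in user code.

// Minimal sketch, not part of this diff. The key below is hypothetical.
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.internal.SQLConf.buildStaticConf

object ExampleConnectConf {
  // buildStaticConf registers the entry with SQLConf as a static (startup-time)
  // configuration, in contrast to a bare ConfigBuilder, which only builds the entry.
  val EXAMPLE_DETACHED_TIMEOUT =
    buildStaticConf("spark.connect.example.detachedTimeout")
      .internal()
      .doc("Illustrative timeout entry, modeled on the entries in this file.")
      .version("4.0.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("5m")
}

One practical consequence of registering these as static configs: they are expected to be supplied when the application or Connect server starts (for example via --conf on spark-submit), and attempts to change a registered static config through a running session's conf are rejected by Spark.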