/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.internal

import java.util.{Locale, NoSuchElementException, Properties, TimeZone}
import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater

import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.util.Try
import scala.util.control.NonFatal
import scala.util.matching.Regex

import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.{IGNORE_MISSING_FILES => SPARK_IGNORE_MISSING_FILES}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.{AtomicType, TimestampNTZType, TimestampType}
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.Utils

////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL.
////////////////////////////////////////////////////////////////////////////////////////////////////
object SQLConf {
private[this] val sqlConfEntriesUpdateLock = new Object
@volatile
private[this] var sqlConfEntries: util.Map[String, ConfigEntry[_]] = util.Collections.emptyMap()
private[this] val staticConfKeysUpdateLock = new Object
@volatile
private[this] var staticConfKeys: java.util.Set[String] = util.Collections.emptySet()
private def register(entry: ConfigEntry[_]): Unit = sqlConfEntriesUpdateLock.synchronized {
require(!sqlConfEntries.containsKey(entry.key),
s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
val updatedMap = new java.util.HashMap[String, ConfigEntry[_]](sqlConfEntries)
updatedMap.put(entry.key, entry)
sqlConfEntries = updatedMap
}
// For testing only
private[sql] def unregister(entry: ConfigEntry[_]): Unit = sqlConfEntriesUpdateLock.synchronized {
val updatedMap = new java.util.HashMap[String, ConfigEntry[_]](sqlConfEntries)
updatedMap.remove(entry.key)
sqlConfEntries = updatedMap
}
private[internal] def getConfigEntry(key: String): ConfigEntry[_] = {
sqlConfEntries.get(key)
}
private[internal] def getConfigEntries(): util.Collection[ConfigEntry[_]] = {
sqlConfEntries.values()
}
private[internal] def containsConfigEntry(entry: ConfigEntry[_]): Boolean = {
getConfigEntry(entry.key) == entry
}
private[sql] def containsConfigKey(key: String): Boolean = {
sqlConfEntries.containsKey(key)
}
def registerStaticConfigKey(key: String): Unit = staticConfKeysUpdateLock.synchronized {
val updated = new util.HashSet[String](staticConfKeys)
updated.add(key)
staticConfKeys = updated
}
def isStaticConfigKey(key: String): Boolean = staticConfKeys.contains(key)
def buildConf(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)
def buildStaticConf(key: String): ConfigBuilder = {
ConfigBuilder(key).onCreate { entry =>
SQLConf.registerStaticConfigKey(entry.key)
SQLConf.register(entry)
}
}
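// Illustrative sketch only (not a registered entry): config entries in this file are
// declared through the builder chain returned by `buildConf`. The key, doc text, and
// default below are hypothetical.
//
//   val EXAMPLE_FLAG = buildConf("spark.sql.example.enabled")
//     .doc("Hypothetical flag, shown only to illustrate the builder chain.")
//     .version("0.0.0")
//     .booleanConf
//     .createWithDefault(false)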
/**
* Merge all non-static configs to the SQLConf. For example, when the 1st [[SparkSession]] and
* the global [[SharedState]] have been initialized, all static configs have taken effect and
* should not be set to other values. Other later created sessions should respect all static
* configs and only be able to change non-static configs.
*/
private[sql] def mergeNonStaticSQLConfigs(
sqlConf: SQLConf,
configs: Map[String, String]): Unit = {
for ((k, v) <- configs if !staticConfKeys.contains(k)) {
sqlConf.setConfString(k, v)
}
}
/**
* Extract entries from `SparkConf` and put them in the `SQLConf`
*/
private[sql] def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = {
sparkConf.getAll.foreach { case (k, v) =>
sqlConf.setConfString(k, v)
}
}
/**
* Default config. Only used when there is no active SparkSession for the thread.
* See [[get]] for more information.
*/
private lazy val fallbackConf = new ThreadLocal[SQLConf] {
override def initialValue: SQLConf = new SQLConf
}
/** See [[get]] for more information. */
def getFallbackConf: SQLConf = fallbackConf.get()
private lazy val existingConf = new ThreadLocal[SQLConf] {
override def initialValue: SQLConf = null
}
def withExistingConf[T](conf: SQLConf)(f: => T): T = {
val old = existingConf.get()
existingConf.set(conf)
try {
f
} finally {
if (old != null) {
existingConf.set(old)
} else {
existingConf.remove()
}
}
}
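// Illustrative sketch only: a typical use of `withExistingConf` is to capture the
// session's SQLConf and re-install it around code that calls `SQLConf.get` from a
// thread or task where no active session is visible. The body below is hypothetical.
//
//   val captured = SQLConf.get
//   val caseSensitive = SQLConf.withExistingConf(captured) {
//     SQLConf.get.getConf(SQLConf.CASE_SENSITIVE)
//   }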
/**
* Defines a getter that returns the SQLConf within scope.
* See [[get]] for more information.
*/
private val confGetter = new AtomicReference[() => SQLConf](() => fallbackConf.get())
/**
* Sets the active config object within the current scope.
* See [[get]] for more information.
*/
def setSQLConfGetter(getter: () => SQLConf): Unit = {
confGetter.set(getter)
}
/**
* Returns the active config object within the current scope. If there is an active SparkSession,
* the proper SQLConf associated with the thread's active session is used. If it's called from
* tasks on the executor side, a SQLConf will be created from job local properties, which are set
* and propagated from the driver side, unless a `SQLConf` has been set in the scope by
* `withExistingConf` as done for propagating SQLConf for operations performed on RDDs created
* from DataFrames.
*
* The way this works is a little bit convoluted, due to the fact that config was added initially
* only for physical plans (and as a result not in sql/catalyst module).
*
* The first time a SparkSession is instantiated, we set the [[confGetter]] to return the
* active SparkSession's config. If there is no active SparkSession, it returns using the thread
* local [[fallbackConf]]. The reason [[fallbackConf]] is a thread local (rather than just a conf)
* is to support setting different config options for different threads so we can potentially
* run tests in parallel. At the time this feature was implemented, this was a no-op since we
* run unit tests (that do not involve SparkSession) in serial order.
*/
def get: SQLConf = {
if (TaskContext.get != null) {
val conf = existingConf.get()
if (conf != null) {
conf
} else {
new ReadOnlySQLConf(TaskContext.get())
}
} else {
val isSchedulerEventLoopThread = SparkContext.getActive
.flatMap { sc => Option(sc.dagScheduler) }
.map(_.eventProcessLoop.eventThread)
.exists(_.getId == Thread.currentThread().getId)
if (isSchedulerEventLoopThread) {
// DAGScheduler event loop thread does not have an active SparkSession, the `confGetter`
// will return `fallbackConf` which is unexpected. Here we require the caller to get the
// conf within `withExistingConf`, otherwise fail the query.
val conf = existingConf.get()
if (conf != null) {
conf
} else if (Utils.isTesting) {
throw QueryExecutionErrors.cannotGetSQLConfInSchedulerEventLoopThreadError()
} else {
confGetter.get()()
}
} else {
val conf = existingConf.get()
if (conf != null) {
conf
} else {
confGetter.get()()
}
}
}
}
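// Illustrative sketch only: reading a registered entry through the active conf.
// `ANALYZER_MAX_ITERATIONS` is the entry defined immediately below.
//
//   val maxIterations: Int = SQLConf.get.getConf(SQLConf.ANALYZER_MAX_ITERATIONS)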
val ANALYZER_MAX_ITERATIONS = buildConf("spark.sql.analyzer.maxIterations")
.internal()
.doc("The max number of iterations the analyzer runs.")
.version("3.0.0")
.intConf
.createWithDefault(100)
val OPTIMIZER_EXCLUDED_RULES = buildConf("spark.sql.optimizer.excludedRules")
.doc("Configures a list of rules to be disabled in the optimizer, in which the rules are " +
"specified by their rule names and separated by comma. It is not guaranteed that all the " +
"rules in this configuration will eventually be excluded, as some rules are necessary " +
"for correctness. The optimizer will log the rules that have indeed been excluded.")
.version("2.4.0")
.stringConf
.createOptional
val OPTIMIZER_MAX_ITERATIONS = buildConf("spark.sql.optimizer.maxIterations")
.internal()
.doc("The max number of iterations the optimizer runs.")
.version("2.0.0")
.intConf
.createWithDefault(100)
val OPTIMIZER_INSET_CONVERSION_THRESHOLD =
buildConf("spark.sql.optimizer.inSetConversionThreshold")
.internal()
.doc("The threshold of set size for InSet conversion.")
.version("2.0.0")
.intConf
.createWithDefault(10)
val OPTIMIZER_INSET_SWITCH_THRESHOLD =
buildConf("spark.sql.optimizer.inSetSwitchThreshold")
.internal()
.doc("Configures the max set size in InSet for which Spark will generate code with " +
"switch statements. This is applicable only to bytes, shorts, ints, dates.")
.version("3.0.0")
.intConf
.checkValue(threshold => threshold >= 0 && threshold <= 600, "The max set size " +
"for using switch statements in InSet must be non-negative and less than or equal to 600")
.createWithDefault(400)
val PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.planChangeLog.level")
.internal()
.doc("Configures the log level for logging the change from the original plan to the new " +
"plan after a rule or batch is applied. The value can be 'trace', 'debug', 'info', " +
"'warn', or 'error'. The default log level is 'trace'.")
.version("3.1.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
"Invalid value for 'spark.sql.planChangeLog.level'. Valid values are " +
"'trace', 'debug', 'info', 'warn' and 'error'.")
.createWithDefault("trace")
val PLAN_CHANGE_LOG_RULES = buildConf("spark.sql.planChangeLog.rules")
.internal()
.doc("Configures a list of rules for logging plan changes, in which the rules are " +
"specified by their rule names and separated by comma.")
.version("3.1.0")
.stringConf
.createOptional
val PLAN_CHANGE_LOG_BATCHES = buildConf("spark.sql.planChangeLog.batches")
.internal()
.doc("Configures a list of batches for logging plan changes, in which the batches " +
"are specified by their batch names and separated by comma.")
.version("3.1.0")
.stringConf
.createOptional
val DYNAMIC_PARTITION_PRUNING_ENABLED =
buildConf("spark.sql.optimizer.dynamicPartitionPruning.enabled")
.doc("When true, we will generate predicate for partition column when it's used as join key")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
val DYNAMIC_PARTITION_PRUNING_USE_STATS =
buildConf("spark.sql.optimizer.dynamicPartitionPruning.useStats")
.internal()
.doc("When true, distinct count statistics will be used for computing the data size of the " +
"partitioned table after dynamic partition pruning, in order to evaluate if it is worth " +
"adding an extra subquery as the pruning filter if broadcast reuse is not applicable.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
val DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO =
buildConf("spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio")
.internal()
.doc("When statistics are not available or configured not to be used, this config will be " +
"used as the fallback filter ratio for computing the data size of the partitioned table " +
"after dynamic partition pruning, in order to evaluate if it is worth adding an extra " +
"subquery as the pruning filter if broadcast reuse is not applicable.")
.version("3.0.0")
.doubleConf
.createWithDefault(0.5)
val DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY =
buildConf("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly")
.internal()
.doc("When true, dynamic partition pruning will only apply when the broadcast exchange of " +
"a broadcast hash join operation can be reused as the dynamic pruning filter.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
.doc("When set to true Spark SQL will automatically select a compression codec for each " +
"column based on statistics of the data.")
.version("1.0.1")
.booleanConf
.createWithDefault(true)
val COLUMN_BATCH_SIZE = buildConf("spark.sql.inMemoryColumnarStorage.batchSize")
.doc("Controls the size of batches for columnar caching. Larger batch sizes can improve " +
"memory utilization and compression, but risk OOMs when caching data.")
.version("1.1.1")
.intConf
.createWithDefault(10000)
val IN_MEMORY_PARTITION_PRUNING =
buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning")
.internal()
.doc("When true, enable partition pruning for in-memory columnar tables.")
.version("1.2.0")
.booleanConf
.createWithDefault(true)
val IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED =
buildConf("spark.sql.inMemoryTableScanStatistics.enable")
.internal()
.doc("When true, enable in-memory table scan accumulators.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)
val CACHE_VECTORIZED_READER_ENABLED =
buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader")
.doc("Enables vectorized reader for columnar caching.")
.version("2.3.1")
.booleanConf
.createWithDefault(true)
val COLUMN_VECTOR_OFFHEAP_ENABLED =
buildConf("spark.sql.columnVector.offheap.enabled")
.internal()
.doc("When true, use OffHeapColumnVector in ColumnarBatch.")
.version("2.3.0")
.booleanConf
.createWithDefault(false)
val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
.internal()
.doc("When true, prefer sort merge join over shuffled hash join. " +
"Sort merge join consumes less memory than shuffled hash join and it works efficiently " +
"when both join tables are large. On the other hand, shuffled hash join can improve " +
"performance (e.g., of full outer joins) when one of join tables is much smaller.")
.version("2.0.0")
.booleanConf
.createWithDefault(true)
val RADIX_SORT_ENABLED = buildConf("spark.sql.sort.enableRadixSort")
.internal()
.doc("When true, enable use of radix sort when possible. Radix sort is much faster but " +
"requires additional memory to be reserved up-front. The memory overhead may be " +
"significant when sorting very small rows (up to 50% more in this case).")
.version("2.0.0")
.booleanConf
.createWithDefault(true)
val AUTO_BROADCASTJOIN_THRESHOLD = buildConf("spark.sql.autoBroadcastJoinThreshold")
.doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
"nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
"Note that currently statistics are only supported for Hive Metastore tables where the " +
"command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been " +
"run, and file-based data source tables where the statistics are computed directly on " +
"the files of data.")
.version("1.1.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("10MB")
val SHUFFLE_HASH_JOIN_FACTOR = buildConf("spark.sql.shuffledHashJoinFactor")
.doc("The shuffle hash join can be selected if the data size of small" +
" side multiplied by this factor is still smaller than the large side.")
.version("3.3.0")
.intConf
.checkValue(_ >= 1, "The shuffle hash join factor must be at least 1.")
.createWithDefault(3)
val LIMIT_SCALE_UP_FACTOR = buildConf("spark.sql.limit.scaleUpFactor")
.internal()
.doc("Minimal increase rate in number of partitions between attempts when executing a take " +
"on a query. Higher values lead to more partitions read. Lower values might lead to " +
"longer execution times as more jobs will be run")
.version("2.1.1")
.intConf
.createWithDefault(4)
val ADVANCED_PARTITION_PREDICATE_PUSHDOWN =
buildConf("spark.sql.hive.advancedPartitionPredicatePushdown.enabled")
.internal()
.doc("When true, advanced partition predicate pushdown into Hive metastore is enabled.")
.version("2.3.0")
.booleanConf
.createWithDefault(true)
val LEAF_NODE_DEFAULT_PARALLELISM = buildConf("spark.sql.leafNodeDefaultParallelism")
.doc("The default parallelism of Spark SQL leaf nodes that produce data, such as the file " +
"scan node, the local data scan node, the range node, etc. The default value of this " +
"config is 'SparkContext#defaultParallelism'.")
.version("3.2.0")
.intConf
.checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.")
.createOptional
val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
.doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
"Note: For structured streaming, this configuration cannot be changed between query " +
"restarts from the same checkpoint location.")
.version("1.1.0")
.intConf
.checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
.createWithDefault(200)
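// For example (sketch, assuming an active SparkSession named `spark`), the default
// of 200 shuffle partitions can be raised for large joins or aggregations:
//
//   spark.conf.set("spark.sql.shuffle.partitions", "1000")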
val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
.internal()
.doc("(Deprecated since Spark 3.0)")
.version("1.6.0")
.bytesConf(ByteUnit.BYTE)
.checkValue(_ > 0, "advisoryPartitionSizeInBytes must be positive")
.createWithDefaultString("64MB")
val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
.doc("When true, enable adaptive query execution, which re-optimizes the query plan in the " +
"middle of query execution, based on accurate runtime statistics.")
.version("1.6.0")
.booleanConf
.createWithDefault(true)
val ADAPTIVE_EXECUTION_FORCE_APPLY = buildConf("spark.sql.adaptive.forceApply")
.internal()
.doc("Adaptive query execution is skipped when the query does not have exchanges or " +
"sub-queries. By setting this config to true (together with " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' set to true), Spark will force apply adaptive query " +
"execution for all supported queries.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)
val ADAPTIVE_EXECUTION_LOG_LEVEL = buildConf("spark.sql.adaptive.logLevel")
.internal()
.doc("Configures the log level for adaptive execution logging of plan changes. The value " +
"can be 'trace', 'debug', 'info', 'warn', or 'error'. The default log level is 'debug'.")
.version("3.0.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR"))
.createWithDefault("debug")
val ADVISORY_PARTITION_SIZE_IN_BYTES =
buildConf("spark.sql.adaptive.advisoryPartitionSizeInBytes")
.doc("The advisory size in bytes of the shuffle partition during adaptive optimization " +
s"(when ${ADAPTIVE_EXECUTION_ENABLED.key} is true). It takes effect when Spark " +
"coalesces small shuffle partitions or splits skewed shuffle partition.")
.version("3.0.0")
.fallbackConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
val COALESCE_PARTITIONS_ENABLED =
buildConf("spark.sql.adaptive.coalescePartitions.enabled")
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark will coalesce " +
"contiguous shuffle partitions according to the target size (specified by " +
s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'), to avoid too many small tasks.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
val COALESCE_PARTITIONS_PARALLELISM_FIRST =
buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst")
.doc("When true, Spark does not respect the target size specified by " +
s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' (default 64MB) when coalescing contiguous " +
"shuffle partitions, but adaptively calculate the target size according to the default " +
"parallelism of the Spark cluster. The calculated size is usually smaller than the " +
"configured target size. This is to maximize the parallelism and avoid performance " +
"regression when enabling adaptive query execution. It's recommended to set this config " +
"to false and respect the configured target size.")
.version("3.2.0")
.booleanConf
.createWithDefault(true)
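// Sketch of the setup recommended in the doc above (assuming a SparkSession `spark`):
// turn parallelism-first off so coalescing respects the configured target size.
//
//   spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")
//   spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")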
val COALESCE_PARTITIONS_MIN_PARTITION_SIZE =
buildConf("spark.sql.adaptive.coalescePartitions.minPartitionSize")
.doc("The minimum size of shuffle partitions after coalescing. This is useful when the " +
"adaptively calculated target size is too small during partition coalescing.")
.version("3.2.0")
.bytesConf(ByteUnit.BYTE)
.checkValue(_ > 0, "minPartitionSize must be positive")
.createWithDefaultString("1MB")
val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
.internal()
.doc("(deprecated) The suggested (not guaranteed) minimum number of shuffle partitions " +
"after coalescing. If not set, the default value is the default parallelism of the " +
"Spark cluster. This configuration only has an effect when " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
.version("3.0.0")
.intConf
.checkValue(_ > 0, "The minimum number of partitions must be positive.")
.createOptional
val COALESCE_PARTITIONS_INITIAL_PARTITION_NUM =
buildConf("spark.sql.adaptive.coalescePartitions.initialPartitionNum")
.doc("The initial number of shuffle partitions before coalescing. If not set, it equals to " +
s"${SHUFFLE_PARTITIONS.key}. This configuration only has an effect when " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and '${COALESCE_PARTITIONS_ENABLED.key}' " +
"are both true.")
.version("3.0.0")
.intConf
.checkValue(_ > 0, "The initial number of partitions must be positive.")
.createOptional
val FETCH_SHUFFLE_BLOCKS_IN_BATCH =
buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch")
.internal()
.doc("Whether to fetch the contiguous shuffle blocks in batch. Instead of fetching blocks " +
"one by one, fetching contiguous shuffle blocks for the same map task in batch can " +
"reduce IO and improve performance. Note, multiple contiguous blocks exist in single " +
s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true. This feature also depends " +
"on a relocatable serializer, the concatenation support codec in use, the new version " +
"shuffle fetch protocol and io encryption is disabled.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
val LOCAL_SHUFFLE_READER_ENABLED =
buildConf("spark.sql.adaptive.localShuffleReader.enabled")
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark tries to use local " +
"shuffle reader to read the shuffle data when the shuffle partitioning is not needed, " +
"for example, after converting sort-merge join to broadcast-hash join.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
val SKEW_JOIN_ENABLED =
buildConf("spark.sql.adaptive.skewJoin.enabled")
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark dynamically " +
"handles skew in shuffled join (sort-merge and shuffled hash) by splitting (and " +
"replicating if needed) skewed partitions.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
val SKEW_JOIN_SKEWED_PARTITION_FACTOR =
buildConf("spark.sql.adaptive.skewJoin.skewedPartitionFactor")
.doc("A partition is considered as skewed if its size is larger than this factor " +
"multiplying the median partition size and also larger than " +
"'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes'")
.version("3.0.0")
.intConf
.checkValue(_ >= 0, "The skew factor cannot be negative.")
.createWithDefault(5)
val SKEW_JOIN_SKEWED_PARTITION_THRESHOLD =
buildConf("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes")
.doc("A partition is considered as skewed if its size in bytes is larger than this " +
s"threshold and also larger than '${SKEW_JOIN_SKEWED_PARTITION_FACTOR.key}' " +
"multiplying the median partition size. Ideally this config should be set larger " +
s"than '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'.")
.version("3.0.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("256MB")
val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
.internal()
.doc("The relation with a non-empty partition ratio lower than this config will not be " +
"considered as the build side of a broadcast-hash join in adaptive execution regardless " +
"of its size.This configuration only has an effect when " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is true.")
.version("3.0.0")
.doubleConf
.checkValue(_ >= 0, "The non-empty partition ratio must be a non-negative number.")
.createWithDefault(0.2)
val ADAPTIVE_OPTIMIZER_EXCLUDED_RULES =
buildConf("spark.sql.adaptive.optimizer.excludedRules")
.doc("Configures a list of rules to be disabled in the adaptive optimizer, in which the " +
"rules are specified by their rule names and separated by comma. The optimizer will log " +
"the rules that have indeed been excluded.")
.version("3.1.0")
.stringConf
.createOptional
val ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD =
buildConf("spark.sql.adaptive.autoBroadcastJoinThreshold")
.doc("Configures the maximum size in bytes for a table that will be broadcast to all " +
"worker nodes when performing a join. By setting this value to -1 broadcasting can be " +
s"disabled. The default value is same with ${AUTO_BROADCASTJOIN_THRESHOLD.key}. " +
"Note that, this config is used only in adaptive framework.")
.version("3.2.0")
.bytesConf(ByteUnit.BYTE)
.createOptional
val ADAPTIVE_MAX_SHUFFLE_HASH_JOIN_LOCAL_MAP_THRESHOLD =
buildConf("spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold")
.doc("Configures the maximum size in bytes per partition that can be allowed to build " +
"local hash map. If this value is not smaller than " +
s"${ADVISORY_PARTITION_SIZE_IN_BYTES.key} and all the partition size are not larger " +
"than this config, join selection prefer to use shuffled hash join instead of " +
s"sort merge join regardless of the value of ${PREFER_SORTMERGEJOIN.key}.")
.version("3.2.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(0L)
val ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED =
buildConf("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled")
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark will optimize the " +
"skewed shuffle partitions in RebalancePartitions and split them to smaller ones " +
s"according to the target size (specified by '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'), " +
"to avoid data skew.")
.version("3.2.0")
.booleanConf
.createWithDefault(true)
val ADAPTIVE_REBALANCE_PARTITIONS_SMALL_PARTITION_FACTOR =
buildConf("spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor")
.doc(s"A partition will be merged during splitting if its size is small than this factor " +
s"multiply ${ADVISORY_PARTITION_SIZE_IN_BYTES.key}.")
.version("3.3.0")
.doubleConf
.checkValue(v => v > 0 && v < 1, "the factor must be in (0, 1)")
.createWithDefault(0.2)
val ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN =
buildConf("spark.sql.adaptive.forceOptimizeSkewedJoin")
.doc("When true, force enable OptimizeSkewedJoin even if it introduces extra shuffle.")
.version("3.3.0")
.booleanConf
.createWithDefault(false)
val ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS =
buildConf("spark.sql.adaptive.customCostEvaluatorClass")
.doc("The custom cost evaluator class to be used for adaptive execution. If not being set," +
" Spark will use its own SimpleCostEvaluator by default.")
.version("3.2.0")
.stringConf
.createOptional
val SUBEXPRESSION_ELIMINATION_ENABLED =
buildConf("spark.sql.subexpressionElimination.enabled")
.internal()
.doc("When true, common subexpressions will be eliminated.")
.version("1.6.0")
.booleanConf
.createWithDefault(true)
val SUBEXPRESSION_ELIMINATION_CACHE_MAX_ENTRIES =
buildConf("spark.sql.subexpressionElimination.cache.maxEntries")
.internal()
.doc("The maximum entries of the cache used for interpreted subexpression elimination.")
.version("3.1.0")
.intConf
.checkValue(_ >= 0, "The maximum must not be negative")
.createWithDefault(100)
val CASE_SENSITIVE = buildConf("spark.sql.caseSensitive")
.internal()
.doc("Whether the query analyzer should be case sensitive or not. " +
"Default to case insensitive. It is highly discouraged to turn on case sensitive mode.")
.version("1.4.0")
.booleanConf
.createWithDefault(false)
val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
.internal()
.doc("When true, the query optimizer will infer and propagate data constraints in the query " +
"plan to optimize them. Constraint propagation can sometimes be computationally expensive " +
"for certain kinds of query plans (such as those with a large number of predicates and " +
"aliases) which might negatively impact overall runtime.")
.version("2.2.0")
.booleanConf
.createWithDefault(true)
val ESCAPED_STRING_LITERALS = buildConf("spark.sql.parser.escapedStringLiterals")
.internal()
.doc("When true, string literals (including regex patterns) remain escaped in our SQL " +
"parser. The default is false since Spark 2.0. Setting it to true can restore the behavior " +
"prior to Spark 2.0.")
.version("2.2.1")
.booleanConf
.createWithDefault(false)
val FILE_COMPRESSION_FACTOR = buildConf("spark.sql.sources.fileCompressionFactor")
.internal()
.doc("When estimating the output data size of a table scan, multiply the file size with this " +
"factor as the estimated data size, in case the data is compressed in the file and lead to" +
" a heavily underestimated result.")
.version("2.3.1")
.doubleConf
.checkValue(_ > 0, "the value of fileCompressionFactor must be greater than 0")
.createWithDefault(1.0)
val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
.doc("When true, the Parquet data source merges schemas collected from all data files, " +
"otherwise the schema is picked from the summary file or a random data file " +
"if no summary file is available.")
.version("1.5.0")
.booleanConf
.createWithDefault(false)
val PARQUET_SCHEMA_RESPECT_SUMMARIES = buildConf("spark.sql.parquet.respectSummaryFiles")
.doc("When true, we make assumption that all part-files of Parquet are consistent with " +
"summary files and we will ignore them when merging schema. Otherwise, if this is " +
"false, which is the default, we will merge all part-files. This should be considered " +
"as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
.version("1.5.0")
.booleanConf
.createWithDefault(false)
val PARQUET_BINARY_AS_STRING = buildConf("spark.sql.parquet.binaryAsString")
.doc("Some other Parquet-producing systems, in particular Impala and older versions of " +
"Spark SQL, do not differentiate between binary data and strings when writing out the " +
"Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
"compatibility with these systems.")
.version("1.1.1")
.booleanConf
.createWithDefault(false)
val PARQUET_INT96_AS_TIMESTAMP = buildConf("spark.sql.parquet.int96AsTimestamp")
.doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
"Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
"nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
"provide compatibility with these systems.")
.version("1.3.0")
.booleanConf
.createWithDefault(true)
val PARQUET_INT96_TIMESTAMP_CONVERSION = buildConf("spark.sql.parquet.int96TimestampConversion")
.doc("This controls whether timestamp adjustments should be applied to INT96 data when " +
"converting to timestamps, for data written by Impala. This is necessary because Impala " +
"stores INT96 data with a different timezone offset than Hive & Spark.")
.version("2.3.0")
.booleanConf
.createWithDefault(false)
object ParquetOutputTimestampType extends Enumeration {
val INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value
}
val PARQUET_OUTPUT_TIMESTAMP_TYPE = buildConf("spark.sql.parquet.outputTimestampType")
.doc("Sets which Parquet timestamp type to use when Spark writes data to Parquet files. " +
"INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS " +
"is a standard timestamp type in Parquet, which stores number of microseconds from the " +
"Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which " +
"means Spark has to truncate the microsecond portion of its timestamp value.")
.version("2.3.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(ParquetOutputTimestampType.values.map(_.toString))
.createWithDefault(ParquetOutputTimestampType.INT96.toString)
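// For example (sketch, assuming a SparkSession `spark`), to write standard
// microsecond-precision timestamps instead of the default INT96:
//
//   spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")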
val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
.doc("Sets the compression codec used when writing Parquet files. If either `compression` or " +
"`parquet.compression` is specified in the table-specific options/properties, the " +
"precedence would be `compression`, `parquet.compression`, " +
"`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " +
"snappy, gzip, lzo, brotli, lz4, zstd.")
.version("1.1.1")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd"))
.createWithDefault("snappy")
val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
.doc("Enables Parquet filter push-down optimization when set to true.")
.version("1.2.0")
.booleanConf
.createWithDefault(true)
val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
.doc("If true, enables Parquet filter push-down optimization for Date. " +
s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
"enabled.")
.version("2.4.0")
.internal()
.booleanConf
.createWithDefault(true)
val PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED =
buildConf("spark.sql.parquet.filterPushdown.timestamp")
.doc("If true, enables Parquet filter push-down optimization for Timestamp. " +
s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
"enabled and Timestamp stored as TIMESTAMP_MICROS or TIMESTAMP_MILLIS type.")
.version("2.4.0")
.internal()
.booleanConf
.createWithDefault(true)
val PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED =
buildConf("spark.sql.parquet.filterPushdown.decimal")
.doc("If true, enables Parquet filter push-down optimization for Decimal. " +
s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
"enabled.")
.version("2.4.0")
.internal()
.booleanConf
.createWithDefault(true)
val PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED =
buildConf("spark.sql.parquet.filterPushdown.string.startsWith")
.doc("If true, enables Parquet filter push-down optimization for string startsWith function. " +
s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
"enabled.")
.version("2.4.0")
.internal()
.booleanConf
.createWithDefault(true)
val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
.doc("For IN predicate, Parquet filter will push-down a set of OR clauses if its " +
"number of values not exceeds this threshold. Otherwise, Parquet filter will push-down " +
"a value greater than or equal to its minimum value and less than or equal to " +
"its maximum value. By setting this value to 0 this feature can be disabled. " +
s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
"enabled.")
.version("2.4.0")
.internal()
.intConf
.checkValue(threshold => threshold >= 0, "The threshold must not be negative.")
.createWithDefault(10)
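// Worked example of the rule above with the default threshold of 10: a predicate
// such as `c IN (1, 2, 3)` is pushed down as OR-ed equality filters, while one with
// more than 10 values is pushed down only as the range filter c >= min AND c <= max.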
val PARQUET_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.aggregatePushdown")
.doc("If true, MAX/MIN/COUNT without filter and group by will be pushed" +
" down to Parquet for optimization. MAX/MIN/COUNT for complex types and timestamp" +
" can't be pushed down")
.version("3.3.0")
.booleanConf
.createWithDefault(false)
val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
.doc("If true, data will be written in a way of Spark 1.4 and earlier. For example, decimal " +
"values will be written in Apache Parquet's fixed-length byte array format, which other " +
"systems such as Apache Hive and Apache Impala use. If false, the newer format in Parquet " +
"will be used. For example, decimals will be written in int-based format. If Parquet " +
"output is intended for use with systems that do not support this newer format, set to true.")
.version("1.6.0")
.booleanConf
.createWithDefault(false)
val PARQUET_OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.parquet.output.committer.class")
.doc("The output committer class used by Parquet. The specified class needs to be a " +
"subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
"of org.apache.parquet.hadoop.ParquetOutputCommitter. If it is not, then metadata " +
"summaries will never be created, irrespective of the value of " +
"parquet.summary.metadata.level")
.version("1.5.0")
.internal()
.stringConf
.createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")
val PARQUET_VECTORIZED_READER_ENABLED =
buildConf("spark.sql.parquet.enableVectorizedReader")
.doc("Enables vectorized parquet decoding.")
.version("2.0.0")
.booleanConf
.createWithDefault(true)
val PARQUET_RECORD_FILTER_ENABLED = buildConf("spark.sql.parquet.recordLevelFilter.enabled")
.doc("If true, enables Parquet's native record-level filtering using the pushed down " +
"filters. " +
s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' " +
"is enabled and the vectorized reader is not used. You can ensure the vectorized reader " +
s"is not used by setting '${PARQUET_VECTORIZED_READER_ENABLED.key}' to false.")
.version("2.3.0")
.booleanConf
.createWithDefault(false)
val PARQUET_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.parquet.columnarReaderBatchSize")
.doc("The number of rows to include in a parquet vectorized reader batch. The number should " +
"be carefully chosen to minimize overhead and avoid OOMs in reading data.")
.version("2.4.0")
.intConf
.createWithDefault(4096)
val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
.doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
"`orc.compress` is specified in the table-specific options/properties, the precedence " +
"would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`." +
"Acceptable values include: none, uncompressed, snappy, zlib, lzo, zstd, lz4.")
.version("2.3.0")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo", "zstd", "lz4"))
.createWithDefault("snappy")
val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
.doc("When native, use the native version of ORC support instead of the ORC library in Hive. " +
"It is 'hive' by default prior to Spark 2.4.")
.version("2.3.0")
.internal()
.stringConf
.checkValues(Set("hive", "native"))
.createWithDefault("native")
val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedReader")
.doc("Enables vectorized orc decoding.")
.version("2.3.0")
.booleanConf
.createWithDefault(true)
val ORC_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.orc.columnarReaderBatchSize")
.doc("The number of rows to include in a orc vectorized reader batch. The number should " +
"be carefully chosen to minimize overhead and avoid OOMs in reading data.")
.version("2.4.0")
.intConf
.createWithDefault(4096)
val ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED =
buildConf("spark.sql.orc.enableNestedColumnVectorizedReader")
.doc("Enables vectorized orc decoding for nested column.")
.version("3.2.0")
.booleanConf
.createWithDefault(false)
val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
.doc("When true, enable filter pushdown for ORC files.")
.version("1.4.0")
.booleanConf
.createWithDefault(true)
val ORC_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.aggregatePushdown")
.doc("If true, aggregates will be pushed down to ORC for optimization. Support MIN, MAX and " +
"COUNT as aggregate expression. For MIN/MAX, support boolean, integer, float and date " +
"type. For COUNT, support all data types.")
.version("3.3.0")
.booleanConf
.createWithDefault(false)
val ORC_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.orc.mergeSchema")
.doc("When true, the Orc data source merges schemas collected from all data files, " +
"otherwise the schema is picked from a random data file.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)
val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
.doc("When true, check all the partition paths under the table\'s root directory " +
"when reading data stored in HDFS. This configuration will be deprecated in the future " +
s"releases and replaced by ${SPARK_IGNORE_MISSING_FILES.key}.")
.version("1.4.0")
.booleanConf
.createWithDefault(false)
val HIVE_METASTORE_PARTITION_PRUNING =
buildConf("spark.sql.hive.metastorePartitionPruning")
.doc("When true, some predicates will be pushed down into the Hive metastore so that " +
"unmatching partitions can be eliminated earlier.")
.version("1.5.0")
.booleanConf