CarbonScanRDD.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.rdd
import java.text.SimpleDateFormat
import java.util.{ArrayList, Date, List}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.Random
import scala.util.control.Breaks.{break, breakable}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.profiler.{GetPartition, Profiler, QueryTaskEnd}
import org.apache.spark.sql.util.SparkSQLUtil.sessionState
import org.apache.spark.util.{CarbonReflectionUtils, TaskCompletionListener}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
import org.apache.carbondata.core.datastore.block.Distributable
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.filter.FilterUtil
import org.apache.carbondata.core.scan.model.QueryModel
import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
import org.apache.carbondata.core.statusmanager.FileFormat
import org.apache.carbondata.core.util._
import org.apache.carbondata.hadoop._
import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.InitInputMetrics
import org.apache.carbondata.spark.util.Util
import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
/**
* This RDD is used to perform queries on CarbonData files. Before sending tasks to scan
* CarbonData files, this RDD leverages CarbonData's index information to do file-level
* filtering on the driver side.
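*
* A minimal construction sketch (illustrative only; the caller context and parameter names are
* hypothetical, not taken from this file):
* {{{
*   val rdd = new CarbonScanRDD[InternalRow](
*     sparkSession, projection, filterExpr, carbonTable.getAbsoluteTableIdentifier,
*     carbonTable.getTableInfo.serialize(), carbonTable.getTableInfo, inputMetrics,
*     partitionSpecs)
*   rdd.setVectorReaderSupport(true)
*   // then use it like any other Spark RDD, e.g. rdd.collect()
* }}}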
*/
class CarbonScanRDD[T: ClassTag](
@transient private val spark: SparkSession,
val columnProjection: CarbonProjection,
var filterExpression: Expression,
identifier: AbsoluteTableIdentifier,
@transient private val serializedTableInfo: Array[Byte],
@transient private val tableInfo: TableInfo,
inputMetricsStats: InitInputMetrics,
@transient val partitionNames: Seq[PartitionSpec],
val dataTypeConverterClz: Class[_ <: DataTypeConverter] = classOf[SparkDataTypeConverterImpl],
val readSupportClz: Class[_ <: CarbonReadSupport[_]] = SparkReadSupport.readSupportClass)
extends CarbonRDDWithTableInfo[T](spark, Nil, serializedTableInfo) {
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
formatter.format(new Date())
}
private var vectorReader = false
private val bucketedTable = tableInfo.getFactTable.getBucketingInfo
@transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
override def internalGetPartitions: Array[Partition] = {
val startTime = System.currentTimeMillis()
var partitions: Array[Partition] = Array.empty[Partition]
var getSplitsStartTime: Long = -1
var getSplitsEndTime: Long = -1
var distributeStartTime: Long = -1
var distributeEndTime: Long = -1
val tablePath = tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath
var numSegments = 0
var numStreamSegments = 0
var numBlocks = 0
try {
val conf = FileFactory.getConfiguration
val jobConf = new JobConf(conf)
SparkHadoopUtil.get.addCredentials(jobConf)
val job = Job.getInstance(jobConf)
val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelformat")
val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) {
prepareFileInputFormatForDriver(job.getConfiguration)
} else {
prepareInputFormatForDriver(job.getConfiguration)
}
// initialise query_id for job
job.getConfiguration.set("query.id", queryId)
// get splits
getSplitsStartTime = System.currentTimeMillis()
val splits = format.getSplits(job)
getSplitsEndTime = System.currentTimeMillis()
if ((splits == null) && format.isInstanceOf[CarbonFileInputFormat[Object]]) {
throw new SparkException(
"CarbonData file not exist in the segment_null (SDK writer Output) path")
}
numSegments = format.getNumSegments
numStreamSegments = format.getNumStreamSegments
numBlocks = format.getNumBlocks
// separate split
// 1. for batch splits, invoke distributeSplits method to create partitions
// 2. for stream splits, create partition for each split by default
val columnarSplits = new ArrayList[InputSplit]()
val streamSplits = new ArrayBuffer[InputSplit]()
splits.asScala.foreach { split =>
val carbonInputSplit = split.asInstanceOf[CarbonInputSplit]
if (FileFormat.ROW_V1 == carbonInputSplit.getFileFormat) {
streamSplits += split
} else {
columnarSplits.add(split)
}
}
distributeStartTime = System.currentTimeMillis()
val batchPartitions = distributeColumnarSplits(columnarSplits)
distributeEndTime = System.currentTimeMillis()
// check and remove InExpression from filterExpression
checkAndRemoveInExpressionFromFilterExpression(batchPartitions)
if (streamSplits.isEmpty) {
partitions = batchPartitions.toArray
} else {
val index = batchPartitions.length
val streamPartitions: mutable.Buffer[Partition] =
streamSplits.zipWithIndex.map { splitWithIndex =>
val multiBlockSplit =
new CarbonMultiBlockSplit(
Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
splitWithIndex._1.getLocations,
FileFormat.ROW_V1)
new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit)
}
if (batchPartitions.isEmpty) {
partitions = streamPartitions.toArray
} else {
logInfo(
s"""
| Identified no.of Streaming Blocks: ${ streamPartitions.size },
""".stripMargin)
// should keep the order by index of partition
batchPartitions.appendAll(streamPartitions)
partitions = batchPartitions.toArray
}
}
partitions
} finally {
Profiler.invokeIfEnable {
val endTime = System.currentTimeMillis()
val executionId = spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
if (executionId != null) {
Profiler.send(
GetPartition(
executionId.toLong,
tableInfo.getDatabaseName + "." + tableInfo.getFactTable.getTableName,
tablePath,
queryId,
partitions.length,
startTime,
endTime,
getSplitsStartTime,
getSplitsEndTime,
numSegments,
numStreamSegments,
numBlocks,
distributeStartTime,
distributeEndTime,
if (filterExpression == null) "" else filterExpression.getStatement,
if (columnProjection == null) "" else columnProjection.getAllColumns.mkString(",")
)
)
}
}
}
}
private def distributeColumnarSplits(splits: List[InputSplit]): mutable.Buffer[Partition] = {
// this function distributes the splits into partitions using one of the following strategies:
// 1. bucketed table: group the splits by bucket id, one partition per bucket
// 2. custom/search-mode distribution: map blocks to executors based on data locality
// 3. blocklet distribution: shuffle the splits and create one partition per split
// 4. merge-files distribution: bin-pack block-level splits up to a maximum partition size
// 5. default block distribution: group the splits by (segment id, block path)
var statistic = new QueryStatistic()
val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
var parallelism = sparkContext.defaultParallelism
val result = new ArrayList[Partition](parallelism)
var noOfBlocks = 0
var noOfNodes = 0
var noOfTasks = 0
if (!splits.isEmpty) {
statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
statisticRecorder.recordStatisticsForDriver(statistic, queryId)
statistic = new QueryStatistic()
val carbonDistribution = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
// If bucketing is enabled on the table, then partitions should be grouped based on buckets.
if (bucketedTable != null) {
var i = 0
val bucketed =
splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy(f => f.getBucketId)
(0 until bucketedTable.getNumOfRanges).map { bucketId =>
val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil)
val multiBlockSplit =
new CarbonMultiBlockSplit(
bucketPartitions.asJava,
bucketPartitions.flatMap(_.getLocations).toArray)
val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
i += 1
result.add(partition)
}
} else {
val useCustomDistribution =
CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
"false").toBoolean ||
carbonDistribution.equalsIgnoreCase(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM)
val enableSearchMode = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT).toBoolean
if (useCustomDistribution || enableSearchMode) {
if (enableSearchMode) {
// in search mode, force assigning only one task (containing multiple splits) per node
parallelism = 0
}
// create a list of block based on split
val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
// get the list of executors and map blocks to executors based on locality
val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
// divide the blocks among the tasks of the nodes as per the data locality
val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
parallelism, activeNodes.toList.asJava)
var i = 0
// Create Spark Partition for each task and assign blocks
nodeBlockMapping.asScala.foreach { case (node, blockList) =>
blockList.asScala.foreach { blocksPerTask =>
val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
if (blocksPerTask.size() != 0) {
val multiBlockSplit =
new CarbonMultiBlockSplit(splits.asJava, Array(node))
val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
result.add(partition)
i += 1
}
}
}
noOfNodes = nodeBlockMapping.size
} else if (carbonDistribution.equalsIgnoreCase(
CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCKLET)) {
// Use blocklet distribution
// Shuffle the splits so blocklets are distributed more evenly across partitions
Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex =>
val multiBlockSplit =
new CarbonMultiBlockSplit(
Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
splitWithIndex._1.getLocations)
val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
result.add(partition)
}
} else if (carbonDistribution.equalsIgnoreCase(
CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)) {
// sort blocks in reverse order of length
val blockSplits = splits
.asScala
.map(_.asInstanceOf[CarbonInputSplit])
.groupBy(f => f.getBlockPath)
.map { blockSplitEntry =>
new CarbonMultiBlockSplit(
blockSplitEntry._2.asJava,
blockSplitEntry._2.flatMap(f => f.getLocations).distinct.toArray)
}.toArray.sortBy(_.getLength)(implicitly[Ordering[Long]].reverse)
val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
val defaultParallelism = spark.sparkContext.defaultParallelism
val totalBytes = blockSplits.map(_.getLength + openCostInBytes).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitBytes = Math
.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")
val currentFiles = new ArrayBuffer[CarbonMultiBlockSplit]
var currentSize = 0L
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
result.add(combineSplits(currentFiles, currentSize, result.size()))
}
currentFiles.clear()
currentSize = 0
}
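// Greedily pack the length-sorted block splits below: whenever adding the next block would
// push the current partition past maxSplitBytes, close the partition and start a new one.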
blockSplits.foreach { file =>
if (currentSize + file.getLength > maxSplitBytes) {
closePartition()
}
// Add the given file to the current partition.
currentSize += file.getLength + openCostInBytes
currentFiles += file
}
closePartition()
} else {
// Use block distribution
splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy { f =>
f.getSegmentId.concat(f.getBlockPath)
}.values.zipWithIndex.foreach { splitWithIndex =>
val multiBlockSplit =
new CarbonMultiBlockSplit(
splitWithIndex._1.asJava,
splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray)
val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
result.add(partition)
}
}
}
noOfBlocks = splits.size
noOfTasks = result.size()
statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
System.currentTimeMillis)
statisticRecorder.recordStatisticsForDriver(statistic, queryId)
statisticRecorder.logStatisticsAsTableDriver()
}
logInfo(
s"""
| Identified no.of.blocks: $noOfBlocks,
| no.of.tasks: $noOfTasks,
| no.of.nodes: $noOfNodes,
| parallelism: $parallelism
""".stripMargin)
result.asScala
}
def combineSplits(
splits: ArrayBuffer[CarbonMultiBlockSplit],
size: Long,
partitionId: Int
): CarbonSparkPartition = {
val carbonInputSplits = splits.flatMap(_.getAllSplits.asScala)
// Computes the total number of bytes that can be retrieved from each host.
val hostToNumBytes = mutable.HashMap.empty[String, Long]
splits.foreach { split =>
split.getLocations.filter(_ != "localhost").foreach { host =>
hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + split.getLength
}
}
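// Illustrative example (hypothetical sizes in bytes): with hostToNumBytes =
// Map("h1" -> 300, "h2" -> 500, "h3" -> 100, "h4" -> 50), the selection below yields
// locations = Array("h2", "h1", "h3").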
// Takes the first 3 hosts with the most data to be retrieved
val locations = hostToNumBytes
.toSeq
.sortBy(_._2)(implicitly[Ordering[Long]].reverse)
.take(3)
.map(_._1)
.toArray
val multiBlockSplit = new CarbonMultiBlockSplit(carbonInputSplits.asJava, locations)
new CarbonSparkPartition(id, partitionId, multiBlockSplit)
}
override def internalCompute(split: Partition, context: TaskContext): Iterator[T] = {
val queryStartTime = System.currentTimeMillis
val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
if (null == carbonPropertiesFilePath) {
System.setProperty("carbon.properties.filepath",
System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
)
}
val executionId = context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
val taskId = split.index
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
TaskMetricsMap.getInstance().registerThreadCallback()
inputMetricsStats.initBytesReadCallback(context, inputSplit)
val iterator = if (inputSplit.getAllSplits.size() > 0) {
val model = format.createQueryModel(inputSplit, attemptContext)
// one query id per table
model.setQueryId(queryId)
// get RecordReader by FileFormat
var reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
case FileFormat.ROW_V1 =>
// create record reader for row format
DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
val inputFormat = new CarbonStreamInputFormat
val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
.asInstanceOf[CarbonStreamRecordReader]
streamReader.setVectorReader(vectorReader)
streamReader.setInputMetricsStats(inputMetricsStats)
model.setStatisticsRecorder(
CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
streamReader.setQueryModel(model)
streamReader
case _ =>
// create record reader for CarbonData file format
if (vectorReader) {
val carbonRecordReader = createVectorizedCarbonRecordReader(model,
inputMetricsStats,
"true")
if (carbonRecordReader == null) {
new CarbonRecordReader(model,
format.getReadSupportClass(attemptContext.getConfiguration),
inputMetricsStats,
attemptContext.getConfiguration)
} else {
carbonRecordReader
}
} else {
new CarbonRecordReader(model,
format.getReadSupportClass(attemptContext.getConfiguration),
inputMetricsStats, attemptContext.getConfiguration)
}
}
val closeReader = () => {
if (reader != null) {
try {
reader.close()
} catch {
case e: Exception =>
LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(e)
}
reader = null
}
}
// create a statistics recorder
val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId())
model.setStatisticsRecorder(recorder)
// TODO: rewrite this logic to call free memory in FailureListener on failures. On success,
// TODO: no memory leak should be there, resources should be freed on success completion.
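// Peek at the listeners already registered on this task via reflection: if an
// InsertTaskCompletionListener is present (presumably registered by an insert flow that owns
// freeing the unsafe memory), this task must not free that memory again on completion.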
val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks")
onCompleteCallbacksField.setAccessible(true)
val listeners = onCompleteCallbacksField.get(context)
.asInstanceOf[ArrayBuffer[TaskCompletionListener]]
val isAdded = listeners.exists(p => p.isInstanceOf[InsertTaskCompletionListener])
model.setFreeUnsafeMemory(!isAdded)
// register the task completion listener before calling initialize, because initialize
// internally uses unsafe memory while processing a blocklet; if an exception is thrown there,
// the unsafe memory occupied by this task would otherwise never be cleared
context.addTaskCompletionListener { new QueryTaskCompletionListener(!isAdded,
reader,
inputMetricsStats,
executionId,
taskId,
queryStartTime,
model.getStatisticsRecorder,
split,
queryId)
}
// initialize the reader
reader.initialize(inputSplit, attemptContext)
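// wrap the record reader in an iterator; the reader is closed (via closeReader above) as soon
// as it reports no more rows, so its resources are released before task completion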
new Iterator[Any] {
private var havePair = false
private var finished = false
override def hasNext: Boolean = {
if (context.isInterrupted) {
throw new TaskKilledException
}
if (!finished && !havePair) {
finished = !reader.nextKeyValue
havePair = !finished
}
if (finished) {
closeReader.apply()
}
!finished
}
override def next(): Any = {
if (!hasNext) {
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
val value = reader.getCurrentValue
value
}
}
} else {
new Iterator[Any] {
override def hasNext: Boolean = false
override def next(): Any = throw new java.util.NoSuchElementException("End of stream")
}
}
iterator.asInstanceOf[Iterator[T]]
}
private def close() {
TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
inputMetricsStats.updateAndClose()
}
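// The two driver-side prepare methods below set the full TableInfo and partition pruning
// information needed for split planning, while prepareInputFormatForExecutor additionally
// wires the read support and data type converter classes used to materialize rows.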
def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
CarbonInputFormat.setTableInfo(conf, tableInfo)
CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
if (partitionNames != null) {
CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
}
CarbonInputFormat.setTransactionalTable(conf, tableInfo.isTransactionalTable)
createInputFormat(conf)
}
def prepareFileInputFormatForDriver(conf: Configuration): CarbonFileInputFormat[Object] = {
CarbonInputFormat.setTableInfo(conf, tableInfo)
CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
if (partitionNames != null) {
CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
}
createFileInputFormat(conf)
}
private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
CarbonInputFormat.setCarbonReadSupport(conf, readSupportClz)
val tableInfo1 = getTableInfo
CarbonInputFormat.setTableInfo(conf, tableInfo1)
CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
CarbonInputFormat.setDataTypeConverter(conf, dataTypeConverterClz)
createInputFormat(conf)
}
private def createFileInputFormat(conf: Configuration): CarbonFileInputFormat[Object] = {
val format = new CarbonFileInputFormat[Object]
CarbonInputFormat.setTablePath(conf,
identifier.appendWithLocalPrefix(identifier.getTablePath))
CarbonInputFormat.setQuerySegment(conf, identifier)
CarbonInputFormat.setFilterPredicates(conf, filterExpression)
CarbonInputFormat.setColumnProjection(conf, columnProjection)
CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
// when segment validation is disabled in the thread-local session info, propagate it to the input format
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (carbonSessionInfo != null) {
val tableUniqueKey = identifier.getDatabaseName + "." + identifier.getTableName
val validateInputSegmentsKey = CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
tableUniqueKey
CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getThreadParams
.getProperty(validateInputSegmentsKey, "true").toBoolean)
val queryOnPreAggStreamingKey = CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING +
tableUniqueKey
val queryOnPreAggStreaming = carbonSessionInfo.getThreadParams
.getProperty(queryOnPreAggStreamingKey, "false").toBoolean
val inputSegmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS + tableUniqueKey
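// Illustrative example (key format assumed from the constants above): a thread-local property
// such as "carbon.input.segments.<db>.<table>" set to "1,2,3" would restrict the query to
// those segments, while "*" (the default below) means all valid segments.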
CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getThreadParams
.getProperty(validateInputSegmentsKey, "true").toBoolean)
CarbonInputFormat
.setQuerySegment(conf,
carbonSessionInfo.getThreadParams
.getProperty(inputSegmentsKey,
CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*")))
if (queryOnPreAggStreaming) {
CarbonInputFormat.setAccessStreamingSegments(conf, queryOnPreAggStreaming)
carbonSessionInfo.getThreadParams.removeProperty(queryOnPreAggStreamingKey)
carbonSessionInfo.getThreadParams.removeProperty(inputSegmentsKey)
carbonSessionInfo.getThreadParams.removeProperty(validateInputSegmentsKey)
}
}
format
}
private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
val format = new CarbonTableInputFormat[Object]
CarbonInputFormat.setTablePath(conf,
identifier.appendWithLocalPrefix(identifier.getTablePath))
CarbonInputFormat.setQuerySegment(conf, identifier)
CarbonInputFormat.setFilterPredicates(conf, filterExpression)
CarbonInputFormat.setColumnProjection(conf, columnProjection)
CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
// when segment validation is disabled in the thread-local session info, propagate it to CarbonTableInputFormat
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (carbonSessionInfo != null) {
val tableUniqueKey = identifier.getDatabaseName + "." + identifier.getTableName
val validateInputSegmentsKey = CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
tableUniqueKey
CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getThreadParams
.getProperty(validateInputSegmentsKey, "true").toBoolean)
val queryOnPreAggStreamingKey = CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING +
tableUniqueKey
val queryOnPreAggStreaming = carbonSessionInfo.getThreadParams
.getProperty(queryOnPreAggStreamingKey, "false").toBoolean
val inputSegmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS + tableUniqueKey
CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getThreadParams
.getProperty(validateInputSegmentsKey, "true").toBoolean)
CarbonInputFormat
.setQuerySegment(conf,
carbonSessionInfo.getThreadParams
.getProperty(inputSegmentsKey,
CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*")))
if (queryOnPreAggStreaming) {
CarbonInputFormat.setAccessStreamingSegments(conf, queryOnPreAggStreaming)
carbonSessionInfo.getThreadParams.removeProperty(queryOnPreAggStreamingKey)
carbonSessionInfo.getThreadParams.removeProperty(inputSegmentsKey)
carbonSessionInfo.getThreadParams.removeProperty(validateInputSegmentsKey)
}
}
format
}
/**
* This method checks for and removes InExpression nodes from filterExpression to prevent the
* list of expression values from being serialized and deserialized on the executor side.
*
* @param identifiedPartitions partitions identified for this query
*/
private def checkAndRemoveInExpressionFromFilterExpression(
identifiedPartitions: mutable.Buffer[Partition]) = {
if (null != filterExpression) {
if (identifiedPartitions.nonEmpty &&
!checkForBlockWithoutBlockletInfo(identifiedPartitions)) {
FilterUtil.removeInExpressionNodeWithPositionIdColumn(filterExpression)
}
}
}
/**
* This method checks for the presence of any block from the old store (version 1.1 and below).
* If any identified block does not contain blocklet info, that block is from the old store.
*
* @param identifiedPartitions partitions identified for this query
* @return true if any block without blocklet info is present
*/
private def checkForBlockWithoutBlockletInfo(
identifiedPartitions: mutable.Buffer[Partition]): Boolean = {
var isBlockWithoutBlockletInfoPresent = false
breakable {
identifiedPartitions.foreach { value =>
val inputSplit = value.asInstanceOf[CarbonSparkPartition].split.value
val splitList: java.util.List[CarbonInputSplit] =
if (inputSplit.isInstanceOf[CarbonMultiBlockSplit]) {
inputSplit.asInstanceOf[CarbonMultiBlockSplit].getAllSplits
} else {
// ArrayList.add returns a Boolean, so build the single-element list explicitly
val singleSplitList = new java.util.ArrayList[CarbonInputSplit]()
singleSplitList.add(inputSplit.asInstanceOf[CarbonInputSplit])
singleSplitList
}
// check for block from old store (version 1.1 and below)
if (Util.isBlockWithoutBlockletInfoExists(splitList)) {
isBlockWithoutBlockletInfoPresent = true
break
}
}
}
isBlockWithoutBlockletInfoPresent
}
/**
* Get the preferred locations where to launch this task.
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
val theSplit = split.asInstanceOf[CarbonSparkPartition]
val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != "localhost")
firstOptionLocation
}
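/**
* Creates the vectorized reader via reflection (assumed to be reflective because the class
* lives in a separate module). Returns null if instantiation fails, in which case the caller
* falls back to the row-based CarbonRecordReader.
*/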
def createVectorizedCarbonRecordReader(queryModel: QueryModel,
inputMetricsStats: InputMetricsStats, enableBatch: String): RecordReader[Void, Object] = {
val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader"
try {
val cons = Class.forName(name).getDeclaredConstructors
cons.head.setAccessible(true)
cons.head.newInstance(queryModel, inputMetricsStats, enableBatch)
.asInstanceOf[RecordReader[Void, Object]]
} catch {
case e: Exception =>
LOGGER.error(e)
null
}
}
// TODO: find a better way to set it.
def setVectorReaderSupport(boolean: Boolean): Unit = {
vectorReader = boolean
}
}