[CARBONDATA-2903] Fix compiler warning
This closes #2677
jackylk authored and QiangCai committed Sep 3, 2018
1 parent b588cb6 · commit 884ac91
Showing 37 changed files with 63 additions and 72 deletions.
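Most of the warnings fixed below fall into a few recurring patterns: bare "case _ =>" handlers in try/catch blocks (flagged because they silently catch every Throwable), type patterns such as ArrayBuffer[SortOrder] or Seq[Expression] whose type arguments are erased at runtime and therefore unchecked, non-exhaustive matches that gain an explicit default branch, and @transient constructor parameters promoted to private val. Below is a minimal sketch of the first two fixes, not code from the commit; SortKey and WarningFixSketch are illustrative names.

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the Spark SortOrder expression matched in MVHelper.
final case class SortKey(column: String, ascending: Boolean)

object WarningFixSketch {

  // Erasure-safe type pattern: "case list: ArrayBuffer[SortKey]" is unchecked
  // because the type argument is eliminated by erasure, so match on
  // ArrayBuffer[_] and narrow the element type afterwards.
  def sortColumns(flagSpec: Seq[Any]): Seq[String] =
    flagSpec.flatMap {
      case list: ArrayBuffer[_] => list.collect { case s: SortKey => s.column }
      case _ => Nil
    }

  // Catch-all handlers: a bare "case _ =>" in a catch block warns that it
  // catches all Throwables; naming the type keeps the behaviour but makes it explicit.
  def readFile(path: String): String =
    try {
      scala.io.Source.fromFile(path).mkString
    } catch {
      case ex: Throwable => throw new RuntimeException(ex)
    }
}

The same "case ex: Throwable => throw new RuntimeException(ex)" shape replaces the old "case ex: Exception ... / case _ => None" pairs in the SDK and datasource test suites below.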
@@ -525,8 +525,8 @@ object MVHelper {
aliasMap: Map[AttributeKey, NamedExpression]): Seq[Seq[Any]] = {
val updatedFlagSpec = select.flagSpec.map { f =>
f.map {
case list: ArrayBuffer[SortOrder] =>
list.map { s =>
case list: ArrayBuffer[_] =>
list.map { case s: SortOrder =>
val expressions =
updateOutPutList(
Seq(s.child.asInstanceOf[Attribute]),
@@ -176,9 +176,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
}
writer.close()
} catch {
case ex: Exception => throw new RuntimeException(ex)

case _ => None
case ex: Throwable => throw new RuntimeException(ex)
}
}

@@ -205,8 +203,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
}
writer.close()
} catch {
case ex: Exception => throw new RuntimeException(ex)
case _ => None
case ex: Throwable => throw new RuntimeException(ex)
}
}

@@ -239,9 +236,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
}
writer.close()
} catch {
case ex: Exception => throw new RuntimeException(ex)

case _ => None
case ex: Throwable => throw new RuntimeException(ex)
}
}

@@ -70,8 +70,7 @@ object TestSparkCarbonFileFormatWithSparkSession {
}
writer.close()
} catch {
case ex: Exception => None
case _ => None
case ex: Throwable => throw new RuntimeException(ex)
}
}

@@ -67,7 +67,7 @@ class ProfilerSuite extends QueryTest with BeforeAndAfterAll {
try {
profilerEndPoint.processSQLStart(statementId, messages)
} catch {
case _ =>
case _: Throwable =>
assert(false, "Failed to log StatementSummary")
}
}
@@ -77,7 +77,7 @@ class ProfilerSuite extends QueryTest with BeforeAndAfterAll {
try {
profilerEndPoint.processExecutionEnd(executionId, messages)
} catch {
case _ =>
case _: Throwable =>
assert(false, "Failed to log ExecutionSummary")
}
}
@@ -48,8 +48,9 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
/**
* This class is aimed at generating dictionary file for the newly added columns
*/
class AlterTableAddColumnRDD[K, V](@transient sparkSession: SparkSession,
@transient newColumns: Seq[ColumnSchema],
class AlterTableAddColumnRDD[K, V](
@transient private val sparkSession: SparkSession,
@transient private val newColumns: Seq[ColumnSchema],
identifier: AbsoluteTableIdentifier)
extends CarbonRDD[(Int, SegmentStatus)](sparkSession, Nil) {

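The change just above is the most frequent one in this commit: "@transient ss: SparkSession" becomes "@transient private val ss: SparkSession" on RDD constructors. On a plain constructor parameter scalac warns (roughly, "no valid targets for annotation") and drops the annotation; adding private val creates an actual field for @transient to attach to, so the SparkSession stays on the driver instead of travelling with the serialized task closure. A hedged sketch of the shape, using an illustrative class name rather than a real CarbonRDD subclass:

import org.apache.spark.sql.SparkSession

// Illustrative only: mirrors the constructor shape used by the Carbon RDDs.
class DictionaryGenSketch(
    @transient private val ss: SparkSession, // not serialized with the instance
    tablePath: String)
  extends Serializable {

  // Usable on the driver before the instance ships to executors; after
  // deserialization ss would be null, so it must only be touched driver-side.
  def defaultParallelism: Int = ss.sparkContext.defaultParallelism
}

The same promotion appears on CarbonScanRDD's serializedTableInfo and tableInfo parameters and on CarbonSession's useHiveMetaStore flag further down.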
@@ -45,8 +45,9 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa
/**
* This class is aimed at generating dictionary file for the newly added columns
*/
class AlterTableDropColumnRDD[K, V](@transient ss: SparkSession,
@transient newColumns: Seq[ColumnSchema],
class AlterTableDropColumnRDD[K, V](
@transient private val ss: SparkSession,
@transient private val newColumns: Seq[ColumnSchema],
carbonTableIdentifier: AbsoluteTableIdentifier)
extends CarbonRDD[(Int, SegmentStatus)](ss, Nil) {

@@ -43,7 +43,7 @@ case class CarbonDropPartition(rddId: Int, val idx: Int, segment: Segment)
* @param segments segments to be cleaned
*/
class CarbonDropPartitionRDD(
@transient ss: SparkSession,
@transient private val ss: SparkSession,
tablePath: String,
segments: Seq[Segment],
partitions: util.List[PartitionSpec],
@@ -174,7 +174,7 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
* @param model a model package load info
*/
class CarbonAllDictionaryCombineRDD(
@transient sparkSession: SparkSession,
@transient private val sparkSession: SparkSession,
prev: RDD[(String, Iterable[String])],
model: DictionaryLoadModel)
extends CarbonRDD[(Int, ColumnDistinctValues)](sparkSession, prev) {
@@ -268,7 +268,7 @@ class StringArrayRow(var values: Array[String]) extends Row {
* @param model a model package load info
*/
class CarbonBlockDistinctValuesCombineRDD(
@transient ss: SparkSession,
@transient private val ss: SparkSession,
prev: RDD[Row],
model: DictionaryLoadModel)
extends CarbonRDD[(Int, ColumnDistinctValues)](ss, prev) {
@@ -327,7 +327,7 @@ class CarbonBlockDistinctValuesCombineRDD(
* @param model a model package load info
*/
class CarbonGlobalDictionaryGenerateRDD(
@transient sparkSession: SparkSession,
@transient private val sparkSession: SparkSession,
prev: RDD[(Int, ColumnDistinctValues)],
model: DictionaryLoadModel)
extends CarbonRDD[(Int, SegmentStatus)](sparkSession, prev) {
@@ -502,9 +502,10 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
* @param dimensions carbon dimenisons having predefined dict
* @param dictFolderPath path of dictionary folder
*/
class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
class CarbonColumnDictGenerateRDD(
carbonLoadModel: CarbonLoadModel,
dictionaryLoadModel: DictionaryLoadModel,
@transient ss: SparkSession,
@transient private val ss: SparkSession,
table: CarbonTableIdentifier,
dimensions: Array[CarbonDimension],
dictFolderPath: String)
@@ -16,12 +16,8 @@
*/
package org.apache.carbondata.spark.rdd

import java.util
import java.util.List

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.Partition
@@ -42,7 +38,7 @@ import org.apache.carbondata.spark.MergeResult
* IUD carbon merger RDD
* */
class CarbonIUDMergerRDD[K, V](
@transient ss: SparkSession,
@transient private val ss: SparkSession,
result: MergeResult[K, V],
carbonLoadModel: CarbonLoadModel,
carbonMergerMapping: CarbonMergerMapping,
@@ -61,7 +61,7 @@ import org.apache.carbondata.spark.MergeResult
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}

class CarbonMergerRDD[K, V](
@transient ss: SparkSession,
@transient private val ss: SparkSession,
result: MergeResult[K, V],
carbonLoadModel: CarbonLoadModel,
carbonMergerMapping: CarbonMergerMapping,
@@ -33,7 +33,8 @@ import org.apache.carbondata.core.util._
/**
* This RDD maintains session level ThreadLocal
*/
abstract class CarbonRDD[T: ClassTag](@transient ss: SparkSession,
abstract class CarbonRDD[T: ClassTag](
@transient private val ss: SparkSession,
@transient private var deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) {

val carbonSessionInfo: CarbonSessionInfo = {
@@ -86,7 +87,7 @@ abstract class CarbonRDD[T: ClassTag](@transient ss: SparkSession,
* This RDD contains TableInfo object which is serialized and deserialized in driver and executor
*/
abstract class CarbonRDDWithTableInfo[T: ClassTag](
@transient ss: SparkSession,
@transient private val ss: SparkSession,
@transient private var deps: Seq[Dependency[_]],
serializedTableInfo: Array[Byte]) extends CarbonRDD[T](ss, deps) {

@@ -70,12 +70,12 @@ import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRec
* level filtering in driver side.
*/
class CarbonScanRDD[T: ClassTag](
@transient spark: SparkSession,
@transient private val spark: SparkSession,
val columnProjection: CarbonProjection,
var filterExpression: Expression,
identifier: AbsoluteTableIdentifier,
@transient serializedTableInfo: Array[Byte],
@transient tableInfo: TableInfo,
@transient private val serializedTableInfo: Array[Byte],
@transient private val tableInfo: TableInfo,
inputMetricsStats: InitInputMetrics,
@transient val partitionNames: Seq[PartitionSpec],
val dataTypeConverterClz: Class[_ <: DataTypeConverter] = classOf[SparkDataTypeConverterImpl],
@@ -160,7 +160,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
* It loads the data to carbon using @AbstractDataLoadProcessorStep
*/
class NewCarbonDataLoadRDD[K, V](
@transient ss: SparkSession,
@transient private val ss: SparkSession,
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
blocksGroupBy: Array[(String, Array[BlockDetails])])
@@ -307,7 +307,7 @@ class NewCarbonDataLoadRDD[K, V](
* @see org.apache.carbondata.processing.newflow.DataLoadExecutor
*/
class NewDataFrameLoaderRDD[K, V](
@transient ss: SparkSession,
@transient private val ss: SparkSession,
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](ss, prev) {
@@ -505,7 +505,7 @@ class LazyRddIterator(serializer: SerializerInstance,
* @see org.apache.carbondata.processing.newflow.DataLoadExecutor
*/
class PartitionTableDataLoaderRDD[K, V](
@transient ss: SparkSession,
@transient private val ss: SparkSession,
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
prev: RDD[Row]) extends CarbonRDD[(K, V)](ss, prev) {
@@ -60,6 +60,8 @@ object PartitionDropper {
String.valueOf(oldPartitionIds(partitionIndex + 1))
}
case PartitionType.LIST => "0"
case _ => throw new UnsupportedOperationException(
s"${partitionInfo.getPartitionType} is not supported")
}

if (!dropWithData) {
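Several of the remaining hunks make a match exhaustive: PartitionDropper above and CarbonAlterTableDropPartitionCommand further down gain a default branch that throws UnsupportedOperationException for partition types they do not handle, and Option matches in DataLoadPartitionCoalescer and TestUtil gain an empty "case None =>". A small sketch of both shapes; PartitionKind and ExhaustiveMatchSketch are illustrative names, not Carbon types.

import scala.collection.mutable

// Illustrative stand-in for Carbon's PartitionType.
object PartitionKind extends Enumeration {
  val HASH, RANGE, LIST = Value
}

object ExhaustiveMatchSketch {

  // An explicit default branch turns an unsupported input into a clear
  // error instead of a MatchError at runtime.
  def targetPartitionId(kind: PartitionKind.Value): String = kind match {
    case PartitionKind.HASH => "hash-target"
    case PartitionKind.LIST => "0"
    case _ => throw new UnsupportedOperationException(s"$kind is not supported")
  }

  // An empty "case None =>" completes the Option match without returning
  // a placeholder value such as null or None.
  def removePartition(parts: Option[mutable.Set[Int]], partitionId: Int): Unit =
    parts match {
      case Some(ids) => ids.remove(partitionId)
      case None =>
    }
}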
@@ -57,7 +57,8 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte
* @param ss
* @param dataMapFormat
*/
class DataMapPruneRDD(@transient ss: SparkSession,
class DataMapPruneRDD(
@transient private val ss: SparkSession,
dataMapFormat: DistributableDataMapFormat,
resolverIntf: FilterResolverIntf)
extends CarbonRDD[(ExtendedBlocklet)](ss, Nil) {
@@ -98,7 +98,7 @@ class StreamingRawResultIterator(
* execute streaming segment handoff
*/
class StreamHandoffRDD[K, V](
@transient ss: SparkSession,
@transient private val ss: SparkSession,
result: HandoffResult[K, V],
carbonLoadModel: CarbonLoadModel,
handOffSegmentId: String) extends CarbonRDD[(K, V)](ss, Nil) {
@@ -371,7 +371,6 @@ object StreamHandoffRDD {
LOGGER.error("Handoff failed due to failure in table status updation.")
throw new Exception(errorMessage)
}
done
}

}
@@ -463,7 +463,6 @@ object CarbonScalaUtil {
}
i += 1
}
table
} catch {
case e: Exception =>
// ignore it
@@ -41,7 +41,7 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String)
* @param segments segments to be merged
*/
class CarbonMergeFilesRDD(
@transient ss: SparkSession,
@transient private val ss: SparkSession,
carbonTable: CarbonTable,
segments: Seq[String],
segmentFileNameToSegmentIdMap: java.util.Map[String, String],
@@ -28,8 +28,8 @@ import org.apache.carbondata.spark.rdd.CarbonRDD
case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)

class DataLoadCoalescedRDD[T: ClassTag](
@transient sparkSession: SparkSession,
@transient var prev: RDD[T],
@transient private val sparkSession: SparkSession,
@transient private var prev: RDD[T],
nodeList: Array[String])
extends CarbonRDD[DataLoadPartitionWrap[T]](sparkSession, Nil) {

@@ -193,6 +193,7 @@ class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) {
hostMapPartitionIds.get(loc) match {
case Some(parts) =>
parts.remove(partitionId)
case None =>
}
}
}
@@ -80,8 +80,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
}
writer.close()
} catch {
case ex: Exception => None
case _ => None
case _: Throwable => None
}
}

@@ -48,7 +48,7 @@ object TestUtil {
def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]):Unit = {
checkAnswer(df, expectedAnswer.asScala) match {
case Some(errorMessage) => assert(false, errorMessage)
case None => null
case None =>
}
}

@@ -66,7 +66,7 @@ object TestUtil {
def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = {
checkAnswer(df, expectedAnswer.collect()) match {
case Some(errorMessage) => assert(false, errorMessage)
case None => null
case None =>
}
}

@@ -292,9 +292,9 @@ class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Ar
}

class IndexDataMapRebuildRDD[K, V](
@transient session: SparkSession,
@transient private val session: SparkSession,
result: RefreshResult[K, V],
@transient tableInfo: TableInfo,
@transient private val tableInfo: TableInfo,
dataMapName: String,
indexColumns: Array[CarbonColumn],
segments: Set[Segment])
@@ -50,7 +50,7 @@ import org.apache.carbondata.streaming.CarbonStreamingQueryListener
*/
class CarbonSession(@transient val sc: SparkContext,
@transient private val existingSharedState: Option[SharedState],
@transient useHiveMetaStore: Boolean = true
@transient private val useHiveMetaStore: Boolean = true
) extends SparkSession(sc) { self =>

def this(sc: SparkContext) {
@@ -382,7 +382,7 @@ object CastExpressionOptimization {
numericTimeValue.toString.toDouble
true
} catch {
case _ => false
case _: Throwable => false
}
}

@@ -89,6 +89,8 @@ case class CarbonAlterTableDropPartitionCommand(
case PartitionType.RANGE_INTERVAL =>
throwMetadataException(dbName, tableName,
"Dropping range interval partition is unsupported")
case _ => throw new UnsupportedOperationException(
s"${partitionInfo.getPartitionType} is not supported")
}
partitionInfo.dropPartition(partitionIndex)

@@ -764,7 +764,7 @@ object PreAggregateUtil {
aggExp.isDistinct))
case Sum(_: Expression) =>
Seq(aggExp)
case Count(MatchCastExpression(exp: Seq[Expression], changeDataType: DataType)) =>
case Count(MatchCastExpression(exp: Seq[_], changeDataType: DataType)) =>
Seq(AggregateExpression(Count(Cast(
exp,
changeDataType)),
@@ -30,11 +30,11 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRela
class CarbonDataSourceScan(
override val output: Seq[Attribute],
val rdd: RDD[InternalRow],
@transient relation: HadoopFsRelation,
@transient override val relation: HadoopFsRelation,
val partitioning: Partitioning,
override val metadata: Map[String, String],
identifier: Option[TableIdentifier],
@transient logicalRelation: LogicalRelation)
@transient private val logicalRelation: LogicalRelation)
extends FileSourceScanExec(
relation,
output,
