From 2d0debb0abd401b2fe68fe0b934a4a6939f7cb27 Mon Sep 17 00:00:00 2001 From: Darcy Shen Date: Wed, 10 Oct 2018 16:58:22 +0800 Subject: [PATCH 1/2] refactor sql/core according to IntelliJ --- .../org/apache/spark/sql/internal/StaticSQLConf.scala | 2 +- .../org/apache/spark/sql/DataFrameNaFunctions.scala | 10 +++++----- .../org/apache/spark/sql/KeyValueGroupedDataset.scala | 4 ++-- .../scala/org/apache/spark/sql/api/r/SQLUtils.scala | 4 ++-- .../execution/ExternalAppendOnlyUnsafeRowArray.scala | 6 +++--- .../spark/sql/execution/LocalTableScanExec.scala | 4 ++-- .../org/apache/spark/sql/execution/RowIterator.scala | 2 +- .../org/apache/spark/sql/execution/SQLExecution.scala | 1 - .../org/apache/spark/sql/execution/SelectedField.scala | 4 +--- .../apache/spark/sql/execution/ShuffledRowRDD.scala | 2 +- .../apache/spark/sql/execution/SortPrefixUtils.scala | 2 +- .../sql/execution/aggregate/ObjectAggregationMap.scala | 1 - .../execution/aggregate/RowBasedHashMapGenerator.scala | 4 ++-- .../spark/sql/execution/arrow/ArrowConverters.scala | 3 +-- .../compression/CompressibleColumnBuilder.scala | 2 +- .../apache/spark/sql/execution/command/resources.scala | 2 +- .../execution/datasources/BasicWriteStatsTracker.scala | 4 ++-- .../spark/sql/execution/datasources/DataSource.scala | 8 +++----- .../execution/datasources/FileFormatDataWriter.scala | 2 +- .../datasources/PruneFileSourcePartitions.scala | 2 +- .../sql/execution/datasources/WriteStatsTracker.scala | 2 +- .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 2 +- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 2 +- .../datasources/parquet/ParquetOutputWriter.scala | 1 - .../sql/execution/datasources/text/TextOptions.scala | 2 +- .../apache/spark/sql/execution/metric/SQLMetrics.scala | 10 +++++----- .../spark/sql/execution/python/ArrowPythonRunner.scala | 6 +++--- .../org/apache/spark/sql/execution/subquery.scala | 4 ++-- .../org/apache/spark/sql/expressions/Aggregator.scala | 4 ++-- .../spark/sql/expressions/UserDefinedFunction.scala | 2 +- .../org/apache/spark/sql/jdbc/AggregatedDialect.scala | 6 +++--- .../scala/org/apache/spark/sql/jdbc/DB2Dialect.scala | 2 +- .../scala/org/apache/spark/sql/jdbc/DerbyDialect.scala | 2 +- .../scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 2 +- .../org/apache/spark/sql/jdbc/MsSqlServerDialect.scala | 2 +- .../scala/org/apache/spark/sql/jdbc/MySQLDialect.scala | 2 +- .../org/apache/spark/sql/jdbc/OracleDialect.scala | 2 +- .../org/apache/spark/sql/jdbc/PostgresDialect.scala | 4 ++-- .../org/apache/spark/sql/jdbc/TeradataDialect.scala | 4 +--- .../datasources/BasicWriteTaskStatsTrackerSuite.scala | 2 +- 40 files changed, 61 insertions(+), 71 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index d9c354b165e52..191963dda977d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -123,7 +123,7 @@ object StaticSQLConf { val UI_RETAINED_EXECUTIONS = buildStaticConf("spark.sql.ui.retainedExecutions") .doc("Number of executions to retain in the Spark UI.") - .intConf + .longConf .createWithDefault(1000) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala index 5288907b7d7ff..de7fcd9a691e2 100644 ---
a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala @@ -461,11 +461,11 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { } private def convertToDouble(v: Any): Double = v match { - case v: Float => v.toDouble - case v: Double => v - case v: Long => v.toDouble - case v: Int => v.toDouble - case v => throw new IllegalArgumentException( + case f: Float => f.toDouble + case d: Double => d + case l: Long => l.toDouble + case i: Int => i.toDouble + case _ => throw new IllegalArgumentException( s"Unsupported value type ${v.getClass.getName} ($v).") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index 6bab21dca0cbd..4713e38c47c9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -567,10 +567,10 @@ class KeyValueGroupedDataset[K, V] private[sql]( override def toString: String = { val builder = new StringBuilder val kFields = kExprEnc.schema.map { - case f => s"${f.name}: ${f.dataType.simpleString(2)}" + f => s"${f.name}: ${f.dataType.simpleString(2)}" } val vFields = vExprEnc.schema.map { - case f => s"${f.name}: ${f.dataType.simpleString(2)}" + f => s"${f.name}: ${f.dataType.simpleString(2)}" } builder.append("KeyValueGroupedDataset: [key: [") builder.append(kFields.take(2).mkString(", ")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index af20764f9a968..cc50745b4b5fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -133,7 +133,7 @@ private[sql] object SQLUtils extends Logging { val cols = (0 until row.length).map(row(_).asInstanceOf[Object]).toArray SerDe.writeObject(dos, cols, jvmObjectTracker = null) - bos.toByteArray() + bos.toByteArray } // Schema for DataFrame of serialized R data @@ -188,7 +188,7 @@ private[sql] object SQLUtils extends Logging { dataType match { case 's' => // Read StructType for DataFrame - val fields = SerDe.readList(dis, jvmObjectTracker = null).asInstanceOf[Array[Object]] + val fields = SerDe.readList(dis, jvmObjectTracker = null) Row.fromSeq(fields) case _ => null } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala index ac282ea2e94f5..ce57a29bb6aaf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala @@ -178,7 +178,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( abstract class ExternalAppendOnlyUnsafeRowArrayIterator extends Iterator[UnsafeRow] { private val expectedModificationsCount = modificationsCount - protected def isModified(): Boolean = expectedModificationsCount != modificationsCount + protected def isModified: Boolean = expectedModificationsCount != modificationsCount protected def throwExceptionIfModified(): Unit = { if (expectedModificationsCount != modificationsCount) { @@ -194,7 +194,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( private var currentIndex = startIndex - override def 
hasNext(): Boolean = !isModified() && currentIndex < numRows + override def hasNext(): Boolean = !isModified && currentIndex < numRows override def next(): UnsafeRow = { throwExceptionIfModified() @@ -211,7 +211,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( private val currentRow = new UnsafeRow(numFieldPerRow) - override def hasNext(): Boolean = !isModified() && iterator.hasNext + override def hasNext(): Boolean = !isModified && iterator.hasNext override def next(): UnsafeRow = { throwExceptionIfModified() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala index 448eb703eacde..7d558146f9fda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala @@ -67,13 +67,13 @@ case class LocalTableScanExec( } override def executeCollect(): Array[InternalRow] = { - longMetric("numOutputRows").add(unsafeRows.size) + longMetric("numOutputRows").add(unsafeRows.length) unsafeRows } override def executeTake(limit: Int): Array[InternalRow] = { val taken = unsafeRows.take(limit) - longMetric("numOutputRows").add(taken.size) + longMetric("numOutputRows").add(taken.length) taken } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala index 717ff93eab5d4..59f86d745e1d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala @@ -78,7 +78,7 @@ private final class RowIteratorToScala(val rowIter: RowIterator) extends Iterato } private final class RowIteratorFromScala(scalaIter: Iterator[InternalRow]) extends RowIterator { - private[this] var _next: InternalRow = null + private[this] var _next: InternalRow = _ override def advanceNext(): Boolean = { if (scalaIter.hasNext) { _next = scalaIter.next() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 439932b0cc3ac..ff74ce2d56ca3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong -import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala index 0e7c593f9fb67..a0ccfddfc09f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala @@ -51,15 +51,13 @@ import org.apache.spark.sql.types._ * type appropriate to the complex type extractor. In our example, the name of the child expression * is "name" and its data type is a [[org.apache.spark.sql.types.StructType]] with a single string * field named "first". 
- * - * @param expr the top-level complex type extractor */ private[execution] object SelectedField { def unapply(expr: Expression): Option[StructField] = { // If this expression is an alias, work on its child instead val unaliased = expr match { case Alias(child, _) => child - case expr => expr + case _ => expr } selectField(unaliased, None) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala index 862ee05392f37..efdc17ec507b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala @@ -58,7 +58,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A @transient private lazy val parentPartitionMapping: Array[Int] = { val n = parent.numPartitions val result = new Array[Int](n) - for (i <- 0 until partitionStartIndices.length) { + for (i <- partitionStartIndices.indices) { val start = partitionStartIndices(i) val end = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n for (j <- start until end) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala index c6665d273fd27..8b845c8e3af38 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala @@ -44,9 +44,9 @@ object SortPrefixUtils { case BinaryType => binaryPrefixComparator(sortOrder) case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType | TimestampType => longPrefixComparator(sortOrder) + case FloatType | DoubleType => doublePrefixComparator(sortOrder) case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => longPrefixComparator(sortOrder) - case FloatType | DoubleType => doublePrefixComparator(sortOrder) case dt: DecimalType => doublePrefixComparator(sortOrder) case _ => NoOpPrefixComparator } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala index b5372bcca89dd..9f2cf84a6d7e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, U import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, TypedImperativeAggregate} import org.apache.spark.sql.execution.UnsafeKVExternalSorter import org.apache.spark.sql.types.StructType -import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter /** * An aggregation map that supports using safe `SpecificInternalRow`s aggregation buffers, so that diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala index 56cf78d8b7fc1..74ba10758287a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala @@ -101,7 +101,7 @@ class 
RowBasedHashMapGenerator( def genEqualsForKeys(groupingKeys: Seq[Buffer]): String = { groupingKeys.zipWithIndex.map { case (key: Buffer, ordinal: Int) => s"""(${ctx.genEqual(key.dataType, CodeGenerator.getValue("row", - key.dataType, ordinal.toString()), key.name)})""" + key.dataType, ordinal.toString), key.name)})""" }.mkString(" && ") } @@ -152,7 +152,7 @@ class RowBasedHashMapGenerator( | if (numRows < capacity && !isBatchFull) { | agg_rowWriter.reset(); | $resetNullBits - | ${createUnsafeRowForKey}; + | $createUnsafeRowForKey; | org.apache.spark.sql.catalyst.expressions.UnsafeRow agg_result | = agg_rowWriter.getRow(); | Object kbase = agg_result.getBaseObject(); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala index 2bf6a58b55658..03b9e24de880c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala @@ -31,7 +31,6 @@ import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, MessageSerializer} import org.apache.spark.TaskContext import org.apache.spark.api.java.JavaRDD import org.apache.spark.network.util.JavaUtils -import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types._ @@ -114,7 +113,7 @@ private[sql] object ArrowConverters { rowCount += 1 } arrowWriter.finish() - val batch = unloader.getRecordBatch() + val batch = unloader.getRecordBatch MessageSerializer.serialize(writeChannel, batch) batch.close() } { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala index d1fece05a8414..93c852b8e721c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala @@ -66,7 +66,7 @@ private[columnar] trait CompressibleColumnBuilder[T <: AtomicType] // the row to become unaligned, thus causing crashes. Until a way of fixing the compression // is found to also allow aligned accesses this must be disabled for SPARC. 
- protected def isWorthCompressing(encoder: Encoder[T]) = { + protected def isWorthCompressing(encoder: Encoder[T]): Boolean = { CompressibleColumnBuilder.unaligned && encoder.compressionRatio < 0.8 } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala index 2e859cf1ef253..9817e89eb36a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala @@ -62,7 +62,7 @@ case class ListFilesCommand(files: Seq[String] = Seq.empty[String]) extends Runn } override def run(sparkSession: SparkSession): Seq[Row] = { val fileList = sparkSession.sparkContext.listFiles() - if (files.size > 0) { + if (files.nonEmpty) { files.map { f => val uri = new URI(f) val schemeCorrectedPath = uri.getScheme match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala index ba7d2b7cbdb1a..e8ce16fc0c2a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala @@ -65,7 +65,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) val path = new Path(filePath) val fs = path.getFileSystem(hadoopConf) try { - Some(fs.getFileStatus(path).getLen()) + Some(fs.getFileStatus(path).getLen) } catch { case e: FileNotFoundException => // may arise against eventually consistent object stores @@ -103,7 +103,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) numRows += 1 } - override def getFinalStats(): WriteTaskStats = { + override def getFinalStats: WriteTaskStats = { statCurrentFile() // Reports bytesWritten and recordsWritten to the Spark output metrics. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index ce3bc3dd48327..98303964579f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -30,7 +30,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils} -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.SparkPlan @@ -96,7 +95,7 @@ case class DataSource( private val caseInsensitiveOptions = CaseInsensitiveMap(options) private val equality = sparkSession.sessionState.conf.resolver - bucketSpec.map { bucket => + bucketSpec.foreach { bucket => SchemaUtils.checkColumnNameDuplication( bucket.bucketColumnNames, "in the bucket definition", equality) SchemaUtils.checkColumnNameDuplication( @@ -722,13 +721,12 @@ object DataSource extends Logging { */ private def validateSchema(schema: StructType): Unit = { def hasEmptySchema(schema: StructType): Boolean = { - schema.size == 0 || schema.find { + schema.isEmpty || schema.exists { case StructField(_, b: StructType, _, _) => hasEmptySchema(b) case _ => false - }.isDefined + } } - if (hasEmptySchema(schema)) { throw new AnalysisException( s""" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index 6499328e89ce7..7695448c797a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ -74,7 +74,7 @@ abstract class FileFormatDataWriter( releaseResources() val summary = ExecutedWriteSummary( updatedPartitions = updatedPartitions.toSet, - stats = statsTrackers.map(_.getFinalStats())) + stats = statsTrackers.map(_.getFinalStats)) WriteTaskResult(committer.commitTask(taskAttemptContext), summary) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 16b2367bfdd5c..88f2ea5ce8a32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -56,7 +56,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { val partitionSet = AttributeSet(partitionColumns) val partitionKeyFilters = ExpressionSet(normalizedFilters - .filterNot(SubqueryExpression.hasSubquery(_)) + .filterNot(SubqueryExpression.hasSubquery) .filter(_.references.subsetOf(partitionSet))) if (partitionKeyFilters.nonEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala index c39a82ee037bc..959f4d6a4152b 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala @@ -83,7 +83,7 @@ trait WriteTaskStatsTracker { * @note This may only be called once. Further use of the object may lead to undefined behavior. * @return An object of subtype of [[WriteTaskStats]], to be sent to the driver. */ - def getFinalStats(): WriteTaskStats + def getFinalStats: WriteTaskStats } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 16b493892e3be..84eefa1084368 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -111,7 +111,7 @@ object JDBCRDD extends Logging { case In(attr, value) if value.isEmpty => s"CASE WHEN ${quote(attr)} IS NULL THEN NULL ELSE FALSE END" case In(attr, value) => s"${quote(attr)} IN (${dialect.compileValue(value)})" - case Not(f) => compileFilter(f, dialect).map(p => s"(NOT ($p))").getOrElse(null) + case Not(filter) => compileFilter(filter, dialect).map(p => s"(NOT ($p))").orNull case Or(f1, f2) => // We can't compile Or filter unless both sub-filters are compiled successfully. // It applies too for the following And filter. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index edea549748b47..93db4955c20de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -117,7 +117,7 @@ object JdbcUtils extends Logging { } def isCascadingTruncateTable(url: String): Option[Boolean] = { - JdbcDialects.get(url).isCascadingTruncateTable() + JdbcDialects.get(url).isCascadingTruncateTable } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala index 8361762b09703..e2b8678073021 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala @@ -21,7 +21,6 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ import org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.OutputWriter diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala index e4e201995faa2..994ef8f9a98cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala @@ -51,7 +51,7 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti // Note that the option 'lineSep' uses a different default value in read and write. 
val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep => - lineSep.getBytes(encoding.map(Charset.forName(_)).getOrElse(StandardCharsets.UTF_8)) + lineSep.getBytes(encoding.map(Charset.forName).getOrElse(StandardCharsets.UTF_8)) } val lineSeparatorInWrite: Array[Byte] = lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index cbf707f4a9cfd..803bfce7bc2a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -53,7 +53,7 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") } - override def isZero(): Boolean = _value == _zeroValue + override def isZero: Boolean = _value == _zeroValue override def add(v: Long): Unit = _value += v @@ -95,7 +95,7 @@ object SQLMetrics { def createMetric(sc: SparkContext, name: String): SQLMetric = { val acc = new SQLMetric(SUM_METRIC) - acc.register(sc, name = Some(name), countFailedValues = false) + acc.register(sc, name = Some(name)) acc } @@ -108,7 +108,7 @@ object SQLMetrics { // data size total (min, med, max): // 100GB (100MB, 1GB, 10GB) val acc = new SQLMetric(SIZE_METRIC, -1) - acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false) + acc.register(sc, name = Some(s"$name total (min, med, max)")) acc } @@ -117,7 +117,7 @@ object SQLMetrics { // duration(min, med, max): // 5s (800ms, 1s, 2s) val acc = new SQLMetric(TIMING_METRIC, -1) - acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false) + acc.register(sc, name = Some(s"$name total (min, med, max)")) acc } @@ -132,7 +132,7 @@ object SQLMetrics { // probe avg (min, med, max): // (1.2, 2.2, 6.3) val acc = new SQLMetric(AVERAGE_METRIC) - acc.register(sc, name = Some(s"$name (min, med, max)"), countFailedValues = false) + acc.register(sc, name = Some(s"$name (min, med, max)")) acc } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index 18992d7a9f974..01030306596a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -159,9 +159,9 @@ class ArrowPythonRunner( stream.readInt() match { case SpecialLengths.START_ARROW_STREAM => reader = new ArrowStreamReader(stream, allocator) - root = reader.getVectorSchemaRoot() - schema = ArrowUtils.fromArrowSchema(root.getSchema()) - vectors = root.getFieldVectors().asScala.map { vector => + root = reader.getVectorSchemaRoot + schema = ArrowUtils.fromArrowSchema(root.getSchema) + vectors = root.getFieldVectors.asScala.map { vector => new ArrowColumnVector(vector) }.toArray[ColumnVector] read() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index 310ebcdf67686..3116f7cf16778 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -22,11 +22,11 @@ import scala.collection.mutable.ArrayBuffer 
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} -import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, InSet, Literal, PlanExpression} +import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, Literal, PlanExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{BooleanType, DataType, StructType} +import org.apache.spark.sql.types.{DataType, StructType} /** * The base class for subquery that is used in SparkPlan. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala index 1e076207bc607..2c580b8a335ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala @@ -97,8 +97,8 @@ abstract class Aggregator[-IN, BUF, OUT] extends Serializable { * @since 1.6.0 */ def toColumn: TypedColumn[IN, OUT] = { - implicit val bEncoder = bufferEncoder - implicit val cEncoder = outputEncoder + implicit val bEncoder: Encoder[BUF] = bufferEncoder + implicit val cEncoder: Encoder[OUT] = outputEncoder val expr = AggregateExpression( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala index 697757f8a73ce..845c9bc7c5dbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala @@ -147,7 +147,7 @@ private[sql] object SparkUserDefinedFunction { f: AnyRef, dataType: DataType, inputSchemas: Option[Seq[ScalaReflection.Schema]]): UserDefinedFunction = { - val udf = new UserDefinedFunction(f, dataType, inputSchemas.map(_.map(_.dataType))) + val udf = UserDefinedFunction(f, dataType, inputSchemas.map(_.map(_.dataType))) udf.nullableTypes = inputSchemas.map(_.map(_.nullable)) udf } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala index 3a3246a1b1d13..289c53be6f38a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala @@ -54,12 +54,12 @@ private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect dialects.head.getSchemaQuery(table) } - override def isCascadingTruncateTable(): Option[Boolean] = { + override def isCascadingTruncateTable: Option[Boolean] = { // If any dialect claims cascading truncate, this dialect is also cascading truncate. // Otherwise, if any dialect has unknown cascading truncate, this dialect is also unknown. 
- dialects.flatMap(_.isCascadingTruncateTable()).reduceOption(_ || _) match { + dialects.flatMap(_.isCascadingTruncateTable).reduceOption(_ || _) match { case Some(true) => Some(true) - case _ if dialects.exists(_.isCascadingTruncateTable().isEmpty) => None + case _ if dialects.exists(_.isCascadingTruncateTable.isEmpty) => None case _ => Some(false) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index d160ad82888a2..d2ef69723ab4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -48,5 +48,5 @@ private object DB2Dialect extends JdbcDialect { case _ => None } - override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + override def isCascadingTruncateTable: Option[Boolean] = Some(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala index d13c29ed46bd5..a11ca4236bf02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala @@ -42,5 +42,5 @@ private object DerbyDialect extends JdbcDialect { case _ => None } - override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + override def isCascadingTruncateTable: Option[Boolean] = Some(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index f76c1fae562c6..054ee0407fc19 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -182,7 +182,7 @@ abstract class JdbcDialect extends Serializable { * Some[false] : TRUNCATE TABLE does not cause cascading. * None: The behavior of TRUNCATE TABLE is unknown (default). 
*/ - def isCascadingTruncateTable(): Option[Boolean] = None + def isCascadingTruncateTable: Option[Boolean] = None } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index da787b4859a73..f8aed32be2311 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -41,5 +41,5 @@ private object MsSqlServerDialect extends JdbcDialect { case _ => None } - override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + override def isCascadingTruncateTable: Option[Boolean] = Some(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index b2cff7877d8b5..3fb68d574dd47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -45,5 +45,5 @@ private case object MySQLDialect extends JdbcDialect { s"SELECT 1 FROM $table LIMIT 1" } - override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + override def isCascadingTruncateTable: Option[Boolean] = Some(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index f4a6d0a4d2e44..e373837dceee9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -94,7 +94,7 @@ private case object OracleDialect extends JdbcDialect { case _ => value } - override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + override def isCascadingTruncateTable: Option[Boolean] = Some(false) /** * The SQL query used to truncate a table. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index f8d2bc8e0f13f..413999c84e470 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -38,7 +38,7 @@ private object PostgresDialect extends JdbcDialect { } else if (sqlType == Types.OTHER) { Some(StringType) } else if (sqlType == Types.ARRAY) { - val scale = md.build.getLong("scale").toInt + val scale = md.build().getLong("scale").toInt // postgres array type names start with underscore toCatalystType(typeName.drop(1), size, scale).map(ArrayType(_)) } else None @@ -85,7 +85,7 @@ private object PostgresDialect extends JdbcDialect { s"SELECT 1 FROM $table LIMIT 1" } - override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + override def isCascadingTruncateTable: Option[Boolean] = Some(false) /** * The SQL query used to truncate a table. 
For Postgres, the default behaviour is to diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala index 6c17bd7ed9ec4..a8ef543af4db2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.jdbc -import java.sql.Types - import org.apache.spark.sql.types._ @@ -33,7 +31,7 @@ private case object TeradataDialect extends JdbcDialect { } // Teradata does not support cascading a truncation - override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + override def isCascadingTruncateTable: Option[Boolean] = Some(false) /** * The SQL query used to truncate a table. Teradata does not support the 'TRUNCATE' syntax that diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala index 32941d8d2cd11..ae12e96003788 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala @@ -72,7 +72,7 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { } private def finalStatus(tracker: BasicWriteTaskStatsTracker): BasicWriteTaskStats = { - tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats] + tracker.getFinalStats.asInstanceOf[BasicWriteTaskStats] } test("No files in run") { From 52ed24ea472f49bc1bf98c36348a0fe19532bc28 Mon Sep 17 00:00:00 2001 From: Darcy Shen Date: Wed, 10 Oct 2018 18:01:09 +0800 Subject: [PATCH 2/2] fix compile on unit tests --- .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 7fa0e7fc162ca..6e5f3d8a0ac81 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -777,7 +777,7 @@ class JDBCSuite extends QueryTest assert(!agg.canHandle("jdbc:h2")) assert(agg.getCatalystType(0, "", 1, null) === Some(LongType)) assert(agg.getCatalystType(1, "", 1, null) === Some(StringType)) - assert(agg.isCascadingTruncateTable() === Some(true)) + assert(agg.isCascadingTruncateTable === Some(true)) assert(agg.quoteIdentifier ("Dummy") === "My Dummy quoteIdentifier") assert(agg.getTableExistsQuery ("Dummy") === "My Dummy Table") assert(agg.getSchemaQuery ("Dummy") === "My Dummy Schema") @@ -791,13 +791,13 @@ class JDBCSuite extends QueryTest typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = None - override def isCascadingTruncateTable(): Option[Boolean] = cascadingTruncateTable + override def isCascadingTruncateTable: Option[Boolean] = cascadingTruncateTable } def testDialects(cascadings: List[Option[Boolean]], expected: Option[Boolean]): Unit = { - val dialects = cascadings.map(genDialect(_)) + val dialects = cascadings.map(genDialect) val agg = new AggregatedDialect(dialects) - assert(agg.isCascadingTruncateTable() === expected) + assert(agg.isCascadingTruncateTable === expected) } testDialects(List(Some(true), Some(false), None), Some(true))