[SPARK-9262][build] Treat Scala compiler warnings as errors
In the past few weeks I've seen several cases where the compiler threw warnings caused by legitimate bugs. This patch upgrades warnings to errors, with the exception of deprecation warnings.

Note that ideally we would mark deprecation warnings as errors as well. However, because the Scala compiler offers no way to suppress individual warning messages, we cannot do that, since we do need to access deprecated APIs in Hadoop.
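(For context, the stock way to promote warnings to errors is the compiler's own flag, but it is all-or-nothing. A minimal sbt sketch of that rejected approach:

    // All-or-nothing: this would also turn the unavoidable Hadoop
    // deprecated-API warnings into errors, which is exactly what
    // this patch must not do.
    scalacOptions in Compile += "-Xfatal-warnings"

Hence the custom check added to project/SparkBuild.scala below, which inspects each reported problem and exempts deprecations.)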

Most of the work was done by ericl.

Author: Reynold Xin <rxin@databricks.com>
Author: Eric Liang <ekl@databricks.com>

Closes #7598 from rxin/warnings and squashes the following commits:

beb311b [Reynold Xin] Fixed tests.
542c031 [Reynold Xin] Fixed one more warning.
87c354a [Reynold Xin] Fixed all non-deprecation warnings.
78660ac [Eric Liang] first effort to fix warnings
rxin committed Jul 23, 2015
1 parent a721ee5 commit d71a13f
Showing 12 changed files with 55 additions and 18 deletions.
@@ -20,6 +20,7 @@ package org.apache.spark.api.r
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.HashMap
+import scala.language.existentials
 
 import io.netty.channel.ChannelHandler.Sharable
 import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
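The added scala.language.existentials import addresses Scala's feature warning for inferred existential types. A self-contained illustration (not Spark code):

    import scala.language.existentials

    // The least upper bound of Class[Int] and Class[String] is an existential
    // type; without the language import, this line draws a feature warning.
    val classes = List(classOf[Int], classOf[String])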
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -94,13 +94,14 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   }
 
   override def getDependencies: Seq[Dependency[_]] = {
-    rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
+    rdds.map { rdd: RDD[_] =>
       if (rdd.partitioner == Some(part)) {
         logDebug("Adding one-to-one dependency with " + rdd)
         new OneToOneDependency(rdd)
       } else {
         logDebug("Adding shuffle dependency with " + rdd)
-        new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
+        new ShuffleDependency[K, Any, CoGroupCombiner](
+          rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
       }
     }
   }
@@ -133,7 +134,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     // A list of (rdd iterator, dependency number) pairs
     val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
     for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
-      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
+      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
         val dependencyPartition = split.narrowDeps(depNum).get.split
         // Read them from the parent
         val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
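The @unchecked annotation acknowledges that erasure removes the dependency's type argument at runtime, which is what the compiler was warning about. A minimal standalone sketch of the same warning and fix (hypothetical example, not Spark code):

    // Erasure drops List's type argument, so without @unchecked this match
    // warns: "non-variable type argument String ... is unchecked".
    def firstString(xs: Any): Option[String] = xs match {
      case strings: List[String] @unchecked => strings.headOption
      case _ => None
    }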
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -94,6 +94,8 @@ private[spark] object JsonProtocol {
         logStartToJson(logStart)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
         executorMetricsUpdateToJson(metricsUpdate)
+      case blockUpdated: SparkListenerBlockUpdated =>
+        throw new MatchError(blockUpdated) // TODO(ekl) implement this
     }
   }
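Explicitly listing the unsupported event presumably silences a non-exhaustive-match warning while preserving the old runtime behavior, since an unhandled case would have thrown MatchError anyway. A generic sketch of the pattern (hypothetical types):

    sealed trait Event
    case class Started(time: Long) extends Event
    case class BlockUpdated(id: String) extends Event

    def toJson(e: Event): String = e match {
      case Started(t) => s"""{"started":$t}"""
      // Naming the unimplemented case makes the match exhaustive; throwing
      // MatchError keeps the behavior an unhandled case would have had.
      case b: BlockUpdated => throw new MatchError(b)
    }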

@@ -20,8 +20,6 @@ import java.io.{ObjectInputStream, ObjectOutputStream}
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.util.Utils
-
 private[spark]
 class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
   private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
@@ -21,8 +21,6 @@ import java.io.{ObjectInputStream, ObjectOutputStream}
 
 import org.apache.hadoop.mapred.JobConf
 
-import org.apache.spark.util.Utils
-
 private[spark]
 class SerializableJobConf(@transient var value: JobConf) extends Serializable {
   private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.stat.test
 import scala.annotation.varargs
 
 import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution}
-import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => CommonMathKolmogorovSmirnovTest}
 
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
@@ -187,7 +187,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
   }
 
   private def evalOneSampleP(ksStat: Double, n: Long): KolmogorovSmirnovTestResult = {
-    val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
+    val pval = 1 - new CommonMathKolmogorovSmirnovTest().cdf(ksStat, n.toInt)
     new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
   }
 }
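The rename-on-import most likely fixes the "imported `KolmogorovSmirnovTest` is permanently hidden" warning, since this file defines a Spark object with the same name as the Commons Math class. A minimal sketch of the clash and its resolution (assumes commons-math3 on the classpath):

    object Stats {
      // A plain `import ...inference.KolmogorovSmirnovTest` would be permanently
      // hidden by the object below, and the compiler warns; renaming avoids it.
      import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => CMKSTest}

      object KolmogorovSmirnovTest {
        def pValue(ksStat: Double, n: Int): Double = 1 - new CMKSTest().cdf(ksStat, n)
      }
    }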
33 changes: 32 additions & 1 deletion project/SparkBuild.scala
@@ -154,7 +154,38 @@ object SparkBuild extends PomBuild {
       if (major.toInt >= 1 && minor.toInt >= 8) Seq("-Xdoclint:all", "-Xdoclint:-missing") else Seq.empty
     },
 
-    javacOptions in Compile ++= Seq("-encoding", "UTF-8")
+    javacOptions in Compile ++= Seq("-encoding", "UTF-8"),
+
+    // Implements -Xfatal-warnings, ignoring deprecation warnings.
+    // Code snippet taken from https://issues.scala-lang.org/browse/SI-8410.
+    compile in Compile := {
+      val analysis = (compile in Compile).value
+      val s = streams.value
+
+      def logProblem(l: (=> String) => Unit, f: File, p: xsbti.Problem) = {
+        l(f.toString + ":" + p.position.line.fold("")(_ + ":") + " " + p.message)
+        l(p.position.lineContent)
+        l("")
+      }
+
+      var failed = 0
+      analysis.infos.allInfos.foreach { case (k, i) =>
+        i.reportedProblems foreach { p =>
+          val deprecation = p.message.contains("is deprecated")
+
+          if (!deprecation) {
+            failed = failed + 1
+          }
+
+          logProblem(if (deprecation) s.log.warn else s.log.error, k, p)
+        }
+      }
+
+      if (failed > 0) {
+        sys.error(s"$failed fatal warnings")
+      }
+      analysis
+    }
   )
 
   def enable(settings: Seq[Setting[_]])(projectRef: ProjectRef) = {
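One way to sanity-check the new setting: any non-deprecation warning should now fail the build, while deprecations (detected by a substring match on the problem text) remain warnings. A deliberately warning-producing snippet (hypothetical, for illustration):

    // Under the `compile in Compile` override above, compiling this would
    // fail the build: the unchecked-erasure warning is not a deprecation
    // warning, so it is logged at error level and counted toward `failed`.
    def sum(x: Any): Int = x match {
      case xs: List[Int] => xs.sum // warning: type argument Int is unchecked
      case _ => 0
    }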
@@ -24,6 +24,7 @@ import java.util.{Map => JavaMap}
 import javax.annotation.Nullable
 
 import scala.collection.mutable.HashMap
+import scala.language.existentials
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions._
@@ -401,7 +402,7 @@ object CatalystTypeConverters {
       case seq: Seq[Any] => seq.map(convertToCatalyst)
       case r: Row => InternalRow(r.toSeq.map(convertToCatalyst): _*)
       case arr: Array[Any] => arr.toSeq.map(convertToCatalyst).toArray
-      case m: Map[Any, Any] =>
+      case m: Map[_, _] =>
         m.map { case (k, v) => (convertToCatalyst(k), convertToCatalyst(v)) }.toMap
       case other => other
     }
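Unlike the @unchecked fixes elsewhere in this commit, matching on Map[_, _] removes the unchecked-erasure warning without any annotation: the wildcards claim nothing the runtime cannot verify. A standalone sketch:

    // No warning here: the wildcard pattern makes no erased-type claim, and
    // keys and values surface as Any, which is all the conversion code needs.
    def keyStrings(x: Any): Seq[String] = x match {
      case m: Map[_, _] => m.keys.map(_.toString).toSeq
      case _ => Nil
    }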
@@ -197,6 +197,9 @@ final class DataFrameWriter private[sql](df: DataFrame) {
           // the table. But, insertInto with Overwrite requires the schema of data be the same
           // the schema of the table.
           insertInto(tableName)
+
+        case SaveMode.Overwrite =>
+          throw new UnsupportedOperationException("overwrite mode unsupported.")
       }
     } else {
       val cmd =
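Listing SaveMode.Overwrite explicitly presumably makes the match cover every enum value, trading a compiler warning for a loud, descriptive failure. A generic sketch of the idiom (hypothetical helper, assuming org.apache.spark.sql.SaveMode):

    import org.apache.spark.sql.SaveMode

    def verb(mode: SaveMode): String = mode match {
      case SaveMode.Append => "append"
      case SaveMode.Ignore | SaveMode.ErrorIfExists => "skip or fail"
      // Naming the unsupported value documents the gap and keeps the match total.
      case SaveMode.Overwrite =>
        throw new UnsupportedOperationException("overwrite mode unsupported.")
    }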
@@ -100,14 +100,16 @@ private[sql] case class InsertIntoHadoopFsRelation(
     val pathExists = fs.exists(qualifiedOutputPath)
     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
-        sys.error(s"path $qualifiedOutputPath already exists.")
+        throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
         fs.delete(qualifiedOutputPath, true)
         true
       case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
         true
       case (SaveMode.Ignore, exists) =>
         !exists
+      case (s, exists) =>
+        throw new IllegalStateException(s"unsupported save mode $s ($exists)")
     }
     // If we are appending data to an existing dir.
     val isAppend = pathExists && (mode == SaveMode.Append)
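Two changes above: the user-facing failure becomes Spark's AnalysisException instead of the bare RuntimeException that sys.error throws, and a defensive catch-all closes the (mode, pathExists) tuple match. A generic illustration of the catch-all idea (hypothetical example):

    // Tuple matches are not exhaustiveness-checked the way sealed hierarchies
    // are, so an explicit default turns a potential MatchError into a
    // descriptive failure.
    def decide(mode: String, exists: Boolean): Boolean = (mode, exists) match {
      case ("overwrite", true) => true
      case ("ignore", e) => !e
      case (m, e) => throw new IllegalStateException(s"unsupported save mode $m ($e)")
    }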
@@ -41,10 +41,10 @@ private[orc] object OrcFilters extends Logging {
   private def buildSearchArgument(expression: Filter, builder: Builder): Option[Builder] = {
     def newBuilder = SearchArgument.FACTORY.newBuilder()
 
-    def isSearchableLiteral(value: Any) = value match {
+    def isSearchableLiteral(value: Any): Boolean = value match {
       // These are types recognized by the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
-      case _: String | _: Long | _: Double | _: DateWritable | _: HiveDecimal | _: HiveChar |
-           _: HiveVarchar | _: Byte | _: Short | _: Integer | _: Float => true
+      case _: String | _: Long | _: Double | _: Byte | _: Short | _: Integer | _: Float => true
+      case _: DateWritable | _: HiveDecimal | _: HiveChar | _: HiveVarchar => true
       case _ => false
     }
 
@@ -134,7 +134,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
 
   test("save()/load() - non-partitioned table - ErrorIfExists") {
     withTempDir { file =>
-      intercept[RuntimeException] {
+      intercept[AnalysisException] {
         testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).save(file.getCanonicalPath)
       }
     }
@@ -233,7 +233,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
 
   test("save()/load() - partitioned table - ErrorIfExists") {
     withTempDir { file =>
-      intercept[RuntimeException] {
+      intercept[AnalysisException] {
         partitionedTestDF.write
           .format(dataSourceName)
           .mode(SaveMode.ErrorIfExists)
@@ -696,7 +696,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
     // This should only complain that the destination directory already exists, rather than file
     // "empty" is not a Parquet file.
     assert {
-      intercept[RuntimeException] {
+      intercept[AnalysisException] {
         df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
       }.getMessage.contains("already exists")
     }
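These test updates follow directly from the AnalysisException change above: ScalaTest's intercept fails unless the block throws the stated type (or a subtype), and it returns the caught exception for further assertions. A minimal sketch (assuming df and path as in the suite):

    import org.apache.spark.sql.{AnalysisException, SaveMode}

    val e = intercept[AnalysisException] {
      df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
    }
    // The returned exception can still be inspected, e.g. for its message.
    assert(e.getMessage.contains("already exists"))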