From cfbdd6a1f5906b848c520d3365cc4034992215d9 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Tue, 11 Sep 2018 14:46:03 -0500
Subject: [PATCH] [SPARK-25398] Minor bugs from comparing unrelated types
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What changes were proposed in this pull request?

Correct some comparisons between unrelated types to what they seem to…
have been trying to do

## How was this patch tested?

Existing tests.

Closes #22384 from srowen/SPARK-25398.

Authored-by: Sean Owen
Signed-off-by: Sean Owen
---
 .../org/apache/spark/status/LiveEntity.scala  |  4 +---
 .../apache/spark/util/ClosureCleaner.scala    |  2 +-
 .../ExternalAppendOnlyMapSuite.scala          |  4 +---
 .../cluster/mesos/MesosClusterScheduler.scala | 20 +++++++++----------
 .../mesos/MesosClusterSchedulerSuite.scala    | 14 ++++++-------
 ...esosFineGrainedSchedulerBackendSuite.scala |  2 +-
 .../spark/deploy/yarn/ClientSuite.scala       |  2 +-
 .../PropagateEmptyRelationSuite.scala         |  2 +-
 .../sql/catalyst/util/UnsafeArraySuite.scala  |  4 ++--
 .../org/apache/spark/sql/DatasetSuite.scala   |  2 +-
 .../parquet/ParquetSchemaSuite.scala          |  2 +-
 11 files changed, 27 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 762aed4133517..8708e64db3c17 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -33,7 +33,6 @@ import org.apache.spark.storage.RDDInfo
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.AccumulatorContext
 import org.apache.spark.util.collection.OpenHashSet
-import org.apache.spark.util.kvstore.KVStore
 
 /**
  * A mutable representation of a live entity in Spark (jobs, stages, tasks, et al). Every live
@@ -588,8 +587,7 @@ private object LiveEntityHelpers {
       .filter { acc =>
         // We don't need to store internal or SQL accumulables as their values will be shown in
         // other places, so drop them to reduce the memory usage.
-        !acc.internal && (!acc.metadata.isDefined ||
-          acc.metadata.get != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
+        !acc.internal && acc.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
       }
       .map { acc =>
         new v1.AccumulableInfo(
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index b6c300c4778b1..43d62561e8eba 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -175,7 +175,7 @@ private[spark] object ClosureCleaner extends Logging {
       closure.getClass.isSynthetic &&
         closure
           .getClass
-          .getInterfaces.exists(_.getName.equals("scala.Serializable"))
+          .getInterfaces.exists(_.getName == "scala.Serializable")
 
     if (isClosureCandidate) {
       try {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index d542ba0b6640d..8a2f2ffe0acf1 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.util.collection
 
-import java.util.Objects
-
 import scala.collection.mutable.ArrayBuffer
 import scala.ref.WeakReference
 
@@ -509,7 +507,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
       .sorted
 
     assert(it.isEmpty)
-    assert(keys == (0 until 100))
+    assert(keys == (0 until 100).toList)
     assert(map.numSpills == 0)
 
     // these asserts try to show that we're no longer holding references to the underlying map.
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 7d80eedcc43ce..cb1bcba651be6 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -202,7 +202,7 @@ private[spark] class MesosClusterScheduler(
     } else if (removeFromPendingRetryDrivers(submissionId)) {
       k.success = true
       k.message = "Removed driver while it's being retried"
-    } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
+    } else if (finishedDrivers.exists(_.driverDescription.submissionId == submissionId)) {
       k.success = false
       k.message = "Driver already terminated"
     } else {
@@ -222,21 +222,21 @@ private[spark] class MesosClusterScheduler(
     }
     s.submissionId = submissionId
     stateLock.synchronized {
-      if (queuedDrivers.exists(_.submissionId.equals(submissionId))) {
+      if (queuedDrivers.exists(_.submissionId == submissionId)) {
         s.success = true
         s.driverState = "QUEUED"
       } else if (launchedDrivers.contains(submissionId)) {
         s.success = true
         s.driverState = "RUNNING"
         launchedDrivers(submissionId).mesosTaskStatus.foreach(state => s.message = state.toString)
-      } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
+      } else if (finishedDrivers.exists(_.driverDescription.submissionId == submissionId)) {
         s.success = true
         s.driverState = "FINISHED"
         finishedDrivers
           .find(d => d.driverDescription.submissionId.equals(submissionId)).get.mesosTaskStatus
           .foreach(state => s.message = state.toString)
-      } else if (pendingRetryDrivers.exists(_.submissionId.equals(submissionId))) {
-        val status = pendingRetryDrivers.find(_.submissionId.equals(submissionId))
+      } else if (pendingRetryDrivers.exists(_.submissionId == submissionId)) {
+        val status = pendingRetryDrivers.find(_.submissionId == submissionId)
           .get.retryState.get.lastFailureStatus
         s.success = true
         s.driverState = "RETRYING"
@@ -254,13 +254,13 @@ private[spark] class MesosClusterScheduler(
    */
   def getDriverState(submissionId: String): Option[MesosDriverState] = {
     stateLock.synchronized {
-      queuedDrivers.find(_.submissionId.equals(submissionId))
+      queuedDrivers.find(_.submissionId == submissionId)
         .map(d => new MesosDriverState("QUEUED", d))
         .orElse(launchedDrivers.get(submissionId)
           .map(d => new MesosDriverState("RUNNING", d.driverDescription, Some(d))))
-        .orElse(finishedDrivers.find(_.driverDescription.submissionId.equals(submissionId))
+        .orElse(finishedDrivers.find(_.driverDescription.submissionId == submissionId)
           .map(d => new MesosDriverState("FINISHED", d.driverDescription, Some(d))))
-        .orElse(pendingRetryDrivers.find(_.submissionId.equals(submissionId))
+        .orElse(pendingRetryDrivers.find(_.submissionId == submissionId)
           .map(d => new MesosDriverState("RETRYING", d)))
     }
   }
@@ -814,7 +814,7 @@ private[spark] class MesosClusterScheduler(
       status: Int): Unit = {}
 
   private def removeFromQueuedDrivers(subId: String): Boolean = {
-    val index = queuedDrivers.indexWhere(_.submissionId.equals(subId))
+    val index = queuedDrivers.indexWhere(_.submissionId == subId)
     if (index != -1) {
       queuedDrivers.remove(index)
       queuedDriversState.expunge(subId)
@@ -834,7 +834,7 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def removeFromPendingRetryDrivers(subId: String): Boolean = {
-    val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(subId))
+    val index = pendingRetryDrivers.indexWhere(_.submissionId == subId)
     if (index != -1) {
       pendingRetryDrivers.remove(index)
       pendingRetryDriversState.expunge(subId)
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index e534b9d7e3ed9..082d4bcfdf83a 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -21,7 +21,7 @@ import java.util.{Collection, Collections, Date}
 
 import scala.collection.JavaConverters._
 
-import org.apache.mesos.Protos.{Environment, Secret, TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Value.{Scalar, Type}
 import org.apache.mesos.SchedulerDriver
 import org.mockito.{ArgumentCaptor, Matchers}
@@ -146,14 +146,14 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(scheduler.getResource(resources, "cpus") == 1.5)
     assert(scheduler.getResource(resources, "mem") == 1200)
     val resourcesSeq: Seq[Resource] = resources.asScala
-    val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList
+    val cpus = resourcesSeq.filter(_.getName == "cpus").toList
     assert(cpus.size == 2)
-    assert(cpus.exists(_.getRole().equals("role2")))
-    assert(cpus.exists(_.getRole().equals("*")))
-    val mem = resourcesSeq.filter(_.getName.equals("mem")).toList
+    assert(cpus.exists(_.getRole() == "role2"))
+    assert(cpus.exists(_.getRole() == "*"))
+    val mem = resourcesSeq.filter(_.getName == "mem").toList
     assert(mem.size == 2)
-    assert(mem.exists(_.getRole().equals("role2")))
-    assert(mem.exists(_.getRole().equals("*")))
+    assert(mem.exists(_.getRole() == "role2"))
+    assert(mem.exists(_.getRole() == "*"))
 
     verify(driver, times(1)).launchTasks(
       Matchers.eq(Collections.singleton(offer.getId)),
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 31f84310485a0..1ead4b1ed7c7e 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -106,7 +106,7 @@ class MesosFineGrainedSchedulerBackendSuite
     // uri is null.
     val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
     val executorResources = executorInfo.getResourcesList
-    val cpus = executorResources.asScala.find(_.getName.equals("cpus")).get.getScalar.getValue
+    val cpus = executorResources.asScala.find(_.getName == "cpus").get.getScalar.getValue
 
     assert(cpus === mesosExecutorCores)
   }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 7fa597167f3f0..26013a109c42b 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -191,7 +191,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
     appContext.getQueue should be ("staging-queue")
     appContext.getAMContainerSpec should be (containerLaunchContext)
     appContext.getApplicationType should be ("SPARK")
-    appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach{ method =>
+    appContext.getClass.getMethods.filter(_.getName == "getApplicationTags").foreach { method =>
       val tags = method.invoke(appContext).asInstanceOf[java.util.Set[String]]
       tags should contain allOf ("tag1", "dup", "tag2", "multi word")
       tags.asScala.count(_.nonEmpty) should be (4)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index f1ce7543ffdc1..d395bba105a7b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -147,7 +147,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
       .where(false)
       .select('a)
       .where('a > 1)
-      .where('a != 200)
+      .where('a =!= 200)
       .orderBy('a.asc)
 
     val optimized = Optimize.execute(query.analyze)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
index 8f75c14192c9b..755c8897cada2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
@@ -114,7 +114,7 @@ class UnsafeArraySuite extends SparkFunSuite {
     assert(unsafeDate.isInstanceOf[UnsafeArrayData])
     assert(unsafeDate.numElements == dateArray.length)
     dateArray.zipWithIndex.map { case (e, i) =>
-      assert(unsafeDate.get(i, DateType) == e)
+      assert(unsafeDate.get(i, DateType).asInstanceOf[Int] == e)
     }
 
     val unsafeTimestamp = ExpressionEncoder[Array[Long]].resolveAndBind().
@@ -122,7 +122,7 @@ class UnsafeArraySuite extends SparkFunSuite {
     assert(unsafeTimestamp.isInstanceOf[UnsafeArrayData])
     assert(unsafeTimestamp.numElements == timestampArray.length)
     timestampArray.zipWithIndex.map { case (e, i) =>
-      assert(unsafeTimestamp.get(i, TimestampType) == e)
+      assert(unsafeTimestamp.get(i, TimestampType).asInstanceOf[Long] == e)
     }
 
     Seq(decimalArray4_1, decimalArray20_20).map { decimalArray =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index ca8fbc991a3a5..4e593ff046a53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -611,7 +611,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     ).toDF("id", "stringData")
     val sampleDF = df.sample(false, 0.7, 50)
     // After sampling, sampleDF doesn't contain id=1.
-    assert(!sampleDF.select("id").collect.contains(1))
+    assert(!sampleDF.select("id").as[Int].collect.contains(1))
     // simpleUdf should not encounter id=1.
     checkAnswer(sampleDF.select(simpleUdf($"id")), List.fill(sampleDF.count.toInt)(Row(1)))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 7eefedb8ff5bb..528a4d0ca8004 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -427,7 +427,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       assert(errMsg.startsWith("Parquet column cannot be converted in file"))
       val file = errMsg.substring("Parquet column cannot be converted in file ".length,
         errMsg.indexOf(". "))
-      val col = spark.read.parquet(file).schema.fields.filter(_.name.equals("a"))
+      val col = spark.read.parquet(file).schema.fields.filter(_.name == "a")
      assert(col.length == 1)
       if (col(0).dataType == StringType) {
         assert(errMsg.contains("Column: [a], Expected: int, Found: BINARY"))
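The hunks above are all instances of the same Scala pitfall: `==`, `!=`, and `contains` accept arguments of any type, so a comparison between unrelated types compiles cleanly but evaluates to a constant and silently defeats the check it sits in. The sketch below is illustrative only, not code from this patch: it is plain Scala with no Spark dependency, the object name and sample values are invented, and it merely mirrors the shape of two of the fixed call sites (LiveEntity.scala and DatasetSuite.scala).

```scala
// Minimal sketch of the pitfall fixed by this patch (illustrative names and values).
object UnrelatedTypeComparisonSketch {
  def main(args: Array[String]): Unit = {
    // Shape 1 (cf. LiveEntity.scala): unwrapping an Option and then comparing the
    // result against Some(...) pits a String against an Option, so the inequality
    // holds for every possible value.
    val metadata: Option[String] = Some("sql")
    println(metadata.get != Some("sql")) // true, even though the values "match"
    println(metadata != Some("sql"))     // false: the comparison that was intended

    // Shape 2 (cf. DatasetSuite.scala, by analogy): searching a collection of
    // wrapped values for an unwrapped one type-checks (the element type widens to
    // Any) but can never succeed, so an assertion built on it is vacuous.
    val wrapped = Array(Some(1), Some(2), Some(3))
    println(wrapped.contains(2))         // always false: an Int never equals a Some(Int)
    println(wrapped.flatten.contains(2)) // true: compare like with like
  }
}
```

The remaining rewrites of `.equals(...)` as `==` elsewhere in the patch are idiomatic Scala cleanup with identical behavior for non-null receivers, and rewriting `'a != 200` as `'a =!= 200` replaces a plain Boolean comparison, which never builds a Catalyst predicate, with the DSL's not-equal expression.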