diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala index f4254c75a2a8c..c06498684fa6f 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala @@ -24,7 +24,7 @@ import scala.reflect.ClassTag import com.google.rpc.ErrorInfo import io.grpc.{ManagedChannel, StatusRuntimeException} import io.grpc.protobuf.StatusProto -import org.json4s.DefaultFormats +import org.json4s.{DefaultFormats, Formats} import org.json4s.jackson.JsonMethods import org.apache.spark.{QueryContext, QueryContextType, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException} @@ -354,7 +354,7 @@ private[client] object GrpcExceptionConverter { * truncated error message. */ private def errorInfoToThrowable(info: ErrorInfo, message: String): Throwable = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val classes = JsonMethods.parse(info.getMetadataOrDefault("classes", "[]")).extract[Array[String]] val errorClass = info.getMetadataOrDefault("errorClass", null) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala index 6dd5af2389a81..7f3eb0370a078 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala @@ -21,14 +21,14 @@ import scala.collection.mutable.HashMap import scala.util.control.NonFatal import org.apache.kafka.common.TopicPartition -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization /** * Utilities for converting Kafka related objects to and from json. 
*/ private object JsonUtils { - private implicit val formats = Serialization.formats(NoTypeHints) + private implicit val formats: Formats = Serialization.formats(NoTypeHints) /** * Read TopicPartitions from json string @@ -96,10 +96,8 @@ private object JsonUtils { */ def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = { val result = new HashMap[String, HashMap[Int, Long]]() - implicit val order = new Ordering[TopicPartition] { - override def compare(x: TopicPartition, y: TopicPartition): Int = { - Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition)) - } + implicit val order: Ordering[TopicPartition] = (x: TopicPartition, y: TopicPartition) => { + Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition)) } val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism partitions.foreach { tp => diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index 7209e2c373ab1..d6a50ff84f562 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -30,6 +30,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.sys.process._ +import org.json4s.Formats import org.json4s.jackson.JsonMethods import org.apache.spark.{SparkConf, SparkContext} @@ -340,7 +341,7 @@ private object FaultToleranceTest extends App with Logging { private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File) extends Logging { - implicit val formats = org.json4s.DefaultFormats + implicit val formats: Formats = org.json4s.DefaultFormats var state: RecoveryState.Value = _ var liveWorkerIPs: List[String] = _ var numLiveApps = 0 @@ -383,7 +384,7 @@ private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile private class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File) extends Logging { - implicit val formats = org.json4s.DefaultFormats + implicit val formats: Formats = org.json4s.DefaultFormats logDebug("Created worker: " + this) diff --git a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala index db9f5c516cf65..509049550ad4f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala @@ -23,7 +23,7 @@ import java.nio.file.Files import scala.collection.mutable import scala.util.control.NonFatal -import org.json4s.{DefaultFormats, Extraction} +import org.json4s.{DefaultFormats, Extraction, Formats} import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.SparkException @@ -115,7 +115,7 @@ private[spark] object StandaloneResourceUtils extends Logging { private def writeResourceAllocationJson[T]( allocations: Seq[T], jsonFile: File): Unit = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val allocationJson = Extraction.decompose(allocations) Files.write(jsonFile.toPath, compact(render(allocationJson)).getBytes()) } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index f964e2b50b577..f1a9aa353e76d 100644 --- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -27,7 +27,6 @@ import scala.util.{Failure, Success} import scala.util.control.NonFatal import io.netty.util.internal.PlatformDependent -import org.json4s.DefaultFormats import org.apache.spark._ import org.apache.spark.TaskState.TaskState @@ -60,8 +59,6 @@ private[spark] class CoarseGrainedExecutorBackend( import CoarseGrainedExecutorBackend._ - private implicit val formats = DefaultFormats - private[spark] val stopping = new AtomicBoolean(false) var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala b/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala index c9e5ba1ad8e04..80f7b4d500eaf 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala @@ -19,7 +19,7 @@ package org.apache.spark.resource import scala.util.control.NonFatal -import org.json4s.{DefaultFormats, Extraction, JValue} +import org.json4s.{DefaultFormats, Extraction, Formats, JValue} import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkException @@ -70,7 +70,7 @@ private[spark] object ResourceInformation { * Parses a JSON string into a [[ResourceInformation]] instance. */ def parseJson(json: String): ResourceInformation = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats try { parse(json).extract[ResourceInformationJson].toResourceInformation } catch { @@ -81,7 +81,7 @@ private[spark] object ResourceInformation { } def parseJson(json: JValue): ResourceInformation = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats try { json.extract[ResourceInformationJson].toResourceInformation } catch { diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index fe08e8337f76f..a6f2ac35af7a7 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -22,7 +22,7 @@ import java.util.Optional import scala.util.control.NonFatal -import org.json4s.DefaultFormats +import org.json4s.{DefaultFormats, Formats} import org.json4s.jackson.JsonMethods._ import org.apache.spark.{SparkConf, SparkException} @@ -253,7 +253,7 @@ private[spark] object ResourceUtils extends Logging { def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = { withResourcesJson[ResourceAllocation](resourcesFile) { json => - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats parse(json).extract[Seq[ResourceAllocation]] } } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala index d19744db089ba..96dc5ac44b47a 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala @@ -31,7 +31,7 @@ private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { private[spark] class AppStatusSource extends Source { - override implicit val metricRegistry = new MetricRegistry() + override implicit val metricRegistry: MetricRegistry = new MetricRegistry() 
override val sourceName = "appStatus" diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 17cd0891532b1..b4920c7cb841d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -22,7 +22,7 @@ import java.util.{HashMap => JHashMap} import java.util.concurrent.TimeUnit import scala.collection.mutable -import scala.concurrent.{ExecutionContext, Future, TimeoutException} +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, TimeoutException} import scala.jdk.CollectionConverters._ import scala.util.Random import scala.util.control.NonFatal @@ -100,7 +100,8 @@ class BlockManagerMasterEndpoint( private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100) - private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool) + private implicit val askExecutionContext: ExecutionContextExecutorService = + ExecutionContext.fromExecutorService(askThreadPool) private val topologyMapper = { val topologyMapperClassName = conf.get( diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala index 476be80e67df3..5cc08714d41c1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} import org.apache.spark.{MapOutputTracker, SparkEnv} import org.apache.spark.internal.Logging @@ -38,7 +38,8 @@ class BlockManagerStorageEndpoint( private val asyncThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-storage-async-thread-pool", 100) - private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool) + private implicit val asyncExecutionContext: ExecutionContextExecutorService = + ExecutionContext.fromExecutorService(asyncThreadPool) // Operations that involve removing blocks may be slow and should be done asynchronously override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 2d3d6ec89ffbd..5addac7371925 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -379,7 +379,7 @@ private[spark] object ThreadUtils { def parmap[I, O](in: Seq[I], prefix: String, maxThreads: Int)(f: I => O): Seq[O] = { val pool = newForkJoinPool(prefix, maxThreads) try { - implicit val ec = ExecutionContext.fromExecutor(pool) + implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(pool) val futures = in.map(x => Future(f(x))) val futureSeq = Future.sequence(futures) diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index b78bb1f57564d..05709c9bdd756 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -42,7 +42,7 @@ import org.apache.spark.storage._ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager]) extends SparkFunSuite with BeforeAndAfter with LocalSparkContext { - implicit val defaultTimeout = timeout(10.seconds) + implicit val defaultTimeout: PatienceConfiguration.Timeout = timeout(10.seconds) val conf = new SparkConf() .setMaster("local[2]") .setAppName("ContextCleanerSuite") diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala index 1b2d92af4b026..f3bae2066e146 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala @@ -23,7 +23,7 @@ import java.util.function.Supplier import scala.concurrent.duration._ -import org.json4s.{DefaultFormats, Extraction} +import org.json4s.{DefaultFormats, Extraction, Formats} import org.mockito.{Mock, MockitoAnnotations} import org.mockito.Answers.RETURNS_SMART_NULLS import org.mockito.ArgumentMatchers.any @@ -60,7 +60,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter with P } def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts) - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats private val _generateWorkerId = PrivateMethod[String](Symbol("generateWorkerId")) diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index 35fe0b0d1c908..45c27aea60228 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.concurrent.TrieMap import scala.concurrent.duration._ -import org.json4s.{DefaultFormats, Extraction} +import org.json4s.{DefaultFormats, Extraction, Formats} import org.json4s.JsonAST.{JArray, JObject} import org.json4s.JsonDSL._ import org.mockito.ArgumentMatchers.any @@ -50,7 +50,7 @@ import org.apache.spark.util.{SerializableBuffer, ThreadUtils, Utils} class CoarseGrainedExecutorBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats def createSparkConf(): SparkConf = { new SparkConf() diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 642216a7a471d..a6f1707a1aabf 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.memory import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} import scala.concurrent.duration.Duration import org.mockito.ArgumentMatchers.{any, anyLong} @@ -148,7 +148,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite { // -- Tests of sharing of execution memory between tasks ---------------------------------------- // Prior to Spark 1.6, 
these tests were part of ShuffleMemoryManagerSuite. - implicit val ec = ExecutionContext.global + implicit val ec: ExecutionContextExecutor = ExecutionContext.global test("single task requesting on-heap execution memory") { val manager = createMemoryManager(1000L) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala index f133a38269d71..f7c7ca2bd9365 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.storage import java.util.Properties -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} import scala.language.implicitConversions import scala.reflect.ClassTag @@ -31,7 +31,7 @@ import org.apache.spark.util.ThreadUtils class BlockInfoManagerSuite extends SparkFunSuite { - private implicit val ec = ExecutionContext.global + private implicit val ec: ExecutionContextExecutor = ExecutionContext.global private var blockInfoManager: BlockInfoManager = _ override protected def beforeEach(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index a2d1293fc5f5a..8689a9c16d69b 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -83,7 +83,7 @@ private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler { class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers { implicit var webDriver: WebDriver = _ - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats override def beforeAll(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala b/core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala index 6888e492a8d33..6902493dc3c5d 100644 --- a/core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala @@ -22,14 +22,14 @@ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.duration._ -import org.scalatest.concurrent.{ThreadSignaler, TimeLimits} +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.apache.spark.SparkFunSuite class KeyLockSuite extends SparkFunSuite with TimeLimits { // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x - private implicit val defaultSignaler = ThreadSignaler + private implicit val defaultSignaler: Signaler = ThreadSignaler private val foreverMs = 60 * 1000L diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala index b17b86c08314b..4957c3d3f6c1e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala @@ -17,7 +17,7 @@ package org.apache.spark.examples.sql // $example on:programmatic_schema$ -import org.apache.spark.sql.Row +import org.apache.spark.sql.{Encoder, Row} // $example off:programmatic_schema$ // $example on:init_session$ import org.apache.spark.sql.SparkSession @@ -220,7 +220,8 @@ object SparkSQLExample { // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define 
explicitly - implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] + implicit val mapEncoder: Encoder[Map[String, Any]] = + org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 23402599e543d..20fb4318c158b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -22,7 +22,7 @@ import java.util.Locale import breeze.linalg.normalize import breeze.numerics.exp import org.apache.hadoop.fs.Path -import org.json4s.DefaultFormats +import org.json4s.{DefaultFormats, Formats} import org.json4s.JsonAST.JObject import org.json4s.jackson.JsonMethods._ @@ -385,7 +385,7 @@ private object LDAParams { def getAndSetParams(model: LDAParams, metadata: Metadata): Unit = { VersionUtils.majorMinorVersion(metadata.sparkVersion) match { case (1, 6) => - implicit val format = DefaultFormats + implicit val format: Formats = DefaultFormats metadata.params match { case JObject(pairs) => pairs.foreach { case (paramName, jsonValue) => diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala index 8f03a29eb991a..a8844358ead2d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.ml.linalg -import org.json4s.DefaultFormats +import org.json4s.{DefaultFormats, Formats} import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render} @@ -29,7 +29,7 @@ private[ml] object JsonMatrixConverter { * Parses the JSON representation of a Matrix into a [[Matrix]]. */ def fromJson(json: String): Matrix = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val jValue = parseJson(json) (jValue \ "type").extract[Int] match { case 0 => // sparse diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala index 1b949d75eeaa0..12387233879ad 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.linalg -import org.json4s.DefaultFormats +import org.json4s.{DefaultFormats, Formats} import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render} @@ -27,7 +27,7 @@ private[ml] object JsonVectorConverter { * Parses the JSON representation of a vector into a [[Vector]]. 
*/ def fromJson(json: String): Vector = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val jValue = parseJson(json) (jValue \ "type").extract[Int] match { case 0 => // sparse diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index ef1adea592492..62df8f9d78447 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -129,7 +129,7 @@ private[ml] object Param { case JObject(v) => val keys = v.map(_._1) if (keys.contains("class")) { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val className = (jValue \ "class").extract[String] className match { case JsonMatrixConverter.className => @@ -398,7 +398,7 @@ class IntParam(parent: String, name: String, doc: String, isValid: Int => Boolea } override def jsonDecode(json: String): Int = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats parse(json).extract[Int] } } @@ -484,7 +484,7 @@ class LongParam(parent: String, name: String, doc: String, isValid: Long => Bool } override def jsonDecode(json: String): Long = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats parse(json).extract[Long] } } @@ -505,7 +505,7 @@ class BooleanParam(parent: String, name: String, doc: String) // No need for isV } override def jsonDecode(json: String): Boolean = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats parse(json).extract[Boolean] } } @@ -528,7 +528,7 @@ class StringArrayParam(parent: Params, name: String, doc: String, isValid: Array } override def jsonDecode(json: String): Array[String] = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats parse(json).extract[Seq[String]].toArray } } @@ -617,7 +617,7 @@ class IntArrayParam(parent: Params, name: String, doc: String, isValid: Array[In } override def jsonDecode(json: String): Array[Int] = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats parse(json).extract[Seq[Int]].toArray } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala index 594d9f315f508..66a96b0192297 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala @@ -137,7 +137,7 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg class AFTSurvivalRegressionWrapperReader extends MLReader[AFTSurvivalRegressionWrapper] { override def load(path: String): AFTSurvivalRegressionWrapper = { - implicit val format = DefaultFormats + implicit val format: Formats = DefaultFormats val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala index ad13cced4667b..125cdf7259fef 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala @@ -103,7 +103,7 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] { class ALSWrapperReader extends MLReader[ALSWrapper] { override def load(path: String): 
ALSWrapper = { - implicit val format = DefaultFormats + implicit val format: Formats = DefaultFormats val rMetadataPath = new Path(path, "rMetadata").toString val modelPath = new Path(path, "model").toString diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala index 5161bc72659c6..ad7435ce5be76 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.classification -import org.json4s.{DefaultFormats, JValue} +import org.json4s.{DefaultFormats, Formats, JValue} import org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaRDD @@ -65,7 +65,7 @@ private[mllib] object ClassificationModel { * @return (numFeatures, numClasses) */ def getNumFeaturesClasses(metadata: JValue): (Int, Int) = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats ((metadata \ "numFeatures").extract[Int], (metadata \ "numClasses").extract[Int]) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala index ceaae27d83429..9f3aad9238979 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala @@ -224,7 +224,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] { } def load(sc: SparkContext, path: String): BisectingKMeansModel = { - implicit val formats: DefaultFormats = DefaultFormats + implicit val formats: Formats = DefaultFormats val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) assert(className == thisClassName) assert(formatVersion == thisFormatVersion) @@ -262,7 +262,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] { } def load(sc: SparkContext, path: String): BisectingKMeansModel = { - implicit val formats: DefaultFormats = DefaultFormats + implicit val formats: Formats = DefaultFormats val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) assert(className == thisClassName) assert(formatVersion == thisFormatVersion) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index 42ec37f438f4e..8982a8ca7c6c0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.clustering import breeze.linalg.{DenseVector => BreezeVector} -import org.json4s.DefaultFormats +import org.json4s.{DefaultFormats, Formats} import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -177,7 +177,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] { @Since("1.4.0") override def load(sc: SparkContext, path: String): GaussianMixtureModel = { val (loadedClassName, version, metadata) = Loader.loadMetadata(sc, path) - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val k = (metadata \ "k").extract[Int] val classNameV1_0 = SaveLoadV1_0.classNameV1_0 (loadedClassName, version) match { diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index c4ea263ee116a..476df64581f7e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -181,7 +181,7 @@ object KMeansModel extends Loader[KMeansModel] { } def load(sc: SparkContext, path: String): KMeansModel = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) assert(className == thisClassName) @@ -216,7 +216,7 @@ object KMeansModel extends Loader[KMeansModel] { } def load(sc: SparkContext, path: String): KMeansModel = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) assert(className == thisClassName) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index aa8b6a00a427f..e318f06900950 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering import breeze.linalg.{argmax, argtopk, normalize, sum, DenseMatrix => BDM, DenseVector => BDV} import breeze.numerics.{exp, lgamma} import org.apache.hadoop.fs.Path -import org.json4s.DefaultFormats +import org.json4s.{DefaultFormats, Formats} import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -496,7 +496,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] { @Since("1.5.0") override def load(sc: SparkContext, path: String): LocalLDAModel = { val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path) - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val expectedK = (metadata \ "k").extract[Int] val expectedVocabSize = (metadata \ "vocabSize").extract[Int] val docConcentration = @@ -923,7 +923,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] { @Since("1.5.0") override def load(sc: SparkContext, path: String): DistributedLDAModel = { val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path) - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val expectedK = (metadata \ "k").extract[Int] val vocabSize = (metadata \ "vocabSize").extract[Int] val docConcentration = diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index ba541bbcccd29..12c7ae5066c82 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -79,7 +79,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode @Since("1.4.0") def load(sc: SparkContext, path: String): PowerIterationClusteringModel = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, 
formatVersion, metadata) = Loader.loadMetadata(sc, path) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala index a1b00bccbc34e..41eb6567b845a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala @@ -147,7 +147,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] { } def load(sc: SparkContext, path: String): ChiSqSelectorModel = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) assert(className == thisClassName) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index d6a493a011b99..0dddbec8a7ed8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import scala.jdk.CollectionConverters._ import com.google.common.collect.{Ordering => GuavaOrdering} -import org.json4s.DefaultFormats +import org.json4s.{DefaultFormats, Formats} import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -704,7 +704,7 @@ object Word2VecModel extends Loader[Word2VecModel] { override def load(sc: SparkContext, path: String): Word2VecModel = { val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path) - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val expectedVectorSize = (metadata \ "vectorSize").extract[Int] val expectedNumWords = (metadata \ "numWords").extract[Int] val classNameV1_0 = SaveLoadV1_0.classNameV1_0 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala index c74ad6b5c1aed..0136e8bf169d2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala @@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe._ -import org.json4s.DefaultFormats +import org.json4s.{DefaultFormats, Formats} import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods.{compact, render} @@ -126,7 +126,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] { } def load(sc: SparkContext, path: String): FPGrowthModel[_] = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index f2b7151feb16f..59d22f0eac991 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe._ -import org.json4s.DefaultFormats +import org.json4s.{DefaultFormats, Formats} import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods.{compact, render} @@ -670,7 
+670,7 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] { } def load(sc: SparkContext, path: String): PrefixSpanModel[_] = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index fbdb5843eb99d..80979970a867c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._ import scala.language.implicitConversions import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV} -import org.json4s.DefaultFormats +import org.json4s.{DefaultFormats, Formats} import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render} @@ -432,7 +432,7 @@ object Vectors { */ @Since("1.6.0") def fromJson(json: String): Vector = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val jValue = parseJson(json) (jValue \ "type").extract[Int] match { case 0 => // sparse diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 0cf0a94bb3bf2..9ffee8832db93 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -392,7 +392,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { } def load(sc: SparkContext, path: String): MatrixFactorizationModel = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val (className, formatVersion, metadata) = loadMetadata(sc, path) assert(className == thisClassName) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index ba115c278eb44..83b01807b3eb2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -209,7 +209,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { @Since("1.4.0") override def load(sc: SparkContext, path: String): IsotonicRegressionModel = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val (loadedClassName, version, metadata) = loadMetadata(sc, path) val isotonic = (metadata \ "isotonic").extract[Boolean] val classNameV1_0 = SaveLoadV1_0.thisClassName diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala index a95a54225a085..0e2dbe43e45bb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.regression -import org.json4s.{DefaultFormats, JValue} +import org.json4s.{DefaultFormats, Formats, JValue} import 
org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaRDD @@ -64,7 +64,7 @@ private[mllib] object RegressionModel { * @return numFeatures */ def getNumFeatures(metadata: JValue): Int = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats (metadata \ "numFeatures").extract[Int] } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala index cdc998000c2fc..7a864b9d41efe 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala @@ -312,7 +312,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging { */ @Since("1.3.0") override def load(sc: SparkContext, path: String): DecisionTreeModel = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val (loadedClassName, version, metadata) = Loader.loadMetadata(sc, path) val algo = (metadata \ "algo").extract[String] val numNodes = (metadata \ "numNodes").extract[Int] diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala index b403770f5616a..579d6b77f62c3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala @@ -440,7 +440,7 @@ private[tree] object TreeEnsembleModel extends Logging { * Read metadata from the loaded JSON metadata. */ def readMetadata(metadata: JValue): Metadata = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats (metadata \ "metadata").extract[Metadata] } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/modelSaveLoad.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/modelSaveLoad.scala index c13bc4099ce70..74e8ae75caf3e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/modelSaveLoad.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/modelSaveLoad.scala @@ -117,7 +117,7 @@ private[mllib] object Loader { * @return (class name, version, metadata) */ def loadMetadata(sc: SparkContext, path: String): (String, String, JValue) = { - implicit val formats = DefaultFormats + implicit val formats: Formats = DefaultFormats val metadata = parse(sc.textFile(metadataPath(path)).first()) val clazz = (metadata \ "class").extract[String] val version = (metadata \ "version").extract[String] diff --git a/pom.xml b/pom.xml index 6ed16d88b0dc4..cb965b48dc486 100644 --- a/pom.xml +++ b/pom.xml @@ -2994,10 +2994,6 @@ `procedure syntax is deprecated` --> -Wconf:cat=deprecation&msg=procedure syntax is deprecated:e - - -Wconf:msg=Implicit definition should have explicit type:s diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 72ea06a8d0504..993509ebaee8e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -247,8 +247,6 @@ object SparkBuild extends PomBuild { "-Wconf:cat=deprecation&msg=Auto-application to \\`\\(\\)\\` is deprecated&site=org.apache.spark.streaming.kafka010.KafkaRDDSuite:s", // SPARK-35574 Prevent the recurrence of compilation warnings related to `procedure syntax is deprecated` "-Wconf:cat=deprecation&msg=procedure syntax is deprecated:e", - // SPARK-40497 Upgrade Scala to 2.13.11 and suppress `Implicit definition should have explicit 
type` - "-Wconf:msg=Implicit definition should have explicit type:s", // SPARK-45627 Symbol literals are deprecated in Scala 2.13 and it's a compile error in Scala 3. "-Wconf:cat=deprecation&msg=symbol literal is deprecated:e", // SPARK-45627 `enum`, `export` and `given` will become keywords in Scala 3, diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 1cfb3d9557049..8b88d38f033b4 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -64,14 +64,14 @@ private[spark] abstract class YarnSchedulerBackend( private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint( YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint) - private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf) + private implicit val askTimeout: RpcTimeout = RpcUtils.askRpcTimeout(sc.conf) /** * Declare implicit single thread execution context for futures doRequestTotalExecutors and * doKillExecutors below, avoiding using the global execution context that may cause conflict * with user code's execution of futures. */ - private implicit val schedulerEndpointEC = ExecutionContext.fromExecutorService( + private implicit val schedulerEndpointEC: ExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonSingleThreadExecutor("yarn-scheduler-endpoint")) /** Application ID. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index 01ecebf1e6abe..a54f490dd1463 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -492,7 +492,8 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } } - implicit private def mapIntStrEncoder = ExpressionEncoder[Map[Int, String]]() + implicit private def mapIntStrEncoder: ExpressionEncoder[Map[Int, String]] = + ExpressionEncoder[Map[Int, String]]() test("SPARK-23588 CatalystToExternalMap should support interpreted execution") { // To get a resolved `CatalystToExternalMap` expression, we build a deserializer plan diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index f28df3839d0a8..266c369894eca 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -423,7 +423,8 @@ class ColumnPruningSuite extends PlanTest { comparePlans(optimized, expected) } - implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]() + implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] = + ExpressionEncoder[T]() private val func = identity[Iterator[OtherTuple]] _ test("Column pruning on MapPartitions") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala index 1c818eee1224d..74cd917955284 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala @@ -37,8 +37,10 @@ class EliminateMapObjectsSuite extends PlanTest { } } - implicit private def intArrayEncoder = ExpressionEncoder[Array[Int]]() - implicit private def doubleArrayEncoder = ExpressionEncoder[Array[Double]]() + implicit private def intArrayEncoder: ExpressionEncoder[Array[Int]] = + ExpressionEncoder[Array[Int]]() + implicit private def doubleArrayEncoder: ExpressionEncoder[Array[Double]] = + ExpressionEncoder[Array[Double]]() test("SPARK-20254: Remove unnecessary data conversion for primitive array") { val intObjType = ObjectType(classOf[Array[Int]]) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala index 0d654cc1ac935..f1acac39a7672 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala @@ -35,8 +35,9 @@ class EliminateSerializationSuite extends PlanTest { EliminateSerialization) :: Nil } - implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]() - implicit private def intEncoder = ExpressionEncoder[Int]() + implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] = + ExpressionEncoder[T]() + implicit private def intEncoder: ExpressionEncoder[Int] = ExpressionEncoder[Int]() test("back to back serialization") { val input = LocalRelation($"obj".obj(classOf[(Int, Int)])) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala index a1039b051ce45..8c4daacd1b2f0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala @@ -40,7 +40,8 @@ class ObjectSerializerPruningSuite extends PlanTest { RemoveNoopOperators) :: Nil } - implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]() + implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] = + ExpressionEncoder[T]() test("collect struct types") { val dataTypes = Seq( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala index 4385777e79c09..fb29f6f17e756 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala @@ -37,7 +37,8 @@ class TypedFilterOptimizationSuite extends PlanTest { CombineTypedFilters) :: Nil } - implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]() + implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] = + 
ExpressionEncoder[T]() val testRelation = LocalRelation($"_1".int, $"_2".int) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala index acf62d07bc398..2324c33806d48 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala @@ -47,7 +47,8 @@ class DistinctKeyVisitorSuite extends PlanTest { assert(plan.analyze.distinctKeys === distinctKeys) } - implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]() + implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] = + ExpressionEncoder[T]() test("Aggregate's distinct attributes") { checkDistinctAttributes(t1.groupBy($"a", $"b")($"a", $"b", 1), Set(ExpressionSet(Seq(a, b)))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index d36aaef558663..0c4a8402754ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -22,6 +22,7 @@ import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream} import scala.annotation.varargs import scala.collection.mutable.{ArrayBuffer, HashSet} import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal @@ -241,7 +242,7 @@ class Dataset[T] private[sql]( exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer) } - private implicit def classTag = exprEnc.clsTag + private implicit def classTag: ClassTag[T] = exprEnc.clsTag // sqlContext must be val because a stable identifier is expected when you import implicits @transient lazy val sqlContext: SQLContext = sparkSession.sqlContext @@ -1627,7 +1628,7 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = withOrigin { - implicit val encoder = c1.encoder + implicit val encoder: ExpressionEncoder[U1] = c1.encoder val project = Project(c1.withInputType(exprEnc, logicalPlan.output).named :: Nil, logicalPlan) if (!encoder.isSerializedAsStructForTopLevel) { @@ -3469,7 +3470,7 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = withOrigin { - implicit val uEnc = encoder + implicit val uEnc: Encoder[U] = encoder withTypedPlan(MapElements[T, U](func, logicalPlan)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index 22dfed3ea4c5c..83498740be402 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -43,8 +43,8 @@ class KeyValueGroupedDataset[K, V] private[sql]( private val groupingAttributes: Seq[Attribute]) extends Serializable { // Similar to [[Dataset]], we turn the passed in encoder to `ExpressionEncoder` explicitly. 
- private implicit val kExprEnc = encoderFor(kEncoder) - private implicit val vExprEnc = encoderFor(vEncoder) + private implicit val kExprEnc: ExpressionEncoder[K] = encoderFor(kEncoder) + private implicit val vExprEnc: ExpressionEncoder[V] = encoderFor(vEncoder) private def logicalPlan = queryExecution.analyzed private def sparkSession = queryExecution.sparkSession diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index ace22ade91595..a5c1f6613b4c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -22,7 +22,7 @@ import java.util.Locale import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.SparkUpgradeException @@ -55,7 +55,7 @@ object DataSourceUtils extends PredicateHelper { /** * Utility methods for converting partitionBy columns to options and back. */ - private implicit val formats = Serialization.formats(NoTypeHints) + private implicit val formats: Formats = Serialization.formats(NoTypeHints) def encodePartitioningColumns(columns: Seq[String]): String = { Serialization.write(columns) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala index 86ebc2e7ef148..cfe01f85cbe73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala @@ -216,7 +216,7 @@ class ApplyInPandasWithStatePythonRunner( STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER) stateMetadataBatch.rowIterator().asScala.take(numRows).flatMap { row => - implicit val formats = org.json4s.DefaultFormats + implicit val formats: Formats = org.json4s.DefaultFormats // NOTE: See ApplyInPandasWithStatePythonRunner.STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER // for the schema. 
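Note (editorial, not part of the patch): the hunks in this diff apply one recurring rule — give implicit definitions an explicit result type, or delete implicits that turn out to be unused (as in CoarseGrainedExecutorBackend) — which is what allows the `-Wconf:msg=Implicit definition should have explicit type:s` suppression to be removed from pom.xml and project/SparkBuild.scala. Below is a minimal standalone sketch of the recurring patterns, assuming only json4s and the Scala standard library on the classpath; the object and value names are illustrative, not Spark APIs.

import java.util.concurrent.Executors

import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

import org.json4s.{DefaultFormats, Formats, NoTypeHints}
import org.json4s.jackson.Serialization

object ExplicitImplicitTypes {
  // Before: `implicit val formats = DefaultFormats` (inferred as DefaultFormats.type),
  // which trips the Scala 2.13.11+ "Implicit definition should have explicit type" lint.
  // After: annotate with the broader Formats type, which is all extract[...] needs.
  implicit val formats: Formats = DefaultFormats

  // Same rule for Serialization-based formats, as in DataSourceUtils and CommitLog.
  val noHintFormats: Formats = Serialization.formats(NoTypeHints)

  // Executor-backed contexts spell out the type the factory already returns,
  // as in BlockManagerMasterEndpoint / BlockManagerStorageEndpoint.
  private val pool = Executors.newCachedThreadPool()
  implicit val askEc: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(pool)

  // Anonymous-class Orderings collapse to a typed SAM lambda, as in Kafka's JsonUtils.
  implicit val byLength: Ordering[String] =
    (x: String, y: String) => Integer.compare(x.length, y.length)
}

Annotating with the wider Formats type (rather than the inferred DefaultFormats.type) keeps extract[...] call sites unchanged, while the executor-backed contexts and SAM orderings merely make the already-inferred types explicit, satisfying the lint without behavioral change.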
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala index 1d428e68fe60e..dc52f33474c62 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala @@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets._ import scala.io.{Source => IOSource} -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.sql.SparkSession @@ -82,7 +82,7 @@ case class CommitMetadata(nextBatchWatermarkMs: Long = 0) { } object CommitMetadata { - implicit val format = Serialization.formats(NoTypeHints) + implicit val format: Formats = Serialization.formats(NoTypeHints) def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 140367b3236ff..fa1beb9d15c75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -24,7 +24,7 @@ import scala.io.{Source => IOSource} import scala.reflect.ClassTag import org.apache.hadoop.fs.Path -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.sql.SparkSession @@ -49,9 +49,10 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( import CompactibleFileStreamLog._ - private implicit val formats = Serialization.formats(NoTypeHints) + private implicit val formats: Formats = Serialization.formats(NoTypeHints) /** Needed to serialize type T into JSON when using Jackson */ + @scala.annotation.nowarn private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass) protected val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index 94ba8b8aa5153..d8aa31be47972 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -18,8 +18,6 @@ package org.apache.spark.sql.execution.streaming import org.apache.hadoop.fs.FileStatus -import org.json4s.NoTypeHints -import org.json4s.jackson.Serialization import org.apache.spark.paths.SparkPath import org.apache.spark.sql.SparkSession @@ -90,8 +88,6 @@ class FileStreamSinkLog( _retentionMs: Option[Long] = None) extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) { - private implicit val formats = Serialization.formats(NoTypeHints) - protected override val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index 
ceadbca2b1226..14653864a2922 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -22,9 +22,6 @@ import java.util.Map.Entry import scala.collection.mutable -import org.json4s.NoTypeHints -import org.json4s.jackson.Serialization - import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry import org.apache.spark.sql.internal.SQLConf @@ -52,8 +49,6 @@ class FileStreamSourceLog( protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSourceLogDeletion - private implicit val formats = Serialization.formats(NoTypeHints) - // A fixed size log entry cache to cache the file entries belong to the compaction batch. It is // used to avoid scanning the compacted log file to retrieve it's own batch data. private val cacheSize = compactInterval diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala index a2b49d944a688..ba79c77f38677 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming import scala.util.control.Exception._ -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization /** @@ -34,7 +34,7 @@ case class FileStreamSourceOffset(logOffset: Long) extends Offset { } object FileStreamSourceOffset { - implicit val format = Serialization.formats(NoTypeHints) + implicit val format: Formats = Serialization.formats(NoTypeHints) def apply(offset: Offset): FileStreamSourceOffset = { offset match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index 3af9c9aebf33d..c9ade7b568e82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -246,7 +246,7 @@ private[sql] object GroupStateImpl { } def fromJson[S](value: Option[S], json: JValue): GroupStateImpl[S] = { - implicit val formats = org.json4s.DefaultFormats + implicit val formats: Formats = org.json4s.DefaultFormats val hmap = json.extract[Map[String, Any]] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index a7b0483ea08a7..79627030e1eba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -26,7 +26,7 @@ import scala.reflect.ClassTag import org.apache.commons.io.IOUtils import org.apache.hadoop.fs._ -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging @@ -50,9 +50,10 @@ import org.apache.spark.util.ArrayImplicits._ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String) extends MetadataLog[T] with Logging { - private implicit val 
formats = Serialization.formats(NoTypeHints) + private implicit val formats: Formats = Serialization.formats(NoTypeHints) /** Needed to serialize type T into JSON when using Jackson */ + @scala.annotation.nowarn private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass) // Avoid serializing generic sequences, see SPARK-17372 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index dea75e3ec4783..006d6221e55aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.streaming -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging @@ -89,7 +89,7 @@ case class OffsetSeqMetadata( } object OffsetSeqMetadata extends Logging { - private implicit val format = Serialization.formats(NoTypeHints) + private implicit val format: Formats = Serialization.formats(NoTypeHints) /** * These configs are related to streaming query execution and should not be changed across * batches of a streaming query. The values of these configs are persisted into the offset diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala index cb18988b46872..978cb3c34f606 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala @@ -25,7 +25,7 @@ import scala.util.control.NonFatal import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path} -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging @@ -45,7 +45,7 @@ case class StreamMetadata(id: String) { } object StreamMetadata extends Logging { - implicit val format = Serialization.formats(NoTypeHints) + implicit val format: Formats = Serialization.formats(NoTypeHints) /** Read the metadata from file if it exists */ def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala index 3f5f81bee4e85..5463a1fa4e997 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala @@ -25,7 +25,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.ListBuffer -import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.{DefaultFormats, Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.SparkEnv @@ -286,6 +286,6 @@ class TextSocketContinuousPartitionReader( } case class TextSocketOffset(offsets: List[Int]) extends Offset { - private implicit val formats = Serialization.formats(NoTypeHints) + private implicit val 
formats: Formats = Serialization.formats(NoTypeHints) override def json: String = Serialization.write(offsets) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala index dc97386d8fcf1..d0ba95ffd8a23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala @@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.ListBuffer -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.{SparkEnv, TaskContext} @@ -46,7 +46,7 @@ import org.apache.spark.util.RpcUtils class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2) extends MemoryStreamBase[A](sqlContext) with ContinuousStream { - private implicit val formats = Serialization.formats(NoTypeHints) + private implicit val formats: Formats = Serialization.formats(NoTypeHints) // ContinuousReader implementation @@ -182,6 +182,6 @@ class ContinuousMemoryStreamPartitionReader( case class ContinuousMemoryStreamOffset(partitionNums: Map[Int, Int]) extends Offset { - private implicit val formats = Serialization.formats(NoTypeHints) + private implicit val formats: Formats = Serialization.formats(NoTypeHints) override def json(): String = Serialization.write(partitionNums) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala index 6954e4534e494..8dca7d40704ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.streaming.sources -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging @@ -130,7 +130,7 @@ case class RatePerMicroBatchStreamOffset(offset: Long, timestamp: Long) extends } object RatePerMicroBatchStreamOffset { - implicit val formats = Serialization.formats(NoTypeHints) + implicit val formats: Formats = Serialization.formats(NoTypeHints) def apply(json: String): RatePerMicroBatchStreamOffset = Serialization.read[RatePerMicroBatchStreamOffset](json) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala index 4ef39ddfd2557..b58c805af9d60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala @@ -24,7 +24,7 @@ import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataOutputStream, Path} -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging @@ -64,8 +64,9 @@ case class OperatorStateMetadataV1( object 
OperatorStateMetadataV1 { - private implicit val formats = Serialization.formats(NoTypeHints) + private implicit val formats: Formats = Serialization.formats(NoTypeHints) + @scala.annotation.nowarn private implicit val manifest = Manifest .classType[OperatorStateMetadataV1](implicitly[ClassTag[OperatorStateMetadataV1]].runtimeClass) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 3dfc27b14c247..feb745b040239 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -26,7 +26,7 @@ import scala.ref.WeakReference import scala.util.Try import org.apache.hadoop.conf.Configuration -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.rocksdb.{RocksDB => NativeRocksDB, _} import org.rocksdb.CompressionType._ @@ -886,7 +886,7 @@ case class RocksDBMetrics( } object RocksDBMetrics { - val format = Serialization.formats(NoTypeHints) + val format: Formats = Serialization.formats(NoTypeHints) } /** Class to wrap RocksDB's native histogram */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index ae342813338c2..98c241eaca387 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -34,7 +34,7 @@ import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModul import org.apache.commons.io.{FilenameUtils, IOUtils} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path, PathFilter} -import org.json4s.NoTypeHints +import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.{SparkConf, SparkEnv} @@ -715,7 +715,7 @@ case class RocksDBCheckpointMetadata( object RocksDBCheckpointMetadata { val VERSION = 1 - implicit val format = Serialization.formats(NoTypeHints) + implicit val format: Formats = Serialization.formats(NoTypeHints) /** Used to convert between classes and JSON. 
*/ lazy val mapper = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala index 2de2d90e7dd29..5cdafc90cfecb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala @@ -231,7 +231,7 @@ case class FooAgg(s: Int) extends Aggregator[Row, Int, Int] { class DatasetAggregatorSuite extends QueryTest with SharedSparkSession { import testImplicits._ - private implicit val ordering = Ordering.by((c: AggData) => c.a -> c.b) + private implicit val ordering: Ordering[AggData] = Ordering.by((c: AggData) => c.a -> c.b) test("typed aggregation: complex result type") { val ds = Seq("a" -> 1, "a" -> 3, "b" -> 3).toDS() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 739d1f6827d5f..cd28c60d83c7a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -71,7 +71,7 @@ class DatasetSuite extends QueryTest with AdaptiveSparkPlanHelper { import testImplicits._ - private implicit val ordering = Ordering.by((c: ClassData) => c.a -> c.b) + private implicit val ordering: Ordering[ClassData] = Ordering.by((c: ClassData) => c.a -> c.b) test("checkAnswer should compare map correctly") { val data = Seq((1, "2", Map(1 -> 2, 2 -> 1))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 5bf346d0e6285..e4869f19f2f79 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -21,7 +21,7 @@ import java.util.UUID import scala.collection.mutable -import org.scalactic.TolerantNumerics +import org.scalactic.{Equality, TolerantNumerics} import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.Waiters.Waiter @@ -44,7 +44,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { import testImplicits._ // To make === between double tolerate inexact values - implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01) + implicit val doubleEquality: Equality[Double] = TolerantNumerics.tolerantDoubleEquality(0.01) after { spark.streams.active.foreach(_.stop()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index b2978f257866b..95abfcdd6747b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -29,7 +29,7 @@ import org.apache.commons.io.FileUtils import org.apache.commons.lang3.RandomStringUtils import org.apache.hadoop.fs.Path import org.mockito.Mockito.when -import org.scalactic.TolerantNumerics +import org.scalactic.{Equality, TolerantNumerics} import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatestplus.mockito.MockitoSugar @@ -60,7 +60,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging 
wi import testImplicits._ // To make === between double tolerate inexact values - implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01) + implicit val doubleEquality: Equality[Double] = TolerantNumerics.tolerantDoubleEquality(0.01) after { sqlContext.streams.active.foreach(_.stop()) diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala index e94faf5e06686..fc9e4513d12ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala @@ -48,7 +48,7 @@ class SqlResourceWithActualMetricsSuite // Exclude nodes which may not have the metrics val excludedNodes = List("WholeStageCodegen", "Project", "SerializeFromObject") - implicit val formats = new DefaultFormats { + implicit val formats: DefaultFormats = new DefaultFormats { override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss") } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index ddfe01bb7ac22..805d93bee0ee5 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -25,7 +25,7 @@ import java.util.{Locale, UUID} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise} import scala.concurrent.duration._ import scala.io.Source import scala.jdk.CollectionConverters._ @@ -441,7 +441,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test { s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map") queries.foreach(statement.execute) - implicit val ec = ExecutionContext.fromExecutorService( + implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonSingleThreadExecutor("test-jdbc-cancel")) try { // Start a very-long-running query that will take hours to finish, then cancel it in order diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala index 92cfd7d40338c..9681245baa0fe 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala @@ -89,5 +89,5 @@ case class Time(private val millis: Long) { } object Time { - implicit val ordering = Ordering.by((time: Time) => time.millis) + implicit val ordering: Ordering[Time] = Ordering.by((time: Time) => time.millis) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 7087f167003d2..e5b98dd714b3d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -17,7 +17,7 @@ package 
org.apache.spark.streaming.receiver -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} import scala.concurrent.duration._ import org.apache.hadoop.conf.Configuration @@ -159,8 +159,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // For processing futures used in parallel block storing into block manager and write ahead log // # threads = 2, so that both writing to BM and WAL can proceed in parallel - implicit private val executionContext = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName)) + implicit private val executionContext: ExecutionContextExecutorService = ExecutionContext + .fromExecutorService(ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName)) /** * This implementation stores the block into the block manager as well as a write ahead log.
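The ReceivedBlockHandler hunk above, like the earlier HiveThriftServer2Suites one, annotates the pool-backed context as ExecutionContextExecutorService — the type ExecutionContext.fromExecutorService already returns — so the annotation records the inferred type rather than changing it, and keeps the executor-service side of the interface (e.g. shutdown) visible to readers. A minimal stand-alone sketch with illustrative names:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
    import scala.concurrent.duration._

    object PoolBackedContext {
      // fromExecutorService returns the richer ExecutionContextExecutorService,
      // so this annotation makes explicit what was previously inferred.
      implicit val ec: ExecutionContextExecutorService =
        ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

      def main(args: Array[String]): Unit = {
        val result = Await.result(Future(21 * 2), 10.seconds)
        println(s"computed $result")
        ec.shutdown() // possible because the executor-service interface is retained
      }
    }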
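Several of the metadata-log hunks (CompactibleFileStreamLog, HDFSMetadataLog, OperatorStateMetadataV1) keep an implicit Manifest built from the class's ClassTag and add @scala.annotation.nowarn so the compiler stops flagging that declaration, whose type stays inferred; the Manifest is still required because jackson-backed json4s deserialization of a generic T needs one. A rough, self-contained sketch of that shape — JsonLog is an illustrative stand-in, not a class from the patch:

    import scala.annotation.nowarn
    import scala.reflect.ClassTag

    import org.json4s.{Formats, NoTypeHints}
    import org.json4s.jackson.Serialization

    class JsonLog[T <: AnyRef : ClassTag] {
      private implicit val formats: Formats = Serialization.formats(NoTypeHints)

      // Needed to serialize/deserialize T with Jackson; mirrors the shape kept in the patch.
      @nowarn
      private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)

      def serialize(entry: T): String = Serialization.write(entry)
      def deserialize(json: String): T = Serialization.read[T](json)
    }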