[SPARK-45629][CORE][SQL][CONNECT][ML][STREAMING][BUILD][EXAMPLES] Fix `Implicit definition should have explicit type`

### What changes were proposed in this pull request?
This PR aims to fix the `Implicit definition should have explicit type` warning in Scala 2.13.

This PR includes:
1. Declaring explicit types for global implicit variables
2. Adding the `scala.annotation.nowarn` annotation for reflection-related cases such as `ClassTag`

### Why are the changes needed?

- Implicit global variables without an explicit type declaration produce a warning:
  `warning: Implicit definition should have explicit type (inferred String) [quickfixable]`
- No modifications are required for local variables.

Additionally, to handle cases involving reflection-related types such as `ClassTag` in implicit variables, the `scala.annotation.nowarn` annotation is used to suppress the warning.
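A rough sketch of that suppression path, assuming the standard `scala.annotation.nowarn` annotation (the object and val names here are hypothetical):

```scala
import scala.annotation.nowarn
import scala.reflect.ClassTag

object TagHolder {
  // Hypothetical illustration: keep the inferred type but silence the
  // "Implicit definition should have explicit type" warning.
  @nowarn("msg=Implicit definition should have explicit type")
  private implicit val anyRefTag = ClassTag.AnyRef
}
```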

Furthermore, warnings generated in Spark will be treated as errors:

[error] ... Implicit definition should have explicit type (inferred org.json4s.DefaultFormats.type) [quickfixable]
...
[error]   implicit val formats = org.json4s.DefaultFormats
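The fix applied throughout this PR is to give such implicits an explicit, widened type. A minimal sketch of the pattern (the `JsonHelper` object name is hypothetical; it mirrors the json4s changes in the diff below):

```scala
import org.json4s.{DefaultFormats, Formats}

object JsonHelper {
  // Before (triggers the warning): the inferred type is the singleton
  // DefaultFormats.type rather than a general Formats.
  // implicit val formats = DefaultFormats

  // After: the implicit is declared with an explicit type.
  implicit val formats: Formats = DefaultFormats
}
```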

Jira link:
SPARK-45629: https://issues.apache.org/jira/browse/SPARK-45629

Related issue link about `implicit`:
scala/bug#5265

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Most of the testing is done through CI; the example module was also compiled and tested locally in IDEA. Additionally, some of the rewritten code was verified with demo code.

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #43526 from laglangyue/SPARK-45629.

Lead-authored-by: tangjiafu <jiafu.tang@qq.com>
Co-authored-by: tangjiafu <tangjiafu@corp.netease.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
2 people authored and LuciferYang committed Dec 1, 2023
1 parent 042d854 commit a186324
Showing 79 changed files with 151 additions and 155 deletions.
@@ -24,7 +24,7 @@ import scala.reflect.ClassTag
import com.google.rpc.ErrorInfo
import io.grpc.{ManagedChannel, StatusRuntimeException}
import io.grpc.protobuf.StatusProto
import org.json4s.DefaultFormats
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods

import org.apache.spark.{QueryContext, QueryContextType, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
@@ -354,7 +354,7 @@ private[client] object GrpcExceptionConverter {
* truncated error message.
*/
private def errorInfoToThrowable(info: ErrorInfo, message: String): Throwable = {
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
val classes =
JsonMethods.parse(info.getMetadataOrDefault("classes", "[]")).extract[Array[String]]
val errorClass = info.getMetadataOrDefault("errorClass", null)
@@ -21,14 +21,14 @@ import scala.collection.mutable.HashMap
import scala.util.control.NonFatal

import org.apache.kafka.common.TopicPartition
import org.json4s.NoTypeHints
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

/**
* Utilities for converting Kafka related objects to and from json.
*/
private object JsonUtils {
private implicit val formats = Serialization.formats(NoTypeHints)
private implicit val formats: Formats = Serialization.formats(NoTypeHints)

/**
* Read TopicPartitions from json string
@@ -96,10 +96,8 @@ private object JsonUtils {
*/
def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
val result = new HashMap[String, HashMap[Int, Long]]()
implicit val order = new Ordering[TopicPartition] {
override def compare(x: TopicPartition, y: TopicPartition): Int = {
Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
}
implicit val order: Ordering[TopicPartition] = (x: TopicPartition, y: TopicPartition) => {
Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
}
val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
partitions.foreach { tp =>
@@ -30,6 +30,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.sys.process._

import org.json4s.Formats
import org.json4s.jackson.JsonMethods

import org.apache.spark.{SparkConf, SparkContext}
@@ -340,7 +341,7 @@ private object FaultToleranceTest extends App with Logging {
private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
extends Logging {

implicit val formats = org.json4s.DefaultFormats
implicit val formats: Formats = org.json4s.DefaultFormats
var state: RecoveryState.Value = _
var liveWorkerIPs: List[String] = _
var numLiveApps = 0
@@ -383,7 +384,7 @@ private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile
private class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
extends Logging {

implicit val formats = org.json4s.DefaultFormats
implicit val formats: Formats = org.json4s.DefaultFormats

logDebug("Created worker: " + this)

@@ -23,7 +23,7 @@ import java.nio.file.Files
import scala.collection.mutable
import scala.util.control.NonFatal

import org.json4s.{DefaultFormats, Extraction}
import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.jackson.JsonMethods.{compact, render}

import org.apache.spark.SparkException
@@ -115,7 +115,7 @@ private[spark] object StandaloneResourceUtils extends Logging {
private def writeResourceAllocationJson[T](
allocations: Seq[T],
jsonFile: File): Unit = {
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
val allocationJson = Extraction.decompose(allocations)
Files.write(jsonFile.toPath, compact(render(allocationJson)).getBytes())
}
@@ -27,7 +27,6 @@ import scala.util.{Failure, Success}
import scala.util.control.NonFatal

import io.netty.util.internal.PlatformDependent
import org.json4s.DefaultFormats

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
@@ -60,8 +59,6 @@ private[spark] class CoarseGrainedExecutorBackend(

import CoarseGrainedExecutorBackend._

private implicit val formats = DefaultFormats

private[spark] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None
@@ -19,7 +19,7 @@ package org.apache.spark.resource

import scala.util.control.NonFatal

import org.json4s.{DefaultFormats, Extraction, JValue}
import org.json4s.{DefaultFormats, Extraction, Formats, JValue}
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkException
@@ -71,7 +71,7 @@ private[spark] object ResourceInformation {
* Parses a JSON string into a [[ResourceInformation]] instance.
*/
def parseJson(json: String): ResourceInformation = {
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
try {
parse(json).extract[ResourceInformationJson].toResourceInformation
} catch {
@@ -82,7 +82,7 @@ }
}

def parseJson(json: JValue): ResourceInformation = {
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
try {
json.extract[ResourceInformationJson].toResourceInformation
} catch {
@@ -22,7 +22,7 @@ import java.util.Optional

import scala.util.control.NonFatal

import org.json4s.DefaultFormats
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{SparkConf, SparkException}
@@ -253,7 +253,7 @@ private[spark] object ResourceUtils extends Logging {

def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = {
withResourcesJson[ResourceAllocation](resourcesFile) { json =>
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
parse(json).extract[Seq[ResourceAllocation]]
}
}
@@ -31,7 +31,7 @@ private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {

private[spark] class AppStatusSource extends Source {

override implicit val metricRegistry = new MetricRegistry()
override implicit val metricRegistry: MetricRegistry = new MetricRegistry()

override val sourceName = "appStatus"

@@ -22,7 +22,7 @@ import java.util.{HashMap => JHashMap}
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, TimeoutException}
import scala.jdk.CollectionConverters._
import scala.util.Random
import scala.util.control.NonFatal
@@ -100,7 +100,8 @@ class BlockManagerMasterEndpoint(

private val askThreadPool =
ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100)
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
private implicit val askExecutionContext: ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(askThreadPool)

private val topologyMapper = {
val topologyMapperClassName = conf.get(
@@ -17,7 +17,7 @@

package org.apache.spark.storage

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}

import org.apache.spark.{MapOutputTracker, SparkEnv}
import org.apache.spark.internal.Logging
@@ -38,7 +38,8 @@ class BlockManagerStorageEndpoint(

private val asyncThreadPool =
ThreadUtils.newDaemonCachedThreadPool("block-manager-storage-async-thread-pool", 100)
private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
private implicit val asyncExecutionContext: ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(asyncThreadPool)

// Operations that involve removing blocks may be slow and should be done asynchronously
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -379,7 +379,7 @@ private[spark] object ThreadUtils {
def parmap[I, O](in: Seq[I], prefix: String, maxThreads: Int)(f: I => O): Seq[O] = {
val pool = newForkJoinPool(prefix, maxThreads)
try {
implicit val ec = ExecutionContext.fromExecutor(pool)
implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(pool)

val futures = in.map(x => Future(f(x)))
val futureSeq = Future.sequence(futures)
@@ -42,7 +42,7 @@ import org.apache.spark.storage._
abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
{
implicit val defaultTimeout = timeout(10.seconds)
implicit val defaultTimeout: PatienceConfiguration.Timeout = timeout(10.seconds)
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("ContextCleanerSuite")
@@ -23,7 +23,7 @@ import java.util.function.Supplier

import scala.concurrent.duration._

import org.json4s.{DefaultFormats, Extraction}
import org.json4s.{DefaultFormats, Extraction, Formats}
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
@@ -60,7 +60,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter with P
}
def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts)

implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats

private val _generateWorkerId = PrivateMethod[String](Symbol("generateWorkerId"))

@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._

import org.json4s.{DefaultFormats, Extraction}
import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.JsonAST.{JArray, JObject}
import org.json4s.JsonDSL._
import org.mockito.ArgumentMatchers.any
@@ -50,7 +50,7 @@ import org.apache.spark.util.{SerializableBuffer, ThreadUtils, Utils}
class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
with LocalSparkContext with MockitoSugar {

implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats

def createSparkConf(): SparkConf = {
new SparkConf()
@@ -20,7 +20,7 @@ package org.apache.spark.memory
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.Duration

import org.mockito.ArgumentMatchers.{any, anyLong}
@@ -148,7 +148,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
// -- Tests of sharing of execution memory between tasks ----------------------------------------
// Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite.

implicit val ec = ExecutionContext.global
implicit val ec: ExecutionContextExecutor = ExecutionContext.global

test("single task requesting on-heap execution memory") {
val manager = createMemoryManager(1000L)
@@ -19,7 +19,7 @@ package org.apache.spark.storage

import java.util.Properties

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.language.implicitConversions
import scala.reflect.ClassTag

@@ -31,7 +31,7 @@ import org.apache.spark.util.ThreadUtils

class BlockInfoManagerSuite extends SparkFunSuite {

private implicit val ec = ExecutionContext.global
private implicit val ec: ExecutionContextExecutor = ExecutionContext.global
private var blockInfoManager: BlockInfoManager = _

override protected def beforeEach(): Unit = {
@@ -83,7 +83,7 @@ private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {
class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers {

implicit var webDriver: WebDriver = _
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats


override def beforeAll(): Unit = {
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala
@@ -22,14 +22,14 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.duration._

import org.scalatest.concurrent.{ThreadSignaler, TimeLimits}
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}

import org.apache.spark.SparkFunSuite

class KeyLockSuite extends SparkFunSuite with TimeLimits {

// Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
private implicit val defaultSignaler = ThreadSignaler
private implicit val defaultSignaler: Signaler = ThreadSignaler

private val foreverMs = 60 * 1000L

@@ -17,7 +17,7 @@
package org.apache.spark.examples.sql

// $example on:programmatic_schema$
import org.apache.spark.sql.Row
import org.apache.spark.sql.{Encoder, Row}
// $example off:programmatic_schema$
// $example on:init_session$
import org.apache.spark.sql.SparkSession
@@ -220,7 +220,8 @@ object SparkSQLExample {
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
implicit val mapEncoder: Encoder[Map[String, Any]] =
org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

4 changes: 2 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -22,7 +22,7 @@ import java.util.Locale
import breeze.linalg.normalize
import breeze.numerics.exp
import org.apache.hadoop.fs.Path
import org.json4s.DefaultFormats
import org.json4s.{DefaultFormats, Formats}
import org.json4s.JsonAST.JObject
import org.json4s.jackson.JsonMethods._

@@ -385,7 +385,7 @@ private object LDAParams {
def getAndSetParams(model: LDAParams, metadata: Metadata): Unit = {
VersionUtils.majorMinorVersion(metadata.sparkVersion) match {
case (1, 6) =>
implicit val format = DefaultFormats
implicit val format: Formats = DefaultFormats
metadata.params match {
case JObject(pairs) =>
pairs.foreach { case (paramName, jsonValue) =>
@@ -16,7 +16,7 @@
*/
package org.apache.spark.ml.linalg

import org.json4s.DefaultFormats
import org.json4s.{DefaultFormats, Formats}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render}

@@ -31,7 +31,7 @@ private[ml] object JsonMatrixConverter {
* Parses the JSON representation of a Matrix into a [[Matrix]].
*/
def fromJson(json: String): Matrix = {
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
val jValue = parseJson(json)
(jValue \ "type").extract[Int] match {
case 0 => // sparse
@@ -17,7 +17,7 @@

package org.apache.spark.ml.linalg

import org.json4s.DefaultFormats
import org.json4s.{DefaultFormats, Formats}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render}

@@ -29,7 +29,7 @@ private[ml] object JsonVectorConverter {
* Parses the JSON representation of a vector into a [[Vector]].
*/
def fromJson(json: String): Vector = {
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
val jValue = parseJson(json)
(jValue \ "type").extract[Int] match {
case 0 => // sparse