Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-45629][CORE][SQL][CONNECT][ML][STREAMING][BUILD][EXAMPLES]Fix Implicit definition should have explicit type #43526

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -24,7 +24,7 @@ import scala.reflect.ClassTag
import com.google.rpc.ErrorInfo
import io.grpc.{ManagedChannel, StatusRuntimeException}
import io.grpc.protobuf.StatusProto
import org.json4s.DefaultFormats
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods

import org.apache.spark.{QueryContext, QueryContextType, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
Expand Down Expand Up @@ -354,7 +354,7 @@ private[client] object GrpcExceptionConverter {
* truncated error message.
*/
private def errorInfoToThrowable(info: ErrorInfo, message: String): Throwable = {
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
val classes =
JsonMethods.parse(info.getMetadataOrDefault("classes", "[]")).extract[Array[String]]
val errorClass = info.getMetadataOrDefault("errorClass", null)
Expand Down
Expand Up @@ -21,14 +21,14 @@ import scala.collection.mutable.HashMap
import scala.util.control.NonFatal

import org.apache.kafka.common.TopicPartition
import org.json4s.NoTypeHints
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

/**
* Utilities for converting Kafka related objects to and from json.
*/
private object JsonUtils {
private implicit val formats = Serialization.formats(NoTypeHints)
private implicit val formats: Formats = Serialization.formats(NoTypeHints)

/**
* Read TopicPartitions from json string
Expand Down Expand Up @@ -96,10 +96,8 @@ private object JsonUtils {
*/
def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
val result = new HashMap[String, HashMap[Int, Long]]()
implicit val order = new Ordering[TopicPartition] {
override def compare(x: TopicPartition, y: TopicPartition): Int = {
Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
}
implicit val order: Ordering[TopicPartition] = (x: TopicPartition, y: TopicPartition) => {
Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
}
val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
partitions.foreach { tp =>
Expand Down
Expand Up @@ -30,6 +30,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.sys.process._

import org.json4s.Formats
import org.json4s.jackson.JsonMethods

import org.apache.spark.{SparkConf, SparkContext}
Expand Down Expand Up @@ -340,7 +341,7 @@ private object FaultToleranceTest extends App with Logging {
private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FaultToleranceTest is a strange file, it's a test class, but it's in the production directory (so it probably has never been run by GA task). Can we directly delete this file? @srowen @dongjoon-hyun @HyukjinKwon

extends Logging {

implicit val formats = org.json4s.DefaultFormats
implicit val formats: Formats = org.json4s.DefaultFormats
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although this seems correct, there is no mechanism to guarantee the correctness and availability of this file.

var state: RecoveryState.Value = _
var liveWorkerIPs: List[String] = _
var numLiveApps = 0
Expand Down Expand Up @@ -383,7 +384,7 @@ private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile
private class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
extends Logging {

implicit val formats = org.json4s.DefaultFormats
implicit val formats: Formats = org.json4s.DefaultFormats

logDebug("Created worker: " + this)

Expand Down
Expand Up @@ -23,7 +23,7 @@ import java.nio.file.Files
import scala.collection.mutable
import scala.util.control.NonFatal

import org.json4s.{DefaultFormats, Extraction}
import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.jackson.JsonMethods.{compact, render}

import org.apache.spark.SparkException
Expand Down Expand Up @@ -115,7 +115,7 @@ private[spark] object StandaloneResourceUtils extends Logging {
private def writeResourceAllocationJson[T](
allocations: Seq[T],
jsonFile: File): Unit = {
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
val allocationJson = Extraction.decompose(allocations)
Files.write(jsonFile.toPath, compact(render(allocationJson)).getBytes())
}
Expand Down
Expand Up @@ -27,7 +27,6 @@ import scala.util.{Failure, Success}
import scala.util.control.NonFatal

import io.netty.util.internal.PlatformDependent
import org.json4s.DefaultFormats

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
Expand Down Expand Up @@ -60,8 +59,6 @@ private[spark] class CoarseGrainedExecutorBackend(

import CoarseGrainedExecutorBackend._

private implicit val formats = DefaultFormats

private[spark] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None
Expand Down
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.resource

import scala.util.control.NonFatal

import org.json4s.{DefaultFormats, Extraction, JValue}
import org.json4s.{DefaultFormats, Extraction, Formats, JValue}
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkException
Expand Down Expand Up @@ -70,7 +70,7 @@ private[spark] object ResourceInformation {
* Parses a JSON string into a [[ResourceInformation]] instance.
*/
def parseJson(json: String): ResourceInformation = {
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
try {
parse(json).extract[ResourceInformationJson].toResourceInformation
} catch {
Expand All @@ -81,7 +81,7 @@ private[spark] object ResourceInformation {
}

def parseJson(json: JValue): ResourceInformation = {
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
try {
json.extract[ResourceInformationJson].toResourceInformation
} catch {
Expand Down
Expand Up @@ -22,7 +22,7 @@ import java.util.Optional

import scala.util.control.NonFatal

import org.json4s.DefaultFormats
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{SparkConf, SparkException}
Expand Down Expand Up @@ -253,7 +253,7 @@ private[spark] object ResourceUtils extends Logging {

def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = {
withResourcesJson[ResourceAllocation](resourcesFile) { json =>
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
parse(json).extract[Seq[ResourceAllocation]]
}
}
Expand Down
Expand Up @@ -31,7 +31,7 @@ private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {

private[spark] class AppStatusSource extends Source {

override implicit val metricRegistry = new MetricRegistry()
override implicit val metricRegistry: MetricRegistry = new MetricRegistry()

override val sourceName = "appStatus"

Expand Down
Expand Up @@ -22,7 +22,7 @@ import java.util.{HashMap => JHashMap}
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, TimeoutException}
import scala.jdk.CollectionConverters._
import scala.util.Random
import scala.util.control.NonFatal
Expand Down Expand Up @@ -100,7 +100,8 @@ class BlockManagerMasterEndpoint(

private val askThreadPool =
ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100)
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
private implicit val askExecutionContext: ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(askThreadPool)

private val topologyMapper = {
val topologyMapperClassName = conf.get(
Expand Down
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.storage

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}

import org.apache.spark.{MapOutputTracker, SparkEnv}
import org.apache.spark.internal.Logging
Expand All @@ -38,7 +38,8 @@ class BlockManagerStorageEndpoint(

private val asyncThreadPool =
ThreadUtils.newDaemonCachedThreadPool("block-manager-storage-async-thread-pool", 100)
private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
private implicit val asyncExecutionContext: ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(asyncThreadPool)

// Operations that involve removing blocks may be slow and should be done asynchronously
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
Expand Down
Expand Up @@ -379,7 +379,7 @@ private[spark] object ThreadUtils {
def parmap[I, O](in: Seq[I], prefix: String, maxThreads: Int)(f: I => O): Seq[O] = {
val pool = newForkJoinPool(prefix, maxThreads)
try {
implicit val ec = ExecutionContext.fromExecutor(pool)
implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(pool)

val futures = in.map(x => Future(f(x)))
val futureSeq = Future.sequence(futures)
Expand Down
Expand Up @@ -42,7 +42,7 @@ import org.apache.spark.storage._
abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
{
implicit val defaultTimeout = timeout(10.seconds)
implicit val defaultTimeout: PatienceConfiguration.Timeout = timeout(10.seconds)
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("ContextCleanerSuite")
Expand Down
Expand Up @@ -23,7 +23,7 @@ import java.util.function.Supplier

import scala.concurrent.duration._

import org.json4s.{DefaultFormats, Extraction}
import org.json4s.{DefaultFormats, Extraction, Formats}
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
Expand Down Expand Up @@ -60,7 +60,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter with P
}
def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts)

implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats

private val _generateWorkerId = PrivateMethod[String](Symbol("generateWorkerId"))

Expand Down
Expand Up @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._

import org.json4s.{DefaultFormats, Extraction}
import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.JsonAST.{JArray, JObject}
import org.json4s.JsonDSL._
import org.mockito.ArgumentMatchers.any
Expand All @@ -50,7 +50,7 @@ import org.apache.spark.util.{SerializableBuffer, ThreadUtils, Utils}
class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
with LocalSparkContext with MockitoSugar {

implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats

def createSparkConf(): SparkConf = {
new SparkConf()
Expand Down
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.memory
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.Duration

import org.mockito.ArgumentMatchers.{any, anyLong}
Expand Down Expand Up @@ -148,7 +148,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
// -- Tests of sharing of execution memory between tasks ----------------------------------------
// Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite.

implicit val ec = ExecutionContext.global
implicit val ec: ExecutionContextExecutor = ExecutionContext.global

test("single task requesting on-heap execution memory") {
val manager = createMemoryManager(1000L)
Expand Down
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.storage

import java.util.Properties

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.language.implicitConversions
import scala.reflect.ClassTag

Expand All @@ -31,7 +31,7 @@ import org.apache.spark.util.ThreadUtils

class BlockInfoManagerSuite extends SparkFunSuite {

private implicit val ec = ExecutionContext.global
private implicit val ec: ExecutionContextExecutor = ExecutionContext.global
private var blockInfoManager: BlockInfoManager = _

override protected def beforeEach(): Unit = {
Expand Down
Expand Up @@ -83,7 +83,7 @@ private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {
class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers {

implicit var webDriver: WebDriver = _
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats


override def beforeAll(): Unit = {
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala
Expand Up @@ -22,14 +22,14 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.duration._

import org.scalatest.concurrent.{ThreadSignaler, TimeLimits}
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}

import org.apache.spark.SparkFunSuite

class KeyLockSuite extends SparkFunSuite with TimeLimits {

// Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
private implicit val defaultSignaler = ThreadSignaler
private implicit val defaultSignaler: Signaler = ThreadSignaler

private val foreverMs = 60 * 1000L

Expand Down
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.examples.sql

// $example on:programmatic_schema$
import org.apache.spark.sql.Row
import org.apache.spark.sql.{Encoder, Row}
// $example off:programmatic_schema$
// $example on:init_session$
import org.apache.spark.sql.SparkSession
Expand Down Expand Up @@ -220,7 +220,8 @@ object SparkSQLExample {
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
implicit val mapEncoder: Encoder[Map[String, Any]] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the modifications to the Example section, we can perform manual verification to ensure their availability and correctness. This should also be reflected in the pr description, specifically in the section about how testing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

Expand Down
4 changes: 2 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
Expand Up @@ -22,7 +22,7 @@ import java.util.Locale
import breeze.linalg.normalize
import breeze.numerics.exp
import org.apache.hadoop.fs.Path
import org.json4s.DefaultFormats
import org.json4s.{DefaultFormats, Formats}
import org.json4s.JsonAST.JObject
import org.json4s.jackson.JsonMethods._

Expand Down Expand Up @@ -385,7 +385,7 @@ private object LDAParams {
def getAndSetParams(model: LDAParams, metadata: Metadata): Unit = {
VersionUtils.majorMinorVersion(metadata.sparkVersion) match {
case (1, 6) =>
implicit val format = DefaultFormats
implicit val format: Formats = DefaultFormats
metadata.params match {
case JObject(pairs) =>
pairs.foreach { case (paramName, jsonValue) =>
Expand Down
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.ml.linalg

import org.json4s.DefaultFormats
import org.json4s.{DefaultFormats, Formats}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render}

Expand All @@ -29,7 +29,7 @@ private[ml] object JsonMatrixConverter {
* Parses the JSON representation of a Matrix into a [[Matrix]].
*/
def fromJson(json: String): Matrix = {
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
val jValue = parseJson(json)
(jValue \ "type").extract[Int] match {
case 0 => // sparse
Expand Down
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.ml.linalg

import org.json4s.DefaultFormats
import org.json4s.{DefaultFormats, Formats}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render}

Expand All @@ -27,7 +27,7 @@ private[ml] object JsonVectorConverter {
* Parses the JSON representation of a vector into a [[Vector]].
*/
def fromJson(json: String): Vector = {
implicit val formats = DefaultFormats
implicit val formats: Formats = DefaultFormats
val jValue = parseJson(json)
(jValue \ "type").extract[Int] match {
case 0 => // sparse
Expand Down