remove toJson and use default serialization
mengxr committed Jun 12, 2019
1 parent 83ceb87 commit 94896ed
Showing 4 changed files with 22 additions and 20 deletions.
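In short: ResourceAllocation loses its hand-written json4s DSL method, and callers switch to json4s's default case-class serialization via Extraction.decompose with DefaultFormats. Because decompose is typed to return JValue rather than JArray, the TestUtils.writeJsonToFile parameter is widened to match. Below is a minimal, self-contained sketch of the new write path; the two case classes are local stand-ins whose shape is inferred from the diff, and the componentName/resourceName values are made up for illustration.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.{compact, parse, render}

// Local stand-ins for the Spark case classes (shape inferred from the diff below).
case class ResourceID(componentName: String, resourceName: String)
case class ResourceAllocation(id: ResourceID, addresses: Seq[String])

object DefaultSerializationSketch extends App {
  implicit val formats = DefaultFormats

  val alloc = ResourceAllocation(ResourceID("spark.executor", "gpu"), Seq("0", "1"))

  // Default serialization: no hand-written toJson needed.
  val json = compact(render(Extraction.decompose(Seq(alloc))))
  println(json)
  // [{"id":{"componentName":"spark.executor","resourceName":"gpu"},"addresses":["0","1"]}]

  // And it round-trips back into the case classes.
  assert(parse(json).extract[Seq[ResourceAllocation]] == Seq(alloc))
}
```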
16 changes: 7 additions & 9 deletions core/src/main/scala/org/apache/spark/ResourceUtils.scala
@@ -21,8 +21,6 @@ import java.io.{BufferedInputStream, File, FileInputStream}

 import com.fasterxml.jackson.databind.exc.MismatchedInputException
 import org.json4s.{DefaultFormats, MappingException}
-import org.json4s.JsonAST.JObject
-import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._

 import org.apache.spark.internal.Logging
@@ -49,17 +47,17 @@ private[spark] case class ResourceRequest(

 private[spark] case class TaskResourceRequirement(resourceName: String, amount: Int)

+/**
+ * Case class representing allocated resource addresses for a specific resource.
+ * Cluster manager uses the JSON serialization of this case class to pass allocated resource info to
+ * driver and executors.
+ *
+ * @see [[org.apache.spark.executor.CoarseGrainedExecutorBackend.resourcesFileOpt]]
+ */
 private[spark] case class ResourceAllocation(id: ResourceID, addresses: Seq[String]) {
   def toResourceInfo: ResourceInformation = {
     new ResourceInformation(id.resourceName, addresses.toArray)
   }
-
-  def toJson: JObject = {
-    ("id" ->
-      ("componentName" -> id.componentName) ~
-      ("resourceName" -> id.resourceName)) ~
-    ("addresses" -> addresses)
-  }
 }

 private[spark] object ResourceUtils extends Logging {
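Dropping toJson is behavior-preserving because json4s's default serialization emits case-class fields in declaration order, producing the same JSON the DSL built by hand. A quick equivalence check, as a sketch reusing the local stand-in case classes from above:

```scala
import org.json4s.{DefaultFormats, Extraction}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

case class ResourceID(componentName: String, resourceName: String)
case class ResourceAllocation(id: ResourceID, addresses: Seq[String])

object ToJsonEquivalenceSketch extends App {
  implicit val formats = DefaultFormats

  val a = ResourceAllocation(ResourceID("spark.executor", "gpu"), Seq("0", "1"))

  // What the deleted toJson built with the JSON DSL...
  val manual =
    ("id" ->
      ("componentName" -> a.id.componentName) ~
      ("resourceName" -> a.id.resourceName)) ~
    ("addresses" -> a.addresses)

  // ...matches the default serialization (json4s keeps declaration order).
  assert(compact(render(manual)) == compact(render(Extraction.decompose(a))))
}
```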
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -38,7 +38,7 @@ import scala.util.Try

 import com.google.common.io.{ByteStreams, Files}
 import org.apache.log4j.PropertyConfigurator
-import org.json4s.JsonAST.JArray
+import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods.{compact, render}

 import org.apache.spark.executor.TaskMetrics
@@ -316,7 +316,7 @@ private[spark] object TestUtils {
     current ++ current.filter(_.isDirectory).flatMap(recursiveList)
   }

-  def writeJsonToFile(dir: File, strToWrite: JArray): String = {
+  def writeJsonToFile(dir: File, strToWrite: JValue): String = {
     val f1 = File.createTempFile("jsonResourceFile", "", dir)
     JavaFiles.write(f1.toPath(), compact(render(strToWrite)).getBytes())
     f1.getPath()
5 changes: 3 additions & 2 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
-import org.json4s.JsonAST.JArray
+import org.json4s.{DefaultFormats, Extraction}
 import org.scalatest.Matchers._
 import org.scalatest.concurrent.Eventually

@@ -765,9 +765,10 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
       val scriptPath = writeStringToFileAndSetPermissions(gpuFile,
         """'{"name": "gpu","addresses":["5", "6"]}'""")

+      implicit val formats = DefaultFormats
       val gpusAllocated =
         ResourceAllocation(DRIVER_GPU_ID, Seq("0", "1", "8"))
-      val ja = JArray(List(gpusAllocated.toJson))
+      val ja = Extraction.decompose(Seq(gpusAllocated))
       val resourcesFile = writeJsonToFile(dir, ja)

       val conf = new SparkConf()
core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -25,14 +25,15 @@ import java.util.Properties

 import scala.collection.mutable
 import scala.concurrent.duration._

+import org.json4s.{DefaultFormats, Extraction}
-import org.json4s.JsonAST.{JArray, JObject}
-import org.json4s.JsonDSL._
 import org.mockito.Mockito.when
 import org.scalatest.concurrent.Eventually.{eventually, timeout}
 import org.scalatest.mockito.MockitoSugar

 import org.apache.spark._
 import org.apache.spark.ResourceInformation
 import org.apache.spark.ResourceUtils._
 import org.apache.spark.TestResourceIDs._
 import org.apache.spark.TestUtils._
@@ -45,6 +46,8 @@ import org.apache.spark.util.{SerializableBuffer, Utils}
 class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     with LocalSparkContext with MockitoSugar {

+  implicit val formats = DefaultFormats
+
   test("parsing no resources") {
     val conf = new SparkConf
     conf.set(TASK_GPU_ID.amountConf, "2")
@@ -78,7 +81,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       4, Seq.empty[URL], env, None)
     withTempDir { tmpDir =>
       val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
-      val ja = JArray(List(ra.toJson))
+      val ja = Extraction.decompose(Seq(ra))
       val f1 = writeJsonToFile(tmpDir, ja)
       val parsedResources = backend.parseOrFindResources(Some(f1))

@@ -106,7 +109,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
       val fpgaArgs =
         ResourceAllocation(EXECUTOR_FPGA_ID, Seq("f1", "f2", "f3"))
-      val ja = JArray(List(gpuArgs.toJson, fpgaArgs.toJson))
+      val ja = Extraction.decompose(Seq(gpuArgs, fpgaArgs))
       val f1 = writeJsonToFile(tmpDir, ja)
       val parsedResources = backend.parseOrFindResources(Some(f1))

@@ -133,7 +136,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     // not enough gpu's on the executor
     withTempDir { tmpDir =>
       val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0"))
-      val ja = JArray(List(gpuArgs.toJson))
+      val ja = Extraction.decompose(Seq(gpuArgs))
       val f1 = writeJsonToFile(tmpDir, ja)

       var error = intercept[IllegalArgumentException] {
@@ -147,7 +150,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     // missing resource on the executor
     withTempDir { tmpDir =>
       val fpga = ResourceAllocation(EXECUTOR_FPGA_ID, Seq("0"))
-      val ja = JArray(List(fpga.toJson))
+      val ja = Extraction.decompose(Seq(fpga))
       val f1 = writeJsonToFile(tmpDir, ja)

       var error = intercept[SparkException] {
@@ -172,7 +175,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     // executor resources < required
     withTempDir { tmpDir =>
       val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
-      val ja = JArray(List(gpuArgs.toJson))
+      val ja = Extraction.decompose(Seq(gpuArgs))
       val f1 = writeJsonToFile(tmpDir, ja)

       var error = intercept[IllegalArgumentException] {
@@ -230,7 +233,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
         4, Seq.empty[URL], env, None)
       val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
-      val ja = JArray(List(gpuArgs.toJson))
+      val ja = Extraction.decompose(Seq(gpuArgs))
       val f1 = writeJsonToFile(dir, ja)
       val parsedResources = backend.parseOrFindResources(Some(f1))
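Every test above now follows the same pattern: decompose a Seq of allocations, write the resulting JValue with writeJsonToFile, and let parseOrFindResources read the file back. The read side lives in ResourceUtils rather than in this diff; the sketch below is a hypothetical stand-in for that logic (readAllocations is not a real Spark method), showing that default extraction is the mirror image of decompose.

```scala
import scala.io.Source
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

case class ResourceID(componentName: String, resourceName: String)
case class ResourceAllocation(id: ResourceID, addresses: Seq[String])

object ResourcesFileReaderSketch {
  // Hypothetical helper mirroring what the executor side has to do:
  // parse the resources file and extract allocations with DefaultFormats.
  def readAllocations(path: String): Seq[ResourceAllocation] = {
    implicit val formats = DefaultFormats
    val source = Source.fromFile(path)
    try parse(source.mkString).extract[Seq[ResourceAllocation]]
    finally source.close()
  }
}
```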
