From 244bbae71c2aa0b9f173ad7ac16ad0440eaab99c Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Wed, 14 Jun 2017 22:55:16 +0800 Subject: [PATCH 1/3] update JsonProtocol --- .../org/apache/spark/deploy/JsonProtocol.scala | 12 +++++++++--- .../org/apache/spark/deploy/DeployTestUtils.scala | 4 ++-- .../apache/spark/deploy/JsonProtocolSuite.scala | 14 ++++++++++---- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 220b20bf7cbd1..72c04826b8a85 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -29,6 +29,7 @@ private[deploy] object JsonProtocol { ("id" -> obj.id) ~ ("host" -> obj.host) ~ ("port" -> obj.port) ~ + ("address" -> obj.hostPort) ~ ("webuiaddress" -> obj.webUiAddress) ~ ("cores" -> obj.cores) ~ ("coresused" -> obj.coresUsed) ~ @@ -44,7 +45,7 @@ private[deploy] object JsonProtocol { ("starttime" -> obj.startTime) ~ ("id" -> obj.id) ~ ("name" -> obj.desc.name) ~ - ("cores" -> obj.desc.maxCores) ~ + ("cores" -> obj.coresGranted) ~ ("user" -> obj.desc.user) ~ ("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~ ("submitdate" -> obj.submitDate.toString) ~ @@ -54,7 +55,7 @@ private[deploy] object JsonProtocol { def writeApplicationDescription(obj: ApplicationDescription): JObject = { ("name" -> obj.name) ~ - ("cores" -> obj.maxCores) ~ + ("cores" -> obj.maxCores.getOrElse(0)) ~ ("memoryperslave" -> obj.memoryPerExecutorMB) ~ ("user" -> obj.user) ~ ("command" -> obj.command.toString) @@ -72,13 +73,17 @@ private[deploy] object JsonProtocol { ("starttime" -> obj.startTime.toString) ~ ("state" -> obj.state.toString) ~ ("cores" -> obj.desc.cores) ~ - ("memory" -> obj.desc.mem) + ("memory" -> obj.desc.mem) ~ + ("submitdate" -> obj.submitDate.toString) ~ + ("worker" -> obj.worker.map(_.id).getOrElse("None")) ~ + ("mainclass" -> 
obj.desc.command.arguments(2)) } def writeMasterState(obj: MasterStateResponse): JObject = { val aliveWorkers = obj.workers.filter(_.isAlive()) ("url" -> obj.uri) ~ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ + ("aliveworkers" -> aliveWorkers.length) ~ ("cores" -> aliveWorkers.map(_.cores).sum) ~ ("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~ ("memory" -> aliveWorkers.map(_.memory).sum) ~ @@ -86,6 +91,7 @@ private[deploy] object JsonProtocol { ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~ ("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~ + ("completeddrivers" -> obj.completedDrivers.toList.map(writeDriverInfo)) ~ ("status" -> obj.status.toString) } diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala index 9c13c15281a42..55a541d60ea3c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala @@ -39,7 +39,7 @@ private[deploy] object DeployTestUtils { } def createDriverCommand(): Command = new Command( - "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"), + "org.apache.spark.FakeClass", Seq("WORKER_URL", "USER_JAR", "mainClass"), Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo") ) @@ -47,7 +47,7 @@ private[deploy] object DeployTestUtils { new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand()) def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3", - createDriverDesc(), new Date()) + createDriverDesc(), JsonConstants.submitDate) def createWorkerInfo(): WorkerInfo = { val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80") diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 7093dad05c5f6..68c53ffc569a0 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -105,7 +105,7 @@ object JsonConstants { val appInfoJsonStr = """ |{"starttime":3,"id":"id","name":"name", - |"cores":4,"user":"%s", + |"cores":0,"user":"%s", |"memoryperslave":1234,"submitdate":"%s", |"state":"WAITING","duration":%d} """.format(System.getProperty("user.name", ""), @@ -114,6 +114,7 @@ object JsonConstants { val workerInfoJsonStr = """ |{"id":"id","host":"host","port":8080, + |"address":"host:8080", |"webuiaddress":"http://publicAddress:80", |"cores":4,"coresused":0,"coresfree":4, |"memory":1234,"memoryused":0,"memoryfree":1234, @@ -134,19 +135,24 @@ object JsonConstants { val driverInfoJsonStr = """ - |{"id":"driver-3","starttime":"3","state":"SUBMITTED","cores":3,"memory":100} - """.stripMargin + |{"id":"driver-3","starttime":"3", + |"state":"SUBMITTED","cores":3,"memory":100, + |"submitdate":"%s","worker":"None", + |"mainclass":"mainClass"} + """.format(submitDate.toString).stripMargin val masterStateJsonStr = """ |{"url":"spark://host:8080", |"workers":[%s,%s], + |"aliveworkers":2, |"cores":8,"coresused":0,"memory":2468,"memoryused":0, |"activeapps":[%s],"completedapps":[], |"activedrivers":[%s], + |"completeddrivers":[%s], |"status":"ALIVE"} """.format(workerInfoJsonStr, workerInfoJsonStr, - appInfoJsonStr, driverInfoJsonStr).stripMargin + appInfoJsonStr, driverInfoJsonStr, driverInfoJsonStr).stripMargin val workerStateJsonStr = """ From 8dc52183f220da1bf2dcddcdae224943b62bc5eb Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Sun, 18 Jun 2017 23:43:46 +0800 Subject: [PATCH 2/3] add comments --- .../apache/spark/deploy/JsonProtocol.scala | 144 ++++++++++++++++-- 1 file changed, 128 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 72c04826b8a85..2d1614d9efeac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -21,26 +21,62 @@ import org.json4s.JsonAST.JObject import org.json4s.JsonDSL._ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} -import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} +import org.apache.spark.deploy.master._ +import org.apache.spark.deploy.master.RecoveryState.MasterState import org.apache.spark.deploy.worker.ExecutorRunner private[deploy] object JsonProtocol { - def writeWorkerInfo(obj: WorkerInfo): JObject = { - ("id" -> obj.id) ~ - ("host" -> obj.host) ~ - ("port" -> obj.port) ~ - ("address" -> obj.hostPort) ~ - ("webuiaddress" -> obj.webUiAddress) ~ - ("cores" -> obj.cores) ~ - ("coresused" -> obj.coresUsed) ~ - ("coresfree" -> obj.coresFree) ~ - ("memory" -> obj.memory) ~ - ("memoryused" -> obj.memoryUsed) ~ - ("memoryfree" -> obj.memoryFree) ~ - ("state" -> obj.state.toString) ~ - ("lastheartbeat" -> obj.lastHeartbeat) - } + /** + * Export the [[WorkerInfo]] to a Json object, a [[WorkerInfo]] consists of the information of a + * worker. + * + * @return a Json object containing the following fields: + * `id` a string identifier of the worker + * `host` the host that the worker is running on + * `port` the port that the worker is bound to + * `address` ${host}:${port} + * `webuiaddress` the address used in web UI + * `cores` total cores of the worker + * `coresused` allocated cores of the worker + * `coresfree` free cores of the worker + * `memory` total memory of the worker + * `memoryused` allocated memory of the worker + * `memoryfree` free memory of the worker + * `state` state of the worker, see [[WorkerState]] + * `lastheartbeat` time in milliseconds that the latest heart beat message from the + * worker is received. 
+ */ + def writeWorkerInfo(obj: WorkerInfo): JObject = { + ("id" -> obj.id) ~ + ("host" -> obj.host) ~ + ("port" -> obj.port) ~ + ("address" -> obj.hostPort) ~ + ("webuiaddress" -> obj.webUiAddress) ~ + ("cores" -> obj.cores) ~ + ("coresused" -> obj.coresUsed) ~ + ("coresfree" -> obj.coresFree) ~ + ("memory" -> obj.memory) ~ + ("memoryused" -> obj.memoryUsed) ~ + ("memoryfree" -> obj.memoryFree) ~ + ("state" -> obj.state.toString) ~ + ("lastheartbeat" -> obj.lastHeartbeat) + } + /** + * Export the [[ApplicationInfo]] to a Json object, an [[ApplicationInfo]] consists of the + * information of an application. + * + * @return a Json object containing the following fields: + * `id` a string identifier of the application + * `starttime` time in milliseconds that the application starts + * `name` a name describes the application + * `cores` total cores granted to the application + * `user` name of the user who submitted the application + * `memoryperslave` minimal memory in MB required to each executor + * `submitdate` time in Date that the application is submitted + * `state` state of the application, see [[ApplicationState]] + * `duration` time in milliseconds that the application has been running + */ def writeApplicationInfo(obj: ApplicationInfo): JObject = { ("starttime" -> obj.startTime) ~ ("id" -> obj.id) ~ @@ -53,6 +89,17 @@ private[deploy] object JsonProtocol { ("duration" -> obj.duration) } + /** + * Export the [[ApplicationDescription]] to a Json object, an [[ApplicationDescription]] consists + * of the description of an application. 
+ * + * @return a Json object containing the following fields: + * `name` a name describes the application + * `cores` max cores can be allocated to the application, 0 means unlimited + * `memoryperslave` minimal memory in MB required to each executor + * `user` name of the user who submitted the application + * `command` the command string that submitted the application + */ def writeApplicationDescription(obj: ApplicationDescription): JObject = { ("name" -> obj.name) ~ ("cores" -> obj.maxCores.getOrElse(0)) ~ @@ -61,6 +108,17 @@ private[deploy] object JsonProtocol { ("command" -> obj.command.toString) } + /** + * Export the [[ExecutorRunner]] to a Json object, an [[ExecutorRunner]] consists of the + * information of an executor. + * + * @return a Json object containing the following fields: + * `id` a integer identifier of the executor + * `memory` memory in MB allocated to the executor + * `appid` a string identifier of the application that the executor is working for + * `appdesc` a Json object of the [[ApplicationDescription]] of the application that the + * executor is working for + */ def writeExecutorRunner(obj: ExecutorRunner): JObject = { ("id" -> obj.execId) ~ ("memory" -> obj.memory) ~ @@ -68,6 +126,20 @@ private[deploy] object JsonProtocol { ("appdesc" -> writeApplicationDescription(obj.appDesc)) } + /** + * Export the [[DriverInfo]] to a Json object, a [[DriverInfo]] consists of the information of a + * driver. 
+ * + * @return a Json object containing the following fields: + * `id` a string identifier of the driver + * `starttime` time in milliseconds that the driver starts + * `state` state of the driver, see [[DriverState]] + * `cores` cores allocated to the driver + * `memory` memory in MB allocated to the driver + * `submitdate` time in Date that the driver is created + * `worker` identifier of the worker that the driver is running on + * `mainclass` main class of the command string that started the driver + */ def writeDriverInfo(obj: DriverInfo): JObject = { ("id" -> obj.id) ~ ("starttime" -> obj.startTime.toString) ~ @@ -79,6 +151,29 @@ private[deploy] object JsonProtocol { ("mainclass" -> obj.desc.command.arguments(2)) } + /** + * Export the [[MasterStateResponse]] to a Json object, a [[MasterStateResponse]] consists the + * information of a master node. + * + * @return a Json object containing the following fields: + * `url` the url of the master node + * `workers` a list of Json objects of [[WorkerInfo]] of the workers allocated to the + * master + * `aliveworkers` size of alive workers allocated to the master + * `cores` total cores available of the master + * `coresused` cores used by the master + * `memory` total memory available of the master + * `memoryused` memory used by the master + * `activeapps` a list of Json objects of [[ApplicationInfo]] of the active applications + * running on the master + * `completedapps` a list of Json objects of [[ApplicationInfo]] of the completed + * applications from the master + * `activedrivers` a list of Json objects of [[DriverInfo]] of the active drivers of the + * master + * `completeddrivers` a list of Json objects of [[DriverInfo]] of the completed drivers + * of the master + * `status` status of the master, see [[MasterState]] + */ def writeMasterState(obj: MasterStateResponse): JObject = { val aliveWorkers = obj.workers.filter(_.isAlive()) ("url" -> obj.uri) ~ @@ -95,6 +190,23 @@ private[deploy] object 
JsonProtocol { ("status" -> obj.status.toString) } + /** + * Export the [[WorkerStateResponse]] to a Json object, a [[WorkerStateResponse]] consists the + * information of a worker node. + * + * @return a Json object containing the following fields: + * `id` a string identifier of the worker node + * `masterurl` url of the master node of the worker + * `masterwebuiurl` the address used in web UI of the master node of the worker + * `cores` total cores of the worker + * `coresused` used cores of the worker + * `memory` total memory of the worker + * `memoryused` used memory of the worker + * `executors` a list of Json objects of [[ExecutorRunner]] of the executors running on + * the worker + * `finishedexecutors` a list of Json objects of [[ExecutorRunner]] of the finished + * executors of the worker + */ def writeWorkerState(obj: WorkerStateResponse): JObject = { ("id" -> obj.workerId) ~ ("masterurl" -> obj.masterUrl) ~ From 8c399127741446b063c1b081593569bc76ad8fa8 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Mon, 19 Jun 2017 10:15:27 +0800 Subject: [PATCH 3/3] update comments. --- .../apache/spark/deploy/JsonProtocol.scala | 42 +++++++++---------- .../spark/deploy/JsonProtocolSuite.scala | 3 +- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 2d1614d9efeac..7212696166570 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -27,14 +27,13 @@ import org.apache.spark.deploy.worker.ExecutorRunner private[deploy] object JsonProtocol { /** - * Export the [[WorkerInfo]] to a Json object, a [[WorkerInfo]] consists of the information of a + * Export the [[WorkerInfo]] to a Json object. A [[WorkerInfo]] consists of the information of a * worker. 
* * @return a Json object containing the following fields: * `id` a string identifier of the worker * `host` the host that the worker is running on * `port` the port that the worker is bound to - * `address` ${host}:${port} * `webuiaddress` the address used in web UI * `cores` total cores of the worker * `coresused` allocated cores of the worker @@ -44,13 +43,12 @@ private[deploy] object JsonProtocol { * `memoryfree` free memory of the worker * `state` state of the worker, see [[WorkerState]] * `lastheartbeat` time in milliseconds that the latest heart beat message from the - * worker is received. + * worker is received */ def writeWorkerInfo(obj: WorkerInfo): JObject = { ("id" -> obj.id) ~ ("host" -> obj.host) ~ ("port" -> obj.port) ~ - ("address" -> obj.hostPort) ~ ("webuiaddress" -> obj.webUiAddress) ~ ("cores" -> obj.cores) ~ ("coresused" -> obj.coresUsed) ~ @@ -63,13 +61,13 @@ private[deploy] object JsonProtocol { } /** - * Export the [[ApplicationInfo]] to a Json object, an [[ApplicationInfo]] consists of the + * Export the [[ApplicationInfo]] to a Json object. An [[ApplicationInfo]] consists of the * information of an application. 
* * @return a Json object containing the following fields: * `id` a string identifier of the application * `starttime` time in milliseconds that the application starts - * `name` a name describes the application + * `name` the description of the application * `cores` total cores granted to the application * `user` name of the user who submitted the application * `memoryperslave` minimal memory in MB required to each executor @@ -78,8 +76,8 @@ private[deploy] object JsonProtocol { * `duration` time in milliseconds that the application has been running */ def writeApplicationInfo(obj: ApplicationInfo): JObject = { - ("starttime" -> obj.startTime) ~ ("id" -> obj.id) ~ + ("starttime" -> obj.startTime) ~ ("name" -> obj.desc.name) ~ ("cores" -> obj.coresGranted) ~ ("user" -> obj.desc.user) ~ @@ -90,15 +88,15 @@ private[deploy] object JsonProtocol { } /** - * Export the [[ApplicationDescription]] to a Json object, an [[ApplicationDescription]] consists + * Export the [[ApplicationDescription]] to a Json object. An [[ApplicationDescription]] consists * of the description of an application. * * @return a Json object containing the following fields: - * `name` a name describes the application - * `cores` max cores can be allocated to the application, 0 means unlimited + * `name` the description of the application + * `cores` max cores that can be allocated to the application, 0 means unlimited * `memoryperslave` minimal memory in MB required to each executor * `user` name of the user who submitted the application - * `command` the command string that submitted the application + * `command` the command string used to submit the application */ def writeApplicationDescription(obj: ApplicationDescription): JObject = { ("name" -> obj.name) ~ @@ -109,15 +107,15 @@ private[deploy] object JsonProtocol { } /** - * Export the [[ExecutorRunner]] to a Json object, an [[ExecutorRunner]] consists of the + * Export the [[ExecutorRunner]] to a Json object. 
An [[ExecutorRunner]] consists of the * information of an executor. * * @return a Json object containing the following fields: - * `id` a integer identifier of the executor + * `id` an integer identifier of the executor * `memory` memory in MB allocated to the executor - * `appid` a string identifier of the application that the executor is working for + * `appid` a string identifier of the application that the executor is working on * `appdesc` a Json object of the [[ApplicationDescription]] of the application that the - * executor is working for + * executor is working on */ def writeExecutorRunner(obj: ExecutorRunner): JObject = { ("id" -> obj.execId) ~ @@ -127,7 +125,7 @@ private[deploy] object JsonProtocol { } /** - * Export the [[DriverInfo]] to a Json object, a [[DriverInfo]] consists of the information of a + * Export the [[DriverInfo]] to a Json object. A [[DriverInfo]] consists of the information of a * driver. * * @return a Json object containing the following fields: @@ -152,7 +150,7 @@ private[deploy] object JsonProtocol { } /** - * Export the [[MasterStateResponse]] to a Json object, a [[MasterStateResponse]] consists the + * Export the [[MasterStateResponse]] to a Json object. A [[MasterStateResponse]] consists the * information of a master node. 
* * @return a Json object containing the following fields: @@ -166,8 +164,8 @@ private[deploy] object JsonProtocol { * `memoryused` memory used by the master * `activeapps` a list of Json objects of [[ApplicationInfo]] of the active applications * running on the master - * `completedapps` a list of Json objects of [[ApplicationInfo]] of the completed - * applications from the master + * `completedapps` a list of Json objects of [[ApplicationInfo]] of the applications + * completed in the master * `activedrivers` a list of Json objects of [[DriverInfo]] of the active drivers of the * master * `completeddrivers` a list of Json objects of [[DriverInfo]] of the completed drivers @@ -191,7 +189,7 @@ private[deploy] object JsonProtocol { } /** - * Export the [[WorkerStateResponse]] to a Json object, a [[WorkerStateResponse]] consists the + * Export the [[WorkerStateResponse]] to a Json object. A [[WorkerStateResponse]] consists the * information of a worker node. * * @return a Json object containing the following fields: @@ -215,7 +213,7 @@ private[deploy] object JsonProtocol { ("coresused" -> obj.coresUsed) ~ ("memory" -> obj.memory) ~ ("memoryused" -> obj.memoryUsed) ~ - ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~ - ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner)) + ("executors" -> obj.executors.map(writeExecutorRunner)) ~ + ("finishedexecutors" -> obj.finishedExecutors.map(writeExecutorRunner)) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 68c53ffc569a0..1903130cb694a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -104,7 +104,7 @@ object JsonConstants { val submitDate = new Date(123456789) val appInfoJsonStr = """ - |{"starttime":3,"id":"id","name":"name", + 
|{"id":"id","starttime":3,"name":"name", |"cores":0,"user":"%s", |"memoryperslave":1234,"submitdate":"%s", |"state":"WAITING","duration":%d} @@ -114,7 +114,6 @@ object JsonConstants { val workerInfoJsonStr = """ |{"id":"id","host":"host","port":8080, - |"address":"host:8080", |"webuiaddress":"http://publicAddress:80", |"cores":4,"coresused":0,"coresfree":4, |"memory":1234,"memoryused":0,"memoryfree":1234,