Branch 1.6 #10467

Closed
wants to merge 2 commits
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration

import scala.collection.mutable
import scala.util.{Failure, Success}
import scala.util.{ Failure, Success }

import org.apache.spark.rpc._
import org.apache.spark._
@@ -33,7 +33,21 @@ import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
import org.apache.spark.util.{ ThreadUtils, SignalLogger, Utils }

// han sampler import begin
import java.io._
import org.apache.spark.storage._
import java.lang.System
import java.util.concurrent._
import java.lang.management._
import java.util.List
import java.util.Date
import scala.collection.JavaConversions._
import java.text._
import scala.util.Properties
import scala.sys.process._
// han sampler import end

private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
@@ -44,7 +58,6 @@ private[spark] class CoarseGrainedExecutorBackend(
userClassPath: Seq[URL],
env: SparkEnv)
extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None

@@ -62,7 +75,7 @@ private[spark] class CoarseGrainedExecutorBackend(
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) => Utils.tryLogNonFatalError {
Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutorResponse
}
case Failure(e) => {
logError(s"Cannot register with driver: $driverUrl", e)
@@ -80,6 +93,7 @@ private[spark] class CoarseGrainedExecutorBackend(
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor(hostname) =>
logInfo("Successfully registered with driver")
// val (hostname, _) = Utils.parseHostPort(hostPort)
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

case RegisterExecutorFailed(message) =>
@@ -111,7 +125,7 @@ private[spark] class CoarseGrainedExecutorBackend(
// a message to self to actually do the shutdown.
self.send(Shutdown)

case Shutdown =>
case Shutdown =>
executor.stop()
stop()
rpcEnv.shutdown()
@@ -138,13 +152,13 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
private[spark] object CoarseGrainedExecutorBackend extends Logging {

private def run(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: Seq[URL]) {
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: Seq[URL]) {

SignalLogger.register(log)

@@ -161,7 +175,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
port,
executorConf,
new SecurityManager(executorConf),
clientMode = true)
clientMode=true)
val driver = fetcher.setupEndpointRefByURI(driverUrl)
val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
Seq[(String, String)](("spark.app.id", appId))
@@ -202,7 +216,50 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
}

def getProcessUsedMemoryAndCPU(processID: String): (Double, Double) = {
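// Returns (resident memory, CPU share) for the given PID, parsed from `top`. NUMCPU below
// hard-codes an 8-core node: top reports %CPU summed over cores, so dividing by NUMCPU
// gives the share of the whole machine.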
var memory = 0d;
var CPU = 0d;
val NUMCPU = 8;
var process = ""
/* val commands = new java.util.Vector[String]()
commands.add("/bin/bash")
commands.add("-c")
commands.add("ps aux | grep " + processID)
val pb=new java.lang.ProcessBuilder(commands)
val pr=pb.start()
pr.waitFor()
if (pr.exitValue()==0) {
val outReader=new java.io.BufferedReader(new java.io.InputStreamReader(pr.getInputStream()));
var source = ""
source = outReader.readLine()
while(source != null) {
try {
val tokens = source.split(" +")
if(tokens(1).equals(processID)) {
memory = 1024L*tokens(5).toLong
CPU = tokens(2).toDouble / NUMCPU
}
} catch { case e: Exception => () }
finally {
source = outReader.readLine()
}
}
}
*/
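// Instead of parsing `ps` (above), sample `top -n 1 -b -p <pid>`: field 6 (index 5) is the
// resident size (RES, optionally suffixed with k/m/g) and field 9 (index 8) is %CPU.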

val topout = Seq("/bin/sh", "-c", "top -n 1 -b -p " + processID + " | tail -1").!!.trim.split(" +")
val len = topout(5).length
if(topout(5).endsWith("g")) { memory = 1024L*1024L*1024L*topout(5).take(len-1).toDouble }
else if(topout(5).endsWith("m")) { memory = 1024L*1024L*topout(5).take(len-1).toDouble }
else if(topout(5).endsWith("k")) { memory = 1024L*topout(5).take(len-1).toDouble }
else { memory = topout(5).toDouble }
CPU = topout(8).toDouble / NUMCPU

return (memory, CPU)
}

def main(args: Array[String]) {

var driverUrl: String = null
var executorId: String = null
var hostname: String = null
@@ -250,7 +307,186 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
printUsageAndExit()
}

// han sampler 1 begin
val SAMPLING_PERIOD: Long = 10
val JMAP_PERIOD: Long = 5000
val TIMESTAMP_PERIOD: Long = 1000
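// All periods are in milliseconds: sample every 10 ms, append a wall-clock timestamp every
// second, and dump a jmap histogram every 5 seconds (the counter i below advances by
// SAMPLING_PERIOD on each run of the task).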
var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss")

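// Per-executor output directory under $SPARK_HOME/logs/<appId>/<executorId>; the literal
// path is only a fallback used when SPARK_HOME is not set in the environment.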
val dirname_executor = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + appId + "/" + executorId
val dir_executor = new File(dirname_executor)
if (!dir_executor.exists())
dir_executor.mkdirs()
val dirname_histo = dirname_executor + "/histo"
val dir_histo = new File(dirname_histo)
if (!dir_histo.exists())
dir_histo.mkdirs()

// val writer = new FileWriter(new File("/home/ubuntu/sparkOutput/sparkOutput_executor_" + System.nanoTime() + ".txt"), true)
val writer = new FileWriter(new File(dirname_executor + "/" + "sparkOutput_worker_" + appId + "_" + executorId + ".txt"), true)
writer.write(appId + "_" + executorId + "\n")
writer.flush()

var osBean: com.sun.management.OperatingSystemMXBean = ManagementFactory.getPlatformMXBean(classOf[com.sun.management.OperatingSystemMXBean])
var availableProcessors: Int = osBean.getAvailableProcessors()
logInfo("Number of available processors for executor " + executorId + ": " + availableProcessors)
var avgUsedCPU: Double = 0
var numberOfCPUSamples: Long = 1
var memBean: MemoryMXBean = ManagementFactory.getMemoryMXBean()
var rtBean: RuntimeMXBean = ManagementFactory.getRuntimeMXBean()
var mpBeans: List[MemoryPoolMXBean] = ManagementFactory.getMemoryPoolMXBeans()
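// The first line written to the log is a tab-separated header: one column per JVM memory
// pool, followed by heap/non-heap usage, the off-heap total and the CPU column.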
var s: String = ""
mpBeans.foreach {
b =>
s += b.getName() + "\t";
}
s += "Used heap\tCommitted heap\tMax heap\tUsed nonheap\tCommitted nonheap\tMax nonheap\tTotal memory\tUsed CPU"
writer.write(s + "\n")
writer.flush()

var mBeans: List[MemoryPoolMXBean] = null
var prevUpTime: Double = 0
var prevProcessCPUTime: Double = 0
var upTime: Double = 0
var processCPUTime: Double = 0

var processID: String = ""
var datanodePID: String = ""
var nodemanagerPID: String = ""

/* val commands = new java.util.Vector[String]()
commands.add("/bin/bash")
commands.add("-c")
commands.add("echo $PPID")
val pb=new java.lang.ProcessBuilder(commands)
val pr=pb.start()
pr.waitFor()
if (pr.exitValue()==0) {
val outReader=new java.io.BufferedReader(new java.io.InputStreamReader(pr.getInputStream()));
processID = outReader.readLine().trim()
println("Found process ID: " + processID)
} else {
println("Error while getting PID")
}
*/
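// RuntimeMXBean.getName() returns "<pid>@<hostname>", so the prefix before '@' is this JVM's PID.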
val pname = ManagementFactory.getRuntimeMXBean().getName()
processID = pname.substring(0, pname.indexOf('@'))

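// Scan `jps` output for co-located HDFS DataNode and YARN NodeManager daemons so their
// memory and CPU can be sampled alongside this executor.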
val jps = new java.util.Vector[String]()
jps.add("/bin/bash")
jps.add("-c")
jps.add("jps")
val pbi=new java.lang.ProcessBuilder(jps)
val pri = pbi.start()
pri.waitFor()
if (pri.exitValue()==0) {
val outReader=new java.io.BufferedReader(new java.io.InputStreamReader(pri.getInputStream()));
var source = ""
source = outReader.readLine()
while(source != null) {
try {
val tokens = source.split(" +")
if(tokens(1).equals("DataNode")) {
datanodePID = tokens(0)
//println("Found datanode PID: " + datanodePID)
}
if(tokens(1).equals("NodeManager")) {
nodemanagerPID = tokens(0)
//println("Found nodemanager pid: " + nodemanagerPID)
}
} catch { case e: Exception => () }
finally {
source = outReader.readLine()
}
}
} else {
println("Error while getting jps output")
}


val ex = new ScheduledThreadPoolExecutor(1)
ex.setRemoveOnCancelPolicy(true)
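// Sampling task: every SAMPLING_PERIOD ms append one tab-separated row of memory-pool,
// heap/non-heap, off-heap and CPU readings to the per-executor log file.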
val task = new Runnable {
var i: Long = 0
override def run {

s = ""
mpBeans.foreach {
b =>
s += b.getUsage().getUsed() + "\t";
}
s += memBean.getHeapMemoryUsage().getUsed() + "\t"
s += memBean.getHeapMemoryUsage().getCommitted() + "\t"
s += memBean.getHeapMemoryUsage().getMax() + "\t"
s += memBean.getNonHeapMemoryUsage().getUsed() + "\t"
s += memBean.getNonHeapMemoryUsage().getCommitted() + "\t"
s += memBean.getNonHeapMemoryUsage().getMax() + "\t"
// get used memory from jmap
// val jmapout = Seq("/bin/sh", "-c", "jmap -histo " + processID + " | tail -1").!!.trim
// s += jmapout.split(" +")(2)

// record off heap memory usage
s += org.apache.spark.unsafe.Platform.TOTAL_BYTES;

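// CPU usage: getProcessCpuTime() is in nanoseconds and getUptime() in milliseconds, so
// scaling the uptime delta by 10^4 (and by the processor count) turns the ratio into a
// 0-99 percentage of total machine CPU.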
upTime = rtBean.getUptime() * 10000
processCPUTime = osBean.getProcessCpuTime()
var elapsedCPU: Double = processCPUTime - prevProcessCPUTime
var elapsedTime: Double = upTime - prevUpTime
var usedCPU: Double = 0
if (elapsedTime > 0.0) {
usedCPU = math.min(99.0, elapsedCPU / (elapsedTime * availableProcessors))
avgUsedCPU += usedCPU
numberOfCPUSamples += 1
}
s += "\t" + usedCPU.toString()
prevUpTime = upTime
prevProcessCPUTime = processCPUTime

var res: (Double, Double) = (0, 0);
try { res = getProcessUsedMemoryAndCPU(processID); s += "\t" + res._1 + "\t" + res._2 } catch{ case e:Exception => e.printStackTrace() }

try { res = getProcessUsedMemoryAndCPU(datanodePID); s += "\t" + res._1 + "\t" + res._2 } catch{ case e:Exception => e.printStackTrace() }
try { res = getProcessUsedMemoryAndCPU(nodemanagerPID); s += "\t" + res._1 + "\t" + res._2 } catch{ case e:Exception => e.printStackTrace() }

if (i % TIMESTAMP_PERIOD == 0) {
var time: String = dateFormat.format(new Date())
s += "\t" + time
}


if (i % JMAP_PERIOD == 0) {
var time: String = dateFormat.format(new Date())
val pname = ManagementFactory.getRuntimeMXBean().getName()
val pid = pname.substring(0, pname.indexOf('@'))
val command = "jmap -histo " + pid
val result = command.!!
val writer1 = new FileWriter(new File(dirname_histo + "/" + "sparkOutput_worker_" + appId + "_" + executorId + "_" + time + ".txt"), true)
writer1.write(result)
writer1.flush()
writer1.close()
}


i = i + SAMPLING_PERIOD
writer.write(s + "\n")
writer.flush()

}
}
val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS)
// han sampler 1 end

run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)

// han sampler 2 begin
avgUsedCPU /= numberOfCPUSamples
logInfo("Average used CPU of executor " + executorId + ": " + avgUsedCPU)

f.cancel(true)
writer.flush()
writer.close()
// han sampler 2 end
}

private def printUsageAndExit() = {
@@ -273,3 +509,4 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}

}

@@ -182,13 +182,18 @@ private[spark] abstract class MemoryManager(
* sun.misc.Unsafe.
*/
final val tungstenMemoryMode: MemoryMode = {
//println("-- spark.memory.offHeap.enabled" + conf.getBoolean("spark.memory.offHeap.enabled", false));
//println("-- spark.memory.offHeap.size" + conf.getSizeAsBytes("spark.memory.offHeap.size"));
//MemoryMode.OFF_HEAP

if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
"spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
MemoryMode.OFF_HEAP
} else {
MemoryMode.ON_HEAP
}

}

/**
5 changes: 5 additions & 0 deletions examples/pom.xml
@@ -41,6 +41,11 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-core</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>