Skip to content

Commit

Permalink
[SW-727] Improve method for downloading H2O logs
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubhava committed Feb 22, 2018
1 parent c09705f commit 2edfe7f
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 12 deletions.
12 changes: 1 addition & 11 deletions core/src/main/scala/org/apache/spark/h2o/H2OContext.scala
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.h2o

import java.net.URI
import java.util.concurrent.atomic.AtomicReference

import org.apache.spark._
Expand Down Expand Up @@ -329,17 +330,6 @@ class H2OContext private(val sparkSession: SparkSession, conf: H2OConf) extends
basic ++ sparkYarnAppId ++ backend.epilog
}

/**
 * Download the H2O logs zip from the local client's REST API.
 *
 * @param path        directory into which the zip will be saved
 * @param logFileName name of the saved zip file (defaults to "logs.zip")
 * @return full path of the downloaded file
 */
def downloadH2OLogs(path: String, logFileName: String = "logs.zip"): String = {
  import java.io.File
  import java.net.URL
  import sys.process._
  // Fetch the /3/Logs/download endpoint and pipe its body straight into the target file.
  val logsEndpoint = new URL(s"http://$localClientIp:$localClientPort/3/Logs/download")
  val targetFile = new File(path, logFileName)
  (logsEndpoint #> targetFile).!!
  path + File.separator + logFileName
}

// scalastyle:off
// Disable style checker so "implicits" object can start with lowercase i
/** Define implicits available via h2oContext.implicits._ */
Expand Down
104 changes: 103 additions & 1 deletion core/src/main/scala/org/apache/spark/h2o/utils/H2OContextUtils.scala
Expand Up @@ -17,11 +17,18 @@

package org.apache.spark.h2o.utils

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date
import java.util.zip.{ZipEntry, ZipOutputStream}

import org.apache.spark.SparkContext
import org.apache.spark.h2o.BuildInfo
import org.apache.spark.internal.Logging
import water.H2O
import water.fvec.Frame
import water.util.Log
import water.util.{GetLogsFromNode, Log, StringUtils}

import scala.language.postfixOps

Expand Down Expand Up @@ -77,6 +84,101 @@ private[spark] trait H2OContextUtils extends Logging {
}
}

/**
 * Collect logs from every node of the H2O cloud (and from the client node when
 * running in client mode), bundle them into a single zip and write it to the
 * given destination.
 *
 * @param destination directory where the logs will be downloaded
 * @return URI of the created zip file
 */
def downloadH2OLogs(destination: URI): URI = {
  val perNodeZipByteArray = H2O.CLOUD.members.zipWithIndex.map { case (node, i) =>
    // Skip nodes that aren't healthy, since they are likely to cause the entire process to hang.
    try {
      if (node.isHealthy) {
        val g = new GetLogsFromNode()
        g.nodeidx = i
        g.doIt()
        g.bytes
      }
      else {
        StringUtils.bytesOf("Node not healthy")
      }
    }
    catch {
      case e: Exception =>
        StringUtils.toBytes(e)
    }
  }

  // In client mode also fetch the client's own logs; nodeidx = -1 addresses the client node.
  val clientNodeByteArray = if (H2O.ARGS.client) {
    try {
      val g = new GetLogsFromNode
      g.nodeidx = -1
      g.doIt()
      g.bytes
    } catch {
      case e: Exception =>
        StringUtils.toBytes(e)
    }
  } else {
    null
  }

  // Use "HH" (24-hour clock). The previous "hh" pattern is 12-hour with no AM/PM
  // marker, so a 1 AM and a 1 PM download produced identical stems and the
  // resulting filenames did not sort chronologically.
  val outputFileStem = "h2ologs_" + new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date)
  zipLogs(perNodeZipByteArray, clientNodeByteArray, outputFileStem, destination)
}

/**
 * Convenience overload: accepts the destination as a plain string.
 *
 * @param destination directory where the logs will be downloaded
 * @return the destination string, unchanged
 */
def downloadH2OLogs(destination: String): String = {
  val target = new URI(destination)
  downloadH2OLogs(target)
  destination
}

/**
 * Zip the per-node H2O logs into a single archive and store it at the destination.
 *
 * @param results      one zipped-log byte array per cloud member, indexed like H2O.CLOUD._memary
 * @param clientResult zipped logs of the client node, or null when not in client mode
 * @param topDir       name of the top-level directory inside the zip
 * @param destination  target file; its string form is used as a plain file path
 * @return the destination, unchanged
 */
private def zipLogs(results: Array[Array[Byte]], clientResult: Array[Byte], topDir: String, destination: URI): URI = {
  assert(H2O.CLOUD._memary.length == results.length, "Unexpected change in the cloud!")
  // Pre-size the buffer with the total payload length to avoid repeated re-allocation.
  val estimatedSize = results.map(_.length).sum
  val baos = new ByteArrayOutputStream(estimatedSize)
  val zos = new ZipOutputStream(baos)
  // All entry writing lives inside the try so zos is closed even if the very
  // first putNextEntry throws (previously the top-level entry was added outside it).
  try {
    // Add top-level directory entry.
    zos.putNextEntry(new ZipEntry(topDir + File.separator))

    // Add a zip from each cloud member, named after its index and ip:port (sanitized for filenames).
    results.zipWithIndex.foreach { case (result, i) =>
      val filename = topDir + File.separator + "node" + i + "_" +
        H2O.CLOUD._memary(i).getIpPortString.replace(':', '_').replace('/', '_') + ".zip"
      zos.putNextEntry(new ZipEntry(filename))
      zos.write(result)
      zos.closeEntry()
    }

    // Add zip from the client node. Name it 'driver' since that's what Sparkling Water users see.
    if (clientResult != null) {
      zos.putNextEntry(new ZipEntry(topDir + File.separator + "driver.zip"))
      zos.write(clientResult)
      zos.closeEntry()
    }
    // Close the top-level directory entry.
    zos.closeEntry()
  } finally {
    // Close the full zip file.
    zos.close()
  }

  // Persist the in-memory archive to disk.
  // NOTE(review): destination.toString is treated as a bare file path; a "file://"
  // scheme URI would not open correctly here — confirm callers pass plain paths.
  val outputStream = new FileOutputStream(destination.toString)
  try {
    baos.writeTo(outputStream)
  } finally {
    // Constructor either throws or yields a non-null stream, so no null check is needed.
    outputStream.close()
  }
  destination
}


def isRunningOnDatabricks(): Boolean = {
try {
Class.forName("com.databricks.backend.daemon.driver.DriverLocal")
Expand Down

0 comments on commit 2edfe7f

Please sign in to comment.