From 2edfe7fb3fbc52ab7e652390e67aa69b63b5364e Mon Sep 17 00:00:00 2001
From: Jakub Hava
Date: Wed, 21 Feb 2018 23:12:27 +0100
Subject: [PATCH] [SW-727] Improve method for downloading H2O logs

---
 .../org/apache/spark/h2o/H2OContext.scala    |  12 +-
 .../spark/h2o/utils/H2OContextUtils.scala    | 104 +++++++++++++++++-
 2 files changed, 104 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/h2o/H2OContext.scala b/core/src/main/scala/org/apache/spark/h2o/H2OContext.scala
index b69507b23a..841ff9d5c8 100644
--- a/core/src/main/scala/org/apache/spark/h2o/H2OContext.scala
+++ b/core/src/main/scala/org/apache/spark/h2o/H2OContext.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.h2o
 
+import java.net.URI
 import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.spark._
@@ -329,17 +330,6 @@ class H2OContext private(val sparkSession: SparkSession, conf: H2OConf) extends
     basic ++ sparkYarnAppId ++ backend.epilog
   }
 
-  /**
-    * @param path directory where the logs will be downloaded
-    */
-  def downloadH2OLogs(path: String, logFileName: String = "logs.zip"): String = {
-    import sys.process._
-    import java.net.URL
-    import java.io.File
-    new URL(s"http://$localClientIp:$localClientPort/3/Logs/download").#>(new File(path, logFileName)).!!
-    path + File.separator + logFileName
-  }
-
   // scalastyle:off
   // Disable style checker so "implicits" object can start with lowercase i
   /** Define implicits available via h2oContext.implicits._ */
diff --git a/core/src/main/scala/org/apache/spark/h2o/utils/H2OContextUtils.scala b/core/src/main/scala/org/apache/spark/h2o/utils/H2OContextUtils.scala
index 773419ee62..320fd30143 100644
--- a/core/src/main/scala/org/apache/spark/h2o/utils/H2OContextUtils.scala
+++ b/core/src/main/scala/org/apache/spark/h2o/utils/H2OContextUtils.scala
@@ -17,11 +17,18 @@
 package org.apache.spark.h2o.utils
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.net.URI
+import java.text.SimpleDateFormat
+import java.util.Date
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
 import org.apache.spark.SparkContext
 import org.apache.spark.h2o.BuildInfo
 import org.apache.spark.internal.Logging
+import water.H2O
 import water.fvec.Frame
-import water.util.Log
+import water.util.{GetLogsFromNode, Log, StringUtils}
 
 import scala.language.postfixOps
 
@@ -77,6 +84,101 @@ private[spark] trait H2OContextUtils extends Logging {
     }
   }
 
+  /**
+    * @param destination file to which the zipped H2O logs will be written
+    */
+  def downloadH2OLogs(destination: URI): URI = {
+    val perNodeZipByteArray = H2O.CLOUD.members.zipWithIndex.map { case (node, i) =>
+      // Skip nodes that aren't healthy, since they are likely to cause the entire process to hang.
+      try {
+        if (node.isHealthy) {
+          val g = new GetLogsFromNode()
+          g.nodeidx = i
+          g.doIt()
+          g.bytes
+        }
+        else {
+          StringUtils.bytesOf("Node not healthy")
+        }
+      }
+      catch {
+        case e: Exception =>
+          StringUtils.toBytes(e)
+      }
+    }
+
+    val clientNodeByteArray = if (H2O.ARGS.client) {
+      try {
+        val g = new GetLogsFromNode
+        g.nodeidx = -1
+        g.doIt()
+        g.bytes
+      } catch {
+        case e: Exception =>
+          StringUtils.toBytes(e)
+      }
+    } else {
+      null
+    }
+
+    val outputFileStem = "h2ologs_" + new SimpleDateFormat("yyyyMMdd_hhmmss").format(new Date)
+    zipLogs(perNodeZipByteArray, clientNodeByteArray, outputFileStem, destination)
+  }
+
+  def downloadH2OLogs(destination: String): String = {
+    downloadH2OLogs(new URI(destination))
+    destination
+  }
+
+  /** Zip the H2O logs and store them at the specified destination. */
+  private def zipLogs(results: Array[Array[Byte]], clientResult: Array[Byte], topDir: String, destination: URI): URI = {
+    assert(H2O.CLOUD._memary.length == results.length, "Unexpected change in the cloud!")
+    val l = results.map(_.length).sum
+    val baos = new ByteArrayOutputStream(l)
+    val zos = new ZipOutputStream(baos)
+    // Add the top-level directory.
+    val zde = new ZipEntry(topDir + File.separator)
+    zos.putNextEntry(zde)
+
+    try {
+      // Add the zipped logs from each cloud member.
+      results.zipWithIndex.foreach { case (result, i) =>
+        val filename = topDir + File.separator + "node" + i + "_" + H2O.CLOUD._memary(i).getIpPortString.replace(':', '_').replace('/', '_') + ".zip"
+        val ze = new ZipEntry(filename)
+        zos.putNextEntry(ze)
+        zos.write(result)
+        zos.closeEntry()
+      }
+
+      // Add the zipped logs from the client node. Name it 'driver' since that's what Sparkling Water users see.
+      if (clientResult != null) {
+        val filename = topDir + File.separator + "driver.zip"
+        val ze = new ZipEntry(filename)
+        zos.putNextEntry(ze)
+        zos.write(clientResult)
+        zos.closeEntry()
+      }
+      // Close the top-level directory.
+      zos.closeEntry()
+    } finally {
+      // Close the full zip file.
+      zos.close()
+    }
+
+    import java.io.FileOutputStream
+    val outputStream = new FileOutputStream(destination.toString)
+    try {
+      baos.writeTo(outputStream)
+    } finally {
+      outputStream.close()
+    }
+    destination
+  }
+
+
   def isRunningOnDatabricks(): Boolean = {
     try {
       Class.forName("com.databricks.backend.daemon.driver.DriverLocal")
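
Usage sketch (illustrative, not part of the patch): assuming H2OContext mixes in
H2OContextUtils so the new overloads are reachable from an initialized context
`hc`, and that "/tmp/h2o-logs.zip" is just an example path, downloading the logs
looks like this:

    import java.net.URI

    // String overload: zips the logs from every cloud member (plus the client
    // node, stored as "driver.zip", when H2O runs in client mode) and writes
    // the archive to the given path, returning that same path.
    val path: String = hc.downloadH2OLogs("/tmp/h2o-logs.zip")

    // URI overload: same behaviour, returning the destination URI.
    val uri: URI = hc.downloadH2OLogs(new URI("/tmp/h2o-logs.zip"))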