From c795a52b8483e5f9633cbf2dc3e5ed34fd2ecd55 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20H=C3=A1va?=
Date: Tue, 16 Jul 2019 11:21:35 +0200
Subject: [PATCH] [SW-1421] Integrate H2O 3.26.0.1 (#1348)

---
 .../spark/h2o/utils/H2OContextUtils.scala  | 106 ++++++++----------
 gradle.properties                          |   6 +-
 .../spark/ml/h2o/models/H2OMOJOModel.scala |   5 +-
 py/pysparkling/context.py                  |   7 +-
 4 files changed, 55 insertions(+), 69 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/h2o/utils/H2OContextUtils.scala b/core/src/main/scala/org/apache/spark/h2o/utils/H2OContextUtils.scala
index b2decad622..4a80d1ffc3 100644
--- a/core/src/main/scala/org/apache/spark/h2o/utils/H2OContextUtils.scala
+++ b/core/src/main/scala/org/apache/spark/h2o/utils/H2OContextUtils.scala
@@ -17,19 +17,18 @@ package org.apache.spark.h2o.utils
 
-import java.io.{ByteArrayOutputStream, File}
+import java.io.File
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
-import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.h2o.{BuildInfo, H2OConf}
 import water.H2O
-import water.api.ImportHiveTableHandler
 import water.api.ImportHiveTableHandler.HiveTableImporter
+import water.api.{ImportHiveTableHandler, RequestServer}
 import water.fvec.Frame
-import water.util.{GetLogsFromNode, Log, StringUtils}
+import water.util.{GetLogsFromNode, Log, LogArchiveContainer, StringUtils}
 
 import scala.language.postfixOps
@@ -93,33 +92,27 @@ private[spark] trait H2OContextUtils extends Logging {
     }
   }
 
-  /**
-    * @param destination directory where the logs will be downloaded
-    */
-  def downloadH2OLogs(destination: URI): URI = {
-    val perNodeZipByteArray = H2O.CLOUD.members.zipWithIndex.map { case (node, i) =>
-      // Skip nodes that aren't healthy, since they are likely to cause the entire process to hang.
+  private def getLogsFromWorkers(logContainer: LogArchiveContainer): Array[Array[Byte]] = {
+    H2O.CLOUD.members.zipWithIndex.map { case (node, i) =>
       try {
         if (node.isHealthy) {
-          val g = new GetLogsFromNode()
-          g.nodeidx = i
+          val g = new GetLogsFromNode(i, logContainer)
           g.doIt()
           g.bytes
-        }
-        else {
+        } else {
           StringUtils.bytesOf("Node not healthy")
         }
       } catch {
-        case e: Exception =>
-          StringUtils.toBytes(e)
+        case e: Exception => StringUtils.toBytes(e)
       }
     }
+  }
 
-    val clientNodeByteArray = if (H2O.ARGS.client) {
+  private def getLogsFromClient(logContainer: LogArchiveContainer): Array[Byte] = {
+    if (H2O.ARGS.client) {
       try {
-        val g = new GetLogsFromNode
-        g.nodeidx = -1
+        val g = new GetLogsFromNode(-1, logContainer)
         g.doIt()
         g.bytes
       } catch {
@@ -129,62 +122,51 @@ private[spark] trait H2OContextUtils extends Logging {
     } else {
       null
     }
-
-    val outputFileStem = "h2ologs_" + new SimpleDateFormat("yyyyMMdd_hhmmss").format(new Date)
-    zipLogs(perNodeZipByteArray, clientNodeByteArray, outputFileStem, destination)
-  }
-
-  def downloadH2OLogs(destination: String): String = {
-    downloadH2OLogs(new URI(destination))
-    destination
   }
 
-  /** Zip the H2O logs and store them to specified destination */
-  private def zipLogs(results: Array[Array[Byte]], clientResult: Array[Byte], topDir: String, destination: URI): URI = {
-    assert(H2O.CLOUD._memary.length == results.length, "Unexpected change in the cloud!")
-    val l = results.map(_.length).sum
-    val baos = new ByteArrayOutputStream(l)
-    // Add top-level directory.
-    val zos = new ZipOutputStream(baos)
-    val zde = new ZipEntry(topDir + File.separator)
-    zos.putNextEntry(zde)
-
-    try {
-      // Add zip directory from each cloud member.
-      results.zipWithIndex.foreach { case (result, i) =>
-        val filename = topDir + File.separator + "node" + i + "_" + H2O.CLOUD._memary(i).getIpPortString.replace(':', '_').replace('/', '_') + ".zip"
-        val ze = new ZipEntry(filename)
-        zos.putNextEntry(ze)
-        zos.write(result)
-        zos.closeEntry()
-      }
+  /**
+    * @param destination directory where the logs will be downloaded
+    */
+  def downloadH2OLogs(destination: URI, logContainer: LogArchiveContainer): URI = {
+    val workersLogs = getLogsFromWorkers(logContainer)
+    val clientLogs = getLogsFromClient(logContainer)
+    val outputFileStem = "h2ologs_" + new SimpleDateFormat("yyyyMMdd_hhmmss").format(new Date)
 
-      // Add zip directory from the client node. Name it 'driver' since that's what Sparking Water users see.
-      if (clientResult != null) {
-        val filename = topDir + File.separator + "driver.zip"
-        val ze = new ZipEntry(filename)
-        zos.putNextEntry(ze)
-        zos.write(clientResult)
-        zos.closeEntry()
-      }
-      // Close the top-level directory.
-      zos.closeEntry()
-    } finally {
-      // Close the full zip file.
-      zos.close()
+    val finalArchiveByteArray = try {
+      val method = classOf[RequestServer].getDeclaredMethod("archiveLogs", classOf[LogArchiveContainer],
+        classOf[Date], classOf[Array[Array[Byte]]], classOf[Array[Byte]], classOf[String])
+      method.setAccessible(true)
+      val res = method.invoke(null, logContainer, new Date, workersLogs, clientLogs, outputFileStem)
+      res.asInstanceOf[Array[Byte]]
+    } catch {
+      case e: Exception => StringUtils.toBytes(e)
     }
 
     import java.io.FileOutputStream
-    val outputStream = new FileOutputStream(destination.toString)
+    val destinationFile = new File(destination.toString, outputFileStem + "." + logContainer.getFileExtension)
+    val outputStream = new FileOutputStream(destinationFile)
     try {
-      baos.writeTo(outputStream)
+      outputStream.write(finalArchiveByteArray)
     } finally {
-      if (outputStream != null) outputStream.close()
+      outputStream.close()
     }
+    destinationFile.toURI
+  }
+
+  def downloadH2OLogs(destination: URI, logContainer: String): URI = {
+    downloadH2OLogs(destination, LogArchiveContainer.valueOf(logContainer))
+  }
+
+  def downloadH2OLogs(destination: String, logArchiveContainer: LogArchiveContainer): String = {
+    downloadH2OLogs(new URI(destination), logArchiveContainer)
     destination
   }
 
+  def downloadH2OLogs(destination: String, logContainer: String = "ZIP"): String = {
+    downloadH2OLogs(destination, LogArchiveContainer.valueOf(logContainer))
+  }
+
   def importHiveTable(database: String = HiveTableImporter.DEFAULT_DATABASE, table: String,
                       partitions: Array[Array[String]] = null, allowMultiFormat: Boolean = false): Frame = {
     val hiveTableHandler = new ImportHiveTableHandler
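Note on the hunk above: the ZIP assembly previously done by zipLogs in Sparkling Water now happens in H2O core (RequestServer.archiveLogs, invoked via reflection since that method is not public in 3.26.0.1), and callers pick the archive format through water.util.LogArchiveContainer. A minimal usage sketch of the new overloads, assuming a running H2OContext `hc` on the Spark driver; the paths are illustrative:

    // Collect logs from every H2O worker node plus the client/driver node.
    // The String-container overload defaults to the "ZIP" archive format.
    val zipBundle: String = hc.downloadH2OLogs("/tmp")
    // "LOG" requests a plain-text bundle instead of a ZIP archive; the output
    // file name is the h2ologs_<timestamp> stem plus the container's extension.
    val textBundle: String = hc.downloadH2OLogs("/tmp", "LOG")

Both String-based overloads delegate through LogArchiveContainer.valueOf, so an unsupported container name fails fast with an IllegalArgumentException instead of reaching the cluster.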
diff --git a/gradle.properties b/gradle.properties
index 346a3b446b..8edc48d340 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,11 +1,11 @@
 # artifacts group
 group=ai.h2o
 # Major version of H2O release
-h2oMajorVersion=3.24.0
+h2oMajorVersion=3.26.0
 # Name of H2O major version
-h2oMajorName=yates
+h2oMajorName=yau
 # H2O Build version, defined here to be overriden by -P option
-h2oBuild=5
+h2oBuild=1
 # Scala version for Sparkling Water. By default, for 2.11 we use 2.11.12.
 scalaBaseVersion=2.11
 # Internal Nexus location
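Note: the properties above pin Sparkling Water to H2O 3.26.0.1 — major version 3.26.0, release name "yau" (replacing 3.24.0 "yates"), build 1. As the in-file comment says, h2oBuild is only a default meant to be overridden with Gradle's -P option, so a different 3.26.0.x build can be tried without editing the file; a hypothetical invocation (the task name and build number are illustrative):

    ./gradlew build -Ph2oBuild=2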
diff --git a/ml/src/main/scala/org/apache/spark/ml/h2o/models/H2OMOJOModel.scala b/ml/src/main/scala/org/apache/spark/ml/h2o/models/H2OMOJOModel.scala
index edddf2cf4b..bdfc3602fa 100644
--- a/ml/src/main/scala/org/apache/spark/ml/h2o/models/H2OMOJOModel.scala
+++ b/ml/src/main/scala/org/apache/spark/ml/h2o/models/H2OMOJOModel.scala
@@ -21,7 +21,7 @@ import java.io.ByteArrayInputStream
 import java.util
 
 import _root_.hex.genmodel.MojoReaderBackendFactory
-import _root_.hex.genmodel.descriptor.JsonModelDescriptorReader
+import _root_.hex.genmodel.attributes.ModelJsonReader
 import com.google.gson.{GsonBuilder, JsonElement}
 import hex.ModelCategory
 import hex.genmodel.easy.EasyPredictModelWrapper
@@ -175,7 +175,8 @@ object H2OMOJOModel extends H2OMOJOReadable[PyH2OMOJOModel] with H2OMOJOLoader[P
     val is = new ByteArrayInputStream(mojoData)
     val reader = MojoReaderBackendFactory.createReaderBackend(is, MojoReaderBackendFactory.CachingStrategy.MEMORY)
-    val modelOutputJson = JsonModelDescriptorReader.parseModelJson(reader).getAsJsonObject("output")
+
+    val modelOutputJson = ModelJsonReader.parseModelJson(reader).getAsJsonObject("output")
     if (modelOutputJson == null) {
       "Model details not available!"
     } else {
diff --git a/py/pysparkling/context.py b/py/pysparkling/context.py
index c4f96f1e12..5806578f4c 100644
--- a/py/pysparkling/context.py
+++ b/py/pysparkling/context.py
@@ -11,6 +11,8 @@
 import warnings
 import atexit
 import sys
+from h2o.utils.typechecks import assert_is_type, Enum
+
 
 def _monkey_patch_H2OFrame(hc):
     @staticmethod
@@ -193,8 +195,9 @@ def stop(self):
         self.__stop()
         sys.exit()
 
-    def download_h2o_logs(self, destination):
-        return self._jhc.h2oContext().downloadH2OLogs(destination)
+    def download_h2o_logs(self, destination, container = "ZIP"):
+        assert_is_type(container, Enum("ZIP", "LOG"))
+        return self._jhc.h2oContext().downloadH2OLogs(destination, container)
 
     def __str__(self):
         if H2OContext.is_initialized:
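Note: the H2OMOJOModel hunk tracks an upstream relocation in h2o-genmodel — the JSON descriptor reader moved from hex.genmodel.descriptor.JsonModelDescriptorReader to hex.genmodel.attributes.ModelJsonReader — while the pysparkling hunk simply validates the new container argument against "ZIP"/"LOG" before threading it through to the JVM. A self-contained sketch of the model-details pattern the patched Scala code uses, assuming h2o-genmodel 3.26+ on the classpath and a MOJO already read into memory:

    import java.io.ByteArrayInputStream
    import _root_.hex.genmodel.MojoReaderBackendFactory
    import _root_.hex.genmodel.attributes.ModelJsonReader

    // Parse the MOJO's embedded JSON descriptor straight from memory and pull
    // out the "output" section, which carries the model's training details.
    def modelDetails(mojoData: Array[Byte]): String = {
      val reader = MojoReaderBackendFactory.createReaderBackend(
        new ByteArrayInputStream(mojoData), MojoReaderBackendFactory.CachingStrategy.MEMORY)
      val output = ModelJsonReader.parseModelJson(reader).getAsJsonObject("output")
      if (output == null) "Model details not available!" else output.toString
    }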