[SW-1421] Integrate H2O 3.26.0.1 (#1348)
jakubhava committed Jul 16, 2019
1 parent bc3035a commit c795a52
Showing 4 changed files with 55 additions and 69 deletions.
106 changes: 44 additions & 62 deletions core/src/main/scala/org/apache/spark/h2o/utils/H2OContextUtils.scala
@@ -17,19 +17,18 @@

 package org.apache.spark.h2o.utils

-import java.io.{ByteArrayOutputStream, File}
+import java.io.File
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
-import java.util.zip.{ZipEntry, ZipOutputStream}

 import org.apache.spark.SparkContext
 import org.apache.spark.h2o.{BuildInfo, H2OConf}
 import water.H2O
-import water.api.ImportHiveTableHandler
 import water.api.ImportHiveTableHandler.HiveTableImporter
+import water.api.{ImportHiveTableHandler, RequestServer}
 import water.fvec.Frame
-import water.util.{GetLogsFromNode, Log, StringUtils}
+import water.util.{GetLogsFromNode, Log, LogArchiveContainer, StringUtils}

 import scala.language.postfixOps

@@ -93,33 +92,27 @@ private[spark] trait H2OContextUtils extends Logging {
     }
   }

-  /**
-    * @param destination directory where the logs will be downloaded
-    */
-  def downloadH2OLogs(destination: URI): URI = {
-    val perNodeZipByteArray = H2O.CLOUD.members.zipWithIndex.map { case (node, i) =>
-      // Skip nodes that aren't healthy, since they are likely to cause the entire process to hang.
+  private def getLogsFromWorkers(logContainer: LogArchiveContainer): Array[Array[Byte]] = {
+    H2O.CLOUD.members.zipWithIndex.map { case (node, i) =>
       try {
         if (node.isHealthy) {
-          val g = new GetLogsFromNode()
-          g.nodeidx = i
+          val g = new GetLogsFromNode(i, logContainer)
           g.doIt()
           g.bytes
-        }
-        else {
+        } else {
           StringUtils.bytesOf("Node not healthy")
         }
       }
       catch {
-        case e: Exception =>
-          StringUtils.toBytes(e)
+        case e: Exception => StringUtils.toBytes(e);
       }
     }
+  }

-    val clientNodeByteArray = if (H2O.ARGS.client) {
+  private def getLogsFromClient(logContainer: LogArchiveContainer): Array[Byte] = {
+    if (H2O.ARGS.client) {
       try {
-        val g = new GetLogsFromNode
-        g.nodeidx = -1
+        val g = new GetLogsFromNode(-1, logContainer)
         g.doIt()
         g.bytes
       } catch {
@@ -129,62 +122,51 @@
     } else {
       null
     }
-
-    val outputFileStem = "h2ologs_" + new SimpleDateFormat("yyyyMMdd_hhmmss").format(new Date)
-    zipLogs(perNodeZipByteArray, clientNodeByteArray, outputFileStem, destination)
   }

-  def downloadH2OLogs(destination: String): String = {
-    downloadH2OLogs(new URI(destination))
-    destination
-  }
-
-  /** Zip the H2O logs and store them to specified destination */
-  private def zipLogs(results: Array[Array[Byte]], clientResult: Array[Byte], topDir: String, destination: URI): URI = {
-    assert(H2O.CLOUD._memary.length == results.length, "Unexpected change in the cloud!")
-    val l = results.map(_.length).sum
-    val baos = new ByteArrayOutputStream(l)
-    // Add top-level directory.
-    val zos = new ZipOutputStream(baos)
-    val zde = new ZipEntry(topDir + File.separator)
-    zos.putNextEntry(zde)
-
-    try {
-      // Add zip directory from each cloud member.
-      results.zipWithIndex.foreach { case (result, i) =>
-        val filename = topDir + File.separator + "node" + i + "_" + H2O.CLOUD._memary(i).getIpPortString.replace(':', '_').replace('/', '_') + ".zip"
-        val ze = new ZipEntry(filename)
-        zos.putNextEntry(ze)
-        zos.write(result)
-        zos.closeEntry()
-      }
+  /**
+    * @param destination directory where the logs will be downloaded
+    */
+  def downloadH2OLogs(destination: URI, logContainer: LogArchiveContainer): URI = {
+    val workersLogs = getLogsFromWorkers(logContainer)
+    val clientLogs = getLogsFromClient(logContainer)
+    val outputFileStem = "h2ologs_" + new SimpleDateFormat("yyyyMMdd_hhmmss").format(new Date)

-      // Add zip directory from the client node. Name it 'driver' since that's what Sparking Water users see.
-      if (clientResult != null) {
-        val filename = topDir + File.separator + "driver.zip"
-        val ze = new ZipEntry(filename)
-        zos.putNextEntry(ze)
-        zos.write(clientResult)
-        zos.closeEntry()
-      }
-      // Close the top-level directory.
-      zos.closeEntry()
-    } finally {
-      // Close the full zip file.
-      zos.close()
+    val finalArchiveByteArray = try {
+      val method = classOf[RequestServer].getDeclaredMethod("archiveLogs", classOf[LogArchiveContainer],
+        classOf[Date], classOf[Array[Array[Byte]]], classOf[Array[Byte]], classOf[String])
+      method.setAccessible(true)
+      val res = method.invoke(null, logContainer, new Date, workersLogs, clientLogs, outputFileStem)
+      res.asInstanceOf[Array[Byte]]
+    } catch {
+      case e: Exception => StringUtils.toBytes(e)
     }

     import java.io.FileOutputStream
-    val outputStream = new FileOutputStream(destination.toString)
+    val destinationFile = new File(destination.toString, outputFileStem + "." + logContainer.getFileExtension)
+    val outputStream = new FileOutputStream(destinationFile)
     try {
-      baos.writeTo(outputStream)
+      outputStream.write(finalArchiveByteArray)
     }
     finally {
-      if (outputStream != null) outputStream.close()
+      outputStream.close()
     }
+    destinationFile.toURI
   }

+  def downloadH2OLogs(destination: URI, logContainer: String): URI = {
+    downloadH2OLogs(destination, LogArchiveContainer.valueOf(logContainer))
+  }
+
+  def downloadH2OLogs(destination: String, logArchiveContainer: LogArchiveContainer): String = {
+    downloadH2OLogs(new URI(destination), logArchiveContainer)
+    destination
+  }
+
+  def downloadH2OLogs(destination: String, logContainer: String = "ZIP"): String = {
+    downloadH2OLogs(destination, LogArchiveContainer.valueOf(logContainer))
+  }
+
   def importHiveTable(database: String = HiveTableImporter.DEFAULT_DATABASE, table: String,
                       partitions: Array[Array[String]] = null, allowMultiFormat: Boolean = false): Frame = {
     val hiveTableHandler = new ImportHiveTableHandler
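Taken together, the H2OContextUtils changes swap the hand-rolled ZIP packaging for H2O's own log archiver: per-node and client logs are collected in the requested LogArchiveContainer (ZIP or LOG) and handed to RequestServer.archiveLogs, invoked via reflection because that method is not publicly accessible, and the result is written as h2ologs_<timestamp>.<extension> into the destination directory. A minimal usage sketch of the resulting overloads, assuming an already-initialized H2OContext; the value names hc and spark are illustrative:

import org.apache.spark.h2o.H2OContext

val hc: H2OContext = H2OContext.getOrCreate(spark)
// All overloads funnel into downloadH2OLogs(destination: URI, logContainer: LogArchiveContainer).
hc.downloadH2OLogs("/tmp/h2o-logs")        // ZIP container by default
hc.downloadH2OLogs("/tmp/h2o-logs", "LOG") // plain log archive instead of a ZIP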
6 changes: 3 additions & 3 deletions gradle.properties
@@ -1,11 +1,11 @@
 # artifacts group
 group=ai.h2o
 # Major version of H2O release
-h2oMajorVersion=3.24.0
+h2oMajorVersion=3.26.0
 # Name of H2O major version
-h2oMajorName=yates
+h2oMajorName=yau
 # H2O Build version, defined here to be overriden by -P option
-h2oBuild=5
+h2oBuild=1
 # Scala version for Sparkling Water. By default, for 2.11 we use 2.11.12.
 scalaBaseVersion=2.11
 # Internal Nexus location
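With these values the build resolves H2O 3.26.0.1 (major version 3.26.0, release name "yau", build 1), matching the commit title; as the file's own comment notes, the build number can still be overridden at build time via Gradle's -P flag, e.g. ./gradlew build -Ph2oBuild=<n>.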
5 changes: 3 additions & 2 deletions
@@ -21,7 +21,7 @@ import java.io.ByteArrayInputStream
 import java.util

 import _root_.hex.genmodel.MojoReaderBackendFactory
-import _root_.hex.genmodel.descriptor.JsonModelDescriptorReader
+import _root_.hex.genmodel.attributes.ModelJsonReader
 import com.google.gson.{GsonBuilder, JsonElement}
 import hex.ModelCategory
 import hex.genmodel.easy.EasyPredictModelWrapper
@@ -175,7 +175,8 @@ object H2OMOJOModel extends H2OMOJOReadable[PyH2OMOJOModel] with H2OMOJOLoader[PyH2OMOJOModel]
     val is = new ByteArrayInputStream(mojoData)
     val reader = MojoReaderBackendFactory.createReaderBackend(is, MojoReaderBackendFactory.CachingStrategy.MEMORY)

-    val modelOutputJson = JsonModelDescriptorReader.parseModelJson(reader).getAsJsonObject("output")
+
+    val modelOutputJson = ModelJsonReader.parseModelJson(reader).getAsJsonObject("output")
     if (modelOutputJson == null) {
       "Model details not available!"
     } else {
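The change above only tracks an upstream relocation: in H2O 3.26 the model-JSON reader moved from hex.genmodel.descriptor.JsonModelDescriptorReader to hex.genmodel.attributes.ModelJsonReader. A self-contained sketch of the updated call path, assuming mojoData holds the raw bytes of a MOJO archive:

import java.io.ByteArrayInputStream
import _root_.hex.genmodel.MojoReaderBackendFactory
import _root_.hex.genmodel.attributes.ModelJsonReader
import com.google.gson.JsonObject

// Parse a MOJO's model JSON and return its "output" section (null when absent).
def readModelOutput(mojoData: Array[Byte]): JsonObject = {
  val is = new ByteArrayInputStream(mojoData)
  val reader = MojoReaderBackendFactory.createReaderBackend(
    is, MojoReaderBackendFactory.CachingStrategy.MEMORY)
  ModelJsonReader.parseModelJson(reader).getAsJsonObject("output")
}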
7 changes: 5 additions & 2 deletions py/pysparkling/context.py
@@ -11,6 +11,8 @@
 import warnings
 import atexit
 import sys
+from h2o.utils.typechecks import assert_is_type, Enum
+

 def _monkey_patch_H2OFrame(hc):
     @staticmethod
@@ -193,8 +195,9 @@ def stop(self):
         self.__stop()
         sys.exit()

-    def download_h2o_logs(self, destination):
-        return self._jhc.h2oContext().downloadH2OLogs(destination)
+    def download_h2o_logs(self, destination, container = "ZIP"):
+        assert_is_type(container, Enum("ZIP", "LOG"))
+        return self._jhc.h2oContext().downloadH2OLogs(destination, container)

     def __str__(self):
         if H2OContext.is_initialized:
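On the Python side, download_h2o_logs now validates the container choice before delegating to the JVM method shown above. A hedged usage sketch, assuming an initialized H2OContext named hc:

hc.download_h2o_logs("/tmp/h2o-logs")                    # default: ZIP archive
hc.download_h2o_logs("/tmp/h2o-logs", container="LOG")   # plain-text log bundle
# Any other container value fails the assert_is_type(container, Enum("ZIP", "LOG")) check.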
