[SW-302] Upgrade Spark dependency to Spark 2.1.0
Includes:
  - Spark dependency upgraded to Spark 2.1.0
  - Documentation updated
  - Test fixed: added a parameter to the test to specify whether plain (non-primitive) values are allowed to be nullable; the new Spark version appears to treat them as nullable by default (see the sketch below).
mmalohlava committed Mar 3, 2017
1 parent d8a53d6 commit ed119e9
Showing 6 changed files with 37 additions and 20 deletions.
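
A minimal sketch of the nullability change described in the commit message, assuming Spark 2.1 on the classpath (the case classes mirror `PrimitiveA`/`ComposedA` from the converter test below):

```scala
import org.apache.spark.sql.SparkSession

case class PrimitiveA(n: Int, name: String)
case class ComposedA(a: PrimitiveA, weight: Double)

object NullabilityCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("nullability").getOrCreate()
    import spark.implicits._
    val df = Seq(ComposedA(PrimitiveA(1, "name=1"), 1.0)).toDF
    // Spark 2.1 reports object-derived columns as nullable by default,
    // which is why the test now parameterizes the expected nullability.
    df.schema.foreach(f => println(s"${f.name}: nullable=${f.nullable}"))
    spark.stop()
  }
}
```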
14 changes: 9 additions & 5 deletions README.md
@@ -29,21 +29,24 @@ The Sparkling Water is developed in multiple parallel branches.
Each branch corresponds to a Spark major release (e.g., branch **rel-1.5** provides implementation of Sparkling Water for Spark **1.5**).

Please, switch to the right branch:
- For Spark 2.1 use branch [rel-2.1](https://github.com/h2oai/sparkling-water/tree/rel-2.1)
- For Spark 2.0 use branch [rel-2.0](https://github.com/h2oai/sparkling-water/tree/rel-2.0)
- For Spark 1.6 use branch [rel-1.6](https://github.com/h2oai/sparkling-water/tree/rel-1.6)
- For Spark 1.5 use branch [rel-1.5](https://github.com/h2oai/sparkling-water/tree/rel-1.5)
- For Spark 1.4 use branch [rel-1.4](https://github.com/h2oai/sparkling-water/tree/rel-1.4)
- For Spark 1.3 use branch [rel-1.3](https://github.com/h2oai/sparkling-water/tree/rel-1.3)

> Note: The [master](https://github.com/h2oai/sparkling-water/tree/master) branch includes the latest changes
> **Note** Older releases are available here:
> - For Spark 1.5 use branch [rel-1.5](https://github.com/h2oai/sparkling-water/tree/rel-1.5)
> - For Spark 1.4 use branch [rel-1.4](https://github.com/h2oai/sparkling-water/tree/rel-1.4)
> - For Spark 1.3 use branch [rel-1.3](https://github.com/h2oai/sparkling-water/tree/rel-1.3)
> **Note** The [master](https://github.com/h2oai/sparkling-water/tree/master) branch includes the latest changes
for the latest Spark version. They are back-ported into older Sparkling Water versions.

<a name="Req"></a>
### Requirements

* Linux/OS X/Windows
* Java 7+
* [Spark 1.3+](https://spark.apache.org/downloads.html)
* [Spark 1.6+](https://spark.apache.org/downloads.html)
* `SPARK_HOME` shell variable must point to your local Spark installation

---
@@ -83,6 +86,7 @@ pip install future
### Download Binaries
For each Sparkling Water version you can download binaries here:
* [Sparkling Water - Latest version](http://h2o-release.s3.amazonaws.com/sparkling-water/master/latest.html)
* [Sparkling Water - Latest 2.1 version](http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.1/latest.html)
* [Sparkling Water - Latest 2.0 version](http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.0/latest.html)
* [Sparkling Water - Latest 1.6 version](http://h2o-release.s3.amazonaws.com/sparkling-water/rel-1.6/latest.html)
* [Sparkling Water - Latest 1.5 version](http://h2o-release.s3.amazonaws.com/sparkling-water/rel-1.5/latest.html)
@@ -18,7 +18,7 @@ package org.apache.spark

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION

/**
* Spark session utils which can access Spark private API.
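
The import change above reflects Spark 2.1 relocating `CATALOG_IMPLEMENTATION` from `org.apache.spark.internal.config` to `org.apache.spark.sql.internal.StaticSQLConf`. Since the entry is Spark private API (hence this utils class living under `org.apache.spark`), a plain application would read the same setting through its string key; a hedged sketch, not part of this commit:

```scala
import org.apache.spark.sql.SparkSession

object CatalogCheck {
  // Illustrative only: "spark.sql.catalogImplementation" is the string key
  // behind StaticSQLConf.CATALOG_IMPLEMENTATION ("hive" or "in-memory").
  def hiveCatalogEnabled(session: SparkSession): Boolean =
    session.sparkContext.getConf.get("spark.sql.catalogImplementation", "in-memory") == "hive"
}
```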
@@ -96,15 +96,16 @@ object H2OSchemaUtils {
* @return list of types with their positions
*/
def expandedSchema(sc: SparkContext, srdd: DataFrame): Seq[(Seq[Int], StructField, Byte)] = {
val schema: StructType = srdd.schema
// Collect max size in array and vector columns to expand them
val arrayColIdxs = collectArrayLikeTypes(srdd.schema.fields)
val vecColIdxs = collectVectorLikeTypes(srdd.schema.fields)
val arrayColIdxs = collectArrayLikeTypes(schema.fields)
val vecColIdxs = collectVectorLikeTypes(schema.fields)
val numOfArrayCols = arrayColIdxs.length
// Collect max arrays for this RDD; it is a distributed operation
val fmaxLens = collectMaxArrays(sc, srdd.rdd, arrayColIdxs, vecColIdxs)
// Flattens RDD's schema
val flatRddSchema = flatSchema(srdd.schema)
val typeIndx = collectTypeIndx(srdd.schema.fields)
val flatRddSchema = flatSchema(schema)
val typeIndx = collectTypeIndx(schema.fields)
val typesAndPath = typeIndx
.zip(flatRddSchema) // Seq[(Seq[Int], StructField)]
var arrayCnt = 0; var vecCnt = 0
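
The hunk above only caches `srdd.schema` in a local `schema` val; the flattening idea it builds on is sketched below (a simplified stand-in for `flatSchema`/`collectTypeIndx`, assuming only nested `StructType`s, no array or vector columns):

```scala
import org.apache.spark.sql.types._

object FlattenSketch {
  // Walks nested StructTypes and pairs each leaf field with its positional path.
  def flatten(schema: StructType, path: Seq[Int] = Nil, prefix: String = ""): Seq[(Seq[Int], StructField)] =
    schema.fields.zipWithIndex.toSeq.flatMap { case (f, i) =>
      f.dataType match {
        case st: StructType => flatten(st, path :+ i, prefix + f.name + ".")
        case _              => Seq((path :+ i, f.copy(name = prefix + f.name)))
      }
    }
}
```

For the `ComposedA` example in the test below this yields the paths `List(0, 0)`, `List(0, 1)` and `List(1)`, matching the expected vector in the assertion.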
@@ -27,23 +27,22 @@ import org.apache.spark.h2o.utils.H2OSchemaUtils.flatSchema
import org.apache.spark.h2o.utils.{H2OSchemaUtils, SharedSparkTestContext}
import org.apache.spark.h2o.{Frame => _, H2OFrame => _}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner
import org.scalatest.{Assertions, FunSuite}
import water.Key
import water.fvec._
import water.parser.BufferedString
import water.{DKV, Key}

import scala.reflect.ClassTag

/**
* Testing Conversions between H2OFrame and Spark DataFrame
*/
@RunWith(classOf[JUnitRunner])
class DataFrameConverterTest extends FunSuite with SharedSparkTestContext {

override def createSparkContext: SparkContext = new SparkContext("local[*]", "test-local", conf = defaultSparkConf)

test("Creation of H2ODataFrame") {
@@ -425,13 +424,24 @@ class DataFrameConverterTest extends FunSuite with SharedSparkTestContext {
import spark.implicits._
val num = 2
val values = (1 to num).map(x => ComposedA(PrimitiveA(x, "name=" + x), x * 1.0))
val df = sc.parallelize(values).toDF
val rdd: RDD[ComposedA] = sc.parallelize(values)
val df = rdd.toDF

val expectObjectsNullableByDefault = true

val expandedSchema = H2OSchemaUtils.expandedSchema(sc, df)
assert(expandedSchema === Vector(
val expected: Vector[(List[Int], StructField, Int)] = Vector(
(List(0, 0), StructField("a.n", IntegerType), 0),
(List(0, 1), StructField("a.name", StringType), 0),
(List(1), StructField("weight", DoubleType, nullable=false), 0)))
(List(1), StructField("weight", DoubleType, nullable = expectObjectsNullableByDefault), 0))
Assertions.assertResult(expected.length)(expandedSchema.length)

assertResult(expectObjectsNullableByDefault, "Nullability in component#2")(expandedSchema(2)._2.nullable)
for {i <- expected.indices} {
assertResult(expected(i), s"@$i")(expandedSchema(i))
}

assert(expandedSchema === expected)

// Verify transformation into dataframe
val h2oFrame = hc.asH2OFrame(df)
@@ -554,6 +564,7 @@ class DataFrameConverterTest extends FunSuite with SharedSparkTestContext {

test("SW-304 DateType column conversion failure") {
import java.sql.Date

import sqlContext.implicits._
val df = sc.parallelize(Seq(DateField(Date.valueOf("2016-12-24")))).toDF("created_date")
val hf = hc.asH2OFrame(df)
4 changes: 2 additions & 2 deletions gradle.properties
@@ -1,14 +1,14 @@
#Tue, 19 May 2015 17:15:25 -0700
group=ai.h2o
version=2.0.99999-SNAPSHOT
version=2.1.99999-SNAPSHOT
# Major version of H2O release
h2oMajorVersion=3.10.3
# Name of H2O major version
h2oMajorName=tverberg
# H2O Build version, defined here to be overridden by -P option
h2oBuild=2
# Spark version
sparkVersion=2.0.1
sparkVersion=2.1.0

# By default we build spark with scala 2.11, if you want to build sparkling
# water against spark build with scala 2.10, then this property can be overriden by -P option.
1 change: 1 addition & 0 deletions py/README.rst
@@ -2,6 +2,7 @@ PySparkling and Spark Version
=============================
There are multiple PySparkling packages; each is intended to be used with a different Spark version.

- h2o_pysparkling_2.1 - for Spark 2.1.x
- h2o_pysparkling_2.0 - for Spark 2.0.x
- h2o_pysparkling_1.6 - for Spark 1.6.x
- h2o_pysparkling_1.5 - for Spark 1.5.x
