From cc745c5b15ab0cac6dc4d3d13f9486743571a4a4 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Tue, 5 Apr 2016 13:16:56 -0700
Subject: [PATCH] Tue Apr 5 13:16:56 PDT 2016

---
 external/java8-tests/pom.xml                  | 12 +++
 .../sql/Java8DatasetAggregatorSuite.java      | 61 +++++++++++++
 .../sources/JavaDatasetAggregatorSuite.java   | 86 ++++++++++---------
 3 files changed, 118 insertions(+), 41 deletions(-)
 create mode 100644 external/java8-tests/src/test/java/org/apache/spark/sql/Java8DatasetAggregatorSuite.java

diff --git a/external/java8-tests/pom.xml b/external/java8-tests/pom.xml
index f5a06467ee59f..1ea9196e9dfe3 100644
--- a/external/java8-tests/pom.xml
+++ b/external/java8-tests/pom.xml
@@ -58,6 +58,18 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
diff --git a/external/java8-tests/src/test/java/org/apache/spark/sql/Java8DatasetAggregatorSuite.java b/external/java8-tests/src/test/java/org/apache/spark/sql/Java8DatasetAggregatorSuite.java
new file mode 100644
index 0000000000000..23abfa397061d
--- /dev/null
+++ b/external/java8-tests/src/test/java/org/apache/spark/sql/Java8DatasetAggregatorSuite.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources;
+
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Test;
+import scala.Tuple2;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.KeyValueGroupedDataset;
+import org.apache.spark.sql.expressions.java.typed;
+
+/**
+ * Suite that replicates tests in JavaDatasetAggregatorSuite using lambda syntax.
+ */
+public class Java8DatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
+  @Test
+  public void testTypedAggregationAverage() {
+    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
+    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(v -> (double)(v._2() * 2)));
+    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 6.0)), agged.collectAsList());
+  }
+
+  @Test
+  public void testTypedAggregationCount() {
+    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
+    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.count(v -> v));
+    Assert.assertEquals(Arrays.asList(tuple2("a", 2), tuple2("b", 1)), agged.collectAsList());
+  }
+
+  @Test
+  public void testTypedAggregationSumDouble() {
+    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
+    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.sum(v -> (double)v._2()));
+    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 3.0)), agged.collectAsList());
+  }
+
+  @Test
+  public void testTypedAggregationSumLong() {
+    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
+    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.sumLong(v -> (long)v._2()));
+    Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
+  }
+}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java
index c8d0eecd5c707..594f4675bd1f8 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java
@@ -41,46 +41,7 @@
 /**
  * Suite for testing the aggregate functionality of Datasets in Java.
  */
-public class JavaDatasetAggregatorSuite implements Serializable {
-  private transient JavaSparkContext jsc;
-  private transient TestSQLContext context;
-
-  @Before
-  public void setUp() {
-    // Trigger static initializer of TestData
-    SparkContext sc = new SparkContext("local[*]", "testing");
-    jsc = new JavaSparkContext(sc);
-    context = new TestSQLContext(sc);
-    context.loadTestData();
-  }
-
-  @After
-  public void tearDown() {
-    context.sparkContext().stop();
-    context = null;
-    jsc = null;
-  }
-
-  private <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
-    return new Tuple2<>(t1, t2);
-  }
-
-  private KeyValueGroupedDataset<String, Tuple2<String, Integer>> generateGroupedDataset() {
-    Encoder<Tuple2<String, Integer>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
-    List<Tuple2<String, Integer>> data =
-      Arrays.asList(tuple2("a", 1), tuple2("a", 2), tuple2("b", 3));
-    Dataset<Tuple2<String, Integer>> ds = context.createDataset(data, encoder);
-
-    return ds.groupByKey(
-      new MapFunction<Tuple2<String, Integer>, String>() {
-        @Override
-        public String call(Tuple2<String, Integer> value) throws Exception {
-          return value._1();
-        }
-      },
-      Encoders.STRING());
-  }
-
+public class JavaDatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
   @Test
   public void testTypedAggregationAnonClass() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
@@ -100,7 +61,6 @@ public void testTypedAggregationAnonClass() {
   }
 
   static class IntSumOf extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
-    @Override
     public Integer zero() {
       return 0;
     }
@@ -170,3 +130,47 @@ public Long call(Tuple2<String, Integer> value) throws Exception {
     Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
   }
 }
+
+/**
+ * Common test base shared across this and Java8DatasetAggregatorSuite.
+ */
+class JavaDatasetAggregatorSuiteBase implements Serializable {
+  protected transient JavaSparkContext jsc;
+  protected transient TestSQLContext context;
+
+  @Before
+  public void setUp() {
+    // Trigger static initializer of TestData
+    SparkContext sc = new SparkContext("local[*]", "testing");
+    jsc = new JavaSparkContext(sc);
+    context = new TestSQLContext(sc);
+    context.loadTestData();
+  }
+
+  @After
+  public void tearDown() {
+    context.sparkContext().stop();
+    context = null;
+    jsc = null;
+  }
+
+  protected <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
+    return new Tuple2<>(t1, t2);
+  }
+
+  protected KeyValueGroupedDataset<String, Tuple2<String, Integer>> generateGroupedDataset() {
+    Encoder<Tuple2<String, Integer>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
+    List<Tuple2<String, Integer>> data =
+      Arrays.asList(tuple2("a", 1), tuple2("a", 2), tuple2("b", 3));
+    Dataset<Tuple2<String, Integer>> ds = context.createDataset(data, encoder);
+
+    return ds.groupByKey(
+      new MapFunction<Tuple2<String, Integer>, String>() {
+        @Override
+        public String call(Tuple2<String, Integer> value) throws Exception {
+          return value._1();
+        }
+      },
+      Encoders.STRING());
+  }
+}
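
Note (not part of the patch): the point of the new suite is that the typed aggregate helpers exercised above accept Java 8 lambdas in place of anonymous MapFunction classes. The sketch below shows the two equivalent call styles side by side. It is only an illustrative sketch against the API surface this patch compiles against (in particular the org.apache.spark.sql.expressions.java.typed import used by the new test file, which may live under a different package in other Spark versions); the class, method, and variable names here are hypothetical.

import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.java.typed;

// Hypothetical example class, not part of the patch.
public class TypedAggregateExample {

  // Sums the Integer field of (String, Integer) pairs per key, first with an
  // anonymous MapFunction (the style already covered by JavaDatasetAggregatorSuite),
  // then with the Java 8 lambda style that Java8DatasetAggregatorSuite adds coverage for.
  public static void run(SQLContext sqlContext) {
    Encoder<Tuple2<String, Integer>> encoder =
      Encoders.tuple(Encoders.STRING(), Encoders.INT());
    List<Tuple2<String, Integer>> data =
      Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3));
    Dataset<Tuple2<String, Integer>> ds = sqlContext.createDataset(data, encoder);

    // Group by the String key, exactly as generateGroupedDataset() does in the test base.
    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = ds.groupByKey(
      new MapFunction<Tuple2<String, Integer>, String>() {
        @Override
        public String call(Tuple2<String, Integer> value) throws Exception {
          return value._1();
        }
      },
      Encoders.STRING());

    // Anonymous-class style.
    Dataset<Tuple2<String, Double>> sums = grouped.agg(typed.sum(
      new MapFunction<Tuple2<String, Integer>, Double>() {
        @Override
        public Double call(Tuple2<String, Integer> value) throws Exception {
          return (double) value._2();
        }
      }));

    // Equivalent Java 8 lambda style.
    Dataset<Tuple2<String, Double>> sumsFromLambda =
      grouped.agg(typed.sum(v -> (double) v._2()));

    System.out.println(sums.collectAsList());            // e.g. [(a,3.0), (b,3.0)]
    System.out.println(sumsFromLambda.collectAsList());  // same result
  }
}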