[java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs

Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Patrick Wendell <pwendell@gmail.com>

Closes apache#17 from ScrapCodes/java8-lambdas and squashes the following commits:

95850e6 [Patrick Wendell] Some doc improvements and build changes to the Java 8 patch.
85a954e [Prashant Sharma] Nit. import orderings.
673f7ac [Prashant Sharma] Added support for -java-home as well
80a13e8 [Prashant Sharma] Used fake class tag syntax
26eb3f6 [Prashant Sharma] Patrick's comments on PR.
35d8d79 [Prashant Sharma] Specified java 8 building in the docs
31d4cd6 [Prashant Sharma] Maven build to support -Pjava8-tests flag.
4ab87d3 [Prashant Sharma] Review feedback on the pr
c33dc2c [Prashant Sharma] SPARK-964, Java 8 API Support.
ScrapCodes authored and pwendell committed Mar 4, 2014
1 parent b14ede7 commit 181ec50
Showing 52 changed files with 1,946 additions and 551 deletions.
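The crux of the patch: each Java function type becomes a single-abstract-method interface, so JDK 8 callers can pass lambdas where Spark previously required anonymous inner classes. A minimal before/after sketch against the new API (local master and class name here are illustrative, not part of the diff):

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class Java8ApiSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "java8-api-sketch");
    JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    // Pre-lambda style: an anonymous inner class implementing Function.
    JavaRDD<Integer> evensOld = nums.filter(new Function<Integer, Boolean>() {
      public Boolean call(Integer x) { return x % 2 == 0; }
    });

    // With this patch, a JDK 8 lambda targets the same interface.
    JavaRDD<Integer> evensNew = nums.filter(x -> x % 2 == 0);

    System.out.println(evensNew.collect());  // [2, 4]
    sc.stop();
  }
}

The per-interface hunks below all follow the same pattern: drop the Scala abstract class (and its ClassTag plumbing) in favor of a plain Serializable Java interface with a single call method.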
DoubleFlatMapFunction.java
@@ -15,16 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
 
-import java.lang.{Double => JDouble, Iterable => JIterable}
+import java.io.Serializable;
 
 /**
  * A function that returns zero or more records of type Double from each input record.
  */
-// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
-abstract class DoubleFlatMapFunction[T] extends WrappedFunction1[T, JIterable[JDouble]]
-  with Serializable {
-  // Intentionally left blank
-}
+public interface DoubleFlatMapFunction<T> extends Serializable {
+  public Iterable<Double> call(T t) throws Exception;
+}
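Because the rewritten type is a single-abstract-method interface, a JDK 8 lambda can implement it directly; an illustrative sketch (assumes import java.util.Arrays, not part of the diff):

// Emits a parsed value and its square; any Iterable<Double> satisfies call().
DoubleFlatMapFunction<String> valueAndSquare = s -> {
  double d = Double.parseDouble(s);
  return Arrays.asList(d, d * d);
};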
DoubleFunction.java
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

/**
* A function that returns Doubles, and can be used to construct DoubleRDDs.
*/
public interface DoubleFunction<T> extends Serializable {
public double call(T t) throws Exception;
}
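Here call() returns a primitive double rather than a boxed Double, so the lambda must produce a double (sketch, not part of the diff):

// Maps a string to its length, as a double.
DoubleFunction<String> length = s -> (double) s.length();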
FlatMapFunction.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
 
-import scala.reflect.ClassTag
+import java.io.Serializable;
 
 /**
  * A function that returns zero or more output records from each input record.
  */
-abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
-  def elementType(): ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]]
-}
+public interface FlatMapFunction<T, R> extends Serializable {
+  public Iterable<R> call(T t) throws Exception;
+}
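The classic use is tokenization; as a lambda (sketch, assuming import java.util.Arrays):

FlatMapFunction<String, String> tokenize = line -> Arrays.asList(line.split(" "));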
FlatMapFunction2.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
 
-import scala.reflect.ClassTag
+import java.io.Serializable;
 
 /**
  * A function that takes two inputs and returns zero or more output records.
  */
-abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
-  def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]
-}
+public interface FlatMapFunction2<T1, T2, R> extends Serializable {
+  public Iterable<R> call(T1 t1, T2 t2) throws Exception;
+}
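The two-argument variant follows the same shape (sketch, assuming import java.util.Arrays):

FlatMapFunction2<Integer, Integer, Integer> bothInputs = (a, b) -> Arrays.asList(a, b);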
Function.java
@@ -15,17 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
 
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
+import java.io.Serializable;
 
 /**
- * Base class for functions whose return types do not create special RDDs. PairFunction and
+ * Base interface for functions whose return types do not create special RDDs. PairFunction and
  * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
  * when mapping RDDs of other types.
  */
-abstract class Function[T, R] extends WrappedFunction1[T, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
-}
+public interface Function<T1, R> extends Serializable {
+  public R call(T1 v1) throws Exception;
+}
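With returnType() gone (the wrappers now obtain class tags via JavaSparkContext.fakeClassTag, per the "fake class tag" commit above), the interface reduces to its call method, and a lambda suffices (sketch):

Function<String, Integer> parse = s -> Integer.parseInt(s.trim());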

Function2.java
@@ -15,15 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
 
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
+import java.io.Serializable;
 
 /**
  * A two-argument function that takes arguments of type T1 and T2 and returns an R.
  */
-abstract class Function2[T1, T2, R] extends WrappedFunction2[T1, T2, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
-}
+public interface Function2<T1, T2, R> extends Serializable {
+  public R call(T1 v1, T2 v2) throws Exception;
+}
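Function2 is the shape reduce-style operations take; as a lambda (sketch, assuming the standard JavaRDD.reduce(Function2<T, T, T>) signature):

Function2<Integer, Integer, Integer> sum = (a, b) -> a + b;
// e.g. nums.reduce((a, b) -> a + b) for a JavaRDD<Integer> nums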

Function3.java
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
 
-import org.apache.spark.api.java.JavaSparkContext
-import scala.reflect.ClassTag
+import java.io.Serializable;
 
 /**
  * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
  */
-abstract class Function3[T1, T2, T3, R] extends WrappedFunction3[T1, T2, T3, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
-}
+public interface Function3<T1, T2, T3, R> extends Serializable {
+  public R call(T1 v1, T2 v2, T3 v3) throws Exception;
+}
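And the same pattern at arity three (sketch):

// Clamps x into the interval [lo, hi].
Function3<Integer, Integer, Integer, Integer> clamp = (lo, hi, x) -> Math.max(lo, Math.min(hi, x));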
PairFlatMapFunction.java
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

import scala.Tuple2;

/**
* A function that returns zero or more key-value pair records from each input record. The
* key-value pairs are represented as scala.Tuple2 objects.
*/
public interface PairFlatMapFunction<T, K, V> extends Serializable {
public Iterable<Tuple2<K, V>> call(T t) throws Exception;
}
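A lambda can build the scala.Tuple2 records directly (sketch, assuming imports of java.util.ArrayList, java.util.List and scala.Tuple2):

// One (word, 1) pair per token in the line.
PairFlatMapFunction<String, String, Integer> wordPairs = line -> {
  List<Tuple2<String, Integer>> out = new ArrayList<>();
  for (String w : line.split(" ")) {
    out.add(new Tuple2<>(w, 1));
  }
  return out;
};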
PairFunction.java
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

import scala.Tuple2;

/**
* A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
*/
public interface PairFunction<T, K, V> extends Serializable {
public Tuple2<K, V> call(T t) throws Exception;
}
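For the one-pair-per-record case (sketch, assuming scala.Tuple2 is imported):

PairFunction<String, String, Integer> keyByLength = s -> new Tuple2<>(s, s.length());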
VoidFunction.java
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

/**
* A function with no return value.
*/
public interface VoidFunction<T> extends Serializable {
public void call(T t) throws Exception;
}
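Side-effecting operations such as foreach take a VoidFunction; as a lambda (sketch, assuming the standard JavaRDD.foreach(VoidFunction<T>) signature):

VoidFunction<String> printLine = s -> System.out.println(s);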
JavaDoubleRDD.scala
@@ -83,7 +83,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
   def filter(f: JFunction[JDouble, java.lang.Boolean]): JavaDoubleRDD =
-    fromRDD(srdd.filter(x => f(x).booleanValue()))
+    fromRDD(srdd.filter(x => f.call(x).booleanValue()))
 
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
67 changes: 38 additions & 29 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
 import org.apache.spark.SparkContext.rddToPairRDDFunctions
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
 import org.apache.spark.storage.StorageLevel
@@ -89,7 +89,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
   def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
+    new JavaPairRDD[K, V](rdd.filter(x => f.call(x).booleanValue()))
 
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
@@ -165,9 +165,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Simplified version of combineByKey that hash-partitions the output RDD.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
-    mergeValue: JFunction2[C, V, C],
-    mergeCombiners: JFunction2[C, C, C],
-    numPartitions: Int): JavaPairRDD[K, C] =
+      mergeValue: JFunction2[C, V, C],
+      mergeCombiners: JFunction2[C, C, C],
+      numPartitions: Int): JavaPairRDD[K, C] =
     combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
 
   /**
@@ -442,7 +442,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    */
   def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: V) => f.apply(x).asScala
+    def fn = (x: V) => f.call(x).asScala
     implicit val ctag: ClassTag[U] = fakeClassTag
     fromRDD(rdd.flatMapValues(fn))
   }
@@ -511,49 +511,49 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    conf: JobConf) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      conf: JobConf) {
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
   }
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F]) {
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
   }
 
   /** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    codec: Class[_ <: CompressionCodec]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      codec: Class[_ <: CompressionCodec]) {
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec)
   }
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    conf: Configuration) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      conf: Configuration) {
     rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
   }
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F]) {
     rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
   }

@@ -700,6 +700,15 @@ object JavaPairRDD {
 
   implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
 
+  private[spark]
+  implicit def toScalaFunction2[T1, T2, R](fun: JFunction2[T1, T2, R]): Function2[T1, T2, R] = {
+    (x: T1, x1: T2) => fun.call(x, x1)
+  }
+
+  private[spark] implicit def toScalaFunction[T, R](fun: JFunction[T, R]): T => R = x => fun.call(x)
+
+  private[spark]
+  implicit def pairFunToScalaFun[A, B, C](x: PairFunction[A, B, C]): A => (B, C) = y => x.call(y)
 
   /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
   def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
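These private[spark] implicits let the Scala-side wrappers hand the Java function objects straight to the underlying Scala RDD methods, each conversion simply forwarding to call. The visible payoff on the Java side, sketched under the same assumptions as the example at the top (pairs stands for some JavaPairRDD<String, Integer>):

// filter sees a Function<Tuple2<String, Integer>, Boolean> lambda...
JavaPairRDD<String, Integer> longKeys = pairs.filter(t -> t._1().length() > 3);
// ...and flatMapValues a Function<Integer, Iterable<Integer>> lambda.
JavaPairRDD<String, Integer> repeated = pairs.flatMapValues(v -> Arrays.asList(v, v));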
JavaRDD.scala
@@ -70,7 +70,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
   def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
-    wrapRDD(rdd.filter((x => f(x).booleanValue())))
+    wrapRDD(rdd.filter((x => f.call(x).booleanValue())))
 
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.