SPARK-964, Java 8 API Support.
This patch adds a few methods to the Java API so that lambdas can be passed instead
of anonymous classes, while Java 6/7 API users can keep using the same APIs by passing anonymous classes.
To achieve this, a few older API methods are removed and replaced with their ToPair/ToDouble versions.

1) All function classes extending Scala's Function are replaced by interfaces, so they can be implemented as Java 8 lambdas.

2) Adds an optional way to run the Java 8 tests.

Please refer to the PR comments for more details.
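As a usage sketch only (not part of this diff): assuming a JavaRDD<String> called lines, and the mapToPair method introduced by the ToPair renaming described above, the same transformation can be written either way:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class Java8ApiSketch {
  static void demo(JavaRDD<String> lines) {
    // Java 8: the function types are now plain interfaces, so a lambda can be passed directly.
    JavaPairRDD<String, Integer> viaLambda =
        lines.mapToPair(s -> new Tuple2<>(s, 1));

    // Java 6/7: the very same method accepts an anonymous class implementing PairFunction.
    JavaPairRDD<String, Integer> viaAnonymousClass =
        lines.mapToPair(new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        });
  }
}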
ScrapCodes committed Mar 3, 2014
1 parent 55a4f11 commit c33dc2c
Showing 45 changed files with 1,581 additions and 506 deletions.
@@ -15,13 +15,10 @@
  * limitations under the License.
  */

-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;

-import scala.reflect.ClassTag
+import java.io.Serializable;

-/**
- * A function that returns zero or more output records from each input record.
- */
-abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
-  def elementType(): ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]]
+public interface DoubleFlatMapFunction<T> extends Serializable {
+  public Iterable<Double> call(T t) throws Exception;
 }
@@ -15,13 +15,10 @@
  * limitations under the License.
  */

-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;

-import scala.reflect.ClassTag
+import java.io.Serializable;

-/**
- * A function that takes two inputs and returns zero or more output records.
- */
-abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
-  def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]
+public interface DoubleFunction<T> extends Serializable {
+  public Double call(T t) throws Exception;
 }
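As a hedged usage sketch for the DoubleFunction interface above (mapToDouble is assumed from the ToDouble renaming described in the commit message):

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;

public class DoubleFunctionSketch {
  static JavaDoubleRDD lengths(JavaRDD<String> lines) {
    // Java 8 lambda targeting DoubleFunction<String>; pre-Java 8 callers would pass
    // new DoubleFunction<String>() { public Double call(String s) { ... } } instead.
    return lines.mapToDouble(s -> (double) s.length());
  }
}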
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

public interface FlatMapFunction<T, R> extends Serializable {
public Iterable<R> call(T t) throws Exception;
}
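A brief, hedged sketch of the FlatMapFunction interface in use (JavaRDD.flatMap accepting a FlatMapFunction is assumed):

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;

public class FlatMapSketch {
  static JavaRDD<String> words(JavaRDD<String> lines) {
    // call(T) returns an Iterable<R>, and a List satisfies that, so a one-line lambda works.
    return lines.flatMap(s -> Arrays.asList(s.split(" ")));
  }
}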
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

public interface FlatMapFunction2<T1, T2, R> extends Serializable {
public Iterable<R> call(T1 t1, T2 t2) throws Exception;
}
@@ -15,14 +15,10 @@
  * limitations under the License.
  */

-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;

-import org.apache.spark.api.java.JavaSparkContext
-import scala.reflect.ClassTag
+import java.io.Serializable;

-/**
- * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
- */
-abstract class Function3[T1, T2, T3, R] extends WrappedFunction3[T1, T2, T3, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+public interface Function<T1, R> extends Serializable {
+  public R call(T1 v1) throws Exception;
 }
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

public interface Function2<T1, T2, R> extends Serializable {
public R call(T1 v1, T2 v2) throws Exception;
}
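For illustration, a minimal sketch of Function2 as a reduce operator (JavaRDD.reduce taking a Function2<T, T, T> is assumed):

import org.apache.spark.api.java.JavaRDD;

public class ReduceSketch {
  static Integer sum(JavaRDD<Integer> nums) {
    // Java 8 form; the Java 6/7 equivalent is an anonymous Function2<Integer, Integer, Integer>.
    return nums.reduce((a, b) -> a + b);
  }
}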
@@ -15,15 +15,10 @@
  * limitations under the License.
  */

-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;

-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
+import java.io.Serializable;

-/**
- * A two-argument function that takes arguments of type T1 and T2 and returns an R.
- */
-abstract class Function2[T1, T2, R] extends WrappedFunction2[T1, T2, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+public interface Function3<T1, T2, T3, R> extends Serializable {
+  public R call(T1 v1, T2 v2, T3 v3) throws Exception;
 }
-
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import scala.Tuple2;

import java.io.Serializable;

public interface PairFlatMapFunction<T, K, V> extends Serializable {
public Iterable<Tuple2<K, V>> call(T t) throws Exception;
}
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;
import scala.Tuple2;

public interface PairFunction<T, K, V> extends Serializable {
public Tuple2<K, V> call(T t) throws Exception;
}
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

public interface VoidFunction<T> extends Serializable {
public void call(T t) throws Exception;
}
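A hedged sketch of VoidFunction in use (JavaRDD.foreach taking a VoidFunction<T> is assumed):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;

public class ForeachSketch {
  static void dump(JavaRDD<String> lines) {
    // Java 8 lambda form.
    lines.foreach(s -> System.out.println(s));

    // Java 6/7 anonymous-class form against the same interface.
    lines.foreach(new VoidFunction<String>() {
      public void call(String s) {
        System.out.println(s);
      }
    });
  }
}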
@@ -83,7 +83,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
   def filter(f: JFunction[JDouble, java.lang.Boolean]): JavaDoubleRDD =
-    fromRDD(srdd.filter(x => f(x).booleanValue()))
+    fromRDD(srdd.filter(x => f.call(x).booleanValue()))

   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
68 changes: 37 additions & 31 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
 import org.apache.spark.SparkContext.rddToPairRDDFunctions
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
 import org.apache.spark.storage.StorageLevel
@@ -89,7 +89,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
   def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
+    new JavaPairRDD[K, V](rdd.filter(x => f.call(x).booleanValue()))

   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
@@ -165,9 +165,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Simplified version of combineByKey that hash-partitions the output RDD.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
-    mergeValue: JFunction2[C, V, C],
-    mergeCombiners: JFunction2[C, C, C],
-    numPartitions: Int): JavaPairRDD[K, C] =
+      mergeValue: JFunction2[C, V, C],
+      mergeCombiners: JFunction2[C, C, C],
+      numPartitions: Int): JavaPairRDD[K, C] =
     combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))

   /**
@@ -442,7 +442,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    */
   def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
     import scala.collection.JavaConverters._
-    def fn = (x: V) => f.apply(x).asScala
+    def fn = (x: V) => f.call(x).asScala
     implicit val ctag: ClassTag[U] = fakeClassTag
     fromRDD(rdd.flatMapValues(fn))
   }
@@ -511,49 +511,49 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    conf: JobConf) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      conf: JobConf) {
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
   }

   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F]) {
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
   }

   /** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    codec: Class[_ <: CompressionCodec]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      codec: Class[_ <: CompressionCodec]) {
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec)
   }

   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    conf: Configuration) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      conf: Configuration) {
     rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
   }

   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F]) {
     rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
   }

@@ -699,8 +699,14 @@ object JavaPairRDD {
   }

   implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
-
-
+  private[spark]
+  implicit def toJFunction2[T1, T2, R](fun: JFunction2[T1, T2, R])
+    : Function2[T1, T2, R] = (x: T1, x1: T2) => fun.call(x, x1)
+  private[spark]
+  implicit def toJFunction[T, R](fun: JFunction[T, R]): T => R = (x) => fun.call(x)
+  private[spark]
+  implicit def pairFunToScalaFun[A, B, C](x: PairFunction[A, B, C]): A => (B, C) =
+    (y: A) => x.call(y)
   /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
   def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
@@ -70,7 +70,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
   def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
-    wrapRDD(rdd.filter((x => f(x).booleanValue())))
+    wrapRDD(rdd.filter((x => f.call(x).booleanValue())))

   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
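From the caller's side, the Boolean-returning JFunction in the filter signature above can now be a lambda; a minimal sketch, assuming a JavaRDD<Integer>:

import org.apache.spark.api.java.JavaRDD;

public class FilterSketch {
  static JavaRDD<Integer> positives(JavaRDD<Integer> nums) {
    // The lambda's primitive boolean result is boxed to the java.lang.Boolean the API expects.
    return nums.filter(n -> n > 0);
  }
}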