[java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs #17

Closed · wants to merge 9 commits
@@ -15,16 +15,13 @@
  * limitations under the License.
  */

-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;

-import java.lang.{Double => JDouble, Iterable => JIterable}
+import java.io.Serializable;

 /**
  * A function that returns zero or more records of type Double from each input record.
  */
-// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
-abstract class DoubleFlatMapFunction[T] extends WrappedFunction1[T, JIterable[JDouble]]
-  with Serializable {
-  // Intentionally left blank
+public interface DoubleFlatMapFunction<T> extends Serializable {
+  public Iterable<Double> call(T t) throws Exception;
 }
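With call(T) as the sole abstract method, a JDK 8 lambda can implement DoubleFlatMapFunction directly. A minimal sketch; the class and variable names are illustrative, not part of this patch:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.DoubleFlatMapFunction;

public class DoubleFlatMapExample {
    public static void main(String[] args) throws Exception {
        // One Double per field; a List<Double> satisfies the Iterable<Double> return type.
        DoubleFlatMapFunction<String> parseFields = line -> {
            List<Double> out = new ArrayList<>();
            for (String field : line.split(",")) {
                out.add(Double.parseDouble(field));
            }
            return out;
        };
        System.out.println(parseFields.call("1.5,2.5"));  // [1.5, 2.5]
    }
}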
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

/**
* A function that returns Doubles, and can be used to construct DoubleRDDs.
*/
public interface DoubleFunction<T> extends Serializable {
public double call(T t) throws Exception;
}
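The new DoubleFunction is likewise a single-method interface, so it works as a lambda target. A small hypothetical sketch (class name illustrative):

import org.apache.spark.api.java.function.DoubleFunction;

public class DoubleFunctionExample {
    public static void main(String[] args) throws Exception {
        // call(T) returns a primitive double, matching the interface above.
        DoubleFunction<String> parse = s -> Double.parseDouble(s.trim());
        System.out.println(parse.call(" 3.14 "));  // 3.14
    }
}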
@@ -15,13 +15,13 @@
  * limitations under the License.
  */

-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;

-import scala.reflect.ClassTag
+import java.io.Serializable;

 /**
  * A function that returns zero or more output records from each input record.
  */
-abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
-  def elementType(): ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]]
-}
+public interface FlatMapFunction<T, R> extends Serializable {
+  public Iterable<R> call(T t) throws Exception;
+}
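A typical tokenizer, written as a lambda against the new interface. Illustrative sketch, not from the patch:

import java.util.Arrays;
import org.apache.spark.api.java.function.FlatMapFunction;

public class FlatMapExample {
    public static void main(String[] args) throws Exception {
        // Arrays.asList returns a List<String>, which is an Iterable<String>.
        FlatMapFunction<String, String> tokenize = line -> Arrays.asList(line.split(" "));
        System.out.println(tokenize.call("to be or not"));  // [to, be, or, not]
    }
}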
@@ -15,13 +15,13 @@
  * limitations under the License.
  */

-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;

-import scala.reflect.ClassTag
+import java.io.Serializable;

 /**
  * A function that takes two inputs and returns zero or more output records.
  */
-abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
-  def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]
-}
+public interface FlatMapFunction2<T1, T2, R> extends Serializable {
+  public Iterable<R> call(T1 t1, T2 t2) throws Exception;
+}
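The two-argument flat-map variant is also a lambda target now. A hypothetical sketch (names illustrative):

import java.util.Arrays;
import org.apache.spark.api.java.function.FlatMapFunction2;

public class FlatMap2Example {
    public static void main(String[] args) throws Exception {
        // Expand two inputs into any number of outputs; here, simply both of them.
        FlatMapFunction2<String, String, String> both = (a, b) -> Arrays.asList(a, b);
        System.out.println(both.call("left", "right"));  // [left, right]
    }
}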
@@ -15,17 +15,15 @@
  * limitations under the License.
  */

-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;

-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
+import java.io.Serializable;

 /**
- * Base class for functions whose return types do not create special RDDs. PairFunction and
+ * Base interface for functions whose return types do not create special RDDs. PairFunction and
  * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
  * when mapping RDDs of other types.
  */
-abstract class Function[T, R] extends WrappedFunction1[T, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+public interface Function<T1, R> extends Serializable {
+  public R call(T1 v1) throws Exception;
 }
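Because the base Function is now a plain functional interface, both lambdas and JDK 8 method references satisfy it. An illustrative standalone sketch:

import org.apache.spark.api.java.function.Function;

public class FunctionExample {
    public static void main(String[] args) throws Exception {
        // A method reference and a lambda, both targeting the single call(T1) method.
        Function<String, Integer> length = String::length;
        Function<Integer, Integer> square = x -> x * x;
        System.out.println(square.call(length.call("spark")));  // 25
    }
}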

@@ -15,15 +15,13 @@
  * limitations under the License.
  */

-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;

-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
+import java.io.Serializable;

 /**
  * A two-argument function that takes arguments of type T1 and T2 and returns an R.
  */
-abstract class Function2[T1, T2, R] extends WrappedFunction2[T1, T2, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+public interface Function2<T1, T2, R> extends Serializable {
+  public R call(T1 v1, T2 v2) throws Exception;
 }
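Function2 is the shape of a typical reduce function; as a lambda (illustrative sketch):

import org.apache.spark.api.java.function.Function2;

public class Function2Example {
    public static void main(String[] args) throws Exception {
        Function2<Integer, Integer, Integer> sum = (a, b) -> a + b;
        System.out.println(sum.call(3, 4));  // 7
    }
}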

@@ -15,14 +15,13 @@
  * limitations under the License.
  */

-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;

-import org.apache.spark.api.java.JavaSparkContext
-import scala.reflect.ClassTag
+import java.io.Serializable;

 /**
  * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
  */
-abstract class Function3[T1, T2, T3, R] extends WrappedFunction3[T1, T2, T3, R] with Serializable {
-  def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+public interface Function3<T1, T2, T3, R> extends Serializable {
+  public R call(T1 v1, T2 v2, T3 v3) throws Exception;
 }
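A three-argument lambda works the same way. Hypothetical sketch (names illustrative):

import org.apache.spark.api.java.function.Function3;

public class Function3Example {
    public static void main(String[] args) throws Exception {
        // Three inputs, one output: clamp x into [lo, hi].
        Function3<Integer, Integer, Integer, Integer> clamp =
            (x, lo, hi) -> Math.max(lo, Math.min(hi, x));
        System.out.println(clamp.call(12, 0, 10));  // 10
    }
}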
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

import scala.Tuple2;

/**
* A function that returns zero or more key-value pair records from each input record. The
* key-value pairs are represented as scala.Tuple2 objects.
*/
public interface PairFlatMapFunction<T, K, V> extends Serializable {
public Iterable<Tuple2<K, V>> call(T t) throws Exception;
}
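A word-count-style mapper is the canonical use of PairFlatMapFunction; as a lambda (illustrative, not part of the patch):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

public class PairFlatMapExample {
    public static void main(String[] args) throws Exception {
        // Emit a (word, 1) pair per token.
        PairFlatMapFunction<String, String, Integer> ones = line -> {
            List<Tuple2<String, Integer>> out = new ArrayList<>();
            for (String w : line.split(" ")) {
                out.add(new Tuple2<>(w, 1));
            }
            return out;
        };
        System.out.println(ones.call("a b"));  // [(a,1), (b,1)]
    }
}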
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

import scala.Tuple2;

/**
* A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
*/
public interface PairFunction<T, K, V> extends Serializable {
public Tuple2<K, V> call(T t) throws Exception;
}
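PairFunction as a lambda, keying each record to build a pair RDD. Illustrative sketch:

import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class PairFunctionExample {
    public static void main(String[] args) throws Exception {
        // Key each string by its length.
        PairFunction<String, Integer, String> byLength = s -> new Tuple2<>(s.length(), s);
        System.out.println(byLength.call("spark"));  // (5,spark)
    }
}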
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

/**
* A function with no return value.
*/
public interface VoidFunction<T> extends Serializable {
public void call(T t) throws Exception;
}
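VoidFunction covers side-effecting actions such as the body of a foreach. Illustrative sketch:

import org.apache.spark.api.java.function.VoidFunction;

public class VoidFunctionExample {
    public static void main(String[] args) throws Exception {
        VoidFunction<String> show = s -> System.out.println(s);
        show.call("hello");
    }
}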
@@ -83,7 +83,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: JFunction[JDouble, java.lang.Boolean]): JavaDoubleRDD =
-    fromRDD(srdd.filter(x => f(x).booleanValue()))
+    fromRDD(srdd.filter(x => f.call(x).booleanValue()))
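Since Function no longer extends Scala's Function1, the Scala side must invoke it through call rather than apply, which is why f(x) becomes f.call(x) here and in the filter methods below. From Java, the predicate can now be a plain lambda; a hedged sketch assuming a local JavaSparkContext (class name and data illustrative):

import java.util.Arrays;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class FilterExample {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local", "FilterExample");
        JavaDoubleRDD doubles = jsc.parallelizeDoubles(Arrays.asList(1.0, 50.0, 200.0));
        // The predicate targets Function<Double, Boolean> as a lambda.
        System.out.println(doubles.filter(d -> d > 100.0).collect());  // [200.0]
        jsc.stop();
    }
}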

/**
* Return a new RDD that is reduced into `numPartitions` partitions.
core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala (67 changes: 38 additions & 29 deletions)
@@ -32,7 +32,7 @@ import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
import org.apache.spark.storage.StorageLevel
@@ -89,7 +89,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
+    new JavaPairRDD[K, V](rdd.filter(x => f.call(x).booleanValue()))

/**
* Return a new RDD that is reduced into `numPartitions` partitions.
@@ -165,9 +165,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   * Simplified version of combineByKey that hash-partitions the output RDD.
   */
  def combineByKey[C](createCombiner: JFunction[V, C],
-    mergeValue: JFunction2[C, V, C],
-    mergeCombiners: JFunction2[C, C, C],
-    numPartitions: Int): JavaPairRDD[K, C] =
+      mergeValue: JFunction2[C, V, C],
+      mergeCombiners: JFunction2[C, C, C],
+      numPartitions: Int): JavaPairRDD[K, C] =
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
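With every argument a single-method interface, this four-argument overload takes three lambdas directly. A sketch assuming a local JavaSparkContext (class name and data illustrative; list order within a group may vary):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class CombineByKeyExample {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local", "CombineByKeyExample");
        JavaPairRDD<String, Integer> pairs = JavaPairRDD.fromJavaRDD(jsc.parallelize(
            Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3))));
        // Collect the values for each key into a list.
        JavaPairRDD<String, List<Integer>> grouped = pairs.combineByKey(
            v -> new ArrayList<>(Arrays.asList(v)),     // createCombiner
            (list, v) -> { list.add(v); return list; }, // mergeValue
            (l1, l2) -> { l1.addAll(l2); return l1; },  // mergeCombiners
            2);                                         // numPartitions
        System.out.println(grouped.collectAsMap());     // e.g. {a=[1, 2], b=[3]}
        jsc.stop();
    }
}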

/**
@@ -442,7 +442,7 @@
   */
  def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
    import scala.collection.JavaConverters._
-    def fn = (x: V) => f.apply(x).asScala
+    def fn = (x: V) => f.call(x).asScala
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.flatMapValues(fn))
}
@@ -511,49 +511,49 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

  /** Output the RDD to any Hadoop-supported file system. */
  def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    conf: JobConf) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      conf: JobConf) {
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}

/** Output the RDD to any Hadoop-supported file system. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F]) {
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
}

/** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    codec: Class[_ <: CompressionCodec]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      codec: Class[_ <: CompressionCodec]) {
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec)
}

/** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    conf: Configuration) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F],
+      conf: Configuration) {
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}

/** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F]) {
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[F]) {
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
}

@@ -700,6 +700,15 @@ object JavaPairRDD {

implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd

+  private[spark]
+  implicit def toScalaFunction2[T1, T2, R](fun: JFunction2[T1, T2, R]): Function2[T1, T2, R] = {
+    (x: T1, x1: T2) => fun.call(x, x1)
+  }
+
+  private[spark] implicit def toScalaFunction[T, R](fun: JFunction[T, R]): T => R = x => fun.call(x)
+
+  private[spark]
+  implicit def pairFunToScalaFun[A, B, C](x: PairFunction[A, B, C]): A => (B, C) = y => x.call(y)

/** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
Expand Down
@@ -70,7 +70,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
-    wrapRDD(rdd.filter((x => f(x).booleanValue())))
+    wrapRDD(rdd.filter((x => f.call(x).booleanValue())))

/**
* Return a new RDD that is reduced into `numPartitions` partitions.