Permalink
Browse files

Review feedback on the pr

  • Loading branch information...
1 parent c33dc2c commit 4ab87d3551f0b74e4fb6da611a5baea7aba93c6c @ScrapCodes ScrapCodes committed Feb 25, 2014
@@ -19,6 +19,9 @@
import java.io.Serializable;
+/**
+ * A function that returns zero or more records of type Double from each input record.
+ */
public interface DoubleFlatMapFunction<T> extends Serializable {
public Iterable<Double> call(T t) throws Exception;
}
@@ -19,6 +19,9 @@
import java.io.Serializable;
+/**
+ * A function that returns Doubles, and can be used to construct DoubleRDDs.
+ */
public interface DoubleFunction<T> extends Serializable {
- public Double call(T t) throws Exception;
+ public double call(T t) throws Exception;
}
@@ -19,6 +19,9 @@
import java.io.Serializable;
+/**
+ * A function that returns zero or more output records from each input record.
+ */
public interface FlatMapFunction<T, R> extends Serializable {
public Iterable<R> call(T t) throws Exception;
}
@@ -19,6 +19,9 @@
import java.io.Serializable;
+/**
+ * A function that takes two inputs and returns zero or more output records.
+ */
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
public Iterable<R> call(T1 t1, T2 t2) throws Exception;
}
@@ -19,6 +19,11 @@
import java.io.Serializable;
+/**
+ * Base interface for functions whose return types do not create special RDDs. PairFunction and
+ * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
+ * when mapping RDDs of other types.
+ */
public interface Function<T1, R> extends Serializable {
public R call(T1 v1) throws Exception;
}
@@ -19,6 +19,9 @@
import java.io.Serializable;
+/**
+ * A two-argument function that takes arguments of type T1 and T2 and returns an R.
+ */
public interface Function2<T1, T2, R> extends Serializable {
public R call(T1 v1, T2 v2) throws Exception;
}
@@ -19,6 +19,9 @@
import java.io.Serializable;
+/**
+ * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
+ */
public interface Function3<T1, T2, T3, R> extends Serializable {
public R call(T1 v1, T2 v2, T3 v3) throws Exception;
}
@@ -21,6 +21,11 @@
import java.io.Serializable;
+
+/**
+ * A function that returns zero or more key-value pair records from each input record. The
+ * key-value pairs are represented as scala.Tuple2 objects.
+ */
public interface PairFlatMapFunction<T, K, V> extends Serializable {
- public Iterable<Tuple2<K, V>> call(T t) throws Exception;
+ public Iterable<Tuple2<K, V>> call(T t) throws Exception;
}
@@ -17,9 +17,13 @@
package org.apache.spark.api.java.function;
-import java.io.Serializable;
import scala.Tuple2;
+import java.io.Serializable;
+
+/**
+ * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
+ */
public interface PairFunction<T, K, V> extends Serializable {
- public Tuple2<K, V> call(T t) throws Exception;
+ public Tuple2<K, V> call(T t) throws Exception;
}
@@ -19,6 +19,9 @@
import java.io.Serializable;
+/**
+ * A function with no return value.
+ */
public interface VoidFunction<T> extends Serializable {
public void call(T t) throws Exception;
}
@@ -699,14 +699,17 @@ object JavaPairRDD {
}
implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
+
private[spark]
- implicit def toJFunction2[T1, T2, R](fun: JFunction2[T1, T2, R])
- : Function2[T1, T2, R] = (x: T1, x1: T2) => fun.call(x, x1)
- private[spark]
- implicit def toJFunction[T, R](fun: JFunction[T, R]): T => R = (x) => fun.call(x)
+ implicit def toScalaFunction2[T1, T2, R](fun: JFunction2[T1, T2, R]): Function2[T1, T2, R] = {
+ (x: T1, x1: T2) => fun.call(x, x1)
+ }
+
+ private[spark] implicit def toScalaFunction[T, R](fun: JFunction[T, R]): T => R = x => fun.call(x)
+
private[spark]
- implicit def pairFunToScalaFun[A, B, C](x: PairFunction[A, B, C]): A => (B, C) =
- (y: A) => x.call(y)
+ implicit def pairFunToScalaFun[A, B, C](x: PairFunction[A, B, C]): A => (B, C) = y => x.call(y)
+
/** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
@@ -90,7 +90,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[(K2, V2)]]
new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -121,7 +121,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x).asScala
- def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[(K2, V2)]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -388,7 +388,7 @@ public void map() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
@Override
- public Double call(Integer x) {
+ public double call(Integer x) {
return 1.0 * x;
}
}).cache();
@@ -768,7 +768,7 @@ public void zip() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
@Override
- public Double call(Integer x) {
+ public double call(Integer x) {
return 1.0 * x;
}
});
@@ -30,6 +30,12 @@ There are a few key differences between the Java and Scala APIs:
classes for key-value pairs and doubles. For example,
[`JavaPairRDD`](api/core/index.html#org.apache.spark.api.java.JavaPairRDD)
stores key-value pairs.
+* To support the upcoming Java 8 lambda expressions, methods are defined on the basis of
+  the passed anonymous function's (a.k.a. lambda expression's) return type;
+  for example, mapToPair(...) and flatMapToPair(...) return a
+  [`JavaPairRDD`](api/core/index.html#org.apache.spark.api.java.JavaPairRDD),
+  and similarly mapToDouble and flatMapToDouble return a
+  [`JavaDoubleRDD`](api/core/index.html#org.apache.spark.api.java.JavaDoubleRDD).
* RDD methods like `collect()` and `countByKey()` return Java collections types,
such as `java.util.List` and `java.util.Map`.
* Key-value pairs, which are simply written as `(key, value)` in Scala, are represented
@@ -78,7 +84,6 @@ RDD [storage level](scala-programming-guide.html#rdd-persistence) constants, suc
declared in the [org.apache.spark.api.java.StorageLevels](api/core/index.html#org.apache.spark.api.java.StorageLevels) class. To
define your own storage level, you can use StorageLevels.create(...).
-
# Other Features
The Java API supports other Spark features, including
@@ -127,11 +132,20 @@ class Split extends FlatMapFunction<String, String> {
JavaRDD<String> words = lines.flatMap(new Split());
{% endhighlight %}
+Java 8+ users can also write the above `FlatMapFunction` more concisely using a
+lambda expression, as follows:
+
+{% highlight java %}
+JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")));
+{% endhighlight %}
+
+The same applies to all other anonymous classes passed in as arguments in Java 8.
+
Continuing with the word count example, we map each word to a `(word, 1)` pair:
{% highlight java %}
import scala.Tuple2;
-JavaPairRDD<String, Integer> ones = words.map(
+JavaPairRDD<String, Integer> ones = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2(s, 1);
@@ -140,7 +154,7 @@ JavaPairRDD<String, Integer> ones = words.map(
);
{% endhighlight %}
-Note that `map` was passed a `PairFunction<String, String, Integer>` and
+Note that `mapToPair` was passed a `PairFunction<String, String, Integer>` and
returned a `JavaPairRDD<String, Integer>`.
To finish the word count program, we will use `reduceByKey` to count the
@@ -164,7 +178,7 @@ possible to chain the RDD transformations, so the word count example could also
be written as:
{% highlight java %}
-JavaPairRDD<String, Integer> counts = lines.flatMap(
+JavaPairRDD<String, Integer> counts = lines.flatMapToPair(
...
).map(
...
@@ -17,26 +17,27 @@
package org.apache.spark;
+import java.io.File;
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
import com.google.common.base.Optional;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.spark.api.java.JavaDoubleRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import scala.Tuple2;
-
-import java.io.File;
-import java.io.Serializable;
-import java.util.*;
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.*;
/**
* Most of these tests replicate org.apache.spark.JavaAPISuite using java 8
@@ -221,12 +222,12 @@ public void mapsFromPairsToPairs() {
public void mapPartitions() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
- int sum = 0;
- while (iter.hasNext()) {
- sum += iter.next();
- }
- return Collections.singletonList(sum);
- });
+ int sum = 0;
+ while (iter.hasNext()) {
+ sum += iter.next();
+ }
+ return Collections.singletonList(sum);
+ });
Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
}
@@ -242,8 +243,8 @@ public void sequenceFile() {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.mapToPair(
- pair -> new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())))
+ rdd.mapToPair(pair ->
+ new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())))
.saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
// Try reading the output back as an object file
@@ -17,21 +17,23 @@
package org.apache.spark.streaming;
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.junit.Assert;
-import org.junit.Test;
-import scala.Tuple2;
-
-import java.io.Serializable;
-import java.util.*;
/**
* Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8
@@ -329,7 +331,8 @@ public void testStreamingContextTransform() {
return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
});
JavaTestUtils.attachTestOutputStream(transformed2);
- List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
+ JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -475,7 +478,8 @@ public void testPairMap() { // Maps pair -> pair of different type
new Tuple2<Integer, String>(3, "new york"),
new Tuple2<Integer, String>(1, "new york")));
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap());
JavaTestUtils.attachTestOutputStream(reversed);
@@ -500,7 +504,8 @@ public void testPairMapPartitions() { // Maps pair -> pair of different type
new Tuple2<Integer, String>(3, "new york"),
new Tuple2<Integer, String>(1, "new york")));
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
@@ -634,7 +639,8 @@ public void testReduceByKeyAndWindow() {
Arrays.asList(new Tuple2<String, Integer>("california", 10),
new Tuple2<String, Integer>("new york", 4)));
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
@@ -657,7 +663,8 @@ public void testUpdateStateByKey() {
Arrays.asList(new Tuple2<String, Integer>("california", 14),
new Tuple2<String, Integer>("new york", 9)));
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
@@ -689,11 +696,13 @@ public void testReduceByKeyAndWindowWithInverse() {
Arrays.asList(new Tuple2<String, Integer>("california", 10),
new Tuple2<String, Integer>("new york", 4)));
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
- pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000), new Duration(1000));
+ pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000),
+ new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Oops, something went wrong.

0 comments on commit 4ab87d3

Please sign in to comment.