Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

SPARK 1084.1 (resubmitted)

(Ported from https://github.com/apache/incubator-spark/pull/637 )

Author: Sean Owen <sowen@cloudera.com>

Closes #31 from srowen/SPARK-1084.1 and squashes the following commits:

6c4a32c [Sean Owen] Suppress warnings about legitimate unchecked array creations, or change code to avoid it
f35b833 [Sean Owen] Fix two misc javadoc problems
254e8ef [Sean Owen] Fix one new style error introduced in scaladoc warning commit
5b2fce2 [Sean Owen] Fix scaladoc invocation warning, and enable javac warnings properly, with plugin config updates
007762b [Sean Owen] Remove dead scaladoc links
b8ff8cb [Sean Owen] Replace deprecated Ant <tasks> with <target>
  • Loading branch information...
commit 12bbca20657c17d5ebfceaacb37dddc851772675 1 parent aace2c0
@srowen srowen authored pwendell committed
View
14 bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -27,7 +27,7 @@ object Bagel extends Logging {
/**
* Runs a Bagel program.
- * @param sc [[org.apache.spark.SparkContext]] to use for the program.
+ * @param sc org.apache.spark.SparkContext to use for the program.
* @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
* Key will be the vertex id.
* @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
@@ -38,10 +38,10 @@ object Bagel extends Logging {
* @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
* after each superstep and provides the result to each vertex in the next
* superstep.
- * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
+ * @param partitioner org.apache.spark.Partitioner partitions values by key
* @param numPartitions number of partitions across which to split the graph.
* Default is the default parallelism of the SparkContext
- * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
+ * @param storageLevel org.apache.spark.storage.StorageLevel to use for caching of
* intermediate RDDs in each superstep. Defaults to caching in memory.
* @param compute function that takes a Vertex, optional set of (possibly combined) messages to
* the Vertex, optional Aggregator and the current superstep,
@@ -131,7 +131,7 @@ object Bagel extends Logging {
/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
- * [[org.apache.spark.HashPartitioner]] and default storage level
+ * org.apache.spark.HashPartitioner and default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
@@ -146,7 +146,7 @@ object Bagel extends Logging {
/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
- * default [[org.apache.spark.HashPartitioner]]
+ * default org.apache.spark.HashPartitioner
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
@@ -166,7 +166,7 @@ object Bagel extends Logging {
/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
- * default [[org.apache.spark.HashPartitioner]],
+ * default org.apache.spark.HashPartitioner,
* [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -180,7 +180,7 @@ object Bagel extends Logging {
/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
- * the default [[org.apache.spark.HashPartitioner]]
+ * the default org.apache.spark.HashPartitioner
* and [[org.apache.spark.bagel.DefaultCombiner]]
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
View
4 core/pom.xml
@@ -225,7 +225,7 @@
</goals>
<configuration>
<exportAntProperties>true</exportAntProperties>
- <tasks>
+ <target>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -238,7 +238,7 @@
</not>
</condition>
</fail>
- </tasks>
+ </target>
</configuration>
</execution>
</executions>
View
2  core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -351,7 +351,7 @@ class SparkContext(
* using the older MapReduce API (`org.apache.hadoop.mapred`).
*
* @param conf JobConf for setting up the dataset
- * @param inputFormatClass Class of the [[InputFormat]]
+ * @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param minSplits Minimum number of Hadoop Splits to generate.
View
2  core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -81,7 +81,7 @@ class JobLogger(val user: String, val logDirName: String)
/**
* Create a log file for one job
* @param jobID ID of the job
- * @exception FileNotFoundException Fail to create log file
+ * @throws FileNotFoundException Fail to create log file
*/
protected def createLogWriter(jobID: Int) {
try {
View
4 core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@ -23,9 +23,9 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.typesafe.config.Config
/**
- * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
+ * An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception
* This is necessary as Spark Executors are allowed to recover from fatal exceptions
- * (see [[org.apache.spark.executor.Executor]]).
+ * (see org.apache.spark.executor.Executor)
*/
object IndestructibleActorSystem {
def apply(name: String, config: Config): ActorSystem =
View
6 core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -19,9 +19,9 @@ package org.apache.spark.util
/**
* A class for tracking the statistics of a set of numbers (count, mean and variance) in a
- * numerically robust way. Includes support for merging two StatCounters. Based on
- * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
- * Welford and Chan's algorithms for running variance]].
+ * numerically robust way. Includes support for merging two StatCounters. Based on Welford
+ * and Chan's [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance algorithms]]
+ * for running variance.
*
* @constructor Initialize the StatCounter with the given values.
*/
View
2  core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -136,7 +136,7 @@ object Vector {
/**
* Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
- * between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided.
+ * between 0.0 and 1.0. Optional scala.util.Random number generator can be provided.
*/
def random(length: Int, random: Random = new XORShiftRandom()) =
Vector(length, _ => random.nextDouble())
View
35 core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -75,8 +75,9 @@ public int compare(Integer a, Integer b) {
else if (a < b) return 1;
else return 0;
}
- };
+ }
+ @SuppressWarnings("unchecked")
@Test
public void sparkContextUnion() {
// Union of non-specialized JavaRDDs
@@ -148,6 +149,7 @@ public void call(String s) {
Assert.assertEquals(2, foreachCalls);
}
+ @SuppressWarnings("unchecked")
@Test
public void lookup() {
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
@@ -179,6 +181,7 @@ public Boolean call(Integer x) {
Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
}
+ @SuppressWarnings("unchecked")
@Test
public void cogroup() {
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
@@ -197,6 +200,7 @@ public void cogroup() {
cogrouped.collect();
}
+ @SuppressWarnings("unchecked")
@Test
public void leftOuterJoin() {
JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
@@ -243,6 +247,7 @@ public Integer call(Integer a, Integer b) {
Assert.assertEquals(33, sum);
}
+ @SuppressWarnings("unchecked")
@Test
public void foldByKey() {
List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
@@ -265,6 +270,7 @@ public Integer call(Integer a, Integer b) {
Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
}
+ @SuppressWarnings("unchecked")
@Test
public void reduceByKey() {
List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
@@ -320,8 +326,8 @@ public void approximateResults() {
public void take() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Assert.assertEquals(1, rdd.first().intValue());
- List<Integer> firstTwo = rdd.take(2);
- List<Integer> sample = rdd.takeSample(false, 2, 42);
+ rdd.take(2);
+ rdd.takeSample(false, 2, 42);
}
@Test
@@ -359,8 +365,8 @@ public Boolean call(Double x) {
Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);
- Double first = rdd.first();
- List<Double> take = rdd.take(5);
+ rdd.first();
+ rdd.take(5);
}
@Test
@@ -438,11 +444,11 @@ public void flatMap() {
return lengths;
}
});
- Double x = doubles.first();
- Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+ Assert.assertEquals(5.0, doubles.first(), 0.01);
Assert.assertEquals(11, pairs.count());
}
+ @SuppressWarnings("unchecked")
@Test
public void mapsFromPairsToPairs() {
List<Tuple2<Integer, String>> pairs = Arrays.asList(
@@ -509,6 +515,7 @@ public void repartition() {
}
}
+ @SuppressWarnings("unchecked")
@Test
public void persist() {
JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
@@ -573,6 +580,7 @@ public void textFilesCompressed() throws IOException {
Assert.assertEquals(expected, readRDD.collect());
}
+ @SuppressWarnings("unchecked")
@Test
public void sequenceFile() {
File tempDir = Files.createTempDir();
@@ -602,6 +610,7 @@ public void sequenceFile() {
Assert.assertEquals(pairs, readRDD.collect());
}
+ @SuppressWarnings("unchecked")
@Test
public void writeWithNewAPIHadoopFile() {
File tempDir = Files.createTempDir();
@@ -632,6 +641,7 @@ public String call(Tuple2<IntWritable, Text> x) {
}).collect().toString());
}
+ @SuppressWarnings("unchecked")
@Test
public void readWithNewAPIHadoopFile() throws IOException {
File tempDir = Files.createTempDir();
@@ -674,6 +684,7 @@ public void objectFilesOfInts() {
Assert.assertEquals(expected, readRDD.collect());
}
+ @SuppressWarnings("unchecked")
@Test
public void objectFilesOfComplexTypes() {
File tempDir = Files.createTempDir();
@@ -690,6 +701,7 @@ public void objectFilesOfComplexTypes() {
Assert.assertEquals(pairs, readRDD.collect());
}
+ @SuppressWarnings("unchecked")
@Test
public void hadoopFile() {
File tempDir = Files.createTempDir();
@@ -719,6 +731,7 @@ public String call(Tuple2<IntWritable, Text> x) {
}).collect().toString());
}
+ @SuppressWarnings("unchecked")
@Test
public void hadoopFileCompressed() {
File tempDir = Files.createTempDir();
@@ -824,7 +837,7 @@ public Float zero(Float initialValue) {
}
};
- final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+ final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
rdd.foreach(new VoidFunction<Integer>() {
public void call(Integer x) {
floatAccum.add((float) x);
@@ -876,6 +889,7 @@ public void checkpointAndRestore() {
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
}
+ @SuppressWarnings("unchecked")
@Test
public void mapOnPairRDD() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
@@ -900,6 +914,7 @@ public void mapOnPairRDD() {
}
+ @SuppressWarnings("unchecked")
@Test
public void collectPartitions() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
@@ -968,7 +983,7 @@ public void countApproxDistinctByKey() {
@Test
public void collectAsMapWithIntArrayValues() {
// Regression test for SPARK-1040
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
@Override
public Tuple2<Integer, int[]> call(Integer x) throws Exception {
@@ -976,6 +991,6 @@ public void collectAsMapWithIntArrayValues() {
}
});
pairRDD.collect(); // Works fine
- Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
+ pairRDD.collectAsMap(); // Used to crash with ClassCastException
}
}
View
5 pom.xml
@@ -592,12 +592,13 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>2.5.1</version>
+ <version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
<maxmem>1024m</maxmem>
+ <fork>true</fork>
</configuration>
</plugin>
<plugin>
@@ -612,7 +613,7 @@
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
- <version>1.0-M2</version>
+ <version>1.0-RC2</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
View
4 repl/pom.xml
@@ -98,7 +98,7 @@
</goals>
<configuration>
<exportAntProperties>true</exportAntProperties>
- <tasks>
+ <target>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -111,7 +111,7 @@
</not>
</condition>
</fail>
- </tasks>
+ </target>
</configuration>
</execution>
</executions>
View
16 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -127,7 +127,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying `groupByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are grouped into a
- * single sequence to generate the RDDs of the new DStream. [[org.apache.spark.Partitioner]]
+ * single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner
* is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
@@ -151,7 +151,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
- * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+ * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
* thepartitioning of each RDD.
*/
def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
@@ -161,7 +161,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
* combineByKey for RDDs. Please refer to combineByKey in
- * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+ * org.apache.spark.rdd.PairRDDFunctions for more information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -176,7 +176,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
* combineByKey for RDDs. Please refer to combineByKey in
- * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+ * org.apache.spark.rdd.PairRDDFunctions for more information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -479,7 +479,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
- * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new
@@ -579,7 +579,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
def join[W](
other: JavaPairDStream[K, W],
@@ -619,7 +619,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
def leftOuterJoin[W](
other: JavaPairDStream[K, W],
@@ -660,7 +660,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
- * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def rightOuterJoin[W](
View
4 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -406,7 +406,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
* [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
* In the transform function, convert the JavaRDD corresponding to that JavaDStream to
- * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+ * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
*/
def transform[T](
dstreams: JList[JavaDStream[_]],
@@ -429,7 +429,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
* [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
* In the transform function, convert the JavaRDD corresponding to that JavaDStream to
- * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+ * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
*/
def transform[K, V](
dstreams: JList[JavaDStream[_]],
View
16 streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -65,7 +65,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying `groupByKey` on each RDD. The supplied
- * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
val createCombiner = (v: V) => ArrayBuffer[V](v)
@@ -95,7 +95,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
- * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+ * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
@@ -376,7 +376,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
- * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new
@@ -396,7 +396,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
- * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated. Note, that
* this function may generate a different a tuple with a different key
@@ -453,7 +453,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
+ * The supplied org.apache.spark.Partitioner is used to partition the generated RDDs.
*/
def cogroup[W: ClassTag](
other: DStream[(K, W)],
@@ -483,7 +483,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
def join[W: ClassTag](
other: DStream[(K, W)],
@@ -518,7 +518,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
- * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def leftOuterJoin[W: ClassTag](
@@ -554,7 +554,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
- * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def rightOuterJoin[W: ClassTag](
View
124 streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -19,7 +19,6 @@
import scala.Tuple2;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.io.*;
@@ -30,7 +29,6 @@
import com.google.common.io.Files;
import com.google.common.collect.Sets;
-import org.apache.spark.SparkConf;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -38,6 +36,7 @@
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -45,6 +44,8 @@
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
+
+ @SuppressWarnings("unchecked")
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
@@ -64,6 +65,7 @@ public void testCount() {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testMap() {
List<List<String>> inputData = Arrays.asList(
@@ -87,6 +89,7 @@ public Integer call(String s) throws Exception {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testWindow() {
List<List<Integer>> inputData = Arrays.asList(
@@ -108,6 +111,7 @@ public void testWindow() {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testWindowWithSlideDuration() {
List<List<Integer>> inputData = Arrays.asList(
@@ -132,6 +136,7 @@ public void testWindowWithSlideDuration() {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testFilter() {
List<List<String>> inputData = Arrays.asList(
@@ -155,13 +160,16 @@ public Boolean call(String s) throws Exception {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testRepartitionMorePartitions() {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
- JavaDStream repartitioned = stream.repartition(4);
+ JavaDStream<Integer> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
+ JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
+ stream.repartition(4);
JavaTestUtils.attachTestOutputStream(repartitioned);
List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
Assert.assertEquals(2, result.size());
@@ -172,13 +180,16 @@ public void testRepartitionMorePartitions() {
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testRepartitionFewerPartitions() {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
- JavaDStream repartitioned = stream.repartition(2);
+ JavaDStream<Integer> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
+ JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
+ stream.repartition(2);
JavaTestUtils.attachTestOutputStream(repartitioned);
List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
Assert.assertEquals(2, result.size());
@@ -188,6 +199,7 @@ public void testRepartitionFewerPartitions() {
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testGlom() {
List<List<String>> inputData = Arrays.asList(
@@ -206,6 +218,7 @@ public void testGlom() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testMapPartitions() {
List<List<String>> inputData = Arrays.asList(
@@ -217,16 +230,17 @@ public void testMapPartitions() {
Arrays.asList("YANKEESRED SOCKS"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
- @Override
- public Iterable<String> call(Iterator<String> in) {
- String out = "";
- while (in.hasNext()) {
- out = out + in.next().toUpperCase();
- }
- return Lists.newArrayList(out);
- }
- });
+ JavaDStream<String> mapped = stream.mapPartitions(
+ new FlatMapFunction<Iterator<String>, String>() {
+ @Override
+ public Iterable<String> call(Iterator<String> in) {
+ String out = "";
+ while (in.hasNext()) {
+ out = out + in.next().toUpperCase();
+ }
+ return Lists.newArrayList(out);
+ }
+ });
JavaTestUtils.attachTestOutputStream(mapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -247,6 +261,7 @@ public Integer call(Integer i1, Integer i2) throws Exception {
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduce() {
List<List<Integer>> inputData = Arrays.asList(
@@ -267,6 +282,7 @@ public void testReduce() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduceByWindow() {
List<List<Integer>> inputData = Arrays.asList(
@@ -289,6 +305,7 @@ public void testReduceByWindow() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testQueueStream() {
List<List<Integer>> expected = Arrays.asList(
@@ -312,6 +329,7 @@ public void testQueueStream() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testTransform() {
List<List<Integer>> inputData = Arrays.asList(
@@ -344,6 +362,7 @@ public Integer call(Integer i) throws Exception {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testVariousTransform() {
// tests whether all variations of transform can be called from Java
@@ -423,6 +442,7 @@ public void testVariousTransform() {
}
+ @SuppressWarnings("unchecked")
@Test
public void testTransformWith() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -492,6 +512,7 @@ public void testTransformWith() {
}
+ @SuppressWarnings("unchecked")
@Test
public void testVariousTransformWith() {
// tests whether all variations of transformWith can be called from Java
@@ -591,6 +612,7 @@ public void testVariousTransformWith() {
);
}
+ @SuppressWarnings("unchecked")
@Test
public void testStreamingContextTransform(){
List<List<Integer>> stream1input = Arrays.asList(
@@ -658,6 +680,7 @@ public void testStreamingContextTransform(){
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testFlatMap() {
List<List<String>> inputData = Arrays.asList(
@@ -683,6 +706,7 @@ public void testFlatMap() {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairFlatMap() {
List<List<String>> inputData = Arrays.asList(
@@ -718,22 +742,24 @@ public void testPairFlatMap() {
new Tuple2<Integer, String>(9, "s")));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
- @Override
- public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
- List<Tuple2<Integer, String>> out = Lists.newArrayList();
- for (String letter: in.split("(?!^)")) {
- out.add(new Tuple2<Integer, String>(in.length(), letter));
- }
- return out;
- }
- });
+ JavaPairDStream<Integer,String> flatMapped = stream.flatMap(
+ new PairFlatMapFunction<String, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
+ List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ for (String letter: in.split("(?!^)")) {
+ out.add(new Tuple2<Integer, String>(in.length(), letter));
+ }
+ return out;
+ }
+ });
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testUnion() {
List<List<Integer>> inputData1 = Arrays.asList(
@@ -778,6 +804,7 @@ public void testUnion() {
// PairDStream Functions
+ @SuppressWarnings("unchecked")
@Test
public void testPairFilter() {
List<List<String>> inputData = Arrays.asList(
@@ -810,7 +837,8 @@ public Boolean call(Tuple2<String, Integer> in) throws Exception {
Assert.assertEquals(expected, result);
}
- List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+ @SuppressWarnings("unchecked")
+ private List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
new Tuple2<String, String>("california", "giants"),
new Tuple2<String, String>("new york", "yankees"),
@@ -820,7 +848,8 @@ public Boolean call(Tuple2<String, Integer> in) throws Exception {
new Tuple2<String, String>("new york", "rangers"),
new Tuple2<String, String>("new york", "islanders")));
- List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+ @SuppressWarnings("unchecked")
+ private List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
Arrays.asList(
new Tuple2<String, Integer>("california", 1),
new Tuple2<String, Integer>("california", 3),
@@ -832,6 +861,7 @@ public Boolean call(Tuple2<String, Integer> in) throws Exception {
new Tuple2<String, Integer>("new york", 3),
new Tuple2<String, Integer>("new york", 1)));
+ @SuppressWarnings("unchecked")
@Test
public void testPairMap() { // Maps pair -> pair of different type
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -864,6 +894,7 @@ public void testPairMap() { // Maps pair -> pair of different type
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairMapPartitions() { // Maps pair -> pair of different type
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -901,6 +932,7 @@ public void testPairMapPartitions() { // Maps pair -> pair of different type
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairMap2() { // Maps pair -> single
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -925,6 +957,7 @@ public Integer call(Tuple2<String, Integer> in) throws Exception {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
@@ -967,6 +1000,7 @@ public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairGroupByKey() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -989,6 +1023,7 @@ public void testPairGroupByKey() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairReduceByKey() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1013,6 +1048,7 @@ public void testPairReduceByKey() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCombineByKey() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1043,6 +1079,7 @@ public Integer call(Integer i) throws Exception {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCountByValue() {
List<List<String>> inputData = Arrays.asList(
@@ -1068,6 +1105,7 @@ public void testCountByValue() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testGroupByKeyAndWindow() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1113,6 +1151,7 @@ public void testGroupByKeyAndWindow() {
return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduceByKeyAndWindow() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1136,6 +1175,7 @@ public void testReduceByKeyAndWindow() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testUpdateStateByKey() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1171,6 +1211,7 @@ public void testUpdateStateByKey() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduceByKeyAndWindowWithInverse() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -1194,6 +1235,7 @@ public void testReduceByKeyAndWindowWithInverse() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCountByValueAndWindow() {
List<List<String>> inputData = Arrays.asList(
@@ -1227,6 +1269,7 @@ public void testCountByValueAndWindow() {
Assert.assertEquals(expected, unorderedResult);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairTransform() {
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
@@ -1271,6 +1314,7 @@ public void testPairTransform() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairToNormalRDDTransform() {
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
@@ -1312,6 +1356,8 @@ public Integer call(Tuple2<Integer, Integer> in) {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
+ @Test
public void testMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -1342,6 +1388,7 @@ public String call(String s) throws Exception {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testFlatMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -1386,6 +1433,7 @@ public void testFlatMapValues() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCoGroup() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1429,6 +1477,7 @@ public void testCoGroup() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testJoin() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1472,6 +1521,7 @@ public void testJoin() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testLeftOuterJoin() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1503,6 +1553,7 @@ public void testLeftOuterJoin() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCheckpointMasterRecovery() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
@@ -1541,7 +1592,8 @@ public Integer call(String s) throws Exception {
}
- /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+ /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+ @SuppressWarnings("unchecked")
@Test
public void testCheckpointofIndividualStream() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
@@ -1581,16 +1633,14 @@ public void testSocketTextStream() {
@Test
public void testSocketString() {
class Converter extends Function<InputStream, Iterable<String>> {
- public Iterable<String> call(InputStream in) {
+ public Iterable<String> call(InputStream in) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
List<String> out = new ArrayList<String>();
- try {
- while (true) {
- String line = reader.readLine();
- if (line == null) { break; }
- out.add(line);
- }
- } catch (IOException e) { }
+ while (true) {
+ String line = reader.readLine();
+ if (line == null) { break; }
+ out.add(line);
+ }
return out;
}
}
View
4 yarn/pom.xml
@@ -133,7 +133,7 @@
</goals>
<configuration>
<exportAntProperties>true</exportAntProperties>
- <tasks>
+ <target>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -146,7 +146,7 @@
</not>
</condition>
</fail>
- </tasks>
+ </target>
</configuration>
</execution>
</executions>
Please sign in to comment.
Something went wrong with that request. Please try again.