diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index 45bcedebb4117..3e9f0f4b8f127 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -25,7 +25,7 @@
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.api.java.StorageLevels;
-import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
@@ -35,8 +35,9 @@
 
 /**
  * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ *
  * Usage: JavaNetworkWordCount <hostname> <port>
- *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
  *
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
@@ -56,7 +57,7 @@ public static void main(String[] args) {
 
     // Create the context with a 1 second batch size
     SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
-    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
+    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
 
     // Create a JavaReceiverInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
new file mode 100644
index 0000000000000..d3165d2942bea
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import scala.Tuple2;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
+
+/**
+ * Counts words in text encoded with UTF8 received from the network every second.
+ *
+ * Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
+ *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
+ *   data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
+ *   <output-file> file to which the word counts will be appended
+ *
+ * <checkpoint-directory> and <output-file> must be absolute paths
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *
+ *      `$ nc -lk 9999`
+ *
+ * and run the example as
+ *
+ *      `$ ./bin/run-example org.apache.spark.examples.streaming.JavaRecoverableNetworkWordCount \
+ *              localhost 9999 ~/checkpoint/ ~/out`
+ *
+ * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
+ * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
+ * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
+ * the checkpoint data.
+ *
+ * To run this example in a local standalone cluster with automatic driver recovery,
+ *
+ *      `$ bin/spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
+ *              <path-to-examples-jar> \
+ *              org.apache.spark.examples.streaming.JavaRecoverableNetworkWordCount <cluster-url> \
+ *              localhost 9999 ~/checkpoint ~/out`
+ *
+ * <path-to-examples-jar> would typically be
+ * <spark-dir>/examples/target/scala-XX/spark-examples....jar
+ *
+ * Refer to the online documentation for more details.
+ */
+public final class JavaRecoverableNetworkWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  /**
+   * Builds a fresh streaming context that counts words received from {@code ip:port} and
+   * appends each batch's counts to the file at {@code outputPath}. Any existing file at that
+   * path is deleted first.
+   */
+  private static JavaStreamingContext createContext(String ip, int port, String outputPath) {
+
+    // If you do not see this printed, that means the StreamingContext has been loaded
+    // from existing checkpoint data rather than created anew
+    System.out.println("Creating new context");
+    final File outputFile = new File(outputPath);
+    if (outputFile.exists() && !outputFile.delete()) {
+      System.err.println("Could not delete existing output file " + outputFile.getAbsolutePath());
+    }
+    SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
+    // Create the context with a 1 second batch size
+    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
+
+    // Create a socket stream on target ip:port and count the
+    // words in input stream of \n delimited text (eg. generated by 'nc')
+    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
+      @Override
+      public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
+        String counts = "Counts at time " + time + " " + rdd.collect();
+        System.out.println(counts);
+        System.out.println("Appending to " + outputFile.getAbsolutePath());
+        Files.append(counts + "\n", outputFile, Charset.defaultCharset());
+        return null;
+      }
+    });
+
+    return ssc;
+  }
+
+  /**
+   * Recovers the streaming context from the checkpoint directory, or creates a new one if no
+   * checkpoint data exists, then runs it until termination.
+   */
+  public static void main(String[] args) {
+    if (args.length != 4) {
+      System.err.println("Your arguments were " + Arrays.asList(args));
+      System.err.println(
+          "Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>\n" +
+          "     <output-file>. <hostname> and <port> describe the TCP server that Spark\n" +
+          "     Streaming would connect to receive data. <checkpoint-directory> directory to\n" +
+          "     HDFS-compatible file system which checkpoint data <output-file> file to which\n" +
+          "     the word counts will be appended\n" +
+          "\n" +
+          "Both <checkpoint-directory> and <output-file> must be absolute paths");
+      System.exit(1);
+    }
+
+    final String ip = args[0];
+    final int port = Integer.parseInt(args[1]);
+    String checkpointDirectory = args[2];
+    final String outputPath = args[3];
+    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
+      @Override
+      public JavaStreamingContext create() {
+        return createContext(ip, port, outputPath);
+      }
+    };
+    JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
+    ssc.start();
+    ssc.awaitTermination();
+  }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 6af3a0f33efc2..845705602958f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -31,15 +31,13 @@ import org.apache.spark.util.IntParam
 /**
  * Counts words in text encoded with UTF8 received from the network every second.
  *
- * Usage: NetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
+ * Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
  *   data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
  *   <output-file> file to which the word counts will be appended
  *
- * In local mode, <master> should be 'local[n]' with n > 1
  * <checkpoint-directory> and <output-file> must be absolute paths
  *
- *
  * To run this on your local machine, you need to first run a Netcat server
  *
  *      `$ nc -lk 9999`
@@ -66,7 +64,6 @@ import org.apache.spark.util.IntParam
  *
  * Refer to the online documentation for more details.
  */
-
 object RecoverableNetworkWordCount {
 
   def createContext(ip: String, port: Int, outputPath: String) = {