From b0d74f934b5fc9f8dc810d84670efde878c0349f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 18 May 2015 14:54:36 -0700 Subject: [PATCH 1/5] Updated examples. --- .../streaming/JavaKinesisWordCountASL.java | 248 +++++++++--------- .../streaming/KinesisWordCountASL.scala | 244 +++++++++-------- 2 files changed, 259 insertions(+), 233 deletions(-) diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java index b0bff27a61c19..5321a0438608d 100644 --- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java @@ -41,139 +41,143 @@ /** * Java-friendly Kinesis Spark Streaming WordCount example - * - * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details + *

+ * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details
 * on the Kinesis Spark Streaming integration.
- *
- * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard
- * for the given stream.
- * It then starts pulling from the last checkpointed sequence number of the given
- * <stream-name> and <endpoint-url>.
- *
+ *

+ * This example spins up 1 Kinesis Receiver per shard for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given stream.
+ *
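
The shard count that drives this one-receiver-per-shard layout comes straight from the Kinesis service. A minimal Scala sketch of that lookup, using the same low-level client calls the patch itself makes (the stream name and endpoint are illustrative placeholders):

    // Sketch: determine the number of shards, then plan one receiver per shard.
    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    import com.amazonaws.services.kinesis.AmazonKinesisClient

    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com")
    val numShards =
      kinesisClient.describeStream("mySparkStream").getStreamDescription().getShards().size()
    val numStreams = numShards  // one Kinesis receiver/input DStream per shard
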

 * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
- *
- * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
- * in the following order of precedence:
- * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- * Java System Properties - aws.accessKeyId and aws.secretKey
- * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- * Instance profile credentials - delivered through the Amazon EC2 metadata service
- *
- * Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>
- * <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
- * <endpoint-url> is the endpoint of the Kinesis service
- * (ie. https://kinesis.us-east-1.amazonaws.com)
- *
+ *

+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ * in the following order of precedence:
+ *

+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
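
Mirroring the fail-fast check this patch series adds on the Scala side, a minimal sketch that verifies the chain above can actually resolve credentials before the streaming job starts:

    // Sketch: verify that the DefaultAWSCredentialsProviderChain can find credentials.
    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain

    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
    require(credentials != null,
      "No AWS credentials found. Please specify credentials using one of the methods " +
        "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
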

+ * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url] [region-name]
+ * [stream-name] is the name of the Kinesis stream (ie. mySparkStream)
+ * [endpoint-url] is the endpoint of the Kinesis service
+ * (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
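
For reference, these arguments map one-to-one onto the receiver setup. A minimal Scala sketch using the same createStream signature this patch introduces (the argument values are illustrative):

    // Sketch: [app-name] names the DynamoDB table the Kinesis Client Library uses
    // to track read positions; the other arguments locate the stream and region.
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils

    val ssc: StreamingContext = ???  // an initialized StreamingContext
    val stream = KinesisUtils.createStream(
      ssc, "myAppName", "mySparkStream",
      "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
      InitialPositionInStream.LATEST, Milliseconds(2000), StorageLevel.MEMORY_AND_DISK_2)
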

 * Example:
- * $ export AWS_ACCESS_KEY_ID=<your-access-key>
- * $ export AWS_SECRET_KEY=<your-secret-key>
- * $ $SPARK_HOME/bin/run-example \
- * org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \
- * https://kinesis.us-east-1.amazonaws.com
- *
+ * # export AWS keys if necessary
+ * $ export AWS_ACCESS_KEY_ID=[your-access-key]
+ * $ export AWS_SECRET_KEY=[your-secret-key]
+ *

+ * # run the example
+ * $ SPARK_HOME/bin/run-example \
+ * org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \
+ * https://kinesis.us-east-1.amazonaws.com
+ *
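
Word counts only appear once the stream carries data, so one assumed workflow is to start the companion producer described below first, using the rates from its own example:

    # Assumed workflow: generate dummy data with the companion producer, then run the consumer
    $ SPARK_HOME/bin/run-example \
        org.apache.spark.examples.streaming.KinesisWordCountProducerASL mySparkStream \
        https://kinesis.us-east-1.amazonaws.com 10 5
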

 * Note that number of workers/threads should be 1 more than the number of receivers.
 * This leaves one thread available for actually processing the data.
- *
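
A minimal sketch of that rule for a local run (the numStreams value and the local[N] master string are illustrative assumptions, not part of this patch):

    // Sketch: ask for one more local thread than there are receivers, so at
    // least one thread remains free to process the received data.
    import org.apache.spark.SparkConf

    val numStreams = 2  // e.g. one receiver per Kinesis shard
    val sparkConf = new SparkConf()
      .setAppName("KinesisWordCountASL")
      .setMaster(s"local[${numStreams + 1}]")  // receivers + 1 processing thread
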

+ * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data + * onto the Kinesis stream. */ public final class JavaKinesisWordCountASL { // needs to be public for access from run-example - private static final Pattern WORD_SEPARATOR = Pattern.compile(" "); - private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class); + private static final Pattern WORD_SEPARATOR = Pattern.compile(" "); + private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class); + + public static void main(String[] args) { + // Check that all required args were passed in. + if (args.length < 4) { + System.err.println( + "Usage: JavaKinesisWordCountASL \n" + + " is the name of the app, used to track the read data in DynamoDB\n" + + " is the name of the Kinesis stream\n" + + " is the endpoint of the Kinesis service\n" + + " (e.g. https://kinesis.us-east-1.amazonaws.com)\n" + + " region for DynamoDB+CloudWatch backing services\n" + + " (e.g. us-east-1)\n" + + "Generate data for the Kinesis stream using the example KinesisWordCountProducerASL.\n" + + "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" + + "details.\n" + ); + System.exit(1); + } - /* Make the constructor private to enforce singleton */ - private JavaKinesisWordCountASL() { + StreamingExamples.setStreamingLogLevels(); + + // Populate the appropriate variables from the given args + String kinesisAppName = args[0]; + String streamName = args[1]; + String endpointUrl = args[2]; + String regionName = args[3]; + + // Create a Kinesis client in order to determine the number of shards for the given stream + AmazonKinesisClient kinesisClient = + new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()); + kinesisClient.setEndpoint(endpointUrl); + int numShards = + kinesisClient.describeStream(streamName).getStreamDescription().getShards().size(); + + + // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard. + // This is not a necessity; if there are less receivers/DStreams than the number of shards, + // then the shards will be automatically distributed among the receivers and each receiver + // will receive data from multiple shards. + int numStreams = numShards; + + // Spark Streaming batch interval + Duration batchInterval = new Duration(2000); + + // Kinesis checkpoint interval. Same as batchInterval for this example. + Duration checkpointInterval = batchInterval; + + // Setup the Spark config and StreamingContext + SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL"); + JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval); + + // Create the Kinesis DStreams + List> streamsList = new ArrayList>(numStreams); + for (int i = 0; i < numStreams; i++) { + streamsList.add( + KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName, + InitialPositionInStream.LATEST, checkpointInterval, StorageLevel.MEMORY_AND_DISK_2()) + ); } - public static void main(String[] args) { - /* Check that all required args were passed in. */ - if (args.length < 2) { - System.err.println( - "Usage: JavaKinesisWordCountASL \n" + - " is the name of the Kinesis stream\n" + - " is the endpoint of the Kinesis service\n" + - " (e.g. 
https://kinesis.us-east-1.amazonaws.com)\n"); - System.exit(1); - } - - StreamingExamples.setStreamingLogLevels(); - - /* Populate the appropriate variables from the given args */ - String streamName = args[0]; - String endpointUrl = args[1]; - /* Set the batch interval to a fixed 2000 millis (2 seconds) */ - Duration batchInterval = new Duration(2000); - - /* Create a Kinesis client in order to determine the number of shards for the given stream */ - AmazonKinesisClient kinesisClient = new AmazonKinesisClient( - new DefaultAWSCredentialsProviderChain()); - kinesisClient.setEndpoint(endpointUrl); - - /* Determine the number of shards from the stream */ - int numShards = kinesisClient.describeStream(streamName) - .getStreamDescription().getShards().size(); - - /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard */ - int numStreams = numShards; - - /* Setup the Spark config. */ - SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount"); - - /* Kinesis checkpoint interval. Same as batchInterval for this example. */ - Duration checkpointInterval = batchInterval; - - /* Setup the StreamingContext */ - JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval); - - /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */ - List> streamsList = new ArrayList>(numStreams); - for (int i = 0; i < numStreams; i++) { - streamsList.add( - KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval, - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2()) - ); - } - - /* Union all the streams if there is more than 1 stream */ - JavaDStream unionStreams; - if (streamsList.size() > 1) { - unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size())); - } else { - /* Otherwise, just use the 1 stream */ - unionStreams = streamsList.get(0); - } - - /* - * Split each line of the union'd DStreams into multiple words using flatMap to produce the collection. - * Convert lines of byte[] to multiple Strings by first converting to String, then splitting on WORD_SEPARATOR. - */ - JavaDStream words = unionStreams.flatMap(new FlatMapFunction() { - @Override - public Iterable call(byte[] line) { - return Lists.newArrayList(WORD_SEPARATOR.split(new String(line))); - } - }); - - /* Map each word to a (word, 1) tuple, then reduce/aggregate by word. 
*/ - JavaPairDStream wordCounts = words.mapToPair( - new PairFunction() { - @Override - public Tuple2 call(String s) { - return new Tuple2(s, 1); - } - }).reduceByKey(new Function2() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + // Union all the streams if there is more than 1 stream + JavaDStream unionStreams; + if (streamsList.size() > 1) { + unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size())); + } else { + // Otherwise, just use the 1 stream + unionStreams = streamsList.get(0); + } + + // Convert each line of Array[Byte] to String, and split into words + JavaDStream words = unionStreams.flatMap(new FlatMapFunction() { + @Override + public Iterable call(byte[] line) { + return Lists.newArrayList(WORD_SEPARATOR.split(new String(line))); + } + }); + + // Map each word to a (word, 1) tuple so we can reduce by key to count the words + JavaPairDStream wordCounts = words.mapToPair( + new PairFunction() { + @Override + public Tuple2 call(String s) { + return new Tuple2(s, 1); + } + }).reduceByKey(new Function2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); /* Print the first 10 wordCounts */ - wordCounts.print(); + wordCounts.print(); - /* Start the streaming context and await termination */ - jssc.start(); - jssc.awaitTermination(); - } + // Start the streaming context and await termination + jssc.start(); + jssc.awaitTermination(); + } } diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala index 32da0858d1a1d..df7fe223bb8d8 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala @@ -18,213 +18,236 @@ package org.apache.spark.examples.streaming import java.nio.ByteBuffer + import scala.util.Random -import org.apache.spark.Logging -import org.apache.spark.SparkConf -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.Milliseconds -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions -import org.apache.spark.streaming.kinesis.KinesisUtils -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain + +import com.amazonaws.auth.{DefaultAWSCredentialsProviderChain, BasicAWSCredentials} import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.model.PutRecordRequest -import org.apache.log4j.Logger -import org.apache.log4j.Level +import org.apache.log4j.{Level, Logger} + +import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Milliseconds, StreamingContext} +import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions +import org.apache.spark.streaming.kinesis.KinesisUtils + /** - * Kinesis Spark Streaming WordCount example. + * Consumes messages from a Amazon Kinesis streams and does wordcount. * - * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details on - * the Kinesis Spark Streaming integration. + * This example spins up 1 Kinesis Receiver per shard for the given stream. 
+ * It then starts pulling from the last checkpointed sequence number of the given stream. * - * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard - * for the given stream. - * It then starts pulling from the last checkpointed sequence number of the given - * and . + * Usage: KinesisWordCountASL + * is the name of the consumer app, used to track the read data in DynamoDB + * name of the Kinesis stream (ie. mySparkStream) + * endpoint of the Kinesis service + * (e.g. https://kinesis.us-east-1.amazonaws.com) + * region name for DynamoDB and CloudWatch backing services (e.g. us-east-1) * - * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region - * - * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials - * in the following order of precedence: - * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY - * Java System Properties - aws.accessKeyId and aws.secretKey - * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs - * Instance profile credentials - delivered through the Amazon EC2 metadata service - * - * Usage: KinesisWordCountASL - * is the name of the Kinesis stream (ie. mySparkStream) - * is the endpoint of the Kinesis service - * (ie. https://kinesis.us-east-1.amazonaws.com) * * Example: - * $ export AWS_ACCESS_KEY_ID= - * $ export AWS_SECRET_KEY= - * $ $SPARK_HOME/bin/run-example \ - * org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \ - * https://kinesis.us-east-1.amazonaws.com + * # export AWS keys if necessary + * $ export AWS_ACCESS_KEY_ID= + * $ export AWS_SECRET_KEY= + * + * # run the example + * $ SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \ + * https://kinesis.us-east-1.amazonaws.com us-east-1 + * + * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data + * onto the Kinesis stream. + * + * This code uses the DefaultAWSCredentialsProviderChain to find credentials + * in the following order: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * For more information, see + * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html * - * - * Note that number of workers/threads should be 1 more than the number of receivers. - * This leaves one thread available for actually processing the data. + * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on + * the Kinesis Spark Streaming integration. * - * There is a companion helper class below called KinesisWordCountProducerASL which puts - * dummy data onto the Kinesis stream. - * Usage instructions for KinesisWordCountProducerASL are provided in that class definition. */ -private object KinesisWordCountASL extends Logging { +object KinesisWordCountASL extends Logging { def main(args: Array[String]) { - /* Check that all required args were passed in. */ - if (args.length < 2) { + // Check that all required args were passed in. + if (args.length < 4) { System.err.println( """ - |Usage: KinesisWordCount + |Usage: KinesisWordCountASL + | + | is the name of the consumer app, used to track the read data in DynamoDB | is the name of the Kinesis stream | is the endpoint of the Kinesis service | (e.g. 
https://kinesis.us-east-1.amazonaws.com) + | region name for DynamoDB + CloudWatch backing services + | (e.g. us-east-1) + | + |Generate input data for Kinesis stream using the example KinesisWordCountProducerASL. + |See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more + |details. """.stripMargin) System.exit(1) } StreamingExamples.setStreamingLogLevels() - /* Populate the appropriate variables from the given args */ - val Array(streamName, endpointUrl) = args + // Populate the appropriate variables from the given args + val Array(appName, streamName, endpointUrl, regionName) = args - /* Determine the number of shards from the stream */ - val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) + + // Determine the number of shards from the stream using the low-level Kinesis Client + // from the AWS Java SDK. + val credentials = new DefaultAWSCredentialsProviderChain().getCredentials() + require(credentials != null, + "No AWS credentials found. Please specify credentials using one of the methods specified " + + "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html") + val kinesisClient = new AmazonKinesisClient(credentials) kinesisClient.setEndpoint(endpointUrl) - val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards() - .size() + val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size + - /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */ + // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard. + // This is not a necessity; if there are less receivers/DStreams than the number of shards, + // then the shards will be automatically distributed among the receivers and each receiver + // will receive data from multiple shards. val numStreams = numShards - /* Setup the and SparkConfig and StreamingContext */ - /* Spark Streaming batch interval */ + // Spark Streaming batch interval val batchInterval = Milliseconds(2000) - val sparkConfig = new SparkConf().setAppName("KinesisWordCount") - val ssc = new StreamingContext(sparkConfig, batchInterval) - /* Kinesis checkpoint interval. Same as batchInterval for this example. */ + // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information + //on sequence number of records that have been received. Same as batchInterval for this example. 
val kinesisCheckpointInterval = batchInterval - /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */ + // Setup the SparkConfig and StreamingContext + val sparkConfig = new SparkConf().setAppName("KinesisWordCountASL") + val ssc = new StreamingContext(sparkConfig, batchInterval) + + // Create the Kinesis DStreams val kinesisStreams = (0 until numStreams).map { i => - KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval, - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2) + KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, + InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) } - /* Union all the streams */ + // Union all the streams val unionStreams = ssc.union(kinesisStreams) - /* Convert each line of Array[Byte] to String, split into words, and count them */ - val words = unionStreams.flatMap(byteArray => new String(byteArray) - .split(" ")) + // Convert each line of Array[Byte] to String, and split into words + val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) - /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */ + // Map each word to a (word, 1) tuple so we can reduce by key to count the words val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) - - /* Print the first 10 wordCounts */ + + // Print the first 10 wordCounts wordCounts.print() - /* Start the streaming context and await termination */ + // Start the streaming context and await termination ssc.start() ssc.awaitTermination() } } /** - * Usage: KinesisWordCountProducerASL - * + * Usage: KinesisWordCountProducerASL \ + * * is the name of the Kinesis stream (ie. mySparkStream) - * is the endpoint of the Kinesis service + * is the endpoint of the Kinesis service * (ie. https://kinesis.us-east-1.amazonaws.com) * is the rate of records per second to put onto the stream * is the rate of records per second to put onto the stream * * Example: - * $ export AWS_ACCESS_KEY_ID= - * $ export AWS_SECRET_KEY= - * $ $SPARK_HOME/bin/run-example \ + * $ SPARK_HOME/bin/run-example \ * org.apache.spark.examples.streaming.KinesisWordCountProducerASL mySparkStream \ * https://kinesis.us-east-1.amazonaws.com 10 5 */ -private object KinesisWordCountProducerASL { +object KinesisWordCountProducerASL { def main(args: Array[String]) { - if (args.length < 4) { - System.err.println("Usage: KinesisWordCountProducerASL " + - " ") + if (args.length < 6) { + System.err.println( + """ + |Usage: KinesisWordCountProducerASL + | is the name of the Kinesis stream + | is the endpoint of the Kinesis service + | (e.g. 
https://kinesis.us-east-1.amazonaws.com) + | is the rate of records per second to put onto the stream + | is the rate of records per second to put onto the stream + | + """.stripMargin) + System.exit(1) } StreamingExamples.setStreamingLogLevels() - /* Populate the appropriate variables from the given args */ - val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args + // Populate the appropriate variables from the given args + val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) + = args - /* Generate the records and return the totals */ - val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt) + // Generate the records and return the totals + val totals = generate(stream, endpoint, recordsPerSecond.toInt, + wordsPerRecord.toInt) - /* Print the array of (index, total) tuples */ + // Print the array of (word, total) tuples println("Totals") - totals.foreach(total => println(total.toString())) + totals.foreach(println(_)) } def generate(stream: String, endpoint: String, recordsPerSecond: Int, - wordsPerRecord: Int): Seq[(Int, Int)] = { - - val MaxRandomInts = 10 + wordsPerRecord: Int): Seq[(String, Int)] = { - /* Create the Kinesis client */ + val randomWords = List("spark","you","are","my","father") + val totals = scala.collection.mutable.Map[String, Int]() + + // Create the low-level Kinesis Client from the AWS Java SDK. val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) kinesisClient.setEndpoint(endpoint) println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" + - s" $recordsPerSecond records per second and $wordsPerRecord words per record"); - - val totals = new Array[Int](MaxRandomInts) - /* Put String records onto the stream per the given recordPerSec and wordsPerRecord */ + s" $recordsPerSecond records per second and $wordsPerRecord words per record"); + + // Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord for (i <- 1 to 5) { - - /* Generate recordsPerSec records to put onto the stream */ + // Generate recordsPerSec records to put onto the stream val records = (1 to recordsPerSecond.toInt).map { recordNum => - /* - * Randomly generate each wordsPerRec words between 0 (inclusive) - * and MAX_RANDOM_INTS (exclusive) - */ + // Randomly generate wordsPerRecord number of words val data = (1 to wordsPerRecord.toInt).map(x => { - /* Generate the random int */ - val randomInt = Random.nextInt(MaxRandomInts) + /* Get a random index to a word */ + val randomWordIdx = Random.nextInt(randomWords.size) + val randomWord = randomWords(randomWordIdx) - /* Keep track of the totals */ - totals(randomInt) += 1 + // Increment total count to compare to server counts later + totals(randomWord) = totals.getOrElse(randomWord, 0) + 1 - randomInt.toString() + randomWord }).mkString(" ") - /* Create a partitionKey based on recordNum */ + // Create a partitionKey based on recordNum val partitionKey = s"partitionKey-$recordNum" - /* Create a PutRecordRequest with an Array[Byte] version of the data */ + // Create a PutRecordRequest with an Array[Byte] version of the data val putRecordRequest = new PutRecordRequest().withStreamName(stream) .withPartitionKey(partitionKey) .withData(ByteBuffer.wrap(data.getBytes())); - /* Put the record onto the stream and capture the PutRecordResult */ + // Put the record onto the stream and capture the PutRecordResult val putRecordResult = kinesisClient.putRecord(putRecordRequest); } - /* Sleep for a second */ + // Sleep for a second 
Thread.sleep(1000) println("Sent " + recordsPerSecond + " records") } - - /* Convert the totals to (index, total) tuple */ - (0 to (MaxRandomInts - 1)).zip(totals) + // Convert the totals to (index, total) tuple + totals.toSeq.sortBy(_._1) } } @@ -233,7 +256,6 @@ private object KinesisWordCountProducerASL { * This has been lifted from the examples/ project to remove the circular dependency. */ private[streaming] object StreamingExamples extends Logging { - /** Set reasonable logging levels for streaming if the user has not configured log4j. */ def setStreamingLogLevels() { val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements From 011cbe25686e7f2aed471b2ba6609604289018f3 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 18 May 2015 16:37:36 -0700 Subject: [PATCH 2/5] More fixes --- .../streaming/JavaKinesisWordCountASL.java | 81 +++++++++---------- .../streaming/KinesisWordCountASL.scala | 38 ++++----- 2 files changed, 60 insertions(+), 59 deletions(-) diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java index 5321a0438608d..5a344efbc10ab 100644 --- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.regex.Pattern; +import com.amazonaws.regions.RegionUtils; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; @@ -40,44 +41,41 @@ import com.google.common.collect.Lists; /** - * Java-friendly Kinesis Spark Streaming WordCount example - *

- * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details
- * on the Kinesis Spark Streaming integration.
- *

+ * Consumes messages from an Amazon Kinesis stream and does wordcount.
+ *
 * This example spins up 1 Kinesis Receiver per shard for the given stream.
 * It then starts pulling from the last checkpointed sequence number of the given stream.
- *

- * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
- *

- * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
- * in the following order of precedence:
- *

- * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- * Java System Properties - aws.accessKeyId and aws.secretKey
- * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- * Instance profile credentials - delivered through the Amazon EC2 metadata service
- *

+ *
 * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url] [region-name]
- * [stream-name] is the name of the Kinesis stream (ie. mySparkStream)
- * [endpoint-url] is the endpoint of the Kinesis service
- * (ie. https://kinesis.us-east-1.amazonaws.com)
- *

+ * [app-name] is the name of the consumer app, used to track the read data in DynamoDB
+ * [stream-name] name of the Kinesis stream (ie. mySparkStream)
+ * [endpoint-url] endpoint of the Kinesis service
+ * (e.g. https://kinesis.us-east-1.amazonaws.com)
+ *
+ *
 * Example:
- * # export AWS keys if necessary
- * $ export AWS_ACCESS_KEY_ID=[your-access-key]
- * $ export AWS_SECRET_KEY=[your-secret-key]
- *

- * # run the example
- * $ SPARK_HOME/bin/run-example \
- * org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \
- * https://kinesis.us-east-1.amazonaws.com
- *
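
The explicit region argument disappears from the run command because the region can be derived from the endpoint URL. A short Scala sketch of the derivation this patch performs with the AWS SDK:

    // Sketch: derive the region name from the Kinesis endpoint URL, so the
    // Kinesis Client Library stores its DynamoDB metadata in the same region.
    import com.amazonaws.regions.RegionUtils

    val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
    val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()  // "us-east-1"
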

- * Note that number of workers/threads should be 1 more than the number of receivers.
- * This leaves one thread available for actually processing the data.
- *

- * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data + * # export AWS keys if necessary + * $ export AWS_ACCESS_KEY_ID=[your-access-key] + * $ export AWS_SECRET_KEY= + * + * # run the example + * $ SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \ + * https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class called KinesisWordProducerASL which puts dummy data * onto the Kinesis stream. + * + * This code uses the DefaultAWSCredentialsProviderChain to find credentials + * in the following order: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * For more information, see + * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html + * + * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on + * the Kinesis Spark Streaming integration. */ public final class JavaKinesisWordCountASL { // needs to be public for access from run-example private static final Pattern WORD_SEPARATOR = Pattern.compile(" "); @@ -85,16 +83,14 @@ public final class JavaKinesisWordCountASL { // needs to be public for access fr public static void main(String[] args) { // Check that all required args were passed in. - if (args.length < 4) { + if (args.length != 3) { System.err.println( - "Usage: JavaKinesisWordCountASL \n" + + "Usage: JavaKinesisWordCountASL \n\n" + " is the name of the app, used to track the read data in DynamoDB\n" + " is the name of the Kinesis stream\n" + " is the endpoint of the Kinesis service\n" + " (e.g. https://kinesis.us-east-1.amazonaws.com)\n" + - " region for DynamoDB+CloudWatch backing services\n" + - " (e.g. us-east-1)\n" + - "Generate data for the Kinesis stream using the example KinesisWordCountProducerASL.\n" + + "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" + "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" + "details.\n" ); @@ -107,7 +103,6 @@ public static void main(String[] args) { String kinesisAppName = args[0]; String streamName = args[1]; String endpointUrl = args[2]; - String regionName = args[3]; // Create a Kinesis client in order to determine the number of shards for the given stream AmazonKinesisClient kinesisClient = @@ -127,7 +122,11 @@ public static void main(String[] args) { Duration batchInterval = new Duration(2000); // Kinesis checkpoint interval. Same as batchInterval for this example. 
- Duration checkpointInterval = batchInterval; + Duration kinesisCheckpointInterval = batchInterval; + + // Get the region name from the endpoint URL to save Kinesis Client Library metadata in + // DynamoDB of the same region as the Kinesis stream + String regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName(); // Setup the Spark config and StreamingContext SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL"); @@ -138,7 +137,7 @@ public static void main(String[] args) { for (int i = 0; i < numStreams; i++) { streamsList.add( KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName, - InitialPositionInStream.LATEST, checkpointInterval, StorageLevel.MEMORY_AND_DISK_2()) + InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2()) ); } diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala index df7fe223bb8d8..6f54e7d75f77d 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import scala.util.Random import com.amazonaws.auth.{DefaultAWSCredentialsProviderChain, BasicAWSCredentials} +import com.amazonaws.regions.RegionUtils import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.model.PutRecordRequest @@ -45,7 +46,6 @@ import org.apache.spark.streaming.kinesis.KinesisUtils * name of the Kinesis stream (ie. mySparkStream) * endpoint of the Kinesis service * (e.g. https://kinesis.us-east-1.amazonaws.com) - * region name for DynamoDB and CloudWatch backing services (e.g. us-east-1) * * * Example: @@ -54,10 +54,10 @@ import org.apache.spark.streaming.kinesis.KinesisUtils * $ export AWS_SECRET_KEY= * * # run the example - * $ SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \ - * https://kinesis.us-east-1.amazonaws.com us-east-1 + * $ SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \ + * https://kinesis.us-east-1.amazonaws.com * - * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data + * There is a companion helper class called KinesisWordProducerASL which puts dummy data * onto the Kinesis stream. * * This code uses the DefaultAWSCredentialsProviderChain to find credentials @@ -71,24 +71,21 @@ import org.apache.spark.streaming.kinesis.KinesisUtils * * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on * the Kinesis Spark Streaming integration. - * */ object KinesisWordCountASL extends Logging { def main(args: Array[String]) { // Check that all required args were passed in. - if (args.length < 4) { + if (args.length != 3) { System.err.println( """ |Usage: KinesisWordCountASL - | + | | is the name of the consumer app, used to track the read data in DynamoDB | is the name of the Kinesis stream | is the endpoint of the Kinesis service | (e.g. https://kinesis.us-east-1.amazonaws.com) - | region name for DynamoDB + CloudWatch backing services - | (e.g. us-east-1) | - |Generate input data for Kinesis stream using the example KinesisWordCountProducerASL. 
+ |Generate input data for Kinesis stream using the example KinesisWordProducerASL. |See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more |details. """.stripMargin) @@ -98,7 +95,7 @@ object KinesisWordCountASL extends Logging { StreamingExamples.setStreamingLogLevels() // Populate the appropriate variables from the given args - val Array(appName, streamName, endpointUrl, regionName) = args + val Array(appName, streamName, endpointUrl) = args // Determine the number of shards from the stream using the low-level Kinesis Client @@ -125,6 +122,10 @@ object KinesisWordCountASL extends Logging { //on sequence number of records that have been received. Same as batchInterval for this example. val kinesisCheckpointInterval = batchInterval + // Get the region name from the endpoint URL to save Kinesis Client Library metadata in + // DynamoDB of the same region as the Kinesis stream + val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() + // Setup the SparkConfig and StreamingContext val sparkConfig = new SparkConf().setAppName("KinesisWordCountASL") val ssc = new StreamingContext(sparkConfig, batchInterval) @@ -154,8 +155,9 @@ object KinesisWordCountASL extends Logging { } /** - * Usage: KinesisWordCountProducerASL \ + * Usage: KinesisWordProducerASL \ * + * * is the name of the Kinesis stream (ie. mySparkStream) * is the endpoint of the Kinesis service * (ie. https://kinesis.us-east-1.amazonaws.com) @@ -163,16 +165,16 @@ object KinesisWordCountASL extends Logging { * is the rate of records per second to put onto the stream * * Example: - * $ SPARK_HOME/bin/run-example \ - * org.apache.spark.examples.streaming.KinesisWordCountProducerASL mySparkStream \ - * https://kinesis.us-east-1.amazonaws.com 10 5 + * $ SPARK_HOME/bin/run-example streaming.KinesisWordProducerASL mySparkStream \ + * https://kinesis.us-east-1.amazonaws.com us-east-1 10 5 */ -object KinesisWordCountProducerASL { +object KinesisWordProducerASL { def main(args: Array[String]) { - if (args.length < 6) { + if (args.length != 4) { System.err.println( """ - |Usage: KinesisWordCountProducerASL + |Usage: KinesisWordProducerASL + | | is the name of the Kinesis stream | is the endpoint of the Kinesis service | (e.g. 
https://kinesis.us-east-1.amazonaws.com) From 841987fe874e706be973dbb2133509edd08635b4 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 18 May 2015 17:31:12 -0700 Subject: [PATCH 3/5] Small update --- .../spark/examples/streaming/KinesisWordCountASL.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala index 6f54e7d75f77d..2aa379bdb6e29 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala @@ -189,15 +189,14 @@ object KinesisWordProducerASL { StreamingExamples.setStreamingLogLevels() // Populate the appropriate variables from the given args - val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) - = args + val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args // Generate the records and return the totals val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt) // Print the array of (word, total) tuples - println("Totals") + println("Totals for the words sent") totals.foreach(println(_)) } From f080872ed1f186b663cce62f6dabe7c31ef5045d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 18 May 2015 17:33:01 -0700 Subject: [PATCH 4/5] More cleanup --- .../spark/examples/streaming/KinesisWordCountASL.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala index 2aa379bdb6e29..2de5bb86e5755 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala @@ -213,12 +213,12 @@ object KinesisWordProducerASL { kinesisClient.setEndpoint(endpoint) println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" + - s" $recordsPerSecond records per second and $wordsPerRecord words per record"); + s" $recordsPerSecond records per second and $wordsPerRecord words per record") // Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord - for (i <- 1 to 5) { + for (i <- 1 to 10) { // Generate recordsPerSec records to put onto the stream - val records = (1 to recordsPerSecond.toInt).map { recordNum => + val records = (1 to recordsPerSecond.toInt).foreach { recordNum => // Randomly generate wordsPerRecord number of words val data = (1 to wordsPerRecord.toInt).map(x => { /* Get a random index to a word */ @@ -237,10 +237,10 @@ object KinesisWordProducerASL { // Create a PutRecordRequest with an Array[Byte] version of the data val putRecordRequest = new PutRecordRequest().withStreamName(stream) .withPartitionKey(partitionKey) - .withData(ByteBuffer.wrap(data.getBytes())); + .withData(ByteBuffer.wrap(data.getBytes())) // Put the record onto the stream and capture the PutRecordResult - val putRecordResult = kinesisClient.putRecord(putRecordRequest); + val putRecordResult = kinesisClient.putRecord(putRecordRequest) } // Sleep for a second From 7cc307bcbd6debc5ae189b53435686d51fcd482b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 18 May 2015 17:37:09 -0700 Subject: 
[PATCH 5/5] More tweaks --- .../streaming/JavaKinesisWordCountASL.java | 18 +++++++++++------- .../streaming/KinesisWordCountASL.scala | 5 +++-- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java index 5a344efbc10ab..06e0ff28afd95 100644 --- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java @@ -97,6 +97,7 @@ public static void main(String[] args) { System.exit(1); } + // Set default log4j logging level to WARN to hide Spark logs StreamingExamples.setStreamingLogLevels(); // Populate the appropriate variables from the given args @@ -165,14 +166,17 @@ public Iterable call(byte[] line) { public Tuple2 call(String s) { return new Tuple2(s, 1); } - }).reduceByKey(new Function2() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + } + ).reduceByKey( + new Function2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + } + ); - /* Print the first 10 wordCounts */ + // Print the first 10 wordCounts wordCounts.print(); // Start the streaming context and await termination diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala index 2de5bb86e5755..640ca049e2ec4 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala @@ -186,6 +186,7 @@ object KinesisWordProducerASL { System.exit(1) } + // Set default log4j logging level to WARN to hide Spark logs StreamingExamples.setStreamingLogLevels() // Populate the appropriate variables from the given args @@ -221,7 +222,7 @@ object KinesisWordProducerASL { val records = (1 to recordsPerSecond.toInt).foreach { recordNum => // Randomly generate wordsPerRecord number of words val data = (1 to wordsPerRecord.toInt).map(x => { - /* Get a random index to a word */ + // Get a random index to a word val randomWordIdx = Random.nextInt(randomWords.size) val randomWord = randomWords(randomWordIdx) @@ -257,7 +258,7 @@ object KinesisWordProducerASL { * This has been lifted from the examples/ project to remove the circular dependency. */ private[streaming] object StreamingExamples extends Logging { - /** Set reasonable logging levels for streaming if the user has not configured log4j. */ + // Set reasonable logging levels for streaming if the user has not configured log4j. def setStreamingLogLevels() { val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) {