From cc36510f2c60cb6b5bfd5be0d0c7d551b36d5afe Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 21 May 2015 10:32:20 -0700 Subject: [PATCH 01/15] Added KinesisStreamSuite --- .../kinesis/KinesisStreamSuite.scala | 90 ++++++++++ .../streaming/kinesis/KinesisTestUtils.scala | 157 ++++++++++++++++++ 2 files changed, 247 insertions(+) create mode 100644 extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala create mode 100644 extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala new file mode 100644 index 0000000000000..7329ac4a98cdc --- /dev/null +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -0,0 +1,90 @@ +package org.apache.spark.streaming.kinesis + +import scala.collection.mutable +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.util.{Try, Random} + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.scalatest.concurrent.Eventually +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} + +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.util.Utils + +class KinesisStreamSuite extends FunSuite + with Eventually with BeforeAndAfter with BeforeAndAfterAll { + + private val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" + private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}" + private val testUtils = new KinesisTestUtils(endpointUrl) + + private var ssc: StreamingContext = _ + + override def beforeAll(): Unit = { + testUtils.createStream() + } + + override def afterAll(): Unit = { + testUtils.deleteStream() + } + + before { + testUtils.deleteDynamoDBTable(kinesisAppName) + } + + after { + if (ssc != null) { + ssc.stop() + ssc = null + } + testUtils.deleteDynamoDBTable(kinesisAppName) + } + + if (shouldRunTests) { + + test("basic operation") { + val conf = new SparkConf() + .setMaster("local[4]") + .setAppName(kinesisAppName) // Setting Spark app name to Kinesis app name + ssc = new StreamingContext(conf, Milliseconds(1000)) + //val stream = KinesisUtils.createStream(ssc, testUtils.streamName, testUtils.endpointUrl, + // Seconds(10), InitialPositionInStream.LATEST, StorageLevel.MEMORY_ONLY) + + val credentials = new DefaultAWSCredentialsProviderChain().getCredentials() + val stream = KinesisUtils.createStream(ssc, kinesisAppName, testUtils.streamName, + testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, + Seconds(10), StorageLevel.MEMORY_ONLY, + credentials.getAWSAccessKeyId, credentials.getAWSSecretKey) + + + val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] + stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => + collected ++= rdd.collect() + println("Collected = " + rdd.collect().toSeq.mkString(", ")) + } + ssc.start() + + val testData = 1 to 10 + eventually(timeout(120 seconds), interval(10 second)) { + testUtils.pushData(testData) + assert(collected === testData.toSet, "\nData received does not match data sent") + } + } + } + + def shouldRunTests: Boolean = { + val isSystemVariableSet = true // sys.env.get("RUN_KINESIS_STREAM_TESTS").nonEmpty + def isCredentialsAvailable: Boolean = Try { + new DefaultAWSCredentialsProviderChain().getCredentials + }.isSuccess + isSystemVariableSet && isCredentialsAvailable + } +} \ No newline at end of file diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala new file mode 100644 index 0000000000000..97399036d8fa4 --- /dev/null +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -0,0 +1,157 @@ +package org.apache.spark.streaming.kinesis + +import java.nio.ByteBuffer +import java.util.concurrent.TimeUnit + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.regions.RegionUtils +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient +import com.amazonaws.services.dynamodbv2.document.DynamoDB +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.model._ + +import org.apache.spark.Logging + +class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extends Logging { + + val regionName = if (_regionName.length == 0) { + RegionUtils.getRegionByEndpoint(endpointUrl).getName() + } else { + RegionUtils.getRegion(_regionName).getName() + } + val streamShardCount = 2 + + private val createStreamTimeoutSeconds = 300 + private val describeStreamPollTimeSeconds = 1 + + @volatile + private var streamCreated = false + private var _streamName: String = _ + + + private val credentialsProvider = new DefaultAWSCredentialsProviderChain() + + private lazy val kinesisClient = { + val client = new AmazonKinesisClient(credentialsProvider) + client.setEndpoint(endpointUrl) + client + } + + private lazy val dynamoDB = { + val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()) + dynamoDBClient.setRegion(RegionUtils.getRegion(regionName)) + new DynamoDB(dynamoDBClient) + } + + def streamName: String = { + require(streamCreated, "Stream not yet created, call createStream() to create one") + _streamName + } + + def createStream(): Unit = { + println("Creating stream") + require(!streamCreated, "Stream already created") + _streamName = findNonExistentStreamName() + + // Create a stream. The number of shards determines the provisioned throughput. + val createStreamRequest = new CreateStreamRequest() + createStreamRequest.setStreamName(_streamName) + createStreamRequest.setShardCount(2) + kinesisClient.createStream(createStreamRequest) + + // The stream is now being created. Wait for it to become active. + waitForStreamToBeActive(_streamName) + streamCreated = true + println("Created stream") + } + + /** + * Push data to Kinesis stream and return a map of + * shardId -> seq of (data, seq number) pushed to corresponding shard + */ + def pushData(testData: Seq[Int]): Map[String, Seq[(Int, String)]] = { + require(streamCreated, "Stream not yet created, call createStream() to create one") + val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]() + + testData.foreach { num => + val str = num.toString + val putRecordRequest = new PutRecordRequest().withStreamName(streamName) + .withData(ByteBuffer.wrap(str.getBytes())) + .withPartitionKey(str) + + val putRecordResult = kinesisClient.putRecord(putRecordRequest) + val shardId = putRecordResult.getShardId + val seqNumber = putRecordResult.getSequenceNumber() + val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId, + new ArrayBuffer[(Int, String)]()) + sentSeqNumbers += ((num, seqNumber)) + + } + println(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}") + shardIdToSeqNumbers.toMap + } + + def describeStream(streamNameToDescribe: String = streamName): Option[StreamDescription] = { + try { + val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe) + val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription() + Some(desc) + } catch { + case rnfe: ResourceNotFoundException => + None + } + } + + def deleteStream(): Unit = { + try { + if (describeStream().nonEmpty) { + val deleteStreamRequest = new DeleteStreamRequest() + kinesisClient.deleteStream(streamName) + } + } catch { + case e: Exception => + logWarning(s"Could not delete stream $streamName") + } + } + + def deleteDynamoDBTable(tableName: String): Unit = { + + try { + val table = dynamoDB.getTable(tableName) + table.delete() + table.waitForDelete() + } catch { + case e: Exception => + logWarning(s"Could not delete DynamoDB table $tableName") + } + } + + private def findNonExistentStreamName(): String = { + var testStreamName: String = null + do { + Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds)) + testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}" + } while (describeStream(testStreamName).nonEmpty) + testStreamName + } + + private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = { + val startTime = System.currentTimeMillis() + val endTime = startTime + TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds) + while (System.currentTimeMillis() < endTime) { + Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds)) + describeStream(streamNameToWaitFor).foreach { description => + val streamStatus = description.getStreamStatus() + println(s"\t- current state: $streamStatus\n") + if ("ACTIVE".equals(streamStatus)) { + return + } + } + } + require(false, s"Stream $streamName never became active") + } +} From 129d436eb3c81feb22ed998e7a501fea1a64b54b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 14 Jul 2015 22:22:48 -0700 Subject: [PATCH 02/15] Minor updates --- .../spark/streaming/kinesis/KinesisStreamSuite.scala | 8 +++++--- .../apache/spark/streaming/kinesis/KinesisTestUtils.scala | 5 ++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 7329ac4a98cdc..94b4c0c9aa017 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -48,12 +48,12 @@ class KinesisStreamSuite extends FunSuite testUtils.deleteDynamoDBTable(kinesisAppName) } - if (shouldRunTests) { + if (KinesisStreamSuite.shouldRunTests) { test("basic operation") { val conf = new SparkConf() .setMaster("local[4]") - .setAppName(kinesisAppName) // Setting Spark app name to Kinesis app name + .setAppName(kinesisAppName) // Setting Spark app name to Kinesis app name ssc = new StreamingContext(conf, Milliseconds(1000)) //val stream = KinesisUtils.createStream(ssc, testUtils.streamName, testUtils.endpointUrl, // Seconds(10), InitialPositionInStream.LATEST, StorageLevel.MEMORY_ONLY) @@ -79,8 +79,10 @@ class KinesisStreamSuite extends FunSuite } } } +} - def shouldRunTests: Boolean = { +object KinesisStreamSuite { + def shouldRunTests(): Boolean = { val isSystemVariableSet = true // sys.env.get("RUN_KINESIS_STREAM_TESTS").nonEmpty def isCredentialsAvailable: Boolean = Try { new DefaultAWSCredentialsProviderChain().getCredentials diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 97399036d8fa4..e193add0791da 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -71,7 +71,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend /** * Push data to Kinesis stream and return a map of - * shardId -> seq of (data, seq number) pushed to corresponding shard + * shardId -> seq of (data, seq number) pushed to corresponding shard */ def pushData(testData: Seq[Int]): Map[String, Seq[(Int, String)]] = { require(streamCreated, "Stream not yet created, call createStream() to create one") @@ -119,7 +119,6 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend } def deleteDynamoDBTable(tableName: String): Unit = { - try { val table = dynamoDB.getTable(tableName) table.delete() @@ -146,7 +145,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds)) describeStream(streamNameToWaitFor).foreach { description => val streamStatus = description.getStreamStatus() - println(s"\t- current state: $streamStatus\n") + logDebug(s"\t- current state: $streamStatus\n") if ("ACTIVE".equals(streamStatus)) { return } From 4d7070311b165f5d59387855983e5d3ccea34fab Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 14 Jul 2015 22:34:17 -0700 Subject: [PATCH 03/15] Added license --- .../streaming/kinesis/KinesisStreamSuite.scala | 17 +++++++++++++++++ .../streaming/kinesis/KinesisTestUtils.scala | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 94b4c0c9aa017..5f50e842cfd51 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.streaming.kinesis import scala.collection.mutable diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index e193add0791da..8bacdbe680f71 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.streaming.kinesis import java.nio.ByteBuffer From 465b55d8d9c4614c5f7a70339804f89c7819241b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 15 Jul 2015 19:34:58 -0700 Subject: [PATCH 04/15] Made unit tests optional in a nice way --- .../streaming/kinesis/KinesisFunSuite.scala | 17 +++++ .../kinesis/KinesisStreamSuite.scala | 65 +++++++------------ .../streaming/kinesis/KinesisTestUtils.scala | 37 ++++++++--- 3 files changed, 70 insertions(+), 49 deletions(-) create mode 100644 extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala new file mode 100644 index 0000000000000..dba4fc4140f99 --- /dev/null +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala @@ -0,0 +1,17 @@ +package org.apache.spark.streaming.kinesis + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.SparkFunSuite + +abstract class KinesisFunSuite extends SparkFunSuite with BeforeAndAfterAll { + import KinesisTestUtils._ + + def testOrIgnore(testName: String)(testBody: => Unit) { + if (shouldRunTests) { + test(testName)(testBody) + } else { + ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody) + } + } +} \ No newline at end of file diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 5f50e842cfd51..00b64542c9935 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -29,14 +29,14 @@ import org.apache.hadoop.fs.FileSystem import org.scalatest.concurrent.Eventually import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.{SparkFunSuite, SparkContext, SparkConf} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.util.Utils -class KinesisStreamSuite extends FunSuite +class KinesisStreamSuite extends KinesisFunSuite with Eventually with BeforeAndAfter with BeforeAndAfterAll { private val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" @@ -46,6 +46,7 @@ class KinesisStreamSuite extends FunSuite private var ssc: StreamingContext = _ override def beforeAll(): Unit = { + super.beforeAll() testUtils.createStream() } @@ -65,45 +66,29 @@ class KinesisStreamSuite extends FunSuite testUtils.deleteDynamoDBTable(kinesisAppName) } - if (KinesisStreamSuite.shouldRunTests) { - - test("basic operation") { - val conf = new SparkConf() - .setMaster("local[4]") - .setAppName(kinesisAppName) // Setting Spark app name to Kinesis app name - ssc = new StreamingContext(conf, Milliseconds(1000)) - //val stream = KinesisUtils.createStream(ssc, testUtils.streamName, testUtils.endpointUrl, - // Seconds(10), InitialPositionInStream.LATEST, StorageLevel.MEMORY_ONLY) - - val credentials = new DefaultAWSCredentialsProviderChain().getCredentials() - val stream = KinesisUtils.createStream(ssc, kinesisAppName, testUtils.streamName, - testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, - credentials.getAWSAccessKeyId, credentials.getAWSSecretKey) - - - val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] - stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => - collected ++= rdd.collect() - println("Collected = " + rdd.collect().toSeq.mkString(", ")) - } - ssc.start() + testOrIgnore("basic operation") { + val conf = new SparkConf() + .setMaster("local[4]") + .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name + ssc = new StreamingContext(conf, Milliseconds(1000)) + val credentials = KinesisTestUtils.getAWSCredentials() + val stream = KinesisUtils.createStream(ssc, kinesisAppName, testUtils.streamName, + testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, + Seconds(10), StorageLevel.MEMORY_ONLY, + credentials.getAWSAccessKeyId, credentials.getAWSSecretKey) + + val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] + stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => + collected ++= rdd.collect() + logInfo("Collected = " + rdd.collect().toSeq.mkString(", ")) + } + ssc.start() - val testData = 1 to 10 - eventually(timeout(120 seconds), interval(10 second)) { - testUtils.pushData(testData) - assert(collected === testData.toSet, "\nData received does not match data sent") - } + val testData = 1 to 10 + eventually(timeout(120 seconds), interval(10 second)) { + testUtils.pushData(testData) + assert(collected === testData.toSet, "\nData received does not match data sent") } + ssc.stop() } } - -object KinesisStreamSuite { - def shouldRunTests(): Boolean = { - val isSystemVariableSet = true // sys.env.get("RUN_KINESIS_STREAM_TESTS").nonEmpty - def isCredentialsAvailable: Boolean = Try { - new DefaultAWSCredentialsProviderChain().getCredentials - }.isSuccess - isSystemVariableSet && isCredentialsAvailable - } -} \ No newline at end of file diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 8bacdbe680f71..00b0937df7a9b 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -22,9 +22,9 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.util.Random +import scala.util.{Failure, Random, Success, Try} -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} import com.amazonaws.regions.RegionUtils import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import com.amazonaws.services.dynamodbv2.document.DynamoDB @@ -49,11 +49,8 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend private var streamCreated = false private var _streamName: String = _ - - private val credentialsProvider = new DefaultAWSCredentialsProviderChain() - private lazy val kinesisClient = { - val client = new AmazonKinesisClient(credentialsProvider) + val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials()) client.setEndpoint(endpointUrl) client } @@ -70,7 +67,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend } def createStream(): Unit = { - println("Creating stream") + logInfo("Creating stream") require(!streamCreated, "Stream already created") _streamName = findNonExistentStreamName() @@ -83,7 +80,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend // The stream is now being created. Wait for it to become active. waitForStreamToBeActive(_streamName) streamCreated = true - println("Created stream") + logInfo("Created stream") } /** @@ -108,7 +105,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend sentSeqNumbers += ((num, seqNumber)) } - println(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}") + logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}") shardIdToSeqNumbers.toMap } @@ -171,3 +168,25 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend require(false, s"Stream $streamName never became active") } } + +object KinesisTestUtils { + + + val envVarName = "RUN_KINESIS_TESTS" + + val shouldRunTests = sys.env.get(envVarName).nonEmpty + + def isAWSCredentialsPresent: Boolean = { + Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess + } + + def getAWSCredentials(): AWSCredentials = { + assert(shouldRunTests, + "Kinesis test not enabled, should not attempt to get AWS credentials") + Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match { + case Success(cred) => cred + case Failure(e) => + throw new Exception("Kinesis tests enabled, but could get not AWS credentials") + } + } +} From 24a992b107b44ae48a7851f29d4a57a53bed6718 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 15 Jul 2015 19:53:22 -0700 Subject: [PATCH 05/15] Moved KinesisTestUtils to src instead of test for future python usage --- .../streaming/kinesis/KinesisTestUtils.scala | 10 +++-- .../streaming/kinesis/KinesisFunSuite.scala | 1 + .../kinesis/KinesisStreamSuite.scala | 45 ++++++++++++------- 3 files changed, 38 insertions(+), 18 deletions(-) rename extras/kinesis-asl/src/{test => main}/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala (96%) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala similarity index 96% rename from extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala rename to extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 00b0937df7a9b..29808ad142af0 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -33,7 +33,12 @@ import com.amazonaws.services.kinesis.model._ import org.apache.spark.Logging -class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extends Logging { +/** + * Shared utility methods + */ +private class KinesisTestUtils( + val endpointUrl: String = "https://kinesis.us-west-2.amazonaws.com", + _regionName: String = "") extends Logging { val regionName = if (_regionName.length == 0) { RegionUtils.getRegionByEndpoint(endpointUrl).getName() @@ -169,8 +174,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend } } -object KinesisTestUtils { - +private[kinesis] object KinesisTestUtils { val envVarName = "RUN_KINESIS_TESTS" diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala index dba4fc4140f99..04026f79c1b22 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala @@ -4,6 +4,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkFunSuite + abstract class KinesisFunSuite extends SparkFunSuite with BeforeAndAfterAll { import KinesisTestUtils._ diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 00b64542c9935..7646be971b7a1 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -39,41 +39,56 @@ import org.apache.spark.util.Utils class KinesisStreamSuite extends KinesisFunSuite with Eventually with BeforeAndAfter with BeforeAndAfterAll { - private val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" + private lazy val awsCredentials = KinesisTestUtils.getAWSCredentials() private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}" - private val testUtils = new KinesisTestUtils(endpointUrl) + private val kinesisTestUtils = new KinesisTestUtils() private var ssc: StreamingContext = _ + private var sc: SparkContext = _ override def beforeAll(): Unit = { - super.beforeAll() - testUtils.createStream() + kinesisTestUtils.createStream() + val conf = new SparkConf() + .setMaster("local[4]") + .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name + sc = new SparkContext(conf) } override def afterAll(): Unit = { - testUtils.deleteStream() + sc.stop() + kinesisTestUtils.deleteStream() + kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) } before { - testUtils.deleteDynamoDBTable(kinesisAppName) + // Delete the table so that each unit test can start from scratch without prior state + kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) } after { if (ssc != null) { - ssc.stop() + ssc.stop(stopSparkContext = false) ssc = null } - testUtils.deleteDynamoDBTable(kinesisAppName) } + test("API") { + // The API is tested separately because other tests may not run all the time + ssc = new StreamingContext(sc, Milliseconds(1000)) + val stream1 = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName, + kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST, + Seconds(10), StorageLevel.MEMORY_ONLY) + val stream2 = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName, + kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST, + Seconds(10), StorageLevel.MEMORY_ONLY, "", "") + } + + testOrIgnore("basic operation") { - val conf = new SparkConf() - .setMaster("local[4]") - .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name - ssc = new StreamingContext(conf, Milliseconds(1000)) + ssc = new StreamingContext(sc, Milliseconds(1000)) val credentials = KinesisTestUtils.getAWSCredentials() - val stream = KinesisUtils.createStream(ssc, kinesisAppName, testUtils.streamName, - testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, + val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName, + kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST, Seconds(10), StorageLevel.MEMORY_ONLY, credentials.getAWSAccessKeyId, credentials.getAWSSecretKey) @@ -86,7 +101,7 @@ class KinesisStreamSuite extends KinesisFunSuite val testData = 1 to 10 eventually(timeout(120 seconds), interval(10 second)) { - testUtils.pushData(testData) + kinesisTestUtils.pushData(testData) assert(collected === testData.toSet, "\nData received does not match data sent") } ssc.stop() From c6be0d72744566e75bce661de9c86ff1145c7df1 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 15 Jul 2015 19:56:52 -0700 Subject: [PATCH 06/15] minor changes --- .../spark/streaming/kinesis/KinesisStreamSuite.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 7646be971b7a1..992eff8d7d98f 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -39,10 +39,11 @@ import org.apache.spark.util.Utils class KinesisStreamSuite extends KinesisFunSuite with Eventually with BeforeAndAfter with BeforeAndAfterAll { - private lazy val awsCredentials = KinesisTestUtils.getAWSCredentials() - private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}" private val kinesisTestUtils = new KinesisTestUtils() + // This is the name that KCL uses to save metadata to DynamoDB + private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}" + private var ssc: StreamingContext = _ private var sc: SparkContext = _ @@ -56,6 +57,7 @@ class KinesisStreamSuite extends KinesisFunSuite override def afterAll(): Unit = { sc.stop() + // Delete the stream as well as the table kinesisTestUtils.deleteStream() kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) } @@ -86,11 +88,11 @@ class KinesisStreamSuite extends KinesisFunSuite testOrIgnore("basic operation") { ssc = new StreamingContext(sc, Milliseconds(1000)) - val credentials = KinesisTestUtils.getAWSCredentials() + val aWSCredentials = KinesisTestUtils.getAWSCredentials() val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName, kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST, Seconds(10), StorageLevel.MEMORY_ONLY, - credentials.getAWSAccessKeyId, credentials.getAWSSecretKey) + aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey) val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => From 88f6dab2f88cff0c18ab5866d0fe0a860a7db972 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 15 Jul 2015 20:02:57 -0700 Subject: [PATCH 07/15] Added scala docs --- .../apache/spark/streaming/kinesis/KinesisTestUtils.scala | 2 +- .../org/apache/spark/streaming/kinesis/KinesisFunSuite.scala | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 29808ad142af0..b8ab2a5178d36 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -34,7 +34,7 @@ import com.amazonaws.services.kinesis.model._ import org.apache.spark.Logging /** - * Shared utility methods + * Shared utility methods for performing Kinesis tests that actually transfer data */ private class KinesisTestUtils( val endpointUrl: String = "https://kinesis.us-west-2.amazonaws.com", diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala index 04026f79c1b22..a4b44b8afb791 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala @@ -4,7 +4,10 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkFunSuite - +/** + * Helper class that runs Kinesis real data transfer tests or + * ignores them based on env variable is set or not. + */ abstract class KinesisFunSuite extends SparkFunSuite with BeforeAndAfterAll { import KinesisTestUtils._ From dbb33a5abd87828573b569e7002c18f4313e4c5d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 15 Jul 2015 20:05:21 -0700 Subject: [PATCH 08/15] Added license --- .../streaming/kafka/KafkaStreamSuite.scala | 2 +- .../streaming/kinesis/KinesisFunSuite.scala | 19 ++++++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 797b07f80d8ee..255da9c73fc1c 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -26,9 +26,9 @@ import kafka.serializer.StringDecoder import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually -import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} +import org.apache.spark.{SparkConf, SparkFunSuite} class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll { private var ssc: StreamingContext = _ diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala index a4b44b8afb791..3b02d3bd398ff 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.streaming.kinesis import org.scalatest.BeforeAndAfterAll @@ -18,4 +35,4 @@ abstract class KinesisFunSuite extends SparkFunSuite with BeforeAndAfterAll { ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody) } } -} \ No newline at end of file +} From 18c2208f57b9f99c42b26e9fae849da52c2a05df Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 15 Jul 2015 20:53:52 -0700 Subject: [PATCH 09/15] Changed how SparkFunSuite is inherited --- .../org/apache/spark/streaming/kinesis/KinesisFunSuite.scala | 4 ++-- .../apache/spark/streaming/kinesis/KinesisStreamSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala index 3b02d3bd398ff..86dab19a37d90 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kinesis -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{FunSuite, BeforeAndAfterAll} import org.apache.spark.SparkFunSuite @@ -25,7 +25,7 @@ import org.apache.spark.SparkFunSuite * Helper class that runs Kinesis real data transfer tests or * ignores them based on env variable is set or not. */ -abstract class KinesisFunSuite extends SparkFunSuite with BeforeAndAfterAll { +trait KinesisSuiteHelper { self: FunSuite => import KinesisTestUtils._ def testOrIgnore(testName: String)(testBody: => Unit) { diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 992eff8d7d98f..231214f8eb1d3 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.util.Utils -class KinesisStreamSuite extends KinesisFunSuite +class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper with Eventually with BeforeAndAfter with BeforeAndAfterAll { private val kinesisTestUtils = new KinesisTestUtils() From deb7f4fedd7cd2d4c15affd492eef43b4909b649 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 16 Jul 2015 00:32:04 -0700 Subject: [PATCH 10/15] Removed scalatest.FunSuite --- .../org/apache/spark/streaming/kinesis/KinesisFunSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala index 86dab19a37d90..b48837b639229 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala @@ -17,15 +17,13 @@ package org.apache.spark.streaming.kinesis -import org.scalatest.{FunSuite, BeforeAndAfterAll} - import org.apache.spark.SparkFunSuite /** * Helper class that runs Kinesis real data transfer tests or * ignores them based on env variable is set or not. */ -trait KinesisSuiteHelper { self: FunSuite => +trait KinesisSuiteHelper { self: SparkFunSuite => import KinesisTestUtils._ def testOrIgnore(testName: String)(testBody: => Unit) { From 90c9bdeaed8157fa6f1afe13efeea289b397fc78 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 16 Jul 2015 00:42:13 -0700 Subject: [PATCH 11/15] Removed scalatest.FunSuite --- .../spark/streaming/kinesis/KinesisStreamSuite.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 231214f8eb1d3..7aaa69a4924d4 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -20,21 +20,15 @@ package org.apache.spark.streaming.kinesis import scala.collection.mutable import scala.concurrent.duration._ import scala.language.postfixOps -import scala.util.{Try, Random} +import scala.util.Random -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem import org.scalatest.concurrent.Eventually -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.apache.spark.{SparkFunSuite, SparkContext, SparkConf} -import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream.DStream -import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper with Eventually with BeforeAndAfter with BeforeAndAfterAll { From a297b59909c0f8a839a28ba19a84321588df4b92 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 16 Jul 2015 00:42:59 -0700 Subject: [PATCH 12/15] Reverted unnecessary change in KafkaStreamSuite --- .../org/apache/spark/streaming/kafka/KafkaStreamSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 255da9c73fc1c..797b07f80d8ee 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -26,9 +26,9 @@ import kafka.serializer.StringDecoder import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually +import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} -import org.apache.spark.{SparkConf, SparkFunSuite} class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll { private var ssc: StreamingContext = _ From c7caef775d162a9a33a6df15d4a2be406935a7eb Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 17 Jul 2015 01:12:16 -0700 Subject: [PATCH 13/15] Address comments --- .../streaming/kinesis/KinesisTestUtils.scala | 2 +- .../kinesis/KinesisReceiverSuite.scala | 17 ----------- .../kinesis/KinesisStreamSuite.scala | 28 ++++++++++++------- 3 files changed, 19 insertions(+), 28 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index b8ab2a5178d36..bd8ad03bcd5e3 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -108,8 +108,8 @@ private class KinesisTestUtils( val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId, new ArrayBuffer[(Int, String)]()) sentSeqNumbers += ((num, seqNumber)) - } + logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}") shardIdToSeqNumbers.toMap } diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala index 2103dca6b766f..98f2c7c4f1bfb 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -73,23 +73,6 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft checkpointStateMock, currentClockMock) } - test("KinesisUtils API") { - val ssc = new StreamingContext(master, framework, batchDuration) - // Tests the API, does not actually test data receiving - val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", Seconds(2), - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2) - val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", "us-west-2", - InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2) - val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", "us-west-2", - InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2, - "awsAccessKey", "awsSecretKey") - - ssc.stop() - } - test("check serializability of SerializableAWSCredentials") { Utils.deserialize[SerializableAWSCredentials]( Utils.serialize(new SerializableAWSCredentials("x", "y"))) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 7aaa69a4924d4..b5ae405b70688 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -68,20 +68,28 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper } } - test("API") { - // The API is tested separately because other tests may not run all the time - ssc = new StreamingContext(sc, Milliseconds(1000)) - val stream1 = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName, - kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY) - val stream2 = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName, - kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, "", "") + test("KinesisUtils API") { + ssc = new StreamingContext(sc, Seconds(1)) + // Tests the API, does not actually test data receiving + val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream", + "https://kinesis.us-west-2.amazonaws.com", Seconds(2), + InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2) + val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream", + "https://kinesis.us-west-2.amazonaws.com", "us-west-2", + InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2) + val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream", + "https://kinesis.us-west-2.amazonaws.com", "us-west-2", + InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2, + "awsAccessKey", "awsSecretKey") } + /** + * Test the stream by sending data to a Kinesis stream and receiving from it. + * This by default ignore as the user may not have access to the + */ testOrIgnore("basic operation") { - ssc = new StreamingContext(sc, Milliseconds(1000)) + ssc = new StreamingContext(sc, Seconds(1)) val aWSCredentials = KinesisTestUtils.getAWSCredentials() val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName, kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST, From 1ea5ce066d67f7fde4b2c717c882e2a806ce1d1d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 17 Jul 2015 01:21:23 -0700 Subject: [PATCH 14/15] Added more comments --- .../apache/spark/streaming/kinesis/KinesisTestUtils.scala | 1 + .../spark/streaming/kinesis/KinesisStreamSuite.scala | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index bd8ad03bcd5e3..bd7ec665af3b0 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -45,6 +45,7 @@ private class KinesisTestUtils( } else { RegionUtils.getRegion(_regionName).getName() } + val streamShardCount = 2 private val createStreamTimeoutSeconds = 300 diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index b5ae405b70688..5fb5c241fe7e8 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -51,13 +51,16 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper override def afterAll(): Unit = { sc.stop() - // Delete the stream as well as the table + // Delete the Kinesis stream as well as the DynamoDB table generated by + // Kinesis Client Library when consuming the stream kinesisTestUtils.deleteStream() kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) } before { - // Delete the table so that each unit test can start from scratch without prior state + // Delete the DynamoDB table generated by Kinesis Client Library when + // consuming from the stream, so that each unit test can start from + // scratch without prior history of data consumption kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) } From 0e16db5bc89e5339e34e7a7bd0c94cbe76d07729 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 17 Jul 2015 01:26:52 -0700 Subject: [PATCH 15/15] Added more comments regarding testOrIgnore --- .../apache/spark/streaming/kinesis/KinesisTestUtils.scala | 2 +- .../apache/spark/streaming/kinesis/KinesisFunSuite.scala | 1 + .../apache/spark/streaming/kinesis/KinesisStreamSuite.scala | 6 +++++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index bd7ec665af3b0..f6bf552e6bb8e 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -179,7 +179,7 @@ private[kinesis] object KinesisTestUtils { val envVarName = "RUN_KINESIS_TESTS" - val shouldRunTests = sys.env.get(envVarName).nonEmpty + val shouldRunTests = sys.env.get(envVarName) == Some("1") def isAWSCredentialsPresent: Boolean = { Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala index b48837b639229..6d011f295e7f7 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.SparkFunSuite trait KinesisSuiteHelper { self: SparkFunSuite => import KinesisTestUtils._ + /** Run the test if environment variable is set or ignore the test */ def testOrIgnore(testName: String)(testBody: => Unit) { if (shouldRunTests) { test(testName)(testBody) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 5fb5c241fe7e8..d3dd541fe4371 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -89,7 +89,11 @@ class KinesisStreamSuite extends SparkFunSuite with KinesisSuiteHelper /** * Test the stream by sending data to a Kinesis stream and receiving from it. - * This by default ignore as the user may not have access to the + * This test is not run by default as it requires AWS credentials that the test + * environment may not have. Even if there is AWS credentials available, the user + * may not want to run these tests to avoid the Kinesis costs. To enable this test, + * you must have AWS credentials available through the default AWS provider chain, + * and you have to set the system environment variable RUN_KINESIS_TESTS=1 . */ testOrIgnore("basic operation") { ssc = new StreamingContext(sc, Seconds(1))