From ca2198acb6ba2d8ec896723bc40acacb5ee21a3f Mon Sep 17 00:00:00 2001 From: Aniket Bhatnagar Date: Tue, 4 Nov 2014 23:54:39 +0530 Subject: [PATCH] [SPARK-3639] [Streaming] [Kinesis] Allow users to provide credentials This patch allows users of kinesis-asl to provide their AWS credentials used to access stream(s). Before this patch, they would have to set them up on each cluster node. --- .../streaming/kinesis/KinesisReceiver.scala | 14 +++- .../streaming/kinesis/KinesisUtils.scala | 73 +++++++++++++++++++ 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 1bd1f324298e7..b550542d473d3 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -25,6 +25,8 @@ import org.apache.spark.streaming.Duration import org.apache.spark.streaming.receiver.Receiver import com.amazonaws.auth.AWSCredentialsProvider +import com.amazonaws.auth.AWSCredentials +import com.amazonaws.auth.BasicAWSCredentials import com.amazonaws.auth.DefaultAWSCredentialsProviderChain import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory @@ -58,6 +60,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker * (InitialPositionInStream.TRIM_HORIZON) or * the tip of the stream (InitialPositionInStream.LATEST). * @param storageLevel Storage level to use for storing the received objects + * @param awsCredentials Optional tuple containing AWS access key and secret. If not provided, + * DefaultAWSCredentialsProviderChain will be used to resolve AWS credentials. * * @return ReceiverInputDStream[Array[Byte]] */ @@ -67,7 +71,8 @@ private[kinesis] class KinesisReceiver( endpointUrl: String, checkpointInterval: Duration, initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel) + storageLevel: StorageLevel, + awsCredentials: Option[(String, String)] = None) extends Receiver[Array[Byte]](storageLevel) with Logging { receiver => /* @@ -119,7 +124,12 @@ private[kinesis] class KinesisReceiver( */ override def onStart() { workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID() - credentialsProvider = new DefaultAWSCredentialsProviderChain() + credentialsProvider = awsCredentials.map(credentials => new AWSCredentialsProvider() { + override def getCredentials: AWSCredentials = { + new BasicAWSCredentials(credentials._1, credentials._2) + } + override def refresh(): Unit = {} + }).getOrElse(new DefaultAWSCredentialsProviderChain()) kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId).withKinesisEndpoint(endpointUrl) .withInitialPositionInStream(initialPositionInStream).withTaskBackoffTimeMillis(500) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala index 96f4399accd3a..5de6ac3929401 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala @@ -25,6 +25,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext import org.apache.spark.streaming.dstream.ReceiverInputDStream import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.auth.AWSCredentials /** @@ -35,6 +36,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn object KinesisUtils { /** * Create an InputDStream that pulls messages from a Kinesis stream. + * Credentials used to access Kinesis stream will be resolved using + * DefaultAWSCredentialsProviderChain. * :: Experimental :: * @param ssc StreamingContext object * @param streamName Kinesis stream name @@ -63,9 +66,45 @@ object KinesisUtils { ssc.receiverStream(new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel)) } + + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * :: Experimental :: + * @param ssc StreamingContext object + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + * See the Kinesis Spark Streaming documentation for more + * details on the different types of checkpoints. + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param storageLevel Storage level to use for storing the received objects + * @param credentials Credentials to access Kinesis stream. + * + * @return ReceiverInputDStream[Array[Byte]] + */ + @Experimental + def createStream( + ssc: StreamingContext, + streamName: String, + endpointUrl: String, + checkpointInterval: Duration, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel, + credentials: AWSCredentials): ReceiverInputDStream[Array[Byte]] = { + ssc.receiverStream(new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl, + checkpointInterval, initialPositionInStream, storageLevel, + Some(credentials.getAWSAccessKeyId(), credentials.getAWSSecretKey()))) + } /** * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * Credentials used to access Kinesis stream will be resolved using + * DefaultAWSCredentialsProviderChain. * :: Experimental :: * @param jssc Java StreamingContext object * @param streamName Kinesis stream name @@ -94,4 +133,38 @@ object KinesisUtils { jssc.receiverStream(new KinesisReceiver(jssc.ssc.sc.appName, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel)) } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * :: Experimental :: + * @param jssc Java StreamingContext object + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + * See the Kinesis Spark Streaming documentation for more + * details on the different types of checkpoints. + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param storageLevel Storage level to use for storing the received objects + * @param credentials Credentials to access Kinesis stream. + * + * @return JavaReceiverInputDStream[Array[Byte]] + */ + @Experimental + def createStream( + jssc: JavaStreamingContext, + streamName: String, + endpointUrl: String, + checkpointInterval: Duration, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel, + credentials: AWSCredentials): JavaReceiverInputDStream[Array[Byte]] = { + jssc.receiverStream(new KinesisReceiver(jssc.ssc.sc.appName, streamName, + endpointUrl, checkpointInterval, initialPositionInStream, storageLevel, + Some(credentials.getAWSAccessKeyId(), credentials.getAWSSecretKey()))) + } }