Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion connector/kinesis-asl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,38 @@
<version>${aws.java.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<groupId>software.amazon.kinesis</groupId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry but we don't expect this kind of dramatic dependency changes on the release branches, @vrozov .

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongjoon-hyun This is test only dependency and the old dependency does not work on 4.x.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongjoon-hyun
Do you have any concern even though amazon-kinesis-producer is a test dependency and affects only kinesis-asl?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongjoon-hyun @sarutak Note that the entire impact of the change is limited to KPLBasedKinesisTestUtils.scala and it is necessary due to other Spark dependencies being upgraded making it incompatible with the existing Kinesis producer library version.

<artifactId>amazon-kinesis-producer</artifactId>
<version>${aws.kinesis.producer.version}</version>
<scope>test</scope>
<exclusions>
<!-- auth and sts are excluded for SBT dependency management, so those dependencies
can be added explicitly -->
<exclusion>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
</exclusion>
<exclusion>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
</exclusion>
<exclusion>
<groupId>com.kjetland</groupId>
<artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<version>${aws.java.sdk.v2.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${aws.java.sdk.v2.version}</version>
<scope>test</scope>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to add these dependency for 4.0 even though they are not needed for 4.1?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sarutak @dongjoon-hyun The issue is caused by 4.0 dependency on hadoop-aws:3.4.1 that was upgraded to 3.4.2 in 4.1. hadoop-aws:3.4.1 has transitive dependency on AWS Java SDK v2 2.24.6 that is not compatible with Kinesis Producer transitive dependency on AWS Java SDK v2 2.29.24. In general, Spark should be enforcing specific version of AWS Java SDK v2 (the one specified for aws.java.sdk.v2.version) instead of relying on transitive dependencies that may not be compatible with each other.

Note that this is not an issue for maven build as it correctly handles transitive dependencies, and causes the issue that you noticed in SBT.

</dependency>
<!-- manage this up explicitly to match Spark; com.amazonaws:aws-java-sdk-pom specifies
2.6.7 but says we can manage it up -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ package org.apache.spark.streaming.kinesis

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer,
KinesisProducerConfiguration, UserRecordResult}
import com.google.common.util.concurrent.{FutureCallback, Futures}

import org.apache.spark.util.ThreadUtils
import software.amazon.kinesis.producer.{KinesisProducer => KPLProducer,
KinesisProducerConfiguration, UserRecordResult}

private[kinesis] class KPLBasedKinesisTestUtils(streamShardCount: Int = 2)
extends KinesisTestUtils(streamShardCount) {
Expand All @@ -53,6 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
}

override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
val executor = Executors.newSingleThreadExecutor()
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
data.foreach { num =>
val str = num.toString
Expand All @@ -63,15 +63,17 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG

override def onSuccess(result: UserRecordResult): Unit = {
val shardId = result.getShardId
val seqNumber = result.getSequenceNumber()
val seqNumber = result.getSequenceNumber
val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
new ArrayBuffer[(Int, String)]())
sentSeqNumbers += ((num, seqNumber))
}
}
Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService())
Futures.addCallback(future, kinesisCallBack, executor)
}
producer.flushSync()
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq)
executor.shutdown()
executor.awaitTermination(10, TimeUnit.SECONDS)
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq.sortBy(_._2))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
import com.amazonaws.services.kinesis.model._
import com.amazonaws.waiters.WaiterParameters

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{STREAM_NAME, TABLE_NAME}
Expand Down Expand Up @@ -61,6 +62,8 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
client
}

private lazy val streamExistsWaiter = kinesisClient.waiters().streamExists()

private lazy val dynamoDB = {
val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
Expand Down Expand Up @@ -184,18 +187,9 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
}

private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
val startTimeNs = System.nanoTime()
while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(createStreamTimeoutSeconds)) {
Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
describeStream(streamNameToWaitFor).foreach { description =>
val streamStatus = description.getStreamStatus()
logDebug(s"\t- current state: $streamStatus\n")
if ("ACTIVE".equals(streamStatus)) {
return
}
}
}
require(false, s"Stream $streamName never became active")
val describeStreamRequest = new DescribeStreamRequest()
.withStreamName(streamNameToWaitFor)
streamExistsWaiter.run(new WaiterParameters(describeStreamRequest))
}
}

Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@
<codahale.metrics.version>4.2.30</codahale.metrics.version>
<!-- Should be consistent with SparkBuild.scala and docs -->
<avro.version>1.12.0</avro.version>
<aws.kinesis.client.version>1.12.0</aws.kinesis.client.version>
<aws.kinesis.client.version>1.15.3</aws.kinesis.client.version>
<!-- Should be consistent with Kinesis client dependency -->
<aws.java.sdk.version>1.11.655</aws.java.sdk.version>
<aws.java.sdk.version>1.12.681</aws.java.sdk.version>
<aws.java.sdk.v2.version>2.25.53</aws.java.sdk.v2.version>
<!-- the producer is used in tests -->
<aws.kinesis.producer.version>0.12.8</aws.kinesis.producer.version>
<aws.kinesis.producer.version>1.0.5</aws.kinesis.producer.version>
<!-- Do not use 3.0.0: https://github.com/GoogleCloudDataproc/hadoop-connectors/issues/1114 -->
<gcs-connector.version>hadoop3-2.2.26</gcs-connector.version>
<!-- org.apache.httpcomponents/httpclient-->
Expand Down