Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-45720] Upgrade AWS SDK to v2 for Spark Kinesis connector module #44211

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

junyuc25
Copy link

@junyuc25 junyuc25 commented Dec 6, 2023

What changes were proposed in this pull request?

As Spark is moving to 4.0, one of the major improvement is to upgrade AWS SDK to v2.

Currently other than directly using AWS SDKv1 codes, the Spark Kinesis connector is also
using on these libraries that depends on SDKv1:

  • Kinesis Client Library (KCL) allows users to easily consume and process data from Amazon Kinesis
  • Kinesis Producer Library (KPL) allows users to create reliable and efficient message producers for Amazon Kinesis

The main purpose of this PR is to upgrading AWS SDK to v2 for the Spark Kinesis
conector. While the changes includes upgrading AWS SDK and KCL to v2, we will not
upgrade KPL because it has not yet been migrated to SDKv2.

Why are the changes needed?

As the GA of AWS SDK v2, the SDKv1 has entered maintenance mode where its future
release are only limited to address critical bug and security issues. More details about the SDK maintenance policy can be found here. To keep Spark’s dependent softwares up to date, we should consider upgrading the SDK to v2.
These changes could keep Spark Kinesis connector up to date, and enable users to
receive continuous support from the above libraries.

Does this PR introduce any user-facing change?

Yes. With this change, the Spark Kinesis connector will no longer work with SDKv1.
Any applications that are running with previous version of Spark Kinesis connector
would require update before migrating to Spark 4.0.

AWS SDKv2 and KCLv2 contain several major changes
that are not backward compatible with their previous versions. And some public classes
in the module (i.e. KinesisInputDStream) are using one of these breaking changes. Thus
these user-facing classes require updates as well.

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

No

@junyuc25 junyuc25 changed the title [SPARK-45720] Upgrade AWS SDK to v2 for Spark Kinesis connector module [WIP][SPARK-45720] Upgrade AWS SDK to v2 for Spark Kinesis connector module Dec 6, 2023
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${aws.kinesis.client.guava.version}</version>
<scope>compile</scope>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ur, I'm not sure this is okay or not, @junyuc25 .

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate a bit more on why this is not OK? I think this pattern is also seen in other modules like https://github.com/apache/spark/blob/master/connector/connect/server/pom.xml#L159-L164

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you share more about your concern here @dongjoon-hyun ? Thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for being late.

  1. Is this only for Kinesis or for all AWS SKD v2?
  2. Instead of the following, can we use the latest33.0.0-jre like [SPARK-46768][BUILD] Upgrade Guava used by the connect module to 33.0-jre #44795 ?
<aws.kinesis.client.guava.version>32.1.1-jre</aws.kinesis.client.guava.version>

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. This change is only for the Kinesis connector modules.
  2. Yes I changed it to use the same Guava version as the one used in other module.

@dongjoon-hyun
Copy link
Member

Anyway, thank you so much for working on this area, @junyuc25 .

@@ -54,14 +54,38 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
Copy link
Member

@pan3793 pan3793 Dec 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please clarify the relationship between
software.amazon.kinesis:amazon-kinesis-client and software.amazon.awssdk:kinesis?

Seems they don't share the version, and the latter is hygienic (no third-party dependencies other than software.amazon.awssdk:*)

Handling Jackson/Guava/Protobuf dependencies conflict is always painful, is it possible to provide a similar hygienic artifact for the former to make the downstream project easier to consume?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @pan3793, KCL (software.amazon.kinesis:amazon-kinesis-client) is built on top of Kinesis Data Stream API (software.amazon.awssdk:kinesis), and KCL provides additional functionalities such as load balancing, error recovery etc. According to this doc, generally it is recommended to use KCL over Kinesis Data Stream API:https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html .

Perhaps we could try to raise an issue to the KCL repo and ask if they can release a "clean" version of the library with all the third-party dependencies shaded. But currently I'm not aware there is such a hygienic version of KCL. I guess we probably have to live with this issue if we use KCL.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could try to raise an issue to the KCL repo and ask if they can release a "clean" version of the library with all the third-party dependencies shaded.

Would you like to drive this? It will benefit all downstream projects.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @pan3793, I submitted an ticket to the KCL repo: awslabs/amazon-kinesis-client#1245. Let's see what response we would get. On the other hand, it seems to me that this would be a follow-up task, rather than a blocker for this PR? Please correct me if I am wrong.

spark default of 2.5.0 because KCL appears to have introduced
a dependency on protobuf 2.6.1 somewhere between KCL 1.4.0 and 1.6.1.
-->
<scope>compile</scope>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm glad to see that Kinesis can use the same protobuf-java as other modules, but is this feasible?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @LuciferYang, I saw similar patterns in other modules as well: https://github.com/apache/spark/blob/v3.5.0/connector/connect/server/pom.xml#L173-L175. I believe this should work as long as we shade it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record, I removed this shaded protobuf-java completely in Apache Spark 4.0.0 independently.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dongjoon-hyun for letting me know. Updated the PR to use the same protobuf-java version as specified in <protobuf.version>. I had to change the scope to "compile" to fix ClassNotFoundExceptions.

@junyuc25 junyuc25 marked this pull request as ready for review January 10, 2024 08:19
@@ -16,7 +16,8 @@
*/
package org.apache.spark.streaming.kinesis;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial: omit the empty line #L19

.asScala
.find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
.map(_.getName)
.find(r => kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about r => kinesisServiceMetadata.endpointFor(r).equals(uri)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the URI objects would be different I we make this change. Comparing these two different URI causes test failure.

Stacktrace:

 Cause: java.lang.IllegalArgumentException: Could not resolve region for endpoint: https://kinesis.us-west-2.amazonaws.com
  at org.apache.spark.streaming.kinesis.KinesisTestUtils$.$anonfun$getRegionNameByEndpoint$3(KinesisTestUtils.scala:237)
  at scala.Option.getOrElse(Option.scala:201)
  at org.apache.spark.streaming.kinesis.KinesisTestUtils$.getRegionNameByEndpoint(KinesisTestUtils.scala:237)
  at org.apache.spark.streaming.kinesis.KinesisStreamTests.<init>(KinesisStreamSuite.scala:51)
  at org.apache.spark.streaming.kinesis.WithoutAggregationKinesisStreamSuite.<init>(KinesisStreamSuite.scala:434)

So to avoid this failure, I would prefer to keep this line. kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost)

@@ -60,7 +60,7 @@ private[kinesis] class KinesisCheckpointer(
* we will use that to make the final checkpoint. If `null` is provided, we will not make the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you correct this comment section? At least no more IRecordProcessor.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this. Updated.


// shardId populated during initialize()
@volatile
private var shardId: String = _

/**
* The Kinesis Client Library calls this method during IRecordProcessor initialization.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the comments before methods (such as initialize, processRecords ) are removed in this class? And for the new adding public methods, please add method comments as well.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments.

val splitShardRequest = new SplitShardRequest()
splitShardRequest.withStreamName(_streamName)
splitShardRequest.withShardToSplit(shardId)
// Set a half of the max hash value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment should not be omitted.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added it back

.asScala
.find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
.map(_.getName)
.find(r => kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above response.

}

def getAWSCredentials(): AWSCredentials = {
def getAwsCredentials: AwsCredentials = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This modification will change the API. Please revert it and keep () in method.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

/**
* Returns AWSStaticCredentialsProvider constructed using basic AWS keypair. Falls back to using
* Returns StaticCredentialsProvider constructed using basic AWS keypair. Falls back to using
* DefaultCredentialsProviderChain if unable to construct a AWSCredentialsProviderChain
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AWSCredentialsProviderChain was retired, check all terms in all comments in this file.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

@LantaoJin
Copy link
Contributor

LantaoJin commented Jan 22, 2024

@junyuc25 why you close this PR? And you should remove the [WIP] in the title when your PR is ready for review, or committers cannot know when could start to review.

@junyuc25 junyuc25 changed the title [WIP][SPARK-45720] Upgrade AWS SDK to v2 for Spark Kinesis connector module [SPARK-45720] Upgrade AWS SDK to v2 for Spark Kinesis connector module Jan 23, 2024
@junyuc25 junyuc25 reopened this Jan 23, 2024
@junyuc25
Copy link
Author

@junyuc25 why you close this PR? And you should remove the [WIP] in the title when your PR is ready for review, or committers cannot know when could start to review.

Looks like I deleted the branch by accident. Updated the title and reopened the PR.

@junyuc25 junyuc25 force-pushed the upgrade-kinesis-connector branch 2 times, most recently from d0f3157 to 615c370 Compare February 26, 2024 02:41
@junyuc25 junyuc25 force-pushed the upgrade-kinesis-connector branch from 615c370 to eb93c37 Compare May 9, 2024 12:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants