Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kinesis push throughput (maxBytesPerSecond) throttling ineffective #2871

Closed
mberchon opened this issue May 30, 2022 · 4 comments
Closed

Kinesis push throughput (maxBytesPerSecond) throttling ineffective #2871

mberchon opened this issue May 30, 2022 · 4 comments

Comments

@mberchon
Copy link

Versions used

  • "com.lightbend.akka" %% "akka-stream" % 2.6.19,
  • "com.lightbend.akka" %% "akka-stream-alpakka-kinesis" % 3.0.4,
  • "software.amazon.awssdk" % "sdk-core" % 2.17.172.

Expected Behavior

KinesisFlow should be throttled to 1 MBs per shard whatever the factory methods used to create the ByteBuffer payload.

Actual Behavior

Flow throughput is unlimited

Relevant logs

akka.stream.alpakka.kinesis.scaladsl.KinesisFlow line 72

private def getPayloadByteSize[T](record: (PutRecordsRequestEntry, T)): Int = record match {
    case (request, _) => request.partitionKey.length + request.data.asByteBuffer.position()
}

request.data.asByteBuffer.position() returns 0 if data created from array wrapping

software.amazon.awssdk.core.BytesWrapper line 55

public final ByteBuffer asByteBuffer() {
    return ByteBuffer.wrap(bytes).asReadOnlyBuffer();
}
@mberchon
Copy link
Author

Unit test

  def `byte buffer wrapping`(): Unit = {

    val payload = s"My test payload ${UUID.randomUUID().toString}".padTo(1024*1000,'X').getBytes(StandardCharsets.UTF_8)
    val partition = UUID.randomUUID().toString

    val req = PutRecordsRequestEntry
      .builder()
      .partitionKey(partition)
      .data(SdkBytes.fromByteArray(payload))
      .build()

    req.data.asByteBuffer.position() shouldBe payload.length
    //Fail with: org.scalatest.exceptions.TestFailedException: 0 was not equal to 1024000, took 0.122 sec
  }
  @Test
  def `byte buffer allocate`(): Unit = {

    val payload = s"My test payload ${UUID.randomUUID().toString}".padTo(1024*1000,'X').getBytes(StandardCharsets.UTF_8)
    val partition = UUID.randomUUID().toString
    val bb = ByteBuffer.allocate(1024*1024)
    bb.put(payload)

    bb.position() shouldBe payload.length
    //Success

    val req = PutRecordsRequestEntry
      .builder()
      .partitionKey(partition)
      .data(SdkBytes.fromByteBuffer(bb))
      .build()

    req.data.asByteBuffer.position() shouldBe payload.length
    //Fail with: org.scalatest.exceptions.TestFailedException: 0 was not equal to 1024000, took 0.122 sec
  }

@mberchon mberchon changed the title Kinesis push throughput throttling ineffective Kinesis push throughput (maxBytesPerSecond) throttling ineffective May 30, 2022
@mberchon
Copy link
Author

mberchon commented May 30, 2022

My work around: add throughput throttle flow on the upstream with the following weight function

def weightFunction(request:PutRecordsRequestEntry):Int = request.partitionKey().length+request.data().asByteArray().length

.throttle(nbShard*1024*1024,1 second, nbShard*1024*1024, { case (req,_) => weightFunction(req)},1 second,ThrottleMode.Shaping)
.via(KinesisFlow.withContext[Ctx](streamName, KinesisFlowSettings.byNumberOfShards(nbShard).withMaxBatchSize(1)))

@jtjeferreira
Copy link
Contributor

@ennru I think this can be closed

@ennru
Copy link
Member

ennru commented Jan 4, 2024

Thank you for your effort!
Implemented with #3035

@ennru ennru closed this as completed Jan 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants