Skip to content

Commit

Permalink
Protect against doing requests for snapshots that are over 400KB.
Browse files Browse the repository at this point in the history
  • Loading branch information
Thejipppp authored and spangaer committed Dec 18, 2023
1 parent 2105f52 commit 74cd88f
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ trait DynamoDBSnapshotRequests extends DynamoDBRequests {

val toUnit: Any => Unit = _ => ()

private def itemSize(partitionKey: String, serializedSnapshot: Array[Byte]) =
DynamoFixedByteSize + partitionKey.length + serializedSnapshot.size

def delete(metadata: SnapshotMetadata): Future[Unit] = {
val request = new DeleteItemRequest()
.withTableName(Table)
Expand Down Expand Up @@ -151,7 +154,8 @@ trait DynamoDBSnapshotRequests extends DynamoDBRequests {
private def toSnapshotItem(persistenceId: String, sequenceNr: Long, timestamp: Long, snapshot: Any): Future[Item] = {
val item: Item = new JHMap

item.put(Key, S(messagePartitionKey(persistenceId)))
val partitionKey = messagePartitionKey(persistenceId)
item.put(Key, S(partitionKey))
item.put(SequenceNr, N(sequenceNr))
item.put(Timestamp, N(timestamp))
val snapshotData = snapshot.asInstanceOf[AnyRef]
Expand All @@ -171,6 +175,11 @@ trait DynamoDBSnapshotRequests extends DynamoDBRequests {
}

fut.map { data =>
val size = itemSize(partitionKey, data)

if (size > MaxItemSize)
throw new DynamoDBSnapshotRejection(s"MaxItemSize exceeded: $size > $MaxItemSize")

item.put(PayloadData, B(data))
if (manifest.nonEmpty) {
item.put(SerializerManifest, S(manifest))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import com.typesafe.config.Config

import scala.concurrent.Future

class DynamoDBSnapshotRejection(message: String, cause: Throwable = null) extends RuntimeException(message, cause)

class DynamoDBSnapshotStore(config: Config) extends SnapshotStore with DynamoDBSnapshotRequests with ActorLogging {
val journalSettings = new DynamoDBSnapshotConfig(config)
val dynamo = dynamoClient(context.system, journalSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,22 @@ package object snapshot {
val SerializerId = "ser_id"
val SerializerManifest = "ser_manifest"
val PayloadData = "pay_data"

/**
* Returns (a slightly overestimated) size of the fixed fields in a snapshot DynamoDB record.
*
* Assumes the maximum of 21 bytes for a number.
*
* Sources
* https://zaccharles.medium.com/calculating-a-dynamodb-items-size-and-consumed-capacity-d1728942eb7c
* https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/CapacityUnitCalculations.html
*/
val DynamoFixedByteSize =
Key.length() + // + partitionKey.size
SequenceNr.length() + 21 +
Timestamp.length() + 21 +
Payload.length() + // + payload.size
100 + // Standard 100 bytes overhead
100 // Safety factor for enabling extra features

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.persistence.dynamodb.snapshot

import com.typesafe.config.ConfigFactory
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.testkit._
import org.apache.pekko.persistence._
import org.apache.pekko.persistence.SnapshotProtocol._
import org.apache.pekko.persistence.dynamodb.IntegSpec
import org.scalatest._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

class SnapshotTooBigSpec extends TestKit(ActorSystem("SnapshotTooBigSpec"))
with AnyWordSpecLike
with BeforeAndAfterAll
with BeforeAndAfterEach
with Matchers
with IntegSpec
with DynamoDBUtils {

override def beforeAll(): Unit = {
super.beforeAll()
this.ensureSnapshotTableExists()
}

override protected def beforeEach(): Unit = {
super.beforeEach()
senderProbe = TestProbe()
}

override def afterAll(): Unit = {
super.afterAll()
client.shutdown()
}

private var senderProbe: TestProbe = _
val persistenceId = "SnapshotTooBigSpec"
val snapshotStore = Persistence(system).snapshotStoreFor("")

"DynamoDB snapshot too big spec" must {

"1 reject a snapshot that is over 400 KB compressed." in {
// Expect 1 MB of random data to exceed 400KB compressed.
val bytes = new Array[Byte](1 << 20)
scala.util.Random.nextBytes(bytes)
val metadata = SnapshotMetadata.apply(persistenceId, 1, 0)
snapshotStore.tell(SaveSnapshot(metadata, bytes), senderProbe.ref)
val rej = senderProbe.expectMsgType[SaveSnapshotFailure]
rej.cause shouldBe a[DynamoDBSnapshotRejection]
rej.cause.getMessage().startsWith("MaxItemSize exceeded") shouldBe true
}
}
}

0 comments on commit 74cd88f

Please sign in to comment.