Skip to content

Commit

Permalink
Merge pull request #70 from randallalexander/master
Browse files Browse the repository at this point in the history
batch deleteAll
  • Loading branch information
Phil Wills committed Nov 24, 2016
2 parents c37bd36 + 6285206 commit f624bc3
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 5 deletions.
29 changes: 28 additions & 1 deletion src/main/scala/com/gu/scanamo/Scanamo.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.gu.scanamo

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
import com.amazonaws.services.dynamodbv2.model.{BatchWriteItemResult, DeleteItemResult, PutItemResult, UpdateItemResult}
import com.amazonaws.services.dynamodbv2.model.{BatchWriteItemResult, DeleteItemResult, PutItemResult}
import com.gu.scanamo.error.DynamoReadError
import com.gu.scanamo.ops.{ScanamoInterpreters, ScanamoOps}
import com.gu.scanamo.query._
Expand Down Expand Up @@ -164,6 +164,33 @@ object Scanamo {
def delete(client: AmazonDynamoDB)(tableName: String)(key: UniqueKey[_]): DeleteItemResult =
exec(client)(ScanamoFree.delete(tableName)(key))

/**
* Deletes multiple items from a table by a unique key
*
* {{{
* >>> case class Farm(animals: List[String])
* >>> case class Farmer(name: String, age: Long, farm: Farm)
*
* >>> import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType._
* >>> import com.gu.scanamo.syntax._
* >>> val client = LocalDynamoDB.client()
*
* >>> val dataSet = Set(
* ... Farmer("Patty", 200L, Farm(List("unicorn"))),
* ... Farmer("Ted", 40L, Farm(List("T-Rex"))),
* ... Farmer("Jack", 2L, Farm(List("velociraptor"))))
*
* >>> LocalDynamoDB.withTable(client)("farmers")('name -> S) {
* ... Scanamo.putAll(client)("farmers")(dataSet)
* ... Scanamo.deleteAll(client)("farmers")('name -> dataSet.map(_.name))
* ... Scanamo.scan[Farmer](client)("farmers").toList
* ... }
* List()
* }}}
*/
def deleteAll(client: AmazonDynamoDB)(tableName: String)(items: UniqueKeys[_]): List[BatchWriteItemResult] =
exec(client)(ScanamoFree.deleteAll(tableName)(items))

/**
* Updates an attribute that is not part of the key
*
Expand Down
7 changes: 5 additions & 2 deletions src/main/scala/com/gu/scanamo/ScanamoAsync.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.gu.scanamo

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync
import com.amazonaws.services.dynamodbv2.model.{BatchWriteItemResult, DeleteItemResult, PutItemResult, UpdateItemResult}
import com.amazonaws.services.dynamodbv2.model.{BatchWriteItemResult, DeleteItemResult, PutItemResult}
import com.gu.scanamo.error.DynamoReadError
import com.gu.scanamo.ops.{ScanamoInterpreters, ScanamoOps}
import com.gu.scanamo.query.{Query, UniqueKey, UniqueKeys}
import com.gu.scanamo.update.UpdateExpression

import scala.concurrent.{ExecutionContext, Future}

/**
Expand Down Expand Up @@ -42,6 +41,10 @@ object ScanamoAsync {
(implicit ec: ExecutionContext): Future[DeleteItemResult] =
exec(client)(ScanamoFree.delete(tableName)(key))

def deleteAll(client: AmazonDynamoDBAsync)(tableName: String)(items: UniqueKeys[_])
(implicit ec: ExecutionContext): Future[List[BatchWriteItemResult]] =
exec(client)(ScanamoFree.deleteAll(tableName)(items))

def update[V: DynamoFormat, U: UpdateExpression](client: AmazonDynamoDBAsync)(tableName: String)(
key: UniqueKey[_], expression: U)(implicit ec: ExecutionContext
): Future[Either[DynamoReadError,V]] =
Expand Down
17 changes: 16 additions & 1 deletion src/main/scala/com/gu/scanamo/ScanamoFree.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,33 @@ object ScanamoFree {
import cats.syntax.traverse._
import collection.JavaConverters._

private val batchSize = 25

def put[T](tableName: String)(item: T)(implicit f: DynamoFormat[T]): ScanamoOps[PutItemResult] =
ScanamoOps.put(ScanamoPutRequest(tableName, f.write(item), None))

def putAll[T](tableName: String)(items: Set[T])(implicit f: DynamoFormat[T]): ScanamoOps[List[BatchWriteItemResult]] =
items.grouped(25).toList.traverseU(batch =>
items.grouped(batchSize).toList.traverseU(batch =>
ScanamoOps.batchWrite(
new BatchWriteItemRequest().withRequestItems(Map(tableName -> batch.toList.map(i =>
new WriteRequest().withPutRequest(new PutRequest().withItem(f.write(i).getM))
).asJava).asJava)
)
)

def deleteAll(tableName: String)(items: UniqueKeys[_]): ScanamoOps[List[BatchWriteItemResult]] = {
items.asAVMap.grouped(batchSize).toList.traverseU { batch =>
ScanamoOps.batchWrite(
new BatchWriteItemRequest().withRequestItems(
Map(tableName -> batch.toList
.map(item =>
new WriteRequest().withDeleteRequest(
new DeleteRequest().withKey(item.asJava)))
.asJava).asJava)
)
}
}

def get[T](tableName: String)(key: UniqueKey[_])
(implicit ft: DynamoFormat[T]): ScanamoOps[Option[Either[DynamoReadError, T]]] =
for {
Expand Down
29 changes: 29 additions & 0 deletions src/main/scala/com/gu/scanamo/Table.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,35 @@ case class Table[V: DynamoFormat](name: String) {
def getAll(keys: UniqueKeys[_]) = ScanamoFree.getAll[V](name)(keys)
def delete(key: UniqueKey[_]) = ScanamoFree.delete(name)(key)

/**
* Deletes multiple items by a unique key
*
* {{{
* >>> case class Farm(animals: List[String])
* >>> case class Farmer(name: String, age: Long, farm: Farm)
* >>> val farm = Table[Farmer]("farmers")
*
* >>> import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType._
* >>> import com.gu.scanamo.syntax._
* >>> val client = LocalDynamoDB.client()
*
* >>> val dataSet = Set(
* ... Farmer("Patty", 200L, Farm(List("unicorn"))),
* ... Farmer("Ted", 40L, Farm(List("T-Rex"))),
* ... Farmer("Jack", 2L, Farm(List("velociraptor"))))
* >>> LocalDynamoDB.withTable(client)("farmers")('name -> S) {
* ... val operations = for {
* ... _ <- farm.putAll(dataSet)
* ... _ <- farm.deleteAll('name -> dataSet.map(_.name))
* ... scanned <- farm.scan
* ... } yield scanned.toList
* ... Scanamo.exec(client)(operations)
* ... }
* List()
* }}}
*/
def deleteAll(items: UniqueKeys[_]) = ScanamoFree.deleteAll(name)(items)

/**
* A secondary index on the table which can be scanned, or queried against
*
Expand Down
26 changes: 25 additions & 1 deletion src/test/scala/com/gu/scanamo/ScanamoAsyncTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{FunSpec, Matchers}
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType._
import com.gu.scanamo.query.{KeyEquals, KeyList, UniqueKey, UniqueKeys}
import com.gu.scanamo.query._

class ScanamoAsyncTest extends FunSpec with Matchers with ScalaFutures {
implicit val defaultPatience =
Expand Down Expand Up @@ -73,6 +73,30 @@ class ScanamoAsyncTest extends FunSpec with Matchers with ScalaFutures {
}
}

it("should deleteAll asynchronously") {
LocalDynamoDB.usingTable(client)("asyncFarmers")('name -> S) {

case class Farm(asyncAnimals: List[String])
case class Farmer(name: String, age: Long, farm: Farm)

import com.gu.scanamo.syntax._

val dataSet = Set(
Farmer("Patty", 200L, Farm(List("unicorn"))),
Farmer("Ted", 40L, Farm(List("T-Rex"))),
Farmer("Jack", 2L, Farm(List("velociraptor")))
)

Scanamo.putAll(client)("asyncFarmers")(dataSet)

val maybeFarmer = for {
_ <- ScanamoAsync.deleteAll(client)("asyncFarmers")('name -> dataSet.map(_.name))
} yield Scanamo.scan[Farmer](client)("asyncFarmers")

maybeFarmer.futureValue should equal(List.empty)
}
}

it("should update asynchronously") {
LocalDynamoDB.usingTable(client)("forecast")('location -> S) {

Expand Down

0 comments on commit f624bc3

Please sign in to comment.