Skip to content

Commit

Permalink
Merge 4917687 into 1d25593
Browse files Browse the repository at this point in the history
  • Loading branch information
philwills committed May 13, 2016
2 parents 1d25593 + 4917687 commit 51aae1d
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 8 deletions.
20 changes: 14 additions & 6 deletions src/main/scala/com/gu/scanamo/DynamoResultStream.scala
Expand Up @@ -11,27 +11,31 @@ import com.gu.scanamo.ops.{ScanamoOps, ScanamoOpsA}
import collection.convert.decorateAsScala._

private[scanamo] trait DynamoResultStream[Req, Res] {
type EvaluationKey = java.util.Map[String, AttributeValue]

def limit(req: Req): Option[Int]
def items(res: Res): java.util.List[java.util.Map[String, AttributeValue]]
def lastEvaluatedKey(res: Res): java.util.Map[String, AttributeValue]
def withExclusiveStartKey(req: Req, key: java.util.Map[String, AttributeValue]): Req
def lastEvaluatedKey(res: Res): EvaluationKey
def withExclusiveStartKey(req: Req, key: EvaluationKey): Req
def exec(req: Req): ScanamoOps[Res]

def stream[T: DynamoFormat](req: Req): ScanamoOps[Stream[Xor[DynamoReadError, T]]] = {

def streamMore(lastKey: Option[java.util.Map[String, AttributeValue]]): ScanamoOps[Stream[Xor[DynamoReadError, T]]] = {
def streamMore(lastKey: Option[EvaluationKey], remainingLimit: Option[Int]): ScanamoOps[Stream[Xor[DynamoReadError, T]]] = {
for {
queryResult <- exec(lastKey.foldLeft(req)(withExclusiveStartKey(_, _)))
results = items(queryResult).asScala.map(ScanamoFree.read[T]).toStream
resultsStillToGet = remainingLimit.map(_ - results.length)
resultStream <-
Option(lastEvaluatedKey(queryResult)).foldLeft(
Option(lastEvaluatedKey(queryResult)).filterNot(_ => resultsStillToGet.exists(_ <= 0)).foldLeft(
Free.pure[ScanamoOpsA, Stream[Xor[DynamoReadError, T]]](results)
)((rs, k) => for {
items <- rs
more <- streamMore(Some(k))
more <- streamMore(Some(k), resultsStillToGet)
} yield items #::: more)
} yield resultStream
}
streamMore(None)
streamMore(None, limit(req))
}
}

Expand All @@ -43,6 +47,8 @@ private[scanamo] object DynamoResultStream {
req.withExclusiveStartKey(key)

override def exec(req: ScanRequest): ScanamoOps[ScanResult] = ScanamoOps.scan(req)

override def limit(req: ScanRequest): Option[Int] = Option(req.getLimit).map(_.intValue)
}

object QueryResultStream extends DynamoResultStream[QueryRequest, QueryResult] {
Expand All @@ -52,5 +58,7 @@ private[scanamo] object DynamoResultStream {
req.withExclusiveStartKey(key)

override def exec(req: QueryRequest): ScanamoOps[QueryResult] = ScanamoOps.query(req)

override def limit(req: QueryRequest): Option[Int] = Option(req.getLimit).map(_.intValue)
}
}
14 changes: 14 additions & 0 deletions src/main/scala/com/gu/scanamo/ScanamoFree.scala
Expand Up @@ -45,15 +45,29 @@ object ScanamoFree {
def scan[T: DynamoFormat](tableName: String): ScanamoOps[Stream[Xor[DynamoReadError, T]]] =
ScanResultStream.stream[T](new ScanRequest().withTableName(tableName))

def scanWithLimit[T: DynamoFormat](tableName: String, limit: Int): ScanamoOps[Stream[Xor[DynamoReadError, T]]] =
ScanResultStream.stream[T](new ScanRequest().withTableName(tableName).withLimit(limit))

def scanIndex[T: DynamoFormat](tableName: String, indexName: String): ScanamoOps[Stream[Xor[DynamoReadError, T]]] =
ScanResultStream.stream[T](new ScanRequest().withTableName(tableName).withIndexName(indexName))

def scanIndexWithLimit[T: DynamoFormat](tableName: String, indexName: String, limit: Int): ScanamoOps[Stream[Xor[DynamoReadError, T]]] =
ScanResultStream.stream[T](new ScanRequest().withTableName(tableName).withIndexName(indexName).withLimit(limit))

def query[T: DynamoFormat](tableName: String)(query: Query[_]): ScanamoOps[Stream[Xor[DynamoReadError, T]]] =
QueryResultStream.stream[T](queryRequest(tableName)(query))

def queryWithLimit[T: DynamoFormat](tableName: String)(query: Query[_], limit: Int): ScanamoOps[Stream[Xor[DynamoReadError, T]]] =
QueryResultStream.stream[T](queryRequest(tableName)(query).withLimit(limit))

def queryIndex[T: DynamoFormat](tableName: String, indexName: String)(query: Query[_]): ScanamoOps[Stream[Xor[DynamoReadError, T]]] =
QueryResultStream.stream[T](queryRequest(tableName)(query).withIndexName(indexName))

def queryIndexWithLimit[T: DynamoFormat](tableName: String, indexName: String)(query: Query[_], limit: Int): ScanamoOps[Stream[Xor[DynamoReadError, T]]] = {
println(queryRequest(tableName)(query).withIndexName(indexName).withLimit(limit))
QueryResultStream.stream[T](queryRequest(tableName)(query).withIndexName(indexName).withLimit(limit))
}

/**
* {{{
* prop> import collection.convert.decorateAsJava._
Expand Down
54 changes: 52 additions & 2 deletions src/main/scala/com/gu/scanamo/Table.scala
Expand Up @@ -63,7 +63,32 @@ case class Table[V: DynamoFormat](name: String) {
def delete(key: UniqueKey[_]) = ScanamoFree.delete(name)(key)

/**
* Performs the chained operation if the condition is met
* Query or scan a table, limiting the number of items evaluated by Dynamo
* {{{
* >>> case class Transport(mode: String, line: String)
* >>> val transport = Table[Transport]("transport")
*
* >>> val client = LocalDynamoDB.client()
* >>> import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType._
*
* >>> LocalDynamoDB.withTable(client)("transport")('mode -> S, 'line -> S) {
* ... import com.gu.scanamo.syntax._
* ... val operations = for {
* ... _ <- transport.putAll(List(
* ... Transport("Underground", "Circle"),
* ... Transport("Underground", "Metropolitan"),
* ... Transport("Underground", "Central")))
* ... results <- transport.limit(1).query('mode -> "Underground" and ('line beginsWith "C"))
* ... } yield results.toList
* ... Scanamo.exec(client)(operations)
* ... }
* List(Right(Transport(Underground,Central)))
* }}}
*/
def limit(n: Int) = TableLimit(this, n)

/**
* Performs the chained operation, `put` if the condition is met
*
* {{{
* >>> case class Farm(animals: List[String])
Expand Down Expand Up @@ -160,7 +185,15 @@ case class Table[V: DynamoFormat](name: String) {
def given[T: ConditionExpression](condition: T) = ScanamoFree.given(name)(condition)
}

private[scanamo] case class Index[V: DynamoFormat](tableName: String, indexName: String)
private[scanamo] case class Index[V: DynamoFormat](tableName: String, indexName: String) {
/**
* Query or scan an index, limiting the number of items evaluated by Dynamo
*/
def limit(n: Int) = IndexLimit(this, n)
}

private[scanamo] case class TableLimit[V: DynamoFormat](table: Table[V], limit: Int)
private[scanamo] case class IndexLimit[V: DynamoFormat](index: Index[V], limit: Int)

/* typeclass */trait Scannable[T[_], V] {
def scan(t: T[V])(): ScanamoOps[Stream[Xor[DynamoReadError, V]]]
Expand Down Expand Up @@ -190,6 +223,15 @@ object Scannable {
override def scan(i: Index[V])(): ScanamoOps[Stream[Xor[DynamoReadError, V]]] =
ScanamoFree.scanIndex[V](i.tableName, i.indexName)
}

implicit def limitedTableScannable[V: DynamoFormat] = new Scannable[TableLimit, V] {
override def scan(t: TableLimit[V])(): ScanamoOps[Stream[Xor[DynamoReadError, V]]] =
ScanamoFree.scanWithLimit[V](t.table.name, t.limit)
}
implicit def limitedIndexScannable[V: DynamoFormat] = new Scannable[IndexLimit, V] {
override def scan(i: IndexLimit[V])(): ScanamoOps[Stream[Xor[DynamoReadError, V]]] =
ScanamoFree.scanIndexWithLimit[V](i.index.tableName, i.index.indexName, i.limit)
}
}

/* typeclass */ trait Queryable[T[_], V] {
Expand Down Expand Up @@ -220,4 +262,12 @@ object Queryable {
override def query(i: Index[V])(query: Query[_]): ScanamoOps[Stream[Xor[DynamoReadError, V]]] =
ScanamoFree.queryIndex[V](i.tableName, i.indexName)(query)
}
implicit def limitedTableQueryable[V: DynamoFormat] = new Queryable[TableLimit, V] {
override def query(t: TableLimit[V])(query: Query[_]): ScanamoOps[Stream[Xor[DynamoReadError, V]]] =
ScanamoFree.queryWithLimit[V](t.table.name)(query, t.limit)
}
implicit def limitedIndexQueryable[V: DynamoFormat] = new Queryable[IndexLimit, V] {
override def query(i: IndexLimit[V])(query: Query[_]): ScanamoOps[Stream[Xor[DynamoReadError, V]]] =
ScanamoFree.queryIndexWithLimit[V](i.index.tableName, i.index.indexName)(query, i.limit)
}
}

0 comments on commit 51aae1d

Please sign in to comment.