Skip to content

Commit

Permalink
add partition support
Browse files Browse the repository at this point in the history
  • Loading branch information
balshor committed Feb 18, 2011
1 parent 7aa6709 commit 87129d0
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 32 deletions.
138 changes: 107 additions & 31 deletions src/main/scala/simplistic/SimpleAPI.scala
Expand Up @@ -17,6 +17,7 @@ package simplistic
import Request.{AttributeOperation, AddValue, ReplaceValue}

import scala.collection.MapProxy
import collection.generic.Sorted

/**
* A map implementation which holds a snapshot of the properties and values of an item.
Expand Down Expand Up @@ -45,45 +46,31 @@ class ItemNameSnapshot(val name: String, val self: Map[String,Set[String]])
override def toString = "ItemSnapshot(" + name + ", " + self + ")"
}

/**
* A class which serves as a proxy to a Domain within simpleDB. This class holds no data
* other than a reference to the domain name. Calls to methods which access items from
* within the domain will always result in network requests to SimpleDB.
*/
class Domain(val name: String)(implicit val api: SimpleAPI) {
import api._
import StreamMaker._

/**
* Return a current snapshot of the metadata associated with this domain.
*
* This is the analog of the 'DomainMetadata' request.
*/
def metadata: DomainMetadataResult = (new DomainMetadataRequest(name)).response.result
trait DomainLike {

/**
* Delete this domain from the SimpleDB account.
*
* This is the analog of the 'DeleteDomain' request.
*/
def delete() = (new DeleteDomainRequest(name)).response.metadata
def delete(): Any

/**
* Create a domain within SimpleDB corresponding to this object if one doesn't exist
* already.
*
* This is the analog of the 'CreateDomain'
*/
def create() = (new CreateDomainRequest(name)).response.metadata
def create(): Any

/** Return a reference to a theoretically unique item with a new UUID as it's name. */
def unique = new Item(this, java.util.UUID.randomUUID.toString)
def unique: Item

/** Return a reference to an item with a given name within this domain. */
def item(name: String) = new Item(this, name)
def item(name: String): Item

/** Return a reference to an item given an ItemNameSnapshot (as returned from select) */
def item(snapshot: ItemNameSnapshot) = new Item(this, snapshot.name)
def item(snapshot: ItemNameSnapshot): Item

/**
* Return a stream containing all of the items within the domain. One simpleDB request
Expand All @@ -94,7 +81,7 @@ class Domain(val name: String)(implicit val api: SimpleAPI) {
* This the exact analog of using the 'Query' request without specifying a query
* expression.
*/
def items: Stream[Item] = api.items("itemName() from `%s`".format(name), this)
def items: Stream[Item]

/**
* Return a stream containing all of the items within the domain with all of their
Expand All @@ -104,7 +91,7 @@ class Domain(val name: String)(implicit val api: SimpleAPI) {
* This is the analog of using the 'QueryWithAttributes' request without specifying a
* query expression.
*/
def itemsWithAttributes: Stream[ItemSnapshot] = withAttributes(Set[String]())
def itemsWithAttributes: Stream[ItemSnapshot]

/**
* Return a stream containing the items matching a given query with all of their
Expand Down Expand Up @@ -134,9 +121,8 @@ class Domain(val name: String)(implicit val api: SimpleAPI) {
* This is the analog of using the 'QueryWithAttributes' request with a query
* expression and a list of attributes.
*/
def withAttributes(expression: String, attributes: Set[String]): Stream[ItemSnapshot] =
withAttributes(Some(expression), attributes)

def withAttributes(expression: String, attributes: Set[String]): Stream[ItemSnapshot] =
withAttributes(Some(expression), attributes)

/**
* Return a stream containing the items matching an optional query with a selected set
Expand All @@ -146,18 +132,53 @@ class Domain(val name: String)(implicit val api: SimpleAPI) {
*
* This is the analog of using the 'QueryWithAttributes' request.
*/
def withAttributes(expression: Option[String], attributes: Set[String]): Stream[ItemSnapshot] = {
def withAttributes(expression: Option[String], attributes: Set[String]): Stream[ItemSnapshot]

/**
* Perform a batch of attribute modifications on multiple items within the same domain in
* one operation.
*/
def apply(batch: List[AttributeOperation]*): Any
}

/**
* A class which serves as a proxy to a Domain within simpleDB. This class holds no data
* other than a reference to the domain name. Calls to methods which access items from
* within the domain will always result in network requests to SimpleDB.
*/
class Domain(val name: String)(implicit val api: SimpleAPI) extends DomainLike {
import api._
import StreamMaker._

/**
* Return a current snapshot of the metadata associated with this domain.
*
* This is the analog of the 'DomainMetadata' request.
*/
def metadata = (new DomainMetadataRequest(name)).response.result

def delete() = (new DeleteDomainRequest(name)).response.metadata

def create() = (new CreateDomainRequest(name)).response.metadata

def unique = new Item(this, java.util.UUID.randomUUID.toString)

def item(name: String) = new Item(this, name)

def item(snapshot: ItemNameSnapshot) = new Item(this, snapshot.name)

def items: Stream[Item] = api.items("itemName() from `%s`".format(name), this)

def itemsWithAttributes: Stream[ItemSnapshot] = withAttributes(Set[String]())

def withAttributes(expression: Option[String], attributes: Set[String]): Stream[ItemSnapshot] = {
expression match {
case None => select("* from `%s`".format(name), this)
case Some(where) => select("* from `%s` where %s".format(name, where), this)
}
}

/**
* Perform a batch of attribute modifications on multiple items within the same domain in
* one operation.
*/
def apply(batch: List[AttributeOperation]*) = {
def apply(batch: List[AttributeOperation]*): ResponseMetadata = {
// combine the attributes into a single operation.
val operations = (List[AttributeOperation]() /: batch) (_ ++ _)
new BatchPutAttributesRequest(name, operations).response.metadata
Expand All @@ -166,6 +187,53 @@ class Domain(val name: String)(implicit val api: SimpleAPI) {
override def toString = name
}

class Partitions(val partitions: Seq[String], val partitionChooser: (String,Seq[Domain]) => Domain = { (key,domains) =>
val index = math.abs(key.hashCode % domains.size) match {
case x if (x > 0) => x
case _ => 0
}
domains(index)
})(implicit val api: SimpleAPI) extends DomainLike {
import api._

val domains = partitions.sorted map { name => new Domain(name) }
private def partition(name: String): Domain = partitionChooser(name, domains)

def metadata = domains.map(_.metadata)

def unique = {
val name = java.util.UUID.randomUUID.toString
new Item(partition(name), name)
}

def delete(): List[ResponseMetadata] = domains map(_.delete) toList

def create(): List[ResponseMetadata] = domains map(_.create) toList

def item(name:String) = partition(name).item(name)

def item(snapshot: ItemNameSnapshot) = partition(snapshot.name).item(snapshot)

def items: Stream[Item] = Stream(domains.map(_.items): _*).flatten

def itemsWithAttributes: Stream[ItemSnapshot] = Stream(domains.map(_.itemsWithAttributes): _*).flatten

def withAttributes(expression: Option[String], attributes: Set[String]): Stream[ItemSnapshot] = {
Stream(domains.map(_.withAttributes(expression, attributes)): _*).flatten
}

def apply(batch: List[AttributeOperation]*): List[ResponseMetadata] = {
val operations = (List[AttributeOperation]() /: batch) (_ ++ _)
operations groupBy { op =>
partition(op.name)
} filter(!_._2.isEmpty) map { case (domain, operations) =>
new BatchPutAttributesRequest(domain.name, operations).response.metadata
} toList
}

override def toString = domains.map(_.toString).mkString("[", ",", "]")
}

/**
* A trait that defines batch operations for updating attributes on more than one item at a time.
*/
Expand Down Expand Up @@ -402,6 +470,14 @@ trait SimpleAPI extends Concrete
*/
def domain(name: String) = new Domain(name)

/**
* Return a proxy object representing the named simpleDB domains as partitions backing a
* domain-like service. No request is made and the domains may or not exist on the server.
* Domains may be created or deleted on the server using the 'create' and 'delete' methods,
* respectively.
*/
def partitions(names: String*) = new Partitions(names.toList)

/**
* Return a stream of all of the domains within the simpleDB account. As usual this stream
* is fetched lazily, and additional requests to simpleDB will be made only when needed.
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/simplistic/DomainSuite.scala
Expand Up @@ -5,8 +5,8 @@ import org.scalatest.matchers.ShouldMatchers

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DomainSuite extends WordSpec with ShouldMatchers with TestUtil.CleanBefore with TestUtil.StopAndStartServer {
import TestUtil._

import TestUtil._
val testDomain = account.domain("test")

"Domain.create" should {
Expand Down
66 changes: 66 additions & 0 deletions src/test/scala/simplistic/PartitionsSuite.scala
@@ -0,0 +1,66 @@
package simplistic

import org.scalatest.WordSpec
import org.scalatest.matchers.ShouldMatchers

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class PartitionsSuite extends WordSpec with ShouldMatchers with TestUtil.CleanBefore with TestUtil.StopAndStartServer {

import TestUtil._
val testDomains = account.partitions("test1", "test2")

"Partitions.create" should {

"create the underlying domains" in {
testDomains.create()
account.domains.map(_.name).toSet should be === Set("test1", "test2")
}

"be idempotent" in {
testDomains.create()
testDomains.create()
account.domains.map(_.name).toSet should be === Set("test1", "test2")
}

"create different underlying domains" in {
account.partitions("foo", "fab").create()
account.partitions("bar", "baz").create()
account.domains.map(_.name).toSet should be === Set("foo", "fab", "bar", "baz")
}

}

"Partitions.delete" should {

"delete the underlying domains" in {
testDomains.create()
account.domains.map(_.name).toSet should be === Set("test1", "test2")
testDomains.delete()
account.domains.map(_.name).toSet should be === Set.empty
}
}

"Partitions.metadata" should {

"return metadata about the underlying domains" in {
testDomains.create()
val metas = testDomains.metadata
metas.size should be === 2
for(meta <- metas) {
meta.itemCount should be === 0
meta.attributeNameCount should be === 0
}
}
}

"Partitions.unique" should {
"return unique item ids" in {
val u1 = testDomains.unique
val u2 = testDomains.unique
u1.name should not be === (u2.name)
}
}


}

0 comments on commit 87129d0

Please sign in to comment.