Skip to content

Commit

Permalink
Agent is now monadic, added more tests to AgentTest
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonas Bonér committed Mar 31, 2010
1 parent 70b4d73 commit ffeaac3
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 75 deletions.
137 changes: 85 additions & 52 deletions akka-core/src/main/scala/actor/Agent.scala
Expand Up @@ -44,54 +44,66 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
* <pre>
* val agent = Agent(5)
*
* agent update (_ + 1)
* agent update (_ * 2)
* agent send (_ + 1)
* agent send (_ * 2)
*
* val result = agent()
* ... // use result
*
* agent.close
* </pre>
*
* Example of monadic usage:
* <pre>
* val agent1 = Agent(3)
* val agent2 = Agent(5)
*
* for {
* first <- agent1
* second <- agent2
* if first == second
* } process(first, second)
*
* agent1.close
* agent2.close
* </pre>
*
* NOTE: You can't call 'agent.get' or 'agent()' within an enclosing transaction since
* that will block the transaction indefinitely. But 'agent.update' or 'Agent(value)'
* that will block the transaction indefinitely. But 'agent.send' or 'Agent(value)'
* is fine.
*
* Original author:
* @author Vaclav Pech
*
* Inital AKKA port by:
* @author Viktor Klang
*
* Modifications by:
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class Agent[T] private (initialValue: T) extends Transactor {
import Agent._
private lazy val value = Ref[T]()

start
this ! ValueHolder(initialValue)
this !! Value(initialValue)

/**
* Periodically handles incoming messages.
*/
* Periodically handles incoming messages.
*/
def receive = {
case ValueHolder(x: T) => updateData(x)
case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait))
case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait))
case Value(v: T) =>
swap(v)
case Function(fun: (T => T)) =>
swap(fun(value.getOrWait))
case Procedure(proc: (T => Unit)) =>
proc(copyStrategy(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null"))))
}

/**
* Specifies how a copy of the value is made, defaults to using identity.
*/
protected def copyStrategy(t: T): T = t


/**
* Updates the internal state with the value provided as a by-name parameter.
*/
private final def updateData(newData: => T): Unit = value.swap(newData)
* Performs a CAS operation, atomically swapping the internal state with the value
* provided as a by-name parameter.
*/
private final def swap(newData: => T): Unit = value.swap(newData)

/**
* Submits a request to read the internal state.
Expand All @@ -105,46 +117,64 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
"Can't call Agent.get within an enclosing transaction.\n\tWould block indefinitely.\n\tPlease refactor your code.")
val ref = new AtomicReference[T]
val latch = new CountDownLatch(1)
get((x: T) => {ref.set(x); latch.countDown})
sendProc((x: T) => {ref.set(x); latch.countDown})
latch.await
ref.get
}

/**
* Asynchronously submits a request to read the internal state. The supplied function
* will be executed on the returned internal state value. A copy of the internal state
* will be used, depending on the underlying effective copyStrategy.
*/
final def get(message: (T => Unit)): Unit = this ! ProcedureHolder(message)

/**
* Submits a request to read the internal state. A copy of the internal state will be
* returned, depending on the underlying effective copyStrategy. Internally leverages
* the asynchronous getValue() method and then waits for its result on a CountDownLatch.
*/
* Submits a request to read the internal state. A copy of the internal state will be
* returned, depending on the underlying effective copyStrategy. Internally leverages
* the asynchronous getValue() method and then waits for its result on a CountDownLatch.
*/
final def apply(): T = get

/**
* Submits the provided function for execution against the internal agent's state.
*/
final def apply(message: (T => T)): Unit = this ! FunctionHolder(message)
* Submits the provided function for execution against the internal agent's state.
*/
final def apply(message: (T => T)): Unit = this ! Function(message)

/**
* Submits a new value to be set as the new agent's internal state.
*/
final def apply(message: T): Unit = this ! ValueHolder(message)
* Submits a new value to be set as the new agent's internal state.
*/
final def apply(message: T): Unit = this ! Value(message)

/**
* Submits the provided function for execution against the internal agent's state.
*/
final def update(message: (T => T)): Unit = this ! FunctionHolder(message)
* Submits the provided function of type 'T => T' for execution against the internal agent's state.
*/
final def send(message: (T) => T): Unit = this ! Function(message)

/**
* Submits a new value to be set as the new agent's internal state.
*/
// FIXME Change to 'send' when we have Scala 2.8 and we can remove the Actor.send method
final def update(message: T): Unit = this ! ValueHolder(message)
* Submits a new value to be set as the new agent's internal state.
*/
final def send(message: T): Unit = this ! Value(message)

/**
* Asynchronously submits a procedure of type 'T => Unit' to read the internal state.
* The supplied procedure will be executed on the returned internal state value. A copy
* of the internal state will be used, depending on the underlying effective copyStrategy.
* Does not change the value of the agent (this).
*/
final def sendProc(f: (T) => Unit): Unit = this ! Procedure(f)

/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
*/
final def map[B](f: (T) => B): Agent[B] = Agent(f(get))

/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
*/
final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)())

/**
* Applies function with type 'T => B' to the agent's internal state.
* Does not change the value of the agent (this).
*/
final def foreach(f: (T) => Unit): Unit = f(get)

/**
* Closes the agents and makes it eligable for garbage collection.
*
Expand All @@ -154,16 +184,19 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
}

/**
* Provides factory methods to create Agents.
*/
object Agent {
* Provides factory methods to create Agents.
*
* @author Viktor Klang
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Agent {

/*
* The internal messages for passing around requests.
*/
private case class ProcedureHolder[T](fun: ((T) => Unit))
private case class FunctionHolder[T](fun: ((T) => T))
private case class ValueHolder[T](value: T)
private case class Value[T](value: T)
private case class Function[T](fun: ((T) => T))
private case class Procedure[T](fun: ((T) => Unit))

/**
* Creates a new Agent of type T with the initial value of value.
Expand All @@ -177,4 +210,4 @@ object Agent {
def apply[T](value: T, newCopyStrategy: (T) => T) = new Agent(value) {
override def copyStrategy(t: T) = newCopyStrategy(t)
}
}
}
14 changes: 5 additions & 9 deletions akka-core/src/main/scala/stm/Transaction.scala
Expand Up @@ -103,33 +103,29 @@ object Transaction extends TransactionManagement with Logging {
/**
* See ScalaDoc on Transaction class.
*/
def map[T](f: => T)(implicit transactionFamilyName: String): T =
atomic {f}
def map[T](f: => T): T = atomic {f}

/**
* See ScalaDoc on Transaction class.
*/
def flatMap[T](f: => T)(implicit transactionFamilyName: String): T =
atomic {f}
def flatMap[T](f: => T): T = atomic {f}

/**
* See ScalaDoc on Transaction class.
*/
def foreach(f: => Unit)(implicit transactionFamilyName: String): Unit =
atomic {f}
def foreach(f: => Unit): Unit = atomic {f}

/**
* See ScalaDoc on Transaction class.
*/
def atomic[T](body: => T)(implicit transactionFamilyName: String): T = {
def atomic[T](body: => T): T = {
var isTopLevelTransaction = true
new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = {
val result = body

val txSet = getTransactionSetInScope
log.trace("Committing transaction [%s]\n\twith family name [%s]\n\tby joining transaction set [%s]",
mtx, transactionFamilyName, txSet)
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
txSet.joinCommit(mtx)

// FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
Expand Down
71 changes: 59 additions & 12 deletions akka-core/src/test/scala/AgentTest.scala
Expand Up @@ -16,15 +16,12 @@ class AgentTest extends junit.framework.TestCase
with Suite with MustMatchers
with ActorTestUtil with Logging {

implicit val txFamilyName = "test"

@Test def testSendFun = verify(new TestActor {
def test = {
val agent = Agent(5)
handle(agent) {
agent update (_ + 1)
agent update (_ * 2)

agent send (_ + 1)
agent send (_ * 2)
val result = agent()
result must be(12)
}
Expand All @@ -35,21 +32,34 @@ with ActorTestUtil with Logging {
def test = {
val agent = Agent(5)
handle(agent) {
agent update 6
agent send 6
val result = agent()
result must be(6)
}
}
})

@Test def testOneAgentUpdateWithinEnlosingTransactionSuccess = {
@Test def testSendProc = verify(new TestActor {
def test = {
val agent = Agent(5)
var result = 0
handle(agent) {
agent sendProc (result += _)
agent sendProc (result += _)
Thread.sleep(1000)
result must be(10)
}
}
})

@Test def testOneAgentsendWithinEnlosingTransactionSuccess = {
case object Go
val agent = Agent(5)
val tx = transactor {
case Go => agent update (_ + 1)
case Go => agent send (_ + 1)
}
tx send Go
Thread.sleep(5000)
tx ! Go
Thread.sleep(1000)
val result = agent()
result must be(6)
agent.close
Expand All @@ -63,16 +73,53 @@ with ActorTestUtil with Logging {
val agent = Agent(5)
val tx = transactor {
case Go =>
agent update (_ * 2)
agent send (_ * 2)
try { agent() }
catch {
case _ => latch.countDown
}
}
tx send Go
tx ! Go
latch.await // FIXME should await with timeout and fail if timeout
agent.close
tx.stop
assert(true)
}

@Test def testAgentForeach = verify(new TestActor {
def test = {
val agent1 = Agent(3)
var result = 0
for (first <- agent1) {
result = first + 1
}
result must be(4)
agent1.close
}
})

@Test def testAgentMap = verify(new TestActor {
def test = {
val agent1 = Agent(3)
val result = for (first <- agent1) yield first + 1
result() must be(4)
result.close
agent1.close
}
})

@Test def testAgentFlatMap = verify(new TestActor {
def test = {
val agent1 = Agent(3)
val agent2 = Agent(5)
val result = for {
first <- agent1
second <- agent2
} yield second + first
result() must be(8)
result.close
agent1.close
agent2.close
}
})
}
4 changes: 2 additions & 2 deletions akka-core/src/test/scala/ShutdownSpec.scala
Expand Up @@ -13,7 +13,7 @@ object ActorShutdownRunner {

val myActor = new MyActor
myActor.start
myActor.send("test")
myActor ! "test"
myActor.stop
}
}
Expand All @@ -34,4 +34,4 @@ object RemoteServerAndClusterShutdownRunner {
s2.shutdown
s3.shutdown
}
}
}

0 comments on commit ffeaac3

Please sign in to comment.