Skip to content

Commit

Permalink
Added some more methods to Index, as well as added return-types for p…
Browse files Browse the repository at this point in the history
…ut and remove as well as restructured some of the code
  • Loading branch information
viktorklang committed Oct 5, 2010
1 parent 30712c6 commit 441387b
Showing 1 changed file with 75 additions and 32 deletions.
107 changes: 75 additions & 32 deletions akka-actor/src/main/scala/actor/ActorRegistry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -311,84 +311,127 @@ object ActorRegistry extends ListenerManagement {
}
}

/**
* An implementation of a ConcurrentMultiMap
* Adds/remove is serialized over the specified key
* Reads are fully concurrent <-- el-cheapo
*
* @author Viktor Klang
*/
class Index[K <: AnyRef,V <: AnyRef : Manifest] {
import scala.collection.JavaConversions._

private val Naught = Array[V]() //Nil for Arrays
private val container = new ConcurrentHashMap[K, JSet[V]]
private val emptySet = new ConcurrentSkipListSet[V]

def put(key: K, value: V) {
/**
* Associates the value of type V with the key of type K
* @returns true if the value didn't exist for the key previously, and false otherwise
*/
def put(key: K, value: V): Boolean = {
//Tailrecursive spin-locking put
@tailrec def spinPut(k: K, v: V): Boolean = {
var retry = false
var added = false
val set = container get k

//Returns whether it needs to be retried or not
def tryPut(set: JSet[V], v: V): Boolean = {
set.synchronized {
if (set.isEmpty) true //IF the set is empty then it has been removed, so signal retry
if (set ne null) {
set.synchronized {
if (set.isEmpty) {
retry = true //IF the set is empty then it has been removed, so signal retry
}
else { //Else add the value to the set and signal that retry is not needed
set add v
false
added = set add v
retry = false
}
}
}
}

@tailrec def syncPut(k: K, v: V): Boolean = {
var retry = false
val set = container get k
if (set ne null) retry = tryPut(set,v)
else {
val newSet = new ConcurrentSkipListSet[V]
newSet add v

// Parry for two simultaneous putIfAbsent(id,newSet)
val oldSet = container.putIfAbsent(k,newSet)
if (oldSet ne null)
retry = tryPut(oldSet,v)
if (oldSet ne null) {
oldSet.synchronized {
if (oldSet.isEmpty) {
retry = true //IF the set is empty then it has been removed, so signal retry
}
else { //Else try to add the value to the set and signal that retry is not needed
added = oldSet add v
retry = false
}
}
} else {
added = true
}
}

if (retry) syncPut(k,v)
else true
if (retry) spinPut(k,v)
else added
}

syncPut(key,value)
spinPut(key,value)
}

def values(key: K) = {
/**
* @returns a _new_ array of all existing values for the given key at the time of the call
*/
def values(key: K): Array[V] = {
val set: JSet[V] = container get key
if (set ne null) set toArray Naught
else Naught
}

def foreach(key: K)(fun: (V) => Unit) {
val set = container get key
if (set ne null)
set foreach fun
val result = if (set ne null) set toArray Naught else Naught
result.asInstanceOf[Array[V]]
}

/**
* @returns Some(value) for the first matching value where the supplied function returns true for the given key,
* if no matches it returns None
*/
def findValue(key: K)(f: (V) => Boolean): Option[V] = {
import scala.collection.JavaConversions._
val set = container get key
if (set ne null)
set.iterator.find(f)
else
None
}

/**
* Applies the supplied function to all keys and their values
*/
def foreach(fun: (K,V) => Unit) {
import scala.collection.JavaConversions._
container.entrySet foreach {
(e) => e.getValue.foreach(fun(e.getKey,_))
}
}

def remove(key: K, value: V) {
/**
* Disassociates the value of type V from the key of type K
* @returns true if the value was disassociated from the key and false if it wasn't previously associated with the key
*/
def remove(key: K, value: V): Boolean = {
val set = container get key

if (set ne null) {
set.synchronized {
if (set.remove(value)) { //If we can remove the value
if (set.isEmpty) //and the set becomes empty
container.remove(key,emptySet) //We try to remove the key if it's mapped to an empty set

true //Remove succeeded
}
else false //Remove failed
}
}
} else false //Remove failed
}

def clear = { foreach(remove _) }
/**
* @returns true if the underlying containers is empty, may report false negatives when the last remove is underway
*/
def isEmpty: Boolean = container.isEmpty

/**
* Removes all keys and all values
*/
def clear = foreach { case (k,v) => remove(k,v) }
}

0 comments on commit 441387b

Please sign in to comment.