Skip to content

Commit

Permalink
Cleaned up an obsolete requirement to inherit from AnyRef for lattice…
Browse files Browse the repository at this point in the history
… values
  • Loading branch information
Merlijn Boogerd committed Sep 14, 2017
1 parent 8b1ab6c commit 73caf2d
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ trait MonotonicMap[K] {
* element cannot be handled as if being of type `V`. If no Subscriber is created for the Publisher, or the
* subscriber terminates, the Publisher is expected to clean up after itself.
*/
def read[V <: AnyRef](key: K): Publisher[V]
def read[V](key: K): Publisher[V]

/**
* Attempts to write `value` to `key`. We expect a `JoinSemilattice` for `V` as we ought to be able to merge any
Expand All @@ -51,6 +51,6 @@ trait MonotonicMap[K] {
* are expected to clean up after themselves if subscribers terminate. Implementations are expected to
* close the stream cleanly if no further progress can be made and no FatalFailure occurred.
*/
def write[V <: AnyRef: JoinSemilattice](key: K, value: V): Publisher[WriteNotification]
def write[V: JoinSemilattice](key: K, value: V): Publisher[WriteNotification]

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import InMemMonotonicMapMessages._
* Constructs an in-memory mmap.
* @tparam K The key type
*/
class InMemMonotonicMap[K <: AnyRef](storeActor: ActorRef) extends MonotonicMap[K] {
class InMemMonotonicMap[K](storeActor: ActorRef) extends MonotonicMap[K] {

/**
* Attempts to read a stream of updates for `key` as type `V`
Expand All @@ -36,8 +36,8 @@ class InMemMonotonicMap[K <: AnyRef](storeActor: ActorRef) extends MonotonicMap[
* cannot be handled as if being of type `V`. If no Subscriber is created for the Publisher, or the
* subscriber terminates, the Publisher is expected to clean up after itself.
*/
override def read[V <: AnyRef](key: K): Publisher[V] =
(s: Subscriber[_ >: V]) => storeActor ! Read(key, s.asInstanceOf[Subscriber[AnyRef]])
override def read[V](key: K): Publisher[V] =
(s: Subscriber[_ >: V]) => storeActor ! Read(key, s.asInstanceOf[Subscriber[Any]])

/**
* Attempts to write `value` to `key`. We expect a `JoinSemilattice` for `V` as we ought to be able to merge any
Expand All @@ -50,7 +50,7 @@ class InMemMonotonicMap[K <: AnyRef](storeActor: ActorRef) extends MonotonicMap[
* exceptions. All other errors should be transformed to instances of `WriteNotification`. Implementations
* are expected to clean up after themselves if subscribers terminate.
*/
override def write[V <: AnyRef: JoinSemilattice](key: K, value: V): Publisher[WriteNotification] =
override def write[V: JoinSemilattice](key: K, value: V): Publisher[WriteNotification] =
(s: Subscriber[_ >: WriteNotification]) =>
storeActor ! Write(key, value, implicitly[JoinSemilattice[V]], s.asInstanceOf[Subscriber[WriteNotification]])
}
Expand All @@ -64,6 +64,6 @@ object InMemMonotonicMap {
* @tparam K
* @return
*/
def apply[K <: AnyRef](initialState: Map[K, AnyRef] = Map.empty[K, AnyRef])(implicit actorRefFactory: ActorRefFactory): InMemMonotonicMap[K] =
def apply[K](initialState: Map[K, Any] = Map.empty[K, Any])(implicit actorRefFactory: ActorRefFactory): InMemMonotonicMap[K] =
new InMemMonotonicMap(actorRefFactory.actorOf(InMemMonotonicMapActor.props(initialState)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ import org.reactivestreams.{ Subscriber, Subscription }
* element is likely to be the entire state for the key, no effort is made to cut it into bite-sized chunks.
*/
object InMemMonotonicMapActor {
def props[K <: AnyRef](initialState: Map[K, AnyRef]): Props = Props(new InMemMonotonicMapActor[K](initialState))
def props[K](initialState: Map[K, Any]): Props = Props(new InMemMonotonicMapActor[K](initialState))
}

class InMemMonotonicMapActor[K <: AnyRef](initialState: Map[K, AnyRef]) extends Actor with ActorLogging {
class InMemMonotonicMapActor[K](initialState: Map[K, Any]) extends Actor with ActorLogging {

private[this] val log = getLogger

var readers: SubscribedReaders[K] = SubscribedReaders()
var writers: SubscribedWriters[K] = SubscribedWriters()

// The 'persistent' state for this map
var state: Map[K, (Set[Long], AnyRef)] = initialState.mapValues(any (Set.empty, any))
var state: Map[K, (Set[Long], Any)] = initialState.mapValues(any (Set.empty, any))

// logical timestamps for Reader/Writer subscribers
var subscriberIndex: Long = 0
Expand Down Expand Up @@ -83,7 +83,7 @@ class InMemMonotonicMapActor[K <: AnyRef](initialState: Map[K, AnyRef]) extends
override def request(n: Long): Unit = self ! UpdateDemand(key, index, n, writer)
}

def subscribeReader(key: K, subscriber: Subscriber[AnyRef], queue: Option[(Set[Long], AnyRef)]): (Long, SubscribedReaders[K]) = {
def subscribeReader(key: K, subscriber: Subscriber[Any], queue: Option[(Set[Long], Any)]): (Long, SubscribedReaders[K]) = {
nextSubscriberIndex { index
val subscription = createSubscription(key, index, writer = false)
subscriber.onSubscribe(subscription)
Expand Down Expand Up @@ -125,7 +125,7 @@ class InMemMonotonicMapActor[K <: AnyRef](initialState: Map[K, AnyRef]) extends
writers = subscriberState
}

def handleWrite(key: K, value: AnyRef, tracker: Subscriber[WriteNotification])(implicit lattice: JoinSemilattice[AnyRef]): Unit = {
def handleWrite(key: K, value: Any, tracker: Subscriber[WriteNotification])(implicit lattice: JoinSemilattice[Any]): Unit = {
val (writerIndex, subState) = subscribeWriter(key, tracker, Vector(Persisted()))
val trackedWrite = (Set(writerIndex), value)
// need to combine new value with possibly existing value (product lattice)
Expand All @@ -136,6 +136,6 @@ class InMemMonotonicMapActor[K <: AnyRef](initialState: Map[K, AnyRef]) extends
readerIndices.foreach(index => handleUpdateReader(key, index, readers.enqueue(Option(trackedWrite))))
}

private implicit def trackedWriteSemigroup(implicit joinSemilattice: JoinSemilattice[AnyRef]): Semigroup[(Set[Long], AnyRef)] =
(x: (Set[Long], AnyRef), y: (Set[Long], AnyRef)) => (x._1 ++ y._1, joinSemilattice.join(x._2, y._2))
private implicit def trackedWriteSemigroup(implicit joinSemilattice: JoinSemilattice[Any]): Semigroup[(Set[Long], Any)] =
(x: (Set[Long], Any), y: (Set[Long], Any)) => (x._1 ++ y._1, joinSemilattice.join(x._2, y._2))
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ object InMemMonotonicMapMessages {
/**
* Signals that the given subscriber desires read-subscription for the given key
*/
case class Read(key: AnyRef, subscriber: Subscriber[AnyRef])
case class Read(key: Any, subscriber: Subscriber[Any])

/**
* @param key The key to unsubscribe from
* @param index The index of the subscription to cancel
* @param writer True if the given index is a Writer, false if it is a Reader
*/
case class Unsubscribe(key: AnyRef, index: Long, writer: Boolean)
case class Unsubscribe(key: Any, index: Long, writer: Boolean)

/**
* Signals that the subscription with the given key and index has additional demand
*/
case class UpdateDemand(key: AnyRef, index: Long, demand: Long, writer: Boolean)
case class UpdateDemand(key: Any, index: Long, demand: Long, writer: Boolean)

/**
* Adds a value to the given key, with `subscriber` requiring feedback about its propagation
Expand All @@ -49,7 +49,7 @@ object InMemMonotonicMapMessages {
* @param subscriber The subscriber for `WriteNotification`s
* @tparam V The type of the value in `key`
*/
case class Write[V <: AnyRef](key: AnyRef, value: V, joinSemilattice: JoinSemilattice[V], subscriber: Subscriber[WriteNotification])
case class Write[V](key: Any, value: V, joinSemilattice: JoinSemilattice[V], subscriber: Subscriber[WriteNotification])

/**
* Signals that the write was stored in the memory state. This happens max. 1 time and should be the first event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import org.reactivestreams.Subscriber
*
*/
object SubscribedReaders {
type Inner[K] = Map[K, Map[Long, (Subscriber[AnyRef], Long, Option[(Set[Long], AnyRef)])]]
type Inner[K] = Map[K, Map[Long, (Subscriber[Any], Long, Option[(Set[Long], Any)])]]
def apply[K](): SubscribedReaders[K] = new SubscribedReaders(Map.empty)
}

case class SubscribedReaders[K] private[mmap] (state: SubscribedReaders.Inner[K]) extends Subscriptions[K] {

type Value = AnyRef
type Queue = Option[(Set[Long], AnyRef)]
type Value = Any
type Queue = Option[(Set[Long], Any)]
type Outer = SubscribedReaders[K]

override protected def instantiate(inner: Inner): SubscribedReaders[K] = SubscribedReaders(inner)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ trait ActorTestBase extends TestKitBase with TestBase {
def source[T](publisher: Publisher[T]): Source[T, NotUsed] =
Source.fromPublisher(publisher)

def source[K, V <: AnyRef](map: MonotonicMap[K], key: K): Source[V, NotUsed] =
def source[K, V](map: MonotonicMap[K], key: K): Source[V, NotUsed] =
source(map.read[V](key))

def source[K, V <: AnyRef: JoinSemilattice](map: MonotonicMap[K], key: K, element: V): Source[WriteNotification, NotUsed] =
def source[K, V: JoinSemilattice](map: MonotonicMap[K], key: K, element: V): Source[WriteNotification, NotUsed] =
source(map.write(key, element))
}
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ class InMemMonotonicMapSpec extends ActorTestBase {
all(writes) should have size (elementCount + 1)
}

def withStringMap(initialState: Map[String, AnyRef] = Map.empty[String, AnyRef])(test: InMemMonotonicMap[String] Any): Unit = {
def withStringMap(initialState: Map[String, Any] = Map.empty[String, Any])(test: InMemMonotonicMap[String] Any): Unit = {

// Instantiate the implementing actor, and wrap a map around it
val actor = watch(system.actorOf(InMemMonotonicMapActor.props(initialState)))
Expand Down

0 comments on commit 73caf2d

Please sign in to comment.