Skip to content

Commit

Permalink
Extracted Var from MVar abstraction. No added functionality yet
Browse files Browse the repository at this point in the history
  • Loading branch information
Merlijn Boogerd committed Nov 24, 2017
1 parent 24190ed commit e9b7bf2
Show file tree
Hide file tree
Showing 22 changed files with 216 additions and 215 deletions.
40 changes: 40 additions & 0 deletions doc/Var.md
@@ -0,0 +1,40 @@
Variable programming
====================

# Introduction

We have gone through a transition of using immutable data instead of mutable data. This is due to negative publicity around using variables, most of which is warranted.

- Concurrent access to mutable datastructures is complicated to do in a way that is not prone to race conditions or other unintended effects
- When sharing data between two separate modules in your codebase, a variable has to be made visible on the highest level of the two modules' respective height. Often also exposing the variable to modules that
shouldn't be touching the data; in the limit introducing completely global variables.
- One might want to take control of some of the complexities by introducing a separation between readers and writers. This requires distinct interfaces for every datastructure. i.e. an IntWriter and IntReader, or DoubleWriter and DoubleReader

One convenience of variables is that for some problems they feel more intuitive than their immutable counterpart, and may be more performant on typical computer architectures as well.

One way of solving these issues is reifying variables as a `Var`, making explicit the fact that it can be `set`d and `sample`d at any point in time:

- Access to the variable is no longer done through its scope; the variable can be passed around as an object
- What modules receive the power to write or read can be easily controlled using the more general `VarWriter` and `VarReader` interfaces
- Having wrapped an arbitrary variable datastructure in our `Var`, we can implement its accessors in a concurrency-safe and performant manner, i.e. `AtomicReference`

Additional advantages that can be gained through this abstraction:

- functorial `map` to derive `Var`s from a base `Var` using an arbitrary function
- `inflate`/`zip` as a means of lifting the datastructure in a heterogeneous list, or expanding the list
- applicative `ap` as a means of deriving new `Var`s from multiple inputs, not unlike the `Signal` pattern in functional reactive programming.
- producing a `ReactivePublisher` such that all updates to the variable can be explicitly observed.

Note that each of these can be implemented in a concurrency-safe and performant way using Akka Streams (Lightbend's reactive-streams implementation)

# Monotonic Variables

We will refer to the subset of datastructures whose operations are monotonic as monotonic datastructures. Their operations inflate the datastructure in a way that enables eventual consistency. For every
such datastructure a `merge` operation exists that is `commutative`, `associative` and `idempotent`, i.e. it is well behaved under re-ordering of operations or multiple executions thereof, which frequently
occur in a network setting.

Monotonic variables add another layer of safety, warranting their use in a truly global context. Here they are not limited to being global on a single machine, but they can be shared between _all_ machines,
with the guarantee that all updates that occur will eventually be reflected on all other machines sharing that variable, assuming those machines have moments of healthy operation where they can receive and
aid in the propagation of those updates. The speed with which such replication occurs is dependent on a number of factors, such as the number of replicas, the health of said replicas, and the efficiency of
the replication mechanism under those conditions. The advantage of monotonic variables is that replication can be implemented as an independent responsibility, separate from the application programmer that
queries and/or updates the variables.
Expand Up @@ -21,7 +21,7 @@ import java.util.function.UnaryOperator


import akka.actor.ActorRefFactory import akka.actor.ActorRefFactory
import algebra.lattice.BoundedJoinSemilattice import algebra.lattice.BoundedJoinSemilattice
import io.demograph.monotonic.mvar.{ ExecutionContext, UpdatableMVar } import io.demograph.monotonic.`var`.{ ExecutionContext, UpdatableMVar }


import scala.reflect.runtime.universe._ import scala.reflect.runtime.universe._
/** /**
Expand Down
66 changes: 66 additions & 0 deletions mem/src/main/scala/io/demograph/monotonic/var/AtomicMVar.scala
@@ -0,0 +1,66 @@
/*
* Copyright 2017 Merlijn Boogerd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.demograph.monotonic.`var`

import java.util.function.UnaryOperator

import algebra.lattice.{ BoundedJoinSemilattice, JoinSemilattice }
import io.demograph.monotonic.queue.{ MergingQueue, Queue }
import org.reactivestreams.Subscriber

/**
* A variable whose content is monotonic (all operations are monotonic), and whose content can be accessed / manipulated
* in an atomic fashion.
*/
class AtomicMVar[V: JoinSemilattice](initialValue: V) extends AtomicVar[V](initialValue) with MVar[V] {

/**
* Note that `set` here means really `inflate`. The method ensures that the update is an inflation w.r.t. the
* previous state.
*/
override protected[`var`] def _set(delta: V): V = {
val previous = ref.getAndUpdate(new UnaryOperator[V] {
override def apply(currentState: V): V = JoinSemilattice.join(currentState, delta)
})
updateSubscribers(enqueue(delta) andThen dispatchReadyElements)
previous
}

/**
* @param f The function to apply (note that this function should be without side-effects.
* In the current implementation, this function _will_ be applied twice!
* @return The previous state of this `MVar`
*/
override protected[`var`] def _update(f: V V): V = {
val oldState = ref.getAndUpdate(new UnaryOperator[V] {
override def apply(currentState: V): V = JoinSemilattice.join(currentState, f(currentState))
})
updateSubscribers(enqueue(f(oldState)) andThen dispatchReadyElements)
oldState
}

override protected def queueFromCurrentState = new MergingQueue[V](Some(sample))
}

object AtomicMVar {

def apply[V: JoinSemilattice](initialValue: V): MVar[V] = new AtomicMVar[V](initialValue)

def apply[V: BoundedJoinSemilattice](): MVar[V] = apply(BoundedJoinSemilattice[V].zero)

case class SubscriberState[V](subscriber: Subscriber[V], demand: Long, queue: Queue[V])
}
Expand Up @@ -14,44 +14,37 @@
* limitations under the License. * limitations under the License.
*/ */


package io.demograph.monotonic.mvar package io.demograph.monotonic.`var`


import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference } import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }
import java.util.function.UnaryOperator import java.util.function.UnaryOperator


import algebra.lattice.{ BoundedJoinSemilattice, JoinSemilattice } import io.demograph.monotonic.`var`.AtomicMVar.SubscriberState
import io.demograph.monotonic.mvar.AtomicMVar.SubscriberState import io.demograph.monotonic.queue.{ LWWQueue, Queue }
import io.demograph.monotonic.queue.{ MergingQueue, Queue }
import org.reactivestreams.{ Publisher, Subscriber, Subscription } import org.reactivestreams.{ Publisher, Subscriber, Subscription }


/** /**
* Ideas: *
* - Update the variable atomically, propagate asynchronously to subscribers
* - Wait for all subscriber demand to be positive, then update and propagate?
*/ */
class AtomicMVar[V: JoinSemilattice](initialValue: V) extends MVar[V] { class AtomicVar[V](initialValue: V) extends Var[V] {


protected val value: AtomicReference[V] = new AtomicReference[V](initialValue) protected val ref: AtomicReference[V] = new AtomicReference[V](initialValue)


override def sample: V = value.get() override def sample: V = ref.get()


override protected[mvar] def onUpdate(delta: V): Unit = { override protected[`var`] def _set(value: V): V = {
value.getAndUpdate(new UnaryOperator[V] { val previous = ref.getAndSet(value)
override def apply(currentState: V): V = JoinSemilattice.join(currentState, delta) updateSubscribers(enqueue(value) andThen dispatchReadyElements)
}) previous
updateSubscribers(enqueue(delta) andThen dispatchReadyElements)
} }


override protected[mvar] def onUpdate(f: V V): Unit = { override protected[`var`] def _update(f: V V): V = {
value.getAndUpdate(new UnaryOperator[V] { val previous = ref.getAndUpdate(new UnaryOperator[V] {
override def apply(currentState: V): V = { override def apply(currentState: V): V = f(currentState)
val delta = f(currentState)
val newState = JoinSemilattice.join(currentState, delta)
updateSubscribers(enqueue(delta) andThen dispatchReadyElements)
newState
}
}) })
updateSubscribers(enqueue(f(previous)) andThen dispatchReadyElements)
previous
} }


val index = new AtomicLong(0L) val index = new AtomicLong(0L)
Expand All @@ -61,7 +54,7 @@ class AtomicMVar[V: JoinSemilattice](initialValue: V) extends MVar[V] {
override def subscribe(s: Subscriber[_ >: V]): Unit = { override def subscribe(s: Subscriber[_ >: V]): Unit = {


val newIndex = index.incrementAndGet() val newIndex = index.incrementAndGet()
val state = SubscriberState[V](s.asInstanceOf[Subscriber[V]], 0L, new MergingQueue[V](Some(sample))) val state = SubscriberState[V](s.asInstanceOf[Subscriber[V]], 0L, queueFromCurrentState)


atomicMap.put(newIndex, state) atomicMap.put(newIndex, state)
val subscription = new Subscription { val subscription = new Subscription {
Expand All @@ -74,6 +67,8 @@ class AtomicMVar[V: JoinSemilattice](initialValue: V) extends MVar[V] {
} }
} }


protected def queueFromCurrentState: Queue[V] = new LWWQueue[V](Some(sample))

/* FIXME: Temporarily changed visibility to protected. Should probably be reverted! */ /* FIXME: Temporarily changed visibility to protected. Should probably be reverted! */
protected def updateSubscribers(f: SubscriberState[V] SubscriberState[V]): Unit = { protected def updateSubscribers(f: SubscriberState[V] SubscriberState[V]): Unit = {
atomicMap.replaceAll((t: Long, u: SubscriberState[_ >: V]) => f(u.asInstanceOf[SubscriberState[V]])) atomicMap.replaceAll((t: Long, u: SubscriberState[_ >: V]) => f(u.asInstanceOf[SubscriberState[V]]))
Expand All @@ -99,11 +94,3 @@ class AtomicMVar[V: JoinSemilattice](initialValue: V) extends MVar[V] {
SubscriberState[V](subscriber, demand - sendSize, newQueue) SubscriberState[V](subscriber, demand - sendSize, newQueue)
} }
} }
object AtomicMVar {

def apply[V: JoinSemilattice](initialValue: V): MVar[V] = new AtomicMVar[V](initialValue)

def apply[V: BoundedJoinSemilattice](): MVar[V] = apply(BoundedJoinSemilattice[V].zero)

case class SubscriberState[V](subscriber: Subscriber[V], demand: Long, queue: Queue[V])
}
Expand Up @@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */


package io.demograph.monotonic.mvar package io.demograph.monotonic.`var`


import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
Expand All @@ -27,29 +27,17 @@ class InMemExecutionContext(implicit system: ActorSystem) extends ExecutionConte


implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val materializer: ActorMaterializer = ActorMaterializer()


override def mvar[S: BoundedJoinSemilattice]: UpdatableMVar[S] = mvar(BoundedJoinSemilattice[S].zero) override def mvar[S: BoundedJoinSemilattice]: UpdatableMVar[S] =
mvar(BoundedJoinSemilattice[S].zero)


override def mvar[S: JoinSemilattice](initialValue: S): UpdatableMVar[S] = override def mvar[S: JoinSemilattice](initialValue: S): UpdatableMVar[S] =
new WritableMVar[S](initialValue) new WritableMVar[S](initialValue)


override def map[S: JoinSemilattice, T: BoundedJoinSemilattice](mvarS: MVar[S])(f: (S) T): MVar[T] = { override def map[S: JoinSemilattice, T: BoundedJoinSemilattice](mvarS: MVar[S])(f: (S) T): MVar[T] =
// Subscribe for updates on `mvarS`
// For each update `u`, apply `f` to obtain `f(u)`
// Update the MVar[T] with `f(u)`
// Propagate `f(u)` to all subscribers
new MapMVar[S, T](mvarS, f, BoundedJoinSemilattice[T].zero) new MapMVar[S, T](mvarS, f, BoundedJoinSemilattice[T].zero)
}

override def product[S: BoundedJoinSemilattice, T: BoundedJoinSemilattice](mvarS: MVar[S])(mvarT: MVar[T]): MVar[(S, T)] =
override def product[S: BoundedJoinSemilattice, T: BoundedJoinSemilattice](mvarS: MVar[S])(mvarT: MVar[T]): MVar[(S, T)] = {
// Subscribe for updates on `mvarS`
// Subscribe for updates on `mvarT`
// For each update `s`, lift `s` to `(s, t.bottom)`
// For each update `t`, lift `t` to `(s.bottom, t)`
// Merge updates from both subscriptions into a single subscription for `(s, t)`
// Update the MVar[(S, T)] with `(s, t)`
// Propagate `(s, t)` to all subscribers
new ProductMVar[S, T](mvarS, mvarT) new ProductMVar[S, T](mvarS, mvarT)
}
} }


object InMemExecutionContext { object InMemExecutionContext {
Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */


package io.demograph.monotonic.mvar package io.demograph.monotonic.`var`


import akka.stream.Materializer import akka.stream.Materializer
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
Expand All @@ -27,5 +27,5 @@ case class MapMVar[S: JoinSemilattice, T: JoinSemilattice](source: MVar[S], f: S


val _ = Source.fromPublisher(source.publisher) val _ = Source.fromPublisher(source.publisher)
.map(f) .map(f)
.runForeach(onUpdate) .runForeach(_set)
} }
Expand Up @@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */


package io.demograph.monotonic.mvar package io.demograph.monotonic.`var`


import akka.stream.Materializer import akka.stream.Materializer
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
Expand All @@ -28,7 +28,7 @@ class ProductMVar[S: BJSL, T: BJSL](mvarS: MVar[S], mvarT: MVar[T])(implicit mat
private val srcS = Source.fromPublisher(mvarS.publisher).map(liftL[S, T]) private val srcS = Source.fromPublisher(mvarS.publisher).map(liftL[S, T])
private val srcT = Source.fromPublisher(mvarT.publisher).map(liftR[S, T]) private val srcT = Source.fromPublisher(mvarT.publisher).map(liftR[S, T])


srcS.merge(srcT).runForeach(onUpdate) srcS.merge(srcT).runForeach(_set)
} }


object ProductMVar { object ProductMVar {
Expand Down
Expand Up @@ -14,19 +14,14 @@
* limitations under the License. * limitations under the License.
*/ */


package io.demograph.monotonic.mvar package io.demograph.monotonic.`var`


import algebra.lattice.JoinSemilattice import algebra.lattice.JoinSemilattice


/** /**
* A Read-Write MVar. The Updatable interface is exposed to client code. * A Read-Write MVar. The Updatable interface is exposed to client code.
*/ */
class WritableMVar[S: JoinSemilattice](initialValue: S) extends AtomicMVar[S](initialValue) with Updatable[S] { class WritableMVar[S: JoinSemilattice](initialValue: S) extends AtomicMVar[S](initialValue) with Updatable[S] {
/** def set(delta: S): Unit = _set(delta)
* def update(f: S S): Unit = _update(f)
* @param s the value to be interpreted as an update
*/
override def update(s: S): Unit = onUpdate(s)

override def update(f: S S): Unit = onUpdate(f)
} }
2 changes: 1 addition & 1 deletion mem/src/main/scala/workspace.sc
@@ -1,4 +1,4 @@
import io.demograph.monotonic.mvar.MVar import io.demograph.monotonic.`var`.MVar


import scala.reflect.runtime.universe import scala.reflect.runtime.universe
import scala.reflect.runtime.universe._ import scala.reflect.runtime.universe._
Expand Down
Expand Up @@ -19,7 +19,7 @@ package io.demograph.monotonic
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference


import algebra.instances.set._ import algebra.instances.set._
import io.demograph.monotonic.mvar._ import io.demograph.monotonic.`var`._


import scala.reflect.runtime.universe._ import scala.reflect.runtime.universe._
/** /**
Expand All @@ -37,7 +37,7 @@ class InMemMonotonicMapSpec extends ActorTestBase {
mvar.sample shouldBe 'empty mvar.sample shouldBe 'empty


withClue("The returned MVar should now be bound to 'new-key'") { withClue("The returned MVar should now be bound to 'new-key'") {
mvar.update(Set("some-element")) mvar.set(Set("some-element"))
val mvar2: UpdatableMVar[Set[String]] = newMMap.get[Set[String]]("new-key") val mvar2: UpdatableMVar[Set[String]] = newMMap.get[Set[String]]("new-key")
mvar2.sample shouldBe Set("some-element") mvar2.sample shouldBe Set("some-element")
} }
Expand Down Expand Up @@ -67,7 +67,7 @@ class InMemMonotonicMapSpec extends ActorTestBase {
val mvar1 = newMMap.get[Set[String]]("key1") val mvar1 = newMMap.get[Set[String]]("key1")
val mvar2 = newMMap.get[Set[String]]("key2") val mvar2 = newMMap.get[Set[String]]("key2")


mvar1.update(Set("shared-value")) mvar1.set(Set("shared-value"))
eventually(mvar2.sample shouldBe Set("shared-value")) eventually(mvar2.sample shouldBe Set("shared-value"))
} }


Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */


package io.demograph.monotonic.mvar package io.demograph.monotonic.`var`


import io.demograph.monotonic.ActorTestBase import io.demograph.monotonic.ActorTestBase
import algebra.instances.set._ import algebra.instances.set._
Expand Down Expand Up @@ -47,10 +47,10 @@ class ExecutionContextSpec extends ActorTestBase with IntegrationPatience {
it should "allow updates of MVars" in { it should "allow updates of MVars" in {
val intSet: UpdatableMVar[Set[Int]] = ec.mvar[Set[Int]] val intSet: UpdatableMVar[Set[Int]] = ec.mvar[Set[Int]]


intSet.update(Set(1)) intSet.set(Set(1))
eventually(intSet.sample should contain only 1) eventually(intSet.sample should contain only 1)


intSet.update(Set(2)) intSet.set(Set(2))
eventually(intSet.sample should contain theSameElementsAs Set(1, 2)) eventually(intSet.sample should contain theSameElementsAs Set(1, 2))
} }


Expand All @@ -60,8 +60,8 @@ class ExecutionContextSpec extends ActorTestBase with IntegrationPatience {


eventually(setWithout4.sample shouldBe Set(1, 2, 3)) eventually(setWithout4.sample shouldBe Set(1, 2, 3))


intSet.update(Set(4)) intSet.set(Set(4))
intSet.update(Set(5)) intSet.set(Set(5))


eventually(setWithout4.sample shouldBe Set(1, 2, 3, 5)) eventually(setWithout4.sample shouldBe Set(1, 2, 3, 5))
} }
Expand All @@ -73,10 +73,10 @@ class ExecutionContextSpec extends ActorTestBase with IntegrationPatience {
val product = intSet.product(longSet) val product = intSet.product(longSet)
eventually(product.sample shouldBe (Set(1), Set(1L))) eventually(product.sample shouldBe (Set(1), Set(1L)))


intSet.update(Set(2)) intSet.set(Set(2))
eventually(product.sample shouldBe (Set(1, 2), Set(1L))) eventually(product.sample shouldBe (Set(1, 2), Set(1L)))


longSet.update(Set(2L)) longSet.set(Set(2L))
eventually(product.sample shouldBe (Set(1, 2), Set(1L, 2L))) eventually(product.sample shouldBe (Set(1, 2), Set(1L, 2L)))
} }
} }
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.demograph.monotonic package io.demograph.monotonic


import algebra.lattice.BoundedJoinSemilattice import algebra.lattice.BoundedJoinSemilattice
import io.demograph.monotonic.mvar.UpdatableMVar import io.demograph.monotonic.`var`.UpdatableMVar


import scala.reflect.runtime.universe._ import scala.reflect.runtime.universe._
/** /**
Expand Down

0 comments on commit e9b7bf2

Please sign in to comment.