Permalink
Switch branches/tags
2.5.12_2.13.0-M3 2.5.13-release-2.13.0-M3 2.5.14_2.13.0-M3 2.5.15_2.13.0-M3 2.5.16_2.13.0-M3 2.5.17_2.13.0-M3 25047/simplify-test StreamRefsSpecAnyRemoting actorRefWithAckDocTest akka-typed-javadsl-behaviour-build-higherorder arteryTests asyncDnsRetryOverTcpWhenTruncated camel-java9 convertedReference doc-remove-tracking-2 findFreeUdpPort fixLagomLink integrate-docs-scala-java issue-23926-remote-watch javaAndScalaUnidoc jdk9-build jr/w/gce-benchmark-setup ktoso-patch-1 leavingClusterEventLogs master mavenUpperCase multiJvmShardingLogTest must raboof-patch-1 release-1.1.3 release-1.1.4 release-1.2 release-1.3 release-1.3.1 release-2.0 release-2.1 release-2.2 release-2.3 release-2.4-http release-2.4 releaseFromTravis releaseWithJdk9ForJdk8 releasing-2.4.11.2 removeAkkaSslConfig revert-21064-wip-http-javadsl-package-alignment-johanandren revert-24689-unidoc-cluster-richard sbtWhitesourceIssue20 scala-2.13.0-M4 scala-2.13.0-M5 scalafix stream-http-2.0 streamReference tcp-stream-termination-22163 testPrValidation typedPersistentStashingActor unidoc2 v2.5.6-preparations v2.5.7-preparations v2.5.8-preparations wip-19623-subsource-cannot-push-twice-johanandren wip-19964-broken-link-WebSocket-example-RK wip-20984-typed-reliable-delivery-poc-patriknw wip-21624-SendQueue-debug-patriknw wip-23207-log-artery-dropped-patriknw wip-23593-RestartSpec-patriknw wip-23770-req-resp-patriknw wip-24113-cs-cs-chbatey wip-24155-jackson-patriknw wip-24980-debug-patriknw wip-25072-optimized-recovery-patriknw wip-25072-recovery-first-patriknw wip-25482-then-reply-patriknw wip-25484-enclosing-class-patriknw wip-25485-raffle-sample-patriknw wip-25717-persistence-stash-patriknw wip-25950-allObservers-patriknw wip-ThisActorSystemQuarantinedEvent-patriknw wip-Typed-Session-RK wip-auto-confirmation-patriknw wip-conflate-example wip-connection-issue-patriknw wip-debug-LargeMessageClusterSpec-patriknw wip-debug-tests wip-experiments-√ wip-gcloud-faninout-2m wip-gossip-unreachable-patriknw 
wip-jdk-9-release-8-2m wip-jdk9-build-2m wip-lanes-debug-patriknw wip-managed-blocking-for-tpe-√ wip-misc-doc-fixes-RK wip-persistence-typed-impl-patriknw wip-persistenceId-patriknw wip-rm-ActorRef-Future-patriknw wip-rolling-artery-patriknw wip-rp-docs wip-scala-2.13-2m wip-streams-improvements-√ wip-tol-latest-api-mappings-plugin-2m wip-tol-latest-api-mappings-plugin wip-tol-no-tests-2m wip-typed-Java8-∂π wip-validate-pull-request-sbt-1.0-2m
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
156 lines (126 sloc) 4.61 KB
/*
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.ddata
import akka.cluster.Cluster
import akka.cluster.UniqueAddress
import java.math.BigInteger
import akka.annotation.InternalApi
object GCounter {
  // Shared zero constant; the GCounter class imports it as the seed for summing
  // node counts and as the fallback for missing entries during merge.
  private val Zero: BigInt = BigInt(0)

  /**
   * The counter that has no per-node entries.
   */
  val empty: GCounter = new GCounter()

  /**
   * Scala API: returns the shared [[GCounter.empty]] instance.
   */
  def apply(): GCounter = empty

  /**
   * Java API: returns the shared [[GCounter.empty]] instance.
   */
  def create(): GCounter = empty

  /**
   * Extract the [[GCounter#value]], so a counter can be pattern matched
   * as `GCounter(total)`.
   */
  def unapply(c: GCounter): Option[BigInt] = Some(c.value)
}
/**
 * Implements a 'Growing Counter' CRDT, also called a 'G-Counter'.
 *
 * It is described in the paper
 * <a href="http://hal.upmc.fr/file/index/docid/555588/filename/techreport.pdf">A comprehensive study of Convergent and Commutative Replicated Data Types</a>.
 *
 * A G-Counter is an increment-only counter (inspired by vector clocks) in
 * which only increment and merge are possible. Incrementing the counter
 * adds the given delta (1 by default) to the count for the current node.
 * Divergent histories are resolved by taking the maximum count for each
 * node (like a vector clock merge). The value of the counter is the sum of
 * all node counts.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
@SerialVersionUID(1L)
final class GCounter private[akka] (
  private[akka] val state: Map[UniqueAddress, BigInt] = Map.empty,
  override val delta: Option[GCounter] = None)
  extends DeltaReplicatedData with ReplicatedDelta
  with ReplicatedDataSerialization with RemovedNodePruning with FastMerge {

  import GCounter.Zero

  type T = GCounter
  type D = GCounter

  /**
   * Scala API: Current total value of the counter, i.e. the sum of all
   * per-node counts held in `state`.
   */
  def value: BigInt = state.values.foldLeft(Zero)(_ + _)

  /**
   * Java API: Current total value of the counter.
   */
  def getValue: BigInteger = value.bigInteger

  /**
   * Increment the counter with the delta `n` specified.
   * The delta must be zero or positive.
   */
  def +(n: Long)(implicit node: Cluster): GCounter = increment(node, n)

  /**
   * Increment the counter with the delta `n` specified.
   * The delta `n` must be zero or positive.
   *
   * The increment is recorded against `node.selfUniqueAddress`.
   */
  def increment(node: Cluster, n: Long = 1): GCounter =
    increment(node.selfUniqueAddress, n)

  /**
   * INTERNAL API
   *
   * Increment the count of `key` by one.
   */
  @InternalApi private[akka] def increment(key: UniqueAddress): GCounter = increment(key, 1)

  /**
   * INTERNAL API
   *
   * Increment the count of `key` by `n`. Returns `this` unchanged when `n` is zero.
   * Throws `IllegalArgumentException` (via `require`) when `n` is negative.
   */
  @InternalApi private[akka] def increment(key: UniqueAddress, n: BigInt): GCounter = {
    require(n >= 0, "Can't decrement a GCounter")
    if (n == 0) this
    else {
      val nextValue = state.get(key) match {
        case Some(v) => v + n
        case None    => n
      }
      // The delta carries the new absolute count for `key` (not the increment),
      // accumulated on top of any delta that has not been reset yet.
      val newDelta = delta match {
        case None    => new GCounter(Map(key -> nextValue))
        case Some(d) => new GCounter(d.state + (key -> nextValue))
      }
      assignAncestor(new GCounter(state + (key -> nextValue), Some(newDelta)))
    }
  }

  /**
   * Merge with another counter by keeping, for every node, the larger of the
   * two counts. The first two branches short-circuit when both sides are the
   * same instance or one is an ancestor of the other (as reported by
   * `isAncestorOf`, which is inherited and not defined in this file).
   */
  override def merge(that: GCounter): GCounter =
    if ((this eq that) || that.isAncestorOf(this)) this.clearAncestor()
    else if (this.isAncestorOf(that)) that.clearAncestor()
    else {
      // Start from the other side's entries and overwrite only where ours is larger.
      var merged = that.state
      for ((key, thisValue) <- state) {
        val thatValue = merged.getOrElse(key, Zero)
        if (thisValue > thatValue)
          merged = merged.updated(key, thisValue)
      }
      clearAncestor()
      new GCounter(merged)
    }

  /** Merging a delta is the same operation as a full-state merge. */
  override def mergeDelta(thatDelta: GCounter): GCounter = merge(thatDelta)

  /** The zero element used for deltas: the empty counter. */
  override def zero: GCounter = GCounter.empty

  /**
   * Returns `this` when there is no accumulated delta, otherwise a counter
   * with the same state and no delta.
   */
  override def resetDelta: GCounter =
    if (delta.isEmpty) this
    else assignAncestor(new GCounter(state))

  /** All nodes that have an entry in this counter. */
  override def modifiedByNodes: Set[UniqueAddress] = state.keySet

  /** Pruning is needed when `removedNode` still has an entry in this counter. */
  override def needPruningFrom(removedNode: UniqueAddress): Boolean =
    state.contains(removedNode)

  /**
   * Moves the count of `removedNode` to `collapseInto`: the entry of
   * `removedNode` is dropped and its count is added to `collapseInto` through
   * `increment`. Returns `this` when `removedNode` has no entry.
   */
  override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): GCounter =
    state.get(removedNode) match {
      case Some(value) => new GCounter(state - removedNode).increment(collapseInto, value)
      case None        => this
    }

  /** Returns a counter without the entry of `removedNode` (and without delta). */
  override def pruningCleanup(removedNode: UniqueAddress): GCounter =
    new GCounter(state - removedNode)

  // this class cannot be a `case class` because we need different `unapply`

  override def toString: String = s"GCounter($value)"

  // Equality and hash code are based on `state` only; `delta` is not compared.
  override def equals(o: Any): Boolean = o match {
    case other: GCounter => state == other.state
    case _               => false
  }

  override def hashCode: Int = state.hashCode
}
object GCounterKey {

  /**
   * Creates the key for a [[GCounter]] with the given `id`.
   */
  def create(id: String): Key[GCounter] =
    new GCounterKey(id)
}
/**
 * Key for a [[GCounter]]; the given `_id` is passed on to the `Key` base class.
 */
@SerialVersionUID(1L)
final case class GCounterKey(_id: String)
  extends Key[GCounter](_id)
  with ReplicatedDataSerialization