@@ -22,8 +22,10 @@ import pekko.cluster.Cluster
import pekko.cluster.ddata.GSet
import pekko.cluster.ddata.GSetKey
import pekko.cluster.ddata.Replicator
import pekko.cluster.ddata.Replicator.ReadMajority
import pekko.cluster.ddata.Replicator.WriteMajority
import pekko.cluster.ddata.Replicator.ReadAll
import pekko.cluster.ddata.Replicator.ReadMajorityPlus
import pekko.cluster.ddata.Replicator.WriteAll
import pekko.cluster.ddata.Replicator.WriteMajorityPlus
import pekko.cluster.ddata.SelfUniqueAddress
import pekko.cluster.sharding.ClusterShardingSettings
import pekko.cluster.sharding.ShardRegion.ShardId
@@ -52,16 +54,23 @@ private[pekko] final class DDataRememberEntitiesCoordinatorStore(
implicit val node: Cluster = Cluster(context.system)
implicit val selfUniqueAddress: SelfUniqueAddress = SelfUniqueAddress(node.selfUniqueAddress)

private val readMajority = ReadMajority(settings.tuningParameters.waitingForStateTimeout, majorityMinCap)
private val writeMajority = WriteMajority(settings.tuningParameters.updatingStateTimeout, majorityMinCap)
private val readConsistency = settings.tuningParameters.coordinatorStateReadMajorityPlus match {
case Int.MaxValue => ReadAll(settings.tuningParameters.waitingForStateTimeout)
case additional => ReadMajorityPlus(settings.tuningParameters.waitingForStateTimeout, additional, majorityMinCap)
}
private val writeConsistency = settings.tuningParameters.coordinatorStateWriteMajorityPlus match {
case Int.MaxValue => WriteAll(settings.tuningParameters.updatingStateTimeout)
case additional => WriteMajorityPlus(settings.tuningParameters.updatingStateTimeout, additional, majorityMinCap)
}

private val AllShardsKey = GSetKey[String](s"shard-$typeName-all")
private var retryGetCounter = 0
private var allShards: Option[Set[ShardId]] = None
private var coordinatorWaitingForShards: Option[ActorRef] = None

// eager load of remembered shard ids
def getAllShards(): Unit = {
replicator ! Replicator.Get(AllShardsKey, readMajority)
replicator ! Replicator.Get(AllShardsKey, readConsistency)
}
getAllShards()
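
The read and write consistency for the coordinator store is now derived from the coordinatorStateReadMajorityPlus / coordinatorStateWriteMajorityPlus tuning parameters, falling back to ReadAll/WriteAll when the value is Int.MaxValue. Below is a minimal sketch of driving this from configuration, assuming the HOCON keys mirror the parameter names (coordinator-state.read-majority-plus / write-majority-plus) and that the value "all" parses to Int.MaxValue; both the key names and that parsing rule are assumptions, not shown in this diff.

import com.typesafe.config.ConfigFactory
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.cluster.sharding.ClusterShardingSettings

object ConsistencyTuningSketch extends App {
  // Hypothetical tuning: write to a majority plus 2 extra replicas, read from all nodes.
  val config = ConfigFactory.parseString(
    """
    pekko.cluster.sharding.coordinator-state {
      write-majority-plus = 2
      read-majority-plus = all   # assumed to map to Int.MaxValue and therefore ReadAll
    }
    """).withFallback(ConfigFactory.load())

  val system = ActorSystem("sketch", config)
  val tuning = ClusterShardingSettings(system).tuningParameters
  // With the assumptions above this would select WriteMajorityPlus(..., 2, ...) for writes
  // and ReadAll for reads in DDataRememberEntitiesCoordinatorStore.
  println(tuning.coordinatorStateWriteMajorityPlus)
  system.terminate()
}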

@@ -84,38 +93,43 @@ private[pekko] final class DDataRememberEntitiesCoordinatorStore(
onGotAllShards(Set.empty)

case Replicator.GetFailure(AllShardsKey, _) =>
log.error(
"The ShardCoordinator was unable to get all shards state within 'waiting-for-state-timeout': {} millis (retrying)",
readMajority.timeout.toMillis)
retryGetCounter += 1
val template =
"Remember entities coordinator store unable to get initial shards within 'waiting-for-state-timeout': {} millis (retrying)"
if (retryGetCounter < 5)
log.warning(template, readConsistency.timeout.toMillis)
else
log.error(template, readConsistency.timeout.toMillis)
// repeat until GetSuccess
getAllShards()

case RememberEntitiesCoordinatorStore.AddShard(shardId) =>
replicator ! Replicator.Update(AllShardsKey, GSet.empty[String], writeMajority, Some((sender(), shardId)))(
replicator ! Replicator.Update(AllShardsKey, GSet.empty[String], writeConsistency, Some((sender(), shardId)))(
_ + shardId)

case Replicator.UpdateSuccess(AllShardsKey, Some((replyTo: ActorRef, shardId: ShardId))) =>
log.debug("The coordinator shards state was successfully updated with {}", shardId)
log.debug("Remember entities coordinator store shards successfully updated with {}", shardId)
replyTo ! RememberEntitiesCoordinatorStore.UpdateDone(shardId)

case Replicator.UpdateTimeout(AllShardsKey, Some((replyTo: ActorRef, shardId: ShardId))) =>
log.error(
"The ShardCoordinator was unable to update shards distributed state within 'updating-state-timeout': {} millis (retrying), adding shard={}",
writeMajority.timeout.toMillis,
"Remember entities coordinator store unable to update shards state within 'updating-state-timeout': {} millis (retrying), adding shard={}",
writeConsistency.timeout.toMillis,
shardId)
replyTo ! RememberEntitiesCoordinatorStore.UpdateFailed(shardId)

case Replicator.ModifyFailure(key, error, cause, Some((replyTo: ActorRef, shardId: ShardId))) =>
log.error(
cause,
"The remember entities store was unable to add shard [{}] (key [{}], failed with error: {})",
"Remember entities coordinator store was unable to add shard [{}] (key [{}], failed with error: {})",
shardId,
key,
error)
replyTo ! RememberEntitiesCoordinatorStore.UpdateFailed(shardId)
}

def onGotAllShards(shardIds: Set[ShardId]): Unit = {
retryGetCounter = 0
coordinatorWaitingForShards match {
case Some(coordinator) =>
coordinator ! RememberEntitiesCoordinatorStore.RememberedShards(shardIds)
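
For context on the Replicator.Update call used by AddShard above: the optional request value passed to Update is returned with UpdateSuccess/UpdateTimeout, which is how the store routes the reply back to the original sender without extra bookkeeping. Here is a self-contained sketch of that pattern, with an illustrative key name and WriteLocal standing in for the configurable write consistency; none of the names below are part of the store's API.

import org.apache.pekko.actor.{ Actor, ActorRef }
import org.apache.pekko.cluster.ddata.{ DistributedData, GSet, GSetKey }
import org.apache.pekko.cluster.ddata.Replicator.{ Update, UpdateSuccess, UpdateTimeout, WriteLocal }

// Sketch: record string ids in a grow-only set and correlate replies via the request context.
class ShardSetWriterSketch extends Actor {
  private val replicator: ActorRef = DistributedData(context.system).replicator
  private val AllIdsKey = GSetKey[String]("all-ids-sketch") // illustrative key name

  def receive: Receive = {
    case shardId: String =>
      // The Some(shardId) request context travels with the write and comes back in the reply.
      replicator ! Update(AllIdsKey, GSet.empty[String], WriteLocal, Some(shardId))(_ + shardId)
    case UpdateSuccess(AllIdsKey, Some(shardId: String)) =>
      context.system.log.info("Recorded id {}", shardId)
    case UpdateTimeout(AllIdsKey, Some(shardId: String)) =>
      context.system.log.warning("Timed out recording id {}", shardId)
  }
}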
@@ -31,13 +31,15 @@ import pekko.cluster.ddata.Replicator.GetFailure
import pekko.cluster.ddata.Replicator.GetSuccess
import pekko.cluster.ddata.Replicator.ModifyFailure
import pekko.cluster.ddata.Replicator.NotFound
import pekko.cluster.ddata.Replicator.ReadMajority
import pekko.cluster.ddata.Replicator.ReadAll
import pekko.cluster.ddata.Replicator.ReadMajorityPlus
import pekko.cluster.ddata.Replicator.StoreFailure
import pekko.cluster.ddata.Replicator.Update
import pekko.cluster.ddata.Replicator.UpdateDataDeleted
import pekko.cluster.ddata.Replicator.UpdateSuccess
import pekko.cluster.ddata.Replicator.UpdateTimeout
import pekko.cluster.ddata.Replicator.WriteMajority
import pekko.cluster.ddata.Replicator.WriteAll
import pekko.cluster.ddata.Replicator.WriteMajorityPlus
import pekko.cluster.ddata.SelfUniqueAddress
import pekko.cluster.sharding.ClusterShardingSettings
import pekko.cluster.sharding.ShardRegion.EntityId
@@ -97,10 +99,18 @@ private[pekko] final class DDataRememberEntitiesShardStore(
implicit val node: Cluster = Cluster(context.system)
implicit val selfUniqueAddress: SelfUniqueAddress = SelfUniqueAddress(node.selfUniqueAddress)

private val readMajority = ReadMajority(settings.tuningParameters.waitingForStateTimeout, majorityMinCap)
private val readConsistency = settings.tuningParameters.coordinatorStateReadMajorityPlus match {
case Int.MaxValue => ReadAll(settings.tuningParameters.waitingForStateTimeout)
case additional => ReadMajorityPlus(settings.tuningParameters.waitingForStateTimeout, additional, majorityMinCap)
}
// Note that the timeout is actually updatingStateTimeout / 4 so that we fit 3 retries and a response in the timeout before the shard sees it as a failure
private val writeMajority = WriteMajority(settings.tuningParameters.updatingStateTimeout / 4, majorityMinCap)
private val writeConsistency = settings.tuningParameters.coordinatorStateWriteMajorityPlus match {
case Int.MaxValue => WriteAll(settings.tuningParameters.updatingStateTimeout / 4)
case additional => WriteMajorityPlus(settings.tuningParameters.updatingStateTimeout / 4, additional, majorityMinCap)
}
private val maxUpdateAttempts = 3
// Note: total for all 5 keys
private var maxReadAttemptsLeft = 15
private val keys = stateKeys(typeName, shardId)
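
The "/ 4" in the write consistency and the read-retry budget both encode simple arithmetic: the shard waits updating-state-timeout for the whole update, so giving each replicator write a quarter of that leaves room for the first attempt plus the three retries allowed by maxUpdateAttempts, and the 15 read attempts are shared across the 5 keys. A worked example follows, assuming an updating-state-timeout of 5 seconds (the concrete value is an assumption, not taken from this diff).

import scala.concurrent.duration._

object TimeoutBudgetSketch extends App {
  val updatingStateTimeout = 5.seconds           // assumed value for illustration
  val perWriteTimeout = updatingStateTimeout / 4 // 1250 millis per replicator write
  val writeAttempts = 1 + 3                      // first write + maxUpdateAttempts retries
  println(perWriteTimeout * writeAttempts)       // 5000 milliseconds, the full window the shard waits

  val readAttemptBudget = 15                     // shared across all keys
  val numberOfKeys = 5
  println(readAttemptBudget / numberOfKeys)      // roughly 3 retries per key before giving up
}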

if (log.isDebugEnabled) {
@@ -124,7 +134,8 @@ private[pekko] final class DDataRememberEntitiesShardStore(
def idle: Receive = {
case RememberEntitiesShardStore.GetEntities =>
// not supported, but we may get several if the shard timed out and retried
log.debug("Another get entities request after responding to one, not expected/supported, ignoring")
log.debug(
"Remember entities shard store got another get entities request after responding to one, not expected/supported, ignoring")
case update: RememberEntitiesShardStore.Update => onUpdate(update)
}

@@ -156,28 +167,39 @@ private[pekko] final class DDataRememberEntitiesShardStore(
receiveOne(i, ids)
case NotFound(_, Some(i: Int)) =>
receiveOne(i, Set.empty)
case GetFailure(key, _) =>
log.error(
"Unable to get an initial state within 'waiting-for-state-timeout': [{}] using [{}] (key [{}])",
readMajority.timeout.pretty,
readMajority,
key)
context.stop(self)
case GetFailure(key, Some(i)) =>
maxReadAttemptsLeft -= 1
if (maxReadAttemptsLeft > 0) {
log.warning(
"Remember entities shard store unable to get an initial state within 'waiting-for-state-timeout' for key [{}], retrying",
key)
replicator ! Get(key, readConsistency, Some(i))
} else {
log.error(
"Remember entities shard store unable to get an initial state within 'waiting-for-state-timeout' giving up after retrying: [{}] using [{}] (key [{}])",
readConsistency.timeout.pretty,
readConsistency,
key)
context.stop(self)
}
case GetDataDeleted(_, _) =>
log.error("Unable to get an initial state because it was deleted")
log.error("Remember entities shard store unable to get an initial state because it was deleted")
context.stop(self)
case update: RememberEntitiesShardStore.Update =>
log.warning("Got an update before load of initial entities completed, dropping update: [{}]", update)
log.warning(
"Remember entities shard store got an update before load of initial entities completed, dropping update: [{}]",
update)
case RememberEntitiesShardStore.GetEntities =>
if (gotKeys.size == numberOfKeys) {
// we already got all and were waiting for a request
log.debug("Got request from shard, sending remembered entities")
log.debug("Remember entities shard store got request from shard, sending remembered entities")
sender() ! RememberEntitiesShardStore.RememberedEntities(ids)
context.become(idle)
unstashAll()
} else {
// we haven't seen all ids yet
log.debug("Got request from shard, waiting for all remembered entities to arrive")
log.debug(
"Remember entities shard store got request from shard, waiting for all remembered entities to arrive")
context.become(waitingForAllEntityIds(gotKeys, ids, Some(sender())))
}
case _ =>
@@ -194,7 +216,7 @@ private[pekko] final class DDataRememberEntitiesShardStore(
allEvts.groupBy(evt => key(evt.id)).map {
case (key, evts) =>
(evts,
(Update(key, ORSet.empty[EntityId], writeMajority, Some(evts)) { existing =>
(Update(key, ORSet.empty[EntityId], writeConsistency, Some(evts)) { existing =>
evts.foldLeft(existing) {
case (acc, Started(id)) => acc :+ id
case (acc, Stopped(id)) => acc.remove(id)
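
The grouping above relies on key(evt.id) mapping each entity id to one of the numberOfKeys ORSet keys in a stable way. A hedged sketch of such a mapping (hash-modulo) is shown below; the real implementation is not part of this diff and may differ, but the grouping only needs the mapping to be deterministic per id.

import org.apache.pekko.cluster.ddata.ORSetKey

object KeySpreadSketch {
  // Illustrative: spread entity ids across a fixed number of ORSet keys so no single
  // CRDT grows with every remembered entity of the shard.
  def keyFor(keys: Vector[ORSetKey[String]], entityId: String): ORSetKey[String] =
    keys(math.abs(entityId.hashCode % keys.size))
}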
@@ -218,7 +240,7 @@ private[pekko] final class DDataRememberEntitiesShardStore(
// updatesLeft used both to keep track of what work remains and for retrying on timeout up to a limit
def next(updatesLeft: Map[Set[Evt], (Update[ORSet[EntityId]], Int)]): Receive = {
case UpdateSuccess(_, Some(evts: Set[Evt] @unchecked)) =>
log.debug("The DDataShard state was successfully updated for [{}]", evts)
log.debug("Remember entities shard store state was successfully updated for [{}]", evts)
val remainingAfterThis = updatesLeft - evts
if (remainingAfterThis.isEmpty) {
requestor ! RememberEntitiesShardStore.UpdateDone(update.started, update.stopped)
@@ -230,31 +252,35 @@ private[pekko] final class DDataRememberEntitiesShardStore(
case UpdateTimeout(_, Some(evts: Set[Evt] @unchecked)) =>
val (updateForEvts, retriesLeft) = updatesLeft(evts)
if (retriesLeft > 0) {
log.debug("Retrying update because of write timeout, tries left [{}]", retriesLeft)
log.debug(
"Remember entities shard store retrying update because of write timeout, tries left [{}]",
retriesLeft)
replicator ! updateForEvts
context.become(next(updatesLeft.updated(evts, (updateForEvts, retriesLeft - 1))))
} else {
log.error(
"Unable to update state, within 'updating-state-timeout'= [{}], gave up after [{}] retries",
writeMajority.timeout.pretty,
"Remember entities shard store unable to update state, within 'updating-state-timeout'= [{}], gave up after [{}] retries",
writeConsistency.timeout.pretty,
maxUpdateAttempts)
// will trigger shard restart
context.stop(self)
}
case StoreFailure(_, _) =>
log.error("Unable to update state, due to store failure")
log.error("Remember entities shard store unable to update state, due to store failure")
// will trigger shard restart
context.stop(self)
case ModifyFailure(_, error, cause, _) =>
log.error(cause, "Unable to update state, due to modify failure: {}", error)
log.error(cause, "Remember entities shard store unable to update state, due to modify failure: {}", error)
// will trigger shard restart
context.stop(self)
case UpdateDataDeleted(_, _) =>
log.error("Unable to update state, due to delete")
log.error("Remember entities shard store unable to update state, due to delete")
// will trigger shard restart
context.stop(self)
case update: RememberEntitiesShardStore.Update =>
log.warning("Got a new update before write of previous completed, dropping update: [{}]", update)
log.warning(
"Remember entities shard store got a new update before write of previous completed, dropping update: [{}]",
update)
}

next(allUpdates)
@@ -263,7 +289,7 @@ private[pekko] final class DDataRememberEntitiesShardStore(
private def loadAllEntities(): Unit = {
(0 until numberOfKeys).toSet[Int].foreach { i =>
val key = keys(i)
replicator ! Get(key, readMajority, Some(i))
replicator ! Get(key, readConsistency, Some(i))
}
}
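
loadAllEntities issues one Get per key and tags each request with its index, so the replies handled in waitingForAllEntityIds can be matched back to the key they belong to. Below is a self-contained sketch of that correlation pattern; the actor name, ReadLocal consistency, and key handling are illustrative only.

import org.apache.pekko.actor.{ Actor, ActorRef }
import org.apache.pekko.cluster.ddata.{ DistributedData, ORSet, ORSetKey }
import org.apache.pekko.cluster.ddata.Replicator.{ Get, GetSuccess, NotFound, ReadLocal }

class IndexedLoaderSketch(keys: Vector[ORSetKey[String]]) extends Actor {
  private val replicator: ActorRef = DistributedData(context.system).replicator

  // Ask for every key up front, tagging each request with its index.
  override def preStart(): Unit =
    keys.indices.foreach(i => replicator ! Get(keys(i), ReadLocal, Some(i)))

  def receive: Receive = {
    case g @ GetSuccess(_, Some(i: Int)) =>
      val ids = g.get(keys(i)).elements // the index tells us which key this reply is for
      context.system.log.info("Key {} holds {} ids", i, ids.size)
    case NotFound(_, Some(i: Int)) =>
      context.system.log.info("Key {} has no data yet", i)
  }
}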
