Permalink
Browse files

Incorporate review comments, see #2284

  • Loading branch information...
1 parent 3f73705 commit 59f8210b85c4a5f2eaf88d1bf387ccdbb1cdc554 @patriknw patriknw committed Oct 9, 2012
@@ -103,9 +103,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
self ! HeartbeatTick
}
- override def preStart(): Unit = {
- cluster.subscribe(self, classOf[MemberEvent])
- }
+ override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])
override def postStop(): Unit = {
heartbeatTask.cancel()
@@ -131,6 +129,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
all = state.members.collect { case m if m.address != selfAddress m.address }
joinInProgress --= all
consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor)
+ update()
}
def addMember(m: Member): Unit = if (m.address != selfAddress) {
@@ -259,11 +258,11 @@ private[cluster] object ClusterHeartbeatSenderConnection {
* Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]]
* and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address.
*
- * Netty blocks when sending to broken connections, and this actor uses
- * a configurable circuit breaker to reduce connect attempts to broken
+ * This actor exists only because Netty blocks when sending to broken connections,
+ * and this actor uses a configurable circuit breaker to reduce connect attempts to broken
* connections.
*
- * @see ClusterHeartbeatSender
+ * @see akka.cluster.ClusterHeartbeatSender
*/
private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
extends Actor with ActorLogging {
@@ -283,7 +282,8 @@ private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
case SendHeartbeat(heartbeatMsg, _, deadline)
if (!deadline.isOverdue) {
log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
- // the CircuitBreaker will measure elapsed time and open if too many long calls
+ // Netty blocks when sending to broken connections, the CircuitBreaker will
+ // measure elapsed time and open if too many long calls
try breaker.withSyncCircuitBreaker {
toRef ! heartbeatMsg
} catch { case e: CircuitBreakerOpenException /* skip sending heartbeat to broken connection */ }
@@ -16,17 +16,34 @@ import scala.concurrent.util.FiniteDuration
class ClusterSettings(val config: Config, val systemName: String) {
import config._
- final val FailureDetectorThreshold = getDouble("akka.cluster.failure-detector.threshold")
- final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
- final val FailureDetectorImplementationClass = getString("akka.cluster.failure-detector.implementation-class")
- final val FailureDetectorMinStdDeviation: FiniteDuration =
- Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
- final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration =
- Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
- final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
+ final val FailureDetectorThreshold: Double = {
+ val x = getDouble("akka.cluster.failure-detector.threshold")
+ require(x > 0.0, "failure-detector.threshold must be > 0")
+ x
+ }
+ final val FailureDetectorMaxSampleSize: Int = {
+ val n = getInt("akka.cluster.failure-detector.max-sample-size")
+ require(n > 0, "failure-detector.max-sample-size must be > 0"); n
+ }
+ final val FailureDetectorImplementationClass: String = getString("akka.cluster.failure-detector.implementation-class")
+ final val FailureDetectorMinStdDeviation: FiniteDuration = {
+ val d = Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
+ require(d > Duration.Zero, "failure-detector.min-std-deviation must be > 0"); d
+ }
+ final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = {
+ val d = Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
+ require(d >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0"); d
+ }
+ final val HeartbeatInterval: FiniteDuration = {
+ val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
+ require(d > Duration.Zero, "failure-detector.heartbeat-interval must be > 0"); d
+ }
final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration
final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt
- final val MonitoredByNrOfMembers = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
+ final val MonitoredByNrOfMembers: Int = {
+ val n = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
+ require(n > 0, "failure-detector.monitored-by-nr-of-members must be > 0"); n
+ }
final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map {
case AddressFromURIString(addr) addr

0 comments on commit 59f8210

Please sign in to comment.