Skip to content

Commit

Permalink
Merge pull request #7473 from Donnerbart/cleanupClusterSuppressions
Browse files Browse the repository at this point in the history
Cleanup of cluster module wildcard suppressions.
  • Loading branch information
Ali committed Feb 17, 2016
2 parents ba76e62 + 2bfc044 commit 3b90d54
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 130 deletions.
12 changes: 0 additions & 12 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,6 @@
<suppress checks="MethodCountCheck|FileLengthCheck|ClassFanOutComplexityCheck" files="com/hazelcast/console/ConsoleApp"/>
<suppress checks="MethodCountCheck|FileLengthCheck" files="com/hazelcast/client/console/ClientConsoleApp"/>

<!-- Cluster -->
<!-- TODO: needs to be fixed -->
<suppress checks="MethodCount|ClassDataAbstractionCoupling|ClassFanOutComplexityCheck"
files="com/hazelcast/internal/cluster/impl/ClusterServiceImpl"/>
<suppress checks="" files="com/hazelcast/internal/cluster/impl/ConfigCheck"/>
<suppress checks="" files="com/hazelcast/internal/cluster/impl/AbstractJoiner"/>
<suppress checks="" files="com/hazelcast/internal/cluster/impl/MulticastJoiner"/>
<suppress checks="" files="com/hazelcast/internal/cluster/impl/MulticastService"/>
<suppress checks="" files="com/hazelcast/internal/cluster/impl/TcpIpJoiner"/>
<suppress checks="" files="com/hazelcast/internal/cluster/impl/operations/FinalizeJoinOperation"/>
<suppress checks="" files="com/hazelcast/internal/cluster/impl/operations/JoinCheckOperation"/>

<!-- Cache -->
<suppress checks="MethodCount" files="com/hazelcast/config/CacheConfig"/>
<suppress checks="MethodCount" files="com/hazelcast/cache/ICache"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@

package com.hazelcast.internal.cluster.impl;

import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.cluster.ClusterState;
import com.hazelcast.internal.cluster.Joiner;
import com.hazelcast.internal.cluster.impl.operations.JoinCheckOperation;
import com.hazelcast.internal.cluster.impl.operations.MemberRemoveOperation;
import com.hazelcast.internal.cluster.impl.operations.MergeClustersOperation;
import com.hazelcast.config.Config;
import com.hazelcast.core.Member;
import com.hazelcast.instance.GroupProperty;
import com.hazelcast.instance.Node;
import com.hazelcast.instance.NodeState;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.cluster.Joiner;
import com.hazelcast.internal.cluster.impl.operations.JoinCheckOperation;
import com.hazelcast.internal.cluster.impl.operations.MemberRemoveOperation;
import com.hazelcast.internal.cluster.impl.operations.MergeClustersOperation;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
Expand All @@ -50,19 +50,24 @@

public abstract class AbstractJoiner implements Joiner {

private static final int JOIN_TRY_COUNT = 5;
private static final long MIN_WAIT_SECONDS_BEFORE_JOIN = 10;
private static final long SPLIT_BRAIN_CONN_TIMEOUT = 5000;
private static final long SPLIT_BRAIN_SLEEP_TIME = 10;
private static final int SPLIT_BRAIN_JOIN_CHECK_TIMEOUT_SECONDS = 10;

private final AtomicLong joinStartTime = new AtomicLong(Clock.currentTimeMillis());
private final AtomicInteger tryCount = new AtomicInteger(0);
// map blacklisted endpoints. Boolean value represents if blacklist is temporary or permanent
protected final ConcurrentMap<Address, Boolean> blacklistedAddresses = new ConcurrentHashMap<Address, Boolean>();
protected final Config config;
protected final Node node;
protected final ClusterServiceImpl clusterService;
protected final ClusterJoinManager clusterJoinManager;
protected final ILogger logger;

// map blacklisted endpoints. Boolean value represents if blacklist is temporary or permanent
final ConcurrentMap<Address, Boolean> blacklistedAddresses = new ConcurrentHashMap<Address, Boolean>();
final ClusterJoinManager clusterJoinManager;

private final AtomicLong joinStartTime = new AtomicLong(Clock.currentTimeMillis());
private final AtomicInteger tryCount = new AtomicInteger(0);

private final long mergeNextRunDelayMs;
private volatile Address targetAddress;

Expand All @@ -75,6 +80,16 @@ public AbstractJoiner(Node node) {
mergeNextRunDelayMs = node.groupProperties.getMillis(GroupProperty.MERGE_NEXT_RUN_DELAY_SECONDS);
}

@Override
public final long getStartTime() {
return joinStartTime.get();
}

@Override
public void setTargetAddress(Address targetAddress) {
this.targetAddress = targetAddress;
}

@Override
public void blacklist(Address address, boolean permanent) {
logger.info(address + " is added to the blacklist.");
Expand Down Expand Up @@ -113,7 +128,7 @@ private void postJoin() {
if (node.getState() != NodeState.ACTIVE) {
return;
}
if (tryCount.incrementAndGet() == 5) {
if (tryCount.incrementAndGet() == JOIN_TRY_COUNT) {
logger.warning("Join try count exceed limit, setting this node as master!");
node.setAsMaster();
}
Expand All @@ -138,8 +153,9 @@ private void ensureConnectionToAllMembers() {
while (checkCount++ < connectAllWaitSeconds && !allConnected) {
try {
//noinspection BusyWait
Thread.sleep(1000);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
EmptyStatement.ignore(ignored);
}

allConnected = true;
Expand All @@ -156,17 +172,20 @@ private void ensureConnectionToAllMembers() {
}
}

protected final long getMaxJoinMillis() {
final long getMaxJoinMillis() {
return node.getGroupProperties().getMillis(GroupProperty.MAX_JOIN_SECONDS);
}

protected final long getMaxJoinTimeToMasterNode() {
final long getMaxJoinTimeToMasterNode() {
// max join time to found master node,
// this should be significantly greater than MAX_WAIT_SECONDS_BEFORE_JOIN property
// hence we add 10 seconds more
return TimeUnit.SECONDS.toMillis(10) + node.getGroupProperties().getMillis(GroupProperty.MAX_WAIT_SECONDS_BEFORE_JOIN);
return TimeUnit.SECONDS.toMillis(MIN_WAIT_SECONDS_BEFORE_JOIN)
+ node.getGroupProperties().getMillis(GroupProperty.MAX_WAIT_SECONDS_BEFORE_JOIN);
}

@SuppressWarnings({"checkstyle:methodlength", "checkstyle:returncount",
"checkstyle:npathcomplexity", "checkstyle:cyclomaticcomplexity" })
boolean shouldMerge(JoinMessage joinMessage) {
if (joinMessage == null) {
return false;
Expand Down Expand Up @@ -207,7 +226,7 @@ boolean shouldMerge(JoinMessage joinMessage) {
node.nodeEngine.getOperationService()
.send(new MemberRemoveOperation(node.getThisAddress()), joinMessage.getAddress());
logger.info(node.getThisAddress() + " CANNOT merge to " + joinMessage.getAddress()
+ ", because it thinks this-node as its member.");
+ ", because it thinks this-node as its member.");
return false;
}

Expand All @@ -223,7 +242,7 @@ boolean shouldMerge(JoinMessage joinMessage) {

int targetDataMemberCount = joinMessage.getDataMemberCount();
int currentDataMemberCount = clusterService.getSize(DATA_MEMBER_SELECTOR);

if (targetDataMemberCount > currentDataMemberCount) {
// I should join the other cluster
logger.info(node.getThisAddress() + " is merging to " + joinMessage.getAddress()
Expand Down Expand Up @@ -267,7 +286,8 @@ JoinMessage sendSplitBrainJoinMessage(Address target) {
Connection conn = node.connectionManager.getOrConnect(target, true);
long timeout = SPLIT_BRAIN_CONN_TIMEOUT;
while (conn == null) {
if ((timeout -= SPLIT_BRAIN_SLEEP_TIME) < 0) {
timeout -= SPLIT_BRAIN_SLEEP_TIME;
if (timeout < 0) {
return null;
}
try {
Expand All @@ -281,11 +301,11 @@ JoinMessage sendSplitBrainJoinMessage(Address target) {
}

NodeEngine nodeEngine = node.nodeEngine;
Future f = nodeEngine.getOperationService().createInvocationBuilder(ClusterServiceImpl.SERVICE_NAME,
Future future = nodeEngine.getOperationService().createInvocationBuilder(ClusterServiceImpl.SERVICE_NAME,
new JoinCheckOperation(node.createSplitBrainJoinMessage()), target)
.setTryCount(1).invoke();
try {
return (JoinMessage) f.get(10, TimeUnit.SECONDS);
return (JoinMessage) future.get(SPLIT_BRAIN_JOIN_CHECK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
logger.finest("Timeout during join check!", e);
} catch (Exception e) {
Expand All @@ -300,7 +320,7 @@ public void reset() {
tryCount.set(0);
}

protected void startClusterMerge(final Address targetAddress) {
void startClusterMerge(final Address targetAddress) {
ClusterServiceImpl clusterService = node.clusterService;

if (!prepareClusterState(clusterService)) {
Expand Down Expand Up @@ -344,7 +364,7 @@ private boolean prepareClusterState(ClusterServiceImpl clusterService) {
}

try {
Thread.sleep(1000);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
logger.warning("Interrupted while preparing cluster for merge!");
// restore interrupt flag
Expand All @@ -365,17 +385,7 @@ private boolean preCheckClusterState(final ClusterService clusterService) {
return true;
}

@Override
public final long getStartTime() {
return joinStartTime.get();
}

@Override
public void setTargetAddress(Address targetAddress) {
this.targetAddress = targetAddress;
}

public Address getTargetAddress() {
Address getTargetAddress() {
final Address target = targetAddress;
targetAddress = null;
return target;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,8 @@

package com.hazelcast.internal.cluster.impl;

import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.cluster.ClusterState;
import com.hazelcast.cluster.MemberAttributeOperationType;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.operations.MemberInfoUpdateOperation;
import com.hazelcast.internal.cluster.impl.operations.MemberRemoveOperation;
import com.hazelcast.internal.cluster.impl.operations.ShutdownNodeOperation;
import com.hazelcast.internal.cluster.impl.operations.TriggerMemberListPublishOperation;
import com.hazelcast.core.InitialMembershipEvent;
import com.hazelcast.core.InitialMembershipListener;
import com.hazelcast.core.Member;
Expand All @@ -36,6 +30,12 @@
import com.hazelcast.instance.LifecycleServiceImpl;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.operations.MemberInfoUpdateOperation;
import com.hazelcast.internal.cluster.impl.operations.MemberRemoveOperation;
import com.hazelcast.internal.cluster.impl.operations.ShutdownNodeOperation;
import com.hazelcast.internal.cluster.impl.operations.TriggerMemberListPublishOperation;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.logging.ILogger;
Expand Down Expand Up @@ -85,6 +85,7 @@
import static java.util.Collections.unmodifiableMap;
import static java.util.Collections.unmodifiableSet;

@SuppressWarnings({"checkstyle:methodcount", "checkstyle:classdataabstractioncoupling", "checkstyle:classfanoutcomplexity" })
public class ClusterServiceImpl implements ClusterService, ConnectionListener, ManagedService,
EventPublishingService<MembershipEvent, MembershipListener>, TransactionalService {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.util.EmptyStatement;

import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -89,7 +90,7 @@ public ConfigCheck(Config config, String joinerType) {
* @param found
* @return true if compatible. False if part of another group.
* @throws ConfigMismatchException if the configuration is not compatible.
* An exception is thrown so we can pass a nice message.
* An exception is thrown so we can pass a nice message.
*/
public boolean isCompatible(ConfigCheck found) {
// check group-properties.
Expand All @@ -105,7 +106,7 @@ public boolean isCompatible(ConfigCheck found) {
return true;
}

public boolean isSameGroup(ConfigCheck found){
public boolean isSameGroup(ConfigCheck found) {
if (!equals(groupName, found.groupName)) {
return false;
}
Expand All @@ -122,8 +123,8 @@ private void verifyApplicationValidationToken(ConfigCheck found) {
String expectedValidationToken = properties.get(APPLICATION_VALIDATION_TOKEN.getName());
String foundValidationToken = found.properties.get(APPLICATION_VALIDATION_TOKEN.getName());
if (!equals(expectedValidationToken, foundValidationToken)) {
throw new ConfigMismatchException("Incompatible '" + APPLICATION_VALIDATION_TOKEN + "'! expected: " +
expectedValidationToken + ", found: " + foundValidationToken);
throw new ConfigMismatchException("Incompatible '" + APPLICATION_VALIDATION_TOKEN + "'! expected: "
+ expectedValidationToken + ", found: " + foundValidationToken);
}
}

Expand Down Expand Up @@ -215,6 +216,7 @@ public void readData(ObjectDataInput in) throws IOException {
try {
memberGroupType = PartitionGroupConfig.MemberGroupType.valueOf(s);
} catch (IllegalArgumentException ignored) {
EmptyStatement.ignore(ignored);
}
}
int propSize = in.readInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@

public class MulticastJoiner extends AbstractJoiner {

private static final long JOIN_RETRY_INTERVAL = 1000L;
private static final int PUBLISH_INTERVAL_MIN = 50;
private static final int PUBLISH_INTERVAL_MAX = 200;
private static final long JOIN_RETRY_INTERVAL = 1000L;
private static final int TRY_COUNT_MAX_LAST_DIGITS = 512;
private static final int TRY_COUNT_MODULO = 10;

private final AtomicInteger currentTryCount = new AtomicInteger(0);
private final AtomicInteger maxTryCount;
Expand Down Expand Up @@ -122,7 +124,7 @@ public void onMessage(Object msg) {
if (node.clusterService.getMember(joinInfo.getAddress()) != null) {
if (logger.isFinestEnabled()) {
logger.finest("Ignoring merge join response, since " + joinInfo.getAddress()
+ " is already a member.");
+ " is already a member.");
}
return;
}
Expand All @@ -140,6 +142,7 @@ public void onMessage(Object msg) {
}
}
} catch (InterruptedException ignored) {
EmptyStatement.ignore(ignored);
} catch (Exception e) {
if (logger != null) {
logger.warning(e);
Expand All @@ -154,6 +157,12 @@ public String getType() {
return "multicast";
}

void onReceivedJoinRequest(JoinRequest joinRequest) {
if (joinRequest.getUuid().compareTo(node.localMember.getUuid()) < 0) {
maxTryCount.incrementAndGet();
}
}

private Address findMasterWithMulticast() {
try {
if (logger.isFinestEnabled()) {
Expand Down Expand Up @@ -182,28 +191,22 @@ private Address findMasterWithMulticast() {

private int calculateTryCount() {
final NetworkConfig networkConfig = config.getNetworkConfig();
int timeoutSeconds = networkConfig.getJoin().getMulticastConfig().getMulticastTimeoutSeconds();
long timeoutMillis = TimeUnit.SECONDS.toMillis(networkConfig.getJoin().getMulticastConfig().getMulticastTimeoutSeconds());
int avgPublishInterval = (PUBLISH_INTERVAL_MAX + PUBLISH_INTERVAL_MIN) / 2;
int tryCount = timeoutSeconds * 1000 / avgPublishInterval;
int tryCount = (int) timeoutMillis / avgPublishInterval;
String host = node.getThisAddress().getHost();
int lastDigits;
try {
lastDigits = Integer.parseInt(host.substring(host.lastIndexOf('.') + 1));
} catch (NumberFormatException e) {
lastDigits = RandomPicker.getInt(512);
lastDigits = RandomPicker.getInt(TRY_COUNT_MAX_LAST_DIGITS);
}
int portDiff = node.getThisAddress().getPort() - networkConfig.getPort();
tryCount += (lastDigits + portDiff) % 10;
tryCount += (lastDigits + portDiff) % TRY_COUNT_MODULO;
return tryCount;
}

private int getPublishInterval() {
return RandomPicker.getInt(PUBLISH_INTERVAL_MIN, PUBLISH_INTERVAL_MAX);
}

public void onReceivedJoinRequest(JoinRequest joinRequest) {
if (joinRequest.getUuid().compareTo(node.localMember.getUuid()) < 0) {
maxTryCount.incrementAndGet();
}
}
}
Loading

0 comments on commit 3b90d54

Please sign in to comment.