Skip to content

Commit

Permalink
Fix for hazelcast#8777
Browse files Browse the repository at this point in the history
  • Loading branch information
ihsandemir committed Nov 17, 2016
1 parent 951a334 commit 7fc7d52
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,23 @@ public interface ClientEndpointManager {
* todo: what happens when the endpoint was never registered
*
* @param endpoint the endpoint to remove.
* @param reason The reason why the endpoint is being removed
* @throws java.lang.NullPointerException if endpoint is null.
* @see #removeEndpoint(ClientEndpoint, boolean)
* @see #removeEndpoint(ClientEndpoint, boolean, String)
*/
void removeEndpoint(ClientEndpoint endpoint);
void removeEndpoint(ClientEndpoint endpoint, String reason);

/**
* Removes an endpoint and optionally closes it immediately.
*
* todo: what happens when the endpoint already is removed
* todo: what happens when the endpoint was never registered
*
* @param endpoint the endpoint to remove.
* @param ce the endpoint to remove.
* @param closeImmediately if the endpoint is immediately closed.
* @param reason The reason why the endpoint is being removed.
* @throws java.lang.NullPointerException if endpoint is null.
* @see #removeEndpoint(ClientEndpoint)
* @see #removeEndpoint(ClientEndpoint, String)
*/
void removeEndpoint(ClientEndpoint endpoint, boolean closeImmediately);
void removeEndpoint(ClientEndpoint ce, boolean closeImmediately, String reason);
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,28 +100,36 @@ public void registerEndpoint(ClientEndpoint endpoint) {
checkNotNull(endpoint, "endpoint can't be null");

final Connection conn = endpoint.getConnection();
ClientEndpoint existingEndpoint = endpoints.put(conn, endpoint);
clientEndpoints.put(endpoint.getUuid(), endpoint);
if (existingEndpoint != null && endpoint != existingEndpoint) {
if (existingEndpoint.isFirstConnection()) {
logger.severe("An endpoint (first connection) already exists for connection:" + conn);
} else {
logger.info("Changed " + conn + " as the first connection for " + endpoint);
final String clientUuid = endpoint.getUuid();
ClientEndpoint existingEndpoint = clientEndpoints.put(clientUuid, endpoint);
endpoints.put(conn, endpoint);
if (existingEndpoint != null) {
Connection existingEndpointConnection = existingEndpoint.getConnection();
endpoints.remove(existingEndpointConnection);
if (!conn.equals(existingEndpointConnection)) {
// close the old connection
existingEndpointConnection.close("A new authentication request from the same client with uuid " + clientUuid
+ " is received.", null);
}
} else {
if (endpoint != existingEndpoint) {

if (!endpoint.equals(existingEndpoint)) {
if (existingEndpoint.isFirstConnection()) {
logger.fine("An endpoint (first connection) already exists for connection:" + conn);
} else {
logger.fine("Changed " + conn + " as the first connection for " + endpoint);
}
totalRegistrations.inc();
}
}
}

@Override
public void removeEndpoint(ClientEndpoint endpoint) {
removeEndpoint(endpoint, false);
public void removeEndpoint(ClientEndpoint endpoint, String reason) {
removeEndpoint(endpoint, false, reason);
}

@Override
public void removeEndpoint(final ClientEndpoint ce, boolean closeImmediately) {
public void removeEndpoint(final ClientEndpoint ce, boolean closeImmediately, final String reason) {
checkNotNull(ce, "endpoint can't be null");

ClientEndpointImpl endpoint = (ClientEndpointImpl) ce;
Expand All @@ -138,7 +146,7 @@ public void removeEndpoint(final ClientEndpoint ce, boolean closeImmediately) {
final Connection connection = endpoint.getConnection();
if (closeImmediately) {
try {
connection.close(null, null);
connection.close(reason, null);
} catch (Throwable e) {
logger.warning("While closing client connection: " + connection, e);
}
Expand All @@ -147,7 +155,7 @@ public void removeEndpoint(final ClientEndpoint ce, boolean closeImmediately) {
public void run() {
if (connection.isAlive()) {
try {
connection.close(null, null);
connection.close(reason, null);
} catch (Throwable e) {
logger.warning("While closing client connection: " + e.toString());
}
Expand All @@ -169,14 +177,15 @@ public void removeEndpoints(String memberUuid) {
String ownerUuid = endpoint.getPrincipal().getOwnerUuid();
if (memberUuid.equals(ownerUuid)) {
iterator.remove();
removeEndpoint(endpoint, true);
removeEndpoint(endpoint, true, "Cleanup of disconnected client resource");
}
}
}

@Override
public void clear() {
endpoints.clear();
clientEndpoints.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private void monitor(String memberUuid, ClientEndpointImpl clientEndpoint) {
+ ". Now: " + timeToString(currentTimeMillis)
+ ". LastTimePacketReceived: " + timeToString(lastTimePacketReceived);
logger.log(Level.WARNING, message);
connection.close(message, null);
clientEndpointManager.removeEndpoint(clientEndpoint, message);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public void run() throws Exception {
&& engine.removeOwnershipMapping(clientUuid, memberUuid)) {
Set<ClientEndpoint> endpoints = endpointManager.getEndpoints(clientUuid);
for (ClientEndpoint endpoint : endpoints) {
endpointManager.removeEndpoint(endpoint, true);
endpointManager
.removeEndpoint(endpoint, true, "ClientDisconnectionOperation: Cleanup of disconnected client resources");
}

NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private void handleAuthenticationFailure() {
exception = new HazelcastInstanceNotActiveException();
}
sendClientMessage(exception);
endpointManager.removeEndpoint(endpoint);
endpointManager.removeEndpoint(endpoint, "Authentication failed. " + exception.getMessage());
}

private void handleMissingEndpoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public AuthenticationBaseMessageTask(ClientMessage clientMessage, Node node, Con

protected void setEndpoint() {
if (connection.isAlive()) {
checkExistingEndpoint();
if (null == endpoint) {
endpoint = new ClientEndpointImpl(clientEngine, this.connection);
}
Expand All @@ -76,22 +75,6 @@ protected void setEndpoint() {
}
}

private void checkExistingEndpoint() {
if (null != principal) {
clientUuid = principal.getUuid();
endpoint = endpointManager.getEndpoint(clientUuid);
if (null != endpoint) {
Connection previousConnection = endpoint.getConnection();
if (null != previousConnection && !connection.equals(previousConnection)) {
previousConnection.close("A new authentication request from the same client with uuid " + clientUuid
+ " is received. Closing the existing connection " + previousConnection + " for this endpoint.",
null);
}
endpoint.setConnection(connection);
}
}
}

@Override
protected ClientEndpoint getEndpoint() {
if (connection.isAlive()) {
Expand Down

0 comments on commit 7fc7d52

Please sign in to comment.