Skip to content

Commit

Permalink
stealing link fix (issue 2204)
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <riccardo.modanese@eurotech.com>
  • Loading branch information
riccardomodanese authored and Coduz committed Nov 28, 2018
1 parent 616e86d commit db1fbb3
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 22 deletions.
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2017 Eurotech and/or its affiliates and others
* Copyright (c) 2017, 2018 Eurotech and/or its affiliates and others
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
Expand All @@ -21,12 +21,22 @@
public class DefaultSystemMessageCreator implements SystemMessageCreator {

private static final String CONNECT_MESSAGE_TEMPLATE = "Device: [%s] - connected by user: [%s]";
private static final String DISCONNECT_MESSAGE_TEMPLATE = "Device: [%s] - disconnected by user: [%s]";

@Override
public String createMessage(SystemMessageType systemMessageType, KapuaConnectionContext kbc) {
return String.format(CONNECT_MESSAGE_TEMPLATE,
kbc.getClientId(),
kbc.getUserName());
switch (systemMessageType) {
case CONNECT:
return String.format(CONNECT_MESSAGE_TEMPLATE,
kbc.getClientId(),
kbc.getUserName());
case DISCONNECT:
return String.format(DISCONNECT_MESSAGE_TEMPLATE,
kbc.getClientId(),
kbc.getUserName());
default:
return "";
}
}

}
Expand Up @@ -21,7 +21,8 @@
public interface SystemMessageCreator {

enum SystemMessageType {
CONNECT
CONNECT,
DISCONNECT
}

/**
Expand Down
Expand Up @@ -106,6 +106,7 @@ public class KapuaSecurityBrokerFilter extends BrokerFilter {
public static final String VT_TOPIC_PREFIX = "VirtualTopic.";

private static final String CONNECT_MESSAGE_TOPIC_PATTERN = "VirtualTopic.%s.%s.%s.MQTT.CONNECT";
private static final String DISCONNECT_MESSAGE_TOPIC_PATTERN = "VirtualTopic.%s.%s.%s.MQTT.DISCONNECT";
private static final String BROKER_IP_RESOLVER_CLASS_NAME;
private static final String BROKER_ID_RESOLVER_CLASS_NAME;
private static final String AUTHENTICATOR_CLASS_NAME;
Expand Down Expand Up @@ -149,6 +150,7 @@ public KapuaSecurityBrokerFilter(Broker next) throws KapuaException {
options.put(Authenticator.ADDRESS_PREFIX_KEY, VT_TOPIC_PREFIX);
options.put(Authenticator.ADDRESS_CLASSIFIER_KEY, SystemSetting.getInstance().getMessageClassifier());
options.put(Authenticator.ADDRESS_CONNECT_PATTERN_KEY, CONNECT_MESSAGE_TOPIC_PATTERN);
options.put(Authenticator.ADDRESS_DISCONNECT_PATTERN_KEY, DISCONNECT_MESSAGE_TOPIC_PATTERN);
}

@Override
Expand Down
Expand Up @@ -28,6 +28,7 @@ public interface Authenticator {
String ADDRESS_CLASSIFIER_KEY = "address_classifier";
String ADDRESS_PREFIX_KEY = "address_prefix";
String ADDRESS_CONNECT_PATTERN_KEY = "address_connect_pattern";
String ADDRESS_DISCONNECT_PATTERN_KEY = "address_disconnect_pattern";

/**
* Execute the connect logic returning the authorization list (ACL)
Expand Down Expand Up @@ -56,4 +57,11 @@ public abstract List<org.eclipse.kapua.broker.core.plugin.authentication.Authori
*/
public abstract void sendConnectMessage(KapuaConnectionContext kcc);

/**
* Send the disconnect message (this message is mainly for internal use)
*
* @param kcc
*/
public abstract void sendDisconnectMessage(KapuaConnectionContext kcc);

}
Expand Up @@ -12,6 +12,7 @@
package org.eclipse.kapua.broker.core.plugin.authentication;

import com.codahale.metrics.Timer.Context;

import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.broker.core.message.system.DefaultSystemMessageCreator;
import org.eclipse.kapua.broker.core.message.system.SystemMessageCreator;
Expand Down Expand Up @@ -90,12 +91,13 @@ public List<AuthorizationEntry> connect(KapuaConnectionContext kcc)

@Override
public void disconnect(KapuaConnectionContext kcc, Throwable error) {
if (!isAdminUser(kcc)) {
if (isAdminUser(kcc)) {
clientMetric.getDisconnectionKapuasys().inc();
userAuthenticationLogic.disconnect(kcc, error);
adminAuthenticationLogic.disconnect(kcc, error);
} else {
clientMetric.getDisconnectionClient().inc();
adminAuthenticationLogic.disconnect(kcc, error);
userAuthenticationLogic.disconnect(kcc, error);
sendDisconnectMessage(kcc);
}
}

Expand All @@ -120,6 +122,27 @@ public void sendConnectMessage(KapuaConnectionContext kcc) {
loginSendLogingUpdateMsgTimeContex.stop();
}

@Override
public void sendDisconnectMessage(KapuaConnectionContext kcc) {
Context loginSendLogingUpdateMsgTimeContex = loginMetric.getSendLoginUpdateMsgTime().time();
String message = systemMessageCreator.createMessage(SystemMessageType.DISCONNECT, kcc);
JmsAssistantProducerWrapper producerWrapper = null;
try {
producerWrapper = JmsAssistantProducerPool.getIOnstance(DESTINATIONS.NO_DESTINATION).borrowObject();
producerWrapper.send(String.format((String) options.get(Authenticator.ADDRESS_DISCONNECT_PATTERN_KEY),
SystemSetting.getInstance().getMessageClassifier(), kcc.getAccountName(), kcc.getClientId()),
message,
kcc);
} catch (Exception e) {
logger.error("Exception sending the connect message: {}", e.getMessage(), e);
} finally {
if (producerWrapper != null) {
JmsAssistantProducerPool.getIOnstance(DESTINATIONS.NO_DESTINATION).returnObject(producerWrapper);
}
}
loginSendLogingUpdateMsgTimeContex.stop();
}

protected boolean isAdminUser(KapuaConnectionContext kcc) {
String adminUsername = SystemSetting.getInstance().getString(SystemSettingKey.SYS_ADMIN_USERNAME);
return kcc.getUserName().equals(adminUsername);
Expand Down
Expand Up @@ -101,6 +101,7 @@ public List<AuthorizationEntry> connect(KapuaConnectionContext kcc) throws Kapua
@Override
public void disconnect(KapuaConnectionContext kcc, Throwable error) {
boolean stealingLinkDetected = false;
logger.debug("Old connection id: {} - new connection id: {} - error: {} - error cause: {}", kcc.getOldConnectionId(), kcc.getConnectionId(), error, (error!=null ? error.getCause() : "NULL"), error);
if (kcc.getOldConnectionId() != null) {
stealingLinkDetected = !kcc.getOldConnectionId().equals(kcc.getConnectionId());
} else {
Expand All @@ -119,18 +120,16 @@ public void disconnect(KapuaConnectionContext kcc, Throwable error) {
kcc.getConnectionId(),
kcc.getClientIp());
} else {

DeviceConnection deviceConnection;
try {
deviceConnection = KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.findByClientId(kcc.getScopeId(), kcc.getClientId()));
} catch (Exception e) {
throw new ShiroException("Error while looking for device connection on updating the device!", e);
}

if (deviceConnection != null) {
// the device connection must be not null
// update device connection (if the disconnection wasn't caused by a stealing link)
if (error instanceof KapuaDuplicateClientIdException) {
if (error instanceof KapuaDuplicateClientIdException || (error!=null && error.getCause() instanceof KapuaDuplicateClientIdException)) {
logger.debug("Skip device connection status update since is coming from a stealing link condition. Client id: {} - Connection id: {}",
kcc.getClientId(),
kcc.getConnectionId());
Expand Down
Expand Up @@ -24,6 +24,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Device that connects to MQTT broker and listens for messages as kapua-sys user
Expand All @@ -33,6 +34,8 @@
*/
public class MqttDevice {

private static final AtomicInteger COUNT = new AtomicInteger(0);

/**
* Logger.
*/
Expand All @@ -46,7 +49,7 @@ public class MqttDevice {
/**
* Listening mqtt client name.
*/
private static final java.lang.String LISTENER_NAME = "ListenerClient";
private static final java.lang.String LISTENER_NAME = "ListenerClient_";

/**
* System user under which Device is listening for messages.
Expand Down Expand Up @@ -84,6 +87,8 @@ public class MqttDevice {
*/
private MqttClient subscribedClient;

private String clientId;

/**
* Map for storing received messages that clients are listening to.
* It is Map of Maps, first key is clientId. Key in second map is
Expand All @@ -99,6 +104,7 @@ public class MqttDevice {
public MqttDevice() {

mqttClients = new HashMap<>();
clientId = LISTENER_NAME + COUNT.incrementAndGet();
}

/**
Expand All @@ -110,7 +116,7 @@ public void mqttSubscriberConnect() {
subscriberOpts.setUserName(SYS_USER);
subscriberOpts.setPassword(SYS_PASSWORD.toCharArray());
try {
subscribedClient = new MqttClient(BROKER_URI, LISTENER_NAME,
subscribedClient = new MqttClient(BROKER_URI, clientId,
new MemoryPersistence());
subscribedClient.connect(subscriberOpts);
subscribedClient.subscribe(NO_TOPIC_FILTER, DEFAULT_QOS);
Expand All @@ -121,24 +127,24 @@ public void mqttSubscriberConnect() {
subscribedClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
logger.info("Listener connection to broker lost. {}", throwable.getMessage(), throwable);
logger.info("(Client {}) Listener connection to broker lost. {}", clientId, throwable.getMessage(), throwable);
}

@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
logger.info("Message arrived in Listener with topic: {}", topic);
logger.info("(Client {}) - Message arrived in Listener with topic: {}", clientId, topic);
// exclude the connect messages sent by the broker (that may affect the tests)
// this messages can be received by this callback before the listenerReceivedMqttMessage is properly initialized. So a check for null should be performed
// TODO manage this client in a better way, so the list of the received messages should be internal and exposed as getter to the caller.
if (listenerReceivedMqttMessage != null) {
if (!topic.contains("MQTT/CONNECT")) {
if (!topic.contains("MQTT/CONNECT") || topic.contains("MQTT/DISCONNECT")) {
listenerReceivedMqttMessage.clear();
listenerReceivedMqttMessage.put(topic, new String(mqttMessage.getPayload()));
} else {
logger.info("Received CONNECT message. The message will be discarded!");
logger.info("(Client {}) - Received CONNECT/DISCONNECT message. The message will be discarded!", clientId);
}
} else {
logger.info("Received message map is null. The message is not stored!");
logger.info("(Client {}) - Received message map is null. The message is not stored!", clientId);
}
}

Expand All @@ -153,7 +159,7 @@ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
* Disconnect Device mqtt subscriber that listens on mqtt broker.
*/
public void mqttSubscriberDisconnect() {

logger.info("(Client {}) - Unsubscribing", clientId);
try {
try (final Suppressed<Exception> s = Suppressed.withException()) {
s.run(subscribedClient::disconnect);
Expand Down Expand Up @@ -217,7 +223,7 @@ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
* Disconnect Device mqtt client that listens and sends messages to mqtt broker.
*/
public void mqttClientsDisconnect() {

logger.info("(Client {}) - Disconnecting", clientId);
for (Map.Entry<String, MqttClient> mqttClient : mqttClients.entrySet()) {
try {
try (final Suppressed<Exception> s = Suppressed.withException()) {
Expand Down
Expand Up @@ -393,8 +393,8 @@ Feature: Broker ACL tests
Given I expect the exception "MqttException" with the text "*"
When broker with clientId "client-1" and user "luise" and password "kapua-password" is listening on topic "$EDC/acme/foo/bar/NOTIFY/client-1"
# Then An exception was thrown
# And clients are disconnected
# And Mqtt Device is stoped
And clients are disconnected
And Mqtt Device is stoped
#
# Data view
#
Expand Down

0 comments on commit db1fbb3

Please sign in to comment.