Skip to content

Commit

Permalink
Added TransportClientPoolExhaustedException at transport level
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Codutti <alberto.codutti@eurotech.com>
  • Loading branch information
Coduz committed Aug 2, 2023
1 parent c744109 commit 2f7bf35
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.kapua.service.device.call.kura;

import com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.eclipse.kapua.KapuaEntityNotFoundException;
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
Expand All @@ -39,10 +40,10 @@
import org.eclipse.kapua.transport.TransportClientFactory;
import org.eclipse.kapua.transport.TransportFacade;
import org.eclipse.kapua.transport.exception.TransportClientGetException;
import org.eclipse.kapua.transport.exception.TransportClientPoolExhaustedException;
import org.eclipse.kapua.transport.exception.TransportTimeoutException;
import org.eclipse.kapua.transport.message.TransportMessage;

import org.checkerframework.checker.nullness.qual.Nullable;
import javax.validation.constraints.NotNull;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -190,10 +191,11 @@ protected KuraResponseMessage sendInternal(@NotNull KuraRequestMessage requestMe
*
* @param kuraRequestMessage The {@link KuraRequestMessage} to send.
* @return The {@link TransportFacade} to use to send the {@link KuraResponseMessage}.
* @throws TransportClientGetException If getting the {@link TransportFacade} causes an {@link Exception}.
* @throws TransportClientPoolExhaustedException If a {@link TransportFacade} cannot be obtained within the configured timeout.
* @throws TransportClientGetException If getting the {@link TransportFacade} causes an {@link Exception}.
* @since 1.0.0
*/
protected TransportFacade<?, ?, ?, ?> borrowClient(KuraRequestMessage kuraRequestMessage) throws TransportClientGetException {
protected TransportFacade<?, ?, ?, ?> borrowClient(KuraRequestMessage kuraRequestMessage) throws TransportClientGetException, TransportClientPoolExhaustedException {
String serverIp = null;
try {
serverIp = KapuaSecurityUtils.doPrivileged(() -> {
Expand All @@ -217,9 +219,10 @@ protected KuraResponseMessage sendInternal(@NotNull KuraRequestMessage requestMe

Map<String, Object> configParameters = new HashMap<>(1);
configParameters.put("serverAddress", serverIp);

return TRANSPORT_CLIENT_FACTORY.getFacade(configParameters);
} catch (TransportClientGetException tcge) {
throw tcge;
} catch (TransportClientGetException | TransportClientPoolExhaustedException tce) {
throw tce;
} catch (Exception e) {
throw new TransportClientGetException(e, serverIp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.model.KapuaObjectFactory;
import org.eclipse.kapua.transport.exception.TransportClientGetException;
import org.eclipse.kapua.transport.exception.TransportClientPoolExhaustedException;
import org.eclipse.kapua.transport.message.TransportChannel;
import org.eclipse.kapua.transport.message.TransportMessage;
import org.eclipse.kapua.transport.message.TransportPayload;
Expand Down Expand Up @@ -43,10 +44,11 @@ public interface TransportClientFactory<C extends TransportChannel, P extends Tr
*
* @param configParameters a {@link Map} containing optional config values for the facade
* @return An instance of the {@link TransportFacade} implementing class.
* @throws TransportClientGetException If error occurs when getting the {@link TransportFacade}.
* @throws TransportClientPoolExhaustedException If a {@link TransportFacade} cannot be obtained within the configured timeout.
* @throws TransportClientGetException If error occurs when getting the {@link TransportFacade}.
* @since 1.0.0
*/
T getFacade(Map<String, Object> configParameters) throws TransportClientGetException;
T getFacade(Map<String, Object> configParameters) throws TransportClientGetException, TransportClientPoolExhaustedException;

/**
* Gets an instance of the {@link TransportClientConnectOptions} implementing class.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*******************************************************************************
* Copyright (c) 2020, 2022 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.transport.exception;

import javax.validation.constraints.NotNull;

/**
* The {@link TransportException} to throw when is not possible to get an instance of the {@link org.eclipse.kapua.transport.TransportFacade} within the configured timeout
*
* @since 2.0.0
*/
public class TransportClientPoolExhaustedException extends TransportException {

private final String serverIp;
private final Long borrowWaitTimeout;

/**
* Constructor.
*
* @param serverIp The serverIp to which connect the {@link org.eclipse.kapua.transport.TransportFacade}
* @since 2.0.0
*/
public TransportClientPoolExhaustedException(@NotNull String serverIp, Long borrowWaitTimeout) {
super(TransportErrorCodes.CLIENT_POOL_EXHAUSTED, serverIp, borrowWaitTimeout);

this.serverIp = serverIp;
this.borrowWaitTimeout = borrowWaitTimeout;
}

/**
* Constructor.
*
* @param cause the root cause of the {@link Exception}.
* @param serverIp The serverIp to which connect the {@link org.eclipse.kapua.transport.TransportFacade}
* @since 2.0.0
*/
public TransportClientPoolExhaustedException(@NotNull Throwable cause, @NotNull String serverIp, Long borrowWaitTimeout) {
super(TransportErrorCodes.CLIENT_POOL_EXHAUSTED, cause, serverIp, borrowWaitTimeout);

this.serverIp = serverIp;
this.borrowWaitTimeout = borrowWaitTimeout;
}

/**
* Gets the IP to which we wanted unsuccessfully to connect.
*
* @return The IP to which we wanted unsuccessfully to connect.
* @since 2.0.0
*/
public String getRequestMessage() {
return serverIp;
}

/**
* Gets the configured timeout for the borrow operation.
*
* @return The configured timeout for the borrow operation.
* @since 2.0.0
*/
public Long getBorrowWaitTimeout() {
return borrowWaitTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,33 @@
import org.eclipse.kapua.KapuaErrorCode;

/**
* {@link KapuaErrorCode}s for {@link TransportException}s.
*
* @since 1.1.0
*/
public enum TransportErrorCodes implements KapuaErrorCode {

/**
* An error occurred when sending the {@link org.eclipse.kapua.transport.message.TransportMessage}.
*
* @see TransportSendException
* @since 1.1.0
*/
SEND_ERROR,

/**
* A response as not been received within the given timeout.
*
* @see TransportTimeoutException
* @since 1.1.0
*/
TIMEOUT,

/**
* Getting the {@link org.eclipse.kapua.transport.TransportFacade} produces an error.
*
* @see TransportClientGetException
* @since 1.2.0
*/
CLIENT_GET
CLIENT_GET,

/**
* @see TransportClientPoolExhaustedException
* @since 2.0.0
*/
CLIENT_POOL_EXHAUSTED
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
#
###############################################################################
CLIENT_GET=Cannot get an instance of the transport client to connect to host: {0}
CLIENT_POOL_EXHAUSTED=Cannot get an instance of the transport client to connect to host {0} within the configured timeout of {1}. This means that the pool is currently exhausted.
SEND_ERROR=An error occurred when sending the message: {0}
TIMEOUT=The request has not received a response within the timeout of: {0}ms
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.kapua.locator.KapuaProvider;
import org.eclipse.kapua.transport.TransportClientFactory;
import org.eclipse.kapua.transport.exception.TransportClientGetException;
import org.eclipse.kapua.transport.exception.TransportClientPoolExhaustedException;
import org.eclipse.kapua.transport.message.mqtt.MqttMessage;
import org.eclipse.kapua.transport.message.mqtt.MqttPayload;
import org.eclipse.kapua.transport.message.mqtt.MqttTopic;
Expand All @@ -33,7 +34,7 @@
public class MqttClientFactoryImpl implements TransportClientFactory<MqttTopic, MqttPayload, MqttMessage, MqttMessage, MqttFacade, MqttClientConnectionOptions> {

@Override
public MqttFacade getFacade(Map<String, Object> configParameters) throws TransportClientGetException {
public MqttFacade getFacade(Map<String, Object> configParameters) throws TransportClientGetException, TransportClientPoolExhaustedException {
String host = (String) configParameters.get("serverAddress");

if (Strings.isNullOrEmpty(host)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
*******************************************************************************/
package org.eclipse.kapua.transport.mqtt;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.eclipse.kapua.transport.TransportFacade;
import org.eclipse.kapua.transport.exception.TransportClientGetException;
import org.eclipse.kapua.transport.exception.TransportClientPoolExhaustedException;
import org.eclipse.kapua.transport.exception.TransportSendException;
import org.eclipse.kapua.transport.exception.TransportTimeoutException;
import org.eclipse.kapua.transport.message.mqtt.MqttMessage;
Expand All @@ -22,11 +24,13 @@
import org.eclipse.kapua.transport.mqtt.exception.MqttClientCallbackSetException;
import org.eclipse.kapua.transport.mqtt.exception.MqttClientSubscribeException;
import org.eclipse.kapua.transport.mqtt.pooling.MqttClientPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.checkerframework.checker.nullness.qual.Nullable;
import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

/**
* Implementation of {@link TransportFacade} API for MQTT transport facade.
Expand All @@ -35,6 +39,8 @@
*/
public class MqttFacade implements TransportFacade<MqttTopic, MqttPayload, MqttMessage, MqttMessage> {

private static final Logger LOG = LoggerFactory.getLogger(MqttFacade.class);

/**
* The {@link MqttClient} used to make requests.
*
Expand All @@ -50,16 +56,31 @@ public class MqttFacade implements TransportFacade<MqttTopic, MqttPayload, MqttM
/**
* Initializes a {@link MqttFacade} to be used to send requests to devices.
*
* @throws TransportClientGetException When MQTT client is not available for the given node URI.
* @throws TransportClientPoolExhaustedException When {@link MqttClient} cannot be borrowed in the configured timeout (client.pool.borrow.wait.max)
* @throws TransportClientGetException When {@link MqttClient} is not available for the given node URI.
* @since 1.0.0
*/
public MqttFacade(@NotNull String nodeUri) throws TransportClientGetException {
public MqttFacade(@NotNull String nodeUri) throws TransportClientGetException, TransportClientPoolExhaustedException {
this.nodeUri = nodeUri;

//
// Get the client form the pool
Long maxBorrowTimeout = null;
try {
borrowedClient = MqttClientPool.getInstance(nodeUri).borrowObject();
MqttClientPool perBrokerMqttClientPool = MqttClientPool.getInstance(nodeUri);
maxBorrowTimeout = perBrokerMqttClientPool.getMaxWaitMillis();

long borrowTimerStart = System.nanoTime();
borrowedClient = perBrokerMqttClientPool.borrowObject();
long borrowTimerStop = System.nanoTime();

if (LOG.isDebugEnabled()) {
// Logging statistics on borrow time
LOG.debug("MqttClient borrowed for BrokerNode {} in {}ns. Max time waited currently is {}ms with a {}ms timeout",
nodeUri, (borrowTimerStop - borrowTimerStart), perBrokerMqttClientPool.getMaxBorrowWaitTimeMillis(), maxBorrowTimeout);
}
} catch (NoSuchElementException nsee) {
throw new TransportClientPoolExhaustedException(nsee, nodeUri, maxBorrowTimeout);
} catch (Exception e) {
throw new TransportClientGetException(e, nodeUri);
}
Expand Down

0 comments on commit 2f7bf35

Please sign in to comment.