Skip to content
This repository was archived by the owner on Sep 13, 2022. It is now read-only.
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/********************************************************************************
* Copyright (c) 2020 Calypso Networks Association https://www.calypsonet-asso.org/
*
* See the NOTICE file(s) distributed with this work for additional information regarding copyright
* ownership.
*
* This program and the accompanying materials are made available under the terms of the Eclipse
* Public License 2.0 which is available at http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
********************************************************************************/
package org.eclipse.keyple.calypso.exception;

import org.eclipse.keyple.core.seproxy.exception.KeypleBaseException;

/**
* The exception {@code CalypsoNoSamResourceAvailableException} indicates that there are no SAM
* resources available.
*/
public class CalypsoNoSamResourceAvailableException extends KeypleBaseException {

/**
* @param message the message to identify the exception context
*/
public CalypsoNoSamResourceAvailableException(String message) {
super(message);
}

/**
* @param message the message to identify the exception context
*/
public CalypsoNoSamResourceAvailableException(String message, Throwable t) {
super(message, t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.eclipse.keyple.calypso.command.sam.SamRevision.AUTO;
import java.util.*;
import java.util.regex.Pattern;
import org.eclipse.keyple.calypso.exception.CalypsoNoSamResourceAvailableException;
import org.eclipse.keyple.core.selection.SeSelection;
import org.eclipse.keyple.core.selection.SelectionsResult;
import org.eclipse.keyple.core.seproxy.*;
Expand All @@ -40,8 +41,9 @@ public enum AllocationMode {
BLOCKING, NON_BLOCKING
}

/* the maximum time (in tenths of a second) during which the BLOCKING mode will wait */
private final static int MAX_BLOCKING_TIME = 1000; // 10 sec
/* the default maximum time (in milliseconds) during which the BLOCKING mode will wait */
private final static int MAX_BLOCKING_TIME = 1000; // 1 sec
private final int maxBlockingTime;
private final ReaderPlugin samReaderPlugin;
private final List<SamResource> localSamResources = new ArrayList<SamResource>();
private final boolean dynamicAllocationPlugin;
Expand All @@ -56,10 +58,13 @@ public enum AllocationMode {
* @param samReaderPlugin the plugin through which SAM readers are accessible
* @param samReaderFilter the regular expression defining how to identify SAM readers among
* others.
* @throws KeypleReaderException throw if an error occurs while getting the readers list.
* @param maxBlockingTime the maximum duration for which the allocateSamResource method will
* attempt to allocate a new reader by retrying (in milliseconds)
* @throws KeypleReaderException thrown if an error occurs while getting the readers list.
* @since 0.8.1
*/
public SamResourceManager(ReaderPlugin samReaderPlugin, String samReaderFilter)
throws KeypleReaderException {
public SamResourceManager(ReaderPlugin samReaderPlugin, String samReaderFilter,
int maxBlockingTime) throws KeypleReaderException {
this.samReaderPlugin = samReaderPlugin;
if (samReaderPlugin instanceof ReaderPoolPlugin) {
logger.info("Create SAM resource manager from reader pool plugin: {}",
Expand Down Expand Up @@ -95,6 +100,20 @@ public SamResourceManager(ReaderPlugin samReaderPlugin, String samReaderFilter)
}
}
}
this.maxBlockingTime = maxBlockingTime;
}

/**
* Alternate constructor with default max blocking time value
*
* @param samReaderPlugin the plugin through which SAM readers are accessible
* @param samReaderFilter the regular expression defining how to identify SAM readers among
* others.
* @throws KeypleReaderException thrown if an error occurs while getting the readers list.
*/
public SamResourceManager(ReaderPlugin samReaderPlugin, String samReaderFilter)
throws KeypleReaderException {
this(samReaderPlugin, samReaderFilter, MAX_BLOCKING_TIME);
}

/**
Expand Down Expand Up @@ -144,17 +163,24 @@ private SamResource createSamResource(SeReader samReader) throws KeypleReaderExc
* @param samIdentifier the targeted SAM identifier
* @return a SAM resource
* @throws KeypleReaderException if a reader error occurs
* @throws CalypsoNoSamResourceAvailableException if the reader allocation failed
*/
public SamResource allocateSamResource(AllocationMode allocationMode,
SamIdentifier samIdentifier) throws KeypleReaderException {
long maxBlockingDate = System.currentTimeMillis() + MAX_BLOCKING_TIME;
SamIdentifier samIdentifier)
throws KeypleReaderException, CalypsoNoSamResourceAvailableException {
long maxBlockingDate = System.currentTimeMillis() + maxBlockingTime;
boolean noSamResourceLogged = false;
logger.debug("Allocating SAM reader channel...");
while (true) {
if (dynamicAllocationPlugin) {
// virtually infinite number of readers
SeReader samReader = ((ReaderPoolPlugin) samReaderPlugin)
.allocateReader(samIdentifier.getGroupReference());
SeReader samReader = null;
try {
samReader = ((ReaderPoolPlugin) samReaderPlugin)
.allocateReader(samIdentifier.getGroupReference());
} catch (KeypleAllocationReaderException e) {
throw new CalypsoNoSamResourceAvailableException(e.getMessage(), e);
}
if (samReader != null) {
SamResource samResource = createSamResource(samReader);
logger.debug("Allocation succeeded. SAM resource created.");
Expand Down Expand Up @@ -191,13 +217,12 @@ public SamResource allocateSamResource(AllocationMode allocationMode,
logger.error("Interrupt exception in Thread.sleep.");
}
if (System.currentTimeMillis() >= maxBlockingDate) {
logger.error("The allocation process failed. Timeout {} sec exceeded .",
(MAX_BLOCKING_TIME / 100.0));
return null;
throw new CalypsoNoSamResourceAvailableException(
"The allocation process has timed out.");
}
}
}
return null;
throw new CalypsoNoSamResourceAvailableException("The allocation process has failed.");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package org.eclipse.keyple.core.seproxy;

import java.util.SortedSet;
import org.eclipse.keyple.core.seproxy.exception.KeypleAllocationReaderException;

/**
* The ReaderPoolPlugin interface provides methods to handle the access to an undefined number of
Expand Down Expand Up @@ -42,8 +43,9 @@ public interface ReaderPoolPlugin extends ReaderPlugin {
* @param groupReference the reference of the group to which the reader belongs (may be null
* depending on the implementation made)
* @return a SeReader object
* @throws KeypleAllocationReaderException if the allocation failed
*/
SeReader allocateReader(String groupReference);
SeReader allocateReader(String groupReference) throws KeypleAllocationReaderException;

/**
* Releases a SeReader previously allocated with allocateReader.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/********************************************************************************
* Copyright (c) 2019 Calypso Networks Association https://www.calypsonet-asso.org/
*
* See the NOTICE file(s) distributed with this work for additional information regarding copyright
* ownership.
*
* This program and the accompanying materials are made available under the terms of the Eclipse
* Public License 2.0 which is available at http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
********************************************************************************/
package org.eclipse.keyple.core.seproxy.exception;

/**
* The exception {@code KeypleAllocationReaderException} indicates that a reader allocation failed.
*/
public class KeypleAllocationReaderException extends KeypleBaseException {

/**
* @param message the message to identify the exception context
*/
public KeypleAllocationReaderException(String message) {
super(message);
}

/**
* Encapsulates a lower level reader exception
*
* @param message message to identify the exception context
* @param cause the cause
*/
public KeypleAllocationReaderException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import org.eclipse.keyple.core.seproxy.ReaderPoolPlugin;
import org.eclipse.keyple.core.seproxy.SeReader;
import org.eclipse.keyple.core.seproxy.exception.KeypleAllocationReaderException;
import org.eclipse.keyple.core.seproxy.message.SeResponse;
import org.eclipse.keyple.plugin.remotese.rm.IRemoteMethodExecutor;
import org.eclipse.keyple.plugin.remotese.rm.RemoteMethodName;
Expand Down Expand Up @@ -47,7 +48,15 @@ public TransportDto execute(TransportDto transportDto) {
String groupReference = body.get("groupReference").getAsString();

// Execute Remote Method
SeReader seReader = poolPlugin.allocateReader(groupReference);
SeReader seReader = null;
try {
seReader = poolPlugin.allocateReader(groupReference);
} catch (KeypleAllocationReaderException e) {
// if an exception occurs, send it into a keypleDto to the Master
return transportDto.nextTransportDTO(KeypleDtoHelper.ExceptionDTO(
getMethodName().getName(), e, null, null, null, keypleDto.getTargetNodeId(),
keypleDto.getRequesterNodeId(), keypleDto.getId()));
}

// Build Response
JsonObject bodyResp = new JsonObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.util.SortedSet;
import org.eclipse.keyple.core.seproxy.SeReader;
import org.eclipse.keyple.core.seproxy.exception.KeypleAllocationReaderException;
import org.eclipse.keyple.plugin.remotese.exception.KeypleRemoteException;
import org.eclipse.keyple.plugin.remotese.rm.RemoteMethodTxPoolEngine;
import org.eclipse.keyple.plugin.remotese.transport.DtoSender;
Expand Down Expand Up @@ -57,7 +58,7 @@ public SortedSet<String> getReaderGroupReferences() {
}

@Override
public SeReader allocateReader(String groupReference) {
public SeReader allocateReader(String groupReference) throws KeypleAllocationReaderException {

if (slaveNodeId == null) {
throw new IllegalStateException(
Expand All @@ -71,7 +72,8 @@ public SeReader allocateReader(String groupReference) {
// blocking call
return allocate.execute(rmTxEngine);
} catch (KeypleRemoteException e) {
return null;// todo throw exception here
throw new KeypleAllocationReaderException(
"Reader Allocation failed for group reference: " + groupReference, e);
}

}
Expand All @@ -92,7 +94,7 @@ public void releaseReader(SeReader seReader) {

VirtualReaderImpl virtualReader = (VirtualReaderImpl) seReader;

// call remote method for allocateReader
// call remote method for releaseReader
RmPoolReleaseTx releaseTx = new RmPoolReleaseTx(virtualReader.getNativeReaderName(),
virtualReader.getName(), this, this.dtoSender, slaveNodeId, dtoSender.getNodeId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ void setTimeout(long timeout) {
* @throws KeypleRemoteException if a problem occurs while sending
*/
public void send(IRemoteMethodTxCallback<T> callback) throws KeypleRemoteException {
if (logger.isTraceEnabled()) {
logger.trace("Send asynchronously keypleDto for {}", this);
}
this.callback = callback;
sender.sendDTO(this.dto());
}
Expand All @@ -110,12 +113,15 @@ public void send(IRemoteMethodTxCallback<T> callback) throws KeypleRemoteExcepti
*/
final public T execute(IRemoteMethodTxEngine rmTxEngine) throws KeypleRemoteException {

if (logger.isTraceEnabled()) {
logger.trace("execute {}", this.toString());
}
// register this method to receive response
rmTxEngine.register(this);

if (!isRegistered) {
throw new IllegalStateException(
"RemoteMethodTx#execute() can not be used until RemoteMethod is isRegistered in a RemoteMethodEngine, please call RemoteMethodEngine#register");
"RemoteMethodTx#execute() can not be used until RemoteMethodTx is registered in a RemoteMethodEngine, please call RemoteMethodEngine#register");
}
// logger.debug("Blocking Get {}", this.getClass().getCanonicalName());
final AbstractRemoteMethodTx thisInstance = this;
Expand All @@ -127,13 +133,15 @@ public void run() {
send(new IRemoteMethodTxCallback<T>() {
@Override
public void get(T response, KeypleRemoteException exception) {
logger.debug("Release lock of rm {} {}", thisInstance.getMethodName(),
thisInstance.id);
if (logger.isTraceEnabled()) {
logger.trace("Release lock of {}", thisInstance.toString());
}
lock.countDown();
}
});
} catch (KeypleRemoteException e) {
logger.error("Exception while sending Dto", e);
logger.error("Exception {} while sending Dto {} for {}", e.getMessage(),
thisInstance);
thisInstance.remoteException = e;
lock.countDown();
}
Expand All @@ -142,14 +150,18 @@ public void get(T response, KeypleRemoteException exception) {

try {
lock = new CountDownLatch(1);
logger.trace("" + "" + "Set callback on RemoteMethodTx {} {}",
this.getClass().getCanonicalName(), this.hashCode());
asyncSend.start();
logger.trace("Lock {}, {}", thisInstance.getMethodName(), this.id);
if (logger.isTraceEnabled()) {
logger.trace("Lock thread for {}", this.toString());
}

// lock until response is received
boolean responseReceived = lock.await(timeout, TimeUnit.MILLISECONDS);

if (responseReceived) {
logger.trace("Unlock {}, {}", this.getClass().getCanonicalName(), this.hashCode());
if (logger.isTraceEnabled()) {
logger.trace("Unlock thread for {}", this.toString());
}
if (this.remoteException != null) {
throw remoteException;
} else {
Expand All @@ -160,14 +172,14 @@ public void get(T response, KeypleRemoteException exception) {
* timeout, no answer has been received
*/
throw new KeypleRemoteException(
"Waiting time elapsed, no answer received from the other node for method "
+ this.getClass().getCanonicalName());
"Waiting time elapsed, no answer received from the other node for "
+ this.toString());
}


} catch (InterruptedException e) {
throw new IllegalStateException(
"Thread locking in blocking transmitSet has encountered an exception", e);
"Thread locking has encounterd an exception with " + this.toString(), e);
}
}

Expand All @@ -177,6 +189,9 @@ public void get(T response, KeypleRemoteException exception) {
* @param keypleDto
*/
void setResponse(KeypleDto keypleDto) {
if (logger.isTraceEnabled()) {
logger.trace("Response received {} for {}", keypleDto, this.toString());
}
try {
this.response = parseResponse(keypleDto);
this.callback.get(response, null);
Expand All @@ -203,5 +218,12 @@ public void setRegistered(Boolean registered) {
protected abstract KeypleDto dto();



@Override
public String toString() {
return "AbstractRemoteMethodTx{" + "sessionId='" + sessionId + '\'' + ", methodName='"
+ getMethodName() + '\'' + ", nativeReaderName='" + nativeReaderName + '\''
+ ", virtualReaderName='" + virtualReaderName + '\'' + ", targetNodeId='"
+ targetNodeId + '\'' + ", requesterNodeId='" + requesterNodeId + '\'' + ", id='"
+ id + '\'' + '}';
}
}
Loading