Skip to content

Commit

Permalink
Fix resumeTransaction always being forced when tx sync is inactive (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nielsm5 committed Aug 8, 2022
1 parent bb2a08a commit 66b2ff9
Show file tree
Hide file tree
Showing 13 changed files with 95 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,31 +91,26 @@ public InputStream getContent() throws IOException {
@Override
public void writeTo(OutputStream outStream) throws IOException {
int length = Math.toIntExact(getContentLength());
if(message.requiresStream()) {
try (InputStream inStream = message.asInputStream()) {
final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];
int readLen;
if(length < 0) {
// consume until EOF
while((readLen = inStream.read(buffer)) != -1) {
outStream.write(buffer, 0, readLen);
}
} else {
// consume no more than length
long remaining = length;
while(remaining > 0) {
readLen = inStream.read(buffer, 0, (int) Math.min(OUTPUT_BUFFER_SIZE, remaining));
if(readLen == -1) {
break;
}
outStream.write(buffer, 0, readLen);
remaining -= readLen;
try (InputStream inStream = message.asInputStream()) {
final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];
int readLen;
if(length < 0) {
// consume until EOF
while((readLen = inStream.read(buffer)) != -1) {
outStream.write(buffer, 0, readLen);
}
} else {
// consume no more than length
long remaining = length;
while(remaining > 0) {
readLen = inStream.read(buffer, 0, (int) Math.min(OUTPUT_BUFFER_SIZE, remaining));
if(readLen == -1) {
break;
}
outStream.write(buffer, 0, readLen);
remaining -= readLen;
}
}
} else {
outStream.write(message.asByteArray(), 0, length);
outStream.flush();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class TransactionConnector<T,R> implements AutoCloseable {
private Thread parentThread;
private Thread childThread;
private ThrowingRunnable<?> onEndChildThreadAction;

private boolean childThreadTransactionSuspended;

private TransactionConnector(TransactionConnectorCoordinator<T,R> coordinator, Object owner) {
Expand All @@ -51,7 +51,7 @@ private TransactionConnector(TransactionConnectorCoordinator<T,R> coordinator, O
this.coordinator = coordinator;
this.owner = owner;
}

/**
* factory method, to be called from 'main' thread.
*
Expand All @@ -72,7 +72,7 @@ public static <T,R> TransactionConnector<T,R> getInstance(IThreadConnectableTran
coordinator.setLastInThread(instance);
return instance;
}

/**
* resume transaction, that was saved in parent thread, in the child thread.
* After beginChildThread() has been called, new transactional resources cannot be enlisted in the parentThread,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private TransactionConnectorCoordinator(IThreadConnectableTransactionManager<T,R
transaction = this.txManager.getCurrentTransaction();
suspendTransaction();
}

public static <T,R> TransactionConnectorCoordinator<T,R> getInstance(IThreadConnectableTransactionManager<T,R> txManager) {
if (txManager==null) {
throw new IllegalStateException("txManager is null");
Expand All @@ -68,12 +68,12 @@ public void setLastInThread(TransactionConnector<T,R> target) {
log.debug("setting lastInThread [{}] to target [{}]", lastInThread, target);
lastInThread = target;
}

public boolean isLastInThread(TransactionConnector<T,R> target) {
log.debug("comparing lastInThread [{}] to target [{}]", lastInThread, target);
return lastInThread==target;
}

/**
* Execute an action with the thread prepared for enlisting transactional resources.
* To be called for obtaining transactional resources (like JDBC connections) if a TransactionConnector might already have been created on the thread.
Expand Down Expand Up @@ -106,8 +106,8 @@ public static <T, R, E extends Exception> boolean onEndChildThread(ThrowingRunna
}
return false;
}


public void resumeTransactionInChildThread(TransactionConnector<T,R> requester) {
Thread thread = Thread.currentThread();
if (thread!=parentThread) {
Expand All @@ -128,21 +128,21 @@ public void suspendTransaction() {
log.debug("suspending transaction of parent thread [{}], current thread [{}]", ()->parentThread.getName(), ()->Thread.currentThread().getName());
resourceHolder = this.txManager.suspendTransaction(transaction);
suspended = true;
} else {
} else {
log.debug("transaction of parent thread [{}] was already suspended, current thread [{}]", ()->parentThread.getName(), ()->Thread.currentThread().getName());
}
}

public void resumeTransaction() {
resumeTransaction(false);
}
public void resumeTransaction(boolean force) {
if (suspended || force) {
log.debug("resumeTransaction() resuming transaction of parent thread [{}], current thread [{}]", ()->parentThread.getName(), ()->Thread.currentThread().getName());
if (!force || !TransactionSynchronizationManager.isSynchronizationActive()) {
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
txManager.resumeTransaction(transaction, resourceHolder);
}
} else {
} else {
log.debug("resumeTransaction() transaction of parent thread [{}] was already resumed, current thread [{}]", ()->parentThread.getName(), ()->Thread.currentThread().getName());
}
suspended = false;
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/nl/nn/adapterframework/pipes/XsltPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ public boolean supportsOutputStreamPassThrough() {
return false;
}

/**
* If true, then this pipe will process the XSLT while streaming in a different thread. Can be used to switch streaming xslt off for debugging purposes
* @ff.default set by appconstant xslt.streaming.default
*/
public void setStreamingXslt(boolean streamingActive) {
sender.setStreamingXslt(streamingActive);
}


@Override
protected MessageOutputStream provideOutputStream(PipeLineSession session) throws StreamingException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public MessageOutputStream provideOutputStream(PipeLineSession session, IForward
log.debug("sender [{}] cannot provide outputstream", () -> getName());
return null;
}
ThreadConnector threadConnector = isStreamingXslt() ? new ThreadConnector(this, threadLifeCycleEventListener, txManager, session) : null;
ThreadConnector threadConnector = getStreamingXslt() ? new ThreadConnector(this, threadLifeCycleEventListener, txManager, session) : null;
MessageOutputStream target = MessageOutputStream.getTargetStream(this, session, next);
try {
TransformerPool poolToUse = getTransformerPoolToUse(session);
Expand Down
46 changes: 24 additions & 22 deletions core/src/main/java/nl/nn/adapterframework/senders/XsltSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ public class XsltSender extends StreamingSenderBase implements IThreadCreator {

protected ThreadLifeCycleEventListener<Object> threadLifeCycleEventListener;
protected @Setter IThreadConnectableTransactionManager txManager;
private @Getter boolean streamingXslt;

private @Getter Boolean streamingXslt = null;

/**
* The <code>configure()</code> method instantiates a transformer for the specified
Expand All @@ -112,7 +111,7 @@ public void configure() throws ConfigurationException {
parameterNamesMustBeUnique = true;
super.configure();

streamingXslt = AppConstants.getInstance(getConfigurationClassLoader()).getBoolean(XmlUtils.XSLT_STREAMING_BY_DEFAULT_KEY, false);
if(streamingXslt == null) streamingXslt = AppConstants.getInstance(getConfigurationClassLoader()).getBoolean(XmlUtils.XSLT_STREAMING_BY_DEFAULT_KEY, false);
dynamicTransformerPoolMap = Collections.synchronizedMap(new LRUMap(transformerPoolMapSize));

if(StringUtils.isNotEmpty(getXpathExpression()) && getOutputType()==null) {
Expand Down Expand Up @@ -326,31 +325,34 @@ public PipeRunResult sendMessage(Message message, PipeLineSession session, IForw
if (message==null) {
throw new SenderException(getLogPrefix()+"got null input");
}
try {
try (ThreadConnector threadConnector = streamingXslt ? new ThreadConnector(this, threadLifeCycleEventListener, txManager, session) : null) {
try (MessageOutputStream target=MessageOutputStream.getTargetStream(this, session, next)) {
TransformerPool poolToUse = getTransformerPoolToUse(session);
ContentHandler handler = createHandler(message, threadConnector, session, poolToUse, target);
if (isDebugInput() && log.isDebugEnabled()) {
handler = new XmlTap(handler) {
@Override
public void endDocument() throws SAXException {
super.endDocument();
log.debug(getLogPrefix()+" xml input ["+getWriter()+"]");
}
};
}
XMLReader reader = getXmlReader(session, handler, (resource,label)->target.closeOnClose(resource));
InputSource source = message.asInputSource();
reader.parse(source);
return target.getPipeRunResult();

try (ThreadConnector threadConnector = streamingXslt ? new ThreadConnector(this, threadLifeCycleEventListener, txManager, session) : null) {
try (MessageOutputStream target=MessageOutputStream.getTargetStream(this, session, next)) {
TransformerPool poolToUse = getTransformerPoolToUse(session);
ContentHandler handler = createHandler(message, threadConnector, session, poolToUse, target);
if (isDebugInput() && log.isDebugEnabled()) {
handler = new XmlTap(handler) {
@Override
public void endDocument() throws SAXException {
super.endDocument();
log.debug(getLogPrefix()+" xml input ["+getWriter()+"]");
}
};
}
XMLReader reader = getXmlReader(session, handler, (resource,label)->target.closeOnClose(resource));
InputSource source = message.asInputSource();
reader.parse(source);
return target.getPipeRunResult();
}
} catch (Exception e) {
throw new SenderException(getLogPrefix()+"Exception on transforming input", e);
throw new SenderException(getLogPrefix()+"Cannot transform input", e);
}
}

@IbisDoc({"If true, then this sender will process the XSLT while streaming in a different thread. Can be used to switch streaming off for debugging purposes","set by appconstant xslt.streaming.default"})
public void setStreamingXslt(Boolean streamingActive) {
this.streamingXslt = streamingActive;
}

@Override
public boolean isSynchronous() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public boolean isBinary() {
}

public boolean isRepeatable() {
return request instanceof String || request instanceof ThrowingSupplier || request instanceof byte[] || request instanceof ByteArrayInputStream || request instanceof Node;
return request instanceof String || request instanceof ThrowingSupplier || request instanceof byte[] || request instanceof Node;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,21 @@

public class MessageOutputStream implements AutoCloseable {
protected static Logger log = LogUtil.getLogger(MessageOutputStream.class);

private INamedObject owner;
protected Object requestStream;
private Message response;
private PipeForward forward;
private String conversionCharset;

private MessageOutputStream nextStream;
private MessageOutputStream tail;

private Set<AutoCloseable> resourcesToClose;

private ThreadConnector<?> threadConnector;
private ThreadConnector<?> targetThreadConnector;

protected MessageOutputStream(INamedObject owner, IForwardTarget next, String conversionCharset) {
this.owner=owner;
this.conversionCharset=conversionCharset;
Expand All @@ -74,7 +74,7 @@ protected MessageOutputStream(INamedObject owner, MessageOutputStream nextStream
this.conversionCharset=conversionCharset;
connect(nextStream);
}

public MessageOutputStream(INamedObject owner, OutputStream stream, IForwardTarget next) {
this(owner, stream, next, null);
}
Expand All @@ -89,7 +89,7 @@ public MessageOutputStream(INamedObject owner, OutputStream stream, MessageOutpu
this(owner, nextStream, conversionCharset);
this.requestStream=stream;
}

public MessageOutputStream(INamedObject owner, Writer writer, IForwardTarget next) {
this(owner, writer, next, null);
}
Expand All @@ -104,7 +104,7 @@ public MessageOutputStream(INamedObject owner, Writer writer, MessageOutputStrea
this(owner, nextStream, conversionCharset);
this.requestStream=writer;
}

// this constructor for testing only
<T> MessageOutputStream(ContentHandler handler) {
this(null, (IForwardTarget)null, null);
Expand All @@ -117,7 +117,7 @@ public <T> MessageOutputStream(INamedObject owner, ContentHandler handler, Messa
threadConnector = new ThreadConnector<T>(owner, threadLifeCycleEventListener, txManager, session);
this.targetThreadConnector = targetThreadConnector;
}

// this constructor for testing only
<T> MessageOutputStream(JsonEventHandler handler) {
this(null, (IForwardTarget)null, null);
Expand All @@ -141,7 +141,7 @@ private void connect(MessageOutputStream nextStream) {
tail=nextStream.tail;
}
}

protected void setRequestStream(Object requestStream) {
this.requestStream = requestStream;
}
Expand All @@ -159,14 +159,14 @@ public void closeRequestStream() throws IOException {
public void afterClose() throws Exception {
// can be overridden when necessary
}

public void closeOnClose(AutoCloseable resource) {
if (resourcesToClose==null) {
resourcesToClose = new LinkedHashSet<>();
}
resourcesToClose.add(resource);
}

@Override
public final void close() throws Exception {
try {
Expand Down Expand Up @@ -220,11 +220,11 @@ private String getLogPrefix() {
}
return "MessageOutputStream of "+ClassUtils.nameOf(owner)+" ";
}

public OutputStream asStream() throws StreamingException {
return asStream(null);
}

public OutputStream asStream(String charset) throws StreamingException {
if (requestStream instanceof OutputStream) {
if (log.isDebugEnabled()) log.debug(getLogPrefix() + "returning OutputStream as OutputStream");
Expand Down Expand Up @@ -347,7 +347,7 @@ public void captureCharacterStream(Writer writer, int maxSize) {
}
log.warn("captureCharacterStream() called before stream is installed.");
}

public ByteArrayOutputStream captureBinaryStream() {
ByteArrayOutputStream result = new ByteArrayOutputStream();
captureBinaryStream(result);
Expand Down Expand Up @@ -378,7 +378,7 @@ public void captureBinaryStream(OutputStream outputStream, int maxSize) {
}
log.warn("captureBinaryStream() called before stream is installed.");
}

/**
* Response message, e.g. the filename, of the {IOutputStreamTarget target}
* after processing the stream. It is the responsibility of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void configure() throws ConfigurationException {
super.configure();
canProvideOutputStream = getParameterList()==null || !getParameterList().isInputValueOrContextRequiredForResolution();
}

@Override
// can make this sendMessage() 'final', debugging handled by IStreamingSender.sendMessage(), that includes the MessageOutputStream
public final Message sendMessage(Message message, PipeLineSession session) throws SenderException, TimeoutException {
Expand All @@ -52,5 +52,4 @@ protected boolean canProvideOutputStream() {
public boolean supportsOutputStreamPassThrough() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ public interface ThreadLifeCycleEventListener<T> {
public <O> O threadCreated(T ref, O request);
public <O> O threadEnded(T ref, O result);
public Throwable threadAborted(T ref, Throwable t);

}
Loading

0 comments on commit 66b2ff9

Please sign in to comment.