From 2909b24d37c2ce9fbccbe36e05e571d909064e49 Mon Sep 17 00:00:00 2001 From: christian ohr Date: Thu, 16 Mar 2017 14:41:10 +0100 Subject: [PATCH 1/3] Make MLLP Decoder more robust when requests are flooding and more flexible on charset errors --- .../camel/component/hl7/HL7MLLPConfig.java | 21 +++ .../camel/component/hl7/HL7MLLPDecoder.java | 135 +++++++++-------- .../hl7/HL7MLLPCodecMessageFloodingTest.java | 138 ++++++++++++++++++ 3 files changed, 236 insertions(+), 58 deletions(-) create mode 100644 components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java diff --git a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java index 2d5abb2452eb7..5f580a75443cf 100644 --- a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java +++ b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java @@ -17,6 +17,7 @@ package org.apache.camel.component.hl7; import java.nio.charset.Charset; +import java.nio.charset.CodingErrorAction; import ca.uhn.hl7v2.DefaultHapiContext; import ca.uhn.hl7v2.HapiContext; @@ -41,6 +42,10 @@ public class HL7MLLPConfig { private boolean produceString = true; + private CodingErrorAction malformedInputErrorAction = CodingErrorAction.REPORT; + + private CodingErrorAction unmappableCharacterErrorAction = CodingErrorAction.REPORT; + public Charset getCharset() { return charset; } @@ -113,4 +118,20 @@ public boolean isProduceString() { public void setProduceString(boolean produceString) { this.produceString = produceString; } + + public CodingErrorAction getMalformedInputErrorAction() { + return malformedInputErrorAction; + } + + public void setMalformedInputErrorAction(CodingErrorAction malformedInputErrorAction) { + this.malformedInputErrorAction = malformedInputErrorAction; + } + + public CodingErrorAction getUnmappableCharacterErrorAction() { + return unmappableCharacterErrorAction; + } + + public void setUnmappableCharacterErrorAction(CodingErrorAction unmappableCharacterErrorAction) { + this.unmappableCharacterErrorAction = unmappableCharacterErrorAction; + } } diff --git a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java index 4a54d47a04ab3..2866aca70d07e 100644 --- a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java +++ b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java @@ -42,76 +42,89 @@ class HL7MLLPDecoder extends CumulativeProtocolDecoder { this.config = config; } - @Override - protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) { + protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { // Get the state of the current message and - // Skip what we have already scanned + // Skip what we have already scanned before DecoderState state = decoderState(session); in.position(state.current()); + LOG.debug("Received data, checking from position {} to {}", in.position(), in.limit()); + boolean messageDecoded = false; + while (in.hasRemaining()) { + + int previousPosition = in.position(); byte current = in.get(); - // If it is the start byte and mark the position - if (current == config.getStartByte()) { - state.markStart(in.position() - 1); - } - // If it is the end bytes, extract the payload and return - if (state.previous() == config.getEndByte1() && current == config.getEndByte2()) { - - // Remember the current position and limit. - int position = in.position(); - int limit = in.limit(); - LOG.debug("Message ends at position {} with length {}", - position, position - state.start()); - try { + // Check if we are at the end of an HL7 message + if (current == config.getEndByte2() && state.previous() == config.getEndByte1()) { + if (state.isStarted()) { + // Save the current buffer pointers and reset them to surround the identifier message + int currentPosition = in.position(); + int currentLimit = in.limit(); + LOG.debug("Message ends at position {} with length {}", previousPosition, previousPosition - state.start() + 1); in.position(state.start()); - in.limit(position); - // The bytes between in.position() and in.limit() - // now contain a full MLLP message including the - // start and end bytes. - out.write(config.isProduceString() - ? parseMessageToString(in.slice(), charsetDecoder(session)) - : parseMessageToByteArray(in.slice())); - } catch (CharacterCodingException cce) { - throw new IllegalArgumentException("Exception while finalizing the message", cce); - } finally { - // Reset position, limit, and state - in.limit(limit); - in.position(position); - state.reset(); + in.limit(currentPosition); + LOG.debug("Set start to position {} and limit to {}", in.position(), in.limit()); + + // Now create string or byte[] from this part of the buffer and restore the buffer pointers + try { + out.write(config.isProduceString() + ? parseMessageToString(in.slice(), charsetDecoder(session)) + : parseMessageToByteArray(in.slice())); + messageDecoded = true; + } finally { + LOG.debug("Resetting to position {} and limit to {}", currentPosition, currentLimit); + in.position(currentPosition); + in.limit(currentLimit); + state.reset(); + } + } else { + LOG.warn("Ignoring message end at position {} until start byte has been seen.", previousPosition); + } + } else { + // Check if we are at the start of an HL7 message + if (current == config.getStartByte()) { + state.markStart(previousPosition); + } else { + // Remember previous byte in state object because the buffer could + // be theoretically exhausted right between the two end bytes + state.markPrevious(current); } - return true; + messageDecoded = false; } - // Remember previous byte in state object because the buffer could - // be theoretically exhausted right between the two end bytes - state.markPrevious(current); } - // Could not find a complete message in the buffer. - // Reset to the initial position and return false so that this method - // is called again with more data. - LOG.debug("No complete message yet at position {} ", in.position()); - state.markCurrent(in.position()); - in.position(0); - return false; + if (!messageDecoded) { + // Could not find a complete message in the buffer. + // Reset to the initial position (just as nothing had been read yet) + // and return false so that this method is called again with more data. + LOG.debug("No complete message yet at position {} ", in.position()); + state.markCurrent(in.position()); + in.position(0); + } + return messageDecoded; } // Make a defensive byte copy (the buffer will be reused) // and omit the start and the two end bytes of the MLLP message // returning a byte array - private Object parseMessageToByteArray(IoBuffer slice) throws CharacterCodingException { - byte[] dst = new byte[slice.limit() - 3]; - slice.skip(1); // skip start byte - slice.get(dst, 0, dst.length); + private Object parseMessageToByteArray(IoBuffer buf) throws CharacterCodingException { + int len = buf.limit() - 3; + LOG.debug("Making byte array of length {}", len); + byte[] dst = new byte[len]; + buf.skip(1); // skip start byte + buf.get(dst, 0, len); + buf.skip(2); // skip end bytes // Only do this if conversion is enabled if (config.isConvertLFtoCR()) { + LOG.debug("Replacing LF by CR"); for (int i = 0; i < dst.length; i++) { - if (dst[i] == (byte)'\n') { - dst[i] = (byte)'\r'; + if (dst[i] == (byte) '\n') { + dst[i] = (byte) '\r'; } } } @@ -121,12 +134,16 @@ private Object parseMessageToByteArray(IoBuffer slice) throws CharacterCodingExc // Make a defensive byte copy (the buffer will be reused) // and omit the start and the two end bytes of the MLLP message // returning a String - private Object parseMessageToString(IoBuffer slice, CharsetDecoder decoder) throws CharacterCodingException { - slice.skip(1); // skip start byte - String message = slice.getString(slice.limit() - 3, decoder); + private Object parseMessageToString(IoBuffer buf, CharsetDecoder decoder) throws CharacterCodingException { + int len = buf.limit() - 3; + LOG.debug("Making string of length {} using charset {}", len, decoder.charset()); + buf.skip(1); // skip start byte + String message = buf.getString(len, decoder); + buf.skip(2); // skip end bytes // Only do this if conversion is enabled if (config.isConvertLFtoCR()) { + LOG.debug("Replacing LF by CR"); message = message.replace('\n', '\r'); } return message; @@ -142,7 +159,9 @@ private CharsetDecoder charsetDecoder(IoSession session) { synchronized (session) { CharsetDecoder decoder = (CharsetDecoder) session.getAttribute(CHARSET_DECODER); if (decoder == null) { - decoder = config.getCharset().newDecoder(); + decoder = config.getCharset().newDecoder() + .onMalformedInput(config.getMalformedInputErrorAction()) + .onUnmappableCharacter(config.getUnmappableCharacterErrorAction()); session.setAttribute(CHARSET_DECODER, decoder); } return decoder; @@ -164,25 +183,22 @@ private DecoderState decoderState(IoSession session) { * Holds the state of the decoding process */ private static class DecoderState { - private int startPos; + private int startPos = -1; private int currentPos; private byte previousByte; - private boolean started; void reset() { - startPos = 0; + startPos = -1; currentPos = 0; - started = false; previousByte = 0; } void markStart(int position) { - if (started) { + if (isStarted()) { LOG.warn("Ignoring message start at position {} before previous message has ended.", position); } else { startPos = position; LOG.debug("Message starts at position {}", startPos); - started = true; } } @@ -205,6 +221,9 @@ public int current() { public byte previous() { return previousByte; } - } + public boolean isStarted() { + return startPos >= 0; + } + } } diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java new file mode 100644 index 0000000000000..17133cc4b7c23 --- /dev/null +++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hl7; + +import ca.uhn.hl7v2.model.Message; +import ca.uhn.hl7v2.model.v24.message.ADR_A19; +import ca.uhn.hl7v2.model.v24.segment.MSA; +import ca.uhn.hl7v2.model.v24.segment.MSH; +import ca.uhn.hl7v2.model.v24.segment.QRD; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Test; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Unit test for the HL7MLLP Codec. + */ +public class HL7MLLPCodecMessageFloodingTest extends HL7TestSupport { + + + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + HL7MLLPCodec codec = new HL7MLLPCodec(); + codec.setCharset("ISO-8859-1"); + codec.setConvertLFtoCR(false); + jndi.bind("hl7codec", codec); + return jndi; + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec") + .unmarshal().hl7() + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + Message input = exchange.getIn().getBody(Message.class); + Message response = input.generateACK(); + exchange.getOut().setBody(response); + Thread.sleep(50); // simulate some processing time + } + }) + .to("mock:result"); + } + }; + } + + @Test + public void testHL7MessageFlood() throws Exception { + + // Write and receive using plain sockets and in different threads + Socket socket = new Socket("localhost", getPort()); + BufferedOutputStream outputStream = new BufferedOutputStream(new DataOutputStream(socket.getOutputStream())); + final BufferedInputStream inputStream = new BufferedInputStream(new DataInputStream(socket.getInputStream())); + + int messageCount = 100; + CountDownLatch latch = new CountDownLatch(messageCount); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + int response; + StringBuilder s = new StringBuilder(); + try { + int i = 0; + boolean cont = true; + while (cont && (response = inputStream.read()) >= 0) { + if (response == 28) { + response = inputStream.read(); // read second end byte + if (response == 13) { + // Responses must arrive in same order + cont = s.toString().contains(String.format("X%dX", i++)); + s.setLength(0); + latch.countDown(); + } + } else { + s.append((char) response); + } + } + } catch (IOException ignored) { + } + } + }); + t.start(); + + String in = "MSH|^~\\&|MYSENDER|MYRECEIVER|MYAPPLICATION||200612211200||QRY^A19|X%dX|P|2.4\r" + + "QRD|200612211200|R|I|GetPatient|||1^RD|0101701234|DEM||"; + for (int i = 0; i < messageCount; i++) { + String msg = String.format(in, i); + outputStream.write(11); + outputStream.flush(); + // Some systems send end bytes in a separate frame + // Thread.sleep(10); + outputStream.write(msg.getBytes()); + outputStream.flush(); + // Some systems send end bytes in a separate frame + // Thread.sleep(10); + outputStream.write(28); + outputStream.write(13); + outputStream.flush(); + // Potentially wait after message + // Thread.sleep(10); + } + + boolean success = latch.await(20, TimeUnit.SECONDS); + + outputStream.close(); + inputStream.close(); + socket.close(); + + assertTrue(success); + } + +} From f1d01aca3d9a4224bb8a5e6a23d9600f2e21790e Mon Sep 17 00:00:00 2001 From: christian ohr Date: Thu, 16 Mar 2017 14:44:10 +0100 Subject: [PATCH 2/3] remove unused imports --- .../camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java index 17133cc4b7c23..f1e5ba35156aa 100644 --- a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java +++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java @@ -17,10 +17,6 @@ package org.apache.camel.component.hl7; import ca.uhn.hl7v2.model.Message; -import ca.uhn.hl7v2.model.v24.message.ADR_A19; -import ca.uhn.hl7v2.model.v24.segment.MSA; -import ca.uhn.hl7v2.model.v24.segment.MSH; -import ca.uhn.hl7v2.model.v24.segment.QRD; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; From 87affbfc3d35702c5c1baa67e179b1d94ab425b9 Mon Sep 17 00:00:00 2001 From: christian ohr Date: Thu, 16 Mar 2017 21:24:40 +0100 Subject: [PATCH 3/3] make checkstyle happy --- .../camel/component/hl7/HL7MLLPConfig.java | 14 +++++++------- .../hl7/HL7MLLPCodecMessageFloodingTest.java | 18 +++++++++--------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java index 5f580a75443cf..88d58f0d26c4c 100644 --- a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java +++ b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java @@ -42,10 +42,10 @@ public class HL7MLLPConfig { private boolean produceString = true; - private CodingErrorAction malformedInputErrorAction = CodingErrorAction.REPORT; - + private CodingErrorAction malformedInputErrorAction = CodingErrorAction.REPORT; + private CodingErrorAction unmappableCharacterErrorAction = CodingErrorAction.REPORT; - + public Charset getCharset() { return charset; } @@ -118,20 +118,20 @@ public boolean isProduceString() { public void setProduceString(boolean produceString) { this.produceString = produceString; } - + public CodingErrorAction getMalformedInputErrorAction() { return malformedInputErrorAction; } - public void setMalformedInputErrorAction(CodingErrorAction malformedInputErrorAction) { + public void setMalformedInputErrorAction(CodingErrorAction malformedInputErrorAction) { this.malformedInputErrorAction = malformedInputErrorAction; } - + public CodingErrorAction getUnmappableCharacterErrorAction() { return unmappableCharacterErrorAction; } public void setUnmappableCharacterErrorAction(CodingErrorAction unmappableCharacterErrorAction) { this.unmappableCharacterErrorAction = unmappableCharacterErrorAction; - } + } } diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java index f1e5ba35156aa..5b085846a84bc 100644 --- a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java +++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java @@ -16,13 +16,6 @@ */ package org.apache.camel.component.hl7; -import ca.uhn.hl7v2.model.Message; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.impl.JndiRegistry; -import org.junit.Test; - import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; @@ -32,6 +25,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import ca.uhn.hl7v2.model.Message; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Test; + /** * Unit test for the HL7MLLP Codec. */ @@ -103,8 +103,8 @@ public void run() { }); t.start(); - String in = "MSH|^~\\&|MYSENDER|MYRECEIVER|MYAPPLICATION||200612211200||QRY^A19|X%dX|P|2.4\r" + - "QRD|200612211200|R|I|GetPatient|||1^RD|0101701234|DEM||"; + String in = "MSH|^~\\&|MYSENDER|MYRECEIVER|MYAPPLICATION||200612211200||QRY^A19|X%dX|P|2.4\r" + + "QRD|200612211200|R|I|GetPatient|||1^RD|0101701234|DEM||"; for (int i = 0; i < messageCount; i++) { String msg = String.format(in, i); outputStream.write(11);