diff --git a/jxmpp-core/src/main/java/org/jxmpp/xml/splitter/Utf8ByteXmppXmlSplitter.java b/jxmpp-core/src/main/java/org/jxmpp/xml/splitter/Utf8ByteXmppXmlSplitter.java index d288859..9ff1b55 100644 --- a/jxmpp-core/src/main/java/org/jxmpp/xml/splitter/Utf8ByteXmppXmlSplitter.java +++ b/jxmpp-core/src/main/java/org/jxmpp/xml/splitter/Utf8ByteXmppXmlSplitter.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; /** * Extended version of {@link XmppXmlSplitter} allowing input to be bytes or @@ -54,11 +56,18 @@ public Utf8ByteXmppXmlSplitter(XmppXmlSplitter xmppXmlSplitter) { this.xmppXmlSplitter = xmppXmlSplitter; } - private final char[] writeBuffer = new char[2]; private final byte[] buffer = new byte[6]; + + private char[] writeBuffer = new char[1024]; + private int writeBufferPos; private byte count; private byte expectedLength; + @Override + public void write(int b) throws IOException { + write((byte) (b & 0xff)); + } + /** * Write a single byte. The byte must be part of a UTF-8 String. * @@ -66,6 +75,102 @@ public Utf8ByteXmppXmlSplitter(XmppXmlSplitter xmppXmlSplitter) { * @throws IOException if an error occurs. */ public void write(byte b) throws IOException { + process(b); + afterInputProcessed(); + } + + /** + * Write the given array of byte buffers. + * + * @param byteBuffers the array of byte buffers. + * @throws IOException if an error occurs. + */ + public void write(ByteBuffer[] byteBuffers) throws IOException { + write(Arrays.asList(byteBuffers)); + } + + /** + * Write the given collection of byte buffers. + * + * @param byteBuffers the collection of byte buffers. + * @throws IOException if an error occurs. + */ + public void write(Collection byteBuffers) throws IOException { + int requiredNewCapacity = 0; + for (ByteBuffer byteBuffer : byteBuffers) { + requiredNewCapacity += byteBuffer.remaining(); + } + + ensureWriteBufferHasCapacityFor(requiredNewCapacity); + + for (ByteBuffer byteBuffer : byteBuffers) { + final int remaining = byteBuffer.remaining(); + + if (byteBuffer.isDirect()) { + int initialPosition = byteBuffer.position(); + for (int i = 0; i < remaining; i++) { + process(byteBuffer.get(initialPosition + i)); + } + } else { + writeInternal(byteBuffer.array(), byteBuffer.position(), remaining); + } + + byteBuffer.flip(); + } + + afterInputProcessed(); + } + + /** + * Write the given byte buffer. + * + * @param byteBuffer the byte buffer. + * @throws IOException if an error occurs. + */ + public void write(ByteBuffer byteBuffer) throws IOException { + final int remaining = byteBuffer.remaining(); + ensureWriteBufferHasCapacityFor(remaining); + + if (byteBuffer.isDirect()) { + int initialPosition = byteBuffer.position(); + for (int i = 0; i < remaining; i++) { + process(byteBuffer.get(initialPosition + i)); + } + } else { + writeInternal(byteBuffer.array(), byteBuffer.position(), remaining); + } + + afterInputProcessed(); + + byteBuffer.flip(); + } + + @Override + public void write(byte[] b, int offset, int length) throws IOException { + ensureWriteBufferHasCapacityFor(length); + + writeInternal(b, offset, length); + + afterInputProcessed(); + } + + private void writeInternal(byte[] b, int offset, int length) throws IOException { + for (int i = 0; i < length; i++ ) { + process(b[offset + i]); + } + } + + /** + * Reset the write buffer to the given size. + * + * @param size the new write buffer size. + */ + public void resetWriteBuffer(int size) { + writeBuffer = new char[size]; + writeBufferPos = 0; + } + + private void process(byte b) throws IOException { buffer[count] = b; if (count == 0) { @@ -119,28 +224,41 @@ public void write(byte b) throws IOException { } } - int len; + ensureWriteBufferHasCapacityFor(2); + if (codepoint < 0x10000) { - len = 1; - writeBuffer[0] = (char) codepoint; + appendToWriteBuffer((char) codepoint); } else { // We have to convert the codepoint into a surrogate pair. - len = 2; // high surrogate: top ten bits added to 0xd800 give the first 16-bit code unit. - writeBuffer[0] = (char) (0xd800 + (codepoint & 0xffa00000)); + appendToWriteBuffer((char) (0xd800 + (codepoint & 0xffa00000))); // low surrogate: low ten bits added to 0xdc00 give the second 16-bit code unit. - writeBuffer[1] = (char) (0xdc00 + (codepoint & 0x3ff)); + appendToWriteBuffer((char) (0xdc00 + (codepoint & 0x3ff))); } - xmppXmlSplitter.write(writeBuffer, 0, len); - // Reset count since we are done handling this UTF-8 codepoint. count = 0; } } - @Override - public void write(int b) throws IOException { - write((byte) (b & 0xff)); + private void afterInputProcessed() throws IOException { + xmppXmlSplitter.write(writeBuffer, 0, writeBufferPos); + writeBufferPos = 0; + } + + private void appendToWriteBuffer(char c) { + writeBuffer[writeBufferPos++] = c; + } + + private void ensureWriteBufferHasCapacityFor(int additionalCapacity) { + final int requiredCapacity = writeBufferPos + additionalCapacity; + if (requiredCapacity <= writeBuffer.length) { + return; + } + + // Simple resize logic of write buffer. + char[] newWriteBuffer = new char[requiredCapacity]; + System.arraycopy(writeBuffer, 0, newWriteBuffer, 0, writeBufferPos); + writeBuffer = newWriteBuffer; } } diff --git a/jxmpp-core/src/test/java/org/jxmpp/xml/splitter/Utf8ByteXmppXmlSplitterTest.java b/jxmpp-core/src/test/java/org/jxmpp/xml/splitter/Utf8ByteXmppXmlSplitterTest.java index d458273..a587776 100644 --- a/jxmpp-core/src/test/java/org/jxmpp/xml/splitter/Utf8ByteXmppXmlSplitterTest.java +++ b/jxmpp-core/src/test/java/org/jxmpp/xml/splitter/Utf8ByteXmppXmlSplitterTest.java @@ -22,6 +22,7 @@ import static org.jxmpp.xml.splitter.XmlSplitterTestUtil.transform; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; @@ -81,10 +82,45 @@ public void onCompleteElement(String completeElement) { assertEquals(nextElement, completeElement); } })); + // Size the write buffer small that a resize is likely and we are able to test the code. + splitter.resetWriteBuffer(1); splitter.write(utf8bytes); assertTrue(queue.isEmpty()); } + @Test + public void simpleByteBufferArrayWriteTest() throws IOException { + final String stanza1 = "Hi there"; + final String stanza2 = "My name is John"; + testUtf8ByteSplitterByteBufferArray(stanza1, stanza2); + } + + private static void testUtf8ByteSplitterByteBufferArray(String... elements) throws IOException { + final Queue queue = new ArrayDeque<>(); + List byteBufferArray = new ArrayList<>(); + for (String element : elements) { + queue.add(element); + byte[] utf8bytes = element.getBytes("UTF-8"); + ByteBuffer byteBuffer = ByteBuffer.wrap(utf8bytes); + byteBufferArray.add(byteBuffer); + } + + // TODO This is basically duplicate code which is also found in + // XmlSplitterTestUtil and should be replaced by it. + @SuppressWarnings("resource") + Utf8ByteXmppXmlSplitter splitter = new Utf8ByteXmppXmlSplitter(transform(new CompleteElementCallback() { + @Override + public void onCompleteElement(String completeElement) { + String nextElement = queue.poll(); + assertEquals(nextElement, completeElement); + } + })); + // Size the write buffer small that a resize is likely and we are able to test the code. + splitter.resetWriteBuffer(1); + splitter.write(byteBufferArray); + assertTrue(queue.isEmpty()); + } + private static String forCodepoint(int codepoint) { return new String(new int[] { codepoint }, 0, 1); }