Skip to content

Commit

Permalink
Add more write() methods to Utf8ByteXmppXmlSplitter
Browse files Browse the repository at this point in the history
  • Loading branch information
Flowdalic committed Oct 21, 2018
1 parent 9e30141 commit ccda5ac
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 12 deletions.
Expand Up @@ -19,6 +19,8 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;

/**
* Extended version of {@link XmppXmlSplitter} allowing input to be bytes or
Expand Down Expand Up @@ -54,18 +56,121 @@ public Utf8ByteXmppXmlSplitter(XmppXmlSplitter xmppXmlSplitter) {
this.xmppXmlSplitter = xmppXmlSplitter;
}

private final char[] writeBuffer = new char[2];
private final byte[] buffer = new byte[6];

private char[] writeBuffer = new char[1024];
private int writeBufferPos;
private byte count;
private byte expectedLength;

@Override
public void write(int b) throws IOException {
write((byte) (b & 0xff));
}

/**
* Write a single byte. The byte must be part of a UTF-8 String.
*
* @param b the byte to write.
* @throws IOException if an error occurs.
*/
public void write(byte b) throws IOException {
process(b);
afterInputProcessed();
}

/**
* Write the given array of byte buffers.
*
* @param byteBuffers the array of byte buffers.
* @throws IOException if an error occurs.
*/
public void write(ByteBuffer[] byteBuffers) throws IOException {
write(Arrays.asList(byteBuffers));
}

/**
* Write the given collection of byte buffers.
*
* @param byteBuffers the collection of byte buffers.
* @throws IOException if an error occurs.
*/
public void write(Collection<? extends ByteBuffer> byteBuffers) throws IOException {
int requiredNewCapacity = 0;
for (ByteBuffer byteBuffer : byteBuffers) {
requiredNewCapacity += byteBuffer.remaining();
}

ensureWriteBufferHasCapacityFor(requiredNewCapacity);

for (ByteBuffer byteBuffer : byteBuffers) {
final int remaining = byteBuffer.remaining();

if (byteBuffer.isDirect()) {
int initialPosition = byteBuffer.position();
for (int i = 0; i < remaining; i++) {
process(byteBuffer.get(initialPosition + i));
}
} else {
writeInternal(byteBuffer.array(), byteBuffer.position(), remaining);
}

byteBuffer.flip();
}

afterInputProcessed();
}

/**
* Write the given byte buffer.
*
* @param byteBuffer the byte buffer.
* @throws IOException if an error occurs.
*/
public void write(ByteBuffer byteBuffer) throws IOException {
final int remaining = byteBuffer.remaining();
ensureWriteBufferHasCapacityFor(remaining);

if (byteBuffer.isDirect()) {
int initialPosition = byteBuffer.position();
for (int i = 0; i < remaining; i++) {
process(byteBuffer.get(initialPosition + i));
}
} else {
writeInternal(byteBuffer.array(), byteBuffer.position(), remaining);
}

afterInputProcessed();

byteBuffer.flip();
}

@Override
public void write(byte[] b, int offset, int length) throws IOException {
ensureWriteBufferHasCapacityFor(length);

writeInternal(b, offset, length);

afterInputProcessed();
}

private void writeInternal(byte[] b, int offset, int length) throws IOException {
for (int i = 0; i < length; i++ ) {
process(b[offset + i]);
}
}

/**
* Reset the write buffer to the given size.
*
* @param size the new write buffer size.
*/
public void resetWriteBuffer(int size) {
writeBuffer = new char[size];
writeBufferPos = 0;
}

private void process(byte b) throws IOException {
buffer[count] = b;

if (count == 0) {
Expand Down Expand Up @@ -119,28 +224,41 @@ public void write(byte b) throws IOException {
}
}

int len;
ensureWriteBufferHasCapacityFor(2);

if (codepoint < 0x10000) {
len = 1;
writeBuffer[0] = (char) codepoint;
appendToWriteBuffer((char) codepoint);
} else {
// We have to convert the codepoint into a surrogate pair.
len = 2;
// high surrogate: top ten bits added to 0xd800 give the first 16-bit code unit.
writeBuffer[0] = (char) (0xd800 + (codepoint & 0xffa00000));
appendToWriteBuffer((char) (0xd800 + (codepoint & 0xffa00000)));
// low surrogate: low ten bits added to 0xdc00 give the second 16-bit code unit.
writeBuffer[1] = (char) (0xdc00 + (codepoint & 0x3ff));
appendToWriteBuffer((char) (0xdc00 + (codepoint & 0x3ff)));
}

xmppXmlSplitter.write(writeBuffer, 0, len);

// Reset count since we are done handling this UTF-8 codepoint.
count = 0;
}
}

@Override
public void write(int b) throws IOException {
write((byte) (b & 0xff));
private void afterInputProcessed() throws IOException {
xmppXmlSplitter.write(writeBuffer, 0, writeBufferPos);
writeBufferPos = 0;
}

private void appendToWriteBuffer(char c) {
writeBuffer[writeBufferPos++] = c;
}

private void ensureWriteBufferHasCapacityFor(int additionalCapacity) {
final int requiredCapacity = writeBufferPos + additionalCapacity;
if (requiredCapacity <= writeBuffer.length) {
return;
}

// Simple resize logic of write buffer.
char[] newWriteBuffer = new char[requiredCapacity];
System.arraycopy(writeBuffer, 0, newWriteBuffer, 0, writeBufferPos);
writeBuffer = newWriteBuffer;
}
}
Expand Up @@ -22,6 +22,7 @@
import static org.jxmpp.xml.splitter.XmlSplitterTestUtil.transform;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -81,10 +82,45 @@ public void onCompleteElement(String completeElement) {
assertEquals(nextElement, completeElement);
}
}));
// Size the write buffer small that a resize is likely and we are able to test the code.
splitter.resetWriteBuffer(1);
splitter.write(utf8bytes);
assertTrue(queue.isEmpty());
}

@Test
public void simpleByteBufferArrayWriteTest() throws IOException {
final String stanza1 = "<message from='foo' to='bar'><body>Hi there</body></message>";
final String stanza2 = "<message from='foo' to='bar'><body>My name is John</body></message>";
testUtf8ByteSplitterByteBufferArray(stanza1, stanza2);
}

private static void testUtf8ByteSplitterByteBufferArray(String... elements) throws IOException {
final Queue<String> queue = new ArrayDeque<>();
List<ByteBuffer> byteBufferArray = new ArrayList<>();
for (String element : elements) {
queue.add(element);
byte[] utf8bytes = element.getBytes("UTF-8");
ByteBuffer byteBuffer = ByteBuffer.wrap(utf8bytes);
byteBufferArray.add(byteBuffer);
}

// TODO This is basically duplicate code which is also found in
// XmlSplitterTestUtil and should be replaced by it.
@SuppressWarnings("resource")
Utf8ByteXmppXmlSplitter splitter = new Utf8ByteXmppXmlSplitter(transform(new CompleteElementCallback() {
@Override
public void onCompleteElement(String completeElement) {
String nextElement = queue.poll();
assertEquals(nextElement, completeElement);
}
}));
// Size the write buffer small that a resize is likely and we are able to test the code.
splitter.resetWriteBuffer(1);
splitter.write(byteBufferArray);
assertTrue(queue.isEmpty());
}

private static String forCodepoint(int codepoint) {
return new String(new int[] { codepoint }, 0, 1);
}
Expand Down

0 comments on commit ccda5ac

Please sign in to comment.