Skip to content

Commit

Permalink
Support different charsets for TextPayload
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Nov 24, 2021
1 parent 52d06a1 commit 8576dc1
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.base.model.common;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -61,25 +62,46 @@ public static ByteBuffer empty() {
return ByteBuffer.allocate(0);
}


/**
* Creates a string from the ByteBuffer.
* Transforms the given String to a ByteBuffer assuming UTF-8 charset.
*
* @param string the string to transform.
* @return the bytebuffer.
*/
@Nullable
public static ByteBuffer fromUtf8String(@Nullable final String string) {
if (null == string) {
return null;
}
return ByteBuffer.wrap(string.getBytes(StandardCharsets.UTF_8));
}


/**
* Creates a string from the ByteBuffer assuming UTF-8 charset.
*
* @param byteBuffer the ByteBuffer to decode.
* @return the ByteBuffer in UTF-8 representation or {@code null} if it was null.
*/
@Nullable
public static String toUtf8String(@Nullable final ByteBuffer byteBuffer) {
if (null == byteBuffer) {
return null;
}
return StandardCharsets.UTF_8.decode(byteBuffer.asReadOnlyBuffer()).toString();
return toString(byteBuffer, StandardCharsets.UTF_8);
}

public static ByteBuffer fromUtf8String(@Nullable final String string) {
if (null == string) {

/**
* Creates a string from the ByteBuffer using the given charset.
*
* @param value the ByteBuffer to decode.
* @return the ByteBuffer or {@code null} if it was null.
*/
@Nullable
public static String toString(final ByteBuffer value, final Charset charset) {
if (null == value) {
return null;
}
return ByteBuffer.wrap(string.getBytes(StandardCharsets.UTF_8));
return charset.decode(value.asReadOnlyBuffer()).toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -24,9 +25,11 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.eclipse.ditto.base.model.common.ByteBufferUtils;
import org.eclipse.ditto.base.model.common.CharsetDeterminer;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.contenttype.ContentType;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.ExternalMessageFactory;
Expand Down Expand Up @@ -114,8 +117,11 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
value, messageHeaders, key
);

final Charset charset = CharsetDeterminer.getInstance()
.apply(messageHeaders.get(DittoHeaderDefinition.CONTENT_TYPE.getKey()));

final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(messageHeaders)
.withTextAndBytes(ByteBufferUtils.toUtf8String(value), value)
.withTextAndBytes(ByteBufferUtils.toString(value, charset), value)
.withAuthorizationContext(source.getAuthorizationContext())
.withEnforcement(headerEnforcementFilterFactory.getFilter(messageHeaders))
.withHeaderMapping(source.getHeaderMapping())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,46 @@ public void messageWithBytepayloadIsTransformedToExternalMessage() {
assertThat(externalMessage.getHeaders().get(KAFKA_TIMESTAMP.getName())).isEqualTo(Long.toString(TIMESTAMP));
}

@Test
public void temp() {
final String original = "Test";
final byte[] utf8Bytes = original.getBytes(StandardCharsets.UTF_8);
final byte[] originalBytes = original.getBytes(StandardCharsets.UTF_16);
System.out.println(new String(utf8Bytes, StandardCharsets.UTF_8));
System.out.println(new String(originalBytes, StandardCharsets.UTF_8));
System.out.println(new String(originalBytes, StandardCharsets.UTF_16));
}

@Test
public void messageWithUTF16CharsetTransformedToExternalMessage() {
final String deviceId = "ditto:test-device";
final RecordHeaders headers =
new RecordHeaders(List.of(new RecordHeader("device_id", deviceId.getBytes(StandardCharsets.UTF_8)),
new RecordHeader("content-type",
"text/plain; charset=utf-16".getBytes(StandardCharsets.UTF_8))));
final String original = "Test";
final byte[] utf16Bytes = original.getBytes(StandardCharsets.UTF_16);
final ByteBuffer bytePayload = ByteBuffer.wrap(utf16Bytes);
final ConsumerRecord<String, ByteBuffer> consumerRecord = mock(ConsumerRecord.class);
when(consumerRecord.headers()).thenReturn(headers);
when(consumerRecord.key()).thenReturn("someKey");
when(consumerRecord.value()).thenReturn(bytePayload);
when(consumerRecord.topic()).thenReturn("someTopic");
when(consumerRecord.timestamp()).thenReturn(TIMESTAMP);
final TransformationResult transformResult = underTest.transform(consumerRecord);

assertThat(transformResult).isNotNull();
assertThat(transformResult.getExternalMessage()).isPresent();
final ExternalMessage externalMessage = transformResult.getExternalMessage().get();
assertThat(externalMessage.isTextMessage()).isTrue();
assertThat(externalMessage.isBytesMessage()).isTrue();
assertThat(externalMessage.getTextPayload()).contains("Test");
assertThat(externalMessage.getBytePayload()).contains(bytePayload);
assertThat(externalMessage.getHeaders().get(KAFKA_TOPIC.getName())).isEqualTo("someTopic");
assertThat(externalMessage.getHeaders().get(KAFKA_KEY.getName())).isEqualTo("someKey");
assertThat(externalMessage.getHeaders().get(KAFKA_TIMESTAMP.getName())).isEqualTo(Long.toString(TIMESTAMP));
}

@Test
public void transformWithoutDeviceIdHeaderCausesDittoRuntimeException() {
final RecordHeaders headers = new RecordHeaders(List.of());
Expand Down

0 comments on commit 8576dc1

Please sign in to comment.