Skip to content

Commit

Permalink
feat: Fixes timestamps and more thorough testing (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangmichaellll committed Jan 29, 2021
1 parent 0faf688 commit ab6a124
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ public static InternalRow toInternalRow(
msg.offset().value(),
ByteArray.concat(msg.message().key().toByteArray()),
ByteArray.concat(msg.message().data().toByteArray()),
Timestamps.toMillis(msg.publishTime()),
Timestamps.toMicros(msg.publishTime()),
msg.message().eventTime().isPresent()
? Timestamps.toMillis(msg.message().eventTime().get())
? Timestamps.toMicros(msg.message().eventTime().get())
: null,
convertAttributesToSparkMap(msg.message().attributes())));
return InternalRow.apply(asScalaBufferConverter(list).asScala());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,56 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import java.nio.charset.StandardCharsets;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
import org.apache.spark.sql.types.DataTypes;
import org.junit.Test;

public class PslSparkUtilsTest {

/**
 * Checks that {@code PslSparkUtils.toInternalRow} places each field of a sequenced message in
 * the expected row column: subscription path (0), partition (1), offset (2), key (3), data (4),
 * publish time (5), event time (6) and attributes (7).
 *
 * <p>The publish and event timestamps are deliberately different so that a mix-up between the
 * two columns is detected, and both are compared at microsecond resolution.
 */
@Test
public void testToInternalRow() {
  Timestamp publishTimestamp = Timestamp.newBuilder().setSeconds(20000000L).setNanos(20).build();
  Timestamp eventTimestamp = Timestamp.newBuilder().setSeconds(10000000L).setNanos(10).build();
  Message message =
      Message.builder()
          .setKey(ByteString.copyFromUtf8("key"))
          .setData(ByteString.copyFromUtf8("data"))
          .setEventTime(eventTimestamp)
          .setAttributes(
              ImmutableListMultimap.of(
                  "key1", ByteString.copyFromUtf8("val1"),
                  "key1", ByteString.copyFromUtf8("val2"),
                  "key2", ByteString.copyFromUtf8("val3")))
          .build();
  // The message under test must carry exactly the publish timestamp and offset that the
  // assertions below compare against.
  SequencedMessage sequencedMessage =
      SequencedMessage.of(message, publishTimestamp, UnitTestExamples.exampleOffset(), 10L);

  InternalRow row =
      PslSparkUtils.toInternalRow(
          sequencedMessage,
          UnitTestExamples.exampleSubscriptionPath(),
          UnitTestExamples.examplePartition());

  // Scalar columns.
  assertThat(row.getString(0)).isEqualTo(UnitTestExamples.exampleSubscriptionPath().toString());
  assertThat(row.getLong(1)).isEqualTo(UnitTestExamples.examplePartition().value());
  assertThat(row.getLong(2)).isEqualTo(UnitTestExamples.exampleOffset().value());
  assertThat(row.getBinary(3)).isEqualTo("key".getBytes(StandardCharsets.UTF_8));
  assertThat(row.getBinary(4)).isEqualTo("data".getBytes(StandardCharsets.UTF_8));

  // Timestamp columns are expected in microseconds.
  assertThat(row.getLong(5)).isEqualTo(Timestamps.toMicros(publishTimestamp));
  assertThat(row.getLong(6)).isEqualTo(Timestamps.toMicros(eventTimestamp));

  // Attributes column: a map from attribute key to an array of binary values.
  ArrayData keys = row.getMap(7).keyArray();
  ArrayData values = row.getMap(7).valueArray();
  assertThat(keys.get(0, DataTypes.StringType).toString()).isEqualTo("key1");
  assertThat(keys.get(1, DataTypes.StringType).toString()).isEqualTo("key2");
  GenericArrayData valueOfKey1 =
      (GenericArrayData) values.get(0, DataTypes.createArrayType(DataTypes.BinaryType));
  GenericArrayData valueOfKey2 =
      (GenericArrayData) values.get(1, DataTypes.createArrayType(DataTypes.BinaryType));
  // "key1" was given two values, "key2" one.
  assertThat(valueOfKey1.getBinary(0)).isEqualTo("val1".getBytes(StandardCharsets.UTF_8));
  assertThat(valueOfKey1.getBinary(1)).isEqualTo("val2".getBytes(StandardCharsets.UTF_8));
  assertThat(valueOfKey2.getBinary(0)).isEqualTo("val3".getBytes(StandardCharsets.UTF_8));
}

@Test
Expand Down

0 comments on commit ab6a124

Please sign in to comment.