Skip to content

Commit

Permalink
fix: Handle null timestamp (#234)
Browse files Browse the repository at this point in the history
* fix: Update dependencies to versions without known security vulnerabilities.

* fix: Update Mockito dependencies to versions without known security vulnerabilities.

* fix: Handle null message gracefully.

* fix: Fix issues with ProjectTopicName not being defined by removing shading of guava library

* fix: Handle null timestamp
  • Loading branch information
kamalaboulhosn committed Sep 3, 2020
1 parent 537b7e0 commit de8b79e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ public void put(Collection<SinkRecord> sinkRecords) {
attributes.put(
ConnectorUtils.KAFKA_PARTITION_ATTRIBUTE, record.kafkaPartition().toString());
attributes.put(ConnectorUtils.KAFKA_OFFSET_ATTRIBUTE, Long.toString(record.kafkaOffset()));
attributes.put(ConnectorUtils.KAFKA_TIMESTAMP_ATTRIBUTE, record.timestamp().toString());
if (record.timestamp() != null) {
attributes.put(ConnectorUtils.KAFKA_TIMESTAMP_ATTRIBUTE, record.timestamp().toString());
}
}
if (includeHeaders) {
for (Header header : getRecordHeaders(record)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,20 @@ public void testKafkaMetadata() {
1001,
50001L,
TimestampType.CREATE_TIME));
records.add(
new SinkRecord(
KAFKA_TOPIC,
4,
STRING_SCHEMA,
KAFKA_MESSAGE_KEY,
BYTE_STRING_SCHEMA,
KAFKA_MESSAGE2,
1002,
null,
TimestampType.CREATE_TIME));
task.put(records);
ArgumentCaptor<PubsubMessage> captor = ArgumentCaptor.forClass(PubsubMessage.class);
verify(publisher, times(2)).publish(captor.capture());
verify(publisher, times(3)).publish(captor.capture());
List<PubsubMessage> requestArgs = captor.getAllValues();


Expand All @@ -430,6 +441,13 @@ public void testKafkaMetadata() {
attributes2.put(ConnectorUtils.KAFKA_TIMESTAMP_ATTRIBUTE, "50001");
expectedMessages.add(
PubsubMessage.newBuilder().putAllAttributes(attributes2).setData(KAFKA_MESSAGE2).build());
Map<String, String> attributes3 = new HashMap<>();
attributes3.put(ConnectorUtils.CPS_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY);
attributes3.put(ConnectorUtils.KAFKA_TOPIC_ATTRIBUTE, KAFKA_TOPIC);
attributes3.put(ConnectorUtils.KAFKA_PARTITION_ATTRIBUTE, "4");
attributes3.put(ConnectorUtils.KAFKA_OFFSET_ATTRIBUTE, "1002");
expectedMessages.add(
PubsubMessage.newBuilder().putAllAttributes(attributes3).setData(KAFKA_MESSAGE2).build());

assertEquals(requestArgs, expectedMessages);
}
Expand Down

0 comments on commit de8b79e

Please sign in to comment.