diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java index e20d4248..d4ec5697 100644 --- a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java +++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java @@ -162,7 +162,9 @@ public void put(Collection sinkRecords) { attributes.put( ConnectorUtils.KAFKA_PARTITION_ATTRIBUTE, record.kafkaPartition().toString()); attributes.put(ConnectorUtils.KAFKA_OFFSET_ATTRIBUTE, Long.toString(record.kafkaOffset())); - attributes.put(ConnectorUtils.KAFKA_TIMESTAMP_ATTRIBUTE, record.timestamp().toString()); + if (record.timestamp() != null) { + attributes.put(ConnectorUtils.KAFKA_TIMESTAMP_ATTRIBUTE, record.timestamp().toString()); + } } if (includeHeaders) { for (Header header : getRecordHeaders(record)) { diff --git a/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java b/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java index e68270ca..a063e424 100644 --- a/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java +++ b/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java @@ -407,9 +407,20 @@ public void testKafkaMetadata() { 1001, 50001L, TimestampType.CREATE_TIME)); + records.add( + new SinkRecord( + KAFKA_TOPIC, + 4, + STRING_SCHEMA, + KAFKA_MESSAGE_KEY, + BYTE_STRING_SCHEMA, + KAFKA_MESSAGE2, + 1002, + null, + TimestampType.CREATE_TIME)); task.put(records); ArgumentCaptor captor = ArgumentCaptor.forClass(PubsubMessage.class); - verify(publisher, times(2)).publish(captor.capture()); + verify(publisher, times(3)).publish(captor.capture()); List requestArgs = captor.getAllValues(); @@ -430,6 +441,13 @@ public void testKafkaMetadata() { attributes2.put(ConnectorUtils.KAFKA_TIMESTAMP_ATTRIBUTE, "50001"); expectedMessages.add( PubsubMessage.newBuilder().putAllAttributes(attributes2).setData(KAFKA_MESSAGE2).build()); + Map attributes3 = new HashMap<>(); + attributes3.put(ConnectorUtils.CPS_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY); + attributes3.put(ConnectorUtils.KAFKA_TOPIC_ATTRIBUTE, KAFKA_TOPIC); + attributes3.put(ConnectorUtils.KAFKA_PARTITION_ATTRIBUTE, "4"); + attributes3.put(ConnectorUtils.KAFKA_OFFSET_ATTRIBUTE, "1002"); + expectedMessages.add( + PubsubMessage.newBuilder().putAllAttributes(attributes3).setData(KAFKA_MESSAGE2).build()); assertEquals(requestArgs, expectedMessages); }