Skip to content

Commit

Permalink
fix: removing delivery attempt attribute when dead lettering is not e…
Browse files Browse the repository at this point in the history
…nabled (#72)

* fix: creating fix to not populate delivery attempt attribute when dead
lettering is not enabled

* Adding unit test for case in which a received message has no delivery attempt
  • Loading branch information
hannahrogers-google committed Jan 30, 2020
1 parent c618bc8 commit 535854d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,16 @@ private void processBatch(List<OutstandingMessage> batch) {
}

private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
return PubsubMessage.newBuilder(receivedMessage.getMessage())
.putAttributes(
"googclient_deliveryattempt", Integer.toString(receivedMessage.getDeliveryAttempt()))
.build();
PubsubMessage originalMessage = receivedMessage.getMessage();
int deliveryAttempt = receivedMessage.getDeliveryAttempt();
// Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In
// this case, do not populate the PubsubMessage with the delivery attempt attribute.
if (deliveryAttempt > 0) {
return PubsubMessage.newBuilder(originalMessage)
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build();
}
return originalMessage;
}

private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,11 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver)
}

/** Returns the delivery attempt count for a received {@link PubsubMessage} */
public static int getDeliveryAttempt(PubsubMessage message) {
return Integer.parseInt(message.getAttributesOrDefault("googclient_deliveryattempt", "0"));
public static Integer getDeliveryAttempt(PubsubMessage message) {
if (!message.containsAttributes("googclient_deliveryattempt")) {
return null;
}
return Integer.parseInt(message.getAttributesOrThrow("googclient_deliveryattempt"));
}

/** Subscription which the subscriber is subscribed to. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.google.cloud.pubsub.v1;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
Expand Down Expand Up @@ -59,6 +61,7 @@ public void run() {
private List<ModAckItem> sentModAcks;
private FakeClock clock;
private FlowController flowController;
private boolean messageContainsDeliveryAttempt;

@AutoValue
abstract static class ModAckItem {
Expand All @@ -82,8 +85,13 @@ public void setUp() {
@Override
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
assertThat(message.getData()).isEqualTo(MESSAGE_DATA);
assertThat(message.getAttributesOrThrow("googclient_deliveryattempt"))
.isEqualTo(Integer.toString(DELIVERY_INFO_COUNT));
if (messageContainsDeliveryAttempt) {
assertTrue(message.containsAttributes("googclient_deliveryattempt"));
assertThat(message.getAttributesOrThrow("googclient_deliveryattempt"))
.isEqualTo(Integer.toString(DELIVERY_INFO_COUNT));
} else {
assertFalse(message.containsAttributes("googclient_deliveryattempt"));
}
consumers.add(consumer);
}
};
Expand Down Expand Up @@ -126,6 +134,8 @@ public void sendAckOperations(
systemExecutor,
clock);
dispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS);

messageContainsDeliveryAttempt = true;
}

@Test
Expand All @@ -136,6 +146,22 @@ public void testReceipt() {
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
}

@Test
public void testReceiptNoDeliveryAttempt() {
messageContainsDeliveryAttempt = false;
ReceivedMessage messageNoDeliveryAttempt =
ReceivedMessage.newBuilder()
.setAckId("ackid")
.setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build())
.build();
dispatcher.processReceivedMessages(Collections.singletonList(messageNoDeliveryAttempt));
dispatcher.processOutstandingAckOperations();
assertThat(sentModAcks)
.contains(
ModAckItem.of(
messageNoDeliveryAttempt.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
}

@Test
public void testAck() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,16 @@ public void tearDown() throws Exception {

@Test
public void testDeliveryAttemptHelper() {
int deliveryAttempt = 3;
Integer deliveryAttempt = 3;
PubsubMessage message =
PubsubMessage.newBuilder()
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build();
assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt);

// In the case where delivery attempt attribute is not populated, expect null
PubsubMessage emptyMessage = PubsubMessage.newBuilder().build();
assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), 0);
assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), null);
}

@Test
Expand Down

0 comments on commit 535854d

Please sign in to comment.