Skip to content

Commit

Permalink
feat(broker): a message can be published idempotent
Browse files Browse the repository at this point in the history
* new field 'messageId' in message record
* reject message if another message with the same id, name and
correlation-key is already published

related to #1012
  • Loading branch information
saig0 committed Jul 12, 2018
1 parent 613ba9b commit 76ddabe
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,72 @@
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.subscription.message.data.MessageRecord;
import io.zeebe.broker.subscription.message.state.MessageDataStore;
import io.zeebe.broker.subscription.message.state.MessageDataStore.MessageEntry;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.intent.MessageIntent;

public class PublishMessageProcessor implements TypedRecordProcessor<MessageRecord> {

private final MessageDataStore dataStore;

private MessageEntry entry;

private boolean isDuplicate;
private String rejectionReason;

public PublishMessageProcessor(MessageDataStore dataStore) {
this.dataStore = dataStore;
}

@Override
public void processRecord(TypedRecord<MessageRecord> record) {
// a message is always accepted
isDuplicate = false;

final MessageRecord message = record.getValue();
entry =
new MessageEntry(
bufferAsString(message.getName()),
bufferAsString(message.getCorrelationKey()),
message.getPayload().byteArray(),
message.hasMessageId() ? bufferAsString(message.getMessageId()) : null);

if (message.hasMessageId() && dataStore.hasMessage(entry)) {
isDuplicate = true;

rejectionReason =
String.format(
"message with id '%s' is already published", bufferAsString(message.getMessageId()));
}
}

@Override
public boolean executeSideEffects(
TypedRecord<MessageRecord> record, TypedResponseWriter responseWriter) {

if (isDuplicate) {
return responseWriter.writeRejection(record, RejectionType.BAD_VALUE, rejectionReason);
}

return responseWriter.writeRecord(MessageIntent.PUBLISHED, record);
}

@Override
public void updateState(TypedRecord<MessageRecord> record) {
final MessageRecord message = record.getValue();
public long writeRecord(TypedRecord<MessageRecord> record, TypedStreamWriter writer) {

dataStore.addMessage(
new MessageEntry(
bufferAsString(message.getName()),
bufferAsString(message.getCorrelationKey()),
message.getPayload().byteArray()));
if (isDuplicate) {
return writer.writeRejection(record, RejectionType.BAD_VALUE, rejectionReason);
}

return writer.writeFollowUpEvent(record.getKey(), MessageIntent.PUBLISHED, record.getValue());
}

@Override
public void updateState(TypedRecord<MessageRecord> record) {
if (!isDuplicate) {
dataStore.addMessage(entry);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ public class MessageRecord extends UnpackedObject {

private final StringProperty nameProp = new StringProperty("name");
private final StringProperty correlationKeyProp = new StringProperty("correlationKey");

private final DocumentProperty payloadProp = new DocumentProperty("payload");
private final StringProperty messageIdProp = new StringProperty("messageId", "");

public MessageRecord() {
this.declareProperty(nameProp).declareProperty(correlationKeyProp).declareProperty(payloadProp);
this.declareProperty(nameProp)
.declareProperty(correlationKeyProp)
.declareProperty(payloadProp)
.declareProperty(messageIdProp);
}

public DirectBuffer getName() {
Expand All @@ -43,4 +48,12 @@ public DirectBuffer getCorrelationKey() {
public DirectBuffer getPayload() {
return payloadProp.getValue();
}

public boolean hasMessageId() {
return messageIdProp.hasValue();
}

public DirectBuffer getMessageId() {
return messageIdProp.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ public void addMessage(MessageEntry message) {
getData().messages.add(message);
}

public boolean hasMessage(MessageEntry message) {
return getData()
.messages
.stream()
.anyMatch(
m ->
m.getId() != null
&& m.getId().equals(message.getId())
&& m.getName().equals(message.getName())
&& m.getCorrelationKey().equals(message.getCorrelationKey()));
}

public static class MessageData {

private List<MessageEntry> messages = new ArrayList<>();
Expand All @@ -41,11 +53,13 @@ public static class MessageEntry {
private final String name;
private final String correlationKey;
private final byte[] payload;
private final String id;

public MessageEntry(String name, String correlationKey, byte[] payload) {
public MessageEntry(String name, String correlationKey, byte[] payload, String id) {
this.name = name;
this.correlationKey = correlationKey;
this.payload = payload;
this.id = id;
}

public String getName() {
Expand All @@ -59,5 +73,9 @@ public String getCorrelationKey() {
public byte[] getPayload() {
return payload;
}

public String getId() {
return id;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@
package io.zeebe.broker.subscription.message;

import static io.zeebe.msgpack.spec.MsgPackHelper.EMTPY_OBJECT;
import static io.zeebe.test.broker.protocol.clientapi.TestTopicClient.intent;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.entry;

import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.broker.test.MsgPackUtil;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.MessageIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandResponse;
import io.zeebe.test.broker.protocol.clientapi.SubscribedRecord;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
Expand All @@ -53,12 +57,29 @@ public void shouldPublishMessage() {
.done()
.sendAndAwait();

assertThat(response.recordType()).isEqualTo(RecordType.EVENT);
assertThat(response.intent()).isEqualTo(MessageIntent.PUBLISHED);
assertThat(response.getValue())
.containsExactly(
entry("name", "order canceled"),
entry("correlationKey", "order-123"),
entry("payload", EMTPY_OBJECT));
entry("payload", EMTPY_OBJECT),
entry("messageId", ""));

final SubscribedRecord publishedEvent =
apiRule
.topic()
.receiveEvents()
.filter(intent(MessageIntent.PUBLISHED))
.findFirst()
.orElseThrow(() -> new AssertionError("no follow-up event found"));

assertThat(publishedEvent.value())
.containsExactly(
entry("name", "order canceled"),
entry("correlationKey", "order-123"),
entry("payload", EMTPY_OBJECT),
entry("messageId", ""));
}

@Test
Expand All @@ -79,6 +100,79 @@ public void shouldPublishMessageWithPayload() {
assertThat(response.getValue()).contains(entry("payload", MsgPackUtil.MSGPACK_PAYLOAD));
}

@Test
public void shouldPublishMessageWithMessageId() {

final ExecuteCommandResponse response =
apiRule
.createCmdRequest()
.type(ValueType.MESSAGE, MessageIntent.PUBLISH)
.command()
.put("name", "order canceled")
.put("correlationKey", "order-123")
.put("messageId", "msg-1")
.done()
.sendAndAwait();

assertThat(response.intent()).isEqualTo(MessageIntent.PUBLISHED);
assertThat(response.getValue()).contains(entry("messageId", "msg-1"));
}

@Test
public void shouldPublishSecondMessageWithDifferenId() {

publishMessage("order canceled", "order-123", "msg-1");

final ExecuteCommandResponse response = publishMessage("order canceled", "order-123", "msg-2");

assertThat(response.intent()).isEqualTo(MessageIntent.PUBLISHED);
}

@Test
public void shouldPublishSecondMessageWithDifferentName() {

publishMessage("order canceled", "order-123", "msg-1");

final ExecuteCommandResponse response = publishMessage("order shipped", "order-123", "msg-1");

assertThat(response.intent()).isEqualTo(MessageIntent.PUBLISHED);
}

@Test
public void shouldPublishSecondMessageWithDiffentCorrelationKey() {

publishMessage("order canceled", "order-123", "msg-1");

final ExecuteCommandResponse response = publishMessage("order canceled", "order-456", "msg-1");

assertThat(response.intent()).isEqualTo(MessageIntent.PUBLISHED);
}

@Test
public void shouldRejectToPublishSameMessageAgain() {

publishMessage("order canceled", "order-123", "msg-1");

final ExecuteCommandResponse response = publishMessage("order canceled", "order-123", "msg-1");

assertThat(response.recordType()).isEqualTo(RecordType.COMMAND_REJECTION);
assertThat(response.rejectionType()).isEqualTo(RejectionType.BAD_VALUE);
assertThat(response.rejectionReason())
.isEqualTo("message with id 'msg-1' is already published");

final SubscribedRecord rejection =
apiRule
.topic()
.receiveRejections()
.filter(intent(MessageIntent.PUBLISH))
.findFirst()
.orElseThrow(() -> new AssertionError("no rejection found"));

assertThat(rejection.rejectionType()).isEqualTo(RejectionType.BAD_VALUE);
assertThat(rejection.rejectionReason())
.isEqualTo("message with id 'msg-1' is already published");
}

@Test
public void shouldFailToPublishMessageWithoutName() {

Expand Down Expand Up @@ -108,4 +202,18 @@ public void shouldFailToPublishMessageWithoutCorrelationKey() {
assertThatThrownBy(() -> request.sendAndAwait())
.hasMessageContaining("Property 'correlationKey' has no valid value");
}

private ExecuteCommandResponse publishMessage(
String name, String correlationKey, String messageId) {

return apiRule
.createCmdRequest()
.type(ValueType.MESSAGE, MessageIntent.PUBLISH)
.command()
.put("name", name)
.put("correlationKey", correlationKey)
.put("messageId", messageId)
.done()
.sendAndAwait();
}
}

0 comments on commit 76ddabe

Please sign in to comment.