Skip to content

Commit

Permalink
polish
Browse files Browse the repository at this point in the history
  • Loading branch information
nklmish committed Apr 12, 2018
1 parent 70a276c commit 37b1327
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 94 deletions.
Expand Up @@ -21,7 +21,6 @@
import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.header.internals.RecordHeaders;
import org.axonframework.common.Assert; import org.axonframework.common.Assert;
import org.axonframework.eventhandling.EventMessage; import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventsourcing.DomainEventMessage; import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.SerializedObject;


Expand Down Expand Up @@ -198,7 +197,7 @@ public static String generateMetadataKey(String key) {
} }


/** /**
* Extracts real key name used to send Axon metadata. * Extracts actual key name used to send Axon metadata.
* E.g.<code>"axon-metadata-foo"</code> will extract <code>foo</code> * E.g.<code>"axon-metadata-foo"</code> will extract <code>foo</code>
* *
* @param metadataKey the generated metadata key. * @param metadataKey the generated metadata key.
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.axonframework.eventsourcing.DomainEventMessage; import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.SerializedObject;


import static org.assertj.core.api.Assertions.assertThat;
import static org.axonframework.kafka.eventhandling.HeaderUtils.generateMetadataKey; import static org.axonframework.kafka.eventhandling.HeaderUtils.generateMetadataKey;
import static org.axonframework.kafka.eventhandling.HeaderUtils.valueAsLong; import static org.axonframework.kafka.eventhandling.HeaderUtils.valueAsLong;
import static org.axonframework.kafka.eventhandling.HeaderUtils.valueAsString; import static org.axonframework.kafka.eventhandling.HeaderUtils.valueAsString;
Expand All @@ -31,9 +32,6 @@
import static org.axonframework.messaging.Headers.MESSAGE_REVISION; import static org.axonframework.messaging.Headers.MESSAGE_REVISION;
import static org.axonframework.messaging.Headers.MESSAGE_TIMESTAMP; import static org.axonframework.messaging.Headers.MESSAGE_TIMESTAMP;
import static org.axonframework.messaging.Headers.MESSAGE_TYPE; import static org.axonframework.messaging.Headers.MESSAGE_TYPE;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;


/** /**
* Util for asserting Kafka headers sent via Axon. * Util for asserting Kafka headers sent via Axon.
Expand All @@ -48,18 +46,18 @@ private HeaderAssertUtils() {


static void assertEventHeaders(String metaKey, EventMessage<?> evt, SerializedObject<byte[]> so, static void assertEventHeaders(String metaKey, EventMessage<?> evt, SerializedObject<byte[]> so,
Headers headers) { Headers headers) {
assertThat(headers.toArray().length, greaterThanOrEqualTo(5)); assertThat(headers.toArray().length).isGreaterThanOrEqualTo(5);
assertThat(valueAsString(headers, MESSAGE_ID), is(evt.getIdentifier())); assertThat(valueAsString(headers, MESSAGE_ID)).isEqualTo(evt.getIdentifier());
assertThat(valueAsLong(headers, MESSAGE_TIMESTAMP), is(evt.getTimestamp().toEpochMilli())); assertThat(valueAsLong(headers, MESSAGE_TIMESTAMP)).isEqualTo(evt.getTimestamp().toEpochMilli());
assertThat(valueAsString(headers, MESSAGE_TYPE), is(so.getType().getName())); assertThat(valueAsString(headers, MESSAGE_TYPE)).isEqualTo(so.getType().getName());
assertThat(valueAsString(headers, MESSAGE_REVISION), is(so.getType().getRevision())); assertThat(valueAsString(headers, MESSAGE_REVISION)).isEqualTo(so.getType().getRevision());
assertThat(valueAsString(headers, generateMetadataKey(metaKey)), is(evt.getMetaData().get(metaKey))); assertThat(valueAsString(headers, generateMetadataKey(metaKey))).isEqualTo(evt.getMetaData().get(metaKey));
} }


static void assertDomainHeaders(DomainEventMessage<?> evt, Headers headers) { static void assertDomainHeaders(DomainEventMessage<?> evt, Headers headers) {
assertThat(headers.toArray().length, greaterThanOrEqualTo(8)); assertThat(headers.toArray().length).isGreaterThanOrEqualTo(8);
assertThat(valueAsLong(headers, AGGREGATE_SEQ), is(evt.getSequenceNumber())); assertThat(valueAsLong(headers, AGGREGATE_SEQ)).isEqualTo(evt.getSequenceNumber());
assertThat(valueAsString(headers, AGGREGATE_ID), is(evt.getAggregateIdentifier())); assertThat(valueAsString(headers, AGGREGATE_ID)).isEqualTo(evt.getAggregateIdentifier());
assertThat(valueAsString(headers, AGGREGATE_TYPE), is(evt.getType())); assertThat(valueAsString(headers, AGGREGATE_TYPE)).isEqualTo(evt.getType());
} }
} }

0 comments on commit 37b1327

Please sign in to comment.