Skip to content

Commit

Permalink
Clean up the GrpcBackedQueryUpdateMessage
Browse files Browse the repository at this point in the history
-Rename payload to serializedPayload to be in line with the other gRPC
backed messages
-Change GrpcMetaData metaData to Supplier<MetaData>to be in line with
the other gRPC backed messages
-Provide private constructor to allow usage of withMetaData
-Implement withMetaData and andMetaData
-Provide test for the GrpcBackedQueryUpdateMessage
  • Loading branch information
smcvb committed Feb 18, 2019
1 parent 1ddce27 commit 239424f
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 12 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -25,6 +25,7 @@
import org.axonframework.serialization.Serializer; import org.axonframework.serialization.Serializer;


import java.util.Map; import java.util.Map;
import java.util.function.Supplier;


/** /**
* Wrapper that allows clients to access a gRPC {@link QueryUpdate} as a {@link SubscriptionQueryUpdateMessage}. * Wrapper that allows clients to access a gRPC {@link QueryUpdate} as a {@link SubscriptionQueryUpdateMessage}.
Expand All @@ -36,8 +37,8 @@
class GrpcBackedQueryUpdateMessage<U> implements SubscriptionQueryUpdateMessage<U> { class GrpcBackedQueryUpdateMessage<U> implements SubscriptionQueryUpdateMessage<U> {


private final QueryUpdate queryUpdate; private final QueryUpdate queryUpdate;
private final LazyDeserializingObject<U> payload; private final LazyDeserializingObject<U> serializedPayload;
private final GrpcMetaData metadata; private final Supplier<MetaData> metaDataSupplier;


/** /**
* Instantiate a {@link GrpcBackedQueryUpdateMessage} with the given {@code queryUpdate}, using the provided * Instantiate a {@link GrpcBackedQueryUpdateMessage} with the given {@code queryUpdate}, using the provided
Expand All @@ -49,8 +50,17 @@ class GrpcBackedQueryUpdateMessage<U> implements SubscriptionQueryUpdateMessage<
*/ */
public GrpcBackedQueryUpdateMessage(QueryUpdate queryUpdate, Serializer serializer) { public GrpcBackedQueryUpdateMessage(QueryUpdate queryUpdate, Serializer serializer) {
this.queryUpdate = queryUpdate; this.queryUpdate = queryUpdate;
this.payload = new LazyDeserializingObject<>(new GrpcSerializedObject(queryUpdate.getPayload()), serializer); this.serializedPayload =
this.metadata = new GrpcMetaData(queryUpdate.getMetaDataMap(), serializer); new LazyDeserializingObject<>(new GrpcSerializedObject(queryUpdate.getPayload()), serializer);
this.metaDataSupplier = new GrpcMetaData(queryUpdate.getMetaDataMap(), serializer);
}

private GrpcBackedQueryUpdateMessage(QueryUpdate queryUpdate,
LazyDeserializingObject<U> serializedPayload,
Supplier<MetaData> metaDataSupplier) {
this.queryUpdate = queryUpdate;
this.serializedPayload = serializedPayload;
this.metaDataSupplier = metaDataSupplier;
} }


@Override @Override
Expand All @@ -60,26 +70,26 @@ public String getIdentifier() {


@Override @Override
public MetaData getMetaData() { public MetaData getMetaData() {
return metadata.get(); return metaDataSupplier.get();
} }


@Override @Override
public U getPayload() { public U getPayload() {
return payload.getObject(); return serializedPayload.getObject();
} }


@Override @Override
public Class<U> getPayloadType() { public Class<U> getPayloadType() {
return payload.getType(); return serializedPayload.getType();
} }


@Override @Override
public SubscriptionQueryUpdateMessage<U> withMetaData(Map<String, ?> metaData) { public GrpcBackedQueryUpdateMessage<U> withMetaData(Map<String, ?> metaData) {
throw new UnsupportedOperationException(); return new GrpcBackedQueryUpdateMessage<>(queryUpdate, serializedPayload, () -> MetaData.from(metaData));
} }


@Override @Override
public SubscriptionQueryUpdateMessage<U> andMetaData(Map<String, ?> metaData) { public GrpcBackedQueryUpdateMessage<U> andMetaData(Map<String, ?> metaData) {
throw new UnsupportedOperationException(); return withMetaData(getMetaData().mergedWith(metaData));
} }
} }
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (c) 2010-2019. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.axonserver.connector.query.subscription;

import io.axoniq.axonserver.grpc.query.QueryUpdate;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.messaging.MetaData;
import org.axonframework.queryhandling.GenericSubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.junit.*;

import java.util.Objects;

import static org.junit.Assert.*;

public class GrpcBackedQueryUpdateMessageTest {

private static final TestQueryUpdate TEST_QUERY_UPDATE = new TestQueryUpdate("aggregateId", 42);
private static final String SUBSCRIPTION_ID = "subscription-id";

private final Serializer serializer = XStreamSerializer.defaultSerializer();
private final SubscriptionMessageSerializer subscriptionMessageSerializer =
new SubscriptionMessageSerializer(serializer, serializer, new AxonServerConfiguration());

@Test
public void testGetIdentifierReturnsTheSameIdentifierAsSpecifiedInTheQueryUpdate() {
SubscriptionQueryUpdateMessage<Object> testSubscriptionQueryUpdateMessage =
GenericSubscriptionQueryUpdateMessage.asUpdateMessage(TEST_QUERY_UPDATE);
QueryUpdate testQueryUpdate =
subscriptionMessageSerializer.serialize(testSubscriptionQueryUpdateMessage, SUBSCRIPTION_ID)
.getSubscriptionQueryResponse()
.getUpdate();
GrpcBackedQueryUpdateMessage<TestQueryUpdate> testSubject =
new GrpcBackedQueryUpdateMessage<>(testQueryUpdate, serializer);

assertEquals(testQueryUpdate.getMessageIdentifier(), testSubject.getIdentifier());
}

@Test
public void testGetMetaDataReturnsTheSameMapAsWasInsertedInTheQueryUpdate() {
MetaData expectedMetaData = MetaData.with("some-key", "some-value");
SubscriptionQueryUpdateMessage<Object> testSubscriptionQueryUpdateMessage =
GenericSubscriptionQueryUpdateMessage.asUpdateMessage(TEST_QUERY_UPDATE).withMetaData(expectedMetaData);
QueryUpdate testQueryUpdate =
subscriptionMessageSerializer.serialize(testSubscriptionQueryUpdateMessage, SUBSCRIPTION_ID)
.getSubscriptionQueryResponse()
.getUpdate();
GrpcBackedQueryUpdateMessage<TestQueryUpdate> testSubject =
new GrpcBackedQueryUpdateMessage<>(testQueryUpdate, serializer);

assertEquals(expectedMetaData, testSubject.getMetaData());
}

@Test
public void testGetPayloadReturnsAnIdenticalObjectAsInsertedThroughTheQueryUpdate() {
TestQueryUpdate expectedQueryUpdate = TEST_QUERY_UPDATE;
SubscriptionQueryUpdateMessage<Object> testSubscriptionQueryUpdateMessage =
GenericSubscriptionQueryUpdateMessage.asUpdateMessage(expectedQueryUpdate);
QueryUpdate testQueryUpdate =
subscriptionMessageSerializer.serialize(testSubscriptionQueryUpdateMessage, SUBSCRIPTION_ID)
.getSubscriptionQueryResponse()
.getUpdate();
GrpcBackedQueryUpdateMessage<TestQueryUpdate> testSubject =
new GrpcBackedQueryUpdateMessage<>(testQueryUpdate, serializer);

assertEquals(expectedQueryUpdate, testSubject.getPayload());
}

@Test
public void testGetPayloadTypeReturnsTheTypeOfTheInsertedQueryUpdate() {
SubscriptionQueryUpdateMessage<Object> testSubscriptionQueryUpdateMessage =
GenericSubscriptionQueryUpdateMessage.asUpdateMessage(TEST_QUERY_UPDATE);
QueryUpdate testQueryUpdate =
subscriptionMessageSerializer.serialize(testSubscriptionQueryUpdateMessage, SUBSCRIPTION_ID)
.getSubscriptionQueryResponse()
.getUpdate();
GrpcBackedQueryUpdateMessage<TestQueryUpdate> testSubject =
new GrpcBackedQueryUpdateMessage<>(testQueryUpdate, serializer);

assertEquals(TestQueryUpdate.class, testSubject.getPayloadType());
}

@Test
public void testWithMetaDataCompletelyReplacesTheInitialMetaDataMap() {
MetaData testMetaData = MetaData.with("some-key", "some-value");
SubscriptionQueryUpdateMessage<Object> testSubscriptionQueryUpdateMessage =
GenericSubscriptionQueryUpdateMessage.asUpdateMessage(TEST_QUERY_UPDATE).withMetaData(testMetaData);
QueryUpdate testQueryUpdate =
subscriptionMessageSerializer.serialize(testSubscriptionQueryUpdateMessage, SUBSCRIPTION_ID)
.getSubscriptionQueryResponse()
.getUpdate();
GrpcBackedQueryUpdateMessage<TestQueryUpdate> testSubject =
new GrpcBackedQueryUpdateMessage<>(testQueryUpdate, serializer);

MetaData replacementMetaData = MetaData.with("some-other-key", "some-other-value");

testSubject = testSubject.withMetaData(replacementMetaData);
MetaData resultMetaData = testSubject.getMetaData();
assertFalse(resultMetaData.containsKey(testMetaData.keySet().iterator().next()));
assertEquals(replacementMetaData, resultMetaData);
}

@Test
public void testAndMetaDataAppendsToTheExistingMetaData() {
MetaData testMetaData = MetaData.with("some-key", "some-value");
SubscriptionQueryUpdateMessage<Object> testSubscriptionQueryUpdateMessage =
GenericSubscriptionQueryUpdateMessage.asUpdateMessage(TEST_QUERY_UPDATE).withMetaData(testMetaData);
QueryUpdate testQueryUpdate =
subscriptionMessageSerializer.serialize(testSubscriptionQueryUpdateMessage, SUBSCRIPTION_ID)
.getSubscriptionQueryResponse()
.getUpdate();
GrpcBackedQueryUpdateMessage<TestQueryUpdate> testSubject =
new GrpcBackedQueryUpdateMessage<>(testQueryUpdate, serializer);

MetaData additionalMetaData = MetaData.with("some-other-key", "some-other-value");

testSubject = testSubject.andMetaData(additionalMetaData);
MetaData resultMetaData = testSubject.getMetaData();

assertTrue(resultMetaData.containsKey(testMetaData.keySet().iterator().next()));
assertTrue(resultMetaData.containsKey(additionalMetaData.keySet().iterator().next()));
}

private static class TestQueryUpdate {

private final String queryModelId;
private final int someFilterValue;

private TestQueryUpdate(String queryModelId, int someFilterValue) {
this.queryModelId = queryModelId;
this.someFilterValue = someFilterValue;
}

@SuppressWarnings("unused")
public String getQueryModelId() {
return queryModelId;
}

@SuppressWarnings("unused")
public int getSomeFilterValue() {
return someFilterValue;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TestQueryUpdate that = (TestQueryUpdate) o;
return someFilterValue == that.someFilterValue &&
Objects.equals(queryModelId, that.queryModelId);
}

@Override
public int hashCode() {
return Objects.hash(queryModelId, someFilterValue);
}
}
}

0 comments on commit 239424f

Please sign in to comment.