Skip to content

Commit

Permalink
WIP: Issue715 subscription (#727)
Browse files Browse the repository at this point in the history
* build(deps): bump liquibase-core from 3.8.7 to 3.8.8 (#716)

Bumps [liquibase-core](https://github.com/liquibase/liquibase) from 3.8.7 to 3.8.8.
- [Release notes](https://github.com/liquibase/liquibase/releases)
- [Changelog](https://github.com/liquibase/liquibase/blob/v3.8.8/changelog.txt)
- [Commits](liquibase/liquibase@v3.8.7...v3.8.8)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>

Co-authored-by: dependabot-preview[bot] <27856297+dependabot-preview[bot]@users.noreply.github.com>

* build(deps): bump kotlin-logging from 1.7.8 to 1.7.9 (#717)

Bumps [kotlin-logging](https://github.com/MicroUtils/kotlin-logging) from 1.7.8 to 1.7.9.
- [Release notes](https://github.com/MicroUtils/kotlin-logging/releases)
- [Changelog](https://github.com/MicroUtils/kotlin-logging/blob/master/ChangeLog.md)
- [Commits](oshai/kotlin-logging@1.7.8...1.7.9)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>

Co-authored-by: dependabot-preview[bot] <27856297+dependabot-preview[bot]@users.noreply.github.com>

* build(deps): bump spotless-maven-plugin from 1.27.0 to 1.28.0 (#721)

Bumps [spotless-maven-plugin](https://github.com/diffplug/spotless) from 1.27.0 to 1.28.0.
- [Release notes](https://github.com/diffplug/spotless/releases)
- [Changelog](https://github.com/diffplug/spotless/blob/master/CHANGES.md)
- [Commits](diffplug/spotless@lib/1.27.0...lib/1.28.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>

Co-authored-by: dependabot-preview[bot] <27856297+dependabot-preview[bot]@users.noreply.github.com>

* build(deps-dev): bump junit-platform-engine from 1.6.0 to 1.6.1 (#723)

Bumps [junit-platform-engine](https://github.com/junit-team/junit5) from 1.6.0 to 1.6.1.
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/commits)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>

Co-authored-by: dependabot-preview[bot] <27856297+dependabot-preview[bot]@users.noreply.github.com>

* #725: removed id subscriptions

* #715_subscription: WIP

* #715_subscription: WIP

* #725: removed id subscriptions (#726)

* #725: removed id subscriptions

* #725: removed accidental additions

* #715_subscription: WIP

* #715_subscription: WIP

* #715_subscription: added TranformationCache

* #715_subscription: WIP

* #715_subscription: WIP

* #715_subscription: WIP

* #715_subscription: WIP minor cleanup

* #715_subscription: WIP minor cleanup

* #715_subscription: WIP minor coverage improvements

* #715_subscription: working prototype

* #715_subscription: wip

* #715_subscription: wip

* #715_subscription: wip

* #715_subscription: fixed nasty version bug during transformation (new version value was not applied)

* #715_subscription: fixed nasty version bug during transformation (new version value was not applied)

* #715_subscription: added classpath: protocol for schema registry

* #715_subscription: fixed tests, wrong urls

* #715_subscription: cleanup

* #715_subscription: removed lost comments

* Update factcast-store-pgsql/src/main/java/org/factcast/store/pgsql/registry/transformation/FactTransformersImpl.java

Co-Authored-By: Benjamin Otto <benjamin.otto@style-systems.de>

* #715_subscription: removed unused cost

* #715_subscription: review comments

Co-authored-by: dependabot-preview[bot] <27856297+dependabot-preview[bot]@users.noreply.github.com>
Co-authored-by: Uwe Schaefer <uwe.schaefer@mercateo.com>
Co-authored-by: Benjamin Otto <benjamin.otto@style-systems.de>
  • Loading branch information
4 people committed Mar 25, 2020
1 parent c563362 commit fc373fc
Show file tree
Hide file tree
Showing 112 changed files with 1,615 additions and 1,157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package org.factcast.client.grpc;

import org.factcast.core.Fact;
import org.factcast.core.IdOnlyFact;
import org.factcast.core.subscription.Subscription;
import org.factcast.core.subscription.SubscriptionImpl;
import org.factcast.grpc.api.conv.ProtoConverter;
Expand Down Expand Up @@ -44,7 +42,7 @@ class ClientStreamObserver implements StreamObserver<FactStoreProto.MSG_Notifica
final ProtoConverter converter = new ProtoConverter();

@NonNull
final SubscriptionImpl<Fact> subscription;
final SubscriptionImpl subscription;

@Override
public void onNext(MSG_Notification f) {
Expand All @@ -61,10 +59,7 @@ public void onNext(MSG_Notification f) {
case Fact:
subscription.notifyElement(converter.fromProto(f.getFact()));
break;
case Id:
// wrap id in a fact
subscription.notifyElement(new IdOnlyFact(converter.fromProto(f.getId())));
break;

default:
subscription.notifyError(new RuntimeException(
"Unrecognized notification type. THIS IS A BUG!"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package org.factcast.client.grpc;

import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.*;

import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -50,7 +50,6 @@
import org.factcast.grpc.api.gen.FactStoreProto.MSG_Fact;
import org.factcast.grpc.api.gen.FactStoreProto.MSG_Facts;
import org.factcast.grpc.api.gen.FactStoreProto.MSG_Notification;
import org.factcast.grpc.api.gen.FactStoreProto.MSG_OptionalFact;
import org.factcast.grpc.api.gen.FactStoreProto.MSG_OptionalSerial;
import org.factcast.grpc.api.gen.FactStoreProto.MSG_StateForRequest;
import org.factcast.grpc.api.gen.FactStoreProto.MSG_String;
Expand Down Expand Up @@ -84,7 +83,7 @@
*
* @author uwe.schaefer@mercateo.com
*/
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")

@Slf4j
public class GrpcFactStore implements FactStore, SmartInitializingSingleton {

Expand All @@ -102,12 +101,12 @@ public class GrpcFactStore implements FactStore, SmartInitializingSingleton {

private final AtomicBoolean initialized = new AtomicBoolean(false);

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@Autowired
@Generated
public GrpcFactStore(FactCastGrpcChannelFactory channelFactory,
@Value("${grpc.client.factstore.credentials:#{null}}") Optional<String> credentials) {
this(channelFactory.createChannel(CHANNEL_NAME), credentials);

}

@Generated
Expand All @@ -134,23 +133,6 @@ private GrpcFactStore(RemoteFactStoreBlockingStub newBlockingStub, RemoteFactSto
}
}

@Override
public Optional<Fact> fetchById(UUID id) {
log.trace("fetching {} from remote store", id);

MSG_OptionalFact fetchById;
try {
fetchById = blockingStub.fetchById(converter.toProto(id));
} catch (StatusRuntimeException e) {
throw wrapRetryable(e);
}
if (!fetchById.getPresent()) {
return Optional.empty();
} else {
return converter.fromProto(fetchById);
}
}

@Override
public void publish(@NonNull List<? extends Fact> factsToPublish) {
log.trace("publishing {} facts to remote store", factsToPublish.size());
Expand All @@ -171,7 +153,7 @@ public void publish(@NonNull List<? extends Fact> factsToPublish) {
@Override
public Subscription subscribe(@NonNull SubscriptionRequestTO req,
@NonNull FactObserver observer) {
SubscriptionImpl<Fact> subscription = SubscriptionImpl.on(observer);
SubscriptionImpl subscription = SubscriptionImpl.on(observer);
StreamObserver<FactStoreProto.MSG_Notification> responseObserver = new ClientStreamObserver(
subscription);
ClientCall<MSG_SubscriptionRequest, MSG_Notification> call = stub.getChannel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import net.devh.boot.grpc.common.codec.CodecType;
import net.devh.boot.grpc.common.codec.GrpcCodec;

@SuppressWarnings("WeakerAccess")
@GrpcCodec(advertised = true, codecType = CodecType.ALL)
public class SnappyGrpcClientCodec implements Codec {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@
*/
package org.factcast.client.grpc;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.UUID;

import org.factcast.core.Fact;
import org.factcast.core.IdOnlyFact;
import org.factcast.core.subscription.FactTransformers;
import org.factcast.core.subscription.SubscriptionImpl;
import org.factcast.core.subscription.observer.FactObserver;
import org.factcast.grpc.api.conv.ProtoConverter;
Expand All @@ -48,11 +46,13 @@ class ClientStreamObserverTest {

ProtoConverter converter = new ProtoConverter();

private SubscriptionImpl<Fact> subscription;
private SubscriptionImpl subscription;

@BeforeEach
void setUp() {
subscription = spy(new SubscriptionImpl<>(factObserver));
FactTransformers trans = new NullFactTransformer();
SubscriptionImpl subscriptionImpl = new SubscriptionImpl(factObserver, trans);
subscription = spy(subscriptionImpl);
uut = new ClientStreamObserver(subscription);
}

Expand All @@ -77,13 +77,6 @@ void testOnNextFailsOnUnknownMessage() {
});
}

@Test
void testOnNextId() {
MSG_Notification n = converter.createNotificationFor(UUID.randomUUID());
uut.onNext(n);
verify(factObserver).onNext(any(IdOnlyFact.class));
}

@Test
void testOnCatchup() {
uut.onNext(converter.createCatchupNotification());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,12 @@
*/
package org.factcast.client.grpc;

import static org.assertj.core.api.Assertions.assertThat;
import static org.factcast.core.TestHelper.expectNPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.assertj.core.api.Assertions.*;
import static org.factcast.core.TestHelper.*;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -103,24 +91,6 @@ class GrpcFactStoreTest {
@Mock
public Optional<String> credentials;

@Test
void testFetchByIdNotFound() {
UUID id = UUID.randomUUID();
when(blockingStub.fetchById(eq(conv.toProto(id)))).thenReturn(conv.toProto(Optional
.empty()));
Optional<Fact> fetchById = uut.fetchById(id);
assertFalse(fetchById.isPresent());
}

@Test
void testFetchByIdFound() {
UUID id = UUID.randomUUID();
when(blockingStub.fetchById(eq(conv.toProto(id))))
.thenReturn(conv.toProto(Optional.of(Fact.builder().ns("test").build("{}"))));
Optional<Fact> fetchById = uut.fetchById(id);
assertTrue(fetchById.isPresent());
}

@Test
void testPublish() {
when(blockingStub.publish(factsCap.capture())).thenReturn(MSG_Empty.newBuilder().build());
Expand Down Expand Up @@ -161,13 +131,6 @@ void testPublishPropagatesException() {
.build("{}"))));
}

@Test
void testFetchByIdPropagatesRetryableExceptionOnUnavailableStatus() {
when(blockingStub.fetchById(any())).thenThrow(new StatusRuntimeException(
Status.UNAVAILABLE));
assertThrows(RetryableException.class, () -> uut.fetchById(UUID.randomUUID()));
}

@Test
void testPublishPropagatesRetryableExceptionOnUnavailableStatus() {
when(blockingStub.publish(any())).thenThrow(new StatusRuntimeException(Status.UNAVAILABLE));
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright © 2017-2020 factcast.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.factcast.client.grpc;

import org.factcast.core.Fact;
import org.factcast.core.subscription.FactTransformers;
import org.factcast.core.subscription.TransformationException;

import lombok.NonNull;

public class NullFactTransformer implements FactTransformers {

@Override
public @NonNull Fact transformIfNecessary(@NonNull Fact e) throws TransformationException {
return e;
}

}

0 comments on commit fc373fc

Please sign in to comment.