Skip to content

Commit

Permalink
refs #1: introduce a common class which generates client to connect t…
Browse files Browse the repository at this point in the history
…o Microservice
  • Loading branch information
KengoTODA committed Oct 28, 2016
1 parent 05fd04d commit a83daba
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 44 deletions.
24 changes: 24 additions & 0 deletions common/pom.xml
Expand Up @@ -18,6 +18,14 @@
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-unit</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.uuid</groupId>
<artifactId>java-uuid-generator</artifactId>
Expand All @@ -26,10 +34,22 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-service-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand All @@ -38,6 +58,10 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
Expand Down
@@ -0,0 +1,60 @@
package jp.skypencil.brownie;

import java.util.Objects;

import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.inject.Inject;

import io.vertx.core.json.JsonObject;
import io.vertx.rxjava.core.Future;
import io.vertx.rxjava.core.http.HttpClient;
import io.vertx.rxjava.servicediscovery.ServiceDiscovery;
import lombok.RequiredArgsConstructor;
import rx.Observable;
import rx.Single;

/**
* A factory class which generates {@code Single<HttpClient>} to connect to
* other microservices. Caller should hold an instance of this factory as
* instance field, and call {@link #createClient(String, Future)} when it tries
* to call other microservices.
*/
@ParametersAreNonnullByDefault
@RequiredArgsConstructor(
onConstructor = @__(@Inject))
public class MicroserviceClientFactory {
private final ServiceDiscovery discovery;

/**
* @param name
* Name of target microservice.
* @param closed
* A {@link Future} which should be completed when caller finished using returned {@link HttpClient}.
* @return A {@link Single} which emits a {@link HttpClient} to invoke other
* microservice. Caller does not have to close this client, but it
* should complete {@link Future} when it finishes to use returned {@link HttpClient}.
*/
@Nonnull
public Single<HttpClient> createClient(String name, Future<Void> closed) {
Objects.requireNonNull(name, "name");
Objects.requireNonNull(closed, "closed");
if (closed.isComplete()) {
return Single.error(new IllegalArgumentException("Given Future is already closed"));
}

return discovery
.getRecordObservable(new JsonObject().put("name", name))
.map(discovery::getReference)
.flatMap(reference -> {
HttpClient client = new HttpClient(reference.get());
closed.setHandler(ar -> {
// this will invoke HttpEndpointReference#close() which closes HttpClient,
// so we do not have to call client.close() explicitly.
reference.release();
});
return Observable.just(client);
})
.toSingle();
}
}
@@ -0,0 +1,99 @@
package jp.skypencil.brownie;


import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.util.concurrent.atomic.AtomicReference;

import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.RunTestOnContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.rxjava.core.Future;
import io.vertx.rxjava.core.Vertx;
import io.vertx.rxjava.servicediscovery.ServiceDiscovery;
import io.vertx.rxjava.servicediscovery.ServiceReference;
import io.vertx.rxjava.servicediscovery.types.HttpEndpoint;
import io.vertx.servicediscovery.Record;

@RunWith(VertxUnitRunner.class)
public class MicroserviceClientFactoryTest {
@Rule
public RunTestOnContext rule = new RunTestOnContext();

@Test
public void testSucceededFuture(TestContext context) {
Vertx vertx = Vertx.newInstance(rule.vertx());
ServiceDiscovery discovery = ServiceDiscovery.create(vertx);
MicroserviceClientFactory factory = new MicroserviceClientFactory(discovery);

Async async = context.async();
factory.createClient("name", Future.succeededFuture())
.subscribe(client -> {
context.fail();
}, e -> {
assertThat(e).isInstanceOf(IllegalArgumentException.class);
async.complete();
});
}

@Test
public void ensureClientIsCreatedBasedOnServiceDiscovery(TestContext context) {
Vertx vertx = Vertx.newInstance(rule.vertx());
ServiceDiscovery discovery = spy(ServiceDiscovery.create(vertx));
Async async = context.async();

discovery.publish(HttpEndpoint.createRecord("name", "localhost"), ar -> {
assertThat(ar.succeeded()).isTrue();

MicroserviceClientFactory factory = new MicroserviceClientFactory(discovery);
Future<Void> future = spy(Future.future());

factory.createClient("name", future).subscribe(client -> {
verify(discovery).getRecordObservable(eq(new JsonObject().put("name", "name")));
async.complete();
}, context::fail);
});
}

@Test
public void ensureReferenceIsReleased(TestContext context) {
Vertx vertx = Vertx.newInstance(rule.vertx());
ServiceDiscovery discovery = spy(ServiceDiscovery.create(vertx));
Async async = context.async();

discovery.publish(HttpEndpoint.createRecord("name", "localhost"), ar -> {
assertThat(ar.succeeded()).isTrue();

MicroserviceClientFactory factory = new MicroserviceClientFactory(discovery);
Future<Void> future = Future.future();
AtomicReference<ServiceReference> reference = new AtomicReference<>();

doAnswer(invocation -> {
reference.set(spy((ServiceReference) invocation.callRealMethod()));
return reference.get();
}).when(discovery).getReference(any(Record.class));

factory.createClient("name", future).subscribe(client -> {
verify(reference.get(), never()).release();

future.complete();
verify(reference.get()).release();

async.complete();
}, context::fail);
});
}

}
Expand Up @@ -10,10 +10,10 @@
import javax.annotation.Nonnull;
import javax.inject.Inject;

import io.vertx.core.Future;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpHeaders;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.Future;
import io.vertx.rxjava.core.RxHelper;
import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.core.eventbus.Message;
Expand All @@ -24,7 +24,7 @@
import io.vertx.rxjava.core.http.HttpClientResponse;
import io.vertx.rxjava.core.streams.Pump;
import io.vertx.rxjava.core.streams.ReadStream;
import io.vertx.rxjava.servicediscovery.ServiceDiscovery;
import jp.skypencil.brownie.MicroserviceClientFactory;
import jp.skypencil.brownie.IdGenerator;
import jp.skypencil.brownie.event.VideoUploadedEvent;
import lombok.RequiredArgsConstructor;
Expand All @@ -40,7 +40,7 @@ public class EncodeServer extends AbstractVerticle {

private final IdGenerator idGenerator;

private final ServiceDiscovery discovery;
private final MicroserviceClientFactory clientFactory;

private final String directory = createDirectory();

Expand Down Expand Up @@ -190,15 +190,6 @@ private Single<Void> writeTo(HttpClientRequest req, File source) {
}

private Single<HttpClient> createHttpClientForFileStorage(Future<Void> closed) {
return discovery.getRecordObservable(r -> r.getName().equals("file-storage"))
.map(discovery::getReference)
.flatMap(reference -> {
HttpClient client = new HttpClient(reference.get());
closed.setHandler(ar -> {
reference.release();
});
return Observable.just(client);
})
.toSingle();
return clientFactory.createClient("file-storage", closed);
}
}
17 changes: 2 additions & 15 deletions legacy/src/main/java/jp/skypencil/brownie/FrontendServer.java
Expand Up @@ -28,7 +28,6 @@
import io.vertx.rxjava.ext.web.RoutingContext;
import io.vertx.rxjava.ext.web.handler.BodyHandler;
import io.vertx.rxjava.ext.web.handler.StaticHandler;
import io.vertx.rxjava.servicediscovery.ServiceDiscovery;
import jp.skypencil.brownie.event.VideoUploadedEvent;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -58,7 +57,7 @@ public class FrontendServer extends AbstractVerticle {
*/
private final String directory = createDirectory();

private final ServiceDiscovery discovery;
private final MicroserviceClientFactory clientFactory;

@Nonnull
private String createDirectory() {
Expand Down Expand Up @@ -178,19 +177,7 @@ private void downloadThumbnail(RoutingContext ctx) {
*/
@Nonnull
private Single<HttpClient> createHttpClientForFileStorage(Future<Void> closed) {
Single<HttpClient> clientSingle = discovery.getRecordObservable(r -> r.getName().equals("file-storage"))
.map(discovery::getReference)
.flatMap(reference -> {
HttpClient client = new HttpClient(reference.get());
closed.setHandler(ar -> {
// this will invoke HttpEndpointReference#close() which closes HttpClient,
// so we do not have to call client.close() explicitly.
reference.release();
});
return Observable.just(client);
})
.toSingle();
return clientSingle;
return clientFactory.createClient("file-storage", closed);
}

private void deleteFile(RoutingContext ctx, String fileId) {
Expand Down
Expand Up @@ -45,7 +45,7 @@ public void cleanUp(TestContext context) {

@Test
public void testHandleFormWithNoUploadedFile(TestContext context) {
FrontendServer server = new FrontendServer(ServiceDiscovery.create(vertx));
FrontendServer server = new FrontendServer(new MicroserviceClientFactory(ServiceDiscovery.create(vertx)));

HttpServerResponse response = mock(HttpServerResponse.class);
RoutingContext ctx = mock(RoutingContext.class);
Expand Down
Expand Up @@ -10,11 +10,11 @@
import javax.annotation.Nonnull;
import javax.inject.Inject;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpHeaders;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.Future;
import io.vertx.rxjava.core.RxHelper;
import io.vertx.rxjava.core.eventbus.Message;
import io.vertx.rxjava.core.eventbus.MessageConsumer;
Expand All @@ -30,11 +30,11 @@
import io.vertx.rxjava.servicediscovery.ServiceDiscovery;
import io.vertx.rxjava.servicediscovery.types.HttpEndpoint;
import io.vertx.servicediscovery.Record;
import jp.skypencil.brownie.MicroserviceClientFactory;
import jp.skypencil.brownie.IdGenerator;
import jp.skypencil.brownie.MimeType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Single;
import scala.Tuple2;

Expand All @@ -47,6 +47,8 @@ public class ThumbnailServer extends AbstractVerticle {

private final ServiceDiscovery discovery;

private final MicroserviceClientFactory clientFactory;

private final String directory = createDirectory();

private final IdGenerator idGenerator;
Expand All @@ -56,7 +58,7 @@ public class ThumbnailServer extends AbstractVerticle {
private String registration;

@Override
public void start(Future<Void> startFuture) throws Exception {
public void start(io.vertx.core.Future<Void> startFuture) throws Exception {
registerEventListeners();
Router router = createRouter();

Expand All @@ -79,7 +81,7 @@ public void start(Future<Void> startFuture) throws Exception {
}

@Override
public void stop(Future<Void> stopFuture) throws Exception {
public void stop(io.vertx.core.Future<Void> stopFuture) throws Exception {
if (server == null) {
stopFuture.fail(new IllegalStateException("This vertical has not been started yet"));
} else {
Expand Down Expand Up @@ -242,16 +244,6 @@ Single<ThumbnailMetadata> upload(Tuple2<File, ThumbnailMetadata> tuple) {
}

private Single<HttpClient> createHttpClientForFileStorage(Future<Void> closed) {
Single<HttpClient> clientSingle = discovery.getRecordObservable(r -> r.getName().equals("file-storage"))
.map(discovery::getReference)
.flatMap(reference -> {
HttpClient client = new HttpClient(reference.get());
closed.setHandler(v -> {
reference.release();
});
return Observable.just(client);
})
.toSingle();
return clientSingle;
return clientFactory.createClient("file-storage", closed);
}
}

0 comments on commit a83daba

Please sign in to comment.