From ab161db77e5ee6404b28b7a89bfa050055accad9 Mon Sep 17 00:00:00 2001 From: Deepak Jois Date: Wed, 14 Feb 2018 11:18:35 +0530 Subject: [PATCH] Use DgraphClientPool to manage GRPC clients. Instead of passing a list of DGraphBlockingStub clients directly to DgraphClient, we now use a DgraphClientPool to manage these clients. This allows us to better manage the lifecycle of the clients, including the ability to close or shutdown the client cleanly. --- CHANGELOG.md | 1 + README.md | 9 +- src/main/java/io/dgraph/DgraphClient.java | 45 +---- src/main/java/io/dgraph/DgraphClientPool.java | 156 ++++++++++++++++++ src/test/java/io/dgraph/DgraphClientTest.java | 9 +- .../java/io/dgraph/DgraphIntegrationTest.java | 11 +- 6 files changed, 179 insertions(+), 52 deletions(-) create mode 100644 src/main/java/io/dgraph/DgraphClientPool.java diff --git a/CHANGELOG.md b/CHANGELOG.md index a3cd024..e597d51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] +* Use a separate `DgraphClientPool` class to manage GRPC clients. ## [1.2.0] - 2018-02-06 * Added possibility to create a DgraphClient with a specified request diff --git a/README.md b/README.md index 4800e86..803e442 100644 --- a/README.md +++ b/README.md @@ -63,15 +63,16 @@ The following code snippet shows just one connection. ```java ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9080).usePlaintext(true).build(); -DgraphBlockingStub blockingStub = DgraphGrpc.newBlockingStub(channel); -DgraphClient dgraphClient = new DgraphClient(Collections.singletonList(blockingStub)); +DgraphClientPool pool = new DgraphClientPool(Collections.singletonList(channel)); +DgraphClient dgraphClient = new DgraphClient(pool); ``` -Alternatively, you can specify a deadline (in seconds) after which the client will time out when making +Alternatively, you can specify a deadline (in seconds) after which the client will time out when making requests to the server. ```java -DgraphClient dgraphClient = new DgraphClient(Collections.singletonList(blockingStub), 60) // 1 min timeout +DgraphClientPool pool = new DgraphClientPool(Collections.singletonList(channel), 60); // 1 min timeout +DgraphClient dgraphClient = new DgraphClient(pool); ``` ### Alter the database diff --git a/src/main/java/io/dgraph/DgraphClient.java b/src/main/java/io/dgraph/DgraphClient.java index 635006e..ab838ac 100644 --- a/src/main/java/io/dgraph/DgraphClient.java +++ b/src/main/java/io/dgraph/DgraphClient.java @@ -20,10 +20,7 @@ import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.util.Collections; -import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,9 +37,7 @@ public class DgraphClient { private static final Logger logger = LoggerFactory.getLogger(DgraphClient.class); - private List clients; - - private int deadlineSecs; + private DgraphClientPool clients; private LinRead linRead; @@ -63,25 +58,11 @@ LinRead getLinRead() { * @param clients One or more synchronous grpc clients. Can contain connections to multiple * servers in a cluster. */ - public DgraphClient(List clients) { + public DgraphClient(DgraphClientPool clients) { this.clients = clients; linRead = LinRead.getDefaultInstance(); } - /** - * Creates a new Dgraph for interacting with a Dgraph store, with the the specified deadline. - * - *

A single client is thread safe. - * - * @param clients One or more synchronous grpc clients. Can contain connections to multiple - * servers in a cluster. - * @param deadlineSecs Deadline specified in secs, after which the client will timeout. - */ - public DgraphClient(List clients, int deadlineSecs) { - this(clients); - this.deadlineSecs = deadlineSecs; - } - /** * Creates a new Transaction object. * @@ -115,7 +96,7 @@ public Transaction newTransaction() { * @param op a protocol buffer Operation object representing the operation being performed. */ public void alter(Operation op) { - final DgraphGrpc.DgraphBlockingStub client = anyClient(); + final DgraphGrpc.DgraphBlockingStub client = clients.anyClient(); client.alter(op); } @@ -142,16 +123,8 @@ public static Mutation deleteEdges(Mutation mu, String uid, String... predicates return b.build(); } - private DgraphGrpc.DgraphBlockingStub anyClient() { - Random rand = new Random(); - - DgraphGrpc.DgraphBlockingStub client = clients.get(rand.nextInt(clients.size())); - - if (deadlineSecs > 0) { - return client.withDeadlineAfter(deadlineSecs, TimeUnit.SECONDS); - } - - return client; + public void close() throws InterruptedException { + clients.close(); } static LinRead mergeLinReads(LinRead dst, LinRead src) { @@ -194,7 +167,7 @@ public Response queryWithVars(final String query, final Map vars .setStartTs(context.getStartTs()) .setLinRead(context.getLinRead()) .build(); - final DgraphGrpc.DgraphBlockingStub client = anyClient(); + final DgraphGrpc.DgraphBlockingStub client = clients.anyClient(); logger.debug("Sending request to Dgraph..."); final Response response = client.query(request); logger.debug("Received response from Dgraph!"); @@ -229,7 +202,7 @@ public Assigned mutate(Mutation mutation) { Mutation request = Mutation.newBuilder(mutation).setStartTs(context.getStartTs()).build(); - final DgraphGrpc.DgraphBlockingStub client = anyClient(); + final DgraphGrpc.DgraphBlockingStub client = clients.anyClient(); Assigned ag; try { ag = client.mutate(request); @@ -273,7 +246,7 @@ public void commit() { return; } - final DgraphGrpc.DgraphBlockingStub client = anyClient(); + final DgraphGrpc.DgraphBlockingStub client = clients.anyClient(); try { client.commitOrAbort(context); } catch (RuntimeException ex) { @@ -301,7 +274,7 @@ public void discard() { context = TxnContext.newBuilder(context).setAborted(true).build(); - final DgraphGrpc.DgraphBlockingStub client = anyClient(); + final DgraphGrpc.DgraphBlockingStub client = clients.anyClient(); client.commitOrAbort(context); } diff --git a/src/main/java/io/dgraph/DgraphClientPool.java b/src/main/java/io/dgraph/DgraphClientPool.java new file mode 100644 index 0000000..96bdedd --- /dev/null +++ b/src/main/java/io/dgraph/DgraphClientPool.java @@ -0,0 +1,156 @@ +/* + * Copyright 2016-18 DGraph Labs, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dgraph; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import javafx.util.Pair; + +/** + * Manage a pool of Dgraph clients. + * + *

Clients must belong to the same Dgraph cluster + * + * @author Deepak Jois + */ +public class DgraphClientPool { + List> clients; + + private int deadlineSecs; + + private DgraphClientPool() { + this.clients = new ArrayList<>(); + } + + /** + * Create a pool of clients from a list of {@link ManagedChannel} objects. + * + * @param channels List of {@link ManagedChannel} objects to use when creating GRPC clients + */ + public DgraphClientPool(List channels) { + this(); + add(channels); + } + + /** + * Create a pool of clients from a list of {@link ManagedChannel} objects, and specify a deadline + * (in seconds) for the requests to execute. + * + * @param channels List of {@link ManagedChannel} objects to use when creating GRPC clients + * @param deadlineSecs deadline in seconds for requests made to the clients. + */ + public DgraphClientPool(List channels, int deadlineSecs) { + this(channels); + this.deadlineSecs = deadlineSecs; + } + + /** + * Add a client that connects to a Dgraph server located at specified host and port. + * + * @param host + * @param port + */ + public void add(String host, int port) { + ManagedChannel channel = + ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build(); + DgraphGrpc.DgraphBlockingStub stub = DgraphGrpc.newBlockingStub(channel); + clients.add(new Pair<>(channel, stub)); + } + + /** + * Add a client that connects to a Dgraph server, using the specified {@link ManagedChannel}. + * + * @param channel + */ + public void add(ManagedChannel channel) { + DgraphGrpc.DgraphBlockingStub stub = DgraphGrpc.newBlockingStub(channel); + clients.add(new Pair<>(channel, stub)); + } + + /** + * Add clients that connect to different nodes in a Dgraph cluster, using the specified {@link + * ManagedChannel} objects. + * + * @param channels + */ + public void add(List channels) { + for (ManagedChannel channel : channels) { + DgraphGrpc.DgraphBlockingStub stub = DgraphGrpc.newBlockingStub(channel); + clients.add(new Pair<>(channel, stub)); + } + } + + /** + * Returns a randomly chosen client from the pool. + * + * @return A {@link io.dgraph.DgraphGrpc.DgraphBlockingStub} object. + */ + public DgraphGrpc.DgraphBlockingStub anyClient() { + Random rand = new Random(); + + DgraphGrpc.DgraphBlockingStub client = clients.get(rand.nextInt(clients.size())).getValue(); + + if (deadlineSecs > 0) { + return client.withDeadlineAfter(deadlineSecs, TimeUnit.SECONDS); + } + + return client; + } + + /** + * Return the deadline(in secs) for the clients in the pool. + * + * @return + */ + public int getDeadline() { + return deadlineSecs; + } + + /** + * Set the deadline for the clients in the pool in secs. + * + * @param deadlineSecs + */ + public void setDeadline(int deadlineSecs) { + this.deadlineSecs = deadlineSecs; + } + + /** + * Closes all the clients in the pool with the specified timeout. + * + * @param timeoutSecs timeout value in seconds + * @throws InterruptedException + */ + public void close(long timeoutSecs) throws InterruptedException { + for (Pair client : clients) { + client.getKey().awaitTermination(timeoutSecs, TimeUnit.SECONDS); + } + } + + /** + * Closes all the clients in the pool with a default timeout of 5 seconds. + * + * @throws InterruptedException + */ + public void close() throws InterruptedException { + close(5); + } +} diff --git a/src/test/java/io/dgraph/DgraphClientTest.java b/src/test/java/io/dgraph/DgraphClientTest.java index dd085a2..dbecca2 100644 --- a/src/test/java/io/dgraph/DgraphClientTest.java +++ b/src/test/java/io/dgraph/DgraphClientTest.java @@ -170,8 +170,8 @@ public void testDiscardAbort() { public void testClientWithDeadline() throws Exception { ManagedChannel channel = ManagedChannelBuilder.forAddress(TEST_HOSTNAME, TEST_PORT).usePlaintext(true).build(); - DgraphGrpc.DgraphBlockingStub blockingStub = DgraphGrpc.newBlockingStub(channel); - dgraphClient = new DgraphClient(Collections.singletonList(blockingStub), 1); + DgraphClientPool pool = new DgraphClientPool(Collections.singletonList(channel), 1); + dgraphClient = new DgraphClient(pool); Operation op = Operation.newBuilder().setSchema("name: string @index(exact) .").build(); @@ -179,11 +179,10 @@ public void testClientWithDeadline() throws Exception { dgraphClient.alter(op); // Creates a blocking stub directly, in order to force a deadline to be exceeded. - Method method = DgraphClient.class.getDeclaredMethod("anyClient"); + Method method = DgraphClientPool.class.getDeclaredMethod("anyClient"); method.setAccessible(true); - DgraphGrpc.DgraphBlockingStub client = - (DgraphGrpc.DgraphBlockingStub) method.invoke(dgraphClient); + DgraphGrpc.DgraphBlockingStub client = (DgraphGrpc.DgraphBlockingStub) method.invoke(pool); Thread.sleep(1001); diff --git a/src/test/java/io/dgraph/DgraphIntegrationTest.java b/src/test/java/io/dgraph/DgraphIntegrationTest.java index 33c9339..a06803f 100644 --- a/src/test/java/io/dgraph/DgraphIntegrationTest.java +++ b/src/test/java/io/dgraph/DgraphIntegrationTest.java @@ -1,11 +1,9 @@ package io.dgraph; -import io.dgraph.DgraphGrpc.DgraphBlockingStub; import io.dgraph.DgraphProto.Operation; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import java.util.Collections; -import java.util.concurrent.TimeUnit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.slf4j.Logger; @@ -13,7 +11,6 @@ public abstract class DgraphIntegrationTest { protected static final Logger logger = LoggerFactory.getLogger(DgraphIntegrationTest.class); - private static ManagedChannel channel; protected static DgraphClient dgraphClient; protected static final String TEST_HOSTNAME = "localhost"; @@ -22,15 +19,15 @@ public abstract class DgraphIntegrationTest { @BeforeClass public static void beforeClass() { - channel = ManagedChannelBuilder.forAddress(TEST_HOSTNAME, TEST_PORT).usePlaintext(true).build(); - DgraphBlockingStub blockingStub = DgraphGrpc.newBlockingStub(channel); - dgraphClient = new DgraphClient(Collections.singletonList(blockingStub)); + ManagedChannel channel = + ManagedChannelBuilder.forAddress(TEST_HOSTNAME, TEST_PORT).usePlaintext(true).build(); + dgraphClient = new DgraphClient(new DgraphClientPool(Collections.singletonList(channel))); dgraphClient.alter(Operation.newBuilder().setDropAll(true).build()); } @AfterClass public static void afterClass() throws InterruptedException { - channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + dgraphClient.close(); } }