Skip to content

Commit

Permalink
Use DgraphClientPool to manage GRPC clients.
Browse files Browse the repository at this point in the history
Instead of passing a list of DGraphBlockingStub clients directly to
DgraphClient, we now use a DgraphClientPool to manage these clients.
This allows us to better manage the lifecycle of the clients, including
the ability to close or shutdown the client cleanly.
  • Loading branch information
deepakjois committed Feb 14, 2018
1 parent 8755cc4 commit ab161db
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.1.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
* Use a separate `DgraphClientPool` class to manage GRPC clients.

## [1.2.0] - 2018-02-06
* Added possibility to create a DgraphClient with a specified request
Expand Down
9 changes: 5 additions & 4 deletions README.md
Expand Up @@ -63,15 +63,16 @@ The following code snippet shows just one connection.

```java
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9080).usePlaintext(true).build();
DgraphBlockingStub blockingStub = DgraphGrpc.newBlockingStub(channel);
DgraphClient dgraphClient = new DgraphClient(Collections.singletonList(blockingStub));
DgraphClientPool pool = new DgraphClientPool(Collections.singletonList(channel));
DgraphClient dgraphClient = new DgraphClient(pool);
```

Alternatively, you can specify a deadline (in seconds) after which the client will time out when making
Alternatively, you can specify a deadline (in seconds) after which the client will time out when making
requests to the server.

```java
DgraphClient dgraphClient = new DgraphClient(Collections.singletonList(blockingStub), 60) // 1 min timeout
DgraphClientPool pool = new DgraphClientPool(Collections.singletonList(channel), 60); // 1 min timeout
DgraphClient dgraphClient = new DgraphClient(pool);
```

### Alter the database
Expand Down
45 changes: 9 additions & 36 deletions src/main/java/io/dgraph/DgraphClient.java
Expand Up @@ -20,10 +20,7 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,9 +37,7 @@ public class DgraphClient {

private static final Logger logger = LoggerFactory.getLogger(DgraphClient.class);

private List<DgraphGrpc.DgraphBlockingStub> clients;

private int deadlineSecs;
private DgraphClientPool clients;

private LinRead linRead;

Expand All @@ -63,25 +58,11 @@ LinRead getLinRead() {
* @param clients One or more synchronous grpc clients. Can contain connections to multiple
* servers in a cluster.
*/
public DgraphClient(List<DgraphGrpc.DgraphBlockingStub> clients) {
public DgraphClient(DgraphClientPool clients) {
this.clients = clients;
linRead = LinRead.getDefaultInstance();
}

/**
* Creates a new Dgraph for interacting with a Dgraph store, with the the specified deadline.
*
* <p>A single client is thread safe.
*
* @param clients One or more synchronous grpc clients. Can contain connections to multiple
* servers in a cluster.
* @param deadlineSecs Deadline specified in secs, after which the client will timeout.
*/
public DgraphClient(List<DgraphGrpc.DgraphBlockingStub> clients, int deadlineSecs) {
this(clients);
this.deadlineSecs = deadlineSecs;
}

/**
* Creates a new Transaction object.
*
Expand Down Expand Up @@ -115,7 +96,7 @@ public Transaction newTransaction() {
* @param op a protocol buffer Operation object representing the operation being performed.
*/
public void alter(Operation op) {
final DgraphGrpc.DgraphBlockingStub client = anyClient();
final DgraphGrpc.DgraphBlockingStub client = clients.anyClient();
client.alter(op);
}

Expand All @@ -142,16 +123,8 @@ public static Mutation deleteEdges(Mutation mu, String uid, String... predicates
return b.build();
}

private DgraphGrpc.DgraphBlockingStub anyClient() {
Random rand = new Random();

DgraphGrpc.DgraphBlockingStub client = clients.get(rand.nextInt(clients.size()));

if (deadlineSecs > 0) {
return client.withDeadlineAfter(deadlineSecs, TimeUnit.SECONDS);
}

return client;
public void close() throws InterruptedException {
clients.close();
}

static LinRead mergeLinReads(LinRead dst, LinRead src) {
Expand Down Expand Up @@ -194,7 +167,7 @@ public Response queryWithVars(final String query, final Map<String, String> vars
.setStartTs(context.getStartTs())
.setLinRead(context.getLinRead())
.build();
final DgraphGrpc.DgraphBlockingStub client = anyClient();
final DgraphGrpc.DgraphBlockingStub client = clients.anyClient();
logger.debug("Sending request to Dgraph...");
final Response response = client.query(request);
logger.debug("Received response from Dgraph!");
Expand Down Expand Up @@ -229,7 +202,7 @@ public Assigned mutate(Mutation mutation) {

Mutation request = Mutation.newBuilder(mutation).setStartTs(context.getStartTs()).build();

final DgraphGrpc.DgraphBlockingStub client = anyClient();
final DgraphGrpc.DgraphBlockingStub client = clients.anyClient();
Assigned ag;
try {
ag = client.mutate(request);
Expand Down Expand Up @@ -273,7 +246,7 @@ public void commit() {
return;
}

final DgraphGrpc.DgraphBlockingStub client = anyClient();
final DgraphGrpc.DgraphBlockingStub client = clients.anyClient();
try {
client.commitOrAbort(context);
} catch (RuntimeException ex) {
Expand Down Expand Up @@ -301,7 +274,7 @@ public void discard() {

context = TxnContext.newBuilder(context).setAborted(true).build();

final DgraphGrpc.DgraphBlockingStub client = anyClient();
final DgraphGrpc.DgraphBlockingStub client = clients.anyClient();
client.commitOrAbort(context);
}

Expand Down
156 changes: 156 additions & 0 deletions src/main/java/io/dgraph/DgraphClientPool.java
@@ -0,0 +1,156 @@
/*
* Copyright 2016-18 DGraph Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.dgraph;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javafx.util.Pair;

/**
* Manage a pool of Dgraph clients.
*
* <p>Clients must belong to the same Dgraph cluster
*
* @author Deepak Jois
*/
public class DgraphClientPool {
List<Pair<ManagedChannel, DgraphGrpc.DgraphBlockingStub>> clients;

private int deadlineSecs;

private DgraphClientPool() {
this.clients = new ArrayList<>();
}

/**
* Create a pool of clients from a list of {@link ManagedChannel} objects.
*
* @param channels List of {@link ManagedChannel} objects to use when creating GRPC clients
*/
public DgraphClientPool(List<ManagedChannel> channels) {
this();
add(channels);
}

/**
* Create a pool of clients from a list of {@link ManagedChannel} objects, and specify a deadline
* (in seconds) for the requests to execute.
*
* @param channels List of {@link ManagedChannel} objects to use when creating GRPC clients
* @param deadlineSecs deadline in seconds for requests made to the clients.
*/
public DgraphClientPool(List<ManagedChannel> channels, int deadlineSecs) {
this(channels);
this.deadlineSecs = deadlineSecs;
}

/**
* Add a client that connects to a Dgraph server located at specified host and port.
*
* @param host
* @param port
*/
public void add(String host, int port) {
ManagedChannel channel =
ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
DgraphGrpc.DgraphBlockingStub stub = DgraphGrpc.newBlockingStub(channel);
clients.add(new Pair<>(channel, stub));
}

/**
* Add a client that connects to a Dgraph server, using the specified {@link ManagedChannel}.
*
* @param channel
*/
public void add(ManagedChannel channel) {
DgraphGrpc.DgraphBlockingStub stub = DgraphGrpc.newBlockingStub(channel);
clients.add(new Pair<>(channel, stub));
}

/**
* Add clients that connect to different nodes in a Dgraph cluster, using the specified {@link
* ManagedChannel} objects.
*
* @param channels
*/
public void add(List<ManagedChannel> channels) {
for (ManagedChannel channel : channels) {
DgraphGrpc.DgraphBlockingStub stub = DgraphGrpc.newBlockingStub(channel);
clients.add(new Pair<>(channel, stub));
}
}

/**
* Returns a randomly chosen client from the pool.
*
* @return A {@link io.dgraph.DgraphGrpc.DgraphBlockingStub} object.
*/
public DgraphGrpc.DgraphBlockingStub anyClient() {
Random rand = new Random();

DgraphGrpc.DgraphBlockingStub client = clients.get(rand.nextInt(clients.size())).getValue();

if (deadlineSecs > 0) {
return client.withDeadlineAfter(deadlineSecs, TimeUnit.SECONDS);
}

return client;
}

/**
* Return the deadline(in secs) for the clients in the pool.
*
* @return
*/
public int getDeadline() {
return deadlineSecs;
}

/**
* Set the deadline for the clients in the pool in secs.
*
* @param deadlineSecs
*/
public void setDeadline(int deadlineSecs) {
this.deadlineSecs = deadlineSecs;
}

/**
* Closes all the clients in the pool with the specified timeout.
*
* @param timeoutSecs timeout value in seconds
* @throws InterruptedException
*/
public void close(long timeoutSecs) throws InterruptedException {
for (Pair<ManagedChannel, DgraphGrpc.DgraphBlockingStub> client : clients) {
client.getKey().awaitTermination(timeoutSecs, TimeUnit.SECONDS);
}
}

/**
* Closes all the clients in the pool with a default timeout of 5 seconds.
*
* @throws InterruptedException
*/
public void close() throws InterruptedException {
close(5);
}
}
9 changes: 4 additions & 5 deletions src/test/java/io/dgraph/DgraphClientTest.java
Expand Up @@ -170,20 +170,19 @@ public void testDiscardAbort() {
public void testClientWithDeadline() throws Exception {
ManagedChannel channel =
ManagedChannelBuilder.forAddress(TEST_HOSTNAME, TEST_PORT).usePlaintext(true).build();
DgraphGrpc.DgraphBlockingStub blockingStub = DgraphGrpc.newBlockingStub(channel);
dgraphClient = new DgraphClient(Collections.singletonList(blockingStub), 1);
DgraphClientPool pool = new DgraphClientPool(Collections.singletonList(channel), 1);
dgraphClient = new DgraphClient(pool);

Operation op = Operation.newBuilder().setSchema("name: string @index(exact) .").build();

// Alters schema without exceeding the given deadline.
dgraphClient.alter(op);

// Creates a blocking stub directly, in order to force a deadline to be exceeded.
Method method = DgraphClient.class.getDeclaredMethod("anyClient");
Method method = DgraphClientPool.class.getDeclaredMethod("anyClient");
method.setAccessible(true);

DgraphGrpc.DgraphBlockingStub client =
(DgraphGrpc.DgraphBlockingStub) method.invoke(dgraphClient);
DgraphGrpc.DgraphBlockingStub client = (DgraphGrpc.DgraphBlockingStub) method.invoke(pool);

Thread.sleep(1001);

Expand Down
11 changes: 4 additions & 7 deletions src/test/java/io/dgraph/DgraphIntegrationTest.java
@@ -1,19 +1,16 @@
package io.dgraph;

import io.dgraph.DgraphGrpc.DgraphBlockingStub;
import io.dgraph.DgraphProto.Operation;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DgraphIntegrationTest {
protected static final Logger logger = LoggerFactory.getLogger(DgraphIntegrationTest.class);
private static ManagedChannel channel;
protected static DgraphClient dgraphClient;

protected static final String TEST_HOSTNAME = "localhost";
Expand All @@ -22,15 +19,15 @@ public abstract class DgraphIntegrationTest {
@BeforeClass
public static void beforeClass() {

channel = ManagedChannelBuilder.forAddress(TEST_HOSTNAME, TEST_PORT).usePlaintext(true).build();
DgraphBlockingStub blockingStub = DgraphGrpc.newBlockingStub(channel);
dgraphClient = new DgraphClient(Collections.singletonList(blockingStub));
ManagedChannel channel =
ManagedChannelBuilder.forAddress(TEST_HOSTNAME, TEST_PORT).usePlaintext(true).build();
dgraphClient = new DgraphClient(new DgraphClientPool(Collections.singletonList(channel)));

dgraphClient.alter(Operation.newBuilder().setDropAll(true).build());
}

@AfterClass
public static void afterClass() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
dgraphClient.close();
}
}

0 comments on commit ab161db

Please sign in to comment.