Skip to content

Commit

Permalink
Frontend async rpc (#1357)
Browse files Browse the repository at this point in the history
  • Loading branch information
tianliplus committed Mar 10, 2022
1 parent b61457c commit 9d95f32
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public Store(Configs configs) {
NameResolver.Factory nameResolverFactory = new MaxGraphNameResolverFactory(this.discovery);
this.channelManager = new ChannelManager(configs, nameResolverFactory);
this.metaService = new DefaultMetaService(configs);
this.storeService = new StoreService(configs, this.metaService);
SnapshotCommitter snapshotCommitter = new DefaultSnapshotCommitter(this.channelManager);
MetricsCollector metricsCollector = new MetricsCollector(configs);
this.storeService = new StoreService(configs, this.metaService, metricsCollector);
SnapshotCommitter snapshotCommitter = new DefaultSnapshotCommitter(this.channelManager);
MetricsCollectService metricsCollectService = new MetricsCollectService(metricsCollector);
this.writerAgent =
new WriterAgent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.mockito.Mockito.*;

import com.alibaba.graphscope.groot.meta.MetaService;
import com.alibaba.graphscope.groot.metrics.MetricsCollector;
import com.alibaba.graphscope.groot.operation.OperationBatch;
import com.alibaba.graphscope.groot.operation.OperationBlob;
import com.alibaba.graphscope.groot.operation.StoreDataBatch;
Expand All @@ -40,7 +41,8 @@ void testStoreService() throws IOException, InterruptedException, ExecutionExcep
MetaService mockMetaService = mock(MetaService.class);
when(mockMetaService.getPartitionsByStoreId(0)).thenReturn(Arrays.asList(0));

StoreService spyStoreService = spy(new StoreService(configs, mockMetaService));
StoreService spyStoreService =
spy(new StoreService(configs, mockMetaService, new MetricsCollector(configs)));

GraphPartition mockGraphPartition = mock(GraphPartition.class);
when(mockGraphPartition.recover()).thenReturn(10L);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.alibaba.graphscope.groot.frontend;

import com.alibaba.graphscope.groot.CompletionCallback;
import com.alibaba.graphscope.groot.frontend.write.GraphWriter;
import com.alibaba.graphscope.groot.frontend.write.WriteRequest;
import com.alibaba.graphscope.proto.write.*;
Expand Down Expand Up @@ -53,10 +54,34 @@ public void batchWrite(
for (WriteRequestPb writeRequestPb : request.getWriteRequestsList()) {
writeRequests.add(WriteRequest.parseProto(writeRequestPb));
}
long snapshotId = graphWriter.writeBatch(requestId, writeSession, writeRequests);
responseObserver.onNext(
BatchWriteResponse.newBuilder().setSnapshotId(snapshotId).build());
responseObserver.onCompleted();
graphWriter.writeBatch(
requestId,
writeSession,
writeRequests,
new CompletionCallback<Long>() {
@Override
public void onCompleted(Long res) {
responseObserver.onNext(
BatchWriteResponse.newBuilder().setSnapshotId(res).build());
responseObserver.onCompleted();
}

@Override
public void onError(Throwable t) {
logger.error(
"batch write callback error. request ["
+ requestId
+ "] session ["
+ writeSession
+ "]",
t);
responseObserver.onError(
Status.INTERNAL
.withDescription(t.getMessage())
.asRuntimeException());
}
});

} catch (Exception e) {
logger.error(
"batchWrite failed. request [" + requestId + "] session [" + writeSession + "]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.alibaba.graphscope.groot.frontend;

import com.alibaba.graphscope.groot.CompletionCallback;
import com.alibaba.graphscope.groot.operation.BatchId;
import com.alibaba.graphscope.groot.operation.OperationBatch;
import com.alibaba.graphscope.groot.rpc.RpcClient;
Expand All @@ -21,14 +22,17 @@
import com.alibaba.maxgraph.proto.groot.WriteIngestorResponse;

import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;

public class IngestorWriteClient extends RpcClient {

private IngestorWriteGrpc.IngestorWriteBlockingStub stub;
private IngestorWriteGrpc.IngestorWriteStub asyncStub;

public IngestorWriteClient(ManagedChannel channel) {
super(channel);
this.stub = IngestorWriteGrpc.newBlockingStub(channel);
this.asyncStub = IngestorWriteGrpc.newStub(channel);
}

public IngestorWriteClient(IngestorWriteGrpc.IngestorWriteBlockingStub stub) {
Expand All @@ -46,4 +50,33 @@ public BatchId writeIngestor(String requestId, int queueId, OperationBatch opera
WriteIngestorResponse response = this.stub.writeIngestor(request);
return new BatchId(response.getSnapshotId());
}

public void writeIngestorAsync(
String requestId,
int queueId,
OperationBatch operationBatch,
CompletionCallback<Long> callback) {
WriteIngestorRequest request =
WriteIngestorRequest.newBuilder()
.setRequestId(requestId)
.setQueueId(queueId)
.setOperationBatch(operationBatch.toProto())
.build();
this.asyncStub.writeIngestor(
request,
new StreamObserver<WriteIngestorResponse>() {
@Override
public void onNext(WriteIngestorResponse response) {
callback.onCompleted(response.getSnapshotId());
}

@Override
public void onError(Throwable t) {
callback.onError(t);
}

@Override
public void onCompleted() {}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.alibaba.graphscope.groot.frontend.write;

import com.alibaba.graphscope.groot.CompletionCallback;
import com.alibaba.graphscope.groot.SnapshotCache;
import com.alibaba.graphscope.groot.frontend.IngestorWriteClient;
import com.alibaba.graphscope.groot.meta.MetaService;
import com.alibaba.graphscope.groot.metrics.MetricsAgent;
import com.alibaba.graphscope.groot.metrics.MetricsCollector;
import com.alibaba.graphscope.groot.operation.BatchId;
import com.alibaba.graphscope.groot.operation.EdgeId;
import com.alibaba.graphscope.groot.operation.LabelId;
import com.alibaba.graphscope.groot.operation.OperationBatch;
Expand All @@ -17,6 +17,7 @@
import com.alibaba.graphscope.groot.schema.PropertyValue;
import com.alibaba.maxgraph.common.util.PkHashUtils;
import com.alibaba.maxgraph.common.util.WriteSessionUtil;
import com.alibaba.maxgraph.compiler.api.exception.MaxGraphException;
import com.alibaba.maxgraph.compiler.api.exception.PropertyDefNotFoundException;
import com.alibaba.maxgraph.compiler.api.schema.DataType;
import com.alibaba.maxgraph.compiler.api.schema.GraphElement;
Expand All @@ -27,8 +28,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class GraphWriter implements MetricsAgent {
Expand All @@ -37,6 +41,7 @@ public class GraphWriter implements MetricsAgent {
public static final String WRITE_REQUESTS_PER_SECOND = "write.requests.per.second";
public static final String INGESTOR_BLOCK_TIME_MS = "ingestor.block.time.ms";
public static final String INGESTOR_BLOCK_TIME_AVG_MS = "ingestor.block.time.avg.ms";
public static final String PENDING_WRITE_COUNT = "pending.write.count";

private AtomicLong writeRequestsTotal;
private volatile long lastUpdateWriteRequestsTotal;
Expand All @@ -45,6 +50,7 @@ public class GraphWriter implements MetricsAgent {
private AtomicLong ingestorBlockTimeNano;
private volatile long ingestorBlockTimeAvgMs;
private volatile long lastUpdateIngestorBlockTimeNano;
private AtomicInteger pendingWriteCount;

private SnapshotCache snapshotCache;
private EdgeIdGenerator edgeIdGenerator;
Expand All @@ -68,6 +74,35 @@ public GraphWriter(

public long writeBatch(
String requestId, String writeSession, List<WriteRequest> writeRequests) {
CompletableFuture<Long> future = new CompletableFuture<>();
writeBatch(
requestId,
writeSession,
writeRequests,
new CompletionCallback<Long>() {
@Override
public void onCompleted(Long res) {
future.complete(res);
}

@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}
});
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new MaxGraphException(e);
}
}

public void writeBatch(
String requestId,
String writeSession,
List<WriteRequest> writeRequests,
CompletionCallback<Long> callback) {
this.pendingWriteCount.incrementAndGet();
GraphSchema schema = snapshotCache.getSnapshotWithSchema().getGraphDef();
OperationBatch.Builder batchBuilder = OperationBatch.newBuilder();
for (WriteRequest writeRequest : writeRequests) {
Expand Down Expand Up @@ -101,16 +136,36 @@ public long writeBatch(
int writeQueueId = getWriteQueueId(writeSession);
int ingestorId = this.metaService.getIngestorIdForQueue(writeQueueId);
long startTimeNano = System.nanoTime();
BatchId batchId =
this.ingestWriteClients
.getClient(ingestorId)
.writeIngestor(requestId, writeQueueId, operationBatch);
long ingestorCompleteTimeNano = System.nanoTime();
long writeSnapshotId = batchId.getSnapshotId();
this.lastWrittenSnapshotId.updateAndGet(x -> x < writeSnapshotId ? writeSnapshotId : x);
this.writeRequestsTotal.addAndGet(writeRequests.size());
this.ingestorBlockTimeNano.addAndGet(ingestorCompleteTimeNano - startTimeNano);
return writeSnapshotId;
this.ingestWriteClients
.getClient(ingestorId)
.writeIngestorAsync(
requestId,
writeQueueId,
operationBatch,
new CompletionCallback<Long>() {
@Override
public void onCompleted(Long res) {
long writeSnapshotId = res;
lastWrittenSnapshotId.updateAndGet(
x -> x < writeSnapshotId ? writeSnapshotId : x);
writeRequestsTotal.addAndGet(writeRequests.size());
finish();
callback.onCompleted(res);
}

@Override
public void onError(Throwable t) {
finish();
callback.onError(t);
}

void finish() {
long ingestorCompleteTimeNano = System.nanoTime();
ingestorBlockTimeNano.addAndGet(
ingestorCompleteTimeNano - startTimeNano);
pendingWriteCount.decrementAndGet();
}
});
}

public boolean flushSnapshot(long snapshotId, long waitTimeMs) throws InterruptedException {
Expand Down Expand Up @@ -359,6 +414,7 @@ public void initMetrics() {
this.writeRequestsPerSecond = 0L;
this.ingestorBlockTimeNano = new AtomicLong(0L);
this.lastUpdateIngestorBlockTimeNano = 0L;
this.pendingWriteCount = new AtomicInteger(0);
}

@Override
Expand All @@ -369,6 +425,7 @@ public Map<String, String> getMetrics() {
put(WRITE_REQUESTS_PER_SECOND, String.valueOf(writeRequestsPerSecond));
put(INGESTOR_BLOCK_TIME_MS, String.valueOf(ingestorBlockTimeNano.get() / 1000000));
put(INGESTOR_BLOCK_TIME_AVG_MS, String.valueOf(ingestorBlockTimeAvgMs));
put(PENDING_WRITE_COUNT, String.valueOf(pendingWriteCount.get()));
}
};
}
Expand All @@ -379,7 +436,8 @@ public String[] getMetricKeys() {
WRITE_REQUESTS_TOTAL,
WRITE_REQUESTS_PER_SECOND,
INGESTOR_BLOCK_TIME_MS,
INGESTOR_BLOCK_TIME_AVG_MS
INGESTOR_BLOCK_TIME_AVG_MS,
PENDING_WRITE_COUNT,
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class GraphVertexImpl implements GraphVertex {

Expand Down Expand Up @@ -52,7 +53,10 @@ public List<Integer> getPkPropertyIndices() {

@Override
public List<GraphProperty> getPrimaryKeyList() {
return null;
List<GraphProperty> propertyList = getPropertyList();
return getPkPropertyIndices().stream()
.map(i -> propertyList.get(i))
.collect(Collectors.toList());
}

@Override
Expand Down
Loading

0 comments on commit 9d95f32

Please sign in to comment.