diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/maxgraph/servers/Store.java b/interactive_engine/groot-server/src/main/java/com/alibaba/maxgraph/servers/Store.java index 9c31093182cc..3645d34c7ceb 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/maxgraph/servers/Store.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/maxgraph/servers/Store.java @@ -51,9 +51,9 @@ public Store(Configs configs) { NameResolver.Factory nameResolverFactory = new MaxGraphNameResolverFactory(this.discovery); this.channelManager = new ChannelManager(configs, nameResolverFactory); this.metaService = new DefaultMetaService(configs); - this.storeService = new StoreService(configs, this.metaService); - SnapshotCommitter snapshotCommitter = new DefaultSnapshotCommitter(this.channelManager); MetricsCollector metricsCollector = new MetricsCollector(configs); + this.storeService = new StoreService(configs, this.metaService, metricsCollector); + SnapshotCommitter snapshotCommitter = new DefaultSnapshotCommitter(this.channelManager); MetricsCollectService metricsCollectService = new MetricsCollectService(metricsCollector); this.writerAgent = new WriterAgent( diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/maxgraph/tests/store/StoreServiceTest.java b/interactive_engine/groot-server/src/test/java/com/alibaba/maxgraph/tests/store/StoreServiceTest.java index cb72e3b0c9eb..9752cecf45ce 100644 --- a/interactive_engine/groot-server/src/test/java/com/alibaba/maxgraph/tests/store/StoreServiceTest.java +++ b/interactive_engine/groot-server/src/test/java/com/alibaba/maxgraph/tests/store/StoreServiceTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.*; import com.alibaba.graphscope.groot.meta.MetaService; +import com.alibaba.graphscope.groot.metrics.MetricsCollector; import com.alibaba.graphscope.groot.operation.OperationBatch; import com.alibaba.graphscope.groot.operation.OperationBlob; import com.alibaba.graphscope.groot.operation.StoreDataBatch; @@ -40,7 +41,8 @@ void testStoreService() throws IOException, InterruptedException, ExecutionExcep MetaService mockMetaService = mock(MetaService.class); when(mockMetaService.getPartitionsByStoreId(0)).thenReturn(Arrays.asList(0)); - StoreService spyStoreService = spy(new StoreService(configs, mockMetaService)); + StoreService spyStoreService = + spy(new StoreService(configs, mockMetaService, new MetricsCollector(configs))); GraphPartition mockGraphPartition = mock(GraphPartition.class); when(mockGraphPartition.recover()).thenReturn(10L); diff --git a/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java b/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java index aedc4e6f87f6..9443fc1f44ba 100644 --- a/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java +++ b/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java @@ -1,5 +1,6 @@ package com.alibaba.graphscope.groot.frontend; +import com.alibaba.graphscope.groot.CompletionCallback; import com.alibaba.graphscope.groot.frontend.write.GraphWriter; import com.alibaba.graphscope.groot.frontend.write.WriteRequest; import com.alibaba.graphscope.proto.write.*; @@ -53,10 +54,34 @@ public void batchWrite( for (WriteRequestPb writeRequestPb : request.getWriteRequestsList()) { writeRequests.add(WriteRequest.parseProto(writeRequestPb)); } - long snapshotId = graphWriter.writeBatch(requestId, writeSession, writeRequests); - responseObserver.onNext( - BatchWriteResponse.newBuilder().setSnapshotId(snapshotId).build()); - responseObserver.onCompleted(); + graphWriter.writeBatch( + requestId, + writeSession, + writeRequests, + new CompletionCallback() { + @Override + public void onCompleted(Long res) { + responseObserver.onNext( + BatchWriteResponse.newBuilder().setSnapshotId(res).build()); + responseObserver.onCompleted(); + } + + @Override + public void onError(Throwable t) { + logger.error( + "batch write callback error. request [" + + requestId + + "] session [" + + writeSession + + "]", + t); + responseObserver.onError( + Status.INTERNAL + .withDescription(t.getMessage()) + .asRuntimeException()); + } + }); + } catch (Exception e) { logger.error( "batchWrite failed. request [" + requestId + "] session [" + writeSession + "]", diff --git a/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/frontend/IngestorWriteClient.java b/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/frontend/IngestorWriteClient.java index 0e4d28d83f84..80a8fa3ce89c 100644 --- a/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/frontend/IngestorWriteClient.java +++ b/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/frontend/IngestorWriteClient.java @@ -13,6 +13,7 @@ */ package com.alibaba.graphscope.groot.frontend; +import com.alibaba.graphscope.groot.CompletionCallback; import com.alibaba.graphscope.groot.operation.BatchId; import com.alibaba.graphscope.groot.operation.OperationBatch; import com.alibaba.graphscope.groot.rpc.RpcClient; @@ -21,14 +22,17 @@ import com.alibaba.maxgraph.proto.groot.WriteIngestorResponse; import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; public class IngestorWriteClient extends RpcClient { private IngestorWriteGrpc.IngestorWriteBlockingStub stub; + private IngestorWriteGrpc.IngestorWriteStub asyncStub; public IngestorWriteClient(ManagedChannel channel) { super(channel); this.stub = IngestorWriteGrpc.newBlockingStub(channel); + this.asyncStub = IngestorWriteGrpc.newStub(channel); } public IngestorWriteClient(IngestorWriteGrpc.IngestorWriteBlockingStub stub) { @@ -46,4 +50,33 @@ public BatchId writeIngestor(String requestId, int queueId, OperationBatch opera WriteIngestorResponse response = this.stub.writeIngestor(request); return new BatchId(response.getSnapshotId()); } + + public void writeIngestorAsync( + String requestId, + int queueId, + OperationBatch operationBatch, + CompletionCallback callback) { + WriteIngestorRequest request = + WriteIngestorRequest.newBuilder() + .setRequestId(requestId) + .setQueueId(queueId) + .setOperationBatch(operationBatch.toProto()) + .build(); + this.asyncStub.writeIngestor( + request, + new StreamObserver() { + @Override + public void onNext(WriteIngestorResponse response) { + callback.onCompleted(response.getSnapshotId()); + } + + @Override + public void onError(Throwable t) { + callback.onError(t); + } + + @Override + public void onCompleted() {} + }); + } } diff --git a/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java b/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java index 47efe0dac1f6..5d2f7265daa7 100644 --- a/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java +++ b/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java @@ -1,11 +1,11 @@ package com.alibaba.graphscope.groot.frontend.write; +import com.alibaba.graphscope.groot.CompletionCallback; import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.frontend.IngestorWriteClient; import com.alibaba.graphscope.groot.meta.MetaService; import com.alibaba.graphscope.groot.metrics.MetricsAgent; import com.alibaba.graphscope.groot.metrics.MetricsCollector; -import com.alibaba.graphscope.groot.operation.BatchId; import com.alibaba.graphscope.groot.operation.EdgeId; import com.alibaba.graphscope.groot.operation.LabelId; import com.alibaba.graphscope.groot.operation.OperationBatch; @@ -17,6 +17,7 @@ import com.alibaba.graphscope.groot.schema.PropertyValue; import com.alibaba.maxgraph.common.util.PkHashUtils; import com.alibaba.maxgraph.common.util.WriteSessionUtil; +import com.alibaba.maxgraph.compiler.api.exception.MaxGraphException; import com.alibaba.maxgraph.compiler.api.exception.PropertyDefNotFoundException; import com.alibaba.maxgraph.compiler.api.schema.DataType; import com.alibaba.maxgraph.compiler.api.schema.GraphElement; @@ -27,8 +28,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class GraphWriter implements MetricsAgent { @@ -37,6 +41,7 @@ public class GraphWriter implements MetricsAgent { public static final String WRITE_REQUESTS_PER_SECOND = "write.requests.per.second"; public static final String INGESTOR_BLOCK_TIME_MS = "ingestor.block.time.ms"; public static final String INGESTOR_BLOCK_TIME_AVG_MS = "ingestor.block.time.avg.ms"; + public static final String PENDING_WRITE_COUNT = "pending.write.count"; private AtomicLong writeRequestsTotal; private volatile long lastUpdateWriteRequestsTotal; @@ -45,6 +50,7 @@ public class GraphWriter implements MetricsAgent { private AtomicLong ingestorBlockTimeNano; private volatile long ingestorBlockTimeAvgMs; private volatile long lastUpdateIngestorBlockTimeNano; + private AtomicInteger pendingWriteCount; private SnapshotCache snapshotCache; private EdgeIdGenerator edgeIdGenerator; @@ -68,6 +74,35 @@ public GraphWriter( public long writeBatch( String requestId, String writeSession, List writeRequests) { + CompletableFuture future = new CompletableFuture<>(); + writeBatch( + requestId, + writeSession, + writeRequests, + new CompletionCallback() { + @Override + public void onCompleted(Long res) { + future.complete(res); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + }); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new MaxGraphException(e); + } + } + + public void writeBatch( + String requestId, + String writeSession, + List writeRequests, + CompletionCallback callback) { + this.pendingWriteCount.incrementAndGet(); GraphSchema schema = snapshotCache.getSnapshotWithSchema().getGraphDef(); OperationBatch.Builder batchBuilder = OperationBatch.newBuilder(); for (WriteRequest writeRequest : writeRequests) { @@ -101,16 +136,36 @@ public long writeBatch( int writeQueueId = getWriteQueueId(writeSession); int ingestorId = this.metaService.getIngestorIdForQueue(writeQueueId); long startTimeNano = System.nanoTime(); - BatchId batchId = - this.ingestWriteClients - .getClient(ingestorId) - .writeIngestor(requestId, writeQueueId, operationBatch); - long ingestorCompleteTimeNano = System.nanoTime(); - long writeSnapshotId = batchId.getSnapshotId(); - this.lastWrittenSnapshotId.updateAndGet(x -> x < writeSnapshotId ? writeSnapshotId : x); - this.writeRequestsTotal.addAndGet(writeRequests.size()); - this.ingestorBlockTimeNano.addAndGet(ingestorCompleteTimeNano - startTimeNano); - return writeSnapshotId; + this.ingestWriteClients + .getClient(ingestorId) + .writeIngestorAsync( + requestId, + writeQueueId, + operationBatch, + new CompletionCallback() { + @Override + public void onCompleted(Long res) { + long writeSnapshotId = res; + lastWrittenSnapshotId.updateAndGet( + x -> x < writeSnapshotId ? writeSnapshotId : x); + writeRequestsTotal.addAndGet(writeRequests.size()); + finish(); + callback.onCompleted(res); + } + + @Override + public void onError(Throwable t) { + finish(); + callback.onError(t); + } + + void finish() { + long ingestorCompleteTimeNano = System.nanoTime(); + ingestorBlockTimeNano.addAndGet( + ingestorCompleteTimeNano - startTimeNano); + pendingWriteCount.decrementAndGet(); + } + }); } public boolean flushSnapshot(long snapshotId, long waitTimeMs) throws InterruptedException { @@ -359,6 +414,7 @@ public void initMetrics() { this.writeRequestsPerSecond = 0L; this.ingestorBlockTimeNano = new AtomicLong(0L); this.lastUpdateIngestorBlockTimeNano = 0L; + this.pendingWriteCount = new AtomicInteger(0); } @Override @@ -369,6 +425,7 @@ public Map getMetrics() { put(WRITE_REQUESTS_PER_SECOND, String.valueOf(writeRequestsPerSecond)); put(INGESTOR_BLOCK_TIME_MS, String.valueOf(ingestorBlockTimeNano.get() / 1000000)); put(INGESTOR_BLOCK_TIME_AVG_MS, String.valueOf(ingestorBlockTimeAvgMs)); + put(PENDING_WRITE_COUNT, String.valueOf(pendingWriteCount.get())); } }; } @@ -379,7 +436,8 @@ public String[] getMetricKeys() { WRITE_REQUESTS_TOTAL, WRITE_REQUESTS_PER_SECOND, INGESTOR_BLOCK_TIME_MS, - INGESTOR_BLOCK_TIME_AVG_MS + INGESTOR_BLOCK_TIME_AVG_MS, + PENDING_WRITE_COUNT, }; } diff --git a/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/schema/GraphVertexImpl.java b/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/schema/GraphVertexImpl.java index 48765a0fd3b5..4a3aae854447 100644 --- a/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/schema/GraphVertexImpl.java +++ b/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/schema/GraphVertexImpl.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; public class GraphVertexImpl implements GraphVertex { @@ -52,7 +53,10 @@ public List getPkPropertyIndices() { @Override public List getPrimaryKeyList() { - return null; + List propertyList = getPropertyList(); + return getPkPropertyIndices().stream() + .map(i -> propertyList.get(i)) + .collect(Collectors.toList()); } @Override diff --git a/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 21bb7dbdfea9..7a8c38595718 100644 --- a/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -15,6 +15,9 @@ import com.alibaba.graphscope.groot.CompletionCallback; import com.alibaba.graphscope.groot.meta.MetaService; +import com.alibaba.graphscope.groot.metrics.AvgMetric; +import com.alibaba.graphscope.groot.metrics.MetricsAgent; +import com.alibaba.graphscope.groot.metrics.MetricsCollector; import com.alibaba.graphscope.groot.operation.OperationBatch; import com.alibaba.graphscope.groot.operation.StoreDataBatch; import com.alibaba.graphscope.groot.store.jna.JnaGraphStore; @@ -22,6 +25,7 @@ import com.alibaba.maxgraph.common.config.Configs; import com.alibaba.maxgraph.common.config.StoreConfig; import com.alibaba.maxgraph.common.util.ThreadFactoryUtils; +import com.alibaba.maxgraph.compiler.api.exception.MaxGraphException; import com.alibaba.maxgraph.proto.groot.GraphDefPb; import org.apache.hadoop.conf.Configuration; @@ -45,10 +49,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; -public class StoreService { +public class StoreService implements MetricsAgent { private static final Logger logger = LoggerFactory.getLogger(StoreService.class); + private static final String PARTITION_WRITE_PER_SECOND_MS = "partition.write.per.second.ms"; + private Configs configs; private int storeId; private int writeThreadCount; @@ -57,15 +64,31 @@ public class StoreService { private ExecutorService writeExecutor, ingestExecutor; private volatile boolean shouldStop = true; - public StoreService(Configs configs, MetaService metaService) { + private volatile long lastUpdateTime; + private Map partitionToMetric; + + public StoreService( + Configs configs, MetaService metaService, MetricsCollector metricsCollector) { this.configs = configs; this.storeId = CommonConfig.NODE_IDX.get(configs); this.writeThreadCount = StoreConfig.STORE_WRITE_THREAD_COUNT.get(configs); this.metaService = metaService; + metricsCollector.register(this, () -> updateMetrics()); } public void start() throws IOException { logger.info("starting StoreService..."); + List partitionIds = this.metaService.getPartitionsByStoreId(this.storeId); + this.idToPartition = new HashMap<>(partitionIds.size()); + for (int partitionId : partitionIds) { + try { + GraphPartition partition = makeGraphPartition(this.configs, partitionId); + this.idToPartition.put(partitionId, partition); + } catch (IOException e) { + throw new MaxGraphException(e); + } + } + initMetrics(); this.shouldStop = false; this.writeExecutor = new ThreadPoolExecutor( @@ -85,12 +108,6 @@ public void start() throws IOException { new LinkedBlockingQueue<>(1), ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "store-ingest", logger)); - List partitionIds = this.metaService.getPartitionsByStoreId(this.storeId); - this.idToPartition = new HashMap<>(partitionIds.size()); - for (int partitionId : partitionIds) { - GraphPartition partition = makeGraphPartition(this.configs, partitionId); - this.idToPartition.put(partitionId, partition); - } logger.info("StoreService started. storeId [" + this.storeId + "]"); } @@ -202,6 +219,7 @@ private Map writeStore( if (partitionId != -1) { // Ignore Marker // Only support partition operation for now + long beforeWriteTime = System.nanoTime(); GraphPartition partition = this.idToPartition.get(partitionId); if (partition == null) { throw new IllegalStateException( @@ -212,6 +230,10 @@ private Map writeStore( if (partition.writeBatch(snapshotId, batch)) { hasDdl.set(true); } + long afterWriteTime = System.nanoTime(); + this.partitionToMetric + .get(partitionId) + .add(afterWriteTime - beforeWriteTime); } } catch (Exception ex) { logger.error( @@ -274,4 +296,45 @@ private void ingestDataInternal(String path) throws IOException { partition.ingestHdfsFile(fs, realPath); } } + + private void updateMetrics() { + long currentTime = System.nanoTime(); + long interval = currentTime - this.lastUpdateTime; + if (this.partitionToMetric != null) { + this.partitionToMetric.values().forEach(m -> m.update(interval)); + } + this.lastUpdateTime = currentTime; + } + + @Override + public void initMetrics() { + this.lastUpdateTime = System.nanoTime(); + this.partitionToMetric = new HashMap<>(); + for (Integer id : this.idToPartition.keySet()) { + this.partitionToMetric.put(id, new AvgMetric()); + } + } + + @Override + public Map getMetrics() { + List partitionWritePerSecondMs = + partitionToMetric.entrySet().stream() + .map( + entry -> + String.format( + "%s:%s", + entry.getKey(), + (int) (1000 * entry.getValue().getAvg()))) + .collect(Collectors.toList()); + return new HashMap() { + { + put(PARTITION_WRITE_PER_SECOND_MS, String.valueOf(partitionWritePerSecondMs)); + } + }; + } + + @Override + public String[] getMetricKeys() { + return new String[] {PARTITION_WRITE_PER_SECOND_MS}; + } } diff --git a/interactive_engine/sdk/src/main/java/com/alibaba/graphscope/groot/sdk/Client.java b/interactive_engine/sdk/src/main/java/com/alibaba/graphscope/groot/sdk/Client.java index 02cbcb85b530..9d4e1de56dc6 100644 --- a/interactive_engine/sdk/src/main/java/com/alibaba/graphscope/groot/sdk/Client.java +++ b/interactive_engine/sdk/src/main/java/com/alibaba/graphscope/groot/sdk/Client.java @@ -188,6 +188,10 @@ public void ingestData(String path) { public String loadJsonSchema(Path jsonFile) throws IOException { String json = new String(Files.readAllBytes(jsonFile), StandardCharsets.UTF_8); + return loadJsonSchema(json); + } + + public String loadJsonSchema(String json) { LoadJsonSchemaResponse response = this.stub.loadJsonSchema( LoadJsonSchemaRequest.newBuilder().setSchemaJson(json).build());