diff --git a/charts/graphscope-store-one-pod/templates/configmap.yaml b/charts/graphscope-store-one-pod/templates/configmap.yaml index e0841a57085b..ea2a9168d074 100644 --- a/charts/graphscope-store-one-pod/templates/configmap.yaml +++ b/charts/graphscope-store-one-pod/templates/configmap.yaml @@ -15,7 +15,6 @@ metadata: data: groot.config: |- ## Common Config - graph.name={{ .Values.graphName }} role.name= node.idx= rpc.port=0 @@ -26,7 +25,6 @@ data: coordinator.node.count=1 ingestor.queue.count={{ .Values.ingestor.replicaCount }} partition.count={{ .Values.store.replicaCount | mul 16 }} - engine.type=gaia discovery.mode=zookeeper ## Frontend Config @@ -34,17 +32,20 @@ data: enable.hash.generate.eid={{ .Values.enableHashGenerateEid }} ## Ingestor Config - ingestor.queue.buffer.size={{ .Values.ingestorQueueBufferSize }} - ingestor.sender.buffer.size={{ .Values.ingestorSenderBufferSize }} + ingestor.queue.buffer.max.count={{ .Values.ingestorQueueBufferMaxCount }} + ingestor.sender.buffer.max.count={{ .Values.ingestorSenderBufferMaxCount }} + ingestor.sender.operation.max.count={{ .Values.ingestorSenderOperationMaxCount }} ## Coordinator Config snapshot.increase.interval.ms={{ .Values.snapshotIncreaseIntervalMs }} offsets.persist.interval.ms={{ .Values.offsetsPersistIntervalMs }} file.meta.store.path={{ .Values.fileMetaStorePath }} + log.recycle.enable={{ .Values.logRecycleEnable }} ## Store Config store.data.path={{ .Values.storeDataPath }} store.write.thread.count={{ .Values.storeWriteThreadCount }} + store.queue.buffer.size={{ .Values.storeQueueBufferSize }} ## Frontend Config gremlin.server.port=12312 @@ -69,11 +70,11 @@ data: pegasus.output.capacity=16 pegasus.hosts=localhost:8080 - kafka.test.cluster.enable={{ not .Values.kafka.enabled }} ## Kafka Config kafka.servers=KAFKA_SERVERS kafka.topic={{ .Values.kafkaTopic }} kafka.producer.custom.configs={{ .Values.kafkaProducerCustomConfigs }} + kafka.test.cluster.enable={{ not .Values.kafka.enabled }} ## Zk Config zk.base.path=/graphscope/groot diff --git a/charts/graphscope-store-one-pod/values.yaml b/charts/graphscope-store-one-pod/values.yaml index 91198a6b3e5e..ddbfa16a9029 100644 --- a/charts/graphscope-store-one-pod/values.yaml +++ b/charts/graphscope-store-one-pod/values.yaml @@ -348,19 +348,18 @@ externalKafka: ## javaOpts: "" -graphName: "graphscope" - rpcMaxBytesMb: 20 ## Ingestor Config -ingestorQueueBufferSize: 1024 - -ingestorSenderBufferSize: 1024 +ingestorQueueBufferMaxCount: 102400 +ingestorSenderBufferMaxCount: 102400 +ingestorSenderOperationMaxCount: 102400 ## Coordinator Config snapshotIncreaseIntervalMs: 1000 offsetsPersistIntervalMs: 3000 fileMetaStorePath: "/var/lib/graphscope-store/meta" +logRecycleEnable: true ## Frontend config enableHashGenerateEid: false @@ -368,12 +367,13 @@ enableHashGenerateEid: false ## Store Config storeDataPath: "/var/lib/graphscope-store" storeWriteThreadCount: 1 +storeQueueBufferSize: 102400 ## Kafka Config kafkaTopic: "graphscope" kafkaProducerCustomConfigs: "" -## Key-value pair seperated by ; +## Key-value pair separated by ; ## For example extraConfig="k1=v1;k2=v2" extraConfig: "" @@ -383,5 +383,5 @@ auth: pegasus: worker: - num: 4 + num: 1 timeout: 240000 diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index 9f1647f6b3cb..291c618863c2 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -15,7 +15,6 @@ metadata: data: groot.config: |- ## Common Config - graph.name={{ .Values.graphName }} role.name=ROLE node.idx=INDEX rpc.port=55555 @@ -26,7 +25,6 @@ data: coordinator.node.count={{ .Values.coordinator.replicaCount }} ingestor.queue.count={{ .Values.ingestor.replicaCount }} partition.count={{ .Values.store.replicaCount | mul 16 }} - engine.type={{ .Values.engineType }} discovery.mode={{ .Values.discoveryMode }} ## Frontend Config @@ -36,33 +34,32 @@ data: enable.hash.generate.eid={{ .Values.enableHashGenerateEid }} ## Ingestor Config - ingestor.queue.buffer.size={{ .Values.ingestorQueueBufferSize }} - ingestor.sender.buffer.size={{ .Values.ingestorSenderBufferSize }} + ingestor.queue.buffer.max.count={{ .Values.ingestorQueueBufferMaxCount }} + ingestor.sender.buffer.max.count={{ .Values.ingestorSenderBufferMaxCount }} + ingestor.sender.operation.max.count={{ .Values.ingestorSenderOperationMaxCount }} ## Coordinator Config snapshot.increase.interval.ms={{ .Values.snapshotIncreaseIntervalMs }} offsets.persist.interval.ms={{ .Values.offsetsPersistIntervalMs }} file.meta.store.path={{ .Values.fileMetaStorePath }} + log.recycle.enable={{ .Values.logRecycleEnable }} ## Store Config store.data.path={{ .Values.storeDataPath }} store.write.thread.count={{ .Values.storeWriteThreadCount }} + store.queue.buffer.size={{ .Values.storeQueueBufferSize }} ## Kafka Config kafka.servers=KAFKA_SERVERS kafka.topic={{ .Values.kafkaTopic }} kafka.producer.custom.configs={{ .Values.kafkaProducerCustomConfigs }} + kafka.test.cluster.enable=false ## Frontend Config gremlin.server.port=12312 ## disable neo4j when launching groot server by default neo4j.bolt.server.disabled=true - executor.worker.per.process={{ .Values.executorWorkerPerProcess }} - executor.query.thread.count={{ .Values.executorQueryThreadCount }} - executor.query.manager.thread.count={{ .Values.executorQueryManagerThreadCount }} - executor.query.store.thread.count={{ .Values.executorQueryStoreThreadCount }} - dns.name.prefix.frontend=FRONTEND dns.name.prefix.ingestor=INGESTOR dns.name.prefix.coordinator=COORDINATOR diff --git a/charts/graphscope-store/values.yaml b/charts/graphscope-store/values.yaml index fbfdf81fa808..4306818e722f 100644 --- a/charts/graphscope-store/values.yaml +++ b/charts/graphscope-store/values.yaml @@ -495,27 +495,25 @@ externalKafka: ## javaOpts: "" -graphName: "graphscope" - rpcMaxBytesMb: 20 -engineType: "gaia" - discoveryMode: "file" ## Ingestor Config -ingestorQueueBufferSize: 1024 - -ingestorSenderBufferSize: 1024 +ingestorQueueBufferMaxCount: 102400 +ingestorSenderBufferMaxCount: 102400 +ingestorSenderOperationMaxCount: 102400 ## Coordinator Config snapshotIncreaseIntervalMs: 1000 offsetsPersistIntervalMs: 3000 fileMetaStorePath: "/etc/groot/my.meta" +logRecycleEnable: true ## Store Config storeDataPath: "/var/lib/graphscope-store" storeWriteThreadCount: 1 +storeQueueBufferSize: 102400 ## Kafka Config kafkaTopic: "graphscope" @@ -525,12 +523,7 @@ kafkaProducerCustomConfigs: "" enableHashGenerateEid: false # gremlinServerPort: 12312 -executorWorkerPerProcess: 2 -executorQueryThreadCount: 2 -executorQueryManagerThreadCount: 2 -executorQueryStoreThreadCount: 2 - -## Key-value pair seperated by ; +## Key-value pair separated by ; ## For example extraConfig="k1=v1;k2=v2" extraConfig: "" diff --git a/docs/storage_engine/groot.md b/docs/storage_engine/groot.md index 32378e150c7d..70e1a64c15f0 100644 --- a/docs/storage_engine/groot.md +++ b/docs/storage_engine/groot.md @@ -568,6 +568,32 @@ APIs including: Refer to [RealtimeWrite.java](https://github.com/alibaba/GraphScope/blob/main/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/RealtimeWrite.java) for examples. + +### Other features + +Groot could enable user to replay realtime write records from a specific offset, or a timestamp, this is useful when you want to restore some records before +a offline load finished, since offload will overwrite all records. + +You can only specify one of `offset` and `timestamp`. The other unused one must be set to -1. If not, `offset` will take precedence. + +Example API: +- Python: + ```python + import time + import graphscope + conn = graphscope.conn() + current_timestamp = int(time.time() * 1000) - 100 * 60 * 1000 + + r = conn.replay_records(-1, current_timestamp) + ``` +- Java + + ```java + GrootClient client = GrootClientBuilder.build(); + long timestamp = System.currentTimeMillis(); + client.replayRecords(-1, timestamp); + ``` + ## Uninstalling and Restarting ### Uninstall Groot diff --git a/interactive_engine/assembly/src/conf/groot/config.template b/interactive_engine/assembly/src/conf/groot/config.template index e4341a20a767..8a3f055be2b4 100644 --- a/interactive_engine/assembly/src/conf/groot/config.template +++ b/interactive_engine/assembly/src/conf/groot/config.template @@ -1,31 +1,21 @@ ## Common Config -graph.name=graphscope role.name= node.idx= rpc.port=0 -rpc.max.bytes.mb=4 store.node.count=1 frontend.node.count=1 ingestor.node.count=2 coordinator.node.count=1 ingestor.queue.count=2 -partition.count=1 -engine.type=gaia +partition.count=4 discovery.mode=zookeeper -## Ingestor Config -ingestor.queue.buffer.size=128 -ingestor.sender.buffer.size=128 - ## Coordinator Config -snapshot.increase.interval.ms=1000 -offsets.persist.interval.ms=3000 log.recycle.enable=true file.meta.store.path=./meta ## Store Config store.data.path=./data -store.write.thread.count=1 ## Zk Config zk.base.path=/graphscope/groot @@ -38,24 +28,11 @@ kafka.topic=groot ## Frontend Config gremlin.server.port=12312 -## Backup Config -backup.enable=false -store.backup.thread.count=1 - log4rs.config=LOG4RS_CONFIG -dns.name.prefix.frontend=localhost -dns.name.prefix.ingestor=localhost -dns.name.prefix.coordinator=localhost -dns.name.prefix.store=localhost - -gaia.enable=true neo4j.bolt.server.disabled=true pegasus.worker.num=2 -pegasus.timeout=240000 -pegasus.batch.size=1024 -pegasus.output.capacity=16 pegasus.hosts=localhost:8080 kafka.test.cluster.enable=true \ No newline at end of file diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java index ba3936d14862..561577b6a182 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java @@ -55,8 +55,6 @@ public class CommonConfig { public static final Config PARTITION_COUNT = Config.intConfig("partition.count", 1); - public static final Config GRAPH_NAME = Config.stringConfig("graph.name", "graphscope"); - public static final Config METRIC_UPDATE_INTERVAL_MS = Config.longConfig("metric.update.interval.ms", 5000L); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/Config.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/Config.java index c292baf3f325..1dd214f0fc27 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/Config.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/Config.java @@ -18,10 +18,10 @@ import java.util.function.Function; public class Config { - private String key; - private String defaultVal; + private final String key; + private final String defaultVal; - private Function parseFunc; + private final Function parseFunc; public Config(String key, String defaultVal, Function parseFunc) { this.key = key; diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/Configs.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/Configs.java index c7c552b59ef0..f238c7345c59 100755 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/Configs.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/Configs.java @@ -25,7 +25,7 @@ public class Configs { - private Properties properties; + private final Properties properties; public Configs() { this.properties = new Properties(); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java index a7d54f2cb97e..3ac622586ef0 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java @@ -26,7 +26,7 @@ public class CoordinatorConfig { Config.boolConfig("log.recycle.enable", true); public static final Config LOG_RECYCLE_INTERVAL_SECOND = - Config.longConfig("log.recycle.interval.second", 60L); + Config.longConfig("log.recycle.interval.second", 600L); public static final Config FILE_META_STORE_PATH = Config.stringConfig("file.meta.store.path", "./meta"); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/GaiaConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/GaiaConfig.java index 6b608d870613..23876992a19c 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/GaiaConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/GaiaConfig.java @@ -1,7 +1,7 @@ package com.alibaba.graphscope.groot.common.config; public class GaiaConfig { - public static final Config GAIA_ENABLE = Config.boolConfig("gaia.enable", false); + public static final Config GAIA_ENABLE = Config.boolConfig("gaia.enable", true); public static final Config GAIA_REPORT = Config.boolConfig("gaia.report", false); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/IngestorConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/IngestorConfig.java index 238b5c1f9771..c8e05cfb73bb 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/IngestorConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/IngestorConfig.java @@ -15,13 +15,13 @@ public class IngestorConfig { public static final Config INGESTOR_QUEUE_BUFFER_MAX_COUNT = - Config.intConfig("ingestor.queue.buffer.max.count", 1024); + Config.intConfig("ingestor.queue.buffer.max.count", 102400); public static final Config INGESTOR_SENDER_BUFFER_MAX_COUNT = - Config.intConfig("ingestor.sender.buffer.max.count", 1024); + Config.intConfig("ingestor.sender.buffer.max.count", 102400); public static final Config INGESTOR_SENDER_OPERATION_MAX_COUNT = - Config.intConfig("ingestor.sender.operation.max.count", 8192); + Config.intConfig("ingestor.sender.operation.max.count", 102400); public static final Config INGESTOR_CHECK_PROCESSOR_INTERVAL_MS = Config.longConfig("ingestor.check.processor.interval.ms", 3000L); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java index 23f96dbb2e49..fe2d1be9ca33 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java @@ -15,13 +15,13 @@ public class StoreConfig { public static final Config STORE_DATA_PATH = - Config.stringConfig("store.data.path", "/groot_data"); + Config.stringConfig("store.data.path", "./data"); public static final Config STORE_WRITE_THREAD_COUNT = Config.intConfig("store.write.thread.count", 1); public static final Config STORE_QUEUE_BUFFER_SIZE = - Config.intConfig("store.queue.buffer.size", 1024); + Config.intConfig("store.queue.buffer.size", 102400); public static final Config STORE_QUEUE_WAIT_MS = Config.longConfig("store.queue.wait.ms", 3000L); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/RpcUtils.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/RpcUtils.java index ba01608fda6f..a450cc4912ea 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/RpcUtils.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/RpcUtils.java @@ -27,7 +27,7 @@ public class RpcUtils { public static final Logger logger = LoggerFactory.getLogger(RpcUtils.class); public static Executor createGrpcExecutor(int threadCount) { - logger.info("create grpc executor, thread count [" + threadCount + "]"); + logger.debug("create grpc executor, thread count [" + threadCount + "]"); return new ForkJoinPool( threadCount, new ForkJoinPool.ForkJoinWorkerThreadFactory() { diff --git a/interactive_engine/compiler/conf/ir.compiler.properties b/interactive_engine/compiler/conf/ir.compiler.properties index 48c75eb91ae0..d17d8af1ad09 100644 --- a/interactive_engine/compiler/conf/ir.compiler.properties +++ b/interactive_engine/compiler/conf/ir.compiler.properties @@ -37,7 +37,7 @@ graph.planner.rules: FilterIntoJoinRule, FilterMatchRule, NotMatchToAntiJoinRule # gremlin.server.port: 8182 # disable neo4j server -# neo4j.bolt.server.disabled: true +neo4j.bolt.server.disabled: false # set neo4j server port if neo4j server is enabled # neo4j.bolt.server.port: 7687 diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java index 9e9f9037264f..823581b84edf 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java @@ -24,7 +24,7 @@ public class FrontendConfig { Config.intConfig("gremlin.server.port", 8182); public static final Config NEO4J_BOLT_SERVER_DISABLED = - Config.boolConfig("neo4j.bolt.server.disabled", false); + Config.boolConfig("neo4j.bolt.server.disabled", true); public static final Config NEO4J_BOLT_SERVER_PORT = Config.intConfig("neo4j.bolt.server.port", 7687); diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java index 186c9cf65a04..9ae66720a633 100644 --- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java @@ -75,11 +75,18 @@ private BatchWriteRequest.Builder getNewWriteBuilder() { * Block until this snapshot becomes available. * @param snapshotId the snapshot id to be flushed */ - public void remoteFlush(long snapshotId) { - if (snapshotId != 0) { - this.writeStub.remoteFlush( - RemoteFlushRequest.newBuilder().setSnapshotId(snapshotId).build()); - } + public boolean remoteFlush(long snapshotId) { + RemoteFlushResponse resp = + this.writeStub.remoteFlush( + RemoteFlushRequest.newBuilder().setSnapshotId(snapshotId).build()); + return resp.getSuccess(); + } + + public List replayRecords(long offset, long timestamp) { + ReplayRecordsRequest req = + ReplayRecordsRequest.newBuilder().setOffset(offset).setTimestamp(timestamp).build(); + ReplayRecordsResponse resp = writeStub.replayRecords(req); + return resp.getSnapshotIdList(); } private long modifyVertex(Vertex vertex, WriteTypePb writeType) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/CoordinatorSnapshotClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/CoordinatorSnapshotClient.java index 9a9432fbd2ca..3a29c9459d42 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/CoordinatorSnapshotClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/CoordinatorSnapshotClient.java @@ -11,7 +11,7 @@ // send rpc to CoordinatorSnapshotService in Store, to report minimum snapshot current used public class CoordinatorSnapshotClient extends RpcClient { private static final Logger logger = LoggerFactory.getLogger(CoordinatorSnapshotClient.class); - private CoordinatorSnapshotServiceGrpc.CoordinatorSnapshotServiceBlockingStub stub; + private final CoordinatorSnapshotServiceGrpc.CoordinatorSnapshotServiceBlockingStub stub; public CoordinatorSnapshotClient(ManagedChannel channel) { super(channel); @@ -29,8 +29,7 @@ public void synchronizeSnapshot(long snapshotId) throws RuntimeException { SynchronizeMinQuerySnapshotIdRequest.newBuilder().setSnapshotId(snapshotId).build(); SynchronizeMinQuerySnapshotIdResponse res = stub.synchronizeMinQuerySnapshotId(req); if (!res.getSuccess()) { - throw new RuntimeException( - "Synchronize snapshot to store failed: {} " + res.getErrMsg()); + throw new RuntimeException("Synchronize snapshot to store failed: " + res.getErrMsg()); } } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/CoordinatorSnapshotService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/CoordinatorSnapshotService.java index 0a99121e2f5f..69e257962c19 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/CoordinatorSnapshotService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/CoordinatorSnapshotService.java @@ -12,7 +12,7 @@ public class CoordinatorSnapshotService extends CoordinatorSnapshotServiceGrpc.CoordinatorSnapshotServiceImplBase { private static final Logger logger = LoggerFactory.getLogger(CoordinatorSnapshotService.class); - private GarbageCollectManager garbageCollectManager; + private final GarbageCollectManager garbageCollectManager; public CoordinatorSnapshotService(GarbageCollectManager garbageCollectManager) { this.garbageCollectManager = garbageCollectManager; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/DdlWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/DdlWriter.java index 21660fd6c0fc..a1cc89bd03e4 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/DdlWriter.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/DdlWriter.java @@ -20,7 +20,7 @@ public class DdlWriter { - private RoleClients ingestorWriteClients; + private final RoleClients ingestorWriteClients; private int queueId = 0; public DdlWriter(RoleClients ingestorWriteClients) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/FileMetaStore.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/FileMetaStore.java index e14d015abc4c..8dcd796682c3 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/FileMetaStore.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/FileMetaStore.java @@ -39,12 +39,12 @@ public class FileMetaStore implements MetaStore { private static final Logger logger = LoggerFactory.getLogger(FileMetaStore.class); - private String workingDir; - private Map pathToSuffix; + private final String workingDir; + private final Map pathToSuffix; public FileMetaStore(Configs configs) { this.workingDir = CoordinatorConfig.FILE_META_STORE_PATH.get(configs); - new File(this.workingDir).mkdirs(); + boolean ret = new File(this.workingDir).mkdirs(); this.pathToSuffix = new ConcurrentHashMap<>(); } @@ -80,13 +80,7 @@ public byte[] read(String path) throws IOException { long realCrc = getCRC32Checksum(res); if (realCrc != crc) { logger.error( - "checksum of file [" - + file0.getAbsolutePath() - + "] is [" - + realCrc - + "], expected [" - + crc - + "]"); + "checksum [{}] is [{}] versus [{}]", file0.getAbsolutePath(), realCrc, crc); res = null; } } @@ -94,7 +88,7 @@ public byte[] read(String path) throws IOException { if (res != null) { return res; } else { - throw new IOException("file0 checksum failed, file1 not exists"); + throw new IOException("file0 checksum failed and file1 not exists"); } } else { try (InputStream is = new BufferedInputStream(new FileInputStream(file1))) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/FrontendSnapshotClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/FrontendSnapshotClient.java index 64c172817739..18413aef423a 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/FrontendSnapshotClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/FrontendSnapshotClient.java @@ -50,7 +50,7 @@ public void advanceQuerySnapshot( } stub.advanceQuerySnapshot( builder.build(), - new StreamObserver() { + new StreamObserver<>() { @Override public void onNext(AdvanceQuerySnapshotResponse response) { long previousSnapshotId = response.getPreviousSnapshotId(); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java index e8f9eda06231..813935b71cd9 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java @@ -16,10 +16,10 @@ public class GarbageCollectManager { private static final Logger logger = LoggerFactory.getLogger(GarbageCollectManager.class); - private Configs configs; - private ConcurrentHashMap hashMap; - private RoleClients clients; - private ScheduledExecutorService updateStoreMinSnapshotScheduler; + private final Configs configs; + private final ConcurrentHashMap hashMap; + private final RoleClients clients; + private ScheduledExecutorService updateScheduler; public GarbageCollectManager(Configs configs, RoleClients clients) { this.configs = configs; @@ -32,11 +32,11 @@ public void put(int frontendId, long snapshotId) { } public void start() { - this.updateStoreMinSnapshotScheduler = + this.updateScheduler = Executors.newSingleThreadScheduledExecutor( ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "update-store-min-snapshot-scheduler", logger)); - this.updateStoreMinSnapshotScheduler.scheduleWithFixedDelay( + this.updateScheduler.scheduleWithFixedDelay( () -> { try { if (!hashMap.isEmpty()) { @@ -44,13 +44,15 @@ public void start() { for (int i = 0; i < CommonConfig.STORE_NODE_COUNT.get(configs); i++) { CoordinatorSnapshotClient client = clients.getClient(i); client.synchronizeSnapshot(offlineVersion); - if (i == 0 && offlineVersion % 100 == 0) { + if (i == 0 && offlineVersion % 1000 == 0) { logger.info("Offline version updated to {}", offlineVersion); } } } } catch (Exception e) { - logger.error("error in updateStoreMinSnapshotScheduler, ignore", e); + logger.error( + "error in updateStoreMinSnapshotScheduler {}, ignore", + e.getMessage()); } }, 5000L, @@ -59,11 +61,10 @@ public void start() { } public void stop() { - if (updateStoreMinSnapshotScheduler != null) { - updateStoreMinSnapshotScheduler.shutdownNow(); + if (updateScheduler != null) { + updateScheduler.shutdownNow(); try { - if (!updateStoreMinSnapshotScheduler.awaitTermination( - 1000, TimeUnit.MILLISECONDS)) { + if (!updateScheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) { logger.error("updateStoreMinSnapshotScheduler await timeout before shutdown"); } } catch (InterruptedException e) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphDestroyer.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphDestroyer.java index 4ee6445bb407..843bf6f05fce 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphDestroyer.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphDestroyer.java @@ -25,9 +25,9 @@ public class GraphDestroyer { private static final Logger logger = LoggerFactory.getLogger(GraphDestroyer.class); - private Configs configs; - private CuratorFramework curator; - private LogService logService; + private final Configs configs; + private final CuratorFramework curator; + private final LogService logService; public GraphDestroyer(Configs configs, CuratorFramework curator, LogService logService) { this.configs = configs; @@ -43,10 +43,9 @@ public void destroyAll() throws Exception { } String zkRoot = ZkConfig.ZK_BASE_PATH.get(configs); Stat stat = this.curator.checkExists().forPath(zkRoot); - if (stat == null) { - return; + if (stat != null) { + this.curator.delete().deletingChildrenIfNeeded().forPath(zkRoot); + logger.info("zk destroyed"); } - this.curator.delete().deletingChildrenIfNeeded().forPath(zkRoot); - logger.info("zk destroyed"); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphInitializer.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphInitializer.java index c90805b3eebf..eebfb20dca72 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphInitializer.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GraphInitializer.java @@ -31,11 +31,11 @@ public class GraphInitializer { - private Configs configs; - private CuratorFramework curator; - private MetaStore metaStore; - private LogService logService; - private ObjectMapper objectMapper; + private final Configs configs; + private final CuratorFramework curator; + private final MetaStore metaStore; + private final LogService logService; + private final ObjectMapper objectMapper; public GraphInitializer( Configs configs, CuratorFramework curator, MetaStore metaStore, LogService logService) { @@ -67,10 +67,9 @@ private void initializeZkIfNeeded() throws Exception { if (CommonConfig.DISCOVERY_MODE.get(this.configs).equalsIgnoreCase("zookeeper")) { String zkRoot = ZkConfig.ZK_BASE_PATH.get(configs); Stat stat = this.curator.checkExists().forPath(zkRoot); - if (stat != null) { - return; + if (stat == null) { + this.curator.create().creatingParentsIfNeeded().forPath(zkRoot); } - this.curator.create().creatingParentsIfNeeded().forPath(zkRoot); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IdAllocateService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IdAllocateService.java index 4effd41216c2..298cc08aee0f 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IdAllocateService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IdAllocateService.java @@ -14,7 +14,7 @@ public class IdAllocateService extends IdAllocateGrpc.IdAllocateImplBase { private static final Logger logger = LoggerFactory.getLogger(IdAllocateService.class); - private IdAllocator idAllocator; + private final IdAllocator idAllocator; public IdAllocateService(IdAllocator idAllocator) { this.idAllocator = idAllocator; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IdAllocator.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IdAllocator.java index 2559bc44f3a0..130583aaa3b5 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IdAllocator.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IdAllocator.java @@ -11,8 +11,8 @@ public class IdAllocator { public static final String ID_ALLOCATE_INFO_PATH = "id_allocate_info"; - private MetaStore metaStore; - private ObjectMapper objectMapper; + private final MetaStore metaStore; + private final ObjectMapper objectMapper; private volatile long tailId; public IdAllocator(MetaStore metaStore) { @@ -37,8 +37,7 @@ private void recover() throws IOException { throw new FileNotFoundException(ID_ALLOCATE_INFO_PATH); } byte[] b = this.metaStore.read(ID_ALLOCATE_INFO_PATH); - long tailId = this.objectMapper.readValue(b, Long.class); - this.tailId = tailId; + this.tailId = this.objectMapper.readValue(b, Long.class); } public synchronized long allocate(int allocateSize) throws IOException { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestProgressService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestProgressService.java index 832a81151cb6..4812acb32481 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestProgressService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestProgressService.java @@ -27,7 +27,7 @@ public class IngestProgressService extends IngestProgressGrpc.IngestProgressImplBase { private static final Logger logger = LoggerFactory.getLogger(IngestProgressService.class); - private SnapshotManager snapshotManager; + private final SnapshotManager snapshotManager; public IngestProgressService(SnapshotManager snapshotManager) { this.snapshotManager = snapshotManager; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestorSnapshotClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestorSnapshotClient.java index b2bb5398d053..84d6f2919bef 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestorSnapshotClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestorSnapshotClient.java @@ -23,7 +23,7 @@ import io.grpc.stub.StreamObserver; public class IngestorSnapshotClient extends RpcClient { - private IngestorSnapshotGrpc.IngestorSnapshotStub stub; + private final IngestorSnapshotGrpc.IngestorSnapshotStub stub; public IngestorSnapshotClient(ManagedChannel channel) { super(channel); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestorWriteSnapshotIdNotifier.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestorWriteSnapshotIdNotifier.java index 576672126a7c..a76c26978ca9 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestorWriteSnapshotIdNotifier.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/IngestorWriteSnapshotIdNotifier.java @@ -25,8 +25,8 @@ public class IngestorWriteSnapshotIdNotifier implements WriteSnapshotIdNotifier private static final Logger logger = LoggerFactory.getLogger(IngestorWriteSnapshotIdNotifier.class); - private RoleClients ingestorSnapshotClients; - private int ingestorCount; + private final RoleClients ingestorSnapshotClients; + private final int ingestorCount; public IngestorWriteSnapshotIdNotifier( Configs configs, RoleClients ingestorSnapshotClients) { @@ -48,31 +48,26 @@ public void notifyWriteSnapshotIdChanged(long snapshotId) { public void onCompleted(Long previousSnapshotId) { if (previousSnapshotId > snapshotId) { logger.error( - "unexpected previousSnapshotId [" - + previousSnapshotId - + "], " - + "should <= [" - + snapshotId - + "]. " - + "target realtimeWriter [" - + realtimeWriterId - + "]"); + "unexpected previousSnapshotId [{}], should <=" + + " [{}]. target realtime writer [{}]", + previousSnapshotId, + snapshotId, + realtimeWriterId); } } @Override public void onError(Throwable t) { logger.error( - "error in advanceIngestSnapshotId [" - + snapshotId - + "]. realtimeWriter [" - + realtimeWriterId - + "]", - t); + "error in advanceIngestSnapshotId [{}]. realtime" + + " writer [{}], reason [{}]", + snapshotId, + realtimeWriterId, + t.getMessage()); } }); } catch (Exception e) { - logger.warn("update writeSnapshotId failed. realtimeWriter [" + i + "]", e); + logger.warn("update writeSnapshotId failed. realtimeWriter [{}]", i, e); } } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LocalSnapshotListener.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LocalSnapshotListener.java index 0333b15215a1..640a07d1ad8c 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LocalSnapshotListener.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LocalSnapshotListener.java @@ -24,9 +24,9 @@ public class LocalSnapshotListener implements QuerySnapshotListener { private static final Logger logger = LoggerFactory.getLogger(LocalSnapshotListener.class); - private SchemaManager schemaManager; - private SnapshotCache snapshotCache; - private AtomicLong lastDdlSnapshotId; + private final SchemaManager schemaManager; + private final SnapshotCache snapshotCache; + private final AtomicLong lastDdlSnapshotId; public LocalSnapshotListener(SchemaManager schemaManager, SnapshotCache snapshotCache) { this.schemaManager = schemaManager; @@ -36,12 +36,7 @@ public LocalSnapshotListener(SchemaManager schemaManager, SnapshotCache snapshot @Override public void snapshotAdvanced(long snapshotId, long ddlSnapshotId) { - logger.debug( - "snapshot advance to [" - + snapshotId - + "]-[" - + ddlSnapshotId - + "], will update local snapshot cache"); + logger.debug("snapshot advance to [{}]-[{}]", snapshotId, ddlSnapshotId); GraphDef graphDef = null; if (ddlSnapshotId > this.lastDdlSnapshotId.get()) { graphDef = this.schemaManager.getGraphDef(); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LogRecycler.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LogRecycler.java index a371fa94c90f..01eccae521a3 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LogRecycler.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/LogRecycler.java @@ -30,11 +30,11 @@ public class LogRecycler { private static final Logger logger = LoggerFactory.getLogger(LogRecycler.class); - private LogService logService; - private SnapshotManager snapshotManager; + private final LogService logService; + private final SnapshotManager snapshotManager; private ScheduledExecutorService scheduler; - private boolean recycleEnable; - private long recycleIntervalSeconds; + private final boolean recycleEnable; + private final long recycleIntervalSeconds; public LogRecycler(Configs configs, LogService logService, SnapshotManager snapshotManager) { this.logService = logService; @@ -45,7 +45,7 @@ public LogRecycler(Configs configs, LogService logService, SnapshotManager snaps public void start() { if (!this.recycleEnable) { - logger.info("log recycler is disable"); + logger.info("log recycler is disabled"); return; } this.scheduler = @@ -77,19 +77,18 @@ public void stop() { } this.scheduler = null; } - logger.info("LogRecycler stopped"); + logger.debug("LogRecycler stopped"); } private void doRecycle() { List queueOffsets = this.snapshotManager.getQueueOffsets(); for (int i = 0; i < queueOffsets.size(); i++) { + long offset = queueOffsets.get(i); try { - logService.deleteBeforeOffset(i, queueOffsets.get(i)); - logger.info("recycled queue [" + i + "] offset [" + queueOffsets.get(i) + "]"); + logService.deleteBeforeOffset(i, offset); + logger.info("recycled queue [{}] offset [{}]", i, offset); } catch (IOException e) { - logger.error( - "error in delete queue [" + i + "] offset [" + queueOffsets.get(i) + "]", - e); + logger.error("error in delete queue [{}] offset [{}]", i, offset, e); } } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/NotifyFrontendListener.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/NotifyFrontendListener.java index 993259cd1646..b0ca09c80abc 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/NotifyFrontendListener.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/NotifyFrontendListener.java @@ -24,11 +24,11 @@ public class NotifyFrontendListener implements QuerySnapshotListener { private static final Logger logger = LoggerFactory.getLogger(NotifyFrontendListener.class); - private int frontendId; - private FrontendSnapshotClient frontendSnapshotClient; - private SchemaManager schemaManager; + private final int frontendId; + private final FrontendSnapshotClient frontendSnapshotClient; + private final SchemaManager schemaManager; - private AtomicLong lastDdlSnapshotId; + private final AtomicLong lastDdlSnapshotId; public NotifyFrontendListener( int frontendId, @@ -44,11 +44,7 @@ public NotifyFrontendListener( @Override public void snapshotAdvanced(long snapshotId, long ddlSnapshotId) { logger.debug( - "snapshot advance to [" - + snapshotId - + "]-[" - + ddlSnapshotId - + "], will notify frontend"); + "snapshot advanced to [{}]-[{}], will notify frontend", snapshotId, ddlSnapshotId); GraphDef graphDef = null; if (ddlSnapshotId > this.lastDdlSnapshotId.get()) { graphDef = this.schemaManager.getGraphDef(); @@ -61,29 +57,23 @@ public void snapshotAdvanced(long snapshotId, long ddlSnapshotId) { public void onCompleted(Long res) { if (res >= snapshotId) { logger.warn( - "unexpected previousSnapshotId [" - + res - + "], should <= [" - + snapshotId - + "]. frontend [" - + frontendId - + "]"); + "Unexpected previous snapshot id [{}], should <= [{}], frontend" + + " [{}]", + res, + snapshotId, + frontendId); } else { - lastDdlSnapshotId.getAndUpdate( - x -> x < ddlSnapshotId ? ddlSnapshotId : x); + lastDdlSnapshotId.getAndUpdate(x -> Math.max(x, ddlSnapshotId)); } } @Override public void onError(Throwable t) { logger.error( - "error in advanceQuerySnapshot [" - + snapshotId - + "], ddlSnapshotId [" - + ddlSnapshotId - + "], frontend [" - + frontendId - + "]", + "error in advanceQuerySnapshot [{}]-[{}], frontend [{}]", + snapshotId, + ddlSnapshotId, + frontendId, t); } }); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java index 9632d482092d..e82e5974c7a2 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java @@ -42,13 +42,13 @@ public class SchemaManager { private static final Logger logger = LoggerFactory.getLogger(SchemaManager.class); - private SnapshotManager snapshotManager; - private DdlWriter ddlWriter; - private DdlExecutors ddlExecutors; - private GraphDefFetcher graphDefFetcher; + private final SnapshotManager snapshotManager; + private final DdlWriter ddlWriter; + private final DdlExecutors ddlExecutors; + private final GraphDefFetcher graphDefFetcher; - private AtomicReference graphDefRef; - private int partitionCount; + private final AtomicReference graphDefRef; + private final int partitionCount; private volatile boolean ready = false; private ExecutorService singleThreadExecutor; @@ -126,12 +126,10 @@ public void submitBatchDdl( DdlRequestBatch ddlRequestBatch, CompletionCallback callback) { logger.info( - "submitBatchDdl requestId [" - + requestId - + "], sessionId [" - + sessionId - + "]. Request Body: " - + ddlRequestBatch.toProto().toString()); + "submitBatchDdl requestId [{}], sessionId [{}], request body [{}]", + requestId, + sessionId, + ddlRequestBatch.toProto()); if (!ready) { callback.onError(new IllegalStateException("SchemaManager is recovering")); return; @@ -175,11 +173,9 @@ public void submitBatchDdl( callback.onCompleted(snapshotId); } catch (Exception e) { logger.error( - "Error in Ddl requestId [" - + requestId - + "], sessionId [" - + sessionId - + "]", + "Error in Ddl requestId [{}], sessionId [{}]", + requestId, + sessionId, e); this.ready = false; callback.onError(e); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaService.java index 72bab52669c0..6453b3154d8b 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaService.java @@ -24,7 +24,7 @@ public class SchemaService extends SchemaGrpc.SchemaImplBase { - private SchemaManager schemaManager; + private final SchemaManager schemaManager; public SchemaService(SchemaManager schemaManager) { this.schemaManager = schemaManager; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotCommitService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotCommitService.java index fcf1aaf903eb..0fe4c6e972bb 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotCommitService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotCommitService.java @@ -24,7 +24,7 @@ public class SnapshotCommitService extends SnapshotCommitGrpc.SnapshotCommitImplBase { - private SnapshotManager snapshotManager; + private final SnapshotManager snapshotManager; public SnapshotCommitService(SnapshotManager snapshotManager) { this.snapshotManager = snapshotManager; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotInfo.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotInfo.java index 1558e1ece5a0..67a44ee56b62 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotInfo.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotInfo.java @@ -18,9 +18,9 @@ public class SnapshotInfo implements Comparable { - private long snapshotId; + private final long snapshotId; - private long ddlSnapshotId; + private final long ddlSnapshotId; @JsonCreator public SnapshotInfo( diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotManager.java index aa536e87d979..b7cb786fd42f 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotManager.java @@ -110,32 +110,32 @@ public class SnapshotManager { public static final String QUERY_SNAPSHOT_INFO_PATH = "query_snapshot_info"; public static final String QUEUE_OFFSETS_PATH = "queue_offsets"; - private MetaStore metaStore; - private LogService logService; - private WriteSnapshotIdNotifier writeSnapshotIdNotifier; + private final MetaStore metaStore; + private final LogService logService; + private final WriteSnapshotIdNotifier writeSnapshotIdNotifier; - private int storeCount; - private int queueCount; - private long snapshotIncreaseIntervalMs; - private long offsetsPersistIntervalMs; + private final int storeCount; + private final int queueCount; + private final long snapshotIncreaseIntervalMs; + private final long offsetsPersistIntervalMs; private volatile SnapshotInfo querySnapshotInfo; private volatile long writeSnapshotId; - private Map storeToSnapshotInfo; - private Map> storeToOffsets; + private final Map storeToSnapshotInfo; + private final Map> storeToOffsets; private AtomicReference> queueOffsetsRef; private ScheduledExecutorService increaseWriteSnapshotIdScheduler; private ScheduledExecutorService persistOffsetsScheduler; - private List listeners = new CopyOnWriteArrayList<>(); - private TreeMap> snapshotToListeners = new TreeMap<>(); + private final List listeners = new CopyOnWriteArrayList<>(); + private final TreeMap> snapshotToListeners = new TreeMap<>(); - private Object querySnapshotLock = new Object(); - private Lock writeSnapshotLock = new ReentrantLock(); + private final Object querySnapshotLock = new Object(); + private final Lock writeSnapshotLock = new ReentrantLock(); - private ObjectMapper objectMapper; + private final ObjectMapper objectMapper; public SnapshotManager( Configs configs, @@ -250,7 +250,7 @@ private void recover() throws IOException { byte[] queueOffsetsBytes = this.metaStore.read(QUEUE_OFFSETS_PATH); List recoveredQueueOffsets = this.objectMapper.readValue(queueOffsetsBytes, new TypeReference>() {}); - logger.info("recovered queue offsets [" + recoveredQueueOffsets + "]"); + logger.info("recovered queue offsets " + recoveredQueueOffsets + ""); if (recoveredQueueOffsets.size() != this.queueCount) { throw new IllegalStateException( "recovered queueCount [" @@ -262,9 +262,7 @@ private void recover() throws IOException { for (int i = 0; i < this.queueCount; i++) { long recoveredOffset = recoveredQueueOffsets.get(i); - LogReader reader = null; - try { - reader = logService.createReader(i, recoveredOffset + 1); + try (LogReader reader = logService.createReader(i, recoveredOffset + 1)) { } catch (Exception e) { throw new IOException( "recovered queue [" @@ -273,10 +271,6 @@ private void recover() throws IOException { + recoveredOffset + "] is not available", e); - } finally { - if (reader != null) { - reader.close(); - } } } @@ -354,9 +348,8 @@ public void removeListener(QuerySnapshotListener listener) { private void maybeUpdateQuerySnapshotId() { if (this.storeToSnapshotInfo.size() < this.storeCount) { logger.warn( - "Not all store nodes reported snapshot progress. current storeToSnapshot [" - + this.storeToSnapshotInfo - + "]"); + "Not all store nodes reported snapshot progress. current: [{}]", + storeToSnapshotInfo); return; } SnapshotInfo minSnapshotInfo = Collections.min(this.storeToSnapshotInfo.values()); @@ -378,7 +371,7 @@ private void maybeUpdateQuerySnapshotId() { } persistQuerySnapshotId(minSnapshotInfo); this.querySnapshotInfo = minSnapshotInfo; - logger.debug("querySnapshotInfo updated to [" + minSnapshotInfo + "]"); + logger.debug("querySnapshotInfo updated to [{}]", querySnapshotInfo); } catch (IOException e) { logger.error("update querySnapshotInfo failed", e); return; @@ -395,9 +388,8 @@ private void maybeUpdateQuerySnapshotId() { listener.onSnapshotAvailable(); } catch (Exception e) { logger.warn( - "trigger snapshotListener failed. snapshotId [" - + snapshotId - + "]"); + "trigger snapshotListener failed. snapshotId [{}]", + snapshotId); } } } @@ -453,9 +445,7 @@ private void persistWriteSnapshotId(long snapshotId) throws IOException { private void updateQueueOffsets() throws IOException { if (this.storeToOffsets.size() < this.storeCount) { logger.warn( - "Not all store nodes reported queue offsets. current storeToOffsets [" - + this.storeToOffsets - + "]"); + "Not all store nodes reported queue offsets. current: [{}]", storeToOffsets); return; } List queueOffsets = this.queueOffsetsRef.get(); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotNotifier.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotNotifier.java index 5e8bd4d4f826..29c9cbae16fd 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotNotifier.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SnapshotNotifier.java @@ -23,10 +23,10 @@ public class SnapshotNotifier implements NodeDiscovery.Listener { - private NodeDiscovery nodeDiscovery; - private SnapshotManager snapshotManager; - private SchemaManager schemaManager; - private RoleClients frontendSnapshotClients; + private final NodeDiscovery nodeDiscovery; + private final SnapshotManager snapshotManager; + private final SchemaManager schemaManager; + private final RoleClients frontendSnapshotClients; private Map listeners; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/StoreBackupTaskSender.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/StoreBackupTaskSender.java index 08594719fd69..ae5976c668cd 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/StoreBackupTaskSender.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/StoreBackupTaskSender.java @@ -23,7 +23,7 @@ import java.util.Map; public class StoreBackupTaskSender { - private RoleClients storeBackupClients; + private final RoleClients storeBackupClients; public StoreBackupTaskSender(RoleClients storeBackupClients) { this.storeBackupClients = storeBackupClients; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/ZkMetaStore.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/ZkMetaStore.java index 8e2fad954bc5..f2c6dc417ceb 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/ZkMetaStore.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/ZkMetaStore.java @@ -30,8 +30,8 @@ public class ZkMetaStore implements MetaStore { private static final Logger logger = LoggerFactory.getLogger(ZkMetaStore.class); public static final String ROOT_NODE = "meta"; - private CuratorFramework curator; - private String metaBasePath; + private final CuratorFramework curator; + private final String metaBasePath; public ZkMetaStore(Configs configs, CuratorFramework curator) { this(curator, ZkConfig.ZK_BASE_PATH.get(configs)); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/DiscoveryFactory.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/DiscoveryFactory.java index 3f12ef2a7c0d..5fa2e20584a5 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/DiscoveryFactory.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/DiscoveryFactory.java @@ -8,7 +8,7 @@ public class DiscoveryFactory { - private Configs configs; + private final Configs configs; public DiscoveryFactory(Configs configs) { this.configs = configs; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/FileDiscovery.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/FileDiscovery.java index 2f57ba5dc0a6..5945d953f7be 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/FileDiscovery.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/FileDiscovery.java @@ -23,35 +23,33 @@ import java.util.Map; public class FileDiscovery implements NodeDiscovery { - private Logger logger = LoggerFactory.getLogger(FileDiscovery.class); + private final Logger logger = LoggerFactory.getLogger(FileDiscovery.class); - private Configs configs; - private Map> allNodes = new HashMap<>(); + private final Map> allNodes = new HashMap<>(); private boolean started = false; public FileDiscovery(Configs configs) { - this.configs = configs; // Store related nodes - int storeCount = CommonConfig.STORE_NODE_COUNT.get(this.configs); - String storeNamePrefix = DiscoveryConfig.DNS_NAME_PREFIX_STORE.get(this.configs); - int port = CommonConfig.RPC_PORT.get(this.configs); + int storeCount = CommonConfig.STORE_NODE_COUNT.get(configs); + String storeNamePrefix = DiscoveryConfig.DNS_NAME_PREFIX_STORE.get(configs); + int port = CommonConfig.RPC_PORT.get(configs); Map storeNodes = makeRoleNodes(storeCount, storeNamePrefix, RoleType.STORE.getName(), port); this.allNodes.put(RoleType.STORE, storeNodes); - int graphPort = StoreConfig.EXECUTOR_GRAPH_PORT.get(this.configs); + int graphPort = StoreConfig.EXECUTOR_GRAPH_PORT.get(configs); Map graphNodes = makeRoleNodes( storeCount, storeNamePrefix, RoleType.EXECUTOR_GRAPH.getName(), graphPort); this.allNodes.put(RoleType.EXECUTOR_GRAPH, graphNodes); - int queryPort = StoreConfig.EXECUTOR_QUERY_PORT.get(this.configs); + int queryPort = StoreConfig.EXECUTOR_QUERY_PORT.get(configs); Map queryNodes = makeRoleNodes( storeCount, storeNamePrefix, RoleType.EXECUTOR_QUERY.getName(), queryPort); this.allNodes.put(RoleType.EXECUTOR_QUERY, queryNodes); - int enginePort = StoreConfig.EXECUTOR_ENGINE_PORT.get(this.configs); + int enginePort = StoreConfig.EXECUTOR_ENGINE_PORT.get(configs); Map engineNodes = makeRoleNodes( storeCount, @@ -60,13 +58,13 @@ public FileDiscovery(Configs configs) { enginePort); this.allNodes.put(RoleType.EXECUTOR_ENGINE, engineNodes); - int gaiaRpcPort = GaiaConfig.GAIA_RPC_PORT.get(this.configs); + int gaiaRpcPort = GaiaConfig.GAIA_RPC_PORT.get(configs); Map gaiaRpcNodes = makeRoleNodes( storeCount, storeNamePrefix, RoleType.GAIA_RPC.getName(), gaiaRpcPort); this.allNodes.put(RoleType.GAIA_RPC, gaiaRpcNodes); - int gaiaEnginePort = GaiaConfig.GAIA_ENGINE_PORT.get(this.configs); + int gaiaEnginePort = GaiaConfig.GAIA_ENGINE_PORT.get(configs); Map gaiaEngineNodes = makeRoleNodes( storeCount, @@ -76,23 +74,22 @@ public FileDiscovery(Configs configs) { this.allNodes.put(RoleType.GAIA_ENGINE, gaiaEngineNodes); // Frontend nodes - int frontendCount = CommonConfig.FRONTEND_NODE_COUNT.get(this.configs); - String frontendNamePrefix = DiscoveryConfig.DNS_NAME_PREFIX_FRONTEND.get(this.configs); + int frontendCount = CommonConfig.FRONTEND_NODE_COUNT.get(configs); + String frontendNamePrefix = DiscoveryConfig.DNS_NAME_PREFIX_FRONTEND.get(configs); Map frontendNodes = makeRoleNodes(frontendCount, frontendNamePrefix, RoleType.FRONTEND.getName(), port); this.allNodes.put(RoleType.FRONTEND, frontendNodes); // Ingestor nodes - int ingestorCount = CommonConfig.INGESTOR_NODE_COUNT.get(this.configs); - String ingestorNamePrefix = DiscoveryConfig.DNS_NAME_PREFIX_INGESTOR.get(this.configs); + int ingestorCount = CommonConfig.INGESTOR_NODE_COUNT.get(configs); + String ingestorNamePrefix = DiscoveryConfig.DNS_NAME_PREFIX_INGESTOR.get(configs); Map ingestorNodes = makeRoleNodes(ingestorCount, ingestorNamePrefix, RoleType.INGESTOR.getName(), port); this.allNodes.put(RoleType.INGESTOR, ingestorNodes); // Coordinator nodes - int coordinatorCount = CommonConfig.COORDINATOR_NODE_COUNT.get(this.configs); - String coordinatorNamePrefix = - DiscoveryConfig.DNS_NAME_PREFIX_COORDINATOR.get(this.configs); + int coordinatorCount = CommonConfig.COORDINATOR_NODE_COUNT.get(configs); + String coordinatorNamePrefix = DiscoveryConfig.DNS_NAME_PREFIX_COORDINATOR.get(configs); Map coordinatorNodes = makeRoleNodes( coordinatorCount, diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/GrootNode.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/GrootNode.java index 5db82af87724..69f477b5f265 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/GrootNode.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/GrootNode.java @@ -29,10 +29,10 @@ @JsonRootName("details") public class GrootNode { - private String roleName; - private int idx; - private String host; - private int port; + private final String roleName; + private final int idx; + private final String host; + private final int port; @JsonCreator public GrootNode( diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/LocalNodeProvider.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/LocalNodeProvider.java index 1337121cc285..ddcae785033e 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/LocalNodeProvider.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/LocalNodeProvider.java @@ -23,9 +23,9 @@ public class LocalNodeProvider implements Function { - private Configs configs; - private RoleType roleType; - private AtomicReference localNodeRef = new AtomicReference<>(); + private final Configs configs; + private final RoleType roleType; + private final AtomicReference localNodeRef = new AtomicReference<>(); public LocalNodeProvider(Configs configs) { this(RoleType.fromName(CommonConfig.ROLE_NAME.get(configs)), configs); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java index c5395bbfe562..8fa9d4df235e 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java @@ -45,18 +45,18 @@ public class ZkDiscovery implements NodeDiscovery { private static final Logger logger = LoggerFactory.getLogger(ZkDiscovery.class); public static final String ROOT_NODE = "discovery"; - private List listeners = new ArrayList<>(); + private final List listeners = new ArrayList<>(); private ServiceDiscovery serviceDiscovery; private List> serviceCaches; - private LocalNodeProvider localNodeProvider; - private CuratorFramework curator; - private String discoveryBasePath; + private final LocalNodeProvider localNodeProvider; + private final CuratorFramework curator; + private final String discoveryBasePath; private ExecutorService singleThreadExecutor; private AtomicReference>> currentNodesRef = new AtomicReference<>(new HashMap<>()); - private Object lock = new Object(); + private final Object lock = new Object(); public ZkDiscovery( Configs configs, LocalNodeProvider localNodeProvider, CuratorFramework curator) { @@ -118,10 +118,10 @@ public void start() { listener.cacheChanged(); this.serviceCaches.add(serviceCache); } - logger.info("ZkDiscovery started"); } catch (Exception e) { throw new GrootException(e); } + logger.info("ZkDiscovery started"); } @Override @@ -143,13 +143,13 @@ public void stop() { logger.warn("close serviceDiscovery failed", e); } } - logger.info("ZkDiscovery stopped"); + logger.debug("ZkDiscovery stopped"); } private class NodeChangeListener implements ServiceCacheListener { - private RoleType roleType; - private ServiceCache serviceCache; + private final RoleType roleType; + private final ServiceCache serviceCache; public NodeChangeListener(RoleType roleType, ServiceCache serviceCache) { this.roleType = roleType; @@ -184,7 +184,7 @@ public void cacheChanged() { notifyRemoved(roleType, removed); } - if (newRoleNodes != null && !newRoleNodes.isEmpty()) { + if (!newRoleNodes.isEmpty()) { Map added = new HashMap<>(); newRoleNodes.forEach( (id, newNode) -> { @@ -203,7 +203,7 @@ public void cacheChanged() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { - logger.info("stateChanged to [" + newState + "]"); + logger.info("stateChanged to [{}]", newState); } } @@ -211,7 +211,7 @@ private void notifyRemoved(RoleType role, Map removed) { if (removed.isEmpty()) { return; } - logger.debug("role [" + role.getName() + "] remove nodes [" + removed.values() + "]"); + logger.debug("role [{}] remove nodes [{}]", role.getName(), removed.values()); for (Listener listener : this.listeners) { this.singleThreadExecutor.execute( () -> { @@ -219,12 +219,7 @@ private void notifyRemoved(RoleType role, Map removed) { listener.nodesLeft(role, removed); } catch (Exception e) { logger.error( - "listener [" - + listener - + "] failed on nodesLeft [" - + removed - + "]", - e); + "listener [{}] failed on nodesLeft [{}]", listener, removed, e); } }); } @@ -234,7 +229,7 @@ private void notifyAdded(RoleType role, Map added) { if (added.isEmpty()) { return; } - logger.debug("role [" + role.getName() + "] add nodes [" + added.values() + "]"); + logger.debug("role [{}] add nodes [{}]", role.getName(), added.values()); for (Listener listener : this.listeners) { this.singleThreadExecutor.execute( () -> { @@ -242,12 +237,7 @@ private void notifyAdded(RoleType role, Map added) { listener.nodesJoin(role, added); } catch (Exception e) { logger.error( - "listener [" - + listener - + "] failed on nodesJoin [" - + added - + "]", - e); + "listener [{}] failed on nodesJoin [{}]", listener, added, e); } }); } @@ -268,11 +258,9 @@ public void addListener(Listener listener) { listener.nodesJoin(role, nodes); } catch (Exception ex) { logger.error( - "listener [" - + listener - + "] failed on nodesJoin [" - + nodes - + "]", + "listener [{}] failed on nodesJoin [{}]", + listener, + nodes, ex); } }); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BackupClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BackupClient.java index 82bef762a0e8..c30456c67792 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BackupClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BackupClient.java @@ -23,7 +23,7 @@ import java.util.List; public class BackupClient extends RpcClient { - private BackupGrpc.BackupBlockingStub stub; + private final BackupGrpc.BackupBlockingStub stub; public BackupClient(ManagedChannel channel) { super(channel); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BatchDdlClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BatchDdlClient.java index 80852805a6d5..312137627a07 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BatchDdlClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/BatchDdlClient.java @@ -10,9 +10,9 @@ public class BatchDdlClient { - private DdlExecutors ddlExecutors; - private SnapshotCache snapshotCache; - private SchemaWriter schemaWriter; + private final DdlExecutors ddlExecutors; + private final SnapshotCache snapshotCache; + private final SchemaWriter schemaWriter; public BatchDdlClient( DdlExecutors ddlExecutors, SnapshotCache snapshotCache, SchemaWriter schemaWriter) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientBackupService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientBackupService.java index c9d02ed71b46..2def0851af19 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientBackupService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientBackupService.java @@ -29,7 +29,7 @@ public class ClientBackupService extends ClientBackupGrpc.ClientBackupImplBase { private static final Logger logger = LoggerFactory.getLogger(ClientBackupService.class); - private RoleClients backupClients; + private final RoleClients backupClients; public ClientBackupService(RoleClients backupClients) { this.backupClients = backupClients; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java index fc57ee67933f..2dc00fdc4534 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java @@ -44,13 +44,13 @@ public class ClientService extends ClientGrpc.ClientImplBase { private static final Logger logger = LoggerFactory.getLogger(ClientService.class); - private SnapshotCache snapshotCache; - private MetricsAggregator metricsAggregator; - private StoreIngestor storeIngestor; - private MetaService metaService; - private BatchDdlClient batchDdlClient; + private final SnapshotCache snapshotCache; + private final MetricsAggregator metricsAggregator; + private final StoreIngestor storeIngestor; + private final MetaService metaService; + private final BatchDdlClient batchDdlClient; - private StoreStateFetcher storeStateFetcher; + private final StoreStateFetcher storeStateFetcher; public ClientService( SnapshotCache snapshotCache, diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java index fa5c656c602f..eb7a1ff0f6b3 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java @@ -18,8 +18,8 @@ public class ClientWriteService extends ClientWriteGrpc.ClientWriteImplBase { private static final Logger logger = LoggerFactory.getLogger(ClientWriteService.class); - private WriteSessionGenerator writeSessionGenerator; - private GraphWriter graphWriter; + private final WriteSessionGenerator writeSessionGenerator; + private final GraphWriter graphWriter; public ClientWriteService( WriteSessionGenerator writeSessionGenerator, GraphWriter graphWriter) { @@ -40,16 +40,13 @@ public void batchWrite( BatchWriteRequest request, StreamObserver responseObserver) { String requestId = UuidUtils.getBase64UUIDString(); String writeSession = request.getClientId(); - int writeRequestsCount = request.getWriteRequestsCount(); - List writeRequests = new ArrayList<>(writeRequestsCount); + int count = request.getWriteRequestsCount(); + List writeRequests = new ArrayList<>(count); logger.debug( - "received batchWrite request. requestId [" - + requestId - + "] writeSession [" - + writeSession - + "] batchSize [" - + writeRequestsCount - + "]"); + "batchWrite: requestId {} writeSession {} batchSize {}", + requestId, + writeSession, + count); try { for (WriteRequestPb writeRequestPb : request.getWriteRequestsList()) { writeRequests.add(WriteRequest.parseProto(writeRequestPb)); @@ -69,11 +66,9 @@ public void onCompleted(Long res) { @Override public void onError(Throwable t) { logger.error( - "batch write callback error. request [" - + requestId - + "] session [" - + writeSession - + "]", + "batch write error. request {} session {}", + requestId, + writeSession, t); responseObserver.onError( Status.INTERNAL @@ -84,8 +79,7 @@ public void onError(Throwable t) { } catch (Exception e) { logger.error( - "batchWrite failed. request [" + requestId + "] session [" + writeSession + "]", - e); + "batchWrite failed. request [{}] session [{}]", requestId, writeSession, e); responseObserver.onError( Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException()); } @@ -94,28 +88,36 @@ public void onError(Throwable t) { @Override public void remoteFlush( RemoteFlushRequest request, StreamObserver responseObserver) { - long flushSnapshotId = request.getSnapshotId(); - long waitTimeMs = request.getWaitTimeMs(); - logger.info( - "flush snapshot id [" + flushSnapshotId + "] with timeout [" + waitTimeMs + "]ms"); + long snapshotId = request.getSnapshotId(); + long timeout = request.getWaitTimeMs(); + logger.info("flush snapshot id [{}] with timeout [{}]ms", snapshotId, timeout); try { boolean suc; - if (flushSnapshotId == 0L) { - suc = graphWriter.flushLastSnapshot(waitTimeMs); + if (snapshotId == 0L) { + suc = graphWriter.flushLastSnapshot(timeout); } else { - suc = graphWriter.flushSnapshot(flushSnapshotId, waitTimeMs); + suc = graphWriter.flushSnapshot(snapshotId, timeout); } responseObserver.onNext(RemoteFlushResponse.newBuilder().setSuccess(suc).build()); responseObserver.onCompleted(); } catch (InterruptedException e) { logger.error( - "remoteFlush failed. flushSnapshotId [" - + flushSnapshotId - + "] waitTimeMs [" - + waitTimeMs - + "]"); + "remoteFlush failed. flushSnapshotId [{}] waitTimeMs [{}]", + snapshotId, + timeout); responseObserver.onError( Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException()); } } + + @Override + public void replayRecords( + ReplayRecordsRequest request, StreamObserver responseObserver) { + long offset = request.getOffset(); + long timestamp = request.getTimestamp(); + logger.info("replay records from offset {}, timestamp {}", offset, timestamp); + List ids = graphWriter.replayWALFrom(offset, timestamp); + responseObserver.onNext(ReplayRecordsResponse.newBuilder().addAllSnapshotId(ids).build()); + responseObserver.onCompleted(); + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendSnapshotService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendSnapshotService.java index ef0d69ecf70c..ee08d48e06cf 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendSnapshotService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendSnapshotService.java @@ -28,7 +28,7 @@ public class FrontendSnapshotService extends FrontendSnapshotGrpc.FrontendSnapsh private static final Logger logger = LoggerFactory.getLogger(FrontendSnapshotService.class); - private SnapshotCache snapshotCache; + private final SnapshotCache snapshotCache; public FrontendSnapshotService(SnapshotCache snapshotCache) { this.snapshotCache = snapshotCache; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/GrootDdlService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/GrootDdlService.java index b5fb6c2491e3..f4f34719c409 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/GrootDdlService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/GrootDdlService.java @@ -44,8 +44,8 @@ public class GrootDdlService extends GrootDdlServiceGrpc.GrootDdlServiceImplBase public static final int FORMAT_VERSION = 1; - private SnapshotCache snapshotCache; - private BatchDdlClient batchDdlClient; + private final SnapshotCache snapshotCache; + private final BatchDdlClient batchDdlClient; public GrootDdlService(SnapshotCache snapshotCache, BatchDdlClient batchDdlClient) { this.snapshotCache = snapshotCache; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/IngestorWriteClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/IngestorWriteClient.java index e27f8f16111a..2c18e1998fa6 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/IngestorWriteClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/IngestorWriteClient.java @@ -17,16 +17,16 @@ import com.alibaba.graphscope.groot.operation.BatchId; import com.alibaba.graphscope.groot.operation.OperationBatch; import com.alibaba.graphscope.groot.rpc.RpcClient; -import com.alibaba.graphscope.proto.groot.IngestorWriteGrpc; -import com.alibaba.graphscope.proto.groot.WriteIngestorRequest; -import com.alibaba.graphscope.proto.groot.WriteIngestorResponse; +import com.alibaba.graphscope.proto.groot.*; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; +import java.util.List; + public class IngestorWriteClient extends RpcClient { - private IngestorWriteGrpc.IngestorWriteBlockingStub stub; + private final IngestorWriteGrpc.IngestorWriteBlockingStub stub; private IngestorWriteGrpc.IngestorWriteStub asyncStub; public IngestorWriteClient(ManagedChannel channel) { @@ -51,6 +51,13 @@ public BatchId writeIngestor(String requestId, int queueId, OperationBatch opera return new BatchId(response.getSnapshotId()); } + public List replayWALFrom(long offset, long timestamp) { + ReplayWALRequest request = + ReplayWALRequest.newBuilder().setOffset(offset).setTimestamp(timestamp).build(); + ReplayWALResponse response = stub.replayWAL(request); + return response.getSnapshotIdList(); + } + public void writeIngestorAsync( String requestId, int queueId, diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SchemaClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SchemaClient.java index b653400a4dd6..91d0afe98496 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SchemaClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SchemaClient.java @@ -24,7 +24,7 @@ public class SchemaClient extends RpcClient { - private SchemaGrpc.SchemaBlockingStub stub; + private final SchemaGrpc.SchemaBlockingStub stub; public SchemaClient(ManagedChannel channel) { super(channel); @@ -46,8 +46,7 @@ public long submitBatchDdl( .build(); SubmitBatchDdlResponse submitBatchDdlResponse = stub.submitBatchDdl(request); if (submitBatchDdlResponse.getSuccess()) { - long ddlSnapshotId = submitBatchDdlResponse.getDdlSnapshotId(); - return ddlSnapshotId; + return submitBatchDdlResponse.getDdlSnapshotId(); } else { throw new DdlException(submitBatchDdlResponse.getMsg()); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SchemaWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SchemaWriter.java index ca678a9ec6a4..650204074132 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SchemaWriter.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SchemaWriter.java @@ -19,7 +19,7 @@ public class SchemaWriter { - private RoleClients schemaClients; + private final RoleClients schemaClients; public SchemaWriter(RoleClients schemaClients) { this.schemaClients = schemaClients; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SnapshotUpdateClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SnapshotUpdateClient.java index 6f1f626aacc5..920afcf9c18b 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SnapshotUpdateClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/SnapshotUpdateClient.java @@ -30,7 +30,7 @@ public class SnapshotUpdateClient extends RpcClient { private static final Logger logger = LoggerFactory.getLogger(SnapshotUpdateClient.class); - private CoordinatorSnapshotServiceGrpc.CoordinatorSnapshotServiceBlockingStub stub; + private final CoordinatorSnapshotServiceGrpc.CoordinatorSnapshotServiceBlockingStub stub; public SnapshotUpdateClient(ManagedChannel channel) { super(channel); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java index 9a8f3e1b70bb..ebaddb795f9d 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java @@ -28,7 +28,7 @@ public class StoreIngestClient extends RpcClient { - private StoreIngestGrpc.StoreIngestStub stub; + private final StoreIngestGrpc.StoreIngestStub stub; public StoreIngestClient(ManagedChannel channel) { super(channel); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreStateClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreStateClient.java index d6b7bc05897b..e037967124e0 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreStateClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreStateClient.java @@ -20,7 +20,7 @@ public class StoreStateClient extends RpcClient { - private StateServiceGrpc.StateServiceBlockingStub stub; + private final StateServiceGrpc.StateServiceBlockingStub stub; public StoreStateClient(ManagedChannel channel) { super(channel); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/WriteSessionGenerator.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/WriteSessionGenerator.java index 2e7e9dc433e0..be0a4e67d4ab 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/WriteSessionGenerator.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/WriteSessionGenerator.java @@ -9,8 +9,8 @@ public class WriteSessionGenerator { - private AtomicLong nextIdx; - private int frontendNodeId; + private final AtomicLong nextIdx; + private final int frontendNodeId; public WriteSessionGenerator(Configs configs) { int frontendCount = CommonConfig.FRONTEND_NODE_COUNT.get(configs); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java index 1ed8210a48cc..3bb8abacfdd5 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java @@ -80,8 +80,8 @@ public GraphWriter( this.metaService = metaService; this.ingestWriteClients = ingestWriteClients; initMetrics(); - metricsCollector.register(this, () -> updateMetrics()); - // default for incr eid generate + metricsCollector.register(this, this::updateMetrics); + // default for increment eid generate this.enableHashEid = FrontendConfig.ENABLE_HASH_GENERATE_EID.get(configs); } @@ -166,7 +166,7 @@ public void writeBatch( public void onCompleted(Long res) { long writeSnapshotId = res; lastWrittenSnapshotId.updateAndGet( - x -> x < writeSnapshotId ? writeSnapshotId : x); + x -> Math.max(x, writeSnapshotId)); writeRequestsTotal.addAndGet(writeRequests.size()); finish(); callback.onCompleted(res); @@ -187,6 +187,16 @@ void finish() { }); } + public List replayWALFrom(long offset, long timestamp) { + List allIds = new ArrayList<>(); + for (int queue = 0; queue < metaService.getQueueCount(); ++queue) { + int id = metaService.getIngestorIdForQueue(queue); + List ids = ingestWriteClients.getClient(id).replayWALFrom(offset, timestamp); + allIds.addAll(ids); + } + return allIds; + } + public boolean flushSnapshot(long snapshotId, long waitTimeMs) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); this.snapshotCache.addListener(snapshotId, () -> latch.countDown()); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/BatchSender.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/BatchSender.java index 12d95b1b0f3a..ea495777e505 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/BatchSender.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/BatchSender.java @@ -54,7 +54,6 @@ public class BatchSender implements MetricsAgent { private volatile long lastUpdateTime; private AvgMetric sendBytesMetric; private AvgMetric sendRecordsMetric; - private List bufferBatchCountMetrics; private List callbackLatencyMetrics; private final int receiverQueueSize; @@ -150,12 +149,12 @@ public void asyncSendWithRetry( try { BlockingQueue curBuffer = storeSendBuffer.get(storeId); if (curBuffer.remainingCapacity() == 0) { - logger.warn("Buffer of store [" + storeId + "] is full"); + logger.warn("Buffer of store [{}] is full", storeId); } curBuffer.put(batchBuilder.build()); break; } catch (InterruptedException e) { - logger.warn("send buffer interrupted", e); + logger.warn("send buffer interrupted"); } } }); @@ -176,7 +175,7 @@ private void sendBatch() { try { sendTask = this.sendTasks.poll(1000L, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - logger.warn("polling send task interrupted", e); + logger.warn("polling send task interrupted"); return; } if (sendTask == null) { @@ -197,7 +196,7 @@ private void sendBatch() { dataBatch = buffer.poll(100L, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { dataBatch = null; - logger.warn("polling send buffer interrupted", e); + logger.warn("polling send buffer interrupted, {}", e.getMessage()); } if (dataBatch == null) { break; @@ -262,10 +261,8 @@ public void initMetrics() { this.lastUpdateTime = System.nanoTime(); this.sendBytesMetric = new AvgMetric(); this.sendRecordsMetric = new AvgMetric(); - this.bufferBatchCountMetrics = new ArrayList<>(this.storeCount); this.callbackLatencyMetrics = new ArrayList<>(this.storeCount); for (int i = 0; i < this.storeCount; i++) { - this.bufferBatchCountMetrics.add(new AvgMetric()); this.callbackLatencyMetrics.add(new AvgMetric()); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestProcessor.java index ff946bd520df..e32684d7eb90 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestProcessor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestProcessor.java @@ -19,6 +19,8 @@ import com.alibaba.graphscope.groot.metrics.MetricsAgent; import com.alibaba.graphscope.groot.metrics.MetricsCollector; import com.alibaba.graphscope.groot.operation.OperationBatch; +import com.alibaba.graphscope.groot.operation.OperationBlob; +import com.alibaba.graphscope.groot.operation.OperationType; import com.alibaba.graphscope.groot.wal.LogEntry; import com.alibaba.graphscope.groot.wal.LogReader; import com.alibaba.graphscope.groot.wal.LogService; @@ -29,7 +31,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -50,14 +54,14 @@ public class IngestProcessor implements MetricsAgent { private volatile boolean shouldStop = true; private volatile long tailOffset; - private int queueId; - private int bufferSize; + private final int queueId; + private final int bufferSize; private BlockingQueue ingestBuffer; private Thread ingestThread; - private AtomicLong ingestSnapshotId; + private final AtomicLong ingestSnapshotId; - private LogService logService; - private BatchSender batchSender; + private final LogService logService; + private final BatchSender batchSender; private volatile boolean started; // For metrics @@ -87,7 +91,7 @@ public IngestProcessor( this.bufferSize = IngestorConfig.INGESTOR_QUEUE_BUFFER_MAX_COUNT.get(configs); initMetrics(); - metricsCollector.register(this, () -> updateMetrics()); + metricsCollector.register(this, this::updateMetrics); } public void start() { @@ -99,17 +103,12 @@ public void start() { this.ingestThread = new Thread( () -> { - LogWriter logWriter = null; while (!shouldStop) { try { replayWAL(this.tailOffset); - logWriter = this.logService.createWriter(this.queueId); break; } catch (Exception e) { - logger.error( - "error occurred before ingest process, will retry after" - + " 1s", - e); + logger.error("error occurred before ingest, retrying", e); try { Thread.sleep(1000L); } catch (InterruptedException ie) { @@ -117,6 +116,7 @@ public void start() { } } } + LogWriter logWriter = this.logService.createWriter(this.queueId); while (!shouldStop) { try { process(logWriter); @@ -155,15 +155,14 @@ public void stop() { private void checkStarted() { if (!started) { - throw new IllegalStateException( - "IngestProcessor of queue #[" + this.queueId + "] not started yet"); + throw new IllegalStateException("IngestProcessor queue#[" + queueId + "] not started"); } } public void ingestBatch( String requestId, OperationBatch operationBatch, IngestCallback callback) { checkStarted(); - logger.debug("ingestBatch requestId [" + requestId + "], queueId [" + queueId + "]"); + logger.debug("ingestBatch requestId [{}], queueId [{}]", requestId, queueId); if (this.ingestSnapshotId.get() == -1L) { throw new IllegalStateException("ingestor has no valid ingestSnapshotId"); } @@ -181,7 +180,7 @@ private void process(LogWriter logWriter) { try { task = this.ingestBuffer.poll(1000L, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - logger.warn("polling ingestBuffer interrupted", e); + logger.warn("polling ingestBuffer interrupted"); return; } if (task == null) { @@ -202,11 +201,9 @@ private long processTask(LogWriter logWriter, IngestTask task) throws IOExceptio throw new IllegalStateException("invalid ingestSnapshotId [" + batchSnapshotId + "]"); } logger.debug( - "append batch to WAL. requestId [" - + task.requestId - + "], snapshotId [" - + batchSnapshotId - + "]"); + "append batch to WAL. requestId [{}], snapshotId [{}]", + task.requestId, + batchSnapshotId); long latestSnapshotId = task.operationBatch.getLatestSnapshotId(); if (latestSnapshotId > 0 && latestSnapshotId < batchSnapshotId) { throw new IllegalStateException( @@ -218,10 +215,9 @@ private long processTask(LogWriter logWriter, IngestTask task) throws IOExceptio } long startTimeNano = System.nanoTime(); long walOffset = -1L; - while (!shouldStop) { + if (!shouldStop) { try { walOffset = logWriter.append(new LogEntry(batchSnapshotId, task.operationBatch)); - break; } catch (Exception e) { // write failed, just throw out to fail this task logger.error("write WAL failed. requestId [" + task.requestId + "]", e); @@ -243,6 +239,109 @@ private long processTask(LogWriter logWriter, IngestTask task) throws IOExceptio return batchSnapshotId; } + class IngestTask { + String requestId; + OperationBatch operationBatch; + IngestCallback callback; + + public IngestTask( + String requestId, OperationBatch operationBatch, IngestCallback callback) { + this.requestId = requestId; + this.operationBatch = operationBatch; + this.callback = callback; + } + } + + public void setTailOffset(long offset) { + logger.info("IngestProcessor of queue #[{}] set tail offset to [{}]", queueId, offset); + this.tailOffset = offset; + } + + public void replayWAL(long tailOffset) throws IOException { + long replayFrom = tailOffset + 1; + logger.info("replay WAL of queue#[{}] from offset [{}]", queueId, replayFrom); + int replayCount = 0; + try (LogReader logReader = this.logService.createReader(queueId, replayFrom)) { + ReadLogEntry readLogEntry; + while (!shouldStop && (readLogEntry = logReader.readNext()) != null) { + long offset = readLogEntry.getOffset(); + LogEntry logEntry = readLogEntry.getLogEntry(); + long snapshotId = logEntry.getSnapshotId(); + OperationBatch batch = logEntry.getOperationBatch(); + this.batchSender.asyncSendWithRetry("", queueId, snapshotId, offset, batch); + if (!batch.equals(IngestService.MARKER_BATCH)) { + replayCount++; + } + } + } + logger.info("replayWAL finished. total replayed [{}] records", replayCount); + } + + public long replayDMLRecordsFrom(long offset, long timestamp) throws IOException { + List types = new ArrayList<>(); + types.add(OperationType.OVERWRITE_VERTEX); + types.add(OperationType.UPDATE_VERTEX); + types.add(OperationType.DELETE_VERTEX); + types.add(OperationType.OVERWRITE_EDGE); + types.add(OperationType.UPDATE_EDGE); + types.add(OperationType.DELETE_EDGE); + types.add(OperationType.CLEAR_VERTEX_PROPERTIES); + types.add(OperationType.CLEAR_EDGE_PROPERTIES); + + long batchSnapshotId = this.ingestSnapshotId.get(); + logger.info( + "replay DML records of queue#[{}] from offset [{}], ts [{}]", + queueId, + offset, + timestamp); + int replayCount = 0; + try (LogReader logReader = this.logService.createReader(queueId, offset, timestamp)) { + ReadLogEntry readLogEntry; + while (!shouldStop && (readLogEntry = logReader.readNext()) != null) { + long entryOffset = readLogEntry.getOffset(); + LogEntry logEntry = readLogEntry.getLogEntry(); + OperationBatch batch = extractOperations(logEntry.getOperationBatch(), types); + if (batch.getOperationCount() == 0) { + continue; + } + this.batchSender.asyncSendWithRetry( + "", queueId, batchSnapshotId, entryOffset, batch); + replayCount++; + } + } + logger.info("replay DML records finished. total replayed [{}] records", replayCount); + return batchSnapshotId; + } + + private OperationBatch extractOperations(OperationBatch input, List types) { + boolean hasOtherType = false; + for (int i = 0; i < input.getOperationCount(); ++i) { + OperationBlob blob = input.getOperationBlob(i); + OperationType opType = blob.getOperationType(); + if (!types.contains(opType)) { + hasOtherType = true; + break; + } + } + if (!hasOtherType) { + return input; + } + OperationBatch.Builder batchBuilder = OperationBatch.newBuilder(); + batchBuilder.setLatestSnapshotId(input.getLatestSnapshotId()); + for (int i = 0; i < input.getOperationCount(); ++i) { + OperationBlob blob = input.getOperationBlob(i); + OperationType opType = blob.getOperationType(); + if (types.contains(opType)) { + batchBuilder.addOperationBlob(blob); + } + } + return batchBuilder.build(); + } + + public int getQueueId() { + return queueId; + } + @Override public void initMetrics() { this.totalProcessed = 0L; @@ -277,7 +376,7 @@ private void updateMetrics() { @Override public Map getMetrics() { - return new HashMap() { + return new HashMap<>() { { put(WRITE_RECORDS_PER_SECOND, String.valueOf(writeRecordsPerSecond)); put(WRITE_RECORDS_TOTAL, String.valueOf(totalProcessed)); @@ -300,54 +399,4 @@ public String[] getMetricKeys() { INGEST_BUFFER_TASKS_COUNT }; } - - class IngestTask { - String requestId; - OperationBatch operationBatch; - IngestCallback callback; - - public IngestTask( - String requestId, OperationBatch operationBatch, IngestCallback callback) { - this.requestId = requestId; - this.operationBatch = operationBatch; - this.callback = callback; - } - } - - public void setTailOffset(long offset) { - logger.info( - "IngestProcessor of queue #[" - + this.queueId - + "] set tail offset to [" - + offset - + "]"); - this.tailOffset = offset; - } - - private void replayWAL(long tailOffset) throws IOException { - long replayFrom = tailOffset + 1; - logger.info("replay WAL of queue#[" + this.queueId + "] from offset [" + replayFrom + "]"); - LogReader logReader = this.logService.createReader(this.queueId, replayFrom); - ReadLogEntry readLogEntry; - int replayCount = 0; - while (!shouldStop && (readLogEntry = logReader.readNext()) != null) { - long offset = readLogEntry.getOffset(); - LogEntry logEntry = readLogEntry.getLogEntry(); - long snapshotId = logEntry.getSnapshotId(); - OperationBatch operationBatch = logEntry.getOperationBatch(); - this.batchSender.asyncSendWithRetry( - "", this.queueId, snapshotId, offset, operationBatch); - replayCount++; - } - try { - logReader.close(); - } catch (IOException e) { - logger.warn("close logReader failed", e); - } - logger.info("replayWAL finished. total replayed [" + replayCount + "] records"); - } - - public int getQueueId() { - return queueId; - } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestProgressClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestProgressClient.java index dd72a992920b..a4c5a32f0abd 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestProgressClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestProgressClient.java @@ -25,7 +25,7 @@ /** ingestor -> coordinator */ public class IngestProgressClient extends RpcClient { - private IngestProgressGrpc.IngestProgressBlockingStub stub; + private final IngestProgressGrpc.IngestProgressBlockingStub stub; public IngestProgressClient(ManagedChannel channel) { super(channel); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestService.java index 59cda9caf216..e2950d00d9c0 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestService.java @@ -30,11 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.io.IOException; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -52,16 +49,15 @@ public class IngestService implements NodeDiscovery.Listener { OperationBatch.newBuilder() .addOperationBlob(OperationBlob.MARKER_OPERATION_BLOB) .build(); + private final Configs configs; + private final NodeDiscovery discovery; + private final MetaService metaService; + private final LogService logService; + private final IngestProgressFetcher ingestProgressFetcher; + private final StoreWriter storeWriter; + private final MetricsCollector metricsCollector; - private Configs configs; - private NodeDiscovery discovery; - private MetaService metaService; - private LogService logService; - private IngestProgressFetcher ingestProgressFetcher; - private StoreWriter storeWriter; - private MetricsCollector metricsCollector; - - private int ingestorId; + private final int ingestorId; private List queueIds; private Map queueToProcessor; private AtomicLong ingestSnapshotId; @@ -71,10 +67,9 @@ public class IngestService implements NodeDiscovery.Listener { private ScheduledExecutorService scheduler; private ExecutorService singleThreadExecutor; private volatile boolean started = false; - private int storeNodeCount; - private long checkProcessorIntervalMs; + private final int storeNodeCount; - private Set availableNodes; + private final Set availableNodes; public IngestService( Configs configs, @@ -95,8 +90,6 @@ public IngestService( this.ingestorId = CommonConfig.NODE_IDX.get(configs); this.storeNodeCount = CommonConfig.STORE_NODE_COUNT.get(configs); this.availableNodes = new HashSet<>(); - this.checkProcessorIntervalMs = - IngestorConfig.INGESTOR_CHECK_PROCESSOR_INTERVAL_MS.get(configs); } public void start() { @@ -136,11 +129,10 @@ public void start() { Executors.newSingleThreadScheduledExecutor( ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "ingest-try-start", logger)); + + long delay = IngestorConfig.INGESTOR_CHECK_PROCESSOR_INTERVAL_MS.get(configs); this.scheduler.scheduleWithFixedDelay( - () -> tryStartProcessors(), - this.checkProcessorIntervalMs, - this.checkProcessorIntervalMs, - TimeUnit.MILLISECONDS); + this::tryStartProcessors, 0, delay, TimeUnit.MILLISECONDS); this.started = true; logger.info("IngestService started"); } @@ -178,7 +170,7 @@ public void stop() { } this.singleThreadExecutor = null; } - logger.info("IngestService stopped"); + logger.debug("IngestService stopped"); } private void checkStarted() { @@ -193,6 +185,15 @@ public void ingestBatch( this.queueToProcessor.get(queueId).ingestBatch(requestId, operationBatch, callback); } + public List replayDMLRecordsFrom(long offset, long timestamp) throws IOException { + List ids = new ArrayList<>(); + for (IngestProcessor processor : queueToProcessor.values()) { + long snapshotId = processor.replayDMLRecordsFrom(offset, timestamp); + ids.add(snapshotId); + } + return ids; + } + /** * This method will update writeSnapshotId and returns the previous value. * @@ -205,8 +206,7 @@ public void ingestBatch( public synchronized void advanceIngestSnapshotId( long snapshotId, CompletionCallback callback) { checkStarted(); - long previousSnapshotId = - this.ingestSnapshotId.getAndUpdate(x -> x < snapshotId ? snapshotId : x); + long previousSnapshotId = this.ingestSnapshotId.getAndUpdate(x -> Math.max(x, snapshotId)); if (previousSnapshotId >= snapshotId) { throw new IllegalStateException( "current ingestSnapshotId [" @@ -218,6 +218,7 @@ public synchronized void advanceIngestSnapshotId( AtomicInteger counter = new AtomicInteger(this.queueToProcessor.size()); AtomicBoolean finished = new AtomicBoolean(false); for (IngestProcessor processor : this.queueToProcessor.values()) { + int queue = processor.getQueueId(); try { processor.ingestBatch( "marker", @@ -239,11 +240,9 @@ public void onFailure(Exception e) { return; } logger.warn( - "ingest marker failed. queue#[" - + processor.getQueueId() - + "], snapshotId [" - + snapshotId - + "]", + "ingest marker failed. queue#{}, snapshotId {}", + queue, + snapshotId, e); callback.onError(e); } @@ -252,13 +251,7 @@ public void onFailure(Exception e) { if (finished.getAndSet(true)) { return; } - logger.warn( - "error in ingest marker. queue#[" - + processor.getQueueId() - + "], snapshotId [" - + snapshotId - + "]", - e); + logger.warn("ingest marker failed. queue#{}, snapshotId {}", queue, snapshotId, e); callback.onError(e); } } @@ -295,14 +288,13 @@ private void stopProcessors() { } this.singleThreadExecutor.execute( () -> { - if (!processorStarted) { - return; - } - for (IngestProcessor processor : this.queueToProcessor.values()) { - processor.stop(); + if (processorStarted) { + for (IngestProcessor processor : this.queueToProcessor.values()) { + processor.stop(); + } + processorStarted = false; + logger.info("processors stopped"); } - this.processorStarted = false; - logger.info("processors stopped"); }); } @@ -329,21 +321,19 @@ private void tryStartProcessors() { @Override public void nodesJoin(RoleType role, Map nodes) { - if (role != RoleType.STORE) { - return; - } - this.availableNodes.addAll(nodes.keySet()); - if (this.availableNodes.size() == storeNodeCount) { - graphNodesReady(); + if (role == RoleType.STORE) { + this.availableNodes.addAll(nodes.keySet()); + if (this.availableNodes.size() == storeNodeCount) { + graphNodesReady(); + } } } @Override public void nodesLeft(RoleType role, Map nodes) { if (role != RoleType.STORE) { - return; + this.availableNodes.removeAll(nodes.keySet()); + graphNodesLost(); } - this.availableNodes.removeAll(nodes.keySet()); - graphNodesLost(); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestorSnapshotService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestorSnapshotService.java index 19ae2a6c2905..1e7b00270135 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestorSnapshotService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestorSnapshotService.java @@ -22,7 +22,7 @@ public class IngestorSnapshotService extends IngestorSnapshotGrpc.IngestorSnapshotImplBase { - private IngestService ingestService; + private final IngestService ingestService; public IngestorSnapshotService(IngestService ingestService) { this.ingestService = ingestService; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestorWriteService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestorWriteService.java index 02cbe4e0aebd..4b0aa67d1f96 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestorWriteService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/IngestorWriteService.java @@ -14,15 +14,16 @@ package com.alibaba.graphscope.groot.ingestor; import com.alibaba.graphscope.groot.operation.OperationBatch; -import com.alibaba.graphscope.proto.groot.IngestorWriteGrpc; -import com.alibaba.graphscope.proto.groot.WriteIngestorRequest; -import com.alibaba.graphscope.proto.groot.WriteIngestorResponse; +import com.alibaba.graphscope.proto.groot.*; +import io.grpc.Status; import io.grpc.stub.StreamObserver; +import java.util.List; + public class IngestorWriteService extends IngestorWriteGrpc.IngestorWriteImplBase { - private IngestService ingestService; + private final IngestService ingestService; public IngestorWriteService(IngestService ingestService) { this.ingestService = ingestService; @@ -59,4 +60,18 @@ public void onFailure(Exception e) { responseObserver.onError(e); } } + + @Override + public void replayWAL( + ReplayWALRequest request, StreamObserver responseObserver) { + try { + List ids = + ingestService.replayDMLRecordsFrom(request.getOffset(), request.getTimestamp()); + responseObserver.onNext(ReplayWALResponse.newBuilder().addAllSnapshotId(ids).build()); + } catch (Exception e) { + responseObserver.onError( + Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException()); + } + responseObserver.onCompleted(); + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/StoreWriteClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/StoreWriteClient.java index 430cc3ecdb61..6a86449a8866 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/StoreWriteClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/StoreWriteClient.java @@ -33,7 +33,7 @@ public class StoreWriteClient extends RpcClient { private static final Logger logger = LoggerFactory.getLogger(StoreWriteClient.class); - private StoreWriteGrpc.StoreWriteStub stub; + private final StoreWriteGrpc.StoreWriteStub stub; public StoreWriteClient(ManagedChannel channel) { super(channel); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/StoreWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/StoreWriter.java index d6bf8c14cd83..0e946f02e977 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/StoreWriter.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/ingestor/StoreWriter.java @@ -19,6 +19,5 @@ import java.util.List; public interface StoreWriter { - void write( - int storeId, List storeDataBatch, CompletionCallback callback); + void write(int storeId, List batch, CompletionCallback callback); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/meta/DefaultMetaService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/meta/DefaultMetaService.java index 56284575a50a..34db12022a7b 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/meta/DefaultMetaService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/meta/DefaultMetaService.java @@ -18,24 +18,21 @@ import com.alibaba.graphscope.groot.common.config.KafkaConfig; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; public class DefaultMetaService implements MetaService { - private Configs configs; - private int partitionCount; - private int queueCount; + private final int partitionCount; + private final int queueCount; private Map> storeToPartitionIds; private Map partitionToStore; - private int storeCount; - private String kafkaServers; - private String kafkaTopicName; + private final int storeCount; + private final String kafkaServers; + private final String kafkaTopicName; public DefaultMetaService(Configs configs) { - this.configs = configs; this.partitionCount = CommonConfig.PARTITION_COUNT.get(configs); this.queueCount = CommonConfig.INGESTOR_QUEUE_COUNT.get(configs); this.storeCount = CommonConfig.STORE_NODE_COUNT.get(configs); @@ -58,15 +55,14 @@ private void loadPartitions() { this.partitionToStore = new HashMap<>(); int avg = this.partitionCount / this.storeCount; int remainder = this.partitionCount % storeCount; + for (int i = 0; i < storeCount; i++) { int startPartitionId = getStartPartition(avg, i, remainder); int nextStartPartitionId = getStartPartition(avg, i + 1, remainder); List partitionIds = new ArrayList<>(); - for (int partitionId = startPartitionId; - partitionId < nextStartPartitionId; - partitionId++) { - partitionIds.add(partitionId); - this.partitionToStore.put(partitionId, i); + for (int pid = startPartitionId; pid < nextStartPartitionId; pid++) { + partitionIds.add(pid); + this.partitionToStore.put(pid, i); } this.storeToPartitionIds.put(i, partitionIds); } @@ -101,7 +97,7 @@ public int getQueueCount() { @Override public List getQueueIdsForIngestor(int ingestorId) { - return Arrays.asList(ingestorId); + return List.of(ingestorId); } @Override diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/AvgMetric.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/AvgMetric.java index 82d9858f649e..02c8c8606db9 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/AvgMetric.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/AvgMetric.java @@ -3,7 +3,7 @@ import java.util.concurrent.atomic.AtomicLong; public class AvgMetric { - private AtomicLong totalVal; + private final AtomicLong totalVal; private volatile long lastUpdateVal; private volatile double metricVal; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsAggregator.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsAggregator.java index 89ddd7dabaa3..dfb87c2626bc 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsAggregator.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsAggregator.java @@ -29,10 +29,10 @@ public class MetricsAggregator { private Map> roleToClients = new HashMap<>(); - private ObjectMapper objectMapper; - private int frontendCount; - private int ingestorCount; - private int storeCount; + private final ObjectMapper objectMapper; + private final int frontendCount; + private final int ingestorCount; + private final int storeCount; public MetricsAggregator( Configs configs, diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsCollectClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsCollectClient.java index 17ee797a8922..6214f69730cf 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsCollectClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsCollectClient.java @@ -26,7 +26,7 @@ public class MetricsCollectClient extends RpcClient { - private MetricsCollectGrpc.MetricsCollectStub stub; + private final MetricsCollectGrpc.MetricsCollectStub stub; public MetricsCollectClient(ManagedChannel channel) { super(channel); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsCollectService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsCollectService.java index bb57e6a36b45..66772cc3195a 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsCollectService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsCollectService.java @@ -25,7 +25,7 @@ public class MetricsCollectService extends MetricsCollectGrpc.MetricsCollectImplBase { - private MetricsCollector metricsCollector; + private final MetricsCollector metricsCollector; public MetricsCollectService(MetricsCollector metricsCollector) { this.metricsCollector = metricsCollector; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsCollector.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsCollector.java index 5530a3c93d4d..9135ae58bdfc 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsCollector.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/metrics/MetricsCollector.java @@ -26,11 +26,11 @@ public class MetricsCollector { - private Set registeredMetricKeys = new HashSet<>(); - private List metricsAgents = new CopyOnWriteArrayList<>(); - private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final Set registeredMetricKeys = new HashSet<>(); + private final List metricsAgents = new CopyOnWriteArrayList<>(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - private long updateIntervalMs; + private final long updateIntervalMs; public MetricsCollector(Configs configs) { this.updateIntervalMs = CommonConfig.METRIC_UPDATE_INTERVAL_MS.get(configs); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/OperationBatch.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/OperationBatch.java index f39a41ed035b..3a3be247c52b 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/OperationBatch.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/OperationBatch.java @@ -19,11 +19,12 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Objects; public class OperationBatch implements Iterable { - private long latestSnapshotId; - private List operationBlobs; + private final long latestSnapshotId; + private final List operationBlobs; private OperationBatch(long latestSnapshotId, List operationBlobs) { this.latestSnapshotId = latestSnapshotId; @@ -81,9 +82,7 @@ public boolean equals(Object o) { OperationBatch that = (OperationBatch) o; - return operationBlobs != null - ? operationBlobs.equals(that.operationBlobs) - : that.operationBlobs == null; + return Objects.equals(operationBlobs, that.operationBlobs); } public static class Builder { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/ddl/AddEdgeKindOperation.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/ddl/AddEdgeKindOperation.java index 27d7b7408b72..39da967a4973 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/ddl/AddEdgeKindOperation.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/ddl/AddEdgeKindOperation.java @@ -22,10 +22,10 @@ public class AddEdgeKindOperation extends Operation { - private int partitionId; - private long schemaVersion; - private EdgeKind edgeKind; - private long tableIdx; + private final int partitionId; + private final long schemaVersion; + private final EdgeKind edgeKind; + private final long tableIdx; public AddEdgeKindOperation( int partitionId, long schemaVersion, EdgeKind edgeKind, long tableIdx) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/ChannelManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/ChannelManager.java index 610e0156eef5..e1663ac0c654 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/ChannelManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/ChannelManager.java @@ -105,7 +105,7 @@ public void start() { for (int i = 0; i < count; i++) { String host = hostTemplate.replace("{}", String.valueOf(i)); logger.info( - "Creating channel to role {} #{}, host {}, port {}", + "Create channel to role {} #{}, host {}, port {}", role.getName(), i, host, @@ -147,7 +147,7 @@ public void stop() { } this.roleToChannels = null; } - logger.info("ChannelManager stopped"); + logger.debug("ChannelManager stopped"); } public void registerRole(RoleType role) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/RpcServer.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/RpcServer.java index ff9ec3bd48e1..0615752efb78 100755 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/RpcServer.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/rpc/RpcServer.java @@ -86,7 +86,7 @@ public void stop() { // Do nothing } } - logger.info("RpcServer stopped"); + logger.debug("RpcServer stopped"); } public int getPort() { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java index 0e07e5768044..408316a39784 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java @@ -88,25 +88,22 @@ public StoreDataBatch poll() throws InterruptedException { } this.queueHeads.set(i, entry); } - long entrySnapshotId = entry.getSnapshotId(); - if (entrySnapshotId < minSnapshotId) { - minSnapshotId = entrySnapshotId; - } + minSnapshotId = Math.min(minSnapshotId, entry.getSnapshotId()); } this.currentPollSnapshotId = minSnapshotId; - logger.info("currentPollSnapshotId initialize to [" + this.currentPollSnapshotId + "]"); + logger.info("currentPollSnapshotId initialize to [{}]", currentPollSnapshotId); } while (true) { StoreDataBatch entry = this.queueHeads.get(this.currentPollQueueIdx); this.queueHeads.set(this.currentPollQueueIdx, null); if (entry == null) { entry = - this.innerQueues - .get(this.currentPollQueueIdx) - .poll(this.queueWaitMs, TimeUnit.MILLISECONDS); - if (entry == null) { - return null; - } + innerQueues + .get(currentPollQueueIdx) + .poll(queueWaitMs, TimeUnit.MILLISECONDS); + } + if (entry == null) { + return null; } long snapshotId = entry.getSnapshotId(); @@ -123,13 +120,11 @@ public StoreDataBatch poll() throws InterruptedException { } } else { logger.warn( - "Illegal entry polled from queue [" - + this.currentPollQueueIdx - + "]. entrySnapshotId [" - + snapshotId - + "] < currentSnapshotId [" - + this.currentPollSnapshotId - + "]. Ignored entry."); + "Illegal entry polled from queue [{}]. entrySnapshotId [{}] <" + + " currentSnapshotId [{}]. Ignored entry", + currentPollQueueIdx, + snapshotId, + currentPollSnapshotId); } } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 61322f27be88..8263f198b921 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -205,8 +205,7 @@ public long recover() throws InterruptedException, IOException { () -> { try { long partitionSnapshotId = partition.recover(); - snapshotId.updateAndGet( - x -> x < partitionSnapshotId ? x : partitionSnapshotId); + snapshotId.updateAndGet(x -> Math.min(x, partitionSnapshotId)); } catch (Exception e) { logger.error("partition #[] recover failed"); snapshotId.set(-1L); @@ -406,6 +405,9 @@ public void garbageCollect(long snapshotId, CompletionCallback callback) { } private void garbageCollectInternal(long snapshotId) throws IOException { + if (snapshotId % 3600 != 0) { // schedule every 1 hour + return; + } for (Map.Entry entry : this.idToPartition.entrySet()) { GraphPartition partition = entry.getValue(); partition.garbageCollect(snapshotId); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSnapshotService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSnapshotService.java index 1eeac1391c9f..ebbb9a7a0d96 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSnapshotService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSnapshotService.java @@ -14,7 +14,7 @@ public class StoreSnapshotService private static final Logger logger = LoggerFactory.getLogger(com.alibaba.graphscope.groot.store.StoreSnapshotService.class); - private StoreService storeService; + private final StoreService storeService; public StoreSnapshotService(StoreService storeService) { this.storeService = storeService; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java index 00e479a4916c..e47079a3265f 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java @@ -152,7 +152,7 @@ public void stop() { } this.commitExecutor = null; } - logger.info("WriterAgent stopped"); + logger.debug("WriterAgent stopped"); } /** @@ -226,6 +226,8 @@ private void processBatches() { int queueId = storeDataBatch.getQueueId(); long offset = storeDataBatch.getOffset(); this.consumedQueueOffsets.set(queueId, offset); + } catch (InterruptedException e) { + logger.error("processBatches interrupted"); } catch (Exception e) { logger.error("error in processBatches, ignore", e); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java index ffe819e6383b..a34bfdff5891 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java @@ -16,9 +16,11 @@ import com.alibaba.graphscope.groot.operation.OperationBatch; import com.alibaba.graphscope.proto.groot.LogEntryPb; +import java.util.Objects; + public class LogEntry { - private long snapshotId; - private OperationBatch operationBatch; + private final long snapshotId; + private final OperationBatch operationBatch; public LogEntry(long snapshotId, OperationBatch operationBatch) { this.snapshotId = snapshotId; @@ -54,8 +56,6 @@ public boolean equals(Object o) { LogEntry logEntry = (LogEntry) o; if (snapshotId != logEntry.snapshotId) return false; - return operationBatch != null - ? operationBatch.equals(logEntry.operationBatch) - : logEntry.operationBatch == null; + return Objects.equals(operationBatch, logEntry.operationBatch); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogService.java index 6b5ff34b57c7..647ca60d7e57 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogService.java @@ -36,13 +36,23 @@ public interface LogService { LogWriter createWriter(int queueId); /** - * Create a reader can read data of specific a queue from certain offset. + * Create a reader can read data of specific queue from certain offset. * @param queueId * @param offset * @return */ LogReader createReader(int queueId, long offset) throws IOException; + /** + * Create a reader that can read data of a specific queue from certain offset or timestamp + * @param queueId + * @param offset -1 if timestamp is specified. + * @param timestamp -1 if offset is specified. + * @return + * @throws IOException + */ + LogReader createReader(int queueId, long offset, long timestamp) throws IOException; + /** * Delete all data before certain offset in the queue. * diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogReader.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogReader.java index ed78ee115439..d4eefed4fa65 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogReader.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogReader.java @@ -21,21 +21,14 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.OffsetSpec; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; +import java.time.*; +import java.util.*; import java.util.concurrent.ExecutionException; public class KafkaLogReader implements LogReader { @@ -43,82 +36,80 @@ public class KafkaLogReader implements LogReader { private static final Logger logger = LoggerFactory.getLogger(KafkaLogReader.class); private static final LogEntryDeserializer deSer = new LogEntryDeserializer(); - private Consumer consumer; + private final Consumer consumer; private Iterator> iterator; - private long latestOffset; + private final long latest; private long nextReadOffset; public KafkaLogReader( - String servers, AdminClient adminClient, String topicName, int partitionId, long offset) + String servers, + AdminClient client, + String topicName, + int partitionId, + long offset, + long timestamp) throws IOException { Map kafkaConfigs = new HashMap<>(); kafkaConfigs.put("bootstrap.servers", servers); TopicPartition partition = new TopicPartition(topicName, partitionId); - long earliestOffset; - try { - earliestOffset = - adminClient - .listOffsets(Collections.singletonMap(partition, OffsetSpec.earliest())) - .partitionResult(partition) - .get() - .offset(); - this.latestOffset = - adminClient - .listOffsets(Collections.singletonMap(partition, OffsetSpec.latest())) - .partitionResult(partition) - .get() - .offset(); - } catch (InterruptedException | ExecutionException e) { - throw new IOException(e); + + long earliest = getOffset(client, partition, OffsetSpec.earliest()); + latest = getOffset(client, partition, OffsetSpec.latest()); + + // Get offset from timestamp + if (offset == -1) { + offset = getOffset(client, partition, OffsetSpec.forTimestamp(timestamp)); } - if (earliestOffset > offset || offset > this.latestOffset) { + if (earliest > offset || offset > latest) { throw new IllegalArgumentException( - "cannot read from [" - + offset - + "], earliest offset is [" - + earliestOffset - + "], latest offset is [" - + this.latestOffset - + "]"); + "invalid offset " + offset + ", hint: [" + earliest + ", " + latest + ")"); } - this.consumer = new KafkaConsumer<>(kafkaConfigs, deSer, deSer); - this.consumer.assign(Arrays.asList(partition)); - this.consumer.seek(partition, offset); - this.nextReadOffset = offset; + consumer = new KafkaConsumer<>(kafkaConfigs, deSer, deSer); + consumer.assign(List.of(partition)); + consumer.seek(partition, offset); + nextReadOffset = offset; logger.info( - "reader created. kafka offset range is [" - + earliestOffset - + "] ~ [" - + this.latestOffset - + "]"); + "reader created with offset [{}], offset range is [{}] ~ [{}]", + offset, + earliest, + latest); + } + + private long getOffset(AdminClient client, TopicPartition partition, OffsetSpec spec) + throws IOException { + try { + return client.listOffsets(Collections.singletonMap(partition, spec)) + .partitionResult(partition) + .get() + .offset(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); + } } @Override public ReadLogEntry readNext() { - if (this.nextReadOffset == this.latestOffset) { + if (nextReadOffset == latest) { return null; } - while (this.iterator == null || !this.iterator.hasNext()) { + while (iterator == null || !iterator.hasNext()) { ConsumerRecords consumerRecords = - this.consumer.poll(Duration.ofMillis(100L)); + consumer.poll(Duration.ofMillis(100L)); if (consumerRecords == null || consumerRecords.isEmpty()) { - logger.info( - "polled nothing from Kafka. nextReadOffset is [" - + this.nextReadOffset - + "]"); + logger.info("polled nothing from Kafka. nextReadOffset is [{}]", nextReadOffset); continue; } - this.iterator = consumerRecords.iterator(); + iterator = consumerRecords.iterator(); } ConsumerRecord record = iterator.next(); - this.nextReadOffset = record.offset() + 1; + nextReadOffset = record.offset() + 1; LogEntry v = record.value(); return new ReadLogEntry(record.offset(), v); } @Override public void close() throws IOException { - this.consumer.close(); + consumer.close(); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java index 1f28a43c5f26..b4622c3740fe 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java @@ -21,10 +21,7 @@ import com.alibaba.graphscope.groot.wal.LogService; import com.alibaba.graphscope.groot.wal.LogWriter; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.DeleteRecordsResult; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,12 +36,12 @@ public class KafkaLogService implements LogService { private static final Logger logger = LoggerFactory.getLogger(KafkaLogService.class); - private Configs configs; - private String servers; - private String topic; - private int queueCount; - private short replicationFactor; - private int maxMessageMb; + private final Configs configs; + private final String servers; + private final String topic; + private final int queueCount; + private final short replicationFactor; + private final int maxMessageMb; private volatile AdminClient adminClient; @@ -97,7 +94,7 @@ public boolean initialized() { public LogWriter createWriter(int queueId) { String customConfigsStr = KafkaConfig.KAFKA_PRODUCER_CUSTOM_CONFIGS.get(configs); Map customConfigs = new HashMap<>(); - if (!"".equals(customConfigsStr)) { + if (!customConfigsStr.isEmpty()) { for (String item : customConfigsStr.split("\\|")) { String[] kv = item.split(":"); if (kv.length != 2) { @@ -107,12 +104,17 @@ public LogWriter createWriter(int queueId) { customConfigs.put(kv[0], kv[1]); } } + logger.info("Kafka writer configs {}", customConfigs); return new KafkaLogWriter(servers, topic, queueId, customConfigs); } @Override public LogReader createReader(int queueId, long offset) throws IOException { - return new KafkaLogReader(servers, getAdmin(), topic, queueId, offset); + return createReader(queueId, offset, -1); + } + + public LogReader createReader(int queueId, long offset, long timestamp) throws IOException { + return new KafkaLogReader(servers, getAdmin(), topic, queueId, offset, timestamp); } @Override diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogWriter.java index 6619deec2520..2f51fea09e89 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogWriter.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogWriter.java @@ -36,9 +36,9 @@ public class KafkaLogWriter implements LogWriter { private static final Logger logger = LoggerFactory.getLogger(KafkaLogWriter.class); private static final LogEntrySerializer ser = new LogEntrySerializer(); - private Producer producer; - private String topicName; - private int partitionId; + private final Producer producer; + private final String topicName; + private final int partitionId; public KafkaLogWriter( String servers, String topicName, int partitionId, Map customConfigs) { diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/NodeBase.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/NodeBase.java index 27b3353e107f..5aa2dd3783d2 100755 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/NodeBase.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/NodeBase.java @@ -34,9 +34,11 @@ public NodeBase() { } public NodeBase(Configs configs) { - logger.info("Configs {}", configs.toString()); this.roleType = RoleType.fromName(CommonConfig.ROLE_NAME.get(configs)); this.idx = CommonConfig.NODE_IDX.get(configs); + if (idx == 0) { + logger.info("Configs {}", configs); + } } public NodeBase(Configs configs, RoleType roleType) { diff --git a/interactive_engine/groot-server/src/test/resources/gremlin-test.config b/interactive_engine/groot-server/src/test/resources/gremlin-test.config index 8c01c3b0c6f8..8a3f055be2b4 100644 --- a/interactive_engine/groot-server/src/test/resources/gremlin-test.config +++ b/interactive_engine/groot-server/src/test/resources/gremlin-test.config @@ -1,56 +1,38 @@ ## Common Config -graph.name=graphscope role.name= node.idx= rpc.port=0 -rpc.max.bytes.mb=4 store.node.count=1 frontend.node.count=1 ingestor.node.count=2 coordinator.node.count=1 ingestor.queue.count=2 -partition.count=8 -engine.type=gaia +partition.count=4 discovery.mode=zookeeper -## Ingestor Config -ingestor.queue.buffer.size=128 -ingestor.sender.buffer.size=128 - ## Coordinator Config -snapshot.increase.interval.ms=1000 -offsets.persist.interval.ms=3000 +log.recycle.enable=true +file.meta.store.path=./meta ## Store Config store.data.path=./data -store.write.thread.count=1 ## Zk Config zk.base.path=/graphscope/groot -zk.connect.string=graph_env:2181 +zk.connect.string=localhost:2181 ## Kafka Config -kafka.servers=graph_env:9092 +kafka.servers=localhost:9092 kafka.topic=groot ## Frontend Config gremlin.server.port=12312 -executor.worker.per.process=2 -executor.query.thread.count=2 -executor.query.manager.thread.count=2 -executor.query.store.thread.count=2 -log4rs.config=./conf/log4rs.yml +log4rs.config=LOG4RS_CONFIG -dns.name.prefix.frontend=localhost -dns.name.prefix.ingestor=localhost -dns.name.prefix.coordinator=localhost -dns.name.prefix.store=localhost +neo4j.bolt.server.disabled=true -#rpc.port=55555 -#executor.graph.port=55556 -#executor.query.port=55557 -#executor.engine.port=55558 -pegasus.worker.num=1 +pegasus.worker.num=2 +pegasus.hosts=localhost:8080 -neo4j.bolt.server.disabled=true +kafka.test.cluster.enable=true \ No newline at end of file diff --git a/proto/groot/ingestor_snapshot_service.proto b/proto/groot/ingestor_snapshot_service.proto index 9e03303948a9..cb5cf4f39d7b 100644 --- a/proto/groot/ingestor_snapshot_service.proto +++ b/proto/groot/ingestor_snapshot_service.proto @@ -48,8 +48,19 @@ message GetTailOffsetsResponse { service IngestorWrite { rpc writeIngestor(WriteIngestorRequest) returns (WriteIngestorResponse); + rpc replayWAL(ReplayWALRequest) returns (ReplayWALResponse); } +message ReplayWALRequest { + int64 offset = 1; + int64 timestamp = 2; +} + +message ReplayWALResponse { + repeated int64 snapshotId = 1; +} + + message WriteIngestorRequest { string requestId = 1; int32 queueId = 2; @@ -58,4 +69,4 @@ message WriteIngestorRequest { message WriteIngestorResponse { int64 snapshotId = 1; -} \ No newline at end of file +} diff --git a/proto/write_service.proto b/proto/write_service.proto index dc8dd3c286fe..c1ad2013d3d4 100644 --- a/proto/write_service.proto +++ b/proto/write_service.proto @@ -23,6 +23,16 @@ service ClientWrite { rpc getClientId(GetClientIdRequest) returns(GetClientIdResponse); rpc batchWrite(BatchWriteRequest) returns(BatchWriteResponse); rpc remoteFlush(RemoteFlushRequest) returns(RemoteFlushResponse); + rpc replayRecords(ReplayRecordsRequest) returns(ReplayRecordsResponse); +} + +message ReplayRecordsRequest { + int64 offset = 1; + int64 timestamp = 2; +} + +message ReplayRecordsResponse { + repeated int64 snapshot_id = 1; } message GetClientIdRequest { diff --git a/python/graphscope/client/connection.py b/python/graphscope/client/connection.py index 0dadef0a4a77..ed62d17a80c6 100644 --- a/python/graphscope/client/connection.py +++ b/python/graphscope/client/connection.py @@ -177,6 +177,15 @@ def remote_flush(self, snapshot_id, timeout_ms=3000): ) return response.success + def replay_records(self, offset: int, timestamp: int): + request = write_service_pb2.ReplayRecordsRequest() + request.offset = offset + request.timestamp = timestamp + response = self._write_service_stub.replayRecords( + request, metadata=self._metadata + ) + return response.snapshot_id + def get_store_state(self): request = model_pb2.GetStoreStateRequest() response = self._client_service_stub.getStoreState(