feat(interactive): Enable replay WAL records from offset or timestamp in groot #3385

Merged · 11 commits · Nov 30, 2023
Changes from all commits
11 changes: 6 additions & 5 deletions charts/graphscope-store-one-pod/templates/configmap.yaml
@@ -15,7 +15,6 @@ metadata:
data:
groot.config: |-
## Common Config
graph.name={{ .Values.graphName }}
role.name=
node.idx=
rpc.port=0
@@ -26,25 +25,27 @@ data:
coordinator.node.count=1
ingestor.queue.count={{ .Values.ingestor.replicaCount }}
partition.count={{ .Values.store.replicaCount | mul 16 }}
engine.type=gaia
discovery.mode=zookeeper

## Frontend Config
frontend.server.num=1
enable.hash.generate.eid={{ .Values.enableHashGenerateEid }}

## Ingestor Config
ingestor.queue.buffer.size={{ .Values.ingestorQueueBufferSize }}
ingestor.sender.buffer.size={{ .Values.ingestorSenderBufferSize }}
ingestor.queue.buffer.max.count={{ .Values.ingestorQueueBufferMaxCount }}
ingestor.sender.buffer.max.count={{ .Values.ingestorSenderBufferMaxCount }}
ingestor.sender.operation.max.count={{ .Values.ingestorSenderOperationMaxCount }}

## Coordinator Config
snapshot.increase.interval.ms={{ .Values.snapshotIncreaseIntervalMs }}
offsets.persist.interval.ms={{ .Values.offsetsPersistIntervalMs }}
file.meta.store.path={{ .Values.fileMetaStorePath }}
log.recycle.enable={{ .Values.logRecycleEnable }}

## Store Config
store.data.path={{ .Values.storeDataPath }}
store.write.thread.count={{ .Values.storeWriteThreadCount }}
store.queue.buffer.size={{ .Values.storeQueueBufferSize }}

## Frontend Config
gremlin.server.port=12312
@@ -69,11 +70,11 @@ data:
pegasus.output.capacity=16
pegasus.hosts=localhost:8080

kafka.test.cluster.enable={{ not .Values.kafka.enabled }}
## Kafka Config
kafka.servers=KAFKA_SERVERS
kafka.topic={{ .Values.kafkaTopic }}
kafka.producer.custom.configs={{ .Values.kafkaProducerCustomConfigs }}
kafka.test.cluster.enable={{ not .Values.kafka.enabled }}

## Zk Config
zk.base.path=/graphscope/groot
14 changes: 7 additions & 7 deletions charts/graphscope-store-one-pod/values.yaml
@@ -348,32 +348,32 @@ externalKafka:
##
javaOpts: ""

graphName: "graphscope"

rpcMaxBytesMb: 20

## Ingestor Config
ingestorQueueBufferSize: 1024

ingestorSenderBufferSize: 1024
ingestorQueueBufferMaxCount: 102400
ingestorSenderBufferMaxCount: 102400
ingestorSenderOperationMaxCount: 102400

## Coordinator Config
snapshotIncreaseIntervalMs: 1000
offsetsPersistIntervalMs: 3000
fileMetaStorePath: "/var/lib/graphscope-store/meta"
logRecycleEnable: true

## Frontend config
enableHashGenerateEid: false

## Store Config
storeDataPath: "/var/lib/graphscope-store"
storeWriteThreadCount: 1
storeQueueBufferSize: 102400

## Kafka Config
kafkaTopic: "graphscope"
kafkaProducerCustomConfigs: ""

## Key-value pair seperated by ;
## Key-value pair separated by ;
## For example extraConfig="k1=v1;k2=v2"
extraConfig: ""

@@ -383,5 +383,5 @@ auth:

pegasus:
worker:
num: 4
num: 1
timeout: 240000
15 changes: 6 additions & 9 deletions charts/graphscope-store/templates/configmap.yaml
@@ -15,7 +15,6 @@ metadata:
data:
groot.config: |-
## Common Config
graph.name={{ .Values.graphName }}
role.name=ROLE
node.idx=INDEX
rpc.port=55555
@@ -26,7 +25,6 @@ data:
coordinator.node.count={{ .Values.coordinator.replicaCount }}
ingestor.queue.count={{ .Values.ingestor.replicaCount }}
partition.count={{ .Values.store.replicaCount | mul 16 }}
engine.type={{ .Values.engineType }}
discovery.mode={{ .Values.discoveryMode }}

## Frontend Config
@@ -36,33 +34,32 @@ data:
enable.hash.generate.eid={{ .Values.enableHashGenerateEid }}

## Ingestor Config
ingestor.queue.buffer.size={{ .Values.ingestorQueueBufferSize }}
ingestor.sender.buffer.size={{ .Values.ingestorSenderBufferSize }}
ingestor.queue.buffer.max.count={{ .Values.ingestorQueueBufferMaxCount }}
ingestor.sender.buffer.max.count={{ .Values.ingestorSenderBufferMaxCount }}
ingestor.sender.operation.max.count={{ .Values.ingestorSenderOperationMaxCount }}

## Coordinator Config
snapshot.increase.interval.ms={{ .Values.snapshotIncreaseIntervalMs }}
offsets.persist.interval.ms={{ .Values.offsetsPersistIntervalMs }}
file.meta.store.path={{ .Values.fileMetaStorePath }}
log.recycle.enable={{ .Values.logRecycleEnable }}

## Store Config
store.data.path={{ .Values.storeDataPath }}
store.write.thread.count={{ .Values.storeWriteThreadCount }}
store.queue.buffer.size={{ .Values.storeQueueBufferSize }}

## Kafka Config
kafka.servers=KAFKA_SERVERS
kafka.topic={{ .Values.kafkaTopic }}
kafka.producer.custom.configs={{ .Values.kafkaProducerCustomConfigs }}
kafka.test.cluster.enable=false

## Frontend Config
gremlin.server.port=12312
## disable neo4j when launching groot server by default
neo4j.bolt.server.disabled=true

executor.worker.per.process={{ .Values.executorWorkerPerProcess }}
executor.query.thread.count={{ .Values.executorQueryThreadCount }}
executor.query.manager.thread.count={{ .Values.executorQueryManagerThreadCount }}
executor.query.store.thread.count={{ .Values.executorQueryStoreThreadCount }}

dns.name.prefix.frontend=FRONTEND
dns.name.prefix.ingestor=INGESTOR
dns.name.prefix.coordinator=COORDINATOR
19 changes: 6 additions & 13 deletions charts/graphscope-store/values.yaml
@@ -495,27 +495,25 @@ externalKafka:
##
javaOpts: ""

graphName: "graphscope"

rpcMaxBytesMb: 20

engineType: "gaia"

discoveryMode: "file"

## Ingestor Config
ingestorQueueBufferSize: 1024

ingestorSenderBufferSize: 1024
ingestorQueueBufferMaxCount: 102400
ingestorSenderBufferMaxCount: 102400
ingestorSenderOperationMaxCount: 102400

## Coordinator Config
snapshotIncreaseIntervalMs: 1000
offsetsPersistIntervalMs: 3000
fileMetaStorePath: "/etc/groot/my.meta"
logRecycleEnable: true

## Store Config
storeDataPath: "/var/lib/graphscope-store"
storeWriteThreadCount: 1
storeQueueBufferSize: 102400

## Kafka Config
kafkaTopic: "graphscope"
@@ -525,12 +523,7 @@ kafkaProducerCustomConfigs: ""
enableHashGenerateEid: false
# gremlinServerPort: 12312

executorWorkerPerProcess: 2
executorQueryThreadCount: 2
executorQueryManagerThreadCount: 2
executorQueryStoreThreadCount: 2

## Key-value pair seperated by ;
## Key-value pair separated by ;
## For example extraConfig="k1=v1;k2=v2"
extraConfig: ""

26 changes: 26 additions & 0 deletions docs/storage_engine/groot.md
@@ -568,6 +568,32 @@ APIs including:

Refer to [RealtimeWrite.java](https://github.com/alibaba/GraphScope/blob/main/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/RealtimeWrite.java) for examples.


### Other features

Groot can replay realtime write (WAL) records from a specific offset or from a timestamp. This is useful when you need to restore records that were written before an offline load finished, since an offline load overwrites all existing records.

Specify only one of `offset` and `timestamp`, and set the unused argument to -1. If both are given, `offset` takes precedence (an offset-based sketch follows the examples below).

Example API:
- Python:
```python
import time
import graphscope
conn = graphscope.conn()
# Timestamp in milliseconds, 100 minutes before now
current_timestamp = int(time.time() * 1000) - 100 * 60 * 1000

# offset is -1, so records are replayed from the given timestamp
r = conn.replay_records(-1, current_timestamp)
```
- Java:

```java
GrootClient client = GrootClientBuilder.build();
long timestamp = System.currentTimeMillis();
// offset is -1, so records are replayed from the given timestamp
client.replayRecords(-1, timestamp);
```
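To replay from a known WAL offset instead, pass the offset and set the timestamp argument to -1. A minimal Python sketch, assuming the same connection setup as above; the offset value used here is a placeholder for illustration:

```python
import graphscope

conn = graphscope.conn()

# timestamp is -1, so records are replayed from the given offset.
# 12345 is a hypothetical offset, not a value from this PR.
r = conn.replay_records(12345, -1)
```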

## Uninstalling and Restarting

### Uninstall Groot
25 changes: 1 addition & 24 deletions interactive_engine/assembly/src/conf/groot/config.template
@@ -1,31 +1,21 @@
## Common Config
graph.name=graphscope
role.name=
node.idx=
rpc.port=0
rpc.max.bytes.mb=4
store.node.count=1
frontend.node.count=1
ingestor.node.count=2
coordinator.node.count=1
ingestor.queue.count=2
partition.count=1
engine.type=gaia
partition.count=4
discovery.mode=zookeeper

## Ingestor Config
ingestor.queue.buffer.size=128
ingestor.sender.buffer.size=128

## Coordinator Config
snapshot.increase.interval.ms=1000
offsets.persist.interval.ms=3000
log.recycle.enable=true
file.meta.store.path=./meta

## Store Config
store.data.path=./data
store.write.thread.count=1

## Zk Config
zk.base.path=/graphscope/groot
@@ -38,24 +28,11 @@ kafka.topic=groot
## Frontend Config
gremlin.server.port=12312

## Backup Config
backup.enable=false
store.backup.thread.count=1

log4rs.config=LOG4RS_CONFIG

dns.name.prefix.frontend=localhost
dns.name.prefix.ingestor=localhost
dns.name.prefix.coordinator=localhost
dns.name.prefix.store=localhost

gaia.enable=true
neo4j.bolt.server.disabled=true

pegasus.worker.num=2
pegasus.timeout=240000
pegasus.batch.size=1024
pegasus.output.capacity=16
pegasus.hosts=localhost:8080

kafka.test.cluster.enable=true
@@ -55,8 +55,6 @@ public class CommonConfig {

public static final Config<Integer> PARTITION_COUNT = Config.intConfig("partition.count", 1);

public static final Config<String> GRAPH_NAME = Config.stringConfig("graph.name", "graphscope");

public static final Config<Long> METRIC_UPDATE_INTERVAL_MS =
Config.longConfig("metric.update.interval.ms", 5000L);

@@ -18,10 +18,10 @@
import java.util.function.Function;

public class Config<T> {
private String key;
private String defaultVal;
private final String key;
private final String defaultVal;

private Function<String, T> parseFunc;
private final Function<String, T> parseFunc;

public Config(String key, String defaultVal, Function<String, T> parseFunc) {
this.key = key;
@@ -25,7 +25,7 @@

public class Configs {

private Properties properties;
private final Properties properties;

public Configs() {
this.properties = new Properties();
@@ -26,7 +26,7 @@ public class CoordinatorConfig {
Config.boolConfig("log.recycle.enable", true);

public static final Config<Long> LOG_RECYCLE_INTERVAL_SECOND =
Config.longConfig("log.recycle.interval.second", 60L);
Config.longConfig("log.recycle.interval.second", 600L);

public static final Config<String> FILE_META_STORE_PATH =
Config.stringConfig("file.meta.store.path", "./meta");
@@ -1,7 +1,7 @@
package com.alibaba.graphscope.groot.common.config;

public class GaiaConfig {
public static final Config<Boolean> GAIA_ENABLE = Config.boolConfig("gaia.enable", false);
public static final Config<Boolean> GAIA_ENABLE = Config.boolConfig("gaia.enable", true);

public static final Config<Boolean> GAIA_REPORT = Config.boolConfig("gaia.report", false);

@@ -15,13 +15,13 @@

public class IngestorConfig {
public static final Config<Integer> INGESTOR_QUEUE_BUFFER_MAX_COUNT =
Config.intConfig("ingestor.queue.buffer.max.count", 1024);
Config.intConfig("ingestor.queue.buffer.max.count", 102400);

public static final Config<Integer> INGESTOR_SENDER_BUFFER_MAX_COUNT =
Config.intConfig("ingestor.sender.buffer.max.count", 1024);
Config.intConfig("ingestor.sender.buffer.max.count", 102400);

public static final Config<Integer> INGESTOR_SENDER_OPERATION_MAX_COUNT =
Config.intConfig("ingestor.sender.operation.max.count", 8192);
Config.intConfig("ingestor.sender.operation.max.count", 102400);

public static final Config<Long> INGESTOR_CHECK_PROCESSOR_INTERVAL_MS =
Config.longConfig("ingestor.check.processor.interval.ms", 3000L);
@@ -15,13 +15,13 @@

public class StoreConfig {
public static final Config<String> STORE_DATA_PATH =
Config.stringConfig("store.data.path", "/groot_data");
Config.stringConfig("store.data.path", "./data");

public static final Config<Integer> STORE_WRITE_THREAD_COUNT =
Config.intConfig("store.write.thread.count", 1);

public static final Config<Integer> STORE_QUEUE_BUFFER_SIZE =
Config.intConfig("store.queue.buffer.size", 1024);
Config.intConfig("store.queue.buffer.size", 102400);

public static final Config<Long> STORE_QUEUE_WAIT_MS =
Config.longConfig("store.queue.wait.ms", 3000L);
@@ -27,7 +27,7 @@ public class RpcUtils {
public static final Logger logger = LoggerFactory.getLogger(RpcUtils.class);

public static Executor createGrpcExecutor(int threadCount) {
logger.info("create grpc executor, thread count [" + threadCount + "]");
logger.debug("create grpc executor, thread count [" + threadCount + "]");
return new ForkJoinPool(
threadCount,
new ForkJoinPool.ForkJoinWorkerThreadFactory() {
2 changes: 1 addition & 1 deletion interactive_engine/compiler/conf/ir.compiler.properties
@@ -37,7 +37,7 @@ graph.planner.rules: FilterIntoJoinRule, FilterMatchRule, NotMatchToAntiJoinRule
# gremlin.server.port: 8182

# disable neo4j server
# neo4j.bolt.server.disabled: true
neo4j.bolt.server.disabled: false
# set neo4j server port if neo4j server is enabled
# neo4j.bolt.server.port: 7687
