fix(interactive): use pk hash to generate eid if has pk (#3621)
siyuan0322 committed Mar 12, 2024
1 parent cdc6190 commit e1d1b33
Showing 4 changed files with 10 additions and 29 deletions.
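
In short, the change removes the `enable.hash.generate.eid` switch and the timestamp fallback: whenever an edge type declares primary keys, the edge id (eid) is derived from a hash of (srcId, dstId, labelId, pk bytes), so writing the same edge twice yields the same id. The standalone sketch below only illustrates that rule; the `hashEdgeId` helper and its SHA-256 digest are illustrative assumptions, not GraphScope's actual `PkHashUtils` implementation.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.List;

    public class EdgeIdSketch {
        // Hypothetical stand-in for PkHashUtils.hash(srcId, dstId, labelId, pks):
        // fold the endpoints, the label id and every pk value into one digest
        // and keep the first 8 bytes as the edge id.
        static long hashEdgeId(long srcId, long dstId, int labelId, List<byte[]> pks)
                throws Exception {
            if (pks == null || pks.isEmpty()) {
                // mirrors the new behavior: no pk means no hash-based id
                throw new IllegalArgumentException("Cannot get hash id when pk is empty");
            }
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            md.update(ByteBuffer.allocate(20).putLong(srcId).putLong(dstId).putInt(labelId).array());
            for (byte[] pk : pks) {
                md.update(pk);
            }
            return ByteBuffer.wrap(md.digest()).getLong();
        }

        public static void main(String[] args) throws Exception {
            List<byte[]> pks = List.of("order-42".getBytes(StandardCharsets.UTF_8));
            long first = hashEdgeId(1L, 2L, 7, pks);
            long second = hashEdgeId(1L, 2L, 7, pks);
            // Deterministic: the same (src, dst, label, pks) always maps to the same eid,
            // unlike the old System.nanoTime() fallback.
            System.out.println(first == second); // true
        }
    }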
@@ -29,9 +29,6 @@ public class FrontendConfig {
             "frontend.service.thread.count",
             Math.max(Math.min(Runtime.getRuntime().availableProcessors() / 2, 64), 4));
 
-    public static final Config<Boolean> ENABLE_HASH_GENERATE_EID =
-            Config.boolConfig("enable.hash.generate.eid", false);
-
     public static final Config<Integer> WRITE_QUEUE_BUFFER_MAX_COUNT =
             Config.intConfig("write.queue.buffer.max.count", 1024000);
 }
@@ -39,10 +39,10 @@ public long getNextId() {
 
     @Override
     public long getHashId(long srcId, long dstId, int labelId, List<byte[]> pks) {
-        if (pks != null && pks.size() > 0) {
-            return PkHashUtils.hash(srcId, dstId, labelId, pks);
+        if (pks == null || pks.size() == 0) {
+            throw new RuntimeException("Cannot get hash id when pk is empty");
         }
-        return PkHashUtils.hash(srcId, dstId, labelId, System.nanoTime());
+        return PkHashUtils.hash(srcId, dstId, labelId, pks);
     }
 
     private void allocateNewIds() {
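
The behavioral note for the hunk above: with the timestamp fallback gone, an empty pk list is now a hard error rather than a silent source of non-repeatable ids, so callers are expected to branch before calling `getHashId`. A minimal guard sketch follows; the local `IdGenerator` interface only mirrors the two methods visible in the diff, and the surrounding class is assumed for illustration.

    import java.util.List;

    class HashIdGuardSketch {
        interface IdGenerator {
            long getNextId();
            long getHashId(long srcId, long dstId, int labelId, List<byte[]> pks);
        }

        // Branch on pk presence up front; getHashId itself now throws on empty pks.
        static long idFor(IdGenerator gen, long src, long dst, int label, List<byte[]> pks) {
            return (pks == null || pks.isEmpty())
                    ? gen.getNextId()                      // allocated, incrementing id
                    : gen.getHashId(src, dst, label, pks); // deterministic, pk-based id
        }
    }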
@@ -3,7 +3,6 @@
 import com.alibaba.graphscope.groot.CompletionCallback;
 import com.alibaba.graphscope.groot.SnapshotCache;
 import com.alibaba.graphscope.groot.common.config.Configs;
-import com.alibaba.graphscope.groot.common.config.FrontendConfig;
 import com.alibaba.graphscope.groot.common.exception.GrootException;
 import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException;
 import com.alibaba.graphscope.groot.common.schema.api.GraphElement;
@@ -14,7 +13,6 @@
 import com.alibaba.graphscope.groot.common.schema.wrapper.LabelId;
 import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue;
 import com.alibaba.graphscope.groot.common.util.*;
-import com.alibaba.graphscope.groot.meta.MetaService;
 import com.alibaba.graphscope.groot.metrics.MetricsAgent;
 import com.alibaba.graphscope.groot.metrics.MetricsCollector;
 import com.alibaba.graphscope.groot.operation.EdgeId;
@@ -49,34 +47,24 @@ public class GraphWriter implements MetricsAgent {
     private volatile long ingestorBlockTimeAvgMs;
     private volatile long lastUpdateIngestorBlockTimeNano;
     private AtomicInteger pendingWriteCount;
-    /**
-     * true: enable use hash64(srcId, dstId, edgeLabelId, edgePks) to generate eid;
-     */
-    private boolean enableHashEid;
-
-    private SnapshotCache snapshotCache;
-    private EdgeIdGenerator edgeIdGenerator;
-    private MetaService metaService;
-    private AtomicLong lastWrittenSnapshotId = new AtomicLong(0L);
+    private final SnapshotCache snapshotCache;
+    private final EdgeIdGenerator edgeIdGenerator;
+    private final AtomicLong lastWrittenSnapshotId = new AtomicLong(0L);
 
     private final KafkaAppender kafkaAppender;
     private ScheduledExecutorService scheduler;
 
     public GraphWriter(
             SnapshotCache snapshotCache,
             EdgeIdGenerator edgeIdGenerator,
-            MetaService metaService,
             MetricsCollector metricsCollector,
             KafkaAppender appender,
             Configs configs) {
         this.snapshotCache = snapshotCache;
         this.edgeIdGenerator = edgeIdGenerator;
-        this.metaService = metaService;
         initMetrics();
         metricsCollector.register(this, this::updateMetrics);
-        // default for increment eid generate
-        this.enableHashEid = FrontendConfig.ENABLE_HASH_GENERATE_EID.get(configs);
 
         this.kafkaAppender = appender;
     }
 
@@ -410,8 +398,9 @@ private long getEdgeInnerId(
             GraphSchema schema,
             DataRecord dataRecord) {
         long edgeInnerId;
-        if (this.enableHashEid) {
-            GraphElement edgeDef = schema.getElement(edgeRecordKey.getLabel());
+        GraphElement edgeDef = schema.getElement(edgeRecordKey.getLabel());
+        List<GraphProperty> pks = edgeDef.getPrimaryKeyList();
+        if (pks != null && pks.size() > 0) {
             Map<Integer, PropertyValue> edgePkVals =
                     parseRawProperties(edgeDef, dataRecord.getProperties());
             List<byte[]> edgePkBytes = getPkBytes(edgePkVals, edgeDef);
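
This hunk moves the decision from a config flag to the schema: the edge type's primary-key list now determines whether the eid is hashed from pks or allocated. A schematic version of that flow is sketched below; the `IdGen` interface and `encodePks` helper are assumptions standing in for GraphScope's `EdgeIdGenerator` and its pk serialization (`getPkBytes`).

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class EdgeInnerIdSketch {
        /** Minimal stand-in for the EdgeIdGenerator used in the diff. */
        interface IdGen {
            long getNextId();
            long getHashId(long srcId, long dstId, int labelId, List<byte[]> pks);
        }

        /** Assumed helper: serialize pk property values to bytes, in pk order. */
        static List<byte[]> encodePks(List<String> pkNames, Map<String, String> props) {
            return pkNames.stream()
                    .map(name -> props.get(name).getBytes(StandardCharsets.UTF_8))
                    .collect(Collectors.toList());
        }

        /**
         * Mirrors the new getEdgeInnerId logic: hash the pks when the edge type
         * defines them, otherwise fall back to an allocated (incrementing) id.
         */
        static long edgeInnerId(IdGen gen, long srcId, long dstId, int labelId,
                                List<String> pkNames, Map<String, String> props) {
            if (pkNames != null && !pkNames.isEmpty()) {
                return gen.getHashId(srcId, dstId, labelId, encodePks(pkNames, props));
            }
            return gen.getNextId();
        }
    }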
@@ -131,12 +131,7 @@ public Frontend(Configs configs) {
         KafkaAppender kafkaAppender = new KafkaAppender(configs, metaService, logService);
         this.graphWriter =
                 new GraphWriter(
-                        snapshotCache,
-                        edgeIdGenerator,
-                        this.metaService,
-                        metricsCollector,
-                        kafkaAppender,
-                        configs);
+                        snapshotCache, edgeIdGenerator, metricsCollector, kafkaAppender, configs);
         ClientWriteService clientWriteService = new ClientWriteService(graphWriter);
 
         RoleClients<BackupClient> backupClients =
