Skip to content
Permalink
Browse files
Implement StreamGraphInput and EntryInput (#60)
  • Loading branch information
Linary committed Jun 18, 2021
1 parent 2522a35 commit 435d31af301e0613b54a36a2d1a198adc90325dc
Show file tree
Hide file tree
Showing 13 changed files with 581 additions and 59 deletions.
@@ -21,12 +21,17 @@

import java.io.IOException;

import org.apache.commons.lang3.tuple.Pair;

import com.baidu.hugegraph.computer.core.graph.id.Id;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;

public interface GraphComputeInput extends GraphInput {

Id readId() throws IOException;

Vertex readVertex() throws IOException;

Vertex readEdges() throws IOException;

Pair<Id, Value<?>> readMessage() throws IOException;
}
@@ -124,10 +124,10 @@ public boolean equals(Object obj) {
}
DefaultVertex other = (DefaultVertex) obj;
return this.active == other.active &&
this.id.equals(other.id) &&
this.value.equals(other.value) &&
this.edges.equals(other.edges) &&
this.properties.equals(other.properties);
Objects.equals(this.id, other.id) &&
Objects.equals(this.value, other.value) &&
Objects.equals(this.edges, other.edges) &&
Objects.equals(this.properties, other.properties);
}

@Override
@@ -21,41 +21,134 @@

import java.io.IOException;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;

import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.EdgeFrequency;
import com.baidu.hugegraph.computer.core.graph.GraphFactory;
import com.baidu.hugegraph.computer.core.graph.edge.Edge;
import com.baidu.hugegraph.computer.core.graph.id.Id;
import com.baidu.hugegraph.computer.core.graph.id.IdFactory;
import com.baidu.hugegraph.computer.core.graph.properties.Properties;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.graph.value.ValueFactory;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntryInput;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntryReader;


public class StreamGraphInput implements GraphComputeInput {

private final RandomAccessInput in;
private final Config config;
private final GraphFactory graphFactory;
private final ValueFactory valueFactory;
private final EdgeFrequency frequency;
private final EntryInput in;

public StreamGraphInput(ComputerContext context, RandomAccessInput in) {
this.config = context.config();
public StreamGraphInput(ComputerContext context, EntryInput in) {
this.graphFactory = context.graphFactory();
this.valueFactory = context.valueFactory();
this.frequency = context.config().get(ComputerOptions.INPUT_EDGE_FREQ);
this.in = in;
}

@Override
public Vertex readVertex() throws IOException {
// When data receiver merged, implement it
throw new NotImplementedException("StreamGraphInput.readVertex()");
Vertex vertex = this.graphFactory.createVertex();
this.in.readEntry(in -> {
vertex.id(this.readId(in));
}, in -> {
vertex.properties(this.readProperties(in));
});
return vertex;
}

@Override
public Id readId() throws IOException {
byte code = this.in.readByte();
public Vertex readEdges() throws IOException {
Vertex vertex = this.graphFactory.createVertex();
KvEntryReader reader = this.in.readEntry(in -> {
// Read id
vertex.id(this.readId(in));
});
if (this.frequency == EdgeFrequency.SINGLE) {
while (reader.hasRemaining()) {
Edge edge = this.graphFactory.createEdge();
// Only use targetId as subKey, use properties as subValue
reader.readSubKv(in -> {
edge.targetId(this.readId(in));
}, in -> {
edge.properties(this.readProperties(in));
});
vertex.addEdge(edge);
}
} else if (frequency == EdgeFrequency.SINGLE_PER_LABEL) {
while (reader.hasRemaining()) {
Edge edge = this.graphFactory.createEdge();
// Use label + targetId as subKey, use properties as subValue
reader.readSubKv(in -> {
edge.label(in.readUTF());
edge.targetId(this.readId(in));
}, in -> {
edge.properties(this.readProperties(in));
});
vertex.addEdge(edge);
}
} else {
assert frequency == EdgeFrequency.MULTIPLE;
while (reader.hasRemaining()) {
Edge edge = this.graphFactory.createEdge();
/*
* Use label + sortValues + targetId as subKey,
* use properties as subValue
*/
reader.readSubKv(in -> {
edge.label(in.readUTF());
edge.name(in.readUTF());
edge.targetId(this.readId(in));
}, in -> {
edge.properties(this.readProperties(in));
});
vertex.addEdge(edge);
}
}
return vertex;
}

@Override
public Pair<Id, Value<?>> readMessage() throws IOException {
MutablePair<Id, Value<?>> pair = MutablePair.of(null, null);
this.in.readEntry(in -> {
// Read id
pair.setLeft(this.readId(in));
}, in -> {
pair.setRight(this.readValue(in));
});
return pair;
}

private Id readId(RandomAccessInput in) throws IOException {
byte code = in.readByte();
Id id = IdFactory.createId(code);
id.read(this.in);
id.read(in);
return id;
}

private Value<?> readValue(RandomAccessInput in) throws IOException {
byte code = in.readByte();
Value<?> value = this.valueFactory.createValue(code);
value.read(in);
return value;
}

private Properties readProperties(RandomAccessInput in) throws IOException {
Properties properties = this.graphFactory.createProperties();
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = in.readUTF();
Value<?> value = this.readValue(in);
properties.put(key, value);
}
return properties;
}
}
@@ -46,7 +46,8 @@ public StreamGraphOutput(ComputerContext context, EntryOutput out) {
@Override
public void writeVertex(Vertex vertex) throws IOException {
this.out.writeEntry(out -> {
writeId(out, vertex.id());
// Write id
this.writeId(out, vertex.id());
}, out -> {
// Write properties
this.writeProperties(out, vertex.properties());
@@ -56,13 +57,14 @@ public void writeVertex(Vertex vertex) throws IOException {
@Override
public void writeEdges(Vertex vertex) throws IOException {
KvEntryWriter writer = this.out.writeEntry(out -> {
writeId(out, vertex.id());
// Write id
this.writeId(out, vertex.id());
});
if (this.frequency == EdgeFrequency.SINGLE) {
for (Edge edge : vertex.edges()) {
// Only use targetId as subKey, use properties as subValue
writer.writeSubKv(out -> {
writeId(out, edge.targetId());
this.writeId(out, edge.targetId());
}, out -> {
this.writeProperties(out, edge.properties());
});
@@ -72,7 +74,7 @@ public void writeEdges(Vertex vertex) throws IOException {
// Use label + targetId as subKey, use properties as subValue
writer.writeSubKv(out -> {
out.writeUTF(edge.label());
writeId(out, edge.targetId());
this.writeId(out, edge.targetId());
}, out -> {
this.writeProperties(out, edge.properties());
});
@@ -87,7 +89,7 @@ public void writeEdges(Vertex vertex) throws IOException {
writer.writeSubKv(out -> {
out.writeUTF(edge.label());
out.writeUTF(edge.name());
writeId(out, edge.targetId());
this.writeId(out, edge.targetId());
}, out -> {
this.writeProperties(out, edge.properties());
});
@@ -99,27 +101,31 @@ public void writeEdges(Vertex vertex) throws IOException {
@Override
public void writeMessage(Id id, Value<?> value) throws IOException {
this.out.writeEntry(out -> {
writeId(out, id);
// Write id
this.writeId(out, id);
}, out -> {
value.write(out);
this.writeValue(out, value);
});
}

private void writeId(RandomAccessOutput out, Id id) throws IOException {
out.writeByte(id.type().code());
id.write(out);
}

private void writeValue(RandomAccessOutput out, Value<?> value)
throws IOException {
out.writeByte(value.type().code());
value.write(out);
}

private void writeProperties(RandomAccessOutput out, Properties properties)
throws IOException {
Map<String, Value<?>> keyValues = properties.get();
out.writeInt(keyValues.size());
for (Map.Entry<String, Value<?>> entry : keyValues.entrySet()) {
out.writeUTF(entry.getKey());
Value<?> value = entry.getValue();
out.writeByte(value.type().code());
value.write(out);
this.writeValue(out, entry.getValue());
}
}

public static void writeId(RandomAccessOutput out, Id id)
throws IOException {
out.writeByte(id.type().code());
id.write(out);
}
}
@@ -0,0 +1,46 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.store.hgkvfile.entry;

import java.io.IOException;

import com.baidu.hugegraph.computer.core.io.Readable;

public interface EntryInput {

/**
* Read entry with multiple sub-key and sub-value.
* Used when read vertex with edges, each sub-key is target id of an edge,
* each sub-value is the properties of an edge.
* The output format:
* | key length | key | total sub-entry length | sub-entry count |
* | sub-key1 length | sub-key1 | sub-value1 length | sub-value1 |
* | sub-key2 length | sub-key2 | sub-value2 length | sub-value2 |
*/
KvEntryReader readEntry(Readable key) throws IOException;

/**
* Read entry with single value.
* Used when read vertex without edges and read message.
* The output format:
* | key length | key | value length | value |
*/
void readEntry(Readable key, Readable value) throws IOException;
}
@@ -0,0 +1,56 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.store.hgkvfile.entry;

import java.io.IOException;

import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.io.Readable;

public class EntryInputImpl implements EntryInput {

private final RandomAccessInput input;

public EntryInputImpl(RandomAccessInput input) {
this.input = input;
}

@Override
public KvEntryReader readEntry(Readable key) throws IOException {
// Read key
this.readData(key);
return new KvEntryReaderImpl(this.input);
}

@Override
public void readEntry(Readable key, Readable value) throws IOException {
// Read key
this.readData(key);
// Read data
this.readData(value);
}

private void readData(Readable data) throws IOException {
// Read data length
this.input.readFixedInt();
// Read data
data.read(this.input);
}
}

0 comments on commit 435d31a

Please sign in to comment.