Skip to content
Permalink
Browse files
sort different type of message use different combiner (#85)
* fix bug: sort uses Pointer but the combiner was used to combine Value or Property
* fix bug: merge to one file until Sorter#iterator is perfected
  • Loading branch information
corgiboygsj committed Aug 16, 2021
1 parent 2ef5b22 commit 16fe708ed1508743048d3e39202fb62516fa8fbd
Showing 11 changed files with 116 additions and 36 deletions.
@@ -165,17 +165,15 @@ private void swapReceiveAndSortBuffers() {
* Merge outputFiles if needed, like merge 10000 files into 100 files.
*/
private void mergeOutputFilesIfNeeded() {
int actualSize = this.outputFiles.size();
if (actualSize <= this.mergeFileNum) {
if (this.outputFiles.size() <= 1) {
return;
}
int targetSize = this.mergeFileNum;
// If mergeFileNum = 200 and actual = 400, target = 20.
if (actualSize < this.mergeFileNum * this.mergeFileNum) {
targetSize = (int) Math.sqrt(actualSize);
}

List<String> newOutputs = this.genOutputFileNames(targetSize);
/*
* TODO: Restore genOutputFileNames(sqrt(outputFiles.size()))
* after adding Sorter#iterator() for subkv
*/
List<String> newOutputs = this.genOutputFileNames(1);
this.sortManager.mergeInputs(this.outputFiles, newOutputs,
this.withSubKv, this.outerSortFlusher());
this.outputFiles = newOutputs;
@@ -30,11 +30,15 @@

import com.baidu.hugegraph.computer.core.combiner.Combiner;
import com.baidu.hugegraph.computer.core.combiner.OverwriteCombiner;
import com.baidu.hugegraph.computer.core.combiner.PointerCombiner;
import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.GraphFactory;
import com.baidu.hugegraph.computer.core.graph.properties.Properties;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.io.BytesOutput;
import com.baidu.hugegraph.computer.core.io.IOFactory;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
@@ -47,6 +51,7 @@
import com.baidu.hugegraph.computer.core.sort.flusher.CombineKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineSubKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.InnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.KvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.OuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry;
@@ -61,19 +66,18 @@ public class SortManager implements Manager {
private static final String NAME = "sort";
private static final String PREFIX = "sort-executor-%s";

private final ComputerContext context;
private final ExecutorService sortExecutor;
private final Sorter sorter;
private final Combiner<Pointer> combiner;
private final int capacity;
private final int flushThreshold;

public SortManager(ComputerContext context) {
this.context = context;
Config config = context.config();
int threadNum = config.get(ComputerOptions.SORT_THREAD_NUMS);
this.sortExecutor = ExecutorUtil.newFixedThreadPool(threadNum, PREFIX);
this.sorter = new SorterImpl(config);
// TODO: sort different type of message use different combiner
this.combiner = new OverwriteCombiner<>();
this.capacity = config.get(
ComputerOptions.WORKER_WRITE_BUFFER_INIT_CAPACITY);
this.flushThreshold = config.get(
@@ -167,12 +171,90 @@ public PeekableIterator<KvEntry> iterator(List<String> outputs,
private InnerSortFlusher createSortFlusher(MessageType type,
RandomAccessOutput output,
int flushThreshold) {
if (type == MessageType.VERTEX || type == MessageType.MSG) {
return new CombineKvInnerSortFlusher(output, this.combiner);
Combiner<Pointer> combiner;
boolean needSortSubKv;

switch (type) {
case VERTEX:
combiner = this.createVertexCombiner();
needSortSubKv = false;
break;
case EDGE:
combiner = this.createEdgeCombiner();
needSortSubKv = true;
break;
case MSG:
combiner = this.createMessageCombiner();
needSortSubKv = false;
break;
default:
throw new ComputerException("Unsupported combine message " +
"type for %s", type);
}

InnerSortFlusher flusher;
if (combiner == null) {
flusher = new KvInnerSortFlusher(output);
} else {
assert type == MessageType.EDGE;
return new CombineSubKvInnerSortFlusher(output, this.combiner,
flushThreshold);
if (needSortSubKv) {
flusher = new CombineSubKvInnerSortFlusher(output, combiner,
flushThreshold);
} else {
flusher = new CombineKvInnerSortFlusher(output, combiner);
}
}

return flusher;
}

/**
 * Build the Pointer-level combiner for vertex entries from the class
 * configured by WORKER_VERTEX_PROPERTIES_COMBINER_CLASS.
 *
 * @return the combiner produced by createPropertiesCombiner()
 */
private Combiner<Pointer> createVertexCombiner() {
    Combiner<Properties> vertexPropCombiner =
            this.context.config().createObject(
            ComputerOptions.WORKER_VERTEX_PROPERTIES_COMBINER_CLASS);
    return this.createPropertiesCombiner(vertexPropCombiner);
}

/**
 * Build the Pointer-level combiner for edge entries from the class
 * configured by WORKER_EDGE_PROPERTIES_COMBINER_CLASS.
 *
 * @return the combiner produced by createPropertiesCombiner()
 */
private Combiner<Pointer> createEdgeCombiner() {
    Combiner<Properties> edgePropCombiner =
            this.context.config().createObject(
            ComputerOptions.WORKER_EDGE_PROPERTIES_COMBINER_CLASS);
    return this.createPropertiesCombiner(edgePropCombiner);
}

/**
 * Build the Pointer-level combiner for MSG entries.
 *
 * The value combiner is created from WORKER_COMBINER_CLASS. If that
 * yields null, this method returns null as well and the caller decides
 * how to handle the absence of a combiner.
 *
 * @return a PointerCombiner wrapping the value combiner, or null if no
 *         value combiner could be created
 */
private Combiner<Pointer> createMessageCombiner() {
    Config config = this.context.config();
    // presumably the `false` flag makes the option non-mandatory; confirm
    Combiner<?> messageCombiner = config.createObject(
            ComputerOptions.WORKER_COMBINER_CLASS, false);

    if (messageCombiner != null) {
        // Two value holders of the algorithm's message class
        Value<?> first = config.createObject(
                ComputerOptions.ALGORITHM_MESSAGE_CLASS);
        Value<?> second = first.copy();
        /*
         * NOTE(review): raw PointerCombiner construction is kept as in
         * the original; its generic signature is not visible here.
         */
        return new PointerCombiner(first, second, messageCombiner);
    }

    return null;
}

/**
 * Adapt a Properties combiner into a combiner that works on Pointer.
 *
 * @param propCombiner the configured Properties combiner
 * @return an OverwriteCombiner if propCombiner is an OverwriteCombiner,
 *         otherwise a PointerCombiner wrapping propCombiner
 */
private Combiner<Pointer> createPropertiesCombiner(
                          Combiner<Properties> propCombiner) {
    /*
     * When the properties combiner is an OverwriteCombiner only the
     * second entry is kept, so there is no need to deserialize the
     * properties and serialize the second one again.
     */
    if (propCombiner instanceof OverwriteCombiner) {
        return new OverwriteCombiner<>();
    }

    GraphFactory factory = this.context.graphFactory();
    Properties first = factory.createProperties();
    Properties second = factory.createProperties();
    return new PointerCombiner<>(first, second, propCombiner);
}
}
@@ -165,7 +165,7 @@ private static void add200VertexBuffer(Consumer<ManagedBuffer> consumer)
Vertex vertex = graphFactory().createVertex();
vertex.id(BytesId.of(i));
vertex.properties(graphFactory().createProperties());
ReceiverUtil.comsumeBuffer(writeVertex(vertex), consumer);
ReceiverUtil.consumeBuffer(writeVertex(vertex), consumer);
}
}

@@ -203,7 +203,7 @@ private static void addSingleFreqEdgeBuffer(
edges.add(edge);
}
vertex.edges(edges);
ReceiverUtil.comsumeBuffer(writeEdges(vertex), consumer);
ReceiverUtil.consumeBuffer(writeEdges(vertex), consumer);
}
}

@@ -236,7 +236,7 @@ private static void addMessages(Consumer<ManagedBuffer> consumer)
Id id = BytesId.of(i);
IdList message = new IdList();
message.add(id);
ReceiverUtil.comsumeBuffer(ReceiverUtil.writeMessage(id,
ReceiverUtil.consumeBuffer(ReceiverUtil.writeMessage(id,
message),
consumer);
}
@@ -169,7 +169,7 @@ private static void add200VertexBuffer(Consumer<ManagedBuffer> consumer)
Vertex vertex = graphFactory().createVertex();
vertex.id(BytesId.of(i));
vertex.properties(graphFactory().createProperties());
ReceiverUtil.comsumeBuffer(writeVertex(vertex), consumer);
ReceiverUtil.consumeBuffer(writeVertex(vertex), consumer);
}
}

@@ -224,7 +224,7 @@ private static void addEdgeBuffer(Consumer<ManagedBuffer> consumer,
edges.add(edge);
}
vertex.edges(edges);
ReceiverUtil.comsumeBuffer(writeEdges(vertex, freq), consumer);
ReceiverUtil.consumeBuffer(writeEdges(vertex, freq), consumer);
}
}

@@ -149,7 +149,7 @@ private static void addMessages(Consumer<ManagedBuffer> consumer)
Id id = BytesId.of(random.nextInt(200));
IdList message = new IdList();
message.add(id);
ReceiverUtil.comsumeBuffer(ReceiverUtil.writeMessage(id,
ReceiverUtil.consumeBuffer(ReceiverUtil.writeMessage(id,
message),
consumer);
}
@@ -151,7 +151,7 @@ public void testSortInterrupt() throws InterruptedException {

public static void addMockBufferToBuffers(MessageRecvBuffers buffers,
int mockBufferLength) {
ReceiverUtil.comsumeBuffer(new byte[mockBufferLength],
ReceiverUtil.consumeBuffer(new byte[mockBufferLength],
(ManagedBuffer buffer) -> {
buffers.addBuffer(buffer);
});
@@ -139,7 +139,7 @@ public void testComputeMessage() throws IOException {
@Test
public void testOtherMessageType() {
Assert.assertThrows(ComputerException.class, () -> {
ReceiverUtil.comsumeBuffer(new byte[100],
ReceiverUtil.consumeBuffer(new byte[100],
(ManagedBuffer buffer) -> {
this.receiveManager.handle(MessageType.ACK, 0, buffer);
});
@@ -41,7 +41,7 @@

public class ReceiverUtil {

public static void comsumeBuffer(byte[] bytes,
public static void consumeBuffer(byte[] bytes,
Consumer<ManagedBuffer> consumer) {
ByteBuf buf = Unpooled.directBuffer(bytes.length);
try {
@@ -72,8 +72,8 @@ public void setup() {
ComputerOptions.JOB_WORKERS_COUNT, "1",
ComputerOptions.JOB_PARTITIONS_COUNT, "1",
ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]",
ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "10000",
ComputerOptions.HGKV_MERGE_FILES_NUM, "5"
ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "100",
ComputerOptions.HGKV_MERGE_FILES_NUM, "2"
);
FileUtils.deleteQuietly(new File("data_dir1"));
FileUtils.deleteQuietly(new File("data_dir2"));
@@ -162,7 +162,7 @@ public static void addTenEdgeBuffer(Consumer<ManagedBuffer> consumer)
edges.add(edge);
}
vertex.edges(edges);
ReceiverUtil.comsumeBuffer(writeEdges(vertex), consumer);
ReceiverUtil.consumeBuffer(writeEdges(vertex), consumer);
}
}

@@ -182,7 +182,7 @@ private static void addTenDuplicateEdgeBuffer(
edges.add(edge);
}
vertex.edges(edges);
ReceiverUtil.comsumeBuffer(writeEdges(vertex), consumer);
ReceiverUtil.consumeBuffer(writeEdges(vertex), consumer);
}

for (long i = 0L; i < 10L; i++) {
@@ -198,7 +198,7 @@ private static void addTenDuplicateEdgeBuffer(
edges.add(edge);
}
vertex.edges(edges);
ReceiverUtil.comsumeBuffer(writeEdges(vertex), consumer);
ReceiverUtil.consumeBuffer(writeEdges(vertex), consumer);
}
}

@@ -122,7 +122,7 @@ public static void addTwentyCombineMessageBuffer(
for (int j = 0; j < 2; j++) {
Id id = BytesId.of(i);
DoubleValue message = new DoubleValue(i);
ReceiverUtil.comsumeBuffer(ReceiverUtil.writeMessage(id,
ReceiverUtil.consumeBuffer(ReceiverUtil.writeMessage(id,
message),
consumer);
}
@@ -160,7 +160,7 @@ public static void checkTenCombineMessages(PeekableIterator<KvEntry> it)
Id id = BytesId.of(i);
IdList message = new IdList();
message.add(id);
ReceiverUtil.comsumeBuffer(ReceiverUtil.writeMessage(id,
ReceiverUtil.consumeBuffer(ReceiverUtil.writeMessage(id,
message),
consumer);
}
@@ -183,7 +183,7 @@ public static void addTenVertexBuffer(Consumer<ManagedBuffer> consumer)
Vertex vertex = graphFactory().createVertex();
vertex.id(BytesId.of(i));
vertex.properties(graphFactory().createProperties());
ReceiverUtil.comsumeBuffer(writeVertex(vertex), consumer);
ReceiverUtil.consumeBuffer(writeVertex(vertex), consumer);
}
}

@@ -197,7 +197,7 @@ private static void addTwentyDuplicateVertexBuffer(
properties.put("p1", new LongValue(i));
vertex.properties(properties);

ReceiverUtil.comsumeBuffer(writeVertex(vertex), consumer);
ReceiverUtil.consumeBuffer(writeVertex(vertex), consumer);
}

for (long i = 0L; i < 10L; i++) {
@@ -207,13 +207,13 @@ private static void addTwentyDuplicateVertexBuffer(
properties.put("p2", new LongValue(2L * i));
vertex.properties(properties);

ReceiverUtil.comsumeBuffer(writeVertex(vertex), consumer);
ReceiverUtil.consumeBuffer(writeVertex(vertex), consumer);
}
}

/**
 * Feed two 2-byte, zero-filled buffers to the given consumer.
 *
 * @param consumer receives each buffer via ReceiverUtil.consumeBuffer()
 */
private static void addTwoEmptyBuffer(Consumer<ManagedBuffer> consumer) {
    for (int i = 0; i < 2; i++) {
        ReceiverUtil.consumeBuffer(new byte[2], consumer);
    }
}

0 comments on commit 16fe708

Please sign in to comment.