Skip to content
Permalink
Browse files
add PointerCombiner (#57)
  • Loading branch information
houzhizhen committed Jun 3, 2021
1 parent df5c2bb commit 8c2dbec8ca3915fb4d1c3db354ab56ced02d2fb3
Show file tree
Hide file tree
Showing 11 changed files with 409 additions and 13 deletions.
@@ -22,8 +22,10 @@
import java.util.Map;

import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.io.Readable;
import com.baidu.hugegraph.computer.core.io.Writable;

public interface Properties {
public interface Properties extends Readable, Writable {

Map<String, Value<?>> get();

@@ -0,0 +1,61 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.combiner;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.io.OptimizedUnsafeBytesOutput;
import com.baidu.hugegraph.computer.core.io.Readable;
import com.baidu.hugegraph.computer.core.io.Writable;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.InlinePointer;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.Pointer;

public class PointerCombiner<V extends Readable & Writable>
implements Combiner<Pointer> {

private final V v1;
private final V v2;
private final Combiner<V> combiner;
private final OptimizedUnsafeBytesOutput bytesOutput;

public PointerCombiner(V v1, V v2, Combiner<V> combiner) {
this.v1 = v1;
this.v2 = v2;
this.combiner = combiner;
this.bytesOutput = new OptimizedUnsafeBytesOutput();
}

@Override
public Pointer combine(Pointer v1, Pointer v2) {
try {
this.v1.read(v1.input());
this.v2.read(v2.input());
V combinedValue = this.combiner.combine(this.v1, this.v2);
this.bytesOutput.seek(0L);
combinedValue.write(this.bytesOutput);
return new InlinePointer(this.bytesOutput.buffer(),
this.bytesOutput.position());
} catch (Exception e) {
throw new ComputerException(
"Failed to combine pointer1(offset=%s, length=%s) and" +
" pointer2(offset=%s, length=%s)'",
e, v1.offset(), v1.length(), v2.offset(), v2.length());
}
}
}
@@ -27,6 +27,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.baidu.hugegraph.computer.core.combiner.OverwriteCombiner;
import com.baidu.hugegraph.computer.core.graph.partition.HashPartitioner;
import com.baidu.hugegraph.computer.core.master.DefaultMasterComputation;
import com.baidu.hugegraph.computer.core.network.TransportConf;
@@ -284,6 +285,26 @@ public static synchronized ComputerOptions instance() {
Null.class
);

public static final ConfigOption<Class<?>>
WORKER_VERTEX_PROPERTIES_COMBINER_CLASS =
new ConfigOption<>(
"worker.vertex_properties_combiner_class",
"The combiner can combine several properties of the same " +
"vertex into one properties at inputstep.",
disallowEmpty(),
OverwriteCombiner.class
);

public static final ConfigOption<Class<?>>
WORKER_EDGE_PROPERTIES_COMBINER_CLASS =
new ConfigOption<>(
"worker.edge_properties_combiner_class",
"The combiner can combine several properties of the same " +
"edge into one properties at inputstep.",
disallowEmpty(),
OverwriteCombiner.class
);

public static final ConfigOption<Long> WORKER_RECEIVED_BUFFERS_BYTES_LIMIT =
new ConfigOption<>(
"worker.received_buffers_bytes_limit",
@@ -38,15 +38,18 @@
import com.baidu.hugegraph.computer.core.graph.properties.DefaultProperties;
import com.baidu.hugegraph.computer.core.graph.properties.Properties;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.graph.value.ValueFactory;
import com.baidu.hugegraph.computer.core.graph.vertex.DefaultVertex;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;

public final class BuiltinGraphFactory implements GraphFactory {

private final Config config;
private ValueFactory valueFactory;

public BuiltinGraphFactory(Config config) {
public BuiltinGraphFactory(Config config, ValueFactory valueFactory) {
this.config = config;
this.valueFactory = valueFactory;
}

@Override
@@ -113,6 +116,6 @@ public <K, V> Map<K, V> createMap() {

@Override
public Properties createProperties() {
return new DefaultProperties(this);
return new DefaultProperties(this, this.valueFactory);
}
}
@@ -19,22 +19,32 @@

package com.baidu.hugegraph.computer.core.graph.properties;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

import com.baidu.hugegraph.computer.core.common.SerialEnum;
import com.baidu.hugegraph.computer.core.graph.GraphFactory;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.graph.value.ValueFactory;
import com.baidu.hugegraph.computer.core.graph.value.ValueType;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.io.RandomAccessOutput;

public class DefaultProperties implements Properties {

private final Map<String, Value<?>> keyValues;
private final ValueFactory valueFactory;

public DefaultProperties(GraphFactory graphFactory) {
this(graphFactory.createMap());
public DefaultProperties(GraphFactory graphFactory,
ValueFactory valueFactory) {
this(graphFactory.createMap(), valueFactory);
}

public DefaultProperties(Map<String, Value<?>> keyValues) {
public DefaultProperties(Map<String, Value<?>> keyValues,
ValueFactory valueFactory) {
this.keyValues = keyValues;
this.valueFactory = valueFactory;
}

@Override
@@ -47,6 +57,31 @@ public void put(String key, Value<?> value) {
this.keyValues.put(key, value);
}

@Override
public void read(RandomAccessInput in) throws IOException {
this.keyValues.clear();
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = in.readUTF();
ValueType valueType = SerialEnum.fromCode(ValueType.class,
in.readByte());
Value<?> value = this.valueFactory.createValue(valueType);
value.read(in);
this.keyValues.put(key, value);
}
}

@Override
public void write(RandomAccessOutput out) throws IOException {
out.writeInt(this.keyValues.size());
for (Map.Entry<String, Value<?>> entry : this.keyValues.entrySet()) {
out.writeUTF(entry.getKey());
Value<?> value = entry.getValue();
out.writeByte(value.type().code());
value.write(out);
}
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
@@ -22,9 +22,9 @@
import java.io.IOException;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.io.OptimizedUnsafeBytesInput;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.io.RandomAccessOutput;
import com.baidu.hugegraph.computer.core.io.UnsafeBytesInput;
import com.baidu.hugegraph.computer.core.util.BytesUtil;

public class InlinePointer implements Pointer {
@@ -37,20 +37,25 @@ public InlinePointer(byte[] bytes) {
this.bytes = bytes;
}

public InlinePointer(byte[] bytes, long length) {
this.length = length;
this.bytes = bytes;
}

@Override
public RandomAccessInput input() {
return new UnsafeBytesInput(this.bytes);
return new OptimizedUnsafeBytesInput(this.bytes);
}

@Override
public byte[] bytes() throws IOException {
public byte[] bytes() {
return this.bytes;
}

@Override
public void write(RandomAccessOutput output) throws IOException {
output.writeInt((int) this.length);
output.write(this.bytes());
output.write(this.bytes(), 0, (int) this.length);
}

@Override
@@ -35,8 +35,9 @@ public class ComputerContextUtil {

public static void initContext(Map<String, String> params) {
Config config = new DefaultConfig(params);
GraphFactory graphFactory = new BuiltinGraphFactory(config);
ValueFactory valueFactory = new BuiltinValueFactory(config);
GraphFactory graphFactory = new BuiltinGraphFactory(config,
valueFactory);
Allocator allocator = new DefaultAllocator(config, graphFactory);
ComputerContext.initContext(config, graphFactory,
valueFactory, allocator);
@@ -32,7 +32,8 @@
FloatValueSumCombinerTest.class,
DoubleValueSumCombinerTest.class,
ValueMinCombinerTest.class,
ValueMaxCombinerTest.class
ValueMaxCombinerTest.class,
PointerCombinerTest.class
})
public class CombinerTestSuite {
}

0 comments on commit 8c2dbec

Please sign in to comment.