Skip to content
Permalink
Browse files
Implement vertex/edge fetcher to load data from hugegraph server (#18)
* Add install-hugegraph-from-source.sh

* Add HugeConverter
  • Loading branch information
Linary committed Mar 15, 2021
1 parent 128ce85 commit 2b76996b0593c7a5e53f9e91816f196fa5e4c5f7
Showing 30 changed files with 1,073 additions and 23 deletions.
@@ -23,6 +23,7 @@ install: mvn compile -Dmaven.javadoc.skip=true | grep -v "Downloading\|Downloade

before_script:
- $TRAVIS_DIR/install-env.sh
- $TRAVIS_DIR/install-hugegraph-from-source.sh $TRAVIS_BRANCH | grep -v "Downloading\|Downloaded"

script:
- mvn test -P unit-test
@@ -16,6 +16,10 @@
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-common</artifactId>
</dependency>
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-client</artifactId>
</dependency>
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
@@ -79,6 +79,30 @@ public static synchronized ComputerOptions instance() {
"value"
);

public static final ConfigOption<Long> INPUT_SPLITS_SIZE =
new ConfigOption<>(
"input.split_size",
"The input split size in bytes",
positiveInt(),
1024 * 1024L
);

public static final ConfigOption<Integer> INPUT_MAX_SPLITS =
new ConfigOption<>(
"input.split_max_splits",
"The maximum number of input splits",
positiveInt(),
10_000_000
);

public static final ConfigOption<Integer> INPUT_SPLIT_PAGE_SIZE =
new ConfigOption<>(
"input.split_page_size",
"The page size for streamed load input split data",
positiveInt(),
500
);

public static final ConfigOption<Boolean> OUTPUT_WITH_ADJACENT_EDGES =
new ConfigOption<>(
"output.with_adjacent_edges",
@@ -25,12 +25,12 @@
public final class IdFactory {

// Maybe can reuse Id
public static Id createID(byte code) {
public static Id createId(byte code) {
IdType type = SerialEnum.fromCode(IdType.class, code);
return createID(type);
return createId(type);
}

public static Id createID(IdType type) {
public static Id createId(IdType type) {
switch (type) {
case LONG:
return new LongId();
@@ -0,0 +1,30 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.input;

import com.baidu.hugegraph.structure.graph.Edge;

/**
* Streamed read the data of each input split, and return one HugeEdge object
* at each iteration
*/
public interface EdgeFetcher extends ElementFetcher<Edge> {

}
@@ -0,0 +1,31 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.input;

import java.util.Iterator;

public interface ElementFetcher<T> extends Iterator<T> {

/**
* Set the current input split meta information to be loaded
* @param split current input split meta
*/
void prepareLoadInputSplit(InputSplit split);
}
@@ -0,0 +1,96 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.input;

import java.util.Collection;
import java.util.Map;
import java.util.UUID;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.graph.id.Id;
import com.baidu.hugegraph.computer.core.graph.id.LongId;
import com.baidu.hugegraph.computer.core.graph.id.Utf8Id;
import com.baidu.hugegraph.computer.core.graph.id.UuidId;
import com.baidu.hugegraph.computer.core.graph.properties.DefaultProperties;
import com.baidu.hugegraph.computer.core.graph.properties.Properties;
import com.baidu.hugegraph.computer.core.graph.value.BooleanValue;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.graph.value.FloatValue;
import com.baidu.hugegraph.computer.core.graph.value.IntValue;
import com.baidu.hugegraph.computer.core.graph.value.ListValue;
import com.baidu.hugegraph.computer.core.graph.value.LongValue;
import com.baidu.hugegraph.computer.core.graph.value.NullValue;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.util.E;

public final class HugeConverter {

public static Id convertId(Object rawId) {
E.checkArgumentNotNull(rawId, "The rawId can't be null");
if (rawId instanceof Long) {
return new LongId((long) rawId);
} else if (rawId instanceof String) {
return new Utf8Id((String) rawId);
} else if (rawId instanceof UUID) {
return new UuidId((UUID) rawId);
} else {
throw new ComputerException("Can't convert to Id from '%s'(%s)",
rawId, rawId.getClass());
}
}

public static Value convertValue(Object rawValue) {
if (rawValue == null) {
return NullValue.get();
} else if (rawValue instanceof Boolean) {
return new BooleanValue((boolean) rawValue);
} else if (rawValue instanceof Integer) {
return new IntValue((int) rawValue);
} else if (rawValue instanceof Long) {
return new LongValue((long) rawValue);
} else if (rawValue instanceof Float) {
return new FloatValue((float) rawValue);
} else if (rawValue instanceof Double) {
return new DoubleValue((double) rawValue);
} else if (rawValue instanceof Collection) {
@SuppressWarnings("unchecked")
Collection<Object> collection = (Collection<Object>) rawValue;
ListValue listValue = new ListValue<>();
for (Object nestedRawValue : collection) {
listValue.add(convertValue(nestedRawValue));
}
return listValue;
} else {
throw new ComputerException("Can't convert to Value from '%s'(%s)",
rawValue, rawValue.getClass());
}
}

public static Properties convertProperties(
Map<String, Object> rawProperties) {
Properties properties = new DefaultProperties();
for (Map.Entry<String, Object> entry : rawProperties.entrySet()) {
String key = entry.getKey();
Value value = convertValue(entry.getValue());
properties.put(key, value);
}
return properties;
}
}
@@ -0,0 +1,43 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.input;

public class InputSplit {

public static final InputSplit END_SPLIT = new InputSplit(null, null);

// inclusive
private final String start;
// exclusive
private final String end;

public InputSplit(String start, String end) {
this.start = start;
this.end = end;
}

public String start() {
return this.start;
}

public String end() {
return this.end;
}
}
@@ -0,0 +1,37 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.input;

import java.util.List;

public interface InputSplitFetcher {

/**
* Fetch all vertex input splits just once
* @return all vertex input splits
*/
List<InputSplit> fetchVertexInputSplits();

/**
* Fetch all edge input splits just once
* @return all edge input splits
*/
List<InputSplit> fetchEdgeInputSplits();
}
@@ -0,0 +1,63 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.input;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class MasterInputHandler {

private final InputSplitFetcher fetcher;
private final Queue<InputSplit> vertexSplits;
private final Queue<InputSplit> edgeSplits;

public MasterInputHandler(InputSplitFetcher fetcher) {
this.fetcher = fetcher;
this.vertexSplits = new LinkedBlockingQueue<>();
this.edgeSplits = new LinkedBlockingQueue<>();
}

public int createVertexInputSplits() {
List<InputSplit> splits = this.fetcher.fetchVertexInputSplits();
for (InputSplit split : splits) {
this.vertexSplits.offer(split);
}
return this.vertexSplits.size();
}

public int createEdgeInputSplits() {
List<InputSplit> splits = this.fetcher.fetchEdgeInputSplits();
for (InputSplit split : splits) {
this.edgeSplits.offer(split);
}
return this.edgeSplits.size();
}

public InputSplit pollVertexInputSplit() {
InputSplit split = this.vertexSplits.poll();
return split != null ? split : InputSplit.END_SPLIT;
}

public InputSplit pollEdgeInputSplit() {
InputSplit split = this.edgeSplits.poll();
return split != null ? split : InputSplit.END_SPLIT;
}
}

0 comments on commit 2b76996

Please sign in to comment.