Skip to content

Commit

Permalink
feat(analytical): Support using customized writable as vdata,edata fo…
Browse files Browse the repository at this point in the history
…r Giraph apps (#3873)

For vdata, edata with customized writable, we will `StringView` to map
them into java memroy.
To output data to `VertexDataContext`, we will `StdString` to allocate
memory in c++, and map back to java.
  • Loading branch information
zhanglei1949 committed Jun 5, 2024
1 parent 86443da commit fb308b7
Show file tree
Hide file tree
Showing 17 changed files with 534 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.example.giraph;

import com.alibaba.graphscope.example.giraph.writable.MultipleLongWritable;

import org.apache.giraph.conf.LongConfOption;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.LongWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Objects;

/**
* Only send msg.
*/
public class MessageAppWithUserWritable
extends BasicComputation<
LongWritable, MultipleLongWritable, MultipleLongWritable, MultipleLongWritable> {

public static LongConfOption MAX_SUPER_STEP;
private static Logger logger = LoggerFactory.getLogger(MessageAppWithUserWritable.class);

static {
String maxSuperStep = System.getenv("MAX_SUPER_STEP");
if (Objects.isNull(maxSuperStep) || maxSuperStep.isEmpty()) {
MAX_SUPER_STEP = new LongConfOption("maxSuperStep", 3, "max super step");
} else {
MAX_SUPER_STEP =
new LongConfOption(
"maxSuperStep", Long.valueOf(maxSuperStep), "max super step");
}
}

/**
* Must be defined by user to do computation on a single Vertex.
*
* @param vertex Vertex
* @param messages Messages that were sent to this vertex in the previous superstep. Each
* message is only guaranteed to have
*/
@Override
public void compute(
Vertex<LongWritable, MultipleLongWritable, MultipleLongWritable> vertex,
Iterable<MultipleLongWritable> messages)
throws IOException {
if (getSuperstep() == 0) {
// logger.info("There should be no messages in step0, " + vertex.getId());
boolean flag = false;
for (MultipleLongWritable message : messages) {
flag = true;
}
if (flag) {
throw new IllegalStateException(
"Expect no msg received in step 1, but actually received");
}
MultipleLongWritable msg = new MultipleLongWritable(vertex.getId().get());
sendMessageToAllEdges(vertex, msg);
} else if (getSuperstep() < MAX_SUPER_STEP.get(getConf())) {
if (vertex.getId().get() < 20) {
logger.info("step [{}] Checking received msg", getSuperstep());
}
int msgCnt = 0;
for (MultipleLongWritable message : messages) {
msgCnt += 1;
}
vertex.setValue(new MultipleLongWritable(msgCnt));
} else if (getSuperstep() == MAX_SUPER_STEP.get(getConf())) {
vertex.voteToHalt();
} else {
logger.info("Impossible: " + getSuperstep());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.example.giraph.format;

import com.alibaba.graphscope.example.giraph.writable.MultipleLongWritable;

import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.formats.TextEdgeInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class P2PEdgeMultipleLongInputFormat
extends TextEdgeInputFormat<LongWritable, MultipleLongWritable> {

/**
* Create an edge reader for a given split. The framework will call {@link
* EdgeReader#initialize(InputSplit, TaskAttemptContext)} before the split is used.
*
* @param split the split to be read
* @param context the information about the task
* @return a new record reader
* @throws IOException
*/
@Override
public EdgeReader<LongWritable, MultipleLongWritable> createEdgeReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new P2PEdgeReader();
}

public class P2PEdgeReader extends TextEdgeReaderFromEachLineProcessed<String[]> {

String SEPARATOR = " ";
/**
* Cached vertex id for the current line
*/
private LongWritable srcId;

private LongWritable dstId;
private MultipleLongWritable edgeValue;

/**
* Preprocess the line so other methods can easily read necessary information for creating
* edge
*
* @param line the current line to be read
* @return the preprocessed object
* @throws IOException exception that can be thrown while reading
*/
@Override
protected String[] preprocessLine(Text line) throws IOException {
// logger.debug("line: " + line.toString());
String[] tokens = line.toString().split(SEPARATOR);
if (tokens.length != 3) {
throw new IllegalStateException("expect 3 ele in edge line");
}
// logger.debug(String.join(",", tokens));
srcId = new LongWritable(Long.parseLong(tokens[0]));
dstId = new LongWritable(Long.parseLong(tokens[1]));
edgeValue = new MultipleLongWritable(Long.parseLong(tokens[2]));
return tokens;
}

/**
* Reads target vertex id from the preprocessed line.
*
* @param line the object obtained by preprocessing the line
* @return the target vertex id
* @throws IOException exception that can be thrown while reading
*/
@Override
protected LongWritable getTargetVertexId(String[] line) throws IOException {
return dstId;
}

/**
* Reads source vertex id from the preprocessed line.
*
* @param line the object obtained by preprocessing the line
* @return the source vertex id
* @throws IOException exception that can be thrown while reading
*/
@Override
protected LongWritable getSourceVertexId(String[] line) throws IOException {
return srcId;
}

/**
* Reads edge value from the preprocessed line.
*
* @param line the object obtained by preprocessing the line
* @return the edge value
* @throws IOException exception that can be thrown while reading
*/
@Override
protected MultipleLongWritable getValue(String[] line) throws IOException {
return edgeValue;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.example.giraph.format;

import com.alibaba.graphscope.example.giraph.writable.MultipleLongWritable;
import com.google.common.collect.Lists;

import org.apache.giraph.edge.Edge;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.List;

public class P2PVertexMultipleLongInputFormat
extends TextVertexInputFormat<LongWritable, MultipleLongWritable, MultipleLongWritable> {

/**
* The factory method which produces the {@link TextVertexReader} used by this input format.
*
* @param split the split to be read
* @param context the information about the task
* @return the text vertex reader to be used
*/
@Override
public TextVertexInputFormat<LongWritable, MultipleLongWritable, MultipleLongWritable>
.TextVertexReader
createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
return new P2PVertexReader();
}

public class P2PVertexReader extends TextVertexReaderFromEachLineProcessed<String[]> {

String SEPARATOR = " ";

/**
* Cached vertex id for the current line
*/
private LongWritable id;

private MultipleLongWritable value;

@Override
protected String[] preprocessLine(Text line) throws IOException {
// logger.debug("line: " + line.toString());
String[] tokens = line.toString().split(SEPARATOR);
// logger.debug(String.join(",", tokens));
id = new LongWritable(Long.parseLong(tokens[0]));
value = new MultipleLongWritable(Long.parseLong(tokens[1]));
return tokens;
}

@Override
protected LongWritable getId(String[] tokens) throws IOException {
return id;
}

@Override
protected MultipleLongWritable getValue(String[] tokens) throws IOException {
return value;
}

@Override
protected Iterable<Edge<LongWritable, MultipleLongWritable>> getEdges(String[] tokens)
throws IOException {
List<Edge<LongWritable, MultipleLongWritable>> edges =
Lists.newArrayListWithCapacity(0);
return edges;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.fastjson.JSONObject;
import com.alibaba.graphscope.communication.Communicator;
import com.alibaba.graphscope.ds.GSVertexArray;
import com.alibaba.graphscope.ds.StringView;
import com.alibaba.graphscope.factory.GiraphComputationFactory;
import com.alibaba.graphscope.fragment.IFragment;
import com.alibaba.graphscope.graph.AggregatorManager;
Expand All @@ -39,6 +40,7 @@
import com.alibaba.graphscope.serialization.FFIByteVectorInputStream;
import com.alibaba.graphscope.serialization.FFIByteVectorOutputStream;
import com.alibaba.graphscope.stdcxx.FFIByteVector;
import com.alibaba.graphscope.stdcxx.StdString;
import com.alibaba.graphscope.utils.ConfigurationUtils;
import com.alibaba.graphscope.utils.FFITypeFactoryhelper;

Expand Down Expand Up @@ -248,6 +250,22 @@ public void writeBackVertexData() {
// This string is not readable.
vertexArray.setValue(grapeVertex, new String(bytes));
}
} else if (conf.getGrapeVdataClass().equals(StringView.class)) {
byte[] bytes = new byte[(int) maxOffset];
for (long lid = 0; lid < innerVerticesNum; ++lid) {
grapeVertex.setValue((VID_T) (Long) lid);
if (inputStream.longAvailable() <= 0) {
throw new IllegalStateException(
"Input stream too short for " + innerVerticesNum + " vertices");
}
if (inputStream.read(bytes, 0, (int) offsets[(int) lid]) == -1) {
throw new IllegalStateException("read input stream failed");
}
// This string is not readable.
StdString value = (StdString) vertexArray.get(grapeVertex);
// TODO: can be optimized without creating a java string
value.fromJavaString(new String(bytes));
}
} else {
throw new IllegalStateException(
"Unrecognized vdata class:" + conf.getGrapeVdataClass().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private FFIByteVectorInputStream generateVertexIdStream() {
}
outputStream.finishSetting();
logger.info(
"Vertex data stream size: "
"Vertex id stream size: "
+ outputStream.bytesWriten()
+ ", vertices: "
+ vertexNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.alibaba.graphscope.graph.impl;

import com.alibaba.graphscope.ds.StringView;
import com.alibaba.graphscope.ds.Vertex;
import com.alibaba.graphscope.fragment.IFragment;
import com.alibaba.graphscope.graph.VertexDataManager;
Expand Down Expand Up @@ -143,8 +144,13 @@ private void readVertexDataFromIFragment(FFIByteVectorOutputStream outputStream)
String value = (String) fragment.getData(vertex);
outputStream.writeBytes(value);
}
} else if (conf.getGrapeVdataClass().equals(StringView.class)) {
for (Vertex<GRAPE_VID_T> vertex : iterable) {
StringView value = (StringView) fragment.getData(vertex);
outputStream.writeBytes(value);
}
} else {
logger.error("Unsupported oid class: " + conf.getGrapeOidClass().getName());
logger.error("Unsupported vdata class: " + conf.getGrapeVdataClass().getName());
}
// else if (conf.getGrapeVdataClass().equals the userDefined class...
outputStream.finishSetting();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.graphscope.ds.StringView;
import com.alibaba.graphscope.fragment.IFragment;

import org.apache.giraph.combiner.MessageCombiner;
Expand Down Expand Up @@ -295,6 +296,9 @@ public static boolean checkTypeConsistency(
if (grapeTypeClass.equals(Float.class)) {
return giraphTypeClass.equals(FloatWritable.class);
}
if (grapeTypeClass.equals(String.class) || grapeTypeClass.equals(StringView.class)) {
return true;
}
logger.error(
"Unsupported grape type and giraph type: "
+ grapeTypeClass.getName()
Expand Down
Loading

0 comments on commit fb308b7

Please sign in to comment.