Skip to content

Commit

Permalink
[GAE-Java] Remove String-specialized FFI classes and Simplify Java Me…
Browse files Browse the repository at this point in the history
…ssageManager interface (#2195)

* Simplify Java MessageManager interface for better user experience.
* Remove specialized FFI interface for Java String vd/ed Fragment .
* Some other minor changes.
  • Loading branch information
zhanglei1949 committed Nov 4, 2022
1 parent 5a4312d commit 8de000f
Show file tree
Hide file tree
Showing 116 changed files with 1,970 additions and 2,568 deletions.
5 changes: 5 additions & 0 deletions analytical_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ if (BUILD_TESTS)
target_compile_definitions(graphx_loader_test PUBLIC ENABLE_JAVA_SDK)
target_link_libraries(graphx_loader_test ${CMAKE_DL_LIBS} ${VINEYARD_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})

add_executable(run_java_string_app test/run_java_string_app.cc core/java/javasdk.cc)
target_include_directories(run_java_string_app PRIVATE core utils apps)
target_compile_definitions(run_java_string_app PUBLIC ENABLE_JAVA_SDK)
target_link_libraries(run_java_string_app ${CMAKE_DL_LIBS} ${VINEYARD_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${JNI_LIBRARIES})

if (${LIBUNWIND_FOUND})
target_link_libraries(run_java_app ${LIBUNWIND_LIBRARIES})
target_link_libraries(property_graph_java_app_benchmarks ${LIBUNWIND_LIBRARIES})
Expand Down
21 changes: 16 additions & 5 deletions analytical_engine/core/java/type_alias.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,25 @@ template <typename FRAG_T>
using IntColumn = Column<FRAG_T, uint32_t>;

template <typename T>
using ArrowArrayBuilder = typename vineyard::ConvertToArrowType<T>::BuilderType;

using ArrowStringArrayBuilder = typename gs::ArrowArrayBuilder<std::string>;
struct ConvertToArrowType {
using BuilderType = typename vineyard::ConvertToArrowType<T>::BuilderType;
using ArrayType = typename vineyard::ConvertToArrowType<T>::ArrayType;
};

/** Add one specialization for string_view, we will need this in Java FFI.*/
template <>
struct ConvertToArrowType<arrow::util::string_view> {
using BuilderType =
typename vineyard::ConvertToArrowType<std::string>::BuilderType;
using ArrayType =
typename vineyard::ConvertToArrowType<std::string>::ArrayType;
};

template <typename T>
using ArrowArray = typename vineyard::ConvertToArrowType<T>::ArrayType;
using ArrowArrayBuilder = typename ConvertToArrowType<T>::BuilderType;

using ArrowStringArray = typename gs::ArrowArray<std::string>;
template <typename T>
using ArrowArray = typename ConvertToArrowType<T>::ArrayType;

} // namespace gs

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.alibaba.graphscope.utils;

import com.alibaba.fastffi.FFIByteString;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

Expand Down Expand Up @@ -45,18 +47,20 @@ static Unused getUnused(Class<?> a, Class<?> b) {
}

static int class2Int(Class clz) {
if (clz.equals(Long.class)) {
if (clz.equals(Long.class) || clz.getName().contains("LongMsg")) {
return 0;
} else if (clz.equals(Double.class)) {
} else if (clz.equals(Double.class) || clz.getName().contains("DoubleMsg")) {
return 1;
} else if (clz.equals(Integer.class)) {
} else if (clz.equals(Integer.class) || clz.getName().contains("IntMsg")) {
return 2;
} else if (clz.equals(String.class)) {
} else if (clz.equals(String.class)
|| FFIByteString.class.isAssignableFrom(clz)
|| clz.getName().contains("StringView")) {
return 3;
} else if (clz.getSimpleName().startsWith("EmptyType")) {
return 4;
} else {
throw new IllegalStateException("Not possible");
throw new IllegalStateException("Not possible clz" + clz.getName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.alibaba.graphscope.parallel.ParallelEngine;
import com.alibaba.graphscope.parallel.ParallelMessageManager;
import com.alibaba.graphscope.utils.FFITypeFactoryhelper;
import com.alibaba.graphscope.utils.Unused;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -56,15 +55,14 @@ public void PEval(
if (ctx.partialResults.get(neighbor) == Integer.MAX_VALUE) {
ctx.partialResults.set(neighbor, 1);
if (fragment.isOuterVertex(neighbor)) {
messageManager.syncStateOnOuterVertexNoMsg(
fragment, neighbor, 0, Unused.getUnused(Double.class, Long.class));
messageManager.syncStateOnOuterVertexNoMsg(fragment, neighbor, 0);
} else {
ctx.currentInnerUpdated.set(neighbor);
}
}
}
}
messageManager.ForceContinue();
messageManager.forceContinue();
}

@Override
Expand All @@ -86,12 +84,7 @@ public void IncEval(
};
Supplier<EmptyType> msgSupplier = () -> EmptyType.factory.create();
messageManager.parallelProcess(
fragment,
ctx.threadNum,
ctx.executor,
msgSupplier,
receiveMsg,
Unused.getUnused(Double.class, Long.class, EmptyType.class));
fragment, ctx.threadNum, ctx.executor, msgSupplier, receiveMsg);

BiConsumer<Vertex<Long>, Integer> vertexProcessConsumer =
(cur, finalTid) -> {
Expand All @@ -102,10 +95,7 @@ public void IncEval(
ctx.partialResults.set(vertex, nextDepth);
if (fragment.isOuterVertex(vertex)) {
messageManager.syncStateOnOuterVertexNoMsg(
fragment,
vertex,
finalTid,
Unused.getUnused(Double.class, Long.class));
fragment, vertex, finalTid);
} else {
ctx.nextInnerUpdated.insert(vertex);
}
Expand All @@ -121,7 +111,7 @@ public void IncEval(

ctx.currentDepth = nextDepth;
if (!ctx.nextInnerUpdated.empty()) {
messageManager.ForceContinue();
messageManager.forceContinue();
}
ctx.currentInnerUpdated.assign(ctx.nextInnerUpdated);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void Output(IFragment<Long, Long, Double, Long> frag) {

Vertex<Long> cur = FFITypeFactoryhelper.newVertexLong();
for (long index = 0; index < frag.getInnerVerticesNum(); ++index) {
cur.SetValue(index);
cur.setValue(index);
Long oid = frag.getId(cur);
bufferedWriter.write(oid + "\t" + partialResults.get(index) + "\n");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.alibaba.graphscope.parallel.ParallelMessageManager;
import com.alibaba.graphscope.parallel.message.DoubleMsg;
import com.alibaba.graphscope.utils.FFITypeFactoryhelper;
import com.alibaba.graphscope.utils.Unused;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -64,11 +63,7 @@ public void PEval(
ctx.pagerank.set(vertex, base / edgeNum);
DoubleMsg msg = FFITypeFactoryhelper.newDoubleMsg(base / edgeNum);
parallelMessageManager.sendMsgThroughOEdges(
fragment,
vertex,
msg,
finalTid,
Unused.getUnused(Long.class, Double.class, Double.class));
fragment, vertex, msg, finalTid);
}
};
forEachVertex(innerVertices, ctx.thread_num, ctx.executor, calc);
Expand All @@ -82,7 +77,7 @@ public void PEval(
DoubleMsg localSumMsg = FFITypeFactoryhelper.newDoubleMsg(base * ctx.danglingVNum);
sum(localSumMsg, msgDanglingSum);
ctx.danglingSum = msgDanglingSum.getData();
parallelMessageManager.ForceContinue();
parallelMessageManager.forceContinue();
}

@Override
Expand Down Expand Up @@ -118,12 +113,7 @@ public void IncEval(
});
Supplier<DoubleMsg> msgSupplier = () -> DoubleMsg.factory.create();
parallelMessageManager.parallelProcess(
fragment,
ctx.thread_num,
ctx.executor,
msgSupplier,
consumer,
Unused.getUnused(Long.class, Double.class, Double.class));
fragment, ctx.thread_num, ctx.executor, msgSupplier, consumer);
} // finish receive data

BiConsumer<Vertex<Long>, Integer> calc =
Expand All @@ -141,11 +131,7 @@ public void IncEval(
DoubleMsg msg =
FFITypeFactoryhelper.newDoubleMsg(ctx.nextResult.get(vertex));
parallelMessageManager.sendMsgThroughOEdges(
fragment,
vertex,
msg,
finalTid,
Unused.getUnused(Long.class, Double.class, Double.class));
fragment, vertex, msg, finalTid);
}
});
forEachVertex(innerVertices, ctx.thread_num, ctx.executor, calc);
Expand All @@ -166,6 +152,6 @@ public void IncEval(
sum(localSumMsg, msgDanglingSum);
ctx.danglingSum = msgDanglingSum.getData();
ctx.sumDoubleTime += System.nanoTime() - time0;
parallelMessageManager.ForceContinue();
parallelMessageManager.forceContinue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ public void Output(IFragment<Long, Long, Long, Double> frag) {

Vertex<Long> cur = FFITypeFactoryhelper.newVertexLong();
for (long index = 0; index < frag.getInnerVerticesNum(); ++index) {
cur.SetValue(index);
cur.setValue(index);
Long oid = frag.getId(cur);
bufferedWriter.write(
cur.GetValue() + "\t" + oid + "\t" + pagerank.get(index) + "\n");
cur.getValue() + "\t" + oid + "\t" + pagerank.get(index) + "\n");
}
bufferedWriter.close();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.alibaba.graphscope.parallel.message.LongMsg;
import com.alibaba.graphscope.utils.AtomicLongArrayWrapper;
import com.alibaba.graphscope.utils.FFITypeFactoryhelper;
import com.alibaba.graphscope.utils.Unused;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -58,7 +57,7 @@ public void PEval(
+ ", "
+ sourceInThisFrag
+ ", lid: "
+ source.GetValue());
+ source.getValue());

AtomicLongArrayWrapper partialResults = context.partialResults;
VertexSet curModified = context.curModified;
Expand All @@ -72,18 +71,13 @@ public void PEval(
partialResults.set(vertex, Math.min(nbr.data(), partialResults.get(vertex)));
if (fragment.isOuterVertex(vertex)) {
msg.setData(partialResults.get(vertex));
mm.syncStateOnOuterVertex(
fragment,
vertex,
msg,
0,
Unused.getUnused(Long.class, Long.class, Long.class));
mm.syncStateOnOuterVertex(fragment, vertex, msg, 0);
} else {
nextModified.set(vertex);
}
}
}
mm.ForceContinue();
mm.forceContinue();
curModified.assign(nextModified);
}

Expand All @@ -110,7 +104,7 @@ public void IncEval(
context.sendMessageTime += System.nanoTime();

if (!context.nextModified.partialEmpty(0, (int) fragment.getInnerVerticesNum())) {
messageManager.ForceContinue();
messageManager.forceContinue();
}
context.curModified.assign(context.nextModified);
}
Expand All @@ -130,12 +124,7 @@ private void receiveMessage(
}
};
messageManager.parallelProcess(
frag,
context.threadNum,
context.executor,
msgSupplier,
messageConsumer,
Unused.getUnused(Long.class, Long.class, Long.class));
frag, context.threadNum, context.executor, msgSupplier, messageConsumer);
}

private void execute(SSSPContext context, IFragment<Long, Long, Long, Long> frag) {
Expand All @@ -146,7 +135,7 @@ private void execute(SSSPContext context, IFragment<Long, Long, Long, Long> frag
long curDist = context.partialResults.get(vertex);
AdjList<Long, Long> nbrs = frag.getOutgoingAdjList(vertex);
for (Nbr<Long, Long> nbr : nbrs.iterable()) {
long curLid = nbr.neighbor().GetValue();
long curLid = nbr.neighbor().getValue();
long nextDist = curDist + nbr.data();
if (nextDist < context.partialResults.get(curLid)) {
context.partialResults.compareAndSetMin(curLid, nextDist);
Expand All @@ -171,12 +160,7 @@ private void sendMessage(
(vertex, finalTid) -> {
LongMsg msg =
FFITypeFactoryhelper.newLongMsg(context.partialResults.get(vertex));
messageManager.syncStateOnOuterVertex(
frag,
vertex,
msg,
finalTid,
Unused.getUnused(Long.class, Long.class, Long.class));
messageManager.syncStateOnOuterVertex(frag, vertex, msg, finalTid);
};
forEachVertex(
frag.outerVertices(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ public void Output(IFragment<Long, Long, Long, Long> frag) {

Vertex<Long> cur = FFITypeFactoryhelper.newVertexLong();
for (long index = 0; index < frag.getInnerVerticesNum(); ++index) {
cur.SetValue(index);
cur.setValue(index);
Long oid = frag.getId(cur);
bufferedWriter.write(
cur.GetValue() + "\t" + oid + "\t" + partialResults.get(index) + "\n");
cur.getValue() + "\t" + oid + "\t" + partialResults.get(index) + "\n");
}
bufferedWriter.close();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.alibaba.graphscope.example.stringApp;

import com.alibaba.graphscope.app.ParallelAppBase;
import com.alibaba.graphscope.context.ParallelContextBase;
import com.alibaba.graphscope.ds.StringView;
import com.alibaba.graphscope.ds.Vertex;
import com.alibaba.graphscope.ds.adaptor.Nbr;
import com.alibaba.graphscope.fragment.IFragment;
import com.alibaba.graphscope.parallel.ParallelEngine;
import com.alibaba.graphscope.parallel.ParallelMessageManager;
import com.alibaba.graphscope.utils.FFITypeFactoryhelper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StringApp
implements ParallelAppBase<Long, Long, StringView, StringView, StringAppContext>,
ParallelEngine {

private static Logger logger = LoggerFactory.getLogger(ParallelAppBase.class);

/**
* Partial Evaluation to implement.
*
* @param graph fragment. The graph fragment providing accesses to graph data.
* @param context context. User defined context which manages data during the whole
* computations.
* @param messageManager The message manger which manages messages between fragments.
* @see IFragment
* @see ParallelContextBase
* @see ParallelMessageManager
*/
@Override
public void PEval(
IFragment<Long, Long, StringView, StringView> graph,
ParallelContextBase<Long, Long, StringView, StringView> context,
ParallelMessageManager messageManager) {

StringAppContext ctx = (StringAppContext) context;

Vertex<Long> vertex = FFITypeFactoryhelper.newVertexLong();
long batch = graph.getInnerVerticesNum() / 10;
for (long i = 0; i < graph.getInnerVerticesNum(); i += batch) {
vertex.setValue(i);
for (Nbr<Long, StringView> nbr : graph.getOutgoingAdjList(vertex).iterable()) {
logger.info(
"Edge {}({})->{}({}), ed {}",
graph.getId(vertex),
graph.getData(vertex).toJavaString(),
graph.getId(nbr.neighbor()),
graph.getData(nbr.neighbor()),
nbr.data());
}
}

messageManager.forceContinue();
}

/**
* Incremental Evaluation to implement.
*
* @param graph fragment. The graph fragment providing accesses to graph data.
* @param context context. User defined context which manages data during the whole
* computations.
* @param messageManager The message manger which manages messages between fragments.
* @see IFragment
* @see ParallelContextBase
* @see ParallelMessageManager
*/
@Override
public void IncEval(
IFragment<Long, Long, StringView, StringView> graph,
ParallelContextBase<Long, Long, StringView, StringView> context,
ParallelMessageManager messageManager) {}
}
Loading

0 comments on commit 8de000f

Please sign in to comment.