Skip to content

Commit

Permalink
[GAE-Java] GraphX-on-GraphScope : remove GraphXFragment and related…
Browse files Browse the repository at this point in the history
… components (#2123)

* Remove `GraphXFragment` impl and related loading and shuffling code.
* Use `GraphXLoader` to load graphx rdd  to ArrowFragment, project to ArrowProjectedFragment.
* Some other minor changes.
  • Loading branch information
zhanglei1949 committed Oct 27, 2022
1 parent 49ae814 commit 74af74f
Show file tree
Hide file tree
Showing 192 changed files with 11,919 additions and 11,974 deletions.
16 changes: 8 additions & 8 deletions analytical_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -318,20 +318,19 @@ if (BUILD_TESTS)
target_link_libraries(giraph_runner ${CMAKE_DL_LIBS} gs_proto ${VINEYARD_LIBRARIES} ${Boost_LIBRARIES} ${GFLAGS_LIBRARIES} ${JNI_LIBRARIES})

# graphx related test
add_executable(graphx_test test/graphx_test.cc)
target_include_directories(graphx_test PRIVATE core utils apps)
target_compile_definitions(graphx_test PUBLIC ENABLE_JAVA_SDK)
target_link_libraries(graphx_test ${CMAKE_DL_LIBS} ${VINEYARD_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES})

add_executable(projected_fragment_mapper_test test/projected_fragment_mapper_test.cc)
target_include_directories(projected_fragment_mapper_test PRIVATE core utils apps)
target_link_libraries(projected_fragment_mapper_test ${CMAKE_DL_LIBS} ${VINEYARD_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})

add_executable(graphx_loader_test test/graphx_loader_test.cc)
target_include_directories(graphx_loader_test PRIVATE core utils apps)
target_compile_definitions(graphx_loader_test PUBLIC ENABLE_JAVA_SDK)
target_link_libraries(graphx_loader_test ${CMAKE_DL_LIBS} ${VINEYARD_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})

if (${LIBUNWIND_FOUND})
target_link_libraries(run_java_app ${LIBUNWIND_LIBRARIES})
target_link_libraries(property_graph_java_app_benchmarks ${LIBUNWIND_LIBRARIES})
target_link_libraries(giraph_runner ${LIBUNWIND_LIBRARIES})
target_link_libraries(graphx_test ${LIBUNWIND_LIBRARIES})
target_link_libraries(projected_fragment_mapper_test ${LIBUNWIND_LIBRARIES})
endif ()
endif()
Expand Down Expand Up @@ -410,7 +409,7 @@ add_custom_target(gsa_clformat
COMMENT "Running clang-format, using clang-format-8 from https://github.com/muttleyxd/clang-tools-static-binaries/releases"
VERBATIM)

# Install GAE Java SDK
#Install GAE Java SDK
if(ENABLE_JAVA_SDK)
set(GAE_JAVA_DIR "${CMAKE_CURRENT_SOURCE_DIR}/java/")
set(GAE_JAVA_RUNTIME_DIR "${GAE_JAVA_DIR}/grape-runtime/")
Expand All @@ -437,7 +436,8 @@ if(ENABLE_JAVA_SDK)
install(FILES "${GAE_JAVA_RUNTIME_JAR}" DESTINATION lib)
install(FILES "${GAE_JAVA_GRAPHX_JAR}" DESTINATION lib)
install(FILES "${GAE_JAVA_DIR}/grape_jvm_opts" DESTINATION conf)
endif()
install(FILES "${GAE_JAVA_DIR}/run_graphx.sh" DESTINATION bin)
endif()

# Install binaries
macro(install_gsa_binary target)
Expand Down
3 changes: 1 addition & 2 deletions analytical_engine/core/context/java_context_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,7 @@ class JavaContextBase : public grape::ContextBase {

jobject wrapFragObj(JNIEnv* env, jobject& fragObject) {
if (graph_type_str_.find("Immutable") != std::string::npos ||
graph_type_str_.find("ArrowProjected") != std::string::npos ||
graph_type_str_.find("GraphXFragment") != std::string::npos) {
graph_type_str_.find("ArrowProjected") != std::string::npos) {
VLOG(10) << "Creating IFragment";
// jobject fragment_object_impl_ = env->NewGlobalRef(fragObject);
// For immutableFragment and ArrowProjectedFragment, we use a wrapper
Expand Down
37 changes: 31 additions & 6 deletions analytical_engine/core/fragment/arrow_projected_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,6 @@ class AdjList<VID_T, EID_T, grape::EmptyType> {

} // namespace arrow_projected_fragment_impl

template <typename OID_T, typename VID_T, typename OLD_VDATA_T,
typename NEW_VDATA_T, typename OLD_EDATA_T, typename NEW_EDATA_T>
class ArrowProjectedFragmentMapper;

/**
* @brief This class represents the fragment projected from ArrowFragment which
* contains only one vertex label and edge label. The fragment has no label and
Expand Down Expand Up @@ -711,6 +707,14 @@ class ArrowProjectedFragment

inline fid_t fnum() const { return fnum_; }

inline label_id_t vertex_label() const { return vertex_label_; }

inline label_id_t edge_label() const { return edge_label_; }

inline prop_id_t vertex_prop_id() const { return vertex_prop_; }

inline prop_id_t edge_prop_id() const { return edge_prop_; }

inline vertex_range_t Vertices() const { return vertices_; }

inline vertex_range_t InnerVertices() const { return inner_vertices_; }
Expand Down Expand Up @@ -770,6 +774,18 @@ class ArrowProjectedFragment

inline size_t GetOutEdgeNum() const { return oenum_; }

/* Get outging edges num from this frag*/
inline size_t GetOutgoingEdgeNum() const {
return static_cast<size_t>(oe_offsets_end_->Value(ivnum_ - 1) -
oe_offsets_begin_->Value(0));
}

/* Get incoming edges num to this frag*/
inline size_t GetIncomingEdgeNum() const {
return static_cast<size_t>(ie_offsets_end_->Value(ivnum_ - 1) -
ie_offsets_begin_->Value(0));
}

inline size_t GetTotalVerticesNum() const {
return vm_ptr_->GetTotalVerticesNum();
}
Expand Down Expand Up @@ -984,6 +1000,15 @@ class ArrowProjectedFragment
return edge_data_array_accessor_;
}

inline arrow_projected_fragment_impl::TypedArray<VDATA_T>&
get_vdata_array_accessor() {
return vertex_data_array_accessor_;
}

std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>> get_arrow_fragment() {
return fragment_;
}

private:
inline static std::pair<int64_t, int64_t> getRangeOfLabel(
std::shared_ptr<property_graph_t> fragment, label_id_t v_label,
Expand Down Expand Up @@ -1230,8 +1255,8 @@ class ArrowProjectedFragment
std::vector<vid_t> outer_vertex_offsets_;
std::vector<std::vector<vertex_t>> mirrors_of_frag_;

template <typename _OID_T, typename _VID_T, typename _OLD_VDATA_T,
typename _NEW_VDATA_T, typename _OLD_EDATA_T, typename _NEW_EDATA_T>
template <typename _OID_T, typename _VID_T, typename _NEW_VDATA_T,
typename _NEW_EDATA_T>
friend class ArrowProjectedFragmentMapper;
};

Expand Down
196 changes: 119 additions & 77 deletions analytical_engine/core/fragment/arrow_projected_fragment_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,15 @@ namespace gs {
* @tparam OID_T OID type
* @tparam VID_T VID type
*/
template <typename OID_T, typename VID_T, typename OLD_VDATA_T,
typename NEW_VDATA_T, typename OLD_EDATA_T, typename NEW_EDATA_T>
template <typename OID_T, typename VID_T, typename NEW_VDATA_T,
typename NEW_EDATA_T>
class ArrowProjectedFragmentMapper {
public:
using label_id_t = vineyard::property_graph_types::LABEL_ID_TYPE;
using prop_id_t = vineyard::property_graph_types::PROP_ID_TYPE;
using vid_t = VID_T;
using oid_t = OID_T;
using old_vdata_t = OLD_VDATA_T;
using new_vdata_t = NEW_VDATA_T;
using old_edata_t = OLD_EDATA_T;
using new_edata_t = NEW_EDATA_T;

using edata_array_builder_t =
Expand All @@ -72,95 +70,56 @@ class ArrowProjectedFragmentMapper {
using vineyard_vdata_array_builder_t =
typename vineyard::InternalType<new_vdata_t>::vineyard_builder_type;

using old_frag_t =
ArrowProjectedFragment<oid_t, vid_t, old_vdata_t, old_edata_t>;
using old_frag_t = std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>>;
using new_frag_t =
ArrowProjectedFragment<oid_t, vid_t, new_vdata_t, new_edata_t>;

ArrowProjectedFragmentMapper() {}
~ArrowProjectedFragmentMapper() {}

std::shared_ptr<new_frag_t> Map(old_frag_t& old_fragment,
std::shared_ptr<new_frag_t> Map(old_frag_t old_arrow_fragment,
label_id_t v_label, label_id_t e_label,
vdata_array_builder_t& vdata_array_builder,
edata_array_builder_t& edata_array_builder,
vineyard::Client& client) {
const vineyard::ObjectMeta& old_meta = old_fragment.meta();
std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>> old_arrow_fragment =
old_fragment.fragment_;
auto v_label = old_meta.GetKeyValue<int>("projected_v_label");
auto e_label = old_meta.GetKeyValue<int>("projected_e_label");
auto old_vprop_num = old_arrow_fragment->vertex_property_num(v_label);
auto old_eprop_num = old_arrow_fragment->edge_property_num(e_label);
LOG(INFO) << "Old arrow fragment has " << old_vprop_num
<< " vertex properties and " << old_eprop_num
<< " edge properties";
auto new_vprop_name = "VPROP_" + std::to_string(old_vprop_num);
auto new_eprop_name = "EPROP_" + std::to_string(old_eprop_num);
std::string new_vprop_name, new_eprop_name;
vineyard::ObjectID new_frag_id;

std::shared_ptr<edata_array_t> arrow_edata_array;
edata_array_builder.Finish(&arrow_edata_array);
std::shared_ptr<vdata_array_t> arrow_vdata_array;
vdata_array_builder.Finish(&arrow_vdata_array);
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>
vertex_columns_map;
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>
edge_columns_map;
vertex_columns_map.push_back(
std::make_pair(new_vprop_name, arrow_vdata_array));
edge_columns_map.push_back(
std::make_pair(new_eprop_name, arrow_edata_array));
auto after_vertex = addVertexColumn(client, old_arrow_fragment, v_label,
new_vprop_name, vdata_array_builder);
auto after_edge = addEdgeColumn(client, after_vertex, e_label,
new_eprop_name, edata_array_builder);
// Add new data to original ArrowFragment's table.
{
auto schema = after_edge->schema();
auto v_prop_id = schema.GetVertexPropertyId(v_label, new_vprop_name);
auto e_prop_id = schema.GetEdgePropertyId(e_label, new_eprop_name);
std::shared_ptr<new_frag_t> res =
new_frag_t::Project(after_edge, 0, v_prop_id, 0, e_prop_id);
new_frag_id = res->id();
}

LOG(INFO) << "Got projected fragment: " << new_frag_id;
auto new_frag =
std::dynamic_pointer_cast<new_frag_t>(client.GetObject(new_frag_id));
return new_frag;
}

std::shared_ptr<new_frag_t> Map(old_frag_t old_arrow_fragment,
label_id_t v_label, prop_id_t old_e_prop_id,
vdata_array_builder_t& vdata_array_builder,
vineyard::Client& client) {
std::string new_vprop_name;
vineyard::ObjectID new_frag_id;

auto after_vertex = addVertexColumn(client, old_arrow_fragment, v_label,
new_vprop_name, vdata_array_builder);
// Add new data to original ArrowFragment's table.
{
std::map<
label_id_t,
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
v_map;
v_map[v_label] = vertex_columns_map;
vineyard::ObjectID new_arrow_fragment_id = bl::try_handle_all(
[&]() { return old_arrow_fragment->AddVertexColumns(client, v_map); },
[](const vineyard::GSError& e) {
LOG(ERROR) << e.error_msg;
return 0;
},
[](const bl::error_info& unmatched) {
LOG(ERROR) << "Unmatched error " << unmatched;
return 0;
});
LOG(INFO) << "Build First fragment: " << new_arrow_fragment_id;
std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>>
new_arrow_fragment =
std::dynamic_pointer_cast<vineyard::ArrowFragment<oid_t, vid_t>>(
client.GetObject(new_arrow_fragment_id));

std::map<
label_id_t,
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
e_map;
e_map[e_label] = edge_columns_map;
vineyard::ObjectID new_new_arrow_fragment_id = bl::try_handle_all(
[&]() { return new_arrow_fragment->AddEdgeColumns(client, e_map); },
[](const vineyard::GSError& e) {
LOG(ERROR) << e.error_msg;
return 0;
},
[](const bl::error_info& unmatched) {
LOG(ERROR) << "Unmatched error " << unmatched;
return 0;
});

LOG(INFO) << "Build Second fragment: " << new_new_arrow_fragment_id;
std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>>
new_new_arrow_fragment =
std::dynamic_pointer_cast<vineyard::ArrowFragment<oid_t, vid_t>>(
client.GetObject(new_new_arrow_fragment_id));
auto schema = new_new_arrow_fragment->schema();
auto schema = after_vertex->schema();
auto v_prop_id = schema.GetVertexPropertyId(v_label, new_vprop_name);
auto e_prop_id = schema.GetEdgePropertyId(e_label, new_eprop_name);
std::shared_ptr<new_frag_t> res = new_frag_t::Project(
new_new_arrow_fragment, 0, v_prop_id, 0, e_prop_id);
std::shared_ptr<new_frag_t> res =
new_frag_t::Project(after_vertex, 0, v_prop_id, 0, old_e_prop_id);
new_frag_id = res->id();
}

Expand All @@ -169,6 +128,89 @@ class ArrowProjectedFragmentMapper {
std::dynamic_pointer_cast<new_frag_t>(client.GetObject(new_frag_id));
return new_frag;
}

private:
std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>> addVertexColumn(
vineyard::Client& client,
std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>>&
old_arrow_fragment,
int v_label_id, std::string& new_vprop_name,
vdata_array_builder_t& vdata_array_builder) {
auto old_vprop_num = old_arrow_fragment->vertex_property_num(v_label_id);
LOG(INFO) << "Old arrow fragment has " << old_vprop_num
<< " vertex properties";
new_vprop_name = "VPROP_" + std::to_string(old_vprop_num);
std::shared_ptr<vdata_array_t> arrow_vdata_array;
vdata_array_builder.Finish(&arrow_vdata_array);

std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>
vertex_columns_map;
vertex_columns_map.push_back(
std::make_pair(new_vprop_name, arrow_vdata_array));

std::map<label_id_t,
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
v_map;
v_map[v_label_id] = vertex_columns_map;
vineyard::ObjectID new_arrow_fragment_id = bl::try_handle_all(
[&]() { return old_arrow_fragment->AddVertexColumns(client, v_map); },
[](const vineyard::GSError& e) {
LOG(ERROR) << e.error_msg;
return 0;
},
[](const bl::error_info& unmatched) {
LOG(ERROR) << "Unmatched error " << unmatched;
return 0;
});
LOG(INFO) << "Added vertex column,frag : " << new_arrow_fragment_id;
std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>> new_arrow_fragment =
std::dynamic_pointer_cast<vineyard::ArrowFragment<oid_t, vid_t>>(
client.GetObject(new_arrow_fragment_id));
return new_arrow_fragment;
}

std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>> addEdgeColumn(
vineyard::Client& client,
std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>>&
old_arrow_fragment,
int e_label_id, std::string& new_eprop_name,
edata_array_builder_t& edata_array_builder) {
auto old_eprop_num = old_arrow_fragment->edge_property_num(e_label_id);
LOG(INFO) << "Old arrow fragment has " << old_eprop_num
<< " edge properties";

new_eprop_name = "EPROP_" + std::to_string(old_eprop_num);

std::shared_ptr<edata_array_t> arrow_edata_array;
edata_array_builder.Finish(&arrow_edata_array);

std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>
edge_columns_map;

edge_columns_map.push_back(
std::make_pair(new_eprop_name, arrow_edata_array));

std::map<label_id_t,
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
e_map;
e_map[e_label_id] = edge_columns_map;
vineyard::ObjectID new_arrow_fragment_id = bl::try_handle_all(
[&]() { return old_arrow_fragment->AddEdgeColumns(client, e_map); },
[](const vineyard::GSError& e) {
LOG(ERROR) << e.error_msg;
return 0;
},
[](const bl::error_info& unmatched) {
LOG(ERROR) << "Unmatched error " << unmatched;
return 0;
});

LOG(INFO) << "Add edge Columns, fragment: " << new_arrow_fragment_id;
std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>> new_arrow_fragment =
std::dynamic_pointer_cast<vineyard::ArrowFragment<oid_t, vid_t>>(
client.GetObject(new_arrow_fragment_id));
return new_arrow_fragment;
}
};
} // namespace gs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
* limitations under the License.
*/

#ifndef ANALYTICAL_ENGINE_CORE_JAVA_GRAPHX_FRAGMENT_GETTER_H_
#define ANALYTICAL_ENGINE_CORE_JAVA_GRAPHX_FRAGMENT_GETTER_H_
#ifndef ANALYTICAL_ENGINE_CORE_JAVA_FRAGMENT_GETTER_H_
#define ANALYTICAL_ENGINE_CORE_JAVA_FRAGMENT_GETTER_H_

#define WITH_PROFILING

Expand Down Expand Up @@ -80,4 +80,4 @@ class ArrowFragmentGroupGetter {

} // namespace gs

#endif // ANALYTICAL_ENGINE_CORE_JAVA_GRAPHX_FRAGMENT_GETTER_H_
#endif // ANALYTICAL_ENGINE_CORE_JAVA_FRAGMENT_GETTER_H_
Loading

0 comments on commit 74af74f

Please sign in to comment.