Skip to content

Commit

Permalink
GraphScope for Spark-GraphX (#2004)
Browse files Browse the repository at this point in the history
* Introducing `GraphX on GraphScope`, which
      1) breaks down the barrier of graph storage between GraphScope and GraphX, enabling transformation between `RDD` and GraphScope `Fragment`.
      2) supports Fragment-backed GraphX RDDs, where all data is stored in GraphScope but can be accessed through the RDD API in Spark.
      3) allows a GraphX Graph to be transformed into a fragment, on which GraphScope algorithms can run.
      4) supports running GraphX Pregel on Fragment: GraphX Pregel algorithms can run seamlessly on GraphScope, with up to 6x performance boost.
  • Loading branch information
zhanglei1949 committed Sep 21, 2022
1 parent 96d3bb1 commit c35766a
Show file tree
Hide file tree
Showing 169 changed files with 21,728 additions and 81 deletions.
27 changes: 27 additions & 0 deletions analytical_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,19 @@ if (${LIBUNWIND_FOUND})
target_link_libraries(grape_engine PRIVATE ${LIBUNWIND_LIBRARIES})
endif ()

# Standalone `graphx_runner` executable, used as a workaround for running
# GraphX Pregel. It is only defined when the Java SDK is enabled, because it
# compiles core/java/javasdk.cc and links against the JNI libraries.
if (ENABLE_JAVA_SDK)
add_executable(graphx_runner core/java/graphx_runner.cc core/java/javasdk.cc)
target_include_directories(graphx_runner PRIVATE core utils apps)
# Expose ENABLE_JAVA_SDK to the sources as a preprocessor definition.
target_compile_definitions(graphx_runner PUBLIC ENABLE_JAVA_SDK)
target_link_libraries(graphx_runner PRIVATE ${CMAKE_DL_LIBS} gs_proto ${VINEYARD_LIBRARIES} ${Boost_LIBRARIES} ${GFLAGS_LIBRARIES} ${JNI_LIBRARIES})

# Link libunwind as well when it was found on the system.
if (${LIBUNWIND_FOUND})
target_link_libraries(graphx_runner PRIVATE ${LIBUNWIND_LIBRARIES})
endif()
endif()


# Test targets
if (BUILD_TESTS)
add_executable(run_app test/run_app.cc core/object/dynamic.cc)
Expand All @@ -301,11 +314,22 @@ if (BUILD_TESTS)
target_compile_definitions(giraph_runner PUBLIC ENABLE_JAVA_SDK)
target_link_libraries(giraph_runner ${CMAKE_DL_LIBS} gs_proto ${VINEYARD_LIBRARIES} ${Boost_LIBRARIES} ${GFLAGS_LIBRARIES} ${JNI_LIBRARIES})

# graphx related test
add_executable(graphx_test test/graphx_test.cc)
target_include_directories(graphx_test PRIVATE core utils apps)
target_compile_definitions(graphx_test PUBLIC ENABLE_JAVA_SDK)
target_link_libraries(graphx_test ${CMAKE_DL_LIBS} ${VINEYARD_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES})

add_executable(projected_fragment_mapper_test test/projected_fragment_mapper_test.cc)
target_include_directories(projected_fragment_mapper_test PRIVATE core utils apps)
target_link_libraries(projected_fragment_mapper_test ${CMAKE_DL_LIBS} ${VINEYARD_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})

if (${LIBUNWIND_FOUND})
target_link_libraries(run_java_app ${LIBUNWIND_LIBRARIES})
target_link_libraries(property_graph_java_app_benchmarks ${LIBUNWIND_LIBRARIES})
target_link_libraries(giraph_runner ${LIBUNWIND_LIBRARIES})
target_link_libraries(graphx_test ${LIBUNWIND_LIBRARIES})
target_link_libraries(projected_fragment_mapper_test ${LIBUNWIND_LIBRARIES})
endif ()
endif()

Expand Down Expand Up @@ -393,6 +417,7 @@ if(ENABLE_JAVA_SDK)
"${GRAPHSCOPE_ANALYTICAL_VERSION}"
)
set(GAE_JAVA_RUNTIME_JAR "${GAE_JAVA_RUNTIME_DIR}/target/grape-runtime-${GRAPHSCOPE_ANALYTICAL_JAR_VERSION}-shaded.jar")
set(GAE_JAVA_GRAPHX_JAR "${GAE_JAVA_DIR}/graphx-on-graphscope/target/graphx-on-graphscope-${GRAPHSCOPE_ANALYTICAL_JAR_VERSION}-shaded.jar")
add_custom_command(
OUTPUT "${GAE_JAVA_RUNTIME_DIR}/target/native/libgrape-jni.so"
COMMAND mvn clean install -DskipTests --quiet
Expand All @@ -407,6 +432,7 @@ if(ENABLE_JAVA_SDK)
install(FILES DESTINATION lib)
install(FILES "${GAE_JAVA_RUNTIME_DIR}/target/native/libgrape-jni.so" DESTINATION lib)
install(FILES "${GAE_JAVA_RUNTIME_JAR}" DESTINATION lib)
install(FILES "${GAE_JAVA_GRAPHX_JAR}" DESTINATION lib)
install(FILES "${GAE_JAVA_DIR}/grape_jvm_opts" DESTINATION conf)
endif()

Expand Down Expand Up @@ -457,6 +483,7 @@ endmacro()
install_gsa_binary(grape_engine)
install_gsa_binary(gs_proto)
install_gsa_binary(gs_util)
# The graphx_runner target is only created when ENABLE_JAVA_SDK is on, so it
# must only be installed under the same condition; otherwise configuring a
# build without the Java SDK would reference a target that does not exist.
if (ENABLE_JAVA_SDK)
    install_gsa_binary(graphx_runner)
endif ()

install_gsa_headers("${PROJECT_SOURCE_DIR}/apps")
install_gsa_headers("${PROJECT_SOURCE_DIR}/benchmarks")
Expand Down
54 changes: 40 additions & 14 deletions analytical_engine/core/context/java_context_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,11 @@ class JavaContextBase : public grape::ContextBase {
LOG(ERROR) << "no args received";
return;
}
std::string user_library_name, user_class_path;
std::string user_library_name, user_class_path, graphx_context_name,
serial_path; // the latter two should only be used by graphx
std::string args_str = parseParamsAndSetupJVMEnv(
params, lib_path, user_library_name, user_class_path, local_num);
params, lib_path, user_library_name, user_class_path,
graphx_context_name, serial_path, local_num);

JavaVM* jvm = GetJavaVM();
(void) jvm;
Expand Down Expand Up @@ -178,13 +180,13 @@ class JavaContextBase : public grape::ContextBase {
loadJNILibrary(env, user_library_name);

VLOG(1) << "Creating app object: " << app_class_name_;
app_object_ =
LoadAndCreate(env, url_class_loader_object_, app_class_name_);
app_object_ = LoadAndCreate(env, url_class_loader_object_,
app_class_name_, serial_path.c_str());
VLOG(1) << "Successfully created app object with class loader:"
<< &url_class_loader_object_
<< ", of type: " << std::string(app_class_name_);

createContextObj(env);
createContextObj(env, graphx_context_name, serial_path);
jclass context_class = env->GetObjectClass(context_object_);
CHECK_NOTNULL(context_class);

Expand Down Expand Up @@ -288,10 +290,14 @@ class JavaContextBase : public grape::ContextBase {
return std::string(user_class_path);
}
// user library name should be absolute
// serial path is used in graphx, to specify the path to the serialized class
// objects of vd, ed, etc.
std::string parseParamsAndSetupJVMEnv(const std::string& params,
const std::string lib_path,
std::string& user_library_name,
std::string& user_class_path,
std::string& graphx_context_class_name,
std::string& serial_path,
int local_num) {
boost::property_tree::ptree pt;
std::stringstream ss;
Expand Down Expand Up @@ -323,6 +329,13 @@ class JavaContextBase : public grape::ContextBase {
app_class_name_[strlen(ch)] = '\0';
pt.erase("app_class");

auto iter = pt.find("graphx_context_class");
if (iter != pt.not_found()) {
graphx_context_class_name = pt.get<std::string>("graphx_context_class");
}

serial_path = pt.get<std::string>("serial_path", "");

boost::filesystem::path lib_path_fs, lib_dir;
if (!lib_path.empty()) {
lib_path_fs = lib_path;
Expand Down Expand Up @@ -436,19 +449,32 @@ class JavaContextBase : public grape::ContextBase {
}
}

void createContextObj(JNIEnv* env) {
std::string _context_class_name_str = getCtxClassNameFromAppObject(env);
VLOG(1) << "Context class name: " << _context_class_name_str;
context_object_ = LoadAndCreate(env, url_class_loader_object_,
_context_class_name_str.c_str());
VLOG(1) << "Successfully created ctx object with class loader:"
<< &url_class_loader_object_
<< ", of type: " << _context_class_name_str;
void createContextObj(JNIEnv* env, const std::string& graphx_context_name,
const std::string& serial_path) {
if (graphx_context_name.size() != 0 &&
(graphx_context_name.find("com.alibaba.graphscope.context."
"GraphXParallelAdaptorContext") !=
std::string::npos)) {
context_object_ =
LoadAndCreate(env, url_class_loader_object_,
graphx_context_name.c_str(), serial_path.c_str());
VLOG(1) << "Succcessfully loaded graphx context: " << context_object_;
} else {
std::string _context_class_name_str = getCtxClassNameFromAppObject(env);
VLOG(1) << "Context class name: " << _context_class_name_str;
context_object_ =
LoadAndCreate(env, url_class_loader_object_,
_context_class_name_str.c_str(), serial_path.c_str());
VLOG(1) << "Successfully created ctx object with class loader:"
<< &url_class_loader_object_
<< ", of type: " << _context_class_name_str;
}
}

jobject wrapFragObj(JNIEnv* env, jobject& fragObject) {
if (graph_type_str_.find("Immutable") != std::string::npos ||
graph_type_str_.find("ArrowProjected") != std::string::npos) {
graph_type_str_.find("ArrowProjected") != std::string::npos ||
graph_type_str_.find("GraphXFragment") != std::string::npos) {
VLOG(10) << "Creating IFragment";
// jobject fragment_object_impl_ = env->NewGlobalRef(fragObject);
// For immutableFragment and ArrowProjectedFragment, we use a wrapper
Expand Down
8 changes: 8 additions & 0 deletions analytical_engine/core/fragment/arrow_projected_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ class AdjList<VID_T, EID_T, grape::EmptyType> {

} // namespace arrow_projected_fragment_impl

template <typename OID_T, typename VID_T, typename OLD_VDATA_T,
typename NEW_VDATA_T, typename OLD_EDATA_T, typename NEW_EDATA_T>
class ArrowProjectedFragmentMapper;

/**
* @brief This class represents the fragment projected from ArrowFragment which
* contains only one vertex label and edge label. The fragment has no label and
Expand Down Expand Up @@ -1225,6 +1229,10 @@ class ArrowProjectedFragment

std::vector<vid_t> outer_vertex_offsets_;
std::vector<std::vector<vertex_t>> mirrors_of_frag_;

template <typename _OID_T, typename _VID_T, typename _OLD_VDATA_T,
typename _NEW_VDATA_T, typename _OLD_EDATA_T, typename _NEW_EDATA_T>
friend class ArrowProjectedFragmentMapper;
};

} // namespace gs
Expand Down
174 changes: 174 additions & 0 deletions analytical_engine/core/fragment/arrow_projected_fragment_mapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ANALYTICAL_ENGINE_CORE_FRAGMENT_ARROW_PROJECTED_FRAGMENT_MAPPER_H_
#define ANALYTICAL_ENGINE_CORE_FRAGMENT_ARROW_PROJECTED_FRAGMENT_MAPPER_H_

#include <limits>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>

#include <boost/asio.hpp>
#include "arrow/array.h"
#include "boost/lexical_cast.hpp"

#include "grape/fragment/fragment_base.h"
#include "vineyard/basic/ds/arrow_utils.h"
#include "vineyard/common/util/version.h"
#include "vineyard/graph/fragment/property_graph_types.h"

#include "core/fragment/arrow_projected_fragment.h"

namespace gs {

/**
* @brief Create a new arrowProjectedFragment with new vdata and new edata.
*
* @tparam OID_T OID type
* @tparam VID_T VID type
*/
template <typename OID_T, typename VID_T, typename OLD_VDATA_T,
typename NEW_VDATA_T, typename OLD_EDATA_T, typename NEW_EDATA_T>
class ArrowProjectedFragmentMapper {
public:
using label_id_t = vineyard::property_graph_types::LABEL_ID_TYPE;
using prop_id_t = vineyard::property_graph_types::PROP_ID_TYPE;
using vid_t = VID_T;
using oid_t = OID_T;
using old_vdata_t = OLD_VDATA_T;
using new_vdata_t = NEW_VDATA_T;
using old_edata_t = OLD_EDATA_T;
using new_edata_t = NEW_EDATA_T;

using edata_array_builder_t =
typename vineyard::ConvertToArrowType<new_edata_t>::BuilderType;
using edata_array_t =
typename vineyard::ConvertToArrowType<new_edata_t>::ArrayType;
using vineyard_edata_array_builder_t =
typename vineyard::InternalType<new_edata_t>::vineyard_builder_type;

using vdata_array_builder_t =
typename vineyard::ConvertToArrowType<new_vdata_t>::BuilderType;
using vdata_array_t =
typename vineyard::ConvertToArrowType<new_vdata_t>::ArrayType;
using vineyard_vdata_array_builder_t =
typename vineyard::InternalType<new_vdata_t>::vineyard_builder_type;

using old_frag_t =
ArrowProjectedFragment<oid_t, vid_t, old_vdata_t, old_edata_t>;
using new_frag_t =
ArrowProjectedFragment<oid_t, vid_t, new_vdata_t, new_edata_t>;

ArrowProjectedFragmentMapper() {}
~ArrowProjectedFragmentMapper() {}

std::shared_ptr<new_frag_t> Map(old_frag_t& old_fragment,
vdata_array_builder_t& vdata_array_builder,
edata_array_builder_t& edata_array_builder,
vineyard::Client& client) {
const vineyard::ObjectMeta& old_meta = old_fragment.meta();
std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>> old_arrow_fragment =
old_fragment.fragment_;
auto v_label = old_meta.GetKeyValue<int>("projected_v_label");
auto e_label = old_meta.GetKeyValue<int>("projected_e_label");
auto old_vprop_num = old_arrow_fragment->vertex_property_num(v_label);
auto old_eprop_num = old_arrow_fragment->edge_property_num(e_label);
LOG(INFO) << "Old arrow fragment has " << old_vprop_num
<< " vertex properties and " << old_eprop_num
<< " edge properties";
auto new_vprop_name = "VPROP_" + std::to_string(old_vprop_num);
auto new_eprop_name = "EPROP_" + std::to_string(old_eprop_num);

std::shared_ptr<edata_array_t> arrow_edata_array;
edata_array_builder.Finish(&arrow_edata_array);
std::shared_ptr<vdata_array_t> arrow_vdata_array;
vdata_array_builder.Finish(&arrow_vdata_array);
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>
vertex_columns_map;
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>
edge_columns_map;
vertex_columns_map.push_back(
std::make_pair(new_vprop_name, arrow_vdata_array));
edge_columns_map.push_back(
std::make_pair(new_eprop_name, arrow_edata_array));

vineyard::ObjectID new_frag_id;

// Add new data to original ArrowFragment's table.
{
std::map<
label_id_t,
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
v_map;
v_map[v_label] = vertex_columns_map;
vineyard::ObjectID new_arrow_fragment_id = bl::try_handle_all(
[&]() { return old_arrow_fragment->AddVertexColumns(client, v_map); },
[](const vineyard::GSError& e) {
LOG(ERROR) << e.error_msg;
return 0;
},
[](const bl::error_info& unmatched) {
LOG(ERROR) << "Unmatched error " << unmatched;
return 0;
});
LOG(INFO) << "Build First fragment: " << new_arrow_fragment_id;
std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>>
new_arrow_fragment =
std::dynamic_pointer_cast<vineyard::ArrowFragment<oid_t, vid_t>>(
client.GetObject(new_arrow_fragment_id));

std::map<
label_id_t,
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
e_map;
e_map[e_label] = edge_columns_map;
vineyard::ObjectID new_new_arrow_fragment_id = bl::try_handle_all(
[&]() { return new_arrow_fragment->AddEdgeColumns(client, e_map); },
[](const vineyard::GSError& e) {
LOG(ERROR) << e.error_msg;
return 0;
},
[](const bl::error_info& unmatched) {
LOG(ERROR) << "Unmatched error " << unmatched;
return 0;
});

LOG(INFO) << "Build Second fragment: " << new_new_arrow_fragment_id;
std::shared_ptr<vineyard::ArrowFragment<oid_t, vid_t>>
new_new_arrow_fragment =
std::dynamic_pointer_cast<vineyard::ArrowFragment<oid_t, vid_t>>(
client.GetObject(new_new_arrow_fragment_id));
auto schema = new_new_arrow_fragment->schema();
auto v_prop_id = schema.GetVertexPropertyId(v_label, new_vprop_name);
auto e_prop_id = schema.GetEdgePropertyId(e_label, new_eprop_name);
std::shared_ptr<new_frag_t> res = new_frag_t::Project(
new_new_arrow_fragment, 0, v_prop_id, 0, e_prop_id);
new_frag_id = res->id();
}

LOG(INFO) << "Got projected fragment: " << new_frag_id;
auto new_frag =
std::dynamic_pointer_cast<new_frag_t>(client.GetObject(new_frag_id));
return new_frag;
}
};
} // namespace gs

#endif // ANALYTICAL_ENGINE_CORE_FRAGMENT_ARROW_PROJECTED_FRAGMENT_MAPPER_H_
Loading

0 comments on commit c35766a

Please sign in to comment.