Skip to content

Commit

Permalink
Optimize the memory usage of certain graphs
Browse files Browse the repository at this point in the history
Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow committed Jan 13, 2023
1 parent 59367f8 commit 41e93d2
Show file tree
Hide file tree
Showing 67 changed files with 616 additions and 171 deletions.
15 changes: 15 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@
**/*.egg-info/
**/*.eggs

# artifacts, used for testing
/google-java-format-1.13.0-all-deps.jar


# coordinator
coordinator/proto

# python
python/proto
python/build
python/*.tar.gz
python/*.zip

# Gar file
**/*.gar
Expand Down Expand Up @@ -86,6 +93,14 @@ package.lock
# docker: ignore learning engine's build artifacts
learning_engine/graph-learn/cmake-build
learning_engine/graph-learn/graphlearn/cmake-build
learning_engine/graph-learn/graphlearn/built
learning_engine/graph-learn/graphlearn/build
learning_engine/graph-learn/graphlearn/examples/data
learning_engine/graph-learn/graphlearn/**/graphlearn.*.log*
!learning_engine/graph-learn/graphlearn/examples/data/*.py
learning_engine/graph-learn/graphlearn/pywrap_graphlearn.cpython*
learning_engine/graph-learn/graphlearn/src/contrib/knn
learning_engine/graph-learn/graphlearn/python/lib

# docker: ignore .install_prefix
.install_prefix
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/gae.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
runs-on: ubuntu-20.04
if: ${{ github.repository == 'alibaba/GraphScope' }}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.11.4
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.11.6
options:
--shm-size 4096m
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/networkx-forward-algo-nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
run:
shell: bash --noprofile --norc -eo pipefail {0}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.11.4
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.11.6
options:
--shm-size 4096m

Expand Down
2 changes: 1 addition & 1 deletion analytical_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ else()
endif()

# find vineyard after arrow to avoid duplicate target names
find_package(vineyard 0.11.4 REQUIRED)
find_package(vineyard 0.11.6 REQUIRED)
include_directories(${VINEYARD_INCLUDE_DIRS})
add_compile_options(-DENABLE_SELECTOR)

Expand Down
7 changes: 5 additions & 2 deletions analytical_engine/core/context/tensor_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ inline InArchive& operator<<(
size_t size = tensor.size();
if (size > 0) {
auto type = gs::dynamic::GetType(tensor.data()[0]);
CHECK(type == gs::dynamic::Type::kInt64Type ||
CHECK(type == gs::dynamic::Type::kInt32Type ||
type == gs::dynamic::Type::kInt64Type ||
type == gs::dynamic::Type::kDoubleType ||
type == gs::dynamic::Type::kStringType);
for (size_t i = 0; i < tensor.size(); i++) {
Expand Down Expand Up @@ -791,7 +792,9 @@ class TensorContextWrapper<
for (auto dim_size : first_shape) {
*arc << static_cast<int64_t>(dim_size);
}
if (data_type == dynamic::Type::kInt64Type) {
if (data_type == dynamic::Type::kInt32Type) {
*arc << static_cast<int>(vineyard::TypeToInt<int32_t>::value);
} else if (data_type == dynamic::Type::kInt64Type) {
*arc << static_cast<int>(vineyard::TypeToInt<int64_t>::value);
} else if (data_type == dynamic::Type::kDoubleType) {
*arc << static_cast<int>(vineyard::TypeToInt<double>::value);
Expand Down
4 changes: 4 additions & 0 deletions analytical_engine/core/io/property_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ struct Graph {
std::vector<std::shared_ptr<Edge>> edges;
bool directed;
bool generate_eid;
bool retain_oid;

std::string SerializeToString() const {
std::stringstream ss;
ss << "directed: " << directed << "\n";
ss << "generate_eid: " << generate_eid << "\n";
ss << "retain_oid: " << retain_oid << "\n";
for (auto& v : vertices) {
ss << v->SerializeToString();
}
Expand Down Expand Up @@ -279,10 +281,12 @@ inline bl::result<std::shared_ptr<detail::Graph>> ParseCreatePropertyGraph(
const GSParams& params) {
BOOST_LEAF_AUTO(directed, params.Get<bool>(rpc::DIRECTED));
BOOST_LEAF_AUTO(generate_eid, params.Get<bool>(rpc::GENERATE_EID));
BOOST_LEAF_AUTO(retain_oid, params.Get<bool>(rpc::RETAIN_OID));

auto graph = std::make_shared<detail::Graph>();
graph->directed = directed;
graph->generate_eid = generate_eid;
graph->retain_oid = retain_oid;

const auto& large_attr = params.GetLargeAttr();
for (const auto& item : large_attr.chunk_list().items()) {
Expand Down
8 changes: 4 additions & 4 deletions analytical_engine/core/java/graphx_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ class GraphXClient
GraphXClient(int listen_port, int part_cnt, vineyard::Client& client,
const grape::CommSpec& comm_spec,
const GraphXPartitioner<OID_T>& partitioner,
bool directed = true, bool retain_oid = false,
bool generate_eid = false)
bool directed = true, bool generate_eid = false,
bool retain_oid = false)
: vineyard::BasicEVFragmentLoader<OID_T, VID_T, GraphXPartitioner<OID_T>>(
client, comm_spec, partitioner, directed, retain_oid,
generate_eid) {
client, comm_spec, partitioner, directed, generate_eid,
retain_oid) {
listen_port_ = listen_port;
part_cnt_ = part_cnt;
}
Expand Down
8 changes: 4 additions & 4 deletions analytical_engine/core/java/graphx_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ class GraphXLoader
explicit GraphXLoader(vineyard::ObjectID objId, vineyard::Client& client,
const grape::CommSpec& comm_spec,
const GraphXPartitioner<OID_T>& partitioner,
bool directed = true, bool retain_oid = false,
bool generate_eid = false)
bool directed = true, bool generate_eid = false,
bool retain_oid = false)
: vineyard::BasicEVFragmentLoader<OID_T, VID_T, GraphXPartitioner<OID_T>>(
client, comm_spec, partitioner, directed, retain_oid,
generate_eid) {
client, comm_spec, partitioner, directed, generate_eid,
retain_oid) {
this->raw_data =
std::dynamic_pointer_cast<raw_data_t>(client.GetObject(objId));
}
Expand Down
5 changes: 3 additions & 2 deletions analytical_engine/core/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class ArrowFragmentLoader
using Base::directed_;
using Base::efiles_;
using Base::generate_eid_;
using Base::retain_oid_;
using Base::vfiles_;

public:
Expand All @@ -106,7 +107,7 @@ class ArrowFragmentLoader
const std::vector<std::string>& efiles,
const std::vector<std::string>& vfiles,
bool directed = true)
: Base(client, comm_spec, efiles, vfiles, directed, false),
: Base(client, comm_spec, efiles, vfiles, directed, false, false),
graph_info_(nullptr),
giraph_enabled_(false) {}

Expand All @@ -115,7 +116,7 @@ class ArrowFragmentLoader
std::shared_ptr<detail::Graph> graph_info)
: Base(client, comm_spec, std::vector<std::string>{},
std::vector<std::string>{}, graph_info->directed,
graph_info->generate_eid),
graph_info->generate_eid, graph_info->retain_oid),
graph_info_(graph_info) {
#ifdef ENABLE_JAVA_SDK
// check when vformat or eformat start with giraph. if not, we
Expand Down
64 changes: 64 additions & 0 deletions analytical_engine/core/loader/dynamic_to_arrow_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,70 @@ struct EdgeArrayBuilder<arrow::LargeStringBuilder> {
template <typename DST_FRAG_T, typename OID_T>
struct COOBuilder {};

/**
 * @brief Partial specialization of COOBuilder for int32_t original ids.
 *
 * Walks the alive inner vertices of a DynamicFragment and emits the edge list
 * in COO form: two parallel UInt64 arrow arrays holding the global ids of the
 * source and destination endpoints, looked up through the destination vertex
 * map.
 *
 * @tparam DST_FRAG_T
 */
template <typename DST_FRAG_T>
struct COOBuilder<DST_FRAG_T, int32_t> {
  using oid_t = int32_t;
  using src_fragment_t = DynamicFragment;
  using dst_fragment_t = DST_FRAG_T;

  /**
   * @brief Build the (source gids, destination gids) array pair.
   *
   * @param src_frag Fragment whose inner vertices and adjacency lists are
   * traversed.
   * @param dst_vm Vertex map used to translate original ids into global ids.
   * @return The pair of finished arrow arrays; an arrow failure is raised
   * through ARROW_OK_OR_RAISE.
   */
  auto Build(
      const std::shared_ptr<src_fragment_t>& src_frag,
      const std::shared_ptr<typename dst_fragment_t::vertex_map_t>& dst_vm)
      -> bl::result<std::pair<std::shared_ptr<arrow::Array>,
                              std::shared_ptr<arrow::Array>>> {
    using gid_t = vineyard::property_graph_types::VID_TYPE;

    auto fid = src_frag->fid();
    auto inner_vertices = src_frag->InnerVertices();
    arrow::UInt64Builder src_builder, dst_builder;

    for (const auto& vertex : inner_vertices) {
      if (!src_frag->IsAliveInnerVertex(vertex)) {
        continue;
      }

      // The inner vertex is resolved against this fragment's own fid; the
      // literal 0 is presumably the label id -- TODO confirm.
      auto vertex_oid = src_frag->GetId(vertex);
      gid_t vertex_gid;
      CHECK(dst_vm->GetGid(fid, 0, vertex_oid.GetInt(), vertex_gid));

      for (auto& out_edge : src_frag->GetOutgoingAdjList(vertex)) {
        auto& nbr = out_edge.neighbor;
        // On an undirected fragment, edges whose source value is greater
        // than the neighbor's value are skipped.
        const bool skip_edge = !src_frag->directed() &&
                               vertex.GetValue() > nbr.GetValue();
        if (skip_edge) {
          continue;
        }

        auto nbr_oid = src_frag->GetId(nbr);
        gid_t nbr_gid;
        CHECK(dst_vm->GetGid(0, nbr_oid.GetInt(), nbr_gid));
        ARROW_OK_OR_RAISE(src_builder.Append(vertex_gid));
        ARROW_OK_OR_RAISE(dst_builder.Append(nbr_gid));
      }

      if (src_frag->directed()) {
        // Incoming edges contribute only when the neighbor is an outer
        // vertex; the neighbor is then the source of the emitted edge.
        for (auto& in_edge : src_frag->GetIncomingAdjList(vertex)) {
          auto& nbr = in_edge.neighbor;
          if (!src_frag->IsOuterVertex(nbr)) {
            continue;
          }

          auto nbr_oid = src_frag->GetId(nbr);
          gid_t nbr_gid;
          CHECK(dst_vm->GetGid(0, nbr_oid.GetInt(), nbr_gid));
          ARROW_OK_OR_RAISE(src_builder.Append(nbr_gid));
          ARROW_OK_OR_RAISE(dst_builder.Append(vertex_gid));
        }
      }
    }

    std::shared_ptr<arrow::Array> src_array, dst_array;
    ARROW_OK_OR_RAISE(src_builder.Finish(&src_array));
    ARROW_OK_OR_RAISE(dst_builder.Finish(&dst_array));

    return std::make_pair(src_array, dst_array);
  }
};

/**
* @brief This is a specialized EdgeArrayBuilder for int64_t
* @tparam DST_FRAG_T
Expand Down
2 changes: 2 additions & 0 deletions analytical_engine/core/object/dynamic.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ enum Type {
kStringType = 5,
kInt64Type = 6,
kDoubleType = 7,
kInt32Type = 8,
};

// A class derived from rapidjson::Value to support more features.
Expand Down Expand Up @@ -299,6 +300,7 @@ static inline rpc::graph::DataTypePb DynamicType2RpcType(const Type& t) {
static const std::map<Type, rpc::graph::DataTypePb> type2type = {
{Type::kNullType, rpc::graph::DataTypePb::NULLVALUE},
{Type::kBoolType, rpc::graph::DataTypePb::BOOL},
{Type::kInt32Type, rpc::graph::DataTypePb::INT},
{Type::kInt64Type, rpc::graph::DataTypePb::LONG},
{Type::kDoubleType, rpc::graph::DataTypePb::DOUBLE},
{Type::kStringType, rpc::graph::DataTypePb::STRING},
Expand Down
34 changes: 30 additions & 4 deletions analytical_engine/core/utils/transform_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,9 @@ class TransformUtils<
}
}

if (oid_type == dynamic::Type::kInt64Type) {
if (oid_type == dynamic::Type::kInt32Type) {
return vineyard::TypeToInt<int32_t>::value;
} else if (oid_type == dynamic::Type::kInt64Type) {
return vineyard::TypeToInt<int64_t>::value;
} else if (oid_type == dynamic::Type::kStringType) {
return vineyard::TypeToInt<std::string>::value;
Expand Down Expand Up @@ -1102,7 +1104,16 @@ class TransformUtils<

BOOST_LEAF_AUTO(oid_type, GetOidTypeId());

if (oid_type == vineyard::TypeToInt<int64_t>::value) {
if (oid_type == vineyard::TypeToInt<int32_t>::value) {
typename vineyard::ConvertToArrowType<int32_t>::BuilderType builder;
for (auto& v : inner_vertices) {
ARROW_OK_OR_RAISE(builder.Append(frag_.GetId(v).GetInt32()));
}
std::shared_ptr<typename vineyard::ConvertToArrowType<int32_t>::ArrayType>
ret;
ARROW_OK_OR_RAISE(builder.Finish(&ret));
return std::dynamic_pointer_cast<arrow::Array>(ret);
} else if (oid_type == vineyard::TypeToInt<int64_t>::value) {
typename vineyard::ConvertToArrowType<int64_t>::BuilderType builder;
for (auto& v : inner_vertices) {
ARROW_OK_OR_RAISE(builder.Append(frag_.GetId(v).GetInt64()));
Expand Down Expand Up @@ -1136,7 +1147,15 @@ class TransformUtils<

BOOST_LEAF_AUTO(oid_type, GetOidTypeId());

if (oid_type == vineyard::TypeToInt<int64_t>::value) {
if (oid_type == vineyard::TypeToInt<int32_t>::value) {
auto tensor_builder = std::make_shared<vineyard::TensorBuilder<int32_t>>(
client, shape, part_idx);
for (size_t i = 0; i < vertices.size(); i++) {
tensor_builder->data()[i] = frag_.GetId(vertices[i]).GetInt32();
}
return std::dynamic_pointer_cast<vineyard::ITensorBuilder>(
tensor_builder);
} else if (oid_type == vineyard::TypeToInt<int64_t>::value) {
auto tensor_builder = std::make_shared<vineyard::TensorBuilder<int64_t>>(
client, shape, part_idx);
for (size_t i = 0; i < vertices.size(); i++) {
Expand Down Expand Up @@ -1169,7 +1188,14 @@ class TransformUtils<
client, vertices));
BOOST_LEAF_AUTO(oid_type, GetOidTypeId());

if (oid_type == vineyard::TypeToInt<int64_t>::value) {
if (oid_type == vineyard::TypeToInt<int32_t>::value) {
auto builder =
std::dynamic_pointer_cast<vineyard::TensorBuilder<int32_t>>(
base_builder);
auto tensor = builder->Seal(client);
VY_OK_OR_RAISE(tensor->Persist(client));
return tensor->id();
} else if (oid_type == vineyard::TypeToInt<int64_t>::value) {
auto builder =
std::dynamic_pointer_cast<vineyard::TensorBuilder<int64_t>>(
base_builder);
Expand Down
11 changes: 11 additions & 0 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ LoadGraph(const grape::CommSpec& comm_spec, vineyard::Client& client,
vy_info.add_fragments(item.second);
}
vy_info.set_generate_eid(graph_info->generate_eid);
vy_info.set_retain_oid(graph_info->retain_oid);
graph_def.mutable_extension()->PackFrom(vy_info);
gs::set_graph_def(frag, graph_def);

Expand Down Expand Up @@ -169,6 +170,15 @@ ToArrowFragment(vineyard::Client& client, const grape::CommSpec& comm_spec,
gs::TransformUtils<gs::DynamicFragment> trans_utils(comm_spec, *dynamic_frag);
BOOST_LEAF_AUTO(oid_type, trans_utils.GetOidTypeId());

if (oid_type == vineyard::TypeToInt<int32_t>::value &&
!std::is_same<oid_t, int32_t>::value &&
!std::is_same<oid_t, int64_t>::value) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"The oid type of DynamicFragment is int32, but the "
"oid type of destination fragment is: " +
std::string(vineyard::type_name<oid_t>()));
}

if (oid_type == vineyard::TypeToInt<int64_t>::value &&
!std::is_same<oid_t, int32_t>::value &&
!std::is_same<oid_t, int64_t>::value) {
Expand Down Expand Up @@ -295,6 +305,7 @@ AddLabelsToGraph(vineyard::ObjectID origin_frag_id,
vy_info.add_fragments(item.second);
}
vy_info.set_generate_eid(graph_info->generate_eid);
vy_info.set_retain_oid(graph_info->retain_oid);
graph_def.mutable_extension()->PackFrom(vy_info);
gs::set_graph_def(frag, graph_def);

Expand Down
1 change: 1 addition & 0 deletions analytical_engine/test/giraph_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ vineyard::ObjectID LoadGiraphFragment(
auto graph = std::make_shared<detail::Graph>();
graph->directed = directed;
graph->generate_eid = false;
graph->retain_oid = false;

auto vertex = std::make_shared<detail::Vertex>();
vertex->label = "label1";
Expand Down
4 changes: 4 additions & 0 deletions analytical_engine/test/run_vy_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,14 @@ int main(int argc, char** argv) {

LOG(INFO) << "[worker-" << comm_spec.worker_id()
<< "] loaded graph to vineyard ... ";
LOG(INFO) << "peek memory: " << vineyard::get_peak_rss_pretty()
<< std::endl;

MPI_Barrier(comm_spec.comm());

Run(client, comm_spec, fragment_id, run_projected, app_name, path_pattern);
LOG(INFO) << "peek memory: " << vineyard::get_peak_rss_pretty()
<< std::endl;

MPI_Barrier(comm_spec.comm());
}
Expand Down
2 changes: 1 addition & 1 deletion coordinator/gscoordinator/cluster_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def vineyard_ipc_socket(self):
return self._sock

def base64_decode(self, string):
return base64.b64decode(string).decode("utf-8")
return base64.b64decode(string).decode("utf-8", errors="ignore")

def get_common_env(self):
def put_if_exists(env: dict, key: str):
Expand Down
2 changes: 1 addition & 1 deletion coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ def start_server(launcher, args):

# register gRPC server
server = grpc.server(
futures.ThreadPoolExecutor(os.cpu_count() or 1),
futures.ThreadPoolExecutor(max(4, os.cpu_count() or 1)),
options=[
("grpc.max_send_message_length", GS_GRPC_MAX_MESSAGE_LENGTH),
("grpc.max_receive_message_length", GS_GRPC_MAX_MESSAGE_LENGTH),
Expand Down
Loading

0 comments on commit 41e93d2

Please sign in to comment.