Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add progress bar for loading graphs. #894

Merged
merged 10 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions analytical_engine/core/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class ArrowFragmentLoader {

boost::leaf::result<std::vector<std::shared_ptr<arrow::Table>>>
LoadVertexTables() {
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-READ-VERTEX-0";
std::vector<std::shared_ptr<arrow::Table>> v_tables;
if (!vfiles_.empty()) {
auto load_v_procedure = [&]() {
Expand All @@ -117,11 +119,15 @@ class ArrowFragmentLoader {
vineyard::sync_gs_error(comm_spec_, load_v_procedure));
v_tables = tmp_v;
}
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-READ-VERTEX-100";
return v_tables;
}

boost::leaf::result<std::vector<std::vector<std::shared_ptr<arrow::Table>>>>
LoadEdgeTables() {
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-READ-EDGE-0";
std::vector<std::vector<std::shared_ptr<arrow::Table>>> e_tables;
if (!efiles_.empty()) {
auto load_e_procedure = [&]() {
Expand All @@ -140,6 +146,8 @@ class ArrowFragmentLoader {
vineyard::sync_gs_error(comm_spec_, load_e_procedure));
e_tables = tmp_e;
}
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-READ-EDGE-100";
return e_tables;
}

Expand All @@ -161,6 +169,8 @@ class ArrowFragmentLoader {
BOOST_LEAF_AUTO(partial_v_tables, LoadVertexTables());
BOOST_LEAF_AUTO(partial_e_tables, LoadEdgeTables());

LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-0";
auto frag = std::static_pointer_cast<vineyard::ArrowFragment<oid_t, vid_t>>(
client_.GetObject(frag_id));
auto schema = frag->schema();
Expand Down Expand Up @@ -192,6 +202,10 @@ class ArrowFragmentLoader {
BOOST_LEAF_CHECK(
basic_fragment_loader->ConstructVertices(old_vm_ptr->id()));

LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-100";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-0";
partial_v_tables.clear();
vertex_tables_with_label.clear();

Expand All @@ -213,19 +227,29 @@ class ArrowFragmentLoader {

BOOST_LEAF_CHECK(basic_fragment_loader->ConstructEdges(
schema.all_edge_label_num(), schema.all_vertex_label_num()));
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-100";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-SEAL-0";
return basic_fragment_loader->AddVerticesAndEdgesToFragment(frag);
}

boost::leaf::result<vineyard::ObjectID> addVertices(
vineyard::ObjectID frag_id) {
BOOST_LEAF_AUTO(partitioner, initPartitioner());
BOOST_LEAF_AUTO(partial_v_tables, LoadVertexTables());
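// Edge tables are loaded here only so that the READ-EDGE progress stubs get
// printed; the returned tables are not used when adding vertices.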
BOOST_LEAF_CHECK(LoadEdgeTables());

LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-0";

auto basic_fragment_loader = std::make_shared<
vineyard::BasicEVFragmentLoader<OID_T, VID_T, partitioner_t>>(
client_, comm_spec_, partitioner, directed_, true, generate_eid_);
auto frag = std::static_pointer_cast<vineyard::ArrowFragment<oid_t, vid_t>>(
client_.GetObject(frag_id));

for (auto table : partial_v_tables) {
auto meta = table->schema()->metadata();
if (meta == nullptr) {
Expand All @@ -247,7 +271,14 @@ class ArrowFragmentLoader {

BOOST_LEAF_CHECK(
basic_fragment_loader->ConstructVertices(frag->GetVertexMap()->id()));

LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-100";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-0";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-100";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-SEAL-0";
return basic_fragment_loader->AddVerticesToFragment(frag);
}

Expand All @@ -260,6 +291,9 @@ class ArrowFragmentLoader {
BOOST_LEAF_AUTO(partial_v_tables, LoadVertexTables());
BOOST_LEAF_AUTO(partial_e_tables, LoadEdgeTables());

LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-0";

BOOST_LEAF_AUTO(v_e_tables, preprocessInputs(partitioner, partial_v_tables,
partial_e_tables));

Expand All @@ -277,6 +311,10 @@ class ArrowFragmentLoader {
basic_fragment_loader->AddVertexTable(pair.first, pair.second));
}
BOOST_LEAF_CHECK(basic_fragment_loader->ConstructVertices());
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-100";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-0";

partial_v_tables.clear();
vertex_tables_with_label.clear();
Expand All @@ -289,7 +327,10 @@ class ArrowFragmentLoader {
edge_tables_with_label.clear();

BOOST_LEAF_CHECK(basic_fragment_loader->ConstructEdges());

LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-100";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-SEAL-0";
return basic_fragment_loader->ConstructFragment();
}

Expand Down
8 changes: 4 additions & 4 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ void LoadGraph(
BOOST_LEAF_AUTO(frag_group_id, loader.LoadFragmentAsFragmentGroup());
MPI_Barrier(comm_spec.comm());

LOG(INFO) << "[worker-" << comm_spec.worker_id()
<< "] loaded graph to vineyard ...";
LOG_IF(INFO, comm_spec.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-SEAL-100";

MPI_Barrier(comm_spec.comm());
{
Expand Down Expand Up @@ -288,8 +288,8 @@ void AddLabelsToGraph(
loader.AddLabelsToGraphAsFragmentGroup(frag_id));
MPI_Barrier(comm_spec.comm());

LOG(INFO) << "[worker-" << comm_spec.worker_id()
<< "] Add labels to graph and loaded to vineyard ...";
LOG_IF(INFO, comm_spec.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-SEAL-100";

auto fg = std::dynamic_pointer_cast<vineyard::ArrowFragmentGroup>(
client.GetObject(frag_group_id));
Expand Down
46 changes: 46 additions & 0 deletions coordinator/gscoordinator/io_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,30 @@
# limitations under the License.
#

import sys
import threading
from queue import Queue

from tqdm import tqdm


class LoadingProgressTracker:
    """Class-level state for the "Loading Graph" progress bar.

    All attributes live on the class itself, so the state is shared by
    everything that reads or updates it.
    """

    # Ordered progress markers; the number of entries is used as the total
    # step count of the progress bar.
    stubs = [
        "PROGRESS--GRAPH-LOADING-READ-VERTEX-0",
        "PROGRESS--GRAPH-LOADING-READ-VERTEX-100",
        "PROGRESS--GRAPH-LOADING-READ-EDGE-0",
        "PROGRESS--GRAPH-LOADING-READ-EDGE-100",
        "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-0",
        "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-100",
        "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-0",
        "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-100",
        "PROGRESS--GRAPH-LOADING-SEAL-0",
        "PROGRESS--GRAPH-LOADING-SEAL-100",
    ]

    # Number of markers counted so far in the current loading run.
    cur_stub = 0

    # The progress bar currently being displayed, or None when there is none.
    progbar = None


class StdStreamWrapper(object):
def __init__(self, std_stream, queue=None, drop=True):
Expand All @@ -41,6 +62,9 @@ def drop(self, drop=True):
self._drop = drop

def write(self, line):
line = self._filter_progress(line)
if line is None:
return
line = line.encode("ascii", "ignore").decode("ascii")
self._stream_backup.write(line)
if not self._drop:
Expand All @@ -52,6 +76,28 @@ def flush(self):
def poll(self, block=True, timeout=None):
    """Fetch the next item from ``self._lines``.

    Both arguments are forwarded unchanged to ``self._lines.get``
    (presumably a ``queue.Queue`` -- confirm against ``__init__``).

    Args:
        block: Passed through as ``block``.
        timeout: Passed through as ``timeout``.

    Returns:
        Whatever ``self._lines.get`` returns.
    """
    pending = self._lines
    return pending.get(block=block, timeout=timeout)

def _show_progress(self):
    """Advance the shared "Loading Graph" progress bar by one step.

    The bar is created on demand, sized to the number of entries in
    ``LoadingProgressTracker.stubs`` and written to ``sys.stderr``. Once
    as many steps as there are stubs have been counted, the bar is closed
    and the shared state is reset so that a later run starts a fresh bar.
    """
    tracker = LoadingProgressTracker
    step_count = len(tracker.stubs)

    # Create the bar lazily on the first step of a run.
    if tracker.progbar is None:
        tracker.progbar = tqdm(
            desc="Loading Graph", total=step_count, file=sys.stderr
        )

    tracker.progbar.update(1)
    tracker.cur_stub += 1
    if tracker.cur_stub != step_count:
        return

    # Last step reached: tear the bar down and reset the counter.
    tracker.cur_stub = 0
    tracker.progbar.close()
    tracker.progbar = None
    sys.stderr.write("\n")
    sys.stderr.flush()

def _filter_progress(self, line):
# print('show_progress: ', len(line), ", ", line)
if "PROGRESS--GRAPH" not in line:
return line
self._show_progress(line)
return None


class PipeWatcher(object):
def __init__(self, pipe, sink, queue=None, drop=True):
Expand Down