Skip to content

Commit

Permalink
Add progress bar for loading graphs. (#894)
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 authored Oct 20, 2021
1 parent dbe5d0b commit 3b1de8f
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 6 deletions.
45 changes: 43 additions & 2 deletions analytical_engine/core/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class ArrowFragmentLoader {

boost::leaf::result<std::vector<std::shared_ptr<arrow::Table>>>
LoadVertexTables() {
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-READ-VERTEX-0";
std::vector<std::shared_ptr<arrow::Table>> v_tables;
if (!vfiles_.empty()) {
auto load_v_procedure = [&]() {
Expand All @@ -117,11 +119,15 @@ class ArrowFragmentLoader {
vineyard::sync_gs_error(comm_spec_, load_v_procedure));
v_tables = tmp_v;
}
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-READ-VERTEX-100";
return v_tables;
}

boost::leaf::result<std::vector<std::vector<std::shared_ptr<arrow::Table>>>>
LoadEdgeTables() {
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-READ-EDGE-0";
std::vector<std::vector<std::shared_ptr<arrow::Table>>> e_tables;
if (!efiles_.empty()) {
auto load_e_procedure = [&]() {
Expand All @@ -140,6 +146,8 @@ class ArrowFragmentLoader {
vineyard::sync_gs_error(comm_spec_, load_e_procedure));
e_tables = tmp_e;
}
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-READ-EDGE-100";
return e_tables;
}

Expand All @@ -161,6 +169,8 @@ class ArrowFragmentLoader {
BOOST_LEAF_AUTO(partial_v_tables, LoadVertexTables());
BOOST_LEAF_AUTO(partial_e_tables, LoadEdgeTables());

LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-0";
auto frag = std::static_pointer_cast<vineyard::ArrowFragment<oid_t, vid_t>>(
client_.GetObject(frag_id));
auto schema = frag->schema();
Expand Down Expand Up @@ -192,6 +202,10 @@ class ArrowFragmentLoader {
BOOST_LEAF_CHECK(
basic_fragment_loader->ConstructVertices(old_vm_ptr->id()));

LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-100";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-0";
partial_v_tables.clear();
vertex_tables_with_label.clear();

Expand All @@ -213,19 +227,29 @@ class ArrowFragmentLoader {

BOOST_LEAF_CHECK(basic_fragment_loader->ConstructEdges(
schema.all_edge_label_num(), schema.all_vertex_label_num()));
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-100";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-SEAL-0";
return basic_fragment_loader->AddVerticesAndEdgesToFragment(frag);
}

boost::leaf::result<vineyard::ObjectID> addVertices(
vineyard::ObjectID frag_id) {
BOOST_LEAF_AUTO(partitioner, initPartitioner());
BOOST_LEAF_AUTO(partial_v_tables, LoadVertexTables());
// For printing the progress report stub
BOOST_LEAF_CHECK(LoadEdgeTables());

LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-0";

auto basic_fragment_loader = std::make_shared<
vineyard::BasicEVFragmentLoader<OID_T, VID_T, partitioner_t>>(
client_, comm_spec_, partitioner, directed_, true, generate_eid_);
auto frag = std::static_pointer_cast<vineyard::ArrowFragment<oid_t, vid_t>>(
client_.GetObject(frag_id));

for (auto table : partial_v_tables) {
auto meta = table->schema()->metadata();
if (meta == nullptr) {
Expand All @@ -247,7 +271,14 @@ class ArrowFragmentLoader {

BOOST_LEAF_CHECK(
basic_fragment_loader->ConstructVertices(frag->GetVertexMap()->id()));

LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-100";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-0";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-100";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-SEAL-0";
return basic_fragment_loader->AddVerticesToFragment(frag);
}

Expand All @@ -260,6 +291,9 @@ class ArrowFragmentLoader {
BOOST_LEAF_AUTO(partial_v_tables, LoadVertexTables());
BOOST_LEAF_AUTO(partial_e_tables, LoadEdgeTables());

LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-0";

BOOST_LEAF_AUTO(v_e_tables, preprocessInputs(partitioner, partial_v_tables,
partial_e_tables));

Expand All @@ -277,6 +311,10 @@ class ArrowFragmentLoader {
basic_fragment_loader->AddVertexTable(pair.first, pair.second));
}
BOOST_LEAF_CHECK(basic_fragment_loader->ConstructVertices());
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-100";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-0";

partial_v_tables.clear();
vertex_tables_with_label.clear();
Expand All @@ -289,7 +327,10 @@ class ArrowFragmentLoader {
edge_tables_with_label.clear();

BOOST_LEAF_CHECK(basic_fragment_loader->ConstructEdges());

LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-100";
LOG_IF(INFO, comm_spec_.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-SEAL-0";
return basic_fragment_loader->ConstructFragment();
}

Expand Down
8 changes: 4 additions & 4 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ void LoadGraph(
BOOST_LEAF_AUTO(frag_group_id, loader.LoadFragmentAsFragmentGroup());
MPI_Barrier(comm_spec.comm());

LOG(INFO) << "[worker-" << comm_spec.worker_id()
<< "] loaded graph to vineyard ...";
LOG_IF(INFO, comm_spec.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-SEAL-100";

MPI_Barrier(comm_spec.comm());
{
Expand Down Expand Up @@ -288,8 +288,8 @@ void AddLabelsToGraph(
loader.AddLabelsToGraphAsFragmentGroup(frag_id));
MPI_Barrier(comm_spec.comm());

LOG(INFO) << "[worker-" << comm_spec.worker_id()
<< "] Add labels to graph and loaded to vineyard ...";
LOG_IF(INFO, comm_spec.worker_id() == 0)
<< "PROGRESS--GRAPH-LOADING-SEAL-100";

auto fg = std::dynamic_pointer_cast<vineyard::ArrowFragmentGroup>(
client.GetObject(frag_group_id));
Expand Down
46 changes: 46 additions & 0 deletions coordinator/gscoordinator/io_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,30 @@
# limitations under the License.
#

import sys
import threading
from queue import Queue

from tqdm import tqdm


class LoadingProgressTracker:
    """Class-level state for the graph-loading progress bar.

    All attributes live on the class itself, so every reader of this
    class sees the same bar and the same counter.
    """

    # Marker lines searched for in the wrapped stream output. The length
    # of this list is used as the total step count of the progress bar.
    stubs = [
        "PROGRESS--GRAPH-LOADING-READ-VERTEX-0",
        "PROGRESS--GRAPH-LOADING-READ-VERTEX-100",
        "PROGRESS--GRAPH-LOADING-READ-EDGE-0",
        "PROGRESS--GRAPH-LOADING-READ-EDGE-100",
        "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-0",
        "PROGRESS--GRAPH-LOADING-CONSTRUCT-VERTEX-100",
        "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-0",
        "PROGRESS--GRAPH-LOADING-CONSTRUCT-EDGE-100",
        "PROGRESS--GRAPH-LOADING-SEAL-0",
        "PROGRESS--GRAPH-LOADING-SEAL-100",
    ]

    # Active progress bar object, or None while no bar is being shown.
    progbar = None

    # Number of markers consumed so far for the bar currently shown.
    cur_stub = 0


class StdStreamWrapper(object):
def __init__(self, std_stream, queue=None, drop=True):
Expand All @@ -41,6 +62,9 @@ def drop(self, drop=True):
self._drop = drop

def write(self, line):
line = self._filter_progress(line)
if line is None:
return
line = line.encode("ascii", "ignore").decode("ascii")
self._stream_backup.write(line)
if not self._drop:
Expand All @@ -52,6 +76,28 @@ def flush(self):
def poll(self, block=True, timeout=None):
return self._lines.get(block=block, timeout=timeout)

def _show_progress(self):
    """Advance the shared "Loading Graph" progress bar by one step.

    The bar is created on demand (writing to ``sys.stderr``) with one
    step per entry in ``LoadingProgressTracker.stubs``. Once the step
    counter reaches that total, the bar is closed, the shared state is
    reset, and a newline is written to ``sys.stderr``.
    """
    tracker = LoadingProgressTracker
    step_count = len(tracker.stubs)

    # Create the bar lazily on the first step.
    if tracker.progbar is None:
        tracker.progbar = tqdm(
            desc="Loading Graph", total=step_count, file=sys.stderr
        )

    tracker.progbar.update(1)
    tracker.cur_stub += 1
    if tracker.cur_stub != step_count:
        return

    # Final step reached: reset the shared state so a later call
    # starts a fresh bar.
    tracker.cur_stub = 0
    tracker.progbar.close()
    tracker.progbar = None
    sys.stderr.write("\n")
    sys.stderr.flush()

def _filter_progress(self, line):
# print('show_progress: ', len(line), ", ", line)
if "PROGRESS--GRAPH" not in line:
return line
self._show_progress(line)
return None


class PipeWatcher(object):
def __init__(self, pipe, sink, queue=None, drop=True):
Expand Down

0 comments on commit 3b1de8f

Please sign in to comment.