Skip to content

Commit

Permalink
Provide a method to print human readable schema for Context. (#734)
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed Aug 27, 2021
1 parent 434b872 commit 741ccfe
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 6 deletions.
7 changes: 7 additions & 0 deletions analytical_engine/core/context/i_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ class IContextWrapper : public GSObject {

virtual std::string context_type() = 0;

// Return the schema of context, in human readable format.
// This is consistent with the syntax of selector.
// For simplicity, only return those parts that cannot be known
// from client.
// Those context who need it may override this method.
virtual std::string schema() { return ""; }

virtual std::shared_ptr<IFragmentWrapper> fragment_wrapper() = 0;
};

Expand Down
16 changes: 16 additions & 0 deletions analytical_engine/core/context/labeled_vertex_property_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -180,6 +181,21 @@ class LabeledVertexPropertyContextWrapper
return CONTEXT_TYPE_LABELED_VERTEX_PROPERTY;
}

std::string schema() override {
auto frag = ctx_->fragment();
auto label_num = frag.vertex_label_num();
std::ostringstream os;
for (int i = 0; i < label_num; ++i) {
os << i << ":";
auto property_map = ctx_->properties_map()[i];
for (auto& pair : property_map) {
os << pair.first + ",";
}
os << "\n";
}
return os.str();
}

std::shared_ptr<IFragmentWrapper> fragment_wrapper() override {
return frag_wrapper_;
}
Expand Down
9 changes: 9 additions & 0 deletions analytical_engine/core/context/vertex_property_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,15 @@ class VertexPropertyContextWrapper : public IVertexPropertyContextWrapper {

std::string context_type() override { return CONTEXT_TYPE_VERTEX_PROPERTY; }

std::string schema() override {
std::ostringstream os;
auto property_map = ctx_->properties_map();
for (auto& pair : property_map) {
os << pair.first + ",";
}
return os.str();
}

std::shared_ptr<IFragmentWrapper> fragment_wrapper() override {
return frag_wrapper_;
}
Expand Down
7 changes: 5 additions & 2 deletions analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,15 @@ bl::result<std::string> GrapeInstance::query(const rpc::GSParams& params,
BOOST_LEAF_AUTO(ctx_wrapper,
app->Query(worker.get(), query_args, context_key, wrapper));
std::string context_type;
std::string context_schema;
if (ctx_wrapper != nullptr) {
context_type = ctx_wrapper->context_type();
context_schema = ctx_wrapper->schema();
BOOST_LEAF_CHECK(object_manager_.PutObject(ctx_wrapper));
}

return toJson({{"context_type", context_type}, {"context_key", context_key}});
return toJson({{"context_type", context_type},
{"context_key", context_key},
{"context_schema", context_schema}});
}

bl::result<std::string> GrapeInstance::reportGraph(
Expand Down
4 changes: 2 additions & 2 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,15 +382,15 @@ def run_on_analytical_engine( # noqa: C901
message = "Analytical engine exited with %s" % self._launcher.poll()
else:
message = str(e)
op_results.extend(response.results)
# op_results.extend(response.results)
return self._make_response(
message_pb2.RunStepResponse,
error_codes_pb2.FATAL_ERROR,
message,
results=op_results,
)
except Exception as e:
op_results.extend(response.results)
# op_results.extend(response.results)
return self._make_response(
message_pb2.RunStepResponse,
error_codes_pb2.UNKNOWN,
Expand Down
2 changes: 1 addition & 1 deletion python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def _rebuild_context(self, seq, op: Operation, op_result: op_def_pb2.OpResult):
# for nx
return DynamicVertexDataContext(context_dag_node, ret["context_key"])
else:
return Context(context_dag_node, ret["context_key"])
return Context(context_dag_node, ret["context_key"], ret["context_schema"])

def _rebuild_gremlin_results(
self, seq, op: Operation, op_result: op_def_pb2.OpResult
Expand Down
89 changes: 88 additions & 1 deletion python/graphscope/framework/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ def _check_selector(self, selector):
def context_type(self):
raise NotImplementedError()

def _build_schema(self, result_properties):
raise NotImplementedError()

def to_numpy(self, selector, vertex_range=None, axis=0):
"""Get the context data as a numpy array.
Expand Down Expand Up @@ -248,6 +251,9 @@ class TensorContextDAGNode(BaseContextDAGNode):
def context_type(self):
return "tensor"

def _build_schema(self, result_properties):
return "axis"

def _check_selector(self, selector):
return True

Expand Down Expand Up @@ -275,6 +281,10 @@ class VertexDataContextDAGNode(BaseContextDAGNode):
def context_type(self):
return "vertex_data"

def _build_schema(self, result_properties):
ret = {"v": ["id", "data"], "e": ["src", "dst", "data"], "r": []}
return json.dumps(ret, indent=4)

def _check_selector(self, selector):
"""
Raises:
Expand Down Expand Up @@ -332,6 +342,15 @@ class LabeledVertexDataContextDAGNode(BaseContextDAGNode):
def context_type(self):
return "labeled_vertex_data"

def _build_schema(self, result_properties):
schema = self._graph.schema
ret = {
"v": _get_property_v_context_schema_str(schema),
"e": _get_property_e_context_schema_str(schema),
"r": schema.vertex_labels,
}
return json.dumps(ret, indent=4)

def _check_selector(self, selector):
"""
Raises:
Expand Down Expand Up @@ -383,6 +402,20 @@ class VertexPropertyContextDAGNode(BaseContextDAGNode):
def context_type(self):
return "vertex_property"

def _build_schema(self, result_properties):
"""Build context schema.
Args:
result_properties (str): Returned by c++,
example_format(str): "id,name,age,"
Returns:
str: return schema as human readable string
"""
result_properties = [i for i in result_properties.split(",") if i]
ret = {"v": ["id", "data"], "e": ["src", "dst", "data"], "r": result_properties}
return json.dumps(ret, indent=4)

def _check_selector(self, selector):
"""
Raises:
Expand Down Expand Up @@ -442,6 +475,35 @@ class LabeledVertexPropertyContextDAGNode(BaseContextDAGNode):
def context_type(self):
return "labeled_vertex_property"

def _build_schema(self, result_properties):
"""Build context schema.
Args:
result_properties (str): Returned by c++,
example_format:
0:a,b,c,
1:e,f,g,
Returns:
str: return schema as human readable string
"""
schema = self._graph.schema
ret = {
"v": _get_property_v_context_schema_str(schema),
"e": _get_property_e_context_schema_str(schema),
"r": {},
}
result_properties = [i for i in result_properties.split("\n") if i]
label_property_dict = {}
for r_props in result_properties:
label_id, props = r_props.split(":")
label_property_dict[label_id] = [i for i in props.split(",") if i]
for label in schema.vertex_labels:
label_id = schema.get_vertex_label_id(label)
props = label_property_dict.get(label_id, [])
ret["r"][label] = props
return json.dumps(ret, indent=4)

def _check_selector(self, selector):
if selector is None:
raise InvalidArgumentError(
Expand All @@ -467,11 +529,12 @@ class Context(object):
and can be referenced through a handle.
"""

def __init__(self, context_node, key):
def __init__(self, context_node, key, result_schema):
self._context_node = context_node
self._session = context_node.session
self._graph = self._context_node._graph
self._key = key
self._result_schema = result_schema
# copy and set op evaluated
self._context_node.op = deepcopy(self._context_node.op)
self._context_node.evaluated = True
Expand All @@ -490,6 +553,10 @@ def key(self):
def context_type(self):
return self._context_node.context_type

@property
def schema(self):
return self._context_node._build_schema(self._result_schema)

@property
def signature(self):
"""Compute digest by key and graph signatures.
Expand Down Expand Up @@ -617,3 +684,23 @@ def create_context_node(context_type, bound_app, graph, *args, **kwargs):
else:
# dynamic_vertex_data for networkx
return BaseContextDAGNode(bound_app, graph, *args, **kwargs)


def _get_property_v_context_schema_str(schema):
ret = {}
for label in schema.vertex_labels:
ret[label] = ["id"]
for prop in schema.get_vertex_properties(label):
if prop.name != "id": # avoid property name duplicate
ret[label].append(prop.name)
return ret


def _get_property_e_context_schema_str(schema):
ret = {}
for label in schema.edge_labels:
ret[label] = ["src", "dst"]
for prop in schema.get_edge_properties(label):
if prop.name not in ("src", "dst"):
ret[label].append(prop.name)
return ret

0 comments on commit 741ccfe

Please sign in to comment.