diff --git a/coordinator/gscoordinator/coordinator.py b/coordinator/gscoordinator/coordinator.py
index 04c282c3fe36..ce961ef29c85 100644
--- a/coordinator/gscoordinator/coordinator.py
+++ b/coordinator/gscoordinator/coordinator.py
@@ -30,6 +30,7 @@
 from graphscope.config import Config
 from graphscope.proto import coordinator_service_pb2_grpc
 
+from gscoordinator.servicer import init_interactive_service_servicer
 from gscoordinator.servicer import init_graphscope_one_service_servicer
 from gscoordinator.utils import GS_GRPC_MAX_MESSAGE_LENGTH
 
@@ -109,6 +110,7 @@ def get_servicer(config: Config):
     """Get servicer of specified solution under FLEX architecture"""
     service_initializers = {
         "GraphScope One": init_graphscope_one_service_servicer,
+        "Interactive": init_interactive_service_servicer,
     }
 
     initializer = service_initializers.get(config.solution)
diff --git a/coordinator/gscoordinator/scheduler.py b/coordinator/gscoordinator/scheduler.py
new file mode 100644
index 000000000000..eb4040f6b79d
--- /dev/null
+++ b/coordinator/gscoordinator/scheduler.py
@@ -0,0 +1,250 @@
+#! /usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright 2023 Alibaba Group Holding Limited.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import json
+import time
+from abc import ABCMeta
+from abc import abstractmethod
+
+import schedule
+from graphscope.framework.utils.py import random_string
+from schedule import CancelJob
+
+from gscoordinator.stoppable_thread import StoppableThread
+from gscoordinator.utils import decode_datetimestr
+
+
+class Schedule(object):
+    """Schedule class that wraps the dbader scheduler.
+
+    Repo: https://github.com/dbader/schedule.
+    """
+
+    def __init__(self):
+        self._schedule = schedule.Scheduler()
+        self._run_pending_thread = StoppableThread(target=self.run_pending, args=())
+        self._run_pending_thread.daemon = True
+        self._run_pending_thread.start()
+
+    @property
+    def schedule(self):
+        return self._schedule
+
+    def run_pending(self):
+        """Run all jobs that are scheduled to run."""
+        while True:
+            self._schedule.run_pending()
+            time.sleep(1)
+
+
+schedule = Schedule().schedule  # noqa: F811
+
+
+class Scheduler(metaclass=ABCMeta):
+    """
+    Objects instantiated by the :class:`Scheduler` are factories to create
+    jobs, keep a record of scheduled jobs, and handle their execution in
+    the :meth:`run` method.
+ """ + + def __init__(self, at_time, repeat): + # scheduler id + self._scheduler_id = "Job-{0}".format(random_string(16)).upper() + # periodic job as used + self._job = None + # true will be run immediately + self._run_now = False + # time at which this job to schedule + self._at_time = self._decode_datetimestr(at_time) + # repeat every day or week, or run job once(no repeat) + # optional value "day", "week", "null" + self._repeat = repeat + # job running thread, note that: + # the last job should be end of execution at the beginning of the next job + self._running_thread = None + # tags + self._tags = [] + + # when the job actually scheduled, the following variables will be generated and overridden. + self._jobid = None + self._last_run = None + + def _decode_datetimestr(self, datetime_str): + if datetime_str == "now": + self._run_now = True + return datetime.datetime.now() + return decode_datetimestr(datetime_str) + + def __str__(self): + return "Scheduler(at_time={}, repeat={})".format(self._at_time, self._repeat) + + @property + def monday(self): + return self._at_time.weekday() == 0 + + @property + def tuesday(self): + return self._at_time.weekday() == 1 + + @property + def wednesday(self): + return self._at_time.weekday() == 2 + + @property + def thursday(self): + return self._at_time.weekday() == 3 + + @property + def friday(self): + return self._at_time.weekday() == 4 + + @property + def saturday(self): + return self._at_time.weekday() == 5 + + @property + def sunday(self): + return self._at_time.weekday() == 6 + + @property + def timestr(self): + """return str of the time object. + time([hour[, minute[, second[, microsecond[, tzinfo]]]]]) --> a time object + """ + return str(self._at_time.time()) + + @property + def job(self): + """A periodic job managed by the dbader scheduler. + https://github.com/dbader/schedule. 
+ """ + return self._job + + @property + def jobid(self): + """id for the last scheduled job""" + return self._jobid + + @property + def schedulerid(self): + """id for the scheduler""" + return self._scheduler_id + + @property + def last_run(self): + """datetime of the last run""" + return self._last_run + + @property + def tags(self): + return self._tags + + @property + def running_thread(self): + return self._running_thread + + def run_once(self): + """Run the job immediately.""" + self.do_run() + return CancelJob + + def waiting_until_to_run(self): + """Run the job once at a specific time.""" + if datetime.datetime.now() >= self._at_time: + return self.run_once() + + def do_run(self): + """Start a thread for the job.""" + # overwrite for each scheduled job + self._jobid = "job-{0}".format(random_string(16)).upper() + self._last_run = datetime.datetime.now() + # schedule in a thread + self._running_thread = StoppableThread(target=self.run, args=()) + self._running_thread.daemon = True + self._running_thread.start() + + def submit(self): + if not self._run_now and self._repeat not in ["week", "day", "null", None]: + raise RuntimeError( + "Submit schedule job failed: at_time is '{0}', repeat is '{1}'".format( + self._at_time, self._repeat + ) + ) + + if self._run_now: + self._job = schedule.every().seconds.do(self.run_once) + + if not self._run_now and self._repeat == "week": + if self.monday: + self._job = schedule.every().monday.at(self.timestr).do(self.do_run) + elif self.tuesday: + self._job = schedule.every().tuesday.at(self.timestr).do(self.do_run) + elif self.wednesday: + self._job = schedule.every().wednesday.at(self.timestr).do(self.do_run) + elif self.thursday: + self._job = schedule.every().thursday.at(self.timestr).do(self.do_run) + elif self.friday: + self._job = schedule.every().friday.at(self.timestr).do(self.do_run) + elif self.saturday: + self._job = schedule.every().saturday.at(self.timestr).do(self.do_run) + elif self.sunday: + self._job = schedule.every().sunday.at(self.timestr).do(self.do_run) + + if not self._run_now and self._repeat == "day": + self._job = schedule.every().day.at(self.timestr).do(self.do_run) + + if not self._run_now and self._repeat in ["null", None]: + self._job = ( + schedule.every().day.at(self.timestr).do(self.waiting_until_to_run) + ) + + # tag + self._job.tag(self._scheduler_id, *self._tags) + + def cancel(self): + """ + Set the running job thread stoppable and wait for the + thread to exit properly by using join() method. + """ + if self._running_thread is not None and self._running_thread.is_alive(): + self._running_thread.stop() + self._running_thread.join() + + @abstractmethod + def run(self): + """ + Methods that all subclasses need to implement, note that + subclass needs to handle exception by itself. + """ + raise NotImplementedError + + +def cancel_job(job, delete_scheduler=True): + """ + Cancel the job which going to scheduled or cancel the whole scheduler. + + Args: + job: Periodic job as used by :class:`Scheduler`. + delete_scheduler: True will can the whole scheduler, otherwise, + delay the next-run time by on period. + """ + if delete_scheduler: + schedule.cancel_job(job) + else: + job.next_run += job.period diff --git a/coordinator/gscoordinator/servicer/__init__.py b/coordinator/gscoordinator/servicer/__init__.py index 9cc0c51d02ed..9b169d1e23d0 100644 --- a/coordinator/gscoordinator/servicer/__init__.py +++ b/coordinator/gscoordinator/servicer/__init__.py @@ -16,4 +16,5 @@ # limitations under the License. 
 #
 
+from gscoordinator.servicer.interactive.service import *
 from gscoordinator.servicer.graphscope_one.service import *
diff --git a/coordinator/gscoordinator/servicer/base_service.py b/coordinator/gscoordinator/servicer/base_service.py
new file mode 100644
index 000000000000..895ea53fb3dd
--- /dev/null
+++ b/coordinator/gscoordinator/servicer/base_service.py
@@ -0,0 +1,45 @@
+#! /usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright 2023 Alibaba Group Holding Limited.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import atexit
+import logging
+
+from graphscope.config import Config
+from graphscope.proto import coordinator_service_pb2_grpc
+from graphscope.proto import message_pb2
+
+
+class BaseServiceServicer(coordinator_service_pb2_grpc.CoordinatorServiceServicer):
+    """Base class of coordinator service"""
+
+    def __init__(self, config: Config):
+        self._config = config
+        atexit.register(self.cleanup)
+
+    def __del__(self):
+        self.cleanup()
+
+    def Connect(self, request, context):
+        return message_pb2.ConnectResponse(solution=self._config.solution)
+
+    @property
+    def launcher_type(self):
+        return self._config.launcher_type
+
+    def cleanup(self):
+        pass
diff --git a/coordinator/gscoordinator/servicer/interactive/__init__.py b/coordinator/gscoordinator/servicer/interactive/__init__.py
new file mode 100644
index 000000000000..3a8204719d16
--- /dev/null
+++ b/coordinator/gscoordinator/servicer/interactive/__init__.py
@@ -0,0 +1,26 @@
+#! /usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright 2023 Alibaba Group Holding Limited.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+
+try:
+    sys.path.insert(0, os.path.dirname(__file__))
+    import interactive_client
+except ImportError:
+    raise
diff --git a/coordinator/gscoordinator/servicer/interactive/service.py b/coordinator/gscoordinator/servicer/interactive/service.py
new file mode 100644
index 000000000000..f65e82064e8f
--- /dev/null
+++ b/coordinator/gscoordinator/servicer/interactive/service.py
@@ -0,0 +1,230 @@
+#! /usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright 2023 Alibaba Group Holding Limited.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Interactive Service under FLEX Architecture"""
+
+import itertools
+import logging
+import os
+import threading
+
+import interactive_client
+from google.protobuf.json_format import MessageToDict
+from graphscope.config import Config
+from graphscope.gsctl.utils import dict_to_proto_message
+from graphscope.proto import error_codes_pb2
+from graphscope.proto import interactive_pb2
+
+from gscoordinator.servicer.base_service import BaseServiceServicer
+from gscoordinator.utils import WORKSPACE
+from gscoordinator.utils import delegate_command_to_pod
+from gscoordinator.utils import run_kube_cp_command
+
+__all__ = ["InteractiveServiceServicer", "init_interactive_service_servicer"]
+
+logger = logging.getLogger("graphscope")
+
+
+# host
+INTERACTIVE_HOST = (
+    os.environ["INTERACTIVE_HOST"]
+    if "INTERACTIVE_HOST" in os.environ
+    else "http://127.0.0.1:8080"
+)
+
+
+# There are two workspaces for FLEX Interactive, one residing on the "coordinator"
+# node and the other on the "interactive" node. These two workspaces are
+# equivalent when the running mode is "hosts". Hence, INTERACTIVE_WORKSPACE only
+# takes effect in a Kubernetes (k8s) environment.
+INTERACTIVE_WORKSPACE = (
+    os.environ["INTERACTIVE_WORKSPACE"]
+    if "INTERACTIVE_WORKSPACE" in os.environ
+    else "/tmp"
+)
+
+
+INTERACTIVE_CONTAINER_NAME = "interactive"
+
+
+class InteractiveServiceServicer(BaseServiceServicer):
+    """Interactive service under FLEX architecture."""
+
+    def __init__(self, config: Config):
+        super().__init__(config)
+
+        # lock to protect the service
+        self._lock = threading.RLock()
+
+        # get interactive service endpoint by instance id
+        self._interactive_client_configuration = interactive_client.Configuration(
+            host=INTERACTIVE_HOST
+        )
+
+        # interactive pod list
+        self._interactive_pod_list = []
+
+    def cleanup(self):
+        pass
+
+    def write_and_distribute_file(self, graph, location, raw_data):
+        # filename format: {graph}_{basename of location}
+        filename = "{0}_{1}".format(graph, os.path.basename(location))
+
+        # write temp file
+        tmp_file = os.path.join(WORKSPACE, filename)
+        with open(tmp_file, "wb") as f:
+            f.write(raw_data)
+
+        # distribute
+        target_file = tmp_file
+        if self.launcher_type == "k8s":
+            # update file path in interactive pod
+            target_file = os.path.join(INTERACTIVE_WORKSPACE, filename)
+
+            for pod in self._interactive_pod_list:
+                container = INTERACTIVE_CONTAINER_NAME
+                cmd = f"mkdir -p {os.path.dirname(target_file)}"
+                logger.debug(delegate_command_to_pod(cmd, pod, container))
+                logger.debug(
+                    run_kube_cp_command(tmp_file, target_file, pod, container, True)
+                )
+
+        return target_file
+
+    def CreateInteractiveGraph(self, request, context):
+        with interactive_client.ApiClient(
+            self._interactive_client_configuration
+        ) as api_client:
+            # create an instance of the API class
+            api_instance = interactive_client.GraphApi(api_client)
+
+            graph_def_dict = MessageToDict(
+                request.graph_def, preserving_proto_field_name=True
+            )
+            graph = interactive_client.Graph.from_dict(graph_def_dict)
+
+            try:
+                api_response = api_instance.create_graph(graph)
+            except Exception as e:
+                logger.warning("Failed to create interactive graph. %s", str(e))
%s", str(e)) + return interactive_pb2.ApiResponse( + code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) + ) + else: + return interactive_pb2.ApiResponse( + code=error_codes_pb2.OK, error_msg=api_response.message + ) + + def RemoveInteractiveGraph(self, request, context): + with interactive_client.ApiClient( + self._interactive_client_configuration + ) as api_client: + api_instance = interactive_client.GraphApi(api_client) + + try: + api_response = api_instance.delete_graph(request.graph_name) + except Exception as e: + logger.warning("Failed to remove interactive graph. %s", str(e)) + return interactive_pb2.ApiResponse( + code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) + ) + else: + return interactive_pb2.ApiResponse( + code=error_codes_pb2.OK, error_msg=api_response.message + ) + + def ListInteractiveGraph(self, request, context): + with interactive_client.ApiClient( + self._interactive_client_configuration + ) as api_client: + api_instance = interactive_client.GraphApi(api_client) + + try: + api_response = api_instance.list_graphs() + except Exception as e: + logger.warning("Failed to list interactive graph. %s", str(e)) + return interactive_pb2.ApiResponse( + code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) + ) + else: + return interactive_pb2.ListInteractiveGraphResponse( + code=error_codes_pb2.OK, + graphs=[ + dict_to_proto_message(g.to_dict(), interactive_pb2.GraphProto()) + for g in api_response + ], + ) + + def ImportInteractiveGraph(self, request, context): + # write raw data to file and copy to interactive workspace + try: + schema_mapping_proto = request.schema_mapping + for mapping in itertools.chain( + schema_mapping_proto.vertex_mappings, schema_mapping_proto.edge_mappings + ): + raw_data_index = 0 + for index, location in enumerate(mapping.inputs): + if location.startswith("@"): + # write raw data and distribute file to interactive workspace + new_location = self.write_and_distribute_file( + schema_mapping_proto.graph, + location, + mapping.raw_data[raw_data_index], + ) + raw_data_index += 1 + # update the location + mapping.inputs[index] = new_location + except Exception as e: + logger.warning("Failed to distribute file. %s", str(e)) + return interactive_pb2.ApiResponse( + code=error_codes_pb2.NETWORK_ERROR, error_msg=str(e) + ) + + with interactive_client.ApiClient( + self._interactive_client_configuration + ) as api_client: + # create an instance of the API class + api_instance = interactive_client.DataloadingApi(api_client) + + schema_mapping_dict = MessageToDict( + schema_mapping_proto, preserving_proto_field_name=True + ) + graph_name = schema_mapping_dict["graph"] + print(schema_mapping_dict) + schema_mapping = interactive_client.SchemaMapping.from_dict( + schema_mapping_dict + ) + + try: + api_response = api_instance.create_dataloading_job( + graph_name, schema_mapping + ) + except Exception as e: + logger.warning("Failed to create dataloading job. %s", str(e)) + return interactive_pb2.ApiResponse( + code=error_codes_pb2.API_EXCEPTION_ERROR, error_msg=str(e) + ) + else: + return interactive_pb2.ApiResponse( + code=error_codes_pb2.OK, error_msg=api_response.message + ) + + +def init_interactive_service_servicer(config: Config): + return InteractiveServiceServicer(config) diff --git a/coordinator/gscoordinator/stoppable_thread.py b/coordinator/gscoordinator/stoppable_thread.py new file mode 100644 index 000000000000..96a072e9f5a7 --- /dev/null +++ b/coordinator/gscoordinator/stoppable_thread.py @@ -0,0 +1,39 @@ +#! 
+# -*- coding: utf-8 -*-
+#
+# Copyright 2023 Alibaba Group Holding Limited.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import threading
+
+
+class StoppableThread(threading.Thread):
+    """
+    One of the simplest mechanisms for a stoppable thread: hold an
+    'exit_request' flag that each thread checks on a regular interval
+    to see if it is time for it to exit.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(StoppableThread, self).__init__(*args, **kwargs)
+        self._stop_event = threading.Event()
+
+    def stop(self):
+        self._stop_event.set()
+
+    def stopped(self):
+        """The thread itself should check regularly for the stopped() condition."""
+        return self._stop_event.is_set()
diff --git a/coordinator/gscoordinator/utils.py b/coordinator/gscoordinator/utils.py
index f39d99f2b036..7b7568ab4342 100644
--- a/coordinator/gscoordinator/utils.py
+++ b/coordinator/gscoordinator/utils.py
@@ -219,6 +219,20 @@ def get_timestamp() -> float:
     return datetime.datetime.timestamp(datetime.datetime.now())
 
 
+def decode_datetimestr(datetime_str):
+    formats = ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d"]
+    for f in formats:
+        try:
+            return datetime.datetime.strptime(datetime_str, f)
+        except ValueError:
+            pass
+    raise RuntimeError(
+        "Decode '{0}' failed: format should be one of '{1}'".format(
+            datetime_str, str(formats)
+        )
+    )
+
+
 def get_lib_path(app_dir: str, app_name: str) -> str:
     if sys.platform == "linux" or sys.platform == "linux2":
         return os.path.join(app_dir, "lib%s.so" % app_name)
diff --git a/coordinator/requirements.txt b/coordinator/requirements.txt
index 72e88e88fbe1..a359239467cf 100644
--- a/coordinator/requirements.txt
+++ b/coordinator/requirements.txt
@@ -9,3 +9,5 @@ vineyard-io>=0.16.3;sys_platform!="win32"
 prometheus-client>=0.14.1
 packaging
 tqdm
+pydantic >= 1.10.5, < 2
+schedule
diff --git a/proto/coordinator_service.proto b/proto/coordinator_service.proto
index 1d5ed680f8d5..f00f835dc11e 100644
--- a/proto/coordinator_service.proto
+++ b/proto/coordinator_service.proto
@@ -17,6 +17,7 @@ syntax = "proto3";
 package gs.rpc;
 
 import "message.proto";
+import "interactive.proto";
 
 service CoordinatorService {
   // Connect a session.
@@ -51,4 +52,12 @@ service CoordinatorService {
 
   // service functions under FLEX architecture
   rpc Connect(ConnectRequest) returns (ConnectResponse);
+
+  rpc CreateInteractiveGraph(CreateInteractiveGraphRequest) returns (ApiResponse);
+
+  rpc RemoveInteractiveGraph(RemoveInteractiveGraphRequest) returns (ApiResponse);
+
+  rpc ListInteractiveGraph(ListInteractiveGraphRequest) returns (ListInteractiveGraphResponse);
+
+  rpc ImportInteractiveGraph(ImportInteractiveGraphRequest) returns (ApiResponse);
 }
diff --git a/proto/error_codes.proto b/proto/error_codes.proto
index 7b9ca43e8838..1107e3bb55e4 100644
--- a/proto/error_codes.proto
+++ b/proto/error_codes.proto
@@ -84,6 +84,9 @@ enum Code {
   // Results of workers not consistent
   WORKER_RESULTS_INCONSISTENT_ERROR = 41;
 
+  // API exception during communication between coordinator and engines
+  API_EXCEPTION_ERROR = 51;
+
   // Unknown error.
   UNKNOWN_ERROR = 101;
 
diff --git a/proto/interactive.proto b/proto/interactive.proto
new file mode 100644
index 000000000000..584175aa9560
--- /dev/null
+++ b/proto/interactive.proto
@@ -0,0 +1,173 @@
+// Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+syntax = "proto3";
+
+package gs.rpc;
+
+import "error_codes.proto";
+
+// property type
+message PropertyTypeProto {
+  string primitive_type = 1;  // [ DT_DOUBLE, DT_STRING, DT_SIGNED_INT32, DT_SIGNED_INT64 ]
+}
+
+// property
+message PropertyProto {
+  int32 property_id = 1;
+  string property_name = 2;
+  PropertyTypeProto property_type = 3;
+}
+
+// csr params definition, used for storage optimization
+message CSRParamsProto {
+  optional string incoming_edge_strategy = 1;  // one of [Single, Multiple]
+  optional string outgoing_edge_strategy = 2;  // one of [Single, Multiple]
+}
+
+// vertex pair
+message VertexPairProto {
+  string source_vertex = 1;       // label
+  string destination_vertex = 2;  // label
+  string relation = 3;  // one of [ MANY_TO_MANY, ONE_TO_MANY, MANY_TO_ONE, ONE_TO_ONE ]
+  CSRParamsProto x_csr_params = 4;
+}
+
+// vertex
+message VertexProto {
+  int32 type_id = 1;     // index
+  string type_name = 2;  // label
+  repeated PropertyProto properties = 3;
+  repeated string primary_keys = 4;
+}
+
+// edge
+message EdgeProto {
+  int32 type_id = 1;     // index
+  string type_name = 2;  // label
+  repeated VertexPairProto vertex_type_pair_relations = 3;
+  repeated PropertyProto properties = 4;
+}
+
+// schema
+message Schema {
+  repeated VertexProto vertex_types = 1;
+  repeated EdgeProto edge_types = 2;
+}
+
+// stored procedure
+message StoredProcedureProto {
+  string directory = 1;  // one of [ plugins ]
+}
+
+// graph
+message GraphProto {
+  string name = 1;
+  string store_type = 2;  // one of [ mutable_csr ]
+  StoredProcedureProto stored_procedures = 3;
+  Schema schema = 4;
+}
+
+// column
+message Column {
+  int32 index = 1;
+  string name = 2;
+}
+
+// column mapping
+// column index(name) of datasource -> property
+message ColumnMapping {
+  Column column = 1;
+  string property = 2;
+}
+
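+// For illustration only (editor's sketch, not part of the generated API):
+// binding the second column of a CSV file, named "weight", to the property
+// "weight" would be written in text-proto form as
+//   column_mappings { column { index: 1 name: "weight" } property: "weight" }
+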
+// vertex mapping
+message VertexMapping {
+  string type_name = 1;         // vertex label
+  repeated string inputs = 2;   // protocol:///path/to/data
+  repeated bytes raw_data = 3;  // raw data
+  repeated ColumnMapping column_mappings = 4;
+}
+
+// type triplet for edge
+message TypeTriplet {
+  string edge = 1;                // edge label
+  string source_vertex = 2;       // src vertex label
+  string destination_vertex = 3;  // dst vertex label
+}
+
+// edge mapping
+message EdgeMapping {
+  TypeTriplet type_triplet = 1;
+  repeated string inputs = 2;
+  repeated bytes raw_data = 3;
+  repeated ColumnMapping source_vertex_mappings = 4;
+  repeated ColumnMapping destination_vertex_mappings = 5;
+  repeated ColumnMapping column_mappings = 6;
+}
+
+// data source config
+message DataSource {
+  string scheme = 1;    // file, oss, s3, hdfs; only file is supported now
+  string location = 2;  // unified prefix
+}
+
+// format
+message DataSourceFormat {
+  string type = 1;  // csv
+  map<string, string> metadata = 2;
+}
+
+// loading config
+message LoadingConfig {
+  DataSource data_source = 1;
+  string import_option = 2;  // init, append, overwrite; only init is supported now
+  DataSourceFormat format = 3;
+}
+
+// schema mapping
+message SchemaMapping {
+  string graph = 1;
+  LoadingConfig loading_config = 2;
+  repeated VertexMapping vertex_mappings = 3;
+  repeated EdgeMapping edge_mappings = 4;
+}
+
+// message
+message ApiResponse {
+  Code code = 1;
+  optional string error_msg = 2;
+}
+
+message CreateInteractiveGraphRequest {
+  GraphProto graph_def = 1;
+}
+
+message RemoveInteractiveGraphRequest {
+  string graph_name = 1;
+}
+
+message ListInteractiveGraphRequest {}
+
+message ListInteractiveGraphResponse {
+  Code code = 1;
+  optional string error_msg = 2;
+
+  repeated GraphProto graphs = 3;
+}
+
+message ImportInteractiveGraphRequest {
+  SchemaMapping schema_mapping = 1;
+}
diff --git a/python/graphscope/gsctl/rpc.py b/python/graphscope/gsctl/client/rpc.py
similarity index 71%
rename from python/graphscope/gsctl/rpc.py
rename to python/graphscope/gsctl/client/rpc.py
index f77d0fe58bae..3f5ef2391196 100644
--- a/python/graphscope/gsctl/rpc.py
+++ b/python/graphscope/gsctl/client/rpc.py
@@ -21,11 +21,12 @@
 
 import click
 import grpc
-
 from graphscope.client.utils import GS_GRPC_MAX_MESSAGE_LENGTH
 from graphscope.client.utils import handle_grpc_error
+from graphscope.gsctl.config import GS_CONFIG_DEFAULT_LOCATION
 from graphscope.gsctl.config import get_current_context
 from graphscope.proto import coordinator_service_pb2_grpc
+from graphscope.proto import interactive_pb2
 from graphscope.proto import message_pb2
 from graphscope.version import __version__
 
@@ -73,6 +74,26 @@ def close(self):
         except:  # noqa: E722
             pass
 
+    @handle_grpc_error
+    def create_interactive_graph(self, graph_def: interactive_pb2.GraphProto):
+        request = interactive_pb2.CreateInteractiveGraphRequest(graph_def=graph_def)
+        return self._stub.CreateInteractiveGraph(request)
+
+    @handle_grpc_error
+    def remove_interactive_graph(self, graph: str):
+        request = interactive_pb2.RemoveInteractiveGraphRequest(graph_name=graph)
+        return self._stub.RemoveInteractiveGraph(request)
+
+    @handle_grpc_error
+    def list_interactive_graph(self):
+        request = interactive_pb2.ListInteractiveGraphRequest()
+        return self._stub.ListInteractiveGraph(request)
+
+    @handle_grpc_error
+    def import_interactive_graph(self, schema_mapping: interactive_pb2.SchemaMapping):
+        request = interactive_pb2.ImportInteractiveGraphRequest(schema_mapping=schema_mapping)
+        return self._stub.ImportInteractiveGraph(request)
+
 
 def get_grpc_client(coordinator_endpoint=None):
     if coordinator_endpoint is not None:
@@ -82,6 +103,8 @@ def get_grpc_client(coordinator_endpoint=None):
     current_context = get_current_context()
     if current_context is None:
         raise RuntimeError(
-            "No available context found, please connect to a launched coordinator first."
+            "No available context found in {0}, please connect to a launched coordinator first.".format(
+                GS_CONFIG_DEFAULT_LOCATION
+            )
         )
     return GRPCClient(current_context.coordinator_endpoint)
diff --git a/python/graphscope/gsctl/commands/__init__.py b/python/graphscope/gsctl/commands/__init__.py
index 29ab31f2d6ab..8bdcbed90f5f 100644
--- a/python/graphscope/gsctl/commands/__init__.py
+++ b/python/graphscope/gsctl/commands/__init__.py
@@ -16,4 +16,27 @@
 # limitations under the License.
 #
 
-from graphscope.gsctl.commands.utils import *
+import click
+
+from graphscope.gsctl.commands.common import cli as common_cli
+from graphscope.gsctl.commands.dev import cli as dev_cli
+from graphscope.gsctl.commands.interactive import cli as interactive_cli
+from graphscope.gsctl.config import Context
+from graphscope.gsctl.config import FLEX_INTERACTIVE
+
+
+def get_command_collection(context: Context):
+    if context is None:
+        # treat gsctl as a utility script, providing helper functions or utilities,
+        # e.g. initialize and manage a cluster, install the dependencies required
+        # to build graphscope locally
+        commands = click.CommandCollection(sources=[common_cli, dev_cli])
+
+    elif context.solution == FLEX_INTERACTIVE:
+        commands = click.CommandCollection(sources=[common_cli, interactive_cli])
+
+    else:
+        raise RuntimeError(
+            f"Failed to get command collection with context {context.name}"
+        )
+
+    return commands
diff --git a/python/graphscope/gsctl/commands/common_command.py b/python/graphscope/gsctl/commands/common.py
similarity index 94%
rename from python/graphscope/gsctl/commands/common_command.py
rename to python/graphscope/gsctl/commands/common.py
index ff3a02a4a3e2..c2028e90239f 100644
--- a/python/graphscope/gsctl/commands/common_command.py
+++ b/python/graphscope/gsctl/commands/common.py
@@ -22,7 +22,7 @@
 
 from graphscope.gsctl.config import Context
 from graphscope.gsctl.config import load_gs_config
-from graphscope.gsctl.rpc import get_grpc_client
+from graphscope.gsctl.client.rpc import get_grpc_client
 
 
 @click.group()
@@ -31,7 +31,7 @@ def cli():
     pass
 
 
-@click.command()
+@cli.command()
 @click.option(
     "--coordinator-endpoint",
     help="Coordinator endpoint which gsctl connects to, e.g. http://127.0.0.1:9527",
http://127.0.0.1:9527", @@ -56,7 +56,7 @@ def connect(coordinator_endpoint): click.secho("Coordinator service connected.", fg="green") -@click.command() +@cli.command() def close(): """Close the connection from the coordinator.""" config = load_gs_config() @@ -69,9 +69,5 @@ def close(): click.secho(f"Disconnect from the {current_context.to_dict()}.", fg="green") -cli.add_command(connect) -cli.add_command(close) - - if __name__ == "__main__": cli() diff --git a/python/graphscope/gsctl/commands/dev_command.py b/python/graphscope/gsctl/commands/dev.py similarity index 100% rename from python/graphscope/gsctl/commands/dev_command.py rename to python/graphscope/gsctl/commands/dev.py diff --git a/python/graphscope/gsctl/commands/interactive.py b/python/graphscope/gsctl/commands/interactive.py new file mode 100644 index 000000000000..2a628e03cea8 --- /dev/null +++ b/python/graphscope/gsctl/commands/interactive.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Group of interactive commands under the FLEX architecture""" + +import itertools + +import click +from graphscope.gsctl.client.rpc import get_grpc_client +from graphscope.gsctl.utils import dict_to_proto_message +from graphscope.gsctl.utils import is_valid_file_path +from graphscope.gsctl.utils import read_yaml_file +from graphscope.gsctl.utils import terminal_display +from graphscope.proto import error_codes_pb2 +from graphscope.proto import interactive_pb2 + + +@click.group() +def cli(): + pass + + +@cli.group() +def database(): + """Group of operations on graph, dataloading""" + pass + + +@database.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph.", +) +@click.option( + "-c", + "--config", + required=True, + help="Yaml path or json string of schema for the graph.", +) +def create(graph, config): + """Create a graph in database, with the provided schema file.""" + + graph_def_dict = config + if is_valid_file_path(config): + graph_def_dict = read_yaml_file(config) + + # override graph name + if graph is not None: + graph_def_dict["name"] = graph + + # transform graph dict to proto message + graph_def = interactive_pb2.GraphProto() + dict_to_proto_message(graph_def_dict, graph_def) + + grpc_client = get_grpc_client() + response = grpc_client.create_interactive_graph(graph_def) + + if response.code != error_codes_pb2.OK: + click.secho( + "{0}: {1}".format( + error_codes_pb2.Code.Name(response.code), response.error_msg + ), + fg="red", + ) + else: + click.secho( + "Create interactive graph {0} successfully.".format(graph_def_dict["name"]), + fg="green", + ) + + +@database.command() +@click.option( + "-g", + "--graph", + required=True, + help="The name of the graph.", +) +def remove(graph): + """Remove a graph, as well as the loaded data in database.""" + + grpc_client = get_grpc_client() + response = grpc_client.remove_interactive_graph(graph) + + if 
+        click.secho(
+            "{0}: {1}".format(
+                error_codes_pb2.Code.Name(response.code), response.error_msg
+            ),
+            fg="red",
+        )
+    else:
+        click.secho(
+            "Remove interactive graph {0} successfully.".format(graph),
+            fg="green",
+        )
+
+
+@database.command()
+def list_graph():
+    """List all the graphs in the database."""
+
+    def _construct_and_display_data(graphs):
+        star = "*"
+        head = ["", "Name", "Store Type", "Vertex Schema", "Edge Schema"]
+        data = [head]
+
+        for g in graphs:
+            vertex_schema = []
+            for v in g.schema.vertex_types:
+                vertex_schema.append(v.type_name)
+            edge_schema = []
+            for e in g.schema.edge_types:
+                edge_schema.append(e.type_name)
+            data.append(
+                [
+                    star,
+                    g.name,
+                    g.store_type,
+                    "({0}) {1}".format(len(vertex_schema), ",".join(vertex_schema)),
+                    "({0}) {1}".format(len(edge_schema), ",".join(edge_schema)),
+                ]
+            )
+
+        terminal_display(data)
+
+    grpc_client = get_grpc_client()
+    response = grpc_client.list_interactive_graph()
+
+    if response.code != error_codes_pb2.OK:
+        click.secho(
+            "{0}: {1}".format(
+                error_codes_pb2.Code.Name(response.code), response.error_msg
+            ),
+            fg="red",
+        )
+    else:
+        _construct_and_display_data(response.graphs)
+        return response.graphs
+
+
+@database.command(name="import")
+@click.option(
+    "-g",
+    "--graph",
+    required=True,
+    help="The name of the graph.",
+)
+@click.option(
+    "-c",
+    "--config",
+    required=True,
+    help="Yaml path or raw data for loading graph.",
+)
+def data_import(graph, config):
+    """Load the raw data specified in the bulk load file."""
+
+    def _read_and_fill_raw_data(config):
+        for mapping in itertools.chain(
+            config["vertex_mappings"], config["edge_mappings"]
+        ):
+            for index, location in enumerate(mapping["inputs"]):
+                # location is one of:
+                #   1) protocol:///path/to/the/file
+                #   2) @/path/to/the/file, which represents a local file
+                if location.startswith("@"):
+                    if "raw_data" not in mapping:
+                        mapping["raw_data"] = []
+
+                    # read the file and set raw data
+                    with open(location[1:], "rb") as f:
+                        content = f.read()
+                        mapping["raw_data"].append(content)
+
+    schema_mapping_dict = config
+    if is_valid_file_path(config):
+        schema_mapping_dict = read_yaml_file(config)
+
+    if graph is not None:
+        schema_mapping_dict["graph"] = graph
+
+    _read_and_fill_raw_data(schema_mapping_dict)
+
+    # transform the dict to a proto message
+    schema_mapping = interactive_pb2.SchemaMapping()
+    dict_to_proto_message(schema_mapping_dict, schema_mapping)
+
+    grpc_client = get_grpc_client()
+    response = grpc_client.import_interactive_graph(schema_mapping)
+
+    if response.code != error_codes_pb2.OK:
+        click.secho(
+            "{0}: {1}".format(
+                error_codes_pb2.Code.Name(response.code), response.error_msg
+            ),
+            fg="red",
+        )
+    else:
+        click.secho("Create dataloading job successfully.", fg="green")
+
+
+if __name__ == "__main__":
+    cli()
diff --git a/python/graphscope/gsctl/commands/utils.py b/python/graphscope/gsctl/commands/utils.py
deleted file mode 100644
index 711a7b32e7f5..000000000000
--- a/python/graphscope/gsctl/commands/utils.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-#
-# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import click
-
-from graphscope.gsctl.commands.common_command import cli as common_cli
-from graphscope.gsctl.commands.dev_command import cli as dev_cli
-from graphscope.gsctl.config import Context
-
-
-def get_command_collection(context: Context):
-    if context is None:
-        # treat gsctl as an utility script, providing hepler functions or utilities. e.g.
-        # initialize and manage cluster, install the dependencies required to build graphscope locally
-        commands = click.CommandCollection(sources=[common_cli, dev_cli])
-
-    elif context.solution == "interactive":
-        commands = click.CommandCollection(sources=[common_cli])
-
-    else:
-        raise RuntimeError(
-            f"Failed to get command collection with context {context.name}"
-        )
-
-    return commands
diff --git a/python/graphscope/gsctl/config.py b/python/graphscope/gsctl/config.py
index e9357c0b4919..2e286f0dcdc0 100644
--- a/python/graphscope/gsctl/config.py
+++ b/python/graphscope/gsctl/config.py
@@ -22,16 +22,19 @@
 import random
 from string import ascii_letters
 
-import yaml
+from graphscope.gsctl.utils import read_yaml_file
+from graphscope.gsctl.utils import write_yaml_file
 
 GS_CONFIG_DEFAULT_LOCATION = os.environ.get(
     "GSCONFIG", os.path.expanduser("~/.graphscope/config")
 )
 
+FLEX_INTERACTIVE = "Interactive"
+
 
 class Context(object):
     def __init__(self, solution, coordinator_endpoint, name=None):
-        self.supported_solutions = ["interactive"]
+        self.supported_solutions = [FLEX_INTERACTIVE]
         if solution not in self.supported_solutions:
             raise RuntimeError(
                 "The solution {0} in context {1} is not supported yet.".format(
@@ -83,10 +86,10 @@ def set_and_write(self, context: Context):
 
         # write
        contexts = [v.to_dict() for _, v in self._contexts.items()]
-        with open(GS_CONFIG_DEFAULT_LOCATION, "w") as file:
-            yaml.dump(
-                {"contexts": contexts, "current-context": self._current_context}, file
-            )
+        write_yaml_file(
+            {"contexts": contexts, "current-context": self._current_context},
+            GS_CONFIG_DEFAULT_LOCATION,
+        )
 
     def remove_and_write(self, current_context: Context):
         # remove
@@ -95,10 +98,10 @@ def remove_and_write(self, current_context: Context):
 
         # write
         contexts = [v.to_dict() for _, v in self._contexts.items()]
-        with open(GS_CONFIG_DEFAULT_LOCATION, "w") as file:
-            yaml.dump(
-                {"contexts": contexts, "current-context": self._current_context}, file
-            )
+        write_yaml_file(
+            {"contexts": contexts, "current-context": self._current_context},
+            GS_CONFIG_DEFAULT_LOCATION,
+        )
 
 
 class GSConfigLoader(object):
@@ -132,9 +135,7 @@ def _parse_config(self, config_dict):
         return contexts, current_context
 
     def load_config(self):
-        config_dict = None
-        with open(self._config_file, "r") as file:
-            config_dict = yaml.safe_load(file)
+        config_dict = read_yaml_file(self._config_file)
         contexts, current_context = self._parse_config(config_dict)
         return GSConfig(contexts, current_context)
 
@@ -149,8 +150,7 @@ def load_gs_config():
     if not os.path.exists(config_file):
         workdir = os.path.dirname(config_file)
         os.makedirs(workdir, exist_ok=True)
-        with open(config_file, "w") as file:
-            yaml.safe_dump({}, file)
+        write_yaml_file({}, config_file)
 
     loader = GSConfigLoader(config_file)
     return loader.load_config()
diff --git a/python/graphscope/gsctl/utils.py b/python/graphscope/gsctl/utils.py
new file mode 100644
index 000000000000..4715b30ad6a4
--- /dev/null
+++ b/python/graphscope/gsctl/utils.py
@@ -0,0 +1,86 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright 2023 Alibaba Group Holding Limited. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+
+import yaml
+
+
+def read_yaml_file(path):
+    """Reads a YAML file and returns it as a python object."""
+    with open(path, "r") as file:
+        return yaml.safe_load(file)
+
+
+def write_yaml_file(data, path):
+    """Writes a python object to the YAML file."""
+    with open(path, "w") as file:
+        yaml.dump(data, file)
+
+
+def is_valid_file_path(path):
+    """Check if the path exists and corresponds to a regular file."""
+    return os.path.exists(path) and os.path.isfile(path)
+
+
+def dict_to_proto_message(values, message):
+    """Transform a python dict object into a protobuf message.
+
+    Args:
+        values (dict): values to be transformed.
+        message (proto): protobuf message, such as graph_def_pb2.GraphMessage()
+    """
+
+    def _parse_list(values, message):
+        if isinstance(values[0], dict):  # value needs to be further parsed
+            for v in values:
+                cmd = message.add()
+                _parse_dict(v, cmd)
+        else:  # value can be set
+            message.extend(values)
+
+    def _parse_dict(values, message):
+        for k, v in values.items():
+            if isinstance(v, dict):  # value needs to be further parsed
+                _parse_dict(v, getattr(message, k))
+            elif isinstance(v, list):
+                _parse_list(v, getattr(message, k))
+            else:  # value can be set
+                try:
+                    setattr(message, k, v)
+                except AttributeError:
+                    # treat as map
+                    message[k] = v
+
+    _parse_dict(values, message)
+    return message
+
+
+def terminal_display(data: list):
+    """Display tabular data in the terminal."""
+
+    # compute the maximum width of each column
+    column_widths = [max(len(str(item)) for item in column) for column in zip(*data)]
+
+    # display the data with aligned columns
+    for row in data:
+        print(
+            " ".join(
+                "{:<{}}".format(item, width) for item, width in zip(row, column_widths)
+            )
+        )
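Reviewer's sketch of how the pieces above fit together: the new `gsctl database create` command reads a YAML schema, converts it to an `interactive_pb2.GraphProto` via `dict_to_proto_message`, and sends it over the new `CreateInteractiveGraph` RPC. The schema path below is a hypothetical placeholder; the calls mirror `create` in `python/graphscope/gsctl/commands/interactive.py` and assume a coordinator has already been connected with `gsctl connect`.

    # Illustrative only; not part of the diff.
    from graphscope.gsctl.client.rpc import get_grpc_client
    from graphscope.gsctl.utils import dict_to_proto_message, read_yaml_file
    from graphscope.proto import interactive_pb2

    graph_def_dict = read_yaml_file("/path/to/graph_schema.yaml")  # hypothetical path
    graph_def = interactive_pb2.GraphProto()
    dict_to_proto_message(graph_def_dict, graph_def)  # dict -> proto, as in `create`

    grpc_client = get_grpc_client()  # resolves the endpoint from ~/.graphscope/config
    response = grpc_client.create_interactive_graph(graph_def)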