From ad04bd3a9f44184c5fca9d502969f1fc60974033 Mon Sep 17 00:00:00 2001 From: Dongze Li Date: Thu, 16 May 2024 16:48:02 +0800 Subject: [PATCH] Code format --- .../gs_flex_coordinator/core/__init__.py | 4 +- .../core/alert/alert_manager.py | 21 +- .../core/alert/alert_rule.py | 6 +- .../core/alert/builtin_rules.py | 6 +- .../core/alert/message_collector.py | 10 +- .../core/client_wrapper.py | 259 +++++-------- .../gs_flex_coordinator/core/config.py | 6 +- .../gs_flex_coordinator/core/datasource.py | 8 +- .../gs_flex_coordinator/core/insight/graph.py | 35 +- .../gs_flex_coordinator/core/insight/groot.py | 362 ------------------ .../gs_flex_coordinator/core/insight/job.py | 23 +- .../core/interactive/hqps.py | 123 ++++-- .../gs_flex_coordinator/core/scheduler.py | 31 +- .../core/stoppable_thread.py | 2 +- flex/coordinator/pyproject.toml | 2 + flex/coordinator/requirements.txt | 2 +- flex/coordinator/setup.cfg | 8 + flex/coordinator/setup.py | 42 ++ python/graphscope/gsctl/commands/__init__.py | 1 + .../gsctl/commands/interactive/glob.py | 12 +- .../gsctl/commands/interactive/graph.py | 12 +- python/graphscope/gsctl/impl/__init__.py | 38 +- python/graphscope/gsctl/impl/datasource.py | 3 +- python/graphscope/gsctl/impl/utils.py | 3 +- python/graphscope/gsctl/utils.py | 8 +- .../graphscope/tests/flex/test_interactive.py | 35 +- 26 files changed, 370 insertions(+), 692 deletions(-) create mode 100644 flex/coordinator/pyproject.toml create mode 100644 flex/coordinator/setup.cfg diff --git a/flex/coordinator/gs_flex_coordinator/core/__init__.py b/flex/coordinator/gs_flex_coordinator/core/__init__.py index 53b18cdb9c65..b065ec847e9d 100644 --- a/flex/coordinator/gs_flex_coordinator/core/__init__.py +++ b/flex/coordinator/gs_flex_coordinator/core/__init__.py @@ -27,5 +27,5 @@ level=logging.INFO, ) -from gs_flex_coordinator.core.client_wrapper import client_wrapper -from gs_flex_coordinator.core.utils import handle_api_exception +from gs_flex_coordinator.core.client_wrapper import client_wrapper # noqa: F401, E402 +from gs_flex_coordinator.core.utils import handle_api_exception # noqa: F401, E402 diff --git a/flex/coordinator/gs_flex_coordinator/core/alert/alert_manager.py b/flex/coordinator/gs_flex_coordinator/core/alert/alert_manager.py index 510f8a899f3d..188813cfcd59 100644 --- a/flex/coordinator/gs_flex_coordinator/core/alert/alert_manager.py +++ b/flex/coordinator/gs_flex_coordinator/core/alert/alert_manager.py @@ -20,17 +20,19 @@ import logging import os import pickle -from typing import List, Union +from typing import List +from typing import Union from gs_flex_coordinator.core.alert.alert_receiver import DingTalkReceiver -from gs_flex_coordinator.core.alert.builtin_rules import \ - init_builtin_alert_rules -from gs_flex_coordinator.core.alert.message_collector import \ - AlertMessageCollector +from gs_flex_coordinator.core.alert.builtin_rules import init_builtin_alert_rules +from gs_flex_coordinator.core.alert.message_collector import AlertMessageCollector from gs_flex_coordinator.core.config import ALERT_WORKSPACE from gs_flex_coordinator.core.scheduler import schedule -from gs_flex_coordinator.core.utils import decode_datetimestr, encode_datetime -from gs_flex_coordinator.models import AlertMessage, AlertReceiver, AlertRule +from gs_flex_coordinator.core.utils import decode_datetimestr +from gs_flex_coordinator.core.utils import encode_datetime +from gs_flex_coordinator.models import AlertMessage +from gs_flex_coordinator.models import AlertReceiver +from gs_flex_coordinator.models import 
AlertRule class AlertManager(object): @@ -146,7 +148,10 @@ def list_alert_messages( return rlt def update_alert_messages( - self, messages: List[AlertMessage], batch_status: str, batch_delete: bool + self, + messages: List[AlertMessage], + batch_status: str, + batch_delete: bool, ): for message in messages: date = decode_datetimestr(message.trigger_time) diff --git a/flex/coordinator/gs_flex_coordinator/core/alert/alert_rule.py b/flex/coordinator/gs_flex_coordinator/core/alert/alert_rule.py index b71fc94a87b7..6c1597e9c3fe 100644 --- a/flex/coordinator/gs_flex_coordinator/core/alert/alert_rule.py +++ b/flex/coordinator/gs_flex_coordinator/core/alert/alert_rule.py @@ -18,10 +18,12 @@ import datetime import logging -from abc import ABCMeta, abstractmethod +from abc import ABCMeta +from abc import abstractmethod from gs_flex_coordinator.core.alert.alert_message import AlertMessage -from gs_flex_coordinator.core.scheduler import cancel_job, schedule +from gs_flex_coordinator.core.scheduler import cancel_job +from gs_flex_coordinator.core.scheduler import schedule class AlertRule(metaclass=ABCMeta): diff --git a/flex/coordinator/gs_flex_coordinator/core/alert/builtin_rules.py b/flex/coordinator/gs_flex_coordinator/core/alert/builtin_rules.py index 6a981aeaf666..eb3516e3fbdc 100644 --- a/flex/coordinator/gs_flex_coordinator/core/alert/builtin_rules.py +++ b/flex/coordinator/gs_flex_coordinator/core/alert/builtin_rules.py @@ -25,9 +25,9 @@ from gs_flex_coordinator.core import client_wrapper from gs_flex_coordinator.core.alert.alert_rule import AlertRule -from gs_flex_coordinator.core.alert.message_collector import \ - AlertMessageCollector -from gs_flex_coordinator.core.config import CLUSTER_TYPE, SOLUTION +from gs_flex_coordinator.core.alert.message_collector import AlertMessageCollector +from gs_flex_coordinator.core.config import CLUSTER_TYPE +from gs_flex_coordinator.core.config import SOLUTION class HighDiskUtilizationAlert(AlertRule): diff --git a/flex/coordinator/gs_flex_coordinator/core/alert/message_collector.py b/flex/coordinator/gs_flex_coordinator/core/alert/message_collector.py index 14547bd65631..c8246adf1c21 100644 --- a/flex/coordinator/gs_flex_coordinator/core/alert/message_collector.py +++ b/flex/coordinator/gs_flex_coordinator/core/alert/message_collector.py @@ -23,7 +23,8 @@ from gs_flex_coordinator.core.alert.alert_message import AlertMessage from gs_flex_coordinator.core.config import ALERT_WORKSPACE -from gs_flex_coordinator.core.scheduler import cancel_job, schedule +from gs_flex_coordinator.core.scheduler import cancel_job +from gs_flex_coordinator.core.scheduler import schedule from gs_flex_coordinator.core.utils import decode_datetimestr @@ -59,7 +60,9 @@ def _pickle_messages_impl(self): self.dump_to_disk() except Exception as e: logging.warn( - "Failed to dump alert message on date %s: %s", str(self._date), str(e) + "Failed to dump alert message on date %s: %s", + str(self._date), + str(e), ) def _try_to_recover_from_disk(self): @@ -109,7 +112,8 @@ def clean(self): cancel_job(self._pickle_messages_job, delete_scheduler=True) self._pickle_messages_job = None logging.info( - "%s: current alert message collector cleaned", str(self._date) + "%s: current alert message collector cleaned", + str(self._date), ) except: # noqa: E722, B110 pass diff --git a/flex/coordinator/gs_flex_coordinator/core/client_wrapper.py b/flex/coordinator/gs_flex_coordinator/core/client_wrapper.py index 9828f917de57..a756d6b8adc6 100644 --- 
a/flex/coordinator/gs_flex_coordinator/core/client_wrapper.py +++ b/flex/coordinator/gs_flex_coordinator/core/client_wrapper.py @@ -16,65 +16,38 @@ # limitations under the License. # -# import datetime import itertools -import logging import os -# import pickle -# import socket import threading -from typing import List, Union - -# import psutil - -from gs_flex_coordinator.core.config import ( - CLUSTER_TYPE, - CREATION_TIME, - DATASET_WORKSPACE, - INSTANCE_NAME, - SOLUTION, - WORKSPACE, -) +from typing import List + +from gs_flex_coordinator.core.config import CLUSTER_TYPE +from gs_flex_coordinator.core.config import CREATION_TIME +from gs_flex_coordinator.core.config import DATASET_WORKSPACE +from gs_flex_coordinator.core.config import INSTANCE_NAME +from gs_flex_coordinator.core.config import SOLUTION +from gs_flex_coordinator.core.datasource import DataSourceManager from gs_flex_coordinator.core.insight import init_groot_client from gs_flex_coordinator.core.interactive import init_hqps_client -from gs_flex_coordinator.core.datasource import DataSourceManager -# from gs_flex_coordinator.core.scheduler import schedule -from gs_flex_coordinator.core.utils import ( - # GraphInfo, - # decode_datetimestr, - encode_datetime, - # get_current_time, -) -from gs_flex_coordinator.models import ( - RunningDeploymentInfo, - CreateGraphRequest, - CreateGraphResponse, - GetGraphSchemaResponse, - CreateProcedureRequest, - UpdateProcedureRequest, - CreateProcedureResponse, - GetProcedureResponse, - UploadFileResponse, - DataloadingJobConfig, - CreateDataloadingJobResponse, - CreateEdgeType, - CreateVertexType, - # EdgeDataSource, - # EdgeType, - GetGraphResponse, - # GrootDataloadingJobConfig, - # GrootGraph, - # GrootSchema, - JobStatus, - # ModelSchema, - # NodeStatus, - # Procedure, - SchemaMapping, - ServiceStatus, - StartServiceRequest, - # VertexDataSource, - # VertexType, -) +from gs_flex_coordinator.core.utils import encode_datetime +from gs_flex_coordinator.models import CreateDataloadingJobResponse +from gs_flex_coordinator.models import CreateEdgeType +from gs_flex_coordinator.models import CreateGraphRequest +from gs_flex_coordinator.models import CreateGraphResponse +from gs_flex_coordinator.models import CreateProcedureRequest +from gs_flex_coordinator.models import CreateProcedureResponse +from gs_flex_coordinator.models import CreateVertexType +from gs_flex_coordinator.models import DataloadingJobConfig +from gs_flex_coordinator.models import GetGraphResponse +from gs_flex_coordinator.models import GetGraphSchemaResponse +from gs_flex_coordinator.models import GetProcedureResponse +from gs_flex_coordinator.models import JobStatus +from gs_flex_coordinator.models import RunningDeploymentInfo +from gs_flex_coordinator.models import SchemaMapping +from gs_flex_coordinator.models import ServiceStatus +from gs_flex_coordinator.models import StartServiceRequest +from gs_flex_coordinator.models import UpdateProcedureRequest +from gs_flex_coordinator.models import UploadFileResponse from gs_flex_coordinator.version import __version__ @@ -88,49 +61,6 @@ def __init__(self): self._client = self._initialize_client() # data source management self._datasource_manager = DataSourceManager() - # graphs info - # self._graphs_info = {} - # pickle path - # self._pickle_path = os.path.join(WORKSPACE, "graphs_info.pickle") - # # recover - # self._try_to_recover_from_disk() - # # sync graphs info every 60s - # self._sync_graphs_info_job = ( - # schedule.every(60) - # .seconds.do(self._sync_graphs_info_impl) - # 
.tag("sync", "graphs info") - # ) - - # def _try_to_recover_from_disk(self): - # try: - # if os.path.exists(self._pickle_path): - # logging.info("Recover graphs info from file %s", self._pickle_path) - # with open(self._pickle_path, "rb") as f: - # self._graphs_info = pickle.load(f) - # except Exception as e: - # logging.warn("Failed to recover graphs info: %s", str(e)) - # # set default graph info - # self._sync_graphs_info_impl() - - # def _pickle_graphs_info_impl(self): - # try: - # with open(self._pickle_path, "wb") as f: - # pickle.dump(self._graphs_info, f) - # except Exception as e: - # logging.warn("Failed to dump graphs info: %s", str(e)) - - # def _sync_graphs_info_impl(self): - # if SOLUTION == "INTERACTIVE": - # graphs = self.list_graphs() - # elif SOLUTION == "GRAPHSCOPE_INSIGHT": - # graphs = self.list_groot_graph() - # rlts = {} - # for g in graphs: - # if g.name in self._graphs_info: - # rlts[g.name] = self._graphs_info[g.name] - # else: - # rlts[g.name] = GraphInfo(name=g.name, creation_time=CREATION_TIME) - # self._graphs_info = rlts def _initialize_client(self): service_initializer = { @@ -146,9 +76,14 @@ def list_graphs(self) -> List[GetGraphResponse]: graphs = self._client.list_graphs() # fix ValueError: Invalid value for `long_text`, must not be `None` for g in graphs: - for item in itertools.chain(g["schema"]["vertex_types"], g["schema"]["edge_types"]): + for item in itertools.chain( + g["schema"]["vertex_types"], g["schema"]["edge_types"] + ): for p in item["properties"]: - if "string" in p["property_type"] and "long_text" in p["property_type"]["string"]: + if ( + "string" in p["property_type"] + and "long_text" in p["property_type"]["string"] + ): p["property_type"]["string"]["long_text"] = "" # transfer rlts = [GetGraphResponse.from_dict(g) for g in graphs] @@ -159,21 +94,13 @@ def get_schema_by_id(self, graph_id: str) -> GetGraphSchemaResponse: # fix ValueError: Invalid value for `long_text`, must not be `None` for item in itertools.chain(schema["vertex_types"], schema["edge_types"]): for p in item["properties"]: - if "string" in p["property_type"] and "long_text" in p["property_type"]["string"]: + if ( + "string" in p["property_type"] + and "long_text" in p["property_type"]["string"] + ): p["property_type"]["string"]["long_text"] = "" return GetGraphSchemaResponse.from_dict(schema) - # def get_groot_schema(self, graph_name: str) -> GrootSchema: - # return GrootSchema.from_dict(self._client.get_groot_schema(graph_name)) - - # def import_groot_schema(self, graph_name: str, schema: GrootSchema) -> str: - # rlt = self._client.import_groot_schema(graph_name, schema.to_dict()) - # self._graphs_info[INSTANCE_NAME].update_time = get_current_time() - # return rlt - - # def get_current_graph(self) -> GrootGraph: - # return self._client.get_current_graph() - def create_graph(self, graph: CreateGraphRequest) -> CreateGraphResponse: # there are some tricks here, since schema is a keyword of openapi # specification, so it will be converted into the _schema field. 
@@ -187,7 +114,7 @@ def create_vertex_type(self, graph_id: str, vtype: CreateVertexType) -> str: self._client.create_vertex_type(graph_id, vtype.to_dict()) return "Create vertex type successfully" # if SOLUTION == "GRAPHSCOPE_INSIGHT": - # graph_name = INSTANCE_NAME + # graph_name = INSTANCE_NAME # vtype_dict = vtype.to_dict() # rlt = self._client.create_vertex_type(graph_name, vtype_dict) # # self._graphs_info[graph_name].update_time = get_current_time() @@ -196,23 +123,12 @@ def create_vertex_type(self, graph_id: str, vtype: CreateVertexType) -> str: def create_edge_type(self, graph_id: str, etype: CreateEdgeType) -> str: self._client.create_edge_type(graph_id, etype.to_dict()) return "Create edge type successfully" - # if SOLUTION == "GRAPHSCOPE_INSIGHT": - # graph_name = INSTANCE_NAME - # etype_dict = etype.to_dict() - # rlt = self._client.create_edge_type(graph_name, etype_dict) - # self._graphs_info[graph_name].update_time = get_current_time() - # return rlt def delete_vertex_type_by_name(self, graph_id: str, type_name: str) -> str: self._client.delete_vertex_type_by_name(graph_id, type_name) # remove data source mapping self.unbind_vertex_datasource(graph_id, type_name) return f"Delete vertex type {type_name} successfully" - # if SOLUTION == "GRAPHSCOPE_INSIGHT": - # graph_name = INSTANCE_NAME - # rlt = self._client.delete_vertex_type(graph_name, vertex_type) - # self._graphs_info[graph_name].update_time = get_current_time() - # return rlt def delete_edge_type_by_name( self, @@ -225,15 +141,11 @@ def delete_edge_type_by_name( graph_id, type_name, source_vertex_type, destination_vertex_type ) # remove data source mapping - self.unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_vertex_type) - return f"Delete edge type ({source_vertex_type})->[{type_name}]->({destination_vertex_type}) successfully" - # if SOLUTION == "GRAPHSCOPE_INSIGHT": - # graph_name = INSTANCE_NAME - # rlt = self._client.delete_edge_type( - # graph_name, edge_type, source_vertex_type, destination_vertex_type - # ) - # self._graphs_info[graph_name].update_time = get_current_time() - # return rlt + self.unbind_edge_datasource( + graph_id, type_name, source_vertex_type, destination_vertex_type + ) + elabel = f"({source_vertex_type})->[{type_name}]->({destination_vertex_type})" + return f"Delete edge type {elabel} successfully" def delete_graph_by_id(self, graph_id: str) -> str: rlt = self._client.delete_graph_by_id(graph_id) @@ -243,13 +155,20 @@ def delete_graph_by_id(self, graph_id: str) -> str: def get_graph_by_id(self, graph_id: str) -> GetGraphResponse: g = self._client.get_graph_by_id(graph_id) - for item in itertools.chain(g["schema"]["vertex_types"], g["schema"]["edge_types"]): + for item in itertools.chain( + g["schema"]["vertex_types"], g["schema"]["edge_types"] + ): for p in item["properties"]: - if "string" in p["property_type"] and "long_text" in p["property_type"]["string"]: + if ( + "string" in p["property_type"] + and "long_text" in p["property_type"]["string"] + ): p["property_type"]["string"]["long_text"] = "" return GetGraphResponse.from_dict(g) - def create_procedure(self, graph_id: str, procedure: CreateProcedureRequest) -> CreateProcedureResponse: + def create_procedure( + self, graph_id: str, procedure: CreateProcedureRequest + ) -> CreateProcedureResponse: procedure_dict = procedure.to_dict() response = self._client.create_procedure(graph_id, procedure_dict) return CreateProcedureResponse.from_dict(response) @@ -261,32 +180,26 @@ def list_procedures(self, graph_id: 
str) -> List[GetProcedureResponse]: return rlt def update_procedure_by_id( - self, graph_id: str, procedure_id: str, procedure: UpdateProcedureRequest + self, + graph_id: str, + procedure_id: str, + procedure: UpdateProcedureRequest, ) -> str: procedure_dict = procedure.to_dict() - return self._client.update_procedure_by_id(graph_id, procedure_id, procedure_dict) + return self._client.update_procedure_by_id( + graph_id, procedure_id, procedure_dict + ) def delete_procedure_by_id(self, graph_id: str, procedure_id: str) -> str: return self._client.delete_procedure_by_id(graph_id, procedure_id) - def get_procedure_by_id(self, graph_id: str, procedure_id: str) -> GetProcedureResponse: + def get_procedure_by_id( + self, graph_id: str, procedure_id: str + ) -> GetProcedureResponse: return GetProcedureResponse.from_dict( self._client.get_procedure_by_id(graph_id, procedure_id) ) - # def get_node_status(self) -> List[NodeStatus]: - # rlt = [] - # if CLUSTER_TYPE == "HOSTS": - # disk_info = psutil.disk_usage("/") - # status = { - # "node": socket.gethostname(), - # "cpu_usage": psutil.cpu_percent(), - # "memory_usage": psutil.virtual_memory().percent, - # "disk_usage": float(f"{disk_info.used / disk_info.total * 100:.2f}"), - # } - # rlt.append(NodeStatus.from_dict(status)) - # return rlt - def get_deployment_info(self) -> RunningDeploymentInfo: info = { "instance_name": INSTANCE_NAME, @@ -304,7 +217,10 @@ def get_service_status(self) -> ServiceStatus: schema = status["graph"]["schema"] for item in itertools.chain(schema["vertex_types"], schema["edge_types"]): for p in item["properties"]: - if "string" in p["property_type"] and "long_text" in p["property_type"]["string"]: + if ( + "string" in p["property_type"] + and "long_text" in p["property_type"]["string"] + ): p["property_type"]["string"]["long_text"] = "" return ServiceStatus.from_dict(status) @@ -330,11 +246,15 @@ def submit_dataloading_job( self, graph_id: str, dataloading_job_config: DataloadingJobConfig ) -> str: config = dataloading_job_config.to_dict() - job_id = self._client.submit_dataloading_job(graph_id, config, self._datasource_manager) + job_id = self._client.submit_dataloading_job( + graph_id, config, self._datasource_manager + ) return CreateDataloadingJobResponse.from_dict({"job_id": job_id}) def get_dataloading_job_config(self, graph_id: str) -> DataloadingJobConfig: - return DataloadingJobConfig.from_dict(self._client.get_dataloading_job_config(graph_id)) + return DataloadingJobConfig.from_dict( + self._client.get_dataloading_job_config(graph_id) + ) def upload_file(self, filestorage) -> str: if CLUSTER_TYPE == "HOSTS": @@ -342,32 +262,27 @@ def upload_file(self, filestorage) -> str: filestorage.save(filepath) return UploadFileResponse.from_dict({"file_path": filepath}) - # def create_groot_dataloading_job( - # self, graph_name: str, job_config: GrootDataloadingJobConfig - # ) -> str: - # job_id = self._client.create_groot_dataloading_job( - # graph_name, job_config.to_dict() - # ) - # return job_id - - # def list_groot_graph(self) -> List[GrootGraph]: - # graphs = self._client.list_groot_graph() - # # transfer - # rlts = [GrootGraph.from_dict(g) for g in graphs] - # return rlts - - def bind_datasource_in_batch(self, graph_id: str, schema_mapping: SchemaMapping) -> str: + def bind_datasource_in_batch( + self, graph_id: str, schema_mapping: SchemaMapping + ) -> str: # there are some tricks here, since property is a keyword of openapi # specification, so it will be converted into the _property field. 
schema_mapping_dict = schema_mapping.to_dict() for mapping in itertools.chain( - schema_mapping_dict["vertex_mappings"], schema_mapping_dict["edge_mappings"] + schema_mapping_dict["vertex_mappings"], + schema_mapping_dict["edge_mappings"], ): for column_mapping in mapping["column_mappings"]: if "_property" in column_mapping: column_mapping["property"] = column_mapping.pop("_property") - if "source_vertex_mappings" in mapping and "destination_vertex_mappings" in mapping: - for column_mapping in itertools.chain(mapping["source_vertex_mappings"], mapping["destination_vertex_mappings"]): + if ( + "source_vertex_mappings" in mapping + and "destination_vertex_mappings" in mapping + ): + for column_mapping in itertools.chain( + mapping["source_vertex_mappings"], + mapping["destination_vertex_mappings"], + ): if "_property" in column_mapping: column_mapping["property"] = column_mapping.pop("_property") self._datasource_manager.bind_datasource_in_batch(graph_id, schema_mapping_dict) diff --git a/flex/coordinator/gs_flex_coordinator/core/config.py b/flex/coordinator/gs_flex_coordinator/core/config.py index 454d68598a22..8f4bd52cabcf 100644 --- a/flex/coordinator/gs_flex_coordinator/core/config.py +++ b/flex/coordinator/gs_flex_coordinator/core/config.py @@ -17,7 +17,6 @@ # import datetime -import logging import os import tempfile @@ -51,8 +50,9 @@ def str_to_bool(s): os.makedirs(DATASET_WORKSPACE, exist_ok=True) -# we use the solution encompasses the various applications and use cases of the -# product across different industries and business scenarios, e.g. "INTERACTIVE", +# we use the solution encompasses the various applications +# and use cases of the product across different industries +# and business scenarios, e.g. "INTERACTIVE", # "GRAPHSCOPE INSIGHT". 
SOLUTION = os.environ["SOLUTION"] diff --git a/flex/coordinator/gs_flex_coordinator/core/datasource.py b/flex/coordinator/gs_flex_coordinator/core/datasource.py index 2869b22199bd..2da5c7e01256 100644 --- a/flex/coordinator/gs_flex_coordinator/core/datasource.py +++ b/flex/coordinator/gs_flex_coordinator/core/datasource.py @@ -37,7 +37,8 @@ def try_to_recover_from_disk(self): try: if os.path.exists(self._pickle_path): logging.info( - "Recover data source mapping from file %s", self._pickle_path + "Recover data source mapping from file %s", + self._pickle_path, ) with open(self._pickle_path, "rb") as f: self._datasource_mapping = pickle.load(f) @@ -52,7 +53,10 @@ def dump_to_disk(self): logging.warn("Failed to dump data source mapping: %s", str(e)) def get_edge_full_label( - self, type_name: str, source_vertex_type: str, destination_vertex_type: str + self, + type_name: str, + source_vertex_type: str, + destination_vertex_type: str, ) -> str: return f"{source_vertex_type}_{type_name}_{destination_vertex_type}" diff --git a/flex/coordinator/gs_flex_coordinator/core/insight/graph.py b/flex/coordinator/gs_flex_coordinator/core/insight/graph.py index 54f2b087afa7..882e96899026 100644 --- a/flex/coordinator/gs_flex_coordinator/core/insight/graph.py +++ b/flex/coordinator/gs_flex_coordinator/core/insight/graph.py @@ -20,24 +20,31 @@ import logging import os import time -from abc import ABCMeta, abstractmethod +from abc import ABCMeta +from abc import abstractmethod import graphscope from dateutil import tz -from graphscope.deploy.kubernetes.utils import (get_service_endpoints, - resolve_api_client) +from graphscope.deploy.kubernetes.utils import get_service_endpoints +from graphscope.deploy.kubernetes.utils import resolve_api_client from gremlin_python.driver.client import Client from kubernetes import client as kube_client from kubernetes import config as kube_config -from gs_flex_coordinator.core.config import (CLUSTER_TYPE, CREATION_TIME, - ENABLE_DNS, GROOT_GREMLIN_PORT, - GROOT_GRPC_PORT, GROOT_PASSWORD, - GROOT_USERNAME, INSTANCE_NAME, - NAMESPACE, WORKSPACE) +from gs_flex_coordinator.core.config import CLUSTER_TYPE +from gs_flex_coordinator.core.config import CREATION_TIME +from gs_flex_coordinator.core.config import ENABLE_DNS +from gs_flex_coordinator.core.config import GROOT_GREMLIN_PORT +from gs_flex_coordinator.core.config import GROOT_GRPC_PORT +from gs_flex_coordinator.core.config import GROOT_PASSWORD +from gs_flex_coordinator.core.config import GROOT_USERNAME +from gs_flex_coordinator.core.config import INSTANCE_NAME +from gs_flex_coordinator.core.config import NAMESPACE +from gs_flex_coordinator.core.config import WORKSPACE from gs_flex_coordinator.core.scheduler import schedule -from gs_flex_coordinator.core.utils import (data_type_to_groot, - encode_datetime, get_internal_ip) +from gs_flex_coordinator.core.utils import data_type_to_groot +from gs_flex_coordinator.core.utils import encode_datetime +from gs_flex_coordinator.core.utils import get_internal_ip from gs_flex_coordinator.version import __version__ @@ -260,7 +267,7 @@ def delete_vertex_type(self, graph_name: str, vertex_type: str): or vertex_type == relation["dst_label"] ): raise RuntimeError( - "Can not delete '{0}' type, cause exists in edge type '{1}'".format( + "Can not delete '{0}' type, cause exists in edge '{1}'".format( vertex_type, edge_schema["label"] ), ) @@ -295,7 +302,7 @@ def get_groot_graph_from_local(): client.submit( "g.with('evaluationTimeout', 5000).V().limit(1)" ).all().result() - except Exception as 
e: # noqa: B110 + except Exception: # noqa: B110 pass else: break @@ -316,8 +323,8 @@ def get_groot_graph_from_k8s(): app_api = kube_client.AppsV1Api(api_client) # frontend statefulset and service name name = "{0}-graphscope-store-frontend".format(INSTANCE_NAME) - respoonse = app_api.read_namespaced_stateful_set(name, NAMESPACE) - labels = response.metadata.labels + response = app_api.read_namespaced_stateful_set(name, NAMESPACE) + app_api.read_namespaced_stateful_set(name, NAMESPACE) # creation time creation_time = response.metadata.creation_timestamp.astimezone( tz.tzlocal() diff --git a/flex/coordinator/gs_flex_coordinator/core/insight/groot.py b/flex/coordinator/gs_flex_coordinator/core/insight/groot.py index 4e63ccf5a058..07e36be36c75 100644 --- a/flex/coordinator/gs_flex_coordinator/core/insight/groot.py +++ b/flex/coordinator/gs_flex_coordinator/core/insight/groot.py @@ -16,374 +16,12 @@ # limitations under the License. # -# import itertools -# import logging -# import os -# import pickle -# from typing import List - -# from gs_flex_coordinator.core.config import (CLUSTER_TYPE, INSTANCE_NAME, - # WORKSPACE) -# from gs_flex_coordinator.core.insight.graph import get_groot_graph -# from gs_flex_coordinator.core.insight.job import DataloadingJobScheduler -# from gs_flex_coordinator.core.scheduler import schedule -# from gs_flex_coordinator.models import JobStatus - class GrootClient(object): """Class used to interact with Groot""" def __init__(self): pass - # self._graph = get_groot_graph() - # # workspace - # self._workspace = os.path.join(WORKSPACE, "groot") - # os.makedirs(self._workspace, exist_ok=True) - # # data source - # self._data_source = {"vertices_datasource": {}, "edges_datasource": {}} - # # pickle path - # self._datasource_pickle_path = os.path.join( - # self._workspace, "datasource.pickle" - # ) - # # job - # self._job_scheduler = {} - # # job status - # self._job_status = {} - # # pickle path - # self._job_status_pickle_path = os.path.join( - # self._workspace, "job_status.pickle" - # ) - # # recover - # self._try_to_recover_from_disk() - # # dump job status to disk every 10s - # self._pickle_job_status_job = ( - # schedule.every(10) - # .seconds.do(self._pickle_job_status_impl) - # .tag("pickle", "job status") - # ) - - # def _try_to_recover_from_disk(self): - # try: - # if os.path.exists(self._datasource_pickle_path): - # logging.info( - # "Recover data source from file %s", self._datasource_pickle_path - # ) - # with open(self._datasource_pickle_path, "rb") as f: - # self._data_source = pickle.load(f) - # except Exception as e: - # logging.warn("Failed to recover data source: %s", str(e)) - - # try: - # if os.path.exists(self._job_status_pickle_path): - # logging.info( - # "Recover job status from file %s", self._job_status_pickle_path - # ) - # with open(self._job_status_pickle_path, "rb") as f: - # data = pickle.load(f) - # for jobid, status in data.items(): - # self._job_status[jobid] = JobStatus.from_dict(status) - # except Exception as e: - # logging.warn("Failed to recover job status: %s", str(e)) - - # def _pickle_datasource_impl(self): - # try: - # with open(self._datasource_pickle_path, "wb") as f: - # pickle.dump(self._data_source, f) - # except Exception as e: - # logging.warn("Failed to dump data source: %s", str(e)) - - # def _pickle_job_status_impl(self): - # try: - # rlt = {} - # for jobid, status in self._job_status.items(): - # rlt[jobid] = status.to_dict() - # with open(self._job_status_pickle_path, "wb") as f: - # pickle.dump(rlt, f) - # except 
Exception as e: - # logging.warn("Failed to dump job status: %s", str(e)) - - # def get_edge_full_label( - # self, type_name: str, source_vertex_type: str, destination_vertex_type: str - # ) -> str: - # return f"{source_vertex_type}_{type_name}_{destination_vertex_type}" - - # def get_current_graph(self): - # return self._graph - - # def list_groot_graph(self) -> list: - # rlts = [self._graph.to_dict()] - # return rlts - - # def create_vertex_type(self, graph_name: str, vtype_dict: dict) -> str: - # return self._graph.create_vertex_type(vtype_dict) - - # def create_edge_type(self, graph_name: str, etype_dict: dict) -> str: - # return self._graph.create_edge_type(etype_dict) - - # def delete_vertex_type(self, graph_name: str, vertex_type: str) -> str: - # rlt = self._graph.delete_vertex_type(graph_name, vertex_type) - # # unbind data source - # if vertex_type in self._data_source["vertices_datasource"]: - # del self._data_source["vertices_datasource"][vertex_type] - # self._pickle_datasource_impl() - # return rlt - - # def delete_edge_type( - # self, - # graph_name: str, - # edge_type: str, - # source_vertex_type: str, - # destination_vertex_type: str, - # ) -> str: - # rlt = self._graph.delete_edge_type( - # graph_name, edge_type, source_vertex_type, destination_vertex_type - # ) - # # unbind data source - # edge_label = self.get_edge_full_label( - # edge_type, source_vertex_type, destination_vertex_type - # ) - # if edge_label in self._data_source["edges_datasource"]: - # del self._data_source["edges_datasource"][edge_label] - # self._pickle_datasource_impl() - # return rlt - - # def get_groot_schema(self, graph_name: str) -> dict: - # return self._graph.schema - - # def import_groot_schema(self, graph_name: str, schema: dict) -> str: - # def _data_type_to_groot(dt): - # if dt == "DT_DOUBLE": - # return "DOUBLE" - # elif dt == "DT_SIGNED_INT64": - # return "LONG" - # elif dt == "DT_STRING": - # return "STRING" - # else: - # return dt - - # # transfer to groot data type - # for item in itertools.chain(schema["vertices"], schema["edges"]): - # for p in item["properties"]: - # p["type"] = _data_type_to_groot(p["type"]) - # return self._graph.import_schema(schema) - - # def list_jobs(self) -> List[dict]: - # rlt = [] - # for jobid, status in self._job_status.items(): - # rlt.append(status.to_dict()) - # return rlt - - # def import_datasource(self, graph_name: str, data_source: dict) -> str: - # for vertex_data_source in data_source["vertices_datasource"]: - # self._data_source["vertices_datasource"][ - # vertex_data_source["type_name"] - # ] = vertex_data_source - # for edge_data_source in data_source["edges_datasource"]: - # edge_label = self.get_edge_full_label( - # edge_data_source["type_name"], - # edge_data_source["source_vertex"], - # edge_data_source["destination_vertex"], - # ) - # self._data_source["edges_datasource"][edge_label] = edge_data_source - # self._pickle_datasource_impl() - - # def get_service_status(self) -> dict: - # return { - # "status": "running", - # "graph_name": self._graph.name, - # "sdk_endpoints": { - # "gremlin": self._graph.gremlin_interface["gremlin_endpoint"], - # "grpc": self._graph.gremlin_interface["grpc_endpoint"], - # }, - # } - - # def get_datasource(self, graph_name: str) -> dict: - # rlts = {"vertices_datasource": [], "edges_datasource": []} - # for _, v in self._data_source["vertices_datasource"].items(): - # rlts["vertices_datasource"].append(v) - # for _, e in self._data_source["edges_datasource"].items(): - # rlts["edges_datasource"].append(e) - 
# return rlts - - # def bind_vertex_datasource(self, graph_name: str, vertex_data_source: dict) -> str: - # self._data_source["vertices_datasource"][ - # vertex_data_source["type_name"] - # ] = vertex_data_source - # self._pickle_datasource_impl() - # return "Bind vertex data source successfully" - - # def bind_edge_datasource(self, graph_name: str, edge_data_source: dict) -> str: - # edge_label = self.get_edge_full_label( - # edge_data_source["type_name"], - # edge_data_source["source_vertex"], - # edge_data_source["destination_vertex"], - # ) - # self._data_source["edges_datasource"][edge_label] = edge_data_source - # self._pickle_datasource_impl() - # return "Bind edge data source successfully" - - # def get_vertex_datasource(self, graph_name: str, vertex_type: str) -> dict: - # if vertex_type not in self._data_source["vertices_datasource"]: - # raise RuntimeError( - # f"Vertex type {vertex_type} does not bind any data source" - # ) - # return self._data_source["vertices_datasource"][vertex_type] - - # def get_edge_datasource( - # self, - # graph_name: str, - # edge_type: str, - # source_vertex_type: str, - # destination_vertex_type: str, - # ) -> dict: - # edge_label = self.get_edge_full_label( - # edge_type, source_vertex_type, destination_vertex_type - # ) - # if edge_label not in self._data_source["edges_datasource"]: - # raise RuntimeError(f"Edge type {edge_label} does not bind any data source") - # return self._data_source["edges_datasource"][edge_label] - - # def unbind_vertex_datasource(self, graph_name: str, vertex_type: str) -> str: - # # check - # vertex_type_exists = False - # schema = self._graph.schema - # for v in schema["vertices"]: - # if vertex_type == v["label"]: - # vertex_type_exists = True - # break - # if not vertex_type_exists: - # raise RuntimeError(f"Vertex type {vertex_type} not exists") - # if vertex_type in self._data_source["vertices_datasource"]: - # del self._data_source["vertices_datasource"][vertex_type] - # self._pickle_datasource_impl() - # return "unbind data source successfully" - - # def unbind_edge_datasource( - # self, - # graph_name: str, - # edge_type: str, - # source_vertex_type: str, - # destination_vertex_type: str, - # ) -> str: - # # check - # edge_type_exists = False - # schema = self._graph.schema - # for e in schema["edges"]: - # for relation in e["relations"]: - # if ( - # edge_type == e["label"] - # and source_vertex_type == relation["src_label"] - # and destination_vertex_type == relation["dst_label"] - # ): - # edge_type_exists = True - # break - # if not edge_type_exists: - # raise RuntimeError( - # f"Edge type ({source_vertex_type})-[{edge_type}]->({destination_vertex_type}) not exists" - # ) - # edge_label = self.get_edge_full_label( - # edge_type, source_vertex_type, destination_vertex_type - # ) - # if edge_label in self._data_source["edges_datasource"]: - # del self._data_source["edges_datasource"][edge_label] - # self._pickle_datasource_impl() - # return "unbind data source successfully" - - # def create_groot_dataloading_job(self, graph_name: str, job_config: dict) -> str: - # dataloading_job_scheduler = DataloadingJobScheduler( - # job_config=job_config, - # data_source=self._data_source, - # job_scheduler=self._job_scheduler, - # job_status=self._job_status, - # graph=self._graph, - # ) - # return dataloading_job_scheduler.schedulerid - - # def get_job_by_id(self, job_id: str) -> dict: - # if job_id not in self._job_status: - # raise RuntimeError(f"Job {job_id} not found") - # return self._job_status[job_id].to_dict() - - 
# def delete_job_by_id(self, job_id: str) -> str: - # if job_id not in self._job_status: - # raise RuntimeError(f"Job {job_id} not found") - # if job_id in self._job_scheduler: - # # we don't have some processes in case of restart the coordinator - # # some processes will not exist if the coordinator is restart - # self._job_scheduler[job_id].cancel() - # return f"Submit cancellation job successfully" - - # def get_dataloading_config(self, graph_name: str) -> dict: - # config = { - # "graph": INSTANCE_NAME, - # "loading_config": {}, - # "vertex_mappings": [], - # "edge_mappings": [], - # } - # # transfer - # for vtype, ds in self._data_source["vertices_datasource"].items(): - # column_mappings = [] - # if ds["property_mapping"] is not None: - # for index, property_name in ds["property_mapping"].items(): - # column_mappings.append( - # { - # "column": { - # "index": int(index), - # }, - # "property": property_name, - # } - # ) - # config["vertex_mappings"].append( - # { - # "type_name": vtype, - # "inputs": [ds["location"]], - # "column_mappings": column_mappings, - # } - # ) - # for etype, ds in self._data_source["edges_datasource"].items(): - # source_vertex_mappings = [] - # for index, _ in ds["source_pk_column_map"].items(): - # source_vertex_mappings.append( - # { - # "column": { - # "index": int(index), - # } - # } - # ) - # destination_vertex_mappings = [] - # for index, _ in ds["destination_pk_column_map"].items(): - # destination_vertex_mappings.append( - # { - # "column": { - # "index": int(index), - # } - # } - # ) - # column_mappings = [] - # if ds["property_mapping"] is not None: - # for index, property_name in ds["property_mapping"].items(): - # column_mappings.append( - # { - # "column": { - # "index": int(index), - # }, - # "property": property_name, - # } - # ) - # config["edge_mappings"].append( - # { - # "type_triplet": { - # "edge": ds["type_name"], - # "source_vertex": ds["source_vertex"], - # "destination_vertex": ds["destination_vertex"], - # }, - # "inputs": [ds["location"]], - # "source_vertex_mappings": source_vertex_mappings, - # "destination_vertex_mappings": destination_vertex_mappings, - # "column_mappings": column_mappings, - # } - # ) - # return config def init_groot_client(): diff --git a/flex/coordinator/gs_flex_coordinator/core/insight/job.py b/flex/coordinator/gs_flex_coordinator/core/insight/job.py index de4ced454095..a69555fc09fb 100644 --- a/flex/coordinator/gs_flex_coordinator/core/insight/job.py +++ b/flex/coordinator/gs_flex_coordinator/core/insight/job.py @@ -17,10 +17,12 @@ # import pandas as pd -from graphscope.framework.record import EdgeRecordKey, VertexRecordKey +from graphscope.framework.record import EdgeRecordKey +from graphscope.framework.record import VertexRecordKey from gs_flex_coordinator.core.scheduler import Scheduler -from gs_flex_coordinator.core.utils import encode_datetime, get_current_time +from gs_flex_coordinator.core.utils import encode_datetime +from gs_flex_coordinator.core.utils import get_current_time from gs_flex_coordinator.models import JobStatus @@ -56,7 +58,10 @@ def _construct_detailed_info(self): return detail def get_edge_full_label( - self, type_name: str, source_vertex_type: str, destination_vertex_type: str + self, + type_name: str, + source_vertex_type: str, + destination_vertex_type: str, ) -> str: return f"{source_vertex_type}_{type_name}_{destination_vertex_type}" @@ -66,7 +71,9 @@ def _import_data_from_local_file(self): for vlabel in self._job_config["vertices"]: primary_key = 
self._graph.get_vertex_primary_key(vlabel) datasource = self._data_source["vertices_datasource"][vlabel] - data = pd.read_csv(datasource["location"], sep=",|\|", engine="python") + data = pd.read_csv( + datasource["location"], sep=",|\|", engine="python" + ) # noqa: W605 for record in data.itertuples(index=False): primary_key_dict = {} property_mapping = {} @@ -76,7 +83,10 @@ def _import_data_from_local_file(self): else: property_mapping[v] = record[int(k)] vertices.append( - [VertexRecordKey(vlabel, primary_key_dict), property_mapping] + [ + VertexRecordKey(vlabel, primary_key_dict), + property_mapping, + ] ) edges = [] for e in self._job_config["edges"]: @@ -101,7 +111,8 @@ def _import_data_from_local_file(self): e["type_name"], VertexRecordKey(e["source_vertex"], source_pk_column_map), VertexRecordKey( - e["destination_vertex"], destination_pk_column_map + e["destination_vertex"], + destination_pk_column_map, ), ), property_mapping, diff --git a/flex/coordinator/gs_flex_coordinator/core/interactive/hqps.py b/flex/coordinator/gs_flex_coordinator/core/interactive/hqps.py index bbdbc2c4ac9c..993036ec7cf6 100644 --- a/flex/coordinator/gs_flex_coordinator/core/interactive/hqps.py +++ b/flex/coordinator/gs_flex_coordinator/core/interactive/hqps.py @@ -21,27 +21,24 @@ import os import pickle import time -import requests -from typing import List, Union +from typing import List +from typing import Union import interactive_sdk.openapi +import requests from interactive_sdk.openapi import CreateGraphRequest from interactive_sdk.openapi import CreateProcedureRequest -from interactive_sdk.openapi import UpdateProcedureRequest -from interactive_sdk.openapi import StartServiceRequest from interactive_sdk.openapi import SchemaMapping +from interactive_sdk.openapi import StartServiceRequest +from interactive_sdk.openapi import UpdateProcedureRequest +from gs_flex_coordinator.core.config import CLUSTER_TYPE +from gs_flex_coordinator.core.config import HQPS_ADMIN_SERVICE_PORT +from gs_flex_coordinator.core.config import WORKSPACE from gs_flex_coordinator.core.datasource import DataSourceManager -from gs_flex_coordinator.core.config import ( - CLUSTER_TYPE, - HQPS_ADMIN_SERVICE_PORT, - WORKSPACE, -) -from gs_flex_coordinator.core.utils import ( - encode_datetime, - get_internal_ip, - get_public_ip, -) +from gs_flex_coordinator.core.utils import encode_datetime +from gs_flex_coordinator.core.utils import get_internal_ip +from gs_flex_coordinator.core.utils import get_public_ip class HQPSClient(object): @@ -62,13 +59,14 @@ def dump_to_disk(self): with open(self._job_config_pickle_path, "wb") as f: pickle.dump(self._job_config, f) except Exception as e: - logging.warn("Failed to dump job config file: %s", str(e)) + logging.warn("Failed to dump job config file: %s", str(e)) def try_to_recover_from_disk(self): try: if os.path.exists(self._job_config_pickle_path): logging.info( - "Recover job config from file %s", self._job_config_pickle_path + "Recover job config from file %s", + self._job_config_pickle_path, ) with open(self._job_config_pickle_path, "rb") as f: self._job_config = pickle.load(f) @@ -90,7 +88,9 @@ def list_graphs(self) -> List[dict]: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceGraphManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceGraphManagementApi( + api_client + ) graphs = [g.to_dict() for g in api_instance.list_graphs()] for g in graphs: # 
`schema_update_time` is same to `creation_time` in Interactive @@ -105,14 +105,18 @@ def get_schema_by_id(self, graph_id: str) -> dict: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceGraphManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceGraphManagementApi( + api_client + ) return api_instance.get_schema(graph_id).to_dict() def create_graph(self, graph: dict) -> dict: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceGraphManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceGraphManagementApi( + api_client + ) response = api_instance.create_graph(CreateGraphRequest.from_dict(graph)) return response.to_dict() @@ -120,7 +124,9 @@ def delete_graph_by_id(self, graph_id: str) -> str: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceGraphManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceGraphManagementApi( + api_client + ) rlt = api_instance.delete_graph(graph_id) return rlt @@ -128,8 +134,9 @@ def get_graph_by_id(self, graph_id: str) -> dict: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceGraphManagementApi(api_client) - a = api_instance.get_graph(graph_id) + api_instance = interactive_sdk.openapi.AdminServiceGraphManagementApi( + api_client + ) g = api_instance.get_graph(graph_id).to_dict() # `schema_update_time` is same to `creation_time` in Interactive g["schema_update_time"] = g["creation_time"] @@ -148,14 +155,22 @@ def create_edge_type(self, graph_id: str, etype: dict): def delete_vertex_type_by_name(self, graph_id: str, type_name: str): raise RuntimeError("Create vertex type is not supported yet!") - def delete_edge_type_by_name(self, graph_id: str, edge_type: str, source_vertex_type: str, destination_vertex_type: str): + def delete_edge_type_by_name( + self, + graph_id: str, + edge_type: str, + source_vertex_type: str, + destination_vertex_type: str, + ): raise RuntimeError("Create vertex type is not supported yet!") def create_procedure(self, graph_id: str, procedure: dict) -> dict: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceProcedureManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceProcedureManagementApi( + api_client + ) response = api_instance.create_procedure( graph_id, CreateProcedureRequest.from_dict(procedure) ) @@ -166,7 +181,9 @@ def list_procedures(self, graph_id: str) -> List[dict]: interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: procedures = [] - api_instance = interactive_sdk.openapi.AdminServiceProcedureManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceProcedureManagementApi( + api_client + ) procedures = [p.to_dict() for p in api_instance.list_procedures(graph_id)] return procedures @@ -176,30 +193,40 @@ def update_procedure_by_id( with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = 
interactive_sdk.openapi.AdminServiceProcedureManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceProcedureManagementApi( + api_client + ) return api_instance.update_procedure( - graph_id, procedure_id, UpdateProcedureRequest.from_dict(procedure) + graph_id, + procedure_id, + UpdateProcedureRequest.from_dict(procedure), ) def delete_procedure_by_id(self, graph_id: str, procedure_id: str) -> str: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceProcedureManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceProcedureManagementApi( + api_client + ) return api_instance.delete_procedure(graph_id, procedure_id) def get_procedure_by_id(self, graph_id: str, procedure_id: str) -> dict: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceProcedureManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceProcedureManagementApi( + api_client + ) return api_instance.get_procedure(graph_id, procedure_id).to_dict() def get_service_status(self) -> dict: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceServiceManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceServiceManagementApi( + api_client + ) response = api_instance.get_service_status() # transfer if CLUSTER_TYPE == "HOSTS": @@ -221,33 +248,40 @@ def get_service_status(self) -> dict: status["graph"] = graph return status - def stop_service(self) -> str: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceServiceManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceServiceManagementApi( + api_client + ) return api_instance.stop_service() def restart_service(self) -> str: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceServiceManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceServiceManagementApi( + api_client + ) return api_instance.restart_service() def start_service(self, graph_id: str) -> str: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceServiceManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceServiceManagementApi( + api_client + ) return api_instance.start_service(StartServiceRequest(graph_id=graph_id)) def list_jobs(self) -> List[dict]: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceJobManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceJobManagementApi( + api_client + ) rlt = [] for s in api_instance.list_jobs(): job_status = s.to_dict() @@ -265,7 +299,9 @@ def get_job_by_id(self, job_id: str) -> dict: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceJobManagementApi(api_client) + api_instance = 
interactive_sdk.openapi.AdminServiceJobManagementApi( + api_client + ) job_status = api_instance.get_job_by_id(job_id).to_dict() job_status["start_time"] = encode_datetime( datetime.datetime.fromtimestamp(job_status["start_time"] / 1000) @@ -280,7 +316,9 @@ def delete_job_by_id(self, job_id: str) -> str: with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceJobManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceJobManagementApi( + api_client + ) return api_instance.delete_job_by_id(job_id) def submit_dataloading_job( @@ -297,7 +335,12 @@ def submit_dataloading_job( if vds: schema_mapping["vertex_mappings"].append(vds) for e in config["edges"]: - eds = ds_manager.get_edge_datasource(graph_id, e["type_name"], e["source_vertex"], e["destination_vertex"]) + eds = ds_manager.get_edge_datasource( + graph_id, + e["type_name"], + e["source_vertex"], + e["destination_vertex"], + ) if eds: schema_mapping["edge_mappings"].append(eds) # set job configuration before submission @@ -307,7 +350,9 @@ def submit_dataloading_job( with interactive_sdk.openapi.ApiClient( interactive_sdk.openapi.Configuration(self._hqps_endpoint) ) as api_client: - api_instance = interactive_sdk.openapi.AdminServiceGraphManagementApi(api_client) + api_instance = interactive_sdk.openapi.AdminServiceGraphManagementApi( + api_client + ) response = api_instance.create_dataloading_job( graph_id, SchemaMapping.from_dict(schema_mapping) ) diff --git a/flex/coordinator/gs_flex_coordinator/core/scheduler.py b/flex/coordinator/gs_flex_coordinator/core/scheduler.py index b254fe305ce2..1e7af5d85910 100644 --- a/flex/coordinator/gs_flex_coordinator/core/scheduler.py +++ b/flex/coordinator/gs_flex_coordinator/core/scheduler.py @@ -17,10 +17,10 @@ # import datetime -import json import random import time -from abc import ABCMeta, abstractmethod +from abc import ABCMeta +from abc import abstractmethod from string import ascii_uppercase import schedule @@ -38,9 +38,10 @@ class Schedule(object): def __init__(self): self._schedule = schedule.Scheduler() - self._run_pending_thread = StoppableThread(target=self.run_pending, args=()) - self._run_pending_thread.daemon = True - self._run_pending_thread.start() + # thread + self._run_pending_trd = StoppableThread(target=self.run_pending, args=()) + self._run_pending_trd.daemon = True + self._run_pending_trd.start() @property def schedule(self): @@ -80,13 +81,14 @@ def __init__(self, at_time, repeat): # repeat every day or week, or run job once(no repeat) # optional value "day", "week", "once" self._repeat = repeat - # job running thread, note that: - # the last job should be end of execution at the beginning of the next job + # job running thread, note that the last job should be + # end of execution at the beginning of the next job self._running_thread = None # tags self._tags = [] - # when the job actually scheduled, the following variables will be generated and overridden. + # the following variables will be generated and overridden + # when the job actually scheduled self._jobid = None self._last_run = None @@ -129,9 +131,7 @@ def sunday(self): @property def timestr(self): - """return str of the time object. 
- time([hour[, minute[, second[, microsecond[, tzinfo]]]]]) --> a time object - """ + """return str of the time object.""" return str(self._at_time.time()) @property @@ -185,9 +185,14 @@ def do_run(self): self._running_thread.start() def submit(self): - if not self._run_now and self._repeat not in ["week", "day", "once", None]: + if not self._run_now and self._repeat not in [ + "week", + "day", + "once", + None, + ]: raise RuntimeError( - "Submit schedule job failed: at_time is '{0}', repeat is '{1}'".format( + "Submit schedule job failed: at_time: '{0}', repeat: '{1}'".format( self._at_time, self._repeat ) ) diff --git a/flex/coordinator/gs_flex_coordinator/core/stoppable_thread.py b/flex/coordinator/gs_flex_coordinator/core/stoppable_thread.py index 34b332a2d208..be4fdee0313b 100644 --- a/flex/coordinator/gs_flex_coordinator/core/stoppable_thread.py +++ b/flex/coordinator/gs_flex_coordinator/core/stoppable_thread.py @@ -34,5 +34,5 @@ def stop(self): self._stop_event.set() def stopped(self): - """The thread itself should check regularly for the stopped() condition.""" + """Thread itself should check regularly for the stopped() condition.""" return self._stop_event.is_set() diff --git a/flex/coordinator/pyproject.toml b/flex/coordinator/pyproject.toml new file mode 100644 index 000000000000..8bb6ee5f516f --- /dev/null +++ b/flex/coordinator/pyproject.toml @@ -0,0 +1,2 @@ +[tool.black] +line-length = 88 diff --git a/flex/coordinator/requirements.txt b/flex/coordinator/requirements.txt index 5018f3d675f0..f3d3b4171b85 100644 --- a/flex/coordinator/requirements.txt +++ b/flex/coordinator/requirements.txt @@ -9,4 +9,4 @@ pydantic >= 2 typing-extensions >= 4.7.1 psutil schedule -graphscope-client >= 0.26.0 +interactive-sdk == 0.0.3 diff --git a/flex/coordinator/setup.cfg b/flex/coordinator/setup.cfg new file mode 100644 index 000000000000..194d641fbbc2 --- /dev/null +++ b/flex/coordinator/setup.cfg @@ -0,0 +1,8 @@ +[isort] +ensure_newline_before_comments = True +line_length = 88 +force_single_line = True + +[flake8] +max-line-length = 88 +extend-ignore = E203,F401,F401,F403,C4,I250,E402,W605 diff --git a/flex/coordinator/setup.py b/flex/coordinator/setup.py index 54df7584d2d9..78abe0932790 100644 --- a/flex/coordinator/setup.py +++ b/flex/coordinator/setup.py @@ -73,6 +73,47 @@ def run(self): ) +class FormatAndLint(Command): + description = "format and lint code" + user_options = [] + + user_options = [("inplace=", "i", "Run code formatter and linter inplace")] + + def initialize_options(self): + self.inplace = False + + def finalize_options(self): + if self.inplace or self.inplace == "True" or self.inplace == "true": + self.inplace = True + else: + self.inplace = False + + def run(self): + codedir = os.path.join(pkg_root, "gs_flex_coordinator", "core") + if self.inplace: + subprocess.check_call( + [sys.executable, "-m", "isort", codedir], cwd=pkg_root + ) + subprocess.check_call( + [sys.executable, "-m", "black", codedir], cwd=pkg_root + ) + subprocess.check_call( + [sys.executable, "-m", "flake8", codedir], cwd=pkg_root + ) + else: + subprocess.check_call( + [sys.executable, "-m", "isort", "--check", "--diff", codedir], + cwd=pkg_root, + ) + subprocess.check_call( + [sys.executable, "-m", "black", "--check", "--diff", codedir], + cwd=pkg_root, + ) + subprocess.check_call( + [sys.executable, "-m", "flake8", codedir], cwd=pkg_root + ) + + setup( name=NAME, version=VERSION, @@ -85,6 +126,7 @@ def run(self): package_data={"": ["openapi/openapi.yaml", "VERSION"]}, cmdclass={ 
"generate_flex_server": GenerateFlexServer, + "lint": FormatAndLint, }, include_package_data=True, entry_points={ diff --git a/python/graphscope/gsctl/commands/__init__.py b/python/graphscope/gsctl/commands/__init__.py index a42f03ad55cf..823cc85e486c 100644 --- a/python/graphscope/gsctl/commands/__init__.py +++ b/python/graphscope/gsctl/commands/__init__.py @@ -22,6 +22,7 @@ from graphscope.gsctl.commands.common import cli as common from graphscope.gsctl.commands.dev import cli as dev + # from graphscope.gsctl.commands.insight.graph import cli as insight_graph from graphscope.gsctl.commands.interactive.glob import cli as interactive from graphscope.gsctl.commands.interactive.graph import cli as interactive_graph diff --git a/python/graphscope/gsctl/commands/interactive/glob.py b/python/graphscope/gsctl/commands/interactive/glob.py index 0f81283310e9..81f2d334b5af 100644 --- a/python/graphscope/gsctl/commands/interactive/glob.py +++ b/python/graphscope/gsctl/commands/interactive/glob.py @@ -21,6 +21,8 @@ from graphscope.gsctl.impl import create_graph from graphscope.gsctl.impl import delete_graph_by_id +from graphscope.gsctl.impl import get_datasource_by_id +from graphscope.gsctl.impl import get_graph_id_by_name from graphscope.gsctl.impl import get_service_status from graphscope.gsctl.impl import list_graphs from graphscope.gsctl.impl import list_jobs @@ -29,8 +31,6 @@ from graphscope.gsctl.impl import start_service from graphscope.gsctl.impl import stop_service from graphscope.gsctl.impl import switch_context -from graphscope.gsctl.impl import get_datasource_by_id -from graphscope.gsctl.impl import get_graph_id_by_name from graphscope.gsctl.utils import TreeDisplay from graphscope.gsctl.utils import err from graphscope.gsctl.utils import info @@ -183,7 +183,13 @@ def ls(): # noqa: F811 """Display current service status""" def _construct_and_display_data(status): - head = ["STATUS", "SERVING_GRAPH(IDENTIFIER)", "CYPHER_ENDPOINT", "HQPS_ENDPOINT", "GREMLIN_ENDPOINT"] + head = [ + "STATUS", + "SERVING_GRAPH(IDENTIFIER)", + "CYPHER_ENDPOINT", + "HQPS_ENDPOINT", + "GREMLIN_ENDPOINT", + ] data = [head] if status.status == "Stopped": data.append([status.status, "-", "-", "-", "-"]) diff --git a/python/graphscope/gsctl/commands/interactive/graph.py b/python/graphscope/gsctl/commands/interactive/graph.py index 5e9ab72e3ce6..ca87b69f9a8a 100644 --- a/python/graphscope/gsctl/commands/interactive/graph.py +++ b/python/graphscope/gsctl/commands/interactive/graph.py @@ -20,20 +20,20 @@ import yaml from graphscope.gsctl.config import get_current_context -from graphscope.gsctl.impl import submit_dataloading_job +from graphscope.gsctl.impl import bind_datasource_in_batch from graphscope.gsctl.impl import create_procedure from graphscope.gsctl.impl import delete_job_by_id from graphscope.gsctl.impl import delete_procedure_by_id +from graphscope.gsctl.impl import get_datasource_by_id +from graphscope.gsctl.impl import get_graph_id_by_name from graphscope.gsctl.impl import get_job_by_id from graphscope.gsctl.impl import list_graphs from graphscope.gsctl.impl import list_jobs -from graphscope.gsctl.impl import bind_datasource_in_batch -from graphscope.gsctl.impl import get_datasource_by_id from graphscope.gsctl.impl import list_procedures +from graphscope.gsctl.impl import submit_dataloading_job from graphscope.gsctl.impl import switch_context from graphscope.gsctl.impl import unbind_edge_datasource from graphscope.gsctl.impl import unbind_vertex_datasource -from graphscope.gsctl.impl import 
get_graph_id_by_name from graphscope.gsctl.utils import TreeDisplay from graphscope.gsctl.utils import err from graphscope.gsctl.utils import info @@ -156,7 +156,7 @@ def datasource(filename): # noqa: F811 datasource = read_yaml_file(filename) bind_datasource_in_batch(graph_identifier, datasource) except Exception as e: - err(f"Failed to bind data source: {str(e)}") + err(f"Failed to bind data source: {str(e)}") else: succ("Bind data source successfully.") @@ -194,7 +194,7 @@ def datasource(type, source_vertex_type, destination_vertex_type): # noqa: F811 except Exception as e: err(f"Failed to unbind data source: {str(e)}") else: - succ(f"Unbind data source successfully.") + succ("Unbind data source successfully.") @create.command() diff --git a/python/graphscope/gsctl/impl/__init__.py b/python/graphscope/gsctl/impl/__init__.py index af0c96ae23b5..b2ee74ea67ba 100644 --- a/python/graphscope/gsctl/impl/__init__.py +++ b/python/graphscope/gsctl/impl/__init__.py @@ -18,32 +18,26 @@ from graphscope.gsctl.impl.common import connect_coordinator from graphscope.gsctl.impl.common import disconnect_coordinator - +from graphscope.gsctl.impl.datasource import bind_datasource_in_batch +from graphscope.gsctl.impl.datasource import get_datasource_by_id +from graphscope.gsctl.impl.datasource import unbind_edge_datasource +from graphscope.gsctl.impl.datasource import unbind_vertex_datasource from graphscope.gsctl.impl.graph import create_graph -from graphscope.gsctl.impl.graph import list_graphs from graphscope.gsctl.impl.graph import delete_graph_by_id - -from graphscope.gsctl.impl.service import get_service_status +from graphscope.gsctl.impl.graph import list_graphs +from graphscope.gsctl.impl.job import delete_job_by_id +from graphscope.gsctl.impl.job import get_job_by_id +from graphscope.gsctl.impl.job import list_jobs +from graphscope.gsctl.impl.job import submit_dataloading_job +from graphscope.gsctl.impl.procedure import create_procedure +from graphscope.gsctl.impl.procedure import delete_procedure_by_id +from graphscope.gsctl.impl.procedure import get_procedure_by_id +from graphscope.gsctl.impl.procedure import list_procedures +from graphscope.gsctl.impl.procedure import update_procedure_by_id +from graphscope.gsctl.impl.service import get_service_status from graphscope.gsctl.impl.service import restart_service from graphscope.gsctl.impl.service import start_service from graphscope.gsctl.impl.service import stop_service - +from graphscope.gsctl.impl.utils import get_graph_id_by_name from graphscope.gsctl.impl.utils import switch_context from graphscope.gsctl.impl.utils import upload_file -from graphscope.gsctl.impl.utils import get_graph_id_by_name - -from graphscope.gsctl.impl.procedure import create_procedure -from graphscope.gsctl.impl.procedure import list_procedures -from graphscope.gsctl.impl.procedure import delete_procedure_by_id -from graphscope.gsctl.impl.procedure import update_procedure_by_id -from graphscope.gsctl.impl.procedure import get_procedure_by_id - -from graphscope.gsctl.impl.datasource import bind_datasource_in_batch -from graphscope.gsctl.impl.datasource import get_datasource_by_id -from graphscope.gsctl.impl.datasource import unbind_edge_datasource -from graphscope.gsctl.impl.datasource import unbind_vertex_datasource - -from graphscope.gsctl.impl.job import submit_dataloading_job -from graphscope.gsctl.impl.job import list_jobs -from graphscope.gsctl.impl.job import delete_job_by_id -from graphscope.gsctl.impl.job import get_job_by_id diff --git 
a/python/graphscope/gsctl/impl/datasource.py b/python/graphscope/gsctl/impl/datasource.py
index c0c41ec697d4..a78fc3c1903a 100644
--- a/python/graphscope/gsctl/impl/datasource.py
+++ b/python/graphscope/gsctl/impl/datasource.py
@@ -16,12 +16,11 @@
# limitations under the License.
#

-import os
import itertools
+import os

import graphscope.flex.rest
from graphscope.flex.rest import SchemaMapping
-
from graphscope.gsctl.config import get_current_context
from graphscope.gsctl.impl.utils import upload_file

diff --git a/python/graphscope/gsctl/impl/utils.py b/python/graphscope/gsctl/impl/utils.py
index 1b34f0ef8e8f..d5d57b632a50 100644
--- a/python/graphscope/gsctl/impl/utils.py
+++ b/python/graphscope/gsctl/impl/utils.py
@@ -17,9 +17,9 @@
#

import graphscope.flex.rest
-from graphscope.gsctl.impl import list_graphs
from graphscope.gsctl.config import get_current_context
from graphscope.gsctl.config import load_gs_config
+from graphscope.gsctl.impl import list_graphs


def upload_file(location: str) -> str:
@@ -41,7 +41,6 @@ def switch_context(context: str):
def get_graph_id_by_name(name_or_id: str):
    graphs = list_graphs()
    id_candidate = []
-    graph_exist = False
    for g in graphs:
        if name_or_id == g.id:
            return name_or_id
diff --git a/python/graphscope/gsctl/utils.py b/python/graphscope/gsctl/utils.py
index 47d0207a38b3..69eaeb70d578 100644
--- a/python/graphscope/gsctl/utils.py
+++ b/python/graphscope/gsctl/utils.py
@@ -398,10 +398,10 @@ def create_datasource_mapping_node(self, graph, datasource_mapping):
                    property_column_mapping.column.index,
                    property_column_mapping.column.name,
                )
-                property_mapping_identifier = f"{specific_vertex_mapping_identifier}_{property_column_mapping.var_property}"
+                p_mapping_identifier = f"{specific_vertex_mapping_identifier}_{property_column_mapping.var_property}"
                self.tree.create_node(
                    tag=tag,
-                    identifier=property_mapping_identifier,
+                    identifier=p_mapping_identifier,
                    parent=specific_vertex_mapping_identifier,
                )
        # edge mapping
@@ -463,10 +463,10 @@ def create_datasource_mapping_node(self, graph, datasource_mapping):
                    property_column_mapping.column.index,
                    property_column_mapping.column.name,
                )
-                property_mapping_identifier = f"{specific_edge_mapping_identifier}_{property_column_mapping.var_property}"
+                p_mapping_identifier = f"{specific_edge_mapping_identifier}_{property_column_mapping.var_property}"
                self.tree.create_node(
                    tag=tag,
-                    identifier=property_mapping_identifier,
+                    identifier=p_mapping_identifier,
                    parent=specific_edge_mapping_identifier,
                )

diff --git a/python/graphscope/tests/flex/test_interactive.py b/python/graphscope/tests/flex/test_interactive.py
index 8bdad35923f6..50d2f37325e5 100644
--- a/python/graphscope/tests/flex/test_interactive.py
+++ b/python/graphscope/tests/flex/test_interactive.py
@@ -22,39 +22,32 @@
warnings.filterwarnings("ignore", category=Warning)

import time
-import pytest

+import pytest
from click.testing import CliRunner

+from graphscope.gsctl.impl import bind_datasource_in_batch
from graphscope.gsctl.impl import connect_coordinator
-from graphscope.gsctl.impl import disconnect_coordinator
-
from graphscope.gsctl.impl import create_graph
+from graphscope.gsctl.impl import create_procedure
from graphscope.gsctl.impl import delete_graph_by_id
-from graphscope.gsctl.impl import list_graphs
-
+from graphscope.gsctl.impl import delete_procedure_by_id
+from graphscope.gsctl.impl import disconnect_coordinator
+from graphscope.gsctl.impl import get_datasource_by_id
+from graphscope.gsctl.impl import get_job_by_id
+from graphscope.gsctl.impl import get_procedure_by_id
from graphscope.gsctl.impl import get_service_status
+from graphscope.gsctl.impl import list_graphs
+from graphscope.gsctl.impl import list_procedures
from graphscope.gsctl.impl import restart_service
from graphscope.gsctl.impl import start_service
from graphscope.gsctl.impl import stop_service
-
-from graphscope.gsctl.impl import create_procedure
-from graphscope.gsctl.impl import list_procedures
-from graphscope.gsctl.impl import delete_procedure_by_id
-from graphscope.gsctl.impl import update_procedure_by_id
-from graphscope.gsctl.impl import get_procedure_by_id
-
-from graphscope.gsctl.impl import bind_datasource_in_batch
-from graphscope.gsctl.impl import get_datasource_by_id
+from graphscope.gsctl.impl import submit_dataloading_job
from graphscope.gsctl.impl import unbind_edge_datasource
from graphscope.gsctl.impl import unbind_vertex_datasource
-
-from graphscope.gsctl.impl import submit_dataloading_job
-from graphscope.gsctl.impl import get_job_by_id
-
+from graphscope.gsctl.impl import update_procedure_by_id
from graphscope.gsctl.impl import upload_file
-

COORDINATOR_ENDPOINT = "http://127.0.0.1:8080"
@@ -233,9 +226,7 @@ def test_bulk_loading(self, tmpdir):
                },
            },
            "vertices": [
-                {
-                    "type_name": "person"
-                },
+                {"type_name": "person"},
            ],
            "edges": [
                {
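Editor's note (hedged sketch, not part of the patch): the reordered test imports above drive a bulk-loading flow against a running coordinator. The sketch below illustrates the assumed call order using the names imported in test_interactive.py; exact signatures, return shapes, and the graph/datasource/loading dict layouts are assumptions and may differ from the real API.

# Hedged sketch of the gsctl impl flow exercised by test_bulk_loading.
# All argument and return shapes below are illustrative assumptions.
from graphscope.gsctl.impl import bind_datasource_in_batch
from graphscope.gsctl.impl import connect_coordinator
from graphscope.gsctl.impl import create_graph
from graphscope.gsctl.impl import get_job_by_id
from graphscope.gsctl.impl import submit_dataloading_job

COORDINATOR_ENDPOINT = "http://127.0.0.1:8080"


def load_demo_graph(graph_def: dict, datasource: dict, loading_config: dict):
    """Create a graph, bind its data sources, and kick off a dataloading job."""
    connect_coordinator(COORDINATOR_ENDPOINT)
    # the coordinator is assumed to hand back an identifier for the new graph
    graph_id = create_graph(graph_def)
    # bind vertex/edge data sources in one batch, mirroring the datasource YAML
    bind_datasource_in_batch(graph_id, datasource)
    # submit the dataloading job and return its current status for polling
    job_id = submit_dataloading_job(graph_id, loading_config)
    return get_job_by_id(job_id)

In the actual test the job status is polled until it reaches a terminal state before the graph and procedures are cleaned up; the sketch only shows the happy-path call order.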