Skip to content

Commit

Permalink
Support binding and geting single datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongze0629 committed Feb 29, 2024
1 parent 186b928 commit 93b7f2c
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def bind_edge_datasource(graph_name, edge_data_source): # noqa: E501
"""
if connexion.request.is_json:
edge_data_source = EdgeDataSource.from_dict(connexion.request.get_json()) # noqa: E501
return 'do some magic!'
return client_wrapper.bind_edge_datasource(graph_name, edge_data_source)


@handle_api_exception()
Expand All @@ -44,7 +44,7 @@ def bind_vertex_datasource(graph_name, vertex_data_source): # noqa: E501
"""
if connexion.request.is_json:
vertex_data_source = VertexDataSource.from_dict(connexion.request.get_json()) # noqa: E501
return 'do some magic!'
return client_wrapper.bind_vertex_datasource(graph_name, vertex_data_source)


@handle_api_exception()
Expand Down Expand Up @@ -78,7 +78,12 @@ def get_edge_datasource(graph_name, type_name, source_vertex_type, destination_v
:rtype: Union[EdgeDataSource, Tuple[EdgeDataSource, int], Tuple[EdgeDataSource, int, Dict[str, str]]
"""
return 'do some magic!'
return client_wrapper.get_edge_datasource(
graph_name,
type_name,
source_vertex_type,
destination_vertex_type
)


@handle_api_exception()
Expand All @@ -94,7 +99,7 @@ def get_vertex_datasource(graph_name, type_name): # noqa: E501
:rtype: Union[VertexDataSource, Tuple[VertexDataSource, int], Tuple[VertexDataSource, int, Dict[str, str]]
"""
return 'do some magic!'
return client_wrapper.get_vertex_datasource(graph_name, type_name)


@handle_api_exception()
Expand Down
43 changes: 38 additions & 5 deletions flex/coordinator/gs_flex_coordinator/core/client_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@
from gs_flex_coordinator.core.scheduler import schedule
from gs_flex_coordinator.core.utils import (GraphInfo, decode_datetimestr,
encode_datetime, get_current_time)
from gs_flex_coordinator.models import (DataSource, DeploymentInfo, EdgeType,
Graph, GrootGraph, GrootSchema,
JobStatus, ModelSchema, NodeStatus,
Procedure, SchemaMapping,
ServiceStatus, StartServiceRequest,
from gs_flex_coordinator.models import (DataSource, DeploymentInfo,
EdgeDataSource, EdgeType, Graph,
GrootGraph, GrootSchema, JobStatus,
ModelSchema, NodeStatus, Procedure,
SchemaMapping, ServiceStatus,
StartServiceRequest, VertexDataSource,
VertexType)
from gs_flex_coordinator.version import __version__

Expand Down Expand Up @@ -290,6 +291,38 @@ def import_datasource(self, graph_name: str, data_source: DataSource) -> str:
def get_datasource(self, graph_name: str) -> DataSource:
return DataSource.from_dict(self._client.get_datasource(graph_name))

def bind_vertex_datasource(
self, graph_name: str, vertex_data_source: VertexDataSource
) -> str:
return self._client.bind_vertex_datasource(
graph_name, vertex_data_source.to_dict()
)

def bind_edge_datasource(
self, graph_name: str, edge_data_source: EdgeDataSource
) -> str:
return self._client.bind_edge_datasource(graph_name, edge_data_source.to_dict())

def get_vertex_datasource(
self, graph_name: str, vertex_type: str
) -> VertexDataSource:
return VertexDataSource.from_dict(
self._client.get_vertex_datasource(graph_name, vertex_type)
)

def get_edge_datasource(
self,
graph_name: str,
edge_type: str,
source_vertex_type: str,
destination_vertex_type: str,
) -> EdgeDataSource:
return EdgeDataSource.from_dict(
self._client.get_edge_datasource(
graph_name, edge_type, source_vertex_type, destination_vertex_type
)
)

def unbind_vertex_datasource(self, graph_name: str, vertex_type: str) -> str:
return self._client.unbind_vertex_datasource(graph_name, vertex_type)

Expand Down
52 changes: 44 additions & 8 deletions flex/coordinator/gs_flex_coordinator/core/insight/groot.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,17 @@ def list_jobs(self) -> List[dict]:
return []

def import_datasource(self, graph_name: str, data_source: dict) -> str:
for v_datasource in data_source["vertices_datasource"]:
for vertex_data_source in data_source["vertices_datasource"]:
self._data_source["vertices_datasource"][
v_datasource["type_name"]
] = v_datasource
for e_datasource in data_source["edges_datasource"]:
vertex_data_source["type_name"]
] = vertex_data_source
for edge_data_source in data_source["edges_datasource"]:
edge_label = self.get_edge_full_label(
e_datasource["type_name"],
e_datasource["source_vertex"],
e_datasource["destination_vertex"],
edge_data_source["type_name"],
edge_data_source["source_vertex"],
edge_data_source["destination_vertex"],
)
self._data_source["edges_datasource"][edge_label] = e_datasource
self._data_source["edges_datasource"][edge_label] = edge_data_source
self._pickle_datasource_impl()

def get_datasource(self, graph_name: str) -> dict:
Expand All @@ -131,6 +131,42 @@ def get_datasource(self, graph_name: str) -> dict:
rlts["edges_datasource"].append(e)
return rlts

def bind_vertex_datasource(self, graph_name: str, vertex_data_source: dict) -> str:
self._data_source["vertices_datasource"][
vertex_data_source["type_name"]
] = vertex_data_source
self._pickle_datasource_impl()

def bind_edge_datasource(self, graph_name: str, edge_data_source: dict) -> str:
edge_label = self.get_edge_full_label(
edge_data_source["type_name"],
edge_data_source["source_vertex"],
edge_data_source["destination_vertex"],
)
self._data_source["edges_datasource"][edge_label] = edge_data_source
self._pickle_datasource_impl()

def get_vertex_datasource(self, graph_name: str, vertex_type: str) -> dict:
if vertex_type not in self._data_source["vertices_datasource"]:
raise RuntimeError(
f"Vertex type {vertex_type} does not bind any data source"
)
return self._data_source["vertices_datasource"][vertex_type]

def get_edge_datasource(
self,
graph_name: str,
edge_type: str,
source_vertex_type: str,
destination_vertex_type: str,
) -> dict:
edge_label = self.get_edge_full_label(
edge_type, source_vertex_type, destination_vertex_type
)
if edge_label not in self._data_source["edges_datasource"]:
raise RuntimeError(f"Edge type {edge_label} does not bind any data source")
return self._data_source["edges_datasource"][edge_label]

def unbind_vertex_datasource(self, graph_name: str, vertex_type: str) -> str:
# check
vertex_type_exists = False
Expand Down

0 comments on commit 93b7f2c

Please sign in to comment.