Skip to content

Commit

Permalink
Fixes the initialization of parameters in load_from() (#2698)
Browse files Browse the repository at this point in the history
## Related issue number

Fixes #2697

---------

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow committed May 12, 2023
1 parent d4b72fd commit 52b1ee2
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 64 deletions.
174 changes: 118 additions & 56 deletions python/graphscope/deploy/kubernetes/resource_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ def get_role(name, namespace, api_groups, resources, verbs, labels):
metadata = kube_client.V1ObjectMeta(name=name, namespace=namespace)
metadata.labels = labels
rule = kube_client.V1PolicyRule(
api_groups=api_groups.split(','),
resources=resources.split(','),
verbs=verbs.split(','),
api_groups=api_groups.split(","),
resources=resources.split(","),
verbs=verbs.split(","),
)
role = kube_client.V1Role(metadata=metadata, rules=[rule])
return role
Expand All @@ -51,9 +51,9 @@ def get_role(name, namespace, api_groups, resources, verbs, labels):
def get_cluster_role(name, api_groups, resources, verbs, labels):
metadata = kube_client.V1ObjectMeta(name=name, labels=labels)
rule = kube_client.V1PolicyRule(
api_groups=api_groups.split(','),
resources=resources.split(','),
verbs=verbs.split(','),
api_groups=api_groups.split(","),
resources=resources.split(","),
verbs=verbs.split(","),
)
role = kube_client.V1ClusterRole(metadata=metadata, rules=[rule])
return role
Expand All @@ -74,7 +74,9 @@ def get_role_binding(name, namespace, role_name, service_account_name, labels):
return role_binding

@staticmethod
def get_cluster_role_binding(name, namespace, role_name, service_account_name, labels):
def get_cluster_role_binding(
name, namespace, role_name, service_account_name, labels
):
metadata = kube_client.V1ObjectMeta(name=name, labels=labels)
role_ref = kube_client.V1RoleRef(
kind="ClusterRole", name=role_name, api_group="rbac.authorization.k8s.io"
Expand Down Expand Up @@ -102,7 +104,9 @@ def get_exec_action(command):

@staticmethod
def get_lifecycle_handler(_exec=None, http_get=None, tcp_socket=None):
handler = kube_client.V1LifecycleHandler(_exec=_exec, http_get=http_get, tcp_socket=tcp_socket)
handler = kube_client.V1LifecycleHandler(
_exec=_exec, http_get=http_get, tcp_socket=tcp_socket
)
return handler

@staticmethod
Expand All @@ -127,12 +131,14 @@ def get_node_selector(node_selector):
@staticmethod
def get_user_defined_volumes(udf_volumes):
"""
.. code:: python
{
name: {
"type": "",
"field": {}, # the keys are subject to volume type
"mounts": [ {"mountPath": "", "subPath": ""}, ... ]
}
}
}
"""
if not udf_volumes:
Expand All @@ -141,37 +147,47 @@ def get_user_defined_volumes(udf_volumes):
for name, value in udf_volumes.items():
volume = kube_client.V1Volume(name=name)
field = value.get("field", {})
if value['type'] == 'hostPath':
volume.host_path = kube_client.V1HostPathVolumeSource(path=field['path'])
if 'type' in field:
volume.host_path.type = field['type']
elif value['type'] == 'emptyDir':
if value["type"] == "hostPath":
volume.host_path = kube_client.V1HostPathVolumeSource(
path=field["path"]
)
if "type" in field:
volume.host_path.type = field["type"]
elif value["type"] == "emptyDir":
volume.empty_dir = kube_client.V1EmptyDirVolumeSource()
if 'medium' in field:
volume.empty_dir.medium = field['medium']
if 'sizeLimit' in field:
volume.empty_dir.size_limit = field['sizeLimit']
elif value['type'] == 'persistentVolumeClaim':
pvc = kube_client.V1PersistentVolumeClaimVolumeSource(claim_name=field['claimName'])
if "medium" in field:
volume.empty_dir.medium = field["medium"]
if "sizeLimit" in field:
volume.empty_dir.size_limit = field["sizeLimit"]
elif value["type"] == "persistentVolumeClaim":
pvc = kube_client.V1PersistentVolumeClaimVolumeSource(
claim_name=field["claimName"]
)
volume.persistent_volume_claim = pvc
if 'readOnly' in field:
volume.persistent_volume_claim.read_only = field['readOnly']
elif value['type'] == 'configMap':
volume.config_map = kube_client.V1ConfigMapVolumeSource(name=field['name'])
elif value['type'] == 'secret':
volume.secret = kube_client.V1SecretVolumeSource(secret_name=field['name'])
if "readOnly" in field:
volume.persistent_volume_claim.read_only = field["readOnly"]
elif value["type"] == "configMap":
volume.config_map = kube_client.V1ConfigMapVolumeSource(
name=field["name"]
)
elif value["type"] == "secret":
volume.secret = kube_client.V1SecretVolumeSource(
secret_name=field["name"]
)
else:
raise ValueError(f"Unsupported volume type: {value['type']}")
volume_mounts = []
mounts_list = value['mounts']
mounts_list = value["mounts"]
if not isinstance(mounts_list, list):
mounts_list = [value['mounts']]
mounts_list = [value["mounts"]]
for udf_mount in mounts_list:
volume_mount = kube_client.V1VolumeMount(name=name, mount_path=udf_mount['mountPath'])
if 'subPath' in udf_mount:
volume_mount.sub_path = udf_mount['subPath']
if 'readOnly' in udf_mount:
volume_mount.read_only = udf_mount['readOnly']
volume_mount = kube_client.V1VolumeMount(
name=name, mount_path=udf_mount["mountPath"]
)
if "subPath" in udf_mount:
volume_mount.sub_path = udf_mount["subPath"]
if "readOnly" in udf_mount:
volume_mount.read_only = udf_mount["readOnly"]
volume_mounts.append(volume_mount)
volumes.append(volume)
source_volume_mounts.extend(volume_mounts)
Expand All @@ -189,25 +205,39 @@ def get_resources(requests, limits, preemptive=True):
return resource_requirements

@staticmethod
def get_pod_spec(containers: [kube_client.V1Container], image_pull_secrets=None, node_selector=None, volumes=None):
def get_pod_spec(
containers: [kube_client.V1Container],
image_pull_secrets=None,
node_selector=None,
volumes=None,
):
pod_spec = kube_client.V1PodSpec(containers=containers)
if image_pull_secrets is not None and image_pull_secrets:
pod_spec.image_pull_secrets = ResourceBuilder.get_image_pull_secrets(image_pull_secrets)
pod_spec.image_pull_secrets = ResourceBuilder.get_image_pull_secrets(
image_pull_secrets
)
if node_selector is not None and node_selector:
pod_spec.node_selector = ResourceBuilder.get_node_selector(node_selector)
if volumes is not None and volumes:
pod_spec.volumes = volumes
return pod_spec

@staticmethod
def get_pod_template_spec(spec: kube_client.V1PodSpec, labels: dict, annotations=None, default_container=None):
def get_pod_template_spec(
spec: kube_client.V1PodSpec,
labels: dict,
annotations=None,
default_container=None,
):
pod_template_spec = kube_client.V1PodTemplateSpec()
pod_template_spec.spec = spec
if annotations is None:
annotations = dict()
if default_container is not None:
annotations['kubectl.kubernetes.io/default-container'] = default_container
pod_template_spec.metadata = kube_client.V1ObjectMeta(labels=labels, annotations=annotations)
annotations["kubectl.kubernetes.io/default-container"] = default_container
pod_template_spec.metadata = kube_client.V1ObjectMeta(
labels=labels, annotations=annotations
)
return pod_template_spec

@staticmethod
Expand All @@ -222,14 +252,18 @@ def get_deployment(namespace, name, spec, labels):
deployment = kube_client.V1Deployment()
deployment.api_version = "apps/v1"
deployment.kind = "Deployment"
deployment.metadata = kube_client.V1ObjectMeta(name=name, labels=labels, namespace=namespace)
deployment.metadata = kube_client.V1ObjectMeta(
name=name, labels=labels, namespace=namespace
)
deployment.spec = spec
return deployment

@staticmethod
def get_stateful_set_spec(template, replicas, labels, service_name):
selector = kube_client.V1LabelSelector(match_labels=labels)
spec = kube_client.V1StatefulSetSpec(selector=selector, template=template, service_name=service_name)
spec = kube_client.V1StatefulSetSpec(
selector=selector, template=template, service_name=service_name
)
spec.replicas = replicas
return spec

Expand All @@ -238,7 +272,9 @@ def get_stateful_set(namespace, name, spec, labels):
statefulset = kube_client.V1StatefulSet()
statefulset.api_version = "apps/v1"
statefulset.kind = "StatefulSet"
statefulset.metadata = kube_client.V1ObjectMeta(name=name, labels=labels, namespace=namespace)
statefulset.metadata = kube_client.V1ObjectMeta(
name=name, labels=labels, namespace=namespace
)
statefulset.spec = spec
return statefulset

Expand Down Expand Up @@ -273,14 +309,28 @@ def get_service(namespace, name, service_spec, labels, annotations=None):
service.api_version = "v1"
service.kind = "Service"
service.spec = service_spec
metadata = kube_client.V1ObjectMeta(namespace=namespace, name=name, labels=labels, annotations=annotations)
metadata = kube_client.V1ObjectMeta(
namespace=namespace, name=name, labels=labels, annotations=annotations
)
service.metadata = metadata
return service


class CoordinatorDeployment:
def __init__(self, namespace, name, image, args, labels, image_pull_secret,
image_pull_policy, node_selector, env, host_network, port=None):
def __init__(
self,
namespace,
name,
image,
args,
labels,
image_pull_secret,
image_pull_policy,
node_selector,
env,
host_network,
port=None,
):
self._replicas = 1
self._namespace = namespace
self._name = name
Expand All @@ -306,7 +356,10 @@ def get_lifecycle(self):
def get_coordinator_container(self):
resources = ResourceBuilder.get_resources(self._requests, self._limits)
lifecycle = self.get_lifecycle()
env = [kube_client.V1EnvVar(name=key, value=value) for key, value in self._env.items()]
env = [
kube_client.V1EnvVar(name=key, value=value)
for key, value in self._env.items()
]
container = kube_client.V1Container(
name="coordinator",
image=self._image,
Expand All @@ -321,23 +374,26 @@ def get_coordinator_container(self):
container_ports = [kube_client.V1ContainerPort(container_port=self._port)]
container_ports.append(kube_client.V1ContainerPort(container_port=8000))
container.ports = container_ports
container.readiness_probe = ResourceBuilder.get_tcp_probe(port=self._port,
timeout=15,
period=1,
failure_threshold=20)
container.readiness_probe = ResourceBuilder.get_tcp_probe(
port=self._port, timeout=15, period=1, failure_threshold=20
)
return container

def get_coordinator_pod_spec(self):
container = self.get_coordinator_container()
pod_spec = ResourceBuilder.get_pod_spec(containers=[container],
image_pull_secrets=self._image_pull_secret,
node_selector=self._node_selector)
pod_spec = ResourceBuilder.get_pod_spec(
containers=[container],
image_pull_secrets=self._image_pull_secret,
node_selector=self._node_selector,
)
pod_spec.host_network = self._host_network
return pod_spec

def get_coordinator_pod_template_spec(self):
spec = self.get_coordinator_pod_spec()
return ResourceBuilder.get_pod_template_spec(spec, self._labels, default_container='coordinator')
return ResourceBuilder.get_pod_template_spec(
spec, self._labels, default_container="coordinator"
)

def get_coordinator_deployment_spec(self, replicas):
template = self.get_coordinator_pod_template_spec()
Expand All @@ -346,11 +402,17 @@ def get_coordinator_deployment_spec(self, replicas):

def get_coordinator_deployment(self):
spec = self.get_coordinator_deployment_spec(self._replicas)
return ResourceBuilder.get_deployment(self._namespace, self._name, spec, self._labels)
return ResourceBuilder.get_deployment(
self._namespace, self._name, spec, self._labels
)

def get_coordinator_service(self, service_type, port):
ports = [kube_client.V1ServicePort(name="coordinator", port=port)]
ports.append(kube_client.V1ServicePort(name="debug", port=8000))
service_spec = ResourceBuilder.get_service_spec(service_type, ports, self._labels, None)
service = ResourceBuilder.get_service(self._namespace, self._name, service_spec, self._labels)
service_spec = ResourceBuilder.get_service_spec(
service_type, ports, self._labels, None
)
service = ResourceBuilder.get_service(
self._namespace, self._name, service_spec, self._labels
)
return service
16 changes: 10 additions & 6 deletions python/graphscope/framework/graph_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,19 @@ def load_from(
op = dag_utils.create_graph(
sess.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=[loader_op], attrs=config
)
graph = sess.g(op, vertex_map=vertex_map)
graph = sess.g(
op,
oid_type=oid_type,
directed=directed,
generate_eid=generate_eid,
retain_oid=retain_oid,
vertex_map=vertex_map,
)
return graph


def load_from_gar(
graph_info_path: str,
directed=True,
oid_type="int64_t",
vertex_map="global"
graph_info_path: str, directed=True, oid_type="int64_t", vertex_map="global"
) -> Graph:
sess = get_default_session()
oid_type = utils.normalize_data_type_str(oid_type)
Expand All @@ -220,5 +224,5 @@ def load_from_gar(
op = dag_utils.create_graph(
sess.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=[], attrs=config
)
graph = sess.g(op, vertex_map=vertex_map)
graph = sess.g(op, oid_type=oid_type, directed=directed, vertex_map=vertex_map)
return graph
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ exclude = '''
.*forward.*
| .*node_modules.*
| .*\.eggs.*
| .*build.*
| .*build/.*
| ^/dist.*
| .*_pb2\.py
| .*_pb2_grpc\.py
Expand Down
2 changes: 1 addition & 1 deletion python/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ extend-exclude =
graphscope/nx/generators/tests/**
graphscope/nx/readwrite/tests/**
graphscope/proto/**
jupyter/graphscope/node_modules/.*
graphscope/learning/examples/**
graphscope/learning/graphlearn/**
jupyter/graphscope/node_modules/.*
per-file-ignores =
graphscope/nx/classes/function.py:F405
graphscope/nx/algorithms/builtin.py:W605
Expand Down

0 comments on commit 52b1ee2

Please sign in to comment.