From b7c72e3656659678e1309b09c50e8aae22237277 Mon Sep 17 00:00:00 2001 From: Baohua Yang Date: Mon, 27 Mar 2017 16:08:44 +0800 Subject: [PATCH] Split the agent code and the orchestration code 1. Now the orchestration layer only maintain data in db; 2. Agent layer code will do the real work on platform; 3. Init the structure to support k8s, and also fabric 1.0; 4. Some related documentation is also updated. This patchset helps fix CE-20. https://jira.hyperledger.org/browse/CE-20. Change-Id: Idf87d4041ebb1b5273089e457b2d3da92dfe5fd2 Signed-off-by: Baohua Yang --- Makefile | 1 + docs/arch.md | 14 +++ docs/index.md | 22 ++-- .../setup_docker_worker_node.sh | 3 + .../setup_k8s_worker_node.sh | 3 + .../setup_swarm_worker_node.sh | 3 + src/agent/__init__.py | 9 +- src/agent/cluster_base.py | 30 +++++ src/agent/docker/__init__.py | 0 .../fabric-0.6}/local/cluster-4.yml | 0 .../fabric-0.6}/local/cluster-6.yml | 0 .../fabric-0.6}/local/peer-pbft.yml | 0 .../fabric-0.6}/syslog/cluster-4.yml | 0 .../fabric-0.6}/syslog/cluster-6.yml | 0 .../fabric-0.6}/syslog/peer-pbft.yml | 0 src/agent/docker/cluster.py | 83 ++++++++++++++ src/agent/{ => docker}/docker_swarm.py | 36 ++++-- src/agent/docker/host.py | 107 ++++++++++++++++++ src/agent/host_base.py | 52 +++++++++ src/agent/k8s/__init__.py | 1 + src/agent/k8s/host.py | 8 ++ src/common/__init__.py | 1 - src/common/utils.py | 4 +- src/modules/cluster.py | 50 ++++---- src/modules/host.py | 48 ++++---- src/themes/basic/templates/about.html | 5 +- 26 files changed, 405 insertions(+), 75 deletions(-) create mode 100644 scripts/worker_node_setup/setup_docker_worker_node.sh create mode 100644 scripts/worker_node_setup/setup_k8s_worker_node.sh create mode 100644 scripts/worker_node_setup/setup_swarm_worker_node.sh create mode 100644 src/agent/cluster_base.py create mode 100644 src/agent/docker/__init__.py rename src/{_compose_files => agent/docker/_compose_files/fabric-0.6}/local/cluster-4.yml (100%) rename src/{_compose_files => agent/docker/_compose_files/fabric-0.6}/local/cluster-6.yml (100%) rename src/{_compose_files => agent/docker/_compose_files/fabric-0.6}/local/peer-pbft.yml (100%) rename src/{_compose_files => agent/docker/_compose_files/fabric-0.6}/syslog/cluster-4.yml (100%) rename src/{_compose_files => agent/docker/_compose_files/fabric-0.6}/syslog/cluster-6.yml (100%) rename src/{_compose_files => agent/docker/_compose_files/fabric-0.6}/syslog/peer-pbft.yml (100%) create mode 100644 src/agent/docker/cluster.py rename src/agent/{ => docker}/docker_swarm.py (94%) create mode 100644 src/agent/docker/host.py create mode 100644 src/agent/host_base.py create mode 100644 src/agent/k8s/__init__.py create mode 100644 src/agent/k8s/host.py diff --git a/Makefile b/Makefile index 6d4448fd..79fb4a13 100644 --- a/Makefile +++ b/Makefile @@ -23,6 +23,7 @@ all: check check: ##@Code Check code format tox + make start && sleep 10 && make stop clean: ##@Code Clean tox result rm -rf .tox diff --git a/docs/arch.md b/docs/arch.md index 441b8d5a..de1eb84a 100644 --- a/docs/arch.md +++ b/docs/arch.md @@ -11,6 +11,20 @@ The architecture will follow the following principles: * Fault-resilience: Means the service should be tolerant for fault, such as database crash. * Scalability: Try best to distribute the services, to mitigate centralized bottle neck. +## Functional Layers + +Following the decouple design, there are 3 layers in Cello. + +* Access layer: including those Web UI dashboards operated by users. 
+* Orchestration layer: received the request form Access layer, and make call to correct agents to operate the blockchain resources. +* Agent layer: real workers that interact with underly infrastructures like Docker, Swarm, K8s. + +Each layer should maintain stable APIs for upper layers, to achieve pluggability without changing upper layer code. + +### Agent layer APIs + +* Host management: create, query/list, update, delete, fillup, clean, reset +* Cluster management: create, query/list, start/stop/restart, delete, reset ## Components diff --git a/docs/index.md b/docs/index.md index d2d5e45b..e6db5010 100644 --- a/docs/index.md +++ b/docs/index.md @@ -32,19 +32,19 @@ You can also find more [scenarios](docs/scenario.md). ## Documentation ### Operational Docs -* [Installation & Deployment](install.md) -* [Terminologies](terminology.md) -* [Tutorial](tutorial.md) -* [Scenarios](scenario.md) -* [Production Configuration](production_config.md) +* [Installation & Deployment](docs/install.md) +* [Terminologies](docs/terminology.md) +* [Tutorial](docs/tutorial.md) +* [Scenarios](docs/scenario.md) +* [Production Configuration](docs/production_config.md) ### Development Docs -* [How to contribute](CONTRIBUTING.md) -* We're following [pep8 style guide](https://www.python.org/dev/peps/pep-0008/), [Coding Style](code_style.md) -* [Architecture Design](arch.md) -* [Database Model](db.md) -* [API](../api/restserver_v2.md) -* [Develop react js](reactjs.md) +* [How to contribute](docs/CONTRIBUTING.md) +* We're following [pep8 style guide](https://www.python.org/dev/peps/pep-0008/), [Coding Style](docs/code_style.md) +* [Architecture Design](docs/arch.md) +* [Database Model](docs/db.md) +* [API](api/restserver_v2.md) +* [Develop react js](docs/reactjs.md) ## Why named Cello? Can u find anyone better at playing chains? :) diff --git a/scripts/worker_node_setup/setup_docker_worker_node.sh b/scripts/worker_node_setup/setup_docker_worker_node.sh new file mode 100644 index 00000000..17ea1d3a --- /dev/null +++ b/scripts/worker_node_setup/setup_docker_worker_node.sh @@ -0,0 +1,3 @@ +# This script will help setup Docker at a server, then the server can be used as a worker node. + +# TODO: \ No newline at end of file diff --git a/scripts/worker_node_setup/setup_k8s_worker_node.sh b/scripts/worker_node_setup/setup_k8s_worker_node.sh new file mode 100644 index 00000000..98dd5bb9 --- /dev/null +++ b/scripts/worker_node_setup/setup_k8s_worker_node.sh @@ -0,0 +1,3 @@ +# This script will help setup a Kubernetes cluster at servers, then the cluster can be used as a worker node. + +# TODO: \ No newline at end of file diff --git a/scripts/worker_node_setup/setup_swarm_worker_node.sh b/scripts/worker_node_setup/setup_swarm_worker_node.sh new file mode 100644 index 00000000..2f945c67 --- /dev/null +++ b/scripts/worker_node_setup/setup_swarm_worker_node.sh @@ -0,0 +1,3 @@ +# This script will help setup a Swarm cluster at servers, then the cluster can be used as a worker node. + +# TODO: \ No newline at end of file diff --git a/src/agent/__init__.py b/src/agent/__init__.py index bfb783d8..28076bdf 100644 --- a/src/agent/__init__.py +++ b/src/agent/__init__.py @@ -1,5 +1,12 @@ -from .docker_swarm import get_project, \ +# Agent pkg provides drivers to those underly platforms, e.g., swarm/k8s. 
jAS + +from .docker.docker_swarm import get_project, \ check_daemon, detect_daemon_type, \ get_swarm_node_ip, \ compose_up, compose_clean, compose_start, compose_stop, compose_restart, \ setup_container_host, cleanup_host, reset_container_host + +from .docker.host import DockerHost +from .docker.cluster import ClusterOnDocker + +from .k8s.host import KubernetesHost diff --git a/src/agent/cluster_base.py b/src/agent/cluster_base.py new file mode 100644 index 00000000..54f77da0 --- /dev/null +++ b/src/agent/cluster_base.py @@ -0,0 +1,30 @@ +import abc + + +class ClusterBase(object): + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def create(self, *args, **kwargs): + """ + Create a new cluster + Args: + *args: args + **kwargs: keyword args + + Returns: + + """ + return + + @abc.abstractmethod + def delete(self, *args, **kwargs): + return + + @abc.abstractmethod + def start(self, *args, **kwargs): + return + + @abc.abstractmethod + def stop(self, *args, **kwargs): + return diff --git a/src/agent/docker/__init__.py b/src/agent/docker/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/_compose_files/local/cluster-4.yml b/src/agent/docker/_compose_files/fabric-0.6/local/cluster-4.yml similarity index 100% rename from src/_compose_files/local/cluster-4.yml rename to src/agent/docker/_compose_files/fabric-0.6/local/cluster-4.yml diff --git a/src/_compose_files/local/cluster-6.yml b/src/agent/docker/_compose_files/fabric-0.6/local/cluster-6.yml similarity index 100% rename from src/_compose_files/local/cluster-6.yml rename to src/agent/docker/_compose_files/fabric-0.6/local/cluster-6.yml diff --git a/src/_compose_files/local/peer-pbft.yml b/src/agent/docker/_compose_files/fabric-0.6/local/peer-pbft.yml similarity index 100% rename from src/_compose_files/local/peer-pbft.yml rename to src/agent/docker/_compose_files/fabric-0.6/local/peer-pbft.yml diff --git a/src/_compose_files/syslog/cluster-4.yml b/src/agent/docker/_compose_files/fabric-0.6/syslog/cluster-4.yml similarity index 100% rename from src/_compose_files/syslog/cluster-4.yml rename to src/agent/docker/_compose_files/fabric-0.6/syslog/cluster-4.yml diff --git a/src/_compose_files/syslog/cluster-6.yml b/src/agent/docker/_compose_files/fabric-0.6/syslog/cluster-6.yml similarity index 100% rename from src/_compose_files/syslog/cluster-6.yml rename to src/agent/docker/_compose_files/fabric-0.6/syslog/cluster-6.yml diff --git a/src/_compose_files/syslog/peer-pbft.yml b/src/agent/docker/_compose_files/fabric-0.6/syslog/peer-pbft.yml similarity index 100% rename from src/_compose_files/syslog/peer-pbft.yml rename to src/agent/docker/_compose_files/fabric-0.6/syslog/peer-pbft.yml diff --git a/src/agent/docker/cluster.py b/src/agent/docker/cluster.py new file mode 100644 index 00000000..27118b8a --- /dev/null +++ b/src/agent/docker/cluster.py @@ -0,0 +1,83 @@ +import logging +import os +import sys + +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) +from common import log_handler, LOG_LEVEL + +from agent import compose_up, compose_clean, compose_start, compose_stop, \ + compose_restart + +from common import CONSENSUS_PLUGINS, \ + CONSENSUS_MODES, CLUSTER_SIZES + +from ..cluster_base import ClusterBase + + +logger = logging.getLogger(__name__) +logger.setLevel(LOG_LEVEL) +logger.addHandler(log_handler) + + +class ClusterOnDocker(ClusterBase): + """ Main handler to operate the cluster in pool + + """ + def __init__(self): + pass + + def create(self, cid, mapped_ports, host, user_id="", + 
consensus_plugin=CONSENSUS_PLUGINS[0], + consensus_mode=CONSENSUS_MODES[0], size=CLUSTER_SIZES[0]): + """ Create a cluster based on given data + + TODO: maybe need other id generation mechanism + + :param name: name of the cluster + :param host_id: id of the host URL + :param start_port: first service port for cluster, will generate + if not given + :param user_id: user_id of the cluster if start to be applied + :param consensus_plugin: type of the consensus type + :param size: size of the cluster, int type + :return: Id of the created cluster or None + """ + + # from now on, we should be safe + + # start compose project, failed then clean and return + logger.debug("Start compose project with name={}".format(cid)) + containers = compose_up( + name=cid, mapped_ports=mapped_ports, host=host, + consensus_plugin=consensus_plugin, consensus_mode=consensus_mode, + cluster_size=size) + if not containers or len(containers) != size: + logger.warning("failed to create cluster, with container={}" + .format(containers)) + return [] + else: + return containers + + def delete(self, id, daemon_url, consensus_plugin): + return compose_clean(id, daemon_url, consensus_plugin) + + def start(self, name, daemon_url, mapped_ports, consensus_plugin, + consensus_mode, log_type, log_level, log_server, cluster_size): + return compose_start(name, daemon_url, mapped_ports, consensus_plugin, + consensus_mode, log_type, log_level, log_server, + cluster_size) + + def restart(self, name, daemon_url, mapped_ports, consensus_plugin, + consensus_mode, log_type, log_level, log_server, cluster_size): + return compose_restart(name, daemon_url, mapped_ports, + consensus_plugin, consensus_mode, log_type, + log_level, log_server, cluster_size) + + def stop(self, name, daemon_url, mapped_ports, consensus_plugin, + consensus_mode, log_type, log_level, log_server, cluster_size): + return compose_stop(name, daemon_url, mapped_ports, consensus_plugin, + consensus_mode, log_type, log_level, log_server, + cluster_size) + + +cluster_on_docker = ClusterOnDocker() diff --git a/src/agent/docker_swarm.py b/src/agent/docker/docker_swarm.py similarity index 94% rename from src/agent/docker_swarm.py rename to src/agent/docker/docker_swarm.py index 34de38dc..42819c10 100644 --- a/src/agent/docker_swarm.py +++ b/src/agent/docker/docker_swarm.py @@ -13,12 +13,15 @@ from common import \ HOST_TYPES, \ CLUSTER_NETWORK, \ - COMPOSE_FILE_PATH, \ CONSENSUS_PLUGINS, CONSENSUS_MODES, \ CLUSTER_LOG_TYPES, CLUSTER_LOG_LEVEL, \ CLUSTER_SIZES, \ SERVICE_PORTS +COMPOSE_FILE_PATH = os.getenv("COMPOSE_FILE_PATH", + "./agent/docker/_compose_files") + + logger = logging.getLogger(__name__) logger.setLevel(LOG_LEVEL) logger.addHandler(log_handler) @@ -336,7 +339,7 @@ def compose_up(name, host, mapped_ports, consensus_plugin=CONSENSUS_PLUGINS[0], consensus_mode=CONSENSUS_MODES[0], cluster_size=CLUSTER_SIZES[0], - timeout=5): + timeout=5, cluster_version='fabric-0.6'): """ Compose up a cluster :param name: The name of the cluster @@ -363,7 +366,8 @@ def compose_up(name, host, mapped_ports, consensus_mode, cluster_size, log_level, log_type, log_server) try: - project = get_project(COMPOSE_FILE_PATH + "/" + log_type) + project = get_project(COMPOSE_FILE_PATH + + "/{}/".format(cluster_version) + log_type) containers = project.up(detached=True, timeout=timeout) except Exception as e: logger.warning("Exception when compose start={}".format(e)) @@ -417,7 +421,8 @@ def compose_start(name, daemon_url, mapped_ports=SERVICE_PORTS, consensus_mode=CONSENSUS_MODES[0], 
log_type=CLUSTER_LOG_TYPES[0], log_server="", log_level=CLUSTER_LOG_LEVEL[0], - cluster_size=CLUSTER_SIZES[0]): + cluster_size=CLUSTER_SIZES[0], + cluster_version='fabric-0.6'): """ Start the cluster :param name: The name of the cluster @@ -438,7 +443,8 @@ def compose_start(name, daemon_url, mapped_ports=SERVICE_PORTS, consensus_mode, cluster_size, log_level, log_type, log_server) # project = get_project(COMPOSE_FILE_PATH+"/"+consensus_plugin) - project = get_project(COMPOSE_FILE_PATH + "/" + log_type) + project = get_project(COMPOSE_FILE_PATH + + "/{}/".format(cluster_version) + log_type) try: project.start() start_containers(daemon_url, name + '-') @@ -453,7 +459,8 @@ def compose_restart(name, daemon_url, mapped_ports=SERVICE_PORTS, consensus_mode=CONSENSUS_MODES[0], log_type=CLUSTER_LOG_TYPES[0], log_server="", log_level=CLUSTER_LOG_LEVEL[0], - cluster_size=CLUSTER_SIZES[0]): + cluster_size=CLUSTER_SIZES[0], + cluster_version='fabric-0.6'): """ Restart the cluster :param name: The name of the cluster @@ -474,7 +481,8 @@ def compose_restart(name, daemon_url, mapped_ports=SERVICE_PORTS, consensus_mode, cluster_size, log_level, log_type, log_server) # project = get_project(COMPOSE_FILE_PATH+"/"+consensus_plugin) - project = get_project(COMPOSE_FILE_PATH + "/" + log_type) + project = get_project(COMPOSE_FILE_PATH + + "/{}/".format(cluster_version) + log_type) try: project.restart() start_containers(daemon_url, name + '-') @@ -489,7 +497,8 @@ def compose_stop(name, daemon_url, mapped_ports=SERVICE_PORTS, consensus_mode=CONSENSUS_MODES[0], log_type=CLUSTER_LOG_TYPES[0], log_server="", log_level=CLUSTER_LOG_LEVEL[0], - cluster_size=CLUSTER_SIZES[0], timeout=5): + cluster_size=CLUSTER_SIZES[0], timeout=5, + cluster_version='fabric-0.6'): """ Stop the cluster :param name: The name of the cluster @@ -512,7 +521,8 @@ def compose_stop(name, daemon_url, mapped_ports=SERVICE_PORTS, _compose_set_env(name, daemon_url, mapped_ports, consensus_plugin, consensus_mode, cluster_size, log_level, log_type, log_server) - project = get_project(COMPOSE_FILE_PATH + "/" + log_type) + project = get_project(COMPOSE_FILE_PATH + + "/{}/".format(cluster_version) + log_type) try: project.stop(timeout=timeout) except Exception as e: @@ -526,7 +536,8 @@ def compose_down(name, daemon_url, mapped_ports=SERVICE_PORTS, consensus_mode=CONSENSUS_MODES[0], log_type=CLUSTER_LOG_TYPES[0], log_server="", log_level=CLUSTER_LOG_LEVEL[0], - cluster_size=CLUSTER_SIZES[0], timeout=5): + cluster_size=CLUSTER_SIZES[0], timeout=5, + cluster_version='fabric-0.6'): """ Stop the cluster and remove the service containers :param name: The name of the cluster @@ -542,13 +553,16 @@ def compose_down(name, daemon_url, mapped_ports=SERVICE_PORTS, """ logger.debug("Compose remove {} with daemon_url={}, " "consensus={}".format(name, daemon_url, consensus_plugin)) + # import os, sys # compose use this _compose_set_env(name, daemon_url, mapped_ports, consensus_plugin, consensus_mode, cluster_size, log_level, log_type, log_server) # project = get_project(COMPOSE_FILE_PATH+"/"+consensus_plugin) - project = get_project(COMPOSE_FILE_PATH + "/" + log_type) + project = get_project(COMPOSE_FILE_PATH + + "/{}/".format(cluster_version) + log_type) + # project.down(remove_orphans=True) project.stop(timeout=timeout) project.remove_stopped(one_off=OneOffFilter.include, force=True) diff --git a/src/agent/docker/host.py b/src/agent/docker/host.py new file mode 100644 index 00000000..573363f3 --- /dev/null +++ b/src/agent/docker/host.py @@ -0,0 +1,107 @@ +import 
datetime +import logging +import os +import sys + + +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) +from common import \ + db, log_handler, \ + LOG_LEVEL + +from ..host_base import HostBase + +from agent import cleanup_host, check_daemon, detect_daemon_type, \ + reset_container_host, setup_container_host + +logger = logging.getLogger(__name__) +logger.setLevel(LOG_LEVEL) +logger.addHandler(log_handler) + + +def check_status(func): + def wrapper(self, *arg): + if not self.is_active(*arg): + logger.warning("Host inactive") + return False + else: + return func(self, *arg) + return wrapper + + +class DockerHost(HostBase): + """ Main handler to operate the Docker hosts + """ + def __init__(self): + self.col = db["host"] + + def create(self, daemon_url): + """ Create a new docker host node + + A docker host is potentially a single node or a swarm. + Will full fill with clusters of given capacity. + + :param name: name of the node + :param daemon_url: daemon_url of the host + :param capacity: The number of clusters to hold + :param log_type: type of the log + :param log_level: level of the log + :param log_server: server addr of the syslog + :param autofill: Whether automatically fillup with chains + :param schedulable: Whether can schedule cluster request to it + :param serialization: whether to get serialized result or object + :return: True or False + """ + + if check_daemon(daemon_url): + logger.warning("The daemon_url is active:" + daemon_url) + else: + logger.warning("The daemon_url is inactive:" + daemon_url) + return False + + detected_type = detect_daemon_type(daemon_url) + + if detected_type not in ['docker', 'swarm']: + logger.warning("Detected type={} is not docker or swarm". + format(detected_type)) + return False + + if setup_container_host(detected_type, daemon_url): + return True + else: + logger.warning("Cannot setup Docker host daemon_url={}" + .format(daemon_url)) + return False + + def delete(self, daemon_url): + """ Delete a host instance + + :param id: id of the host to delete + :return: + """ + + cleanup_host(daemon_url) + return True + + @check_status + def reset(self, host_type, daemon_url): + """ + Clean a host's free clusters. 
+ + :param id: host id + :return: True or False + """ + return reset_container_host(host_type=host_type, + daemon_url=daemon_url) + + def refresh_status(self, daemon_url): + """ + Refresh the status of the host by detection + + :param host: the host to update status + :return: Updated host + """ + return check_daemon(daemon_url) + + +docker_host = DockerHost() diff --git a/src/agent/host_base.py b/src/agent/host_base.py new file mode 100644 index 00000000..661df98e --- /dev/null +++ b/src/agent/host_base.py @@ -0,0 +1,52 @@ +import abc + + +class HostBase(object): + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def create(self, *args, **kwargs): + """ + Create a new host + Args: + *args: args + **kwargs: keyword args + + Returns: + + """ + return + + @abc.abstractmethod + def delete(self, *args, **kwargs): + return + + @abc.abstractmethod + def reset(self, *args, **kwargs): + return + + @abc.abstractmethod + def is_active(self, *args): + """ + Test if a host is active + + Args: + *args: args + + Returns: Boolean + + """ + return + + @abc.abstractmethod + def fillup(self, *args): + """ + Fill up a host with blockchains + + Args: + *args: args + + Returns: Boolean + + """ + return diff --git a/src/agent/k8s/__init__.py b/src/agent/k8s/__init__.py new file mode 100644 index 00000000..cf680491 --- /dev/null +++ b/src/agent/k8s/__init__.py @@ -0,0 +1 @@ +# This is for agent that interacts with Kubernetes platform diff --git a/src/agent/k8s/host.py b/src/agent/k8s/host.py new file mode 100644 index 00000000..c15168c4 --- /dev/null +++ b/src/agent/k8s/host.py @@ -0,0 +1,8 @@ +from ..host_base import HostBase + + +class KubernetesHost(HostBase): + """ Main handler to operate the K8s hosts + """ + def __init__(self): + pass diff --git a/src/common/__init__.py b/src/common/__init__.py index 045c1fc8..b27f9310 100644 --- a/src/common/__init__.py +++ b/src/common/__init__.py @@ -7,7 +7,6 @@ from .log import log_handler, LOG_LEVEL from .utils import \ PEER_SERVICE_PORTS, CA_SERVICE_PORTS, SERVICE_PORTS, \ - COMPOSE_FILE_PATH, \ CONSENSUS_PLUGINS, CONSENSUS_MODES, CONSENSUS_TYPES, \ HOST_TYPES, \ CLUSTER_PORT_START, CLUSTER_PORT_STEP, CLUSTER_SIZES, \ diff --git a/src/common/utils.py b/src/common/utils.py index b61fba26..934f7db4 100644 --- a/src/common/utils.py +++ b/src/common/utils.py @@ -2,8 +2,6 @@ import os -COMPOSE_FILE_PATH = os.getenv("COMPOSE_FILE_PATH", "./_compose_files") - CLUSTER_NETWORK = "cello_net" CLUSTER_SIZES = [4, 6] @@ -45,7 +43,7 @@ ] -HOST_TYPES = ['single', 'swarm'] +HOST_TYPES = ['docker', 'swarm', 'kubernetes'] # all supported host types CLUSTER_LOG_TYPES = ['local', 'syslog'] diff --git a/src/modules/cluster.py b/src/modules/cluster.py index edbcad47..5ec94323 100644 --- a/src/modules/cluster.py +++ b/src/modules/cluster.py @@ -1,18 +1,17 @@ import datetime import logging import os -import requests import sys import time - from threading import Thread + +import requests from pymongo.collection import ReturnDocument sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) from common import db, log_handler, LOG_LEVEL -from agent import get_swarm_node_ip, \ - compose_up, compose_clean, compose_start, compose_stop, compose_restart +from agent import get_swarm_node_ip from common import CLUSTER_PORT_START, CLUSTER_PORT_STEP, CONSENSUS_PLUGINS, \ CONSENSUS_MODES, HOST_TYPES, SYS_CREATOR, SYS_DELETER, SYS_USER, \ @@ -20,6 +19,8 @@ from modules import host +from agent import ClusterOnDocker + logger = logging.getLogger(__name__) logger.setLevel(LOG_LEVEL) 
logger.addHandler(log_handler) @@ -29,10 +30,15 @@ class ClusterHandler(object): """ Main handler to operate the cluster in pool """ + def __init__(self): self.col_active = db["cluster_active"] self.col_released = db["cluster_released"] self.host_handler = host.host_handler + self.cluster_agents = { + 'docker': ClusterOnDocker(), + 'swarm': ClusterOnDocker() + } def list(self, filter_data={}, col_name="active"): """ List clusters with given criteria @@ -154,13 +160,13 @@ def create(self, name, host_id, start_port=0, user_id="", # start compose project, failed then clean and return logger.debug("Start compose project with name={}".format(cid)) - containers = compose_up( - name=cid, mapped_ports=mapped_ports, host=h, - consensus_plugin=consensus_plugin, consensus_mode=consensus_mode, - cluster_size=size) - if not containers or len(containers) != size: - logger.warning("failed containers={}, then delete cluster".format( - containers)) + containers = self.cluster_agents[h.get('type')]\ + .create(cid, mapped_ports, h, user_id=user_id, + consensus_plugin=consensus_plugin, + consensus_mode=consensus_mode, size=size) + if not containers: + logger.warning("failed to start cluster={}, then delete" + .format(name)) self.delete(id=cid, record=False, forced=True) return None @@ -189,6 +195,7 @@ def create(self, name, host_id, start_port=0, user_id="", def check_health_work(cid): time.sleep(5) self.refresh_health(cid) + t = Thread(target=check_health_work, args=(cid,)) t.start() @@ -222,7 +229,7 @@ def delete(self, id, record=False, forced=False): {"$set": {"user_id": user_id}}) return False - # 0. forced + # 0. forced # 1. user_id == SYS_DELETER or "" # Then, add deleting flag to the db, and start deleting if not user_id.startswith(SYS_DELETER): @@ -233,14 +240,15 @@ def delete(self, id, record=False, forced=False): c.get("host_id"), c.get("daemon_url"), \ c.get("consensus_plugin", CONSENSUS_PLUGINS[0]) # port = api_url.split(":")[-1] or CLUSTER_PORT_START - - if not self.host_handler.get_active_host_by_id(host_id): + h = self.host_handler.get_active_host_by_id(host_id) + if not h: logger.warning("Host {} inactive".format(host_id)) self.col_active.update_one({"id": id}, {"$set": {"user_id": user_id}}) return False - if not compose_clean(id, daemon_url, consensus_plugin): + if not self.cluster_agents[h.get('type')]\ + .delete(id, daemon_url, consensus_plugin): logger.warning("Error to run compose clean work") self.col_active.update_one({"id": id}, {"$set": {"user_id": user_id}}) @@ -356,7 +364,7 @@ def start(self, cluster_id): if not h: logger.warning('No host found with id={}'.format(h_id)) return False - result = compose_start( + result = self.cluster_agents[h.get('type')].start( name=cluster_id, daemon_url=h.get('daemon_url'), mapped_ports=c.get('mapped_ports', PEER_SERVICE_PORTS), consensus_plugin=c.get('consensus_plugin'), @@ -388,7 +396,7 @@ def restart(self, cluster_id): if not h: logger.warning('No host found with id={}'.format(h_id)) return False - result = compose_restart( + result = self.cluster_agents[h.get('type')].restart( name=cluster_id, daemon_url=h.get('daemon_url'), mapped_ports=c.get('mapped_ports', PEER_SERVICE_PORTS), consensus_plugin=c.get('consensus_plugin'), @@ -420,14 +428,14 @@ def stop(self, cluster_id): if not h: logger.warning('No host found with id={}'.format(h_id)) return False - result = compose_stop( + result = self.cluster_agents[h.get('type')].stop( name=cluster_id, daemon_url=h.get('daemon_url'), mapped_ports=c.get('mapped_ports', PEER_SERVICE_PORTS), 
consensus_plugin=c.get('consensus_plugin'), consensus_mode=c.get('consensus_mode'), log_type=h.get('log_type'), log_level=h.get('log_level'), - log_server="", + log_server='', cluster_size=c.get('size'), ) if result: @@ -450,8 +458,8 @@ def reset(self, cluster_id, record=False): c = self.get_by_id(cluster_id) logger.debug("Run recreate_work in background thread") cluster_name, host_id, mapped_ports, consensus_plugin, \ - consensus_mode, size \ - = c.get("name"), c.get("host_id"), \ + consensus_mode, size = \ + c.get("name"), c.get("host_id"), \ c.get("mapped_ports"), c.get("consensus_plugin"), \ c.get("consensus_mode"), c.get("size") if not self.delete(cluster_id, record=record, forced=True): diff --git a/src/modules/host.py b/src/modules/host.py index 17ef7bfa..0123696d 100644 --- a/src/modules/host.py +++ b/src/modules/host.py @@ -4,8 +4,8 @@ import random import sys import time - from threading import Thread + from pymongo.collection import ReturnDocument sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) @@ -13,10 +13,9 @@ db, log_handler, \ LOG_LEVEL, CLUSTER_LOG_TYPES, CLUSTER_LOG_LEVEL, \ CLUSTER_SIZES, CLUSTER_PORT_START, CLUSTER_PORT_STEP, \ - CONSENSUS_TYPES + CONSENSUS_TYPES, HOST_TYPES -from agent import cleanup_host, check_daemon, detect_daemon_type, \ - reset_container_host, setup_container_host +from agent import DockerHost, KubernetesHost from modules import cluster @@ -36,15 +35,23 @@ def wrapper(self, *arg): class HostHandler(object): - """ Main handler to operate the Docker hosts + """ Main handler to operate the hosts. + + Host can be platforms like Docker, Swarm or Kubernetes """ def __init__(self): self.col = db["host"] + self.host_agents = { + 'docker': DockerHost(), + 'swarm': DockerHost(), + 'kubernetes': KubernetesHost() + } def create(self, name, daemon_url, capacity=1, log_level=CLUSTER_LOG_LEVEL[0], log_type=CLUSTER_LOG_TYPES[0], log_server="", autofill="false", - schedulable="false", serialization=True): + schedulable="false", serialization=True, + host_type=HOST_TYPES[0]): """ Create a new docker host node A docker host is potentially a single node or a swarm. 
@@ -76,16 +83,8 @@ def create(self, name, daemon_url, capacity=1, log_server = "udp://" + log_server if log_type == CLUSTER_LOG_TYPES[0]: log_server = "" - if check_daemon(daemon_url): - logger.warning("The daemon_url is active:" + daemon_url) - status = "active" - else: - logger.warning("The daemon_url is inactive:" + daemon_url) - status = "inactive" - - detected_type = detect_daemon_type(daemon_url) - if not setup_container_host(detected_type, daemon_url): + if not self.host_agents[host_type].create(daemon_url): logger.warning("{} cannot be setup".format(name)) return {} @@ -95,9 +94,9 @@ def create(self, name, daemon_url, capacity=1, 'daemon_url': daemon_url, 'create_ts': datetime.datetime.now(), 'capacity': capacity, - 'status': status, + 'status': 'active', 'clusters': [], - 'type': detected_type, + 'type': host_type, 'log_level': log_level, 'log_type': log_type, 'log_server': log_server, @@ -131,7 +130,7 @@ def get_by_id(self, id): return self._serialize(ins) def update(self, id, d): - """ Update a host + """ Update a host's property TODO: may check when changing host type @@ -169,7 +168,7 @@ def list(self, filter_data={}): hosts = self.col.find(filter_data) return list(map(self._serialize, hosts)) - def delete(self, id): + def delete(self, id, host_type=HOST_TYPES[0]): """ Delete a host instance :param id: id of the host to delete @@ -184,7 +183,8 @@ def delete(self, id): if h.get("clusters", ""): logger.warning("There are clusters on that host, cannot delete.") return False - cleanup_host(h.get("daemon_url")) + + self.host_agents[host_type].delete(h.get("daemon_url")) self.col.delete_one({"id": id}) return True @@ -260,7 +260,6 @@ def clean(self, id): return True - @check_status def reset(self, id): """ Clean a host's free clusters. @@ -273,8 +272,8 @@ def reset(self, id): if not host or len(host.get("clusters")) > 0: logger.warning("No find resettable host with id ={}".format(id)) return False - return reset_container_host(host_type=host.get("type"), - daemon_url=host.get("daemon_url")) + return self.host_agents[host.get("type")].reset( + host_type=host.get("type"), daemon_url=host.get("daemon_url")) def refresh_status(self, id): """ @@ -287,7 +286,8 @@ def refresh_status(self, id): if not host: logger.warning("No host found with id=" + id) return False - if not check_daemon(host.get("daemon_url")): + if not self.host_agents[host.get("type")]\ + .refresh_status(host.get("daemon_url")): logger.warning("Host {} is inactive".format(id)) self.db_set_by_id(id, status="inactive") return False diff --git a/src/themes/basic/templates/about.html b/src/themes/basic/templates/about.html index b62c54fe..51df8fef 100644 --- a/src/themes/basic/templates/about.html +++ b/src/themes/basic/templates/about.html @@ -12,9 +12,8 @@

 Written-By: {{ author }}
 Version: {{ version }}
-Host: An aggregation of some physical/virtual machines to hold the clusters.
-Cluster: A blockchain with numbers of peer nodes.
+The doc is now hosted at readthedoc.
 {% endblock %}
\ No newline at end of file
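
A minimal, self-contained sketch of the dispatch pattern this patchset introduces: the orchestration layer (modules/host.py, modules/cluster.py) keeps an agents map keyed by host type and delegates the real platform work to the matching agent. The class names below mirror the patch, but the method bodies, the simplified HostHandler.create signature, and the example daemon_url are illustrative assumptions, not code from the diff.

# Sketch only: names follow the patch (HostBase, DockerHost, KubernetesHost,
# host_agents); bodies and the example daemon_url are made up for illustration.
import abc


class HostBase(object):
    """Abstract agent-layer interface for host operations."""
    __metaclass__ = abc.ABCMeta  # same Python-2 style as src/agent/host_base.py

    @abc.abstractmethod
    def create(self, daemon_url):
        return

    @abc.abstractmethod
    def delete(self, daemon_url):
        return


class DockerHost(HostBase):
    """Docker/Swarm agent: does the real work against the Docker daemon."""
    def create(self, daemon_url):
        print("setting up docker host at {}".format(daemon_url))
        return True

    def delete(self, daemon_url):
        print("cleaning up docker host at {}".format(daemon_url))
        return True


class KubernetesHost(HostBase):
    """K8s agent: only a skeleton exists in this patch."""
    def create(self, daemon_url):
        return False  # not implemented yet

    def delete(self, daemon_url):
        return False


class HostHandler(object):
    """Orchestration layer: keeps records in the db, delegates to agents."""
    def __init__(self):
        self.host_agents = {
            'docker': DockerHost(),
            'swarm': DockerHost(),         # swarm reuses the docker agent
            'kubernetes': KubernetesHost(),
        }

    def create(self, daemon_url, host_type='docker'):
        # Pick the agent by host type; db bookkeeping is omitted here.
        return self.host_agents[host_type].create(daemon_url)


if __name__ == '__main__':
    handler = HostHandler()
    handler.create('tcp://192.168.0.2:2375', host_type='docker')

The same keyed-by-type dispatch is what allows a Kubernetes (or fabric 1.0) agent to be plugged in later without touching the orchestration code, which is the stated goal of this patchset.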