From cdd4c9548a469846374928f75b6ece054646e5ac Mon Sep 17 00:00:00 2001 From: Nanda kumar Date: Wed, 10 Jul 2019 15:13:54 +0530 Subject: [PATCH] HDDS-1778. Fix existing blockade tests. --- .../main/compose/ozoneblockade/docker-config | 3 + .../blockade/clusterUtils/cluster_utils.py | 335 ----------- .../{blockadeUtils => ozone}/blockade.py | 16 +- .../src/test/blockade/ozone/client.py | 75 +++ .../src/test/blockade/ozone/cluster.py | 526 +++++++++--------- .../__init__.py => ozone/constants.py} | 11 +- .../src/test/blockade/ozone/container.py | 117 ++++ .../__init__.py => ozone/exceptions.py} | 10 +- .../src/test/blockade/{ => ozone}/util.py | 56 +- .../blockade/test_blockade_client_failure.py | 158 +++--- .../test_blockade_datanode_isolation.py | 228 ++++---- .../src/test/blockade/test_blockade_flaky.py | 42 +- .../blockade/test_blockade_mixed_failure.py | 240 ++++---- ...ckade_mixed_failure_three_nodes_isolate.py | 357 +++++------- .../test_blockade_mixed_failure_two_nodes.py | 275 ++++----- .../blockade/test_blockade_scm_isolation.py | 252 ++++----- 16 files changed, 1185 insertions(+), 1516 deletions(-) delete mode 100644 hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/clusterUtils/cluster_utils.py rename hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/{blockadeUtils => ozone}/blockade.py (86%) create mode 100644 hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/client.py rename hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/{clusterUtils/__init__.py => ozone/constants.py} (77%) create mode 100644 hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/container.py rename hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/{blockadeUtils/__init__.py => ozone/exceptions.py} (78%) rename hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/{ => ozone}/util.py (54%) diff --git a/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config b/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config index f5e6a9225367c..8347998feeaa9 100644 --- a/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config @@ -23,12 +23,15 @@ OZONE-SITE.XML_ozone.scm.block.client.address=scm OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata OZONE-SITE.XML_ozone.handler.type=distributed OZONE-SITE.XML_ozone.scm.client.address=scm +OZONE-SITE.XML_ozone.client.max.retries=10 +OZONE-SITE.XML_ozone.scm.stale.node.interval=2m OZONE-SITE.XML_ozone.scm.dead.node.interval=5m OZONE-SITE.XML_ozone.replication=1 OZONE-SITE.XML_hdds.datanode.dir=/data/hdds OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1 OZONE-SITE.XML_ozone.scm.pipeline.destroy.timeout=15s OZONE-SITE.XML_hdds.heartbeat.interval=2s +OZONE-SITE.XML_hdds.scm.wait.time.after.safemode.exit=30s OZONE-SITE.XML_hdds.scm.replication.thread.interval=5s OZONE-SITE.XML_hdds.scm.replication.event.timeout=7s OZONE-SITE.XML_dfs.ratis.server.failure.duration=25s diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/clusterUtils/cluster_utils.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/clusterUtils/cluster_utils.py deleted file mode 100644 index 53e3fa037f9c8..0000000000000 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/clusterUtils/cluster_utils.py +++ /dev/null @@ -1,335 +0,0 @@ -#!/usr/bin/python - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from subprocess import call - -import subprocess -import logging -import time -import re -import os -import yaml - - -logger = logging.getLogger(__name__) - - -class ClusterUtils(object): - """ - This class contains all cluster related operations. - """ - - @classmethod - def cluster_setup(cls, docker_compose_file, datanode_count, - destroy_existing_cluster=True): - """start a blockade cluster""" - logger.info("compose file :%s", docker_compose_file) - logger.info("number of DNs :%d", datanode_count) - if destroy_existing_cluster: - call(["docker-compose", "-f", docker_compose_file, "down"]) - call(["docker-compose", "-f", docker_compose_file, "up", "-d", - "--scale", "datanode=" + str(datanode_count)]) - - logger.info("Waiting 30s for cluster start up...") - time.sleep(30) - output = subprocess.check_output(["docker-compose", "-f", - docker_compose_file, "ps"]) - output_array = output.split("\n")[2:-1] - - container_list = [] - for out in output_array: - container = out.split(" ")[0] - container_list.append(container) - call(["blockade", "add", container]) - time.sleep(2) - - assert container_list, "no container found!" - logger.info("blockade created with containers %s", - ' '.join(container_list)) - - return container_list - - @classmethod - def cluster_destroy(cls, docker_compose_file): - logger.info("Running docker-compose -f %s down", docker_compose_file) - call(["docker-compose", "-f", docker_compose_file, "down"]) - - @classmethod - def run_freon(cls, docker_compose_file, num_volumes, num_buckets, - num_keys, key_size, replication_type, replication_factor, - freon_client='om'): - # run freon - cmd = "docker-compose -f %s " \ - "exec %s /opt/hadoop/bin/ozone " \ - "freon rk " \ - "--numOfVolumes %s " \ - "--numOfBuckets %s " \ - "--numOfKeys %s " \ - "--keySize %s " \ - "--replicationType %s " \ - "--factor %s" % (docker_compose_file, freon_client, num_volumes, - num_buckets, num_keys, key_size, - replication_type, replication_factor) - exit_code, output = cls.run_cmd(cmd) - return exit_code, output - - @classmethod - def run_cmd(cls, cmd): - command = cmd - if isinstance(cmd, list): - command = ' '.join(cmd) - logger.info(" RUNNING: %s", command) - all_output = "" - myprocess = subprocess.Popen(cmd, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, shell=True) - while myprocess.poll() is None: - op = myprocess.stdout.readline() - if op: - all_output += op - logger.info(op) - other_output = myprocess.communicate() - other_output = other_output[0].strip() - if other_output != "": - all_output += other_output - for each_line in other_output.split("\n"): - logger.info(" %s", each_line.strip()) - reg = re.compile(r"(\r\n|\n)$") - all_output = reg.sub("", all_output, 1) - - return myprocess.returncode, all_output - - @classmethod - def get_ozone_confkey_value(cls, docker_compose_file, key_name): - cmd = "docker-compose -f %s " \ - "exec om /opt/hadoop/bin/ozone " \ - "getconf -confKey %s" \ - % (docker_compose_file, key_name) - exit_code, output = cls.run_cmd(cmd) - assert exit_code == 0, "getconf of key=[%s] failed with output=[%s]" \ - % (key_name, output) - return str(output).strip() - - @classmethod - def find_scm_uuid(cls, docker_compose_file): - """ - This function returns scm uuid. - """ - ozone_metadata_dir = cls.get_ozone_confkey_value(docker_compose_file, - "ozone.metadata.dirs") - cmd = "docker-compose -f %s exec scm cat %s/scm/current/VERSION" % \ - (docker_compose_file, ozone_metadata_dir) - exit_code, output = cls.run_cmd(cmd) - assert exit_code == 0, "get scm UUID failed with output=[%s]" % output - output_list = output.split("\n") - output_list = [x for x in output_list if re.search(r"\w+=\w+", x)] - output_dict = dict(x.split("=") for x in output_list) - return str(output_dict['scmUuid']).strip() - - @classmethod - def find_container_status(cls, docker_compose_file, datanode_index): - """ - This function returns the datanode's container replica state. - In this function, it finds all .container files. - Then, it opens each file and checks the state of the containers - in the datanode. - It returns 'None' as container state if it cannot find any - .container file in the datanode. - Sample .container contains state as following: - state: - """ - - datanode_dir = cls.get_ozone_confkey_value(docker_compose_file, - "hdds.datanode.dir") - scm_uuid = cls.find_scm_uuid(docker_compose_file) - container_parent_path = "%s/hdds/%s/current/containerDir0" % \ - (datanode_dir, scm_uuid) - cmd = "docker-compose -f %s exec --index=%s datanode find %s -type f " \ - "-name '*.container'" \ - % (docker_compose_file, datanode_index, container_parent_path) - exit_code, output = cls.run_cmd(cmd) - container_state = "None" - if exit_code == 0 and output: - container_list = map(str.strip, output.split("\n")) - for container_path in container_list: - cmd = "docker-compose -f %s exec --index=%s datanode cat %s" \ - % (docker_compose_file, datanode_index, container_path) - exit_code, output = cls.run_cmd(cmd) - assert exit_code == 0, \ - "command=[%s] failed with output=[%s]" % (cmd, output) - container_db_list = output.split("\n") - container_db_list = [x for x in container_db_list - if re.search(r"\w+:\s\w+", x)] - # container_db_list will now contain the lines which has got - # yaml representation , i.e , key: value - container_db_info = "\n".join(container_db_list) - container_db_dict = yaml.load(container_db_info) - for key, value in container_db_dict.items(): - container_db_dict[key] = str(value).lstrip() - if container_state == "None": - container_state = container_db_dict['state'] - else: - assert container_db_dict['state'] == container_state, \ - "all containers are not in same state" - - return container_state - - @classmethod - def findall_container_status(cls, docker_compose_file, scale): - """ - This function returns container replica states of all datanodes. - """ - all_datanode_container_status = [] - for index in range(scale): - all_datanode_container_status.append( - cls.find_container_status(docker_compose_file, index + 1)) - logger.info("All datanodes container status: %s", - ' '.join(all_datanode_container_status)) - - return all_datanode_container_status - - @classmethod - def create_volume(cls, docker_compose_file, volume_name): - command = "docker-compose -f %s " \ - "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh volume create /%s --user root" % \ - (docker_compose_file, volume_name) - logger.info("Creating Volume %s", volume_name) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "Ozone volume create failed with output=[%s]" \ - % output - - @classmethod - def delete_volume(cls, docker_compose_file, volume_name): - command = "docker-compose -f %s " \ - "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh volume delete /%s" % (docker_compose_file, volume_name) - logger.info("Deleting Volume %s", volume_name) - exit_code, output = cls.run_cmd(command) - return exit_code, output - - @classmethod - def create_bucket(cls, docker_compose_file, bucket_name, volume_name): - command = "docker-compose -f %s " \ - "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh bucket create /%s/%s" % (docker_compose_file, - volume_name, bucket_name) - logger.info("Creating Bucket %s in volume %s", - bucket_name, volume_name) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "Ozone bucket create failed with output=[%s]" \ - % output - - @classmethod - def delete_bucket(cls, docker_compose_file, bucket_name, volume_name): - command = "docker-compose -f %s " \ - "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh bucket delete /%s/%s" % (docker_compose_file, - volume_name, bucket_name) - logger.info("Running delete bucket of %s/%s", volume_name, bucket_name) - exit_code, output = cls.run_cmd(command) - return exit_code, output - - @classmethod - def put_key(cls, docker_compose_file, bucket_name, volume_name, - filepath, key_name=None, replication_factor=None): - command = "docker-compose -f %s " \ - "exec ozone_client ls %s" % (docker_compose_file, filepath) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "%s does not exist" % filepath - if key_name is None: - key_name = os.path.basename(filepath) - command = "docker-compose -f %s " \ - "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh key put /%s/%s/%s %s" % (docker_compose_file, - volume_name, bucket_name, - key_name, filepath) - if replication_factor: - command = "%s --replication=%s" % (command, replication_factor) - logger.info("Creating key %s in %s/%s", key_name, - volume_name, bucket_name) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "Ozone put Key failed with output=[%s]" % output - - @classmethod - def delete_key(cls, docker_compose_file, bucket_name, volume_name, - key_name): - command = "docker-compose -f %s " \ - "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh key delete /%s/%s/%s" \ - % (docker_compose_file, volume_name, bucket_name, key_name) - logger.info("Running delete key %s in %s/%s", - key_name, volume_name, bucket_name) - exit_code, output = cls.run_cmd(command) - return exit_code, output - - @classmethod - def get_key(cls, docker_compose_file, bucket_name, volume_name, - key_name, filepath=None): - if filepath is None: - filepath = '.' - command = "docker-compose -f %s " \ - "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh key get /%s/%s/%s %s" % (docker_compose_file, - volume_name, bucket_name, - key_name, filepath) - logger.info("Running get key %s in %s/%s", key_name, - volume_name, bucket_name) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "Ozone get Key failed with output=[%s]" % output - - @classmethod - def find_checksum(cls, docker_compose_file, filepath, client="ozone_client"): - """ - This function finds the checksum of a file present in a docker container. - Before running any 'putKey' operation, this function is called to store - the original checksum of the file. The file is then uploaded as a key. - """ - command = "docker-compose -f %s " \ - "exec %s md5sum %s" % \ - (docker_compose_file, client, filepath) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "Cant find checksum" - myoutput = output.split("\n") - finaloutput = "" - for line in myoutput: - if line.find("Warning") >= 0 or line.find("is not a tty") >= 0: - logger.info("skip this line: %s", line) - else: - finaloutput = finaloutput + line - checksum = finaloutput.split(" ") - logger.info("Checksum of %s is : %s", filepath, checksum[0]) - return checksum[0] - - @classmethod - def get_pipelines(cls, docker_compose_file): - command = "docker-compose -f %s " \ - + "exec ozone_client /opt/hadoop/bin/ozone scmcli " \ - + "listPipelines" % (docker_compose_file) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "list pipeline command failed" - return output - - @classmethod - def find_om_scm_client_datanodes(cls, container_list): - - om = filter(lambda x: 'om_1' in x, container_list) - scm = filter(lambda x: 'scm' in x, container_list) - datanodes = sorted( - list(filter(lambda x: 'datanode' in x, container_list))) - client = filter(lambda x: 'ozone_client' in x, container_list) - return om, scm, client, datanodes diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/blockadeUtils/blockade.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/blockade.py similarity index 86% rename from hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/blockadeUtils/blockade.py rename to hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/blockade.py index 7809c70a3b14d..7e32f09494dc7 100644 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/blockadeUtils/blockade.py +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/blockade.py @@ -20,7 +20,6 @@ from subprocess import call import logging import util -from clusterUtils.cluster_utils import ClusterUtils logger = logging.getLogger(__name__) @@ -34,12 +33,13 @@ def blockade_destroy(cls): @classmethod def blockade_up(cls): + logger.info("Running blockade up") call(["blockade", "up"]) @classmethod def blockade_status(cls): - exit_code, output = util.run_cmd("blockade status") - return exit_code, output + logger.info("Running blockade status") + return call(["blockade", "status"]) @classmethod def make_flaky(cls, flaky_node): @@ -58,15 +58,15 @@ def blockade_create_partition(cls, *args): for node_list in args: nodes = nodes + ','.join(node_list) + " " exit_code, output = \ - util.run_cmd("blockade partition %s" % nodes) + util.run_command("blockade partition %s" % nodes) assert exit_code == 0, \ "blockade partition command failed with exit code=[%s]" % output @classmethod def blockade_join(cls): - output = call(["blockade", "join"]) - assert output == 0, "blockade join command failed with exit code=[%s]" \ - % output + exit_code = call(["blockade", "join"]) + assert exit_code == 0, "blockade join command failed with exit code=[%s]" \ + % exit_code @classmethod def blockade_stop(cls, node, all_nodes=False): @@ -89,4 +89,4 @@ def blockade_start(cls, node, all_nodes=False): @classmethod def blockade_add(cls, node): output = call(["blockade", "add", node]) - assert output == 0, "blockade add command failed" \ No newline at end of file + assert output == 0, "blockade add command failed" diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/client.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/client.py new file mode 100644 index 0000000000000..9d40cf42dacab --- /dev/null +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/client.py @@ -0,0 +1,75 @@ +#!/usr/bin/python + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from ozone import util +from ozone.cluster import Command + + +class OzoneClient: + + __logger__ = logging.getLogger(__name__) + + def __init__(self, cluster): + self.cluster = cluster + pass + + def create_volume(self, volume_name): + OzoneClient.__logger__.info("Creating Volume %s" % volume_name) + command = [Command.ozone, "sh volume create /%s --user root" % volume_name] + util.run_docker_command(command, self.cluster.client) + + def create_bucket(self, volume_name, bucket_name): + OzoneClient.__logger__.info("Creating Bucket %s in Volume %s" % (bucket_name, volume_name)) + command = [Command.ozone, "sh bucket create /%s/%s" % (volume_name, bucket_name)] + util.run_docker_command(command, self.cluster.client) + + def put_key(self, source_file, volume_name, bucket_name, key_name, replication_factor=None): + OzoneClient.__logger__.info("Creating Key %s in %s/%s" % (key_name, volume_name, bucket_name)) + exit_code, output = util.run_docker_command( + "ls %s" % source_file, self.cluster.client) + assert exit_code == 0, "%s does not exist" % source_file + command = [Command.ozone, "sh key put /%s/%s/%s %s" % + (volume_name, bucket_name, key_name, source_file)] + if replication_factor: + command.append("--replication=%s" % replication_factor) + + exit_code, output = util.run_docker_command(command, self.cluster.client) + assert exit_code == 0, "Ozone put Key failed with output=[%s]" % output + + def get_key(self, volume_name, bucket_name, key_name, file_path='.'): + OzoneClient.__logger__.info("Reading key %s from %s/%s" % (key_name, volume_name, bucket_name)) + command = [Command.ozone, "sh key get /%s/%s/%s %s" % + (volume_name, bucket_name, key_name, file_path)] + exit_code, output = util.run_docker_command(command, self.cluster.client) + assert exit_code == 0, "Ozone get Key failed with output=[%s]" % output + + def run_freon(self, num_volumes, num_buckets, num_keys, key_size, + replication_type="RATIS", replication_factor="THREE"): + """ + Runs freon on the cluster. + """ + command = [Command.freon, + " rk", + " --numOfVolumes " + str(num_volumes), + " --numOfBuckets " + str(num_buckets), + " --numOfKeys " + str(num_keys), + " --keySize " + str(key_size), + " --replicationType " + replication_type, + " --factor " + replication_factor] + return util.run_docker_command(command, self.cluster.client) diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/cluster.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/cluster.py index f75b3d2c8c298..d13779340d849 100644 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/cluster.py +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/cluster.py @@ -20,284 +20,272 @@ import re import subprocess import yaml -import util -from os import environ -from subprocess import call -from blockadeUtils.blockade import Blockade -class Command(object): - docker = "docker" - blockade = "blockade" - docker_compose = "docker-compose" - ozone = "/opt/hadoop/bin/ozone" - freon = "/opt/hadoop/bin/ozone freon" +from os import environ +from subprocess import call +from ozone import util +from ozone.constants import Command +from ozone.blockade import Blockade +from ozone.client import OzoneClient +from ozone.container import Container +from ozone.exceptions import ContainerNotFoundError class Configuration: - """ - Configurations to be used while starting Ozone Cluster. - Here @property decorators is used to achieve getters, setters and delete - behaviour for 'datanode_count' attribute. - @datanode_count.setter will set the value for 'datanode_count' attribute. - @datanode_count.deleter will delete the current value of 'datanode_count' - attribute. - """ - - def __init__(self): - if "MAVEN_TEST" in os.environ: - compose_dir = environ.get("MAVEN_TEST") - self.docker_compose_file = os.path.join(compose_dir, "docker-compose.yaml") - elif "OZONE_HOME" in os.environ: - compose_dir = os.path.join(environ.get("OZONE_HOME"), "compose", "ozoneblockade") - self.docker_compose_file = os.path.join(compose_dir, "docker-compose.yaml") - else: - __parent_dir__ = os.path.dirname(os.path.dirname(os.path.dirname( - os.path.dirname(os.path.realpath(__file__))))) - self.docker_compose_file = os.path.join(__parent_dir__, - "compose", "ozoneblockade", - "docker-compose.yaml") - self._datanode_count = 3 - os.environ["DOCKER_COMPOSE_FILE"] = self.docker_compose_file - - @property - def datanode_count(self): - return self._datanode_count - - @datanode_count.setter - def datanode_count(self, datanode_count): - self._datanode_count = datanode_count - - @datanode_count.deleter - def datanode_count(self): - del self._datanode_count - - -class Cluster(object): - """ - This represents Ozone Cluster. - Here @property decorators is used to achieve getters, setters and delete - behaviour for 'om', 'scm', 'datanodes' and 'clients' attributes. - """ - - __logger__ = logging.getLogger(__name__) - - def __init__(self, conf): - self.conf = conf - self.docker_compose_file = conf.docker_compose_file - self._om = None - self._scm = None - self._datanodes = None - self._clients = None - self.scm_uuid = None - self.datanode_dir = None - - @property - def om(self): - return self._om - - @om.setter - def om(self, om): - self._om = om - - @om.deleter - def om(self): - del self._om - - @property - def scm(self): - return self._scm - - @scm.setter - def scm(self, scm): - self._scm = scm - - @scm.deleter - def scm(self): - del self._scm - - @property - def datanodes(self): - return self._datanodes - - @datanodes.setter - def datanodes(self, datanodes): - self._datanodes = datanodes - - @datanodes.deleter - def datanodes(self): - del self._datanodes - - @property - def clients(self): - return self._clients - - @clients.setter - def clients(self, clients): - self._clients = clients - - @clients.deleter - def clients(self): - del self._clients - - @classmethod - def create(cls, config=Configuration()): - return Cluster(config) - - def start(self): - """ - Start Ozone Cluster in docker containers. - """ - Cluster.__logger__.info("Starting Ozone Cluster") - Blockade.blockade_destroy() - call([Command.docker_compose, "-f", self.docker_compose_file, - "up", "-d", "--scale", - "datanode=" + str(self.conf.datanode_count)]) - Cluster.__logger__.info("Waiting 10s for cluster start up...") - # Remove the sleep and wait only till the cluster is out of safemode - # time.sleep(10) - output = subprocess.check_output([Command.docker_compose, "-f", - self.docker_compose_file, "ps"]) - node_list = [] - for out in output.split("\n")[2:-1]: - node = out.split(" ")[0] - node_list.append(node) - Blockade.blockade_add(node) - - Blockade.blockade_status() - self.om = filter(lambda x: 'om' in x, node_list)[0] - self.scm = filter(lambda x: 'scm' in x, node_list)[0] - self.datanodes = sorted(list(filter(lambda x: 'datanode' in x, node_list))) - self.clients = filter(lambda x: 'ozone_client' in x, node_list) - self.scm_uuid = self.__get_scm_uuid__() - self.datanode_dir = self.get_conf_value("hdds.datanode.dir") - - assert node_list, "no node found in the cluster!" - Cluster.__logger__.info("blockade created with nodes %s", - ' '.join(node_list)) - - def get_conf_value(self, key): """ - Returns the value of given configuration key. + Configurations to be used while starting Ozone Cluster. + Here @property decorators is used to achieve getters, setters and delete + behaviour for 'datanode_count' attribute. + @datanode_count.setter will set the value for 'datanode_count' attribute. + @datanode_count.deleter will delete the current value of 'datanode_count' + attribute. """ - command = [Command.ozone, "getconf -confKey " + key] - exit_code, output = self.__run_docker_command__(command, self.om) - return str(output).strip() - def scale_datanode(self, datanode_count): + def __init__(self): + if "MAVEN_TEST" in os.environ: + compose_dir = environ.get("MAVEN_TEST") + self.docker_compose_file = os.path.join(compose_dir, "docker-compose.yaml") + elif "OZONE_HOME" in os.environ: + compose_dir = os.path.join(environ.get("OZONE_HOME"), "compose", "ozoneblockade") + self.docker_compose_file = os.path.join(compose_dir, "docker-compose.yaml") + else: + __parent_dir__ = os.path.dirname(os.path.dirname(os.path.dirname( + os.path.dirname(os.path.realpath(__file__))))) + self.docker_compose_file = os.path.join(__parent_dir__, + "compose", "ozoneblockade", + "docker-compose.yaml") + self._datanode_count = 3 + os.environ["DOCKER_COMPOSE_FILE"] = self.docker_compose_file + + @property + def datanode_count(self): + return self._datanode_count + + @datanode_count.setter + def datanode_count(self, datanode_count): + self._datanode_count = datanode_count + + @datanode_count.deleter + def datanode_count(self): + del self._datanode_count + + +class OzoneCluster(object): """ - Commission new datanodes to the running cluster. + This represents Ozone Cluster. + Here @property decorators is used to achieve getters, setters and delete + behaviour for 'om', 'scm', 'datanodes' and 'client' attributes. """ - call([Command.docker_compose, "-f", self.docker_compose_file, - "up", "-d", "--scale", "datanode=" + datanode_count]) - - def partition_network(self, *args): - """ - Partition the network which is used by the cluster. - """ - Blockade.blockade_create_partition(*args) + __logger__ = logging.getLogger(__name__) + + def __init__(self, conf): + self.conf = conf + self.docker_compose_file = conf.docker_compose_file + self._om = None + self._scm = None + self._datanodes = None + self._client = None + self.scm_uuid = None + self.datanode_dir = None + + @property + def om(self): + return self._om + + @om.setter + def om(self, om): + self._om = om + + @om.deleter + def om(self): + del self._om + + @property + def scm(self): + return self._scm + + @scm.setter + def scm(self, scm): + self._scm = scm + + @scm.deleter + def scm(self): + del self._scm + + @property + def datanodes(self): + return self._datanodes + + @datanodes.setter + def datanodes(self, datanodes): + self._datanodes = datanodes + + @datanodes.deleter + def datanodes(self): + del self._datanodes + + @property + def client(self): + return self._client + + @client.setter + def client(self, client): + self._client = client + + @client.deleter + def client(self): + del self._client + + @classmethod + def create(cls, config=Configuration()): + return OzoneCluster(config) + + def start(self): + """ + Start Ozone Cluster in docker containers. + """ + self.__logger__.info("Starting Ozone Cluster") + if Blockade.blockade_status() == 0: + Blockade.blockade_destroy() + + Blockade.blockade_up() + + call([Command.docker_compose, "-f", self.docker_compose_file, + "up", "-d", "--scale", + "datanode=" + str(self.conf.datanode_count)]) + self.__logger__.info("Waiting 10s for cluster start up...") + # Remove the sleep and wait only till the cluster is out of safemode + # time.sleep(10) + output = subprocess.check_output([Command.docker_compose, "-f", + self.docker_compose_file, "ps"]) + node_list = [] + for out in output.split("\n")[2:-1]: + node = out.split(" ")[0] + node_list.append(node) + Blockade.blockade_add(node) + + self.om = filter(lambda x: 'om' in x, node_list)[0] + self.scm = filter(lambda x: 'scm' in x, node_list)[0] + self.datanodes = sorted(list(filter(lambda x: 'datanode' in x, node_list))) + self.client = filter(lambda x: 'ozone_client' in x, node_list)[0] + self.scm_uuid = self.__get_scm_uuid__() + self.datanode_dir = self.get_conf_value("hdds.datanode.dir") + + assert node_list, "no node found in the cluster!" + self.__logger__.info("blockade created with nodes %s", ' '.join(node_list)) + + def get_conf_value(self, key): + """ + Returns the value of given configuration key. + """ + command = [Command.ozone, "getconf -confKey " + key] + exit_code, output = util.run_docker_command(command, self.om) + return str(output).strip() + + def scale_datanode(self, datanode_count): + """ + Commission new datanodes to the running cluster. + """ + call([Command.docker_compose, "-f", self.docker_compose_file, + "up", "-d", "--scale", "datanode=" + datanode_count]) + + def partition_network(self, *args): + """ + Partition the network which is used by the cluster. + """ + Blockade.blockade_create_partition(*args) + + def restore_network(self): + """ + Restores the network partition. + """ + Blockade.blockade_join() + + def __get_scm_uuid__(self): + """ + Returns SCM's UUID. + """ + ozone_metadata_dir = self.get_conf_value("ozone.metadata.dirs") + command = "cat %s/scm/current/VERSION" % ozone_metadata_dir + exit_code, output = util.run_docker_command(command, self.scm) + output_list = output.split("\n") + key_value = [x for x in output_list if re.search(r"\w+=\w+", x)] + uuid = [token for token in key_value if 'scmUuid' in token] + return uuid.pop().split("=")[1].strip() + + def get_client(self): + return OzoneClient(self) + + def get_container(self, container_id): + command = [Command.ozone, "scmcli list -c=1 -s=%s | grep containerID", container_id - 1] + exit_code, output = util.run_docker_command(command, self.om) + if exit_code != 0: + raise ContainerNotFoundError(container_id) + return Container(container_id, self) + + def get_containers_on_datanode(self, datanode): + """ + Returns all the container on given datanode. + """ + container_parent_path = "%s/hdds/%s/current/containerDir0" % \ + (self.datanode_dir, self.scm_uuid) + command = "find %s -type f -name '*.container'" % container_parent_path + exit_code, output = util.run_docker_command(command, datanode) + containers = [] + + container_list = map(str.strip, output.split("\n")) + for container_path in container_list: + # Reading the container file. + exit_code, output = util.run_docker_command( + "cat " + container_path, datanode) + if exit_code is not 0: + continue + data = output.split("\n") + # Reading key value pairs from container file. + key_value = [x for x in data if re.search(r"\w+:\s\w+", x)] + content = "\n".join(key_value) + content_yaml = yaml.load(content) + if content_yaml is None: + continue + containers.append(Container(content_yaml.get('containerID'), self)) + return containers + + def get_container_state(self, container_id, datanode): + container_parent_path = "%s/hdds/%s/current/containerDir0" % \ + (self.datanode_dir, self.scm_uuid) + command = "find %s -type f -name '%s.container'" % (container_parent_path, container_id) + exit_code, output = util.run_docker_command(command, datanode) + container_path = output.strip() + if not container_path: + raise ContainerNotFoundError("Container not found!") + + # Reading the container file. + exit_code, output = util.run_docker_command("cat " + container_path, datanode) + data = output.split("\n") + # Reading key value pairs from container file. + key_value = [x for x in data if re.search(r"\w+:\s\w+", x)] + content = "\n".join(key_value) + content_yaml = yaml.load(content) + return str(content_yaml.get('state')).lstrip() + + def get_container_datanodes(self, container_id): + result = [] + for datanode in self.datanodes: + container_parent_path = "%s/hdds/%s/current/containerDir0" % \ + (self.datanode_dir, self.scm_uuid) + command = "find %s -type f -name '%s.container'" % (container_parent_path, container_id) + exit_code, output = util.run_docker_command(command, datanode) + if exit_code == 0: + result.append(datanode) + return result + + def stop(self): + """ + Stops the Ozone Cluster. + """ + self.__logger__.info("Stopping Ozone Cluster") + call([Command.docker_compose, "-f", self.docker_compose_file, "down"]) + Blockade.blockade_destroy() - def restore_network(self): - """ - Restores the network partition. - """ - Blockade.blockade_join() - - - def __get_scm_uuid__(self): - """ - Returns SCM's UUID. - """ - ozone_metadata_dir = self.get_conf_value("ozone.metadata.dirs") - command = "cat %s/scm/current/VERSION" % ozone_metadata_dir - exit_code, output = self.__run_docker_command__(command, self.scm) - output_list = output.split("\n") - key_value = [x for x in output_list if re.search(r"\w+=\w+", x)] - uuid = [token for token in key_value if 'scmUuid' in token] - return uuid.pop().split("=")[1].strip() - - def get_container_states(self, datanode): - """ - Returns the state of all the containers in the given datanode. - """ - container_parent_path = "%s/hdds/%s/current/containerDir0" % \ - (self.datanode_dir, self.scm_uuid) - command = "find %s -type f -name '*.container'" % container_parent_path - exit_code, output = self.__run_docker_command__(command, datanode) - container_state = {} - - container_list = map(str.strip, output.split("\n")) - for container_path in container_list: - # Reading the container file. - exit_code, output = self.__run_docker_command__( - "cat " + container_path, datanode) - if exit_code is not 0: - continue - data = output.split("\n") - # Reading key value pairs from container file. - key_value = [x for x in data if re.search(r"\w+:\s\w+", x)] - content = "\n".join(key_value) - content_yaml = yaml.load(content) - if content_yaml is None: - continue - for key, value in content_yaml.items(): - content_yaml[key] = str(value).lstrip() - # Stores the container state in a dictionary. - container_state[content_yaml['containerID']] = content_yaml['state'] - return container_state - - def run_freon(self, num_volumes, num_buckets, num_keys, key_size, - replication_type="RATIS", replication_factor="THREE", - run_on=None): - """ - Runs freon on the cluster. - """ - if run_on is None: - run_on = self.om - command = [Command.freon, - " rk", - " --numOfVolumes " + str(num_volumes), - " --numOfBuckets " + str(num_buckets), - " --numOfKeys " + str(num_keys), - " --keySize " + str(key_size), - " --replicationType " + replication_type, - " --factor " + replication_factor] - return self.__run_docker_command__(command, run_on) - - def __run_docker_command__(self, command, run_on): - if isinstance(command, list): - command = ' '.join(command) - command = [Command.docker, - "exec " + run_on, - command] - return util.run_cmd(command) - - def stop(self): - """ - Stops the Ozone Cluster. - """ - Cluster.__logger__.info("Stopping Ozone Cluster") - call([Command.docker_compose, "-f", self.docker_compose_file, "down"]) - Blockade.blockade_destroy() - - def container_state_predicate_all_closed(self, datanodes): - for datanode in datanodes: - container_states_dn = self.get_container_states(datanode) - if not container_states_dn \ - or container_states_dn.popitem()[1] != 'CLOSED': - return False - return True - - def container_state_predicate_one_closed(self, datanodes): - for datanode in datanodes: - container_states_dn = self.get_container_states(datanode) - if container_states_dn and container_states_dn.popitem()[1] == 'CLOSED': - return True - return False - - def container_state_predicate(self, datanode, state): - container_states_dn = self.get_container_states(datanode) - if container_states_dn and container_states_dn.popitem()[1] == state: - return True - return False diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/clusterUtils/__init__.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/constants.py similarity index 77% rename from hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/clusterUtils/__init__.py rename to hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/constants.py index 13878a13a7f86..a79d6b1be0f07 100644 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/clusterUtils/__init__.py +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/constants.py @@ -1,3 +1,5 @@ +#!/usr/bin/python + # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -11,4 +13,11 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file +# limitations under the License. + + +class Command(object): + docker = "docker" + docker_compose = "docker-compose" + ozone = "/opt/hadoop/bin/ozone" + freon = "/opt/hadoop/bin/ozone freon" diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/container.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/container.py new file mode 100644 index 0000000000000..ffb6a3df3ae67 --- /dev/null +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/container.py @@ -0,0 +1,117 @@ +#!/usr/bin/python + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import util +from ozone.exceptions import ContainerNotFoundError + + +class Container: + + def __init__(self, container_id, cluster): + self.container_id = container_id + self.cluster = cluster + + def get_datanode_states(self): + dns = self.cluster.get_container_datanodes(self.container_id) + states = [] + for dn in dns: + states.append(self.get_state(dn)) + return states + + def get_state(self, datanode): + return self.cluster.get_container_state(self.container_id, datanode) + + def wait_until_replica_is_quasi_closed(self, datanode): + def predicate(): + try: + if self.cluster.get_container_state(self.container_id, datanode) == 'QUASI_CLOSED': + return True + else: + return False + except ContainerNotFoundError: + return False + + util.wait_until(predicate, int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) + if not predicate(): + raise Exception("Replica is not quasi closed!") + + def wait_until_one_replica_is_quasi_closed(self): + def predicate(): + dns = self.cluster.get_container_datanodes(self.container_id) + for dn in dns: + if self.cluster.get_container_state(self.container_id, dn) == 'QUASI_CLOSED': + return True + else: + return False + + util.wait_until(predicate, int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) + if not predicate(): + raise Exception("None of the container replica is quasi closed!") + + def wait_until_replica_is_closed(self, datanode): + def predicate(): + try: + if self.cluster.get_container_state(self.container_id, datanode) == 'CLOSED': + return True + else: + return False + except ContainerNotFoundError: + return False + + util.wait_until(predicate, int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) + if not predicate(): + raise Exception("Replica is not closed!") + + def wait_until_one_replica_is_closed(self): + def predicate(): + dns = self.cluster.get_container_datanodes(self.container_id) + for dn in dns: + if self.cluster.get_container_state(self.container_id, dn) == 'CLOSED': + return True + else: + return False + + util.wait_until(predicate, int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) + if not predicate(): + raise Exception("None of the container replica is closed!") + + def wait_until_all_replicas_are_closed(self): + def predicate(): + dns = self.cluster.get_container_datanodes(self.container_id) + for dn in dns: + if self.cluster.get_container_state(self.container_id, dn) != 'CLOSED': + return False + return True + + util.wait_until(predicate, int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) + if not predicate(): + raise Exception("Not all the replicas are closed!") + + def wait_until_replica_is_not_open_anymore(self, datanode): + def predicate(): + try: + if self.cluster.get_container_state(self.container_id, datanode) != 'OPEN': + return True + else: + return False + except ContainerNotFoundError: + return False + + util.wait_until(predicate, int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) + if not predicate(): + raise Exception("Replica is not closed!") \ No newline at end of file diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/blockadeUtils/__init__.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/exceptions.py similarity index 78% rename from hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/blockadeUtils/__init__.py rename to hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/exceptions.py index 13878a13a7f86..9917eaad37e3b 100644 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/blockadeUtils/__init__.py +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/exceptions.py @@ -1,3 +1,5 @@ +#!/usr/bin/python + # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -11,4 +13,10 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file +# limitations under the License. + + +class ContainerNotFoundError(RuntimeError): + """ ContainerNotFoundError run-time error. """ + def __init__(self, *args, **kwargs): + pass diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/util.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/util.py similarity index 54% rename from hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/util.py rename to hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/util.py index 84f7fdaca6891..066b16f67f406 100644 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/util.py +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/ozone/util.py @@ -15,38 +15,66 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import time import re -import logging import subprocess +from ozone.constants import Command + logger = logging.getLogger(__name__) + def wait_until(predicate, timeout, check_frequency=1): - deadline = time.time() + timeout - while time.time() < deadline: - if predicate(): - return - time.sleep(check_frequency) + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return + time.sleep(check_frequency) -def run_cmd(cmd): +def run_docker_command(command, run_on): + if isinstance(command, list): + command = ' '.join(command) + command = [Command.docker, + "exec " + run_on, + command] + return run_command(command) + + +def run_command(cmd): command = cmd if isinstance(cmd, list): - command = ' '.join(cmd) - logger.info(" RUNNING: %s", command) + command = ' '.join(cmd) + logger.info("RUNNING: %s", command) all_output = "" my_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) while my_process.poll() is None: - op = my_process.stdout.readline() - if op: - all_output += op - logger.info(op) + op = my_process.stdout.readline() + if op: + all_output += op + logger.info(op) other_output = my_process.communicate() other_output = other_output[0].strip() if other_output != "": - all_output += other_output + all_output += other_output reg = re.compile(r"(\r\n|\n)$") + logger.debug("Output: %s", all_output) all_output = reg.sub("", all_output, 1) return my_process.returncode, all_output + + +def get_checksum(file_path, run_on): + command = "md5sum %s" % file_path + exit_code, output = run_docker_command(command, run_on) + assert exit_code == 0, "Cant find checksum" + output_split = output.split("\n") + result = "" + for line in output_split: + if line.find("Warning") >= 0 or line.find("is not a tty") >= 0: + logger.info("skip this line: %s", line) + else: + result = result + line + checksum = result.split(" ") + return checksum[0] diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_client_failure.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_client_failure.py index 9e1b04f68261e..55b5291391186 100644 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_client_failure.py +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_client_failure.py @@ -15,117 +15,103 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import re import time import logging -from os import environ -from blockadeUtils.blockade import Blockade -from clusterUtils.cluster_utils import ClusterUtils +import ozone.util +from ozone.cluster import OzoneCluster logger = logging.getLogger(__name__) -if "MAVEN_TEST" in os.environ: - compose_dir = environ.get("MAVEN_TEST") - FILE = os.path.join(compose_dir, "docker-compose.yaml") -elif "OZONE_HOME" in os.environ: - compose_dir = environ.get("OZONE_HOME") - FILE = os.path.join(compose_dir, "compose", "ozoneblockade", \ - "docker-compose.yaml") -else: - parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) - FILE = os.path.join(parent_dir, "compose", "ozoneblockade", \ - "docker-compose.yaml") - -os.environ["DOCKER_COMPOSE_FILE"] = FILE -SCALE = 3 -CONTAINER_LIST = [] -OM = [] -SCM = [] -DATANODES = [] -CLIENT = [] - - -def setup(): - global CONTAINER_LIST, OM, SCM, DATANODES, CLIENT, ORIG_CHECKSUM, \ - TEST_VOLUME_NAME, TEST_BUCKET_NAME - epoch_time = int(time.time()) - TEST_VOLUME_NAME = "%s%s" % ("volume", epoch_time) - TEST_BUCKET_NAME = "%s%s" % ("bucket", epoch_time) - Blockade.blockade_destroy() - CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE) - exit_code, output = Blockade.blockade_status() - assert exit_code == 0, "blockade status command failed with output=[%s]" % \ - output - OM, SCM, CLIENT, DATANODES = \ - ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST) - exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", - "THREE", "ozone_client") - assert exit_code == 0, "freon run failed with output=[%s]" % output - ClusterUtils.create_volume(FILE, TEST_VOLUME_NAME) - ClusterUtils.create_bucket(FILE, TEST_BUCKET_NAME, TEST_VOLUME_NAME) - ORIG_CHECKSUM = ClusterUtils.find_checksum(FILE, "/etc/passwd") -def teardown(): - logger.info("Inside teardown") - Blockade.blockade_destroy() +def setup_function(): + global cluster + cluster = OzoneCluster.create() + cluster.start() -def teardown_module(): - ClusterUtils.cluster_destroy(FILE) +def teardown_function(): + cluster.stop() def test_client_failure_isolate_two_datanodes(): """ - In this test, all datanodes are isolated from each other. - two of the datanodes cannot communicate with any other node in the cluster. + In this test, all DNs are isolated from each other. + two of the DNs cannot communicate with any other node in the cluster. Expectation : Write should fail. - Keys written before parition created can be read. + Keys written before partition created should be read. """ - test_key_name = "testkey1" - ClusterUtils.put_key(FILE, TEST_BUCKET_NAME, TEST_VOLUME_NAME, - "/etc/passwd", key_name=test_key_name, - replication_factor='THREE') - first_set = [OM[0], SCM[0], DATANODES[0], CLIENT[0]] - second_set = [DATANODES[1]] - third_set = [DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set, third_set) - Blockade.blockade_status() - exit_code, output = \ - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - assert re.search( - "Status: Failed", - output) is not None - ClusterUtils.get_key(FILE, TEST_BUCKET_NAME, TEST_VOLUME_NAME, - test_key_name, "/tmp/") - key_checksum = ClusterUtils.find_checksum(FILE, "/tmp/%s" % test_key_name) - - assert key_checksum == ORIG_CHECKSUM + om = cluster.om + scm = cluster.scm + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + epoch_time = int(time.time()) + volume_name = "%s-%s" % ("volume", epoch_time) + bucket_name = "%s-%s" % ("bucket", epoch_time) + key_name = "key-1" + + oz_client.create_volume(volume_name) + oz_client.create_bucket(volume_name, bucket_name) + oz_client.put_key("/etc/passwd", volume_name, bucket_name, key_name, "THREE") + + first_set = [om, scm, dns[0], client] + second_set = [dns[1]] + third_set = [dns[2]] + + logger.info("Partitioning the network") + cluster.partition_network(first_set, second_set, third_set) + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) + assert re.search("Status: Failed", output) is not None + + oz_client.get_key(volume_name, bucket_name, key_name, "/tmp/") + + file_checksum = ozone.util.get_checksum("/etc/passwd", client) + key_checksum = ozone.util.get_checksum("/tmp/%s" % key_name, client) + + assert file_checksum == key_checksum def test_client_failure_isolate_one_datanode(): """ - In this test, one of the datanodes is isolated from all other nodes. + In this test, one of the DNs is isolated from all other nodes. Expectation : Write should pass. Keys written before partition created can be read. """ - test_key_name = "testkey2" - ClusterUtils.put_key(FILE, TEST_BUCKET_NAME, TEST_VOLUME_NAME, - "/etc/passwd", key_name=test_key_name, - replication_factor='THREE') - first_set = [OM[0], SCM[0], DATANODES[0], DATANODES[1], CLIENT[0]] - second_set = [DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set) - Blockade.blockade_status() - exit_code, output = \ - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") + om = cluster.om + scm = cluster.scm + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + epoch_time = int(time.time()) + volume_name = "%s-%s" % ("volume", epoch_time) + bucket_name = "%s-%s" % ("bucket", epoch_time) + key_name = "key-1" + + oz_client.create_volume(volume_name) + oz_client.create_bucket(volume_name, bucket_name) + oz_client.put_key("/etc/passwd", volume_name, bucket_name, key_name, "THREE") + + first_set = [om, scm, dns[0], dns[1], client] + second_set = [dns[2]] + + logger.info("Partitioning the network") + cluster.partition_network(first_set, second_set) + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) assert re.search("3 way commit failed", output) is not None assert re.search("Status: Success", output) is not None - ClusterUtils.get_key(FILE, TEST_BUCKET_NAME, TEST_VOLUME_NAME, - test_key_name, "/tmp/") - key_checksum = ClusterUtils.find_checksum(FILE, "/tmp/%s" % test_key_name) - assert key_checksum == ORIG_CHECKSUM + oz_client.get_key(volume_name, bucket_name, key_name, "/tmp/") + + file_checksum = ozone.util.get_checksum("/etc/passwd", client) + key_checksum = ozone.util.get_checksum("/tmp/%s" % key_name, cluster.client) + + assert file_checksum == key_checksum + diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_datanode_isolation.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_datanode_isolation.py index 85d99e213da92..5c19116976688 100644 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_datanode_isolation.py +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_datanode_isolation.py @@ -15,123 +15,133 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import logging -import util -from ozone.cluster import Cluster +import pytest + +from ozone.cluster import OzoneCluster logger = logging.getLogger(__name__) -def setup_function(function): - global cluster - cluster = Cluster.create() - cluster.start() + +def setup_function(): + global cluster + cluster = OzoneCluster.create() + cluster.start() -def teardown_function(function): - cluster.stop() +def teardown_function(): + cluster.stop() def test_isolate_single_datanode(): - """ - In this test case we will create a network partition in such a way that - one of the datanode will not be able to communicate with other datanodes - but it will be able to communicate with SCM. - - Once the network partition happens, SCM detects it and closes the pipeline, - which in-turn closes the containers. - - The container on the first two datanode will get CLOSED as they have quorum. - The container replica on the third node will be QUASI_CLOSED as it is not - able to connect with the other datanodes and it doesn't have latest BCSID. - - Once we restore the network, the stale replica on the third datanode will be - deleted and a latest replica will be copied from any one of the other - datanodes. - - """ - cluster.run_freon(1, 1, 1, 10240) - first_set = [cluster.om, cluster.scm, - cluster.datanodes[0], cluster.datanodes[1]] - second_set = [cluster.om, cluster.scm, cluster.datanodes[2]] - logger.info("Partitioning the network") - cluster.partition_network(first_set, second_set) - cluster.run_freon(1, 1, 1, 10240) - logger.info("Waiting for container to be QUASI_CLOSED") - - util.wait_until(lambda: cluster.get_container_states(cluster.datanodes[2]) - .popitem()[1] == 'QUASI_CLOSED', - int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) - container_states_dn_0 = cluster.get_container_states(cluster.datanodes[0]) - container_states_dn_1 = cluster.get_container_states(cluster.datanodes[1]) - container_states_dn_2 = cluster.get_container_states(cluster.datanodes[2]) - assert len(container_states_dn_0) != 0 - assert len(container_states_dn_1) != 0 - assert len(container_states_dn_2) != 0 - for key in container_states_dn_0: - assert container_states_dn_0.get(key) == 'CLOSED' - for key in container_states_dn_1: - assert container_states_dn_1.get(key) == 'CLOSED' - for key in container_states_dn_2: - assert container_states_dn_2.get(key) == 'QUASI_CLOSED' - - # Since the replica in datanode[2] doesn't have the latest BCSID, - # ReplicationManager will delete it and copy a closed replica. - # We will now restore the network and datanode[2] should get a - # closed replica of the container - logger.info("Restoring the network") - cluster.restore_network() - - logger.info("Waiting for the replica to be CLOSED") - util.wait_until( - lambda: cluster.container_state_predicate(cluster.datanodes[2], 'CLOSED'), - int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) - container_states_dn_2 = cluster.get_container_states(cluster.datanodes[2]) - assert len(container_states_dn_2) != 0 - for key in container_states_dn_2: - assert container_states_dn_2.get(key) == 'CLOSED' + """ + In this test case we will create a network partition in such a way that + one of the DN will not be able to communicate with other datanodes + but it will be able to communicate with SCM. + + Once the network partition happens, SCM detects it and closes the pipeline, + which in-turn closes the containers. + + The container on the first two DN will get CLOSED as they have quorum. + The container replica on the third node will be QUASI_CLOSED as it is not + able to connect with the other DNs and it doesn't have latest BCSID. + + Once we restore the network, the stale replica on the third DN will be + deleted and a latest replica will be copied from any one of the other + DNs. + + """ + om = cluster.om + scm = cluster.scm + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + oz_client.run_freon(1, 1, 1, 10240) + + # Partition the network + first_set = [om, scm, dns[0], dns[1], client] + second_set = [om, scm, dns[2], client] + logger.info("Partitioning the network") + cluster.partition_network(first_set, second_set) + + oz_client.run_freon(1, 1, 1, 10240) + + logger.info("Waiting for container to be QUASI_CLOSED") + containers = cluster.get_containers_on_datanode(dns[2]) + for container in containers: + container.wait_until_replica_is_quasi_closed(dns[2]) + + for container in containers: + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'QUASI_CLOSED' + + # Since the replica in datanode[2] doesn't have the latest BCSID, + # ReplicationManager will delete it and copy a closed replica. + # We will now restore the network and datanode[2] should get a + # closed replica of the container + logger.info("Restoring the network") + cluster.restore_network() + + logger.info("Waiting for the replica to be CLOSED") + for container in containers: + container.wait_until_replica_is_closed(dns[2]) + + for container in containers: + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) + assert exit_code == 0, "freon run failed with output=[%s]" % output + + +@pytest.mark.skip(reason="RATIS-615") +def test_datanode_isolation_all(): + """ + In this test case we will create a network partition in such a way that + all DNs cannot communicate with each other. + All DNs will be able to communicate with SCM. + Once the network partition happens, SCM detects it and closes the pipeline, + which in-turn tries to close the containers. + At least one of the replica should be in closed state -def test_datanode_isolation_all(): - """ - In this test case we will create a network partition in such a way that - all datanodes cannot communicate with each other. - All datanodes will be able to communicate with SCM. - - Once the network partition happens, SCM detects it and closes the pipeline, - which in-turn tries to close the containers. - At least one of the replica should be in closed state - - Once we restore the network, there will be three closed replicas. - - """ - cluster.run_freon(1, 1, 1, 10240) - - assert len(cluster.get_container_states(cluster.datanodes[0])) != 0 - assert len(cluster.get_container_states(cluster.datanodes[1])) != 0 - assert len(cluster.get_container_states(cluster.datanodes[2])) != 0 - - logger.info("Partitioning the network") - first_set = [cluster.om, cluster.scm, cluster.datanodes[0]] - second_set = [cluster.om, cluster.scm, cluster.datanodes[1]] - third_set = [cluster.om, cluster.scm, cluster.datanodes[2]] - cluster.partition_network(first_set, second_set, third_set) - - logger.info("Waiting for the replica to be CLOSED") - util.wait_until( - lambda: cluster.container_state_predicate_one_closed(cluster.datanodes), - int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) - - # At least one of the replica should be in closed state - assert cluster.container_state_predicate_one_closed(cluster.datanodes) - - # After restoring the network all the replicas should be in - # CLOSED state - logger.info("Restoring the network") - cluster.restore_network() - - logger.info("Waiting for the container to be replicated") - util.wait_until( - lambda: cluster.container_state_predicate_all_closed(cluster.datanodes), - int(os.environ["CONTAINER_STATUS_SLEEP"]), 10) - assert cluster.container_state_predicate_all_closed(cluster.datanodes) + Once we restore the network, there will be three closed replicas. + + """ + om = cluster.om + scm = cluster.scm + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + oz_client.run_freon(1, 1, 1, 10240) + + logger.info("Partitioning the network") + first_set = [om, scm, dns[0], client] + second_set = [om, scm, dns[1], client] + third_set = [om, scm, dns[2], client] + cluster.partition_network(first_set, second_set, third_set) + + containers = cluster.get_containers_on_datanode(dns[0]) + container = containers.pop() + + logger.info("Waiting for a replica to be CLOSED") + container.wait_until_one_replica_is_closed() + + # At least one of the replica should be in closed state + assert 'CLOSED' in container.get_datanode_states() + + logger.info("Restoring the network") + cluster.restore_network() + + logger.info("Waiting for the container to be replicated") + container.wait_until_all_replicas_are_closed() + # After restoring the network all the replicas should be in CLOSED state + for state in container.get_datanode_states(): + assert state == 'CLOSED' + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) + assert exit_code == 0, "freon run failed with output=[%s]" % output diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_flaky.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_flaky.py index 6f1df18bec025..b9ba0e0f57c5c 100644 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_flaky.py +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_flaky.py @@ -15,50 +15,33 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import logging import random import pytest -from os import environ -from blockadeUtils.blockade import Blockade -from ozone.cluster import Cluster +from ozone.blockade import Blockade +from ozone.cluster import OzoneCluster -logger = logging.getLogger(__name__) -if "MAVEN_TEST" in os.environ: - compose_dir = environ.get("MAVEN_TEST") - FILE = os.path.join(compose_dir, "docker-compose.yaml") -elif "OZONE_HOME" in os.environ: - compose_dir = environ.get("OZONE_HOME") - FILE = os.path.join(compose_dir, "compose", "ozoneblockade", \ - "docker-compose.yaml") -else: - parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) - FILE = os.path.join(parent_dir, "compose", "ozoneblockade", \ - "docker-compose.yaml") -os.environ["DOCKER_COMPOSE_FILE"] = FILE -SCALE = 6 -CONTAINER_LIST = [] +logger = logging.getLogger(__name__) -def setup_function(function): - global cluster - cluster = Cluster.create() - cluster.start() +def setup_function(): + global cluster + cluster = OzoneCluster.create() + cluster.start() -def teardown_function(function): - cluster.stop() +def teardown_function(): + cluster.stop() -@pytest.mark.parametrize("flaky_node", ["datanode", "scm", "om", "all"]) +@pytest.mark.parametrize("flaky_node", ["datanode", "scm", "om"]) def test_flaky(flaky_node): """ In these tests, we make the network of the nodes as flaky using blockade. There are 4 tests : - 1) one of the datanodes selected randomly and network of the datanode is - made flaky. + 1) one of the DNs selected randomly and network of the DN is made flaky. 2) scm network is made flaky. 3) om network is made flaky. 4) Network of all the nodes are made flaky. @@ -72,6 +55,5 @@ def test_flaky(flaky_node): }[flaky_node] Blockade.make_flaky(flaky_container_name) - Blockade.blockade_status() - exit_code, output = cluster.run_freon(1, 1, 1, 10240) + exit_code, output = cluster.get_client().run_freon(1, 1, 1, 10240) assert exit_code == 0, "freon run failed with output=[%s]" % output diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_mixed_failure.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_mixed_failure.py index 86d5311d19226..10220b9a71244 100644 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_mixed_failure.py +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_mixed_failure.py @@ -15,145 +15,107 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import time import logging -import re -from os import environ -from blockadeUtils.blockade import Blockade -from clusterUtils.cluster_utils import ClusterUtils + +from ozone.cluster import OzoneCluster logger = logging.getLogger(__name__) -if "MAVEN_TEST" in os.environ: - compose_dir = environ.get("MAVEN_TEST") - FILE = os.path.join(compose_dir, "docker-compose.yaml") -elif "OZONE_HOME" in os.environ: - compose_dir = environ.get("OZONE_HOME") - FILE = os.path.join(compose_dir, "compose", "ozoneblockade", \ - "docker-compose.yaml") -else: - parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) - FILE = os.path.join(parent_dir, "compose", "ozoneblockade", \ - "docker-compose.yaml") - -os.environ["DOCKER_COMPOSE_FILE"] = FILE -SCALE = 3 -INCREASED_SCALE = 5 -CONTAINER_LIST = [] -OM = [] -SCM = [] -DATANODES = [] - - -def setup(): - global CONTAINER_LIST, OM, SCM, DATANODES - Blockade.blockade_destroy() - CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE) - exit_code, output = Blockade.blockade_status() - assert exit_code == 0, "blockade status command failed with output=[%s]" % \ - output - OM, SCM, _, DATANODES = \ - ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST) - - exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", - "THREE") - assert exit_code == 0, "freon run failed with output=[%s]" % output - - -def teardown(): - logger.info("Inside teardown") - Blockade.blockade_destroy() - - -def teardown_module(): - ClusterUtils.cluster_destroy(FILE) - - -def test_one_dn_isolate_scm_other_dn(run_second_phase): - """ - In this test, one of the datanodes cannot communicate with SCM and other - datanodes. - Other datanodes can communicate with each other and SCM . - Expectation : The container should eventually have two closed replicas. - """ - first_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]] - second_set = [OM[0], DATANODES[0]] - Blockade.blockade_create_partition(first_set, second_set) - Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, SCALE) - count_closed_container_datanodes = filter(lambda x: x == 'CLOSED', - all_datanodes_container_status) - assert len(count_closed_container_datanodes) == 2, \ - "The container should have two closed replicas." - if str(run_second_phase).lower() == "true": - ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status( - FILE, INCREASED_SCALE) - count_closed_container_datanodes = filter( - lambda x: x == 'CLOSED', all_datanodes_container_status) - assert len(count_closed_container_datanodes) >= 3, \ - "The container should have at least three closed replicas." - _, output = \ - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - assert re.search("Status: Success", output) is not None - - -def test_one_dn_isolate_other_dn(run_second_phase): - """ - In this test, one of the datanodes (first datanode) cannot communicate - other datanodes but can communicate with SCM. - One of the other two datanodes (second datanode) cannot communicate with - SCM. - Expectation : - The container replica state in first datanode can be either closed or - quasi-closed. - The container replica state in second datanode can be either closed or open. - The container should eventually have at lease one closed replica. - """ - first_set = [OM[0], SCM[0], DATANODES[0]] - second_set = [OM[0], DATANODES[1], DATANODES[2]] - third_set = [SCM[0], DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set, third_set) - Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, SCALE) - count_closed_container_datanodes = filter(lambda x: x == 'CLOSED', - all_datanodes_container_status) - first_datanode_status = all_datanodes_container_status[0] - second_datanode_status = all_datanodes_container_status[1] - assert first_datanode_status == 'CLOSED' or \ - first_datanode_status == "QUASI_CLOSED" - assert second_datanode_status == 'CLOSED' or \ - second_datanode_status == "OPEN" - assert len(count_closed_container_datanodes) >= 1, \ - "The container should have at least one closed replica" - if str(run_second_phase).lower() == "true": - ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status( - FILE, INCREASED_SCALE) - count_closed_container_datanodes = filter( - lambda x: x == 'CLOSED', all_datanodes_container_status) - assert len(count_closed_container_datanodes) >= 3, \ - "The container should have at least three closed replicas." - _, output = \ - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - assert re.search("Status: Success", output) is not None + + +def setup_function(): + global cluster + cluster = OzoneCluster.create() + cluster.start() + + +def teardown_function(): + cluster.stop() + + +def test_one_dn_isolate_scm_other_dn(): + """ + In this test, one of the DNs cannot communicate with SCM and other DNs. + Other DNs can communicate with each other and SCM . + Expectation : The container should eventually have two closed replicas. + """ + om = cluster.om + scm = cluster.scm + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + oz_client.run_freon(1, 1, 1, 10240) + + # Partition the network + first_set = [dns[0], client] + second_set = [scm, om, dns[1], dns[2], client] + cluster.partition_network(first_set, second_set) + oz_client.run_freon(1, 1, 1, 10240) + containers = cluster.get_containers_on_datanode(dns[1]) + for container in containers: + container.wait_until_one_replica_is_closed() + + for container in containers: + assert container.get_state(dns[0]) == 'OPEN' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + cluster.restore_network() + for container in containers: + container.wait_until_all_replicas_are_closed() + for container in containers: + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) + assert exit_code == 0, "freon run failed with output=[%s]" % output + + +def test_one_dn_isolate_other_dn(): + """ + In this test, one of the DNs (first DN) cannot communicate + other DNs but can communicate with SCM. + One of the other two DNs (second DN) cannot communicate with SCM. + Expectation : + The container replica state in first DN can be either closed or + quasi-closed. + The container replica state in second DN can be either closed or open. + The container should eventually have at lease one closed replica. + """ + om = cluster.om + scm = cluster.scm + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + oz_client.run_freon(1, 1, 1, 10240) + + # Partition the network + first_set = [om, scm, dns[0], client] + second_set = [om, dns[1], dns[2], client] + third_set = [scm, dns[2], client] + cluster.partition_network(first_set, second_set, third_set) + oz_client.run_freon(1, 1, 1, 10240) + + containers = cluster.get_containers_on_datanode(dns[0]) + for container in containers: + container.wait_until_replica_is_quasi_closed(dns[0]) + + for container in containers: + assert container.get_state(dns[0]) == 'QUASI_CLOSED' + assert container.get_state(dns[1]) == 'OPEN' or \ + container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'QUASI_CLOSED' or \ + container.get_state(dns[2]) == 'CLOSED' + + cluster.restore_network() + for container in containers: + container.wait_until_all_replicas_are_closed() + for container in containers: + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) + assert exit_code == 0, "freon run failed with output=[%s]" % output diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_mixed_failure_three_nodes_isolate.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_mixed_failure_three_nodes_isolate.py index ab4c2d4869971..d213a229de646 100644 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_mixed_failure_three_nodes_isolate.py +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_mixed_failure_three_nodes_isolate.py @@ -15,221 +15,150 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import time import logging -import re -from os import environ -from blockadeUtils.blockade import Blockade -from clusterUtils.cluster_utils import ClusterUtils + +from ozone.cluster import OzoneCluster logger = logging.getLogger(__name__) -if "MAVEN_TEST" in os.environ: - compose_dir = environ.get("MAVEN_TEST") - FILE = os.path.join(compose_dir, "docker-compose.yaml") -elif "OZONE_HOME" in os.environ: - compose_dir = environ.get("OZONE_HOME") - FILE = os.path.join(compose_dir, "compose", "ozoneblockade", \ - "docker-compose.yaml") -else: - parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) - FILE = os.path.join(parent_dir, "compose", "ozoneblockade", \ - "docker-compose.yaml") - -os.environ["DOCKER_COMPOSE_FILE"] = FILE -SCALE = 3 -INCREASED_SCALE = 5 -CONTAINER_LIST = [] -OM = [] -SCM = [] -DATANODES = [] - - -def setup(): - global CONTAINER_LIST, OM, SCM, DATANODES - Blockade.blockade_destroy() - CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE) - exit_code, output = Blockade.blockade_status() - assert exit_code == 0, "blockade status command failed with output=[%s]" % \ - output - OM, SCM, _, DATANODES = \ - ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST) - - exit_code, output = \ - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - assert exit_code == 0, "freon run failed with output=[%s]" % output - - -def teardown(): - logger.info("Inside teardown") - Blockade.blockade_destroy() - - -def teardown_module(): - ClusterUtils.cluster_destroy(FILE) - - -def test_three_dns_isolate_onescmfailure(run_second_phase): - """ - In this test, all datanodes are isolated from each other. - One of the datanodes (third datanode) cannot communicate with SCM. - Expectation : - The container replica state in first datanode should be closed. - The container replica state in second datanode should be closed. - The container replica state in third datanode should be open. - """ - first_set = [OM[0], SCM[0], DATANODES[0]] - second_set = [OM[0], SCM[0], DATANODES[1]] - third_set = [OM[0], DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set, third_set) - Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, SCALE) - first_datanode_status = all_datanodes_container_status[0] - second_datanode_status = all_datanodes_container_status[1] - third_datanode_status = all_datanodes_container_status[2] - assert first_datanode_status == 'CLOSED' - assert second_datanode_status == 'CLOSED' - assert third_datanode_status == 'OPEN' - - if str(run_second_phase).lower() == "true": - ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status( - FILE, INCREASED_SCALE) - count_closed_container_datanodes = filter( - lambda x: x == 'CLOSED', all_datanodes_container_status) - assert len(count_closed_container_datanodes) == 3, \ - "The container should have three closed replicas." - Blockade.blockade_join() - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status( - FILE, INCREASED_SCALE) - count_closed_container_datanodes = filter( - lambda x: x == 'CLOSED', all_datanodes_container_status) - assert len(count_closed_container_datanodes) == 3, \ - "The container should have three closed replicas." - - -def test_three_dns_isolate_twoscmfailure(run_second_phase): - """ - In this test, all datanodes are isolated from each other. - two datanodes cannot communicate with SCM (second datanode and third - datanode) - Expectation : - The container replica state in first datanode should be quasi-closed. - The container replica state in second datanode should be open. - The container replica state in third datanode should be open. - """ - first_set = [OM[0], SCM[0], DATANODES[0]] - second_set = [OM[0], DATANODES[1]] - third_set = [OM[0], DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set, third_set) - Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, SCALE) - first_datanode_status = all_datanodes_container_status[0] - second_datanode_status = all_datanodes_container_status[1] - third_datanode_status = all_datanodes_container_status[2] - assert first_datanode_status == 'QUASI_CLOSED' - assert second_datanode_status == 'OPEN' - assert third_datanode_status == 'OPEN' - - if str(run_second_phase).lower() == "true": - ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status( - FILE, INCREASED_SCALE) - count_quasi_closed_container_datanodes = filter( - lambda x: x == 'QUASI_CLOSED', all_datanodes_container_status) - assert len(count_quasi_closed_container_datanodes) >= 3, \ - "The container should have at least three quasi-closed replicas." - Blockade.blockade_join() - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status( - FILE, INCREASED_SCALE) - count_closed_container_datanodes = filter( - lambda x: x == 'CLOSED', all_datanodes_container_status) - assert len(count_closed_container_datanodes) == 3, \ - "The container should have three closed replicas." - - -def test_three_dns_isolate_threescmfailure(run_second_phase): - """ - In this test, all datanodes are isolated from each other and also cannot - communicate with SCM. - Expectation : - The container replica state in first datanode should be open. - The container replica state in second datanode should be open. - The container replica state in third datanode should be open. - """ - first_set = [OM[0], DATANODES[0]] - second_set = [OM[0], DATANODES[1]] - third_set = [OM[0], DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set, third_set) - Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, SCALE) - first_datanode_status = all_datanodes_container_status[0] - second_datanode_status = all_datanodes_container_status[1] - third_datanode_status = all_datanodes_container_status[2] - assert first_datanode_status == 'OPEN' - assert second_datanode_status == 'OPEN' - assert third_datanode_status == 'OPEN' - - if str(run_second_phase).lower() == "true": - ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - output = ClusterUtils.get_pipelines(FILE) - if output: - assert re.search("Factor:THREE", output) is None - all_datanodes_container_status = \ - ClusterUtils.findall_container_status( - FILE, INCREASED_SCALE) - datanodes_having_container_status = filter( - lambda x: x != 'None', all_datanodes_container_status) - assert len(datanodes_having_container_status) == 3, \ - "Containers should not be replicated on addition of new nodes." - Blockade.blockade_join() - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status( - FILE, INCREASED_SCALE) - count_closed_container_datanodes = filter( - lambda x: x == 'CLOSED', all_datanodes_container_status) - assert len(count_closed_container_datanodes) == 3, \ - "The container should have three closed replicas." + + +def setup_function(): + global cluster + cluster = OzoneCluster.create() + cluster.start() + + +def teardown_function(): + cluster.stop() + + +def test_three_dns_isolate_one_scm_failure(): + """ + In this test, all DNs are isolated from each other. + One of the DNs (third DN) cannot communicate with SCM. + Expectation : + The container replica state in first DN should be closed. + The container replica state in second DN should be closed. + The container replica state in third DN should be open. + """ + om = cluster.om + scm = cluster.scm + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + oz_client.run_freon(1, 1, 1, 10240) + + first_set = [om, scm, dns[0], client] + second_set = [om, scm, dns[1], client] + third_set = [om, dns[2], client] + + cluster.partition_network(first_set, second_set, third_set) + containers = cluster.get_containers_on_datanode(dns[0]) + for container in containers: + container.wait_until_replica_is_closed(dns[0]) + + for container in containers: + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'OPEN' + + cluster.restore_network() + for container in containers: + container.wait_until_all_replicas_are_closed() + for container in containers: + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) + assert exit_code == 0, "freon run failed with output=[%s]" % output + + +def test_three_dns_isolate_two_scm_failure(): + """ + In this test, all DNs are isolated from each other. + two DNs cannot communicate with SCM (second DN and third DN) + Expectation : + The container replica state in first DN should be quasi-closed. + The container replica state in second DN should be open. + The container replica state in third DN should be open. + """ + om = cluster.om + scm = cluster.scm + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + oz_client.run_freon(1, 1, 1, 10240) + + first_set = [om, scm, dns[0], client] + second_set = [om, dns[1], client] + third_set = [om, dns[2], client] + + cluster.partition_network(first_set, second_set, third_set) + containers = cluster.get_containers_on_datanode(dns[0]) + for container in containers: + container.wait_until_replica_is_closed(dns[0]) + + for container in containers: + assert container.get_state(dns[0]) == 'QUASI_CLOSED' + assert container.get_state(dns[1]) == 'OPEN' + assert container.get_state(dns[2]) == 'OPEN' + + cluster.restore_network() + for container in containers: + container.wait_until_all_replicas_are_closed() + for container in containers: + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) + assert exit_code == 0, "freon run failed with output=[%s]" % output + + +def test_three_dns_isolate_three_scm_failure(): + """ + In this test, all DNs are isolated from each other and also cannot + communicate with SCM. + Expectation : + The container replica state in first DN should be open. + The container replica state in second DN should be open. + The container replica state in third DN should be open. + """ + om = cluster.om + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + oz_client.run_freon(1, 1, 1, 10240) + + first_set = [om, dns[0], client] + second_set = [om, dns[1], client] + third_set = [om, dns[2], client] + + cluster.partition_network(first_set, second_set, third_set) + + # Wait till the datanodes are marked as stale by SCM + time.sleep(150) + + containers = cluster.get_containers_on_datanode(dns[0]) + for container in containers: + assert container.get_state(dns[0]) == 'OPEN' + assert container.get_state(dns[1]) == 'OPEN' + assert container.get_state(dns[2]) == 'OPEN' + + cluster.restore_network() + + for container in containers: + container.wait_until_all_replicas_are_closed() + + for container in containers: + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) + assert exit_code == 0, "freon run failed with output=[%s]" % output diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_mixed_failure_two_nodes.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_mixed_failure_two_nodes.py index 03da7d03ed530..20b0cc3d9d878 100644 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_mixed_failure_two_nodes.py +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_mixed_failure_two_nodes.py @@ -15,169 +15,118 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import time import logging -import re -from os import environ -from blockadeUtils.blockade import Blockade -from clusterUtils.cluster_utils import ClusterUtils + +from ozone.cluster import OzoneCluster logger = logging.getLogger(__name__) -if "MAVEN_TEST" in os.environ: - compose_dir = environ.get("MAVEN_TEST") - FILE = os.path.join(compose_dir, "docker-compose.yaml") -elif "OZONE_HOME" in os.environ: - compose_dir = environ.get("OZONE_HOME") - FILE = os.path.join(compose_dir, "compose", "ozoneblockade", \ - "docker-compose.yaml") -else: - parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) - FILE = os.path.join(parent_dir, "compose", "ozoneblockade", \ - "docker-compose.yaml") - -os.environ["DOCKER_COMPOSE_FILE"] = FILE -SCALE = 3 -INCREASED_SCALE = 5 -CONTAINER_LIST = [] -OM = [] -SCM = [] -DATANODES = [] - - -def setup(): - global CONTAINER_LIST, OM, SCM, DATANODES - Blockade.blockade_destroy() - CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE) - exit_code, output = Blockade.blockade_status() - assert exit_code == 0, "blockade status command failed with output=[%s]" % \ - output - OM, SCM, _, DATANODES = \ - ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST) - - exit_code, output = \ - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - assert exit_code == 0, "freon run failed with output=[%s]" % output - - -def teardown(): - logger.info("Inside teardown") - Blockade.blockade_destroy() - - -def teardown_module(): - ClusterUtils.cluster_destroy(FILE) - - -def test_two_dns_isolate_scm_same_partition(run_second_phase): - """ - In this test, there are three datanodes, DN1, DN2, DN3 - DN1 is on a network partition and - DN2, DN3 are on a different network partition. - DN2 and DN3 cannot communicate with SCM. - Expectation : - The container replica state in DN1 should be quasi-closed. - The container replica state in DN2 should be open. - The container replica state in DN3 should be open. - """ - first_set = [OM[0], DATANODES[1], DATANODES[2]] - second_set = [OM[0], SCM[0], DATANODES[0]] - Blockade.blockade_create_partition(first_set, second_set) - Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, SCALE) - first_datanode_status = all_datanodes_container_status[0] - second_datanode_status = all_datanodes_container_status[1] - third_datanode_status = all_datanodes_container_status[2] - assert first_datanode_status == 'QUASI_CLOSED' - assert second_datanode_status == 'OPEN' - assert third_datanode_status == 'OPEN' - - if str(run_second_phase).lower() == "true": - ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status( - FILE, INCREASED_SCALE) - count_quasi_closed_container_datanodes = filter( - lambda x: x == 'QUASI_CLOSED', all_datanodes_container_status) - assert len(count_quasi_closed_container_datanodes) >= 3, \ - "The container should have at least three quasi-closed replicas." - Blockade.blockade_join() - Blockade.blockade_status() - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status( - FILE, INCREASED_SCALE) - count_closed_container_datanodes = filter( - lambda x: x == 'CLOSED', all_datanodes_container_status) - assert len(count_closed_container_datanodes) >= 3 - - -def test_two_dns_isolate_scm_different_partition(run_second_phase): - """ - In this test, there are three datanodes, DN1, DN2, DN3 - DN1 is on a network partition and - DN2, DN3 are on a different network partition. - DN1 and DN2 cannot communicate with SCM. - Expectation : - The container replica state in datanode DN1 should be open. - The container replica states can be either 'closed' - in DN2 and DN3, or, - 'open' in DN2 and 'quasi-closed' in DN3. - """ - first_set = [OM[0], DATANODES[0]] - second_set = [OM[0], DATANODES[1], DATANODES[2]] - third_set = [SCM[0], DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set, third_set) - Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, SCALE) - first_datanode_status = all_datanodes_container_status[0] - second_datanode_status = all_datanodes_container_status[1] - third_datanode_status = all_datanodes_container_status[2] - assert first_datanode_status == 'OPEN' - assert (second_datanode_status == 'CLOSED' and - third_datanode_status == 'CLOSED') or \ - (second_datanode_status == 'OPEN' and - third_datanode_status == 'QUASI_CLOSED') - - if str(run_second_phase).lower() == "true": - ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status( - FILE, INCREASED_SCALE) - count_closed_container_datanodes = filter( - lambda x: x == 'CLOSED', all_datanodes_container_status) - count_qausi_closed_container_datanodes = filter( - lambda x: x == 'QUASI_CLOSED', all_datanodes_container_status) - assert len(count_closed_container_datanodes) >= 3 or \ - len(count_qausi_closed_container_datanodes) >= 3 - Blockade.blockade_join() - Blockade.blockade_status() - if len(count_closed_container_datanodes) < 3: - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status( - FILE, INCREASED_SCALE) - count_closed_container_datanodes = filter( - lambda x: x == 'CLOSED', all_datanodes_container_status) - assert len(count_closed_container_datanodes) >= 3 - _, output = \ - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - assert re.search("Status: Success", output) is not None + + +def setup_function(): + global cluster + cluster = OzoneCluster.create() + cluster.start() + + +def teardown_function(): + cluster.stop() + + +def test_two_dns_isolate_scm_same_partition(): + """ + In this test, there are three DNs, + DN1 is on a network partition and + DN2, DN3 are on a different network partition. + DN2 and DN3 cannot communicate with SCM. + Expectation : + The container replica state in DN1 should be quasi-closed. + The container replica state in DN2 should be open. + The container replica state in DN3 should be open. + """ + om = cluster.om + scm = cluster.scm + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + oz_client.run_freon(1, 1, 1, 10240) + + first_set = [om, dns[1], dns[2], client] + second_set = [om, scm, dns[0], client] + cluster.partition_network(first_set, second_set) + oz_client.run_freon(1, 1, 1, 10240) + + containers = cluster.get_containers_on_datanode(dns[0]) + + for container in containers: + container.wait_until_one_replica_is_quasi_closed() + + for container in containers: + assert container.get_state(dns[0]) == 'QUASI_CLOSED' + assert container.get_state(dns[1]) == 'OPEN' + assert container.get_state(dns[2]) == 'OPEN' + + cluster.restore_network() + + for container in containers: + container.wait_until_all_replicas_are_closed() + + for container in containers: + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) + assert exit_code == 0, "freon run failed with output=[%s]" % output + + +def test_two_dns_isolate_scm_different_partition(): + """ + In this test, there are three DNs, + DN1 is on a network partition and + DN2, DN3 are on a different network partition. + DN1 and DN2 cannot communicate with SCM. + Expectation : + The container replica state in DN1 should be open. + The container replica states can be either 'closed' + in DN2 and DN3 or 'open' in DN2 and 'quasi-closed' in DN3. + """ + + om = cluster.om + scm = cluster.scm + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + oz_client.run_freon(1, 1, 1, 10240) + + first_set = [om, dns[0], client] + second_set = [om, dns[1], dns[2], client] + third_set = [scm, dns[2], client] + cluster.partition_network(first_set, second_set, third_set) + oz_client.run_freon(1, 1, 1, 10240) + + containers = cluster.get_containers_on_datanode(dns[2]) + + for container in containers: + container.wait_until_replica_is_not_open_anymore(dns[2]) + + for container in containers: + assert container.get_state(dns[0]) == 'OPEN' + assert (container.get_state(dns[1]) == 'CLOSED' and + container.get_state(dns[2]) == 'CLOSED') or \ + (container.get_state(dns[1]) == 'OPEN' and + container.get_state(dns[2]) == 'QUASI_CLOSED') + + cluster.restore_network() + + for container in containers: + container.wait_until_all_replicas_are_closed() + + for container in containers: + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) + assert exit_code == 0, "freon run failed with output=[%s]" % output diff --git a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_scm_isolation.py b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_scm_isolation.py index 47bbb76b76c7d..b6ca5a48f4888 100644 --- a/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_scm_isolation.py +++ b/hadoop-ozone/fault-injection-test/network-tests/src/test/blockade/test_blockade_scm_isolation.py @@ -15,153 +15,111 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import time -import re import logging -from os import environ -from blockadeUtils.blockade import Blockade -from clusterUtils.cluster_utils import ClusterUtils + +from ozone.cluster import OzoneCluster logger = logging.getLogger(__name__) -if "MAVEN_TEST" in os.environ: - compose_dir = environ.get("MAVEN_TEST") - FILE = os.path.join(compose_dir, "docker-compose.yaml") -elif "OZONE_HOME" in os.environ: - compose_dir = environ.get("OZONE_HOME") - FILE = os.path.join(compose_dir, "compose", "ozoneblockade", \ - "docker-compose.yaml") -else: - parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) - FILE = os.path.join(parent_dir, "compose", "ozoneblockade", \ - "docker-compose.yaml") - -os.environ["DOCKER_COMPOSE_FILE"] = FILE -SCALE = 3 -INCREASED_SCALE = 5 -CONTAINER_LIST = [] -OM = [] -SCM = [] -DATANODES = [] - - -def setup(): - global CONTAINER_LIST, OM, SCM, DATANODES - Blockade.blockade_destroy() - CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE) - exit_code, output = Blockade.blockade_status() - assert exit_code == 0, "blockade status command failed with output=[%s]" % \ - output - OM, SCM, _, DATANODES = \ - ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST) - - exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", - "THREE") - assert exit_code == 0, "freon run failed with output=[%s]" % output - - -def teardown(): - logger.info("Inside teardown") - Blockade.blockade_destroy() - - -def teardown_module(): - ClusterUtils.cluster_destroy(FILE) - - -def test_scm_isolation_one_node(run_second_phase): - """ - In this test, one of the datanodes cannot communicate with SCM. - Other datanodes can communicate with SCM. - Expectation : The container should eventually have at least two closed - replicas. - """ - first_set = [OM[0], DATANODES[0], DATANODES[1], DATANODES[2]] - second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set) - Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, SCALE) - closed_container_datanodes = [x for x in all_datanodes_container_status - if x == 'CLOSED'] - assert len(closed_container_datanodes) >= 2, \ - "The container should have at least two closed replicas." - - if str(run_second_phase).lower() == "true": - ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, INCREASED_SCALE) - closed_container_datanodes = [x for x in all_datanodes_container_status - if x == 'CLOSED'] - assert len(closed_container_datanodes) >= 3, \ - "The container should have at least three closed replicas." - Blockade.blockade_join() - Blockade.blockade_status() - _, output = \ - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - assert re.search("Status: Success", output) is not None - - -def test_scm_isolation_two_node(run_second_phase): - """ - In this test, two datanodes cannot communicate with SCM. - Expectation : The container should eventually have at three closed replicas - or, two open replicas and one quasi-closed replica. - """ - first_set = [OM[0], DATANODES[0], DATANODES[1], DATANODES[2]] - second_set = [OM[0], SCM[0], DATANODES[1]] - Blockade.blockade_create_partition(first_set, second_set) - Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, SCALE) - closed_container_datanodes = [x for x in all_datanodes_container_status - if x == 'CLOSED'] - qausiclosed_container_datanodes = [x for x in all_datanodes_container_status - if x == 'QUASI_CLOSED'] - count_open_container_datanodes = [x for x in all_datanodes_container_status - if x == 'OPEN'] - assert len(closed_container_datanodes) == 3 or \ - (len(count_open_container_datanodes) == 2 and - len(qausiclosed_container_datanodes) == 1), \ - "The container should have three closed replicas or two open " \ - "replicas and one quasi_closed replica." - - if str(run_second_phase).lower() == "true": - ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) - Blockade.blockade_status() - logger.info("Waiting for %s seconds before checking container status", - os.environ["CONTAINER_STATUS_SLEEP"]) - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, INCREASED_SCALE) - closed_container_datanodes = [x for x in all_datanodes_container_status - if x == 'CLOSED'] - qausiclosed_container_datanodes = \ - [x for x in all_datanodes_container_status if x == 'QUASI_CLOSED'] - assert len(closed_container_datanodes) >= 3 or \ - len(qausiclosed_container_datanodes) >= 3 - Blockade.blockade_join() - Blockade.blockade_status() - if len(closed_container_datanodes) < 3: - time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) - all_datanodes_container_status = \ - ClusterUtils.findall_container_status(FILE, INCREASED_SCALE) - closed_container_datanodes = [x for x in all_datanodes_container_status - if x == 'CLOSED'] - - assert len(closed_container_datanodes) >= 3 - _, output = \ - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") - assert re.search("Status: Success", output) is not None + + +def setup_function(): + global cluster + cluster = OzoneCluster.create() + cluster.start() + + +def teardown_function(): + cluster.stop() + + +def test_scm_isolation_one_node(): + """ + In this test, one of the DNs cannot communicate with SCM. + Other DNs can communicate with SCM. + Expectation : The container should eventually have at least two closed + replicas. + """ + om = cluster.om + scm = cluster.scm + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + oz_client.run_freon(1, 1, 1, 10240) + + first_set = [om, dns[0], dns[1], dns[2], client] + second_set = [om, scm, dns[1], dns[2], client] + cluster.partition_network(first_set, second_set) + oz_client.run_freon(1, 1, 1, 10240) + + containers = cluster.get_containers_on_datanode(dns[1]) + + for container in containers: + container.wait_until_one_replica_is_closed() + + for container in containers: + assert container.get_state(dns[0]) == 'OPEN' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + cluster.restore_network() + + for container in containers: + container.wait_until_all_replicas_are_closed() + + for container in containers: + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) + assert exit_code == 0, "freon run failed with output=[%s]" % output + + +def test_scm_isolation_two_node(): + """ + In this test, two DNs cannot communicate with SCM. + Expectation : The container should eventually have at three closed replicas + or, two open replicas and one quasi-closed replica. + """ + om = cluster.om + scm = cluster.scm + dns = cluster.datanodes + client = cluster.client + oz_client = cluster.get_client() + + oz_client.run_freon(1, 1, 1, 10240) + + first_set = [om, dns[0], dns[1], dns[2], client] + second_set = [om, scm, dns[1], client] + cluster.partition_network(first_set, second_set) + oz_client.run_freon(1, 1, 1, 10240) + + containers = cluster.get_containers_on_datanode(dns[1]) + + for container in containers: + container.wait_until_replica_is_not_open_anymore(dns[1]) + + for container in containers: + state = container.get_state(dns[1]) + assert state == 'QUASI_CLOSED' or state == 'CLOSED' + + if state == 'QUASI_CLOSED': + assert container.get_state(dns[0]) == 'OPEN' + assert container.get_state(dns[2]) == 'OPEN' + else : + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + cluster.restore_network() + + for container in containers: + container.wait_until_all_replicas_are_closed() + + for container in containers: + assert container.get_state(dns[0]) == 'CLOSED' + assert container.get_state(dns[1]) == 'CLOSED' + assert container.get_state(dns[2]) == 'CLOSED' + + exit_code, output = oz_client.run_freon(1, 1, 1, 10240) + assert exit_code == 0, "freon run failed with output=[%s]" % output