Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions modules/ducktests/tests/ignitetest/services/ignite.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
This module contains class to start ignite cluster node.
"""

import os
import re
import signal
from datetime import datetime
Expand All @@ -32,7 +31,6 @@ class IgniteService(IgniteAwareService):
Ignite node service.
"""
APP_SERVICE_CLASS = "org.apache.ignite.startup.cmdline.CommandLineStartup"
HEAP_DUMP_FILE = os.path.join(IgniteAwareService.PERSISTENT_ROOT, "ignite-heap.bin")

# pylint: disable=R0913
def __init__(self, context, config, num_nodes, jvm_opts=None, startup_timeout_sec=60, shutdown_timeout_sec=10,
Expand All @@ -42,7 +40,7 @@ def __init__(self, context, config, num_nodes, jvm_opts=None, startup_timeout_se

def clean_node(self, node):
node.account.kill_java_processes(self.APP_SERVICE_CLASS, clean_shutdown=False, allow_fail=True)
node.account.ssh("sudo rm -rf -- %s" % self.PERSISTENT_ROOT, allow_fail=False)
node.account.ssh("rm -rf -- %s" % self.persistent_root, allow_fail=False)

def thread_dump(self, node):
"""
Expand Down
2 changes: 1 addition & 1 deletion modules/ducktests/tests/ignitetest/services/ignite_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def clean_node(self, node):

node.account.kill_java_processes(self.servicejava_class_name, clean_shutdown=False, allow_fail=True)

node.account.ssh("rm -rf %s" % self.PERSISTENT_ROOT, allow_fail=False)
node.account.ssh("rm -rf -- %s" % self.persistent_root, allow_fail=False)

def pids(self, node):
return node.account.java_pids(self.servicejava_class_name)
Expand Down
68 changes: 39 additions & 29 deletions modules/ducktests/tests/ignitetest/services/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,43 @@
"""

import os.path
from distutils.version import LooseVersion

from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.services.background_thread import BackgroundThreadService

from ignitetest.services.utils.ignite_persistence import PersistenceAware
from ignitetest.services.utils.path import PathAware
from ignitetest.services.utils.log_utils import monitor_log


class SparkService(BackgroundThreadService, PersistenceAware):
# pylint: disable=abstract-method
class SparkService(BackgroundThreadService, PathAware):
"""
Start a spark node.
"""
INSTALL_DIR = "/opt/spark-{version}".format(version="2.3.4")
SPARK_PERSISTENT_ROOT = "/mnt/spark"

logs = {}

# pylint: disable=R0913
def __init__(self, context, num_nodes=3):
def __init__(self, context, num_nodes=3, version=LooseVersion("2.3.4")):
"""
:param context: test context
:param num_nodes: number of Ignite nodes.
"""
super().__init__(context, num_nodes)

self.log_level = "DEBUG"
self._version = version
self.init_logs_attribute()

for node in self.nodes:
self.logs["master_logs" + node.account.hostname] = {
"path": self.master_log_path(node),
"collect_default": True
}
self.logs["worker_logs" + node.account.hostname] = {
"path": self.slave_log_path(node),
"collect_default": True
}
@property
def project(self):
return "spark"

@property
def version(self):
return self._version

@property
def globals(self):
return self.context.globals

def start(self, clean=True):
BackgroundThreadService.start(self, clean=clean)
Expand All @@ -69,14 +70,25 @@ def start_cmd(self, node):
else:
script = "start-slave.sh spark://{spark_master}:7077".format(spark_master=self.nodes[0].account.hostname)

start_script = os.path.join(SparkService.INSTALL_DIR, "sbin", script)
start_script = os.path.join(self.home_dir, "sbin", script)

cmd = "export SPARK_LOG_DIR={spark_dir}; ".format(spark_dir=SparkService.SPARK_PERSISTENT_ROOT)
cmd += "export SPARK_WORKER_DIR={spark_dir}; ".format(spark_dir=SparkService.SPARK_PERSISTENT_ROOT)
cmd = "export SPARK_LOG_DIR={spark_dir}; ".format(spark_dir=self.persistent_root)
cmd += "export SPARK_WORKER_DIR={spark_dir}; ".format(spark_dir=self.persistent_root)
cmd += "{start_script} &".format(start_script=start_script)

return cmd

def init_logs_attribute(self):
for node in self.nodes:
self.logs["master_logs" + node.account.hostname] = {
"path": self.master_log_path(node),
"collect_default": True
}
self.logs["worker_logs" + node.account.hostname] = {
"path": self.slave_log_path(node),
"collect_default": True
}

def start_node(self, node):
self.init_persistent(node)

Expand All @@ -103,17 +115,17 @@ def start_node(self, node):

def stop_node(self, node):
if node == self.nodes[0]:
node.account.ssh(os.path.join(SparkService.INSTALL_DIR, "sbin", "stop-master.sh"))
node.account.ssh(os.path.join(self.home_dir, "sbin", "stop-master.sh"))
else:
node.account.ssh(os.path.join(SparkService.INSTALL_DIR, "sbin", "stop-slave.sh"))
node.account.ssh(os.path.join(self.home_dir, "sbin", "stop-slave.sh"))

def clean_node(self, node):
"""
Clean spark persistence files
"""
node.account.kill_java_processes(self.java_class_name(node),
clean_shutdown=False, allow_fail=True)
node.account.ssh("sudo rm -rf -- %s" % SparkService.SPARK_PERSISTENT_ROOT, allow_fail=False)
node.account.ssh("rm -rf -- %s" % self.persistent_root, allow_fail=False)

def pids(self, node):
"""
Expand All @@ -135,26 +147,24 @@ def java_class_name(self, node):

return "org.apache.spark.deploy.worker.Worker"

@staticmethod
def master_log_path(node):
def master_log_path(self, node):
"""
:param node: Spark master node.
:return: Path to log file.
"""
return "{SPARK_LOG_DIR}/spark-{userID}-org.apache.spark.deploy.master.Master-{instance}-{host}.out".format(
SPARK_LOG_DIR=SparkService.SPARK_PERSISTENT_ROOT,
SPARK_LOG_DIR=self.persistent_root,
userID=node.account.user,
instance=1,
host=node.account.hostname)

@staticmethod
def slave_log_path(node):
def slave_log_path(self, node):
"""
:param node: Spark slave node.
:return: Path to log file.
"""
return "{SPARK_LOG_DIR}/spark-{userID}-org.apache.spark.deploy.worker.Worker-{instance}-{host}.out".format(
SPARK_LOG_DIR=SparkService.SPARK_PERSISTENT_ROOT,
SPARK_LOG_DIR=self.persistent_root,
userID=node.account.user,
instance=1,
host=node.account.hostname)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

from jinja2 import FileSystemLoader, Environment

DEFAULT_CONFIG_PATH = os.path.dirname(os.path.abspath(__file__)) + "/templates"
DEFAULT_IGNITE_CONF = DEFAULT_CONFIG_PATH + "/ignite.xml.j2"
DEFAULT_CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates")
DEFAULT_IGNITE_CONF = os.path.join(DEFAULT_CONFIG_PATH, "ignite.xml.j2")


class ConfigTemplate:
Expand Down Expand Up @@ -69,4 +69,4 @@ class IgniteLoggerConfigTemplate(ConfigTemplate):
Ignite logger configuration.
"""
def __init__(self):
super().__init__(DEFAULT_CONFIG_PATH + "/log4j.xml.j2")
super().__init__(os.path.join(DEFAULT_CONFIG_PATH, "log4j.xml.j2"))
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def __run(self, cmd):
return output

def __form_cmd(self, node, cmd):
return self._cluster.spec.path.script(f"{self.BASE_COMMAND} --host {node.account.externally_routable_ip} {cmd}")
return self._cluster.script(f"{self.BASE_COMMAND} --host {node.account.externally_routable_ip} {cmd}")

@staticmethod
def __parse_output(raw_output):
Expand Down
54 changes: 39 additions & 15 deletions modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,18 @@
from ducktape.utils.util import wait_until

from ignitetest.services.utils.concurrent import CountDownLatch, AtomicValue
from ignitetest.services.utils.ignite_persistence import IgnitePersistenceAware
from ignitetest.services.utils.path import IgnitePathAware
from ignitetest.services.utils.ignite_spec import resolve_spec
from ignitetest.services.utils.jmx_utils import ignite_jmx_mixin
from ignitetest.services.utils.log_utils import monitor_log


class IgniteAwareService(BackgroundThreadService, IgnitePersistenceAware, metaclass=ABCMeta):
# pylint: disable=too-many-public-methods
class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABCMeta):
"""
The base class to build services aware of Ignite.
"""

NETFILTER_STORE_PATH = os.path.join(IgnitePersistenceAware.TEMP_DIR, "iptables.bak")

# pylint: disable=R0913
def __init__(self, context, config, num_nodes, startup_timeout_sec, shutdown_timeout_sec, **kwargs):
"""
Expand All @@ -58,10 +57,23 @@ def __init__(self, context, config, num_nodes, startup_timeout_sec, shutdown_tim
self.shutdown_timeout_sec = shutdown_timeout_sec

self.spec = resolve_spec(self, context, config, **kwargs)
self.init_logs_attribute()

self.disconnected_nodes = []
self.killed = False

@property
def version(self):
return self.config.version

@property
def project(self):
return self.spec.project

@property
def globals(self):
return self.context.globals

def start_async(self, clean=True):
"""
Starts in async way.
Expand Down Expand Up @@ -89,7 +101,7 @@ def start_node(self, node):

wait_until(lambda: self.alive(node), timeout_sec=10)

ignite_jmx_mixin(node, self.pids(node))
ignite_jmx_mixin(node, self.spec, self.pids(node))

def stop_async(self):
"""
Expand Down Expand Up @@ -159,7 +171,7 @@ def init_persistent(self, node):

node_config = self._prepare_config(node)

node.account.create_file(self.CONFIG_FILE, node_config)
node.account.create_file(self.config_file, node_config)

def _prepare_config(self, node):
if not self.config.consistent_id:
Expand All @@ -171,7 +183,7 @@ def _prepare_config(self, node):

config.discovery_spi.prepare_on_start(cluster=self)

node_config = self.spec.config_template.render(config_dir=self.PERSISTENT_ROOT, work_dir=self.WORK_DIR,
node_config = self.spec.config_template.render(config_dir=self.persistent_root, work_dir=self.work_dir,
config=config)

setattr(node, "consistent_id", node.account.externally_routable_ip)
Expand All @@ -190,7 +202,7 @@ def pids(self, node):

# pylint: disable=W0613
def _worker(self, idx, node):
cmd = self.spec.command(node.log_file)
cmd = self.spec.command(node)

self.logger.debug("Attempting to start Application Service on %s with command: %s" % (str(node.account), cmd))

Expand Down Expand Up @@ -277,6 +289,13 @@ def __exec_on_node(node, task, start_waiter=None, delay_ms=0, time_holder=None):

task(node)

@property
def netfilter_store_path(self):
"""
:return: path to store backup of iptables filter
"""
return os.path.join(self.temp_dir, "iptables.bak")

def drop_network(self, nodes=None):
"""
Disconnects node from cluster.
Expand Down Expand Up @@ -311,12 +330,12 @@ def drop_network(self, nodes=None):
def __backup_iptables(self, nodes):
# Store current network filter settings.
for node in nodes:
cmd = "sudo iptables-save | tee " + IgniteAwareService.NETFILTER_STORE_PATH
cmd = f"sudo iptables-save | tee {self.netfilter_store_path}"

exec_error = str(node.account.ssh_client.exec_command(cmd)[2].read(), sys.getdefaultencoding())

if "Warning: iptables-legacy tables present" in exec_error:
cmd = "sudo iptables-legacy-save | tee " + IgniteAwareService.NETFILTER_STORE_PATH
cmd = f"sudo iptables-legacy-save | tee {self.netfilter_store_path}"

exec_error = str(node.account.ssh_client.exec_command(cmd)[2].read(), sys.getdefaultencoding())

Expand All @@ -330,7 +349,7 @@ def __backup_iptables(self, nodes):

def __restore_iptables(self):
# Restore previous network filter settings.
cmd = "sudo iptables-restore < " + IgniteAwareService.NETFILTER_STORE_PATH
cmd = f"sudo iptables-restore < {self.netfilter_store_path}"

errors = []

Expand Down Expand Up @@ -359,8 +378,13 @@ def __update_node_log_file(self, node):
"""
Update the node log file.
"""
cnt = list(node.account.ssh_capture(f'ls {self.LOGS_DIR} | '
f'grep -E "^console_[0-9]*.log$" | '
f'wc -l', callback=int))[0]
if not hasattr(node, 'log_file'):
node.log_file = os.path.join(self.log_dir, "console.log")

node.log_file = self.STDOUT_STDERR_CAPTURE.replace('.log', f'_{cnt + 1}.log')
cnt = list(node.account.ssh_capture(f'ls {self.log_dir} | '
f'grep -E "^console.log(.[0-9]+)?$" | '
f'wc -l', callback=int))[0]
if cnt > 0:
rotated_log = os.path.join(self.log_dir, f"console.log.{cnt}")
self.logger.debug(f"rotating {node.log_file} to {rotated_log} on {node.name}")
node.account.ssh(f"mv {node.log_file} {rotated_log}")
56 changes: 0 additions & 56 deletions modules/ducktests/tests/ignitetest/services/utils/ignite_path.py

This file was deleted.

Loading