Skip to content

Commit

Permalink
Ensure ES http ports are available before running night rally
Browse files Browse the repository at this point in the history
Also use shlex.quote to safely generate the CLI arguments passed to
Rally, instead of enclosing them in `\"{}\"`.

Relates elastic#62 
Closes elastic#51
  • Loading branch information
dliappis committed May 7, 2018
1 parent 0949c43 commit 80cec2a
Show file tree
Hide file tree
Showing 2 changed files with 275 additions and 104 deletions.
112 changes: 100 additions & 12 deletions night_rally.py
@@ -1,10 +1,13 @@
import argparse
import collections
import datetime
import errno
import logging
import os
import time
import re
import collections
import shlex
import socket
import time

ROOT = os.path.dirname(os.path.realpath(__file__))
RALLY_BINARY = "rally --skip-update"
Expand All @@ -28,6 +31,81 @@
logger = logging.getLogger("night_rally")


class NightRallyError(Exception):
    """
    Root of the Night Rally exception hierarchy.

    Stores a ``message`` and an optional ``cause`` as attributes; both values are
    also handed to ``Exception`` and therefore appear in ``args``.
    """

    def __init__(self, message, cause=None):
        self.cause = cause
        self.message = message
        super(NightRallyError, self).__init__(message, cause)

    def __repr__(self):
        # The representation is the bare message, without the class name.
        return self.message


class RemotePortNotFree(NightRallyError):
    """
    Signals that a port given in --target-host is still in use, i.e. it did not
    refuse connections within the allowed number of attempts.
    """


class RemotePortNotDefined(NightRallyError):
    """
    Signals that an entry in --target-host does not carry a port.
    """


def wait_until_port_is_free(target_hosts, connector=socket, wait_time=5):
    """
    Wait until the port of every target host is free.

    A port counts as free when a connection attempt to it is refused
    (``errno.ECONNREFUSED``). Any other outcome, including a successful
    connection, is treated as "not free" and retried after ``wait_time`` seconds.
    Each host gets its own budget of five attempts.

    :param target_hosts: Hosts in any form accepted by ``_normalize_hosts`` of the Elasticsearch client.
    :param connector: Object providing a ``socket()`` factory; defaults to the ``socket`` module.
    :param wait_time: Seconds to sleep after an unsuccessful attempt.
    :return: ``True`` once all ports have been found free.
    :raises RemotePortNotDefined: if a host has no port.
    :raises RemotePortNotFree: if a port is still not free after all attempts.
    """
    max_attempts = 5

    from elasticsearch.client import _normalize_hosts
    for node in _normalize_hosts(target_hosts):
        if "port" not in node:
            raise RemotePortNotDefined(
                message="No port specified for target host: [{}]. Please check your target-host parameter.".format(node["host"]),
                cause="Missing target port error."
            )

        host, port = node["host"], node["port"]
        # Reset the state for every node so that retries spent on one host do
        # not reduce the attempts available to the next one.
        port_free = False
        connect_result = None

        for attempt in range(1, max_attempts + 1):
            c = connector.socket()
            try:
                connect_result = c.connect_ex((host, port))
            finally:
                # Always release the socket. A failing close() must neither leak
                # the socket nor disturb the attempt accounting.
                try:
                    c.close()
                except OSError:
                    logger.exception(
                        "Ignoring error while trying to close the socket after connecting to "
                        "node: [%s:%s]", host, port)

            if connect_result == errno.ECONNREFUSED:
                port_free = True
                break

            logger.info("Port [%s] on host [%s] is not free. "
                        "Waiting for %s seconds now, attempt %d/%d.",
                        port,
                        host,
                        wait_time,
                        attempt,
                        max_attempts)
            time.sleep(wait_time)

        if not port_free:
            # target host port occupied or some other socket error
            raise RemotePortNotFree(
                message="Port [{}] on host [{}] is not free, "
                        "or another persistent issue while attempting to connect to it.".format(port, host),
                # Fall back to the raw number for 0 (connection succeeded) and for
                # codes that have no symbolic name on this platform.
                cause="errno = {}".format(errno.errorcode.get(connect_result, connect_result)))

    return True


def date_for_cmd_param(d):
    """
    Render ``d`` as ``YYYY-MM-DD HH:MM:SS`` for use as a command line parameter value.
    """
    template = "{:%Y-%m-%d %H:%M:%S}"
    return template.format(d)

Expand Down Expand Up @@ -206,11 +284,11 @@ def command_line(self, race_config):
cmd = RALLY_BINARY
for k, v in cmd_line_params.items():
if isinstance(v, list):
cmd += " --{}=\"{}\"".format(k, join_nullables(*v))
cmd += " --{}={}".format(k, shlex.quote(join_nullables(*v)))
elif v is None:
cmd += " --{}".format(k)
else:
cmd += " --{}=\"{}\"".format(k, v)
cmd += " --{}={}".format(k, shlex.quote(str(v)))

return cmd

Expand Down Expand Up @@ -402,15 +480,25 @@ def run_rally(tracks, available_hosts, command, dry_run=False, skip_ansible=Fals
if not skip_ansible:
logger.info("Resetting benchmark environment...")
fixtures_dir = os.path.join(os.path.dirname(__file__), "fixtures", "ansible")
runner("cd \"%s\" && ansible-playbook -i inventory/production -u rally playbooks/setup.yml "
"--tags=\"drop-caches,trim\" && cd -" % fixtures_dir)
shell_command = "cd {} && " \
"ansible-playbook -i inventory/production -u rally playbooks/setup.yml " \
"--tags={} && cd -".format(fixtures_dir,
shlex.quote("drop-caches,trim"))
runner(shell_command)
logger.info("Running Rally on [%s]", race_cfg)
start = time.perf_counter()
if runner(command.command_line(race_cfg)):
rally_failure = True
logger.error("Failed to run [%s]. Please check the logs.", race_cfg)
stop = time.perf_counter()
logger.info("Finished running on [%s] in [%f] seconds.", race_cfg, (stop - start))
try:
wait_until_port_is_free(available_hosts)
if runner(command.command_line(race_cfg)):
rally_failure = True
logger.error("Failed to run [%s]. Please check the logs.", race_cfg)
stop = time.perf_counter()
logger.info("Finished running on [%s] in [%f] seconds.", race_cfg, (stop - start))
except (RemotePortNotFree, RemotePortNotDefined) as remote_port_exception:
logger.error("Skipped running [%s].", race_cfg)
logger.error(remote_port_exception.message)
logger.error(remote_port_exception.cause)
break
else:
logger.info("Skipping [%s] (not supported by command).", race_cfg)
else:
Expand Down Expand Up @@ -481,7 +569,7 @@ def copy_results_for_release_comparison(effective_start_date, dry_run):

def deactivate_outdated_results(effective_start_date, environment, release, env_tag, dry_run):
"""
Sets all results for the same major release version, environment and tag to active=False except for the records with the provided
Sets all results for the same major release version, environment and tag to active=False except for the records with the provided
effective start date.
"""
import client
Expand Down

0 comments on commit 80cec2a

Please sign in to comment.