feature(k8s-serverless): scylla-bench cloud bundle support
* switch to a newer scylla-bench (scylladb/scylla-bench#108)
* add support in `scylla_bench_thread.py` (see the sketch below)
* add a unit test (for now it only works if a connection bundle is available)
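
In short: when the cluster under test exposes a Scylla Cloud connection bundle, the generated scylla-bench command now gets `-cloud-config-path=<bundle copied to the loader>` instead of the usual `-nodes <ip,...>` list. Below is a minimal sketch of that selection logic, mirroring the `create_stress_cmd()` added in `sdcm/scylla_bench_thread.py` further down; the helper name, bundle path, and IPs here are illustrative, not SCT's actual API:

    from pathlib import Path
    from typing import Optional

    def build_scylla_bench_cmd(stress_cmd: str, bundle_file: Optional[Path], node_ips: list[str]) -> str:
        """Sketch of the new command-building behaviour; not SCT's real helper."""
        if bundle_file:
            # Cloud/serverless cluster: the bundle is first copied to the loader
            # (under /tmp/), then scylla-bench is pointed at it.
            return f"{stress_cmd.strip()} -cloud-config-path={Path('/tmp') / bundle_file.name}"
        # Regular cluster: keep passing the node IPs directly.
        return f"{stress_cmd.strip()} -nodes {','.join(node_ips)}"

    # Illustrative values only:
    print(build_scylla_bench_cmd("scylla-bench -workload=sequential -mode=write",
                                 Path("/home/user/k8s_config.yaml"), []))
    # scylla-bench -workload=sequential -mode=write -cloud-config-path=/tmp/k8s_config.yaml
    print(build_scylla_bench_cmd("scylla-bench -workload=sequential -mode=write",
                                 None, ["10.0.0.1", "10.0.0.2"]))
    # scylla-bench -workload=sequential -mode=write -nodes 10.0.0.1,10.0.0.2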
fruch committed Nov 10, 2022
1 parent ab21efa commit 8728cb0
Showing 7 changed files with 92 additions and 19 deletions.
2 changes: 1 addition & 1 deletion defaults/test_default.yaml
@@ -206,7 +206,7 @@ stress_image:
ycsb: 'scylladb/hydra-loaders:ycsb-jdk8-20220918'
nosqlbench: 'scylladb/hydra-loaders:nosqlbench-4.15.49'
cassandra-stress: '' # default would be same version as scylla under test
scylla-bench: 'scylladb/hydra-loaders:scylla-bench-v0.1.12'
scylla-bench: 'scylladb/hydra-loaders:scylla-bench-v0.1.13'
gemini: 'scylladb/hydra-loaders:gemini-1.7.7'
alternator-dns: 'scylladb/hydra-loaders:alternator-dns-0.1'
cdc-stresser: 'scylladb/hydra-loaders:cdc-stresser-20210630'
21 changes: 21 additions & 0 deletions functional_tests/scylla_operator/test_functional.py
@@ -825,3 +825,24 @@ def test_cloud_bundle_connectivity_cassandra_stress(tester):

assert "latency 99th percentile" in output[0]
assert float(output[0]["latency 99th percentile"]) > 0


@pytest.mark.required_operator("v1.8.0")
@pytest.mark.requires_tls
def test_cloud_bundle_connectivity_scylla_bench(tester):

assert tester.db_cluster.connection_bundle_file, "cloud bundle wasn't found"

cmd = (
"scylla-bench -workload=sequential -mode=write -replication-factor=1 -partition-count=10 "
"-clustering-row-count=5555 -clustering-row-size=uniform:10..20 -concurrency=10 "
"-connection-count=10 -consistency-level=one -rows-per-request=10 -timeout=60s -duration=1m"
)

stress_obj = tester.run_stress_thread(cmd, stop_test_on_failure=False)
summaries, errors = stress_obj.verify_results()
assert not errors
assert summaries[0]["Clustering row size"] == "Uniform(min=10, max=20)"

    # TODO: add verification that the output says it's using the cloud bundle
    # (this needs to be added to scylla-bench's log output first)
9 changes: 9 additions & 0 deletions sdcm/cluster_k8s/mini_k8s.py
@@ -521,6 +521,15 @@ def start_k8s_software(self) -> None:
"""
for _ in range(self.params.get("n_db_nodes")):
script_start_part += scylla_node_definition

loader_node_definition = f"""
- role: worker
labels:
{POOL_LABEL_NAME}: {self.LOADER_POOL_NAME}
"""
for _ in range(self.params.get("n_loaders")):
script_start_part += loader_node_definition

script_end_part = """
EndOfSpec
/var/tmp/kind delete cluster || true
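
The loop added above mirrors the existing per-DB-node logic: the kind cluster spec gains one worker node per loader. A rough, self-contained sketch of what it emits; the label key and pool name are placeholders for `POOL_LABEL_NAME` / `self.LOADER_POOL_NAME`, whose real values are defined elsewhere in SCT:

    # Placeholder values; the real ones come from SCT's k8s constants/cluster class.
    POOL_LABEL_NAME = "pool"
    LOADER_POOL_NAME = "loader-pool"
    n_loaders = 2

    loader_node_definition = f"""
    - role: worker
      labels:
        {POOL_LABEL_NAME}: {LOADER_POOL_NAME}
    """

    spec = ""
    for _ in range(n_loaders):
        spec += loader_node_definition
    print(spec)  # two identical "role: worker" entries, i.e. one kind node per loader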
32 changes: 26 additions & 6 deletions sdcm/scylla_bench_thread.py
@@ -18,6 +18,7 @@
import logging
import contextlib
from enum import Enum
from pathlib import Path

from sdcm.loader import ScyllaBenchStressExporter
from sdcm.prometheus import nemesis_metrics_obj
@@ -140,6 +141,24 @@ def verify_results(self):

return sb_summary, errors

@property
def connection_bundle_file(self) -> Path:
return self.node_list[0].parent_cluster.connection_bundle_file

@property
def target_connection_bundle_file(self) -> str:
return str(Path('/tmp/') / self.connection_bundle_file.name)

def create_stress_cmd(self, stress_cmd):
if self.connection_bundle_file:
stress_cmd = f'{stress_cmd.strip()} -cloud-config-path={self.target_connection_bundle_file}'
else:
            # Pass all the cluster node IPs to the scylla-bench cmd
ips = ",".join([n.cql_ip_address for n in self.node_list])
stress_cmd = f'{stress_cmd.strip()} -nodes {ips}'

return stress_cmd

def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many-locals
cmd_runner = None
if "k8s" in self.params.get("cluster_backend") and self.params.get(
@@ -153,10 +172,13 @@ def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many-
cpu_options = f'--cpuset-cpus="{cpu_idx}"'
cmd_runner = cleanup_context = RemoteDocker(
loader, self.params.get('stress_image.scylla-bench'),
extra_docker_opts=(
f'{cpu_options} --label shell_marker={self.shell_marker} --network=host'))
extra_docker_opts=f'{cpu_options} --label shell_marker={self.shell_marker} --network=host',
)
cmd_runner_name = loader.ip_address

if self.connection_bundle_file:
cmd_runner.send_files(str(self.connection_bundle_file), self.target_connection_bundle_file)

if self.sb_mode == ScyllaBenchModes.WRITE and self.sb_workload == ScyllaBenchWorkloads.TIMESERIES:
loader.parent_cluster.sb_write_timeseries_ts = write_timestamp = time.time_ns()
LOGGER.debug("Set start-time: %s", write_timestamp)
@@ -180,9 +202,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many-
os.makedirs(loader.logdir, exist_ok=True)

log_file_name = os.path.join(loader.logdir, f'scylla-bench-l{loader_idx}-{uuid.uuid4()}.log')
# Select first seed node to send the scylla-bench cmds
ips = ",".join([n.cql_ip_address for n in self.node_list])

stress_cmd = self.create_stress_cmd(stress_cmd)
with ScyllaBenchStressExporter(instance_name=cmd_runner_name,
metrics=nemesis_metrics_obj(),
stress_operation=self.sb_mode,
@@ -196,7 +216,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many-
result = None
try:
result = cmd_runner.run(
cmd="{name} -nodes {ips}".format(name=stress_cmd.strip(), ips=ips),
cmd=stress_cmd,
timeout=self.timeout,
log_file=log_file_name)
except Exception as exc: # pylint: disable=broad-except
28 changes: 20 additions & 8 deletions sdcm/tester.py
@@ -1247,11 +1247,6 @@ def get_cluster_baremetal(self):
def get_cluster_k8s_local_kind_cluster(self):
self.credentials.append(UserRemoteCredentials(key_file=self.params.get('user_credentials_path')))

container_node_params = dict(
docker_image=self.params.get('docker_image'),
docker_image_tag=self.params.get('scylla_version'),
node_key_file=self.credentials[0].key_file,
)
self.k8s_cluster = mini_k8s.LocalKindCluster(
software_version=self.params.get("mini_k8s_version"),
user_prefix=self.params.get("user_prefix"),
@@ -1262,6 +1257,16 @@ def get_cluster_k8s_local_kind_cluster(self):
self.k8s_cluster.set_nodeselector_for_deployments(
pool_name=self.k8s_cluster.AUXILIARY_POOL_NAME, namespace=namespace)

loader_pool = None
if self.params.get("n_loaders"):
loader_pool = mini_k8s.MinimalK8SNodePool(
k8s_cluster=self.k8s_cluster,
name=self.k8s_cluster.LOADER_POOL_NAME,
num_nodes=self.params.get("n_loaders"),
image_type="fake-image-type",
instance_type="fake-instance-type")
self.k8s_cluster.deploy_node_pool(loader_pool, wait_till_ready=False)

scylla_pool = mini_k8s.MinimalK8SNodePool(
k8s_cluster=self.k8s_cluster,
name=self.k8s_cluster.SCYLLA_POOL_NAME,
@@ -1287,20 +1292,27 @@
n_nodes=self.params.get("k8s_n_scylla_pods_per_cluster") or self.params.get("n_db_nodes"),
params=self.params,
node_pool=scylla_pool,
add_nodes=False,
)

if self.params.get('k8s_deploy_monitoring'):
self.k8s_cluster.deploy_monitoring_cluster(
is_manager_deployed=self.params.get('use_mgmt')
)

if self.params.get("n_loaders"):
self.loaders = cluster_docker.LoaderSetDocker(
**container_node_params,
n_nodes=self.params.get("n_loaders"),
self.loaders = cluster_k8s.LoaderPodCluster(
k8s_cluster=self.k8s_cluster,
loader_cluster_name=self.params.get("k8s_loader_cluster_name"),
user_prefix=self.params.get("user_prefix"),
n_nodes=self.params.get("k8s_n_loader_pods_per_cluster") or self.params.get("n_loaders"),
params=self.params,
node_pool=loader_pool,
add_nodes=False,
)

self._add_and_wait_for_cluster_nodes_in_parallel([self.db_cluster, self.loaders])

if self.params.get("n_monitor_nodes") > 0:
self.monitors = cluster_docker.MonitorSetDocker(
n_nodes=self.params.get("n_monitor_nodes"),
3 changes: 3 additions & 0 deletions test-cases/scylla-operator/functional.yaml
@@ -5,6 +5,9 @@ n_db_nodes: 4
k8s_n_scylla_pods_per_cluster: 3

n_loaders: 1
k8s_loader_run_type: 'dynamic'
k8s_loader_cluster_name: 'sct-loaders'

n_monitor_nodes: 0

use_mgmt: true
16 changes: 12 additions & 4 deletions unit_tests/test_scylla_bench_thread.py
@@ -22,12 +22,20 @@
]


@pytest.mark.parametrize("extra_cmd", argvalues=[
pytest.param('', id="regular"),
pytest.param('-tls', id="tls", marks=[pytest.mark.docker_scylla_args(ssl=True)])])
@pytest.mark.parametrize(
"extra_cmd",
argvalues=[
pytest.param("", id="regular"),
pytest.param("-tls", id="tls", marks=[pytest.mark.docker_scylla_args(ssl=True)]),
pytest.param("cloud-config", id="sni_proxy"),
],
)
def test_01_scylla_bench(request, docker_scylla, params, extra_cmd):
loader_set = LocalLoaderSetDummy()

if extra_cmd == "cloud-config":
params["k8s_connection_bundle_file"] = "/home/fruch/Downloads/k8s_config.yaml"
docker_scylla.parent_cluster.params = params
extra_cmd = ""
cmd = (
f"scylla-bench -workload=sequential {extra_cmd} -mode=write -replication-factor=1 -partition-count=10 "
+ "-clustering-row-count=5555 -clustering-row-size=uniform:10..20 -concurrency=10 "
