diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/start_stop_client/IgniteCachePutClient.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/start_stop_client/IgniteCachePutClient.java new file mode 100644 index 0000000000000..b0ecd63d8c881 --- /dev/null +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/start_stop_client/IgniteCachePutClient.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests.start_stop_client; + +import java.util.Optional; +import java.util.UUID; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** + * Java client. Tx put operation + */ +public class IgniteCachePutClient extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override protected void run(JsonNode jsonNode) throws Exception { + String cacheName = jsonNode.get("cacheName").asText(); + + long pacing = Optional.ofNullable(jsonNode.get("pacing")) + .map(JsonNode::asLong) + .orElse(0l); + + log.info("Test props:" + + " cacheName=" + cacheName + + " pacing=" + pacing); + + IgniteCache cache = ignite.getOrCreateCache(cacheName); + log.info("Node name: " + ignite.name() + " starting cache operations."); + + markInitialized(); + + while (!terminated()) { + UUID uuid = UUID.randomUUID(); + + long startTime = System.nanoTime(); + + cache.put(uuid, uuid); + + long resultTime = System.nanoTime() - startTime; + + log.info("Success put, latency: " + resultTime + "ns."); + + Thread.sleep(pacing); + } + + markFinished(); + } +} diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py b/modules/ducktests/tests/ignitetest/services/ignite_app.py index 8fbf0351a66a5..fa5cc5d6820bc 100644 --- a/modules/ducktests/tests/ignitetest/services/ignite_app.py +++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py @@ -34,9 +34,9 @@ class IgniteApplicationService(IgniteAwareService): SERVICE_JAVA_CLASS_NAME = "org.apache.ignite.internal.ducktest.utils.IgniteAwareApplicationService" # pylint: disable=R0913 - def __init__(self, context, config, java_class_name, params="", timeout_sec=60, modules=None, + def __init__(self, context, config, java_class_name, num_nodes=1, params="", timeout_sec=60, modules=None, servicejava_class_name=SERVICE_JAVA_CLASS_NAME, jvm_opts=None, start_ignite=True): - super().__init__(context, config, 1, modules=modules, servicejava_class_name=servicejava_class_name, + super().__init__(context, config, num_nodes, modules=modules, servicejava_class_name=servicejava_class_name, java_class_name=java_class_name, params=params, jvm_opts=jvm_opts, start_ignite=start_ignite) self.servicejava_class_name = servicejava_class_name @@ -54,28 +54,42 @@ def start(self): self.__check_status("IGNITE_APPLICATION_INITIALIZED", timeout=self.timeout_sec) def stop_async(self, clean_shutdown=True): + """ + Stop in async way. + """ + for node in self.nodes: + self.stop_node(node=node, clean_shutdown=clean_shutdown) + + # pylint: disable=W0221 + def stop_node(self, node, clean_shutdown=True): """ Stops node in async way. """ - self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(self.nodes[0].account))) - self.nodes[0].account.kill_java_processes(self.servicejava_class_name, clean_shutdown=clean_shutdown, - allow_fail=True) + self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(node.account))) + node.account.kill_java_processes(self.servicejava_class_name, clean_shutdown=clean_shutdown, + allow_fail=True) def await_stopped(self, timeout_sec=10): """ Awaits node stop finish. """ - stopped = self.wait_node(self.nodes[0], timeout_sec=timeout_sec) - assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \ - (str(self.nodes[0].account), str(timeout_sec)) + for node in self.nodes: + stopped = self.wait_node(node, timeout_sec=timeout_sec) + assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \ + (str(node.account), str(timeout_sec)) self.__check_status("IGNITE_APPLICATION_FINISHED", timeout=timeout_sec) # pylint: disable=W0221 - def stop_node(self, node, clean_shutdown=True, timeout_sec=10): - assert node == self.nodes[0] - self.stop_async(clean_shutdown) - self.await_stopped(timeout_sec) + def stop(self, clean_shutdown=True, timeout_sec=10): + """ + Stop services. + """ + if clean_shutdown: + self.stop_async(clean_shutdown) + self.await_stopped(timeout_sec) + else: + self.stop_async(clean_shutdown) def __check_status(self, desired, timeout=1): self.await_event("%s\\|IGNITE_APPLICATION_BROKEN" % desired, timeout, from_the_beginning=True) @@ -110,7 +124,8 @@ def extract_result(self, name): """ results = self.extract_results(name) - assert len(results) <= 1, f"Expected exactly one result occurence, {len(results)} found." + assert len(results) == len(self.nodes), f"Expected exactly {len(self.nodes)} occurence," \ + f" but found {len(results)}." return results[0] if results else "" @@ -121,10 +136,10 @@ def extract_results(self, name): """ res = [] - output = self.nodes[0].account.ssh_capture( - "grep '%s' %s" % (name + "->", self.STDOUT_STDERR_CAPTURE), allow_fail=False) - - for line in output: - res.append(re.search("%s(.*)%s" % (name + "->", "<-"), line).group(1)) + for node in self.nodes: + output = node.account.ssh_capture( + "grep '%s' %s" % (name + "->", self.STDOUT_STDERR_CAPTURE), allow_fail=False) + for line in output: + res.append(re.search("%s(.*)%s" % (name + "->", "<-"), line).group(1)) return res diff --git a/modules/ducktests/tests/ignitetest/tests/client_in_out_test.py b/modules/ducktests/tests/ignitetest/tests/client_in_out_test.py new file mode 100644 index 0000000000000..aab699a9ef579 --- /dev/null +++ b/modules/ducktests/tests/ignitetest/tests/client_in_out_test.py @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module contains client tests +""" +import time + +from ducktape.mark.resource import cluster + +from ducktape.mark import parametrize +from ignitetest.services.ignite import IgniteService +from ignitetest.services.ignite_app import IgniteApplicationService +from ignitetest.services.utils.control_utility import ControlUtility +from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration +from ignitetest.services.utils.ignite_configuration import IgniteConfiguration +from ignitetest.utils import ignite_versions +from ignitetest.utils.ignite_test import IgniteTest +from ignitetest.utils.version import DEV_BRANCH, V_2_8_1, IgniteVersion + + +# pylint: disable=W0223 +class ClientTest(IgniteTest): + """ + cluster - cluster size + CACHE_NAME - name of the cache to create for the test. + PACING - the frequency of the operation on clients (ms). + JAVA_CLIENT_CLASS_NAME - running classname. + client_work_time - clients working time (s). + iteration_count - the number of iterations of starting and stopping client nodes (s). + static_clients - the number of permanently employed clients. + temp_client - number of clients who come log in and out. + """ + + CACHE_NAME = "simple-tx-cache" + PACING = 10 + JAVA_CLIENT_CLASS_NAME = "org.apache.ignite.internal.ducktest.tests.start_stop_client.IgniteCachePutClient" + + @ignite_versions(str(V_2_8_1), str(DEV_BRANCH)) + @cluster(num_nodes=7) + @parametrize(num_nodes=7, + static_clients=2, + temp_client=3, + iteration_count=3, + client_work_time=30) + # pylint: disable=R0913 + def test_ignite_start_stop_nodes(self, ignite_version, + num_nodes, static_clients, temp_client, iteration_count, client_work_time): + """ + Start and stop clients node test without kill java process. + Check topology. + """ + self.ignite_start_stop(ignite_version, True, num_nodes, static_clients, + temp_client, iteration_count, client_work_time) + + @ignite_versions(str(V_2_8_1), str(DEV_BRANCH)) + @cluster(num_nodes=7) + @parametrize(num_nodes=7, + static_clients=2, + temp_client=3, + iteration_count=3, + client_work_time=30) + # pylint: disable=R0913 + def test_ignite_kill_start_nodes(self, ignite_version, + num_nodes, static_clients, temp_client, iteration_count, client_work_time): + """ + Start and kill client nodes, Check topology + """ + self.ignite_start_stop(ignite_version, False, num_nodes, static_clients, + temp_client, iteration_count, client_work_time) + + # pylint: disable=R0914 + # pylint: disable=R0913 + def ignite_start_stop(self, ignite_version, correct_stop_temp_node, + nodes_num, static_clients_num, temp_client, iteration_count, client_work_time): + """ + Test for starting and stopping fat clients. + """ + + servers_count = nodes_num - static_clients_num - temp_client + + current_top_v = servers_count + # Topology version after test. + fin_top_ver = servers_count + (2 * static_clients_num) + (2 * iteration_count * temp_client) + + server_cfg = IgniteConfiguration( + version=IgniteVersion(ignite_version), + caches=[CacheConfiguration(name=self.CACHE_NAME, backups=1, atomicity_mode='TRANSACTIONAL')] + ) + + ignite = IgniteService(self.test_context, server_cfg, num_nodes=servers_count) + control_utility = ControlUtility(ignite, self.test_context) + + client_cfg = server_cfg._replace(client_mode=True) + + static_clients = IgniteApplicationService( + self.test_context, + client_cfg, + java_class_name=self.JAVA_CLIENT_CLASS_NAME, + num_nodes=static_clients_num, + params={"cacheName": self.CACHE_NAME, + "pacing": self.PACING}) + + temp_clients = IgniteApplicationService( + self.test_context, + client_cfg, + java_class_name=self.JAVA_CLIENT_CLASS_NAME, + num_nodes=temp_client, + params={"cacheName": self.CACHE_NAME, + "pacing": self.PACING}) + + ignite.start() + + static_clients.start() + + current_top_v += static_clients_num + check_topology(control_utility, current_top_v) + + # Start / stop temp_clients node. Check cluster. + for i in range(iteration_count): + self.logger.debug(f'Starting iteration: {i}.') + + temp_clients.start() + current_top_v += temp_client + + await_event(static_clients, f'ver={current_top_v}, locNode=') + check_topology(control_utility, current_top_v) + + await_event(temp_clients, f'clients={static_clients_num + temp_client}') + + time.sleep(client_work_time) + temp_clients.stop(correct_stop_temp_node) + + current_top_v += temp_client + + await_event(static_clients, f'ver={current_top_v}, locNode=') + static_clients.stop() + + check_topology(control_utility, fin_top_ver) + + +def await_event(service: IgniteApplicationService, message): + """ + :param service: target service for wait + :param message: message + """ + service.await_event(message, timeout_sec=80, from_the_beginning=True) + + +def check_topology(control_utility: ControlUtility, fin_top_ver: int): + """ + Check current topology version. + """ + top_ver = control_utility.cluster_state().topology_version + assert top_ver == fin_top_ver, f'Cluster current topology version={top_ver}, ' \ + f'expected topology version={fin_top_ver}.' diff --git a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml index 698c1d84c74e0..714a9af43e840 100644 --- a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml +++ b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml @@ -27,3 +27,7 @@ cellular_affinity: rebalance: - ../add_node_rebalance_test.py + +clients: + - ../client_in_out_test.py +