diff --git a/modules/ducktests/tests/ignitetest/tests/two_phased_rebalanced_test.py b/modules/ducktests/tests/ignitetest/tests/two_phased_rebalanced_test.py index 1b36400c5f259..a4bdab2128049 100644 --- a/modules/ducktests/tests/ignitetest/tests/two_phased_rebalanced_test.py +++ b/modules/ducktests/tests/ignitetest/tests/two_phased_rebalanced_test.py @@ -64,7 +64,7 @@ def two_phased_rebalancing_test(self, ignite_version): 2. Start cleaned node with preservance of consistent id and cell attributes. 3. Wait for the rebalance to complete. Phase 2. - Run steps 1-3 of Phase 2 on the other half of the cluster. + Run steps 1-3 of Phase 1 on the other half of the cluster. Verifications. 1. Check that PDS size reduced (compare to step 3) 2. Check data consistency (idle_verify --dump) @@ -76,6 +76,7 @@ def two_phased_rebalancing_test(self, ignite_version): name=CACHE_NAME, backups=NUM_NODES_CELL-1, affinity=Affinity())], metric_exporter='org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi') + # Start 4 cells. cells = self.start_cells(config) control_utility = ControlUtility(cells[0]) @@ -85,7 +86,8 @@ def two_phased_rebalancing_test(self, ignite_version): version=IgniteVersion(ignite_version), discovery_spi=from_ignite_cluster(cells[0])) - streamer = IgniteApplicationService( + # Load data to cache. + IgniteApplicationService( self.test_context, client_config, java_class_name="org.apache.ignite.internal.ducktest.tests.snapshot_test.DataLoaderApplication", @@ -93,76 +95,65 @@ def two_phased_rebalancing_test(self, ignite_version): "cacheName": "test-cache", "interval": 500 * 1024, "valueSizeKb": 1} - ) + ).run() - deleter = IgniteApplicationService( + # Delete 80% of data and fix PDS size on all nodes. 
+ IgniteApplicationService( self.test_context, client_config, java_class_name="org.apache.ignite.internal.ducktest.tests.DeleteDataApplication", shutdown_timeout_sec=15 * 60, params={"cacheName": "test-cache", - "size": 400 * 1024}) - - streamer.run() + "size": 400 * 1024} + ).run() node = cells[0].nodes[0] - self.await_skipping_checkpoint(node) - - self.fix_pds_size(cells, "Step prepare, load data. PDS.") - - deleter.run() - - self.await_skipping_checkpoint(node) - - dump_1 = fix_data(control_utility, node, cells[0].log_dir) - - pds_before = self.fix_pds_size(cells, "After Delete 80%, PDS.") + self.await_cluster_idle(node) - restart_with_clean_idx_node_on_cell(cells, [0, 1]) + dump_1 = create_idle_dump_and_copy_to_log_dir(control_utility, node, cells[0].log_dir) - self.fix_pds_size(cells, "After rebalancing complate on nodes 0, 1. PDS.") + pds_before = self.get_pds_size(cells, "After Delete 80%, PDS.") - restart_with_clean_idx_node_on_cell(cells, [2, 3]) + # Restart the cluster two nodes per cell at a time, with a cleanup, and wait for rebalancing. + restart_with_clean_idx_node_on_cell_and_await_rebalance(cells, [0, 1]) + restart_with_clean_idx_node_on_cell_and_await_rebalance(cells, [2, 3]) - pds_after = self.fix_pds_size(cells, "After rebalancing complate on nodes 2, 3. PDS.") + pds_after = self.get_pds_size(cells, "After rebalancing complete, PDS.") + # Check that PDS size reduced. for host in pds_after: assert pds_after[host] < pds_before[host], f'Host {host}: size after = {pds_after[host]}, ' \ f'size before = {pds_before[host]}.' - dump_2 = fix_data(control_utility, node, cells[0].log_dir) + dump_2 = create_idle_dump_and_copy_to_log_dir(control_utility, node, cells[0].log_dir) + # Check data consistency. diff = node.account.ssh_output(f'diff {dump_1} {dump_2}', allow_fail=True) - assert not diff, f"Validation error, files are different. Difference:: \n {diff}" + assert not diff, f"Validation error, files are different. 
Difference:\n {diff}" - def start_cells(self, config: IgniteConfiguration, cells_cnt: int = NUM_CELL, cell_nodes_cnt: int = NUM_NODES_CELL)\ - -> List[IgniteService]: + def start_cells(self, config: IgniteConfiguration) -> List[IgniteService]: """ Start cells. :param config IgniteConfiguration. - :param cells_cnt Cells size. - :param cell_nodes_cnt Nodes on cell. :return List of IgniteServices """ - assert cells_cnt > 0 cells = [] - cell = start_cell(self.test_context, config, cell_nodes_cnt, [f'-D{ATTRIBUTE}=0']) + cell = start_cell(self.test_context, config, NUM_NODES_CELL, [f'-D{ATTRIBUTE}=0']) discovery_spi = from_ignite_cluster(cell) config = config._replace(discovery_spi=discovery_spi) cells.append(cell) - if cells_cnt > 1: - for i in range(1, cells_cnt): - cells.append(start_cell(self.test_context, config, cell_nodes_cnt, [f'-D{ATTRIBUTE}={i}'])) + for i in range(1, NUM_CELL): + cells.append(start_cell(self.test_context, config, NUM_NODES_CELL, [f'-D{ATTRIBUTE}={i}'])) return cells - def fix_pds_size(self, cells: [IgniteService], msg: str) -> dict: + def get_pds_size(self, cells: [IgniteService], msg: str) -> dict: """ Pds size in megabytes. :param cells List of IgniteService @@ -184,7 +175,7 @@ def fix_pds_size(self, cells: [IgniteService], msg: str) -> dict: return res - def await_skipping_checkpoint(self, node: ClusterNode): + def await_cluster_idle(self, node: ClusterNode): """ Await Skipping checkpoint. :param node ClusterNode @@ -195,7 +186,7 @@ def await_skipping_checkpoint(self, node: ClusterNode): self.logger.warn(ex) -def restart_with_clean_idx_node_on_cell(cells: [IgniteService], idxs: [int]): +def restart_with_clean_idx_node_on_cell_and_await_rebalance(cells: [IgniteService], idxs: [int]): """ Restart idxs nodes on cells with cleaning working directory and await rebalance. 
:param cells List of IgniteService @@ -267,7 +258,7 @@ def start_idx_node_on_cell(cells: [IgniteService], idxs: [int]): cell.await_started() -def fix_data(control_utility: ControlUtility, node: ClusterNode, log_dir: str) -> str: +def create_idle_dump_and_copy_to_log_dir(control_utility: ControlUtility, node: ClusterNode, log_dir: str) -> str: """ Creates a dump file and copies it to the log directory. :param control_utility ControlUtility.