diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DeleteDataApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DeleteDataApplication.java new file mode 100644 index 0000000000000..b38ee660bd7e5 --- /dev/null +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DeleteDataApplication.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import javax.cache.Cache; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; +import org.apache.ignite.lang.IgniteFuture; + +/** + * Deleting data from the cache. + */ +public class DeleteDataApplication extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jNode) { + String cacheName = jNode.get("cacheName").asText(); + + int size = jNode.get("size").asInt(); + + int batchSize = Optional.ofNullable(jNode.get("batchSize")) + .map(JsonNode::asInt) + .orElse(1000); + + IgniteCache cache = ignite.getOrCreateCache(cacheName); + + log.info("Cache size before: " + cache.size()); + + markInitialized(); + + long start = System.currentTimeMillis(); + + Iterator> iter = cache.iterator(); + + ArrayList keys = new ArrayList<>(size); + + for (int cnt = 0; iter.hasNext() && cnt < size; cnt++) + keys.add(iter.next().getKey()); + + log.info("Start removing: " + keys.size()); + + int listSize = keys.size(); + + List> futures = new LinkedList<>(); + + for (int from = 0; from < listSize; from += batchSize) { + int to = Math.min(from + batchSize, listSize); + + futures.add(cache.removeAllAsync(new TreeSet<>(keys.subList(from, to)))); + } + + futures.forEach(f -> f.get(TimeUnit.MINUTES.toMillis(5))); + + log.info("Cache size after: " + cache.size()); + + recordResult("DURATION_SECONDS", TimeUnit.MILLISECONDS.toSeconds( System.currentTimeMillis() - start)); + + markFinished(); + } +} diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/snapshot_test/DataLoaderApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/snapshot_test/DataLoaderApplication.java index d225855cc32ce..4ee2dfffcfb73 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/snapshot_test/DataLoaderApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/snapshot_test/DataLoaderApplication.java @@ -17,13 +17,8 @@ package org.apache.ignite.internal.ducktest.tests.snapshot_test; -import java.util.Collections; import com.fasterxml.jackson.databind.JsonNode; import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.QueryEntity; -import org.apache.ignite.cache.QueryIndex; -import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; /** @@ -39,19 +34,7 @@ public class DataLoaderApplication extends IgniteAwareApplication { markInitialized(); - QueryEntity qryEntity = new QueryEntity() - .setKeyFieldName("id") - .setKeyType(Long.class.getName()) - .setTableName("TEST_TABLE") - .setValueType(byte[].class.getName()) - .addQueryField("id", Long.class.getName(), null) - .setIndexes(Collections.singletonList(new QueryIndex("id"))); - - CacheConfiguration cacheCfg = new CacheConfiguration<>(cacheName); - cacheCfg.setCacheMode(CacheMode.REPLICATED); - cacheCfg.setQueryEntities(Collections.singletonList(qryEntity)); - - ignite.getOrCreateCache(cacheCfg); + ignite.getOrCreateCache(cacheName); byte[] data = new byte[valSize]; diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py index ee4714407abc9..e37e8e4355b98 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py +++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py @@ -23,7 +23,7 @@ import time import tempfile from abc import ABCMeta -from datetime import datetime +from datetime import datetime, timedelta from enum import IntEnum from pathlib import Path from threading import Thread @@ -36,7 +36,7 @@ from ignitetest.services.utils.background_thread import BackgroundThreadService from ignitetest.services.utils.concurrent import CountDownLatch, AtomicValue from ignitetest.services.utils.ignite_spec import resolve_spec, SHARED_PREPARED_FILE -from ignitetest.services.utils.jmx_utils import ignite_jmx_mixin +from ignitetest.services.utils.jmx_utils import ignite_jmx_mixin, JmxClient from ignitetest.services.utils.log_utils import monitor_log from ignitetest.services.utils.path import IgnitePathAware from ignitetest.utils.enum import constructible @@ -541,6 +541,30 @@ def restore_from_snapshot(self, snapshot_name: str): node.account.ssh(f'rm -rf {self.database_dir}', allow_fail=False) node.account.ssh(f'cp -r {snapshot_db} {self.work_dir}', allow_fail=False) + def await_rebalance(self, timeout_sec=180): + """ + Waiting for the rebalance to complete. + For the method, you need to set the + metric_exporter='org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi' + to the config. + :param timeout_sec: timeout to wait the rebalance to complete. + """ + assert self.nodes, 'Node list is empty.' + + delta_time = datetime.now() + timedelta(seconds=timeout_sec) + + for node in self.nodes: + rebalanced = False + mbean = JmxClient(node).find_mbean('.*name=cluster') + + while datetime.now() < delta_time and not rebalanced: + rebalanced = next(mbean.Rebalanced) == 'true' + + if rebalanced: + return + + raise TimeoutError(f'Rebalancing was not completed within the time: {timeout_sec} seconds.') + def node_failed_event_pattern(failed_node_id=None): """Failed node pattern in log.""" diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py index cc58a69bb3670..b094d7bba2a67 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py +++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py @@ -18,6 +18,17 @@ """ from typing import NamedTuple +AFFINITY_BACKUP_FILTER = 'BACKUP_FILTER' +CELL = 'CELL' + + +class Affinity(NamedTuple): + """ + Affinity. + """ + name: str = AFFINITY_BACKUP_FILTER + attr_name: str = CELL + class CacheConfiguration(NamedTuple): """ @@ -27,3 +38,5 @@ class CacheConfiguration(NamedTuple): cache_mode: str = 'PARTITIONED' atomicity_mode: str = 'ATOMIC' backups: int = 0 + indexed_types: list = None + affinity: Affinity = None diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/data_storage.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/data_storage.py index 7b2999d82e4b8..5f18fc83ce549 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/data_storage.py +++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/data_storage.py @@ -36,3 +36,4 @@ class DataStorageConfiguration(NamedTuple): """ default: DataRegionConfiguration = DataRegionConfiguration() regions: list = [] + wal_mode: str = "LOG_ONLY" diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 index d227369abe3d7..d10cf127e8fee 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 @@ -15,6 +15,17 @@ limitations under the License. #} + +{% macro rendezvous_backup_filter(affinity) %} + + + + + + + +{% endmacro %} + {% macro cache_configs(caches) %} {% if caches %} @@ -22,10 +33,25 @@ {% for cache in caches %} - {% if cache.cache_mode == 'PARTITIONED' %} - + + + + + {% if cache.indexed_types %} + + + {% for class in cache.indexed_types %} + {{ class }} + {% endfor %} + + + {% endif %} + + {% if cache.affinity and cache.affinity.name == 'BACKUP_FILTER' %} + + {{ rendezvous_backup_filter(cache.affinity) }} + {% endif %} - {% endfor %} diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/datastorage_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/datastorage_macro.j2 index 1c2b4627b10e4..f780d64b3d999 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/datastorage_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/datastorage_macro.j2 @@ -31,6 +31,7 @@ {% endif %} + {% endif %} diff --git a/modules/ducktests/tests/ignitetest/tests/snapshot_test.py b/modules/ducktests/tests/ignitetest/tests/snapshot_test.py index ae8ca9650bb33..10c6bd4fcba3d 100644 --- a/modules/ducktests/tests/ignitetest/tests/snapshot_test.py +++ b/modules/ducktests/tests/ignitetest/tests/snapshot_test.py @@ -16,8 +16,8 @@ """ Module contains snapshot test. """ - from ducktape.mark.resource import cluster +from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration from ignitetest.services.ignite import IgniteService from ignitetest.services.ignite_app import IgniteApplicationService @@ -47,9 +47,13 @@ def snapshot_test(self, ignite_version): """ version = IgniteVersion(ignite_version) + cache_cfg = CacheConfiguration(name=self.CACHE_NAME, cache_mode='REPLICATED', + indexed_types=['java.lang.Long', 'byte[]']) + ignite_config = IgniteConfiguration( version=version, data_storage=DataStorageConfiguration(default=DataRegionConfiguration(persistent=True)), + caches=[cache_cfg], metric_exporter='org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi' ) diff --git a/modules/ducktests/tests/ignitetest/tests/suites/slow_suite.yml b/modules/ducktests/tests/ignitetest/tests/suites/slow_suite.yml index 863f9a4b26841..134e7366247a5 100644 --- a/modules/ducktests/tests/ignitetest/tests/suites/slow_suite.yml +++ b/modules/ducktests/tests/ignitetest/tests/suites/slow_suite.yml @@ -18,3 +18,6 @@ discovery: snapshot: - ../snapshot_test.py + +two_phased_rebalance: + - ../two_phased_rebalanced_test.py.py diff --git a/modules/ducktests/tests/ignitetest/tests/two_phased_rebalanced_test.py b/modules/ducktests/tests/ignitetest/tests/two_phased_rebalanced_test.py new file mode 100644 index 0000000000000..6b477a9d3b7d6 --- /dev/null +++ b/modules/ducktests/tests/ignitetest/tests/two_phased_rebalanced_test.py @@ -0,0 +1,272 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module contains Cellular Affinity tests. +""" +import math +import os +from typing import List +from ducktape.cluster.cluster import ClusterNode + +from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration, Affinity + +from ignitetest.services.ignite import IgniteService +from ignitetest.services.ignite_app import IgniteApplicationService +from ignitetest.services.utils.control_utility import ControlUtility +from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration +from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration +from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster +from ignitetest.utils import cluster, ignite_versions +from ignitetest.utils.ignite_test import IgniteTest +from ignitetest.utils.version import IgniteVersion, DEV_BRANCH, LATEST + +NUM_NODES_CELL = 4 + +NUM_CELL = 2 + +ATTRIBUTE = "CELL" + +CACHE_NAME = "test-cache" + + +# pylint: disable=W0223 +class TwoPhasedRebalancedTest(IgniteTest): + """ + Two-phase rebalancing test case. + """ + # pylint: disable=R0914 + @cluster(num_nodes=(NUM_NODES_CELL * NUM_CELL) + 1) + @ignite_versions(str(DEV_BRANCH), str(LATEST)) + def two_phased_rebalancing_test(self, ignite_version): + """ + Test case of two-phase rebalancing. + Preparations. + 1. Start cells. + 2. Load data to cache with the mentioned above affinity function. + 3. Delete 80% of data and measure PDS size on all nodes. + Phase 1. + 1. Stop two nodes in each cell, total a half of all nodes and clean PDS. + 2. Start cleaned node with preservance of consistent id and cell attributes. + 3. Wait for the rebalance to complete. + Phase 2. + Run steps 1-3 of Phase 1 on the other half of the cluster. + Verifications. + 1. Check that PDS size reduced (compare to step 3) + 2. Check data consistency (idle_verify --dump) + """ + config = IgniteConfiguration(version=IgniteVersion(ignite_version), + data_storage=DataStorageConfiguration( + wal_mode='NONE', + default=DataRegionConfiguration(persistent=True)), + caches=[CacheConfiguration( + name=CACHE_NAME, backups=2, affinity=Affinity(), + indexed_types=['java.lang.Long', 'byte[]'])], + metric_exporter='org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi') + + cluster_size = len(self.test_context.cluster) + + num_cell = math.floor((cluster_size - 1) / NUM_NODES_CELL) + + # Start cells. + cells = self.start_cells(config, num_cell) + + control_utility = ControlUtility(cells[0]) + control_utility.activate() + + client_config = IgniteConfiguration(client_mode=True, + version=IgniteVersion(ignite_version), + discovery_spi=from_ignite_cluster(cells[0])) + + # Load data to cache. + app = IgniteApplicationService( + self.test_context, + client_config, + java_class_name="org.apache.ignite.internal.ducktest.tests.snapshot_test.DataLoaderApplication", + params={"start": 0, + "cacheName": "test-cache", + "interval": 500 * 1024, + "valueSizeKb": 1} + ) + app.run() + app.free() + + # Delete 80% of data and measure PDS size on all nodes. + app = IgniteApplicationService( + self.test_context, + client_config, + java_class_name="org.apache.ignite.internal.ducktest.tests.DeleteDataApplication", + shutdown_timeout_sec=5 * 60, + params={"cacheName": "test-cache", + "size": 400 * 1024} + ) + app.start(clean=False) + app.stop() + + node = cells[0].nodes[0] + + control_utility.deactivate() # flush dirty pages on disk + control_utility.activate() + + dump_1 = create_idle_dump_and_copy_to_log_dir(control_utility, node, cells[0].log_dir) + + pds_before = self.get_pds_size(cells, "After Delete 80%, PDS.") + + # Restarting the cluster in twos nodes per a cell with a cleanup and waiting for rebalancing. + restart_with_clean_idx_node_on_cell_and_await_rebalance(cells, [0, 1]) + restart_with_clean_idx_node_on_cell_and_await_rebalance(cells, [2, 3]) + + control_utility.deactivate() # flush dirty pages on disk + control_utility.activate() + + pds_after = self.get_pds_size(cells, "After rebalancing complete, PDS.") + + # Check that PDS size reduced. + for host in pds_after: + assert pds_after[host] < pds_before[host], f'Host {host}: size after = {pds_after[host]}, ' \ + f'size before = {pds_before[host]}.' + + control_utility.validate_indexes() + dump_2 = create_idle_dump_and_copy_to_log_dir(control_utility, node, cells[0].log_dir) + + # Check data consistency. + diff = node.account.ssh_output(f'diff {dump_1} {dump_2}', allow_fail=True) + assert not diff, f"Validation error, files are different. Difference:\n {diff}" + + def start_cells(self, config: IgniteConfiguration, num_cell: int) -> List[IgniteService]: + """ + Start cells. + :param config IgniteConfiguration. + :param num_cell Number of cell. + :return List of IgniteServices. + """ + cells = [] + + first = IgniteService(self.test_context, config, NUM_NODES_CELL, [f'-D{ATTRIBUTE}=0']) + first.start_async() + + cells.append(first) + + discovery_spi = from_ignite_cluster(first) + config = config._replace(discovery_spi=discovery_spi) + + for i in range(1, num_cell): + cell = IgniteService(self.test_context, config, NUM_NODES_CELL, [f'-D{ATTRIBUTE}={i}']) + cell.start_async() + + cells.append(cell) + + for cell in cells: + cell.await_started() + + return cells + + def get_pds_size(self, cells: [IgniteService], msg: str) -> dict: + """ + Pds size in megabytes. + :param cells List of IgniteService. + :param msg Information message. + :return dict with hostname -> pds size in megabytes. + """ + res = {} + for cell in cells: + for node in cell.nodes: + consistent_id = str(node.account.hostname).replace('.', '_').replace('-', '_') + cmd = f'du -sm {cell.database_dir}/{consistent_id} | ' + "awk '{print $1}'" + res[node.account.hostname] = int(node.account.ssh_output(cmd).decode("utf-8").rstrip()) + + self.logger.info(msg) + + for item in res.items(): + self.logger.info(f'Host: {item[0]}, PDS {item[1]}mb') + + return res + + +def restart_with_clean_idx_node_on_cell_and_await_rebalance(cells: [IgniteService], idxs: [int]): + """ + Restart idxs nodes on cells with cleaning working directory and await rebalance. + :param cells List of IgniteService. + :param idxs List the index nodes that need to be restarted with cleanup. + """ + stop_idx_node_on_cell(cells, idxs) + clean_work_idx_node_on_cell(cells, idxs) + start_idx_node_on_cell(cells, idxs) + + for cell in cells: + cell.await_rebalance() + + +def stop_idx_node_on_cell(cells: [IgniteService], idxs: [int]): + """ + Stop idxs nodes on cells. + :param cells List of IgniteService. + :param idxs List of index nodes to stop. + """ + for cell in cells: + for i in idxs: + cell.stop_node(cell.nodes[i]) + + for cell in cells: + for i in idxs: + cell.wait_node(cell.nodes[i]) + + +def clean_work_idx_node_on_cell(cells: [IgniteService], idxs: [int]): + """ + Cleaning the working directory on idxs nodes in cells. + :param cells List of IgniteService. + :param idxs List of index nodes to clean. + """ + for cell in cells: + for i in idxs: + node = cell.nodes[i] + + assert not cell.pids(node) + + node.account.ssh(f'rm -rf {cell.work_dir}') + + +def start_idx_node_on_cell(cells: [IgniteService], idxs: [int]): + """ + Start idxs nodes on cells. + :param cells List of IgniteService. + :param idxs List of index nodes to start. + """ + for cell in cells: + for i in idxs: + node = cell.nodes[i] + + cell.start_node(node) + + for cell in cells: + cell.await_started() + + +def create_idle_dump_and_copy_to_log_dir(control_utility: ControlUtility, node: ClusterNode, log_dir: str) -> str: + """ + Creates a dump file and copies it to the log directory. + :param control_utility ControlUtility. + :param node ClusterNode. + :param log_dir Path to log directory. + :return: Path to idle-verify dump file. + """ + control_utility.idle_verify() + + dump = control_utility.idle_verify_dump(node) + + node.account.ssh_output(f'cp {dump} {log_dir}') + + return os.path.join(log_dir, os.path.basename(dump))