Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-13508: Test scenario of two-phased rebalance #8385

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
08c0c1a
IGNITE-13508: added two-phase rebalance test
Sega76 Feb 20, 2021
600c0ba
IGNITE-13508: fix
Sega76 Feb 24, 2021
ecf4b1d
IGNITE-13508: fix
Sega76 Feb 24, 2021
9a642b2
IGNITE-13508: fix pydoc
Sega76 Feb 25, 2021
7257a29
IGNITE-13508: fix pydoc
Sega76 Feb 25, 2021
df0aef6
IGNITE-13508: fix comments
Sega76 Mar 2, 2021
233943f
IGNITE-13508: fix comments
Sega76 Mar 2, 2021
5eb455b
IGNITE-13508: added indexes
Sega76 Mar 9, 2021
46ce2eb
IGNITE-13508: added await_cluster_idle after rebalance
Sega76 Mar 10, 2021
3a9ef2b
IGNITE-13508: fix comments
Sega76 Mar 15, 2021
bb49d93
IGNITE-13508: set NUM_CELL=2
Sega76 Mar 15, 2021
40d61a5
IGNITE-13508: optimize cache_macro a bit
Sega76 Mar 15, 2021
33562ed
IGNITE-13508: added use globals cluster_size
Sega76 Mar 15, 2021
0290e1c
IGNITE-13508: fix comments
Sega76 Mar 15, 2021
2d2b88e
IGNITE-13508: fix comments
Sega76 Mar 18, 2021
1f396d2
IGNITE-13508: fix comments
Sega76 Mar 18, 2021
987caba
IGNITE-13508: fix comments
Sega76 Mar 21, 2021
c617c3a
Merge remote-tracking branch 'ignite/ignite-ducktape' into IGNITE-13508
Sega76 Mar 22, 2021
6234090
IGNITE-13508: fix comments
Sega76 Mar 22, 2021
78df8d1
IGNITE-13508: fix comments
Sega76 Mar 22, 2021
ca3976e
IGNITE-13508: fix versions
Sega76 Mar 23, 2021
0dd8561
Merge remote-tracking branch 'ignite/ignite-ducktape' into IGNITE-13508
Sega76 Mar 25, 2021
31de1d4
IGNITE-13508: remove LATEST_2_9
Sega76 Mar 25, 2021
d7788a1
Merge remote-tracking branch 'ignite/ignite-ducktape' into IGNITE-13508
Sega76 May 19, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.ducktest.tests;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import javax.cache.Cache;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
import org.apache.ignite.lang.IgniteFuture;

/**
* Deleting data from the cache.
*/
public class DeleteDataApplication extends IgniteAwareApplication {
/** {@inheritDoc} */
@Override public void run(JsonNode jNode) {
String cacheName = jNode.get("cacheName").asText();

int size = jNode.get("size").asInt();

int batchSize = Optional.ofNullable(jNode.get("batchSize"))
.map(JsonNode::asInt)
.orElse(1000);

IgniteCache<Object, Object> cache = ignite.getOrCreateCache(cacheName);

log.info("Cache size before: " + cache.size());

markInitialized();

long start = System.currentTimeMillis();

Iterator<Cache.Entry<Object, Object>> iter = cache.iterator();

ArrayList<Object> keys = new ArrayList<>(size);

for (int cnt = 0; iter.hasNext() && cnt < size; cnt++)
keys.add(iter.next().getKey());

log.info("Start removing: " + keys.size());

int listSize = keys.size();

List<IgniteFuture<Void>> futures = new LinkedList<>();

for (int from = 0; from < listSize; from += batchSize) {
int to = Math.min(from + batchSize, listSize);

futures.add(cache.removeAllAsync(new TreeSet<>(keys.subList(from, to))));
}

futures.forEach(f -> f.get(TimeUnit.MINUTES.toMillis(5)));

log.info("Cache size after: " + cache.size());

recordResult("DURATION_SECONDS", TimeUnit.MILLISECONDS.toSeconds( System.currentTimeMillis() - start));

markFinished();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@

package org.apache.ignite.internal.ducktest.tests.snapshot_test;

import java.util.Collections;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;

/**
Expand All @@ -42,19 +37,7 @@ public class DataLoaderApplication extends IgniteAwareApplication {

markInitialized();

QueryEntity qryEntity = new QueryEntity()
.setKeyFieldName("id")
.setKeyType(Long.class.getName())
.setTableName("TEST_TABLE")
.setValueType(byte[].class.getName())
.addQueryField("id", Long.class.getName(), null)
.setIndexes(Collections.singletonList(new QueryIndex("id")));

CacheConfiguration<Long, byte[]> cacheCfg = new CacheConfiguration<>(cacheName);
cacheCfg.setCacheMode(CacheMode.REPLICATED);
cacheCfg.setQueryEntities(Collections.singletonList(qryEntity));

ignite.getOrCreateCache(cacheCfg);
ignite.getOrCreateCache(cacheName);

byte[] data = new byte[valSize];

Expand Down
27 changes: 25 additions & 2 deletions modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import sys
import time
from abc import abstractmethod, ABCMeta
from datetime import datetime
from datetime import datetime, timedelta
from enum import IntEnum
from threading import Thread

Expand All @@ -32,7 +32,7 @@
from ignitetest.services.utils.concurrent import CountDownLatch, AtomicValue
from ignitetest.services.utils.path import IgnitePathAware
from ignitetest.services.utils.ignite_spec import resolve_spec
from ignitetest.services.utils.jmx_utils import ignite_jmx_mixin
from ignitetest.services.utils.jmx_utils import ignite_jmx_mixin, JmxClient
from ignitetest.services.utils.log_utils import monitor_log
from ignitetest.utils.enum import constructible

Expand Down Expand Up @@ -459,3 +459,26 @@ def restore_from_snapshot(self, snapshot_name: str):

node.account.ssh(f'rm -rf {self.database_dir}', allow_fail=False)
node.account.ssh(f'cp -r {snapshot_db} {self.work_dir}', allow_fail=False)

def await_rebalance(self, timeout_sec=180):
"""
Waiting for the rebalance to complete.
For the method, you need to set the
metric_exporter='org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi'
to the config.
:param timeout_sec: timeout to wait the rebalance to complete.
"""
delta_time = datetime.now() + timedelta(seconds=timeout_sec)

rebalanced = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unuseful var. There is already rebalanced var declared on the 475 line. If self.nodes is empty then code raises TimeoutError, but it is actually wrong. Please replace it with logic that shold handle empty nodes list if it is required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an announcement of a variable in order to expand the zone of its visibility.
added
assert self.nodes, 'Node list is empty.'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

for node in self.nodes:
rebalanced = False
mbean = JmxClient(node).find_mbean('.*name=cluster')

while datetime.now() < delta_time and not rebalanced:
rebalanced = next(mbean.Rebalanced) == 'true'

if rebalanced:
return

raise TimeoutError(f'Rebalancing was not completed within the time: {timeout_sec} seconds.')
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@
"""
from typing import NamedTuple

AFFINITY_BACKUP_FILTER = 'BACKUP_FILTER'
CELL = 'CELL'


class Affinity(NamedTuple):
"""
Affinity.
"""
name: str = AFFINITY_BACKUP_FILTER
attr_name: str = CELL


class CacheConfiguration(NamedTuple):
"""
Expand All @@ -27,3 +38,5 @@ class CacheConfiguration(NamedTuple):
cache_mode: str = 'PARTITIONED'
atomicity_mode: str = 'ATOMIC'
backups: int = 0
indexed_types: list = None
affinity: Affinity = None
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ class DataStorageConfiguration(NamedTuple):
"""
default: DataRegionConfiguration = DataRegionConfiguration()
regions: list = []
checkpoint_frequency: int = None
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,43 @@
limitations under the License.
#}


{% macro rendezvous_backup_filter(affinity) %}
<bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
<property name="affinityBackupFilter">
<bean class="org.apache.ignite.internal.ducktest.tests.cellular_affinity_test.CellularAffinityBackupFilter">
<constructor-arg value="{{ affinity.attr_name }}"/>
</bean>
</property>
</bean>
{% endmacro %}

{% macro cache_configs(caches) %}
{% if caches %}
<property name="cacheConfiguration">
<list>
{% for cache in caches %}
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="{{ cache.name }}"/>
{% if cache.cache_mode == 'PARTITIONED' %}
<property name="backups" value="{{ cache.backups or 0 }}"/>
<property name="backups" value="{{ cache.backups }}"/>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 is default

<property name="cacheMode" value="{{ cache.cache_mode }}"/>
<property name="atomicityMode" value="{{ cache.atomicity_mode }}"/>

{% if cache.indexed_types %}
<property name="indexedTypes">
<list>
{% for class in cache.indexed_types %}
<value>{{ class }}</value>
{% endfor %}
</list>
</property>
{% endif %}

{% if cache.affinity and cache.affinity.name == 'BACKUP_FILTER' %}
<property name="affinity">
{{ rendezvous_backup_filter(cache.affinity) }}
</property>
{% endif %}
<property name="atomicityMode" value="{{ cache.atomicity_mode or 'ATOMIC' }}"/>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'ATOMIC' is default

</bean>
{% endfor %}
</list>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
</list>
</property>
{% endif %}

{% if config.checkpoint_frequency %}
<property name="checkpointFrequency" value="{{ config.checkpoint_frequency }}"/>
{% endif %}
</bean>
</property>
{% endif %}
Expand Down
31 changes: 31 additions & 0 deletions modules/ducktests/tests/ignitetest/services/utils/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains utility methods.
"""
import os


def copy_file_to_dest(node, file_path: str, dest_dir: str):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You used this method only once, so there is no need to create a separate module for this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be useful in the future when we need to save some data for the result.
For example, to save statistics files

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will do it after the future comes. No need for a new module right now.

"""
Copy file to destination directory.
:return new path to file.
"""
node.account.ssh_output(f'cp {file_path} {dest_dir}')

file_name = os.path.basename(file_path)

return os.path.join(dest_dir, file_name)
6 changes: 5 additions & 1 deletion modules/ducktests/tests/ignitetest/tests/snapshot_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
"""
Module contains snapshot test.
"""

from ducktape.mark.resource import cluster
from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration

from ignitetest.services.ignite import IgniteService
from ignitetest.services.ignite_app import IgniteApplicationService
Expand Down Expand Up @@ -47,9 +47,13 @@ def snapshot_test(self, ignite_version):
"""
version = IgniteVersion(ignite_version)

cache_cfg = CacheConfiguration(name=self.CACHE_NAME, cache_mode='REPLICATED',
indexed_types=['java.lang.Long', 'byte[]'])

ignite_config = IgniteConfiguration(
version=version,
data_storage=DataStorageConfiguration(default=DataRegionConfiguration(persistent=True)),
caches=[cache_cfg],
metric_exporter='org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi'
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ discovery:

snapshot:
- ../snapshot_test.py

two_phased_rebalance:
- ../two_phased_rebalanced_test.py.py
Loading