IGNITE-13508: Test scenario of two-phased rebalance #8385
restart_with_clean_idx_node_on_cell(cells, [0, 1])

self.fix_pds_size(cells, "After rebalancing complate on nodes 0, 1. PDS.")
complete
done
int size = jNode.get("size").asInt();

int bachSize = Optional.ofNullable(jNode.get("bachSize"))
s/bach/batch/
done
while (fromIdx < listSize) {
    toIdx = Math.min(fromIdx + bachSize, listSize);

    futures.add(cache.removeAllAsync(new TreeSet<>(keys.subList(fromIdx, toIdx))));
iterator() supports dynamic removal. Can we use it to simplify the deletion process?
Removal in batches is faster: 37 seconds versus 351.
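The batching idea in this thread is to issue one bulk call per slice of keys instead of one call per key. Below is a minimal sketch of that slicing logic, written in Python for brevity. `FakeCache` and `remove_in_batches` are hypothetical stand-ins rather than Ignite APIs, and the sketch does not reproduce the timings quoted above.

```python
from typing import Iterable, List


class FakeCache:
    """Hypothetical in-memory stand-in for a cache; counts bulk calls."""

    def __init__(self, keys: Iterable[int]) -> None:
        self.data = {k: b"x" for k in keys}
        self.bulk_calls = 0

    def remove_all(self, keys: Iterable[int]) -> None:
        self.bulk_calls += 1
        for key in keys:
            self.data.pop(key, None)


def remove_in_batches(cache: FakeCache, keys: List[int], batch_size: int) -> None:
    """Remove keys in sorted batches of at most batch_size elements."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(keys), batch_size):
        # Sorting each batch mirrors the ordered TreeSet in the reviewed code.
        cache.remove_all(sorted(keys[start:start + batch_size]))


cache = FakeCache(range(10))
remove_in_batches(cache, list(range(10)), batch_size=4)
print(cache.bulk_calls, len(cache.data))
```

With ten keys and a batch size of four, the sketch issues three bulk calls rather than ten single removals.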
Affinity.
"""
name: str = AFFINITY_BACKUP_FILTER
constructor_arg: str = CELL
Let's replace it with something more meaningful, e.g. attr_name.
done
import os


def copy_file_to_dest(node, file_path: str, dest_dir: str):
You used this method only once, so there is no need to create a separate module for this method.
I think it will be useful in the future when we need to save some data alongside the results, for example statistics files.
We can do that when the need actually arises. No need for a new module right now.
Two-phase rebalancing test case.
"""
# pylint: disable=R0914
@cluster(num_nodes=NUM_NODES_CELL * NUM_CELL + 2)
Did you change the default IGNITE_NUM_CONTAINERS value to 18?
using NUM_CELL = 2
Test case of two-phase rebalancing.
Preparations.
1. Start 4 cells.
2. Load data to cache with the mentioned above affinity function and fix PDS size on all nodes.
Let's replace "fix" with a different word. It reads as "limit" [1], but you do not actually limit anything; you store the actual PDS size for future comparison.
done
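The distinction drawn in this thread (recording a size for later comparison rather than limiting it) can be sketched as follows. This is a local-filesystem illustration only: `dir_size_mb` and `record_sizes` are hypothetical names, and the real test presumably measures directories on remote nodes.

```python
import os
import tempfile


def dir_size_mb(path: str) -> float:
    """Return the total size of regular files under path, in megabytes."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total / (1024 * 1024)


def record_sizes(paths_by_host: dict) -> dict:
    """Snapshot host -> size in MB; nothing is limited, values are only recorded."""
    return {host: dir_size_mb(path) for host, path in paths_by_host.items()}


with tempfile.TemporaryDirectory() as tmp:
    before = record_sizes({"node-1": tmp})
    with open(os.path.join(tmp, "part-0.bin"), "wb") as f:
        f.write(b"\0" * (2 * 1024 * 1024))
    after = record_sizes({"node-1": tmp})
    # Growth in MB between the two snapshots.
    print(after["node-1"] - before["node-1"])
```

Keeping both snapshots as plain dictionaries makes the later comparison a per-host subtraction.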
    return copy_file_to_dest(node, dump, log_dir)


def start_cell(test_context, config: IgniteConfiguration, num_nodes: int, jvm_opts: list) -> IgniteService:
Actually, I don't think this method improves readability. Maybe you can put it within the start_cells function without docs?
done
:param msg Information message.
:return dict with hostname -> pds size in megabytes.
"""
assert len(cells) > 0
Useless assert
fixed
size = len(cell.nodes)

for i in idxs:
    assert i < size
Useless assert here and below
fixed
try:
    IgniteAwareService.await_event_on_node('Skipping checkpoint', node, timeout_sec=60)
except errors.TimeoutError as ex:
    self.logger.warn(ex)
Why do you ignore this error?
The event may not occur after the wait starts.
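The reply treats the timeout as an expected outcome, since the awaited log message may not appear once waiting has begun. Here is a minimal sketch of that "wait, but tolerate a timeout" pattern. `await_optional_event` and the `wait_for_event` callable are hypothetical, and the sketch assumes a standard `logging.Logger`, where `warning` is preferred because `warn` is a deprecated alias.

```python
import logging

logger = logging.getLogger("rebalance_test")


def await_optional_event(wait_for_event, description: str) -> bool:
    """Wait for an event that may never be observed; log instead of failing.

    wait_for_event is a hypothetical callable that raises TimeoutError
    when the event is not seen in time.
    """
    try:
        wait_for_event()
        return True
    except TimeoutError as ex:
        # Only the timeout is tolerated; any other error still propagates.
        logger.warning("Event %r was not observed: %s", description, ex)
        return False
```

Returning a boolean lets the caller decide whether a missed event matters, rather than silently discarding that information.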
int size = jNode.get("size").asInt();

int batchSize = jNode.get("batchSize").asInt();
Provide a default value for the batch size; with a default of 0 the loop runs forever.
done
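The hazard in the reviewed loop is that `toIdx = min(fromIdx + batchSize, listSize)` never advances when the batch size is 0. Below is a sketch, in Python for brevity, of reading the parameter with a fallback and rejecting non-positive values. `DEFAULT_BATCH_SIZE` and `read_batch_size` are illustrative assumptions, not the values or names used in the PR.

```python
import json

DEFAULT_BATCH_SIZE = 1000  # assumed value, for illustration only


def read_batch_size(raw_config: str) -> int:
    """Return batchSize from a JSON config, falling back to a positive default."""
    config = json.loads(raw_config)
    batch_size = int(config.get("batchSize", DEFAULT_BATCH_SIZE))
    # A value of 0 would leave the loop index unchanged on every iteration.
    if batch_size <= 0:
        raise ValueError("batchSize must be positive, got %d" % batch_size)
    return batch_size


print(read_batch_size('{"size": 10}'))
print(read_batch_size('{"size": 10, "batchSize": 50}'))
```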
int fromIdx = 0;
int toIdx = 0;

while (fromIdx < listSize) {
for (int from = 0; from < listSize; from += batchSize)
done
    fromIdx = toIdx;
}

futures.forEach(IgniteFuture::get);
Try not to use Future.get without a specified timeout, since we can hang. AFAIK, timeout is an optional parameter for IgniteApplicationService?
done
"valueSizeKb": 1}
)
app.run()
app.free()
Could you please provide a ref to implementation of the free() method? I can't find one.
ducktape/services/service.py
def free(self):
    """Free each node. This 'deallocates' the nodes so the cluster can assign them to other services."""
    for node in self.nodes:
        self.logger.info("%s: freeing node" % self.who_am_i(node))
        node.account.logger = None
        self.cluster.free(node)
    self.nodes = []
Yeah, I saw this code, but I didn't see cluster.free.
ducktape/cluster/cluster.py
def free(self, nodes):
    """Free the given node or list of nodes"""
    if isinstance(nodes, collections.Iterable):
        for s in nodes:
            self.free_single(s)
    else:
        self.free_single(nodes)
I saw this code; what I was looking for is this:
https://github.com/confluentinc/ducktape/blob/master/ducktape/cluster/json.py#L122
I'm OK then.
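One side note on the snippet quoted in this thread: it checks `collections.Iterable`, an alias that was removed in Python 3.10, where the abstract base classes are available only from `collections.abc`. A minimal sketch of the same "one node or many" dispatch follows. `SketchCluster` is a hypothetical stand-in, not ducktape's class, and current ducktape code may differ from the quoted version.

```python
from collections.abc import Iterable


class SketchCluster:
    """Hypothetical stand-in for a cluster that tracks allocated nodes."""

    def __init__(self, nodes):
        self.in_use = set(nodes)

    def free_single(self, node):
        self.in_use.discard(node)

    def free(self, nodes):
        """Free the given node or iterable of nodes."""
        # Nodes are plain strings in this sketch, and strings are iterable,
        # so they are excluded explicitly from the "many nodes" branch.
        if isinstance(nodes, Iterable) and not isinstance(nodes, str):
            for node in nodes:
                self.free_single(node)
        else:
            self.free_single(nodes)


cluster = SketchCluster(["n1", "n2", "n3"])
cluster.free("n1")
cluster.free(["n2", "n3"])
print(len(cluster.in_use))
```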
# pylint: disable=R0914
@cluster(num_nodes=(NUM_NODES_CELL * NUM_CELL) + 1)
@ignite_versions(str(DEV_BRANCH), str(LATEST_2_9), str(LATEST_2_8))
def two_phased_rebalancing_test(self, ignite_version):
WDYT about adding some tests for concurrent cases? Something like a rebalance while a transaction with a batch update/delete is open on another node in the cell. Is that a valid case to check here?
Agreed that there are no other cases.
"""
delta_time = datetime.now() + timedelta(seconds=timeout_sec)

rebalanced = False
Unneeded variable: rebalanced is already declared on line 475. Also, if self.nodes is empty the code raises TimeoutError, which is wrong. Please add logic that handles an empty node list, if that is required.
The variable is declared up front in order to widen its scope.
Added:
assert self.nodes, 'Node list is empty.'
fixed
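The shape discussed in this thread, a deadline-bounded wait with an explicit check for an empty node list, can be sketched as below. `await_rebalance` and the `is_rebalanced` callback are hypothetical names, and the sketch uses `time.monotonic()` instead of the `datetime` arithmetic in the reviewed code.

```python
import time
from typing import Callable, Sequence


def await_rebalance(nodes: Sequence[str], is_rebalanced: Callable[[str], bool],
                    timeout_sec: float, poll_sec: float = 0.01) -> None:
    """Poll until every node reports rebalance completion or the deadline passes."""
    # Without this check, all() over an empty list is vacuously True
    # and the function would report success for no nodes at all.
    assert nodes, 'Node list is empty.'

    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        if all(is_rebalanced(node) for node in nodes):
            return
        time.sleep(poll_sec)

    raise TimeoutError("Rebalance did not complete in %s s" % timeout_sec)
```

Returning from inside the loop removes the need for a separate `rebalanced` flag in the outer scope.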
indexed_types=['java.lang.Long', 'byte[]'])],
metric_exporter='org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi')

cluster_size = len(self.test_context.cluster)
How does self.test_context.cluster relate to the value of the @cluster annotation?
We can set the cluster size through globals and then more cells will be used in the test
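Deriving the number of cells from the actual cluster size, as the reply describes, could be sketched like this. Both constants are assumptions chosen for illustration, and `cells_for` is a hypothetical helper; the PR's real values and logic are not shown in this thread.

```python
NUM_NODES_CELL = 4   # assumed nodes per cell, not taken from the PR
RESERVED_NODES = 2   # assumed nodes kept for non-cell services


def cells_for(cluster_size: int) -> int:
    """Return how many full cells fit after setting reserved nodes aside."""
    available = cluster_size - RESERVED_NODES
    if available < NUM_NODES_CELL:
        raise ValueError("cluster of %d nodes cannot host a single cell" % cluster_size)
    return available // NUM_NODES_CELL


print(cells_for(10), cells_for(18))
```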
# Conflicts: # modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
Thank you for submitting the pull request to the Apache Ignite.
In order to streamline the review of the contribution
we ask you to ensure the following steps have been taken:
The Contribution Checklist
The description explains WHAT and WHY was made instead of HOW.
The following pattern must be used: IGNITE-XXXX Change summary, where XXXX is the number of the JIRA issue.
(see the Maintainers list)
the green visa attached to the JIRA ticket (see TC.Bot: Check PR)
Notes
If you need any help, please email dev@ignite.apache.org or ask for advice on the http://asf.slack.com #ignite channel.