Skip to content

Commit

Permalink
IGNITE-13508: fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Sega76 committed Mar 4, 2021
1 parent df0aef6 commit f864c1e
Showing 1 changed file with 28 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def two_phased_rebalancing_test(self, ignite_version):
2. Start cleaned node with preservance of consistent id and cell attributes.
3. Wait for the rebalance to complete.
Phase 2.
Run steps 1-3 of Phase 2 on the other half of the cluster.
Run steps 1-3 of Phase 1 on the other half of the cluster.
Verifications.
1. Check that PDS size reduced (compare to step 3)
2. Check data consistency (idle_verify --dump)
Expand All @@ -76,6 +76,7 @@ def two_phased_rebalancing_test(self, ignite_version):
name=CACHE_NAME, backups=NUM_NODES_CELL-1, affinity=Affinity())],
metric_exporter='org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi')

# Start 4 cells.
cells = self.start_cells(config)

control_utility = ControlUtility(cells[0])
Expand All @@ -85,84 +86,74 @@ def two_phased_rebalancing_test(self, ignite_version):
version=IgniteVersion(ignite_version),
discovery_spi=from_ignite_cluster(cells[0]))

streamer = IgniteApplicationService(
# Load data to cache.
IgniteApplicationService(
self.test_context,
client_config,
java_class_name="org.apache.ignite.internal.ducktest.tests.snapshot_test.DataLoaderApplication",
params={"start": 0,
"cacheName": "test-cache",
"interval": 500 * 1024,
"valueSizeKb": 1}
)
).run()

deleter = IgniteApplicationService(
# Delete 80% of data and fix PDS size on all nodes.
IgniteApplicationService(
self.test_context,
client_config,
java_class_name="org.apache.ignite.internal.ducktest.tests.DeleteDataApplication",
shutdown_timeout_sec=15 * 60,
params={"cacheName": "test-cache",
"size": 400 * 1024})

streamer.run()
"size": 400 * 1024}
).run()

node = cells[0].nodes[0]

self.await_skipping_checkpoint(node)

self.fix_pds_size(cells, "Step prepare, load data. PDS.")

deleter.run()

self.await_skipping_checkpoint(node)

dump_1 = fix_data(control_utility, node, cells[0].log_dir)

pds_before = self.fix_pds_size(cells, "After Delete 80%, PDS.")
self.await_cluster_idle(node)

restart_with_clean_idx_node_on_cell(cells, [0, 1])
dump_1 = create_idle_dump_and_copy_to_log_dir(control_utility, node, cells[0].log_dir)

self.fix_pds_size(cells, "After rebalancing complate on nodes 0, 1. PDS.")
pds_before = self.get_pds_size(cells, "After Delete 80%, PDS.")

restart_with_clean_idx_node_on_cell(cells, [2, 3])
# Restarting the cluster in twos nodes per a cell with a cleanup and waiting for rebalancing.
restart_with_clean_idx_node_on_cell_and_await_rebalance(cells, [0, 1])
restart_with_clean_idx_node_on_cell_and_await_rebalance(cells, [2, 3])

pds_after = self.fix_pds_size(cells, "After rebalancing complate on nodes 2, 3. PDS.")
pds_after = self.get_pds_size(cells, "After rebalancing complete, PDS.")

# Check that PDS size reduced.
for host in pds_after:
assert pds_after[host] < pds_before[host], f'Host {host}: size after = {pds_after[host]}, ' \
f'size before = {pds_before[host]}.'

dump_2 = fix_data(control_utility, node, cells[0].log_dir)
dump_2 = create_idle_dump_and_copy_to_log_dir(control_utility, node, cells[0].log_dir)

# Check data consistency.
diff = node.account.ssh_output(f'diff {dump_1} {dump_2}', allow_fail=True)
assert not diff, f"Validation error, files are different. Difference:: \n {diff}"
assert not diff, f"Validation error, files are different. Difference:\n {diff}"

def start_cells(self, config: IgniteConfiguration, cells_cnt: int = NUM_CELL, cell_nodes_cnt: int = NUM_NODES_CELL)\
-> List[IgniteService]:
def start_cells(self, config: IgniteConfiguration) -> List[IgniteService]:
"""
Start cells.
:param config IgniteConfiguration.
:param cells_cnt Cells size.
:param cell_nodes_cnt Nodes on cell.
:return List of IgniteServices
"""
assert cells_cnt > 0

cells = []

cell = start_cell(self.test_context, config, cell_nodes_cnt, [f'-D{ATTRIBUTE}=0'])
cell = start_cell(self.test_context, config, NUM_NODES_CELL, [f'-D{ATTRIBUTE}=0'])

discovery_spi = from_ignite_cluster(cell)
config = config._replace(discovery_spi=discovery_spi)

cells.append(cell)

if cells_cnt > 1:
for i in range(1, cells_cnt):
cells.append(start_cell(self.test_context, config, cell_nodes_cnt, [f'-D{ATTRIBUTE}={i}']))
for i in range(1, NUM_CELL):
cells.append(start_cell(self.test_context, config, NUM_NODES_CELL, [f'-D{ATTRIBUTE}={i}']))

return cells

def fix_pds_size(self, cells: [IgniteService], msg: str) -> dict:
def get_pds_size(self, cells: [IgniteService], msg: str) -> dict:
"""
Pds size in megabytes.
:param cells List of IgniteService
Expand All @@ -184,7 +175,7 @@ def fix_pds_size(self, cells: [IgniteService], msg: str) -> dict:

return res

def await_skipping_checkpoint(self, node: ClusterNode):
def await_cluster_idle(self, node: ClusterNode):
"""
Await Skipping checkpoint.
:param node ClusterNode
Expand All @@ -195,7 +186,7 @@ def await_skipping_checkpoint(self, node: ClusterNode):
self.logger.warn(ex)


def restart_with_clean_idx_node_on_cell(cells: [IgniteService], idxs: [int]):
def restart_with_clean_idx_node_on_cell_and_await_rebalance(cells: [IgniteService], idxs: [int]):
"""
Restart idxs nodes on cells with cleaning working directory and await rebalance.
:param cells List of IgniteService
Expand Down Expand Up @@ -267,7 +258,7 @@ def start_idx_node_on_cell(cells: [IgniteService], idxs: [int]):
cell.await_started()


def fix_data(control_utility: ControlUtility, node: ClusterNode, log_dir: str) -> str:
def create_idle_dump_and_copy_to_log_dir(control_utility: ControlUtility, node: ClusterNode, log_dir: str) -> str:
"""
Creates a dump file and copies it to the log directory.
:param control_utility ControlUtility.
Expand Down

0 comments on commit f864c1e

Please sign in to comment.