Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IGNITE-22091] CLI for disaster recovery: partition states #3668

Merged
merged 17 commits into from
May 3, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.StringWriter;
import java.io.Writer;
import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.Cluster;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.cli.call.connect.ConnectCall;
Expand Down Expand Up @@ -151,12 +152,34 @@ protected void assertOutputContains(String expectedOutput) {
.contains(expectedOutput);
}

protected void assertOutputContainsAnyIgnoringCase(Set<String> expectedOutput) {
CharSequence[] expectedUpperCase = expectedOutput.stream().map(String::toUpperCase).toArray(CharSequence[]::new);

assertThat(sout.toString().toUpperCase())
.as("Expected command output to contain any of: " + expectedOutput + " but was " + sout.toString())
.containsAnyOf(expectedUpperCase);
}

protected void assertOutputContainsAllIgnoringCase(Set<String> expectedOutput) {
CharSequence[] expectedUpperCase = expectedOutput.stream().map(String::toUpperCase).toArray(CharSequence[]::new);

assertThat(sout.toString().toUpperCase())
.as("Expected command output to contain all of: " + expectedOutput + " but was " + sout.toString())
.contains(expectedUpperCase);
}

protected void assertOutputDoesNotContain(String expectedOutput) {
assertThat(sout.toString())
.as("Expected command output to not contain: " + expectedOutput + " but was " + sout.toString())
.doesNotContain(expectedOutput);
}

protected void assertOutputDoesNotContainIgnoreCase(Set<String> expectedOutput) {
assertThat(sout.toString())
.as("Expected command output to not contain: " + expectedOutput + " but was " + sout.toString())
.doesNotContainIgnoringCase(expectedOutput.toArray(CharSequence[]::new));
}

protected void assertOutputMatches(String regex) {
assertThat(sout.toString())
.as("Expected command output to match regex: " + regex + " but it is not: " + sout.toString())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.cli.commands.recovery;

import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.cli.commands.Options.Constants.CLUSTER_URL_OPTION;
import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_LOCAL_OPTION;
import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_NODE_NAMES_OPTION;
import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_PARTITION_GLOBAL_OPTION;
import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_PARTITION_IDS_OPTION;
import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_ZONE_NAMES_OPTION;

import java.util.HashSet;
import java.util.Set;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.cli.CliIntegrationTest;
import org.apache.ignite.internal.util.CollectionUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class ItPartitionStatesTest extends CliIntegrationTest {
private static final int DEFAULT_PARTITION_COUNT = 25;

private static final Set<String> ZONES = Set.of("first_zone", "second_zone", "third_zone");

private static final Set<String> MIXED_ZONES = Set.of("mixed_case_zone", "MIXED_CASE_ZONE");
Phillippko marked this conversation as resolved.
Show resolved Hide resolved

private static final Set<String> ALL_ZONES = new HashSet<>(CollectionUtils.concat(ZONES, MIXED_ZONES));

private static final Set<String> STATES = Set.of("HEALTHY", "AVAILABLE");

private static Set<String> nodeNames;

@BeforeAll
public static void createTables() {
ALL_ZONES.forEach(name -> {
sql(String.format("CREATE ZONE \"%s\" WITH storage_profiles='%s'", name, DEFAULT_AIPERSIST_PROFILE_NAME));
sql("CREATE TABLE \"" + name + "_table\" (id INT PRIMARY KEY, val INT) WITH PRIMARY_ZONE = '" + name + "'");
});

sql("CREATE ZONE \"empty_zone\" WITH storage_profiles='" + DEFAULT_AIPERSIST_PROFILE_NAME + "'");

nodeNames = CLUSTER.runningNodes().map(IgniteImpl::name).collect(toSet());
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
void testPartitionStates(boolean global) {
execute("recovery", "partition-states",
CLUSTER_URL_OPTION, NODE_URL,
global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_LOCAL_OPTION
);

for (int i = 0; i < DEFAULT_PARTITION_COUNT; i++) {
assertOutputContains(String.valueOf(i));
}

checkOutput(global, ALL_ZONES, nodeNames);
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
void testPartitionStatesByZones(boolean global) {
execute("recovery", "partition-states",
CLUSTER_URL_OPTION, NODE_URL,
RECOVERY_ZONE_NAMES_OPTION, String.join(",", ZONES),
global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_LOCAL_OPTION
);

assertOutputContains("Table name");
assertOutputContains("Partition ID");
assertOutputContains("State");

for (int i = 0; i < DEFAULT_PARTITION_COUNT; i++) {
assertOutputContains(String.valueOf(i));
}

checkOutput(global, ZONES, nodeNames);
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
void testPartitionStatesZonesMixedCase(boolean global) {
execute("recovery", "partition-states",
CLUSTER_URL_OPTION, NODE_URL,
RECOVERY_ZONE_NAMES_OPTION, String.join(",", MIXED_ZONES),
global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_LOCAL_OPTION
);

checkOutput(global, MIXED_ZONES, nodeNames);
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
void testPartitionStatesMissingZone(boolean global) {
String unknownZone = "UNKNOWN_ZONE";

execute("recovery", "partition-states",
CLUSTER_URL_OPTION, NODE_URL,
RECOVERY_ZONE_NAMES_OPTION, unknownZone,
global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_LOCAL_OPTION
);

assertErrOutputContains("Some distribution zones are missing: [UNKNOWN_ZONE]");

assertOutputIsEmpty();
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
void testPartitionStatesMissingPartition(boolean global) {
String unknownPartition = "-1";

execute("recovery", "partition-states",
CLUSTER_URL_OPTION, NODE_URL,
RECOVERY_PARTITION_IDS_OPTION, unknownPartition,
global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_LOCAL_OPTION);

assertErrOutputContains("Some partitions are missing: [-1]");

assertOutputIsEmpty();
}

@ParameterizedTest
Phillippko marked this conversation as resolved.
Show resolved Hide resolved
@ValueSource(booleans = {false, true})
void testPartitionStatesMissingNode() {
String unknownNode = "unknown_node";

execute("recovery", "partition-states",
CLUSTER_URL_OPTION, NODE_URL,
RECOVERY_NODE_NAMES_OPTION, unknownNode,
RECOVERY_LOCAL_OPTION);

assertErrOutputContains("Some nodes are missing: [unknown_node]");

assertOutputIsEmpty();
}


Phillippko marked this conversation as resolved.
Show resolved Hide resolved
@ParameterizedTest
@ValueSource(booleans = {false, true})
void testPartitionStatesEmptyResult(boolean global) {
execute("recovery", "partition-states",
CLUSTER_URL_OPTION, NODE_URL,
RECOVERY_ZONE_NAMES_OPTION, "empty_zone",
global ? RECOVERY_PARTITION_GLOBAL_OPTION : RECOVERY_LOCAL_OPTION
);

checkOutput(global, Set.of(), Set.of());
}

private void checkOutput(boolean global, Set<String> zoneNames, Set<String> nodes) {
assertErrOutputIsEmpty();

if (global) {
assertOutputDoesNotContain("Node name");
} else {
assertOutputContains("Node name");

if (!nodes.isEmpty()) {
assertOutputContainsAnyIgnoringCase(nodes);
}

Set<String> anotherNodes = CollectionUtils.difference(nodeNames, nodes);

if (!anotherNodes.isEmpty()) {
assertOutputDoesNotContainIgnoreCase(anotherNodes);
}
}

assertOutputContains("Zone name");
assertOutputContains("Table name");
assertOutputContains("Partition ID");
assertOutputContains("State");

if (!zoneNames.isEmpty()) {
assertOutputContainsAllIgnoringCase(zoneNames);

Set<String> tableNames = zoneNames.stream().map(it -> it + "_table").collect(toSet());

assertOutputContainsAllIgnoringCase(tableNames);
}

Set<String> anotherZones = CollectionUtils.difference(ZONES, zoneNames);

if (!anotherZones.isEmpty()) {
assertOutputDoesNotContainIgnoreCase(anotherZones);
}

if (!zoneNames.isEmpty() && nodeNames.isEmpty()) {
assertOutputContainsAnyIgnoringCase(STATES);
}
}
rpuch marked this conversation as resolved.
Show resolved Hide resolved
}
Phillippko marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.cli.call.recovery;

import static java.util.stream.Collectors.toList;

import jakarta.inject.Singleton;
import java.util.List;
import java.util.stream.Stream;
import org.apache.ignite.internal.cli.core.call.Call;
import org.apache.ignite.internal.cli.core.call.DefaultCallOutput;
import org.apache.ignite.internal.cli.core.exception.IgniteCliApiException;
import org.apache.ignite.internal.cli.core.rest.ApiClientFactory;
import org.apache.ignite.internal.cli.sql.table.Table;
import org.apache.ignite.rest.client.api.RecoveryApi;
import org.apache.ignite.rest.client.invoker.ApiException;
import org.apache.ignite.rest.client.model.GlobalPartitionStatesResponse;
import org.apache.ignite.rest.client.model.LocalPartitionStatesResponse;

/** Call to get partition states. */
@Singleton
public class PartitionStatesCall implements Call<PartitionStatesCallInput, Table> {
private final ApiClientFactory clientFactory;

private static final List<String> GLOBAL_HEADERS = List.of("Zone name", "Table name", "Partition ID", "State");

private static final List<String> LOCAL_HEADERS = Stream
.concat(Stream.of("Node name"), GLOBAL_HEADERS.stream())
.collect(toList());

public PartitionStatesCall(ApiClientFactory clientFactory) {
this.clientFactory = clientFactory;
}

@Override
public DefaultCallOutput<Table> execute(PartitionStatesCallInput input) {
RecoveryApi client = new RecoveryApi(clientFactory.getClient(input.clusterUrl()));

List<String> trimmedZoneNames = trim(input.zoneNames());
List<String> trimmedNodeNames = trim(input.nodeNames());

try {
if (input.local()) {
LocalPartitionStatesResponse localStates = client.getLocalPartitionStates(
rpuch marked this conversation as resolved.
Show resolved Hide resolved
trimmedZoneNames,
trimmedNodeNames,
input.partitionIds()
);

List<String> content;
content = localStates.getStates().stream()
.flatMap(state -> Stream.of(
state.getNodeName(),
state.getZoneName(),
state.getTableName(),
String.valueOf(state.getPartitionId()),
state.getState()
)
)
.collect(toList());

return DefaultCallOutput.success(new Table(LOCAL_HEADERS, content));
} else {
GlobalPartitionStatesResponse globalStates = client.getGlobalPartitionStates(
trimmedZoneNames,
input.partitionIds()
);

List<String> content = globalStates.getStates().stream()
.flatMap(state -> Stream.of(
state.getZoneName(),
state.getTableName(),
String.valueOf(state.getPartitionId()),
state.getState()
)
)
.collect(toList());

return DefaultCallOutput.success(new Table(GLOBAL_HEADERS, content));
}
} catch (ApiException e) {
return DefaultCallOutput.failure(new IgniteCliApiException(e, input.clusterUrl()));
}
}

private static List<String> trim(List<String> names) {
return names == null
? List.of()
: names.stream()
.map(String::trim)
.collect(toList());
}
}