Skip to content

Commit

Permalink
fixes after merge
Browse files Browse the repository at this point in the history
  • Loading branch information
alievmirza committed May 7, 2024
1 parent 45326a7 commit 830e415
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.tableId;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.txId;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.waitAndGetPrimaryReplica;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.zoneIdForTable;
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -57,6 +58,7 @@
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
Expand Down Expand Up @@ -88,6 +90,7 @@
public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
/** Table name. */
private static final String TABLE_NAME = "test_table";
private static final String ZONE_NAME = "test_zone";

private static final Tuple INITIAL_TUPLE = Tuple.create().set("key", 1L).set("val", "1");

Expand Down Expand Up @@ -118,7 +121,7 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
public void setup(TestInfo testInfo) throws Exception {
super.setup(testInfo);

String zoneSql = "create zone test_zone with partitions=20, replicas=" + REPLICAS
String zoneSql = "create zone " + ZONE_NAME + " with partitions=20, replicas=" + REPLICAS
+ ", storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
String sql = "create table " + TABLE_NAME + " (key bigint primary key, val varchar(20)) with primary_zone='TEST_ZONE'";

Expand Down Expand Up @@ -205,7 +208,7 @@ public void testVacuum() throws InterruptedException {

int partId = partitionIdForTuple(node, TABLE_NAME, tuple, tx);

Set<String> nodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), partId));
Set<String> nodes = partitionAssignment(node, new ZonePartitionId(zoneIdForTable(node, TABLE_NAME), partId));

view.upsert(tx, tuple);
view.upsert(parallelTx1, tupleForParallelTx);
Expand Down Expand Up @@ -296,7 +299,7 @@ public void testAbandonedTxnsAreNotVacuumizedUntilRecovered() throws Interrupted

int partId = partitionIdForTuple(anyNode(), TABLE_NAME, tuple, null);

TablePartitionId groupId = new TablePartitionId(tableId(anyNode(), TABLE_NAME), partId);
ZonePartitionId groupId = new ZonePartitionId(zoneIdForTable(anyNode(), TABLE_NAME), partId);

Set<String> txNodes = partitionAssignment(anyNode(), groupId);

Expand Down Expand Up @@ -380,7 +383,7 @@ public void testVacuumWithCleanupDelay() throws InterruptedException {
ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, commitPartGrpId);
IgniteImpl commitPartitionLeaseholder = findNode(n -> n.id().equals(replicaMeta.getLeaseholderId()));

Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
Set<String> commitPartNodes = partitionAssignment(node, new ZonePartitionId(zoneIdForTable(node, TABLE_NAME), commitPartId));

log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);

Expand Down Expand Up @@ -484,7 +487,7 @@ public void testCommitPartitionPrimaryChangesBeforeVacuum() throws InterruptedEx
ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, commitPartGrpId);
IgniteImpl commitPartitionLeaseholder = findNode(n -> n.id().equals(replicaMeta.getLeaseholderId()));

Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
Set<String> commitPartNodes = partitionAssignment(node, new ZonePartitionId(zoneIdForTable(node, TABLE_NAME), commitPartId));

log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);

Expand Down Expand Up @@ -568,7 +571,7 @@ public void testVacuumPersistentStateAfterCleanupDelayAndVolatileStateVacuum() t
ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, commitPartGrpId);
IgniteImpl commitPartitionLeaseholder = findNode(n -> n.id().equals(replicaMeta.getLeaseholderId()));

Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
Set<String> commitPartNodes = partitionAssignment(node, new ZonePartitionId(zoneIdForTable(node, TABLE_NAME), commitPartId));

log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);

Expand Down Expand Up @@ -655,7 +658,7 @@ public void testRecoveryAfterPersistentStateVacuumized() throws InterruptedExcep
int commitPartId = partitionIdForTuple(commitPartitionLeaseholder, TABLE_NAME, tuple0, null);

Set<String> commitPartitionNodes = partitionAssignment(commitPartitionLeaseholder,
new TablePartitionId(tableId(commitPartitionLeaseholder, TABLE_NAME), commitPartId));
new ZonePartitionId(zoneIdForTable(commitPartitionLeaseholder, TABLE_NAME), commitPartId));

// Choose some node that doesn't host the partition as a tx coordinator.
IgniteImpl coord0 = findNode(n -> !commitPartitionNodes.contains(n.name()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.table.RecordBinaryViewImpl;
import org.apache.ignite.internal.table.TableImpl;
Expand All @@ -61,7 +62,7 @@ public class ItTransactionTestUtils {
* @param grpId Group id.
* @return Node names.
*/
public static Set<String> partitionAssignment(IgniteImpl node, TablePartitionId grpId) {
public static Set<String> partitionAssignment(IgniteImpl node, ZonePartitionId grpId) {
MetaStorageManager metaStorageManager = node.metaStorageManager();

ByteArray stableAssignmentKey = stablePartAssignmentsKey(grpId);
Expand Down Expand Up @@ -137,7 +138,7 @@ public static Tuple findTupleToBeHostedOnNode(
return t;
}
} else {
Set<String> assignments = partitionAssignment(node, grpId);
Set<String> assignments = partitionAssignment(node, new ZonePartitionId(zoneIdForTable(node, tableName), partId));

if (assignments.contains(node.name())) {
return t;
Expand Down Expand Up @@ -174,6 +175,17 @@ public static int tableId(IgniteImpl node, String tableName) {
return table(node, tableName).tableId();
}

/**
* Returns the zone id of the provided table.
*
* @param node Any node in the cluster.
* @param tableName Table name.
* @return Table id.
*/
public static int zoneIdForTable(IgniteImpl node, String tableName) {
return table(node, tableName).internalTable().zoneId();
}

/**
* Transaction id.
*
Expand Down

0 comments on commit 830e415

Please sign in to comment.