fix: bidirectional replication flakey test #4130

Merged
merged 3 commits on Jul 26, 2023
@@ -37,7 +37,6 @@
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -60,12 +59,12 @@ public class HbaseToCloudBigtableBidirectionalReplicationEndpointTest {
 
   public static class TestReplicationEndpoint extends HbaseToCloudBigtableReplicationEndpoint {
 
-    static AtomicInteger replicatedEntries = new AtomicInteger();
+    static AtomicInteger replicatingEntries = new AtomicInteger();
 
     @Override
     public boolean replicate(ReplicateContext replicateContext) {
       boolean result = super.replicate(replicateContext);
-      replicatedEntries.getAndAdd(replicateContext.getEntries().size());
+      replicatingEntries.getAndAdd(replicateContext.getEntries().size());
       return result;
     }
   }
@@ -141,7 +140,7 @@ public void setupTestCase() throws IOException {
     hbaseTable = hbaseConnection.getTable(table1);
 
     // Reset the entry counts for TestReplicationEndpoint
-    TestReplicationEndpoint.replicatedEntries.set(0);
+    TestReplicationEndpoint.replicatingEntries.set(0);
   }
 
   private void createTables(TableName tableName, int cf1Scope, int cf2Scope) throws IOException {
@@ -185,15 +184,16 @@ public void testDropsReplicatedEntry() throws IOException, InterruptedException
     // Wait for replication
     TestUtils.waitForReplication(
         () -> {
-          // Only one entry should've been replicated
-          return TestReplicationEndpoint.replicatedEntries.get() >= 1;
+          // Both entries should've gone through the replication process
+          // However, one would've been filtered out during this process
+          return TestReplicationEndpoint.replicatingEntries.get() >= 2;
         });
 
     // Hbase table should have both mutations
-    Assert.assertTrue(hbaseTable.get(new Get(TestUtils.ROW_KEY)).size() == 1);
-    Assert.assertTrue(hbaseTable.get(new Get(TestUtils.ROW_KEY_2)).size() == 1);
+    Assert.assertEquals(1, hbaseTable.get(new Get(TestUtils.ROW_KEY)).size());
+    Assert.assertEquals(1, hbaseTable.get(new Get(TestUtils.ROW_KEY_2)).size());
     // Cbt table should have only one mutation
     Assert.assertTrue(cbtTable.get(new Get(TestUtils.ROW_KEY)).isEmpty());
-    Assert.assertTrue(cbtTable.get(new Get(TestUtils.ROW_KEY_2)).size() == 1);
+    Assert.assertEquals(1, cbtTable.get(new Get(TestUtils.ROW_KEY_2)).size());
   }
 }
@@ -38,7 +38,6 @@
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -61,12 +60,13 @@ public class HbaseToCloudBigtableBidirectionalReplicationEndpointTest {
 
   public static class TestReplicationEndpoint extends HbaseToCloudBigtableReplicationEndpoint {
 
-    static AtomicInteger replicatedEntries = new AtomicInteger();
+    static AtomicInteger replicatingEntries = new AtomicInteger();
 
     @Override
     public boolean replicate(ReplicateContext replicateContext) {
       boolean result = super.replicate(replicateContext);
-      replicatedEntries.getAndAdd(replicateContext.getEntries().size());
+      replicatingEntries.getAndAdd(replicateContext.getEntries().size());
+
       return result;
     }
   }
@@ -144,9 +144,7 @@ public void setupTestCase() throws IOException {
     hbaseTable = hbaseConnection.getTable(table1);
 
     // Reset the entry counts for TestReplicationEndpoint
-    HbaseToCloudBigtableBidirectionalReplicationEndpointTest.TestReplicationEndpoint
-        .replicatedEntries
-        .set(0);
+    TestReplicationEndpoint.replicatingEntries.set(0);
   }
 
   private void createTables(TableName tableName, int cf1Scope, int cf2Scope) throws IOException {
@@ -191,18 +189,16 @@ public void testDropsReplicatedEntry() throws IOException, InterruptedException
     // Wait for replication
     TestUtils.waitForReplication(
         () -> {
-          // Only one entry should've been replicated
-          return HbaseToCloudBigtableBidirectionalReplicationEndpointTest.TestReplicationEndpoint
-              .replicatedEntries
-              .get()
-              >= 1;
+          // Both entries should've gone through the replication process
+          // However, one would've been filtered out during this process
+          return TestReplicationEndpoint.replicatingEntries.get() >= 2;
         });
 
     // Hbase table should have both mutations
-    Assert.assertTrue(hbaseTable.get(new Get(TestUtils.ROW_KEY)).size() == 1);
-    Assert.assertTrue(hbaseTable.get(new Get(TestUtils.ROW_KEY_2)).size() == 1);
+    Assert.assertEquals(1, hbaseTable.get(new Get(TestUtils.ROW_KEY)).size());
+    Assert.assertEquals(1, hbaseTable.get(new Get(TestUtils.ROW_KEY_2)).size());
     // Cbt table should have only one mutation
     Assert.assertTrue(cbtTable.get(new Get(TestUtils.ROW_KEY)).isEmpty());
-    Assert.assertTrue(cbtTable.get(new Get(TestUtils.ROW_KEY_2)).size() == 1);
+    Assert.assertEquals(1, cbtTable.get(new Get(TestUtils.ROW_KEY_2)).size());
   }
 }
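
Note on the wait condition: the renamed counter is bumped for every WAL entry handed to replicate(), including the entry the endpoint later filters out (the already-replicated one, per the test's comments), so waiting for a count of 2 guarantees both entries have been processed before the assertions run; the old condition (count >= 1) could fire after the first entry and race with the second. Below is a minimal, self-contained sketch of this style of polling wait; the waitFor helper and class name are hypothetical stand-ins, not the repo's actual TestUtils.waitForReplication.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class PollingWaitSketch {

  // Hypothetical helper: poll the condition until it holds or the timeout elapses.
  static void waitFor(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!condition.getAsBoolean()) {
      if (System.currentTimeMillis() > deadline) {
        throw new AssertionError("condition not met within " + timeoutMs + " ms");
      }
      TimeUnit.MILLISECONDS.sleep(100);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger replicatingEntries = new AtomicInteger();
    // Stand-in for the endpoint: both entries enter replicate(), even though one
    // of them is subsequently dropped by the loop-prevention filter.
    replicatingEntries.getAndAdd(2);
    // Assertions should only run once both entries have entered the replication path.
    waitFor(() -> replicatingEntries.get() >= 2, 10_000);
    System.out.println("both entries observed by the endpoint");
  }
}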