Skip to content

Commit

Permalink
Local Index Replication Integration Test
Browse files Browse the repository at this point in the history
  • Loading branch information
hnguyen08 committed Apr 30, 2019
1 parent a6cfed7 commit 3589cb4
Showing 1 changed file with 139 additions and 19 deletions.
Expand Up @@ -42,11 +42,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
Expand Down Expand Up @@ -98,7 +94,7 @@ public class MutableIndexReplicationIT extends BaseTest {

// Testing utility for the source cluster: the tests take its admin and configuration to
// create and inspect the tables that replication originates from.
protected static HBaseTestingUtility utility1;
// Testing utility for the replication target (remote) cluster.
protected static HBaseTestingUtility utility2;
// Upper bound of the polling loops that wait for edits to appear on the replication target.
// NOTE(review): this declaration appears twice in this view (100, then 3) - apparently the
// removed and the added line of the diff; confirm only one of them exists in the real file.
protected static final int REPLICATION_RETRIES = 100;
protected static final int REPLICATION_RETRIES = 3;

// NOTE(review): neither constant is referenced in the visible part of the file - presumably
// a raw table name and row key for direct HTable access; confirm against the rest of the class.
protected static final byte[] tableName = Bytes.toBytes("test");
protected static final byte[] row = Bytes.toBytes("row");
Expand Down Expand Up @@ -245,20 +241,23 @@ public void testReplicationWithMutableIndexes() throws Exception {
// lookup tables. For right now, we just go through an HTable
LOG.info("Looking up tables in replication target");
TableName[] tables = admin2.listTableNames();
HTable remoteTable = new HTable(utility2.getConfiguration(), tables[0]);
for (int i = 0; i < REPLICATION_RETRIES; i++) {
if (i >= REPLICATION_RETRIES - 1) {
fail("Waited too much time for put replication on table " + remoteTable
.getTableDescriptor().getNameAsString());
}
if (ensureAnyRows(remoteTable)) {
break;
for (TableName tableName: tables) {
LOG.info("checking rows in table: " + tableName);
HTable remoteTable = new HTable(utility2.getConfiguration(), tableName);
for (int i = 0; i < REPLICATION_RETRIES; i++) {
if (i >= REPLICATION_RETRIES - 1) {
fail("Waited too much time for put replication on table " + remoteTable
.getTableDescriptor().getNameAsString());
}
if (ensureAnyRows(remoteTable)) {
break;
}
LOG.info("Sleeping for " + REPLICATION_WAIT_TIME_MILLIS
+ " for edits to get replicated");
Thread.sleep(REPLICATION_WAIT_TIME_MILLIS);
}
LOG.info("Sleeping for " + REPLICATION_WAIT_TIME_MILLIS
+ " for edits to get replicated");
Thread.sleep(REPLICATION_WAIT_TIME_MILLIS);
remoteTable.close();
}
remoteTable.close();
}

private boolean ensureAnyRows(HTable remoteTable) throws IOException {
Expand All @@ -274,8 +273,129 @@ private boolean ensureAnyRows(HTable remoteTable) throws IOException {
return found;
}

/**
 * Checks whether a raw scan of the given table returns exactly {@code numRows} results.
 * Every result is logged at INFO level as it is read.
 *
 * @param remoteTable table to scan
 * @param numRows expected number of results
 * @return {@code true} if the scan yielded exactly {@code numRows} results
 * @throws IOException if the scanner cannot be opened
 */
private boolean ensureNumberOfRows(HTable remoteTable, int numRows) throws IOException {
    Scan scan = new Scan();
    // raw scan, as in the original check (see the HBase Scan#setRaw documentation for
    // exactly which cells that includes)
    scan.setRaw(true);
    ResultScanner scanner = remoteTable.getScanner(scan);
    int rowsFound = 0;
    try {
        for (Result r : scanner) {
            LOG.info("got row: " + r);
            rowsFound++;
        }
    } finally {
        // release the scanner even when iterating fails part-way through
        scanner.close();
    }
    return numRows == rowsFound;
}

/**
 * Opens a new JDBC connection to {@code URL}, handing the driver a private deep copy of
 * {@code TEST_PROPERTIES} so the shared properties object is never passed out.
 *
 * @return a freshly opened connection; the caller is responsible for closing it
 * @throws Exception if the connection cannot be established
 */
private static Connection getConnection() throws Exception {
    return DriverManager.getConnection(URL, PropertiesUtil.deepCopy(TEST_PROPERTIES));
}
}

/**
 * Verifies that a data table carrying a mutable LOCAL index is replicated to the remote
 * cluster.
 *
 * <p>Steps: create the data table and a local index through Phoenix, create the same HBase
 * table on the remote cluster, switch both column families of the source table to global
 * replication scope, upsert one row, check the index through Phoenix, check that the source
 * HTable holds two rows (data row plus index entry), then poll the remote table until it
 * holds two rows as well.
 *
 * @throws Exception on any Phoenix/HBase failure; assertion errors signal a failed check
 */
@Test
public void testReplicationWithMutableLocalIndexes() throws Exception {
    HBaseAdmin admin = utility1.getHBaseAdmin();
    HBaseAdmin admin2 = utility2.getHBaseAdmin();

    Connection conn = getConnection();
    try {
        //create the primary and index tables
        conn.createStatement().execute(
            "CREATE TABLE " + DATA_TABLE_FULL_NAME
                + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
        conn.createStatement().execute(
            "CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME
                + " (v1)");

        // make sure that the tables are empty, but reachable
        String query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
        ResultSet rs = conn.createStatement().executeQuery(query);
        assertFalse(rs.next());

        // make sure there is no data in the index either
        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
        rs = conn.createStatement().executeQuery(query);
        assertFalse(rs.next());

        // make sure the data tables are created on the remote cluster.
        // Only the data table is listed: there is no separate index table for a local index.
        List<String> dataTables = new ArrayList<String>();
        dataTables.add(DATA_TABLE_FULL_NAME);
        for (String tableName : dataTables) {
            HTableDescriptor desc = admin.getTableDescriptor(TableName.valueOf(tableName));

            //create it as-is on the remote cluster
            admin2.createTable(desc);

            LOG.info("Enabling replication on source table: "+tableName);
            HColumnDescriptor[] cols = desc.getColumnFamilies();
            // two families are expected - presumably the data family plus the one that
            // holds the local index entries
            assertEquals(2, cols.length);
            // add the replication scope to the first column family
            LOG.info("Enabling replication on CF: " + cols[0].getName());
            HColumnDescriptor col = desc.removeFamily(cols[0].getName());
            col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
            desc.addFamily(col);
            // add the replication scope to the second column family
            LOG.info("Enabling replication on CF: " + cols[1].getName());
            HColumnDescriptor col2 = desc.removeFamily(cols[1].getName());
            col2.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
            desc.addFamily(col2);
            //disable/modify/enable table so it has replication enabled
            admin.disableTable(desc.getTableName());
            admin.modifyTable(tableName, desc);
            admin.enableTable(desc.getTableName());
            LOG.info("Replication enabled on source table: "+tableName);
        }

        // load some data into the source cluster table
        PreparedStatement stmt =
            conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
        stmt.setString(1, "a"); // k
        stmt.setString(2, "x"); // v1 <- has index
        stmt.setString(3, "1"); // v2
        stmt.execute();
        conn.commit();

        // make sure the index is working as expected
        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
        rs = conn.createStatement().executeQuery(query);
        assertTrue(rs.next());
        assertEquals("x", rs.getString(1));
        assertFalse(rs.next());
    } finally {
        // close the Phoenix connection even when one of the checks above fails
        conn.close();
    }

    // make sure there are 2 rows in the HTable (one for the data row, one for the index entry)
    TableName[] mainTables = admin.listTableNames(DATA_TABLE_FULL_NAME);
    assertTrue("Source table not found: " + DATA_TABLE_FULL_NAME, mainTables.length > 0);
    HTable mainTable = new HTable(utility1.getConfiguration(), mainTables[0]);
    try {
        assertTrue(ensureNumberOfRows(mainTable, 2));
    } finally {
        mainTable.close();
    }

    /*
     Validate that we have replicated the rows to the remote cluster
    */

    // other table can't be reached through Phoenix right now - would need to change how we
    // lookup tables. For right now, we just go through an HTable
    LOG.info("Looking up tables in replication target");
    TableName[] tables = admin2.listTableNames();
    LOG.info("Table name length: " + tables.length);
    for (TableName tn : tables) {
        LOG.info("Table name: " + tn.getNameAsString());
    }
    assertTrue("No tables found in replication target", tables.length > 0);
    HTable remoteTable = new HTable(utility2.getConfiguration(), tables[0]);
    try {
        for (int i = 0; i < REPLICATION_RETRIES; i++) {
            // the last iteration only reports the timeout; it does not check again
            if (i >= REPLICATION_RETRIES - 1) {
                fail("Waited too much time for put replication on table " + remoteTable
                    .getTableDescriptor().getNameAsString());
            }
            // Should have replicated both rows in the HTable
            if (ensureNumberOfRows(remoteTable, 2)) {
                break;
            }
            LOG.info("Sleeping for " + REPLICATION_WAIT_TIME_MILLIS
                + " for edits to get replicated");
            Thread.sleep(REPLICATION_WAIT_TIME_MILLIS);
        }
    } finally {
        // fail() and scan errors leave through here, so the table is always released
        remoteTable.close();
    }
}
}

0 comments on commit 3589cb4

Please sign in to comment.