diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index 176c5a08858..ebc6988a39d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -30,24 +30,17 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseCluster; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.query.QueryServices; @@ -61,7 +54,6 @@ import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TestUtil; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -75,28 +67,29 @@ * For some reason dropping tables after running this test * fails unless it runs its own mini cluster. * - * - * @since 2.1 */ @Category(NeedsOwnMiniClusterTest.class) @RunWith(Parameterized.class) public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { - private Timer scheduleTimer; - + public static volatile boolean FAIL_WRITE = false; + public static final String INDEX_NAME = "IDX"; + private String tableName; private String indexName; private String fullTableName; private String fullIndexName; - private boolean transactional; + private final boolean transactional; + private final boolean localIndex; private final String tableDDLOptions; - public MutableIndexFailureIT(boolean transactional) { + public MutableIndexFailureIT(boolean transactional, boolean localIndex) { this.transactional = transactional; + this.localIndex = localIndex; this.tableDDLOptions = transactional ? " TRANSACTIONAL=true " : ""; - this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : ""); - this.indexName = "IDX"; + this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? 
"_TXN" : ""); + this.indexName = INDEX_NAME; this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); } @@ -104,31 +97,28 @@ public MutableIndexFailureIT(boolean transactional) { @BeforeClass public static void doSetup() throws Exception { Map serverProps = Maps.newHashMapWithExpectedSize(10); - serverProps.put("hbase.client.retries.number", "2"); + serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); + serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); + serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); serverProps.put("hbase.client.pause", "5000"); + serverProps.put("data.tx.snapshot.dir", "/tmp"); serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE)); - serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, "0"); Map clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true"); NUM_SLAVES_BASE = 4; setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } - @Parameters(name = "transactional = {0}") + @Parameters(name = "transactional = {0}, localIndex = {1}") public static Collection data() { - return Arrays.asList(new Boolean[][] { { false }, { true } }); - } - - @Test - public void testWriteFailureDisablesLocalIndex() throws Exception { - helpTestWriteFailureDisablesIndex(true); + return Arrays.asList(new Boolean[][] { { false, false }, { false, true }, { true, false }, { true, true } }); } @Test public void testWriteFailureDisablesIndex() throws Exception { - helpTestWriteFailureDisablesIndex(false); + helpTestWriteFailureDisablesIndex(); } - public void helpTestWriteFailureDisablesIndex(boolean localIndex) throws Exception { + public void helpTestWriteFailureDisablesIndex() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = driver.connect(url, props)) { String query; @@ -140,15 +130,9 @@ public void helpTestWriteFailureDisablesIndex(boolean localIndex) throws Excepti rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); - if(localIndex) { - conn.createStatement().execute( - "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); - conn.createStatement().execute( - "CREATE LOCAL INDEX " + indexName+ "_2" + " ON " + fullTableName + " (v2) INCLUDE (v1)"); - } else { - conn.createStatement().execute( - "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); - } + FAIL_WRITE = false; + conn.createStatement().execute( + "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); query = "SELECT * FROM " + fullIndexName; rs = conn.createStatement().executeQuery(query); @@ -167,23 +151,50 @@ public void helpTestWriteFailureDisablesIndex(boolean localIndex) throws Excepti stmt.setString(2, "x"); stmt.setString(3, "1"); stmt.execute(); + stmt.setString(1, "b"); + stmt.setString(2, "y"); + stmt.setString(3, "2"); + stmt.execute(); + stmt.setString(1, "c"); + stmt.setString(2, "z"); + stmt.setString(3, "3"); + stmt.execute(); conn.commit(); - TableName indexTable = - TableName.valueOf(localIndex ? 
MetaDataUtil - .getLocalIndexTableName(fullTableName) : fullIndexName); - HBaseAdmin admin = getUtility().getHBaseAdmin(); - HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable); - try{ - admin.disableTable(indexTable); - admin.deleteTable(indexTable); - } catch (TableNotFoundException ignore) {} + query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName; + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + String expectedPlan = + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName; + assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs)); + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("x", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("b", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("c", rs.getString(1)); + assertEquals("z", rs.getString(2)); + assertFalse(rs.next()); + + FAIL_WRITE = true; stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); - stmt.setString(1, "a2"); + // Insert new row + stmt.setString(1, "d"); + stmt.setString(2, "d"); + stmt.setString(3, "4"); + stmt.execute(); + // Update existing row + stmt.setString(1, "a"); stmt.setString(2, "x2"); stmt.setString(3, "2"); stmt.execute(); + // Delete existing row + stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?"); + stmt.setString(1, "b"); + stmt.execute(); try { conn.commit(); fail(); @@ -196,20 +207,17 @@ public void helpTestWriteFailureDisablesIndex(boolean localIndex) throws Excepti assertTrue(rs.next()); assertEquals(indexName, rs.getString(3)); // the index is only disabled for non-txn tables upon index table write failure - PIndexState indexState = transactional ? PIndexState.ACTIVE : PIndexState.DISABLE; - assertEquals(indexState.toString(), rs.getString("INDEX_STATE")); - assertFalse(rs.next()); - if(localIndex) { - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName + "_2", - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - assertEquals(indexName + "_2", rs.getString(3)); - assertEquals(indexState.toString(), rs.getString("INDEX_STATE")); - assertFalse(rs.next()); + if (transactional) { + assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); + } else { + String indexState = rs.getString("INDEX_STATE"); + assertTrue(PIndexState.DISABLE.toString().equals(indexState) || PIndexState.INACTIVE.toString().equals(indexState)); } + assertFalse(rs.next()); - // if the table is transactional the write to the index table will fail because the - // index has not been disabled + // If the table is transactional the write to both the data and index table will fail + // in an all or none manner. If the table is not transactional, then the data writes + // would have succeeded while the index writes would have failed. 
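Reviewer note: the divergence asserted below is driven by the FailingRegionObserver registered via hbase.coprocessor.region.classes in doSetup() and defined at the bottom of this file. A minimal standalone sketch of that fault-injection pattern (the class name and the "IDX" substring are illustrative, and this extends BaseRegionObserver rather than the test-only SimpleRegionObserver):

    import java.io.IOException;
    import org.apache.hadoop.hbase.DoNotRetryIOException;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;

    public class FlagDrivenFailObserver extends BaseRegionObserver {
        // Tests flip this flag; the observer is loaded on every region server.
        public static volatile boolean FAIL_WRITE = false;

        @Override
        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
            // Fail only index-table batches: the data-table write path stays intact,
            // which is what lets the non-transactional case partially succeed while
            // Tephra rolls the transactional case back as a unit.
            String table = c.getEnvironment().getRegionInfo().getTable().getNameAsString();
            if (FAIL_WRITE && table.contains("IDX")) {
                throw new DoNotRetryIOException("injected index write failure");
            }
        }
    }

DoNotRetryIOException matters here: anything else would be retried by the HBase client (and could unload the coprocessor), masking the first failure.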
if (!transactional) { // Verify UPSERT on data table still work after index is disabled stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); @@ -218,210 +226,101 @@ public void helpTestWriteFailureDisablesIndex(boolean localIndex) throws Excepti stmt.setString(3, "3"); stmt.execute(); conn.commit(); - } - if (transactional) { - // if the table was transactional there should be 1 row (written before the index - // was disabled) - query = "SELECT /*+ NO_INDEX */ v2 FROM " + fullTableName; + // Verify previous writes succeeded to data table + query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName; rs = conn.createStatement().executeQuery("EXPLAIN " + query); - String expectedPlan = + expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName; assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs)); rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); - assertEquals("1", rs.getString(1)); - assertFalse(rs.next()); - } else { - // if the table was not transactional there should be three rows (all writes to data - // table should succeed) - query = "SELECT v2 FROM " + fullTableName; - rs = conn.createStatement().executeQuery("EXPLAIN " + query); - String expectedPlan = - "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName; - assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs)); - rs = conn.createStatement().executeQuery(query); + assertEquals("a", rs.getString(1)); + assertEquals("x2", rs.getString(2)); assertTrue(rs.next()); - assertEquals("1", rs.getString(1)); + assertEquals("a3", rs.getString(1)); + assertEquals("x3", rs.getString(2)); assertTrue(rs.next()); - assertEquals("2", rs.getString(1)); + assertEquals("c", rs.getString(1)); + assertEquals("z", rs.getString(2)); assertTrue(rs.next()); - assertEquals("3", rs.getString(1)); + assertEquals("d", rs.getString(1)); + assertEquals("d", rs.getString(2)); assertFalse(rs.next()); } - // recreate index table - admin.createTable(indexTableDesc); - do { - Thread.sleep(15 * 1000); // sleep 15 secs - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ - break; - } - if(localIndex) { - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName + "_2", + // re-enable index table + FAIL_WRITE = false; + + boolean isActive = false; + if (!transactional) { + int maxTries = 3, nTries = 0; + do { + Thread.sleep(15 * 1000); // sleep 15 secs + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ + isActive = true; break; } - } - } while(true); + } while(++nTries < maxTries); + assertTrue(isActive); + } // Verify UPSERT on data table still work after index table is recreated stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); - stmt.setString(1, "a4"); + stmt.setString(1, "a3"); stmt.setString(2, "x4"); stmt.setString(3, "4"); stmt.execute(); conn.commit(); - // verify index table has data - query = "SELECT count(1) FROM " + fullIndexName; + // verify index table has correct data + query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + fullTableName; + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + 
expectedPlan = + " OVER " + (localIndex ? MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + tableName : fullIndexName); + String explainPlan = QueryUtil.getExplainPlan(rs); + assertTrue(explainPlan.contains(expectedPlan)); rs = conn.createStatement().executeQuery(query); - assertTrue(rs.next()); - - // for txn tables there will be only one row in the index (a4) - // for non txn tables there will be three rows because we only partially build index - // from where we failed and the oldest - // index row has been deleted when we dropped the index table during test - assertEquals(transactional ? 1 : 3, rs.getInt(1)); - } - } - - - @Ignore("See PHOENIX-2332") - @Test - public void testWriteFailureWithRegionServerDown() throws Exception { - String query; - ResultSet rs; - - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = driver.connect(url, props);) { - conn.setAutoCommit(false); - conn.createStatement().execute( - "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "+tableDDLOptions); - query = "SELECT * FROM " + fullTableName; - rs = conn.createStatement().executeQuery(query); - assertFalse(rs.next()); - - conn.createStatement().execute( - "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); - query = "SELECT * FROM " + fullIndexName; - rs = conn.createStatement().executeQuery(query); - assertFalse(rs.next()); - - // Verify the metadata for index is correct. - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - assertEquals(indexName, rs.getString(3)); - assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); - assertFalse(rs.next()); - - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); - stmt.setString(1, "a"); - stmt.setString(2, "x"); - stmt.setString(3, "1"); - stmt.execute(); - conn.commit(); - - // find a RS which doesn't has CATALOG table - TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG"); - TableName indexTable = TableName.valueOf(fullIndexName); - final HBaseCluster cluster = getUtility().getHBaseCluster(); - Collection rss = cluster.getClusterStatus().getServers(); - HBaseAdmin admin = getUtility().getHBaseAdmin(); - List regions = admin.getTableRegions(catalogTable); - ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getTable(), - regions.get(0).getRegionName()); - ServerName metaRS = cluster.getServerHoldingMeta(); - ServerName rsToBeKilled = null; - - // find first RS isn't holding META or CATALOG table - for(ServerName curRS : rss) { - if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) { - rsToBeKilled = curRS; - break; - } - } - assertTrue(rsToBeKilled != null); - - regions = admin.getTableRegions(indexTable); - final HRegionInfo indexRegion = regions.get(0); - final ServerName dstRS = rsToBeKilled; - admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName())); - getUtility().waitFor(30000, 200, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - ServerName sn = cluster.getServerHoldingRegion(indexRegion.getTable(), - indexRegion.getRegionName()); - return (sn != null && sn.equals(dstRS)); - } - }); - - // use timer sending updates in every 10ms - this.scheduleTimer = new Timer(true); - this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn, fullTableName), 0, 10); - // let timer 
sending some updates - Thread.sleep(100); - - // kill RS hosting index table - getUtility().getHBaseCluster().killRegionServer(rsToBeKilled); - - // wait for index table completes recovery - getUtility().waitUntilAllRegionsAssigned(indexTable); - - // Verify the metadata for index is correct. - do { - Thread.sleep(15 * 1000); // sleep 15 secs - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, - new String[] { PTableType.INDEX.toString() }); + if (transactional) { // failed commit does not get retried + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("x", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("a3", rs.getString(1)); + assertEquals("x4", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("b", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("c", rs.getString(1)); + assertEquals("z", rs.getString(2)); + assertFalse(rs.next()); + } else { // failed commit eventually succeeds + assertTrue(rs.next()); + assertEquals("d", rs.getString(1)); + assertEquals("d", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("x2", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("a3", rs.getString(1)); + assertEquals("x4", rs.getString(2)); assertTrue(rs.next()); - if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ - break; - } - } while(true); - this.scheduleTimer.cancel(); + assertEquals("c", rs.getString(1)); + assertEquals("z", rs.getString(2)); + assertFalse(rs.next()); + } } } - - static class SendingUpdatesScheduleTask extends TimerTask { - private static final Log LOG = LogFactory.getLog(SendingUpdatesScheduleTask.class); - - // inProgress is to prevent timer from invoking a new task while previous one is still - // running - private final static AtomicInteger inProgress = new AtomicInteger(0); - private final Connection conn; - private final String fullTableName; - private int inserts = 0; - - public SendingUpdatesScheduleTask(Connection conn, String fullTableName) { - this.conn = conn; - this.fullTableName = fullTableName; - } - + + public static class FailingRegionObserver extends SimpleRegionObserver { @Override - public void run() { - if(inProgress.get() > 0){ - return; - } - - try { - inProgress.incrementAndGet(); - inserts++; - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); - stmt.setString(1, "a" + inserts); - stmt.setString(2, "x" + inserts); - stmt.setString(3, String.valueOf(inserts)); - stmt.execute(); - conn.commit(); - } catch (Throwable t) { - LOG.warn("ScheduledBuildIndexTask failed!", t); - } finally { - inProgress.decrementAndGet(); + public void preBatchMutate(ObserverContext c, MiniBatchOperationInProgress miniBatchOp) throws HBaseIOException { + if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) { + throw new DoNotRetryIOException(); } } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java index 8df82cef063..931fcaee08e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java @@ -27,19 +27,20 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; 
+import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Properties; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.exception.SQLExceptionCode; @@ -56,6 +57,9 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Maps; /** @@ -69,28 +73,37 @@ */ @Category(NeedsOwnMiniClusterTest.class) +@RunWith(Parameterized.class) public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { - private static final String FAIL_ON_FIRST_PUT = "bbb"; + public static volatile boolean FAIL_WRITE = false; + public static final String INDEX_NAME = "IDX"; private String tableName; private String indexName; private String fullTableName; private String fullIndexName; + private final boolean localIndex; - public ReadOnlyIndexFailureIT() { - this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME; - this.indexName = "IDX"; + public ReadOnlyIndexFailureIT(boolean localIndex) { + this.localIndex = localIndex; + this.tableName = (localIndex ? 
"L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME; + this.indexName = INDEX_NAME; this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); } + @Parameters(name = "localIndex = {0}") + public static Collection data() { + return Arrays.asList(new Boolean[][] { { false }, { true } }); + } + @BeforeClass public static void doSetup() throws Exception { Map serverProps = Maps.newHashMapWithExpectedSize(10); - serverProps.put("hbase.client.retries.number", "2"); + serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); + serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); serverProps.put("hbase.client.pause", "5000"); serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE)); - serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, "0"); serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true"); serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, "true"); serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000"); @@ -104,17 +117,12 @@ public static void doSetup() throws Exception { new ReadOnlyProps(clientProps.entrySet().iterator())); } - @Test - public void testWriteFailureReadOnlyLocalIndex() throws Exception { - helpTestWriteFailureReadOnlyIndex(true); - } - @Test public void testWriteFailureReadOnlyIndex() throws Exception { - helpTestWriteFailureReadOnlyIndex(false); + helpTestWriteFailureReadOnlyIndex(); } - public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Exception { + public void helpTestWriteFailureReadOnlyIndex() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = driver.connect(url, props)) { String query; @@ -126,6 +134,7 @@ public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Excepti rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); + FAIL_WRITE = false; if(localIndex) { conn.createStatement().execute( "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName @@ -157,9 +166,10 @@ public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Excepti stmt.execute(); conn.commit(); + FAIL_WRITE = true; stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); stmt.setString(1, "2"); - stmt.setString(2, FAIL_ON_FIRST_PUT); + stmt.setString(2, "bbb"); stmt.setString(3, "b2"); stmt.execute(); try { @@ -201,6 +211,7 @@ public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Excepti assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), e.getErrorCode()); } + FAIL_WRITE = false; // Second attempt at writing will succeed int retries = 0; do { @@ -222,12 +233,12 @@ public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Excepti conn.commit(); // verify index table has data - query = "SELECT count(1) FROM " + indexName; + query = "SELECT count(1) FROM " + fullIndexName; rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); assertEquals(3, rs.getInt(1)); - query = "SELECT v1 FROM " + fullTableName; + query = "SELECT /*+ INDEX(" + indexName + ") */ v1 FROM " + fullTableName; rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); assertEquals("aaa", rs.getString(1)); @@ -261,29 +272,13 @@ private static boolean hasIndexDisableTimestamp(Connection conn, String indexNam return (!rs.wasNull() && ts > 0); } + public static class 
FailingRegionObserver extends SimpleRegionObserver { - private Integer failCount = new Integer(0); - @Override - public void prePut(ObserverContext c, Put put, WALEdit edit, - final Durability durability) throws HBaseIOException { - if (shouldFailUpsert(c, put)) { - synchronized (failCount) { - failCount++; - if (failCount.intValue() == 1) { - // throwing anything other than instances of IOException result - // in this coprocessor being unloaded - // DoNotRetryIOException tells HBase not to retry this mutation - // multiple times - throw new DoNotRetryIOException(); - } - } + public void preBatchMutate(ObserverContext c, MiniBatchOperationInProgress miniBatchOp) throws HBaseIOException { + if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) { + throw new DoNotRetryIOException(); } } - - private boolean shouldFailUpsert(ObserverContext c, Put put) { - return Bytes.contains(put.getRow(), Bytes.toBytes(FAIL_ON_FIRST_PUT)); - } - } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java index 23967193456..fe2f1b443a7 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java @@ -148,7 +148,7 @@ public ListMatchingVerifier(String msg, List kvs, ColumnReference... colum public void verify(TableState state) { try { Scanner kvs = - ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns)).getFirst(); + ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns), false).getFirst(); int count = 0; Cell kv; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 9487b36ec7a..a5533afeed1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -58,10 +58,10 @@ import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; -import co.cask.tephra.Transaction; - import com.google.common.collect.ImmutableList; +import co.cask.tephra.Transaction; + abstract public class BaseScannerRegionObserver extends BaseRegionObserver { @@ -98,6 +98,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String SKIP_REGION_BOUNDARY_CHECK = "_SKIP_REGION_BOUNDARY_CHECK"; public static final String TX_SCN = "_TxScn"; public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow"; + public static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS"; /** * Attribute name used to pass custom annotations in Scans and Mutations (later). 
Custom annotations diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index 4e019cd462c..0cce4d7223f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -21,6 +21,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.TimerTask; @@ -31,37 +32,53 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.phoenix.cache.GlobalCache; +import org.apache.phoenix.cache.ServerCacheClient; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.parse.AlterIndexStatement; +import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.UpgradeUtil; +import com.google.common.collect.Lists; + /** * Coprocessor for metadata related operations. This coprocessor would only be registered @@ -190,8 +207,6 @@ public void run() { // separately, all updating the same data. 
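Reviewer note: a hunk below swaps the boxed (Long) PLong.INSTANCE.toObject(...) decode of INDEX_DISABLE_TIMESTAMP for the primitive codec. Both read the same 8-byte Phoenix LONG encoding; the codec form avoids boxing and makes the SortOrder explicit. A self-contained sketch of the equivalence (the 42L is illustrative):

    import org.apache.phoenix.schema.SortOrder;
    import org.apache.phoenix.schema.types.PLong;

    public class DecodeDemo {
        public static void main(String[] args) {
            byte[] stored = PLong.INSTANCE.toBytes(42L);
            // New style: primitive decode, no allocation.
            long viaCodec = PLong.INSTANCE.getCodec()
                    .decodeLong(stored, 0, SortOrder.getDefault());
            // Old style: boxed decode.
            Long viaObject = (Long) PLong.INSTANCE.toObject(stored);
            assert viaCodec == viaObject.longValue(); // both 42
        }
    }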
RegionScanner scanner = null; PhoenixConnection conn = null; - boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, - QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); if (inProgress.get() > 0) { LOG.debug("New ScheduledBuildIndexTask skipped as there is already one running"); return; @@ -213,9 +228,13 @@ public void run() { scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); + PTable dataPTable = null; + MetaDataClient client = null; boolean hasMore = false; List results = new ArrayList(); + List indexesToPartiallyRebuild = Collections.emptyList(); scanner = this.env.getRegion().getScanner(scan); + long earliestDisableTimestamp = Long.MAX_VALUE; do { results.clear(); @@ -226,16 +245,18 @@ public void run() { byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); - Long disabledTimeStampVal = 0L; if (disabledTimeStamp == null || disabledTimeStamp.length == 0) { continue; } // disableTimeStamp has to be a positive value - disabledTimeStampVal = (Long) PLong.INSTANCE.toObject(disabledTimeStamp); + long disabledTimeStampVal = PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0, SortOrder.getDefault()); if (disabledTimeStampVal <= 0) { continue; } + if (disabledTimeStampVal < earliestDisableTimestamp) { + earliestDisableTimestamp = disabledTimeStampVal; + } byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES); @@ -247,12 +268,6 @@ public void run() { continue; } - if (!blockWriteRebuildIndex && ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0) - && (Bytes.compareTo(PIndexState.INACTIVE.getSerializedBytes(), indexStat) != 0))) { - // index has to be either in disable or inactive state - continue; - } - byte[][] rowKeyMetaData = new byte[3][]; SchemaUtil.getVarChars(r.getRow(), 3, rowKeyMetaData); byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; @@ -266,34 +281,101 @@ public void run() { if (conn == null) { final Properties props = new Properties(); + props.setProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB, Boolean.TRUE.toString()); // Set SCN so that we don't ping server and have the upper bound set back to // the timestamp when the failure occurred. 
             props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE));
             // don't run a second index population UPSERT SELECT
             props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
             conn = DriverManager.getConnection(getJdbcUrl(env), props).unwrap(PhoenixConnection.class);
+            String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
+            dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+            indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
+            client = new MetaDataClient(conn);
         }
-        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
         String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTable);
-        PTable dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
         PTable indexPTable = PhoenixRuntime.getTable(conn, indexTableFullName);
         if (!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
             LOG.debug("Index rebuild has been skipped because not all regions of index table="
                     + indexPTable.getName() + " are online.");
             continue;
         }
+        // Allow index to begin incremental maintenance as index is back online and we
+        // cannot transition directly from DISABLED -> ACTIVE
+        if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) == 0) {
+            AlterIndexStatement statement = new AlterIndexStatement(
+                    NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()),
+                    dataPTable.getTableName().getString(),
+                    false, PIndexState.INACTIVE);
+            client.alterIndex(statement);
+        }
+        indexesToPartiallyRebuild.add(indexPTable);
+    } while (hasMore);
 
-    MetaDataClient client = new MetaDataClient(conn);
+    if (!indexesToPartiallyRebuild.isEmpty()) {
         long overlapTime = env.getConfiguration().getLong(
             QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
-        long timeStamp = Math.max(0, disabledTimeStampVal - overlapTime);
+        long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
 
-        LOG.info("Starting to build index=" + indexPTable.getName() + " from timestamp=" + timeStamp);
-        client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp), blockWriteRebuildIndex);
-
-    } while (hasMore);
+        LOG.info("Starting to build indexes=" + indexesToPartiallyRebuild + " from timestamp=" + timeStamp);
+        Scan dataTableScan = new Scan();
+        dataTableScan.setRaw(true);
+        dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
+        byte[] physicalTableName = dataPTable.getPhysicalName().getBytes();
+        try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName)) {
+            Result result;
+            try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan)) {
+                int batchSize = conn.getMutateBatchSize();
+                List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
+                ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+                IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, conn);
+                byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                byte[] uuidValue = ServerCacheClient.generateId();
+
+                while ((result = dataTableScanner.next()) != null && !result.isEmpty()) {
+                    Put put = null;
+                    Delete del = null;
+                    for (Cell cell : result.rawCells()) {
+                        if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                            if (put == null) {
+                                put = new Put(CellUtil.cloneRow(cell));
+                                put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                                put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                                mutations.add(put);
+                            }
+                            put.add(cell);
+                        } else {
+                            if (del == null) {
+                                del = new Delete(CellUtil.cloneRow(cell));
+                                del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                                del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                                mutations.add(del);
+                            }
+                            del.addDeleteMarker(cell);
+                        }
+                    }
+                    if (mutations.size() == batchSize) {
+                        dataHTable.batch(mutations);
+                        uuidValue = ServerCacheClient.generateId();
+                        mutations.clear(); // reset the batch so already-sent rows aren't re-sent by the final flush
+                    }
+                }
+                if (!mutations.isEmpty()) {
+                    dataHTable.batch(mutations);
+                }
+            }
+        }
+        for (PTable indexPTable : indexesToPartiallyRebuild) {
+            AlterIndexStatement statement = new AlterIndexStatement(
+                    NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()),
+                    dataPTable.getTableName().getString(),
+                    false, PIndexState.ACTIVE);
+            client.alterIndex(statement);
+        }
+    }
 } catch (Throwable t) {
     LOG.warn("ScheduledBuildIndexTask failed!", t);
 } finally {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index c4ed7a0678c..2739cc26672 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -36,8 +36,6 @@
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 
-import com.google.inject.Key;
-
 /**
  * Manage the state of the HRegion's view of the table, for the single row.
 *

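Reviewer note: this file, together with TableState, LocalHBaseState, and LocalTable below, threads the new ignoreNewerMutations flag from the index codec down to the row-state lookup. The whole mechanism bottoms out in one Scan tweak; a condensed, excerpt-style sketch of what the LocalTable.java hunk further below does (scanner setup and cleanup elided):

    // Inside LocalTable.getCurrentRowState(m, columns, ignoreNewerMutations):
    Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns));
    s.setStartRow(m.getRow());
    s.setStopRow(m.getRow());
    if (ignoreNewerMutations) {
        // Replay case: bound the scan at the replayed mutation's own timestamp.
        // HBase TimeRange is [min, max), so cells at or after ts are invisible,
        // and the lookup reconstructs the row state the original write saw.
        long ts = m.getFamilyCellMap().firstEntry().getValue().get(0).getTimestamp();
        s.setTimeRange(0, ts);
    }

This is what lets the MetaDataRegionObserver rebuild above replay old, already-persisted data-table cells and still generate the index updates the failed writes would have produced, even though newer edits have landed since.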
@@ -108,7 +106,7 @@ public long getCurrentTimestamp() { public void setCurrentTimestamp(long timestamp) { this.ts = timestamp; } - + public void resetTrackedColumns() { this.trackedColumns.clear(); } @@ -139,6 +137,9 @@ public Set getTrackedColumns() { * request - you will never see a column with the timestamp we are tracking, but the next oldest * timestamp for that column. * @param indexedColumns the columns to that will be indexed + * @param ignoreNewerMutations ignore mutations newer than m when determining current state. Useful + * when replaying mutation state for partial index rebuild where writes succeeded to the data + * table, but not to the index table. * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to * the builder. Even if no update is necessary for the requested columns, you still need * to return the {@link IndexUpdate}, just don't set the update for the @@ -146,8 +147,8 @@ public Set getTrackedColumns() { * @throws IOException */ public Pair getIndexedColumnsTableState( - Collection indexedColumns) throws IOException { - ensureLocalStateInitialized(indexedColumns); + Collection indexedColumns, boolean ignoreNewerMutations) throws IOException { + ensureLocalStateInitialized(indexedColumns, ignoreNewerMutations); // filter out things with a newer timestamp and track the column references to which it applies ColumnTracker tracker = new ColumnTracker(indexedColumns); synchronized (this.trackedColumns) { @@ -167,7 +168,7 @@ public Pair getIndexedColumnsTableState( * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. Even * then, there is still fairly low contention as each new Put/Delete will have its own table state. */ - private synchronized void ensureLocalStateInitialized(Collection columns) + private synchronized void ensureLocalStateInitialized(Collection columns, boolean ignoreNewerMutations) throws IOException { // check to see if we haven't initialized any columns yet Collection toCover = this.columnSet.findNonCoveredColumns(columns); @@ -175,7 +176,7 @@ private synchronized void ensureLocalStateInitialized(Collection values) { } @Override - public Pair getIndexUpdateState(Collection indexedColumns) + public Pair getIndexUpdateState(Collection indexedColumns, boolean ignoreNewerMutations) throws IOException { - Pair pair = getIndexedColumnsTableState(indexedColumns); + Pair pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations); ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey()); return new Pair(valueGetter, pair.getSecond()); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java index 0e961db2d50..bd4bdfbbb59 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -58,9 +57,13 @@ public interface TableState { /** * Get a getter interface for the state of the index row + * @param indexedColumns list of indexed columns. 
+ * @param ignoreNewerMutations ignore mutations newer than the pending mutation when determining
+ *            current state. Useful when replaying mutation state for a partial index rebuild,
+ *            where writes succeeded to the data table but not to the index table.
  */
 Pair<ValueGetter, IndexUpdate> getIndexUpdateState(
-    Collection<? extends ColumnReference> indexedColumns) throws IOException;
+    Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations) throws IOException;
 
 /**
  * @return the row key for the current row for which we are building an index update.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
index 6d20c1862f6..99686276864 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
@@ -23,7 +23,6 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
-
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 
 /**
@@ -35,13 +34,16 @@ public interface LocalHBaseState {
  * @param m mutation for which we should get the current table state
  * @param toCover all the columns the current row state needs to cover; hint the underlying lookup
  *            to save getting all the columns for the row
+ * @param ignoreNewerMutations ignore mutations newer than m when determining current state. Useful
+ *            when replaying mutation state for a partial index rebuild, where writes succeeded to
+ *            the data table but not to the index table.
  * @return the full state of the given row. Includes all current versions (even if they are not
  *         usually visible to the client (unless they are also doing a raw scan)). Never returns a
  *         null {@link Result} - instead, when there is not data for the row, returns a
  *         {@link Result} with no stored {@link KeyValue}s.
  * @throws IOException if there is an issue reading the row
  */
-public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover)
+public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
     throws IOException;
 
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
index 549fe8c6250..003df2ad141 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
@@ -30,7 +30,6 @@
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 
@@ -52,13 +51,19 @@ public LocalTable(RegionCoprocessorEnvironment env) {
 }
 
 @Override
-public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns)
+public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations)
     throws IOException {
     byte[] row = m.getRow();
     // need to use a scan here so we can get raw state, which Get doesn't provide.
Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns)); s.setStartRow(row); s.setStopRow(row); + if (ignoreNewerMutations) { + // Provides a means of client indicating that newer cells should not be considered, + // enabling mutations to be replayed to partially rebuild the index when a write fails. + long ts = m.getFamilyCellMap().firstEntry().getValue().get(0).getTimestamp(); + s.setTimeRange(0,ts); + } Region region = this.env.getRegion(); RegionScanner scanner = region.getScanner(s); List kvs = new ArrayList(1); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java index 4efca9f8622..0f960e45919 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java @@ -77,7 +77,7 @@ public Iterable getIndexUpserts(TableState state, IndexMetaData con private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) { List refs = group.getColumns(); try { - Pair stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs); + Pair stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs, false); Scanner kvs = stateInfo.getFirst(); Pair> columns = getNextEntries(refs, kvs, state.getCurrentRowKey()); // make sure we close the scanner @@ -132,7 +132,7 @@ public Iterable getIndexDeletes(TableState state, IndexMetaData con private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) { List refs = group.getColumns(); try { - Pair kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs); + Pair kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs, false); Pair> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey()); // make sure we close the scanner reference kvs.getFirst().close(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java index e120268d50e..b4282abcd4c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java @@ -59,7 +59,6 @@ public ScannerBuilder(KeyValueStore memstore, Mutation update) { public Scanner buildIndexedColumnScanner(Collection indexedColumns, ColumnTracker tracker, long ts) { - // TODO: This needs to use some form of the filter that Tephra has when transactional Filter columnFilters = getColumnFilters(indexedColumns); FilterList filters = new FilterList(Lists.newArrayList(columnFilters)); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 4d545a282bb..13ad7e548ce 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -89,14 +89,14 @@ import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; -import co.cask.tephra.TxConstants; - import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import 
com.google.common.collect.Sets;
 
+import co.cask.tephra.TxConstants;
+
 /**
  *
  * Class that builds index row key from data row key and current state of
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 7acc90cf96f..8ad4d3e93e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -15,8 +15,8 @@
 import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
@@ -59,7 +59,8 @@ boolean hasIndexMaintainers(Map<String, byte[]> attributes) {
 
     @Override
     public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
-        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+        PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
+        List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers();
         if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
             return Collections.emptyList();
         }
@@ -67,7 +68,7 @@
         ptr.set(state.getCurrentRowKey());
         List<IndexUpdate> indexUpdates = Lists.newArrayList();
         for (IndexMaintainer maintainer : indexMaintainers) {
-            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations());
             ValueGetter valueGetter = statePair.getFirst();
             IndexUpdate indexUpdate = statePair.getSecond();
             indexUpdate.setTable(maintainer.getIndexTableName());
@@ -81,7 +82,8 @@
 
     @Override
     public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
-        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+        PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
+        List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers();
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         ptr.set(state.getCurrentRowKey());
         List<IndexUpdate> indexUpdates = Lists.newArrayList();
@@ -90,7 +92,7 @@
         // to aid in rollback if there's a KeyValue column in the index. The alternative would be
         // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
         // client side.
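Reviewer note: the change just below completes the wiring. PhoenixIndexMetaData (next file) derives the flag from a marker attribute on the mutation, and the codec passes it into every row-state lookup. End to end, in sketch form (the rowKey, uuidValue, and indexMetaData values are illustrative):

    // Rebuild writer (MetaDataRegionObserver above): tag each replayed mutation.
    Put put = new Put(rowKey);
    put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
    put.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMetaData);
    put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);

    // Server side (PhoenixIndexMetaData below): only the attribute's presence
    // matters, not its value.
    boolean ignoreNewerMutations =
            attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;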
-            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations());
             ValueGetter valueGetter = statePair.getFirst();
             IndexUpdate indexUpdate = statePair.getSecond();
             indexUpdate.setTable(maintainer.getIndexTableName());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 60ae915aaa8..4fab674ad93 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -22,8 +22,6 @@
 import java.util.List;
 import java.util.Map;
 
-import co.cask.tephra.Transaction;
-
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.GlobalCache;
@@ -39,9 +37,12 @@
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ServerUtil;
 
+import co.cask.tephra.Transaction;
+
 public class PhoenixIndexMetaData implements IndexMetaData {
     private final Map<String, byte[]> attributes;
     private final IndexMetaDataCache indexMetaDataCache;
+    private final boolean ignoreNewerMutations;
 
     private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
         if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
@@ -87,6 +88,7 @@ public Transaction getTransaction() {
     public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
         this.indexMetaDataCache = getIndexMetaData(env, attributes);
         this.attributes = attributes;
+        this.ignoreNewerMutations = attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
     }
 
     public Transaction getTransaction() {
@@ -100,4 +102,8 @@ public List<IndexMaintainer> getIndexMaintainers() {
     public Map<String, byte[]> getAttributes() {
         return attributes;
     }
+
+    public boolean ignoreNewerMutations() {
+        return ignoreNewerMutations;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 26f97258dbb..e4c106ed132 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -486,7 +486,7 @@ public Collection<KeyValue> getPendingUpdate() {
 
     @Override
-    public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+    public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations)
         throws IOException {
         // TODO: creating these objects over and over again is wasteful
         ColumnTracker tracker = new ColumnTracker(indexedColumns);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index 22b02c5854c..2c33d21a297 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -47,6 +47,7 @@
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesImpl;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -224,20 +225,23 @@ protected ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
                     connectionQueryServices = prevValue;
                 }
             }
-            boolean success = false;
-            SQLException sqlE = null;
-            try {
-                connectionQueryServices.init(url, info);
-                success = true;
-            } catch (SQLException e) {
-                sqlE = e;
-            }
-            finally {
-                if (!success) {
-                    // Remove from map, as initialization failed
-                    connectionQueryServicesMap.remove(normalizedConnInfo);
-                    if (sqlE != null) {
-                        throw sqlE;
+            String noUpgradeProp = info.getProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB);
+            if (!Boolean.TRUE.toString().equals(noUpgradeProp)) { // the property is a String, so compare against "true"
+                boolean success = false;
+                SQLException sqlE = null;
+                try {
+                    connectionQueryServices.init(url, info);
+                    success = true;
+                } catch (SQLException e) {
+                    sqlE = e;
+                }
+                finally {
+                    if (!success) {
+                        // Remove from map, as initialization failed
+                        connectionQueryServicesMap.remove(normalizedConnInfo);
+                        if (sqlE != null) {
+                            throw sqlE;
+                        }
                     }
                 }
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
index 4e0906f0c1b..d3b4505ca10 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
@@ -39,6 +39,14 @@ public static NamedTableNode create (String alias, TableName name, List<ColumnDef> dynColumns) {
         return new NamedTableNode(alias, name, Collections.<ColumnDef> emptyList());
+    }
+
+    public static NamedTableNode create (String schemaName, String tableName) {
+        return new NamedTableNode(null, TableName.create(schemaName, tableName), Collections.<ColumnDef> emptyList());
+    }
+
     NamedTableNode(String alias, TableName name) {
         super(alias, name);
         dynColumns = Collections.<ColumnDef> emptyList();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 62297ee4d98..27c569314b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -154,7 +154,7 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; // auto rebuild on
     public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false;
     public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 10000; // 10 secs
-    public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 300000; // 5 mins
+    public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 1; // 1 ms
 
     /**
      * HConstants#HIGH_QOS is the max we will see to a standard table.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 6409dcda4d5..7f3f850a963 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -204,8 +204,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import co.cask.tephra.TxConstants;
-
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
@@ -214,6 +212,8 @@
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 
+import co.cask.tephra.TxConstants;
+
 public class MetaDataClient {
     private static final Logger logger = LoggerFactory.getLogger(MetaDataClient.class);
 
@@ -1095,36 +1095,6 @@ private String getFullTableName(TableRef dataTableRef) {
         return fullName;
     }
 
-    /**
-     * Rebuild indexes from a timestamp which is the value from hbase row key timestamp field
-     */
-    public void buildPartialIndexFromTimeStamp(PTable index, TableRef dataTableRef, boolean blockWriteRebuildIndex) throws SQLException {
-        boolean needRestoreIndexState = true;
-        AlterIndexStatement indexStatement = null;
-        if (!blockWriteRebuildIndex) {
-            // Need to change index state from Disable to InActive when build index partially so that
-            // new changes will be indexed during index rebuilding
-            indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
-                    TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
-                    dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
-            alterIndex(indexStatement);
-        }
-        try {
-            buildIndex(index, dataTableRef);
-            needRestoreIndexState = false;
-        } finally {
-            if(needRestoreIndexState) {
-                if (!blockWriteRebuildIndex) {
-                    // reset index state to disable
-                    indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
-                            TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
-                            dataTableRef.getTable().getTableName().getString(), false, PIndexState.DISABLE);
-                    alterIndex(indexStatement);
-                }
-            }
-        }
-    }
-
     /**
      * Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling
      * MetaDataClient.createTable. In doing so, we perform the following translations:
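
Note: the removed helper only automated ALTER INDEX state transitions around a rebuild; callers that still need the equivalent can express it in SQL. A sketch with placeholder table and index names:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;

    // Sketch of the state dance buildPartialIndexFromTimeStamp automated,
    // expressed as plain SQL. "IDX" and "T" are placeholder names.
    public class ManualIndexStateSketch {
        public static void main(String[] args) throws SQLException {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
                 Statement stmt = conn.createStatement()) {
                // While disabled, writes to T are not index-maintained.
                stmt.execute("ALTER INDEX IDX ON T DISABLE");
                // REBUILD repopulates the index and, on success, leaves it ACTIVE.
                stmt.execute("ALTER INDEX IDX ON T REBUILD");
            }
        }
    }
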
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index bebdd8c4158..05ba6d22887 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -129,6 +129,11 @@ public class PhoenixRuntime {
      */
     public static final String TENANT_ID_ATTRIB = "TenantId";
 
+    /**
+     * Use this connection property to prevent an upgrade from occurring when
+     * connecting to a new server version.
+     */
+    public static final String NO_UPGRADE_ATTRIB = "NoUpgrade";
     /**
      * Use this connection property to control the number of rows that are
      * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
@@ -163,7 +168,8 @@ public class PhoenixRuntime {
         UPSERT_BATCH_SIZE_ATTRIB,
         AUTO_COMMIT_ATTRIB,
         CONSISTENCY_ATTRIB,
-        REQUEST_METRIC_ATTRIB
+        REQUEST_METRIC_ATTRIB,
+        NO_UPGRADE_ATTRIB
     };
 
     /**
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
index fa8bd85a809..a2e45af4972 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
@@ -91,7 +91,7 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
     ColumnReference col = new ColumnReference(fam, qual);
     table.setCurrentTimestamp(ts);
     //check that our value still shows up first on scan, even though this is a lazy load
-    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col));
+    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
     Scanner s = p.getFirst();
     assertEquals("Didn't get the pending mutation's value first", m.get(fam, qual).get(0), s.next());
   }
@@ -135,13 +135,13 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
     ColumnReference col = new ColumnReference(fam, qual);
     table.setCurrentTimestamp(ts);
     // check that the value is there
-    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col));
+    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
     Scanner s = p.getFirst();
     assertEquals("Didn't get the pending mutation's value first", kv, s.next());
 
     // rollback that value
     table.rollback(Arrays.asList(kv));
-    p = table.getIndexedColumnsTableState(Arrays.asList(col));
+    p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
     s = p.getFirst();
     assertEquals("Didn't correctly rollback the row - still found it!", null, s.next());
     Mockito.verify(env, Mockito.times(1)).getRegion();
@@ -179,14 +179,14 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
     ColumnReference col = new ColumnReference(fam, qual);
     table.setCurrentTimestamp(ts);
     // check that the value is there
-    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col));
+    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
     Scanner s = p.getFirst();
     // make sure it read the table the one time
     assertEquals("Didn't get the stored keyvalue!", storedKv, s.next());
 
     // on the second lookup it shouldn't access the underlying table again - the cached columns
     // should know they are done
-    p = table.getIndexedColumnsTableState(Arrays.asList(col));
+    p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
     s = p.getFirst();
     assertEquals("Lost already loaded update!", storedKv, s.next());
     Mockito.verify(env, Mockito.times(1)).getRegion();
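
Note: every existing call site passes false, so test behavior is unchanged; true only matters when an old mutation is replayed, e.g. during a partial index rebuild. Conceptually the flag caps the row read at the mutation's own timestamp — roughly the following, with class and method names assumed rather than taken from the LocalTable internals:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Scan;

    // Conceptual sketch: when replaying a mutation stamped ts, hide any
    // cells written after ts so the index update is computed against the
    // row state as it was at write time.
    public class IgnoreNewerMutationsScanSketch {
        public static Scan scanRowAsOf(byte[] row, long ts) throws IOException {
            Scan scan = new Scan(row, row); // start == stop acts as a point lookup
            scan.setMaxVersions();          // consider all versions in the range
            scan.setTimeRange(0, ts + 1);   // [0, ts]; anything newer is ignored
            return scan;
        }
    }
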
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
index fc3a97602fb..b8fa72da375 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
@@ -38,8 +38,8 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
@@ -138,7 +138,7 @@ public SimpleTableState(Result r) {
     }
 
     @Override
-    public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover)
+    public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
         throws IOException {
       return r;
     }
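
Note: other test doubles only need to absorb the widened signature. A minimal fake in the spirit of SimpleTableState above (the class below is illustrative; LocalHBaseState is the real interface being implemented):

    import java.io.IOException;
    import java.util.Collection;

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
    import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

    // Test fake: returns a canned Result regardless of preMutationStateOnly.
    public class CannedRowState implements LocalHBaseState {
        private final Result row;

        public CannedRowState(Result row) {
            this.row = row;
        }

        @Override
        public Result getCurrentRowState(Mutation m,
                Collection<? extends ColumnReference> toCover,
                boolean preMutationStateOnly) throws IOException {
            // A real implementation would use m's timestamp to filter out
            // newer cells when preMutationStateOnly is true.
            return row;
        }
    }
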