Skip to content

Commit

Permalink
PHOENIX-5211 Consistent Immutable Global Indexes for Non-Transactiona…
Browse files Browse the repository at this point in the history
…l Tables
  • Loading branch information
gokceni committed Jun 18, 2019
1 parent 934d3a3 commit 1c6679f
Show file tree
Hide file tree
Showing 6 changed files with 498 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
*/
package org.apache.phoenix.end2end.index;

import static org.apache.phoenix.end2end.IndexToolIT.assertExplainPlan;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.apache.phoenix.util.TestUtil.getRawRowCount;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
Expand All @@ -40,23 +44,38 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.execute.CommitException;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.transaction.PhoenixTransactionProvider;
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
Expand Down Expand Up @@ -107,15 +126,16 @@ public static void doSetup() throws Exception {
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
clientProps.put(QueryServices.INDEX_POPULATION_SLEEP_TIME, "15000");
clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, "true");
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}

@Parameters(name="ImmutableIndexIT_localIndex={0},transactional={1},transactionProvider={2},columnEncoded={3}") // name is used by failsafe as file name in reports
public static Collection<Object[]> data() {
return TestUtil.filterTxParamData(
Arrays.asList(new Object[][] {
Arrays.asList(new Object[][] {
{ false, false, null, false }, { false, false, null, true },
{ false, true, "OMID", false },
{ false, true, "OMID", false },
{ false, true, "TEPHRA", false }, { false, true, "TEPHRA", true },
{ true, false, null, false }, { true, false, null, true },
{ true, true, "TEPHRA", false }, { true, true, "TEPHRA", true },
Expand Down Expand Up @@ -259,7 +279,150 @@ private void assertIndexMutations(Connection conn) throws SQLException {
(transactionProvider != null &&
transactionProvider.isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)), iterator.hasNext());
}


private void createAndPopulateTableAndIndexForConsistentIndex(Connection conn, String tableName, String indexName, int numOfRowsToInsert)
throws Exception {
String ddl = "CREATE TABLE " + TABLE_NAME + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
INDEX_DDL =
"CREATE " + " INDEX IF NOT EXISTS " + SchemaUtil.getTableNameFromFullName(indexName)
+ " ON " + tableName + " (long_pk, varchar_pk)"
+ " INCLUDE (long_col1, long_col2) ";

conn.createStatement().execute(ddl);
conn.createStatement().execute(INDEX_DDL);
upsertRows(conn, tableName, numOfRowsToInsert);
conn.commit();

TestUtil.waitForIndexState(conn, indexName, PIndexState.ACTIVE);
}

@Test
public void testGlobalImmutableIndexCreate() throws Exception {
if (localIndex || transactionProvider != null) {
return;
}
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
String tableName = "TBL_" + generateUniqueName();
String indexName = "IND_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
TABLE_NAME = fullTableName;
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
int numRows = 1;
createAndPopulateTableAndIndexForConsistentIndex(conn, fullTableName, fullIndexName,
numRows);

ResultSet rs;
rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME);
assertTrue(rs.next());
assertEquals(numRows, rs.getInt(1));
rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
assertTrue(rs.next());
assertEquals(numRows, rs.getInt(1));
assertEquals(true, verifyRowsForEmptyColValue(conn, fullIndexName, IndexRegionObserver.VERIFIED_BYTES));

// Now try to fail Phase1 and observe that index state is not DISABLED
try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
admin.disableTable(TableName.valueOf(fullIndexName));
boolean isWriteOnDisabledIndexFailed = false;
try {
upsertRows(conn, fullTableName, numRows);
} catch (CommitException ex) {
isWriteOnDisabledIndexFailed = true;
}
assertEquals(true, isWriteOnDisabledIndexFailed);
PIndexState indexState = TestUtil.getIndexState(conn, fullIndexName);
assertEquals(PIndexState.ACTIVE, indexState);
}
}
}

@Test
public void testGlobalImmutableIndexDelete() throws Exception {
if (localIndex || transactionProvider != null) {
return;
}
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
String tableName = "TBL_" + generateUniqueName();
String indexName = "IND_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
TABLE_NAME = fullTableName;
try (Connection conn = DriverManager.getConnection(getUrl(), props);
Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
conn.setAutoCommit(true);
int numRows = 2;
createAndPopulateTableAndIndexForConsistentIndex(conn, fullTableName, fullIndexName, numRows);

String dml = "DELETE from " + fullTableName + " WHERE varchar_pk='varchar1'";
conn.createStatement().execute(dml);
conn.commit();
ResultSet rs;
rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME);
assertTrue(rs.next());
assertEquals(numRows - 1, rs.getInt(1));
rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
assertTrue(rs.next());
assertEquals(numRows - 1, rs.getInt(1));

// Force delete to fail (data removed but operation failed) on data table and check index table row remains as unverified
TestUtil.addCoprocessor(conn, fullTableName, DeleteFailingRegionObserver.class);
dml = "DELETE from " + fullTableName + " WHERE varchar_pk='varchar2'";
boolean isDeleteFailed = false;
try {
conn.createStatement().execute(dml);
} catch (Exception ex) {
isDeleteFailed = true;
}
assertEquals(true, isDeleteFailed);
TestUtil.removeCoprocessor(conn, fullTableName, DeleteFailingRegionObserver.class);
assertEquals(numRows - 1, getRawRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
.getTable(Bytes.toBytes(fullIndexName)), false));
assertEquals(true, verifyRowsForEmptyColValue(conn, fullIndexName, IndexRegionObserver.UNVERIFIED_BYTES));

// Now delete via hbase, read from unverified index and see that we don't get any data
admin.disableTable(TableName.valueOf(fullTableName));
admin.truncateTable(TableName.valueOf(fullTableName), true);
String selectFromIndex = "SELECT long_pk, varchar_pk, long_col1 FROM " + TABLE_NAME + " WHERE varchar_pk='varchar2' AND long_pk=2";
rs =
conn.createStatement().executeQuery(
"EXPLAIN " + selectFromIndex);
String actualExplainPlan = QueryUtil.getExplainPlan(rs);
assertExplainPlan(false, actualExplainPlan, fullTableName, fullIndexName);

rs = conn.createStatement().executeQuery(selectFromIndex);
assertFalse(rs.next());
}
}

public static class DeleteFailingRegionObserver extends SimpleRegionObserver {
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws
IOException {
throw new DoNotRetryIOException();
}
}

public static boolean verifyRowsForEmptyColValue(Connection conn, String tableName, byte[] valueBytes)
throws IOException, SQLException {
PTable table = PhoenixRuntime.getTable(conn, tableName);
byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(table.getPhysicalName().getBytes());
Scan scan = new Scan();
scan.addColumn(emptyCF, emptyCQ);
ResultScanner resultScanner = htable.getScanner(scan);

for (Result result = resultScanner.next(); result != null; result = resultScanner.next()) {
if (Bytes.compareTo(result.getValue(emptyCF, emptyCQ), 0, valueBytes.length,
valueBytes, 0, valueBytes.length) != 0) {
return false;
}
}
return true;
}


// This test is know to flap. We need PHOENIX-2582 to be fixed before enabling this back.
@Ignore
Expand Down
Loading

0 comments on commit 1c6679f

Please sign in to comment.