PHOENIX-5211 Consistent Immutable Global Indexes for Non-Transactional Tables
gokceni committed Jun 27, 2019
1 parent c4d75dd commit 2fb57ee
Showing 9 changed files with 571 additions and 281 deletions.
@@ -17,13 +17,10 @@
 */
package org.apache.phoenix.end2end;

-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;

import java.sql.Connection;
import java.sql.DriverManager;
@@ -34,21 +31,18 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;

import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
@@ -60,6 +54,7 @@
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;

@@ -83,7 +78,6 @@ public IndexToolForPartialBuildIT() {
public static Map<String, String> getServerProperties() {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
@@ -105,14 +99,14 @@ public void testSecondaryIndex() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
-final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
+final String indxTable = String.format("%s_IDX", dataTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
final Connection conn = DriverManager.getConnection(getUrl(), props);
Statement stmt = conn.createStatement();
-try {
+try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();){
if (isNamespaceEnabled) {
conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
}
@@ -121,25 +115,37 @@ public void testSecondaryIndex() throws Exception {
fullTableName, tableDDLOptions));
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-FailingRegionObserver.FAIL_WRITE = false;
// insert two rows
upsertRow(stmt1, 1000);
upsertRow(stmt1, 2000);

conn.commit();
stmt.execute(String.format("CREATE INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ", indxTable, fullTableName));
-FailingRegionObserver.FAIL_WRITE = true;
upsertRow(stmt1, 3000);
upsertRow(stmt1, 4000);
upsertRow(stmt1, 5000);
-try {
-conn.commit();
-fail();
-} catch (SQLException e) {} catch (Exception e) {}
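+// With the FailingRegionObserver gone, this commit is expected to succeed,
+// so rows 3000-5000 are written to both the data table and the index.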
+conn.commit();
+
+// Delete all but the first two index rows directly through the HBase client,
+// leaving the index table only partially built relative to the data table.
+PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
+Table hTable = admin.getConnection().
+getTable(TableName.valueOf(pindexTable.getPhysicalName().toString()));
+
+Scan scan = new Scan();
+ResultScanner scanner = hTable.getScanner(scan);
+int cnt=0;
+for (Result res = scanner.next(); res != null; res = scanner.next()) {
+cnt++;
+if (cnt > 2) {
+hTable.delete(new Delete(res.getRow()));
+}
+}
+hTable.close();
+// Major compaction physically removes the deleted cells before the rebuild.
+TestUtil.doMajorCompaction(conn, pindexTable.getPhysicalName().toString());
+
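+// REBUILD ASYNC puts the index into the BUILDING state so that the IndexTool
+// partial-build job, run later in this test, can restore the deleted rows.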
conn.createStatement()
.execute(String.format("ALTER INDEX %s on %s REBUILD ASYNC", indxTable, fullTableName));

-FailingRegionObserver.FAIL_WRITE = false;
ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
@@ -150,21 +156,6 @@ public void testSecondaryIndex() throws Exception {
upsertRow(stmt1, 6000);
upsertRow(stmt1, 7000);
conn.commit();
-
-rs = conn.createStatement()
-.executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + ","
-+ PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
-+"\""+ SYSTEM_CATALOG_SCHEMA + "\"." + SYSTEM_CATALOG_TABLE + " ("
-+ PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
-+ PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
-+ PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'"));
-rs.next();
-PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
-assertEquals(PIndexState.BUILDING, pindexTable.getIndexState());
-assertEquals(rs.getLong(1), pindexTable.getTimeStamp());
-
-//assert disabled timestamp
-assertEquals(0, rs.getLong(2));

String selectSql = String.format("SELECT LPAD(UPPER(NAME),11,'x')||'_xyz',ID FROM %s", fullTableName);
rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
@@ -173,7 +164,7 @@ public void testSecondaryIndex() throws Exception {
// assert we are pulling from data table.
assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, isNamespaceEnabled);

-rs = stmt1.executeQuery(selectSql);
+rs = stmt1.executeQuery(selectSql);
for (int i = 1; i <= 7; i++) {
assertTrue(rs.next());
assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
@@ -260,24 +251,4 @@ public static void upsertRow(PreparedStatement stmt, int i) throws SQLException
stmt.executeUpdate();
}

-
-public static class FailingRegionObserver extends SimpleRegionObserver {
-public static volatile boolean FAIL_WRITE = false;
-public static final String INDEX_NAME = "IDX";
-@Override
-public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
-if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
-throw new DoNotRetryIOException();
-}
-Mutation operation = miniBatchOp.getOperation(0);
-Set<byte[]> keySet = operation.getFamilyCellMap().keySet();
-for(byte[] family: keySet) {
-if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
-throw new DoNotRetryIOException();
-}
-}
-}
-
-}
-
}