Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHOENIX-5211 Consistent Immutable Global Indexes for Non-Transactiona… #517

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@
*/
package org.apache.phoenix.end2end;

import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -34,21 +31,18 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
Expand All @@ -60,6 +54,7 @@
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;

Expand All @@ -83,7 +78,6 @@ public IndexToolForPartialBuildIT() {
public static Map<String, String> getServerProperties() {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
Expand All @@ -105,14 +99,14 @@ public void testSecondaryIndex() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
final String indxTable = String.format("%s_IDX", dataTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
final Connection conn = DriverManager.getConnection(getUrl(), props);
Statement stmt = conn.createStatement();
try {
try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();){
if (isNamespaceEnabled) {
conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
}
Expand All @@ -121,25 +115,36 @@ public void testSecondaryIndex() throws Exception {
fullTableName, tableDDLOptions));
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
FailingRegionObserver.FAIL_WRITE = false;
// insert two rows
upsertRow(stmt1, 1000);
upsertRow(stmt1, 2000);

conn.commit();
stmt.execute(String.format("CREATE INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ", indxTable, fullTableName));
FailingRegionObserver.FAIL_WRITE = true;
upsertRow(stmt1, 3000);
upsertRow(stmt1, 4000);
upsertRow(stmt1, 5000);
try {
conn.commit();
fail();
} catch (SQLException e) {} catch (Exception e) {}
conn.commit();

// delete these indexes
PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
try (Table hTable = admin.getConnection().
getTable(TableName.valueOf(pindexTable.getPhysicalName().toString()));) {
Scan scan = new Scan();
ResultScanner scanner = hTable.getScanner(scan);
int cnt = 0;
for (Result res = scanner.next(); res != null; res = scanner.next()) {
cnt++;
if (cnt > 2) {
hTable.delete(new Delete(res.getRow()));
}
}
}
TestUtil.doMajorCompaction(conn, pindexTable.getPhysicalName().toString());

conn.createStatement()
.execute(String.format("ALTER INDEX %s on %s REBUILD ASYNC", indxTable, fullTableName));

FailingRegionObserver.FAIL_WRITE = false;
ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
Expand All @@ -150,21 +155,6 @@ public void testSecondaryIndex() throws Exception {
upsertRow(stmt1, 6000);
upsertRow(stmt1, 7000);
conn.commit();

rs = conn.createStatement()
.executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + ","
+ PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
+"\""+ SYSTEM_CATALOG_SCHEMA + "\"." + SYSTEM_CATALOG_TABLE + " ("
+ PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
+ PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
+ PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'"));
rs.next();
PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
assertEquals(PIndexState.BUILDING, pindexTable.getIndexState());
assertEquals(rs.getLong(1), pindexTable.getTimeStamp());

//assert disabled timestamp
assertEquals(0, rs.getLong(2));

String selectSql = String.format("SELECT LPAD(UPPER(NAME),11,'x')||'_xyz',ID FROM %s", fullTableName);
rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
Expand All @@ -173,7 +163,7 @@ public void testSecondaryIndex() throws Exception {
// assert we are pulling from data table.
assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, isNamespaceEnabled);

rs = stmt1.executeQuery(selectSql);
rs = stmt1.executeQuery(selectSql);
for (int i = 1; i <= 7; i++) {
assertTrue(rs.next());
assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
Expand Down Expand Up @@ -260,24 +250,4 @@ public static void upsertRow(PreparedStatement stmt, int i) throws SQLException
stmt.executeUpdate();
}


public static class FailingRegionObserver extends SimpleRegionObserver {
public static volatile boolean FAIL_WRITE = false;
public static final String INDEX_NAME = "IDX";
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
throw new DoNotRetryIOException();
}
Mutation operation = miniBatchOp.getOperation(0);
Set<byte[]> keySet = operation.getFamilyCellMap().keySet();
for(byte[] family: keySet) {
if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
throw new DoNotRetryIOException();
}
}
}

}

}
Loading