Skip to content

Commit

Permalink
PHOENIX-2635 Partial index rebuild doesn't work for mutable data
Browse files Browse the repository at this point in the history
  • Loading branch information
jtaylor-sfdc committed Feb 15, 2016
1 parent e2a6386 commit 046bda3
Show file tree
Hide file tree
Showing 22 changed files with 368 additions and 385 deletions.

Large diffs are not rendered by default.

Expand Up @@ -27,19 +27,20 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.exception.SQLExceptionCode;
Expand All @@ -56,6 +57,9 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import com.google.common.collect.Maps;
/**
Expand All @@ -69,28 +73,37 @@
*/

@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
private static final String FAIL_ON_FIRST_PUT = "bbb";
public static volatile boolean FAIL_WRITE = false;
public static final String INDEX_NAME = "IDX";

private String tableName;
private String indexName;
private String fullTableName;
private String fullIndexName;
private final boolean localIndex;

public ReadOnlyIndexFailureIT() {
this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME;
this.indexName = "IDX";
public ReadOnlyIndexFailureIT(boolean localIndex) {
this.localIndex = localIndex;
this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME;
this.indexName = INDEX_NAME;
this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
}

@Parameters(name = "localIndex = {0}")
public static Collection<Boolean[]> data() {
return Arrays.asList(new Boolean[][] { { false }, { true } });
}

@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
serverProps.put("hbase.client.retries.number", "2");
serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
serverProps.put("hbase.client.pause", "5000");
serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, "0");
serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, "true");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
Expand All @@ -104,17 +117,12 @@ public static void doSetup() throws Exception {
new ReadOnlyProps(clientProps.entrySet().iterator()));
}

@Test
public void testWriteFailureReadOnlyLocalIndex() throws Exception {
helpTestWriteFailureReadOnlyIndex(true);
}

@Test
public void testWriteFailureReadOnlyIndex() throws Exception {
helpTestWriteFailureReadOnlyIndex(false);
helpTestWriteFailureReadOnlyIndex();
}

public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Exception {
public void helpTestWriteFailureReadOnlyIndex() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = driver.connect(url, props)) {
String query;
Expand All @@ -126,6 +134,7 @@ public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Excepti
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());

FAIL_WRITE = false;
if(localIndex) {
conn.createStatement().execute(
"CREATE LOCAL INDEX " + indexName + " ON " + fullTableName
Expand Down Expand Up @@ -157,9 +166,10 @@ public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Excepti
stmt.execute();
conn.commit();

FAIL_WRITE = true;
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1, "2");
stmt.setString(2, FAIL_ON_FIRST_PUT);
stmt.setString(2, "bbb");
stmt.setString(3, "b2");
stmt.execute();
try {
Expand Down Expand Up @@ -201,6 +211,7 @@ public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Excepti
assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), e.getErrorCode());
}

FAIL_WRITE = false;
// Second attempt at writing will succeed
int retries = 0;
do {
Expand All @@ -222,12 +233,12 @@ public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Excepti
conn.commit();

// verify index table has data
query = "SELECT count(1) FROM " + indexName;
query = "SELECT count(1) FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));

query = "SELECT v1 FROM " + fullTableName;
query = "SELECT /*+ INDEX(" + indexName + ") */ v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("aaa", rs.getString(1));
Expand Down Expand Up @@ -261,29 +272,13 @@ private static boolean hasIndexDisableTimestamp(Connection conn, String indexNam
return (!rs.wasNull() && ts > 0);
}


public static class FailingRegionObserver extends SimpleRegionObserver {
private Integer failCount = new Integer(0);

@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
final Durability durability) throws HBaseIOException {
if (shouldFailUpsert(c, put)) {
synchronized (failCount) {
failCount++;
if (failCount.intValue() == 1) {
// throwing anything other than instances of IOException result
// in this coprocessor being unloaded
// DoNotRetryIOException tells HBase not to retry this mutation
// multiple times
throw new DoNotRetryIOException();
}
}
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
throw new DoNotRetryIOException();
}
}

private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
return Bytes.contains(put.getRow(), Bytes.toBytes(FAIL_ON_FIRST_PUT));
}

}
}
Expand Up @@ -148,7 +148,7 @@ public ListMatchingVerifier(String msg, List<Cell> kvs, ColumnReference... colum
public void verify(TableState state) {
try {
Scanner kvs =
((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns)).getFirst();
((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns), false).getFirst();

int count = 0;
Cell kv;
Expand Down
Expand Up @@ -58,10 +58,10 @@
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;

import co.cask.tephra.Transaction;

import com.google.common.collect.ImmutableList;

import co.cask.tephra.Transaction;


abstract public class BaseScannerRegionObserver extends BaseRegionObserver {

Expand Down Expand Up @@ -98,6 +98,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String SKIP_REGION_BOUNDARY_CHECK = "_SKIP_REGION_BOUNDARY_CHECK";
public static final String TX_SCN = "_TxScn";
public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
public static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS";

/**
* Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
Expand Down

0 comments on commit 046bda3

Please sign in to comment.