Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.end2end.IndexToolIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
Expand Down Expand Up @@ -66,6 +67,7 @@ public GlobalIndexCheckerIT(boolean async, boolean encoded) {
@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}

Expand Down Expand Up @@ -318,18 +320,28 @@ public void testOnePhaseOverwrite() throws Exception {
// update phase) and check that this does not impact the correctness (one overwrite)
IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
conn.createStatement().execute("upsert into " + dataTableName + " (id, val2, val3) values ('a', 'abcc', 'abccc')");
commitWithException(conn);
IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE val1 = 'ab'";
// Read only one column and verify that this is sufficient for the read repair to fix
// all the columns of the unverified index row that was generated due to doing only one phase write above
String selectSql = "SELECT val2 from " + dataTableName + " WHERE val1 = 'ab'";
// Verify that we will read from the first index table
assertExplainPlan(conn, selectSql, dataTableName, indexTableName + "1");
// Verify that one phase write has no effect
ResultSet rs = conn.createStatement().executeQuery(selectSql);
assertTrue(rs.next());
assertEquals("abc", rs.getString(1));
assertEquals("abcd", rs.getString(2));
assertFalse(rs.next());
// Now read the other column and verify that it is also fixed
selectSql = "SELECT val3 from " + dataTableName + " WHERE val1 = 'ab'";
// Verify that we will read from the first index table
assertExplainPlan(conn, selectSql, dataTableName, indexTableName + "1");
// Verify that one phase write has no effect
rs = conn.createStatement().executeQuery(selectSql);
assertTrue(rs.next());
assertEquals("abcd", rs.getString(1));
assertFalse(rs.next());
selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE val2 = 'abcc'";
// Verify that we will read from the second index table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,12 @@ public int getValue() {
* and used to verify individual rows and rebuild them if they are not valid
*/
private class GlobalIndexScanner implements RegionScanner {
RegionScanner scanner;
private RegionScanner scanner;
private RegionScanner deleteRowScanner;
private long ageThreshold;
private Scan scan;
private Scan indexScan;
private Scan deleteRowScan;
private Scan singleRowIndexScan;
private Scan buildIndexScan = null;
private Table dataHTable = null;
Expand Down Expand Up @@ -245,6 +247,14 @@ private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, long ts,
if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
Delete del = new Delete(indexRowKey, ts);
if (specific) {
// Get all the cells of this row
deleteRowScan.withStartRow(indexRowKey, true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm...if you knew for sure what the CFs were you could probbaly do addFamilyVersion in a loop instead, possibly saving yourself a Scan. You wouldn't have to worry about dynamic columns (can't be indexed), but you'd need a PTable for getColumnFamilies, and to know for sure it was up to date as of the timestamp you're deleting.

I'm going to approve as-is, just thinking out loud here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is safer to retrieve the columns to find out their timestamps and delete markers for these timestamps, at least for now. We can think about your suggestion in future.

deleteRowScan.withStopRow(indexRowKey, true);
deleteRowScan.setTimeRange(0, ts + 1);
deleteRowScanner = region.getScanner(deleteRowScan);
row.clear();
deleteRowScanner.next(row);
deleteRowScanner.close();
// We are deleting a specific version of a row so the flowing loop is for that
for (Cell cell : row) {
del.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp());
Expand All @@ -260,6 +270,7 @@ private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row) throws
if (buildIndexScan == null) {
buildIndexScan = new Scan();
indexScan = new Scan(scan);
deleteRowScan = new Scan();
singleRowIndexScan = new Scan(scan);
byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
byte[] indexTableName = region.getRegionInfo().getTable().getName();
Expand Down