From a37980c01305843ae7436d6f086012f316f924b6 Mon Sep 17 00:00:00 2001 From: Geoffrey Jacoby Date: Sun, 12 Jan 2020 16:32:48 -0800 Subject: [PATCH] =?UTF-8?q?PHOENIX-5645=20-=20GlobalIndexChecker=20should?= =?UTF-8?q?=20prevent=20compaction=20from=20purg=E2=80=A6=20(#662)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PHOENIX-5645 - GlobalIndexChecker should prevent compaction from purging very recently deleted cells --- .../apache/phoenix/end2end/MaxLookbackIT.java | 350 ++++++++++++++++++ .../org/apache/phoenix/end2end/SCNIT.java | 72 +++- .../end2end/index/GlobalIndexCheckerIT.java | 68 ++-- .../hbase/regionserver/ScanInfoUtil.java | 86 ++++- .../apache/phoenix/compile/QueryCompiler.java | 51 +++ .../BaseScannerRegionObserver.java | 121 +++++- .../UngroupedAggregateRegionObserver.java | 9 +- .../phoenix/exception/SQLExceptionCode.java | 4 + .../phoenix/index/GlobalIndexChecker.java | 1 + .../apache/phoenix/util/TransactionUtil.java | 2 +- .../org/apache/phoenix/util/TestUtil.java | 136 ++++++- 11 files changed, 840 insertions(+), 60 deletions(-) create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java new file mode 100644 index 00000000000..9215b445523 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + + +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.ScanInfoUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map; +import java.util.Properties; + +import static org.apache.phoenix.util.TestUtil.assertRawCellCount; +import static org.apache.phoenix.util.TestUtil.assertRawRowCount; +import static org.apache.phoenix.util.TestUtil.assertRowExistsAtSCN; +import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN; +import static org.apache.phoenix.util.TestUtil.assertTableHasTtl; +import static org.apache.phoenix.util.TestUtil.assertTableHasVersions; + +@NeedsOwnMiniClusterTest +public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT { + private static final Log LOG = LogFactory.getLog(MaxLookbackIT.class); + private static final int MAX_LOOKBACK_AGE = 10; + private static final int ROWS_POPULATED = 2; + private String tableDDLOptions; + private StringBuilder optionBuilder; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(1); + props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); + props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest(){ + optionBuilder = new StringBuilder(); + this.tableDDLOptions = optionBuilder.toString(); + } + + @Test + public void testTooLowSCNWithMaxLookbackAge() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String indexStem = generateUniqueName(); + createTableAndIndexes(conn, dataTableName, indexStem); + //need to sleep long enough for the SCN to still find the syscat row for the table + Thread.sleep(MAX_LOOKBACK_AGE * 1000 + 1000); + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, + Long.toString(EnvironmentEdgeManager.currentTime() - (MAX_LOOKBACK_AGE + 1) * 1000)); + try (Connection connscn = DriverManager.getConnection(getUrl(), props)) { + connscn.createStatement().executeQuery("select * from " + dataTableName); + } catch (SQLException se) { + SQLExceptionCode code = + SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE; + TestUtil.assertSqlExceptionCode(code, se); + return; + } + } + Assert.fail("We should have thrown an exception for the too-early SCN"); + } + + @Test(timeout=120000L) + public void testRecentlyDeletedRowsNotCompactedAway() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String indexStem = generateUniqueName(); + createTableAndIndexes(conn, dataTableName, indexStem); + String fullIndexName = indexStem + "1"; + TableName dataTable = TableName.valueOf(dataTableName); + TableName indexTable = TableName.valueOf(fullIndexName); + assertRawRowCount(conn, indexTable, ROWS_POPULATED); + assertTableHasTtl(conn, indexTable, Integer.MAX_VALUE); + long beforeDeleteSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis(); + Thread.sleep(1); //make sure we delete at a different ts + Statement stmt = conn.createStatement(); + stmt.execute("DELETE FROM " + dataTableName + " WHERE " + " id = 'a'"); + Assert.assertEquals(1, stmt.getUpdateCount()); + conn.commit(); + //select stmt to get row we deleted + String sql = String.format("SELECT * FROM %s WHERE val1 = 'ab'", dataTableName); + assertExplainPlan(conn, sql, dataTableName, fullIndexName); + int rowsPlusDeleteMarker = ROWS_POPULATED; + assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker); + assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true); + flush(dataTable); + flush(indexTable); + assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker); + assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true); + long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTime(); + Thread.sleep(1); + majorCompact(indexTable, beforeFirstCompactSCN); + assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker); + assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true); + //wait for the lookback time. After this compactions should purge the deleted row + Thread.sleep(MAX_LOOKBACK_AGE * 1000); + long beforeSecondCompactSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis(); + String notDeletedRowSql = + String.format("SELECT * FROM %s WHERE val1 = 'bc'", dataTableName); + assertExplainPlan(conn, notDeletedRowSql, dataTableName, fullIndexName); + assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true); + assertRawRowCount(conn, indexTable, ROWS_POPULATED); + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + conn.createStatement().execute("upsert into " + dataTableName + + " values ('c', 'cd', 'cde', 'cdef')"); + conn.commit(); + majorCompact(indexTable, beforeSecondCompactSCN); + majorCompact(dataTable, beforeSecondCompactSCN); + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + //deleted row should be gone, but not deleted row should still be there. + assertRowExistsAtSCN(getUrl(), sql, beforeSecondCompactSCN, false); + assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true); + //1 deleted row should be gone + assertRawRowCount(conn, indexTable, ROWS_POPULATED); + } + } + + @Test(timeout=60000L) + public void testTTLAndMaxLookbackAge() throws Exception { + int ttl = 10; + optionBuilder.append("TTL=" + ttl); + tableDDLOptions = optionBuilder.toString(); + Configuration conf = getUtility().getConfiguration(); + //disable automatic memstore flushes + long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, + HRegion.DEFAULT_CACHE_FLUSH_INTERVAL); + conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0L); + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String indexStem = generateUniqueName(); + createTableAndIndexes(conn, dataTableName, indexStem); + long afterFirstInsertSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis(); + TableName dataTable = TableName.valueOf(dataTableName); + assertTableHasTtl(conn, dataTable, ttl); + String fullIndexName = indexStem + "1"; + TableName indexTable = TableName.valueOf(fullIndexName); + assertTableHasTtl(conn, indexTable, ttl); + + //first make sure we inserted correctly + String sql = String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName); + assertExplainPlan(conn, sql, dataTableName, fullIndexName); + assertRowExistsAtSCN(getUrl(),sql, afterFirstInsertSCN, true); + int originalRowCount = 2; + assertRawRowCount(conn, indexTable, originalRowCount); + //force a flush + flush(indexTable); + //flush shouldn't have changed it + assertRawRowCount(conn, indexTable, originalRowCount); + //now wait the TTL + Thread.sleep((ttl +1) * 1000); + long afterTTLExpiresSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis(); + assertExplainPlan(conn, sql, dataTableName, fullIndexName); + //make sure we can't see it after expiration from masking + assertRowExistsAtSCN(getUrl(), sql, afterTTLExpiresSCN, false); + //but it's still on disk + assertRawRowCount(conn, indexTable, originalRowCount); + long beforeMajorCompactSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis(); + majorCompact(indexTable, beforeMajorCompactSCN); + assertRawRowCount(conn, indexTable, 0); + } finally{ + conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, oldMemstoreFlushInterval); + } + } + + @Test + public void testRecentMaxVersionsNotCompactedAway() throws Exception { + int versions = 2; + optionBuilder.append("VERSIONS=" + versions); + tableDDLOptions = optionBuilder.toString(); + String firstValue = "abc"; + String secondValue = "def"; + String thirdValue = "ghi"; + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String indexStem = generateUniqueName(); + createTableAndIndexes(conn, dataTableName, indexStem, versions); + long afterInsertSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis(); + //make sure table and index metadata is set up right for versions + TableName dataTable = TableName.valueOf(dataTableName); + assertTableHasVersions(conn, dataTable, versions); + String fullIndexName = indexStem + "1"; + TableName indexTable = TableName.valueOf(fullIndexName); + assertTableHasVersions(conn, indexTable, versions); + //check query optimizer is doing what we expect + String dataTableSelectSql = + String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName); + String indexTableSelectSql = + String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName); + assertExplainPlan(conn, indexTableSelectSql, dataTableName, fullIndexName); + //make sure the data was inserted correctly in the first place + assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, afterInsertSCN, firstValue); + assertRowHasExpectedValueAtSCN(getUrl(), indexTableSelectSql, afterInsertSCN, firstValue); + //force first update to get a distinct ts + Thread.sleep(1); + updateColumn(conn, dataTableName, "id", "a", "val2", secondValue); + long afterFirstUpdateSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis(); + //force second update to get a distinct ts + Thread.sleep(1); + updateColumn(conn, dataTableName, "id", "a", "val2", thirdValue); + long afterSecondUpdateSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis(); + //check to make sure we can see all three versions at the appropriate times + String[] allValues = {firstValue, secondValue, thirdValue}; + long[] allSCNs = {afterInsertSCN, afterFirstUpdateSCN, afterSecondUpdateSCN}; + assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs); + assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs); + + flush(dataTable); + flush(indexTable); + //after flush, check to make sure we can see all three versions at the appropriate times + assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs); + assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs); + majorCompact(dataTable, afterSecondUpdateSCN); + majorCompact(indexTable, afterSecondUpdateSCN); + //after major compaction, check to make sure we can see all three versions + // at the appropriate times + assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs); + assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs); + Thread.sleep(MAX_LOOKBACK_AGE * 1000); + long afterLookbackAgeSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis(); + majorCompact(dataTable, afterLookbackAgeSCN); + majorCompact(indexTable, afterLookbackAgeSCN); + //empty column, 1 version of val 1, 3 versions of val2, 1 version of val3 = 6 + assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6); + //2 versions of empty column, 2 versions of val2, + // 2 versions of val3 (since we write whole rows to index) = 6 + assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 6); + //empty column + 1 version each of val1,2 and 3 = 4 + assertRawCellCount(conn, dataTable, Bytes.toBytes("b"), 4); + //1 version of empty column, 1 version of val2, 1 version of val3 = 3 + assertRawCellCount(conn, indexTable, Bytes.toBytes("bc\u0000b"), 3); + } + } + + private void flush(TableName table) throws IOException { + Admin admin = getUtility().getHBaseAdmin(); + admin.flush(table); + } + + private void majorCompact(TableName table, long compactionRequestedSCN) throws Exception { + Admin admin = getUtility().getHBaseAdmin(); + admin.majorCompact(table); + long lastCompactionTimestamp; + AdminProtos.GetRegionInfoResponse.CompactionState state = null; + while ((lastCompactionTimestamp = admin.getLastMajorCompactionTimestamp(table)) < compactionRequestedSCN + || (state = admin.getCompactionState(table)). + equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)){ + if (LOG.isTraceEnabled()) { + LOG.trace("Last compaction time:" + lastCompactionTimestamp); + LOG.trace("CompactionState: " + state); + } + Thread.sleep(100); + } + } + + public static void assertExplainPlan(Connection conn, String selectSql, + String dataTableFullName, String indexTableFullName) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + String actualExplainPlan = QueryUtil.getExplainPlan(rs); + IndexToolIT.assertExplainPlan(false, actualExplainPlan, dataTableFullName, indexTableFullName); + } + + private void assertMultiVersionLookbacks(String dataTableSelectSql, + String[] values, long[] scns) + throws Exception { + //make sure we can still look back after updating + for (int k = 0; k < values.length; k++){ + assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, scns[k], values[k]); + } + } + + private void updateColumn(Connection conn, String dataTableName, + String idColumn, String id, String valueColumn, String value) + throws SQLException { + String upsertSql = String.format("UPSERT INTO %s (%s, %s) VALUES ('%s', '%s')", + dataTableName, idColumn, valueColumn, id, value); + conn.createStatement().execute(upsertSql); + conn.commit(); + } + + private void createTableAndIndexes(Connection conn, String dataTableName, + String indexTableName) throws Exception { + createTableAndIndexes(conn, dataTableName, indexTableName, 1); + } + + private void createTableAndIndexes(Connection conn, String dataTableName, + String indexTableName, int indexVersions) throws Exception { + populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') + conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " + + dataTableName + " (val1) include (val2, val3)" + + " VERSIONS=" + indexVersions); + conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " + + dataTableName + " (val2) include (val1, val3)" + + " VERSIONS=" + indexVersions); + conn.commit(); + } + + private void populateTable(String tableName) throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + String createSql = "create table " + tableName + + " (id varchar(10) not null primary key, val1 varchar(10), " + + "val2 varchar(10), val3 varchar(10))" + tableDDLOptions; + conn.createStatement().execute(createSql); + conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + conn.close(); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java index 6c45b066cc4..d79752e10ff 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java @@ -24,11 +24,17 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.ScanInfoUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SchemaUtil; +import org.junit.Assert; import org.junit.Test; public class SCNIT extends ParallelStatsDisabledIT { @@ -67,7 +73,7 @@ public void testReadBeforeDelete() throws Exception { rs.close(); } props.clear(); - props.setProperty("CurrentSCN", Long.toString(timeAfterDelete)); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timeAfterDelete)); try (Connection connscn = DriverManager.getConnection(getUrl(), props)) { ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName); assertTrue(rs.next()); @@ -82,28 +88,68 @@ public void testReadBeforeDelete() throws Exception { @Test public void testSCNWithTTL() throws Exception { + int ttl = 2; + String fullTableName = createTableWithTTL(ttl); + //sleep for one second longer than ttl + Thread.sleep(ttl * 1000 + 1000); + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, + Long.toString(EnvironmentEdgeManager.currentTime() - 1000)); + try (Connection connscn = DriverManager.getConnection(getUrl(), props)) { + ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName); + assertFalse(rs.next()); + rs.close(); + } + } + + @Test + public void testTooLowSCNWithTTL() throws Exception { + //if scn is for an older time than a table's ttl, it should throw a SQLException + int ttl = 2; + String fullTableName = createTableWithTTL(ttl); + int sleepTime = (ttl + 1)* 1000; + //need to sleep long enough for the SCN to still find the syscat row for the table + Thread.sleep(sleepTime); + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, + Long.toString(EnvironmentEdgeManager.currentTime() - sleepTime)); + try (Connection connscn = DriverManager.getConnection(getUrl(), props)) { + connscn.createStatement(). + executeQuery(String.format("select * from %s", fullTableName)); + } catch (SQLException se){ + SQLExceptionCode code = SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL; + assertSqlExceptionCode(code, se); + return; + } + Assert.fail("We should have thrown an exception for the too-early SCN"); + } + + private void assertSqlExceptionCode(SQLExceptionCode code, SQLException se) { + assertEquals(code.getErrorCode(), se.getErrorCode()); + assertTrue("Wrong error message", se.getMessage().contains(code.getMessage())); + assertEquals(code.getSQLState(), se.getSQLState()); + } + + private String createTableWithTTL(int ttl) throws SQLException, InterruptedException { String schemaName = generateUniqueName(); String tableName = generateUniqueName(); + StringBuilder optionsBuilder = new StringBuilder(); + if (ttl > 0){ + optionsBuilder.append("TTL="); + optionsBuilder.append(ttl); + } + String ddlOptions = optionsBuilder.toString(); String fullTableName = SchemaUtil.getTableName(schemaName, tableName); try (Connection conn = DriverManager.getConnection(getUrl())) { conn.createStatement() - .execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) TTL=2"); + .execute(String.format("CREATE TABLE %s" + + "(k VARCHAR PRIMARY KEY, v VARCHAR) %s", fullTableName, ddlOptions)); conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','aa')"); conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')"); conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('c','cc')"); conn.commit(); - // TTL is 2 sec - Thread.sleep(3000); - } - - Properties props = new Properties(); - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, - Long.toString(EnvironmentEdgeManager.currentTime() - 1000)); - try (Connection connscn = DriverManager.getConnection(getUrl(), props)) { - ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName); - assertFalse(rs.next()); - rs.close(); } + return fullTableName; } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java index 58613239759..a8258268252 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java @@ -42,6 +42,7 @@ import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.After; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -53,15 +54,12 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { private static final Log LOG = LogFactory.getLog(GlobalIndexCheckerIT.class); private final boolean async; - private final String tableDDLOptions; - + private String tableDDLOptions; + private StringBuilder optionBuilder; + private final boolean encoded; public GlobalIndexCheckerIT(boolean async, boolean encoded) { this.async = async; - StringBuilder optionBuilder = new StringBuilder(); - if (!encoded) { - optionBuilder.append(" COLUMN_ENCODED_BYTES=0 "); - } - this.tableDDLOptions = optionBuilder.toString(); + this.encoded = encoded; } @BeforeClass @@ -71,6 +69,15 @@ public static synchronized void doSetup() throws Exception { setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } + @Before + public void beforeTest(){ + optionBuilder = new StringBuilder(); + if (!encoded) { + optionBuilder.append(" COLUMN_ENCODED_BYTES=0"); + } + this.tableDDLOptions = optionBuilder.toString(); + } + @Parameters( name = "async={0},encoded={1}") public static synchronized Collection data() { @@ -305,17 +312,8 @@ public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception { public void testOnePhaseOverwrite() throws Exception { try (Connection conn = DriverManager.getConnection(getUrl())) { String dataTableName = generateUniqueName(); - populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') String indexTableName = generateUniqueName(); - conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " + - dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "")); - conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " + - dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : "")); - if (async) { - // run the index MR job. - IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1"); - IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2"); - } + createTableAndIndexes(conn, dataTableName, indexTableName); // Configure IndexRegionObserver to skip the last two write phase (i.e., the data table update and post index // update phase) and check that this does not impact the correctness (one overwrite) IndexRegionObserver.setFailDataTableUpdatesForTesting(true); @@ -385,21 +383,35 @@ public void testOnePhaseOverwrite() throws Exception { } } + private void createTableAndIndexes(Connection conn, String dataTableName, + String indexTableName) throws Exception { + createTableAndIndexes(conn, dataTableName, indexTableName, 1); + } + + private void createTableAndIndexes(Connection conn, String dataTableName, + String indexTableName, int indexVersions) throws Exception { + populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') + conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " + + dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "") + + " VERSIONS=" + indexVersions); + conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " + + dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : "")+ + " VERSIONS=" + indexVersions); + conn.commit(); + if (async) { + // run the index MR job. + IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1"); + IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2"); + } + } + @Test public void testFailDataTableAndPostIndexRowUpdate() throws Exception { - String dataTableName = generateUniqueName(); - populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); String indexName = generateUniqueName(); - conn.createStatement().execute("CREATE INDEX " + indexName + "1 on " + - dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "")); - conn.createStatement().execute("CREATE INDEX " + indexName + "2 on " + - dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : "")); - if (async) { - // run the index MR job. - IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "1"); - IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "2"); - } + createTableAndIndexes(conn, dataTableName, indexName); // Configure IndexRegionObserver to fail the last two write phase (i.e., the data table update and post index update phase) // and check that this does not impact the correctness IndexRegionObserver.setFailDataTableUpdatesForTesting(true); diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java index e0d62a276d1..6854a7548f4 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java @@ -20,28 +20,108 @@ import java.io.IOException; import java.util.NavigableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.client.Scan; public class ScanInfoUtil { + public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY = + "phoenix.max.lookback.age.seconds"; + public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0; + private ScanInfoUtil() { } - + public static boolean isKeepDeletedCells(ScanInfo scanInfo) { return scanInfo.getKeepDeletedCells() != KeepDeletedCells.FALSE; } - + public static ScanInfo cloneScanInfoWithKeepDeletedCells(ScanInfo scanInfo) { return new ScanInfo(scanInfo.getConfiguration(), scanInfo.getFamily(), scanInfo.getMinVersions(), scanInfo.getMaxVersions(), scanInfo.getTtl(), KeepDeletedCells.TRUE, scanInfo.getTimeToPurgeDeletes(), scanInfo.getComparator()); } - public static StoreScanner createStoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet columns,long readPt) throws IOException { + public static StoreScanner createStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + final NavigableSet columns,long readPt) + throws IOException { if(!scan.isReversed()) { return new StoreScanner(store, scanInfo, scan, columns,readPt); } else { return new ReversedStoreScanner(store, scanInfo, scan, columns,readPt); } } + + public static long getTimeToLiveForCompactions(HColumnDescriptor columnDescriptor, + ScanInfo scanInfo) { + long ttl = scanInfo.getTtl(); + long maxLookbackTtl = getMaxLookback(scanInfo.getConfiguration()); + if (isMaxLookbackTimeEnabled(maxLookbackTtl)) { + if (ttl == Long.MAX_VALUE + && columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) { + // If user configured default TTL(FOREVER) and keep deleted cells to false or + // TTL then to remove unwanted delete markers we should change ttl to max lookback age + ttl = maxLookbackTtl; + } else { + //if there is a TTL, use TTL instead of max lookback age. + // Max lookback age should be more recent or equal to TTL + ttl = Math.max(ttl, maxLookbackTtl); + } + } + + return ttl; + } + + /* + * If KeepDeletedCells.FALSE, KeepDeletedCells.TTL , + * let delete markers age once lookback age is done. + */ + private static KeepDeletedCells getKeepDeletedCells(final Store store, ScanType scanType) { + //if we're doing a minor compaction or flush, always set keep deleted cells + //to true. Otherwise, if keep deleted cells is false or TTL, use KeepDeletedCells TTL, + //where the value of the ttl might be overriden to the max lookback age elsewhere + return (store.getFamily().getKeepDeletedCells() == KeepDeletedCells.TRUE + || scanType.equals(ScanType.COMPACT_RETAIN_DELETES)) ? + KeepDeletedCells.TRUE : KeepDeletedCells.TTL; + } + + /* + * if the user set a TTL we should leave MIN_VERSIONS at the default (0 in most of the cases). + * Otherwise the data (1st version) will not be removed after the TTL. If no TTL, we want + * Math.max(maxVersions, minVersions, 1) + */ + private static int getMinVersions(ScanInfo oldScanInfo, final Store store) { + return oldScanInfo.getTtl() != Long.MAX_VALUE ? store.getFamily().getMinVersions() + : Math.max(Math.max(store.getFamily().getMinVersions(), + store.getFamily().getMaxVersions()),1); + } + + public static ScanInfo getScanInfoForFlushesAndCompactions(Configuration conf, + ScanInfo oldScanInfo, + final Store store, + ScanType type) { + long ttl = getTimeToLiveForCompactions(store.getFamily(), oldScanInfo); + KeepDeletedCells keepDeletedCells = getKeepDeletedCells(store, type); + int minVersions = getMinVersions(oldScanInfo, store); + return new ScanInfo(conf,store.getFamily().getName(), minVersions, + Integer.MAX_VALUE, ttl, keepDeletedCells, + oldScanInfo.getTimeToPurgeDeletes(), + oldScanInfo.getComparator()); + } + + private static long getMaxLookback(Configuration conf){ + //config param is in seconds, switch to millis + return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000; + } + + public static boolean isMaxLookbackTimeEnabled(Configuration conf){ + return isMaxLookbackTimeEnabled(conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + DEFAULT_PHOENIX_MAX_LOOKBACK_AGE)); + } + + private static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){ + return maxLookbackTime > 0L; + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index e149c775214..4722bd645f8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -28,13 +28,18 @@ import java.util.Map; import java.util.Set; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.ScanInfoUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.JoinCompiler.JoinSpec; import org.apache.phoenix.compile.JoinCompiler.JoinTable; import org.apache.phoenix.compile.JoinCompiler.Table; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.ClientAggregatePlan; import org.apache.phoenix.execute.ClientScanPlan; @@ -68,6 +73,7 @@ import org.apache.phoenix.parse.SubqueryParseNode; import org.apache.phoenix.parse.TableNode; import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.AmbiguousColumnException; @@ -76,6 +82,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ScanUtil; @@ -150,6 +157,7 @@ public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnR * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables */ public QueryPlan compile() throws SQLException{ + verifySCN(); QueryPlan plan; if (select.isUnion()) { plan = compileUnionAll(select); @@ -159,6 +167,49 @@ public QueryPlan compile() throws SQLException{ return plan; } + private void verifySCN() throws SQLException { + PhoenixConnection conn = statement.getConnection(); + Long scn = conn.getSCN(); + if (scn == null) { + return; + } + List scnTooOldTableRefs = new ArrayList(); + ColumnResolver resolver = + FromCompiler.getResolverForQuery(select, conn); + List involvedTables = resolver.getTables(); + int maxLookBackAge = conn.getQueryServices(). + getConfiguration().getInt(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + ScanInfoUtil.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE); + long now = EnvironmentEdgeManager.currentTimeMillis(); + if (maxLookBackAge > 0 && now - maxLookBackAge * 1000L > scn){ + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE) + .build().buildException(); + } + for (TableRef tableRef : involvedTables) { + byte[] tableQualifier = tableRef.getTable().getPhysicalName().getBytes(); + //we can have a tableRef with an empty table, such as with sequences + if (tableQualifier.length > 0) { + HTableDescriptor td = conn.getQueryServices().getTableDescriptor(tableQualifier); + HColumnDescriptor cd = td.getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES); + now = EnvironmentEdgeManager.currentTimeMillis(); + if (now - cd.getTimeToLive() * 1000L > scn) { + scnTooOldTableRefs.add(tableRef); + } + } + } + if (scnTooOldTableRefs.size() > 0) { + TableRef tableRef = scnTooOldTableRefs.get(0); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL) + .setSchemaName(tableRef.getTable().getSchemaName().getString()) + .setTableName(tableRef.getTable().getTableName().getString()) + .build() + .buildException(); + } + + } + public QueryPlan compileUnionAll(SelectStatement select) throws SQLException { List unionAllSelects = select.getSelects(); List plans = new ArrayList(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 585c17f78eb..71e754672d9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -18,13 +18,18 @@ package org.apache.phoenix.coprocessor; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.NavigableSet; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; @@ -32,14 +37,17 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanInfoUtil; +import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ScannerContextUtil; import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.htrace.Span; import org.apache.htrace.Trace; @@ -56,6 +64,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { + private static final Log LOG = LogFactory.getLog(BaseScannerRegionObserver.class); public static final String AGGREGATORS = "_Aggs"; public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "_UnorderedGroupByExpressions"; @@ -121,7 +130,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { // In case of Index Write failure, we need to determine that Index mutation // is part of normal client write or Index Rebuilder. # PHOENIX-5080 public final static byte[] REPLAY_INDEX_REBUILD_WRITES = PUnsignedTinyint.INSTANCE.toBytes(3); - public enum ReplayWrite { TABLE_AND_INDEX, INDEX_ONLY, @@ -374,20 +382,107 @@ RegionScanner getWrappedScanner(final ObserverContext c, - final Store store, final Scan scan, final NavigableSet targetCols, - final KeyValueScanner s) throws IOException { + final Store store, final Scan scan, + final NavigableSet targetCols, + final KeyValueScanner s) throws IOException { + if (storeFileScanDoesntNeedAlteration(store, scan)) { + return s; + } - if (scan.isRaw() || ScanInfoUtil.isKeepDeletedCells(store.getScanInfo()) || scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP || TransactionUtil.isTransactionalTimestamp(scan.getTimeRange().getMax())) { - return s; - } - - if (s!=null) { - s.close(); - } - ScanInfo scanInfo = ScanInfoUtil.cloneScanInfoWithKeepDeletedCells(store.getScanInfo()); - return ScanInfoUtil.createStoreScanner(store, scanInfo, scan, targetCols, - c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel())); + if (s != null) { + s.close(); + } + ScanInfo scanInfo = ScanInfoUtil.cloneScanInfoWithKeepDeletedCells(store.getScanInfo()); + return ScanInfoUtil.createStoreScanner(store, scanInfo, scan, targetCols, + c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel())); + } + + private boolean storeFileScanDoesntNeedAlteration(Store store, Scan scan) { + boolean isRaw = scan.isRaw(); + //true if keep deleted cells is either TRUE or TTL + boolean keepDeletedCells = ScanInfoUtil.isKeepDeletedCells(store.getScanInfo()); + boolean timeRangeIsLatest = scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP; + boolean timestampIsTransactional = + TransactionUtil.isTransactionalTimestamp(scan.getTimeRange().getMax()); + return isRaw + || keepDeletedCells + || timeRangeIsLatest + || timestampIsTransactional; + } + + @Override + public InternalScanner preFlushScannerOpen(final ObserverContext c, + final Store store, + final KeyValueScanner memstoreScanner, + final InternalScanner s) + throws IOException { + + if (!ScanInfoUtil.isMaxLookbackTimeEnabled(c.getEnvironment().getConfiguration())){ + return s; + } + + //close last scanner object before creating a new one + if(s != null) { + s.close(); + } + + // Called during flushing the memstore to disk. + // Need to retain all the delete markers & all the versions + Scan scan = new Scan(); + scan.setMaxVersions(Integer.MAX_VALUE); + ScanInfo oldScanInfo = store.getScanInfo(); + + Configuration conf = c.getEnvironment().getConfiguration(); + //minor compactions and flushes both use "compact retain deletes" + ScanType scanType = ScanType.COMPACT_RETAIN_DELETES; + ScanInfo scanInfo = + ScanInfoUtil.getScanInfoForFlushesAndCompactions(conf, oldScanInfo, store, scanType); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating the store scanner with :" + scanInfo + ", " + + "scan object:" + scan + " for table " + store.getTableName().getNameAsString() + + " and region " + store.getRegionInfo().getRegionNameAsString() + + " and cf " + store.getColumnFamilyName()); + } + return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), + scanType, store.getSmallestReadPoint(), + HConstants.LATEST_TIMESTAMP); + } + + @Override + public InternalScanner preCompactScannerOpen( + final ObserverContext c, final Store store, + List scanners, final ScanType scanType, final long earliestPutTs, + final InternalScanner s) throws IOException { + + if (!ScanInfoUtil.isMaxLookbackTimeEnabled(c.getEnvironment().getConfiguration())){ + return s; + } + //close last scanner object before creating a new one + if(s != null) { + s.close(); + } + Scan scan = new Scan(); + scan.setMaxVersions(Integer.MAX_VALUE); + ScanInfo oldScanInfo = store.getScanInfo(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Compaction triggering for table:" + + store.getRegionInfo().getTable().toString() + + " with scanType " + scanType + " for table " + + store.getTableName().getNameAsString() + " and region " + + store.getRegionInfo().getRegionNameAsString() + + " and cf " + store.getColumnFamilyName()); + } + + Configuration conf = c.getEnvironment().getConfiguration(); + ScanInfo scanInfo = + ScanInfoUtil.getScanInfoForFlushesAndCompactions(conf, oldScanInfo, + store, scanType); + return new StoreScanner(store, scanInfo, scan, scanners, scanType, + store.getSmallestReadPoint(), + earliestPutTs); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 23091a85743..347dd010861 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfoUtil; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; @@ -1546,9 +1547,15 @@ public InternalScanner preCompactScannerOpen(final ObserverContext() { @Override public InternalScanner run() throws Exception { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 2f072a36fa8..2bd9ff38017 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -191,6 +191,10 @@ public SQLException newException(SQLExceptionInfo info) { UNEQUAL_SCN_AND_BUILD_INDEX_AT(534, "42911", "If both specified, values of CURRENT_SCN and BUILD_INDEX_AT must be equal."), ONLY_INDEX_UPDATABLE_AT_SCN(535, "42912", "Only an index may be updated when the BUILD_INDEX_AT property is specified"), PARENT_TABLE_NOT_FOUND(536, "42913", "Can't drop the index because the parent table in the DROP statement is incorrect."), + CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL(537, "42914", + "Cannot use SCN to look further back in the past beyond the TTL"), + CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE(538, "42915", + "Cannot use SCN to look further back in the past beyond the configured max lookback age"), /** * HBase and Phoenix specific implementation defined sub-classes. diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java index c7f79aed753..0e502ade40f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java @@ -534,4 +534,5 @@ public void start(CoprocessorEnvironment e) throws IOException { public void stop(CoprocessorEnvironment e) throws IOException { this.hTableFactory.shutdown(); } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java index dee02d14439..6a4a8f3a3b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java @@ -51,7 +51,7 @@ private TransactionUtil() { } public static boolean isTransactionalTimestamp(long ts) { - return ts >= MAX_NON_TX_TIMESTAMP; + return ts >= MAX_NON_TX_TIMESTAMP && ts != HConstants.LATEST_TIMESTAMP; } public static boolean isDelete(Cell cell) { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index d22971d94eb..9fa6422151d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -49,14 +49,20 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -87,6 +93,7 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; +import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.expression.AndExpression; import org.apache.phoenix.expression.ByteBasedLikeExpression; @@ -137,6 +144,7 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.transaction.TransactionFactory; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -866,6 +874,13 @@ public static void createTransactionalTable(Connection conn, String tableName, S conn.createStatement().execute("create table " + tableName + TestUtil.TEST_TABLE_SCHEMA + "TRANSACTIONAL=true" + (extraProps.length() == 0 ? "" : ("," + extraProps))); } + public static void dumpTable(Connection conn, TableName tableName) + throws SQLException, IOException{ + ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices(); + HTableInterface table = cqs.getTable(tableName.getName()); + dumpTable(table); + } + public static void dumpTable(HTableInterface table) throws IOException { System.out.println("************ dumping " + table + " **************"); Scan s = new Scan(); @@ -878,7 +893,9 @@ public static void dumpTable(HTableInterface table) throws IOException { Cell current = null; while (cellScanner.advance()) { current = cellScanner.current(); - System.out.println(current); + System.out.println(current + "column= " + + Bytes.toString(CellUtil.cloneQualifier(current)) + + " val=" + Bytes.toString(CellUtil.cloneValue(current))); } } } @@ -908,6 +925,46 @@ public static int getRowCount(Table table, boolean isRaw) throws IOException { return rows; } + public static CellCount getCellCount(Table table, boolean isRaw) throws IOException { + Scan s = new Scan(); + s.setRaw(isRaw);; + s.setMaxVersions(); + + CellCount cellCount = new CellCount(); + try (ResultScanner scanner = table.getScanner(s)) { + Result result = null; + while ((result = scanner.next()) != null) { + CellScanner cellScanner = result.cellScanner(); + Cell current = null; + while (cellScanner.advance()) { + current = cellScanner.current(); + cellCount.addCell(Bytes.toString(CellUtil.cloneRow(current))); + } + } + } + return cellCount; + } + + static class CellCount { + private Map rowCountMap = new HashMap(); + + void addCell(String key){ + if (rowCountMap.containsKey(key)){ + rowCountMap.put(key, rowCountMap.get(key) +1); + } else { + rowCountMap.put(key, 1); + } + } + + int getCellCount(String key){ + if (rowCountMap.containsKey(key)){ + return rowCountMap.get(key); + } else { + return 0; + } + } + } + public static void dumpIndexStatus(Connection conn, String indexName) throws IOException, SQLException { try (HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) { System.out.println("************ dumping index status for " + indexName + " **************"); @@ -1216,4 +1273,81 @@ public static JoinTable getJoinTable(String query, PhoenixConnection connection) public static void assertSelectStatement(FilterableStatement selectStatement , String sql) { assertTrue(selectStatement.toString().trim().equals(sql)); } + + public static void assertSqlExceptionCode(SQLExceptionCode code, SQLException se) { + assertEquals(code.getErrorCode(), se.getErrorCode()); + assertTrue("Wrong error message", se.getMessage().contains(code.getMessage())); + assertEquals(code.getSQLState(), se.getSQLState()); + } + + public static void assertTableHasTtl(Connection conn, TableName tableName, int ttl) + throws SQLException, IOException { + HColumnDescriptor cd = getColumnDescriptor(conn, tableName); + Assert.assertEquals(ttl, cd.getTimeToLive()); + } + + public static void assertTableHasVersions(Connection conn, TableName tableName, int versions) + throws SQLException, IOException { + HColumnDescriptor cd = getColumnDescriptor(conn, tableName); + Assert.assertEquals(versions, cd.getMaxVersions()); + } + + public static HColumnDescriptor getColumnDescriptor(Connection conn, TableName tableName) + throws SQLException, IOException { + Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + HTableDescriptor td = admin.getTableDescriptor(tableName); + return td.getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES); + } + + public static void assertRawRowCount(Connection conn, TableName table, int expectedRowCount) + throws SQLException, IOException { + ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices(); + int count = TestUtil.getRawRowCount(cqs.getTable(table.getName())); + assertEquals(expectedRowCount, count); + } + + public static void assertRawCellCount(Connection conn, TableName tableName, + byte[] row, int expectedCellCount) + throws SQLException, IOException{ + ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices(); + Table table = cqs.getTable(tableName.getName()); + CellCount cellCount = getCellCount(table, true); + int count = cellCount.getCellCount(Bytes.toString(row)); + assertEquals(expectedCellCount, count); + } + + public static void assertRowExistsAtSCN(String url, String sql, long scn, boolean shouldExist) + throws SQLException { + boolean rowExists = false; + Properties props = new Properties(); + ResultSet rs; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn)); + try (Connection conn = DriverManager.getConnection(url, props)){ + rs = conn.createStatement().executeQuery(sql); + rowExists = rs.next(); + if (shouldExist){ + Assert.assertTrue("Row was not found at time " + scn + + " when it should have been", + rowExists); + } else { + Assert.assertFalse("Row was found at time " + scn + + " when it should not have been", rowExists); + } + } + + } + + public static void assertRowHasExpectedValueAtSCN(String url, String sql, + long scn, String value) throws SQLException { + Properties props = new Properties(); + ResultSet rs; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn)); + try (Connection conn = DriverManager.getConnection(url, props)){ + rs = conn.createStatement().executeQuery(sql); + Assert.assertTrue("Value " + value + " does not exist at scn " + scn, rs.next()); + Assert.assertEquals(value, rs.getString(1)); + } + + } + }