diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java index f89bcb39b48..8b47ab0acf7 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities; import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -1020,11 +1021,14 @@ public void testIndexToolForIncrementalVerify() throws Exception { customEdge.incrementValue(waitForUpsert); return; } + //In HBase 2.0-2.2, we can't see Puts behind Deletes even on lookback / SCN queries. Starting in 2.3 we can + //That changes the counts we expect from index tool verification + int putBehindDeleteMarkerCount = HbaseCompatCapabilities.isLookbackBeyondDeletesSupported() ? 1 :0; // regular job without delete row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t4)); - verifyCounters(it, 2, 2); + verifyCounters(it, 2, 2 + putBehindDeleteMarkerCount); customEdge.incrementValue(waitForUpsert); // job with 2 rows @@ -1036,13 +1040,13 @@ public void testIndexToolForIncrementalVerify() throws Exception { // job with update on only one row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),"-et", String.valueOf(t3)); - verifyCounters(it, 1, 1); + verifyCounters(it, 1, 1 + putBehindDeleteMarkerCount); customEdge.incrementValue(waitForUpsert); // job with update on only one row it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t2),"-et", String.valueOf(t4)); - verifyCounters(it, 1, 1); + verifyCounters(it, 1, 1 + putBehindDeleteMarkerCount); customEdge.incrementValue(waitForUpsert); // job with update on only one row diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java new file mode 100644 index 00000000000..957a7a4f7ea --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities; +import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.AfterClass; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map; +import java.util.Properties; + +import static org.apache.phoenix.util.TestUtil.assertRawCellCount; +import static org.apache.phoenix.util.TestUtil.assertRawRowCount; +import static org.apache.phoenix.util.TestUtil.assertRowExistsAtSCN; +import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN; +import static org.apache.phoenix.util.TestUtil.assertTableHasTtl; +import static org.apache.phoenix.util.TestUtil.assertTableHasVersions; + +@NeedsOwnMiniClusterTest +public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT { + private static final Log LOG = LogFactory.getLog(MaxLookbackIT.class); + private static final int MAX_LOOKBACK_AGE = 15; + private static final int ROWS_POPULATED = 2; + public static final int WAIT_AFTER_TABLE_CREATION_MILLIS = 1; + private String tableDDLOptions; + private StringBuilder optionBuilder; + ManualEnvironmentEdge injectEdge; + private int ttl; + //max lookback isn't supported in HBase 2.1 and 2.2 because of missing coprocessor + // interfaces; see HBASE-24321 + private static boolean isMaxLookbackSupported = + HbaseCompatCapabilities.isMaxLookbackTimeSupported(); + + @BeforeClass + public static synchronized void doSetup() throws Exception { + if (!isMaxLookbackSupported) { + return; + } + Map props = Maps.newHashMapWithExpectedSize(1); + props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); + props.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest(){ + if (!isMaxLookbackSupported) { + return; + } + EnvironmentEdgeManager.reset(); + optionBuilder = new StringBuilder(); + this.tableDDLOptions = optionBuilder.toString(); + ttl = 0; + injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis()); + } + + @After + public synchronized void afterClass() { + if (!isMaxLookbackSupported) { + return; + } + EnvironmentEdgeManager.reset(); + } + + @Test + public void testTooLowSCNWithMaxLookbackAge() throws Exception { + if (!isMaxLookbackSupported) { + return; + } + String dataTableName = generateUniqueName(); + createTable(dataTableName); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + //increase long enough to make sure we can find the syscat row for the table + injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS); + populateTable(dataTableName); + long populateTime = EnvironmentEdgeManager.currentTimeMillis(); + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, + Long.toString(populateTime)); + try (Connection connscn = DriverManager.getConnection(getUrl(), props)) { + connscn.createStatement().executeQuery("select * from " + dataTableName); + } catch (SQLException se) { + SQLExceptionCode code = + SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE; + TestUtil.assertSqlExceptionCode(code, se); + return; + } + Assert.fail("We should have thrown an exception for the too-early SCN"); + } + + @Test(timeout=120000L) + public void testRecentlyDeletedRowsNotCompactedAway() throws Exception { + if (!isMaxLookbackSupported) { + return; + } + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String indexName = generateUniqueName(); + createTable(dataTableName); + + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + createIndex(dataTableName, indexName, 1); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + TableName indexTable = TableName.valueOf(indexName); + injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS); + long beforeDeleteSCN = EnvironmentEdgeManager.currentTimeMillis(); + injectEdge.incrementValue(10); //make sure we delete at a different ts + Statement stmt = conn.createStatement(); + stmt.execute("DELETE FROM " + dataTableName + " WHERE " + " id = 'a'"); + Assert.assertEquals(1, stmt.getUpdateCount()); + conn.commit(); + //select stmt to get row we deleted + String sql = String.format("SELECT * FROM %s WHERE id = 'a'", dataTableName); + String indexSql = String.format("SELECT * FROM %s WHERE val1 = 'ab'", dataTableName); + int rowsPlusDeleteMarker = ROWS_POPULATED; + assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true); + assertExplainPlan(conn, indexSql, dataTableName, indexName); + assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true); + flush(dataTable); + flush(indexTable); + assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true); + assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true); + long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTimeMillis(); + injectEdge.incrementValue(1); //new ts for major compaction + majorCompact(dataTable); + majorCompact(indexTable); + assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker); + assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker); + //wait for the lookback time. After this compactions should purge the deleted row + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000); + long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis(); + String notDeletedRowSql = + String.format("SELECT * FROM %s WHERE id = 'b'", dataTableName); + String notDeletedIndexRowSql = + String.format("SELECT * FROM %s WHERE val1 = 'bc'", dataTableName); + assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true); + assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true); + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + assertRawRowCount(conn, indexTable, ROWS_POPULATED); + conn.createStatement().execute("upsert into " + dataTableName + + " values ('c', 'cd', 'cde', 'cdef')"); + conn.commit(); + injectEdge.incrementValue(1L); + majorCompact(dataTable); + majorCompact(indexTable); + //should still be ROWS_POPULATED because we added one and deleted one + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + assertRawRowCount(conn, indexTable, ROWS_POPULATED); + + //deleted row should be gone, but not deleted row should still be there. + assertRowExistsAtSCN(getUrl(), sql, beforeSecondCompactSCN, false); + assertRowExistsAtSCN(getUrl(), indexSql, beforeSecondCompactSCN, false); + assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true); + assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true); + + } + } + + @Test(timeout=60000L) + public void testTTLAndMaxLookbackAge() throws Exception { + if (!isMaxLookbackSupported) { + return; + } + ttl = 20; + optionBuilder.append("TTL=" + ttl); + tableDDLOptions = optionBuilder.toString(); + Configuration conf = getUtility().getConfiguration(); + //disable automatic memstore flushes + long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, + HRegion.DEFAULT_CACHE_FLUSH_INTERVAL); + conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0L); + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String indexName = generateUniqueName(); + createTable(dataTableName); + populateTable(dataTableName); + createIndex(dataTableName, indexName, 1); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + injectEdge.incrementValue(1); + long afterFirstInsertSCN = EnvironmentEdgeManager.currentTimeMillis(); + TableName dataTable = TableName.valueOf(dataTableName); + TableName indexTable = TableName.valueOf(indexName); + assertTableHasTtl(conn, dataTable, ttl); + assertTableHasTtl(conn, indexTable, ttl); + //first make sure we inserted correctly + String sql = String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName); + String indexSql = String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName); + assertRowExistsAtSCN(getUrl(),sql, afterFirstInsertSCN, true); + assertExplainPlan(conn, indexSql, dataTableName, indexName); + assertRowExistsAtSCN(getUrl(),indexSql, afterFirstInsertSCN, true); + int originalRowCount = 2; + assertRawRowCount(conn, dataTable, originalRowCount); + assertRawRowCount(conn, indexTable, originalRowCount); + //force a flush + flush(dataTable); + flush(indexTable); + //flush shouldn't have changed it + assertRawRowCount(conn, dataTable, originalRowCount); + assertRawRowCount(conn, indexTable, originalRowCount); + assertExplainPlan(conn, indexSql, dataTableName, indexName); + long timeToAdvance = (MAX_LOOKBACK_AGE * 1000) - + (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN); + if (timeToAdvance > 0) { + injectEdge.incrementValue(timeToAdvance); + } + //make sure it's still on disk + assertRawRowCount(conn, dataTable, originalRowCount); + assertRawRowCount(conn, indexTable, originalRowCount); + injectEdge.incrementValue(1); //get a new timestamp for compaction + majorCompact(dataTable); + majorCompact(indexTable); + //nothing should have been purged by this major compaction + assertRawRowCount(conn, dataTable, originalRowCount); + assertRawRowCount(conn, indexTable, originalRowCount); + //now wait the TTL + timeToAdvance = (ttl * 1000) - + (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN); + if (timeToAdvance > 0) { + injectEdge.incrementValue(timeToAdvance); + } + //make sure that we can compact away the now-expired rows + majorCompact(dataTable); + majorCompact(indexTable); + //note that before HBase 1.4, we don't have HBASE-17956 + // and this will always return 0 whether it's still on-disk or not + assertRawRowCount(conn, dataTable, 0); + assertRawRowCount(conn, indexTable, 0); + } finally{ + conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, oldMemstoreFlushInterval); + } + } + + @Test(timeout=60000) + public void testRecentMaxVersionsNotCompactedAway() throws Exception { + if (!isMaxLookbackSupported) { + return; + } + int versions = 2; + optionBuilder.append("VERSIONS=" + versions); + tableDDLOptions = optionBuilder.toString(); + String firstValue = "abc"; + String secondValue = "def"; + String thirdValue = "ghi"; + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String indexName = generateUniqueName(); + createTable(dataTableName); + populateTable(dataTableName); + createIndex(dataTableName, indexName, versions); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + //increment to make sure we don't "look back" past table creation + injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS); + injectEdge.incrementValue(1); //increment by 1 so we can see our write + long afterInsertSCN = EnvironmentEdgeManager.currentTimeMillis(); + //make sure table and index metadata is set up right for versions + TableName dataTable = TableName.valueOf(dataTableName); + TableName indexTable = TableName.valueOf(indexName); + assertTableHasVersions(conn, dataTable, versions); + assertTableHasVersions(conn, indexTable, versions); + //check query optimizer is doing what we expect + String dataTableSelectSql = + String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName); + String indexTableSelectSql = + String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName); + assertExplainPlan(conn, indexTableSelectSql, dataTableName, indexName); + //make sure the data was inserted correctly in the first place + assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, afterInsertSCN, firstValue); + assertRowHasExpectedValueAtSCN(getUrl(), indexTableSelectSql, afterInsertSCN, firstValue); + //force first update to get a distinct ts + injectEdge.incrementValue(1); + updateColumn(conn, dataTableName, "id", "a", "val2", secondValue); + injectEdge.incrementValue(1); //now make update visible + long afterFirstUpdateSCN = EnvironmentEdgeManager.currentTimeMillis(); + //force second update to get a distinct ts + injectEdge.incrementValue(1); + updateColumn(conn, dataTableName, "id", "a", "val2", thirdValue); + injectEdge.incrementValue(1); + long afterSecondUpdateSCN = EnvironmentEdgeManager.currentTimeMillis(); + injectEdge.incrementValue(1); + //check to make sure we can see all three versions at the appropriate times + String[] allValues = {firstValue, secondValue, thirdValue}; + long[] allSCNs = {afterInsertSCN, afterFirstUpdateSCN, afterSecondUpdateSCN}; + assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs); + assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs); + flush(dataTable); + flush(indexTable); + //after flush, check to make sure we can see all three versions at the appropriate times + assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs); + assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs); + majorCompact(dataTable); + majorCompact(indexTable); + //after major compaction, check to make sure we can see all three versions + // at the appropriate times + assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs); + assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs); + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000); + long afterLookbackAgeSCN = EnvironmentEdgeManager.currentTimeMillis(); + majorCompact(dataTable); + majorCompact(indexTable); + //empty column, 1 version of val 1, 3 versions of val2, 1 version of val3 = 6 + assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6); + //2 versions of empty column, 2 versions of val2, + // 2 versions of val3 (since we write whole rows to index) = 6 + assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 6); + //empty column + 1 version each of val1,2 and 3 = 4 + assertRawCellCount(conn, dataTable, Bytes.toBytes("b"), 4); + //1 version of empty column, 1 version of val2, 1 version of val3 = 3 + assertRawCellCount(conn, indexTable, Bytes.toBytes("bc\u0000b"), 3); + } + } + + private void flush(TableName table) throws IOException { + Admin admin = getUtility().getHBaseAdmin(); + admin.flush(table); + } + + private void majorCompact(TableName table) throws Exception { + TestUtil.majorCompact(getUtility(), table); + } + + private void assertMultiVersionLookbacks(String dataTableSelectSql, + String[] values, long[] scns) + throws Exception { + //make sure we can still look back after updating + for (int k = 0; k < values.length; k++){ + assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, scns[k], values[k]); + } + } + + private void updateColumn(Connection conn, String dataTableName, + String idColumn, String id, String valueColumn, String value) + throws SQLException { + String upsertSql = String.format("UPSERT INTO %s (%s, %s) VALUES ('%s', '%s')", + dataTableName, idColumn, valueColumn, id, value); + conn.createStatement().execute(upsertSql); + conn.commit(); + } + + private void createTable(String tableName) throws SQLException { + try(Connection conn = DriverManager.getConnection(getUrl())) { + String createSql = "create table " + tableName + + " (id varchar(10) not null primary key, val1 varchar(10), " + + "val2 varchar(10), val3 varchar(10))" + tableDDLOptions; + conn.createStatement().execute(createSql); + conn.commit(); + } + } + private void populateTable(String tableName) throws SQLException { + try(Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + } + } + + private void createIndex(String dataTableName, String indexTableName, int indexVersions) + throws SQLException { + try(Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + + dataTableName + " (val1) include (val2, val3)" + + " VERSIONS=" + indexVersions); + conn.commit(); + } + } + + public static void assertExplainPlan(Connection conn, String selectSql, + String dataTableFullName, String indexTableFullName) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + String actualExplainPlan = QueryUtil.getExplainPlan(rs); + IndexToolIT.assertExplainPlan(false, actualExplainPlan, dataTableFullName, indexTableFullName); + } + +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java index 51f4fe08a3b..6d4d4d66525 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Properties; +import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.junit.Test; @@ -62,8 +63,7 @@ public static synchronized Collection data() { public PointInTimeQueryIT(String idxDdl, boolean columnEncoded) throws Exception { - // These queries fail without KEEP_DELETED_CELLS=true - super(idxDdl, columnEncoded, true); + super(idxDdl, columnEncoded, !HbaseCompatCapabilities.isLookbackBeyondDeletesSupported()); } @Test diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java new file mode 100644 index 00000000000..4dcaea7ab00 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Properties; + +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities; +import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.Assert; +import org.junit.Test; + +public class SCNIT extends ParallelStatsDisabledIT { + + @Test + public void testReadBeforeDelete() throws Exception { + //we don't support reading earlier than a delete in HBase 2.0-2.2, only in 1.4+ and 2.3+ + if (!HbaseCompatCapabilities.isLookbackBeyondDeletesSupported()){ + return; + } + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + long timeBeforeDelete; + long timeAfterDelete; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','aa')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('c','cc')"); + conn.commit(); + timeBeforeDelete = EnvironmentEdgeManager.currentTime() + 1; + Thread.sleep(2); + conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k = 'b'"); + conn.commit(); + timeAfterDelete = EnvironmentEdgeManager.currentTime() + 1; + } + + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timeBeforeDelete)); + try (Connection connscn = DriverManager.getConnection(getUrl(), props)) { + ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("b", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("c", rs.getString(1)); + assertFalse(rs.next()); + rs.close(); + } + props.clear(); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timeAfterDelete)); + try (Connection connscn = DriverManager.getConnection(getUrl(), props)) { + ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("c", rs.getString(1)); + assertFalse(rs.next()); + rs.close(); + } + + } + + @Test + public void testSCNWithTTL() throws Exception { + int ttl = 2; + String fullTableName = createTableWithTTL(ttl); + //sleep for one second longer than ttl + Thread.sleep(ttl * 1000 + 1000); + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, + Long.toString(EnvironmentEdgeManager.currentTime() - 1000)); + try (Connection connscn = DriverManager.getConnection(getUrl(), props)) { + ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName); + assertFalse(rs.next()); + rs.close(); + } + } + + private String createTableWithTTL(int ttl) throws SQLException, InterruptedException { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + StringBuilder optionsBuilder = new StringBuilder(); + if (ttl > 0){ + optionsBuilder.append("TTL="); + optionsBuilder.append(ttl); + } + String ddlOptions = optionsBuilder.toString(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement() + .execute(String.format("CREATE TABLE %s" + + "(k VARCHAR PRIMARY KEY, f.v VARCHAR) %s", fullTableName, ddlOptions)); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','aa')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('c','cc')"); + conn.commit(); + } + return fullTableName; + } + +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 3b57f7a7009..ebed26cd885 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -31,11 +31,15 @@ import com.google.common.base.Optional; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities; +import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.JoinCompiler.JoinSpec; import org.apache.phoenix.compile.JoinCompiler.JoinTable; import org.apache.phoenix.compile.JoinCompiler.Table; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.BaseQueryPlan; import org.apache.phoenix.execute.ClientAggregatePlan; @@ -79,6 +83,7 @@ import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ScanUtil; @@ -163,6 +168,7 @@ public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnR * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables */ public QueryPlan compile() throws SQLException{ + verifySCN(); QueryPlan plan; if (select.isUnion()) { plan = compileUnionAll(select); @@ -172,6 +178,28 @@ public QueryPlan compile() throws SQLException{ return plan; } + private void verifySCN() throws SQLException { + if (!HbaseCompatCapabilities.isMaxLookbackTimeSupported()) { + return; + } + PhoenixConnection conn = statement.getConnection(); + Long scn = conn.getSCN(); + if (scn == null) { + return; + } + ColumnResolver resolver = + FromCompiler.getResolverForQuery(select, conn); + long maxLookBackAgeInMillis = + CompatBaseScannerRegionObserver.getMaxLookbackInMillis(conn.getQueryServices(). + getConfiguration()); + long now = EnvironmentEdgeManager.currentTimeMillis(); + if (maxLookBackAgeInMillis > 0 && now - maxLookBackAgeInMillis > scn){ + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE) + .build().buildException(); + } + } + public QueryPlan compileUnionAll(SelectStatement select) throws SQLException { List unionAllSelects = select.getSelects(); List plans = new ArrayList(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 9865e918ff9..6fe5c955346 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -29,13 +29,20 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanOptions; +import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ScannerContextUtil; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.htrace.Span; import org.apache.htrace.Trace; +import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.index.IndexMaintainer; @@ -47,7 +54,7 @@ import org.apache.phoenix.util.ServerUtil; -abstract public class BaseScannerRegionObserver implements RegionObserver { +abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionObserver { public static final String AGGREGATORS = "_Aggs"; public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "_UnorderedGroupByExpressions"; @@ -367,4 +374,18 @@ RegionScanner getWrappedScanner(final ObserverContext c, Store store, + ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't implement the "max + //lookback age" feature + } + + public void preFlushScannerOpen(ObserverContext c, Store store, + ScanOptions options, FlushLifeCycleTracker tracker) throws IOException { + //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't implement the "max + //lookback age" feature + } + + public void preMemStoreCompactionCompactScannerOpen( + ObserverContext c, Store store, ScanOptions options) + throws IOException { + //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't implement the "max + //lookback age" feature + } + + public void preStoreScannerOpen(ObserverContext ctx, Store store, + ScanOptions options) throws IOException { + //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't override the scan + //to "look behind" delete markers on SCN queries + } + + public long getMaxLookbackInMillis(Configuration conf){ + //config param is in seconds, switch to millis + return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000; + } + + //max lookback age isn't supported in HBase 2.1 or HBase 2.2 + public static boolean isMaxLookbackTimeEnabled(Configuration conf){ + return false; + } + + public static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){ + return false; + } + +} diff --git a/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java b/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java new file mode 100644 index 00000000000..629946be223 --- /dev/null +++ b/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.compat.hbase; + +import org.apache.hadoop.conf.Configuration; + +public class HbaseCompatCapabilities { + + public static boolean isMaxLookbackTimeSupported() { + return false; + } + + //In HBase 2.1 and 2.2, a lookback query won't return any results if covered by a future delete + public static boolean isLookbackBeyondDeletesSupported() { return false; } + +} diff --git a/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java b/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java new file mode 100644 index 00000000000..00c25010c7c --- /dev/null +++ b/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compat.hbase.coprocessor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.ScanOptions; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; + +import java.io.IOException; + +public class CompatBaseScannerRegionObserver implements RegionObserver { + + public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY = + "phoenix.max.lookback.age.seconds"; + public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0; + + public void preCompactScannerOpen(ObserverContext c, Store store, + ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't implement the "max + //lookback age" feature + } + + public void preFlushScannerOpen(ObserverContext c, Store store, + ScanOptions options, FlushLifeCycleTracker tracker) throws IOException { + //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't implement the "max + //lookback age" feature + } + + public void preMemStoreCompactionCompactScannerOpen( + ObserverContext c, Store store, ScanOptions options) + throws IOException { + //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't implement the "max + //lookback age" feature + } + + public void preStoreScannerOpen(ObserverContext ctx, Store store, + ScanOptions options) throws IOException { + //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't override the scan + //to "look behind" delete markers on SCN queries + } + + public long getMaxLookbackInMillis(Configuration conf){ + //config param is in seconds, switch to millis + return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000; + } + + //max lookback age isn't supported in HBase 2.1 or HBase 2.2 + public static boolean isMaxLookbackTimeEnabled(Configuration conf){ + return false; + } + + public static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){ + return false; + } + +} diff --git a/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java b/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java new file mode 100644 index 00000000000..9b83e6dd9d3 --- /dev/null +++ b/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.compat.hbase; + +public class HbaseCompatCapabilities { + + public static boolean isMaxLookbackTimeSupported() { + return true; + } + + //In HBase 2.1 and 2.2, a lookback query won't return any results if covered by a future delete, + //but in 2.3 and later we have the preSoreScannerOpen hook that overrides that behavior + public static boolean isLookbackBeyondDeletesSupported() { return true; } + +} diff --git a/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java b/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java new file mode 100644 index 00000000000..cd1a7f5636a --- /dev/null +++ b/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compat.hbase.coprocessor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeepDeletedCells; +import org.apache.hadoop.hbase.MemoryCompactionPolicy; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.ScanOptions; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.io.IOException; + +public class CompatBaseScannerRegionObserver implements RegionObserver { + + public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY = + "phoenix.max.lookback.age.seconds"; + public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0; + + public void preCompactScannerOpen(ObserverContext c, Store store, + ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + Configuration conf = c.getEnvironment().getConfiguration(); + if (isMaxLookbackTimeEnabled(conf)) { + setScanOptionsForFlushesAndCompactions(conf, options, store, scanType); + } + } + + public void preFlushScannerOpen(ObserverContext c, Store store, + ScanOptions options, FlushLifeCycleTracker tracker) throws IOException { + Configuration conf = c.getEnvironment().getConfiguration(); + if (isMaxLookbackTimeEnabled(conf)) { + setScanOptionsForFlushesAndCompactions(conf, options, store, ScanType.COMPACT_RETAIN_DELETES); + } + } + + public void preMemStoreCompactionCompactScannerOpen( + ObserverContext c, Store store, ScanOptions options) + throws IOException { + Configuration conf = c.getEnvironment().getConfiguration(); + if (isMaxLookbackTimeEnabled(conf)) { + MemoryCompactionPolicy inMemPolicy = + store.getColumnFamilyDescriptor().getInMemoryCompaction(); + ScanType scanType; + //the eager and adaptive in-memory compaction policies can purge versions; the others + // can't. (Eager always does; adaptive sometimes does) + if (inMemPolicy.equals(MemoryCompactionPolicy.EAGER) || + inMemPolicy.equals(MemoryCompactionPolicy.ADAPTIVE)) { + scanType = ScanType.COMPACT_DROP_DELETES; + } else { + scanType = ScanType.COMPACT_RETAIN_DELETES; + } + setScanOptionsForFlushesAndCompactions(conf, options, store, scanType); + } + } + + public void preStoreScannerOpen(ObserverContext ctx, Store store, + ScanOptions options) throws IOException { + + if (!storeFileScanDoesntNeedAlteration(options)) { + //PHOENIX-4277 -- When doing a point-in-time (SCN) Scan, HBase by default will hide + // mutations that happen before a delete marker. This overrides that behavior. + options.setMinVersions(options.getMinVersions()); + KeepDeletedCells keepDeletedCells = KeepDeletedCells.TRUE; + if (store.getColumnFamilyDescriptor().getTimeToLive() != HConstants.FOREVER) { + keepDeletedCells = KeepDeletedCells.TTL; + } + options.setKeepDeletedCells(keepDeletedCells); + } + } + + private boolean storeFileScanDoesntNeedAlteration(ScanOptions options) { + Scan scan = options.getScan(); + boolean isRaw = scan.isRaw(); + //true if keep deleted cells is either TRUE or TTL + boolean keepDeletedCells = options.getKeepDeletedCells().equals(KeepDeletedCells.TRUE) || + options.getKeepDeletedCells().equals(KeepDeletedCells.TTL); + boolean timeRangeIsLatest = scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP; + boolean timestampIsTransactional = + isTransactionalTimestamp(scan.getTimeRange().getMax()); + return isRaw + || keepDeletedCells + || timeRangeIsLatest + || timestampIsTransactional; + } + + private boolean isTransactionalTimestamp(long ts) { + //have to use the HBase edge manager because the Phoenix one is in phoenix-core + return ts > (long) (EnvironmentEdgeManager.currentTime() * 1.1); + } + + /* + * If KeepDeletedCells.FALSE, KeepDeletedCells.TTL , + * let delete markers age once lookback age is done. + */ + public KeepDeletedCells getKeepDeletedCells(ScanOptions options, ScanType scanType) { + //if we're doing a minor compaction or flush, always set keep deleted cells + //to true. Otherwise, if keep deleted cells is false or TTL, use KeepDeletedCells TTL, + //where the value of the ttl might be overriden to the max lookback age elsewhere + return (options.getKeepDeletedCells() == KeepDeletedCells.TRUE + || scanType.equals(ScanType.COMPACT_RETAIN_DELETES)) ? + KeepDeletedCells.TRUE : KeepDeletedCells.TTL; + } + + /* + * if the user set a TTL we should leave MIN_VERSIONS at the default (0 in most of the cases). + * Otherwise the data (1st version) will not be removed after the TTL. If no TTL, we want + * Math.max(maxVersions, minVersions, 1) + */ + public int getMinVersions(ScanOptions options, ColumnFamilyDescriptor cfDescriptor) { + return cfDescriptor.getTimeToLive() != HConstants.FOREVER ? options.getMinVersions() + : Math.max(Math.max(options.getMinVersions(), + cfDescriptor.getMaxVersions()),1); + } + + /** + * + * @param conf HBase Configuration + * @param columnDescriptor ColumnFamilyDescriptor for the store being compacted + * @param options ScanOptions of overrides to the compaction scan + * @return Time to live in milliseconds, based on both HBase TTL and Phoenix max lookback age + */ + public long getTimeToLiveForCompactions(Configuration conf, + ColumnFamilyDescriptor columnDescriptor, + ScanOptions options) { + long ttlConfigured = columnDescriptor.getTimeToLive(); + long ttlInMillis = ttlConfigured * 1000; + long maxLookbackTtl = getMaxLookbackInMillis(conf); + if (isMaxLookbackTimeEnabled(maxLookbackTtl)) { + if (ttlConfigured == HConstants.FOREVER + && columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) { + // If user configured default TTL(FOREVER) and keep deleted cells to false or + // TTL then to remove unwanted delete markers we should change ttl to max lookback age + ttlInMillis = maxLookbackTtl; + } else { + //if there is a TTL, use TTL instead of max lookback age. + // Max lookback age should be more recent or equal to TTL + ttlInMillis = Math.max(ttlInMillis, maxLookbackTtl); + } + } + + return ttlInMillis; + } + + public void setScanOptionsForFlushesAndCompactions(Configuration conf, + ScanOptions options, + final Store store, + ScanType type) { + ColumnFamilyDescriptor cfDescriptor = store.getColumnFamilyDescriptor(); + options.setTTL(getTimeToLiveForCompactions(conf, cfDescriptor, + options)); + options.setKeepDeletedCells(getKeepDeletedCells(options, type)); + options.setMaxVersions(Integer.MAX_VALUE); + options.setMinVersions(getMinVersions(options, cfDescriptor)); + } + + public static long getMaxLookbackInMillis(Configuration conf){ + //config param is in seconds, switch to millis + return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000; + } + + public static boolean isMaxLookbackTimeEnabled(Configuration conf){ + return isMaxLookbackTimeEnabled(conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + DEFAULT_PHOENIX_MAX_LOOKBACK_AGE)); + } + + public static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){ + return maxLookbackTime > 0L; + } + +}