PHOENIX-5881 - Port PHOENIX-5645 (MaxLookbackAge) to 5.x
gjacoby126 committed Aug 19, 2020
1 parent aac2633 commit 1aa7d60
Showing 13 changed files with 1,065 additions and 6 deletions.
@@ -42,6 +42,7 @@
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -1020,11 +1021,14 @@ public void testIndexToolForIncrementalVerify() throws Exception {
customEdge.incrementValue(waitForUpsert);
return;
}
//In HBase 2.0-2.2, we can't see Puts behind Deletes even on lookback / SCN queries. Starting in 2.3 we can.
//That changes the counts we expect from index tool verification.
int putBehindDeleteMarkerCount = HbaseCompatCapabilities.isLookbackBeyondDeletesSupported() ? 1 : 0;

// regular job without delete row
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t4));
- verifyCounters(it, 2, 2);
+ verifyCounters(it, 2, 2 + putBehindDeleteMarkerCount);
customEdge.incrementValue(waitForUpsert);

// job with 2 rows
@@ -1036,13 +1040,13 @@ public void testIndexToolForIncrementalVerify() throws Exception {
// job with update on only one row
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),"-et", String.valueOf(t3));
- verifyCounters(it, 1, 1);
+ verifyCounters(it, 1, 1 + putBehindDeleteMarkerCount);
customEdge.incrementValue(waitForUpsert);

// job with update on only one row
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t2),"-et", String.valueOf(t4));
- verifyCounters(it, 1, 1);
+ verifyCounters(it, 1, 1 + putBehindDeleteMarkerCount);
customEdge.incrementValue(waitForUpsert);

// job with update on only one row
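To picture the "Put behind a Delete" case the capability flag above gates, here is a minimal, self-contained illustration in raw HBase client terms. It is hypothetical: the table name, column, and timestamps are made up, and it is not code from this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutBehindDeleteExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        byte[] row = Bytes.toBytes("r1");
        byte[] cf = Bytes.toBytes("0");
        byte[] cq = Bytes.toBytes("V");
        try (Connection hconn = ConnectionFactory.createConnection(conf);
             Table table = hconn.getTable(TableName.valueOf("T"))) {
            // A Put at t=1000, then a row delete marker at t=2000 covering it:
            // the Put is now "behind" the Delete.
            table.put(new Put(row).addColumn(cf, cq, 1000L, Bytes.toBytes("v1")));
            table.delete(new Delete(row, 2000L));

            // An SCN/lookback query is, at the HBase layer, a scan whose time
            // range ends between the Put and the Delete. Whether the Put is
            // visible here is what isLookbackBeyondDeletesSupported() gates:
            // hidden on HBase 2.0-2.2, readable again via HBASE-24321 on 2.3+.
            Scan scan = new Scan().withStartRow(row).setTimeRange(0L, 1500L);
            try (ResultScanner results = table.getScanner(scan)) {
                for (Result r : results) {
                    System.out.println(r);
                }
            }
        }
    }
}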
423 changes: 423 additions & 0 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java

Large diffs are not rendered by default.

@@ -40,6 +40,7 @@
import java.util.List;
import java.util.Properties;

import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Test;
@@ -62,8 +63,7 @@ public static synchronized Collection<Object> data() {

public PointInTimeQueryIT(String idxDdl, boolean columnEncoded)
throws Exception {
- // These queries fail without KEEP_DELETED_CELLS=true
- super(idxDdl, columnEncoded, true);
+ super(idxDdl, columnEncoded, !HbaseCompatCapabilities.isLookbackBeyondDeletesSupported());
}

@Test
129 changes: 129 additions & 0 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Assert;
import org.junit.Test;

public class SCNIT extends ParallelStatsDisabledIT {

@Test
public void testReadBeforeDelete() throws Exception {
//we don't support reading earlier than a delete in HBase 2.0-2.2, only in 1.4+ and 2.3+
if (!HbaseCompatCapabilities.isLookbackBeyondDeletesSupported()){
return;
}
String schemaName = generateUniqueName();
String tableName = generateUniqueName();
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
long timeBeforeDelete;
long timeAfterDelete;
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR)");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','aa')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('c','cc')");
conn.commit();
timeBeforeDelete = EnvironmentEdgeManager.currentTime() + 1;
Thread.sleep(2);
conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k = 'b'");
conn.commit();
timeAfterDelete = EnvironmentEdgeManager.currentTime() + 1;
}

Properties props = new Properties();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timeBeforeDelete));
try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
assertTrue(rs.next());
assertEquals("b", rs.getString(1));
assertTrue(rs.next());
assertEquals("c", rs.getString(1));
assertFalse(rs.next());
rs.close();
}
props.clear();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timeAfterDelete));
try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
assertTrue(rs.next());
assertEquals("c", rs.getString(1));
assertFalse(rs.next());
rs.close();
}

}

@Test
public void testSCNWithTTL() throws Exception {
int ttl = 2;
String fullTableName = createTableWithTTL(ttl);
//sleep for one second longer than ttl
Thread.sleep(ttl * 1000 + 1000);
Properties props = new Properties();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
Long.toString(EnvironmentEdgeManager.currentTime() - 1000));
try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
assertFalse(rs.next());
rs.close();
}
}

private String createTableWithTTL(int ttl) throws SQLException, InterruptedException {
String schemaName = generateUniqueName();
String tableName = generateUniqueName();
StringBuilder optionsBuilder = new StringBuilder();
if (ttl > 0){
optionsBuilder.append("TTL=");
optionsBuilder.append(ttl);
}
String ddlOptions = optionsBuilder.toString();
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement()
.execute(String.format("CREATE TABLE %s" +
"(k VARCHAR PRIMARY KEY, f.v VARCHAR) %s", fullTableName, ddlOptions));
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','aa')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('c','cc')");
conn.commit();
}
return fullTableName;
}

}
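Tests that exercise the lookback window itself (such as the MaxLookbackIT added above) need the window configured on the mini-cluster before it starts. A sketch of that setup follows; the ReadOnlyProps pattern is the standard Phoenix test idiom, but the constant name PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY is an assumption, since the compat coprocessor's source isn't rendered in this diff.

// Hypothetical @BeforeClass on an IT that extends Phoenix's BaseTest; the
// PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY constant name is assumed.
@BeforeClass
public static synchronized void doSetup() throws Exception {
    Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
    // the max lookback age is configured in seconds
    props.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
            Integer.toString(15));
    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}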
@@ -31,11 +31,15 @@
import com.google.common.base.Optional;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.JoinCompiler.JoinSpec;
import org.apache.phoenix.compile.JoinCompiler.JoinTable;
import org.apache.phoenix.compile.JoinCompiler.Table;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.BaseQueryPlan;
import org.apache.phoenix.execute.ClientAggregatePlan;
@@ -79,6 +83,7 @@
import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;

@@ -163,6 +168,7 @@ public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnR
* @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
*/
public QueryPlan compile() throws SQLException{
verifySCN();
QueryPlan plan;
if (select.isUnion()) {
plan = compileUnionAll(select);
@@ -172,6 +178,28 @@ public QueryPlan compile() throws SQLException{
return plan;
}

private void verifySCN() throws SQLException {
if (!HbaseCompatCapabilities.isMaxLookbackTimeSupported()) {
return;
}
PhoenixConnection conn = statement.getConnection();
Long scn = conn.getSCN();
if (scn == null) {
return;
}
ColumnResolver resolver =
FromCompiler.getResolverForQuery(select, conn);
long maxLookBackAgeInMillis =
CompatBaseScannerRegionObserver.getMaxLookbackInMillis(conn.getQueryServices().
getConfiguration());
long now = EnvironmentEdgeManager.currentTimeMillis();
if (maxLookBackAgeInMillis > 0 && now - maxLookBackAgeInMillis > scn){
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE)
.build().buildException();
}
}

public QueryPlan compileUnionAll(SelectStatement select) throws SQLException {
List<SelectStatement> unionAllSelects = select.getSelects();
List<QueryPlan> plans = new ArrayList<QueryPlan>();
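At the JDBC level, the new verifySCN() check means a connection whose CURRENT_SCN is older than now minus the configured max lookback age fails as soon as its first query is compiled. A sketch of that behavior follows; the JDBC URL, table name, and assumed one-hour window are illustrative, not from this commit.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.util.PhoenixRuntime;

public class StaleScnExample {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:phoenix:localhost:2181"; // illustrative URL
        Properties props = new Properties();
        // an SCN two hours in the past, against an assumed one-hour max lookback age
        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
                Long.toString(System.currentTimeMillis() - 2L * 60 * 60 * 1000));
        try (Connection conn = DriverManager.getConnection(url, props)) {
            conn.createStatement().executeQuery("SELECT * FROM T");
            throw new AssertionError("expected the stale SCN to be rejected");
        } catch (SQLException e) {
            // 538 / 42915, per the SQLExceptionCode entry added later in this diff
            if (e.getErrorCode() != SQLExceptionCode
                    .CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE
                    .getErrorCode()) {
                throw e;
            }
        }
    }
}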
@@ -29,13 +29,20 @@
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.index.IndexMaintainer;
@@ -47,7 +54,7 @@
import org.apache.phoenix.util.ServerUtil;


- abstract public class BaseScannerRegionObserver implements RegionObserver {
+ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionObserver {

public static final String AGGREGATORS = "_Aggs";
public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "_UnorderedGroupByExpressions";
@@ -367,4 +374,18 @@ RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironme
dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
}


/* We want to override the store scanner so that we can read "past" a delete
marker on an SCN / lookback query to see the underlying edit. This was possible
in HBase 1.x, but not possible after the interface changes in HBase 2.0. HBASE-24321 in
HBase 2.3 gave us this ability back, but we need to use it through a compatibility shim
so that we can compile against 2.1 and 2.2. When 2.3 is the minimum supported HBase
version, the shim can be retired and the logic moved into the real coproc.
We also need to override the flush and compaction coproc hooks in order to implement max
lookback age, keeping versions from being purged before the window expires.
Because the required APIs aren't present in HBase 2.1 and 2.2, we override in the 2.3
version of CompatBaseScannerRegionObserver and no-op in the other versions. */

}
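As a rough picture of the 2.3 side of that shim (the actual CompatBaseScannerRegionObserver isn't rendered in this diff, so the body below is an assumption built on the ScanOptions surface HBASE-24321 made writable in 2.3):

// Sketch: in the HBase 2.3 compat module, the flush/compaction hooks widen the
// store scanner so nothing inside the lookback window gets purged; the 2.1/2.2
// modules leave these hooks as no-ops. preCompactScannerOpen would get the
// same treatment.
@Override
public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, ScanOptions options, FlushLifeCycleTracker tracker)
        throws IOException {
    Configuration conf = c.getEnvironment().getConfiguration();
    if (getMaxLookbackInMillis(conf) > 0) {
        options.setKeepDeletedCells(KeepDeletedCells.TRUE); // keep delete markers and deleted cells
        options.readAllVersions();                          // keep every version...
        options.setTTL(Long.MAX_VALUE);                     // ...and don't TTL them out here
    }
}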
@@ -190,6 +190,8 @@ public SQLException newException(SQLExceptionInfo info) {
UNEQUAL_SCN_AND_BUILD_INDEX_AT(534, "42911", "If both specified, values of CURRENT_SCN and BUILD_INDEX_AT must be equal."),
ONLY_INDEX_UPDATABLE_AT_SCN(535, "42912", "Only an index may be updated when the BUILD_INDEX_AT property is specified"),
PARENT_TABLE_NOT_FOUND(536, "42913", "Can't drop the index because the parent table in the DROP statement is incorrect."),
CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE(538, "42915",
"Cannot use SCN to look further back in the past beyond the configured max lookback age"),

/**
* HBase and Phoenix specific implementation defined sub-classes.
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.phoenix.compat.hbase;

import org.apache.hadoop.conf.Configuration;

public class HbaseCompatCapabilities {

public static boolean isMaxLookbackTimeSupported() {
return false;
}

//In HBase 2.1 and 2.2, a lookback query won't return any results if covered by a future delete
public static boolean isLookbackBeyondDeletesSupported() { return false; }

}
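This is the flags-off flavor of the shim; each HBase compat module presumably ships its own copy of the class, with the 2.3 module flipping the answers. A sketch of that counterpart, since it isn't rendered in this diff:

package org.apache.phoenix.compat.hbase;

// Sketch of the HBase 2.3 compat module's copy of this class (not shown in
// this diff): same names and signatures, with the capability answers flipped.
public class HbaseCompatCapabilities {

    public static boolean isMaxLookbackTimeSupported() {
        return true;
    }

    // HBASE-24321 (2.3+) lets lookback / SCN queries read past delete markers
    public static boolean isLookbackBeyondDeletesSupported() { return true; }

}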