From 20e1b4d1ffc5cf0901576f342fcc8852ef55301f Mon Sep 17 00:00:00 2001 From: poorna Date: Fri, 28 Oct 2016 17:46:01 -0700 Subject: [PATCH 1/3] Refactor existing test --- tephra-hbase-compat-1.1-base/pom.xml | 5 + .../tephra/hbase/AbstractHBaseTableTest.java | 107 ++++++++++++++++++ .../hbase/TransactionAwareHTableTest.java | 75 ++---------- 3 files changed, 123 insertions(+), 64 deletions(-) create mode 100644 tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java diff --git a/tephra-hbase-compat-1.1-base/pom.xml b/tephra-hbase-compat-1.1-base/pom.xml index b0eee6d4..b6a58e02 100644 --- a/tephra-hbase-compat-1.1-base/pom.xml +++ b/tephra-hbase-compat-1.1-base/pom.xml @@ -28,6 +28,11 @@ tephra-hbase-compat-1.1-base Apache Tephra HBase 1.1 Compatibility Base + + 2.5.1 + 1.1.1 + + pom tephra-hbase-compat-1.2-cdh diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java new file mode 100644 index 00000000..68c43aee --- /dev/null +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.util.Collections; +import java.util.List; + +/** + * + */ +@SuppressWarnings("WeakerAccess") +public abstract class AbstractHBaseTableTest { + static HBaseTestingUtility testUtil; + static HBaseAdmin hBaseAdmin; + static Configuration conf; + + @BeforeClass + public static void startMiniCluster() throws Exception { + testUtil = new HBaseTestingUtility(); + conf = testUtil.getConfiguration(); + + // Tune down the connection thread pool size + conf.setInt("hbase.hconnection.threads.core", 5); + conf.setInt("hbase.hconnection.threads.max", 10); + // Tunn down handler threads in regionserver + conf.setInt("hbase.regionserver.handler.count", 10); + + // Set to random port + conf.setInt("hbase.master.port", 0); + conf.setInt("hbase.master.info.port", 0); + conf.setInt("hbase.regionserver.port", 0); + conf.setInt("hbase.regionserver.info.port", 0); + + testUtil.startMiniCluster(); + hBaseAdmin = testUtil.getHBaseAdmin(); + } + + @AfterClass + public static void shutdownMiniCluster() throws Exception { + try { + if (hBaseAdmin != null) { + hBaseAdmin.close(); + } + } finally { + testUtil.shutdownMiniCluster(); + } + } + + static HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception { + return createTable(tableName, columnFamilies, false, + Collections.singletonList(TransactionProcessor.class.getName())); + } + + static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData, + List coprocessors) throws Exception { + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + for (byte[] family : columnFamilies) { + HColumnDescriptor columnDesc = new HColumnDescriptor(family); + columnDesc.setMaxVersions(Integer.MAX_VALUE); + columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis + desc.addFamily(columnDesc); + } + if (existingData) { + desc.setValue(TxConstants.READ_NON_TX_DATA, "true"); + } + // Divide individually to prevent any overflow + int priority = Coprocessor.PRIORITY_USER; + // order in list is the same order that coprocessors will be invoked + for (String coprocessor : coprocessors) { + desc.addCoprocessor(coprocessor, null, ++priority, null); + } + hBaseAdmin.createTable(desc); + testUtil.waitTableAvailable(tableName, 5000); + return new HTable(testUtil.getConfiguration(), tableName); + } + +} diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index de1fa6bc..c3367120 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -19,21 +19,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.OperationWithAttributes; @@ -89,14 +82,11 @@ /** * Tests for TransactionAwareHTables. */ -public class TransactionAwareHTableTest { +public class TransactionAwareHTableTest extends AbstractHBaseTableTest { private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class); - private static HBaseTestingUtility testUtil; - private static HBaseAdmin hBaseAdmin; - private static TransactionStateStorage txStateStorage; - private static TransactionManager txManager; - private static Configuration conf; + static TransactionStateStorage txStateStorage; + static TransactionManager txManager; private TransactionContext transactionContext; private TransactionAwareHTable transactionAwareHTable; private HTable hTable; @@ -146,23 +136,6 @@ public void preDelete(final ObserverContext c, @BeforeClass public static void setupBeforeClass() throws Exception { - testUtil = new HBaseTestingUtility(); - conf = testUtil.getConfiguration(); - - // Tune down the connection thread pool size - conf.setInt("hbase.hconnection.threads.core", 5); - conf.setInt("hbase.hconnection.threads.max", 10); - // Tunn down handler threads in regionserver - conf.setInt("hbase.regionserver.handler.count", 10); - - // Set to random port - conf.setInt("hbase.master.port", 0); - conf.setInt("hbase.master.info.port", 0); - conf.setInt("hbase.regionserver.port", 0); - conf.setInt("hbase.regionserver.info.port", 0); - - testUtil.startMiniCluster(); - hBaseAdmin = testUtil.getHBaseAdmin(); txStateStorage = new InMemoryTransactionStateStorage(); txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); txManager.startAndWait(); @@ -170,8 +143,9 @@ public static void setupBeforeClass() throws Exception { @AfterClass public static void shutdownAfterClass() throws Exception { - testUtil.shutdownMiniCluster(); - hBaseAdmin.close(); + if (txManager != null) { + txManager.stopAndWait(); + } } @Before @@ -187,34 +161,6 @@ public void shutdownAfterTest() throws IOException { hBaseAdmin.deleteTable(TestBytes.table); } - private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception { - return createTable(tableName, columnFamilies, false, Collections.emptyList()); - } - - private HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData, - List coprocessors) throws Exception { - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); - for (byte[] family : columnFamilies) { - HColumnDescriptor columnDesc = new HColumnDescriptor(family); - columnDesc.setMaxVersions(Integer.MAX_VALUE); - columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis - desc.addFamily(columnDesc); - } - if (existingData) { - desc.setValue(TxConstants.READ_NON_TX_DATA, "true"); - } - // Divide individually to prevent any overflow - int priority = Coprocessor.PRIORITY_USER; - desc.addCoprocessor(TransactionProcessor.class.getName(), null, priority, null); - // order in list is the same order that coprocessors will be invoked - for (String coprocessor : coprocessors) { - desc.addCoprocessor(coprocessor, null, ++priority, null); - } - hBaseAdmin.createTable(desc); - testUtil.waitTableAvailable(tableName, 5000); - return new HTable(testUtil.getConfiguration(), tableName); - } - /** * Test transactional put and get requests. * @@ -406,7 +352,7 @@ public void testValidTransactionalDelete() throws Exception { public void testAttributesPreserved() throws Exception { HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"), new byte[][]{TestBytes.family, TestBytes.family2}, false, - Lists.newArrayList(TestRegionObserver.class.getName())); + Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName())); try { TransactionAwareHTable txTable = new TransactionAwareHTable(hTable); TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); @@ -1117,7 +1063,7 @@ public void testExistingData() throws Exception { TransactionAwareHTable txTable = new TransactionAwareHTable(createTable(Bytes.toBytes("testExistingData"), new byte[][]{TestBytes.family}, true, - Collections.emptyList())); + Collections.singletonList(TransactionProcessor.class.getName()))); TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); // Add some pre-existing, non-transactional data @@ -1266,8 +1212,9 @@ private void verifyScan(HTableInterface table, Scan scan, List expecte @Test public void testVisibilityAll() throws Exception { - HTable nonTxTable = createTable(Bytes.toBytes("testVisibilityAll"), - new byte[][]{TestBytes.family, TestBytes.family2}, true, Collections.emptyList()); + HTable nonTxTable = + createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2}, + true, Collections.singletonList(TransactionProcessor.class.getName())); TransactionAwareHTable txTable = new TransactionAwareHTable(nonTxTable, TxConstants.ConflictDetection.ROW); // ROW conflict detection to verify family deletes From d3b0fa716ed2ef5a2b7c9cf37879a09d4e032bdc Mon Sep 17 00:00:00 2001 From: poorna Date: Fri, 28 Oct 2016 15:12:23 -0700 Subject: [PATCH 2/3] TEPHRA-35 Save compaction state for pruning invalid list --- .../java/org/apache/tephra/TxConstants.java | 10 + .../java/org/apache/tephra/util/TxUtils.java | 17 ++ .../org/apache/tephra/util/TxUtilsTest.java | 22 ++ .../coprocessor/TransactionProcessor.java | 33 ++- .../coprocessor/janitor/CompactionState.java | 92 ++++++++ .../coprocessor/janitor/DataJanitorState.java | 71 ++++++ .../tephra/hbase/AbstractHBaseTableTest.java | 4 +- .../tephra/hbase/InvalidListPruneTest.java | 210 ++++++++++++++++++ 8 files changed, 456 insertions(+), 3 deletions(-) create mode 100644 tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java create mode 100644 tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java create mode 100644 tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java index b9a7929a..25451b3b 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java +++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java @@ -345,4 +345,14 @@ public static final class TransactionLog { public static final byte CURRENT_VERSION = 3; } + /** + * Configuration for data janitor + */ + public static final class DataJanitor { + public static final String PRUNE_ENABLE = "data.tx.prune.enable"; + public static final String PRUNE_STATE_TABLE = "data.tx.prune.state.table"; + + public static final boolean DEFAULT_PRUNE_ENABLE = false; + public static final String DEFAULT_PRUNE_STATE_TABLE = "data_tx_janitor_state"; + } } diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java index 08b1545d..b3d4ace4 100644 --- a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java +++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java @@ -149,4 +149,21 @@ private static long getMaxTTL(Map ttlByFamily) { public static boolean isPreExistingVersion(long version) { return version < MAX_NON_TX_TIMESTAMP; } + + /** + * Returns the maximum transaction that can be removed from the invalid list for the state represented by the given + * transaction. + */ + public static long getPruneUpperBound(Transaction tx) { + // If there are no invalid transactions, and no in-progress transactions then we can prune the invalid list + // up to the current read pointer + if (tx.getInvalids().length == 0 && tx.getInProgress().length == 0) { + return tx.getReadPointer() - 1; + } + + long maxInvalidTx = + tx.getInvalids().length > 0 ? tx.getInvalids()[tx.getInvalids().length - 1] : Transaction.NO_TX_IN_PROGRESS; + long firstInProgress = tx.getFirstInProgress(); + return Math.min(maxInvalidTx, firstInProgress - 1); + } } diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java index 7743105b..db687fe7 100644 --- a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java @@ -19,6 +19,7 @@ package org.apache.tephra.util; import org.apache.tephra.Transaction; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -32,4 +33,25 @@ public void testMaxVisibleTimestamp() { // make sure we don't overflow with MAX_VALUE write pointer assertEquals(Long.MAX_VALUE, TxUtils.getMaxVisibleTimestamp(Transaction.ALL_VISIBLE_LATEST)); } + + @Test + public void testPruneUpperBound() { + Transaction tx = new Transaction(100, 100, new long[] {10, 30}, new long[] {80, 90}, 80); + Assert.assertEquals(30, TxUtils.getPruneUpperBound(tx)); + + tx = new Transaction(100, 100, new long[] {10, 95}, new long[] {80, 90}, 80); + Assert.assertEquals(79, TxUtils.getPruneUpperBound(tx)); + + tx = new Transaction(100, 110, new long[] {10}, new long[] {}, Transaction.NO_TX_IN_PROGRESS); + Assert.assertEquals(10, TxUtils.getPruneUpperBound(tx)); + + tx = new Transaction(100, 110, new long[] {}, new long[] {60}, 60); + Assert.assertEquals(59, TxUtils.getPruneUpperBound(tx)); + + tx = new Transaction(100, 110, new long[] {}, new long[] {50}, 50); + Assert.assertEquals(49, TxUtils.getPruneUpperBound(tx)); + + tx = new Transaction(100, 110, new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS); + Assert.assertEquals(99, TxUtils.getPruneUpperBound(tx)); + } } diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 14941b36..9f723d67 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -54,6 +56,7 @@ import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; +import org.apache.tephra.hbase.coprocessor.janitor.CompactionState; import org.apache.tephra.persist.TransactionVisibilityState; import org.apache.tephra.util.TxUtils; @@ -99,6 +102,7 @@ public class TransactionProcessor extends BaseRegionObserver { private TransactionStateCache cache; private final TransactionCodec txCodec; + private CompactionState compactionState; protected Map ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; @@ -138,6 +142,16 @@ public void start(CoprocessorEnvironment e) throws IOException { if (readNonTxnData) { LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); } + + boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.DataJanitor.PRUNE_ENABLE, + TxConstants.DataJanitor.DEFAULT_PRUNE_ENABLE); + if (pruneEnabled) { + String pruneTable = env.getConfiguration().get(TxConstants.DataJanitor.PRUNE_STATE_TABLE, + TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE); + compactionState = new CompactionState(env, TableName.valueOf(pruneTable)); + LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + + pruneTable); + } } } @@ -268,10 +282,27 @@ public InternalScanner preCompactScannerOpen(ObserverContext scanners, ScanType scanType, long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException { - return createStoreScanner(c.getEnvironment(), "compaction", cache.getLatestState(), store, scanners, + // Get the latest tx snapshot state for the compaction + TransactionVisibilityState snapshot = cache.getLatestState(); + + // Record tx state before the compaction + if (compactionState != null) { + compactionState.record(request, snapshot); + } + // Also make sure to use the same snapshot for the compaction + return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs); } + @Override + public void postCompact(ObserverContext e, Store store, StoreFile resultFile, + CompactionRequest request) throws IOException { + // Persist the compaction state after a succesful compaction + if (compactionState != null) { + compactionState.persist(); + } + } + protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action, TransactionVisibilityState snapshot, Store store, List scanners, ScanType type, diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java new file mode 100644 index 00000000..d02456a3 --- /dev/null +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.coprocessor.janitor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.tephra.Transaction; +import org.apache.tephra.persist.TransactionVisibilityState; +import org.apache.tephra.util.TxUtils; + +import java.io.IOException; +import javax.annotation.Nullable; + +/** + * Record compaction state for invalid list pruning + */ +public class CompactionState { + private static final Log LOG = LogFactory.getLog(CompactionState.class); + + private final byte[] regionName; + private final String regionNameAsString; + private final TableName stateTable; + private final DataJanitorState dataJanitorState; + private volatile long pruneUpperBound = -1; + + public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) { + this.regionName = env.getRegionInfo().getRegionName(); + this.regionNameAsString = env.getRegionInfo().getRegionNameAsString(); + this.stateTable = stateTable; + this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return env.getTable(stateTable); + } + }); + } + + /** + * Records the transaction state used for a compaction. This method is called when the compaction starts. + * + * @param request {@link CompactionRequest} for the compaction + * @param snapshot transaction state that will be used for the compaction + */ + public void record(CompactionRequest request, @Nullable TransactionVisibilityState snapshot) { + if (request.isMajor() && snapshot != null) { + Transaction tx = TxUtils.createDummyTransaction(snapshot); + pruneUpperBound = TxUtils.getPruneUpperBound(tx); + LOG.debug( + String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s", + pruneUpperBound, request, snapshot.getTimestamp())); + } else { + pruneUpperBound = -1; + } + } + + /** + * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}. + * This method is called after the compaction has successfully completed. + */ + public void persist() { + if (pruneUpperBound != -1) { + try { + dataJanitorState.savePruneUpperBound(regionName, pruneUpperBound); + LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString)); + } catch (IOException e) { + LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s", + stateTable, regionNameAsString), e); + } + } + } +} diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java new file mode 100644 index 00000000..9d4f279f --- /dev/null +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.coprocessor.janitor; + +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; + +/** + * Persist data janitor state into an HBase table. + */ +public class DataJanitorState { + public static final byte[] FAMILY = {'f'}; + private static final byte[] PRUNE_UPPER_BOUND_COL = {'u'}; + private static final byte[] REGION_KEY_PREFIX = {0x1}; + + private final TableSupplier stateTableSupplier; + + + public DataJanitorState(TableSupplier stateTableSupplier) { + this.stateTableSupplier = stateTableSupplier; + } + + public void savePruneUpperBound(byte[] regionId, long pruneUpperBound) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + Put put = new Put(makeRegionKey(regionId)); + put.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL, Bytes.toBytes(pruneUpperBound)); + stateTable.put(put); + } + } + + public long getPruneUpperBound(byte[] regionId) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + Get get = new Get(makeRegionKey(regionId)); + get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); + byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_COL); + return result == null ? -1 : Bytes.toLong(result); + } + } + + private byte[] makeRegionKey(byte[] regionId) { + return Bytes.add(REGION_KEY_PREFIX, regionId); + } + + /** + * Supplies table for persisting state + */ + public interface TableSupplier { + Table get() throws IOException; + } +} diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java index 68c43aee..cb8d695b 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java @@ -36,7 +36,7 @@ import java.util.List; /** - * + * Base class for tests that need a HBase cluster */ @SuppressWarnings("WeakerAccess") public abstract class AbstractHBaseTableTest { @@ -46,7 +46,7 @@ public abstract class AbstractHBaseTableTest { @BeforeClass public static void startMiniCluster() throws Exception { - testUtil = new HBaseTestingUtility(); + testUtil = conf == null ? new HBaseTestingUtility() : new HBaseTestingUtility(conf); conf = testUtil.getConfiguration(); // Tune down the connection thread pool size diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java new file mode 100644 index 00000000..ebf58eb1 --- /dev/null +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedMap; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TransactionContext; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionType; +import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.TransactionStateCache; +import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.tephra.hbase.coprocessor.janitor.DataJanitorState; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.InMemoryTransactionStateStorage; +import org.apache.tephra.persist.TransactionSnapshot; +import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.persist.TransactionVisibilityState; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +/** + * Test invalid list pruning + */ +public class InvalidListPruneTest extends AbstractHBaseTableTest { + private static final byte[] family = Bytes.toBytes("f1"); + private static final byte[] qualifier = Bytes.toBytes("col1"); + + private static TableName dataTable; + private static TableName pruneStateTable; + + // Override AbstractHBaseTableTest.startMiniCluster to setup configuration + @BeforeClass + public static void startMiniCluster() throws Exception { + // Setup the configuration to start HBase cluster with the invalid list pruning enabled + conf = HBaseConfiguration.create(); + conf.setBoolean(TxConstants.DataJanitor.PRUNE_ENABLE, true); + AbstractHBaseTableTest.startMiniCluster(); + + TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage(); + TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); + txManager.startAndWait(); + + // Do some transactional data operations + dataTable = TableName.valueOf("invalidListPruneTestTable"); + HTable hTable = createTable(dataTable.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) { + TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); + txContext.start(); + for (int i = 0; i < 10; ++i) { + txTable.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, Bytes.toBytes(i))); + } + txContext.finish(); + } + + testUtil.flush(dataTable); + txManager.stopAndWait(); + + pruneStateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE, + TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE)); + } + + @AfterClass + public static void shutdownAfterClass() throws Exception { + hBaseAdmin.disableTable(dataTable); + hBaseAdmin.deleteTable(dataTable); + } + + @Before + public void beforeTest() throws Exception { + HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false, + // Prune state table is a non-transactional table, hence no transaction co-processor + Collections.emptyList()); + table.close(); + } + + @After + public void afterTest() throws Exception { + hBaseAdmin.disableTable(pruneStateTable); + hBaseAdmin.deleteTable(pruneStateTable); + } + + @Test + public void testRecordCompactionState() throws Exception { + DataJanitorState dataJanitorState = + new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return testUtil.getConnection().getTable(pruneStateTable); + } + }); + + // No prune upper bound initially + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0)))); + + // Create a new transaction snapshot + InMemoryTransactionStateCache.setTransactionSnapshot( + new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L), + ImmutableSortedMap.of())); + // Run minor compaction + testUtil.compact(dataTable, false); + // No prune upper bound after minor compaction too + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0)))); + + // Run major compaction, and verify prune upper bound + testUtil.compact(dataTable, true); + Assert.assertEquals(50, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0)))); + + // Run major compaction again with same snapshot, prune upper bound should not change + testUtil.compact(dataTable, true); + Assert.assertEquals(50, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0)))); + + // Create a new transaction snapshot + InMemoryTransactionStateCache.setTransactionSnapshot( + new TransactionSnapshot(110, 111, 112, ImmutableSet.of(150L), + ImmutableSortedMap.of( + 105L, new TransactionManager.InProgressTx(100, 30, TransactionType.SHORT) + ) + )); + Assert.assertEquals(50, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0)))); + + // Run major compaction again, now prune upper bound should change + testUtil.compact(dataTable, true); + Assert.assertEquals(104, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0)))); + } + + private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException { + HRegionLocation regionLocation = + testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row); + return regionLocation.getRegionInfo().getRegionName(); + } + + /** + * A transaction co-processor that uses in-memory {@link TransactionSnapshot} for testing + */ + @SuppressWarnings("WeakerAccess") + public static class TestTransactionProcessor extends TransactionProcessor { + @Override + protected Supplier getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { + return new Supplier() { + @Override + public TransactionStateCache get() { + return new InMemoryTransactionStateCache(); + } + }; + } + } + + /** + * Used to supply in-memory {@link TransactionSnapshot} to {@link TestTransactionProcessor} for testing + */ + @SuppressWarnings("WeakerAccess") + public static class InMemoryTransactionStateCache extends TransactionStateCache { + private static TransactionVisibilityState transactionSnapshot; + + public static void setTransactionSnapshot(TransactionVisibilityState transactionSnapshot) { + InMemoryTransactionStateCache.transactionSnapshot = transactionSnapshot; + } + + @Override + protected void startUp() throws Exception { + // Nothing to do + } + + @Override + protected void shutDown() throws Exception { + // Nothing to do + } + + @Override + public TransactionVisibilityState getLatestState() { + return transactionSnapshot; + } + } +} From 3fd4bd8774ffb63500e6e2a0f53a2895bedef1ae Mon Sep 17 00:00:00 2001 From: poorna Date: Mon, 7 Nov 2016 15:28:48 -0800 Subject: [PATCH 3/3] Reduce test stdout output due to Travis log output limit of 4 MB --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 898142de..fb3a64c5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,7 +29,7 @@ branches: - /^hotfix\/.*$/ - /^release\/.*$/ -script: mvn test -Dsurefire.redirectTestOutputToFile=false +script: mvn test sudo: false