Skip to content

Commit

Permalink
TEPHRA-219 execute cross region calls in coprocessor as the login user.
Browse files Browse the repository at this point in the history
Make pruneEnable and txMaxLifetimeMillis volatile so that derived
classes can make use of them.

Introduced stopped variable in PruneUpperBoundWriter.
  • Loading branch information
gokulavasan committed Feb 13, 2017
1 parent 13201db commit 9bc97ee
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 25 deletions.
Expand Up @@ -106,12 +106,15 @@ public class TransactionProcessor extends BaseRegionObserver {

private final TransactionCodec txCodec;
private TransactionStateCache cache;
private CompactionState compactionState;
private TableName pruneTable;
private long pruneFlushInterval;
private volatile CompactionState compactionState;

protected volatile Boolean pruneEnable;
protected volatile Long txMaxLifetimeMillis;
protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
protected Long txMaxLifetimeMillis;
private Boolean pruneEnable;

public TransactionProcessor() {
this.txCodec = new TransactionCodec();
Expand Down Expand Up @@ -319,28 +322,30 @@ public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEn

if (pruneEnable == null) {
Configuration conf = getConfiguration(c.getEnvironment());
// Configuration won't be null in TransactionProcessor but the derived classes might return
// null if it is not available temporarily
if (conf != null) {
pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
if (Boolean.TRUE.equals(pruneEnable)) {
String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
if (LOG.isDebugEnabled()) {
LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+ pruneTable);
}
}
// pruneTable and pruneFlushInterval cannot be changed by simply loading the Configuration dynamically
// since we have only one flush thread across all regions and we might lose data.
pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
}
}

if (Boolean.TRUE.equals(pruneEnable)) {
// Record tx state before the compaction
// If the compactionState is null, we need to create a new compaction state object
if (compactionState == null) {
compactionState = new CompactionState(c.getEnvironment(), pruneTable, pruneFlushInterval);
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s. Compaction state " +
"will be recorded in table %s",
c.getEnvironment().getRegionInfo().getTable().getNameWithNamespaceInclAsString(),
pruneTable.getNameWithNamespaceInclAsString()));
}
}
compactionState.record(request, snapshot);
}

Expand All @@ -351,8 +356,8 @@ public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEn
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
CompactionRequest request) throws IOException {
// Persist the compaction state after a succesful compaction
if (compactionState != null) {
// Persist the compaction state after a successful compaction
if (compactionState != null && Boolean.TRUE.equals(pruneEnable)) {
compactionState.persist();
}
}
Expand Down Expand Up @@ -411,13 +416,14 @@ protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
return;
}


if (txMaxLifetimeMillis == null) {
Configuration conf = getConfiguration(env);
// Configuration won't be null in TransactionProcessor but the derived classes might return
// null if it is not available temporarily
if (conf != null) {
this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
} else {
throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
"unavailable. Please retry the operation."));
Expand Down
Expand Up @@ -38,6 +38,9 @@
public class CompactionState {
private static final Log LOG = LogFactory.getLog(CompactionState.class);

private final TableName stateTable;
private final long pruneFlushInterval;

private final byte[] regionName;
private final String regionNameAsString;
private final PruneUpperBoundWriterSupplier pruneUpperBoundWriterSupplier;
Expand All @@ -57,6 +60,8 @@ public Table get() throws IOException {
this.pruneUpperBoundWriterSupplier = new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState,
pruneFlushInterval);
this.pruneUpperBoundWriter = pruneUpperBoundWriterSupplier.get();
this.stateTable = stateTable;
this.pruneFlushInterval = pruneFlushInterval;
}

/**
Expand Down
Expand Up @@ -22,9 +22,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,6 +44,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
private final ConcurrentSkipListMap<byte[], Long> pruneEntries;

private volatile Thread flushThread;
private volatile boolean stopped;

private long lastChecked;

Expand Down Expand Up @@ -71,6 +74,7 @@ protected void startUp() throws Exception {
@Override
protected void shutDown() throws Exception {
LOG.info("Stopping PruneUpperBoundWriter Thread.");
stopped = true;
if (flushThread != null) {
flushThread.interrupt();
flushThread.join(TimeUnit.SECONDS.toMillis(1));
Expand All @@ -81,14 +85,20 @@ private void startFlushThread() {
flushThread = new Thread("tephra-prune-upper-bound-writer") {
@Override
public void run() {
while ((!isInterrupted()) && isRunning()) {
while ((!isInterrupted()) && (!stopped)) {
long now = System.currentTimeMillis();
if (now > (lastChecked + pruneFlushInterval)) {
// should flush data
try {
while (pruneEntries.firstEntry() != null) {
Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
final Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
return null;
}
});
// We can now remove the entry only if the key and value match with what we wrote since it is
// possible that a new pruneUpperBound for the same key has been added
pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
Expand Down

0 comments on commit 9bc97ee

Please sign in to comment.