Skip to content

Commit

Permalink
HBASE-16721 Concurrency issue in WAL unflushed seqId tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
enis committed Sep 29, 2016
1 parent 7639671 commit bf3c928
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 2 deletions.
Expand Up @@ -49,7 +49,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface WAL {
public interface WAL extends AutoCloseable {

/**
* Registers WALActionsListener
Expand Down
Expand Up @@ -72,7 +72,7 @@

public abstract class AbstractTestFSWAL {

private static final Log LOG = LogFactory.getLog(AbstractTestFSWAL.class);
protected static final Log LOG = LogFactory.getLog(AbstractTestFSWAL.class);

protected static Configuration CONF;
protected static FileSystem FS;
Expand Down
Expand Up @@ -23,6 +23,10 @@
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -32,14 +36,21 @@
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertEquals;

/**
* Provides FSHLog test cases.
*/
Expand Down Expand Up @@ -101,4 +112,98 @@ public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldExcepti
log.close();
}
}

/**
 * Test case for https://issues.apache.org/jira/browse/HBASE-16721
 *
 * <p>Scenario exercised: a put is started whose WAL append is held back inside a
 * {@code WALActionsListener}; while the append is held, a flush of the same region is started on
 * a second thread; the append is then released. After both operations finish, the region must
 * have exactly one store file and the WAL must report {@link HConstants#NO_SEQNUM} as the
 * earliest unflushed sequence id for the region.
 *
 * <p>The ordering between the put and the flush is established with fixed sleeps, so the test is
 * timing dependent by construction.
 *
 * @throws IOException if creating the WAL/region, writing, or closing fails
 * @throws InterruptedException if the test thread is interrupted while waiting on the workers
 */
@Test (timeout = 30000)
public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
  // Use this test's own name for the WAL directory so it cannot collide with the directory
  // used by testSyncRunnerIndexOverflow.
  final String name = "testUnflushedSeqIdTracking";
  final byte[] b = Bytes.toBytes("b");

  final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
  // Set by either worker when its operation throws, so the failure is surfaced by the test
  // thread instead of only appearing in the log.
  final AtomicBoolean backgroundFailure = new AtomicBoolean(false);
  final CountDownLatch holdAppend = new CountDownLatch(1);
  final CountDownLatch flushFinished = new CountDownLatch(1);
  final CountDownLatch putFinished = new CountDownLatch(1);

  try (FSHLog log =
      new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, CONF,
          null, true, null, null)) {

    log.registerWALActionsListener(new WALActionsListener.Base() {
      @Override
      public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit)
          throws IOException {
        if (startHoldingForAppend.get()) {
          try {
            holdAppend.await();
          } catch (InterruptedException e) {
            LOG.error("Interrupted while holding the WAL append", e);
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
          }
        }
      }
    });

    // open a new region which uses this WAL
    HTableDescriptor htd =
        new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor(b));
    HRegionInfo hri =
        new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);

    final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
    ExecutorService exec = Executors.newFixedThreadPool(2);
    try {
      // do a regular write first because of memstore size calculation.
      region.put(new Put(b).addColumn(b, b, b));

      startHoldingForAppend.set(true);
      exec.submit(new Runnable() {
        @Override
        public void run() {
          try {
            region.put(new Put(b).addColumn(b, b, b));
          } catch (IOException | RuntimeException e) {
            backgroundFailure.set(true);
            LOG.error("Background put failed", e);
          } finally {
            // Always release the waiting test thread, even on failure.
            putFinished.countDown();
          }
        }
      });

      // give the put a chance to start
      Threads.sleep(3000);

      exec.submit(new Runnable() {
        @Override
        public void run() {
          try {
            Region.FlushResult flushResult = region.flush(true);
            LOG.info("Flush result:" + flushResult.getResult());
            LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
          } catch (IOException | RuntimeException e) {
            backgroundFailure.set(true);
            LOG.error("Background flush failed", e);
          } finally {
            // Always release the waiting test thread, even on failure.
            flushFinished.countDown();
          }
        }
      });

      // give the flush a chance to start. Flush should have got the region lock, and
      // should have been waiting on the mvcc complete after this.
      Threads.sleep(3000);

      // let the append to WAL go through now that the flush already started
      holdAppend.countDown();
      putFinished.await();
      flushFinished.await();

      if (backgroundFailure.get()) {
        throw new AssertionError("Background put or flush threw an exception; see test log");
      }

      // check whether flush went through
      assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size());

      // now check the region's unflushed seqIds.
      long seqId = log.getEarliestMemstoreSeqNum(hri.getEncodedNameAsBytes());
      assertEquals("Found seqId for the region which is already flushed",
          HConstants.NO_SEQNUM, seqId);
    } finally {
      // Release any append still parked in the listener (counting down twice is harmless) so
      // that neither the workers nor region.close() can block on it.
      holdAppend.countDown();
      try {
        exec.shutdownNow();
      } finally {
        region.close();
      }
    }
  }
}
}

0 comments on commit bf3c928

Please sign in to comment.