Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ public void close() {
}
}

@VisibleForTesting
int getOpenScannerCount() {
return this.openScannerCount.get();
}

/**
* Called when opening a scanner on the data of this MemStoreLAB
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,12 +895,14 @@ public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreSc
// need for the updateReaders() to happen.
LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
// no lock acquired.
clearAndClose(memStoreScanners);
return;
}
// lock acquired
updateReaders = true;
if (this.closing) {
LOG.debug("StoreScanner already closing. There is no need to updateReaders");
clearAndClose(memStoreScanners);
return;
}
flushed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -45,6 +46,7 @@
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
Expand Down Expand Up @@ -113,6 +115,61 @@ public static void setUp() throws Exception {
region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
}

@Test
public void testScannerCloseAndUpdateReadersWithMemstoreScanner() throws Exception {
Put p = new Put(Bytes.toBytes("row"));
p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
region.put(p);
// create the store scanner here.
// for easiness, use Long.MAX_VALUE as read pt
try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
new Scan(), getCols("q1"), Long.MAX_VALUE)) {
p = new Put(Bytes.toBytes("row1"));
p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
region.put(p);
HStore store = region.getStore(fam);
ReentrantReadWriteLock lock = store.lock;
// use the lock to manually get a new memstore scanner. this is what
// HStore#notifyChangedReadersObservers does under the lock.(lock is not needed here
//since it is just a testcase).
lock.readLock().lock();
final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
lock.readLock().unlock();
Thread closeThread = new Thread() {
public void run() {
// close should be completed
scan.close(false, true);
}
};
closeThread.start();
Thread updateThread = new Thread() {
public void run() {
try {
// use the updated memstoreScanners and pass it to updateReaders
scan.updateReaders(true, Collections.emptyList(), memScanners);
} catch (IOException e) {
e.printStackTrace();
}
}
};
updateThread.start();
// wait for close and updateThread to complete
closeThread.join();
updateThread.join();
MemStoreLAB memStoreLAB;
for (KeyValueScanner scanner : memScanners) {
if (scanner instanceof SegmentScanner) {
memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
if (memStoreLAB != null) {
// There should be no unpooled chunks
int openScannerCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
assertTrue("The memstore should not have unpooled chunks", openScannerCount == 0);
}
}
}
}
}

@Test
public void testScannerCloseAndUpdateReaders1() throws Exception {
testScannerCloseAndUpdateReaderInternal(true, false);
Expand Down