HBASE-24079 [Flakey Tests] Misc fixes and debug; fix BindException in… #1388

Closed · wants to merge 1 commit
@@ -17,10 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Queue;
@@ -32,12 +29,12 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
* ClientAsyncPrefetchScanner implements async scanner behaviour.
@@ -55,9 +52,7 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
private long maxCacheSize;
private AtomicLong cacheSizeInBytes;
// exception queue (from prefetch to main scan execution)
private Queue<Exception> exceptionsQueue;
// prefetch thread to be executed asynchronously
private Thread prefetcher;
private final Queue<Exception> exceptionsQueue;
// used for testing
private Consumer<Boolean> prefetchListener;

@@ -71,6 +66,8 @@ public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableN
int replicaCallTimeoutMicroSecondScan) throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
replicaCallTimeoutMicroSecondScan);
exceptionsQueue = new ConcurrentLinkedQueue<>();
Threads.setDaemonThreadRunning(new Thread(new PrefetchRunnable()), name + ".asyncPrefetcher");
}

@VisibleForTesting
@@ -80,13 +77,10 @@ void setPrefetchListener(Consumer<Boolean> prefetchListener) {

@Override
protected void initCache() {
// concurrent cache
maxCacheSize = resultSize2CacheSize(maxScannerResultSize);
// Override to put a different cache in place of the super's -- a concurrent one.
cache = new LinkedBlockingQueue<>();
maxCacheSize = resultSize2CacheSize(maxScannerResultSize);
cacheSizeInBytes = new AtomicLong(0);
exceptionsQueue = new ConcurrentLinkedQueue<>();
prefetcher = new Thread(new PrefetchRunnable());
Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher");
}

private long resultSize2CacheSize(long maxResultSize) {
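The shape of the fix, for reference: the exception queue is created and the prefetch thread started once, in the constructor, so both fields can be final and no prefetch can run against a half-initialized scanner. A minimal sketch of that pattern with hypothetical names (not the real scanner class):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: a scanner whose cache is filled by a daemon prefetch thread
// started from the constructor, so all shared fields can be final.
class PrefetchingScanner {
  private final LinkedBlockingQueue<String> cache = new LinkedBlockingQueue<>();
  // Exceptions cross from the prefetch thread to the caller through here.
  private final Queue<Exception> exceptionsQueue = new ConcurrentLinkedQueue<>();

  PrefetchingScanner(String name) {
    // Analogous to Threads.setDaemonThreadRunning in the patch above.
    Thread prefetcher = new Thread(this::prefetchLoop, name + ".asyncPrefetcher");
    prefetcher.setDaemon(true);
    prefetcher.start();
  }

  private void prefetchLoop() {
    try {
      while (true) {
        // Unbounded put here; the real scanner bounds its cache by
        // tracking cacheSizeInBytes against maxCacheSize.
        cache.put(fetchNextFromServer());
      }
    } catch (Exception e) {
      exceptionsQueue.add(e); // surfaced on the caller's next poll
    }
  }

  String next() throws Exception {
    Exception e = exceptionsQueue.poll();
    if (e != null) {
      throw e;
    }
    return cache.take(); // blocks until the prefetcher produces a result
  }

  private String fetchNextFromServer() {
    return "row"; // stand-in for the real scan RPC
  }
}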
@@ -174,9 +174,13 @@ public class BucketCache implements BlockCache, HeapSize {
private final AtomicLong accessCount = new AtomicLong();

private static final int DEFAULT_CACHE_WAIT_TIME = 50;
// Used in test now. If the flag is false and the cache speed is very fast,
// bucket cache will skip some blocks when caching. If the flag is true, we
// will wait blocks flushed to IOEngine for some time when caching

/**
* Used in tests. If this flag is false and the cache speed is very fast,
* bucket cache will skip some blocks when caching. If the flag is true, we
* will wait until blocks are flushed to IOEngine.
*/
@VisibleForTesting
boolean wait_when_cache = false;

private final BucketCacheStats cacheStats = new BucketCacheStats();
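For context, a sketch of what the flag gates, under the assumption (hypothetical names, not BucketCache's real write path) that cached blocks pass through a bounded writer queue on their way to the IOEngine:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch only: a bounded queue drained by writer threads into the IOEngine.
class WriterQueueSketch {
  private static final int CACHE_WAIT_TIME_MS = 50; // mirrors DEFAULT_CACHE_WAIT_TIME
  private final BlockingQueue<byte[]> writerQueue = new ArrayBlockingQueue<>(64);
  private final boolean waitWhenCache; // corresponds to wait_when_cache

  WriterQueueSketch(boolean waitWhenCache) {
    this.waitWhenCache = waitWhenCache;
  }

  boolean cacheBlock(byte[] block) throws InterruptedException {
    if (waitWhenCache) {
      // Test mode: wait for the writers to drain the queue instead of
      // silently skipping the block.
      return writerQueue.offer(block, CACHE_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
    }
    // Default: if blocks arrive faster than the IOEngine absorbs them,
    // skip the block rather than stall the caller.
    return writerQueue.offer(block);
  }
}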
@@ -39,12 +39,10 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@@ -95,19 +93,23 @@ public boolean start() {
if (!super.start()) {
return false;
}
if (master.isStopped()) {
LOG.debug("Stopped");
return false;
}
// Around startup, if startup failed, some of the below may be set back to null, so an NPE is possible.
ServerManager sm = master.getServerManager();
if (sm == null) {
LOG.debug("ServerManager is null; stopping={}", master.isStopping());
LOG.debug("ServerManager is null");
return false;
}
sm.registerListener(this);
ProcedureExecutor<MasterProcedureEnv> pe = master.getMasterProcedureExecutor();
if (pe == null) {
LOG.debug("ProcedureExecutor is null; stopping={}", master.isStopping());
LOG.debug("ProcedureExecutor is null");
return false;
}
procedureEnv = pe.getEnvironment();
this.procedureEnv = pe.getEnvironment();
if (this.procedureEnv == null) {
LOG.debug("ProcedureEnv is null; stopping={}", master.isStopping());
return false;
@@ -20,7 +20,6 @@

import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -35,7 +34,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
@@ -55,7 +53,6 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -103,7 +100,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
*/
private int regionSplitLimit;

/** @param server */
CompactSplit(HRegionServer server) {
this.server = server;
this.conf = server.getConfiguration();
@@ -192,12 +188,19 @@ public String dumpQueue() {

public synchronized boolean requestSplit(final Region r) {
// don't split regions that are blocking
if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= PRIORITY_USER) {
byte[] midKey = ((HRegion)r).checkSplit();
if (midKey != null) {
requestSplit(r, midKey);
return true;
HRegion hr = (HRegion)r;
try {
if (shouldSplitRegion() && hr.getCompactPriority() >= PRIORITY_USER) {
byte[] midKey = hr.checkSplit();
if (midKey != null) {
requestSplit(r, midKey);
return true;
}
}
} catch (IndexOutOfBoundsException e) {
// We get this sometimes. Not sure why. Catch and return false; no split request.
Contributor: nit: Add a jira reference?

Contributor Author: Well, it turns out the IndexOutOfBoundsException is thrown by a TestCompaction fail-checking test (?). A subsequent patch changes the test to make it plain when the test throws one of these. I think I've seen this in production too. When I get more detail, I will file a JIRA.

LOG.warn("Catching out-of-bounds; region={}, policy={}", hr == null? null: hr.getRegionInfo(),
hr == null? "null": hr.getCompactPriority(), e);
}
return false;
}
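If the subsequent patch makes the fail-checking test explicit, it could look something like the following sketch, assuming JUnit 4.13's assertThrows (hypothetical test, not the real TestCompaction change):

import static org.junit.Assert.assertThrows;

import org.junit.Test;

public class ExpectedFailureExampleTest {
  // Stand-in for the compaction step the fail-checking test breaks on purpose.
  private void compactWithInjectedFault() {
    throw new IndexOutOfBoundsException("injected");
  }

  @Test
  public void failureIsPlain() {
    // Asserting the expected exception makes the test's intent obvious
    // instead of letting the IndexOutOfBoundsException leak into requestSplit.
    assertThrows(IndexOutOfBoundsException.class, this::compactWithInjectedFault);
  }
}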
@@ -244,8 +247,7 @@ default void completed(Store store) {
}

private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
new CompactionCompleteTracker() {
};
new CompactionCompleteTracker() {};

private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {

@@ -340,7 +342,8 @@ private void requestCompactionInternal(HRegion region, HStore store, String why,

CompactionContext compaction;
if (selectNow) {
Optional<CompactionContext> c = selectCompaction(region, store, priority, tracker, completeTracker, user);
Optional<CompactionContext> c =
selectCompaction(region, store, priority, tracker, completeTracker, user);
if (!c.isPresent()) {
// message logged inside
return;
@@ -650,8 +653,8 @@ private void doCompaction(User user) {
@Override
public void run() {
Preconditions.checkNotNull(server);
if (server.isStopped()
|| (region.getTableDescriptor() != null && !region.getTableDescriptor().isCompactionEnabled())) {
if (server.isStopped() || (region.getTableDescriptor() != null &&
!region.getTableDescriptor().isCompactionEnabled())) {
region.decrementCompactionsQueuedCount();
return;
}
@@ -78,7 +78,7 @@ public void testMultiThreadedGetClusterId() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
CachedClusterId cachedClusterId = new CachedClusterId(conf);
TestContext context = new TestContext(conf);
int numThreads = 100;
int numThreads = 16;
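// Lowered from 100; presumably a smaller thread count keeps this test stable on loaded CI hosts.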
Contributor: Oops, my bad. Thanks for lowering it.

for (int i = 0; i < numThreads; i++) {
context.addThread(new GetClusterIdThread(context, cachedClusterId));
}
@@ -19,7 +19,6 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -46,9 +45,13 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(LargeTests.class)
public class TestServerSideScanMetricsFromClientSide {
private static final Logger LOG =
LoggerFactory.getLogger(TestServerSideScanMetricsFromClientSide.class);

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@@ -178,22 +181,27 @@ private void testRowsSeenMetric(boolean async) throws Exception {
baseScan = new Scan();
baseScan.setScanMetricsEnabled(true);
baseScan.setAsyncPrefetch(async);
testRowsSeenMetric(baseScan);

// Test case that only a single result will be returned per RPC to the server
baseScan.setCaching(1);
testRowsSeenMetric(baseScan);

// Test case that partial results are returned from the server. At most one cell will be
// contained in each response
baseScan.setMaxResultSize(1);
testRowsSeenMetric(baseScan);

// Test case that size limit is set such that a few cells are returned per partial result from
// the server
baseScan.setCaching(NUM_ROWS);
baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1));
testRowsSeenMetric(baseScan);
try {
testRowsSeenMetric(baseScan);

// Test case that only a single result will be returned per RPC to the server
baseScan.setCaching(1);
testRowsSeenMetric(baseScan);

// Test case that partial results are returned from the server. At most one cell will be
// contained in each response
baseScan.setMaxResultSize(1);
testRowsSeenMetric(baseScan);

// Test case that size limit is set such that a few cells are returned per partial result from
// the server
baseScan.setCaching(NUM_ROWS);
baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1));
testRowsSeenMetric(baseScan);
} catch (Throwable t) {
LOG.error("FAIL", t);
Contributor: Curious, why this? Doesn't JUnit log this anyway?

Contributor Author: Debug. Narrowing in on where the exception comes up out of. Probably useless. Fella gets desperate.

throw t;
}
}

public void testRowsSeenMetric(Scan baseScan) throws Exception {
@@ -212,7 +220,8 @@ public void testRowsSeenMetric(Scan baseScan) throws Exception {
scan = new Scan(baseScan);
scan.withStartRow(ROWS[i - 1]);
scan.withStopRow(ROWS[ROWS.length - 1]);
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length - i);
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME,
ROWS.length - i);
}

// The filter should filter out all rows, but we still expect to see every row.
@@ -318,8 +327,11 @@ public void testRowsFilteredMetric(Scan baseScan) throws Exception {
public void testRowsFilteredMetric(Scan baseScan, Filter filter, int expectedNumFiltered)
throws Exception {
Scan scan = new Scan(baseScan);
if (filter != null) scan.setFilter(filter);
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, expectedNumFiltered);
if (filter != null) {
scan.setFilter(filter);
}
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME,
expectedNumFiltered);
}

/**
@@ -334,7 +346,7 @@ public void testMetric(Scan scan, String metricKey, long expectedValue) throws E
ResultScanner scanner = TABLE.getScanner(scan);
// Iterate through all the results
while (scanner.next() != null) {

continue;
}
scanner.close();
ScanMetrics metrics = scanner.getScanMetrics();
@@ -24,7 +24,6 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -410,11 +409,11 @@ private void compactionTest(final TableName tableName, final int flushes,
}

long curt = System.currentTimeMillis();
long waitTime = 5000;
long waitTime = 10000;
long endt = curt + waitTime;
CompactionState state = admin.getCompactionState(tableName).get();
while (state == CompactionState.NONE && curt < endt) {
Thread.sleep(10);
Thread.sleep(1);
state = admin.getCompactionState(tableName).get();
curt = System.currentTimeMillis();
}
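The hand-rolled poll above could also be expressed with the waitFor utility this patch already uses in TestMasterRegistry; a sketch, assuming the enclosing test's TEST_UTIL, admin, and tableName and the waitFor(timeout, interval, predicate) overload:

// Poll every 1 ms, for up to 10 s, until the compaction state changes.
TEST_UTIL.waitFor(10000, 1, () ->
    admin.getCompactionState(tableName).get() != CompactionState.NONE);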
@@ -234,19 +234,20 @@ public void testScanAfterDeletingSpecifiedRowV2() throws IOException, Interrupte
byte[] row = Bytes.toBytes("SpecifiedRow");
byte[] qual0 = Bytes.toBytes("qual0");
byte[] qual1 = Bytes.toBytes("qual1");
Delete d = new Delete(row);
long now = System.currentTimeMillis();
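// Explicit timestamps make the ordering deterministic: the Delete masks only
// cells at ts <= now, while the Puts below use now + 1..now + 3 and so always
// survive it regardless of clock resolution.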
Delete d = new Delete(row, now);
table.delete(d);

Put put = new Put(row);
put.addColumn(FAMILY, null, VALUE);
put.addColumn(FAMILY, null, now + 1, VALUE);
table.put(put);

put = new Put(row);
put.addColumn(FAMILY, qual1, qual1);
put.addColumn(FAMILY, qual1, now + 2, qual1);
table.put(put);

put = new Put(row);
put.addColumn(FAMILY, qual0, qual0);
put.addColumn(FAMILY, qual0, now + 3, qual0);
table.put(put);

Result r = table.get(new Get(row));
@@ -103,7 +103,17 @@ private static String generateDummyMastersList(int size) {
@Test public void testRegistryRPCs() throws Exception {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
for (int numHedgedReqs = 1; numHedgedReqs <=3; numHedgedReqs++) {
final int size = activeMaster.getMetaRegionLocationCache().
getMetaRegionLocations().get().size();
// Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
// because not all replicas had made it up before test started.
TEST_UTIL.waitFor(10000,
() -> {
try (MasterRegistry registry = new MasterRegistry(conf)) {
return registry.getMetaRegionLocations().get().size() >= size;
Contributor: This looks always true, since the registry actually gets the count from activeMaster.getMetaRegionLocationCache()? I think the check should look something like activeMaster.getLocations().size() == NUM_REPLICAS. Actually, we have a utility for this somewhere; I'm not able to find it now. :(

Contributor Author: You'd know better. There is no getLocations. What do you think you were referring to?

Contributor: Found it: RegionReplicaTestHelper#waitUntilAllMetaReplicasAreReady(). Sorry for the typo; the condition should be something like that method. (I also fixed an edge case there to make the check tighter.)

}
});
for (int numHedgedReqs = 1; numHedgedReqs <= 3; numHedgedReqs++) {
if (numHedgedReqs == 1) {
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
} else {
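Following up on the review thread, the tighter condition would compare against the expected replica count rather than against the master's own cache; a sketch, where NUM_META_REPLICAS is a hypothetical constant and RegionReplicaTestHelper#waitUntilAllMetaReplicasAreReady is the utility the reviewer points at:

// Wait until a fresh registry sees every configured meta replica.
TEST_UTIL.waitFor(10000, () -> {
  try (MasterRegistry registry = new MasterRegistry(conf)) {
    return registry.getMetaRegionLocations().get().size() >= NUM_META_REPLICAS;
  }
});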
@@ -935,10 +935,11 @@ public void testReadExpiredDataForRawScan() throws IOException {
@Test
public void testScanWithColumnsAndFilterAndVersion() throws IOException {
TableName tableName = name.getTableName();
long now = System.currentTimeMillis();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) {
for (int i = 0; i < 4; i++) {
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE);
put.addColumn(FAMILY, QUALIFIER, now + i, VALUE);
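// Distinct explicit timestamps (now + i) keep the four versions separate even
// when the puts land within the same millisecond.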
table.put(put);
}
