From 81e793e582f93af6edf53620d49de6fa4bb21a6f Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Thu, 30 Apr 2015 12:54:28 -0700 Subject: [PATCH] HBASE-13420 RegionEnvironment.offerExecutionLatency Blocks Threads under Heavy Load --- .../regionserver/RegionCoprocessorHost.java | 8 +- .../util/BoundedConcurrentLinkedQueue.java | 114 ++++++++++++++++++ .../TestBoundedConcurrentLinkedQueue.java | 86 +++++++++++++ 3 files changed, 204 insertions(+), 4 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 6e239520dc60..8d7555cd9861 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -27,8 +27,6 @@ import java.util.Map; import java.util.NavigableSet; import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.regex.Matcher; @@ -82,6 +80,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.BoundedConcurrentLinkedQueue; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CoprocessorClassLoader; import org.apache.hadoop.hbase.util.Pair; @@ -101,6 +100,7 @@ public class RegionCoprocessorHost new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK); /** + * * Encapsulation of the environment of each coprocessor */ static class RegionEnvironment extends CoprocessorHost.Environment @@ -110,8 +110,8 @@ static class RegionEnvironment extends CoprocessorHost.Environment private RegionServerServices rsServices; ConcurrentMap sharedData; private static final int LATENCY_BUFFER_SIZE = 100; - private final BlockingQueue coprocessorTimeNanos = new ArrayBlockingQueue( - LATENCY_BUFFER_SIZE); + private final BoundedConcurrentLinkedQueue coprocessorTimeNanos = + new BoundedConcurrentLinkedQueue(LATENCY_BUFFER_SIZE); private final boolean useLegacyPre; private final boolean useLegacyPost; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java new file mode 100644 index 000000000000..920823888903 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * A ConcurrentLinkedQueue that enforces a maximum queue size. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class BoundedConcurrentLinkedQueue extends ConcurrentLinkedQueue { + private static final long serialVersionUID = 1L; + private volatile long size = 0; + private final long maxSize; + + public BoundedConcurrentLinkedQueue() { + this(Long.MAX_VALUE); + } + + public BoundedConcurrentLinkedQueue(long maxSize) { + super(); + this.maxSize = maxSize; + } + + @Override + public boolean add(T e) { + return offer(e); + } + + @Override + public boolean addAll(Collection c) { + size += c.size(); // Between here and below we might reject offers, + if (size > maxSize) { // if over maxSize, but that's ok + size -= c.size(); // We're over, just back out and return. + return false; + } + return super.addAll(c); // Always true for ConcurrentLinkedQueue + } + + @Override + public void clear() { + super.clear(); + size = 0; + } + + @Override + public boolean offer(T e) { + if (++size > maxSize) { + --size; // We didn't take it after all + return false; + } + return super.offer(e); // Always true for ConcurrentLinkedQueue + } + + @Override + public T poll() { + T result = super.poll(); + if (result != null) { + --size; + } + return result; + } + + @Override + public boolean remove(Object o) { + boolean result = super.remove(o); + if (result) { + --size; + } + return result; + } + + @Override + public int size() { + return (int) size; + } + + public void drainTo(Collection list) { + long removed = 0; + T l; + while ((l = super.poll()) != null) { + list.add(l); + removed++; + } + // Limit the number of operations on a volatile by only reporting size + // change after the drain is completed. + size -= removed; + } + + public long remainingCapacity() { + long remaining = maxSize - size; + return remaining >= 0 ? remaining : 0; + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java new file mode 100644 index 000000000000..3453f24c96f5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MiscTests.class, SmallTests.class}) +public class TestBoundedConcurrentLinkedQueue { + private final static int CAPACITY = 16; + + private BoundedConcurrentLinkedQueue queue; + + @Before + public void setUp() throws Exception { + this.queue = new BoundedConcurrentLinkedQueue(CAPACITY); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testOfferAndPoll() throws Exception { + // Offer + for (long i = 1; i <= CAPACITY; ++i) { + assertTrue(queue.offer(i)); + assertEquals(i, queue.size()); + assertEquals(CAPACITY - i, queue.remainingCapacity()); + } + assertFalse(queue.offer(0L)); + + // Poll + for (int i = 1; i <= CAPACITY; ++i) { + long l = queue.poll(); + assertEquals(i, l); + assertEquals(CAPACITY - i, queue.size()); + assertEquals(i, queue.remainingCapacity()); + } + assertEquals(null, queue.poll()); + } + + @Test + public void testDrain() throws Exception { + // Offer + for (long i = 1; i <= CAPACITY; ++i) { + assertTrue(queue.offer(i)); + assertEquals(i, queue.size()); + assertEquals(CAPACITY - i, queue.remainingCapacity()); + } + assertFalse(queue.offer(0L)); + + // Drain + List list = new ArrayList(); + queue.drainTo(list); + assertEquals(null, queue.poll()); + assertEquals(0, queue.size()); + assertEquals(CAPACITY, queue.remainingCapacity()); + } +}