Skip to content

Commit

Permalink
SOLR-15635: don't repeat close hooks if SRI cleared twice due to usin…
Browse files Browse the repository at this point in the history
…g MDCThreadPool (#376)

* SOLR-15635: don't repeat close hooks if SRI cleared twice
* user impact: fromCore is closed by {!join} if used under `/export`
* Use reference counting to close on last close.
Also:
* trace logging
* protected -> private

Co-authored-by: David Smiley <dsmiley@apache.org>
Co-authored-by: Mikhail Khludnev <mkhl@apache.com>
  • Loading branch information
3 people committed Nov 15, 2021
1 parent f088857 commit dbdd2d4
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 31 deletions.
3 changes: 3 additions & 0 deletions solr/CHANGES.txt
Expand Up @@ -459,6 +459,9 @@ Bug Fixes

* SOLR-15696: Incremental backups no longer fail on collections that had previously seen a 'SPLITSHARD' operation. (Jason Gerlowski)

* SOLR-15635: Don't close hooks twice when SolrRequestInfo is cleared twice; or /export with classic join
closed fromCore if provided (Mikhail Khludnev, David Smiley)

* SOLR-15628: The SolrException.log() helper method has been fixed to correctly passes the Throwable to the Logger w/o stringification (hossman)

* SOLR-15676: Fix PeerSync failure due to RealTimeGetComponent returning duplicates. (Ramsey Haddad, Christine Poerschke, David Smiley)
Expand Down
70 changes: 48 additions & 22 deletions solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
Expand Up @@ -40,19 +40,21 @@
/** Information about the Solr request/response held in a {@link ThreadLocal}. */
public class SolrRequestInfo {

protected static final int MAX_STACK_SIZE = 10;
private static final int MAX_STACK_SIZE = 10;

protected static final ThreadLocal<Deque<SolrRequestInfo>> threadLocal = ThreadLocal.withInitial(LinkedList::new);
private static final ThreadLocal<Deque<SolrRequestInfo>> threadLocal = ThreadLocal.withInitial(LinkedList::new);

protected SolrQueryRequest req;
protected SolrQueryResponse rsp;
protected Date now;
private int refCount = 1; // prevent closing when still used

private SolrQueryRequest req;
private SolrQueryResponse rsp;
private Date now;
public HttpServletRequest httpRequest;
protected TimeZone tz;
protected ResponseBuilder rb;
protected List<Closeable> closeHooks;
protected SolrDispatchFilter.Action action;
protected boolean useServerToken = false;
private TimeZone tz;
private ResponseBuilder rb;
private List<Closeable> closeHooks;
private SolrDispatchFilter.Action action;
private boolean useServerToken = false;

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Expand All @@ -70,23 +72,25 @@ public static void setRequestInfo(SolrRequestInfo info) {
Deque<SolrRequestInfo> stack = threadLocal.get();
if (info == null) {
throw new IllegalArgumentException("SolrRequestInfo is null");
} else if (stack.size() <= MAX_STACK_SIZE) {
stack.push(info);
} else {
} else if (stack.size() > MAX_STACK_SIZE) {
assert false : "SolrRequestInfo Stack is full";
log.error("SolrRequestInfo Stack is full");
}
log.trace("{} {}", info, "setRequestInfo()");
assert !info.isClosed() : "SRI is already closed (odd).";
stack.push(info);
}

/** Removes the most recent SolrRequestInfo from the stack */
/** Removes the most recent SolrRequestInfo from the stack. Close hooks are called. */
public static void clearRequestInfo() {
log.trace("clearRequestInfo()");
Deque<SolrRequestInfo> stack = threadLocal.get();
if (stack.isEmpty()) {
assert false : "clearRequestInfo called too many times";
log.error("clearRequestInfo called too many times");
} else {
SolrRequestInfo info = stack.pop();
closeHooks(info);
info.close();
}
}

Expand All @@ -95,25 +99,33 @@ public static void clearRequestInfo() {
* we expect it to be empty by now because all "set" calls need to be balanced with a "clear".
*/
public static void reset() {
log.trace("reset()");
Deque<SolrRequestInfo> stack = threadLocal.get();
boolean isEmpty = stack.isEmpty();
assert stack.isEmpty() : "SolrRequestInfo Stack should have been cleared.";
while (!stack.isEmpty()) {
SolrRequestInfo info = stack.pop();
closeHooks(info);
info.close();
}
assert isEmpty : "SolrRequestInfo Stack should have been cleared.";
}

private static void closeHooks(SolrRequestInfo info) {
if (info.closeHooks != null) {
for (Closeable hook : info.closeHooks) {
private synchronized void close() {
log.trace("{} {}", this, "close()");

if (--refCount > 0) {
log.trace("{} {}", this, "not closing; still referenced");
return;
}

if (closeHooks != null) {
for (Closeable hook : closeHooks) {
try {
hook.close();
} catch (Exception e) {
SolrException.log(log, "Exception during close hook", e);
}
}
}
closeHooks = null;
}

public SolrRequestInfo(SolrQueryRequest req, SolrQueryResponse rsp) {
Expand Down Expand Up @@ -186,6 +198,9 @@ public void setResponseBuilder(ResponseBuilder rb) {
public void addCloseHook(Closeable hook) {
// is this better here, or on SolrQueryRequest?
synchronized (this) {
if (isClosed()) {
throw new IllegalStateException("Already closed!");
}
if (closeHooks == null) {
closeHooks = new LinkedList<>();
}
Expand Down Expand Up @@ -213,12 +228,23 @@ public void setUseServerToken(boolean use) {
this.useServerToken = use;
}

private synchronized boolean isClosed() {
return refCount <= 0;
}

public static ExecutorUtil.InheritableThreadLocalProvider getInheritableThreadLocalProvider() {
return new ExecutorUtil.InheritableThreadLocalProvider() {
@Override
public void store(AtomicReference<Object> ctx) {
SolrRequestInfo me = SolrRequestInfo.getRequestInfo();
if (me != null) ctx.set(me);
if (me != null) {
// increase refCount in store(), while we're still in the thread of the provider to avoid
// a race if this thread finishes its work before the pool'ed thread runs
synchronized (me) {
me.refCount++;
}
ctx.set(me);
}
}

@Override
Expand Down
27 changes: 18 additions & 9 deletions solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java
Expand Up @@ -53,17 +53,17 @@ public static void beforeTests() throws Exception {

fromCore = coreContainer.create("fromCore", ImmutableMap.of("configSet", "minimal"));

assertU(add(doc("id", "1", "name", "john", "title", "Director", "dept_s", "Engineering")));
assertU(add(doc("id", "2", "name", "mark", "title", "VP", "dept_s", "Marketing")));
assertU(add(doc("id", "3", "name", "nancy", "title", "MTS", "dept_s", "Sales")));
assertU(add(doc("id", "4", "name", "dave", "title", "MTS", "dept_s", "Support", "dept_s", "Engineering")));
assertU(add(doc("id", "5", "name", "tina", "title", "VP", "dept_s", "Engineering")));
assertU(add(doc("id", "1", "id_s_dv", "1", "name", "john", "title", "Director", "dept_s", "Engineering")));
assertU(add(doc("id", "2", "id_s_dv", "2", "name", "mark", "title", "VP", "dept_s", "Marketing")));
assertU(add(doc("id", "3", "id_s_dv", "3", "name", "nancy", "title", "MTS", "dept_s", "Sales")));
assertU(add(doc("id", "4", "id_s_dv", "4", "name", "dave", "title", "MTS", "dept_s", "Support", "dept_s", "Engineering")));
assertU(add(doc("id", "5", "id_s_dv", "5", "name", "tina", "title", "VP", "dept_s", "Engineering")));
assertU(commit());

update(fromCore, add(doc("id", "10", "dept_id_s", "Engineering", "text", "These guys develop stuff", "cat", "dev")));
update(fromCore, add(doc("id", "11", "dept_id_s", "Marketing", "text", "These guys make you look good")));
update(fromCore, add(doc("id", "12", "dept_id_s", "Sales", "text", "These guys sell stuff")));
update(fromCore, add(doc("id", "13", "dept_id_s", "Support", "text", "These guys help customers")));
update(fromCore, add(doc("id", "10", "id_s_dv", "10", "dept_id_s", "Engineering", "text", "These guys develop stuff", "cat", "dev")));
update(fromCore, add(doc("id", "11", "id_s_dv", "11", "dept_id_s", "Marketing", "text", "These guys make you look good")));
update(fromCore, add(doc("id", "12", "id_s_dv", "12", "dept_id_s", "Sales", "text", "These guys sell stuff")));
update(fromCore, add(doc("id", "13", "id_s_dv", "13", "dept_id_s", "Support", "text", "These guys help customers")));
update(fromCore, commit());

}
Expand Down Expand Up @@ -91,6 +91,15 @@ void doTestJoin(String joinPrefix) throws Exception {
, "/response=={'numFound':3,'start':0,'numFoundExact':true,'docs':[{'id':'1'},{'id':'4'},{'id':'5'}]}"
);

assertJQ(req( "qt", "/export",
"q", joinPrefix + " from=dept_id_s to=dept_s fromIndex=fromCore}cat:dev", "fl", "id_s_dv",
"sort", "id_s_dv asc",
"debugQuery", random().nextBoolean() ? "true":"false")
, "/response=={'numFound':3,'docs':[{'id_s_dv':'1'},{'id_s_dv':'4'},{'id_s_dv':'5'}]}"
);
assertFalse(fromCore.isClosed());
assertFalse(h.getCore().isClosed());

// find people that develop stuff - but limit via filter query to a name of "john"
// this tests filters being pushed down to queries (SOLR-3062)
assertJQ(req("q", joinPrefix + " from=dept_id_s to=dept_s fromIndex=fromCore}cat:dev", "fl", "id", "fq", "name:john",
Expand Down
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.request;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.response.SolrQueryResponse;
import org.junit.BeforeClass;

public class TestSolrRequestInfo extends SolrTestCaseJ4 {

@BeforeClass
public static void beforeClass() throws Exception {
initCore("solrconfig.xml","schema11.xml");
}

public void testCloseHookTwice(){
final SolrRequestInfo info = new SolrRequestInfo(
new LocalSolrQueryRequest(h.getCore(), params()),
new SolrQueryResponse());
AtomicInteger counter = new AtomicInteger();
info.addCloseHook(counter::incrementAndGet);
SolrRequestInfo.setRequestInfo(info);
SolrRequestInfo.setRequestInfo(info);
SolrRequestInfo.clearRequestInfo();
assertNotNull(SolrRequestInfo.getRequestInfo());
SolrRequestInfo.clearRequestInfo();
assertEquals("hook should be closed only once", 1, counter.get());
assertNull(SolrRequestInfo.getRequestInfo());
}

public void testThreadPool() throws InterruptedException {
final SolrRequestInfo info = new SolrRequestInfo(
new LocalSolrQueryRequest(h.getCore(), params()),
new SolrQueryResponse());
AtomicInteger counter = new AtomicInteger();

SolrRequestInfo.setRequestInfo(info);
ExecutorUtil.MDCAwareThreadPoolExecutor pool = new ExecutorUtil.MDCAwareThreadPoolExecutor(1, 1, 1,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
AtomicBoolean run = new AtomicBoolean(false);
pool.execute(() -> {
final SolrRequestInfo poolInfo = SolrRequestInfo.getRequestInfo();
assertSame(info, poolInfo);
info.addCloseHook(counter::incrementAndGet);
run.set(true);
});
if (random().nextBoolean()) {
pool.shutdown();
} else {
pool.shutdownNow();
}
SolrRequestInfo.clearRequestInfo();
SolrRequestInfo.reset();

pool.awaitTermination(1, TimeUnit.MINUTES);
assertTrue(run.get());
assertEquals("hook should be closed only once", 1, counter.get());
assertNull(SolrRequestInfo.getRequestInfo());
}
}

0 comments on commit dbdd2d4

Please sign in to comment.