Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-15635: avoid redundant closeHooks invocation by MDCThreadPool #2609

Merged
merged 1 commit into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ Bug Fixes

* SOLR-15696: Incremental backups no longer fail on collections where a 'SPLITSHARD' operation previously occurred. (Jason Gerlowski)

* SOLR-15635: Don't close hooks twice when SolrRequestInfo is cleared twice; or /export with classic join
closed fromCore if provided (Mikhail Khludnev, David Smiley)

* SOLR-15628: The SolrException.log() helper method has been fixed to correctly passes the Throwable to the Logger w/o stringification (hossman)

* SOLR-15722: Delete Replica does not delete the Per replica state (noble)
Expand Down
70 changes: 48 additions & 22 deletions solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,21 @@
/** Information about the Solr request/response held in a {@link ThreadLocal}. */
public class SolrRequestInfo {

protected static final int MAX_STACK_SIZE = 10;
private static final int MAX_STACK_SIZE = 10;

protected static final ThreadLocal<Deque<SolrRequestInfo>> threadLocal = ThreadLocal.withInitial(LinkedList::new);
private static final ThreadLocal<Deque<SolrRequestInfo>> threadLocal = ThreadLocal.withInitial(LinkedList::new);

protected SolrQueryRequest req;
protected SolrQueryResponse rsp;
protected Date now;
private int refCount = 1; // prevent closing when still used

private SolrQueryRequest req;
private SolrQueryResponse rsp;
private Date now;
protected HttpServletRequest httpRequest;
protected TimeZone tz;
protected ResponseBuilder rb;
protected List<Closeable> closeHooks;
protected SolrDispatchFilter.Action action;
protected boolean useServerToken = false;
private TimeZone tz;
private ResponseBuilder rb;
private List<Closeable> closeHooks;
private SolrDispatchFilter.Action action;
private boolean useServerToken = false;

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Expand All @@ -70,23 +72,25 @@ public static void setRequestInfo(SolrRequestInfo info) {
Deque<SolrRequestInfo> stack = threadLocal.get();
if (info == null) {
throw new IllegalArgumentException("SolrRequestInfo is null");
} else if (stack.size() <= MAX_STACK_SIZE) {
stack.push(info);
} else {
} else if (stack.size() > MAX_STACK_SIZE) {
assert false : "SolrRequestInfo Stack is full";
log.error("SolrRequestInfo Stack is full");
}
log.trace("{} {}", info, "setRequestInfo()");
assert !info.isClosed() : "SRI is already closed (odd).";
stack.push(info);
}

/** Removes the most recent SolrRequestInfo from the stack */
/** Removes the most recent SolrRequestInfo from the stack. Close hooks are called. */
public static void clearRequestInfo() {
log.trace("clearRequestInfo()");
Deque<SolrRequestInfo> stack = threadLocal.get();
if (stack.isEmpty()) {
assert false : "clearRequestInfo called too many times";
log.error("clearRequestInfo called too many times");
} else {
SolrRequestInfo info = stack.pop();
closeHooks(info);
info.close();
}
}

Expand All @@ -95,25 +99,33 @@ public static void clearRequestInfo() {
* we expect it to be empty by now because all "set" calls need to be balanced with a "clear".
*/
public static void reset() {
log.trace("reset()");
Deque<SolrRequestInfo> stack = threadLocal.get();
boolean isEmpty = stack.isEmpty();
assert stack.isEmpty() : "SolrRequestInfo Stack should have been cleared.";
while (!stack.isEmpty()) {
SolrRequestInfo info = stack.pop();
closeHooks(info);
info.close();
}
assert isEmpty : "SolrRequestInfo Stack should have been cleared.";
}

private static void closeHooks(SolrRequestInfo info) {
if (info.closeHooks != null) {
for (Closeable hook : info.closeHooks) {
private synchronized void close() {
log.trace("{} {}", this, "close()");

if (--refCount > 0) {
log.trace("{} {}", this, "not closing; still referenced");
return;
}

if (closeHooks != null) {
for (Closeable hook : closeHooks) {
try {
hook.close();
} catch (Exception e) {
SolrException.log(log, "Exception during close hook", e);
}
}
}
closeHooks = null;
}

public SolrRequestInfo(SolrQueryRequest req, SolrQueryResponse rsp) {
Expand Down Expand Up @@ -186,6 +198,9 @@ public void setResponseBuilder(ResponseBuilder rb) {
public void addCloseHook(Closeable hook) {
// is this better here, or on SolrQueryRequest?
synchronized (this) {
if (isClosed()) {
throw new IllegalStateException("Already closed!");
}
if (closeHooks == null) {
closeHooks = new LinkedList<>();
}
Expand Down Expand Up @@ -213,13 +228,24 @@ public void setUseServerToken(boolean use) {
this.useServerToken = use;
}

private synchronized boolean isClosed() {
return refCount <= 0;
}

public static ExecutorUtil.InheritableThreadLocalProvider getInheritableThreadLocalProvider() {
return new ExecutorUtil.InheritableThreadLocalProvider() {
@Override
@SuppressWarnings({"unchecked"})
public void store(@SuppressWarnings({"rawtypes"})AtomicReference ctx) {
SolrRequestInfo me = SolrRequestInfo.getRequestInfo();
if (me != null) ctx.set(me);
if (me != null) {
// increase refCount in store(), while we're still in the thread of the provider to avoid
// a race if this thread finishes its work before the pool'ed thread runs
synchronized (me) {
me.refCount++;
}
ctx.set(me);
}
}

@Override
Expand Down
27 changes: 18 additions & 9 deletions solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@ public static void beforeTests() throws Exception {

fromCore = coreContainer.create("fromCore", ImmutableMap.of("configSet", "minimal"));

assertU(add(doc("id", "1", "name", "john", "title", "Director", "dept_s", "Engineering")));
assertU(add(doc("id", "2", "name", "mark", "title", "VP", "dept_s", "Marketing")));
assertU(add(doc("id", "3", "name", "nancy", "title", "MTS", "dept_s", "Sales")));
assertU(add(doc("id", "4", "name", "dave", "title", "MTS", "dept_s", "Support", "dept_s", "Engineering")));
assertU(add(doc("id", "5", "name", "tina", "title", "VP", "dept_s", "Engineering")));
assertU(add(doc("id", "1", "id_s_dv", "1", "name", "john", "title", "Director", "dept_s", "Engineering")));
assertU(add(doc("id", "2", "id_s_dv", "2", "name", "mark", "title", "VP", "dept_s", "Marketing")));
assertU(add(doc("id", "3", "id_s_dv", "3", "name", "nancy", "title", "MTS", "dept_s", "Sales")));
assertU(add(doc("id", "4", "id_s_dv", "4", "name", "dave", "title", "MTS", "dept_s", "Support", "dept_s", "Engineering")));
assertU(add(doc("id", "5", "id_s_dv", "5", "name", "tina", "title", "VP", "dept_s", "Engineering")));
assertU(commit());

update(fromCore, add(doc("id", "10", "dept_id_s", "Engineering", "text", "These guys develop stuff", "cat", "dev")));
update(fromCore, add(doc("id", "11", "dept_id_s", "Marketing", "text", "These guys make you look good")));
update(fromCore, add(doc("id", "12", "dept_id_s", "Sales", "text", "These guys sell stuff")));
update(fromCore, add(doc("id", "13", "dept_id_s", "Support", "text", "These guys help customers")));
update(fromCore, add(doc("id", "10", "id_s_dv", "10", "dept_id_s", "Engineering", "text", "These guys develop stuff", "cat", "dev")));
update(fromCore, add(doc("id", "11", "id_s_dv", "11", "dept_id_s", "Marketing", "text", "These guys make you look good")));
update(fromCore, add(doc("id", "12", "id_s_dv", "12", "dept_id_s", "Sales", "text", "These guys sell stuff")));
update(fromCore, add(doc("id", "13", "id_s_dv", "13", "dept_id_s", "Support", "text", "These guys help customers")));
update(fromCore, commit());

}
Expand Down Expand Up @@ -91,6 +91,15 @@ void doTestJoin(String joinPrefix) throws Exception {
, "/response=={'numFound':3,'start':0,'numFoundExact':true,'docs':[{'id':'1'},{'id':'4'},{'id':'5'}]}"
);

assertJQ(req( "qt", "/export",
"q", joinPrefix + " from=dept_id_s to=dept_s fromIndex=fromCore}cat:dev", "fl", "id_s_dv",
"sort", "id_s_dv asc",
"debugQuery", random().nextBoolean() ? "true":"false")
, "/response=={'numFound':3,'docs':[{'id_s_dv':'1'},{'id_s_dv':'4'},{'id_s_dv':'5'}]}"
);
assertFalse(fromCore.isClosed());
assertFalse(h.getCore().isClosed());

// find people that develop stuff - but limit via filter query to a name of "john"
// this tests filters being pushed down to queries (SOLR-3062)
assertJQ(req("q", joinPrefix + " from=dept_id_s to=dept_s fromIndex=fromCore}cat:dev", "fl", "id", "fq", "name:john",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.request;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.response.SolrQueryResponse;
import org.junit.BeforeClass;

public class TestSolrRequestInfo extends SolrTestCaseJ4 {

@BeforeClass
public static void beforeClass() throws Exception {
initCore("solrconfig.xml","schema11.xml");
}

public void testCloseHookTwice(){
final SolrRequestInfo info = new SolrRequestInfo(
new LocalSolrQueryRequest(h.getCore(), params()),
new SolrQueryResponse());
AtomicInteger counter = new AtomicInteger();
info.addCloseHook(counter::incrementAndGet);
SolrRequestInfo.setRequestInfo(info);
SolrRequestInfo.setRequestInfo(info);
SolrRequestInfo.clearRequestInfo();
assertNotNull(SolrRequestInfo.getRequestInfo());
SolrRequestInfo.clearRequestInfo();
assertEquals("hook should be closed only once", 1, counter.get());
assertNull(SolrRequestInfo.getRequestInfo());
}

public void testThreadPool() throws InterruptedException {
final SolrRequestInfo info = new SolrRequestInfo(
new LocalSolrQueryRequest(h.getCore(), params()),
new SolrQueryResponse());
AtomicInteger counter = new AtomicInteger();

SolrRequestInfo.setRequestInfo(info);
ExecutorUtil.MDCAwareThreadPoolExecutor pool = new ExecutorUtil.MDCAwareThreadPoolExecutor(1, 1, 1,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
AtomicBoolean run = new AtomicBoolean(false);
pool.execute(() -> {
final SolrRequestInfo poolInfo = SolrRequestInfo.getRequestInfo();
assertSame(info, poolInfo);
info.addCloseHook(counter::incrementAndGet);
run.set(true);
});
if (random().nextBoolean()) {
pool.shutdown();
} else {
pool.shutdownNow();
}
SolrRequestInfo.clearRequestInfo();
SolrRequestInfo.reset();

pool.awaitTermination(1, TimeUnit.MINUTES);
assertTrue(run.get());
assertEquals("hook should be closed only once", 1, counter.get());
assertNull(SolrRequestInfo.getRequestInfo());
}
}