From dbdd2d4896ba420105fe3733bc346cc8cd6a0bf8 Mon Sep 17 00:00:00 2001 From: Mikhail Khludnev Date: Mon, 15 Nov 2021 09:38:44 +0300 Subject: [PATCH] SOLR-15635: don't repeat close hooks if SRI cleared twice due to using MDCThreadPool (#376) * SOLR-15635: don't repeat close hooks if SRI cleared twice * user impact: fromCore is closed by {!join} if used under `/export` * Use reference counting to close on last close. Also: * trace logging * protected -> private Co-authored-by: David Smiley Co-authored-by: Mikhail Khludnev --- solr/CHANGES.txt | 3 + .../apache/solr/request/SolrRequestInfo.java | 70 +++++++++++----- .../org/apache/solr/TestCrossCoreJoin.java | 27 ++++--- .../solr/request/TestSolrRequestInfo.java | 80 +++++++++++++++++++ 4 files changed, 149 insertions(+), 31 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/request/TestSolrRequestInfo.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 0d5cdaff9de..ec347174898 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -459,6 +459,9 @@ Bug Fixes * SOLR-15696: Incremental backups no longer fail on collections that had previously seen a 'SPLITSHARD' operation. (Jason Gerlowski) +* SOLR-15635: Don't close hooks twice when SolrRequestInfo is cleared twice; or /export with classic join + closed fromCore if provided (Mikhail Khludnev, David Smiley) + * SOLR-15628: The SolrException.log() helper method has been fixed to correctly passes the Throwable to the Logger w/o stringification (hossman) * SOLR-15676: Fix PeerSync failure due to RealTimeGetComponent returning duplicates. (Ramsey Haddad, Christine Poerschke, David Smiley) diff --git a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java index e64a6d01cff..60ae1f6fcbe 100644 --- a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java +++ b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java @@ -40,19 +40,21 @@ /** Information about the Solr request/response held in a {@link ThreadLocal}. */ public class SolrRequestInfo { - protected static final int MAX_STACK_SIZE = 10; + private static final int MAX_STACK_SIZE = 10; - protected static final ThreadLocal> threadLocal = ThreadLocal.withInitial(LinkedList::new); + private static final ThreadLocal> threadLocal = ThreadLocal.withInitial(LinkedList::new); - protected SolrQueryRequest req; - protected SolrQueryResponse rsp; - protected Date now; + private int refCount = 1; // prevent closing when still used + + private SolrQueryRequest req; + private SolrQueryResponse rsp; + private Date now; public HttpServletRequest httpRequest; - protected TimeZone tz; - protected ResponseBuilder rb; - protected List closeHooks; - protected SolrDispatchFilter.Action action; - protected boolean useServerToken = false; + private TimeZone tz; + private ResponseBuilder rb; + private List closeHooks; + private SolrDispatchFilter.Action action; + private boolean useServerToken = false; private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -70,23 +72,25 @@ public static void setRequestInfo(SolrRequestInfo info) { Deque stack = threadLocal.get(); if (info == null) { throw new IllegalArgumentException("SolrRequestInfo is null"); - } else if (stack.size() <= MAX_STACK_SIZE) { - stack.push(info); - } else { + } else if (stack.size() > MAX_STACK_SIZE) { assert false : "SolrRequestInfo Stack is full"; log.error("SolrRequestInfo Stack is full"); } + log.trace("{} {}", info, "setRequestInfo()"); + assert !info.isClosed() : "SRI is already closed (odd)."; + stack.push(info); } - /** Removes the most recent SolrRequestInfo from the stack */ + /** Removes the most recent SolrRequestInfo from the stack. Close hooks are called. */ public static void clearRequestInfo() { + log.trace("clearRequestInfo()"); Deque stack = threadLocal.get(); if (stack.isEmpty()) { assert false : "clearRequestInfo called too many times"; log.error("clearRequestInfo called too many times"); } else { SolrRequestInfo info = stack.pop(); - closeHooks(info); + info.close(); } } @@ -95,18 +99,25 @@ public static void clearRequestInfo() { * we expect it to be empty by now because all "set" calls need to be balanced with a "clear". */ public static void reset() { + log.trace("reset()"); Deque stack = threadLocal.get(); - boolean isEmpty = stack.isEmpty(); + assert stack.isEmpty() : "SolrRequestInfo Stack should have been cleared."; while (!stack.isEmpty()) { SolrRequestInfo info = stack.pop(); - closeHooks(info); + info.close(); } - assert isEmpty : "SolrRequestInfo Stack should have been cleared."; } - private static void closeHooks(SolrRequestInfo info) { - if (info.closeHooks != null) { - for (Closeable hook : info.closeHooks) { + private synchronized void close() { + log.trace("{} {}", this, "close()"); + + if (--refCount > 0) { + log.trace("{} {}", this, "not closing; still referenced"); + return; + } + + if (closeHooks != null) { + for (Closeable hook : closeHooks) { try { hook.close(); } catch (Exception e) { @@ -114,6 +125,7 @@ private static void closeHooks(SolrRequestInfo info) { } } } + closeHooks = null; } public SolrRequestInfo(SolrQueryRequest req, SolrQueryResponse rsp) { @@ -186,6 +198,9 @@ public void setResponseBuilder(ResponseBuilder rb) { public void addCloseHook(Closeable hook) { // is this better here, or on SolrQueryRequest? synchronized (this) { + if (isClosed()) { + throw new IllegalStateException("Already closed!"); + } if (closeHooks == null) { closeHooks = new LinkedList<>(); } @@ -213,12 +228,23 @@ public void setUseServerToken(boolean use) { this.useServerToken = use; } + private synchronized boolean isClosed() { + return refCount <= 0; + } + public static ExecutorUtil.InheritableThreadLocalProvider getInheritableThreadLocalProvider() { return new ExecutorUtil.InheritableThreadLocalProvider() { @Override public void store(AtomicReference ctx) { SolrRequestInfo me = SolrRequestInfo.getRequestInfo(); - if (me != null) ctx.set(me); + if (me != null) { + // increase refCount in store(), while we're still in the thread of the provider to avoid + // a race if this thread finishes its work before the pool'ed thread runs + synchronized (me) { + me.refCount++; + } + ctx.set(me); + } } @Override diff --git a/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java b/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java index 77f4afa91ee..d21b5dbbbb0 100644 --- a/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java +++ b/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java @@ -53,17 +53,17 @@ public static void beforeTests() throws Exception { fromCore = coreContainer.create("fromCore", ImmutableMap.of("configSet", "minimal")); - assertU(add(doc("id", "1", "name", "john", "title", "Director", "dept_s", "Engineering"))); - assertU(add(doc("id", "2", "name", "mark", "title", "VP", "dept_s", "Marketing"))); - assertU(add(doc("id", "3", "name", "nancy", "title", "MTS", "dept_s", "Sales"))); - assertU(add(doc("id", "4", "name", "dave", "title", "MTS", "dept_s", "Support", "dept_s", "Engineering"))); - assertU(add(doc("id", "5", "name", "tina", "title", "VP", "dept_s", "Engineering"))); + assertU(add(doc("id", "1", "id_s_dv", "1", "name", "john", "title", "Director", "dept_s", "Engineering"))); + assertU(add(doc("id", "2", "id_s_dv", "2", "name", "mark", "title", "VP", "dept_s", "Marketing"))); + assertU(add(doc("id", "3", "id_s_dv", "3", "name", "nancy", "title", "MTS", "dept_s", "Sales"))); + assertU(add(doc("id", "4", "id_s_dv", "4", "name", "dave", "title", "MTS", "dept_s", "Support", "dept_s", "Engineering"))); + assertU(add(doc("id", "5", "id_s_dv", "5", "name", "tina", "title", "VP", "dept_s", "Engineering"))); assertU(commit()); - update(fromCore, add(doc("id", "10", "dept_id_s", "Engineering", "text", "These guys develop stuff", "cat", "dev"))); - update(fromCore, add(doc("id", "11", "dept_id_s", "Marketing", "text", "These guys make you look good"))); - update(fromCore, add(doc("id", "12", "dept_id_s", "Sales", "text", "These guys sell stuff"))); - update(fromCore, add(doc("id", "13", "dept_id_s", "Support", "text", "These guys help customers"))); + update(fromCore, add(doc("id", "10", "id_s_dv", "10", "dept_id_s", "Engineering", "text", "These guys develop stuff", "cat", "dev"))); + update(fromCore, add(doc("id", "11", "id_s_dv", "11", "dept_id_s", "Marketing", "text", "These guys make you look good"))); + update(fromCore, add(doc("id", "12", "id_s_dv", "12", "dept_id_s", "Sales", "text", "These guys sell stuff"))); + update(fromCore, add(doc("id", "13", "id_s_dv", "13", "dept_id_s", "Support", "text", "These guys help customers"))); update(fromCore, commit()); } @@ -91,6 +91,15 @@ void doTestJoin(String joinPrefix) throws Exception { , "/response=={'numFound':3,'start':0,'numFoundExact':true,'docs':[{'id':'1'},{'id':'4'},{'id':'5'}]}" ); + assertJQ(req( "qt", "/export", + "q", joinPrefix + " from=dept_id_s to=dept_s fromIndex=fromCore}cat:dev", "fl", "id_s_dv", + "sort", "id_s_dv asc", + "debugQuery", random().nextBoolean() ? "true":"false") + , "/response=={'numFound':3,'docs':[{'id_s_dv':'1'},{'id_s_dv':'4'},{'id_s_dv':'5'}]}" + ); + assertFalse(fromCore.isClosed()); + assertFalse(h.getCore().isClosed()); + // find people that develop stuff - but limit via filter query to a name of "john" // this tests filters being pushed down to queries (SOLR-3062) assertJQ(req("q", joinPrefix + " from=dept_id_s to=dept_s fromIndex=fromCore}cat:dev", "fl", "id", "fq", "name:john", diff --git a/solr/core/src/test/org/apache/solr/request/TestSolrRequestInfo.java b/solr/core/src/test/org/apache/solr/request/TestSolrRequestInfo.java new file mode 100644 index 00000000000..0b1573b5da6 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/request/TestSolrRequestInfo.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.request; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.response.SolrQueryResponse; +import org.junit.BeforeClass; + +public class TestSolrRequestInfo extends SolrTestCaseJ4 { + + @BeforeClass + public static void beforeClass() throws Exception { + initCore("solrconfig.xml","schema11.xml"); + } + + public void testCloseHookTwice(){ + final SolrRequestInfo info = new SolrRequestInfo( + new LocalSolrQueryRequest(h.getCore(), params()), + new SolrQueryResponse()); + AtomicInteger counter = new AtomicInteger(); + info.addCloseHook(counter::incrementAndGet); + SolrRequestInfo.setRequestInfo(info); + SolrRequestInfo.setRequestInfo(info); + SolrRequestInfo.clearRequestInfo(); + assertNotNull(SolrRequestInfo.getRequestInfo()); + SolrRequestInfo.clearRequestInfo(); + assertEquals("hook should be closed only once", 1, counter.get()); + assertNull(SolrRequestInfo.getRequestInfo()); + } + + public void testThreadPool() throws InterruptedException { + final SolrRequestInfo info = new SolrRequestInfo( + new LocalSolrQueryRequest(h.getCore(), params()), + new SolrQueryResponse()); + AtomicInteger counter = new AtomicInteger(); + + SolrRequestInfo.setRequestInfo(info); + ExecutorUtil.MDCAwareThreadPoolExecutor pool = new ExecutorUtil.MDCAwareThreadPoolExecutor(1, 1, 1, + TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); + AtomicBoolean run = new AtomicBoolean(false); + pool.execute(() -> { + final SolrRequestInfo poolInfo = SolrRequestInfo.getRequestInfo(); + assertSame(info, poolInfo); + info.addCloseHook(counter::incrementAndGet); + run.set(true); + }); + if (random().nextBoolean()) { + pool.shutdown(); + } else { + pool.shutdownNow(); + } + SolrRequestInfo.clearRequestInfo(); + SolrRequestInfo.reset(); + + pool.awaitTermination(1, TimeUnit.MINUTES); + assertTrue(run.get()); + assertEquals("hook should be closed only once", 1, counter.get()); + assertNull(SolrRequestInfo.getRequestInfo()); + } +}