Skip to content

Commit

Permalink
SOLR-16676: Http2SolrClient loss of MDC context - test fixes (#1444)
Browse files Browse the repository at this point in the history
Co-authored-by: Kevin Risden <krisden@apache.org>
  • Loading branch information
stillalex and risdenk committed Mar 9, 2023
1 parent 676fc9d commit 91d2c8f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 52 deletions.
87 changes: 40 additions & 47 deletions solr/core/src/test/org/apache/solr/handler/TestHttpRequestId.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
*/
package org.apache.solr.handler;

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.solr.SolrJettyTestBase;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.SolrPing;
Expand All @@ -36,7 +38,6 @@
import org.slf4j.MDC;

@LogLevel("org.apache.solr.client.solrj.impl.Http2SolrClient=DEBUG")
@SuppressForbidden(reason = "We need to use log4J2 classes directly to test MDC impacts")
public class TestHttpRequestId extends SolrJettyTestBase {

@BeforeClass
Expand All @@ -45,88 +46,80 @@ public static void beforeTest() throws Exception {
}

@Test
public void mdcContextTest() throws Exception {
public void mdcContextTest() {
String collection = "/collection1";
BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>(false);
setupClientAndRun(collection, workQueue);
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>(false);
setupClientAndRun(collection, workQueue, 0);
}

@Test
public void mdcContextFailureTest() throws Exception {
public void mdcContextFailureTest() {
String collection = "/doesnotexist";
BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>(false);
setupClientAndRun(collection, workQueue);
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>(false);
setupClientAndRun(collection, workQueue, 0);
}

@Test
public void mdcContextTest2() throws Exception {
public void mdcContextTest2() {
String collection = "/collection1";
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10, false);
setupClientAndRun(collection, workQueue);
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10, false);
setupClientAndRun(collection, workQueue, 3);
}

@Test
public void mdcContextFailureTest2() throws Exception {
public void mdcContextFailureTest2() {
String collection = "/doesnotexist";
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10, false);
setupClientAndRun(collection, workQueue);
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10, false);
setupClientAndRun(collection, workQueue, 3);
}

private void setupClientAndRun(String collection, BlockingQueue<Runnable> workQueue) {
String key = "mdcContextTestKey" + System.nanoTime();
String value = "TestHttpRequestId" + System.nanoTime();
@SuppressForbidden(reason = "We need to use log4J2 classes directly to test MDC impacts")
private void setupClientAndRun(
String collection, BlockingQueue<Runnable> workQueue, int corePoolSize) {
final String key = "mdcContextTestKey" + System.nanoTime();
final String value = "TestHttpRequestId" + System.nanoTime();

AsyncListener<NamedList<Object>> listener =
new AsyncListener<>() {

@Override
public void onSuccess(NamedList<Object> t) {
assertTrue(value, value.equals(MDC.get(key)));
assertEquals(value, MDC.get(key));
}

@Override
public void onFailure(Throwable throwable) {
assertTrue(value, value.equals(MDC.get(key)));
assertEquals(value, MDC.get(key));
}
};

try (LogListener reqLog =
LogListener.debug(Http2SolrClient.class).substring("response processing")) {

ThreadPoolExecutor commExecutor = null;
Http2SolrClient client = null;
try {
// client setup needs to be same as HttpShardHandlerFactory
commExecutor =
new ExecutorUtil.MDCAwareThreadPoolExecutor(
3,
Integer.MAX_VALUE,
1,
TimeUnit.SECONDS,
workQueue,
new SolrNamedThreadFactory("httpShardExecutor"),
false);
client =
new Http2SolrClient.Builder(jetty.getBaseUrl().toString() + collection)
.withExecutor(commExecutor)
.build();

// client setup needs to be same as HttpShardHandlerFactory
ThreadPoolExecutor commExecutor =
new ExecutorUtil.MDCAwareThreadPoolExecutor(
corePoolSize,
Integer.MAX_VALUE,
1,
TimeUnit.SECONDS,
workQueue,
new SolrNamedThreadFactory("httpShardExecutor"),
false);
try (Http2SolrClient client =
new Http2SolrClient.Builder(jetty.getBaseUrl().toString() + collection)
.withExecutor(commExecutor)
.build()) {
MDC.put(key, value);
client.asyncRequest(new SolrPing(), null, listener);

} finally {
if (client != null) {
client.close();
}
ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
MDC.remove(key);
}

// expecting 3 events: started, success|failed, completed
assertEquals(3, reqLog.getQueue().size());
while (!reqLog.getQueue().isEmpty()) {
var reqEvent = reqLog.getQueue().poll();
assertTrue(reqEvent.getContextData().containsKey(key));
// expecting 2 events: success|failed, completed
Queue<LogEvent> reqLogQueue = reqLog.getQueue();
assertEquals(2, reqLogQueue.size());
while (!reqLogQueue.isEmpty()) {
var reqEvent = reqLogQueue.poll();
assertEquals(value, reqEvent.getContextData().getValue(key));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,16 +443,15 @@ public Cancellable asyncRequest(
}
final ResponseParser parser =
solrRequest.getResponseParser() == null ? this.parser : solrRequest.getResponseParser();
MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
req.onRequestQueued(asyncTracker.queuedListener)
.onComplete(asyncTracker.completeListener)
.send(
new InputStreamResponseListener() {
@Override
public void onHeaders(Response response) {
super.onHeaders(response);
log.debug("response processing started");
InputStreamResponseListener listener = this;
MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
executor.execute(
() -> {
InputStream is = listener.getInputStream();
Expand Down Expand Up @@ -585,15 +584,12 @@ private void decorateRequest(Request req, SolrRequest<?> solrRequest) {
}

setBasicAuthHeader(solrRequest, req);
MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
req.onRequestBegin(mdcCopyHelper);
for (HttpListenerFactory factory : listenerFactory) {
HttpListenerFactory.RequestResponseListener listener = factory.get();
listener.onQueued(req);
req.onRequestBegin(listener);
req.onComplete(listener);
}
req.onComplete(mdcCopyHelper);

Map<String, String> headers = solrRequest.getHeaders();
if (headers != null) {
Expand Down

0 comments on commit 91d2c8f

Please sign in to comment.