Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-16676: Http2SolrClient loss of MDC context - test fixes #1444

Merged
merged 6 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 40 additions & 47 deletions solr/core/src/test/org/apache/solr/handler/TestHttpRequestId.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
*/
package org.apache.solr.handler;

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.solr.SolrJettyTestBase;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.SolrPing;
Expand All @@ -36,7 +38,6 @@
import org.slf4j.MDC;

@LogLevel("org.apache.solr.client.solrj.impl.Http2SolrClient=DEBUG")
@SuppressForbidden(reason = "We need to use log4J2 classes directly to test MDC impacts")
public class TestHttpRequestId extends SolrJettyTestBase {

@BeforeClass
Expand All @@ -45,88 +46,80 @@ public static void beforeTest() throws Exception {
}

@Test
public void mdcContextTest() throws Exception {
public void mdcContextTest() {
String collection = "/collection1";
BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>(false);
setupClientAndRun(collection, workQueue);
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>(false);
setupClientAndRun(collection, workQueue, 0);
}

@Test
public void mdcContextFailureTest() throws Exception {
public void mdcContextFailureTest() {
String collection = "/doesnotexist";
BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>(false);
setupClientAndRun(collection, workQueue);
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>(false);
setupClientAndRun(collection, workQueue, 0);
}

@Test
public void mdcContextTest2() throws Exception {
public void mdcContextTest2() {
String collection = "/collection1";
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10, false);
setupClientAndRun(collection, workQueue);
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10, false);
setupClientAndRun(collection, workQueue, 3);
}

@Test
public void mdcContextFailureTest2() throws Exception {
public void mdcContextFailureTest2() {
String collection = "/doesnotexist";
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10, false);
setupClientAndRun(collection, workQueue);
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10, false);
setupClientAndRun(collection, workQueue, 3);
}

private void setupClientAndRun(String collection, BlockingQueue<Runnable> workQueue) {
String key = "mdcContextTestKey" + System.nanoTime();
String value = "TestHttpRequestId" + System.nanoTime();
@SuppressForbidden(reason = "We need to use log4J2 classes directly to test MDC impacts")
private void setupClientAndRun(
String collection, BlockingQueue<Runnable> workQueue, int corePoolSize) {
final String key = "mdcContextTestKey" + System.nanoTime();
final String value = "TestHttpRequestId" + System.nanoTime();

AsyncListener<NamedList<Object>> listener =
new AsyncListener<>() {

@Override
public void onSuccess(NamedList<Object> t) {
assertTrue(value, value.equals(MDC.get(key)));
assertEquals(value, MDC.get(key));
}

@Override
public void onFailure(Throwable throwable) {
assertTrue(value, value.equals(MDC.get(key)));
assertEquals(value, MDC.get(key));
}
};

try (LogListener reqLog =
LogListener.debug(Http2SolrClient.class).substring("response processing")) {

ThreadPoolExecutor commExecutor = null;
Http2SolrClient client = null;
try {
// client setup needs to be same as HttpShardHandlerFactory
commExecutor =
new ExecutorUtil.MDCAwareThreadPoolExecutor(
3,
Integer.MAX_VALUE,
1,
TimeUnit.SECONDS,
workQueue,
new SolrNamedThreadFactory("httpShardExecutor"),
false);
client =
new Http2SolrClient.Builder(jetty.getBaseUrl().toString() + collection)
.withExecutor(commExecutor)
.build();

// client setup needs to be same as HttpShardHandlerFactory
ThreadPoolExecutor commExecutor =
new ExecutorUtil.MDCAwareThreadPoolExecutor(
corePoolSize,
Integer.MAX_VALUE,
1,
TimeUnit.SECONDS,
workQueue,
new SolrNamedThreadFactory("httpShardExecutor"),
false);
try (Http2SolrClient client =
new Http2SolrClient.Builder(jetty.getBaseUrl().toString() + collection)
.withExecutor(commExecutor)
.build()) {
MDC.put(key, value);
client.asyncRequest(new SolrPing(), null, listener);

} finally {
if (client != null) {
client.close();
}
ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
MDC.remove(key);
}

// expecting 3 events: started, success|failed, completed
assertEquals(3, reqLog.getQueue().size());
while (!reqLog.getQueue().isEmpty()) {
var reqEvent = reqLog.getQueue().poll();
assertTrue(reqEvent.getContextData().containsKey(key));
// expecting 2 events: success|failed, completed
Queue<LogEvent> reqLogQueue = reqLog.getQueue();
assertEquals(2, reqLogQueue.size());
while (!reqLogQueue.isEmpty()) {
var reqEvent = reqLogQueue.poll();
assertEquals(value, reqEvent.getContextData().getValue(key));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,16 +443,15 @@ public Cancellable asyncRequest(
}
final ResponseParser parser =
solrRequest.getResponseParser() == null ? this.parser : solrRequest.getResponseParser();
MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
req.onRequestQueued(asyncTracker.queuedListener)
.onComplete(asyncTracker.completeListener)
.send(
new InputStreamResponseListener() {
@Override
public void onHeaders(Response response) {
super.onHeaders(response);
log.debug("response processing started");
InputStreamResponseListener listener = this;
MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
executor.execute(
() -> {
InputStream is = listener.getInputStream();
Expand Down Expand Up @@ -585,15 +584,12 @@ private void decorateRequest(Request req, SolrRequest<?> solrRequest) {
}

setBasicAuthHeader(solrRequest, req);
MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
req.onRequestBegin(mdcCopyHelper);
for (HttpListenerFactory factory : listenerFactory) {
HttpListenerFactory.RequestResponseListener listener = factory.get();
listener.onQueued(req);
req.onRequestBegin(listener);
req.onComplete(listener);
}
req.onComplete(mdcCopyHelper);

Map<String, String> headers = solrRequest.getHeaders();
if (headers != null) {
Expand Down