Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Fix async response filter #11052

Merged
merged 4 commits into from
Jun 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.web;

import java.io.IOException;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
Expand Down Expand Up @@ -71,10 +73,45 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
/* connection is already invalidated */
}
}

if (request.isAsyncSupported() && request.isAsyncStarted()) {
request.getAsyncContext().addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
handleInterceptor(request, response);
}

@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
LOG.warn("Http request {} async context timeout.", request);
handleInterceptor(request, response);
}

@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
LOG.warn("Http request {} async context error.", request, asyncEvent.getThrowable());
handleInterceptor(request, response);
}

@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
// nothing to do
}
});
} else {
handleInterceptor(request, response);
}
}

private void handleInterceptor(ServletRequest request, ServletResponse response) {
if (interceptorEnabled
&& !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.MULTIPART_FORM_DATA)
&& !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.APPLICATION_OCTET_STREAM)) {
interceptor.onWebserviceResponse(request, response);
try {
interceptor.onWebserviceResponse(request, response);
} catch (Exception e) {
LOG.error("Failed to handle interceptor on web service response.", e);
}
}
}

Expand All @@ -87,4 +124,5 @@ public void init(FilterConfig arg) throws ServletException {
public void destroy() {
// No state to clean up.
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.apache.pulsar.broker.admin.v3;


/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.web.RestException;

/**
* Async response test.
*/
@Slf4j
@Path("/test")
public class AsyncResponseTest {

@GET
@Path("/asyncGet/{topicName}/{delayMilliseconds}")
public void asyncGet(@Suspended AsyncResponse response,
@PathParam("topicName") String topicName,
@PathParam("delayMilliseconds") long delayMilliseconds) {
new Thread(() -> {
if (delayMilliseconds > 0) {
try {
Thread.sleep(delayMilliseconds);
} catch (InterruptedException e) {
log.error("Failed to handle test method asyncGet.", e);
response.resume(new RestException(e));
}
}
response.resume(Response.noContent().build());
}).start();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;

Expand Down Expand Up @@ -55,7 +58,6 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
Expand Down Expand Up @@ -96,6 +98,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor;
private OrderedExecutor bkExecutor;

protected boolean enableBrokerInterceptor = false;

public MockedPulsarServiceBaseTest() {
resetConfig();
}
Expand Down Expand Up @@ -300,6 +304,17 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
doReturn(new CounterBrokerInterceptor()).when(pulsar).getBrokerInterceptor();

doAnswer((invocation) -> spy(invocation.callRealMethod())).when(pulsar).newCompactor();
if (enableBrokerInterceptor) {
mockConfigBrokerInterceptors(pulsar);
}
}

private void mockConfigBrokerInterceptors(PulsarService pulsarService) {
ServiceConfiguration configuration = spy(conf);
Set<String> mockBrokerInterceptors = mock(Set.class);
when(mockBrokerInterceptors.isEmpty()).thenReturn(false);
when(configuration.getBrokerInterceptors()).thenReturn(mockBrokerInterceptors);
when(pulsarService.getConfig()).thenReturn(configuration);
}

protected void waitForZooKeeperWatchers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
package org.apache.pulsar.broker.intercept;

import lombok.Cleanup;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand All @@ -32,8 +37,10 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -71,6 +78,7 @@ public void setup() throws Exception {
listenerName2,
new BrokerInterceptorWithClassLoader(listener2, ncl2));
this.listeners = new BrokerInterceptors(this.listenerMap);
this.enableBrokerInterceptor = true;
super.internalSetup();
super.producerBaseSetup();
}
Expand Down Expand Up @@ -138,4 +146,37 @@ public void testBeforeSendMessage() throws PulsarClientException {

assertEquals(((CounterBrokerInterceptor) listener).getBeforeSendCount(), 1);
}

@Test
public void asyncResponseFilterTest() throws Exception {
Assert.assertTrue(pulsar.getBrokerInterceptor() instanceof CounterBrokerInterceptor);
CounterBrokerInterceptor interceptor = (CounterBrokerInterceptor) pulsar.getBrokerInterceptor();
interceptor.clearResponseList();

OkHttpClient client = new OkHttpClient();
String url = "http://127.0.0.1:" + conf.getWebServicePort().get() + "/admin/v3/test/asyncGet/my-topic/1000";
final Request request = new Request.Builder()
.url(url)
.get()
.build();
Call call = client.newCall(request);
CompletableFuture<Response> future = new CompletableFuture<>();
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
future.completeExceptionally(e);
}

@Override
public void onResponse(Call call, Response response) throws IOException {
future.complete(response);
}
});
future.get();
CounterBrokerInterceptor.ResponseEvent responseEvent = interceptor.getResponseList().get(0);
Assert.assertEquals(responseEvent.getRequestUri(), "/admin/v3/test/asyncGet/my-topic/1000");
Assert.assertEquals(responseEvent.getResponseStatus(),
javax.ws.rs.core.Response.noContent().build().getStatus());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,38 @@
*/
package org.apache.pulsar.broker.intercept;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import java.io.IOException;
import org.eclipse.jetty.server.Response;

@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {

int beforeSendCount = 0;
int count = 0;
private List<ResponseEvent> responseList = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wrong,
This list does not support concurrent access.
We should use CopyOnWriteArrayList

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, @gaoran10 Could you please push a PR for fixing this one?

Copy link
Contributor Author

@gaoran10 gaoran10 Jun 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'll fix it. But this class is used for testing, maybe there is only one admin client for testing in normal cases.


@Data
@AllArgsConstructor
public class ResponseEvent {
private String requestUri;
private int responseStatus;
}

@Override
public void beforeSendMessage(Subscription subscription,
Expand Down Expand Up @@ -68,7 +81,11 @@ public void onWebserviceRequest(ServletRequest request) {
@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
count ++;
log.info("[{}] On [{}] Webservice response", count, ((HttpServletRequest)request).getRequestURL().toString());
log.info("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest)request).getRequestURL().toString(), response);
if (response instanceof Response) {
Response res = (Response) response;
responseList.add(new ResponseEvent(res.getHttpChannel().getRequest().getRequestURI(), res.getStatus()));
}
}

@Override
Expand All @@ -95,4 +112,12 @@ public int getCount() {
public int getBeforeSendCount() {
return beforeSendCount;
}

public void clearResponseList() {
responseList.clear();
}

public List<ResponseEvent> getResponseList() {
return responseList;
}
}