Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
import org.apache.pinot.broker.grpc.BrokerGrpcServer;
import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BaseSingleStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandlerDelegate;
Expand Down Expand Up @@ -249,6 +250,23 @@ protected WorkerManager createWorkerManager(String brokerId, String hostname, in
return new WorkerManager(brokerId, hostname, port, routingManager);
}

/**
* Override to supply a custom {@link SingleConnectionBrokerRequestHandler} subclass (e.g. one
* that overrides {@code onQueryCompletion(RequestContext, BrokerResponse)} for async query
* logging). The default implementation returns a plain {@link SingleConnectionBrokerRequestHandler}.
*/
protected SingleConnectionBrokerRequestHandler createSingleStageBrokerRequestHandler(
PinotConfiguration config, String brokerId, BrokerRequestIdGenerator requestIdGenerator,
RoutingManager routingManager, AccessControlFactory accessControlFactory,
QueryQuotaManager queryQuotaManager, TableCache tableCache, NettyConfig nettyConfig,
TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager,
FailureDetector failureDetector, ThreadAccountant threadAccountant,
MultiClusterRoutingContext multiClusterRoutingContext) {
return new SingleConnectionBrokerRequestHandler(config, brokerId, requestIdGenerator, routingManager,
accessControlFactory, queryQuotaManager, tableCache, nettyConfig, tlsConfig,
serverRoutingStatsManager, failureDetector, threadAccountant, multiClusterRoutingContext);
}

private void setupHelixSystemProperties() {
// NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
// from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
Expand Down Expand Up @@ -439,7 +457,7 @@ public void start()
brokerContext.setServerHttpsContext(sslContext);
}
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
createSingleStageBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, _tableCache, nettyDefaults, tlsDefaults,
_serverRoutingStatsManager, _failureDetector, _threadAccountant, multiClusterRoutingContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,22 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
accessControl);
brokerResponse.setBrokerId(_brokerId);
brokerResponse.setRequestId(Long.toString(requestId));
_brokerQueryEventListener.onQueryCompletion(requestContext);
onQueryCompletion(requestContext, brokerResponse);

return brokerResponse;
}

/**
* Called after every successfully executed query with the fully-populated {@link RequestContext}
* and {@link BrokerResponse}. The default implementation fires the configured
* {@link org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener}. Subclasses may
* override to intercept the complete response (e.g. for async query-log pipelines) while still
* calling {@code super} to preserve the SPI listener behaviour.
*/
protected void onQueryCompletion(RequestContext requestContext, BrokerResponse brokerResponse) {
_brokerQueryEventListener.onQueryCompletion(requestContext);
}

protected abstract BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Query.Range;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.sql.FilterKind;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.util.TestUtils;
Expand Down Expand Up @@ -95,6 +96,55 @@ public void testUpdateColumnNames() {
}
}

@Test
public void testOnQueryCompletionHookReceivesBrokerResponse() {
// Verify that the overridable onQueryCompletion(RequestContext, BrokerResponse) hook is invoked
// and receives the BrokerResponse that handleRequest() produced.
AtomicReference<BrokerResponse> capturedResponse = new AtomicReference<>();

PinotConfiguration config = new PinotConfiguration();
BrokerQueryEventListenerFactory.init(config);
BrokerMetrics.register(mock(BrokerMetrics.class));
QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
when(queryQuotaManager.acquire(anyString())).thenReturn(true);
when(queryQuotaManager.acquireDatabase(anyString())).thenReturn(true);
when(queryQuotaManager.acquireApplication(anyString())).thenReturn(true);
TableCache tableCache = mock(TableCache.class);

BaseSingleStageBrokerRequestHandler handler =
new BaseSingleStageBrokerRequestHandler(config, "testBrokerId", new BrokerRequestIdGenerator(),
mock(org.apache.pinot.core.routing.RoutingManager.class), new AllowAllAccessControlFactory(),
queryQuotaManager, tableCache, ThreadAccountantUtils.getNoOpAccountant(), null) {
@Override
public void start() {
}

@Override
public void shutDown() {
}

@Override
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, TableRouteInfo route, long timeoutMs, ServerStats serverStats,
RequestContext requestContext) {
return new BrokerResponseNative();
}

@Override
protected void onQueryCompletion(RequestContext requestContext, BrokerResponse brokerResponse) {
capturedResponse.set(brokerResponse);
}
};

try {
handler.handleRequest("SELECT 1");
} catch (Exception ignored) {
// routing may fail — we only care that the hook was called with a non-null response
}
Assert.assertNotNull(capturedResponse.get(),
"onQueryCompletion hook must be called with the BrokerResponse from handleRequest");
}

@Test
public void testGetActualColumnNameCaseSensitive() {
Map<String, String> columnNameMap = new HashMap<>();
Expand Down
Loading