diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index 85bedd5d2c72..fcf9b4f94dfd 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -47,6 +47,7 @@ import org.apache.pinot.broker.broker.BrokerAdminApiApplication; import org.apache.pinot.broker.grpc.BrokerGrpcServer; import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; import org.apache.pinot.broker.requesthandler.BaseSingleStageBrokerRequestHandler; import org.apache.pinot.broker.requesthandler.BrokerRequestHandler; import org.apache.pinot.broker.requesthandler.BrokerRequestHandlerDelegate; @@ -249,6 +250,23 @@ protected WorkerManager createWorkerManager(String brokerId, String hostname, in return new WorkerManager(brokerId, hostname, port, routingManager); } + /** + * Override to supply a custom {@link SingleConnectionBrokerRequestHandler} subclass (e.g. one + * that overrides {@code onQueryCompletion(RequestContext, BrokerResponse)} for async query + * logging). The default implementation returns a plain {@link SingleConnectionBrokerRequestHandler}. + */ + protected SingleConnectionBrokerRequestHandler createSingleStageBrokerRequestHandler( + PinotConfiguration config, String brokerId, BrokerRequestIdGenerator requestIdGenerator, + RoutingManager routingManager, AccessControlFactory accessControlFactory, + QueryQuotaManager queryQuotaManager, TableCache tableCache, NettyConfig nettyConfig, + TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager, + FailureDetector failureDetector, ThreadAccountant threadAccountant, + MultiClusterRoutingContext multiClusterRoutingContext) { + return new SingleConnectionBrokerRequestHandler(config, brokerId, requestIdGenerator, routingManager, + accessControlFactory, queryQuotaManager, tableCache, nettyConfig, tlsConfig, + serverRoutingStatsManager, failureDetector, threadAccountant, multiClusterRoutingContext); + } + private void setupHelixSystemProperties() { // NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect // from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the @@ -439,7 +457,7 @@ public void start() brokerContext.setServerHttpsContext(sslContext); } singleStageBrokerRequestHandler = - new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager, + createSingleStageBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager, _accessControlFactory, _queryQuotaManager, _tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager, _failureDetector, _threadAccountant, multiClusterRoutingContext); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 30d09fa23605..1517b698a9db 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -219,11 +219,22 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption accessControl); brokerResponse.setBrokerId(_brokerId); brokerResponse.setRequestId(Long.toString(requestId)); - _brokerQueryEventListener.onQueryCompletion(requestContext); + onQueryCompletion(requestContext, brokerResponse); return brokerResponse; } + /** + * Called after every successfully executed query with the fully-populated {@link RequestContext} + * and {@link BrokerResponse}. The default implementation fires the configured + * {@link org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener}. Subclasses may + * override to intercept the complete response (e.g. for async query-log pipelines) while still + * calling {@code super} to preserve the SPI listener behaviour. + */ + protected void onQueryCompletion(RequestContext requestContext, BrokerResponse brokerResponse) { + _brokerQueryEventListener.onQueryCompletion(requestContext); + } + protected abstract BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @Nullable HttpHeaders httpHeaders, AccessControl accessControl) diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java index ca216e1b145f..86e89470c8b2 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java @@ -53,6 +53,7 @@ import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Query.Range; +import org.apache.pinot.common.response.BrokerResponse; import org.apache.pinot.sql.FilterKind; import org.apache.pinot.sql.parsers.CalciteSqlParser; import org.apache.pinot.util.TestUtils; @@ -95,6 +96,55 @@ public void testUpdateColumnNames() { } } + @Test + public void testOnQueryCompletionHookReceivesBrokerResponse() { + // Verify that the overridable onQueryCompletion(RequestContext, BrokerResponse) hook is invoked + // and receives the BrokerResponse that handleRequest() produced. + AtomicReference capturedResponse = new AtomicReference<>(); + + PinotConfiguration config = new PinotConfiguration(); + BrokerQueryEventListenerFactory.init(config); + BrokerMetrics.register(mock(BrokerMetrics.class)); + QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class); + when(queryQuotaManager.acquire(anyString())).thenReturn(true); + when(queryQuotaManager.acquireDatabase(anyString())).thenReturn(true); + when(queryQuotaManager.acquireApplication(anyString())).thenReturn(true); + TableCache tableCache = mock(TableCache.class); + + BaseSingleStageBrokerRequestHandler handler = + new BaseSingleStageBrokerRequestHandler(config, "testBrokerId", new BrokerRequestIdGenerator(), + mock(org.apache.pinot.core.routing.RoutingManager.class), new AllowAllAccessControlFactory(), + queryQuotaManager, tableCache, ThreadAccountantUtils.getNoOpAccountant(), null) { + @Override + public void start() { + } + + @Override + public void shutDown() { + } + + @Override + protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, + BrokerRequest serverBrokerRequest, TableRouteInfo route, long timeoutMs, ServerStats serverStats, + RequestContext requestContext) { + return new BrokerResponseNative(); + } + + @Override + protected void onQueryCompletion(RequestContext requestContext, BrokerResponse brokerResponse) { + capturedResponse.set(brokerResponse); + } + }; + + try { + handler.handleRequest("SELECT 1"); + } catch (Exception ignored) { + // routing may fail — we only care that the hook was called with a non-null response + } + Assert.assertNotNull(capturedResponse.get(), + "onQueryCompletion hook must be called with the BrokerResponse from handleRequest"); + } + @Test public void testGetActualColumnNameCaseSensitive() { Map columnNameMap = new HashMap<>();