Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to configure busy-wait in broker and client #10661

Merged
merged 2 commits into from
May 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/bookkeeper.conf
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ maxPendingReadRequestsPerThread=2500
# avoid the executor queue to grow indefinitely
maxPendingAddRequestsPerThread=10000

# Option to enable busy-wait settings. Default is false.
# WARNING: This option will enable spin-waiting on executors and IO threads in order to reduce latency during
# context switches. The spinning will consume 100% CPU even when bookie is not doing any work. It is recommended to
# reduce the number of threads in the main workers pool and Netty event loop to only have few CPU cores busy.
enableBusyWait=false

# Whether force compaction is allowed when the disk is full or almost full.
# Forcing GC may get some space back, but may also fill up disk space more quickly.
# This is because new log files are created before GC, while old garbage
Expand Down
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ numExecutorThreadPoolSize=
# Default is 10
numCacheExecutorThreadPoolSize=10

# Option to enable busy-wait settings. Default is false.
# WARNING: This option will enable spin-waiting on executors and IO threads in order to reduce latency during
# context switches. The spinning will consume 100% CPU even when the broker is not doing any work. It is recommended to
# reduce the number of IO threads and BK client threads to only have few CPU cores busy.
enableBusyWait=false

# Max concurrent web requests
maxConcurrentHttpRequests=1024

Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,12 @@ flexible messaging model and an intuitive client API.</description>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>cpu-affinity</artifactId>
<version>${bookkeeper.version}</version>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int numCacheExecutorThreadPoolSize = 10;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Option to enable busy-wait settings. Default is false. "
+ "WARNING: This option will enable spin-waiting on executors and IO threads in order "
+ "to reduce latency during context switches. The spinning will consume 100% CPU even "
+ "when the broker is not doing any work. It is recommended to reduce the number of IO threads "
+ "and BK client threads to only have few CPU cores busy."
)
private boolean enableBusyWait = false;

@FieldContext(category = CATEGORY_SERVER, doc = "Max concurrent web requests")
private int maxConcurrentHttpRequests = 1024;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ ClientConfiguration createBkClientConfiguration(ServiceConfiguration conf) {
bkConf.setTLSTrustStorePasswordPath(conf.getBookkeeperTLSTrustStorePasswordPath());
}

bkConf.setBusyWaitEnabled(conf.isEnableBusyWait());
bkConf.setNumWorkerThreads(conf.getBookkeeperClientNumWorkerThreads());
bkConf.setThrottleValue(conf.getBookkeeperClientThrottleValue());
bkConf.setAddEntryTimeout((int) conf.getBookkeeperClientTimeoutInSeconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public PulsarService(ServiceConfiguration config,
this.transactionReplayExecutor = null;
}

this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(),
this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(),
new DefaultThreadFactory("pulsar-io"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor");

this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
pulsar.getConfiguration().getNumAcceptorThreads(), acceptorThreadFactory);
pulsar.getConfiguration().getNumAcceptorThreads(), false, acceptorThreadFactory);
this.workerGroup = eventLoopGroup;
this.statsUpdater = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public static void main(String[] args) throws Exception {
(int) brokerConfig.getZooKeeperSessionTimeoutMillis()).get();
BookKeeperClientFactory bkClientFactory = new BookKeeperClientFactoryImpl();

EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("compactor-io"));
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(1, false,
new DefaultThreadFactory("compactor-io"));
BookKeeper bk = bkClientFactory.create(brokerConfig, zk, eventLoopGroup, Optional.empty(), null);
try (PulsarClient pulsar = clientBuilder.build()) {
Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, bk, scheduler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import java.util.Optional;
import java.util.Properties;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -86,6 +87,10 @@ protected void setup() throws Exception {
config.setAdvertisedAddress("127.0.0.1");
config.setAllowAutoTopicCreationType("non-partitioned");
config.setZooKeeperOperationTimeoutSeconds(1);
config.setNumIOThreads(1);
Properties properties = new Properties();
properties.put("bookkeeper_numWorkerThreads", "1");
config.setProperties(properties);
configurePulsar(config);

pulsar = new PulsarService(config);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

@Test(groups = "broker")
public class BusyWaitServiceTest extends BkEnsemblesTestBase {
public BusyWaitServiceTest() {
super(1);
}

protected void configurePulsar(ServiceConfiguration config) {
config.setEnableBusyWait(true);
config.setManagedLedgerDefaultEnsembleSize(1);
config.setManagedLedgerDefaultWriteQuorum(1);
config.setManagedLedgerDefaultAckQuorum(1);
}

@Test
public void testPublishWithBusyWait() throws Exception {

@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.enableBusyWait(true)
.build();

String namespace = "prop/busy-wait";
admin.namespaces().createNamespace(namespace);

String topic = namespace + "/my-topic-" + UUID.randomUUID();

@Cleanup
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("test")
.subscribe();

@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic)
.create();

for (int i = 0; i < 10; i++) {
producer.send("my-message-" + i);
}

for (int i = 0; i < 10; i++) {
Message<String> msg = consumer.receive();
assertNotNull(msg);
assertEquals(msg.getValue(), "my-message-" + i);
consumer.acknowledge(msg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public void startMockBrokerService() throws Exception {
final int MaxMessageSize = 5 * 1024 * 1024;

try {
workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, threadFactory);
workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, false, threadFactory);

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(workerGroup, workerGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected void cleanup() throws Exception {
@Test
public void testSingleIpAddress() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
Expand All @@ -77,7 +77,7 @@ public void testDoubleIpAddress() throws Exception {
String serviceUrl = "pulsar://non-existing-dns-name:" + pulsar.getBrokerListenPort().get();

ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
Expand All @@ -100,7 +100,7 @@ public void testDoubleIpAddress() throws Exception {
public void testNoConnectionPool() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConnectionsPerBroker(0);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, new DefaultThreadFactory("test"));
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));

InetSocketAddress brokerAddress =
Expand All @@ -122,7 +122,7 @@ public void testNoConnectionPool() throws Exception {
public void testEnableConnectionPool() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConnectionsPerBroker(5);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, new DefaultThreadFactory("test"));
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));

InetSocketAddress brokerAddress =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static PulsarTestClient create(ClientBuilder clientBuilder) throws Pulsar
// An anonymous subclass of ClientCnx class is used to override the getRemoteEndpointProtocolVersion()
// method.
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(clientConfigurationData.getNumIoThreads(),
false,
new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon()));

AtomicReference<Supplier<ClientCnx>> clientCnxSupplierReference = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,18 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
*/
ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);

/**
* Option to enable busy-wait settings. Default is false.
*
* <b>WARNING</b>: This option will enable spin-waiting on executors and IO threads in order to reduce latency
* during context switches. The spinning will consume 100% CPU even when the broker is not doing any work. It
* is recommended to reduce the number of IO threads and BK client threads to only have few CPU cores busy.
*
* @param enableBusyWait whether to enable busy wait
* @return the client builder instance
*/
ClientBuilder enableBusyWait(boolean enableBusyWait);

/**
* The clock used by the pulsar client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ public ClientBuilder maxBackoffInterval(long duration, TimeUnit unit) {
return this;
}

@Override
public ClientBuilder enableBusyWait(boolean enableBusyWait) {
conf.setEnableBusyWait(enableBusyWait);
return this;
}

public ClientConfigurationData getClientConfigurationData() {
return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ public CompletableFuture<List<String>> getPartitionsForTopic(String topic) {

private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {
ThreadFactory threadFactory = getThreadFactory("pulsar-client-io");
return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), conf.isEnableBusyWait(), threadFactory);
}

private static ThreadFactory getThreadFactory(String poolName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private int requestTimeoutMs = 60000;
private long initialBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(60);
private boolean enableBusyWait = false;
//
private String listenerName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ClientCnxRequestTimeoutQueueTest {

@BeforeTest
void setupClientCnx() throws Exception {
eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("testClientCnxTimeout"));
eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
ClientConfigurationData conf = new ClientConfigurationData();
conf.setKeepAliveIntervalSeconds(0);
conf.setOperationTimeoutMs(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ClientCnxTest {

@Test
public void testClientCnxTimeout() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("testClientCnxTimeout"));
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10);
conf.setKeepAliveIntervalSeconds(0);
Expand All @@ -75,7 +75,7 @@ public void testClientCnxTimeout() throws Exception {
@Test
public void testReceiveErrorAtSendConnectFrameState() throws Exception {
ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, threadFactory);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10);
ClientCnx cnx = new ClientCnx(conf, eventLoop);
Expand Down Expand Up @@ -110,7 +110,7 @@ public void testReceiveErrorAtSendConnectFrameState() throws Exception {
@Test
public void testGetLastMessageIdWithError() throws Exception {
ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, threadFactory);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
ClientConfigurationData conf = new ClientConfigurationData();
ClientCnx cnx = new ClientCnx(conf, eventLoop);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testGetStats() throws Exception {
conf.setStatsIntervalSeconds(100);

ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
ExecutorProvider executorProvider = new ExecutorProvider(1, "client-test-stats");

PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testGetStats() throws Exception {
conf.setStatsIntervalSeconds(100);

ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);

PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void setup() throws PulsarClientException {

private void initializeEventLoopGroup(ClientConfigurationData conf) {
ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -179,7 +179,7 @@ public void testInitializeWithoutTimer() throws Exception {
@Test
public void testInitializeWithTimer() throws PulsarClientException {
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
conf.setServiceUrl("pulsar://localhost:6650");

Expand Down
5 changes: 5 additions & 0 deletions pulsar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>cpu-affinity</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
Expand Down
Loading