18
18
*/
19
19
package org .apache .pulsar .tests .integration .profiling ;
20
20
21
+ import java .io .File ;
22
+ import java .io .IOException ;
23
+ import java .io .UncheckedIOException ;
24
+ import java .nio .file .Files ;
25
+ import java .nio .file .attribute .PosixFilePermissions ;
21
26
import java .util .HashMap ;
22
27
import java .util .List ;
23
28
import java .util .Map ;
31
36
import org .apache .pulsar .tests .integration .suites .PulsarTestSuite ;
32
37
import org .apache .pulsar .tests .integration .topologies .PulsarClusterSpec ;
33
38
import org .apache .pulsar .tests .integration .utils .DockerUtils ;
39
+ import org .testcontainers .containers .BindMode ;
34
40
import org .testcontainers .containers .GenericContainer ;
35
41
import org .testng .annotations .Test ;
36
42
40
46
* Example usage:
41
47
* # This has been tested on Mac with Orbstack (https://orbstack.dev/) docker
42
48
* # compile integration test dependencies
43
- * mvn -am -pl tests/integration -DskipTests install
49
+ * mvn -am -pl tests/integration -Dcheckstyle.skip=true -Dlicense.skip=true -Dspotbugs.skip=true - DskipTests install
44
50
* # compile apachepulsar/java-test-image with async profiler (add "clean" to ensure a clean build with recent changes)
45
- * ./build/build_java_test_image.sh -Ddocker.install.asyncprofiler=true
51
+ * ./build/build_java_test_image.sh -Ddocker.install.asyncprofiler=true -Pdocker-wolfi
46
52
* # set environment variables
47
53
* export PULSAR_TEST_IMAGE_NAME=apachepulsar/java-test-image:latest
48
54
* export NETTY_LEAK_DETECTION=off
@@ -92,31 +98,98 @@ public PulsarPerfContainer(String clusterName,
92
98
createContainerCmd .withName (clusterName + "-" + hostname );
93
99
});
94
100
withEnv ("PULSAR_MEM" , DEFAULT_PULSAR_MEM );
101
+ withEnv ("PULSAR_GC" , "-XX:+UseZGC -XX:+ZGenerational" );
95
102
setCommand ("sleep 1000000" );
103
+ File testOutputDir = new File ("target" );
104
+ if (!testOutputDir .exists ()) {
105
+ if (!testOutputDir .mkdirs ()) {
106
+ throw new IllegalArgumentException ("Test output directory + '" + testOutputDir .getAbsolutePath ()
107
+ + "' doesn't exist and cannot be created." );
108
+ }
109
+ }
110
+ if (!testOutputDir .isDirectory ()) {
111
+ throw new IllegalArgumentException (
112
+ "Test output directory '" + testOutputDir .getAbsolutePath () + "' isn't a directory." );
113
+ }
114
+ // change access to testOutputDir to allow all access so the the container user can write to it
115
+ // This matters only on Linux
116
+ try {
117
+ Files .setPosixFilePermissions (testOutputDir .toPath (), PosixFilePermissions .fromString ("rwxrwxrwx" ));
118
+ } catch (IOException e ) {
119
+ throw new UncheckedIOException ("Cannot change access to test output directory" , e );
120
+ }
121
+ withFileSystemBind (testOutputDir .getAbsolutePath (), "/testoutput" , BindMode .READ_WRITE );
96
122
}
97
123
98
124
public CompletableFuture <Long > consume (String topicName ) throws Exception {
99
125
return DockerUtils .runCommandAsyncWithLogging (getDockerClient (), getContainerId (),
100
- "/pulsar/bin/pulsar-perf" , "consume" , topicName ,
101
- "-u" , "pulsar://" + brokerHostname + ":6650" ,
102
- "-st" , "Shared" ,
103
- "-aq" ,
104
- "-m" , String .valueOf (numberOfMessages ), "-ml" , "400M" );
126
+ "bash" , "-c" , "echo $$ > /tmp/command.pid; "
127
+ + "/pulsar/bin/pulsar-perf consume " + topicName + " "
128
+ + "-u pulsar://" + brokerHostname + ":6650 "
129
+ + "-st Shared "
130
+ + "-q 50000 "
131
+ + "-m " + numberOfMessages + " -ml 400M "
132
+ + "--histogram-file=/testoutput/consume.histogram.$(date +%s).hdr "
133
+ + "2>&1 | tee /testoutput/consume.$(date +%s).txt" );
105
134
}
106
135
107
136
public CompletableFuture <Long > produce (String topicName ) throws Exception {
108
137
return DockerUtils .runCommandAsyncWithLogging (getDockerClient (), getContainerId (),
109
- "/pulsar/bin/pulsar-perf" , "produce" , topicName ,
110
- "-u" , "pulsar://" + brokerHostname + ":6650" ,
111
- "-au" , "http://" + brokerHostname + ":8080" ,
112
- "-r" , String .valueOf (Integer .MAX_VALUE ), // max-rate
113
- "-s" , "8192" , // 8kB message size
114
- "-m" , String .valueOf (numberOfMessages ), "-ml" , "400M" );
138
+ "bash" , "-c" , "echo $$ > /tmp/command.pid; "
139
+ + "/pulsar/bin/pulsar-perf produce " + topicName + " "
140
+ + "-u pulsar://" + brokerHostname + ":6650 "
141
+ + "-au http://" + brokerHostname + ":8080 "
142
+ + "-r " + Integer .MAX_VALUE + " "
143
+ + "-s 128 -db "
144
+ + "-o 20000 "
145
+ + "-m " + numberOfMessages + " -ml 400M "
146
+ + "--histogram-file=/testoutput/produce.histogram.$(date +%s).hdr "
147
+ + "2>&1 | tee /testoutput/produce.$(date +%s).txt" );
148
+ }
149
+
150
+ public CompletableFuture <Long > stats (String topicName ) throws Exception {
151
+ String basePath = "http://" + brokerHostname + ":8080/admin/v2/" + topicName .replace ("://" , "/" );
152
+ // print out stats and internal stats every 10 seconds
153
+ return DockerUtils .runCommandAsyncWithLogging (getDockerClient (), getContainerId (),
154
+ "bash" , "-c" ,
155
+ String .format ("echo $$ > /tmp/command.pid; "
156
+ + "while [[ 1 ]]; do "
157
+ + "curl -s %s/stats | jq | tee /testoutput/stats.$(date +%%s).txt; "
158
+ + "sleep 1; "
159
+ + "curl -s %s/internalStats | jq | tee /testoutput/internal_stats.$(date +%%s).txt; "
160
+ + "curl -s http://%s:8080/metrics/ > /testoutput/metrics.$(date +%%s).txt; "
161
+ + " sleep 10; "
162
+ + "done" ,
163
+ basePath , basePath , brokerHostname ));
164
+ }
165
+
166
+ public void triggerShutdown () {
167
+ if (isRunning ()) {
168
+ // attempt to stop containers gracefully
169
+ DockerUtils .runCommandAsyncWithLogging (getDockerClient (), getContainerId (),
170
+ "bash" , "-c" , "pkill java; while pgrep -c java; do "
171
+ + "echo Waiting for java processes to stop.; sleep 1; done; "
172
+ + "kill $(cat /tmp/command.pid)" )
173
+ .orTimeout (10 , TimeUnit .SECONDS )
174
+ .exceptionally (t -> null )
175
+ .join ();
176
+ }
177
+ }
178
+
179
+ public void stop () {
180
+ if (isRunning ()) {
181
+ // attempt to stop containers gracefully
182
+ dockerClient .stopContainerCmd (getContainerId ())
183
+ .withTimeout (15 )
184
+ .exec ();
185
+ }
186
+ super .stop ();
115
187
}
116
188
}
117
189
118
190
private PulsarPerfContainer perfConsume ;
119
191
private PulsarPerfContainer perfProduce ;
192
+ private PulsarPerfContainer printStats ;
120
193
121
194
@ Override
122
195
public void setupCluster () throws Exception {
@@ -126,14 +199,27 @@ public void setupCluster() throws Exception {
126
199
127
200
@ Override
128
201
public void tearDownCluster () throws Exception {
202
+ if (printStats != null ) {
203
+ printStats .triggerShutdown ();
204
+ }
205
+ if (perfProduce != null ) {
206
+ perfProduce .triggerShutdown ();
207
+ }
129
208
if (perfConsume != null ) {
130
- perfConsume .stop ();
131
- perfConsume = null ;
209
+ perfConsume .triggerShutdown ();
210
+ }
211
+ if (printStats != null ) {
212
+ printStats .stop ();
213
+ printStats = null ;
132
214
}
133
215
if (perfProduce != null ) {
134
216
perfProduce .stop ();
135
217
perfProduce = null ;
136
218
}
219
+ if (perfConsume != null ) {
220
+ perfConsume .stop ();
221
+ perfConsume = null ;
222
+ }
137
223
super .tearDownCluster ();
138
224
}
139
225
@@ -142,7 +228,10 @@ protected void beforeStartCluster() throws Exception {
142
228
super .beforeStartCluster ();
143
229
pulsarCluster .forEachContainer (
144
230
// This is effective only when -Pdocker-wolfi has been passed when building java-test-image
145
- c -> c .withEnv ("GLIBC_TUNABLES" , "glibc.malloc.hugetlb=1:glibc.malloc.mmap_threshold=2097152" ));
231
+ // setting mmap_threshold explicitly will avoid it's dynamic increase
232
+ // https://sourceware.org/glibc/manual/latest/html_node/Memory-Allocation-Tunables.html
233
+ c -> c .withEnv ("GLIBC_TUNABLES" ,
234
+ "glibc.malloc.hugetlb=1:glibc.malloc.mmap_threshold=131072:glibc.malloc.arena_max=4" ));
146
235
}
147
236
148
237
@ Override
@@ -160,15 +249,25 @@ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String c
160
249
specBuilder .numProxies (0 );
161
250
162
251
// Increase memory for brokers and configure more aggressive rollover
163
- specBuilder .brokerEnvs (Map .of ("PULSAR_MEM" , BROKER_PULSAR_MEM ,
164
- "managedLedgerMinLedgerRolloverTimeMinutes" , "1" ,
165
- "managedLedgerMaxLedgerRolloverTimeMinutes" , "5" ,
166
- "managedLedgerMaxSizePerLedgerMbytes" , "512" ,
167
- "managedLedgerDefaultEnsembleSize" , "1" ,
168
- "managedLedgerDefaultWriteQuorum" , "1" ,
169
- "managedLedgerDefaultAckQuorum" , "1" ,
170
- "maxPendingPublishRequestsPerConnection" , "100000"
171
- ));
252
+ Map <String , String > brokerEnvs = new HashMap <>();
253
+ brokerEnvs .put ("PULSAR_MEM" , BROKER_PULSAR_MEM );
254
+ brokerEnvs .put ("managedLedgerMinLedgerRolloverTimeMinutes" , "1" );
255
+ brokerEnvs .put ("managedLedgerMaxLedgerRolloverTimeMinutes" , "5" );
256
+ brokerEnvs .put ("managedLedgerMaxSizePerLedgerMbytes" , "512" );
257
+ brokerEnvs .put ("managedLedgerDefaultEnsembleSize" , "1" );
258
+ brokerEnvs .put ("managedLedgerDefaultWriteQuorum" , "1" );
259
+ brokerEnvs .put ("managedLedgerDefaultAckQuorum" , "1" );
260
+ //brokerEnvs.put("maxPendingPublishRequestsPerConnection", "1000");
261
+ brokerEnvs .put ("dispatcherRetryBackoffInitialTimeInMs" , "0" );
262
+ brokerEnvs .put ("dispatcherRetryBackoffMaxTimeInMs" , "0" );
263
+ brokerEnvs .put ("preciseDispatcherFlowControl" , "true" );
264
+ //brokerEnvs.put("PULSAR_PREFIX_subscriptionKeySharedUseClassicPersistentImplementation", "true");
265
+ //brokerEnvs.put("PULSAR_PREFIX_subscriptionSharedUseClassicPersistentImplementation", "true");
266
+ brokerEnvs .put ("dispatcherMaxReadBatchSize" , "1000" );
267
+ //brokerEnvs.put("dispatcherMaxReadSizeBytes", "10000000");
268
+ //brokerEnvs.put("dispatcherDispatchMessagesInSubscriptionThread", "false");
269
+ //brokerEnvs.put("dispatcherMaxRoundRobinBatchSize", "1000");
270
+ specBuilder .brokerEnvs (brokerEnvs );
172
271
173
272
// Increase memory for bookkeepers and make compaction run more often
174
273
Map <String , String > bkEnv = new HashMap <>();
@@ -190,9 +289,11 @@ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String c
190
289
String brokerHostname = clusterName + "-pulsar-broker-0" ;
191
290
perfProduce = new PulsarPerfContainer (clusterName , brokerHostname , "perf-produce" );
192
291
perfConsume = new PulsarPerfContainer (clusterName , brokerHostname , "perf-consume" );
292
+ printStats = new PulsarPerfContainer (clusterName , brokerHostname , "print-stats" );
193
293
specBuilder .externalServices (Map .of (
194
294
"pulsar-produce" , perfProduce ,
195
- "pulsar-consume" , perfConsume
295
+ "pulsar-consume" , perfConsume ,
296
+ "print-stats" , printStats
196
297
));
197
298
198
299
return specBuilder ;
@@ -204,6 +305,8 @@ public void runPulsarPerf() throws Exception {
204
305
CompletableFuture <Long > consumeFuture = perfConsume .consume (topicName );
205
306
Thread .sleep (1000 );
206
307
CompletableFuture <Long > produceFuture = perfProduce .produce (topicName );
308
+ Thread .sleep (4000 );
309
+ printStats .stats (topicName );
207
310
FutureUtil .waitForAll (List .of (consumeFuture , produceFuture ))
208
311
.orTimeout (3 , TimeUnit .MINUTES )
209
312
.exceptionally (t -> {
0 commit comments