Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 125 additions & 2 deletions tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ public static void main(String[] args) throws Exception {
perf.start(args);
}

/**
* Runs the producer-performance test based on command-line arguments, producing records to Kafka and reporting metrics.
*
* Parses the provided CLI arguments, constructs and configures a KafkaProducer, optionally initializes transactions
* and a warmup period, sends the configured number of records while applying throughput throttling, collects latency
* and throughput statistics (including separate steady-state stats when a warmup is configured), and prints final
* summaries and producer metrics as requested.
*
* @param args command-line arguments for this tool (see {@link #argParser()})
* @throws IOException if reading producer configuration or payload files fails
*/
void start(String[] args) throws IOException {
ArgumentParser parser = argParser();

Expand Down Expand Up @@ -158,6 +169,12 @@ record = new ProducerRecord<>(config.topicName, payload);

}

/**
* Create a Kafka producer configured with the supplied properties.
*
* @param props producer configuration properties
* @return a new KafkaProducer<byte[], byte[]> configured with the given properties
*/
KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
return new KafkaProducer<>(props);
}
Expand All @@ -166,6 +183,24 @@ KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
Stats stats;
Stats steadyStateStats;

/**
* Selects or generates a message payload according to the provided inputs.
*
* If `payloadByteList` is non-empty a random entry from the list is returned.
* Otherwise, if `recordSize` is non-null the supplied `payload` buffer is filled
* with random uppercase ASCII letters and returned. If `payloadMonotonic` is true
* the UTF-8 bytes of `recordValue` are returned. If none of these sources is
* available an exception is thrown.
*
* @param recordSize fixed payload size to generate; if non-null the method fills and returns `payload`
* @param payloadByteList list of predefined payloads; a random element is chosen when non-empty
* @param payload buffer to fill when `recordSize` is provided; its length should equal `recordSize`
* @param random source of randomness for selecting list entries or generating bytes
* @param payloadMonotonic when true produce a payload from `recordValue` instead of random content
* @param recordValue numeric value converted to UTF-8 bytes when `payloadMonotonic` is true
* @return the payload byte array to send
* @throws IllegalArgumentException if no payload list, record size, or monotonic option is provided
*/
static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
SplittableRandom random, boolean payloadMonotonic, long recordValue) {
if (!payloadByteList.isEmpty()) {
Expand All @@ -181,6 +216,19 @@ static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByte
return payload;
}

/**
* Build producer Properties from an optional properties file and a list of key=value overrides.
*
* Merges properties loaded from the file at producerConfig (if provided) with any producerProps
* entries (each must be "key=value"), ensures key and value serializers are set to
* ByteArraySerializer, and sets a default client id of "perf-producer-client" when none is present.
*
* @param producerProps list of producer properties in "key=value" form; may be null
* @param producerConfig path to a properties file to load; may be null
* @return a Properties instance containing the merged and normalized producer configuration
* @throws IOException if loading properties from producerConfig fails
* @throws IllegalArgumentException if any string in producerProps is not in "key=value" format
*/
static Properties readProps(List<String> producerProps, String producerConfig) throws IOException {
Properties props = new Properties();
if (producerConfig != null) {
Expand Down Expand Up @@ -222,7 +270,15 @@ static List<byte[]> readPayloadFile(String payloadFilePath, String payloadDelimi
return payloadByteList;
}

/** Get the command-line argument parser. */
/**
* Create and configure the command-line ArgumentParser for the producer-performance tool.
*
* The parser defines options for topic, number of records, payload selection (exactly one of
* --record-size, --payload-file, or --payload-monotonic), payload delimiter, throughput throttling,
* producer properties/config file, metrics printing, transaction settings, and warmup records.
*
* @return the configured ArgumentParser for the producer-performance tool
*/
static ArgumentParser argParser() {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("producer-performance")
Expand Down Expand Up @@ -376,10 +432,26 @@ static class Stats {
private final boolean isSteadyState;
private boolean steadyStateActive;

/**
* Creates a Stats collector configured for a non–steady-state run.
*
* @param numRecords the expected total number of records to size internal sampling structures
*/
public Stats(long numRecords) {
this(numRecords, false);
}

/**
* Initialize a Stats accumulator for the given total record count and mode.
*
* Sets timestamps, sampling rate, allocates the latency sample buffer sized for
* the provided record count and sampling, and initializes counters and window
* bookkeeping. Also configures whether this Stats instance represents a steady-state
* measurement and whether steady-state reporting is active.
*
* @param numRecords total number of records that will be measured; used to compute sampling rate and buffer size
* @param isSteadyState true if this Stats instance should operate in steady-state mode (affects reporting and activation)
*/
public Stats(long numRecords, boolean isSteadyState) {
this.start = System.currentTimeMillis();
this.windowStart = System.currentTimeMillis();
Expand All @@ -398,6 +470,18 @@ public Stats(long numRecords, boolean isSteadyState) {
this.steadyStateActive = isSteadyState;
}

/**
* Record a single send result into the aggregated and windowed statistics.
*
* Updates total counters (count, bytes, total and max latency), updates the current window
* counters, samples the latency into the latency buffer according to the sampling rate,
* and, when the reporting interval has elapsed, prints the current window (respecting
* steady-state settings) and starts a new window.
*
* @param latency latency of the recorded send in milliseconds
* @param bytes number of payload bytes for the recorded send
* @param time current time in milliseconds used to evaluate the reporting interval
*/
public void record(int latency, int bytes, long time) {
this.count++;
this.bytes += bytes;
Expand Down Expand Up @@ -463,6 +547,14 @@ public void newWindow() {
this.windowBytes = 0;
}

/**
* Prints aggregated performance metrics to standard output, including total records sent, throughput
* (records/sec and MB/sec), average latency, maximum latency, and latency percentiles (50th, 95th,
* 99th, and 99.9th).
*
* If this Stats instance represents steady-state measurements, the output includes the text
* " steady state" after the record count.
*/
public void printTotal() {
long elapsed = System.currentTimeMillis() - start;
double recsPerSec = 1000.0 * count / (double) elapsed;
Expand Down Expand Up @@ -499,13 +591,31 @@ static final class PerfCallback implements Callback {
private final Stats stats;
private final Stats steadyStateStats;

/**
* Create a callback that records send latency and bytes into the provided statistics collectors.
*
* @param start the send start time in milliseconds since the epoch
* @param bytes the number of bytes in the sent payload
* @param stats the main Stats instance to receive latency and byte measurements
* @param steadyStateStats an optional Stats instance to receive steady-state measurements; may be null
*/
public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) {
this.start = start;
this.stats = stats;
this.steadyStateStats = steadyStateStats;
this.bytes = bytes;
}

/**
* Handle completion of a produce request by updating statistics on success or reporting the error.
*
* When the send succeeds (exception is null), records the measured latency and byte count into the main
* Stats instance and, if present, into the steady-state Stats; also increments their iteration counters.
* If an exception is present, prints its stack trace.
*
* @param metadata metadata for the sent record (may be null if the send failed)
* @param exception the exception that occurred during send, or null on success
*/
public void onCompletion(RecordMetadata metadata, Exception exception) {
long now = System.currentTimeMillis();
int latency = (int) (now - start);
Expand Down Expand Up @@ -537,6 +647,19 @@ static final class ConfigPostProcessor {
final boolean transactionsEnabled;
final List<byte[]> payloadByteList;

/**
* Parse and validate command-line arguments, load payload and producer properties,
* and initialize runtime configuration fields for the performance producer (topicName,
* numRecords, warmupRecords, recordSize, throughput, payloadMonotonic, shouldPrintMetrics,
* payloadByteList, producerProps, transactionsEnabled, transactionDurationMs).
*
* @param parser the configured ArgumentParser for the producer-performance CLI
* @param args the command-line arguments to parse
* @throws IOException if reading the payload or producer properties file fails
* @throws ArgumentParserException if required arguments are missing or invalid (for example:
* --num-records <= 0, --warmup-records >= --num-records, missing producer config,
* nonpositive --record-size, or nonpositive --transaction-duration-ms)
*/
public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOException, ArgumentParserException {
Namespace namespace = parser.parseArgs(args);
this.topicName = namespace.getString("topic");
Expand Down Expand Up @@ -591,4 +714,4 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
this.transactionDurationMs = transactionDurationMsArg;
}
}
}
}