KAFKA-1250 Add logging to new producer.
jkreps committed Feb 28, 2014
1 parent a810b8e commit f1a53b9
Showing 18 changed files with 291 additions and 173 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -310,6 +310,7 @@ project(':clients') {
archivesBaseName = "kafka-clients"

dependencies {
compile "org.slf4j:slf4j-api:1.7.6"
testCompile 'com.novocode:junit-interface:0.9'
}

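The new slf4j-api dependency is only a logging facade: the producer's new log statements emit output only when a binding (slf4j-log4j12 or slf4j-simple, for example) is on the application classpath. The commit also consistently uses SLF4J's {} placeholders, which defer message formatting until the level is known to be enabled. A minimal illustrative sketch of that idiom (the demo class name is hypothetical, not part of this commit):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical demo class, not part of this commit.
public class PlaceholderLoggingDemo {

    private static final Logger log = LoggerFactory.getLogger(PlaceholderLoggingDemo.class);

    public static void main(String[] args) {
        int size = 16384;
        // With {} placeholders the message string is only built if TRACE is
        // enabled, so hot-path calls like the ones added in send() stay cheap.
        log.trace("Allocating a new {} byte message buffer", size);
        // Eager concatenation, by contrast, always pays the formatting cost:
        // log.trace("Allocating a new " + size + " byte message buffer");
    }
}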
KafkaProducer.java
@@ -45,6 +45,8 @@
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.SystemTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A Kafka client that publishes records to the Kafka cluster.
@@ -56,6 +58,8 @@
*/
public class KafkaProducer implements Producer {

private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

private final Partitioner partitioner;
private final int maxRequestSize;
private final long metadataFetchTimeoutMs;
@@ -85,6 +89,7 @@ public KafkaProducer(Properties properties) {
}

private KafkaProducer(ProducerConfig config) {
log.trace("Starting the Kafka producer");
this.metrics = new Metrics(new MetricConfig(),
Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")),
new SystemTime());
@@ -114,8 +119,10 @@ private KafkaProducer(ProducerConfig config) {
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
new SystemTime());
this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true);
this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true);
this.ioThread.start();
config.logUnused();
log.debug("Kafka producer started");
}

private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
@@ -124,7 +131,7 @@ private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
if (url != null && url.length() > 0) {
String[] pieces = url.split(":");
if (pieces.length != 2)
throw new ConfigException("Invalid url in metadata.broker.list: " + url);
throw new ConfigException("Invalid url in " + ProducerConfig.BROKER_LIST_CONFIG + ": " + url);
try {
InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
if (address.isUnresolved())
@@ -215,12 +222,14 @@ public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
int partition = partitioner.partition(record, cluster);
ensureValidSize(record.key(), record.value());
TopicPartition tp = new TopicPartition(record.topic(), partition);
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
this.sender.wakeup();
return future;
// For API exceptions return them in the future;
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
return new FutureFailure(e);
@@ -260,13 +269,15 @@ public List<PartitionInfo> partitionsFor(String topic) {
*/
@Override
public void close() {
log.trace("Closing the Kafka producer.");
this.sender.initiateClose();
try {
this.ioThread.join();
} catch (InterruptedException e) {
throw new KafkaException(e);
}
this.metrics.close();
log.debug("The Kafka producer has closed.");
}

private static class FutureFailure implements Future<RecordMetadata> {
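Taken together, the new messages trace the producer's whole life cycle: construction, each send, and shutdown. A minimal usage sketch under stated assumptions — the broker-list key is inferred from the old error message above, and the ProducerRecord constructor shape is an assumption:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerLoggingWalkthrough {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Key name inferred from the error message above; the broker
        // address is a placeholder.
        props.put("metadata.broker.list", "localhost:9092");

        // TRACE "Starting the Kafka producer" ... DEBUG "Kafka producer started"
        KafkaProducer producer = new KafkaProducer(props);

        // TRACE "Sending record ... to topic ... partition ..."
        // (assumed (topic, key, value) constructor)
        producer.send(new ProducerRecord("demo", "k".getBytes(), "v".getBytes()), null);

        // TRACE "Closing the Kafka producer." ... DEBUG "The Kafka producer has closed."
        producer.close();
    }
}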
ProducerRecord.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer;

@@ -97,4 +93,10 @@ public Integer partition() {
return partition;
}

@Override
public String toString() {
String key = this.key == null ? "null" : ("byte[" + this.key.length + "]");
String value = this.value == null ? "null" : ("byte[" + this.value.length + "]");
return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value;
}
}
Metadata.java
@@ -19,6 +19,8 @@
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A class encapsulating some of the logic around metadata.
@@ -30,6 +32,8 @@
*/
public final class Metadata {

private static final Logger log = LoggerFactory.getLogger(Metadata.class);

private final long refreshBackoffMs;
private final long metadataExpireMs;
private long lastRefresh;
@@ -81,6 +85,7 @@ public synchronized Cluster fetch(String topic, long maxWaitMs) {
topics.add(topic);
forceUpdate = true;
try {
log.trace("Requesting metadata update for topic {}.", topic);
wait(maxWaitMs);
} catch (InterruptedException e) { /* this is fine, just try again */
}
@@ -127,6 +132,7 @@ public synchronized void update(Cluster cluster, long now) {
this.lastRefresh = now;
this.cluster = cluster;
notifyAll();
log.debug("Updated cluster metadata to {}", cluster);
}

/**
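The fetch()/update() pair above is a classic monitor handshake: fetch() blocks in wait() until update() installs a new cluster and calls notifyAll(). A simplified sketch of the pattern (names invented, not the actual Metadata class, which also enforces the deadline by throwing TimeoutException):

// Simplified sketch of the Metadata wait/notify handshake; not the real class.
public class BlockingMetadataSketch {
    private Object cluster;

    public synchronized Object fetch(long maxWaitMs) throws InterruptedException {
        while (cluster == null)
            wait(maxWaitMs); // parked until update() publishes (real code times out)
        return cluster;
    }

    public synchronized void update(Object newCluster) {
        this.cluster = newCluster;
        notifyAll(); // wake every thread blocked in fetch()
    }
}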
RecordAccumulator.java
@@ -33,6 +33,8 @@
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
@@ -43,6 +45,8 @@
*/
public final class RecordAccumulator {

private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);

private volatile boolean closed;
private int drainIndex;
private final int batchSize;
@@ -126,6 +130,7 @@ public FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value,

// we don't have an in-progress record batch, so try to allocate a new batch
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
ByteBuffer buffer = free.allocate(size);
synchronized (dq) {
RecordBatch first = dq.peekLast();
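The allocation that the new trace message reports follows the sizing rule visible in the hunk above: a buffer is the configured batch size, unless a single record plus log overhead is larger, in which case the buffer grows to fit that one record. Restated as a sketch (helper name hypothetical):

// Hypothetical helper restating the sizing rule in RecordAccumulator.append().
public class BufferSizing {
    // batchSize comes from config; logOverhead corresponds to Records.LOG_OVERHEAD.
    static int bufferSize(int batchSize, int logOverhead, int recordSize) {
        // Never allocate less than one full batch, but let a single oversized
        // record get a buffer big enough to hold it on its own.
        return Math.max(batchSize, logOverhead + recordSize);
    }
}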
RecordBatch.java
@@ -16,17 +16,21 @@
import java.util.List;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A batch of records that is or will be sent.
*
* This class is not thread-safe, and external synchronization must be used when modifying it.
*/
public final class RecordBatch {

private static final Logger log = LoggerFactory.getLogger(RecordBatch.class);

public int recordCount = 0;
public volatile int attempts = 0;
public final long created;
@@ -64,11 +68,15 @@ public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType
/**
* Complete the request
*
- * @param offset The offset
+ * @param baseOffset The base offset of the messages assigned by the server
* @param exception The exception that occurred, or null if there was no error
*/
- public void done(long offset, RuntimeException exception) {
-     this.produceFuture.done(topicPartition, offset, exception);
+ public void done(long baseOffset, RuntimeException exception) {
+     this.produceFuture.done(topicPartition, baseOffset, exception);
+     log.trace("Produced messages to topic-partition {} with base offset {} and error: {}.",
+               topicPartition,
+               baseOffset,
+               exception);
// execute callbacks
for (int i = 0; i < this.thunks.size(); i++) {
try {
Expand All @@ -78,7 +86,7 @@ public void done(long offset, RuntimeException exception) {
else
thunk.callback.onCompletion(null, exception);
} catch (Exception e) {
- e.printStackTrace();
+ log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
}
}
}
@@ -95,4 +103,9 @@ public Thunk(Callback callback, FutureRecordMetadata future) {
this.future = future;
}
}

@Override
public String toString() {
return "RecordBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")";
}
}
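Replacing e.printStackTrace() with log.error() also preserves a useful property of the callback loop in done(): each user callback runs inside its own try/catch, so one throwing callback is logged and skipped rather than aborting the rest. A self-contained sketch of that isolation pattern (class and types simplified, not the real RecordBatch):

import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch of the callback-isolation loop in RecordBatch.done().
public class CallbackIsolationSketch {

    interface Callback { void onCompletion(Object metadata, Exception error); }

    private static final Logger log = LoggerFactory.getLogger(CallbackIsolationSketch.class);

    static void completeAll(List<Callback> callbacks, Object metadata, Exception error) {
        for (Callback cb : callbacks) {
            try {
                cb.onCompletion(metadata, error);
            } catch (Exception e) {
                // A throwing callback is logged and skipped, so the
                // remaining callbacks still get their completion signal.
                log.error("Error executing user-provided callback", e);
            }
        }
    }
}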
