KAFKA-3025 Added timetamp to Message and use relative offset. #764

Closed · wants to merge 34 commits

Commits (34):
ce01646
Added timetamp to Message and use relative offset.
becketqin Jan 7, 2016
d16091c
Added timestamp and relative offset to clients package.
becketqin Jan 20, 2016
3b23383
Rebased on trunk.
becketqin Jan 21, 2016
813e0dd
Updated java doc for KafkaProducer and RecordMetadata. Added comments…
becketqin Jan 22, 2016
b424c45
Use the timestamp attribute bit as Guozhang proposed.
becketqin Jan 27, 2016
e76aa20
Fixed issue where timestmap type is not updated for non-compressed me…
becketqin Jan 27, 2016
60108b0
Change the compression codec bits back to 3
becketqin Jan 28, 2016
8606c41
Extracted TimestampType out of Record and Message.
becketqin Jan 28, 2016
78b0936
Fixed connect and streams test
becketqin Jan 28, 2016
0a1a0b8
Addressed Anna's comments.
becketqin Jan 28, 2016
b0ba4a0
Addressed Jun's comments
becketqin Jan 29, 2016
1824bfa
Added test for invalid timestamp
becketqin Feb 2, 2016
c691fd0
Addressed Jun's comments
becketqin Feb 2, 2016
b8e65cb
Addressed Jun's comments
becketqin Feb 3, 2016
fd1afde
Addressed Jun's comments
becketqin Feb 3, 2016
56fc79f
Changed message format version to v0 and v1
becketqin Feb 4, 2016
67201a6
Updated upgrade doc
becketqin Feb 8, 2016
92a39ba
Addressed Jun's comments
becketqin Feb 11, 2016
acc7189
Updated doc for message format version
becketqin Feb 13, 2016
b38877f
Fix EdgeCaseRequestTest
becketqin Feb 14, 2016
def2673
remove default value for Message constructor.
becketqin Feb 14, 2016
a0f76bb
Addressed Jun's comments.
becketqin Feb 16, 2016
89958b9
Addressed Ismael's comments
becketqin Feb 16, 2016
c3ca748
Addressed Ismael's comments.
becketqin Feb 16, 2016
085958e
Added documentation for changing internal offset topic configuration …
becketqin Feb 17, 2016
9bd7f88
rebased on trunk
becketqin Feb 17, 2016
095d88d
Minor comment improvements
ijuma Feb 17, 2016
648a457
Use `long` instead of `Long` for `offsetCounter`
ijuma Feb 17, 2016
d0772a3
Minor code style improvements
ijuma Feb 17, 2016
a4313f4
Use `Time` interface in `GroupCoordinator` and `GroupMetadataManager`
ijuma Feb 17, 2016
0ec15f9
Merge pull request #1 from ijuma/KAFKA-3025
becketqin Feb 17, 2016
200d69b
Addressed Jun's comments
becketqin Feb 18, 2016
99a4a27
Addressed Ismael's comments.
becketqin Feb 18, 2016
22c74cd
Addressed Jun's comments
becketqin Feb 19, 2016
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -84,6 +84,7 @@
<subpackage name="requests">
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.record" />
<!-- for testing -->
<allow pkg="org.apache.kafka.common.errors" />
</subpackage>
org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -12,6 +12,8 @@
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.record.TimestampType;

/**
* A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the
* record is being received and an offset that points to the record in a Kafka partition.
@@ -20,6 +22,8 @@ public final class ConsumerRecord<K, V> {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final K key;
private final V value;

@@ -29,15 +33,25 @@ public final class ConsumerRecord<K, V> {
* @param topic The topic this record is received from
* @param partition The partition of the topic this record is received from
* @param offset The offset of this record in the corresponding Kafka partition
* @param timestamp The timestamp of the record.
Review comment (Contributor): Could we add the doc for timestampType?

* @param timestampType The timestamp type
* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
*/
public ConsumerRecord(String topic, int partition, long offset, K key, V value) {
public ConsumerRecord(String topic,

Review thread:
Reviewer: How would you feel about adding a testing constructor for this class that matches the existing signature and just uses dummy values for the timestamp and TimestampType? Eliminating that constructor will break compilation for any unit tests that rely on building ConsumerRecords.
Contributor Author: Do you mean the unit tests in projects other than Kafka?
Reviewer: Yes. It's a minor change to update anywhere these constructors are used in unit tests, but any test that instantiates a ConsumerRecord isn't going to compile as soon as a user upgrades past 0.9.0.0. We've used the @VisibleForTesting annotation in the past to denote constructors that exist only for unit testing.
Contributor Author: I am not sure we should add a testing constructor for other projects. It is a little odd to have a testing constructor that is never used by Kafka itself but exists only for some unknown external project. Technically, ConsumerRecord should only be constructed by Kafka, not by other projects. If we did this, we would arguably have to maintain constructor backward compatibility for every public class, even though most public classes are not supposed to be constructed by users.
Reviewer: If ConsumerRecord is only intended to be instantiated by Kafka, then I withdraw my comment. Internal APIs shouldn't be forced to remain backwards-compatible.

int partition,
long offset,
long timestamp,
TimestampType timestampType,
K key,
V value) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.timestamp = timestamp;
this.timestampType = timestampType;
this.key = key;
this.value = value;
}
@@ -77,9 +91,23 @@ public long offset() {
return offset;
}

/**
* The timestamp of this record
*/
public long timestamp() {
return timestamp;
}

/**
* The timestamp type of this record
*/
public TimestampType timestampType() {
return timestampType;
}

@Override
public String toString() {
return "ConsumerRecord(topic = " + topic() + ", partition = " + partition() + ", offset = " + offset()
+ ", key = " + key + ", value = " + value + ")";
+ ", " + timestampType + " = " + timestamp + ", key = " + key + ", value = " + value + ")";
}
}
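The new ConsumerRecord surface above can be sketched as a minimal, self-contained class. This is illustrative only: the real class is generic over key/value types, lives in org.apache.kafka.clients.consumer, and uses the real org.apache.kafka.common.record.TimestampType; the nested enum and simplified String fields here are assumptions made to keep the sketch runnable without the Kafka client jar.

```java
// Minimal sketch of the timestamp-carrying ConsumerRecord added in this PR.
public class ConsumerRecordSketch {
    // Stand-in for org.apache.kafka.common.record.TimestampType.
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    final String topic;
    final int partition;
    final long offset;
    final long timestamp;
    final TimestampType timestampType;

    ConsumerRecordSketch(String topic, int partition, long offset,
                         long timestamp, TimestampType timestampType) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null");
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.timestampType = timestampType;
    }

    public long timestamp() { return timestamp; }

    public TimestampType timestampType() { return timestampType; }

    public static void main(String[] args) {
        ConsumerRecordSketch r = new ConsumerRecordSketch(
                "events", 0, 42L, 1455000000000L, TimestampType.CREATE_TIME);
        System.out.println(r.timestamp());      // 1455000000000
        System.out.println(r.timestampType());  // CREATE_TIME
    }
}
```

Note how the toString change in the diff prints the timestamp keyed by its type (e.g. `CreateTime = 1455000000000`), so consumers can tell at a glance whether the value is a client or broker clock.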
org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -40,6 +40,7 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
@@ -614,12 +615,14 @@ private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logE
if (this.checkCrcs)
logEntry.record().ensureValid();
long offset = logEntry.offset();
long timestamp = logEntry.record().timestamp();
TimestampType timestampType = logEntry.record().timestampType();
ByteBuffer keyBytes = logEntry.record().key();
K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
ByteBuffer valueBytes = logEntry.record().value();
V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));

return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, key, value);
return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp, timestampType, key, value);
} catch (KafkaException e) {
throw e;
} catch (RuntimeException e) {
org/apache/kafka/clients/producer/KafkaProducer.java
@@ -359,8 +359,12 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
* records waiting to be sent. This allows sending many records in parallel without blocking to wait for the
* response after each one.
* <p>
* The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to and the offset
* it was assigned.
* The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to, the offset
* it was assigned and the timestamp of the record. If
* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime} is used by the topic, the timestamp
* will be the user provided timestamp or the record send time if the user did not specify a timestamp for the
* record. If {@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime} is used for the
* topic, the timestamp will be the Kafka broker local time when the message is appended.
* <p>
* Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
* {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
@@ -456,8 +460,9 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
TopicPartition tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, callback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
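The single line added to doSend() carries the CreateTime default rule: if the user left the ProducerRecord timestamp null, the producer stamps the record with its local send time. A minimal sketch of that ternary, runnable outside the Kafka codebase (the method name resolveTimestamp is mine, not Kafka's):

```java
// Sketch of how doSend() resolves the record timestamp before appending
// to the accumulator: user-provided value wins, otherwise "now".
public class TimestampDefaulting {

    // Mirrors: record.timestamp() == null ? time.milliseconds() : record.timestamp()
    static long resolveTimestamp(Long recordTimestamp, long nowMillis) {
        return recordTimestamp == null ? nowMillis : recordTimestamp;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // User supplied a timestamp: it is kept as-is.
        System.out.println(resolveTimestamp(42L, now));          // 42
        // No timestamp: the producer stamps the record at send time.
        System.out.println(resolveTimestamp(null, now) == now);  // true
    }
}
```

If the topic uses LogAppendTime, the broker later overwrites this value anyway; the resolved timestamp only survives for CreateTime topics.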
org/apache/kafka/clients/producer/MockProducer.java
@@ -34,6 +34,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.serialization.Serializer;


@@ -116,10 +117,10 @@ public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Cal
if (this.cluster.partitionsForTopic(record.topic()) != null)
partition = partition(record, this.cluster);
ProduceRequestResult result = new ProduceRequestResult();
FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
FutureRecordMetadata future = new FutureRecordMetadata(result, 0, Record.NO_TIMESTAMP);
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
long offset = nextOffset(topicPartition);
Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset), result, callback);
Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset, Record.NO_TIMESTAMP), result, callback);
this.sent.add(record);
if (autoComplete)
completion.complete(null);
org/apache/kafka/clients/producer/ProducerRecord.java
@@ -19,29 +19,62 @@
* If a valid partition number is specified that partition will be used when sending the record. If no partition is
* specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
* present a partition will be assigned in a round-robin fashion.
* <p>
* The record also has an associated timestamp. If the user did not provide a timestamp, the producer will stamp the
* record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for
* the topic.
* <li>
* If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime},
* the timestamp in the producer record will be used by the broker.
* </li>
* <li>
* If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime},
* the timestamp in the producer record will be overwritten by the broker with the broker local time when it appends the
* message to its log.
* </li>
* <p>
* In either of the cases above, the timestamp that has actually been used will be returned to user in
* {@link RecordMetadata}
*/
public final class ProducerRecord<K, V> {

private final String topic;
private final Integer partition;
private final K key;
private final V value;
private final Long timestamp;

/**
* Creates a record to be sent to a specified topic and partition
* Creates a record with a specified timestamp to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
* @param partition The partition to which the record should be sent
* @param timestamp The timestamp of the record
* @param key The key that will be included in the record
* @param value The record contents
*/
public ProducerRecord(String topic, Integer partition, K key, V value) {
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
if (timestamp != null && timestamp < 0)
throw new IllegalArgumentException("Invalid timestamp " + timestamp);
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
}

/**
* Creates a record to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
* @param partition The partition to which the record should be sent
* @param key The key that will be included in the record
* @param value The record contents
*/
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value);
}

/**
@@ -52,7 +85,7 @@ public ProducerRecord(String topic, Integer partition, K key, V value) {
* @param value The record contents
*/
public ProducerRecord(String topic, K key, V value) {
this(topic, null, key, value);
this(topic, null, null, key, value);
}

/**
@@ -62,18 +95,18 @@ public ProducerRecord(String topic, K key, V value) {
* @param value The record contents
*/
public ProducerRecord(String topic, V value) {
this(topic, null, value);
this(topic, null, null, null, value);
}

/**
* The topic this record is being sent to
* @return The topic this record is being sent to
*/
public String topic() {
return topic;
}

/**
* The key (or null if no key is specified)
* @return The key (or null if no key is specified)
*/
public K key() {
return key;
@@ -87,7 +120,14 @@ public V value() {
}

/**
* The partition to which the record will be sent (or null if no partition was specified)
* @return The timestamp
*/
public Long timestamp() {
return timestamp;
}

/**
* @return The partition to which the record will be sent (or null if no partition was specified)
*/
public Integer partition() {
return partition;
@@ -97,7 +137,9 @@ public Integer partition() {
public String toString() {
String key = this.key == null ? "null" : this.key.toString();
String value = this.value == null ? "null" : this.value.toString();
return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value;
String timestamp = this.timestamp == null ? "null" : this.timestamp.toString();
return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value +
", timestamp=" + timestamp + ")";
}

@Override
@@ -117,6 +159,8 @@ else if (topic != null ? !topic.equals(that.topic) : that.topic != null)
return false;
else if (value != null ? !value.equals(that.value) : that.value != null)
return false;
else if (timestamp != null ? !timestamp.equals(that.timestamp) : that.timestamp != null)
return false;

return true;
}
@@ -127,6 +171,7 @@ public int hashCode() {
result = 31 * result + (partition != null ? partition.hashCode() : 0);
result = 31 * result + (key != null ? key.hashCode() : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
return result;
}
}
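The constructor changes above follow one pattern: every convenience constructor delegates to the new five-argument constructor, passing null for the fields it omits, and the full constructor validates the timestamp. A simplified, self-contained sketch (String key/value instead of generics; the class name is mine):

```java
// Sketch of the ProducerRecord constructor chain added in this PR.
public class ProducerRecordSketch {
    final String topic;
    final Integer partition;
    final Long timestamp;   // null means "let the producer stamp it at send time"
    final String key;
    final String value;

    // Full constructor: validates topic and (if present) the timestamp.
    ProducerRecordSketch(String topic, Integer partition, Long timestamp,
                         String key, String value) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException("Invalid timestamp " + timestamp);
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }

    // Convenience constructors delegate downward with null placeholders,
    // exactly the pattern in the diff above.
    ProducerRecordSketch(String topic, Integer partition, String key, String value) {
        this(topic, partition, null, key, value);
    }

    ProducerRecordSketch(String topic, String key, String value) {
        this(topic, null, null, key, value);
    }

    public static void main(String[] args) {
        ProducerRecordSketch r = new ProducerRecordSketch("events", "k", "v");
        System.out.println(r.timestamp == null);  // true: producer will stamp it
        try {
            new ProducerRecordSketch("events", 0, -5L, "k", "v");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());   // Invalid timestamp -5
        }
    }
}
```

Using a boxed Long (rather than long) for the timestamp is what makes "no timestamp provided" representable, which the producer later turns into its send time.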
org/apache/kafka/clients/producer/RecordMetadata.java
@@ -24,18 +24,25 @@
public final class RecordMetadata {
Review comment (Contributor): It would also be good to add a comment to the RecordMetadata class description saying which timestamp is actually returned (set by the client, the producer, or the broker).

private final long offset;
// The timestamp of the message.
// If LogAppendTime is used for the topic, the timestamp will be the timestamp returned by the broker.
// If CreateTime is used for the topic, the timestamp is the timestamp in the corresponding ProducerRecord if the
// user provided one. Otherwise, it will be the producer local time when the producer record was handed to the
// producer.
private final long timestamp;
private final TopicPartition topicPartition;

private RecordMetadata(TopicPartition topicPartition, long offset) {
private RecordMetadata(TopicPartition topicPartition, long offset, long timestamp) {
super();
this.offset = offset;
this.timestamp = timestamp;
this.topicPartition = topicPartition;
}

public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) {
public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp) {
// ignore the relativeOffset if the base offset is -1,
// since this indicates the offset is unknown
this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset);
this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset, timestamp);
}

/**
@@ -45,6 +52,13 @@ public long offset() {
return this.offset;
}

/**
* The timestamp of the record in the topic/partition.
*/
public long timestamp() {
return timestamp;
}

/**
* The topic the record was appended to
*/
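The two-constructor split in RecordMetadata encodes the PR's relative-offset scheme: the public constructor takes a baseOffset plus relativeOffset and computes the absolute offset, unless baseOffset is the -1 "unknown" sentinel. A tiny runnable sketch of that rule (class and method names are mine, not Kafka's):

```java
// Sketch of the offset arithmetic in the RecordMetadata constructors.
public class OffsetMath {

    // Mirrors: baseOffset == -1 ? baseOffset : baseOffset + relativeOffset
    static long absoluteOffset(long baseOffset, long relativeOffset) {
        return baseOffset == -1 ? baseOffset : baseOffset + relativeOffset;
    }

    public static void main(String[] args) {
        // Normal case: record 3 of a batch appended at offset 100.
        System.out.println(absoluteOffset(100L, 3L));  // 103
        // Unknown base offset: relativeOffset is ignored entirely.
        System.out.println(absoluteOffset(-1L, 3L));   // -1
    }
}
```

This is why the broker can assign one offset per batch (the base) while each record in the batch still reports a distinct absolute offset to the callback.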
org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -26,10 +26,12 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {

private final ProduceRequestResult result;
private final long relativeOffset;
private final long timestamp;

public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset) {
public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long timestamp) {
this.result = result;
this.relativeOffset = relativeOffset;
this.timestamp = timestamp;
}

@Override
@@ -59,13 +61,17 @@ RecordMetadata valueOrError() throws ExecutionException {
}

RecordMetadata value() {
return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset, this.timestamp);
}

public long relativeOffset() {
return this.relativeOffset;
}

public long timestamp() {
return this.timestamp;
}

@Override
public boolean isCancelled() {
return false;