Skip to content

Commit

Permalink
KAFKA-8615: Change to track partition time breaks TimestampExtractor (#…
Browse files Browse the repository at this point in the history
…7054)

The timestamp extractor takes a previousTimestamp parameter, which should be the partition time. This PR adds partition time tracking back in for the extractor and renames previousTimestamp to partitionTime.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>, Matthias J. Sax <mjsax@apache.org>
  • Loading branch information
A. Sophie Blee-Goldman authored and guozhangwang committed Jul 18, 2019
1 parent 6ae60fd commit f1300c7
Show file tree
Hide file tree
Showing 15 changed files with 151 additions and 58 deletions.
Expand Up @@ -27,7 +27,7 @@
public class JsonTimestampExtractor implements TimestampExtractor {

@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
if (record.value() instanceof PageViewTypedDemo.PageView) {
return ((PageViewTypedDemo.PageView) record.value()).timestamp;
}
Expand Down
Expand Up @@ -50,15 +50,15 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {
* Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
*
* @param record a data record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return the embedded metadata timestamp of the given {@link ConsumerRecord}
*/
@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
final long timestamp = record.timestamp();

if (timestamp < 0) {
return onInvalidTimestamp(record, timestamp, previousTimestamp);
return onInvalidTimestamp(record, timestamp, partitionTime);
}

return timestamp;
Expand All @@ -69,10 +69,10 @@ public long extract(final ConsumerRecord<Object, Object> record, final long prev
*
* @param record a data record
* @param recordTimestamp the timestamp extracted from the record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return a new timestamp for the record (if negative, record will not be processed but dropped silently)
*/
public abstract long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
final long recordTimestamp,
final long previousTimestamp);
final long partitionTime);
}
Expand Up @@ -54,14 +54,14 @@ public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
*
* @param record a data record
* @param recordTimestamp the timestamp extracted from the record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return nothing; always raises an exception
* @throws StreamsException on every invocation
*/
@Override
public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
final long recordTimestamp,
final long previousTimestamp)
final long partitionTime)
throws StreamsException {

final String message = "Input record " + record + " has invalid (negative) timestamp. " +
Expand Down
Expand Up @@ -56,13 +56,13 @@ public class LogAndSkipOnInvalidTimestamp extends ExtractRecordMetadataTimestamp
*
* @param record a data record
* @param recordTimestamp the timestamp extracted from the record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return the originally extracted timestamp of the record
*/
@Override
public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
final long recordTimestamp,
final long previousTimestamp) {
final long partitionTime) {
log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record);
return recordTimestamp;
}
Expand Down
Expand Up @@ -46,8 +46,8 @@ public interface TimestampExtractor {
*
*
* @param record a data record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return the timestamp of the record
*/
long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
long extract(ConsumerRecord<Object, Object> record, long partitionTime);
}
Expand Up @@ -51,20 +51,20 @@ public class UsePreviousTimeOnInvalidTimestamp extends ExtractRecordMetadataTime
*
* @param record a data record
* @param recordTimestamp the timestamp extracted from the record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return the provided partition time as the new timestamp for the record
* @throws StreamsException if the partition time is unknown
*/
@Override
public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
final long recordTimestamp,
final long previousTimestamp)
final long partitionTime)
throws StreamsException {
if (previousTimestamp < 0) {
if (partitionTime < 0) {
throw new StreamsException("Could not infer new timestamp for input record " + record
+ " because latest extracted valid timestamp is unknown.");
+ " because partition time is unknown.");
}
return previousTimestamp;
return partitionTime;
}


Expand Down
Expand Up @@ -38,11 +38,11 @@ public class WallclockTimestampExtractor implements TimestampExtractor {
* Return the current wall clock time as timestamp.
*
* @param record a data record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
*/
@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
return System.currentTimeMillis();
}
}
Expand Up @@ -41,8 +41,8 @@
*
* PartitionGroup also maintains a stream-time for the group as a whole.
* This is defined as the highest timestamp of any record yet polled from the PartitionGroup.
* The PartitionGroup's stream-time is also the stream-time of its task and is used as the
* stream-time for any computations that require it.
* Note however that any computation that depends on stream-time should track it on a per-operator basis to obtain an
* accurate view of the local time as seen by that processor.
*
* The PartitionGroup's stream-time is initially UNKNOWN (-1), and it is set to a known value upon first poll.
* As a consequence of the definition, the PartitionGroup's stream-time is non-decreasing
Expand Down Expand Up @@ -76,7 +76,7 @@ RecordQueue queue() {
}

PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
this.partitionQueues = partitionQueues;
this.recordLatenessSensor = recordLatenessSensor;
totalBuffered = 0;
Expand Down Expand Up @@ -109,7 +109,7 @@ record = queue.poll();
nonEmptyQueuesByTime.offer(queue);
}

// always update the stream time to the record's timestamp yet to be processed if it is larger
// always update the stream-time to the record's timestamp yet to be processed if it is larger
if (record.timestamp > streamTime) {
streamTime = record.timestamp;
recordLatenessSensor.record(0);
Expand Down Expand Up @@ -140,8 +140,8 @@ int addRawRecords(final TopicPartition partition, final Iterable<ConsumerRecord<
nonEmptyQueuesByTime.offer(recordQueue);

// if all partitions now are non-empty, set the flag
// we do not need to update the stream time here since this task will definitely be
// processed next, and hence the stream time will be updated when we retrieved records by then
// we do not need to update the stream-time here since this task will definitely be
// processed next, and hence the stream-time will be updated when we retrieve records then
if (nonEmptyQueuesByTime.size() == this.partitionQueues.size()) {
allBuffered = true;
}
Expand All @@ -157,10 +157,9 @@ public Set<TopicPartition> partitions() {
}

/**
* Return the timestamp of this partition group as the smallest
* partition timestamp among all its partitions
* Return the stream-time of this partition group defined as the largest timestamp seen across all partitions
*/
public long timestamp() {
public long streamTime() {
return streamTime;
}

Expand Down Expand Up @@ -192,6 +191,7 @@ public void close() {

public void clear() {
nonEmptyQueuesByTime.clear();
streamTime = RecordQueue.UNKNOWN;
for (final RecordQueue queue : partitionQueues.values()) {
queue.clear();
}
Expand Down
Expand Up @@ -31,8 +31,8 @@

/**
* RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the
* partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition
* timestamp is monotonically increasing such that once it is advanced, it will not be decremented.
* partition timestamp defined as the largest timestamp seen on the partition so far; this is passed to the
* timestamp extractor.
*/
public class RecordQueue {

Expand All @@ -47,6 +47,7 @@ public class RecordQueue {
private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;

private StampedRecord headRecord = null;
private long partitionTime = RecordQueue.UNKNOWN;

RecordQueue(final TopicPartition partition,
final SourceNode source,
Expand Down Expand Up @@ -136,20 +137,30 @@ public boolean isEmpty() {
}

/**
* Returns the tracked partition timestamp
* Returns the head record's timestamp
*
* @return timestamp
*/
public long timestamp() {
public long headRecordTimestamp() {
return headRecord == null ? UNKNOWN : headRecord.timestamp;
}

/**
* Returns the tracked partition time, i.e. the largest record timestamp extracted for this
* partition so far; this is the value handed to the timestamp extractor.
*
* @return partition time, or {@link #UNKNOWN} if no timestamp has been tracked yet or the queue was cleared
*/
long partitionTime() {
return partitionTime;
}

/**
* Clear the fifo queue of its elements, drop the current head record, and reset the tracked
* partition time to UNKNOWN
*/
public void clear() {
fifoQueue.clear();
headRecord = null;
partitionTime = RecordQueue.UNKNOWN;
}

private void updateHead() {
Expand All @@ -164,7 +175,7 @@ private void updateHead() {

final long timestamp;
try {
timestamp = timestampExtractor.extract(deserialized, timestamp());
timestamp = timestampExtractor.extract(deserialized, partitionTime);
} catch (final StreamsException internalFatalExtractorException) {
throw internalFatalExtractorException;
} catch (final Exception fatalUserException) {
Expand All @@ -185,6 +196,8 @@ private void updateHead() {
}

headRecord = new StampedRecord(deserialized, timestamp);

partitionTime = Math.max(partitionTime, timestamp);
}
}
}
Expand Up @@ -816,14 +816,14 @@ int numBuffered() {
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
public boolean maybePunctuateStreamTime() {
final long timestamp = partitionGroup.timestamp();
final long streamTime = partitionGroup.streamTime();

// if the stream-time is not known yet, meaning there is not enough data accumulated
// to reason about stream-time, then skip.
if (timestamp == RecordQueue.UNKNOWN) {
if (streamTime == RecordQueue.UNKNOWN) {
return false;
} else {
final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this);
final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);

if (punctuated) {
commitNeeded = true;
Expand All @@ -841,9 +841,9 @@ public boolean maybePunctuateStreamTime() {
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
public boolean maybePunctuateSystemTime() {
final long timestamp = time.milliseconds();
final long systemTime = time.milliseconds();

final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);

if (punctuated) {
commitNeeded = true;
Expand Down
Expand Up @@ -670,7 +670,7 @@ public Deserializer deserializer() {
public static class MockTimestampExtractor implements TimestampExtractor {

@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
return 0;
}
}
Expand Down
Expand Up @@ -107,7 +107,7 @@ public void testTimeTracking() {
// st: -1 since no records have been processed yet

verifyBuffered(6, 3, 3);
assertEquals(-1L, group.timestamp());
assertEquals(-1L, group.streamTime());
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());

StampedRecord record;
Expand Down Expand Up @@ -143,7 +143,7 @@ record = group.nextRecord(info);
// 2:[4, 6]
// st: 2 (just adding records shouldn't change it)
verifyBuffered(6, 4, 2);
assertEquals(2L, group.timestamp());
assertEquals(2L, group.streamTime());
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());

// get one record, time should be advanced
Expand Down Expand Up @@ -221,7 +221,7 @@ public void shouldChooseNextRecordBasedOnHeadTimestamp() {
group.addRawRecords(partition1, list1);

verifyBuffered(3, 3, 0);
assertEquals(-1L, group.timestamp());
assertEquals(-1L, group.streamTime());
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());

StampedRecord record;
Expand Down Expand Up @@ -258,7 +258,7 @@ record = group.nextRecord(info);

private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) {
assertEquals(recordTime, record.timestamp);
assertEquals(streamTime, group.timestamp());
assertEquals(streamTime, group.streamTime());
}

private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) {
Expand Down
Expand Up @@ -731,7 +731,7 @@ public static class CustomTimestampExtractor implements TimestampExtractor {
private static final long DEFAULT_TIMESTAMP = 1000L;

@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
if (record.value().toString().matches(".*@[0-9]+")) {
return Long.parseLong(record.value().toString().split("@")[1]);
}
Expand Down

0 comments on commit f1300c7

Please sign in to comment.