
KAFKA-8615: Change to track partition time breaks TimestampExtractor #7054

Merged: 13 commits, Jul 18, 2019
@@ -27,7 +27,7 @@
public class JsonTimestampExtractor implements TimestampExtractor {

@Override
- public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+ public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
if (record.value() instanceof PageViewTypedDemo.PageView) {
return ((PageViewTypedDemo.PageView) record.value()).timestamp;
}
@@ -50,15 +50,15 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {
* Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
*
* @param record a data record
- * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @param partitionTime the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return the embedded metadata timestamp of the given {@link ConsumerRecord}
*/
@Override
- public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+ public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
final long timestamp = record.timestamp();

if (timestamp < 0) {
- return onInvalidTimestamp(record, timestamp, previousTimestamp);
+ return onInvalidTimestamp(record, timestamp, partitionTime);
}

return timestamp;
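For reference, Kafka ships concrete subclasses that implement the onInvalidTimestamp hook this class calls above. A minimal sketch of one possible policy, substituting the partition time for an invalid record timestamp (the class name and error message here are illustrative, not the exact built-in implementation):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.StreamsException;

    // Sketch: substitute the partition time when a record carries an invalid
    // (negative) timestamp, and fail if no partition time is known yet (-1).
    public class UsePartitionTimeOnInvalid extends ExtractRecordMetadataTimestamp {

        @Override
        public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                       final long recordTimestamp,
                                       final long partitionTime) {
            if (partitionTime < 0) {
                throw new StreamsException("Invalid timestamp and no partition time known yet for record " + record);
            }
            return partitionTime;
        }
    }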
@@ -46,8 +46,8 @@ public interface TimestampExtractor {
*
*
* @param record a data record
- * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @param partitionTime the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return the timestamp of the record
*/
- long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
+ long extract(ConsumerRecord<Object, Object> record, long partitionTime);
}
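To illustrate the renamed parameter: a custom extractor can use partitionTime as a fallback for records that carry no usable timestamp. A minimal sketch (the class name and fallback policy are illustrative, not part of this PR):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Sketch: prefer the record's own timestamp; fall back to the partition
    // time (the highest valid timestamp extracted so far, or -1 if unknown).
    public class PartitionTimeFallbackExtractor implements TimestampExtractor {

        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
            final long timestamp = record.timestamp();
            return timestamp >= 0 ? timestamp : partitionTime;
        }
    }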
@@ -38,11 +38,11 @@ public class WallclockTimestampExtractor implements TimestampExtractor {
* Return the current wall clock time as timestamp.
*
* @param record a data record
- * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @param partitionTime the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
*/
@Override
- public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+ public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
return System.currentTimeMillis();
}
}
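For context, an extractor like this one is registered through the Streams configuration. A minimal sketch (the application id and bootstrap servers are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    public class ExtractorConfigExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");          // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Use wall-clock time for every record instead of embedded timestamps.
            props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                WallclockTimestampExtractor.class.getName());
        }
    }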
@@ -47,6 +47,7 @@ public class RecordQueue {
private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;

private StampedRecord headRecord = null;
+ private long partitionTime = RecordQueue.UNKNOWN;

private Sensor skipRecordsSensor;

@@ -153,6 +154,7 @@ public long timestamp() {
public void clear() {
fifoQueue.clear();
headRecord = null;
+ partitionTime = RecordQueue.UNKNOWN;
}

private void updateHead() {
@@ -167,7 +169,7 @@ private void updateHead() {

final long timestamp;
try {
- timestamp = timestampExtractor.extract(deserialized, timestamp());
+ timestamp = timestampExtractor.extract(deserialized, partitionTime);
Member: Can we also update method `timestamp()` to `headRecordTimestamp()` to be more explicit about what it returns? It's orthogonal to the actual fix, but might be a good improvement.

Member: Similarly, can we piggy-back some cleanup on PartitionGroup?

- rename `PartitionGroup#timestamp()` to `PartitionGroup#streamTime()`
(also update the JavaDocs, which seem to be wrong)

- in `clear()`, reset `streamTime` to UNKNOWN?

- in `nextRecord()`: do we need to check if `queue != null`, and do we need to check if `record != null`? (It seems ensured that both can never be `null`?)

Contributor Author: Ack to all... except the last point. We do check both for null..?

Member: Seems we check both for null atm:

        final RecordQueue queue = nonEmptyQueuesByTime.poll();
        info.queue = queue;

        if (queue != null) {
            // get the first record from this queue.
            record = queue.poll();

            if (record != null) {
                --totalBuffered;

But I think they cannot be null, could they?

Contributor Author: Sorry, misunderstood your question. Yes, either one could potentially be null if we don't yet have new records to process?

Member: Good point. `final RecordQueue queue = nonEmptyQueuesByTime.poll();` could return null. However, I am wondering if `record = queue.poll();` could return null, because it's called `nonEmptyQueuesByTime` -- hence, queue should never be empty?

Contributor: I think I agree the second null check should never happen.
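To make the invariant under discussion concrete, here is a tiny self-contained model (a plain ArrayDeque stands in for RecordQueue; all names are illustrative): as long as only non-empty queues are ever offered into the priority queue, the record polled from a polled queue cannot be null.

    import java.util.ArrayDeque;
    import java.util.PriorityQueue;

    public class NonEmptyQueueInvariantDemo {
        public static void main(final String[] args) {
            // Stand-in for nonEmptyQueuesByTime, ordered by head timestamp.
            final PriorityQueue<ArrayDeque<Long>> nonEmptyQueuesByTime =
                new PriorityQueue<>((a, b) -> Long.compare(a.peek(), b.peek()));

            final ArrayDeque<Long> partitionQueue = new ArrayDeque<>();
            partitionQueue.add(42L);
            if (!partitionQueue.isEmpty()) {   // the invariant: never offer an empty queue
                nonEmptyQueuesByTime.offer(partitionQueue);
            }

            final ArrayDeque<Long> queue = nonEmptyQueuesByTime.poll();
            if (queue != null) {               // legitimately null when nothing is buffered
                final Long record = queue.poll();
                // Under the invariant, record is never null here.
                System.out.println("record = " + record);
            }
        }
    }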

} catch (final StreamsException internalFatalExtractorException) {
throw internalFatalExtractorException;
} catch (final Exception fatalUserException) {
@@ -189,6 +191,11 @@ private void updateHead() {
}

headRecord = new StampedRecord(deserialized, timestamp);

+ // update the partition timestamp if the current head record's timestamp has exceeded its value
+ if (timestamp > partitionTime) {
+ partitionTime = timestamp;
+ }
}
}
}
@@ -662,7 +662,7 @@ public Deserializer deserializer() {
public static class MockTimestampExtractor implements TimestampExtractor {

@Override
- public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+ public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
return 0;
}
}
@@ -731,7 +731,7 @@ public static class CustomTimestampExtractor implements TimestampExtractor {
private static final long DEFAULT_TIMESTAMP = 1000L;

@Override
- public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+ public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
if (record.value().toString().matches(".*@[0-9]+")) {
return Long.parseLong(record.value().toString().split("@")[1]);
}
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;

+ import java.util.ArrayList;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
@@ -38,7 +39,6 @@
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockSourceNode;
- import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -53,7 +53,7 @@
public class RecordQueueTest {
private final Serializer<Integer> intSerializer = new IntegerSerializer();
private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
- private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
+ private final TimestampExtractor timestampExtractor = new RecordQueueTestTimestampExtractor();
private final String[] topics = {"topic"};

private final Sensor skippedRecordsSensor = new Metrics().sensor("skipped-records");
@@ -182,6 +182,29 @@ public void testTimeTracking() {
assertEquals(4L, queue.timestamp());
}

@Test
public void testTimestampExtractorPartitionTime() {

final RecordQueueTestTimestampExtractor testTimestampExtractor = (RecordQueueTestTimestampExtractor) timestampExtractor;

assertTrue(queue.isEmpty());
assertEquals(0, queue.size());
assertEquals(RecordQueue.UNKNOWN, queue.timestamp());

// add four out-of-order records; the test extractor uses their offsets (2, 1, 3, 4) as timestamps
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));

queue.addRawRecords(list1);
// drain the queue; each extract() call records the partition time it observed
while (queue.poll() != null) { }

assertEquals(Arrays.asList(RecordQueue.UNKNOWN, 2L, 2L, 3L), testTimestampExtractor.observedPartitionTimes());
}

@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
@@ -253,4 +276,17 @@ public void shouldDropOnNegativeTimestamp() {

assertEquals(0, queue.size());
}

static class RecordQueueTestTimestampExtractor implements TimestampExtractor {
private final List<Long> observedPartitionTimes = new ArrayList<>();

@Override
public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
observedPartitionTimes.add(partitionTime);
return record.offset();
}

public List<Long> observedPartitionTimes() {
return observedPartitionTimes;
}
}
}
@@ -23,7 +23,7 @@
public class MockTimestampExtractor implements TimestampExtractor {

@Override
- public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+ public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
return record.offset();
}
}