
KAFKA-7223: In-Memory Suppression Buffering #5693

Merged
merged 5 commits into from Oct 2, 2018

Conversation

vvcephei
Contributor

This is Part 3 of suppression.
Part 1 was #5567 (the API)
Part 2 was #5687 (the tests)

Implement a non-durable in-memory buffering strategy for suppression.
As of this changeset, the suppression API is fully functional.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vvcephei
Contributor Author

@guozhangwang @bbejeck When you get a chance, please review this code.

I have done my best locally to produce a nice, clean implementation, but now that the diff is published, I'll make another pass over it looking for sharp edges.

@vvcephei vvcephei mentioned this pull request Sep 25, 2018
@mjsax mjsax added the streams label Sep 25, 2018
final boolean isQueryable) {
final boolean isQueryable,
final Serde<KR> keySerde,
final Serde<T> valueSerde) {
Contributor Author

This change (and the similar changes below) is to make sure the serdes we need for suppression are available.
I sort of thought that we already merged a PR to do this, but perhaps it was only partial.

Contributor Author

Aha. I was thinking of #5521, which just isn't merged (yet).

Contributor

Could we merge #5521 (I think it is in pretty good shape) and rebase this PR on that? I feel a couple of the changes below are a bit redundant, e.g. passing in the materializedInternal object as well as its serde fields.

Contributor Author

Yes, I think that's a good plan. I agree on the redundancy, but I wanted to keep the serde-related perturbations to a minimum so they wouldn't distract from the PR.

Member

Just reviewed #5521 again -- left some more comments.

@@ -21,7 +21,7 @@
import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;

abstract class BufferConfigImpl<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
public abstract long maxKeys();
public abstract long maxRecords();
Contributor Author

I realized belatedly that I missed this (internal) interface when I renamed "maxKeys" to "maxRecords" in Part 1.


import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;

class ContextualRecord<V> {
Contributor Author

This wraps the value so that the buffer can store the whole record context for later forwarding.

Contributor

This reminds me of the LRUCacheEntry class used for caching.. could we consolidate these two?

Contributor Author

It is similar, but LRUCacheEntry tracks an isDirty flag that would be confusing in this context, so I wouldn't use LRUCacheEntry here. We could go the other way, though, and make LRUCacheEntry wrap ContextualRecord instead of storing the value + context itself.

Let me know if this sounds good to you... I'll go ahead and optimistically code it up.

Contributor

> make LRUCacheEntry wrap ContextualRecord

Yeah, that sounds good.

Contributor Author

Ok, this is done now.

private final Map<K, TimeKey<K>> index = new HashMap<>();
private final TreeMap<TimeKey<K>, SizedValue<V>> sortedMap = new TreeMap<>();

private static class SizedValue<V> {
Contributor Author

Since we don't actually store the value serialized in the in-memory impl, we annotate the value with its size so we can maintain the current footprint of the buffer. Alternatively, we could serialize it again on removal to idempotently re-compute its size, but this seemed cheaper.
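Roughly, the wrapper looks like this (a minimal sketch; the actual class in the diff may differ in detail):

    // Pair the in-memory value with the byte size computed at insert time, so that
    // eviction can decrement the buffer's footprint without re-serializing the value.
    private static class SizedValue<V> {
        private final V value;
        private final long size;

        private SizedValue(final V value, final long size) {
            this.value = value;
            this.size = size;
        }

        private V value() {
            return value;
        }

        private long size() {
            return size;
        }
    }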

Member

Not sure about this. See my other comment. Would be good to get input from @guozhangwang and @bbejeck about this.

final V value,
final long recordSize) {
final TimeKey<K> previousKey = index.get(key);
// non-resetting semantics:
Contributor Author

This could be configurable in the future, but for now, we enforce the time limit in the following fashion:

  • start a timer when a key first enters the buffer
  • that key and its most recent value will be emitted when the time limit expires, regardless of how recently it has been updated

The primary advantage of this implementation is that we guarantee that if you set a 5-minute limit, we delay emitting the key for no more than five minutes. If we instead re-set the timer on each update, you might never see a record that gets consistently updated more frequently than the time limit.

My judgement was that this is the most intuitive default and starting point for the feature. If people want to configure it, we can easily add that option later.
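A minimal sketch of that non-resetting insert, using the index/sortedMap/TimeKey names that appear elsewhere in this diff (simplified):

    // If the key is already buffered, reuse its original TimeKey, so its original
    // "timer" keeps running; only a brand-new key gets a new timestamped key.
    final TimeKey<K> previousKey = index.get(key);
    final TimeKey<K> timeKey = previousKey != null ? previousKey : new TimeKey<>(time, key);
    index.put(key, timeKey);
    // The latest value wins, but its position (and hence its emit deadline) is preserved.
    sortedMap.put(timeKey, new SizedValue<>(value, recordSize));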

Member

I agree with the described semantics.

Contributor

I also agree with the semantics for enforcing the time limit.

@SuppressWarnings("unchecked")
private <T> Serde<T> castSerde(final Serde<?> untyped, final String keyOrValue) {
try {
return (Serde<T>) untyped;
Contributor Author

Come to think of it, this is probably insufficient to catch the wrong serde (due to erasure). I probably need to relocate this error message to the actual call to de/serialize.

}

private void evictOldEnoughRecords(final long streamTime) {
System.out.println("st: " + streamTime);
Contributor Author

oops. I'll take these out.

}
return;
case SHUT_DOWN:
throw new RuntimeException("TODO: request graceful shutdown"); // TODO: request graceful shutdown
Contributor Author

Oh yeah, I was meaning to figure out the right exception to throw to achieve a nice shutdown (I think any runtime exception will do it, but is there a semantically best one?)


import java.util.Objects;

class TimeKey<K> implements Comparable<TimeKey<K>> {
Contributor Author

This is specifically for storing the keys sorted by timestamp in the buffer. I wasn't sure whether a more general name, or a more specific one like BufferKey, is better...

Contributor

Can we just use org.apache.kafka.streams.processor.internals.Stamped? They seem very similar (feel free to rename the class if you like other names better: since it is an internal class, we can change it whenever we want).

Contributor Author

Yeah, this sounds good.

Contributor Author

Hmm. Actually, Stamped has unusual implementations of equals, hashCode, and compareTo. They all disregard the stamped value and are determined only by the timestamp...

So, Stamped won't provide the semantics we need from TimeKey, and I'm wary of changing the equals/hashCode/compareTo of Stamped and messing up its existing semantics...

WDYT?

Contributor

Do we need to require value ordering for TimeKey here? I thought it is not required, since we are not relying on offset ordering to break ties anyway, right?

Contributor Author

I don't think that will work...

Comparable requires a total ordering and also expects that a.compareTo(b) == 0 iff a.equals(b), which in turn requires that a.hashCode() == b.hashCode().

But a timestamp-only comparison would prevent us from inserting two different keys with the same time into our buffer map, since the TreeMap treats keys that compare equal as the same entry. It doesn't seem like Stamped is suitable for map keys or set entries for this reason.


@Override
public int compareTo(final TimeKey<K> o) {
// ordering of keys within a time uses hashCode.
Contributor Author

I.e., the ordering of keys that share a timestamp is arbitrary. If anyone cares, we can do "better" by requiring K to be Comparable (but I don't think anyone should care, so I kept it simple).

Member

While I think it's semantically fine, it might be nice to get the same eviction behavior for a reprocessing use case... I am also realizing that TimeKey is actually always used with Bytes -- thus, I would recommend removing the generic type and exploiting that Bytes already implements Comparable.
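A sketch of what that could look like with a non-generic, Bytes-based key (illustrative only, not the committed code):

    import org.apache.kafka.common.utils.Bytes;

    class TimeKey implements Comparable<TimeKey> {
        private final long time;
        private final Bytes key;

        TimeKey(final long time, final Bytes key) {
            this.time = time;
            this.key = key;
        }

        @Override
        public int compareTo(final TimeKey o) {
            // primary order: buffer timestamp
            final int timeComparison = Long.compare(time, o.time);
            // keys sharing a timestamp fall back to the key's deterministic byte ordering
            return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
        }
    }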

Member

@mjsax mjsax left a comment

Thanks for the PR. Some comments/questions.

@@ -53,34 +85,121 @@ public void process(final K key, final Change<V> value) {
internalProcessorContext.forward(key, value);
} // else skip
} else {
throw new NotImplementedException();
ensureBuffer();
Member

Why do we need this in process -- seems like moving it to init() should be sufficient?

Contributor Author

It's not so easy to tell whether we really need to buffer records until we actually get some records. This is a consequence (maybe a downside) of my choice to have TimeDefinition use the window-end time as "now" and the grace period as the suppressDuration. Because of this, within the buffering context, even with a suppressDuration of 0, we might still need to buffer, as the effective timestamp is in the future.

Thinking through this, we could try instead using the window start as "now" and using the window size + grace period as the suppress duration, but offhand it seems this wouldn't work too well with SessionWindows (or other variable-sized windows).

So instead, what I chose to do is a lightweight check when I need the buffer, initializing it if it hasn't been already. I could even move the if (buffer == null) check right here, and JIT branch prediction would make this lazy check essentially free once the buffer is initialized.

Some alternatives:

  1. discard the optimization and just always initialize it, in case I need it.
  2. junk the (maybe unnecessarily) flexible TimeDefinition function and instead just use a "time strategy" enum that tells the processor whether it should use record time or window-end time:
    In the former case, if the duration is zero, we know we'll never need a buffer. If it's > zero, we'll probably need one.
    In the latter case, we'll probably need a buffer, regardless of the suppression duration.

WDYT?
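For reference, the lightweight check in question is essentially (a sketch, using names visible later in this diff):

    private void ensureBuffer() {
        // lazily create the buffer the first time a record actually needs to be held back
        if (buffer == null) {
            buffer = new InMemoryTimeOrderedKeyValueBuffer<>();
        }
    }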

Member

Hard to say -- JIT branch prediction might make my concern invalid -- it's just because it's on the hot code path. Would be good to get input from @guozhangwang and @bbejeck

Member

Also, we should avoid premature optimization...

}

private long definedRecordTime(final K key) {
return suppress.getTimeDefinition().time(internalProcessorContext, key);
}

private void buffer(final K key, final Change<V> value) {
final long time = suppress.getTimeDefinition().time(internalProcessorContext, key);
Member

suppress.getTimeDefinition() should return the same thing each time? Should we put it into a member variable?

private final Map<K, TimeKey<K>> index = new HashMap<>();
private final TreeMap<TimeKey<K>, SizedValue<V>> sortedMap = new TreeMap<>();

private static class SizedValue<V> {
Member

I am wondering about this: since we compute the byte size and already pay the cost of serializing the record, should we not store byte[]/byte[] in the buffer? Of course, this still implies that we need to deserialize later; however, keeping the actual deserialized objects around would have more storage overhead and would not obey the buffer size, IMHO. Thoughts?

Contributor Author

Yeah, I think this is a reasonable thing to do. I've been going back and forth on it.

The downside of storing it serialized is then we need to deserialize it to emit it. This is a moot point for the (planned) on-disk implementation, but for in-memory it saves some CPU and possibly some GC pressure not to round-trip it through a byte array.

As is, we serialize it just once instead of serialize + deserialize. Plus we currently discard the produced array immediately, so it's easy on the gc, whereas if we keep it, then we have 3 medium-to-long term objects: the incoming record, the serialized array, and the (deserialized) outgoing record. Is this premature optimization? Possibly.

Some other factors to consider: when we send to the changelog, we'll need to serialize it anyway. But I'm planning to send only on flush and to keep the changelog buffer compact with a LinkedHashMap, so records that get updated or retracted several times within a commit interval would only get serialized once. Plus, for this purpose, we only need the serialize side; we could hang onto the produced array after computing the size just long enough to send it to the changelogger.

For changelogging purposes, we'd only need to deserialize when we recover on startup, not in steady-state operations, so I think it's still more economical to store the records as objects instead of serialized.

It is true that there's really no tight correlation between the heap used by an object and the heap used by its serialized form. So at the moment, we're only roughly obeying the size limit. For primitive data, it's probably pretty close, though.

I'm open to either way of doing it, but that was my thinking. What say you?

Member

Agree with all the trade-offs you mention. For KTable caches, we also went to storing byte[] to obey the size config. Also note, we don't need to deserialize all byte[] arrays, only those that get evicted -- if we have a lot of suppression, many byte[] arrays would never be deserialized but overwritten. Depending on throughput and the number of unique keys, this might happen quickly enough to still be young gen. Hard to say. Again, more input from @guozhangwang and @bbejeck would be helpful.

And as above, premature optimization should be avoided. Could we do some prototyping and benchmarking of both approaches? Not sure if there is enough time. Also, it's an internal implementation, and if performance becomes an issue, we can also improve on it in 2.2.

Contributor Author

This is very true:

if we have a lot of suppression, many byte[] arrays would never be deserialized but overwritten

I won't do anything with it right now, but wait for more input (and take care of the other things we discussed)

Contributor

While I also agree with the trade-offs mentioned by @vvcephei, we can't say exactly what the better approach will be without testing. To me, the bigger savings potential would be in CPU but again we can't say without testing.

But we do need to serialize for sending to the changelog, even if we only send on flush; couple that with the fact that an incoming byte[] does not always get deserialized, due to updates by key, and I'm starting to think that either approach will be a wash.

So, for now, I'm leaning towards storing byte[]:

  1. That's what we currently use for KTable. While that by itself is not enough of a reason, IMHO we need to be careful about having different approaches to similar issues without a clear, demonstrable reason for doing so.
  2. Benchmarking will really give us the answers we are looking for, but time is something we don't have right now for getting this into 2.1.
  3. I could be wrong about this, but I think the biggest users of suppression are going to have several updates per key, so as @mjsax mentions, many of the byte[] arrays are going to get overwritten.

Contributor Author

Ok, it sounds like no one has a super strong performance intuition.

I think @bbejeck's point about uniformity is a good one. If anyone wants to insist on this, I'll change it right now.

Otherwise, if we're all comfortable making a performance-based decision, I think I'll propose to implement change-logging first and then do a comparative benchmark to make the final call.

Contributor Author

I've been mulling this over... It seems like byte arrays are the more normal choice in the code base, so that should be the default until proven otherwise by a performance test. The fact that I made the opposite choice in development is irrelevant.

So I'll go ahead and swap it out for byte arrays tomorrow.

}
return;
case SHUT_DOWN:
throw new RuntimeException("TODO: request graceful shutdown"); // TODO: request graceful shutdown
Member

We should resolve this before merging IMHO.

Contributor Author

definitely. Should it just be a KafkaException, or something more specific?

Member

I guess StreamsException or maybe a new sub-class would be a good idea.

Contributor

+1 for a sub-class of StreamsException
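As a sketch of what that branch could look like (the exact exception subclass and message here are assumptions, not the final code):

    case SHUT_DOWN:
        // A StreamsException (or a dedicated subclass) propagates out of the processor
        // and takes the application down, rather than a bare RuntimeException.
        throw new StreamsException(
            "Suppression buffer reached its configured limit and the BufferFullStrategy is SHUT_DOWN.");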


final TimeKey<K> replaced = index.put(key, timeKey);

if (!Objects.equals(previousKey, replaced)) {
Member

Do we need this check?

Contributor Author

Probably not. I don't think this can happen unless this buffer is used across threads (which shouldn't happen), or unless we screw up the implementation in the future (which we could do in any number of ways; it doesn't mean we need guards everywhere).

WDYT?

Member

I tend to think that we don't need this guard, because a bug that gives multi-threaded access seems very unlikely. But it's a personal opinion... My concern, again, is that this is the hot code path. But I am also ok with keeping the check if somebody insists.

Contributor Author

I also think it's unlikely to be useful. I'll remove it.

@Override
public KeyValue<TimeKey<K>, V> next() {
final Map.Entry<TimeKey<K>, SizedValue<V>> next = delegate.next();
this.last = next;
Member

nit: remove this

delegate.remove();
index.remove(last.getKey().key());
memBufferSize = memBufferSize - last.getValue().size();
}
Member

Should we check for last == null and set last = null ?

Contributor Author

Good idea, setting it to null after I use it will make it available for gc.

I can guard against null also, but FWIW, I'm not sure how that situation could arise. It's an IllegalStateException to invoke delegate.remove without an intervening call to delegate.next, or to call it before next at all.
delegate.next could return null, but in that case, we'd get an exception in line 69... which I should check for there.

Member

Ack. See your point that delegate does the check for us.

I was aware that it would imply incorrect API usage (ie, wrong call order or similar). Just wanted to make sure we catch a bug like this -- but seems it would crash anyway even if we don't add a check for null.

Contributor Author

Ok, I just confirmed that TreeMap#entrySet().iterator().next() can never return null, but we could theoretically store a null value in the map, which could still throw an NPE on this line. I'll guard against it.
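Putting the pieces of this sub-thread together, the remove path looks roughly like this (a sketch based on the snippets above; the null guard and the last = null reset are the additions discussed here):

    @Override
    public void remove() {
        // Iterator contract: next() must have been called; the delegate enforces that.
        delegate.remove();
        index.remove(last.getKey().key());
        final SizedValue<V> removedValue = last.getValue();
        if (removedValue != null) {
            memBufferSize = memBufferSize - removedValue.size();
        }
        last = null; // release the reference so the evicted entry can be collected
    }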

}

private void evictOldEnoughRecords(final long streamTime) {
System.out.println("st: " + streamTime);
Member

Guess this should be removed?

buffer = new InMemoryTimeOrderedKeyValueBuffer<>();

internalProcessorContext.schedule(
1L,
Member

Forgot to add this to my review: this seems to have large runtime overhead and IMHO, we should try to find a better way to handle this.

if (buffer == null) {
buffer = new InMemoryTimeOrderedKeyValueBuffer<>();

internalProcessorContext.schedule(
Member

Do we need to use punctuation to enforce the record/byte limit? Might be better to check the record/byte limit on put and use punctuation only for time-based eviction?

Contributor Author

I think you're spot on. I'll check it out.

@vvcephei
Contributor Author

I've partially addressed the comments so far. Notably, I've dropped the punctuator and now handle both time and size constraints during process.
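In outline, the flow is now roughly as follows (a sketch; applyBufferFullStrategy and observedStreamTime are illustrative names, not necessarily the committed ones):

    public void process(final K key, final Change<V> value) {
        // buffer first, then enforce both constraints inline instead of via a punctuator
        buffer(key, value);
        // time constraint: evict everything whose suppression deadline has passed
        evictOldEnoughRecords(observedStreamTime);
        // size constraint: apply the buffer-full strategy until we are back under the limits
        while (overCapacity()) {
            applyBufferFullStrategy(); // EMIT the oldest record, or SHUT_DOWN
        }
    }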

@vvcephei
Contributor Author

Hi @guozhangwang , @mjsax , and @bbejeck ,

I believe I've addressed all the comments thus far, with the exception of whether to store the values serialized.

I did notice a low-effort optimization in the current implementation: skip serializing if the buffer isn't size-constrained. That would carry over even when we add the changelog, if the buffer delays serializing until flush, so for high-turnover buffers, many records may never be serialized at all.

But that's all a little beside the point... I'm still happy to change the whole implementation to store serialized data if that's the reviewers' preference.

Contributor

@bbejeck bbejeck left a comment

@vvcephei I made a pass, I'll probably need to make another one to make sure I fully understand the PR.

With some of the questions raised over performance, I'm thinking we'll need some system tests, but those should probably go in a follow-on PR.


break;
}
}
System.out.println(buffer);
Contributor

left over debugging?

Contributor Author

yeah, I noticed it late. It's gone now.


1L,
PunctuationType.STREAM_TIME,
this::evictOldEnoughRecords
);
Contributor

I think doing it in process is a good start as well.

@bbejeck
Contributor

bbejeck commented Sep 27, 2018

test failure unrelated

retest this please

Contributor

@bbejeck bbejeck left a comment

@vvcephei I took another pass, and it looks good to me. I have a couple of comments regarding test coverage.

final KeyValue<TimeKey<K>, ContextualRecord<Change<V>>> maybeRemaining =
drainExpiredRecords(expiryTime, iterator);

if (overCapacity()) {
Contributor

Can we add two cases to KTableSuppressProcessorTest to hit this branch? One for the EMIT case and another for SHUT_DOWN.

// if the buffer isn't size-constrained, we don't need to compute the record size.
return 0L;
} else {
long size = 0L;
Contributor

we could use a test to hit this branch as well, but IMHO it's a lower priority than the others mentioned above.

Contributor Author

This line is actually gone now. But if it were still there, I'd agree with you.

@vvcephei
Contributor Author

vvcephei commented Sep 27, 2018

Hey all, FYI, I've just updated this PR to store serialized data instead.

It was a bit more work than I anticipated because I ran into some snags related to preserving the window end information for TimeWindows (and also discovered I was using the wrong serde for session windows). These issues are both fixed now.

@vvcephei
Contributor Author

Ok, @bbejeck ,

I have added some unit tests that verify that the processor throws the exception, and also some integration tests that verify that Streams shuts down under the same conditions.

Thanks for that catch.

@vvcephei
Contributor Author

Also, I just had a better idea for maintaining the minTimestamp value that cleans up the implementation quite a bit.

Contributor

@bbejeck bbejeck left a comment

@vvcephei thanks for the updates, this looks good to me. I have just one minor comment.

@@ -479,6 +557,14 @@ private void produceSynchronously(final String topic, final List<KeyValueTimesta
}
}

private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
Contributor

nit: this can be simplified to:
TestUtils.waitForCondition(() -> driver.state() == KafkaStreams.State.ERROR, TIMEOUT_MS, "Streams didn't shutdown in error state");

Contributor Author

Ah, right. I looked for something like that, but I was looking in IntegrationTestUtils. Thanks.

Contributor Author

Ok, I've updated it.

Contributor

@bbejeck bbejeck left a comment

Thanks @vvcephei, LGTM pending tests passing.

Contributor

@guozhangwang guozhangwang left a comment

Made a pass over the code (good job on the added unit tests!)

@vvcephei One meta comment is around reusing a couple of classes to avoid duplicated code if possible. The rest of them are mostly minor.

@@ -226,4 +235,15 @@
}
return builder;
}

private static <T> Serde<Windowed<T>> getWindowedSerde(final Serde<T> rawSerde) {
return rawSerde == null ? null : new WindowedSerdes.TimeWindowedSerde<>(rawSerde);
Contributor

Could we move this function to a single class, e.g. WindowedSerdes, to avoid duplicates (we have the same function in SessionWindowedKStreamImpl.java)? BTW, in #5521 I just inlined each call, but I think extracting it is also fine.

Contributor Author

The one in SessionWindowedKStreamImpl is actually different (wraps it with a SessionWindowedSerde).

FWIW, I think inlining it is actually preferable to extracting it to a "common" location if it's actually just going to have one use.

Contributor

ack.

@@ -100,11 +103,12 @@
),
AGGREGATE_NAME,
materialize(materializedInternal),
materializedInternal.isQueryable()
materializedInternal.isQueryable(),
getWindowedSerde(materializedInternal.keySerde(), windows),
Contributor

Why pass the windows object (for its length) only here, and not in the other callers below?

Contributor Author

This was an oversight. Thanks for the catch!



if (delegate.hasNext()) {
next = delegate.next();
}
minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time();
Contributor

Do we assume we will only remove the head of the iterator? If not I'm not clear why we can simply set the minTimestamp as the next key's timestamp.

Contributor

EDIT: it seems the above assumption holds, judging from the other classes. In this case, could we guard against the unexpected case where there are un-deleted entries before the current position?

Contributor Author

aaah, yes. This min-timestamp update does depend on always removing the head of the iterator. I'll fix it.

Thanks.

Contributor Author

Ok, I put in a guard.

I also refactored the interface to purely evict the head of the buffer while a condition holds, which cleans up the usage quite a bit.

Let me know what you think.
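Something along these lines, for the interface (the name and signature here are illustrative, not necessarily what was committed):

    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import org.apache.kafka.streams.KeyValue;

    interface TimeOrderedKeyValueBuffer<K, V> {
        // Evict from the head of the buffer (oldest timestamp first) for as long as the
        // predicate holds, handing each evicted entry to the callback, e.g. to forward it.
        void evictWhile(Supplier<Boolean> predicate, Consumer<KeyValue<K, V>> callback);
    }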

Member

See my other comments -- it's still unclear from the code that we want to enforce the next-remove-... pattern -- it might also be worth adding a JavaDoc to the iterator about correct usage, even if it's an internal class only.

return minTimestamp;
}

private long computeRecordSize(final Bytes key, final ContextualRecord<byte[]> value) {
Contributor

nit: I'd suggest putting the size calculation of ContextualRecord inside the ContextualRecord class instead of in this class.

Contributor Author

This computation makes use of the fact that this reference is a ContextualRecord<byte[]>, whereas the value type is generic in ContextualRecord. Of course, this is the only usage of that class, so I could just build the byte[] value type into ContextualRecord.

But I'm slightly in favor of keeping it as-is so we can use ContextualRecord in other contexts where we need both the value (not serialized) and the context in the future. WDYT?

Contributor

I meant to have ContextualRecord contain its own computeSize() function, which calculates the size in bytes "except" the value size; that can then be called by this function, and here we would only need to calculate the key size and value size plus whatever is returned from ContextualRecord#computeSize. Anyway, it is a nit comment and I do not feel strongly about it.

Contributor Author

Since the part of the ContextualRecord that isn't the value is just the ProcessorRecordContext, I just added a sizeBytes() method there.

WDYT?
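In rough terms, that estimate adds up the fixed-width fields plus the variable-length topic and headers, something like this (a sketch assembled from the fragments quoted later in this thread; the header accounting here is an assumption):

    public long sizeBytes() {
        long size = 0;
        size += 8; // timestamp
        size += 8; // offset
        size += 4; // partition
        if (topic != null) {
            size += topic.toCharArray().length; // rough estimate, ignoring JVM object overhead
        }
        if (headers != null) {
            for (final Header header : headers) {
                size += header.key().toCharArray().length;
                final byte[] headerValue = header.value();
                size += headerValue == null ? 0 : headerValue.length;
            }
        }
        return size;
    }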

Member

I don't see the advantage of using generics in ContextualRecord if it's only used once with byte[] types. As generic types are lost after compilation, I would prefer to remove the generic if it's not needed (AFAIK, generics have some runtime overhead, as the compiler needs to insert casts that are evaluated at runtime).

Contributor Author

I didn't consider this runtime overhead. I'll go ahead and inline the generic type.

Contributor Author

I didn't consider this overhead, and agree it would be good to get rid of it.

}

@Override
public void init(final ProcessorContext context) {
internalProcessorContext = (InternalProcessorContext) context;
this.keySerde = keySerde == null ? castSerde(context.keySerde()) : keySerde;
this.valueSerde =
valueSerde == null ? new FullChangeSerde<>(castSerde(context.valueSerde())) : valueSerde;
Contributor

Do we ever expect the passed-in non-null valueSerde to be a FullChangeSerde already? If not, we should still wrap it with FullChangeSerde.

Contributor Author

This would mean that they have configured the default.value.serde as a FullChangeSerde, which is in the internals package.

Nevertheless, it doesn't hurt to guard it. Will do.

Contributor

Yeah, my point is that it seems "impossible" that the passed-in serde will be a FullChangeSerde rather than just the inner serde used for FullChangeSerde, so we should always wrap (either the default one from config, or the inherited one) with the FullChangeSerde, right?

Contributor Author

oh, I gotcha. The type of valueSerde is already a FullChangeSerde. In the case of an inherited serde, it gets wrapped in the constructor. The types ensure that the constructor arg is not already a FullChangeSerde.

next.value.recordContext()
);
// to be safe, forward before we delete
setNodeAndForward(key, contextualRecord);
Contributor

We've encountered some issues related to the ordering of this before: https://issues.apache.org/jira/browse/KAFKA-4492

Could you read that ticket and double check if flush-first-remove-later would not cause any issues for re-entrant puts on the same buffer (say, if we have a loop in the topology)?

Contributor Author

Interesting! That issue seems to be cache-specific: that two subsequent processors can be backed by the same cache (as in the join case).

I don't think loops are generally allowed in the subtopology, are they? If so, this code would indeed result in an infinite loop or possibly a concurrent modification exception.

I was concerned that the remove might be sent to the buffer's changelog record collector and maybe sent to the broker, and then some exception might happen before the forward, resulting in the record being forgotten upon restart.

I looked at some other processors, and they tend to do (logged) store operations first and then forward last. But then again, normal operations are forwarding a value that's a direct consequence of processing the current record, which wouldn't have been committed and would therefore get re-processed upon restart.

But the buffer is forwarding some older record, which has already been committed. Reprocessing the new record (which caused the eviction the first time) won't cause us to remember the old record, which we were supposed to emit.

Under EOS, if we crash after the changelog update but before the forward, we'll be fine because the changelog update won't be visible (it'll be in an aborted transaction) on restart, so the buffer will go back to its correct starting point for reprocessing the new record.

If we can't be sure that Streams subtopologies are acyclic, then I reckon we'd better swap these two lines and tell people they'd better use EOS if they want to be protected from all crash corruption (which I think is true anyway).
Otherwise, if subtopologies are acyclic, then I think it's better to leave it as is.

WDYT?

Member

I actually think that forward before delete is correct. Compare: https://issues.apache.org/jira/browse/KAFKA-5315 and the corresponding PR, that we never finished.


@Override
public void close() {
}
Contributor

Should we clear the buffer upon closing? Maybe it does not make a difference for correctness, but would it be worthwhile for performance?

Contributor Author

Yeah, I wasn't sure. I'll go ahead and do it.

Contributor Author

Actually, let's defer this to Part 4, where the buffer becomes a proper store, and has its own close() method.

@vvcephei
Contributor Author

Just to document the current state...

I think the latest set of commits should resolve all the open comments, so this PR should be ready to merge, pending a rebase once #5521 is merged (or we could merge this first, if #5521 needs more reviews).

Member

@mjsax mjsax left a comment

Did not review the tests yet...

@@ -30,6 +28,15 @@
public class FullChangeSerde<T> implements Serde<Change<T>> {
private final Serde<T> inner;

@SuppressWarnings("unchecked")
public static <T> FullChangeSerde<T> castAndWrap(final Serde<?> serde) {
Member

nit: castOrWrap

@@ -65,8 +71,8 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {
final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue);
final int newSize = newBytes == null ? -1 : newBytes.length;

final ByteBuffer buffer = ByteBuffer.allocate(
4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize)
final ByteBuffer buffer = ByteBuffer.wrap(
Member

Why this change? (Just for my own education.)

Contributor Author

It's just evidence of my mental slowness...

In the prior PR, Guozhang pointed out that my calling buffer.array() was incorrect, since the backing array isn't guaranteed to be exactly within the bounds we allocated. I fixed it at the time by delegating to the ByteBufferSerializer, which handles this.

Later on I realized that there is a more efficient solution available. By pre-creating the backing array and wrapping it, we know that buffer.array() returns what we needed. No need for the more general handling logic in ByteBufferSerializer.
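Concretely, the pattern is (a sketch of the idea, not the verbatim serializer; the field layout mirrors the allocation shown in the diff above):

    // Pre-size the destination array and wrap it: ByteBuffer.wrap guarantees that
    // array() is exactly this array (offset 0), so it can be returned without copying.
    final byte[] serialized = new byte[4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize)];
    final ByteBuffer buffer = ByteBuffer.wrap(serialized);
    buffer.putInt(oldSize);
    if (oldBytes != null) {
        buffer.put(oldBytes);
    }
    buffer.putInt(newSize);
    if (newBytes != null) {
        buffer.put(newBytes);
    }
    return serialized;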

null,
null
keySerde,
valSerde == null ? null : new FullChangeSerde<>(valSerde)
Member

Could we extend wrapOrCast to add a null check and return null for this case and use it here to make code more readable?

Contributor Author

I can and will.

Contributor Author

I've added that check because context.valueSerde() (called elsewhere) could return null.

If it's ok with you, though, I prefer the current code right here. This code ensures that valSerde is of the correct type (notice that no casting is necessary). In general, I think we should avoid casting unless we actually need it, as it makes regressions harder to catch.

// then insert the new record in the same place in the priority queue
final TimeKey<Bytes> nextKey = previousKey != null ? previousKey : new TimeKey<>(time, key);

index.put(key, nextKey);
Member

I think we need to call put only if previousKey == null? I.e., we could merge L103 and L105 into an if-then block? Might be more readable?

Contributor Author

This is true about put, but we still need to choose a key to insert into sortedMap. If I don't declare the nextKey variable, I need to have a bunch of redundant code in the if and else blocks:

final TimeKey<Bytes> previousKey = index.get(key);
        // non-resetting semantics:
        // if there was a previous version of the same record,
        // then insert the new record in the same place in the priority queue
        if (previousKey == null) {
            final TimeKey<Bytes> nextKey = new TimeKey<>(time, key);

            index.put(key, nextKey);
            sortedMap.put(nextKey, value);

            minTimestamp = Math.min(minTimestamp, nextKey.time());
            memBufferSize =
                memBufferSize
                    + computeRecordSize(key, value);
        } else {
            final ContextualRecord<byte[]> removedValue = sortedMap.remove(previousKey);
            sortedMap.put(previousKey, value);
            memBufferSize =
                memBufferSize
                    + computeRecordSize(key, value)
                    - (removedValue == null ? 0 : computeRecordSize(key, removedValue));
        }

IMHO, this is less readable than the linear version where we just reuse or construct the key in line 103.

Contributor Author

But if, after looking at it, you prefer the branching version, I'll change it.

Member

I guess it's personal taste -- don't insist on a change.

Contributor Author

I think in sum, your points elevate it beyond personal taste. I've gone ahead and done the branching. After a little cleanup, it's not too shabby anyway.

index.put(key, nextKey);
minTimestamp = Math.min(minTimestamp, nextKey.time());

final ContextualRecord<byte[]> removedValue = previousKey == null ? null : sortedMap.remove(previousKey);
Member

This check for previousKey == null could be merged with the check from above? (It's hot code path, so might be worth to unify.)

Contributor Author

Please see my comment above. I agree it's more efficient to have just one branch, but I do think this version is easier to follow.

Regardless, you have a fresher perspective. If you prefer the branching version above, I'm happy to change it.

Member

I think, even without the branching, this remove and the put below should be merged.

@@ -78,6 +79,23 @@ public Headers headers() {
return headers;
}

public long sizeBytes() {
Member

Each Java object has a natural overhead -- might be worth adding this here? We would need to look up how many bytes it is; however, we would have it for ProcessorRecordContext itself, as well as for topic and headers (including its nested Header objects).

Contributor Author

I agree that this is an under-estimate, but I don't think there's much point in being exact.
The overhead is dependent on the JVM implementation, so we'd have to detect the JVM and maintain a mapping for each different implementation. Even then, we don't know how much extra memory we're using in the various garbage collectors, of which there are now three different implementations in the Oracle JDK alone...

I'd rather just make the best effort we reasonably can to live more-or-less within the desired boundary. For example, storing the byte[] value is much closer than storing the object. But beyond that, we get into diminishing returns for quickly increasing complexity.

Member

Ack. That's fair. The existing caches also use rough estimates only. (Might be interesting how much we are off though... But this could be a follow up improvement.)

Contributor

Makes sense.

size += 8; // value.context.timestamp
size += 8; // value.context.offset
if (topic != null) {
size += topic.toCharArray().length;
Member

a String also stores the length (it's a char[] internally) -- should we add 4 more bytes here?

Also, does a char[] have similar overhead to a regular object?

Contributor Author

I believe arrays also store their types. But again, we are getting into JVM implementation details. There are too many JVM implementations for us to be expected to worry about this, IMHO.


@Override
public String toString() {
return "ContextualRecord{value=" + value + '}';
Member

value is always byte[] -- can we get a handle on the deserializer to get human-readable output here? (One more reason to avoid generics if not necessary -- these issues slip in easily with missing type information.)

Contributor Author

I don't think the record needs to know how to deserialize itself.

Since toString is only for debugging, I'm fine printing out the Arrays.toString summary of the value.

If we wanted to print out the value in a log message, we would format it more specifically (including a deserialization if desired).

That said, I will go ahead and get rid of the generic type.
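I.e., something like (sketch):

    @Override
    public String toString() {
        // value is a raw byte[]; Arrays.toString gives a readable-enough summary for debugging
        return "ContextualRecord{value=" + Arrays.toString(value) + '}';
    }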

Member

only for debugging

My point is that even for debugging it's not useful to print a byte[] -- my argument is to either "fix this" or not overwrite toString() at all.

4 + // partition
(topic == null ? 0 : topic.length());
this.sizeBytes = 1 + // isDirty
(value == null ? 0 : value.length) +
Member

Should we add 4 bytes to store the array size? Also, do we have object overhead for an array type?

Contributor Author

as above.

(topic == null ? 0 : topic.length());
this.sizeBytes = 1 + // isDirty
(value == null ? 0 : value.length) +
context.sizeBytes();
Member

Should we add the object overhead for the context itself? (It might be included in sizeBytes() if we update it accordingly, though.)

Contributor Author

It would be the responsibility of the context to account for its own overhead, but see my comments above.
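
For reference, a minimal sketch of the kind of rough, JVM-agnostic estimate discussed in this thread -- the field set and constants are illustrative, not the PR's exact accounting, and per-object/array header overhead is deliberately ignored:

// Best-effort size estimate: sum fixed field widths and payload lengths,
// accepting that JVM-specific object and array overhead is not counted.
final class RecordSizeEstimatorSketch {
    static long estimateBytes(final byte[] value, final String topic) {
        long size = 0L;
        size += 8; // timestamp
        size += 8; // offset
        size += 4; // partition
        if (topic != null) {
            size += topic.length(); // char count; itself only an approximation
        }
        if (value != null) {
            size += value.length;
        }
        return size;
    }
}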

@vvcephei
Contributor Author

vvcephei commented Oct 1, 2018

Java 8 failure unrelated:

org.apache.kafka.common.network.SslSelectorTest > testCloseConnectionInClosingState FAILED
    java.lang.AssertionError: Channel not expired expected null, but was:<org.apache.kafka.common.network.KafkaChannel@4f>
...
FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':clients:unitTest'.
> There were failing tests. See the report at: file:///home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/clients/build/reports/tests/unitTest/index.html

@vvcephei
Contributor Author

vvcephei commented Oct 1, 2018

@mjsax / @guozhangwang FYI, the tests have passed.

@bbejeck
Contributor

bbejeck commented Oct 1, 2018

Execution failed for task ':clients:unitTest'.

retest this please

@@ -93,7 +94,7 @@
materialize(materializedInternal),
new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
materializedInternal.isQueryable(),
materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
materializedInternal.keySerde() != null ? new TimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
Contributor Author

I had to add these so that suppress doesn't "forget" the window end time when it round-trips the record.

Member

Not sure if this is the best way to tackle it? Requires a public API change.

@vvcephei
Contributor Author

vvcephei commented Oct 2, 2018

I had an unused import. The tests pass for me locally now.

Member

@mjsax mjsax left a comment

Some more comments. Will move to the tests now.

@@ -30,6 +30,13 @@ public TimeWindowedSerde() {
public TimeWindowedSerde(final Serde<T> inner) {
super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer()));
}

public TimeWindowedSerde(final Serde<T> inner, final long windowSize) {
Member

Is this mentioned in the KIP? It's a public API change.

Contributor Author

Ah, no. When I did this before, I did it differently to keep it private. I thought this was a better way, but overlooked the public-ness of it.

I'll go back to private mode.
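
For context, a hedged sketch of why the window size matters when round-tripping a windowed key: the serialized form carries only the window start, so the deserializer needs the size to reconstruct the window end. It uses the public TimeWindowedSerializer/TimeWindowedDeserializer pair rather than the (now non-public) serde constructor from the diff, and the printed output is an assumption:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

public final class WindowedKeyRoundTrip {
    public static void main(final String[] args) {
        final Windowed<String> key = new Windowed<>("k", new TimeWindow(0L, 100L));

        // The serialized bytes contain the key and the window start, but not the end.
        final byte[] bytes =
            new TimeWindowedSerializer<>(Serdes.String().serializer()).serialize("topic", key);

        // Passing the window size lets the deserializer rebuild end = start + size;
        // without it, the window end would be "forgotten" on the round trip.
        final Windowed<String> roundTripped =
            new TimeWindowedDeserializer<>(Serdes.String().deserializer(), 100L).deserialize("topic", bytes);

        System.out.println(roundTripped); // expected to print something like [k@0/100]
    }
}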


import java.util.function.Consumer;
import java.util.function.Supplier;

interface TimeOrderedKeyValueBuffer<K, V> {
Member

I guess we can remove these generics?

final TimeKey nextKey = new TimeKey(time, key);
index.put(key, nextKey);
sortedMap.put(nextKey, value);
minTimestamp = Math.min(minTimestamp, nextKey.time());
Member

nextKey.time() -> time
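
For orientation, a minimal sketch of the two-map buffering idea visible in the diff above: a sorted map ordered by (time, key) drives eviction order, and an index from key to its current position lets an update displace the older entry. Names, the displacement behavior, and the Comparable bound are assumptions here, not the PR's exact code:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class InMemoryTimeOrderedBufferSketch<K extends Comparable<K>, V> {
    private final TreeMap<TimeKey<K>, V> sortedMap = new TreeMap<>();
    private final Map<K, TimeKey<K>> index = new HashMap<>();

    void put(final long time, final K key, final V value) {
        final TimeKey<K> previous = index.get(key);
        if (previous != null) {
            sortedMap.remove(previous); // keep only the latest value per key
        }
        final TimeKey<K> nextKey = new TimeKey<>(time, key);
        index.put(key, nextKey);
        sortedMap.put(nextKey, value);
    }

    long minTimestamp() {
        return sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time;
    }

    // Orders entries by time first, then by key, so eviction can walk the head of the map.
    static final class TimeKey<K extends Comparable<K>> implements Comparable<TimeKey<K>> {
        final long time;
        final K key;

        TimeKey(final long time, final K key) {
            this.time = time;
            this.key = key;
        }

        @Override
        public int compareTo(final TimeKey<K> other) {
            final int byTime = Long.compare(time, other.time);
            return byTime != 0 ? byTime : key.compareTo(other.key);
        }
    }
}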

throw new NotImplementedException();
}
}
final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
Member

Should we compute bufferTime within buffer() -- no need to pass it in, as both internalProcessorContext and key are available there, too?

Contributor Author

Ah, yeah, it was previously used also here, but it's not needed anymore. Good catch.

}
}
final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
final long streamTime = internalProcessorContext.streamTime();
Member

Do we even need this here? No need to pass it into enforceConstraints(), IMHO.

Contributor Author

same here. Thanks!

private void enforceConstraints(final long streamTime) {
final long expiryTime = streamTime - suppressDurationMillis;

buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit);
Member

Should this be < instead of <= ?

Contributor Author

It wouldn't be wrong, but I think <= is also right, and it's a tighter bound.

Let's say we have buffered an event with time 10 at stream time 10 and the suppressDuration is 1. The expiry time is 10-1 = 9. minTimestamp is 10, and 10 <= 9 is false, so we don't evict.

Then, we get an event with time 11 at stream time 11. Now, the expiry time is 11-1=10. minTimestamp is still 10, but now the check is 10 <= 10, so we evict that first event.

I think this matches up to the intention of saying "suppress for 1 ms".
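
A tiny runnable version of that trace, just to make the <= bound concrete (values copied from the example above):

public final class EvictionBoundTrace {
    public static void main(final String[] args) {
        final long suppressDurationMillis = 1L;
        final long minTimestamp = 10L; // the buffered record's time

        // Stream time 10: expiry is 9, and 10 <= 9 is false, so nothing is evicted yet.
        System.out.println(minTimestamp <= 10L - suppressDurationMillis); // false

        // Stream time 11: expiry is 10, and 10 <= 10 is true, so the record is evicted
        // exactly one millisecond after it was buffered.
        System.out.println(minTimestamp <= 11L - suppressDurationMillis); // true
    }
}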

Member

Ack

NotImplementedException() {
super();
private void emit(final KeyValue<Bytes, ContextualRecord> toEmit) {
final Bytes key = toEmit.key;
Member

Guess we can remove variable key (only used once).

final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
internalProcessorContext.setRecordContext(toEmit.value.recordContext());
try {
final K key1 = keySerde.deserializer().deserialize(null, key.get());
Member

key1 -> key and key.get() -> toEmit.key.get()


@Override
public String toString() {
return "TimeKey{time=" + time + ", key=" + key + '}';
Member

Similar argument as for the byte[] value: of course, here we still get the time information, but the raw Bytes key is not very useful.

@@ -198,11 +196,9 @@ public void shouldSuppressIntermediateEventsWithTimeLimit() {
new KeyValueTimestamp<>("v1", 1L, 2L)
)
);
// note that the current stream time is 2, which causes v1 to age out of the buffer, since
// it has been buffered since time 0 (even though the current version of it in the buffer has timestamp 1)
Member

Why remove this comment? Seems to be valid?

Contributor Author

It's not valid anymore. Now, we buffer the new event before we enforce the buffer constraints, so we return the more intuitive most-recent state ("v1", 1L, 2L) right away, instead of later on.

Member

@mjsax mjsax Oct 2, 2018

I see. The comment focuses on the second v1 -- I applied it to the third v1. Seems the comment was ambiguous :)

Contributor Author

Good thing it's gone!

verify(
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
singletonList(new KeyValueTimestamp<>("v1", 0L, 1L))
singletonList(new KeyValueTimestamp<>("v1", 1L, 2L))
Member

The change makes sense -- the test was buggy before, but we did not notice because the record was thrown away anyway?

Contributor Author

We didn't throw it away before, just emitted it later on. This is what the comment I removed was explaining.

verify(
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
singletonList(
Member

What was the original intent of this part? And why don't we need it?

Contributor Author

When we enforced constraints before buffering, we needed one extra tick to flush everything out. Now that we buffer first, everything happens more promptly, so we don't need this last cycle to witness all the results we're looking for.
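
A minimal sketch of that ordering change -- the method names follow the diff, but the bodies here are placeholders, not the PR's implementation:

// The point is only the order inside process(): admit the new record first,
// then enforce the time-based constraint against the updated stream time,
// so a record at the boundary does not need an extra "tick" to be emitted.
final class SuppressionOrderingSketch {
    private long streamTime = Long.MIN_VALUE;

    void process(final String key, final String value, final long recordTime) {
        streamTime = Math.max(streamTime, recordTime);
        buffer(key, value, recordTime);   // 1. buffer the incoming record
        enforceConstraints(streamTime);   // 2. evict anything past the suppression boundary
    }

    private void buffer(final String key, final String value, final long time) {
        // store (time, key) -> value in a time-ordered buffer (see the earlier sketch)
    }

    private void enforceConstraints(final long streamTime) {
        // evict while buffer.minTimestamp() <= streamTime - suppressDurationMillis
    }
}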

context.setRecordMetadata("topic", 0, 0, null, timestamp);
final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW);
context.setStreamTime(timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(timestamp - 1L, timestamp));
Member

Why timestamp - 1L ?

Contributor Author

It doesn't matter for anything, it just seemed weird to have window start == window end. The window end is the time that matters for this test, which is why I made it the baseline.

Member

We set the record timestamp to timestamp -- thus, the record will be put in the window [timestamp, timestamp+1), right? It seems weird to use the wrong window, IMHO. Or am I missing something?

Contributor Author

Ok, I wasn't thinking about it like this. It makes sense.

Member

@mjsax mjsax left a comment

LGTM.

@mjsax mjsax merged commit 5ba9cad into apache:trunk Oct 2, 2018
@vvcephei vvcephei deleted the KAFKA-7223-in-mem-suppress-impl branch October 2, 2018 06:12
guozhangwang pushed a commit that referenced this pull request Oct 3, 2018
This is Part 4 of suppression (durability)
Part 1 was #5567 (the API)
Part 2 was #5687 (the tests)
Part 3 was #5693 (in-memory buffering)

Implement a changelog for the suppression buffer so that the buffer state may be recovered on restart or recovery.
As of this PR, suppression is suitable for general usage.

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Pengxiaolong pushed a commit to Pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Reviewer: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Pengxiaolong pushed a commit to Pengxiaolong/kafka that referenced this pull request Jun 14, 2019
This is Part 4 of suppression (durability)
Part 1 was apache#5567 (the API)
Part 2 was apache#5687 (the tests)
Part 3 was apache#5693 (in-memory buffering)

Implement a changelog for the suppression buffer so that the buffer state may be recovered on restart or recovery.
As of this PR, suppression is suitable for general usage.

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020