
Conversation

@QuentinAmbard

QuentinAmbard commented Jul 30, 2018

What changes were proposed in this pull request?

Update with a better fix:
With this fix, the offsets are scanned to determine the ranges. We ensure that when we determine the range [fromOffset, untilOffset), untilOffset is always an offset holding an existing record that we have been able to fetch at least once.
This logic is applied as soon as allowNonConsecutiveOffsets is enabled.
Since we scan all the records a first time, we use this pass to count them: OffsetRange now contains the number of records in each partition, so rdd.count() becomes a free operation.

The read_uncommitted isolation level remains unsafe: untilOffset might become "empty" if the transaction is aborted just after the offset range is created. The same thing could happen if the record at untilOffset gets compacted (this is also a potential issue before this change).
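
As a rough illustration of the scanning idea (a simplified sketch using the plain consumer API, not the PR's actual code; the name scanRange is made up for this example, and the consumer is assumed to already be assigned to the partition):

import java.time.Duration
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Scan forward from fromOffset, counting the records that actually exist, and
// stop at the last offset for which a record was fetched. The returned
// untilOffset is exclusive and always points just past a record we have seen.
def scanRange[K, V](
    consumer: Consumer[K, V],
    tp: TopicPartition,
    fromOffset: Long,
    maxUntilOffset: Long,
    pollTimeoutMs: Long): (Long, Long) = {
  consumer.seek(tp, fromOffset)
  var count = 0L
  var lastSeen = fromOffset - 1
  var done = false
  while (!done) {
    val polled = consumer.poll(Duration.ofMillis(pollTimeoutMs)).records(tp).asScala
      .filter(r => r.offset() >= fromOffset && r.offset() < maxUntilOffset)
    if (polled.isEmpty) {
      done = true                       // nothing more we can fetch right now
    } else {
      count += polled.size
      lastSeen = polled.last.offset()
      if (lastSeen >= maxUntilOffset - 1) done = true
    }
  }
  (lastSeen + 1, count)                 // (untilOffset, record count)
}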

How was this patch tested?

Unit tests for the offset scan. No integration test for transactions since the Kafka version currently used doesn't support them. Also tested against a custom streaming use case.

@holdensmagicalunicorn

@QuentinAmbard, thanks! I am a bot who has found some folks who might be able to help with the review: @tdas, @zsxwing and @koeninger

@koeninger
Contributor

jenkins, ok to test

@SparkQA

SparkQA commented Jul 30, 2018

Test build #93803 has finished for PR 21917 at commit 05c7e7f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 2, 2018

Test build #94055 has finished for PR 21917 at commit 70ecd38.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 2, 2018

Test build #94056 has finished for PR 21917 at commit 29c5406.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 2, 2018

Test build #94058 has finished for PR 21917 at commit 69582f4.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    val offsetAndCount = localRw.getLastOffsetAndCount(localOffsets(tp), tp, o)
    (tp, offsetAndCount)
  }
}).collect()
Contributor

What exactly is the benefit gained by doing a duplicate read of all the messages?

@QuentinAmbard
Author

QuentinAmbard commented Aug 3, 2018

With this solution we don't read the data another time "just to support transactions".
The current implementation for compacted topics already reads all the messages twice in order to get a correct count for the input info tracker: val inputInfo = StreamInputInfo(id, rdd.count, metadata).
Since RDDs over compacted topics, or topics with "empty" offsets due to transaction markers/aborts, can't infer the count from the offsets alone, the only way to get the exact count is to read the data.
The same logic applies to selecting a range: if you want to get N records per partition, the only way to know which untilOffset will make you read N records is to read, stop once you've read N records, and take that offset.
So one advantage is being able to fetch exactly the number of records per partition you really want (for compacted topics and transactions).
But the real advantage is that it lets you pick an untilOffset that is not empty.
For example, if you have no records for offsets [4, 6]:

offset: 1, 2, 3, 4, 5, 6
record: a, b, c,  ,  ,

If you use an offset range of [1, 5), you will try to read offset 4 but won't receive any data. In this scenario you can't tell whether the offset is simply empty (which is ok) or whether you lost data because Kafka is down (not ok).

To deal with this situation, we first scan the offsets and stop at the last offset where we had data: in the example, instead of [1, 5) we would go with [1, 4), because offset 3 has data so it's safe to stop there.
During the next batch, if extra data has arrived we select the next range as [4, 8) and have no issue (see the sketch after the table below):

offset: 1, 2, 3, 4, 5, 6, 7
record: a, b, c,  ,  ,  , g
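
In code terms, the two batches from this toy example would be chosen roughly like this (illustrative values only):

// Batch 1: the naive range would be [1, 5), but offset 4 holds no record, so
// the scan stops just past the last fetched record (offset 3).
val batch1Range = (1L, 4L)   // [fromOffset, untilOffset) covering a, b, c
// Batch 2: restarts from the previous untilOffset; once a record with a higher
// offset (g at offset 7) is fetched, offsets 4-6 are known to be a harmless
// gap rather than lost data.
val batch2Range = (4L, 8L)   // [fromOffset, untilOffset) covering g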

Does that make sense?
(PS: sorry for the multiple commits, I wasn't running MiMa properly; I'll fix the remaining issue soon.)

@koeninger
Contributor

Still playing devil's advocate here: I don't think stopping at 3 in your example actually tells you anything about the cause of the gap in the sequence at 4. I'm not sure you can know that the gap is because of a transaction marker without a modified Kafka consumer library.

If the actual problem is that when allowNonConsecutiveOffsets is set we need to allow gaps even at the end of an offset range... why not just fix that directly?

Master is updated to Kafka 2.0 at this point, so we should be able to write a test for your original JIRA example of a partition consisting of 1 message followed by 1 transaction commit.

@QuentinAmbard
Author

I'm not sure I understand your point. The cause of the gap doesn't matter; we just want to stop on an existing offset so that we can poll it. Whether the gap comes from a transaction marker, an aborted transaction, or even just a temporary poll failure isn't relevant in this case.
The driver is smart enough to restart from any offset, even in the middle of a transaction (aborted or not).
The issue with a gap at the end is that you can't know whether it's a gap or whether the poll failed.
For example, seekToEnd gives you 5 but the last record you get is 3, and there is no way to know whether 4 is missing or is just an offset gap.
How could we fix that in a different way?

@koeninger
Contributor

If the last offset in the range as calculated by the driver is 5, and on the executor all you can poll up to after a repeated attempt is 3, and the user already told you to allowNonConsecutiveOffsets... then you're done, no error.

Why does it matter whether you do this logic when you're reading all the messages in advance and counting, or when you're actually computing?

To put it another way: this PR is a lot of code change and refactoring; why not just change the logic of, e.g., how CompactedKafkaRDDIterator interacts with compactedNext?

@QuentinAmbard
Author

If you do it in advance you'll change the range: for example, you read until 3 and don't get any extra results. Maybe it's because of a transaction offset, maybe another issue; it's ok in both cases.
The big difference is that the next batch will restart from offset 3 and poll from that value. If seeking to 3 and polling gets you another record (for example at offset 6), then everything is fine: it's not data loss, just a gap.
The issue with your proposal is that seekToEnd gives you the last offset, which might not be the offset of the last record.
So in your example, if the last offset is 5 and after a few polls the last record you get is 3, what do you do, continue and execute the next batch from 5? How do you know that offset 4 isn't just lost because the poll failed?
The only way to know would be to get a record with an offset higher than 5; in that case you know it's just a gap.
But if the message you are reading is the last of the topic, you won't get records higher than 3, so you can't tell whether it's a poll failure or an empty offset left by the transaction commit.
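
A minimal sketch of that ambiguity using the plain consumer API (illustrative only; it assumes the consumer is already assigned to the partition, and a real scan would keep polling until nothing more arrives):

import java.time.Duration
import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

def lastOffsetVsLastRecord(consumer: KafkaConsumer[String, String], tp: TopicPartition): Unit = {
  // endOffsets (like seekToEnd) reports the end of the partition, i.e. the
  // position after the last message, not the offset of the last record you
  // can actually fetch.
  val logEnd = consumer.endOffsets(Collections.singletonList(tp)).get(tp)
  consumer.seek(tp, 0L)
  val fetched = consumer.poll(Duration.ofSeconds(5)).records(tp).asScala
  val lastRecordOffset = if (fetched.nonEmpty) fetched.last.offset() else -1L
  // If lastRecordOffset < logEnd - 1 we cannot tell, from this information
  // alone, whether the trailing offsets are transaction markers / compacted
  // away (a harmless gap) or data that the poll simply failed to return.
  println(s"log end offset = $logEnd, last record fetched = $lastRecordOffset")
}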

@koeninger
Contributor

> How do you know that offset 4 isn't just lost because poll failed?

By failed, you mean returned an empty collection after timing out, even though records should be available? You don't. You also don't know that it isn't just lost because Kafka skipped a message. AFAIK, from the information you have from a Kafka consumer, once you start allowing gaps in offsets, you don't know.

I understand your point, but even under your proposal you have no guarantee that the poll won't work in your first pass during RDD construction, and then fail on the executor during computation, right?

> The issue with your proposal is that seekToEnd gives you the last offset, which might not be the last record.

Have you tested comparing the results of consumer.endOffsets for consumers with different isolation levels?

Your proposal might end up being the best approach anyway, just because of the unfortunate effect of StreamInputInfo and count, but I want to make sure we think this through.
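
For what it's worth, that comparison could look something like this (a sketch assuming a local broker and a topic named test-topic; not code from this PR):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

def endOffsetFor(isolationLevel: String, tp: TopicPartition): Long = {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, s"probe-$isolationLevel")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel)
  val consumer = new KafkaConsumer[String, String](props)
  try consumer.endOffsets(Collections.singletonList(tp)).get(tp)
  finally consumer.close()
}

// With read_committed, endOffsets reflects the last stable offset (LSO), so a
// partition whose log ends with transactional data can report a different
// value than it does with read_uncommitted.
val tp = new TopicPartition("test-topic", 0)
println(endOffsetFor("read_uncommitted", tp))
println(endOffsetFor("read_committed", tp))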

if (nonConsecutive) {
  val localRw = rewinder()
  val localOffsets = currentOffsets
  context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
Contributor

Because this isn't a KafkaRDD, it isn't going to take advantage of preferred locations, which means it's going to create cached consumers on different executors.

Author

Are you suggesting I should create a new KafkaRDD instead, and consume from that RDD to get the last offset range?

}

/**
* Similar to compactedStart but will return None if poll doesn't
Contributor

Did you mean compactedNext?

buffer.previous()
}

def seekAndPoll(offset: Long, timeout: Long): ConsumerRecords[K, V] = {
Contributor

Is this used anywhere?

  val fromOffset: Long,
- val untilOffset: Long) extends Serializable {
+ val untilOffset: Long,
+ val recordNumber: Long) extends Serializable {
Contributor

Does MiMa actually complain about binary compatibility if you just make recordNumber the existing count? It's just an accessor either way...

If so, and you have to do this, I'd name this recordCount consistently throughout. Number could refer to a lot of things that aren't counts.
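
If renaming turns out to be necessary, one shape that keeps the existing accessor name might look like this (a sketch only; the real OffsetRange has more members and a different constructor):

final class OffsetRange private (
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long,
    recordCount: Long) extends Serializable {
  // Keep the existing count() accessor: return the scanned record count when
  // we have one, otherwise fall back to the plain offset delta.
  def count(): Long =
    if (recordCount >= 0) recordCount else untilOffset - fromOffset
}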

private def records(offsets: Option[Long]*) = {
offsets.map(o => o.map(new ConsumerRecord("topic", 0, _, "k", "v"))).toList
}
}
Contributor

These tests aren't really testing the actual scenario we care about (transaction markers at the end of an offset range), which should be directly testable now that Kafka has been upgraded to 2.0.
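
Something along these lines could produce that scenario for a test (a sketch assuming a test broker at localhost:9092 and a topic named test-topic; not code from this PR):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "offset-range-test-txn")

val producer = new KafkaProducer[String, String](props)
producer.initTransactions()
producer.beginTransaction()
// One record followed by a commit: the commit writes a transaction marker, so
// the partition ends on an offset that holds no consumable record.
producer.send(new ProducerRecord[String, String]("test-topic", "key", "value"))
producer.commitTransaction()
producer.close()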

@QuentinAmbard
Author

> By failed, you mean returned an empty collection after timing out, even though records should be available? You don't. You also don't know that it isn't just lost because Kafka skipped a message. AFAIK, from the information you have from a Kafka consumer, once you start allowing gaps in offsets, you don't know.

OK, that's interesting; my understanding was that if you successfully poll and get results you are 100% sure that you haven't lost anything. Do you have more details on that? Why would Kafka skip a record while consuming?

> Have you tested comparing the results of consumer.endOffsets for consumers with different isolation levels?

endOffsets returns the last offset (same as seekToEnd). But you're right that the easiest solution for us would be to have something like a seekToLastRecord method instead. Maybe something we could also ask for?

@koeninger
Contributor

koeninger commented Aug 6, 2018 via email

@koeninger
Contributor

koeninger commented Aug 6, 2018 via email

@QuentinAmbard
Author

SPARK-25005 actually has a far better solution to detect message loss. Will try to apply the same logic...

@ghost

ghost commented Dec 17, 2018

Based on my limited understanding, I think this PR will fix this issue: https://stackoverflow.com/q/48344055/2272910

We're facing the same issue and would love to see a solution land in the Spark Kafka streaming package.

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

github-actions bot commented Jan 9, 2020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label Jan 9, 2020
github-actions bot closed this Jan 10, 2020