
[FLINK-2386] Rework Kafka consumer for Flink #1028

Closed
wants to merge 6 commits

Conversation

StephanEwen
Contributor

This is a reworked and extended version of #996. It also builds on top of #1017.

It improves the Kafka consumer, fixes bugs, and offers a pluggable fetcher and offset committer so it works across Kafka versions from 0.8.1 to 0.8.3 (upcoming).

Functionality

  • The Kafka consumer properly preserves partitioning across failures/restarts.
  • Pluggable fetchers / committers for interoperability across multiple Kafka versions (see the interface sketch after this list)
    • Fetcher based on the low level consumer API
    • Fetcher based on the upcoming new consumer API (backported and included in the Flink Kafka consumer).
  • Proper cancelability
  • The test coverage is vastly improved.
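
To illustrate the pluggable-fetcher idea, here is a minimal sketch of what such an abstraction could look like. The interface and method names (`Fetcher`, `SourceCollector`, `seek`, etc.) are illustrative only, not the actual classes in this pull request; each Kafka version would provide its own implementation while the surrounding source stays the same.

```java
import java.util.Map;

/**
 * Illustrative sketch of a pluggable fetcher abstraction (hypothetical names).
 * One implementation would be based on the low-level SimpleConsumer API,
 * another on the upcoming new consumer API.
 */
interface Fetcher<T> {

    /** Assigns the partitions this fetcher should read and the offsets to start from. */
    void seek(Map<Integer, Long> partitionOffsets);

    /** Runs the fetch loop, emitting deserialized records until close() is called. */
    void run(SourceCollector<T> collector) throws Exception;

    /** Stops the fetch loop and releases connections. */
    void close() throws Exception;
}

/** Hypothetical collector the fetcher hands records (and their offsets) to. */
interface SourceCollector<T> {
    void emit(T record, int partition, long offset);
}
```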

Tests

This pull request includes a set of new, thorough tests for the Kafka consumer:

  • Preservation of partitioning and exactly-once behavior (see the offset checkpointing sketch after this list) for
    • 1:1 source to Kafka partition mapping
    • 1:n source to Kafka partition mapping
    • n:1 source to Kafka partition mapping
  • Broker failure
  • Cancelling
    • After immediate failures during deployment
    • While waiting to read from a partition
    • While currently reading from a partition
  • Commit notifications for checkpoints
  • Large records (30 MB)
  • Alignment of offsets with what is committed into ZooKeeper
  • Concurrent producer/consumer programs
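
To make the exactly-once behavior tested above concrete, here is a minimal sketch of the usual pattern: the per-partition offsets are part of the checkpointed state, and offsets are committed to ZooKeeper only after the checkpoint that contains them completes. All class and method names below are hypothetical, not the connector's actual code.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch of offset checkpointing for an exactly-once Kafka source. */
class OffsetCheckpointingSketch {

    /** Offsets of the last records emitted, per Kafka partition. */
    private final Map<Integer, Long> currentOffsets = new HashMap<>();

    /** Offsets captured per pending checkpoint, committed once that checkpoint completes. */
    private final Map<Long, Map<Integer, Long>> pendingCheckpoints = new HashMap<>();

    /** Called when a checkpoint barrier triggers a state snapshot. */
    Map<Integer, Long> snapshotState(long checkpointId) {
        Map<Integer, Long> snapshot = new HashMap<>(currentOffsets);
        pendingCheckpoints.put(checkpointId, snapshot);
        return snapshot;
    }

    /** Called on recovery: the fetcher resumes exactly at the restored offsets. */
    void restoreState(Map<Integer, Long> restoredOffsets) {
        currentOffsets.putAll(restoredOffsets);
    }

    /** Called when the checkpoint is globally complete; only now commit to ZooKeeper. */
    void notifyCheckpointComplete(long checkpointId, OffsetCommitter committer) throws Exception {
        Map<Integer, Long> offsets = pendingCheckpoints.remove(checkpointId);
        if (offsets != null) {
            committer.commit(offsets);
        }
    }

    /** Hypothetical pluggable committer, e.g. one writing offsets to ZooKeeper. */
    interface OffsetCommitter {
        void commit(Map<Integer, Long> offsets) throws Exception;
    }
}
```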

Limitations

The code based on the low-level consumer seems to work well.

The high-level consumer does not work with very large records. This looks like a problem in the backported Kafka code, but that is not 100% certain.

Debug Code

This pull request includes some debug code in the BarrierBuffer that I intend to remove. It is there to track possible corner-case problems in the checkpoint barrier handling.

StephanEwen and others added 3 commits August 16, 2015 18:51
…ption handling and code simplification

  - The exceptions are no longer logged by the operators themselves.
    Operators perform only cleanup in reaction to exceptions.
    Exceptions are reported only to the root Task object, which knows whether this is the first
    failure-causing exception (root cause), a subsequent exception, or whether the task was
    actually canceled already. In the latter case, exceptions are ignored, because many
    cancellations lead to meaningless exceptions (see the sketch after this commit message).

  - more exceptions in signatures, less wrapping where not needed

  - Core resource acquisition/release logic is in one streaming task, reducing code duplication

  - Guaranteed cleanup of output buffer and input buffer resources (formerly missed when other exceptions were encountered)

  - Fix mixup in instantiation of source contexts in the stream source task

  - Auto watermark generators correctly shut down their interval scheduler

  - Improve use of generics, got rid of many raw types

This closes apache#1017
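
A minimal sketch of the exception-reporting pattern described in the commit message above, under the assumption that a single atomic slot holds the root cause. Class and method names are hypothetical, not Flink's actual Task code.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative sketch: only the first failure is surfaced; follow-ups and cancellation noise are dropped. */
class FailureReporterSketch {

    private final AtomicReference<Throwable> rootCause = new AtomicReference<>();
    private volatile boolean canceled;

    /** Called by operators/tasks when they hit an exception during processing or cleanup. */
    void reportFailure(Throwable t) {
        if (canceled) {
            // Cancellation tears down resources, which typically produces
            // meaningless follow-up exceptions; ignore them.
            return;
        }
        if (rootCause.compareAndSet(null, t)) {
            // First failure-causing exception: this is the root cause to surface.
            failTask(t);
        }
        // Subsequent exceptions are dropped; the root cause was already reported.
    }

    void cancel() {
        canceled = true;
    }

    private void failTask(Throwable cause) {
        // In the real code the task would transition to FAILED and propagate the cause.
    }
}
```
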
@rmetzger
Contributor

How about dropping the backported Kafka code and relying completely on our own implementation against the SimpleConsumer API?
We would need to implement the KafkaConsumer.partitionsFor() method ourselves, but I think that's doable.
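
For reference, a rough sketch of how a partitionsFor()-style lookup can be built on the 0.8 low-level API. The SimpleConsumer and TopicMetadataRequest classes are the standard Kafka 0.8 Java API as I recall it; the helper itself is illustrative and not the code that went into the connector.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

/** Illustrative helper: list the partition IDs of a topic via a single broker's metadata. */
class PartitionDiscoverySketch {

    static List<Integer> partitionsFor(String topic, String brokerHost, int brokerPort) {
        // soTimeout and bufferSize values are arbitrary example settings.
        SimpleConsumer consumer =
                new SimpleConsumer(brokerHost, brokerPort, 100000, 64 * 1024, "partition-lookup");
        try {
            TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList(topic));
            TopicMetadataResponse response = consumer.send(request);

            List<Integer> partitions = new ArrayList<>();
            for (TopicMetadata tm : response.topicsMetadata()) {
                if (topic.equals(tm.topic())) {
                    for (PartitionMetadata pm : tm.partitionsMetadata()) {
                        partitions.add(pm.partitionId());
                    }
                }
            }
            return partitions;
        } finally {
            consumer.close();
        }
    }
}
```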

@StephanEwen
Contributor Author

I like this idea a lot. The backported code is not very stable anyways...

@rmetzger
Contributor

I've opened a pull request with the code removed against your branch: StephanEwen#14

@hsaputra
Contributor

So does #1039 depend on this one?

@hsaputra
Contributor

Since it has already been rebased into #1039, could this one be closed and the review done on that one instead?

@rmetzger
Contributor

Yes, I think we can close this issue.

@StephanEwen
Contributor Author

Subsumed by #1039
