
ARROW-10979: [Rust] Basic Kafka Reader #8968

Closed
wants to merge 8 commits

Conversation

kflansburg

Introduce a basic Kafka reader based on rdkafka. Exposes an Iterator interface which yields Result<RecordBatch>.

Columns in the batch are:

  • key (Binary, nullable): The key of a message, if present.
  • payload (Binary, nullable): The payload bytes of a message, if present.
  • topic (Utf8): The topic of the message.
  • partition (Int32): The partition of the message.
  • offset (Int64): The offset of the message.

Note that rdkafka has a C dependency (librdkafka), but we can choose to make this dynamically linked. rdkafka provides an async Consumer, but I have explicitly chosen the non-async Consumer.
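For reference, a minimal sketch of the batch schema described above, using the arrow crate's Schema and Field types (the helper name kafka_batch_schema is illustrative only, not the PR's actual API):

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Schema of the RecordBatches yielded by the reader, as described above.
// Sketch for illustration; the names in the PR itself may differ.
fn kafka_batch_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("key", DataType::Binary, true),       // nullable message key
        Field::new("payload", DataType::Binary, true),   // nullable message payload
        Field::new("topic", DataType::Utf8, false),      // topic name
        Field::new("partition", DataType::Int32, false), // partition id
        Field::new("offset", DataType::Int64, false),    // offset within the partition
    ]))
}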

@codecov-io

codecov-io commented Dec 19, 2020

Codecov Report

Merging #8968 (4807976) into master (091df20) will decrease coverage by 0.04%.
The diff coverage is 58.42%.


@@            Coverage Diff             @@
##           master    #8968      +/-   ##
==========================================
- Coverage   83.22%   83.17%   -0.05%     
==========================================
  Files         196      199       +3     
  Lines       48232    48321      +89     
==========================================
+ Hits        40142    40193      +51     
- Misses       8090     8128      +38     
Impacted Files Coverage Δ
rust/kafka/src/reader.rs 47.72% <47.72%> (ø)
rust/kafka/src/batch.rs 57.57% <57.57%> (ø)
rust/kafka/tests/test.rs 100.00% <100.00%> (ø)
rust/parquet/src/encodings/encoding.rs 95.24% <0.00%> (-0.20%) ⬇️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@andygrove
Member

andygrove commented Dec 19, 2020

Thanks @kflansburg, this is interesting. Would you mind explaining your use case for this? It has been a while since I used Kafka personally, and I am wondering what the benefit is of having the messages in Arrow format.

Would the intent be to also use Arrow for the payload to take full advantage of this? For example, allowing DataFusion to run SQL queries against Kafka topics similar to KSQL?

@kflansburg
Author

Hey @andygrove, this is definitely not a great use case for Arrow, since the Kafka message format is not columnar, but I'm hoping to implement micro-batch-style processing (possibly in DataFusion / Ballista), similar to Spark Structured Streaming.

I really like your idea of using Kafka as a transport layer for Arrow Flight messages. I was planning to try to implement some sort of JSON parsing -> Arrow StructArray for the Kafka payload field, but parsing it as Arrow Flight would be very cool as well.
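As a rough illustration of that transport idea, and not part of this PR: a RecordBatch could be serialized with the Arrow IPC stream format and the resulting bytes used as a Kafka payload. Flight itself is gRPC-based, so this only borrows its wire encoding. A hedged sketch using the arrow crate's IPC StreamWriter:

use arrow::error::Result;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

// Encode a RecordBatch as Arrow IPC stream bytes, suitable for use as a Kafka
// message payload. Illustrative sketch only.
fn batch_to_ipc_bytes(batch: &RecordBatch) -> Result<Vec<u8>> {
    let mut buf = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buf, &batch.schema())?;
        writer.write(batch)?;
        writer.finish()?;
    }
    Ok(buf)
}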

FYI I'm working now to resolve the CI issues related to compiling and/or linking against librdkafka, but it may not be possible to get this to build without including this dependency.

@nevi-me
Contributor

nevi-me commented Dec 19, 2020

Hi @kflansburg, this is some great work. I've just gone through the code briefly.

I really like your idea of using Kafka as a transport layer for Arrow Flight messages.

I'd be interested in seeing how we could go about implementing this.

I was planning to try to implement some sort of JSON parsing -> Arrow StructArray for the Kafka payload field, but parsing it as Arrow Flight would be very cool as well.

Our JSON reader already has the building blocks needed to trivially do this, and after #8938, you should be able to read all nested JSON types.

I played around with converting Avro messages from Kafka into Arrow data. Avro would also be interesting for your streaming use case.


There is a slight downside to having arrow-kafka live in this repository, which is that librdkafka isn't trivial to install on Windows (I use it in WSL instead). So from a development perspective, it might impose some load on developers (especially drive-by contributions).

I'm a proponent of bundling crates into arrow/rust if they could benefit from us (i.e. the committers and regular contributors) making some changes to keep them compiling. We sometimes make breaking changes to our interfaces, so being able to fix the crates is very useful.

With the above said, I think we should use this crate as an opportunity to have a bigger discussion about where additional modules should live. For example, I recently opened a draft RFC for arrow-sql (#8731), with my main motivation for wanting to put it into rust/arrow/arrow-sql being that it could also benefit from the performance improvements that we're regularly making.

We could try the arrow-contrib approach, where we maintain additional IO modules and other crates or projects in languages other than Rust.
This would be similar to other projects like OpenTracing & OpenTelemetry where separate tracing libraries are maintained within the same organisation, but under different repos.
This is probably a bigger mailing list discussion, but I'd like to hear your and @andygrove 's thoughts first.

@kflansburg
Author

Our JSON reader already has the building blocks needed to trivially do this, and after #8938, you should be able to read all nested JSON types.

Great, thanks for the tip!

There is a slight downside to having arrow-kafka live in this repository, which is that librdkafka isn't trivial to install on Windows (I use it in WSL instead). So from a development perspective, it might impose some load on developers (especially drive-by contributions).

Thanks for the heads up; this seems to be the main issue with CI right now. I could switch to dynamic linking; however, Cargo still will not build without the correct libraries present.

I figured that this would be a controversial thing to include in-tree for the reasons you mentioned. I don't really want to re-implement all of librdkafka in Rust (as was done with parquet), but I think there should be a way for arrow to provide arrow-specific functionality in-tree for external IO crates.

I will leave this discussion for the core maintainers though.

@kflansburg
Author

kflansburg commented Dec 19, 2020

I'd be interested in seeing how we could go about implementing this.

Giving this some thought, I think we can have configuration fields that indicate that keys and/or payloads should be parsed as raw bytes, JSON, Arrow Flight, Avro, etc. The stretch goal here could be support for integration with a schema registry, but I haven't worked much with that.
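A minimal sketch of what those configuration fields might look like; every name here is hypothetical and nothing like this exists in the PR yet:

// Hypothetical key/payload format options for a future reader builder.
#[derive(Clone, Debug)]
pub enum MessageFormat {
    // Leave the bytes as-is (Binary column) -- the behavior of this PR.
    RawBytes,
    // Parse each message as a JSON document into a StructArray.
    Json,
    // Decode Arrow IPC / Flight-encoded payloads.
    ArrowIpc,
    // Decode Avro payloads, possibly resolving schemas via a registry.
    Avro,
}

// Hypothetical reader configuration carrying those fields.
pub struct KafkaReaderConfig {
    pub topics: Vec<String>,
    pub key_format: MessageFormat,
    pub payload_format: MessageFormat,
    pub batch_size: usize,
}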

The only concern I have is with inconsistent schemas between messages in the same RecordBatch; there may be some merging needed.

@nevi-me
Contributor

nevi-me commented Dec 19, 2020

The stretch goal here could be support for integration with a schema registry, but I haven't worked much with that.

The only concern I have is with inconsistent schemas between messages in the same RecordBatch; there may be some merging needed.

You could address this by only allowing subscription to 1 topic.
Perhaps it'd then make sense to convert the payload into a StructArray or RecordBatch, with the other fields remaining as part of KafkaBatch?

If you have multiple topics, you'd end up with different schemas for the StructArray as you observe, which isn't in the spirit of Arrow, as all arrays should be homogeneous. A UnionArray could address that issue, but it's too much effort, in my opinion.

Converting Field::new("payload", DataType::Binary, true) to StructArray will also require the data to be the same.


More rabbit-hole kinds of ideas:

  • We could support a JSONArray that is an extension of BinaryArray but holds JSON data. I believe it's already a thing in Parquet, but I haven't looked at it in detail. There's ExtensionArray as part of the Arrow spec, but we haven't implemented it in Rust as yet.

@kflansburg
Author

kflansburg commented Dec 19, 2020

I definitely want to support subscribing to multiple topics; it's often the case that multiple topics share the same schema. My concern is that the full Schema may not be possible to infer from a single message, even within a single topic. It's possible we could have the user supply the full schema, but that would be cumbersome.

I was planning to have a BinaryArray (maybe cast as a JSONArray, which is a good idea) be parseable into a StructArray, but I haven't verified that all of that makes sense.

Now I'm thinking, though, that if the parsing happens after the JSONArray has been formed, then you have access to all of the messages in the RecordBatch and can determine the schema. You just can't guarantee that the schema is consistent across RecordBatches, but I think that is fine.
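A sketch of that batch-level inference, assuming JSON payloads and using serde_json directly; a real implementation would also infer per-field types rather than just collecting key names:

use std::collections::BTreeSet;

use serde_json::Value;

// Infer the set of top-level field names present across all JSON payloads in
// one batch. A real implementation would also infer each field's type; this
// sketch only collects the union of keys seen in the batch.
fn infer_field_names(payloads: &[&[u8]]) -> serde_json::Result<BTreeSet<String>> {
    let mut fields = BTreeSet::new();
    for bytes in payloads {
        let value: Value = serde_json::from_slice(bytes)?;
        if let Value::Object(map) = value {
            fields.extend(map.keys().cloned());
        }
    }
    Ok(fields)
}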

@kflansburg
Author

Switching to cmake appears to have resolved the Windows build issue, but cmake is not installed for AMD64 Debian 10.

I'm not sure what the error on the Dev / Source Release and Merge Script means. Can someone help with this?

@jorgecarleitao
Member

Thanks a lot for this PR and for opening this discussion.

I am trying to understand the rationale as well :) :

  • Kafka is event driven; the Arrow format is geared toward batch processing
  • Kafka is event driven; Iterators are blocking
  • the current PR handles the payload as an opaque binary

Wrt to the first point, we could use micro-batches, but doesn't that defeat the purpose of the Arrow format? The whole idea is based on the notion of data locality, low metadata footprint, and batch processing. All these are shredded in a micro-batch architecture.

Wrt to the second point, wouldn't it make sense to build a stream instead of an iterator? Currently we will be blocking the thread waiting for something to happen on the topic, no?

Wrt to the third point, why add the complexity of Arrow if in the end the payload is an opaque binary? The user would still have to convert that payload to the arrow format for compute (or is the idea to keep that as opaque?), so, IMO, we are not really solving the problem: why would a user prefer a RecordBatch of N messages over a Vec<KafkaBatch> (or even better, not blocking the thread and just using a stream of KafkaBatch)?

If anything, I would say that the architecture should be something like:

stream of rows ------ stream of batches of size N ------ compute

I.e. there should be a stream adapter (a chunk iterator) that maps rows to a batch, either in arrow or in other format.

One idea in this direction would be to reduce the scope of this PR to introducing a stream adapter that does exactly this: a generic struct that takes a stream of rows and returns a new stream of RecordBatch. But again, why bother with Vec<KafkaBatch> -> RecordBatch? A StructArray with a Binary field still needs to be parsed into an arrow array given a schema, and that is the most blocking operation of all of this. We would still need to agree upfront about the format of the payload, which is typically heterogeneous and use-case dependent (and seldom decided by the consumer). So, this brings us to the IPC discussion already initiated above.
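For illustration, a synchronous sketch of such a chunking adapter (an async equivalent could build on futures::StreamExt::chunks); the mapping from a chunk of rows to a RecordBatch is left to the caller:

// Generic adapter: consume an iterator of rows and yield fixed-size chunks,
// which the caller can then map into RecordBatches (or any other batch form).
struct ChunkedRows<I: Iterator> {
    inner: I,
    chunk_size: usize,
}

impl<I: Iterator> Iterator for ChunkedRows<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        let chunk: Vec<_> = self.inner.by_ref().take(self.chunk_size).collect();
        if chunk.is_empty() {
            None
        } else {
            Some(chunk)
        }
    }
}

A Kafka-backed iterator of rows could then be wrapped in this adapter and each chunk converted with whatever converter matches the payload format.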

But maybe I am completely missing the point ^_^

@kflansburg
Author

Hey @jorgecarleitao, thanks for the input.

Wrt to the first point, we could use micro-batches, but doesn't that defeat the purpose of the Arrow format? The whole idea is based on the notion of data locality, low metadata footprint, and batch processing. All these are shredded in a micro-batch architecture.

I don't believe this is an accurate statement: while they are called "micro" batches, depending on the throughput of the topics that you are consuming, they may contain tens of thousands of messages. Regardless, I think there is an argument for leveraging Arrow's compute functionality, and the ecosystem built around the format, even when processing smaller pieces of data.

Wrt to the second point, wouldn't it make sense to build a stream instead of an iterator? Currently we will be blocking the thread waiting for something to happen on the topic, no?

To be clear, the blocking behavior is up to the user. They may set a poll_timeout of 0 to prevent any blocking. librdkafka operates a background thread where the actual messages are fetched from Kafka. You mentioned that Kafka is event driven and I do not believe that is the case. Kafka acts as a buffer which stores event messages. Consumers fetch events at the rate that they wish to process them. If there are no new events, consumers periodically poll for new messages (this could be async, but I think that is out of scope here). This connector may also be used for mass-loading of an entire topic worth of messages, not just watching for new ones.
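To make the polling behavior concrete, here is a hedged sketch of the non-async consumption loop using rdkafka's BaseConsumer; the broker address, group id, and topic are placeholders, and the PR's reader wraps this pattern behind its own API:

use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::message::Message;

fn poll_once() -> Result<(), rdkafka::error::KafkaError> {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // placeholder broker
        .set("group.id", "example-group")           // placeholder group id
        .create()?;
    consumer.subscribe(&["example-topic"])?;        // placeholder topic

    // A zero timeout returns immediately if no message is buffered, so the
    // caller decides whether to block; librdkafka fetches in the background.
    if let Some(result) = consumer.poll(Duration::from_millis(0)) {
        let msg = result?;
        println!(
            "topic={} partition={} offset={} payload_len={}",
            msg.topic(),
            msg.partition(),
            msg.offset(),
            msg.payload().map(|p| p.len()).unwrap_or(0),
        );
    }
    Ok(())
}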

Wrt to the third point, why add the complexity of Arrow if in the end the payload is an opaque binary? The user would still have to convert that payload to the arrow format for compute (or is the idea to keep that as opaque?), so, IMO, we are not really solving the problem: why would a user prefer a RecordBatch of N messages over a Vec<KafkaBatch> (or even better, not blocking the thread and just using a stream of KafkaBatch)?

According to the Kafka API, the payload is an opaque binary (which I might add is a supported data type in Arrow). As you mentioned, it is possible that the topic contains a custom encoding of data, and users should be able to access this raw data. Another case, however, is that the data is JSON or Avro (or Arrow Flight as Andy suggested). As mentioned in discussion above, I plan to allow the user to specify additional information when building the reader (schema, format) which will allow it to interpret the payload as JSON, etc. I wanted to keep this PR scoped to basic functionality for now. #8971 is another PR that I have opened to begin work on such functionality for JSON.

I.e. there should be a stream adapter (a chunk iterator) that maps rows to a batch, either in arrow or in other format.

You've mentioned Streams several times, and I'm glad that you brought it up. I purposefully avoided async code here (it seems to me that most of the core Arrow library does not use async code?). It is totally possible to introduce a Stream API here as well (rdkafka has this already), but I think that a synchronous variant is more basic and that users should have the choice of using async or not.

But again, why bother with Vec<KafkaBatch> -> RecordBatch? A StructArray with a Binary field still needs to be parsed into an arrow array given a schema, and that is the most blocking operation of all of this. We would still need to agree upfront about the format of the payload, which is typically heterogeneous and use-case dependent (and seldom decided by the consumer).

I believe that this is the first step to achieving the more complex features that you are describing. This PR:

  1. Loads data from Kafka.
  2. Forms it into Arrow based on just what we know about the data from Kafka API (opaque binary payloads).

In future work, we can:

  • Introduce an async API.
  • Automatically parse payloads.
  • Add projections to pick out specific fields from Kafka messages.
  • Build more complex KSQL-like behavior into DataFusion.

Hopefully you can see how this PR fits into that process.

@jorgecarleitao
Member

Hey @kflansburg

Thanks for the clarifications and for explaining the rationale and use cases. I am convinced that there is a good case for using Arrow with Kafka, and I would like to thank you for clarifying my view on this topic 👍

According to the Kafka API, the payload is an opaque binary (which I might add is a supported data type in Arrow). [...]

Maybe it is easier to illustrate this with an example: let's say that the payload for a given topic is JSON with the schema {"value": int32}. If I understand you correctly, the idea of this PR is to build a RecordBatch with

Field("message", 
    DataType::Struct([
        Field("payload", DataType::Binary),
        Field("topic", DataType::Utf8),
        ...
    ])
)

and then convert it to something like

Field("message", 
    DataType::Struct([
        Field("payload", DataType::Struct([Field("value": DataType::Int32)])),
        Field("topic", DataType::Utf8),
        ...
    ])
)

using a to_json or something. Is this the direction you mean?

If yes, note that creating a BinaryArray requires a complete memcopy of Vec<Vec<u8>>, which is expensive. This is what I meant with "introducing unnecessary complexity". Wouldn't it be simpler to keep the KafkaBatch as is and use arrow's json reader to convert the blob directly to the corresponding arrow arrays?

Which brings me to my initial question in item 3: IMO the core issue is not interoperability with KafkaBatch, but how to deserialize the payload into Arrow. This PR is focused on converting KafkaBatch -> RecordBatch. I am trying to understand the benefits of memcopying the whole payload from bytes into a BinaryArray instead of keeping the KafkaBatch as is.

Wrt to async, Fearless concurrency ftw :) DataFusion already supports async scans. Kafka seems to be a good case for that. In this context, if I was doing this, my strategy would be to work out a Node (DataFusion) that async polls from a Kafka stream with the following configuration:

  • topic
  • expected payload format (e.g. JSON for start)
  • a number of rows per Batch (i.e. buffering)
  • expected schema

and return RecordBatches with the deserialized payload according to the expected schema (using the corresponding converter).

Does this make sense?

@kflansburg
Author

Maybe it is easier to illustrate this with an example: let's say that the payload for a given topic is JSON with the schema {"value": int32}. If I understand you correctly, the idea of this PR is to build a RecordBatch with

...

and then convert it to something like

...

using a to_json or something. Is this what you mean / direction?

This is correct.

If yes, note that creating a BinaryArray requires a complete memcopy of Vec<Vec<u8>>, which is expensive. This is what I meant with "introducing unnecessary complexity". Wouldn't it be simpler to keep the KafkaBatch as is and use arrow's json reader to convert the blob directly to the corresponding arrow arrays?

I agree that there is a potential optimization here by parsing at the message level, rather than forming a BinaryArray first.
I have a couple of concerns with this though:

  • I do think we should support output to BinaryArray, and I think it makes sense that this low-level reader not include a lot of format-specific functionality.
  • rdkafka yields BorrowedMessages, so forming batches from them will require a copy either way.
  • De-serializing JSON is already not zero-copy, so I'm not sure how much performance this buys us.

I will give this some thought for future work, though, because I think it becomes more critical when reading zero-copy formats.

Which brings me to my initial question in item 3: IMO the core issue is not interoperability with KafkaBatch, but how to deserialize the payload into Arrow. This PR is focused on converting KafkaBatch -> RecordBatch. I am trying to understand the benefits of memcopying the whole payload from bytes into a BinaryArray instead of keeping the KafkaBatch as is.

I intended KafkaBatch to be an internal object used for constructing a RecordBatch. I don't think this would make sense as a component of the Arrow codebase if it yielded non-Arrow values. In my opinion, BinaryArray is the data type of Kafka payloads. Interpreting it as JSON requires context, and something this low-level should not be opinionated about that.

Wrt to async, Fearless concurrency ftw :) DataFusion already supports async scans. Kafka seems to be a good case for that. In this context, if I was doing this, my strategy would be to work out a Node (DataFusion) that async polls from a Kafka stream with the following configuration:

  • topic
  • expected payload format (e.g. JSON for start)
  • a number of rows per Batch (i.e. buffering)
  • expected schema

and return RecordBatches with the deserialized payload according to the expected schema (using the corresponding converter).

This all makes sense, and aligns with the roadmap I mentioned above, but I think it is out of scope for this PR. Think of this PR as implementing Rust's std::fs::File for Kafka. Buffering, async, de-serialization, etc. all come after that.

@jorgecarleitao
Member

Thanks for the response. I get what you mean, and I agree with it.

From my end, the only missing points are the ones @nevi-me mentioned wrt how we add adapters for different data producers to the Cargo workspace, and the CI user story.

One practical path is to exclude the crate from the workspace, like we do for the C data interface, and test it separately (e.g. on a different Docker image). We also need to evaluate whether we plan to test this in integration or not (e.g. against a local Kafka service).

I will listen to the other maintainers (@andygrove, @alamb, @nevi-me) about this PR.

Thank you for your patience and insight, @kflansburg

@kflansburg
Author

Hey @jorgecarleitao , sounds good.

I do have some scripts that I've used for Dockerized integration testing locally if we decide to go that route.

I personally would like to get this building on Windows and include it in the existing tests.

@alamb
Contributor

alamb left a comment

In terms of adding this crate to the arrow source tree, it is my personal opinion that it would be better off in a separate repo (with a dependency on Arrow) rather than in the main arrow repo:

  1. The fraction of people that would use arrow and not kafka would be large, and so the maintenance / CI burden of putting it in tree would outweigh the benefit of including it.
  2. It is not part of the arrow spec or project

I think a similar argument could be applied to DataFusion and, to a lesser degree, the Parquet JSON writer. The conclusion to "should it be in the arrow repo" comes down to a subjective judgement of value vs cost.

Thus, I am not opposed to including this crate either, if we have sufficient interest among the maintainers. I am thinking of things like "make an integration test" for CI.

I made a diagram to try to show my thinking in this area, grouping crates together with similar levels of complexity vs specialization:

                                                                                                         
                                                                                                 ▲       
                                                                                                 │       
                                                                                                 │       
                                                                                                 │       
                                                                                                 │       
                                                                                                 │       
           ┌───────────────┐ ┌───────────────┐  ┌────────────────┐  ┌─────────────┐              │       
           │  DataFusion   │ │ arrow-flight  │  │  kafka reader  │  │ sql-reader  │       ...    │       
           └───────────────┘ └───────────────┘  └────────────────┘  └─────────────┘              │       
                                                                                                 │       
                                                                                                 │       
                                                                                                 │       
                                                                                                 │       
        ┌─────────────────────┐    ┌─────────────────────┐    ┌─────────────────────┐            │       
        │   Parquet Reader    │    │     Flight IPC      │    │ JSON Reader/writer  │            │       
        └─────────────────────┘    └─────────────────────┘    └─────────────────────┘            │       
                                                                                                 │       
                                                                                                 │       
                                                                                                 │       
                  ┌────────────────────────────────────────────────────────┐                     │       
                  │                                                        │                     │       
                  │                 Arrow Columnar Format                  │                     │       
                  │                   + Compute Kernels                    │                     │       
                  │                                                        │                     │       
                  └────────────────────────────────────────────────────────┘                             
                                                                                                         
                                                                                                         
                                                                                             Increasing  
                                                                                           Specialization
                                                                                                         
                                                                                                         
                                                                                                         

@alamb
Contributor

alamb commented Dec 21, 2020

Thank you for this PR @kflansburg

@houqp
Member

houqp commented Dec 21, 2020

+1 on @alamb's diagram. I think what would be interesting to extend in the future for transport-protocol-based readers like Kafka is to make them composable with serialization-based readers like JSON, Parquet, and CSV. I.e., as an end user, I can configure the Kafka reader to read JSON from a Kafka stream into Arrow record batches. The JSON reader's parsing logic has now been decoupled from IO, so this should be pretty straightforward to implement.
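One way to picture that composition, purely as a hypothetical interface (none of these names exist in the codebase): the transport reader produces a column of raw payload bytes, and a pluggable, format-specific decoder turns it into typed Arrow data:

use arrow::array::{ArrayRef, BinaryArray};
use arrow::datatypes::DataType;
use arrow::error::Result;

// Hypothetical composition point: a serialization-format decoder that turns a
// column of raw payload bytes into a typed Arrow array, keeping the Kafka
// reader itself format-agnostic.
trait PayloadDecoder {
    // The Arrow type this decoder produces (e.g. a Struct for JSON objects).
    fn output_type(&self) -> DataType;
    // Decode one batch worth of payloads into an Arrow array.
    fn decode(&self, payloads: &BinaryArray) -> Result<ArrayRef>;
}

The reader in this PR would correspond to a pass-through decoder that leaves the payload column as Binary, while JSON, Avro, or CSV decoders could be layered on top of their existing parsing logic.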

@alamb
Contributor

alamb commented Feb 13, 2021

@kflansburg

What is the status of this PR?

As part of trying to clean up the backlog of Rust PRs in this repo, I am going through seemingly stale PRs and pinging the authors to see if there are any plans to continue the work or conversation.

@kflansburg
Author

It seems like a lot of discussion has to happen before a feature like this can be considered.

@kflansburg closed this on Feb 15, 2021