
Parsing protobuf messages #72

Closed
krisskross opened this issue Aug 3, 2016 · 12 comments

Comments

@krisskross

Hi

I'm trying to use kafkacat to tail topics where each message contains well-defined protobuf bytes, like this:

kafkacat -b kafka:6667 -t actions -q | protoc --decode=actions.ActionLog --proto_path=./proto ./proto/actions.proto

This fails with the following error.

Failed to parse input.
% ERROR: Write error for message of 447 bytes at offset 146): Broken pipe

I know the protoc command is correct since I can cat a file containing messages and pipe it into protoc successfully.

cat file | protoc --decode=actions.ActionLog --proto_path=./proto ./proto/actions.proto

Any tips on why kafkacat provides data that protoc doesn't understand? Buffering? Message delimiter?

Cheers,
-Kristoffer

@fsaintjacques
Contributor

fsaintjacques commented Aug 3, 2016

How does protoc decode its input? It surely doesn't split on newlines. My guess is that it expects a format of the form [<msg_len><msg_bytes>]*.
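A possible explanation for the parse failure, offered as a sketch: kafkacat's default output delimiter is a newline (byte 0x0A), and in the protobuf wire format every field starts with a varint key equal to (field_number << 3) | wire_type. The byte 0x0A is therefore a perfectly valid key (field 1, wire type 2, i.e. length-delimited), so protoc cannot tell a delimiter from message data and will read the bytes that follow it as a length. The class name NewlineAsTag and its helpers are hypothetical, written only to show the arithmetic.

```java
public class NewlineAsTag {
    // Every protobuf field starts with a varint key: (field_number << 3) | wire_type.
    static int key(int fieldNumber, int wireType) {
        return (fieldNumber << 3) | wireType;
    }

    // Recovers the field number from a key (drop the low 3 bits).
    static int fieldNumber(int key) { return key >>> 3; }

    // Recovers the wire type from a key (keep the low 3 bits).
    static int wireType(int key) { return key & 0x7; }

    public static void main(String[] args) {
        int newline = '\n'; // kafkacat's default message delimiter, byte 0x0A
        System.out.println("0x" + Integer.toHexString(newline));                                // 0xa
        System.out.println("field=" + fieldNumber(newline) + " wireType=" + wireType(newline)); // field=1 wireType=2
        System.out.println(key(1, 2) == newline);                                               // true
    }
}
```

Wire type 2 is the type used for strings, bytes and embedded messages, which is why a stray newline between records tends to produce "Failed to parse input." rather than being silently skipped.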

@krisskross
Author

The help page is not very helpful, but it says that it reads a (single?) binary message:

Read a binary message of the given type from
standard input and write it in text format
to standard output.  The message type must
be defined in PROTO_FILES or their imports.

I don't think the message length is necessary, but I tried the following format, which gives the same error:

kafkacat -b kafka:6667 -t actions -q -f '%S%s' | protoc --decode=actions.ActionLog --proto_path=./proto ./proto/actions.proto 
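Why '%S%s' fails too, sketched from kafkacat's documented format tokens: %S prints the payload length as decimal text, while %R writes it as a binary big-endian 32-bit signed integer. In either case the prefix lands inside what protoc reads as one message. The class below (hypothetical name LengthPrefixes) just prints both prefixes for a 447-byte message, the size from the error above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LengthPrefixes {
    // Formats bytes as space-separated lowercase hex, e.g. "00 00 01 bf".
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) sb.append(' ');
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    // What %S emits: the length as decimal ASCII digits.
    static byte[] textPrefix(int len) {
        return Integer.toString(len).getBytes(StandardCharsets.US_ASCII);
    }

    // What %R emits: a 4-byte big-endian signed int (ByteBuffer's default byte order).
    static byte[] binaryPrefix(int len) {
        return ByteBuffer.allocate(4).putInt(len).array();
    }

    public static void main(String[] args) {
        System.out.println(hex(textPrefix(447)));   // 34 34 37
        System.out.println(hex(binaryPrefix(447))); // 00 00 01 bf
    }
}
```

Only the %R form matches what ByteBuffer.getInt() expects; the %S form would need text parsing and has no fixed width.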

@krisskross
Author

I don't have to use protoc; I can write a simple Java program that reads stdin instead. The program below tries to parse messages in the form [<msg_len><msg_bytes>]*.

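import java.nio.ByteBuffer;

// Expects frames of [4-byte big-endian length][payload] on stdin (kafkacat -f '%R%s').
while (true) {
  byte[] bytes = new byte[4];
  System.in.read(bytes);                        // note: may return fewer than 4 bytes
  int size = ByteBuffer.wrap(bytes).getInt();   // big-endian by default
  bytes = new byte[size];
  System.in.read(bytes);                        // note: may return fewer than size bytes
  try {
    Action action = Action.parseFrom(bytes);    // Action: the protobuf-generated message class
    System.out.println(action);
  } catch (Exception e) {
    System.out.println("error " + size);
    Thread.sleep(500);
  }
}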

This almost works, but sometimes the length read from kafkacat's output is wrong, causing parsing to fail. Not sure why. Java reads a 32-bit signed integer here, as suggested by the kafkacat %R format flag; is that correct?

kafkacat -b kafka:6667 -t actions -q -f '%R%s' | java -cp kafkatail-1.0.0-SNAPSHOT-fat.jar KafkaTail
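A likely cause of the "wrong" lengths, offered as a hedged sketch: InputStream.read(byte[]) is allowed to return fewer bytes than requested, and on a pipe it often does when a message arrives in pieces. The loop above ignores that return value, so one short read misaligns the stream and the next four payload bytes get interpreted as a length. DataInputStream.readInt() and readFully() keep reading until the requested bytes have arrived. The names FrameReader, readFrames and TrickleStream are hypothetical; TrickleStream only simulates a pipe that delivers at most 3 bytes per read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FrameReader {
    // Splits [4-byte big-endian length][payload] frames, the layout of kafkacat -f '%R%s'.
    static List<byte[]> readFrames(InputStream raw) throws IOException {
        DataInputStream in = new DataInputStream(raw);
        List<byte[]> frames = new ArrayList<>();
        while (true) {
            int size;
            try {
                size = in.readInt();   // big-endian, same as ByteBuffer.getInt()
            } catch (EOFException e) {
                break;                 // no more frames
            }
            if (size < 0) {
                // %R writes -1 for a NULL payload; this sketch does not handle that case.
                throw new IOException("NULL payload (length " + size + ")");
            }
            byte[] payload = new byte[size];
            in.readFully(payload);     // keeps reading until all 'size' bytes have arrived
            frames.add(payload);
        }
        return frames;
    }

    // Simulates a pipe that returns at most 3 bytes per read(byte[], int, int) call.
    static class TrickleStream extends InputStream {
        private final InputStream inner;
        TrickleStream(byte[] data) { inner = new ByteArrayInputStream(data); }
        @Override public int read() throws IOException { return inner.read(); }
        @Override public int read(byte[] b, int off, int len) throws IOException {
            return inner.read(b, off, Math.min(len, 3));
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        for (String msg : new String[] {"hello", "protobuf bytes"}) {
            byte[] b = msg.getBytes(StandardCharsets.UTF_8);
            out.writeInt(b.length);   // same framing as %R
            out.write(b);             // same as %s
        }
        List<byte[]> frames = readFrames(new TrickleStream(buf.toByteArray()));
        System.out.println(frames.size());                                     // 2
        System.out.println(new String(frames.get(1), StandardCharsets.UTF_8)); // protobuf bytes
    }
}
```

In the program above, each payload returned this way would then be handed to Action.parseFrom(payload).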

@vincentbernat
Collaborator

On 3 August 2016 at 13:48 CEST, Kristoffer Sjögren (notifications@github.com) wrote:

kafkacat -b kafka:6667 -t actions -q | protoc --decode=actions.ActionLog --proto_path=./proto ./proto/actions.proto

This fails with the following error.

Failed to parse input.
% ERROR: Write error for message of 447 bytes at offset 146): Broken pipe

Try adding -D "" -u. However, protoc is unlikely to decode anything past the first message. You could use -J instead and pipe the output to a small Python/Ruby program that decodes each message.


@dwo

dwo commented Sep 13, 2016

Using -D "" works flawlessly across a whole protobuf topic for me (with either --decode_raw or --decode). I've been longing to be able to do this for ages, so thank you!

@edenhill
Owner

@dwo: Can you provide the full command line to do this as an example for future users?

@dwo

dwo commented Sep 13, 2016

@edenhill sure, I was doing something like

kafkacat -C -b kafkahost:9092 -t topicname -o beginning -D "" | protoc --decode_raw
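Why -D "" appears to work, sketched under the assumption that protoc still treats all of stdin as one message: a serialized message is just a sequence of key/value fields with no header or end marker, so two messages written back to back form one valid message. The protobuf encoding documentation describes parsing such a concatenation as equivalent to parsing the parts separately and merging them: singular fields keep the last value seen and repeated fields accumulate. So --decode_raw prints the fields of every record in one dump, and --decode yields a single merged message rather than one per Kafka record. The decoder below (hypothetical class ConcatenatedMessages) handles only varint fields and exists just to show two records running together.

```java
import java.util.ArrayList;
import java.util.List;

public class ConcatenatedMessages {
    // Reads one base-128 varint starting at pos[0] and advances pos[0] past it.
    static long readVarint(byte[] buf, int[] pos) {
        long result = 0;
        int shift = 0;
        while (true) {
            byte b = buf[pos[0]++];
            result |= (long) (b & 0x7f) << shift; // low 7 bits carry data
            if ((b & 0x80) == 0) return result;   // high bit clear: last byte
            shift += 7;
        }
    }

    // Decodes a buffer that (for this sketch) contains only varint fields (wire type 0),
    // returning "field=value" strings in wire order, like a tiny --decode_raw.
    static List<String> decodeVarintFields(byte[] buf) {
        List<String> fields = new ArrayList<>();
        int[] pos = {0};
        while (pos[0] < buf.length) {
            long key = readVarint(buf, pos);
            int fieldNumber = (int) (key >>> 3);
            int wireType = (int) (key & 7);
            if (wireType != 0) throw new IllegalArgumentException("sketch only handles varints");
            fields.add(fieldNumber + "=" + readVarint(buf, pos));
        }
        return fields;
    }

    public static void main(String[] args) {
        byte[] first = {0x08, (byte) 0x96, 0x01}; // record 1: field 1 = 150
        byte[] second = {0x08, 0x03};              // record 2: field 1 = 3
        byte[] concatenated = new byte[first.length + second.length]; // what -D "" produces
        System.arraycopy(first, 0, concatenated, 0, first.length);
        System.arraycopy(second, 0, concatenated, first.length, second.length);
        System.out.println(decodeVarintFields(concatenated)); // [1=150, 1=3]
    }
}
```

Nothing in the bytes marks where record 1 ends; a full parser reading this as one message with a singular field 1 would keep only the last value, 3.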

@jcaesar

jcaesar commented May 29, 2019

Oddly, -D '' only works with --decode_raw for me. (@dwo Out of curiosity, what protoc version are you using? 3.7.1 here.)
I was able to work around that problem with

kafkacat -C -t … -J | parallel --pipe -N1 jq -j .payload \| protoc --decode …

@vorou

vorou commented May 18, 2022

@edenhill sure, I was doing something like

kafkacat -C -b kafkahost:9092 -t topicname -o beginning -D "" | protoc --decode_raw

Apparently, you should pass -e to kcat if you just want to see the output; otherwise, all I see is

% Reached end of topic XYZ [0] at offset 13

Here's the full command that worked for me:

kcat -C -b localhost:9092 -t topic -o beginning -D "" -e | protoc --decode_raw

@jcaesar

jcaesar commented May 21, 2022

@vorou You're probably running into the output buffering problem. (So -u instead of -e will work if you want to keep reading messages continuously, and -q will get rid of that "Reached end of topic" message.)

@vorou

vorou commented May 22, 2022

@jcaesar hm no, this one still hangs for me (no output)

kcat -C -t transactions-lifecycle -b localhost:9092 -o -1 -u -q -D "" | protoc --decode_raw

where this one works

kcat -C -t transactions-lifecycle -b localhost:9092 -o -1 -e -q -D "" | protoc --decode_raw

(I'm okay with -e by the way, but now I hate that it doesn't work without it lol)
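A hedged explanation of why -e works where -u does not: as far as I can tell, protoc --decode and --decode_raw parse all of standard input as a single message (the help text quoted earlier describes reading "a binary message"), so they cannot print anything until stdin reaches EOF. -u only makes kcat write each message immediately; kcat keeps running and the pipe stays open, so protoc keeps waiting. -e makes kcat exit after the last available message, which closes the pipe and lets protoc finish. The sketch below (hypothetical class ReadUntilEof) mimics this with an in-process pipe: a reader that waits for EOF gets nothing while the writer is still open, and everything once it closes.

```java
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReadUntilEof {
    static List<String> demo() throws Exception {
        List<String> log = new ArrayList<>();
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The "decoder" (standing in for protoc) reads everything up to EOF first.
            Future<byte[]> all = pool.submit(() -> in.readAllBytes());

            out.write(new byte[] {0x08, 0x01}); // a message is written right away (like -u)
            out.flush();
            try {
                all.get(500, TimeUnit.MILLISECONDS);
                log.add("finished before EOF");
            } catch (TimeoutException e) {
                log.add("still waiting for EOF");   // the writer is still open
            }

            out.close(); // the writer exits (like -e), so the reader sees EOF
            log.add("read " + all.get().length + " bytes after EOF");
        } finally {
            pool.shutdownNow();
        }
        return log;
    }

    public static void main(String[] args) throws Exception {
        demo().forEach(System.out::println);
        // still waiting for EOF
        // read 2 bytes after EOF
    }
}
```

With a tool that decodes per message (for example the -J approach suggested earlier in the thread), -u would be enough; with a read-to-EOF decoder, the producer has to exit.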

@VanceLongwill

After running into similar difficulties, I invested some time into writing a small Rust CLI to make this easier. It's called milena and is built for consuming/producing known protobuf messages. All you need is the protobuf file descriptor set, and it will encode/decode JSON to/from protobuf.


8 participants