New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parsing protobuf messages #72

Closed
krisskross opened this Issue Aug 3, 2016 · 7 comments

Comments

Projects
None yet
5 participants
@krisskross
Copy link

krisskross commented Aug 3, 2016

Hi

I'm trying to use kafkacat to tail topics where each message contain well defined protobuf bytes like this.

kafkacat -b kafka:6667 -t actions -q | protoc --decode=actions.ActionLog --proto_path=./proto ./proto/actions.proto

This fails with the following error.

Failed to parse input.
% ERROR: Write error for message of 447 bytes at offset 146): Broken pipe

I know the protoc command is correct since I can cat a file containing messages and pipe it into protoc successfully.

cat file | protoc --decode=actions.ActionLog --proto_path=./proto ./proto/actions.proto

Any tips on why kafkacat provide data that protoc doesn't understand? Buffering? Message delimiter?

Cheers,
-Kristoffer

@fsaintjacques

This comment has been minimized.

Copy link
Contributor

fsaintjacques commented Aug 3, 2016

How does protoc decode inputs, it surely doesn't use newlines? My guess is that it expect a format of the form [<msg_len><msg_bytes>]*.

@krisskross

This comment has been minimized.

Copy link
Author

krisskross commented Aug 3, 2016

The help page is not very helpful... but it says that it read a (singular?) binary message.

Read a binary message of the given type from
standard input and write it in text format
to standard output.  The message type must
be defined in PROTO_FILES or their imports.

I don't think the message length is neccessary. But I tried with the following format which gives the same error.

kafkacat -b kafka:6667 -t actions -q -f '%S%s' | protoc --decode=actions.ActionLog --proto_path=./proto ./proto/actions.proto 
@krisskross

This comment has been minimized.

Copy link
Author

krisskross commented Aug 3, 2016

I don't have to use protoc, I can write a simple Java program that takes stdin instead. The program below tries to parse messages it the form [<msg_len><msg_bytes]*.

while (true) {
  byte[] bytes = new byte[4];
  System.in.read(bytes);
  int size = ByteBuffer.wrap(bytes).getInt();
  bytes = new byte[size];
  System.in.read(bytes);
  try {
    Action action = Action.parseFrom(bytes);
    System.out.println(action);
  } catch (Exception e) {
    System.out.println("error " + size);
    Thread.sleep(500);
  }
}

This almost works. Sometimes the length given by kafkacat is wrong, causing the parsing to fail. Not sure why? Java reads 32-bit signed integers, as suggested by kafkacat %R format flag?

kafkacat -b kafka:6667 -t actions -q -f '%R%s' | java -cp kafkatail-1.0.0-SNAPSHOT-fat.jar KafkaTail
@vincentbernat

This comment has been minimized.

Copy link
Collaborator

vincentbernat commented Aug 3, 2016

❦ 3 août 2016 13:48 CEST, Kristoffer Sjögren notifications@github.com :

kafkacat -b kafka:6667 -t actions -q | protoc --decode=actions.ActionLog --proto_path=./proto ./proto/actions.proto

This fails with the following error.

Failed to parse input.
% ERROR: Write error for message of 447 bytes at offset 146): Broken
pipe

Try to add: -D "" -u. However, protoc is unlikely to decode all (but the
first) messages. You could use -J instead and pipe to some python/ruby

program to decode each message.

It is often the case that the man who can't tell a lie thinks he is the best
judge of one.
-- Mark Twain, "Pudd'nhead Wilson's Calendar"

@dwo

This comment has been minimized.

Copy link

dwo commented Sep 13, 2016

using -D "" works flawlessly across a whole protobuf topic for me (with either --decode_raw or --decode). I've been longing to be able to do this for ages, so thank you!

@edenhill

This comment has been minimized.

Copy link
Owner

edenhill commented Sep 13, 2016

@dwo: Can you provide the full command line to do this as an example for future users?

@dwo

This comment has been minimized.

Copy link

dwo commented Sep 13, 2016

@edenhill sure, I was doing something like

kafkacat -C -b kafkahost:9092 -t topicname -o beginning -D "" | protoc --decode_raw

@edenhill edenhill closed this Sep 13, 2016

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment