[Go][Rust] Deserializing a Byte array to a schema #13853

Closed
agneborn98 opened this issue Aug 11, 2022 · 26 comments

Comments

@agneborn98

I am trying to retrieve a schema for a flight descriptor from a server written in Rust onto a client written in Go.

To do this, I am using the function flight.DeserializeSchema to convert the byte array received from the server back into the arrow.Schema type used by the client.

I do this as follows:

getSchema, err := s.client.GetSchema(s.ctx, s.desc)
if err != nil {
	log.Fatal(err) // check this error before it is overwritten by the next call
}
retrievedSchema := getSchema.GetSchema()
deserializedSchema, err := flight.DeserializeSchema(retrievedSchema, memory.DefaultAllocator)

The Byte array I receive looks good enough, I think:

[16 0 0 0 0 0 10 0 14 0 12 0 11 0 4 0 10 0 0 0 20 0 0 0 0 0 0 1 4 0 10 0 12 0 0 0 8 0 4 0 10 0 0 0 8 0 0 0 8 0 0 0 0 0 0 0 3 0 0 0 136 0 0 0 52 0 0 0 4 0 0 0 148 255 255 255 16 0 0 0 20 0 0 0 0 0 0 3 16 0 0 0 206 255 255 255 0 0 1 0 0 0 0 0 5 0 0 0 118 97 108 117 101 0 0 0 192 255 255 255 28 0 0 0 12 0 0 0 0 0 0 10 32 0 0 0 0 0 0 0 0 0 6 0 8 0 6 0 6 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 9 0 0 0 116 105 109 101 115 116 97 109 112 0 0 0 16 0 20 0 16 0 0 0 15 0 4 0 0 0 8 0 16 0 0 0 24 0 0 0 32 0 0 0 0 0 0 2 28 0 0 0 8 0 12 0 4 0 11 0 8 0 0 0 32 0 0 0 0 0 0 1 0 0 0 0 3 0 0 0 116 105 100 0]

Unfortunately, I get the following error:

stderr: 2022/08/11 15:21:10 arrow/ipc: unknown error while reading: runtime error: slice bounds out of range [655360:16]

Am I doing something wrong here? If so, what?

Thank you

@zeroshade
Member

That looks correct for retrieving the schema, but that byte payload looks incorrect. The first four bytes should be either a continuation indicator (0xFFFFFFFF) followed by the message length as a 32-bit integer, or just the message length as a 32-bit integer.

In your example byte array, the first four bytes are the 32-bit integer 16, but the message is clearly larger than 16 bytes. Can you try calling GetSchema on the Rust server from another Flight client (Python or C++) and deserializing the schema there? That would confirm whether the problem is in the Rust server rather than in the Go client.

@alamb Can you or someone on the arrow-rs side take a look here?

@agneborn98
Author

I tried setting up the same process in a Python script, but I don't know if I am doing this correctly either; I don't have much Python experience.

client = flight.FlightClient('grpc://127.0.0.1:9999')

desc = "data"

descriptor = flight.FlightDescriptor.for_path(desc)

schemaResult = client.get_schema(descriptor, None)

schema = getattr(schemaResult, "schema")

print(schema)

deserialized = pyarrow.deserialize(schema)

print (deserialized)

But I get this error, which seems weird to me:

PS C:\Users\fragn\Documents\test> python test.py
Traceback (most recent call last):
  File "C:\Users\fragn\Documents\test\test.py", line 15, in <module>
    schema = getattr(schemaResult, "schema")
  File "pyarrow\_flight.pyx", line 721, in pyarrow._flight.SchemaResult.schema.__get__
  File "pyarrow\_flight.pyx", line 81, in pyarrow._flight.check_flight_status
  File "pyarrow\error.pxi", line 115, in pyarrow.lib.check_status
OSError: Invalid flatbuffers message.

When I print the SchemaResult, an object is definitely there (<pyarrow._flight.SchemaResult object at 0x000001C9E92770B0>), but it seems the schema cannot be retrieved from the SchemaResult.

Is my code broken?

@agneborn98
Author

This is the Rust code if anyone experienced with that decides to pitch in:

/// Provide the schema of a table in the catalog. The name of the table must
/// be provided as the first element in `FlightDescriptor.path`.
async fn get_schema(
    &self,
    request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, Status> {
    let flight_descriptor = request.into_inner();
    let table_name = self.get_table_name_from_flight_descriptor(&flight_descriptor)?;
    let schema = self.get_table_schema_from_default_catalog(table_name)?;

    let options = IpcWriteOptions::default();
    let schema_as_ipc = SchemaAsIpc::new(&*schema, &options);
    let serialized_schema = schema_as_ipc
        .try_into()
        .map_err(|error: Infallible| Status::internal(error.to_string()))?;
    Ok(Response::new(serialized_schema))
}

@zeroshade
Member

@agneborn98 It looks like it's an issue with the Rust server. I'm going to guess that they made a similar mistake to one I made with my original implementation of the Go Flight server/client.

The original .proto file appeared to claim that the return from SchemaResult should just be the FlatBuffers Schema message on its own. But the convention that was established by the C++ and Java implementations was that it should be a full IPC encoded schema message, so my original implementations didn't interact well with those until I fixed it to be a full IPC encoded message. It's possible the Rust server implementation makes the same mistake? Not sure.

But given that you're getting a similar issue from two different Flight client implementations, I'm inclined to think the issue is with the Rust server. I'm not familiar enough with Rust to debug that, though.

@zeroshade
Member

@agneborn98 Out of curiosity, can you provide the schema you're serializing? I want to try serializing it with Go and comparing the byte slice to what you received.

@agneborn98
Author

agneborn98 commented Aug 11, 2022

@zeroshade
Okay, thank you for the help and the fast response!

I'll look into my rust implementation again, and see if I can fix it.

The schema I am trying to serialize is:

Field { name: "tid", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, 
Field { name: "timestamp", data_type: Timestamp(Millisecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, 
Field { name: "value", data_type: Float32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }

@zeroshade
Member

So I tried serializing that schema myself in Go and got the following:

[255 255 255 255 248 0 0 0 16 0 0 0 0 0 10 0 12 0 10 0 9 0 4 0 10 0 0 0 16 0 0 0 0 1 4 0 8 0 8 0 0 0 4 0 8 0 0 0 4 0 0 0 3 0 0 0 148 0 0 0 60 0 0 0 4 0 0 0 136 255 255 255 16 0 0 0 24 0 0 0 0 0 0 3 24 0 0 0 0 0 0 0 0 0 6 0 8 0 6 0 6 0 0 0 0 0 1 0 5 0 0 0 118 97 108 117 101 0 0 0 188 255 255 255 16 0 0 0 24 0 0 0 0 0 0 10 36 0 0 0 0 0 0 0 8 0 12 0 10 0 4 0 8 0 0 0 8 0 0 0 0 0 1 0 3 0 0 0 85 84 67 0 9 0 0 0 116 105 109 101 115 116 97 109 112 0 0 0 16 0 20 0 16 0 0 0 15 0 8 0 0 0 4 0 16 0 0 0 16 0 0 0 24 0 0 0 0 0 0 2 28 0 0 0 0 0 0 0 8 0 12 0 8 0 7 0 8 0 0 0 0 0 0 1 32 0 0 0 3 0 0 0 116 105 100 0 255 255 255 255 0 0 0 0]

Note the [255 255 255 255 248 0 0 0....] at the front. Much of the rest of the message looks similar to what you got, so I'm guessing the Rust version is missing that initial wrapper, like I mentioned.

To prove my case, I took your original bytes, appended the bytes [255 255 255 255 0 0 0 0] to the end of them, prepended [255 255 255 255] and the length of the byte slice, and then deserialized the schema. Here's the code and the result:

data := []byte{16, 0, 0, 0, 0, 0, 10, 0, 14, 0, 12, 0, 11, 0, 4, 0, 10, 0,
		0, 0, 20, 0, 0, 0, 0, 0, 0, 1, 4, 0, 10, 0, 12, 0, 0, 0, 8, 0,
		4, 0, 10, 0, 0, 0, 8, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0,
		0, 136, 0, 0, 0, 52, 0, 0, 0, 4, 0, 0, 0, 148, 255, 255, 255,
		16, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 3, 16, 0, 0, 0, 206, 255, 255,
		255, 0, 0, 1, 0, 0, 0, 0, 0, 5, 0, 0, 0, 118, 97, 108, 117, 101,
		0, 0, 0, 192, 255, 255, 255, 28, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0,
		10, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0, 8, 0, 6, 0, 6, 0, 0, 0,
		0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 116, 105, 109,
		101, 115, 116, 97, 109, 112, 0, 0, 0, 16, 0, 20, 0, 16, 0, 0, 0,
		15, 0, 4, 0, 0, 0, 8, 0, 16, 0, 0, 0, 24, 0, 0, 0, 32, 0, 0, 0, 0,
		0, 0, 2, 28, 0, 0, 0, 8, 0, 12, 0, 4, 0, 11, 0, 8, 0, 0, 0, 32,
		0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 3, 0, 0, 0, 116, 105, 100, 0}

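// Append the end-of-stream marker.
data = append(data, []byte{255, 255, 255, 255, 0, 0, 0, 0}...)
// Record the length (this includes the end-of-stream marker just appended).
v := uint32(len(data))
// Prepend the continuation token plus a 4-byte placeholder for the length.
data = append([]byte{255, 255, 255, 255, 0, 0, 0, 0}, data...)
// Overwrite the placeholder with the little-endian length.
binary.LittleEndian.PutUint32(data[4:], v)

fmt.Println(flight.DeserializeSchema(data, memory.DefaultAllocator))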

Output:

schema:
  fields: 3
    - tid: type=int32
    - timestamp: type=timestamp[ms]
    - value: type=float32 <nil>

So it's definitely an issue with the message coming from the Rust server.

@agneborn98
Author

@zeroshade

Case definitely proven 😄

Thanks again!

@zeroshade
Member

@agneborn98 You can create an issue on https://github.com/apache/arrow-rs or file a Jira card for this if you like!

@agneborn98
Author

Thank you @zeroshade

I have to check my own rust implementation to see if there is something I screwed up before I embarrass myself posting something that was easily solvable.

In case the issue still persists, I'll return in a few days and post it on one of those forums.

@alamb
Contributor

alamb commented Aug 11, 2022

The original .proto file appeared to claim that the return from SchemaResult should just be the FlatBuffers Schema message on its own. But the convention that was established by the C++ and Java implementations was that it should be a full IPC encoded schema message, so my original implementations didn't interact well with those until I fixed it to be a full IPC encoded message. It's possible the Rust server implementation makes the same mistake? Not sure.

@zeroshade That sounds like a very plausible explanation. @agneborn98 I don't fully know all the details, as I am not an IPC expert, but I think filing an issue on https://github.com/apache/arrow-rs is the correct way to make the people who are experts aware of the issue.

@agneborn98
Author

@alamb
Yeah, thanks for the response.

I'll try debugging my rust server and see if I've missed some option when converting my schema to an IPC message.

@liukun4515
Contributor

liukun4515 commented Aug 23, 2022

@agneborn98 It looks like it's an issue with the Rust server. I'm going to guess that they made a similar mistake to one I made with my original implementation of the Go Flight server/client.

The original .proto file appeared to claim that the return from SchemaResult should just be the FlatBuffers Schema message on its own. But the convention that was established by the C++ and Java implementations was that it should be a full IPC encoded schema message, so my original implementations didn't interact well with those until I fixed it to be a full IPC encoded message. It's possible the Rust server implementation makes the same mistake? Not sure.

Why do C++ and Java return the full IPC encoded schema message for the GetSchema method defined in the .proto, rather than the bare FlatBuffers Schema message?

But given that you're getting a similar issue from two different Flight client implementations, I'm inclined to think the issue is with the Rust server. I'm not familiar enough with Rust to debug that, though.

Hi @zeroshade, I am from the arrow-rs community and am trying to resolve apache/arrow-rs#2445 about get_schema.

I think the gap between arrow-rs and C++/Go is a difference in how GetSchema is implemented.
As @zeroshade described, C++/Go convert the schema to the full flight message, but Rust does not; it just returns the bytes from the FlatBuffer.

agneborn98 reopened this Aug 23, 2022

@zeroshade
Member

@liukun4515 Slight correction: The C++/Go does not convert it to a full flight message, it just returns the full IPC Schema message bytes which can then be slotted into the flight SchemaResult protobuf. SerializeSchema and DeserializeSchema do not work with the protobuf bytes, only the IPC Schema Message

@liukun4515
Contributor

liukun4515 commented Aug 24, 2022

@liukun4515 Slight correction: The C++/Go does not convert it to a full flight message, it just returns the full IPC Schema message bytes which can then be slotted into the flight SchemaResult protobuf. SerializeSchema and DeserializeSchema do not work with the protobuf bytes, only the IPC Schema Message

Does that mean C++/Go add the continuation prefix and the length to the protobuf bytes of the schema?
@zeroshade

Is there any documentation that explains what content the GetSchema interface should return?
I think this is just a difference between the Rust implementation and the others. I can easily fix this on the Rust side by following the Java/C++ implementation, but it would be better to have documentation that specifies the content of GetSchema, so that all implementations are consistent.

@liukun4515
Contributor

I think we can close this issue.

Thanks @zeroshade @agneborn98

@liukun4515
Contributor

liukun4515 commented Aug 29, 2022

@agneborn98 It looks like it's an issue with the Rust server. I'm going to guess that they made a similar mistake to one I made with my original implementation of the Go Flight server/client.

The original .proto file appeared to claim that the return from SchemaResult should just be the FlatBuffers Schema message on its own. But the convention that was established by the C++ and Java implementations was that it should be a full IPC encoded schema message, so my original implementations didn't interact well with those until I fixed it to be a full IPC encoded message. It's possible the Rust server implementation makes the same mistake? Not sure.

But given that you're getting a similar issue from two different Flight client implementations, I'm inclined to think the issue is with the Rust server. I'm not familiar enough with Rust to debug that, though.

@zeroshade
We have run into a compatibility problem between client and server. How did Go/Java/C++ handle compatibility when they made this change?
apache/arrow-rs#2586 (comment)

@lidavidm
Member

C++/Java I think actually had this incompatibility before, but it was fixed very early on, like in 0.12.0 or something, so there wasn't any consideration of backwards compatibility.

You could just try to parse it twice (checking if the stream starts with the IPC continuation token or not to see if it's even worthwhile).

@zeroshade
Member

@liukun4515 Go also fixed the incompatibility early on and so didn't have any consideration of backwards compatibility. I agree with @lidavidm that you should check whether the continuation token is present and try parsing each way.

@lidavidm
Member

Did we file a Jira for this? We should add more methods to the integration tests to try to catch this kind of thing earlier. (Another thing that would be useful is to finally tackle ARROW-4419. C♯ exposed an issue that such a test would presumably have caught earlier.)

@liukun4515
Contributor

Go also fixed the incompatibility early on and so didn't have any consideration of backwards compatibility. I agree with @lidavidm that you should check whether the continuation token is present and try parsing each way.

Do you mean that Go only supports the format with the continuation token?
I have checked the Java code, which only supports the format without the continuation token.
@lidavidm Maybe we should add integration tests to ensure compatibility among the language implementations.

@lidavidm
Member

@liukun4515 what are you referring to? The Java code uses an IPC message with the continuation token. (Note that Schema#serialize is not the right implementation).

  /**
   * Converts to the protocol buffer representation.
   */
  Flight.SchemaResult toProtocol() {
    // Encode schema in a Message payload
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
      MessageSerializer.serialize(new WriteChannel(Channels.newChannel(baos)), schema, option);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return Flight.SchemaResult.newBuilder()
        .setSchema(ByteString.copyFrom(baos.toByteArray()))
        .build();
  }

  /**
   * Converts from the protocol buffer representation.
   */
  static SchemaResult fromProtocol(Flight.SchemaResult pbSchemaResult) {
    try {
      final ByteBuffer schemaBuf = pbSchemaResult.getSchema().asReadOnlyByteBuffer();
      Schema schema = pbSchemaResult.getSchema().size() > 0 ?
          MessageSerializer.deserializeSchema(
              new ReadChannel(Channels.newChannel(new ByteBufferBackedInputStream(schemaBuf))))
          : new Schema(ImmutableList.of());
      return new SchemaResult(schema);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

@lidavidm
Member

Filed ARROW-17568

@liukun4515
Contributor

@liukun4515 what are you referring to? The Java code uses an IPC message with the continuation token. (Note that Schema#serialize is not the right implementation).

@lidavidm @zeroshade
Thanks for your reply.

I think I got it.

This is a good opportunity to implement integration tests for the RPC methods.

I can take on the integration test for the RPC method on the Rust side.

@MicroGery

@zeroshade Sorry to interrupt. The data I read from the Java interface is 0xffffffff 0x00000300 0x00000010 0x000a0000. Does this mean that the length of the subsequent data is 768 bytes?

@zeroshade
Copy link
Member

@MicroGery It means that the length of the metadata (the flatbuffer bytes + padding to an 8-byte boundary) is 768 bytes. Once you decode the flatbuffer metadata, it'll contain the length of the body data as a member which should follow the 768 bytes you read.

Does that make sense? You can refer to the documentation for a fuller description of how to interpret the bytes.
