Python Avro consumer cannot consume non-union fields #108

Closed
BewareMyPower opened this issue Mar 29, 2023 · 1 comment · Fixed by #119

BewareMyPower commented Mar 29, 2023

How to reproduce

Run a Pulsar standalone 2.11.

First, create a Python consumer whose schema is a class with a string field name and an integer field age:

import pulsar
from pulsar.schema import *
import fastavro

class User(Record):
    name = String()
    age = Integer()

print(fastavro.parse_schema(User.schema()))

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic',
                            subscription_name="sub",
                            schema=AvroSchema(User))

while True:
    try:
        msg = consumer.receive()
        version = int.from_bytes(msg.schema_version().encode('ascii'), byteorder='big')
        print(f"Received {len(msg.data())} bytes id='{msg.message_id()}' version='{version}'")
        value = msg.value()
        print(value)
        print(f"name: {value.name}, age: {value.age}")
        consumer.acknowledge(msg)
    except pulsar.Interrupted:
        print("Stop receiving messages")
        break

client.close()

Then, set the schema compatibility to FORWARD:

curl -L http://localhost:8080/admin/v2/namespaces/public/default/schemaCompatibilityStrategy \
   -X PUT -H 'Content-Type: application/json' -d '"FORWARD"'

NOTE: The schema compatibility strategy has to be set here; otherwise the Java producer cannot be created. That is a separate bug, which I will discuss later.

Then, run the Java producer to send a message (User{name="xyz", age=10}):

@AllArgsConstructor
@Getter
public class User {
    private final String name;
    private final int age;
}

@Cleanup PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://172.22.48.50:6650").build();
Producer<User> producer = client.newProducer(Schema.AVRO(User.class))
        .topic("my-topic").create();
producer.send(new User("xyz", 10));

Then, the Python consumer application will crash with the following logs:

Received 6 bytes id='(1,0,-1,-1)' version='1'
Traceback (most recent call last):
  File "consumer.py", line 42, in <module>
    value = msg.value()
  File "/home/xyz/pulsar-client-python/pulsar/__init__.py", line 130, in value
    return self._schema.decode(self._message.data())
  File "/home/xyz/pulsar-client-python/pulsar/schema/schema_avro.py", line 80, in decode
    d = fastavro.schemaless_reader(buffer, self._schema)
  File "fastavro/_read.pyx", line 1107, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 1120, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 749, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 620, in fastavro._read.read_record
  File "fastavro/_read.pyx", line 740, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 521, in fastavro._read.read_union
IndexError: list index out of range

Analysis

There are two bugs. First, the schema definition generated by the Python client differs from the one generated by the Java client. The two classes are repeated here for comparison:

class User(Record):
    name = String()
    age = Integer()

@AllArgsConstructor
@Getter
static class User {
    private final String name;
    private final int age;
}

Checking the schema definitions (pulsar-admin schemas get my-topic -v <version>) shows that the topic has two schema versions, the first registered by the Python consumer and the second by the Java producer:

{
  "name": "my-topic",
  "schema": {
    "type": "record",
    "name": "User",
    "fields": [
      {
        "name": "name",
        "type": [
          "null",
          "string"
        ]
      },
      {
        "name": "age",
        "type": [
          "null",
          "int"
        ]
      }
    ]
  },
  "type": "AVRO",
  "timestamp": 1680074000129,
  "properties": {}
}

{
  "name": "my-topic",
  "schema": {
    "type": "record",
    "name": "User",
    "namespace": "org.apache.pulsar.client.api.ConsumerIdTest",
    "fields": [
      {
        "name": "age",
        "type": "int"
      },
      {
        "name": "name",
        "type": [
          "null",
          "string"
        ]
      }
    ]
  },
  "type": "AVRO",
  "timestamp": 1680074158624,
  "properties": {
    "__alwaysAllowNull": "true",
    "__jsr310ConversionEnabled": "false"
  }
}

We can see:

  • The Python client treats every field as nullable (more precisely, as an Avro union with null), including the int field
  • The Java client treats only the string field as nullable, since primitive types such as int cannot be null
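The 6-byte payload and the IndexError from the log can be reproduced by hand. The following is a minimal stdlib-only sketch, not the fastavro implementation: it encodes User{name="xyz", age=10} in the field layout of the Java schema, then reads the bytes back using the field layout of the Python schema. All helper names (zigzag_encode, read_long, read_union, encode_with_java_layout, decode_with_python_layout) are made up for illustration.

```python
import io


def zigzag_encode(n: int) -> bytes:
    """Encode a signed integer as an Avro zigzag varint (64-bit range)."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def read_long(buf: io.BytesIO) -> int:
    """Decode one Avro zigzag varint from the buffer."""
    shift = 0
    result = 0
    while True:
        b = buf.read(1)[0]
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)


def read_union(buf: io.BytesIO, branches: list):
    """Read a union value: a branch index followed by the branch's data."""
    index = read_long(buf)
    branch = branches[index]  # IndexError if the index is not a valid branch
    if branch == "null":
        return None
    if branch == "int":
        return read_long(buf)
    if branch == "string":
        length = read_long(buf)
        return buf.read(length).decode("utf-8")
    raise ValueError(f"unsupported branch: {branch}")


def encode_with_java_layout(name: str, age: int) -> bytes:
    """Java schema layout: age as a plain int, then name as ["null", "string"]."""
    raw = name.encode("utf-8")
    return (
        zigzag_encode(age)         # plain int, no union index
        + zigzag_encode(1)         # union index 1 selects "string"
        + zigzag_encode(len(raw))  # string length
        + raw
    )


def decode_with_python_layout(payload: bytes) -> dict:
    """Python schema layout: name and age are both unions, in that order."""
    buf = io.BytesIO(payload)
    name = read_union(buf, ["null", "string"])
    age = read_union(buf, ["null", "int"])
    return {"name": name, "age": age}


payload = encode_with_java_layout("xyz", 10)
print(len(payload), payload)  # 6 bytes: 0x14 0x02 0x06 'x' 'y' 'z'

try:
    decode_with_python_layout(payload)
except IndexError as e:
    # The first byte (the int 10) is misread as a union branch index.
    print("IndexError:", e)
```

The first byte, 0x14, is the zigzag encoding of the age 10. A reader that expects name to be a two-branch union interprets that 10 as a branch index, which is out of range.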
@BewareMyPower BewareMyPower self-assigned this Mar 29, 2023
@BewareMyPower

A workaround is to change the Python User definition from

class User(Record):
    name = String()
    age = Integer()

to

class User(Record):
    _sorted_fields = True
    name = String()
    age = Integer(required=True)

To be compatible with the Java client, every field except String fields must be declared with required=True. In addition, _sorted_fields must be set to True so that the field order matches the Java schema, which lists age before name.
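Why the workaround helps can be seen at the byte level. The sketch below uses only the standard library and assumes that the workaround makes the Python reader schema list age first as a plain int and name second as a ["null", "string"] union, which is the layout of the Java schema shown earlier. The function names are hypothetical and this is not the client's actual decoding code.

```python
import io


def read_long(buf: io.BytesIO) -> int:
    """Decode one Avro zigzag varint from the buffer."""
    shift = 0
    result = 0
    while True:
        b = buf.read(1)[0]
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)


def decode_with_workaround_layout(payload: bytes) -> dict:
    """Assumed layout: age (required int), then name (["null", "string"])."""
    buf = io.BytesIO(payload)
    age = read_long(buf)     # required field: no union index precedes it
    branch = read_long(buf)  # union index for the name field
    if branch == 0:
        name = None
    elif branch == 1:
        length = read_long(buf)
        name = buf.read(length).decode("utf-8")
    else:
        raise IndexError("union index out of range")
    return {"name": name, "age": age}


# The 6 bytes the Java producer sends for User{name="xyz", age=10}.
payload = bytes([0x14, 0x02, 0x06]) + b"xyz"
print(decode_with_workaround_layout(payload))  # {'name': 'xyz', 'age': 10}
```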

BewareMyPower added a commit to BewareMyPower/pulsar-client-python that referenced this issue May 22, 2023
Fixes apache#108

### Motivation

Currently the Python client uses the reader schema, which is the schema
of the consumer, to decode Avro messages. However, when the writer
schema is different from the reader schema, the decode will fail.

### Modifications

Add `attach_client` method to `Schema` and call it when creating
consumers and readers. This method stores a reference to a
`_pulsar.Client` instance, which leverages the C++ APIs added in
apache/pulsar-client-cpp#257 to fetch schema
info. The `AvroSchema` class fetches and caches the writer schema if it
is not cached, then uses both the writer schema and the reader schema to
decode messages.

Add `test_schema_evolve` to test that consumers and readers can decode
any message whose writer schema is different from the reader schema.
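
The idea described in this commit message can be sketched as follows. This is not the code from the fix: it is a heavily simplified, stdlib-only illustration that decodes the bytes according to the writer schema and then picks out the fields the reader schema asks for, matching by name only (no defaults, aliases or type promotion). `WriterSchemaCache`, `fake_fetch` and `decode` are hypothetical names, and the fetch function stands in for whatever call the client uses to look up a schema by version.

```python
import io
from typing import Any, Callable, Dict


def read_long(buf: io.BytesIO) -> int:
    """Decode one Avro zigzag varint from the buffer."""
    shift = 0
    result = 0
    while True:
        b = buf.read(1)[0]
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)


def read_value(buf: io.BytesIO, avro_type: Any) -> Any:
    """Read a value of a tiny Avro subset: null, int, string and unions."""
    if isinstance(avro_type, list):  # union: branch index, then the value
        return read_value(buf, avro_type[read_long(buf)])
    if avro_type == "null":
        return None
    if avro_type == "int":
        return read_long(buf)
    if avro_type == "string":
        return buf.read(read_long(buf)).decode("utf-8")
    raise ValueError(f"unsupported type: {avro_type}")


class WriterSchemaCache:
    """Fetch each writer schema version at most once (hypothetical helper)."""

    def __init__(self, fetch: Callable[[int], Dict[str, Any]]):
        self._fetch = fetch
        self._schemas: Dict[int, Dict[str, Any]] = {}

    def get(self, version: int) -> Dict[str, Any]:
        if version not in self._schemas:
            self._schemas[version] = self._fetch(version)
        return self._schemas[version]


def decode(payload: bytes, writer_schema: dict, reader_schema: dict) -> dict:
    """Decode in writer field order, then project onto the reader's fields."""
    buf = io.BytesIO(payload)
    written = {f["name"]: read_value(buf, f["type"]) for f in writer_schema["fields"]}
    return {f["name"]: written.get(f["name"]) for f in reader_schema["fields"]}


JAVA_WRITER_SCHEMA = {"type": "record", "name": "User", "fields": [
    {"name": "age", "type": "int"},
    {"name": "name", "type": ["null", "string"]},
]}
PYTHON_READER_SCHEMA = {"type": "record", "name": "User", "fields": [
    {"name": "name", "type": ["null", "string"]},
    {"name": "age", "type": ["null", "int"]},
]}

fetch_calls = []


def fake_fetch(version: int) -> dict:
    """Stand-in for a schema lookup against the broker (assumption)."""
    fetch_calls.append(version)
    return JAVA_WRITER_SCHEMA


cache = WriterSchemaCache(fake_fetch)
payload = bytes([0x14, 0x02, 0x06]) + b"xyz"
for _ in range(2):  # the second message reuses the cached writer schema
    user = decode(payload, cache.get(1), PYTHON_READER_SCHEMA)
print(user, fetch_calls)
```

Keying the cache by schema version means the lookup cost is paid once per version rather than once per message.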
shibd pushed a commit that referenced this issue May 25, 2023
Fixes #108
