Description
Describe the bug
Using the Pulsar C++ client, I can't publish messages with the desired schema.
To Reproduce
Steps to reproduce the behavior:
- C++ client as producer, Python client as consumer.
- Using a JSON schema defined as:
static const std::string exampleSchema =
    "{\"type\":\"record\",\"name\":\"Example\","
    "\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"int\"]},{\"name\":\"b\",\"type\":[\"null\",\"int\"]}]}";
- The producer sends a struct with the values a=100 and b=200:
struct Example
{
    int a;
    int b;
};
- The consumer receives a message in an incorrect format:
0200000000000000a08baa13ed7f00000800000000000000ffffffffffffffff0000000064000000c800000000
which raised the following error:
Received message msg
0200000000000000a08baa13ed7f00000800000000000000ffffffffffffffff0000000064000000c800000000
Traceback (most recent call last):
  File "ConsumerForSchema.py", line 23, in <module>
    printMem(msg.value())
  File "/home/Pulsar/apache-pulsar-2.4.0-src/pulsar-client-cpp/python/pulsar/__init__.py", line 160, in value
    return self._schema.decode(self._message.data())
  File "/home/Pulsar/apache-pulsar-2.4.0-src/pulsar-client-cpp/python/pulsar/schema/schema.py", line 86, in decode
    return self._record_cls(**json.loads(data))
  File "/usr/lib64/python2.7/json/__init__.py", line 338, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python2.7/json/decoder.py", line 366, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python2.7/json/decoder.py", line 384, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded
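The tail of the received dump can be inspected to see why the decode fails. The following is a minimal sketch, not part of the original report. It assumes Python 3 (the report itself uses Python 2.7) and assumes the dump is a CPython string object header followed by the payload, so the 8 bytes before the trailing 00 are the message content.

```python
import json
import struct

# Assumption: these are the 8 payload bytes at the end of the received dump,
# just before the trailing 00 terminator.
payload = bytes.fromhex("64000000c8000000")

# Read the payload as two little-endian 32-bit ints, i.e. the raw C++ struct.
a, b = struct.unpack("<ii", payload)
print(a, b)

# The same bytes are not JSON text, so a JSON decode raises ValueError.
try:
    json.loads(payload.decode("latin-1"))
    decoded_ok = True
except ValueError:
    decoded_ok = False
print(decoded_ok)
```

Under these assumptions, the payload holds the in-memory bytes of the struct rather than a JSON document, which is consistent with the ValueError in the traceback.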
Expected behavior
With JsonSchema, the Python consumer is expected to receive a message like this:
0200000000000000a09bdd72ee7f00001900000000000000ffffffffffffffff000000007b0a202261223a203130302c200a202262223a203230300a7d00
where 7b0a202261223a203130302c200a202262223a203230300a7d decodes to:
{
"a": 100,
"b": 200
}
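The decoding of the expected payload can be checked with a short sketch. It assumes Python 3 (the report uses Python 2.7), and the variable names are illustrative only.

```python
import json

# Payload portion of the expected dump, copied from the hex string above.
expected_hex = "7b0a202261223a203130302c200a202262223a203230300a7d"

# The bytes are UTF-8 JSON text, so they decode and parse cleanly.
text = bytes.fromhex(expected_hex).decode("utf-8")
record = json.loads(text)
print(record)
```

This is the form of payload that a JSON decode on the consumer side can parse into the fields a and b.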
Additional context
- Pulsar C++ client, producer code:
static const std::string exampleSchema =
    "{\"type\":\"record\",\"name\":\"Example\","
    "\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"int\"]},{\"name\":\"b\",\"type\":[\"null\",\"int\"]}]}";

struct Example
{
    int a;
    int b;
};

ClientConfiguration config;
Client client(lookupUrl);
Producer producer;
ProducerConfiguration producerConf;
producerConf.setSchema(SchemaInfo(JSON, "Json", exampleSchema));
Result res = client.createProducer("topic-avro1", producerConf, producer);

// Publish one message to the topic; the content is the raw bytes of the struct
Example myExample;
myExample.a = 100;
myExample.b = 200;
Message msg = MessageBuilder().setContent(&myExample, sizeof(Example)).build();
res = producer.send(msg);

producer.close();
client.close();
- Python client, consumer code:
import pulsar
from pulsar.schema import *

class Example(Record):
    a = Integer()
    b = Integer()

def printMem(data):
    # Dump the raw memory of the Python object (object header plus content) as hex
    from ctypes import string_at
    from sys import getsizeof
    from binascii import hexlify
    print(hexlify(string_at(id(data), getsizeof(data))))

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe(
    topic='topic-avro1',
    subscription_name='my-subscription',
    schema=JsonSchema(Example))

while True:
    msg = consumer.receive()
    print("Received message msg")
    printMem(msg.data())
    printMem(msg.value())
    ex = msg.value()
    try:
        # print("Received message a={} b={} ".format(ex.a, ex.b))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)

client.close()